基于 Tonic 和 Consul 的 gRPC 服务集成 Cassandra 实现持久化死信队列


一个健壮的分布式系统中,消息或事件处理失败是常态而非意外。直接丢弃失败的请求是不可接受的,无限重试又可能导致系统雪崩。我们面临的第一个具体挑战是:为一个高吞吐的 Rust gRPC 服务(我们称之为OrderProcessor)设计一个可靠的失败处理机制。当OrderProcessor处理上游消息失败,并在有限次重试后仍然失败时,需要将该消息及其上下文安全地转移,以供后续的审计、手动干预或自动重放。

传统的做法是引入一个消息队列(如 RabbitMQ 或 Kafka)作为死信队列(Dead Letter Queue, DLQ)。但在我们的场景中,对失败消息的分析需求远超简单的“先进先出”。我们需要能够按来源服务、错误类型、时间窗口进行复杂查询,甚至对失败载荷的内容进行分析。一个标准的消息队列在这方面能力有限。因此,我们构想了一个不同的方案:构建一个专门的 DeadLetterSink gRPC 服务,它接收死信消息,并将其持久化到具备强大查询能力的 NoSQL 数据库中。

技术选型决策如下:

  1. RPC框架: Tonic - 既然我们的主业务栈是基于 Rust 的高性能 gRPC 服务,DeadLetterSink 自然也应使用 Tonic 构建,以保持技术栈统一和生态兼容性。
  2. 服务发现: Consul - 在动态的微服务环境中,OrderProcessor 不应硬编码 DeadLetterSink 的地址。Consul 提供了服务注册与发现机制,允许 OrderProcessor 在需要时动态查找健康的 DeadLetterSink 实例。
  3. 持久化存储: Apache Cassandra - 这是本次架构的核心决策。我们选择 Cassandra 作为 DLQ 的后端存储,原因有三:
    • 高写入吞吐量: 死信场景通常是突发性的,系统某个环节故障可能瞬间产生大量死信消息。Cassandra 的无主架构和 LSM-Tree 存储引擎使其能够轻松应对高并发写入。
    • 强大的查询能力 (CQL): 相比于消息队列,我们可以通过精心设计的表结构,使用 CQL 对死信进行多维度、细粒度的查询,这对于故障排查至关重要。
    • 水平扩展与高可用: Cassandra 的分布式特性确保了死信存储本身不会成为单点故障。

整个流程的架构图如下:

sequenceDiagram
    participant Client as 调用方
    participant OrderProcessor as 订单处理服务
    participant Consul
    participant DeadLetterSink as 死信存储服务
    participant Cassandra

    Client->>+OrderProcessor: ProcessOrder(request)
    OrderProcessor-->>OrderProcessor: 处理失败 (e.g., 外部依赖超时)
    OrderProcessor-->>OrderProcessor: 执行本地重试策略 (e.g., 指数退避)
    Note right of OrderProcessor: 重试 3 次后依旧失败
    OrderProcessor->>+Consul: 查询 'dead-letter-sink' 服务地址
    Consul-->>-OrderProcessor: 返回健康实例列表
    OrderProcessor->>+DeadLetterSink: SendToDLQ(failed_request, error_context)
    DeadLetterSink->>+Cassandra: INSERT INTO dead_letters (...)
    Cassandra-->>-DeadLetterSink: 写入成功
    DeadLetterSink-->>-OrderProcessor: Ack
    OrderProcessor-->>-Client: 返回处理失败响应

第一步:定义 Protobuf 和项目结构

我们的核心是DeadLetterSink服务,所以先定义它的接口。

proto/dead_letter.proto

syntax = "proto3";

package dead_letter;

import "google/protobuf/timestamp.proto";

// 死信存储服务
service DeadLetterSink {
  // 发送单条死信
  rpc Send (DeadLetter) returns (SendResponse);
}

// 死信消息体
message DeadLetter {
  // 唯一ID,用于幂等性控制,建议使用 UUID
  string event_id = 1;

  // 消息来源的服务名,例如 "order-processor"
  string source_service = 2;

  // 原始消息的业务标识,例如订单ID
  string business_key = 3;

  // 序列化后的原始消息载荷
  bytes original_payload = 4;

  // 原始消息的编码类型,例如 "application/json" or "application/protobuf"
  string payload_encoding = 5;

  // 记录的错误信息
  ErrorContext error_context = 6;

  // 消息最初创建的时间戳
  google.protobuf.Timestamp original_timestamp = 7;
}

// 错误上下文
message ErrorContext {
  // 错误分类,例如 "DATABASE_TIMEOUT", "VALIDATION_ERROR"
  string error_type = 1;

  // 详细的错误信息或堆栈
  string error_message = 2;

  // 尝试处理的次数
  uint32 attempt_count = 3;
}

message SendResponse {
  // 确认接收到的 event_id
  string event_id = 1;
  // 是否成功
  bool success = 2;
}

项目结构如下:

dead-letter-system/
├── dead-letter-sink/     # 死信服务
│   ├── Cargo.toml
│   └── src/
│       ├── main.rs
│       ├── service.rs
│       ├── config.rs
│       └── storage.rs
├── order-processor/      # 模拟的业务服务
│   ├── Cargo.toml
│   └── src/
│       └── main.rs
└── proto/
    └── dead_letter.proto
└── build.rs              # 用于编译 proto

build.rs 文件:

// build.rs
fn main() -> Result<(), Box<dyn std::error::Error>> {
    tonic_build::configure()
        .build_server(true)
        .build_client(true)
        .compile(&["proto/dead_letter.proto"], &["proto/"])?;
    Ok(())
}

第二步:实现 DeadLetterSink 服务

这个服务是整个架构的核心,它需要处理 gRPC 请求、与 Cassandra 交互,并向 Consul 注册自己。

Cassandra 表结构设计

在 Cassandra 中,数据模型的设计至关重要。我们需要根据查询模式来设计表。常见的查询需求是:

  1. 按来源服务查找最近的死信。
  2. 按时间范围和来源服务查找死信。
  3. 根据 event_idbusiness_key 精确查找某条死信。

一个合理的 CQL Schema 如下:

CREATE KEYSPACE IF NOT EXISTS dead_letter_store WITH REPLICATION = { 
  'class' : 'SimpleStrategy', 
  'replication_factor' : 3 
};

USE dead_letter_store;

CREATE TABLE dead_letters (
    source_service text,
    // 将时间分桶,例如按天,避免分区过大
    time_bucket text, 
    // 将原始时间戳作为集群键,实现按时间排序
    original_timestamp timestamp,
    // 事件ID作为另一个集群键,保证唯一性
    event_id uuid,
    business_key text,
    original_payload blob,
    payload_encoding text,
    error_type text,
    error_message text,
    attempt_count int,
    PRIMARY KEY ((source_service, time_bucket), original_timestamp, event_id)
) WITH CLUSTERING ORDER BY (original_timestamp DESC);

这里的 Partition Key 是 (source_service, time_bucket)time_bucket 可以是 YYYY-MM-DD 格式的日期字符串。这样设计使得查询某个服务在某一天的所有死信非常高效。original_timestampevent_id 作为 Clustering Key,保证了分区内数据按时间倒序排列,并且唯一。

服务实现 (dead-letter-sink/src)

首先是配置模块 config.rs

// dead-letter-sink/src/config.rs
use serde::Deserialize;
use std::fs;

#[derive(Debug, Deserialize)]
pub struct Config {
    pub server: ServerConfig,
    pub cassandra: CassandraConfig,
    pub consul: ConsulConfig,
}

#[derive(Debug, Deserialize)]
pub struct ServerConfig {
    pub host: String,
    pub port: u16,
}

#[derive(Debug, Deserialize)]
pub struct CassandraConfig {
    pub contact_points: Vec<String>,
    pub keyspace: String,
    pub username: Option<String>,
    pub password: Option<String>,
}

#[derive(Debug, Deserialize)]
pub struct ConsulConfig {
    pub address: String,
    pub service_name: String,
    pub service_id: String,
}

impl Config {
    pub fn from_file(path: &str) -> Result<Self, Box<dyn std::error::Error>> {
        let content = fs::read_to_string(path)?;
        let config: Config = toml::from_str(&content)?;
        Ok(config)
    }
}

一个示例 config.toml:

[server]
host = "0.0.0.0"
port = 50051

[cassandra]
contact_points = ["127.0.0.1:9042"]
keyspace = "dead_letter_store"
username = "cassandra"
password = "cassandra"

[consul]
address = "http://127.0.0.1:8500"
service_name = "dead-letter-sink"
service_id = "dead-letter-sink-instance-1"

接下来是 Cassandra 的交互逻辑 storage.rs

// dead-letter-sink/src/storage.rs
use cassandra_cpp::{Session, Statement, Cluster, CassError};
use chrono::{DateTime, Utc};
use std::sync::Arc;
use tonic::Status;
use uuid::Uuid;

use crate::config::CassandraConfig;

// 引入生成的 protobuf 类型
pub mod dead_letter {
    tonic::include_proto!("dead_letter");
}
use dead_letter::DeadLetter;

#[derive(Clone)]
pub struct CassandraRepo {
    session: Arc<Session>,
}

impl CassandraRepo {
    pub async fn connect(config: &CassandraConfig) -> Result<Self, CassError> {
        let mut cluster = Cluster::default();
        cluster.set_contact_points(&config.contact_points.join(","))?;
        if let (Some(user), Some(pass)) = (&config.username, &config.password) {
            cluster.set_credentials(user, pass)?;
        }

        // 在生产环境中,需要更精细的策略配置,例如重试策略、负载均衡策略等
        let session = Arc::new(cluster.connect_async().await?);
        Ok(Self { session })
    }

    pub async fn save_dead_letter(&self, letter: DeadLetter) -> Result<(), Status> {
        // 从 Protobuf 的 Timestamp 转换为 chrono::DateTime
        let original_ts: DateTime<Utc> = letter.original_timestamp
            .ok_or_else(|| Status::invalid_argument("original_timestamp is missing"))?
            .try_into()
            .map_err(|_| Status::invalid_argument("Invalid timestamp format"))?;

        // 生成 time_bucket,格式为 YYYY-MM-DD
        let time_bucket = original_ts.format("%Y-%m-%d").to_string();

        let event_id = Uuid::parse_str(&letter.event_id)
            .map_err(|_| Status::invalid_argument("Invalid UUID format for event_id"))?;

        let error_ctx = letter.error_context
            .ok_or_else(|| Status::invalid_argument("error_context is missing"))?;
            
        let cql = "INSERT INTO dead_letters (source_service, time_bucket, original_timestamp, event_id, business_key, original_payload, payload_encoding, error_type, error_message, attempt_count) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
        
        let mut statement = Statement::new(cql, 10);
        statement.bind(0, &letter.source_service)?;
        statement.bind(1, &time_bucket)?;
        statement.bind(2, original_ts.timestamp_millis())?;
        statement.bind(3, event_id)?;
        statement.bind(4, &letter.business_key)?;
        statement.bind(5, &letter.original_payload)?;
        statement.bind(6, &letter.payload_encoding)?;
        statement.bind(7, &error_ctx.error_type)?;
        statement.bind(8, &error_ctx.error_message)?;
        statement.bind(9, error_ctx.attempt_count as i32)?;

        self.session.execute(&statement).await.map_err(|e| {
            // 这里的错误处理非常重要,如果写入 Cassandra 失败,那么死信就真的丢失了。
            // 生产级系统需要有备用策略,例如写入本地文件日志或另一个备用系统。
            log::error!("Failed to write to Cassandra: {}", e);
            Status::internal("Failed to persist dead letter")
        })?;

        Ok(())
    }
}

然后是 gRPC 服务实现 service.rs:

// dead-letter-sink/src/service.rs
use tonic::{Request, Response, Status};
use crate::storage::{CassandraRepo, dead_letter::*};

use dead_letter_sink_server::{DeadLetterSink};

pub struct DeadLetterSinkService {
    repo: CassandraRepo,
}

impl DeadLetterSinkService {
    pub fn new(repo: CassandraRepo) -> Self {
        Self { repo }
    }
}

#[tonic::async_trait]
impl DeadLetterSink for DeadLetterSinkService {
    async fn send(&self, request: Request<DeadLetter>) -> Result<Response<SendResponse>, Status> {
        let letter = request.into_inner();
        let event_id = letter.event_id.clone();
        
        log::info!("Received dead letter {} from service {}", event_id, letter.source_service);

        // 基本的校验
        if event_id.is_empty() || letter.source_service.is_empty() {
            return Err(Status::invalid_argument("event_id and source_service are required"));
        }
        
        self.repo.save_dead_letter(letter).await?;

        let response = SendResponse {
            event_id,
            success: true,
        };

        Ok(Response::new(response))
    }
}

最后是主程序 main.rs,负责启动服务并注册到 Consul:

```rust
// dead-letter-sink/src/main.rs
mod config;
mod service;
mod storage;

use crate::config::Config;
use crate::service::DeadLetterSinkService;
use crate::storage::dead_letter::dead_letter_sink_server::DeadLetterSinkServer;
use crate::storage::CassandraRepo;
use std::net::SocketAddr;
use tonic::transport::Server;
use consul::{Client, Config as ConsulClientConfig};
use consul::agent::{Agent, ServiceRegistration};

#[tokio::main]
async fn main() -> Result<(), Box> {
env_logger::init();

let config = Config::from_file("dead-letter-sink/config.toml")?;
let server_addr: SocketAddr = format!("{}:{}", config.server.host, config.server.port).parse()?;

// 1. 连接 Cassandra
log::info!("Connecting to Cassandra...");
let cassandra_repo = CassandraRepo::connect(&config.cassandra).await
    .expect("Failed to connect to Cassandra");
log::info!("Cassandra connection established.");

// 2. 注册到 Consul
// 这里的坑在于,服务注册必须在 gRPC 服务启动并监听端口 *之后* 进行,
// 或者健康检查要配置得当,否则 Consul 可能在服务就绪前就将其标记为健康。
// 一个更好的实践是在服务启动后,在另一个 task 中执行注册和健康检查心跳。
register_with_consul(&config).await?;
log::info!("Service registered with Consul: {}", config.consul.service_id);

// 3. 启动 gRPC 服务
let sink_service = DeadLetterSinkService::new(cassandra_repo);
log::info!("DeadLetterSink service listening on {}", server_addr);

Server::builder()
    .add_service(DeadLetterSinkServer::new(sink_service))
    .serve(server_addr)
    .await?;

// 在真实项目中,需要处理 SIGTERM 信号,优雅地从 Consul 注销服务。
// deregister_from_consul(&config).await?;

Ok(())

}

async fn register_with_consul(config: &Config) -> Result<(), Box> {
let consul_config = ConsulClientConfig {
address: config.consul.address.clone(),
..ConsulClientConfig::default()
};
let client = Client::new(consul_config);

// 健康检查的定义非常关键,它决定了 Consul 何时认为我们的服务实例是可用的。
let check = ServiceRegistration {
    name: config.consul.service_name.clone(),
    id: Some(config.consul.service_id.clone()),
    address: Some(config.server.host.clone()),
    port: Some(config.server.port as i32),
    tags: Some(vec

  目录