一个健壮的分布式系统中,消息或事件处理失败是常态而非意外。直接丢弃失败的请求是不可接受的,无限重试又可能导致系统雪崩。我们面临的第一个具体挑战是:为一个高吞吐的 Rust gRPC 服务(我们称之为OrderProcessor
)设计一个可靠的失败处理机制。当OrderProcessor
处理上游消息失败,并在有限次重试后仍然失败时,需要将该消息及其上下文安全地转移,以供后续的审计、手动干预或自动重放。
传统的做法是引入一个消息队列(如 RabbitMQ 或 Kafka)作为死信队列(Dead Letter Queue, DLQ)。但在我们的场景中,对失败消息的分析需求远超简单的“先进先出”。我们需要能够按来源服务、错误类型、时间窗口进行复杂查询,甚至对失败载荷的内容进行分析。一个标准的消息队列在这方面能力有限。因此,我们构想了一个不同的方案:构建一个专门的 DeadLetterSink
gRPC 服务,它接收死信消息,并将其持久化到具备强大查询能力的 NoSQL 数据库中。
技术选型决策如下:
- RPC框架: Tonic - 既然我们的主业务栈是基于 Rust 的高性能 gRPC 服务,
DeadLetterSink
自然也应使用 Tonic 构建,以保持技术栈统一和生态兼容性。 - 服务发现: Consul - 在动态的微服务环境中,
OrderProcessor
不应硬编码DeadLetterSink
的地址。Consul 提供了服务注册与发现机制,允许OrderProcessor
在需要时动态查找健康的DeadLetterSink
实例。 - 持久化存储: Apache Cassandra - 这是本次架构的核心决策。我们选择 Cassandra 作为 DLQ 的后端存储,原因有三:
- 高写入吞吐量: 死信场景通常是突发性的,系统某个环节故障可能瞬间产生大量死信消息。Cassandra 的无主架构和 LSM-Tree 存储引擎使其能够轻松应对高并发写入。
- 强大的查询能力 (CQL): 相比于消息队列,我们可以通过精心设计的表结构,使用 CQL 对死信进行多维度、细粒度的查询,这对于故障排查至关重要。
- 水平扩展与高可用: Cassandra 的分布式特性确保了死信存储本身不会成为单点故障。
整个流程的架构图如下:
sequenceDiagram participant Client as 调用方 participant OrderProcessor as 订单处理服务 participant Consul participant DeadLetterSink as 死信存储服务 participant Cassandra Client->>+OrderProcessor: ProcessOrder(request) OrderProcessor-->>OrderProcessor: 处理失败 (e.g., 外部依赖超时) OrderProcessor-->>OrderProcessor: 执行本地重试策略 (e.g., 指数退避) Note right of OrderProcessor: 重试 3 次后依旧失败 OrderProcessor->>+Consul: 查询 'dead-letter-sink' 服务地址 Consul-->>-OrderProcessor: 返回健康实例列表 OrderProcessor->>+DeadLetterSink: SendToDLQ(failed_request, error_context) DeadLetterSink->>+Cassandra: INSERT INTO dead_letters (...) Cassandra-->>-DeadLetterSink: 写入成功 DeadLetterSink-->>-OrderProcessor: Ack OrderProcessor-->>-Client: 返回处理失败响应
第一步:定义 Protobuf 和项目结构
我们的核心是DeadLetterSink
服务,所以先定义它的接口。
proto/dead_letter.proto
syntax = "proto3";
package dead_letter;
import "google/protobuf/timestamp.proto";
// 死信存储服务
service DeadLetterSink {
// 发送单条死信
rpc Send (DeadLetter) returns (SendResponse);
}
// 死信消息体
message DeadLetter {
// 唯一ID,用于幂等性控制,建议使用 UUID
string event_id = 1;
// 消息来源的服务名,例如 "order-processor"
string source_service = 2;
// 原始消息的业务标识,例如订单ID
string business_key = 3;
// 序列化后的原始消息载荷
bytes original_payload = 4;
// 原始消息的编码类型,例如 "application/json" or "application/protobuf"
string payload_encoding = 5;
// 记录的错误信息
ErrorContext error_context = 6;
// 消息最初创建的时间戳
google.protobuf.Timestamp original_timestamp = 7;
}
// 错误上下文
message ErrorContext {
// 错误分类,例如 "DATABASE_TIMEOUT", "VALIDATION_ERROR"
string error_type = 1;
// 详细的错误信息或堆栈
string error_message = 2;
// 尝试处理的次数
uint32 attempt_count = 3;
}
message SendResponse {
// 确认接收到的 event_id
string event_id = 1;
// 是否成功
bool success = 2;
}
项目结构如下:
dead-letter-system/
├── dead-letter-sink/ # 死信服务
│ ├── Cargo.toml
│ └── src/
│ ├── main.rs
│ ├── service.rs
│ ├── config.rs
│ └── storage.rs
├── order-processor/ # 模拟的业务服务
│ ├── Cargo.toml
│ └── src/
│ └── main.rs
└── proto/
└── dead_letter.proto
└── build.rs # 用于编译 proto
build.rs
文件:
// build.rs
fn main() -> Result<(), Box<dyn std::error::Error>> {
tonic_build::configure()
.build_server(true)
.build_client(true)
.compile(&["proto/dead_letter.proto"], &["proto/"])?;
Ok(())
}
第二步:实现 DeadLetterSink
服务
这个服务是整个架构的核心,它需要处理 gRPC 请求、与 Cassandra 交互,并向 Consul 注册自己。
Cassandra 表结构设计
在 Cassandra 中,数据模型的设计至关重要。我们需要根据查询模式来设计表。常见的查询需求是:
- 按来源服务查找最近的死信。
- 按时间范围和来源服务查找死信。
- 根据
event_id
或business_key
精确查找某条死信。
一个合理的 CQL Schema 如下:
CREATE KEYSPACE IF NOT EXISTS dead_letter_store WITH REPLICATION = {
'class' : 'SimpleStrategy',
'replication_factor' : 3
};
USE dead_letter_store;
CREATE TABLE dead_letters (
source_service text,
// 将时间分桶,例如按天,避免分区过大
time_bucket text,
// 将原始时间戳作为集群键,实现按时间排序
original_timestamp timestamp,
// 事件ID作为另一个集群键,保证唯一性
event_id uuid,
business_key text,
original_payload blob,
payload_encoding text,
error_type text,
error_message text,
attempt_count int,
PRIMARY KEY ((source_service, time_bucket), original_timestamp, event_id)
) WITH CLUSTERING ORDER BY (original_timestamp DESC);
这里的 Partition Key 是 (source_service, time_bucket)
。time_bucket
可以是 YYYY-MM-DD
格式的日期字符串。这样设计使得查询某个服务在某一天的所有死信非常高效。original_timestamp
和 event_id
作为 Clustering Key,保证了分区内数据按时间倒序排列,并且唯一。
服务实现 (dead-letter-sink/src
)
首先是配置模块 config.rs
:
// dead-letter-sink/src/config.rs
use serde::Deserialize;
use std::fs;
#[derive(Debug, Deserialize)]
pub struct Config {
pub server: ServerConfig,
pub cassandra: CassandraConfig,
pub consul: ConsulConfig,
}
#[derive(Debug, Deserialize)]
pub struct ServerConfig {
pub host: String,
pub port: u16,
}
#[derive(Debug, Deserialize)]
pub struct CassandraConfig {
pub contact_points: Vec<String>,
pub keyspace: String,
pub username: Option<String>,
pub password: Option<String>,
}
#[derive(Debug, Deserialize)]
pub struct ConsulConfig {
pub address: String,
pub service_name: String,
pub service_id: String,
}
impl Config {
pub fn from_file(path: &str) -> Result<Self, Box<dyn std::error::Error>> {
let content = fs::read_to_string(path)?;
let config: Config = toml::from_str(&content)?;
Ok(config)
}
}
一个示例 config.toml
:
[server]
host = "0.0.0.0"
port = 50051
[cassandra]
contact_points = ["127.0.0.1:9042"]
keyspace = "dead_letter_store"
username = "cassandra"
password = "cassandra"
[consul]
address = "http://127.0.0.1:8500"
service_name = "dead-letter-sink"
service_id = "dead-letter-sink-instance-1"
接下来是 Cassandra 的交互逻辑 storage.rs
:
// dead-letter-sink/src/storage.rs
use cassandra_cpp::{Session, Statement, Cluster, CassError};
use chrono::{DateTime, Utc};
use std::sync::Arc;
use tonic::Status;
use uuid::Uuid;
use crate::config::CassandraConfig;
// 引入生成的 protobuf 类型
pub mod dead_letter {
tonic::include_proto!("dead_letter");
}
use dead_letter::DeadLetter;
#[derive(Clone)]
pub struct CassandraRepo {
session: Arc<Session>,
}
impl CassandraRepo {
pub async fn connect(config: &CassandraConfig) -> Result<Self, CassError> {
let mut cluster = Cluster::default();
cluster.set_contact_points(&config.contact_points.join(","))?;
if let (Some(user), Some(pass)) = (&config.username, &config.password) {
cluster.set_credentials(user, pass)?;
}
// 在生产环境中,需要更精细的策略配置,例如重试策略、负载均衡策略等
let session = Arc::new(cluster.connect_async().await?);
Ok(Self { session })
}
pub async fn save_dead_letter(&self, letter: DeadLetter) -> Result<(), Status> {
// 从 Protobuf 的 Timestamp 转换为 chrono::DateTime
let original_ts: DateTime<Utc> = letter.original_timestamp
.ok_or_else(|| Status::invalid_argument("original_timestamp is missing"))?
.try_into()
.map_err(|_| Status::invalid_argument("Invalid timestamp format"))?;
// 生成 time_bucket,格式为 YYYY-MM-DD
let time_bucket = original_ts.format("%Y-%m-%d").to_string();
let event_id = Uuid::parse_str(&letter.event_id)
.map_err(|_| Status::invalid_argument("Invalid UUID format for event_id"))?;
let error_ctx = letter.error_context
.ok_or_else(|| Status::invalid_argument("error_context is missing"))?;
let cql = "INSERT INTO dead_letters (source_service, time_bucket, original_timestamp, event_id, business_key, original_payload, payload_encoding, error_type, error_message, attempt_count) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
let mut statement = Statement::new(cql, 10);
statement.bind(0, &letter.source_service)?;
statement.bind(1, &time_bucket)?;
statement.bind(2, original_ts.timestamp_millis())?;
statement.bind(3, event_id)?;
statement.bind(4, &letter.business_key)?;
statement.bind(5, &letter.original_payload)?;
statement.bind(6, &letter.payload_encoding)?;
statement.bind(7, &error_ctx.error_type)?;
statement.bind(8, &error_ctx.error_message)?;
statement.bind(9, error_ctx.attempt_count as i32)?;
self.session.execute(&statement).await.map_err(|e| {
// 这里的错误处理非常重要,如果写入 Cassandra 失败,那么死信就真的丢失了。
// 生产级系统需要有备用策略,例如写入本地文件日志或另一个备用系统。
log::error!("Failed to write to Cassandra: {}", e);
Status::internal("Failed to persist dead letter")
})?;
Ok(())
}
}
然后是 gRPC 服务实现 service.rs
:
// dead-letter-sink/src/service.rs
use tonic::{Request, Response, Status};
use crate::storage::{CassandraRepo, dead_letter::*};
use dead_letter_sink_server::{DeadLetterSink};
pub struct DeadLetterSinkService {
repo: CassandraRepo,
}
impl DeadLetterSinkService {
pub fn new(repo: CassandraRepo) -> Self {
Self { repo }
}
}
#[tonic::async_trait]
impl DeadLetterSink for DeadLetterSinkService {
async fn send(&self, request: Request<DeadLetter>) -> Result<Response<SendResponse>, Status> {
let letter = request.into_inner();
let event_id = letter.event_id.clone();
log::info!("Received dead letter {} from service {}", event_id, letter.source_service);
// 基本的校验
if event_id.is_empty() || letter.source_service.is_empty() {
return Err(Status::invalid_argument("event_id and source_service are required"));
}
self.repo.save_dead_letter(letter).await?;
let response = SendResponse {
event_id,
success: true,
};
Ok(Response::new(response))
}
}
最后是主程序 main.rs
,负责启动服务并注册到 Consul:
```rust
// dead-letter-sink/src/main.rs
mod config;
mod service;
mod storage;
use crate::config::Config;
use crate::service::DeadLetterSinkService;
use crate::storage::dead_letter::dead_letter_sink_server::DeadLetterSinkServer;
use crate::storage::CassandraRepo;
use std::net::SocketAddr;
use tonic::transport::Server;
use consul::{Client, Config as ConsulClientConfig};
use consul::agent::{Agent, ServiceRegistration};
#[tokio::main]
async fn main() -> Result<(), Box
env_logger::init();
let config = Config::from_file("dead-letter-sink/config.toml")?;
let server_addr: SocketAddr = format!("{}:{}", config.server.host, config.server.port).parse()?;
// 1. 连接 Cassandra
log::info!("Connecting to Cassandra...");
let cassandra_repo = CassandraRepo::connect(&config.cassandra).await
.expect("Failed to connect to Cassandra");
log::info!("Cassandra connection established.");
// 2. 注册到 Consul
// 这里的坑在于,服务注册必须在 gRPC 服务启动并监听端口 *之后* 进行,
// 或者健康检查要配置得当,否则 Consul 可能在服务就绪前就将其标记为健康。
// 一个更好的实践是在服务启动后,在另一个 task 中执行注册和健康检查心跳。
register_with_consul(&config).await?;
log::info!("Service registered with Consul: {}", config.consul.service_id);
// 3. 启动 gRPC 服务
let sink_service = DeadLetterSinkService::new(cassandra_repo);
log::info!("DeadLetterSink service listening on {}", server_addr);
Server::builder()
.add_service(DeadLetterSinkServer::new(sink_service))
.serve(server_addr)
.await?;
// 在真实项目中,需要处理 SIGTERM 信号,优雅地从 Consul 注销服务。
// deregister_from_consul(&config).await?;
Ok(())
}
async fn register_with_consul(config: &Config) -> Result<(), Box
let consul_config = ConsulClientConfig {
address: config.consul.address.clone(),
..ConsulClientConfig::default()
};
let client = Client::new(consul_config);
// 健康检查的定义非常关键,它决定了 Consul 何时认为我们的服务实例是可用的。
let check = ServiceRegistration {
name: config.consul.service_name.clone(),
id: Some(config.consul.service_id.clone()),
address: Some(config.server.host.clone()),
port: Some(config.server.port as i32),
tags: Some(vec