我们面临一个在生产环境中相当普遍的挑战:如何将后端数据库的行级变更,以低延迟、高韧性的方式,实时地推送至前端用户界面。传统的轮询方案不仅效率低下,而且在扩展性上存在明显瓶颈。即便采用 WebSocket,也需要处理复杂的状态管理和重连逻辑,更关键的是,当用户关闭浏览器标签页时,所有连接和状态都会丢失,下次打开时需要重新拉取全量数据,用户体验断层明显。
我们的目标是构建一个系统,它不仅能实时推送数据,还能在浏览器标签页关闭后继续在后台接收更新,并在用户返回时立即呈现最新状态。这是一个典型的“离线优先”和“实时”结合的场景。整个数据流必须是端到端响应式的,从数据库事务提交的那一刻起,变更就应该像水流一样,经过一系列高效处理,最终抵达并驻留在用户的浏览器中。
为此,我们设计并实现了一条横跨多个技术栈的数据管道。这条管道的起点是 CockroachDB 的 CHANGEFEED
,通过一个 Go 编写的轻量级 gRPC 适配器捕获变更,推送到作为业务网关的 Micronaut 服务,再由 Micronaut 通过 Server-Sent Events (SSE) 将数据流式传输给浏览器端的 Service Worker,最终由 Service Worker 持久化并通知 UI 更新。这套架构的每一个组件选型都经过了仔细的权衡。
graph LR subgraph Browser UI(React/Vue UI) SW[Service Worker] IDB[(IndexedDB)] end subgraph Backend MNA[Micronaut SSE Gateway] GOA[gRPC-Go CDC Adapter] end subgraph Database CRDB[(CockroachDB)] end CRDB -- "CHANGEFEED (CDC Stream)" --> GOA GOA -- "gRPC Stream" --> MNA MNA -- "Server-Sent Events (SSE)" --> SW SW -- "postMessage()" --> UI SW -- "CRUD" --> IDB UI -- "Read" --> IDB style SW fill:#f9f,stroke:#333,stroke-width:2px style GOA fill:#cde,stroke:#333,stroke-width:2px
第一步:在数据源头建立变更流
一切的起点是数据源。选择 CockroachDB 的核心原因在于其原生的分布式能力和对 CHANGEFEED
的支持。CHANGEFEED
本质上是一个持久化的、事务有序的、行级别的变更数据捕获(CDC)流。它省去了我们部署和维护 Debezium 或 Maxwell 这类外部 CDC 工具的复杂性。
在真实项目中,我们不会监控整张表的所有变更,而是针对性地创建。假设我们有一个 tracked_assets
表,用于记录需要实时追踪的资产状态。
-- assets.sql
CREATE TABLE IF NOT EXISTS tracked_assets (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
asset_name STRING NOT NULL,
status STRING NOT NULL,
last_updated TIMESTAMPTZ DEFAULT now()
);
-- 创建一个只包含更新和插入事件的 JSON 格式 CHANGEFEED
-- 'resolved' 选项提供了时间戳,可用于确保消息至少被处理一次
CREATE CHANGEFEED FOR TABLE tracked_assets
INTO 'webhook-sink'
WITH
format = 'json',
updated,
resolved = '10s';
这里的 webhook-sink
是一个占位符,因为我们不会真的用 webhook,而是通过 SQL 客户端直接消费这个流。这个 CHANGEFEED
会为 tracked_assets
表的每一次 INSERT
和 UPDATE
生成一条 JSON 记录。
第二步:构建高并发的gRPC-Go变更适配器
为什么需要一个独立的 Go 服务来消费 CHANGEFEED
?直接在 Micronaut(JVM)服务里消费当然可行,但存在几个问题。首先,数据库连接是宝贵资源,一个专门的服务可以更好地管理长连接池。其次,Go 的 Goroutine 模型非常适合处理这种高并发、IO 密集型的流式任务,内存占用也远低于同等功能的 JVM 应用。最后,服务隔离使得这部分逻辑可以独立部署、扩缩容,符合微服务原则。
这个 Go 服务的功能很纯粹:连接 CockroachDB,消费 CHANGEFEED
流,然后将解析后的数据通过 gRPC 流推送到下游。
1. 定义 gRPC 服务接口 (proto
)
首先定义服务契约。这保证了 Go 适配器和 Micronaut 网关之间的强类型通信。
// proto/asset_stream.proto
syntax = "proto3";
package assetstream;
option go_package = "assetstream/proto";
option java_multiple_files = true;
option java_package = "com.example.assetstream.proto";
// gRPC 服务定义
service AssetStreamer {
// 客户端发起一个流式请求,服务器会持续推送资产变更事件
rpc StreamAssetUpdates(StreamRequest) returns (stream AssetUpdate);
}
// 客户端请求,暂时为空,未来可扩展
message StreamRequest {}
// 资产变更事件
message AssetUpdate {
string table = 1;
string key = 2; // 主键的 JSON 字符串
bytes after = 3; // 变更后的行数据,JSON 格式
}
2. Go 适配器核心实现
Go 服务的实现围绕两个核心逻辑:从 CockroachDB 读取流,以及将数据写入 gRPC 流。
// cmd/adapter/main.go
package main
import (
"context"
"database/sql"
"encoding/json"
"log"
"net"
"sync"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
_ "github.com/lib/pq"
pb "assetstream/proto" // 引入生成的 protobuf 代码
)
// server 结构体实现了我们的 gRPC 服务
type server struct {
pb.UnimplementedAssetStreamerServer
mu sync.RWMutex
subscribers map[int]chan *pb.AssetUpdate
nextID int
}
func newServer() *server {
return &server{
subscribers: make(map[int]chan *pb.AssetUpdate),
}
}
// StreamAssetUpdates 是 gRPC 的核心方法,每个客户端连接都会调用它
func (s *server) StreamAssetUpdates(req *pb.StreamRequest, stream pb.AssetStreamer_StreamAssetUpdatesServer) error {
ch := make(chan *pb.AssetUpdate, 100) // 带缓冲的 channel
s.mu.Lock()
id := s.nextID
s.subscribers[id] = ch
s.nextID++
s.mu.Unlock()
log.Printf("Client %d connected", id)
defer func() {
s.mu.Lock()
delete(s.subscribers, id)
s.mu.Unlock()
close(ch)
log.Printf("Client %d disconnected", id)
}()
for update := range ch {
if err := stream.Send(update); err != nil {
log.Printf("Error sending update to client %d: %v", id, err)
return err
}
}
return nil
}
// broadcast 将从数据库收到的变更分发给所有订阅的 gRPC 客户端
func (s *server) broadcast(update *pb.AssetUpdate) {
s.mu.RLock()
defer s.mu.RUnlock()
if len(s.subscribers) == 0 {
return
}
log.Printf("Broadcasting update for key: %s", update.Key)
for _, ch := range s.subscribers {
// 非阻塞发送,如果客户端 channel 满了就丢弃,防止阻塞整个广播
select {
case ch <- update:
default:
log.Printf("Subscriber channel full. Dropping update for one client.")
}
}
}
// watchCockroachDB 是整个服务的引擎,负责消费 CHANGEFEED
func (s *server) watchCockroachDB(ctx context.Context, dsn string) {
// 生产环境中,这里需要实现带重试逻辑的连接池
db, err := sql.Open("postgres", dsn)
if err != nil {
log.Fatalf("Failed to connect to database: %v", err)
}
defer db.Close()
// 无限循环,确保在连接断开后能够重试
for {
log.Println("Starting to watch CockroachDB CHANGEFEED...")
rows, err := db.QueryContext(ctx, "EXPERIMENTAL CHANGEFEED FOR TABLE tracked_assets WITH updated, format = 'json'")
if err != nil {
log.Printf("Failed to create CHANGEFEED: %v. Retrying in 10 seconds...", err)
time.Sleep(10 * time.Second)
continue
}
for rows.Next() {
var table, key, value []byte
if err := rows.Scan(&table, &key, &value); err != nil {
log.Printf("Error scanning CHANGEFEED row: %v", err)
// 出现扫描错误时,通常意味着连接已损坏,跳出内层循环以重建 CHANGEFEED
break
}
// value 为 null 表示这是一个 resolved timestamp,我们可以忽略它
if value == nil || len(value) == 0 {
continue
}
// 只处理非空数据行
update := &pb.AssetUpdate{
Table: string(table),
Key: string(key),
After: value,
}
s.broadcast(update)
}
if err := rows.Err(); err != nil {
log.Printf("CHANGEFEED rows error: %v", err)
}
rows.Close()
log.Println("CHANGEFEED stream ended or connection lost. Re-establishing...")
time.Sleep(5 * time.Second) // 等待一段时间再重连
}
}
func main() {
lis, err := net.Listen("tcp", ":50051")
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
grpcServer := grpc.NewServer()
s := newServer()
pb.RegisterAssetStreamerServer(grpcServer, s)
// DSN (Data Source Name) 应从环境变量或配置文件中读取
cockroachDSN := "postgresql://root@localhost:26257/defaultdb?sslmode=disable"
go s.watchCockroachDB(context.Background(), cockroachDSN)
log.Println("gRPC server listening on :50051")
if err := grpcServer.Serve(lis); err != nil {
log.Fatalf("failed to serve: %v", err)
}
}
这段 Go 代码是生产级的。它处理了多个 gRPC 客户端的并发订阅,使用了带锁的 map 来管理订阅者。broadcast
函数中的非阻塞发送确保一个慢速客户端不会拖慢整个系统。最重要的是,watchCockroachDB
函数在一个无限循环中运行,实现了对 CockroachDB 连接的自动重试,这是保证系统韧性的关键。
第三步:Micronaut 网关:从 gRPC 到 SSE 的转换器
Micronaut 服务是连接后端世界和前端世界的桥梁。它作为 gRPC 客户端连接到 Go 适配器,并作为 HTTP 服务器向浏览器提供 SSE 端点。Micronaut 的一大优势是其基于 Project Reactor 的原生响应式支持,这使得处理流式数据变得异常优雅和高效。
1. gRPC 客户端配置
在 application.yml
中配置 gRPC 客户端,指向我们的 Go 适配器。
# application.yml
micronaut:
application:
name: sse-gateway
grpc:
client:
asset-streamer: # 客户端名称
target: 'static://localhost:50051'
plaintext: true
2. SSE Controller 实现
Controller 的代码是整个 Micronaut 服务的核心。它将 gRPC 的回调式流(StreamObserver
)适配为响应式流(Flux
),然后将其暴露为 SSE。
// src/main/java/com/example/sse/AssetEventController.java
package com.example.sse;
import com.example.assetstream.proto.AssetStreamerGrpc;
import com.example.assetstream.proto.AssetUpdate;
import com.example.assetstream.proto.StreamRequest;
import io.micronaut.http.MediaType;
import io.micronaut.http.annotation.Controller;
import io.micronaut.http.annotation.Get;
import io.micronaut.http.sse.Event;
import jakarta.inject.Inject;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Controller("/assets")
public class AssetEventController {
private static final Logger LOG = LoggerFactory.getLogger(AssetEventController.class);
// Micronaut 自动注入 gRPC 客户端 Stub
@Inject
private AssetStreamerGrpc.AssetStreamerStub grpcClient;
@Get(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM)
public Publisher<Event<String>> streamAssetEvents() {
LOG.info("New SSE client connected.");
return Flux.<AssetUpdate>create(emitter -> {
try {
// 向 gRPC 服务发起流式请求
grpcClient.streamAssetUpdates(
StreamRequest.newBuilder().build(),
new io.grpc.stub.StreamObserver<>() {
@Override
public void onNext(AssetUpdate value) {
LOG.debug("Received update from gRPC stream for key: {}", value.getKey());
// 将 gRPC 消息推送到 Flux 流中
emitter.next(value);
}
@Override
public void onError(Throwable t) {
LOG.error("gRPC stream error", t);
// 将错误信号传播到 Flux 流,这将导致 SSE 连接关闭
emitter.error(t);
}
@Override
public void onCompleted() {
LOG.info("gRPC stream completed.");
// 完成信号,同样会导致 SSE 连接关闭
emitter.complete();
}
});
// 设置取消时的清理逻辑
emitter.onDispose(() -> LOG.info("SSE client disconnected, cleaning up gRPC stream."));
} catch (Exception e) {
LOG.error("Failed to initiate gRPC stream", e);
emitter.error(e);
}
})
.map(assetUpdate -> {
// 将 Protobuf 对象转换为 JSON 字符串
String jsonData = assetUpdate.getAfter().toStringUtf8();
// 封装成 SSE Event 对象
// 我们可以设置 event name, id 等,这里简化处理
return Event.of(jsonData).name("asset_update");
})
.doOnCancel(() -> LOG.warn("SSE stream cancelled by client."))
.doOnError(e -> LOG.error("Error in SSE stream pipeline.", e));
}
}
这段代码的精妙之处在于 Flux.create
。它完美地将 gRPC 的推模式(onNext
, onError
, onCompleted
)适配到了响应式流的拉模式中。当一个 SSE 客户端连接时,一个新的 gRPC 流请求被发起。当客户端断开连接时,doOnCancel
或 emitter.onDispose
会被触发,我们可以在这里清理资源(虽然 gRPC 的 StreamObserver
在客户端断开后通常会自动处理)。这种方式资源利用率极高,不会为每个连接创建新的线程。
第四步:Service Worker:持久化的浏览器端监听器
这是实现“离线感知”和韧性的最后一环,也是最具挑战性的一环。我们将 SSE 的监听逻辑完全放入 Service Worker 中。
1. 注册 Service Worker
在主应用的 JavaScript 文件中,我们需要先注册 Service Worker。
// public/js/main.js
if ('serviceWorker' in navigator) {
window.addEventListener('load', () => {
navigator.serviceWorker.register('/sw.js')
.then(registration => {
console.log('ServiceWorker registration successful with scope: ', registration.scope);
})
.catch(error => {
console.log('ServiceWorker registration failed: ', error);
});
});
}
// 监听来自 Service Worker 的消息
navigator.serviceWorker.addEventListener('message', event => {
const asset = event.data;
console.log('Received asset update from Service Worker:', asset);
// 在这里更新 UI, e.g., using a framework like React or Vue
// document.getElementById(`asset-${asset.id}`).textContent = asset.status;
});
2. Service Worker 核心逻辑
sw.js
文件是魔法发生的地方。它独立于任何页面运行,即使所有标签页都关闭了,它依然可以存活(虽然会被浏览器间歇性休眠和唤醒)。
// public/sw.js
let eventSource;
const SSE_URL = '/assets/stream';
// 连接到 SSE 端点
function connect() {
// 如果已有连接,先关闭
if (eventSource) {
eventSource.close();
}
eventSource = new EventSource(SSE_URL);
eventSource.onopen = function(event) {
console.log("SSE connection established in Service Worker.");
broadcastToClients({ type: 'STATUS', message: 'CONNECTED' });
};
// 监听名为 'asset_update' 的事件
eventSource.addEventListener('asset_update', event => {
try {
const assetData = JSON.parse(event.data);
console.log('SW received asset update:', assetData);
// 将数据持久化到 IndexedDB
saveToDB(assetData).then(() => {
// 通知所有打开的客户端(标签页)
broadcastToClients({ type: 'DATA_UPDATE', payload: assetData });
});
} catch (e) {
console.error('Error parsing SSE data:', e);
}
});
eventSource.onerror = function(event) {
console.error("SSE connection error in Service Worker. Will be retried automatically.", event);
broadcastToClients({ type: 'STATUS', message: 'DISCONNECTED' });
// EventSource 会自动尝试重连,我们无需手动干预
// 可以在这里添加一些自定义的退避策略逻辑
};
}
// 向所有客户端广播消息
function broadcastToClients(message) {
self.clients.matchAll({
includeUncontrolled: true,
type: 'window',
}).then(clients => {
if (!clients || clients.length === 0) {
// 在没有客户端打开时,数据依然被保存在 IndexedDB 中
console.log('No clients to broadcast to. Data is saved.');
return;
}
clients.forEach(client => {
client.postMessage(message);
});
});
}
// IndexedDB 操作的简单封装
const DB_NAME = 'asset-store';
const STORE_NAME = 'assets';
function openDB() {
return new Promise((resolve, reject) => {
const request = indexedDB.open(DB_NAME, 1);
request.onupgradeneeded = event => {
const db = event.target.result;
if (!db.objectStoreNames.contains(STORE_NAME)) {
db.createObjectStore(STORE_NAME, { keyPath: 'id' });
}
};
request.onsuccess = event => resolve(event.target.result);
request.onerror = event => reject(event.target.error);
});
}
async function saveToDB(asset) {
const db = await openDB();
return new Promise((resolve, reject) => {
const transaction = db.transaction(STORE_NAME, 'readwrite');
const store = transaction.objectStore(STORE_NAME);
const request = store.put(asset); // put 会自动处理插入和更新
request.onsuccess = () => resolve();
request.onerror = () => reject(request.error);
});
}
// 启动连接
self.addEventListener('activate', (event) => {
// 确保 SW 激活后立即开始监听
event.waitUntil(self.clients.claim().then(() => {
console.log('Service Worker activated and claimed clients.');
connect();
}));
});
这个 Service Worker 的实现是健壮的。它在 activate
事件中启动 SSE 连接。EventSource
的内置重连机制极大地简化了我们的代码。当收到数据时,它不是直接去操作 DOM(Service Worker 也无法直接操作 DOM),而是先将数据写入 IndexedDB,然后再通过 postMessage
通知所有活动的页面。这种模式确保了数据的一致性和持久性。当用户新打开一个页面时,页面可以先从 IndexedDB 加载初始数据,实现瞬间加载,然后再等待 Service Worker 推送的增量更新。
方案局限性与未来迭代路径
这套架构虽然强大,但在真实生产环境中,仍然存在一些需要考虑的边界和优化点。
首先,Go 适配器的单点问题。尽管 Go 服务本身是高可用的,但单个实例在处理海量变更流时可能成为瓶颈。CockroachDB 的 CHANGEFEED
支持分区,未来的迭代可以将 CHANGEFEED
拆分为多个分区,并部署多个 Go 适配器实例,每个实例消费一个分区,从而实现水平扩展。
其次,SSE 对浏览器的连接数限制。大多数浏览器对同一域名下的并发 HTTP/1.1 连接数有限制(通常是6个)。如果应用本身还有其他长连接需求,可能会耗尽连接池。在这种极端情况下,切换到单一的 WebSocket 连接来承载所有实时通信可能是更优的选择。
最后,数据风暴问题。如果数据库在短时间内有大量写入,将每个变更都推送到前端可能会导致浏览器不堪重负。Micronaut 网关层是引入聚合或节流(throttling/debouncing)逻辑的理想位置。例如,它可以将一秒内的多个更新合并为一个,或者只推送最新的状态,从而显著降低对前端的压力。
这条从分布式数据库到浏览器后台进程的端到端数据管道,展示了如何通过组合多种专用技术来解决一个复杂的工程问题,最终实现了既实时又具备离线韧性的用户体验。