构建从CockroachDB到浏览器Service Worker的端到端响应式数据管道


我们面临一个在生产环境中相当普遍的挑战:如何将后端数据库的行级变更,以低延迟、高韧性的方式,实时地推送至前端用户界面。传统的轮询方案不仅效率低下,而且在扩展性上存在明显瓶颈。即便采用 WebSocket,也需要处理复杂的状态管理和重连逻辑,更关键的是,当用户关闭浏览器标签页时,所有连接和状态都会丢失,下次打开时需要重新拉取全量数据,用户体验断层明显。

我们的目标是构建一个系统,它不仅能实时推送数据,还能在浏览器标签页关闭后继续在后台接收更新,并在用户返回时立即呈现最新状态。这是一个典型的“离线优先”和“实时”结合的场景。整个数据流必须是端到端响应式的,从数据库事务提交的那一刻起,变更就应该像水流一样,经过一系列高效处理,最终抵达并驻留在用户的浏览器中。

为此,我们设计并实现了一条横跨多个技术栈的数据管道。这条管道的起点是 CockroachDB 的 CHANGEFEED,通过一个 Go 编写的轻量级 gRPC 适配器捕获变更,推送到作为业务网关的 Micronaut 服务,再由 Micronaut 通过 Server-Sent Events (SSE) 将数据流式传输给浏览器端的 Service Worker,最终由 Service Worker 持久化并通知 UI 更新。这套架构的每一个组件选型都经过了仔细的权衡。

graph LR
    subgraph Browser
        UI(React/Vue UI)
        SW[Service Worker]
        IDB[(IndexedDB)]
    end

    subgraph Backend
        MNA[Micronaut SSE Gateway]
        GOA[gRPC-Go CDC Adapter]
    end

    subgraph Database
        CRDB[(CockroachDB)]
    end

    CRDB -- "CHANGEFEED (CDC Stream)" --> GOA
    GOA -- "gRPC Stream" --> MNA
    MNA -- "Server-Sent Events (SSE)" --> SW
    SW -- "postMessage()" --> UI
    SW -- "CRUD" --> IDB
    UI -- "Read" --> IDB

    style SW fill:#f9f,stroke:#333,stroke-width:2px
    style GOA fill:#cde,stroke:#333,stroke-width:2px

第一步:在数据源头建立变更流

一切的起点是数据源。选择 CockroachDB 的核心原因在于其原生的分布式能力和对 CHANGEFEED 的支持。CHANGEFEED 本质上是一个持久化的、事务有序的、行级别的变更数据捕获(CDC)流。它省去了我们部署和维护 Debezium 或 Maxwell 这类外部 CDC 工具的复杂性。

在真实项目中,我们不会监控整张表的所有变更,而是针对性地创建。假设我们有一个 tracked_assets 表,用于记录需要实时追踪的资产状态。

-- assets.sql

CREATE TABLE IF NOT EXISTS tracked_assets (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    asset_name STRING NOT NULL,
    status STRING NOT NULL,
    last_updated TIMESTAMPTZ DEFAULT now()
);

-- 创建一个只包含更新和插入事件的 JSON 格式 CHANGEFEED
-- 'resolved' 选项提供了时间戳,可用于确保消息至少被处理一次
CREATE CHANGEFEED FOR TABLE tracked_assets
INTO 'webhook-sink'
WITH
    format = 'json',
    updated,
    resolved = '10s';

这里的 webhook-sink 是一个占位符,因为我们不会真的用 webhook,而是通过 SQL 客户端直接消费这个流。这个 CHANGEFEED 会为 tracked_assets 表的每一次 INSERTUPDATE 生成一条 JSON 记录。

第二步:构建高并发的gRPC-Go变更适配器

为什么需要一个独立的 Go 服务来消费 CHANGEFEED?直接在 Micronaut(JVM)服务里消费当然可行,但存在几个问题。首先,数据库连接是宝贵资源,一个专门的服务可以更好地管理长连接池。其次,Go 的 Goroutine 模型非常适合处理这种高并发、IO 密集型的流式任务,内存占用也远低于同等功能的 JVM 应用。最后,服务隔离使得这部分逻辑可以独立部署、扩缩容,符合微服务原则。

这个 Go 服务的功能很纯粹:连接 CockroachDB,消费 CHANGEFEED 流,然后将解析后的数据通过 gRPC 流推送到下游。

1. 定义 gRPC 服务接口 (proto)

首先定义服务契约。这保证了 Go 适配器和 Micronaut 网关之间的强类型通信。

// proto/asset_stream.proto

syntax = "proto3";

package assetstream;

option go_package = "assetstream/proto";
option java_multiple_files = true;
option java_package = "com.example.assetstream.proto";

// gRPC 服务定义
service AssetStreamer {
  // 客户端发起一个流式请求,服务器会持续推送资产变更事件
  rpc StreamAssetUpdates(StreamRequest) returns (stream AssetUpdate);
}

// 客户端请求,暂时为空,未来可扩展
message StreamRequest {}

// 资产变更事件
message AssetUpdate {
  string table = 1;
  string key = 2;   // 主键的 JSON 字符串
  bytes after = 3;  // 变更后的行数据,JSON 格式
}

2. Go 适配器核心实现

Go 服务的实现围绕两个核心逻辑:从 CockroachDB 读取流,以及将数据写入 gRPC 流。

// cmd/adapter/main.go

package main

import (
	"context"
	"database/sql"
	"encoding/json"
	"log"
	"net"
	"sync"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
	_ "github.com/lib/pq"

	pb "assetstream/proto" // 引入生成的 protobuf 代码
)

// server 结构体实现了我们的 gRPC 服务
type server struct {
	pb.UnimplementedAssetStreamerServer
	mu         sync.RWMutex
	subscribers map[int]chan *pb.AssetUpdate
	nextID     int
}

func newServer() *server {
	return &server{
		subscribers: make(map[int]chan *pb.AssetUpdate),
	}
}

// StreamAssetUpdates 是 gRPC 的核心方法,每个客户端连接都会调用它
func (s *server) StreamAssetUpdates(req *pb.StreamRequest, stream pb.AssetStreamer_StreamAssetUpdatesServer) error {
	ch := make(chan *pb.AssetUpdate, 100) // 带缓冲的 channel
	
	s.mu.Lock()
	id := s.nextID
	s.subscribers[id] = ch
	s.nextID++
	s.mu.Unlock()
	
	log.Printf("Client %d connected", id)

	defer func() {
		s.mu.Lock()
		delete(s.subscribers, id)
		s.mu.Unlock()
		close(ch)
		log.Printf("Client %d disconnected", id)
	}()

	for update := range ch {
		if err := stream.Send(update); err != nil {
			log.Printf("Error sending update to client %d: %v", id, err)
			return err
		}
	}

	return nil
}

// broadcast 将从数据库收到的变更分发给所有订阅的 gRPC 客户端
func (s *server) broadcast(update *pb.AssetUpdate) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	
	if len(s.subscribers) == 0 {
		return
	}
	
	log.Printf("Broadcasting update for key: %s", update.Key)
	for _, ch := range s.subscribers {
		// 非阻塞发送,如果客户端 channel 满了就丢弃,防止阻塞整个广播
		select {
		case ch <- update:
		default:
			log.Printf("Subscriber channel full. Dropping update for one client.")
		}
	}
}

// watchCockroachDB 是整个服务的引擎,负责消费 CHANGEFEED
func (s *server) watchCockroachDB(ctx context.Context, dsn string) {
	// 生产环境中,这里需要实现带重试逻辑的连接池
	db, err := sql.Open("postgres", dsn)
	if err != nil {
		log.Fatalf("Failed to connect to database: %v", err)
	}
	defer db.Close()
	
	// 无限循环,确保在连接断开后能够重试
	for {
		log.Println("Starting to watch CockroachDB CHANGEFEED...")
		rows, err := db.QueryContext(ctx, "EXPERIMENTAL CHANGEFEED FOR TABLE tracked_assets WITH updated, format = 'json'")
		if err != nil {
			log.Printf("Failed to create CHANGEFEED: %v. Retrying in 10 seconds...", err)
			time.Sleep(10 * time.Second)
			continue
		}

		for rows.Next() {
			var table, key, value []byte
			if err := rows.Scan(&table, &key, &value); err != nil {
				log.Printf("Error scanning CHANGEFEED row: %v", err)
				// 出现扫描错误时,通常意味着连接已损坏,跳出内层循环以重建 CHANGEFEED
				break 
			}

			// value 为 null 表示这是一个 resolved timestamp,我们可以忽略它
			if value == nil || len(value) == 0 {
				continue
			}
			
			// 只处理非空数据行
			update := &pb.AssetUpdate{
				Table: string(table),
				Key:   string(key),
				After: value,
			}
			s.broadcast(update)
		}
		
		if err := rows.Err(); err != nil {
			log.Printf("CHANGEFEED rows error: %v", err)
		}
		
		rows.Close()
		log.Println("CHANGEFEED stream ended or connection lost. Re-establishing...")
		time.Sleep(5 * time.Second) // 等待一段时间再重连
	}
}

func main() {
	lis, err := net.Listen("tcp", ":50051")
	if err != nil {
		log.Fatalf("failed to listen: %v", err)
	}

	grpcServer := grpc.NewServer()
	s := newServer()
	pb.RegisterAssetStreamerServer(grpcServer, s)

	// DSN (Data Source Name) 应从环境变量或配置文件中读取
	cockroachDSN := "postgresql://root@localhost:26257/defaultdb?sslmode=disable"
	
	go s.watchCockroachDB(context.Background(), cockroachDSN)

	log.Println("gRPC server listening on :50051")
	if err := grpcServer.Serve(lis); err != nil {
		log.Fatalf("failed to serve: %v", err)
	}
}

这段 Go 代码是生产级的。它处理了多个 gRPC 客户端的并发订阅,使用了带锁的 map 来管理订阅者。broadcast 函数中的非阻塞发送确保一个慢速客户端不会拖慢整个系统。最重要的是,watchCockroachDB 函数在一个无限循环中运行,实现了对 CockroachDB 连接的自动重试,这是保证系统韧性的关键。

第三步:Micronaut 网关:从 gRPC 到 SSE 的转换器

Micronaut 服务是连接后端世界和前端世界的桥梁。它作为 gRPC 客户端连接到 Go 适配器,并作为 HTTP 服务器向浏览器提供 SSE 端点。Micronaut 的一大优势是其基于 Project Reactor 的原生响应式支持,这使得处理流式数据变得异常优雅和高效。

1. gRPC 客户端配置

application.yml 中配置 gRPC 客户端,指向我们的 Go 适配器。

# application.yml
micronaut:
  application:
    name: sse-gateway
grpc:
  client:
    asset-streamer: # 客户端名称
      target: 'static://localhost:50051'
      plaintext: true

2. SSE Controller 实现

Controller 的代码是整个 Micronaut 服务的核心。它将 gRPC 的回调式流(StreamObserver)适配为响应式流(Flux),然后将其暴露为 SSE。

// src/main/java/com/example/sse/AssetEventController.java

package com.example.sse;

import com.example.assetstream.proto.AssetStreamerGrpc;
import com.example.assetstream.proto.AssetUpdate;
import com.example.assetstream.proto.StreamRequest;
import io.micronaut.http.MediaType;
import io.micronaut.http.annotation.Controller;
import io.micronaut.http.annotation.Get;
import io.micronaut.http.sse.Event;
import jakarta.inject.Inject;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Controller("/assets")
public class AssetEventController {

    private static final Logger LOG = LoggerFactory.getLogger(AssetEventController.class);

    // Micronaut 自动注入 gRPC 客户端 Stub
    @Inject
    private AssetStreamerGrpc.AssetStreamerStub grpcClient;

    @Get(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM)
    public Publisher<Event<String>> streamAssetEvents() {
        LOG.info("New SSE client connected.");

        return Flux.<AssetUpdate>create(emitter -> {
            try {
                // 向 gRPC 服务发起流式请求
                grpcClient.streamAssetUpdates(
                    StreamRequest.newBuilder().build(),
                    new io.grpc.stub.StreamObserver<>() {
                        @Override
                        public void onNext(AssetUpdate value) {
                            LOG.debug("Received update from gRPC stream for key: {}", value.getKey());
                            // 将 gRPC 消息推送到 Flux 流中
                            emitter.next(value);
                        }

                        @Override
                        public void onError(Throwable t) {
                            LOG.error("gRPC stream error", t);
                            // 将错误信号传播到 Flux 流,这将导致 SSE 连接关闭
                            emitter.error(t);
                        }

                        @Override
                        public void onCompleted() {
                            LOG.info("gRPC stream completed.");
                            // 完成信号,同样会导致 SSE 连接关闭
                            emitter.complete();
                        }
                    });

                // 设置取消时的清理逻辑
                emitter.onDispose(() -> LOG.info("SSE client disconnected, cleaning up gRPC stream."));
            } catch (Exception e) {
                LOG.error("Failed to initiate gRPC stream", e);
                emitter.error(e);
            }
        })
        .map(assetUpdate -> {
            // 将 Protobuf 对象转换为 JSON 字符串
            String jsonData = assetUpdate.getAfter().toStringUtf8();
            // 封装成 SSE Event 对象
            // 我们可以设置 event name, id 等,这里简化处理
            return Event.of(jsonData).name("asset_update");
        })
        .doOnCancel(() -> LOG.warn("SSE stream cancelled by client."))
        .doOnError(e -> LOG.error("Error in SSE stream pipeline.", e));
    }
}

这段代码的精妙之处在于 Flux.create。它完美地将 gRPC 的推模式(onNext, onError, onCompleted)适配到了响应式流的拉模式中。当一个 SSE 客户端连接时,一个新的 gRPC 流请求被发起。当客户端断开连接时,doOnCancelemitter.onDispose 会被触发,我们可以在这里清理资源(虽然 gRPC 的 StreamObserver 在客户端断开后通常会自动处理)。这种方式资源利用率极高,不会为每个连接创建新的线程。

第四步:Service Worker:持久化的浏览器端监听器

这是实现“离线感知”和韧性的最后一环,也是最具挑战性的一环。我们将 SSE 的监听逻辑完全放入 Service Worker 中。

1. 注册 Service Worker

在主应用的 JavaScript 文件中,我们需要先注册 Service Worker。

// public/js/main.js

if ('serviceWorker' in navigator) {
  window.addEventListener('load', () => {
    navigator.serviceWorker.register('/sw.js')
      .then(registration => {
        console.log('ServiceWorker registration successful with scope: ', registration.scope);
      })
      .catch(error => {
        console.log('ServiceWorker registration failed: ', error);
      });
  });
}

// 监听来自 Service Worker 的消息
navigator.serviceWorker.addEventListener('message', event => {
    const asset = event.data;
    console.log('Received asset update from Service Worker:', asset);
    // 在这里更新 UI, e.g., using a framework like React or Vue
    // document.getElementById(`asset-${asset.id}`).textContent = asset.status;
});

2. Service Worker 核心逻辑

sw.js 文件是魔法发生的地方。它独立于任何页面运行,即使所有标签页都关闭了,它依然可以存活(虽然会被浏览器间歇性休眠和唤醒)。

// public/sw.js

let eventSource;
const SSE_URL = '/assets/stream';

// 连接到 SSE 端点
function connect() {
    // 如果已有连接,先关闭
    if (eventSource) {
        eventSource.close();
    }

    eventSource = new EventSource(SSE_URL);

    eventSource.onopen = function(event) {
        console.log("SSE connection established in Service Worker.");
        broadcastToClients({ type: 'STATUS', message: 'CONNECTED' });
    };

    // 监听名为 'asset_update' 的事件
    eventSource.addEventListener('asset_update', event => {
        try {
            const assetData = JSON.parse(event.data);
            console.log('SW received asset update:', assetData);
            
            // 将数据持久化到 IndexedDB
            saveToDB(assetData).then(() => {
                // 通知所有打开的客户端(标签页)
                broadcastToClients({ type: 'DATA_UPDATE', payload: assetData });
            });

        } catch (e) {
            console.error('Error parsing SSE data:', e);
        }
    });

    eventSource.onerror = function(event) {
        console.error("SSE connection error in Service Worker. Will be retried automatically.", event);
        broadcastToClients({ type: 'STATUS', message: 'DISCONNECTED' });
        // EventSource 会自动尝试重连,我们无需手动干预
        // 可以在这里添加一些自定义的退避策略逻辑
    };
}

// 向所有客户端广播消息
function broadcastToClients(message) {
    self.clients.matchAll({
        includeUncontrolled: true,
        type: 'window',
    }).then(clients => {
        if (!clients || clients.length === 0) {
            // 在没有客户端打开时,数据依然被保存在 IndexedDB 中
            console.log('No clients to broadcast to. Data is saved.');
            return;
        }
        clients.forEach(client => {
            client.postMessage(message);
        });
    });
}

// IndexedDB 操作的简单封装
const DB_NAME = 'asset-store';
const STORE_NAME = 'assets';

function openDB() {
    return new Promise((resolve, reject) => {
        const request = indexedDB.open(DB_NAME, 1);
        request.onupgradeneeded = event => {
            const db = event.target.result;
            if (!db.objectStoreNames.contains(STORE_NAME)) {
                db.createObjectStore(STORE_NAME, { keyPath: 'id' });
            }
        };
        request.onsuccess = event => resolve(event.target.result);
        request.onerror = event => reject(event.target.error);
    });
}

async function saveToDB(asset) {
    const db = await openDB();
    return new Promise((resolve, reject) => {
        const transaction = db.transaction(STORE_NAME, 'readwrite');
        const store = transaction.objectStore(STORE_NAME);
        const request = store.put(asset); // put 会自动处理插入和更新
        request.onsuccess = () => resolve();
        request.onerror = () => reject(request.error);
    });
}

// 启动连接
self.addEventListener('activate', (event) => {
    // 确保 SW 激活后立即开始监听
    event.waitUntil(self.clients.claim().then(() => {
        console.log('Service Worker activated and claimed clients.');
        connect();
    }));
});

这个 Service Worker 的实现是健壮的。它在 activate 事件中启动 SSE 连接。EventSource 的内置重连机制极大地简化了我们的代码。当收到数据时,它不是直接去操作 DOM(Service Worker 也无法直接操作 DOM),而是先将数据写入 IndexedDB,然后再通过 postMessage 通知所有活动的页面。这种模式确保了数据的一致性和持久性。当用户新打开一个页面时,页面可以先从 IndexedDB 加载初始数据,实现瞬间加载,然后再等待 Service Worker 推送的增量更新。

方案局限性与未来迭代路径

这套架构虽然强大,但在真实生产环境中,仍然存在一些需要考虑的边界和优化点。

首先,Go 适配器的单点问题。尽管 Go 服务本身是高可用的,但单个实例在处理海量变更流时可能成为瓶颈。CockroachDB 的 CHANGEFEED 支持分区,未来的迭代可以将 CHANGEFEED 拆分为多个分区,并部署多个 Go 适配器实例,每个实例消费一个分区,从而实现水平扩展。

其次,SSE 对浏览器的连接数限制。大多数浏览器对同一域名下的并发 HTTP/1.1 连接数有限制(通常是6个)。如果应用本身还有其他长连接需求,可能会耗尽连接池。在这种极端情况下,切换到单一的 WebSocket 连接来承载所有实时通信可能是更优的选择。

最后,数据风暴问题。如果数据库在短时间内有大量写入,将每个变更都推送到前端可能会导致浏览器不堪重负。Micronaut 网关层是引入聚合或节流(throttling/debouncing)逻辑的理想位置。例如,它可以将一秒内的多个更新合并为一个,或者只推送最新的状态,从而显著降低对前端的压力。

这条从分布式数据库到浏览器后台进程的端到端数据管道,展示了如何通过组合多种专用技术来解决一个复杂的工程问题,最终实现了既实时又具备离线韧性的用户体验。


  目录