我们团队的服务网格在捕获标准L4/L7遥测(如延迟、吞吐量、HTTP状态码)方面已经做得相当不错,但一个持续的痛点是,这些指标与实际业务逻辑几乎完全脱钩。当支付服务的p99延迟飙升时,我们无法立刻知道是影响了所有用户,还是仅仅影响了某个特定支付渠道的高价值订单。为了获取这种深度的业务上下文,唯一的办法是侵入应用代码,手动埋点,然后通过独立的Agent上报。这个过程不仅增加了业务团队的负担,也让遥测系统的迭代与业务应用的发布紧密耦合,这与服务网格解耦基础设施与业务的初衷背道而驰。
最初的构想是在基础设施层,也就是服务网格的数据平面,直接对流量进行深度解析,提取业务语义。如果Sidecar代理不仅能看到HTTP POST请求,还能理解其Payload中携带的是{"transactionId": "...", "amount": 100.0, "currency": "USD", "userTier": "premium"}
这样的业务数据,我们就能在不触碰任何一行业务代码的情况下,生成极具价值的业务指标。
问题是,如何安全、高效地实现这个“智能”的解析逻辑?在Envoy这样的高性能代理中,任何一个有Bug或性能低下的过滤器都可能成为整个系统的瓶颈甚至引爆点。我们考虑过用Lua,但其动态类型和性能表现让我们在处理复杂二进制协议时信心不足。也评估了用C++或Rust从头编写Envoy原生过滤器,但这需要深入理解Envoy的内部机制,开发周期长,且与特定Envoy版本绑定。
最终,我们将目光投向了WebAssembly (WASM)。它提供了一个安全的沙箱环境和稳定的ABI,让我们能用更高级的语言来编写过滤器逻辑,并动态加载到Envoy中。而在语言选择上,我们做了一个大胆的决定:Haskell。
选择Haskell并非一时兴起。对于解析复杂、结构化的网络流量这个任务,Haskell的强项简直是量身定做:
- 表达力与安全性:代数数据类型(ADT)能精确地为我们的内部RPC协议建模。
data PaymentRequest = NewPayment AuthToken MoneyAmount | Refund PaymentId
这样的代码不仅清晰,还能在编译时杜绝无效状态。这对于需要100%正确解析流量的场景至关重要。 - 世界级的解析器库:诸如
Attoparsec
这样的库,让我们能以声明式的方式构建出极其高效、健壮的流式解析器,避免了手动管理缓冲区和状态机的种种陷阱。 - 强大的并发模型:尽管在WASM沙箱中我们无法直接使用GHC的绿色线程,但Haskell处理异步和并发的思维模式有助于我们设计出无阻塞、事件驱动的过滤器逻辑。
- 高可信度:函数式编程的纯粹性使得对核心解析和转换逻辑进行单元测试变得异常简单,我们可以对这部分代码的正确性有极高的信心。
我们的目标是:使用Haskell编写一个Envoy WASM过滤器,它能拦截特定的gRPC-like内部协议流量,解析出业务字段,将它们聚合成结构化的遥测事件,并以批处理方式直接异步推送到我们的OpenSearch集群。
数据流架构设计
整个流程被设计为在数据平面内部完成闭环,避免对控制平面或外部组件产生额外依赖。
sequenceDiagram participant Client participant Envoy as Sidecar (App A) participant App A participant HaskellWASM as Haskell WASM Filter participant OpenSearch Client->>+Envoy: 发起业务请求 (RPC over HTTP/2) Envoy->>+HaskellWASM: onHttpRequestHeaders / onHttpRequestBody Note right of HaskellWASM: 1. 缓冲区累积请求体 HaskellWASM-->>-Envoy: ContinueRequest Envoy->>+App A: 转发请求 App A-->>-Envoy: 业务响应 Envoy->>+HaskellWASM: onHttpResponseHeaders / onHttpResponseBody Note right of HaskellWASM: 2. 累积响应体并触发解析 HaskellWASM->>HaskellWASM: 3. 使用Attoparsec解析请求/响应体 HaskellWASM->>HaskellWASM: 4. 提取业务字段 (e.g., userId, orderValue) HaskellWASM->>HaskellWASM: 5. 聚合遥测事件到本地缓冲区 Note right of HaskellWASM: (当缓冲区满或定时器触发) HaskellWASM-)+OpenSearch: 6. 异步发送批量索引请求 (_bulk API) OpenSearch--)-HaskellWASM: 批量索引响应 HaskellWASM-->>-Envoy: ContinueResponse Envoy-->>-Client: 返回业务响应
这个设计的关键在于Haskell WASM过滤器内部的状态管理和异步通信能力。它不仅是一个无状态的转换器,还是一个微型的、高效的遥测聚合与发送代理。
核心实现:Haskell侧
我们使用asterius
作为GHC到WASM的编译器。首先,定义项目的cabal
文件,引入必要的依赖。
-- app.cabal
cabal-version: 2.4
name: envoy-telemetry-filter
version: 0.1.0.0
author: TechExplorer
build-type: Simple
common deps
ghc-options: -O2
build-depends:
base >=4.12 && <5,
asterius >=0.2,
bytestring,
aeson,
attoparsec,
text,
unordered-containers,
vector
executable filter
import: deps
main-is: Main.hs
hs-source-dirs: app
default-language: Haskell2010
ghc-options:
-fomit-frame-pointer
-fobject-code
-optl--allow-undefined
-optl--export-dynamic
-optl--no-entry
1. 定义业务协议与遥测事件
我们用ADT来精确描述我们内部的Payment
RPC协议。
-- src/Protocol.hs
{-# LANGUAGE OverloadedStrings #-}
module Protocol where
import Data.Aeson
import Data.ByteString (ByteString)
import Data.Text (Text)
-- | 我们的内部RPC协议消息体
data PaymentRequest
= CreatePayment { reqId :: Text, amount :: Double, currency :: Text, userId :: Text }
| GetStatus { reqId :: Text, paymentId :: Text }
deriving (Show, Eq)
data PaymentResponse
= PaymentSuccess { respId :: Text, transactionId :: Text }
| PaymentFailed { respId :: Text, errorCode :: Int, message :: Text }
| StatusResponse { respId :: Text, status :: Text }
deriving (Show, Eq)
-- | 我们要发送到OpenSearch的遥测事件结构
data TelemetryEvent = TelemetryEvent
{ serviceName :: Text
, operationName :: Text
, requestBody :: Value
, responseBody :: Value
, latencyMs :: Int
, isError :: Bool
, userIdentifier :: Text
, transactionValue :: Double
} deriving (Show)
-- | 为遥测事件提供Aeson的ToJSON实例,以便序列化
instance ToJSON TelemetryEvent where
toJSON te = object
[ "service_name" .= serviceName te
, "operation_name" .= operationName te
-- ... 其他字段
, "latency_ms" .= latencyMs te
, "is_error" .= isError te
, "user_id" .= userIdentifier te
, "transaction_value" .= transactionValue te
, "@timestamp" .= ("2023-10-27T10:00:00.000Z" :: Text) -- 实际应为当前时间
]
这里的TelemetryEvent
是我们最终的数据模型,它包含了从请求/响应中提取的业务价值字段,如userIdentifier
和transactionValue
。
2. 实现协议解析器
使用Attoparsec
,我们可以构建一个高效的二进制协议解析器。假设我们的协议是简单的 [4-byte-length][JSON-payload]
。
-- src/Parser.hs
module Parser (parseRequest, parseResponse) where
import Protocol
import Data.Attoparsec.ByteString
import Data.Aeson (eitherDecodeStrict)
import Data.ByteString (ByteString)
import Data.Word (Word32)
import Control.Applicative ((<|>))
-- | 解析一个32位大端整数作为长度前缀
parseLength :: Parser Int
parseLength = fromIntegral <$> anyWord32be
-- | 解析JSON payload
parseJsonPayload :: Parser a
parseJsonPayload = do
len <- parseLength
jsonBytes <- take len
case eitherDecodeStrict jsonBytes of
Left err -> fail $ "JSON decoding failed: " ++ err
Right val -> pure val
-- | 组合成一个完整的请求解析器
parseRequest :: Parser PaymentRequest
parseRequest = parseCreatePayment <|> parseGetStatus
where
parseCreatePayment = CreatePayment <$> (parseJsonPayload >>= (.: "reqId")) <*> ...
parseGetStatus = GetStatus <$> (parseJsonPayload >>= (.: "reqId")) <*> ...
-- | 响应解析器
parseResponse :: Parser PaymentResponse
parseResponse = -- ... 类似实现
这个解析器的美妙之处在于它的组合性。我们可以为每种消息类型编写一个小解析器,然后用<|>
组合它们。Attoparsec
会自动处理回溯,非常健壮。
3. 与Envoy WASM ABI交互
这是连接Haskell世界和Envoy宿主环境的桥梁。我们需要导出遵循Proxy-WASM ABI规范的函数。
-- app/Main.hs
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RecordWildCards #-}
module Main where
import Asterius.Types
import Foreign.Ptr
import Foreign.C.Types
import Data.ByteString.Unsafe (unsafePackCStringLen)
import qualified Data.ByteString as BS
import qualified Data.ByteString.Lazy as LBS
import Data.IORef
import System.IO.Unsafe (unsafePerformIO)
import Data.Aeson (encode)
import Protocol
import Parser
-- | 使用IORef在WASM实例的生命周期内维持状态
-- IORef 在单线程WASM环境中是安全的
data FilterState = FilterState
{ reqBuffer :: BS.ByteString
, respBuffer :: BS.ByteString
, telemetryBuffer :: [TelemetryEvent]
}
-- 全局可变状态,这是必要的恶,因为WASM ABI是基于C的无状态函数调用
{-# NOINLINE globalState #-}
globalState :: IORef FilterState
globalState = unsafePerformIO $ newIORef (FilterState BS.empty BS.empty [])
-- | 外部函数定义,这些是Envoy提供的回调
foreign import ccall "proxy_get_buffer_bytes"
proxy_get_buffer_bytes :: CInt -> CInt -> CInt -> Ptr (Ptr CChar) -> Ptr CInt -> IO CInt
foreign import ccall "proxy_dispatch_http_call"
proxy_dispatch_http_call :: JSVal -> JSVal -> JSVal -> JSVal -> CInt -> IO CInt
-- | 工具函数:从Envoy宿主获取数据块
getBuffer :: CInt -> IO BS.ByteString
getBuffer bufferType = do
-- ... 实现细节:调用proxy_get_buffer_bytes获取数据
-- 这是一个复杂的指针操作,需要非常小心
pure BS.empty -- 简化示例
-- | 当HTTP请求体数据块到达时被调用
onRequestBody :: CInt -> CInt -> CInt -> IO CInt
onRequestBody _contextId _bodyBufferLength _endOfStream = do
chunk <- getBuffer 1 -- 1 for RequestBody
modifyIORef' globalState $ \s -> s { reqBuffer = reqBuffer s <> chunk }
pure 0 -- 0 is Action::Continue
-- | 当HTTP响应体数据块到达时被调用
onResponseBody :: CInt -> CInt -> CInt -> IO CInt
onResponseBody _contextId _bodyBufferLength _endOfStream = do
chunk <- getBuffer 2 -- 2 for ResponseBody
modifyIORef' globalState $ \s -> s { respBuffer = respBuffer s <> chunk }
pure 0 -- Action::Continue
-- | 在日志阶段,所有数据都已收齐,是处理遥测的理想时机
onLog :: CInt -> IO ()
onLog _contextId = do
state <- readIORef globalState
let reqResult = parseOnly parseRequest (reqBuffer state)
let respResult = parseOnly parseResponse (respBuffer state)
case (reqResult, respResult) of
(Right req, Right resp) -> do
let event = createTelemetryEvent req resp
-- 聚合事件
let currentEvents = telemetryBuffer state
let newEvents = event : currentEvents
-- 检查是否需要刷新到OpenSearch
if length newEvents >= 10 then do -- 批量大小为10
flushTelemetry newEvents
-- 清空缓冲区
writeIORef globalState $ state { telemetryBuffer = [] }
else
writeIORef globalState $ state { telemetryBuffer = newEvents }
_ -> pure () -- 解析失败,可以选择记录一个错误
-- | 异步发送遥测数据到OpenSearch
flushTelemetry :: [TelemetryEvent] -> IO ()
flushTelemetry events = do
-- OpenSearch _bulk API要求每行是一个JSON对象,用\n分隔
let payload = LBS.toStrict $ LBS.intercalate "\n" $ map formatForBulk events
-- 构建HTTP请求头
let headers = ... -- ":method POST", ":path /_bulk", "content-type application/json"
-- 调用Envoy的异步HTTP客户端
_ <- proxy_dispatch_http_call "opensearch_cluster" headers (toJSVal payload) (toJSVal ("" :: BS.ByteString)) 5000 -- 5s timeout
pure ()
where
formatForBulk event =
let indexLine = encode $ object ["index" .= object ["_index" .= ("telemetry-haskell" :: String)]]
in indexLine <> "\n" <> encode event
-- | 导出WASM模块的入口点
foreign export ccall "proxy_on_http_request_body" onRequestBody :: CInt -> CInt -> CInt -> IO CInt
foreign export ccall "proxy_on_http_response_body" onResponseBody :: CInt -> CInt -> CInt -> IO CInt
foreign export ccall "proxy_on_log" onLog :: CInt -> IO ()
main :: IO ()
main = pure ()
这段代码是整个方案的核心。globalState
通过IORef
维护了请求-响应流的状态。onRequestBody
和onResponseBody
是流式处理函数,它们只负责把数据块追加到缓冲区。真正的处理逻辑延迟到onLog
阶段,此时请求和响应都已完整,确保我们能关联它们。flushTelemetry
函数构建了符合OpenSearch _bulk
API格式的请求体,并调用proxy_dispatch_http_call
这个由Envoy宿主提供的函数,将遥测数据异步发送出去。这种方式性能极高,因为它复用了Envoy自身成熟的HTTP客户端和连接池,而不是在WASM内部重新实现一套。
Envoy配置
最后,我们需要配置Envoy来加载并运行这个WASM过滤器。
# envoy.yaml
static_resources:
listeners:
- address: # ...
filter_chains:
- filters:
- name: envoy.filters.network.http_connection_manager
typed_config:
"@type": type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager
stat_prefix: ingress_http
http_filters:
- name: envoy.filters.http.wasm
typed_config:
"@type": type.googleapis.com/envoy.extensions.filters.http.wasm.v3.Wasm
config:
name: "haskell_telemetry_filter"
root_id: "my_root_id"
vm_config:
vm_id: "haskell_vm"
runtime: "envoy.wasm.runtime.v8" # 使用V8运行时
code:
local:
filename: "/etc/envoy/filter.wasm" # WASM文件路径
- name: envoy.filters.http.router
typed_config: {}
clusters:
- name: my_app_service
# ...
- name: opensearch_cluster # 必须定义用于遥测上报的集群
connect_timeout: 1s
type: STRICT_DNS
lb_policy: ROUND_ROBIN
load_assignment:
cluster_name: opensearch_cluster
endpoints:
- lb_endpoints:
- endpoint:
address:
socket_address:
address: opensearch.local
port_value: 9200
这里的配置定义了一个名为opensearch_cluster
的上游集群,我们的Haskell代码在调用proxy_dispatch_http_call
时会用到这个集群名。envoy.filters.http.wasm
过滤器被插入到HTTP处理链中,指向我们编译好的filter.wasm
文件。
局限性与未来展望
这个方案最令人兴奋的部分是它展现了一种全新的可能性:用一种高度抽象、类型安全的语言来编写底层网络基础设施组件。我们获得的不仅是业务洞察力,更是一种构建高可信网络中间件的工程实践。
然而,这并非没有代价。首先,Haskell到WASM的工具链(如Asterius)远不如Rust或Go成熟,编译出的WASM文件体积较大,且调试体验有待提升。在真实生产环境中,这会是一个需要严肃评估的工程风险。
其次,WASM与宿主环境的交互性能开销是存在的。每次数据拷贝(如getBuffer
)和外部函数调用(FFI)都会带来一定的延迟。尽管我们的批处理和异步发送策略可以摊销大部分成本,但在极端性能敏感的场景下,仍需进行详尽的基准测试,与原生C++过滤器进行对比。
未来的方向非常明确。一方面是持续跟进和贡献Haskell-WASM生态,期待更小的二进制体积和更优化的FFI性能。另一方面,我们可以扩展Haskell过滤器内部的逻辑,比如引入一个嵌入式DSL,让SRE或数据分析师能够以配置的形式动态定义需要从流量中提取哪些字段,而无需重新编译和部署WASM模块。这将把服务网格的动态可配置性提升到一个全新的高度。