在构建机器学习服务的特征工程体系时,我们通常面临一个两难的抉择。批量计算的特征(例如用户近90天的平均消费额)虽然计算成本可控,但新鲜度不足,无法反映用户的即时意图;而纯流式计算的实时特征(例如用户近30秒的点击次数)虽然能捕捉到最新动态,但其基础设施复杂、资源消耗高,并且并非所有特征都需要如此高的实时性。这种矛盾导致我们不得不寻找一种更灵活、成本效益更高的架构。
我们的目标是构建一个混合模式的特征管道,它必须能同时提供:
- 批量预计算特征 (Batch Features): 针对变化缓慢、计算复杂的特征,通过分布式计算框架定时生成,并存入低延迟的在线存储。
- 即时派生特征 (On-demand Features): 针对变化极快、或仅在特定请求中才需要的特征,通过一个轻量级的计算服务,在请求时实时生成。
这个架构的核心在于如何将重量级的批处理、轻量级的实时计算以及高性能的存储无缝地编排在一起。在真实项目中,我们最终选定的技术栈是:Kubeflow Pipelines
作为工作流编排器,Dask
作为分布式批处理引擎,Redis
作为在线特征存储,以及OpenFaaS
作为即时计算的无服务器函数平台。
架构设计与技术选型考量
这个选型并非偶然,而是基于对生产环境中稳定性和维护性的深度考量。
- Kubeflow Pipelines: 作为建立在 Kubernetes 之上的工作流平台,它天然具备了管理容器化任务的能力。我们可以将 Dask 作业、模型训练、函数部署等一切操作都定义为 Pipeline 的一个组件(Component),实现了 MLOps 流程的标准化和可复现。
- Dask on Kubernetes: Dask 提供了与 Pandas 和 Scikit-learn 兼容的 API,让数据科学家可以无缝地将其本地代码扩展到分布式集群中。
dask-kubernetes
库使其能够在 Kubernetes 上动态创建和销毁计算集群,完美契合了 Kubeflow 基于容器的调度模式。这里的关键是资源弹性,Dask 作业完成,计算资源就立刻释放。 - Redis: 在线特征存储要求毫秒级的读取延迟。Redis 不仅满足这个要求,其丰富的数据结构(尤其是 Hash)也非常适合存储结构化的特征数据。例如,一个用户的全部特征可以存储在一个 Hash Key 下,一次
HGETALL
操作即可获取,避免了多次网络往返。 - OpenFaaS: 为什么不用常见的 Flask 或 FastAPI 应用来做即时计算?因为很多即时特征的计算逻辑非常简单,但调用频率可能呈现极端的峰谷。OpenFaaS 这种无服务器框架,能根据负载自动伸缩到零,极大地节约了空闲时间的资源成本。它让我们能以函数为单位进行部署和管理,非常轻量。
下面是整个系统的架构流程图,展示了批量路径和即时路径如何协同工作。
graph TD subgraph "Kubernetes Cluster" subgraph "Orchestration Layer: Kubeflow Pipelines" A[Pipeline Trigger: CronJob] --> B{Dask Feature Engineering Job}; B --> C[Write Features to Redis]; A --> D{Deploy/Update OpenFaaS Function}; end subgraph "Batch Computation Layer" B -- Creates --> E[Dask Cluster: Scheduler + Workers]; end subgraph "On-demand Computation Layer" F[OpenFaaS Gateway] --> G[Feature Function Pod]; end subgraph "Online Storage Layer" H[Redis Cluster]; end end subgraph "Data Sources" I[Data Lake / DWH]; end subgraph "Consumers" J[Inference Service]; end I --> E; C --> H; J -- HTTP Request --> F; G -- Read Base Data --> H; F -- Invokes --> G; G -- Returns Derived Feature --> J; J -- Read Batch Feature --> H; style B fill:#f9f,stroke:#333,stroke-width:2px style G fill:#ccf,stroke:#333,stroke-width:2px
步骤一:批量特征工程的 Dask 组件实现
这是管道的“重量级”部分。我们的任务是编写一个 Dask 作业,从数据源(假设为 Parquet 文件)中读取数据,计算用户级别的聚合特征,并将其写入 Redis。
首先,是 Dask 作业的核心逻辑。这段代码需要被打包到一个 Docker 镜像中。
batch_feature_job/main.py
import os
import argparse
import logging
from datetime import datetime
import dask.dataframe as dd
from dask_kubernetes import KubeCluster, make_pod_spec
from dask.distributed import Client
import redis
# --- 配置日志 ---
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
# --- Redis 连接配置 ---
REDIS_HOST = os.environ.get("REDIS_HOST", "redis-master.default.svc.cluster.local")
REDIS_PORT = int(os.environ.get("REDIS_PORT", 6379))
REDIS_DB = int(os.environ.get("REDIS_DB", 0))
def get_redis_client():
"""获取 Redis 连接,包含错误处理"""
try:
client = redis.Redis(host=REDIS_HOST, port=REDIS_PORT, db=REDIS_DB, decode_responses=True)
client.ping()
logger.info(f"Successfully connected to Redis at {REDIS_HOST}:{REDIS_PORT}")
return client
except redis.exceptions.ConnectionError as e:
logger.error(f"Failed to connect to Redis: {e}")
raise
def compute_and_store_features(dask_client: Client, source_path: str, redis_client: redis.Redis):
"""
使用 Dask 从数据源计算特征并写入 Redis
"""
try:
logger.info(f"Reading data from {source_path}")
# 假设数据源是 Parquet 格式的用户行为日志
df = dd.read_parquet(source_path)
# --- 特征计算示例 ---
# 1. 用户总购买金额
total_purchase = df.groupby('user_id')['purchase_amount'].sum().compute()
# 2. 用户近90天购买次数
# 在真实场景中,这里需要进行时间窗口过滤
purchase_count_90d = df.groupby('user_id')['event_id'].count().compute()
logger.info(f"Computed features for {len(total_purchase)} users.")
# --- 写入 Redis ---
# 使用 pipeline 批量写入以提高性能
pipe = redis_client.pipeline()
for user_id, amount in total_purchase.items():
key = f"user_features:{user_id}"
pipe.hset(key, "total_purchase_amount", float(amount))
for user_id, count in purchase_count_90d.items():
key = f"user_features:{user_id}"
pipe.hset(key, "purchase_count_90d", int(count))
# 增加一个元数据字段,记录更新时间
pipe.hset(key, "_last_updated_utc", datetime.utcnow().isoformat())
logger.info("Executing Redis pipeline to store features...")
results = pipe.execute()
logger.info(f"Redis pipeline executed. Results count: {len(results)}")
except Exception as e:
logger.error(f"An error occurred during feature computation or storage: {e}", exc_info=True)
raise
def main():
parser = argparse.ArgumentParser(description="Dask Batch Feature Engineering Job")
parser.add_argument("--source-path", type=str, required=True, help="Path to the source data (e.g., 's3://my-bucket/data/').")
parser.add_argument("--dask-workers", type=int, default=3, help="Number of Dask workers to spawn.")
args = parser.parse_args()
# 在 Kubernetes 中动态创建 Dask 集群
# 这里的 pod_spec 很关键,它定义了 worker 的资源、镜像等
pod_spec = make_pod_spec(
image='your-repo/dask-feature-job:latest', # 使用包含此代码和依赖的镜像
memory_limit='4G',
memory_request='2G',
cpu_limit='2',
cpu_request='1',
env={'REDIS_HOST': REDIS_HOST} # 传递环境变量
)
# 这里的坑在于:Dask KubeCluster 的 namespace 和 RBAC 权限必须正确配置,
# 否则 scheduler 将无法创建 worker pod。
with KubeCluster(pod_spec) as cluster:
cluster.scale(args.dask_workers)
logger.info(f"Dask cluster dashboard link: {cluster.dashboard_link}")
with Client(cluster) as client:
redis_conn = get_redis_client()
compute_and_store_features(client, args.source_path, redis_conn)
if __name__ == "__main__":
main()
接着,我们为这个 Dask 作业定义一个 Kubeflow Pipeline 组件。
dask_component.yaml
name: Dask Batch Feature Engineering
description: Runs a Dask job on Kubernetes to compute batch features and store them in Redis.
inputs:
- {name: source_path, type: String, description: 'Path to source data in Parquet format.'}
- {name: dask_workers, type: Integer, default: '3', description: 'Number of Dask workers.'}
implementation:
container:
image: your-repo/dask-feature-job:latest # 与 Dask Worker 相同的镜像
command: [
python, main.py,
--source-path, {inputValue: source_path},
--dask-workers, {inputValue: dask_workers}
]
一个常见的错误是,运行此组件的 Pod 需要有权限在自己的 Namespace 中创建、管理其他 Pod(即 Dask Workers)。这通常需要一个特定的 ServiceAccount
并绑定相应的 Role
和 RoleBinding
。在生产环境中,权限必须最小化。
步骤二:即时派生特征的 OpenFaaS 函数
现在转向“轻量级”部分。假设我们的模型需要一个特征:“用户当前购物车中商品的总价值”。这个特征变化非常快,不适合批处理。我们将用一个 OpenFaaS 函数来实现它。
首先,是 OpenFaaS 函数的处理器代码。
on_demand_feature_function/handler.py
import os
import json
import redis
import logging
# --- 配置 ---
# 在生产环境中,这些配置应该通过 OpenFaaS 的 secret 管理
REDIS_HOST = os.environ.get("REDIS_HOST", "redis-master.default.svc.cluster.local")
REDIS_PORT = int(os.environ.get("REDIS_PORT", 6379))
REDIS_DB = int(os.environ.get("REDIS_DB_CARTS", 1)) # 使用不同的 DB 隔离数据
# --- 日志 ---
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
# --- Redis 客户端 ---
# 客户端实例在全局范围创建,利用函数实例的重用,避免每次调用都重建连接
try:
redis_client = redis.Redis(host=REDIS_HOST, port=REDIS_PORT, db=REDIS_DB, decode_responses=True)
redis_client.ping()
except redis.exceptions.ConnectionError as e:
# 如果在启动时无法连接,后续调用将失败。这是合理的快速失败策略。
redis_client = None
logger.error(f"Initial Redis connection failed: {e}")
def handle(req):
"""
处理请求,计算用户购物车总价值
请求体格式: {"user_id": "some_user_id"}
"""
if redis_client is None:
return {"error": "Redis connection not available"}, 503
try:
body = json.loads(req)
user_id = body.get("user_id")
if not user_id:
return {"error": "user_id is required"}, 400
# --- 核心逻辑 ---
# 假设购物车数据存储在 Redis Hash 中,key 为 cart:<user_id>
# field 为 item_id, value 为 quantity
cart_key = f"cart:{user_id}"
cart_items = redis_client.hgetall(cart_key)
if not cart_items:
return {"user_id": user_id, "cart_total_value": 0.0}, 200
# 假设商品价格也存储在 Redis 的一个 Hash 中,key 为 "item_prices"
item_ids = list(cart_items.keys())
# 使用 MGET 批量获取价格,减少网络延迟
item_prices = redis_client.hmget("item_prices", item_ids)
total_value = 0.0
for item_id, price_str in zip(item_ids, item_prices):
try:
quantity = int(cart_items[item_id])
price = float(price_str) if price_str else 0.0
total_value += quantity * price
except (ValueError, TypeError):
# 如果数据格式错误,记录日志并跳过该项
logger.warning(f"Invalid data for user {user_id}, item {item_id}. Quantity: {cart_items.get(item_id)}, Price: {price_str}")
continue
response = {
"user_id": user_id,
"cart_total_value": round(total_value, 2)
}
return json.dumps(response), 200, {"Content-Type": "application/json"}
except json.JSONDecodeError:
return {"error": "Invalid JSON request body"}, 400
except redis.exceptions.RedisError as e:
logger.error(f"Redis error for user_id {body.get('user_id')}: {e}")
return {"error": "Internal data store error"}, 500
except Exception as e:
logger.error(f"Unexpected error for request '{req}': {e}", exc_info=True)
return {"error": "An unexpected error occurred"}, 500
这个函数的部署配置文件 stack.yml
如下:
on_demand_feature_function/stack.yml
version: 1.0
provider:
name: openfaas
gateway: http://openfaas.openfaas.svc.cluster.local:8080
functions:
on-demand-cart-value:
lang: python3-http
handler: ./on-demand_feature_function
image: your-repo/on-demand-cart-value:latest
environment:
REDIS_HOST: redis-master.default.svc.cluster.local
REDIS_PORT: 6379
REDIS_DB_CARTS: 1
labels:
com.openfaas.scale.min: "1" # 保持至少一个实例以减少冷启动
com.openfaas.scale.max: "10"
limits:
memory: 256Mi
cpu: 500m
requests:
memory: 128Mi
cpu: 100m
build_options:
- dev
步骤三:使用 Kubeflow Pipeline 编排一切
最后,我们用 Kubeflow Pipeline 将 Dask 批量作业和 OpenFaaS 函数的部署串联起来。这个 Pipeline 将会:
- 加载 Dask 作业的组件定义。
- 创建一个自定义组件,用于通过
faas-cli
部署 OpenFaaS 函数。 - 定义一个线性工作流:先执行 Dask 作业,成功后再部署或更新 OpenFaaS 函数。
pipeline.py
import kfp
from kfp import dsl
from kfp.components import load_component_from_file
# --- 加载 Dask 组件 ---
dask_op = load_component_from_file('dask_component.yaml')
# --- OpenFaaS 部署组件 ---
# 在一个真正的项目中,应该创建一个可重用的组件。
# 这里为了演示,我们直接定义一个容器操作。
# 这个容器需要包含 faas-cli 并且配置了访问 OpenFaaS Gateway 的凭证。
# 凭证通常通过 Kubernetes Secret 挂载到 Pod 中。
def deploy_faas_op(image: str, stack_file_path: str, gateway_url: str, function_name: str) -> dsl.ContainerOp:
"""
一个用于部署 OpenFaaS 函数的 Kubeflow 组件。
这里的坑是认证,faas-cli 需要登录。
最佳实践是创建一个包含 gateway 密码的 K8s Secret,并将其挂载到此 Pod 中。
然后 command 的第一步是从 secret 文件读取密码并执行 `faas-cli login`。
"""
return dsl.ContainerOp(
name='Deploy OpenFaaS Function',
image='your-repo/faas-cli-deployer:latest', # 一个包含 faas-cli 和 stack.yml 的镜像
command=[
'sh', '-c',
# 简化版命令,假设已通过环境变量或 secret 配置好登录
f'faas-cli deploy -f {stack_file_path} --gateway={gateway_url} --filter {function_name}'
],
)
@dsl.pipeline(
name='Hybrid Feature Engineering Pipeline',
description='A pipeline that runs batch feature jobs and deploys on-demand feature functions.'
)
def hybrid_feature_pipeline(
data_source: str = 's3://my-data/user-events/latest/',
dask_worker_count: int = 5,
faas_image_tag: str = 'latest',
faas_gateway: str = 'http://openfaas.openfaas.svc.cluster.local:8080'
):
# 步骤 1: 运行 Dask 批量特征作业
dask_task = dask_op(
source_path=data_source,
dask_workers=dask_worker_count
).set_display_name('Batch Feature Calculation')
# 步骤 2: 部署或更新 OpenFaaS 函数
# 这个步骤依赖于 Dask 作业的成功完成
# `faas_image_tag` 参数允许我们随流水线更新函数镜像版本
faas_deploy_task = deploy_faas_op(
image=f'your-repo/on-demand-cart-value:{faas_image_tag}',
stack_file_path='./on_demand_feature_function/stack.yml',
gateway_url=faas_gateway,
function_name='on-demand-cart-value'
).after(dask_task).set_display_name('Deploy On-demand Function')
if __name__ == '__main__':
# 编译 Pipeline
kfp.compiler.Compiler().compile(
pipeline_func=hybrid_feature_pipeline,
package_path='hybrid_feature_pipeline.yaml'
)
局限性与未来迭代路径
这套架构虽然解决了混合模式特征供给的问题,但在生产环境中仍有其局限性和需要考量的地方:
- 特征一致性问题: 批量特征和即时特征的时间戳是不同步的。模型在训练时如果使用了这两个特征,那么在推理时,两者之间的时间差可能会引入偏差。解决方案之一是在即时特征函数中,也从 Redis 读取批量特征,并一起返回给调用方,至少保证了同一请求内所有特征的时间切片是一致的。
- 操作复杂性: 集成了 Kubeflow, Dask, Redis, OpenFaaS 四个系统,对 SRE 团队的技能要求较高。每个组件都需要独立的监控、告警和维护策略。这套方案的价值在于其弹性与性能,对于规模不大的业务来说可能过度设计。
- 冷启动敏感度: 尽管我们在
stack.yml
中设置了scale.min=1
,但在流量高峰期,如果 OpenFaaS 函数需要快速扩容,新实例的冷启动延迟(虽然通常在亚秒级)对于某些 P99 延迟要求苛刻的服务仍可能是个问题。更激进的优化包括自定义 OpenFaaS 的调度策略或使用预热机制。
未来的迭代方向可以考虑引入一个中心化的特征注册表(Feature Registry),例如 Feast。当前的架构可以作为 Feast 的一个强大的、自定义的后端执行引擎。通过 Feast,数据科学家可以声明式地定义特征,而无需关心背后是由 Dask 批处理还是 OpenFaaS 函数计算,进一步解耦了特征的定义与实现。