整合 Kubeflow、Dask、Redis 与 OpenFaaS 构建混合模式实时特征管道


在构建机器学习服务的特征工程体系时,我们通常面临一个两难的抉择。批量计算的特征(例如用户近90天的平均消费额)虽然计算成本可控,但新鲜度不足,无法反映用户的即时意图;而纯流式计算的实时特征(例如用户近30秒的点击次数)虽然能捕捉到最新动态,但其基础设施复杂、资源消耗高,并且并非所有特征都需要如此高的实时性。这种矛盾导致我们不得不寻找一种更灵活、成本效益更高的架构。

我们的目标是构建一个混合模式的特征管道,它必须能同时提供:

  1. 批量预计算特征 (Batch Features): 针对变化缓慢、计算复杂的特征,通过分布式计算框架定时生成,并存入低延迟的在线存储。
  2. 即时派生特征 (On-demand Features): 针对变化极快、或仅在特定请求中才需要的特征,通过一个轻量级的计算服务,在请求时实时生成。

这个架构的核心在于如何将重量级的批处理、轻量级的实时计算以及高性能的存储无缝地编排在一起。在真实项目中,我们最终选定的技术栈是:Kubeflow Pipelines作为工作流编排器,Dask作为分布式批处理引擎,Redis作为在线特征存储,以及OpenFaaS作为即时计算的无服务器函数平台。

架构设计与技术选型考量

这个选型并非偶然,而是基于对生产环境中稳定性和维护性的深度考量。

  • Kubeflow Pipelines: 作为建立在 Kubernetes 之上的工作流平台,它天然具备了管理容器化任务的能力。我们可以将 Dask 作业、模型训练、函数部署等一切操作都定义为 Pipeline 的一个组件(Component),实现了 MLOps 流程的标准化和可复现。
  • Dask on Kubernetes: Dask 提供了与 Pandas 和 Scikit-learn 兼容的 API,让数据科学家可以无缝地将其本地代码扩展到分布式集群中。dask-kubernetes 库使其能够在 Kubernetes 上动态创建和销毁计算集群,完美契合了 Kubeflow 基于容器的调度模式。这里的关键是资源弹性,Dask 作业完成,计算资源就立刻释放。
  • Redis: 在线特征存储要求毫秒级的读取延迟。Redis 不仅满足这个要求,其丰富的数据结构(尤其是 Hash)也非常适合存储结构化的特征数据。例如,一个用户的全部特征可以存储在一个 Hash Key 下,一次 HGETALL 操作即可获取,避免了多次网络往返。
  • OpenFaaS: 为什么不用常见的 Flask 或 FastAPI 应用来做即时计算?因为很多即时特征的计算逻辑非常简单,但调用频率可能呈现极端的峰谷。OpenFaaS 这种无服务器框架,能根据负载自动伸缩到零,极大地节约了空闲时间的资源成本。它让我们能以函数为单位进行部署和管理,非常轻量。

下面是整个系统的架构流程图,展示了批量路径和即时路径如何协同工作。

graph TD
    subgraph "Kubernetes Cluster"
        subgraph "Orchestration Layer: Kubeflow Pipelines"
            A[Pipeline Trigger: CronJob] --> B{Dask Feature Engineering Job};
            B --> C[Write Features to Redis];
            A --> D{Deploy/Update OpenFaaS Function};
        end

        subgraph "Batch Computation Layer"
            B -- Creates --> E[Dask Cluster: Scheduler + Workers];
        end

        subgraph "On-demand Computation Layer"
            F[OpenFaaS Gateway] --> G[Feature Function Pod];
        end

        subgraph "Online Storage Layer"
            H[Redis Cluster];
        end
    end

    subgraph "Data Sources"
        I[Data Lake / DWH];
    end

    subgraph "Consumers"
        J[Inference Service];
    end

    I --> E;
    C --> H;
    J -- HTTP Request --> F;
    G -- Read Base Data --> H;
    F -- Invokes --> G;
    G -- Returns Derived Feature --> J;
    J -- Read Batch Feature --> H;

    style B fill:#f9f,stroke:#333,stroke-width:2px
    style G fill:#ccf,stroke:#333,stroke-width:2px

步骤一:批量特征工程的 Dask 组件实现

这是管道的“重量级”部分。我们的任务是编写一个 Dask 作业,从数据源(假设为 Parquet 文件)中读取数据,计算用户级别的聚合特征,并将其写入 Redis。

首先,是 Dask 作业的核心逻辑。这段代码需要被打包到一个 Docker 镜像中。

batch_feature_job/main.py

import os
import argparse
import logging
from datetime import datetime

import dask.dataframe as dd
from dask_kubernetes import KubeCluster, make_pod_spec
from dask.distributed import Client
import redis

# --- 配置日志 ---
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# --- Redis 连接配置 ---
REDIS_HOST = os.environ.get("REDIS_HOST", "redis-master.default.svc.cluster.local")
REDIS_PORT = int(os.environ.get("REDIS_PORT", 6379))
REDIS_DB = int(os.environ.get("REDIS_DB", 0))

def get_redis_client():
    """获取 Redis 连接,包含错误处理"""
    try:
        client = redis.Redis(host=REDIS_HOST, port=REDIS_PORT, db=REDIS_DB, decode_responses=True)
        client.ping()
        logger.info(f"Successfully connected to Redis at {REDIS_HOST}:{REDIS_PORT}")
        return client
    except redis.exceptions.ConnectionError as e:
        logger.error(f"Failed to connect to Redis: {e}")
        raise

def compute_and_store_features(dask_client: Client, source_path: str, redis_client: redis.Redis):
    """
    使用 Dask 从数据源计算特征并写入 Redis
    """
    try:
        logger.info(f"Reading data from {source_path}")
        # 假设数据源是 Parquet 格式的用户行为日志
        df = dd.read_parquet(source_path)

        # --- 特征计算示例 ---
        # 1. 用户总购买金额
        total_purchase = df.groupby('user_id')['purchase_amount'].sum().compute()
        # 2. 用户近90天购买次数
        # 在真实场景中,这里需要进行时间窗口过滤
        purchase_count_90d = df.groupby('user_id')['event_id'].count().compute()

        logger.info(f"Computed features for {len(total_purchase)} users.")

        # --- 写入 Redis ---
        # 使用 pipeline 批量写入以提高性能
        pipe = redis_client.pipeline()
        for user_id, amount in total_purchase.items():
            key = f"user_features:{user_id}"
            pipe.hset(key, "total_purchase_amount", float(amount))
        
        for user_id, count in purchase_count_90d.items():
            key = f"user_features:{user_id}"
            pipe.hset(key, "purchase_count_90d", int(count))
            # 增加一个元数据字段,记录更新时间
            pipe.hset(key, "_last_updated_utc", datetime.utcnow().isoformat())

        logger.info("Executing Redis pipeline to store features...")
        results = pipe.execute()
        logger.info(f"Redis pipeline executed. Results count: {len(results)}")

    except Exception as e:
        logger.error(f"An error occurred during feature computation or storage: {e}", exc_info=True)
        raise

def main():
    parser = argparse.ArgumentParser(description="Dask Batch Feature Engineering Job")
    parser.add_argument("--source-path", type=str, required=True, help="Path to the source data (e.g., 's3://my-bucket/data/').")
    parser.add_argument("--dask-workers", type=int, default=3, help="Number of Dask workers to spawn.")
    args = parser.parse_args()

    # 在 Kubernetes 中动态创建 Dask 集群
    # 这里的 pod_spec 很关键,它定义了 worker 的资源、镜像等
    pod_spec = make_pod_spec(
        image='your-repo/dask-feature-job:latest', # 使用包含此代码和依赖的镜像
        memory_limit='4G',
        memory_request='2G',
        cpu_limit='2',
        cpu_request='1',
        env={'REDIS_HOST': REDIS_HOST} # 传递环境变量
    )
    
    # 这里的坑在于:Dask KubeCluster 的 namespace 和 RBAC 权限必须正确配置,
    # 否则 scheduler 将无法创建 worker pod。
    with KubeCluster(pod_spec) as cluster:
        cluster.scale(args.dask_workers)
        logger.info(f"Dask cluster dashboard link: {cluster.dashboard_link}")
        
        with Client(cluster) as client:
            redis_conn = get_redis_client()
            compute_and_store_features(client, args.source_path, redis_conn)

if __name__ == "__main__":
    main()

接着,我们为这个 Dask 作业定义一个 Kubeflow Pipeline 组件。

dask_component.yaml

name: Dask Batch Feature Engineering
description: Runs a Dask job on Kubernetes to compute batch features and store them in Redis.

inputs:
  - {name: source_path, type: String, description: 'Path to source data in Parquet format.'}
  - {name: dask_workers, type: Integer, default: '3', description: 'Number of Dask workers.'}

implementation:
  container:
    image: your-repo/dask-feature-job:latest # 与 Dask Worker 相同的镜像
    command: [
      python, main.py,
      --source-path, {inputValue: source_path},
      --dask-workers, {inputValue: dask_workers}
    ]

一个常见的错误是,运行此组件的 Pod 需要有权限在自己的 Namespace 中创建、管理其他 Pod(即 Dask Workers)。这通常需要一个特定的 ServiceAccount 并绑定相应的 RoleRoleBinding。在生产环境中,权限必须最小化。

步骤二:即时派生特征的 OpenFaaS 函数

现在转向“轻量级”部分。假设我们的模型需要一个特征:“用户当前购物车中商品的总价值”。这个特征变化非常快,不适合批处理。我们将用一个 OpenFaaS 函数来实现它。

首先,是 OpenFaaS 函数的处理器代码。

on_demand_feature_function/handler.py

import os
import json
import redis
import logging

# --- 配置 ---
# 在生产环境中,这些配置应该通过 OpenFaaS 的 secret 管理
REDIS_HOST = os.environ.get("REDIS_HOST", "redis-master.default.svc.cluster.local")
REDIS_PORT = int(os.environ.get("REDIS_PORT", 6379))
REDIS_DB = int(os.environ.get("REDIS_DB_CARTS", 1)) # 使用不同的 DB 隔离数据

# --- 日志 ---
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

# --- Redis 客户端 ---
# 客户端实例在全局范围创建,利用函数实例的重用,避免每次调用都重建连接
try:
    redis_client = redis.Redis(host=REDIS_HOST, port=REDIS_PORT, db=REDIS_DB, decode_responses=True)
    redis_client.ping()
except redis.exceptions.ConnectionError as e:
    # 如果在启动时无法连接,后续调用将失败。这是合理的快速失败策略。
    redis_client = None
    logger.error(f"Initial Redis connection failed: {e}")

def handle(req):
    """
    处理请求,计算用户购物车总价值
    请求体格式: {"user_id": "some_user_id"}
    """
    if redis_client is None:
        return {"error": "Redis connection not available"}, 503

    try:
        body = json.loads(req)
        user_id = body.get("user_id")

        if not user_id:
            return {"error": "user_id is required"}, 400

        # --- 核心逻辑 ---
        # 假设购物车数据存储在 Redis Hash 中,key 为 cart:<user_id>
        # field 为 item_id, value 为 quantity
        cart_key = f"cart:{user_id}"
        cart_items = redis_client.hgetall(cart_key)

        if not cart_items:
            return {"user_id": user_id, "cart_total_value": 0.0}, 200

        # 假设商品价格也存储在 Redis 的一个 Hash 中,key 为 "item_prices"
        item_ids = list(cart_items.keys())
        
        # 使用 MGET 批量获取价格,减少网络延迟
        item_prices = redis_client.hmget("item_prices", item_ids)

        total_value = 0.0
        for item_id, price_str in zip(item_ids, item_prices):
            try:
                quantity = int(cart_items[item_id])
                price = float(price_str) if price_str else 0.0
                total_value += quantity * price
            except (ValueError, TypeError):
                # 如果数据格式错误,记录日志并跳过该项
                logger.warning(f"Invalid data for user {user_id}, item {item_id}. Quantity: {cart_items.get(item_id)}, Price: {price_str}")
                continue

        response = {
            "user_id": user_id,
            "cart_total_value": round(total_value, 2)
        }
        return json.dumps(response), 200, {"Content-Type": "application/json"}

    except json.JSONDecodeError:
        return {"error": "Invalid JSON request body"}, 400
    except redis.exceptions.RedisError as e:
        logger.error(f"Redis error for user_id {body.get('user_id')}: {e}")
        return {"error": "Internal data store error"}, 500
    except Exception as e:
        logger.error(f"Unexpected error for request '{req}': {e}", exc_info=True)
        return {"error": "An unexpected error occurred"}, 500

这个函数的部署配置文件 stack.yml 如下:

on_demand_feature_function/stack.yml

version: 1.0
provider:
  name: openfaas
  gateway: http://openfaas.openfaas.svc.cluster.local:8080

functions:
  on-demand-cart-value:
    lang: python3-http
    handler: ./on-demand_feature_function
    image: your-repo/on-demand-cart-value:latest
    environment:
      REDIS_HOST: redis-master.default.svc.cluster.local
      REDIS_PORT: 6379
      REDIS_DB_CARTS: 1
    labels:
      com.openfaas.scale.min: "1" # 保持至少一个实例以减少冷启动
      com.openfaas.scale.max: "10"
    limits:
      memory: 256Mi
      cpu: 500m
    requests:
      memory: 128Mi
      cpu: 100m
    build_options:
      - dev

步骤三:使用 Kubeflow Pipeline 编排一切

最后,我们用 Kubeflow Pipeline 将 Dask 批量作业和 OpenFaaS 函数的部署串联起来。这个 Pipeline 将会:

  1. 加载 Dask 作业的组件定义。
  2. 创建一个自定义组件,用于通过 faas-cli 部署 OpenFaaS 函数。
  3. 定义一个线性工作流:先执行 Dask 作业,成功后再部署或更新 OpenFaaS 函数。

pipeline.py

import kfp
from kfp import dsl
from kfp.components import load_component_from_file

# --- 加载 Dask 组件 ---
dask_op = load_component_from_file('dask_component.yaml')

# --- OpenFaaS 部署组件 ---
# 在一个真正的项目中,应该创建一个可重用的组件。
# 这里为了演示,我们直接定义一个容器操作。
# 这个容器需要包含 faas-cli 并且配置了访问 OpenFaaS Gateway 的凭证。
# 凭证通常通过 Kubernetes Secret 挂载到 Pod 中。
def deploy_faas_op(image: str, stack_file_path: str, gateway_url: str, function_name: str) -> dsl.ContainerOp:
    """
    一个用于部署 OpenFaaS 函数的 Kubeflow 组件。
    这里的坑是认证,faas-cli 需要登录。
    最佳实践是创建一个包含 gateway 密码的 K8s Secret,并将其挂载到此 Pod 中。
    然后 command 的第一步是从 secret 文件读取密码并执行 `faas-cli login`。
    """
    return dsl.ContainerOp(
        name='Deploy OpenFaaS Function',
        image='your-repo/faas-cli-deployer:latest', # 一个包含 faas-cli 和 stack.yml 的镜像
        command=[
            'sh', '-c',
            # 简化版命令,假设已通过环境变量或 secret 配置好登录
            f'faas-cli deploy -f {stack_file_path} --gateway={gateway_url} --filter {function_name}'
        ],
    )

@dsl.pipeline(
    name='Hybrid Feature Engineering Pipeline',
    description='A pipeline that runs batch feature jobs and deploys on-demand feature functions.'
)
def hybrid_feature_pipeline(
    data_source: str = 's3://my-data/user-events/latest/',
    dask_worker_count: int = 5,
    faas_image_tag: str = 'latest',
    faas_gateway: str = 'http://openfaas.openfaas.svc.cluster.local:8080'
):
    # 步骤 1: 运行 Dask 批量特征作业
    dask_task = dask_op(
        source_path=data_source,
        dask_workers=dask_worker_count
    ).set_display_name('Batch Feature Calculation')

    # 步骤 2: 部署或更新 OpenFaaS 函数
    # 这个步骤依赖于 Dask 作业的成功完成
    # `faas_image_tag` 参数允许我们随流水线更新函数镜像版本
    faas_deploy_task = deploy_faas_op(
        image=f'your-repo/on-demand-cart-value:{faas_image_tag}',
        stack_file_path='./on_demand_feature_function/stack.yml',
        gateway_url=faas_gateway,
        function_name='on-demand-cart-value'
    ).after(dask_task).set_display_name('Deploy On-demand Function')

if __name__ == '__main__':
    # 编译 Pipeline
    kfp.compiler.Compiler().compile(
        pipeline_func=hybrid_feature_pipeline,
        package_path='hybrid_feature_pipeline.yaml'
    )

局限性与未来迭代路径

这套架构虽然解决了混合模式特征供给的问题,但在生产环境中仍有其局限性和需要考量的地方:

  1. 特征一致性问题: 批量特征和即时特征的时间戳是不同步的。模型在训练时如果使用了这两个特征,那么在推理时,两者之间的时间差可能会引入偏差。解决方案之一是在即时特征函数中,也从 Redis 读取批量特征,并一起返回给调用方,至少保证了同一请求内所有特征的时间切片是一致的。
  2. 操作复杂性: 集成了 Kubeflow, Dask, Redis, OpenFaaS 四个系统,对 SRE 团队的技能要求较高。每个组件都需要独立的监控、告警和维护策略。这套方案的价值在于其弹性与性能,对于规模不大的业务来说可能过度设计。
  3. 冷启动敏感度: 尽管我们在 stack.yml 中设置了 scale.min=1,但在流量高峰期,如果 OpenFaaS 函数需要快速扩容,新实例的冷启动延迟(虽然通常在亚秒级)对于某些 P99 延迟要求苛刻的服务仍可能是个问题。更激进的优化包括自定义 OpenFaaS 的调度策略或使用预热机制。

未来的迭代方向可以考虑引入一个中心化的特征注册表(Feature Registry),例如 Feast。当前的架构可以作为 Feast 的一个强大的、自定义的后端执行引擎。通过 Feast,数据科学家可以声明式地定义特征,而无需关心背后是由 Dask 批处理还是 OpenFaaS 函数计算,进一步解耦了特征的定义与实现。


  目录