构建基于 NumPy 分析的 Tekton 自定义任务以动态管理 Tyk API 策略


管理超过五十个微服务的 Tyk API Gateway 配置,一直是个棘手的难题。最初,我们所有的 API 定义(API Definition)都是以 JSON 文件的形式存储在 Git 仓库中,通过一个简单的 CI/CD 管道应用到 Tyk 集群。问题在于策略部分,尤其是速率限制(Rate Limiting)和配额(Quotas)。业务发展迅速,流量模式变化莫测,静态配置的 rateper 值很快就变得不合时宜:要么过于宽松,无法保护后端服务;要么过于严格,在流量高峰期误伤正常用户,触发大量 429 Too Many Requests 错误。

手动调整这些值成了一场噩梦。运维团队需要不断地分析 Prometheus 中的监控数据,然后手动更新 Git 仓库里的 JSON 文件,再触发管道。这个过程不仅效率低下,而且极易出错。一个错误的逗号,一个不恰当的数值,都可能导致整个 API 不可用。我们需要的不是更多的人工介入,而是一个能自我调节的自动化系统。

初步构想是,让 CI/CD 管道变得“智能”。在部署新的 API 定义或更新现有定义时,管道应该能自动分析最近的流量数据,并据此计算出合理的速率限制阈值,然后将这些值动态地注入到 Tyk 的 API 定义文件中,最后再应用这个“新鲜出炉”的配置。这本质上是将一部分 SRE 的决策过程代码化、自动化。

技术选型上,我们早已全面拥抱 Kubernetes 和 Tekton。Tekton 的 TaskPipeline 的声明式、可复用特性,非常适合构建这种复杂的、模块化的工作流。问题在于,Tekton 本身并不具备数据分析能力。我们需要一个能够执行计算逻辑的 Task

一个显而易见的方案是编写一个自定义的容器镜像,内置我们的分析脚本。Python 是自然的选择,因为它拥有强大的数据科学生态。而在这个生态中,NumPy 是进行数值计算的基础。用 NumPy 来处理时间序列数据,计算百分位数(percentile)等统计指标,性能高且代码简洁。于是,最终方案浮出水面:创建一个自定义的 Tekton Task,该 Task 运行一个包含 Python 和 NumPy 的容器,接收历史流量数据和基础 API 定义作为输入,输出一个动态调整了速率限制策略的 API 定义。

步骤一:设计并实现自定义 Tekton 任务

我们的核心是那个执行分析的 Python 脚本。这个脚本需要做到:

  1. 从指定路径读取基础的 Tyk API 定义 JSON 文件。
  2. 从另一个路径读取流量指标数据(为简化起见,我们假设这是从 Prometheus 导出的 CSV 文件)。
  3. 使用 NumPy 分析指标数据,计算出新的速率限制阈值。
  4. 将计算出的新值更新到 API 定义的 JSON 结构中。
  5. 将更新后的 JSON 写回到指定输出路径。
  6. 包含必要的日志和错误处理,确保在生产管道中稳定运行。

下面是这个脚本 policy_analyzer.py 的一个生产级实现。

#!/usr/bin/env python3

import os
import json
import logging
import sys
import numpy as np
import pandas as pd

# 配置结构化日志
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    stream=sys.stdout
)

class TykPolicyAnalyzer:
    """
    分析流量数据并动态调整 Tyk API 定义中的速率限制策略。
    """

    def __init__(self, api_def_path, metrics_path, output_path):
        """
        初始化分析器。

        :param api_def_path: 基础 Tyk API 定义 JSON 文件的路径。
        :param metrics_path: 流量指标数据 (CSV) 的路径。
        :param output_path: 更新后的 API 定义 JSON 文件的输出路径。
        """
        self.api_def_path = api_def_path
        self.metrics_path = metrics_path
        self.output_path = output_path
        
        # 确保输入路径存在
        if not os.path.exists(self.api_def_path):
            logging.error(f"API definition file not found at: {self.api_def_path}")
            raise FileNotFoundError(f"API definition file not found at: {self.api_def_path}")
        if not os.path.exists(self.metrics_path):
            logging.error(f"Metrics file not found at: {self.metrics_path}")
            raise FileNotFoundError(f"Metrics file not found at: {self.metrics_path}")

    def load_data(self):
        """加载 API 定义和指标数据。"""
        try:
            with open(self.api_def_path, 'r') as f:
                self.api_definition = json.load(f)
            logging.info(f"Successfully loaded API definition from {self.api_def_path}")

            # 使用 pandas 读取 CSV,它在处理大型数据集时比标准库更高效
            self.metrics_df = pd.read_csv(self.metrics_path)
            logging.info(f"Successfully loaded metrics data from {self.metrics_path}")
            
            # 数据校验
            if 'requests_per_second' not in self.metrics_df.columns:
                raise ValueError("Metrics data must contain a 'requests_per_second' column.")

        except json.JSONDecodeError as e:
            logging.error(f"Error decoding JSON from {self.api_def_path}: {e}")
            raise
        except Exception as e:
            logging.error(f"An error occurred during data loading: {e}")
            raise

    def analyze_and_calculate_limits(self, percentile=95, safety_margin=1.2):
        """
        使用 NumPy 分析指标数据并计算新的速率限制。
        这里的策略是:将速率限制设置为流量的 P95 值,并增加 20% 的安全余量。
        这样可以覆盖绝大多数正常流量峰值,同时为突发流量留出空间。

        :param percentile: 用于计算阈值的百分位数。
        :param safety_margin: 安全余量乘数。
        :return: (new_rate, new_per) 元组。
        """
        if self.metrics_df.empty:
            logging.warning("Metrics data is empty. Skipping analysis.")
            return None, None

        traffic_data = self.metrics_df['requests_per_second'].to_numpy()
        
        # 使用 NumPy 计算百分位数,这是核心计算步骤
        # P95 是一个稳健的指标,可以有效过滤掉极端异常值
        p_value = np.percentile(traffic_data, percentile)
        logging.info(f"Calculated {percentile}th percentile of traffic: {p_value:.2f} req/s")
        
        # 计算最终速率,并向上取整,因为速率限制必须是整数
        new_rate = int(np.ceil(p_value * safety_margin))
        new_per = 1  # 我们将限制设置为每秒
        
        logging.info(f"Calculated new rate limit: {new_rate} requests per {new_per} second(s)")
        
        # 设定一个最小阈值,防止在流量极低时设置过低的限制
        min_rate_threshold = 10
        if new_rate < min_rate_threshold:
            logging.warning(f"Calculated rate {new_rate} is below minimum threshold {min_rate_threshold}. Adjusting.")
            new_rate = min_rate_threshold

        return new_rate, new_per

    def update_api_definition(self, new_rate, new_per):
        """
        使用新的速率和周期更新 API 定义。
        这个函数需要了解 Tyk API 定义的具体 JSON 结构。
        """
        if new_rate is None or new_per is None:
            logging.warning("No new rate/per values calculated. API definition will not be updated.")
            return

        if 'rate_limits' not in self.api_definition:
             self.api_definition['rate_limits'] = {'global': {}}

        # 更新全局速率限制
        # 在真实项目中,这里可能需要更复杂的逻辑来处理每个版本的不同限制
        self.api_definition['rate_limits']['global'] = {
            "rate": new_rate,
            "per": new_per,
            "throttle_interval": 0,
            "throttle_retry_limit": 0
        }
        
        logging.info("Successfully updated API definition with new rate limits.")

    def save_output(self):
        """将更新后的 API 定义保存到输出文件。"""
        output_dir = os.path.dirname(self.output_path)
        if not os.path.exists(output_dir):
            os.makedirs(output_dir)
            logging.info(f"Created output directory: {output_dir}")

        try:
            with open(self.output_path, 'w') as f:
                # 使用 indent=2 格式化输出,便于调试和 Git diff
                json.dump(self.api_definition, f, indent=2)
            logging.info(f"Successfully saved updated API definition to {self.output_path}")
        except Exception as e:
            logging.error(f"Failed to save output file: {e}")
            raise

    def run(self):
        """执行完整的分析和更新流程。"""
        logging.info("Starting Tyk policy analysis process...")
        self.load_data()
        new_rate, new_per = self.analyze_and_calculate_limits()
        self.update_api_definition(new_rate, new_per)
        self.save_output()
        logging.info("Tyk policy analysis process completed successfully.")


if __name__ == "__main__":
    # Tekton 会通过环境变量或文件系统将参数传入
    # 我们这里使用约定的路径
    # 在 Tekton Task 中,这些路径将是 Workspace 的挂载点
    base_dir = "/workspace/source"
    
    # 单元测试思路:
    # 1. Mock 文件系统,提供虚拟的 api_def.json 和 metrics.csv。
    # 2. 测试 load_data 在文件不存在或格式错误时的异常处理。
    # 3. 构造不同的 metrics.csv 数据,断言 analyze_and_calculate_limits 的计算结果是否符合预期。
    # 4. 测试 update_api_definition 是否正确修改了 JSON 结构。
    # 5. 验证整个 run() 流程是否能生成正确的输出文件。

    try:
        # Tekton Task会将输入参数映射到文件或环境变量,这里我们硬编码路径用于演示
        api_def_input_path = os.path.join(base_dir, "api-definition.json")
        metrics_input_path = os.path.join(base_dir, "metrics.csv")
        api_def_output_path = os.path.join(base_dir, "api-definition-updated.json")
        
        analyzer = TykPolicyAnalyzer(
            api_def_path=api_def_input_path,
            metrics_path=metrics_input_path,
            output_path=api_def_output_path
        )
        analyzer.run()
    except Exception as e:
        logging.critical(f"A critical error occurred: {e}")
        sys.exit(1)

接下来,我们需要将这个脚本打包到一个容器镜像中,并定义一个 Tekton Task 来使用它。

Dockerfile:

FROM python:3.9-slim

WORKDIR /app

# 安装依赖
RUN pip install --no-cache-dir numpy pandas

COPY policy_analyzer.py .

# 脚本设置为可执行
RUN chmod +x policy_analyzer.py

ENTRYPOINT ["/app/policy_analyzer.py"]

构建并推送镜像到你的镜像仓库,例如 your-registry/tyk-policy-analyzer:v1.0

现在,是定义 Tekton Task 的时候了。
tyk-policy-analyzer-task.yaml:

apiVersion: tekton.dev/v1beta1
kind: Task
metadata:
  name: tyk-policy-analyzer
spec:
  description: >-
    Analyzes API traffic metrics to dynamically adjust rate limits in a Tyk API definition.
  workspaces:
    - name: source
      description: The workspace containing input files and for storing output.
  params:
    - name: baseApiDefFile
      type: string
      description: The name of the base Tyk API definition file.
      default: "api-definition.json"
    - name: metricsFile
      type: string
      description: The name of the metrics CSV file.
      default: "metrics.csv"
    - name: updatedApiDefFile
      type: string
      description: The name for the output updated API definition file.
      default: "api-definition-updated.json"
  steps:
    - name: analyze-and-update
      image: your-registry/tyk-policy-analyzer:v1.0
      workingDir: $(workspaces.source.path)
      # 这里的核心是将 Task 的参数和 Workspace 路径传递给 Python 脚本
      # 虽然脚本中是硬编码的,但在一个更健壮的实现中,会通过命令行参数传递
      # 这里我们通过约定的文件位置来简化
      script: |
        #!/bin/bash
        set -e
        
        echo "Starting policy analysis..."
        
        # 确保输入文件确实存在于工作区中
        if [ ! -f "$(workspaces.source.path)/$(params.baseApiDefFile)" ]; then
          echo "Error: Base API definition file not found!"
          exit 1
        fi
        
        if [ ! -f "$(workspaces.source.path)/$(params.metricsFile)" ]; then
          echo "Error: Metrics file not found!"
          exit 1
        fi
        
        # 运行分析脚本
        # 这里我们通过重命名来模拟脚本的输入/输出,以匹配脚本内的硬编码路径
        # 更好的做法是修改Python脚本,让其从命令行参数接收文件路径
        cp "$(workspaces.source.path)/$(params.baseApiDefFile)" "$(workspaces.source.path)/api-definition.json"
        cp "$(workspaces.source.path)/$(params.metricsFile)" "$(workspaces.source.path)/metrics.csv"
        
        /app/policy_analyzer.py
        
        # 将生成的文件重命名为 Task 参数指定的输出文件名
        mv "$(workspaces.source.path)/api-definition-updated.json" "$(workspaces.source.path)/$(params.updatedApiDefFile)"
        
        echo "Policy analysis complete. Updated definition saved to $(params.updatedApiDefFile)."

这个 Task 定义了一个独立、可复用的工作单元。它声明了一个名为 sourceworkspace,用于读写文件,并通过 params 接收输入输出的文件名。

步骤二:构建完整的 Tekton 管道

有了自定义任务,我们就可以将其编排到一个完整的 Pipeline 中。这个管道的流程如下:

  1. 从 Git 仓库克隆包含 Tyk API 定义的源码。
  2. (可选)一个任务,用于从 Prometheus 查询并生成 metrics.csv 文件。为了简化,我们假设这个文件已经存在于 Git 仓库中。
  3. 运行我们刚才创建的 tyk-policy-analyzer 任务。
  4. 最后一个任务,使用 curl 或 Tyk 的管理工具,将更新后的 API 定义应用到 Tyk Gateway。
graph TD
    A[Start] --> B(Clone Git Repo);
    B --> C{Fetch Metrics};
    C --> D(Run tyk-policy-analyzer Task);
    D --> E(Apply Config to Tyk);
    E --> F[End];

以下是管道的 YAML 定义 tyk-dynamic-policy-pipeline.yaml:

apiVersion: tekton.dev/v1beta1
kind: Pipeline
metadata:
  name: tyk-dynamic-policy-pipeline
spec:
  workspaces:
    - name: shared-data
  params:
    - name: repoUrl
      type: string
      description: The git repository URL.
    - name: revision
      type: string
      description: The git revision to clone.
      default: "main"
    - name: tykAdminSecret
      type: string
      description: The secret containing Tyk admin API key.
    - name: tykControlPlaneUrl
      type: string
      description: The URL for the Tyk control plane API.

  tasks:
    - name: fetch-source
      taskRef:
        name: git-clone
      workspaces:
        - name: output
          workspace: shared-data
      params:
        - name: url
          value: $(params.repoUrl)
        - name: revision
          value: $(params.revision)

    # 这是一个占位任务,实际应替换为从监控系统拉取数据的任务
    - name: fetch-metrics-placeholder
      runAfter: [fetch-source]
      workspaces:
        - name: source
          workspace: shared-data
      taskSpec:
        workspaces:
          - name: source
        steps:
          - name: generate-dummy-metrics
            image: bash:5.1
            workingDir: $(workspaces.source.path)
            script: |
              #!/bin/bash
              echo "Simulating metrics fetch..."
              # 在真实场景中,这里会调用Prometheus API
              # 为演示,我们创建一个虚拟的CSV文件
              echo "timestamp,requests_per_second" > metrics.csv
              for i in {1..100}; do
                echo "$i,$(($RANDOM % 50 + 20))" >> metrics.csv
              done
              echo "Dummy metrics.csv created."

    - name: analyze-and-generate-policy
      runAfter: [fetch-metrics-placeholder]
      taskRef:
        name: tyk-policy-analyzer
      workspaces:
        - name: source
          workspace: shared-data
      params:
        - name: baseApiDefFile
          value: "my-service/api-definition.json" # 假设API定义在此路径
        - name: metricsFile
          value: "metrics.csv"
        - name: updatedApiDefFile
          value: "my-service/api-definition-final.json"

    - name: apply-to-tyk
      runAfter: [analyze-and-generate-policy]
      workspaces:
        - name: source
          workspace: shared-data
      params:
        - name: tykAdminSecret
          value: $(params.tykAdminSecret)
        - name: tykControlPlaneUrl
          value: $(params.tykControlPlaneUrl)
      taskSpec:
        workspaces:
          - name: source
        params:
          - name: tykAdminSecret
          - name: tykControlPlaneUrl
        steps:
          - name: apply-config
            image: curlimages/curl:7.78.0
            workingDir: $(workspaces.source.path)
            env:
              - name: TYK_ADMIN_AUTH
                valueFrom:
                  secretKeyRef:
                    name: $(params.tykAdminSecret)
                    key: admin-auth-key
            script: |
              #!/bin/sh
              set -e
              
              API_DEF_FILE="my-service/api-definition-final.json"
              API_DEF_CONTENT=$(cat ${API_DEF_FILE})
              
              # 从API定义中提取API ID,如果存在的话
              API_ID=$(echo ${API_DEF_CONTENT} | grep -o '"api_id": "[^"]*' | grep -o '[^"]*$')
              
              if [ -z "$API_ID" ]; then
                echo "api_id not found in definition, creating new API..."
                # Tyk 创建 API 的端点
                curl -s -X POST \
                  "$(params.tykControlPlaneUrl)/tyk/apis" \
                  -H "Authorization: ${TYK_ADMIN_AUTH}" \
                  -H "Content-Type: application/json" \
                  -d "${API_DEF_CONTENT}"
              else
                echo "api_id '${API_ID}' found, updating existing API..."
                # Tyk 更新 API 的端点
                curl -s -X PUT \
                  "$(params.tykControlPlaneUrl)/tyk/apis/${API_ID}" \
                  -H "Authorization: ${TYK_ADMIN_AUTH}" \
                  -H "Content-Type: application/json" \
                  -d "${API_DEF_CONTENT}"
              fi
              echo "Successfully applied configuration to Tyk."

通过这个管道,我们实现了端到端的自动化。开发者只需关心业务逻辑和基础 API 定义,而速率限制这一关键的运维策略,则由管道根据真实数据自动优化和应用。整个过程遵循了 GitOps 的原则,唯一的“真相来源”仍然是 Git,但管道在应用之前对其进行了增强。

遗留问题与未来迭代

当前方案虽然解决了核心痛点,但仍有几个可以改进的地方。首先,以 CSV 文件作为数据交换媒介较为原始。一个更优雅的方案是 tyk-policy-analyzer 任务直接通过参数接收 Prometheus 的查询 URL 和时间范围,在任务内部使用 Prometheus 的客户端库查询数据,从而消除对中间文件的依赖。

其次,当前的分析模型(P95 + 安全余量)相对简单。对于有明显周期性(如工作日与周末)或业务活动(如大促)的流量模式,这种模型可能不够精确。未来的迭代可以引入更复杂的时序分析模型,例如 ARIMA 或者简单的机器学习模型,来预测未来的流量趋势,从而设定更具前瞻性的速率限制。但这会增加 Task 的复杂度和执行时间,需要在收益和成本之间做出权衡。

最后,该管道目前是针对单个 API 定义进行操作的。在一个大规模的微服务环境中,可能需要一个更高层次的编排逻辑,能够发现仓库中所有已变更的 API 定义,并为每一个并行地执行这个动态策略生成和应用流程。这可以通过 Tekton 的 Matrix 功能来实现,进一步提升效率和可扩展性。


  目录