构建在 AWS EKS 上的 Django 应用消费 Google Cloud Pub/Sub 消息的多云架构实践


一个无法回避的架构难题摆在面前:核心业务应用部署在 AWS EKS 上,但要求其消息处理系统必须具备跨云的灾难恢复能力,能够抵御单一云厂商的区域性甚至全局性故障。这个要求直接排除了完全依赖 AWS SQS/SNS 的方案,因为即使它们具备跨区域复制能力,其控制平面依然受限于单一厂商的生态系统。我们需要一个真正独立的、高可用的消息中间件作为系统的“主动脉”。

架构决策:为何选择 Google Cloud Pub/Sub

在设计任何关键系统时,对技术选型的权衡是架构师的核心职责。面对这个挑战,我们评估了两种主流方案。

方案 A: AWS 原生方案 (SQS + SNS)

  • 优势:

    • 深度集成: 与 EKS、IAM 等 AWS 服务无缝集成,认证授权逻辑清晰直接,通常使用 IAM Role for Service Accounts (IRSA) 即可。
    • 低延迟: 当生产者和消费者都在 AWS 内部时,网络延迟最低。
    • 生态成熟: 监控、日志、报警等配套设施在 AWS CloudWatch 中有完整的解决方案。
  • 劣势:

    • 厂商锁定: 这是此方案的致命弱点。它无法满足我们抵御单云厂商故障的核心需求。一旦 AWS 的认证服务或消息服务本身出现大范围问题,整个系统将陷入停滞。
    • 跨区域复杂性: 虽然 SNS 可以将消息扇出到不同区域的 SQS 队列,但这增加了配置的复杂性和管理成本,且并未从根本上解决对单一厂商控制平面的依赖。

方案 B: 多云方案 (Google Cloud Pub/Sub)

  • 优势:

    • 真正的解耦: Pub/Sub 是一个全球性的服务,其 Topic 不局限于某个区域。这为我们提供了与 AWS 基础设施完全隔离的通信层,是实现多云韧性的理想选择。
    • 卓越的伸缩性: Pub/Sub 的设计能够轻松处理海量吞吐,其基于拉取 (Pull) 的订阅模型非常适合在 Kubernetes 中进行水平扩展的消费者应用。
    • 简单的客户端逻辑: Google Cloud 提供的客户端库封装了复杂的重试、确认和流量控制逻辑。
  • 劣势:

    • 认证复杂性: 这是最大的挑战。一个运行在 AWS EKS 上的 Pod 如何安全、无凭证地向 Google Cloud API 进行身份验证?
    • 网络成本与延迟: 跨云的数据传输会产生额外的网络出口费用,并且网络延迟相比同云环境会更高。这要求我们评估消息的价值和实时性要求。

最终选择

我们最终选择了方案 B。对于这个核心系统,架构韧性带来的长期价值远超于其带来的认证复杂性和可预期的网络成本。业务的连续性是第一位的。接下来的核心任务,就是攻克 EKS 到 GCP 的安全认证,并构建一个生产级的 Django 消费者。

架构概览

整个系统的交互流程可以被清晰地描绘出来。生产者(可能在任何地方,包括另一个云或本地数据中心)将消息发布到 Google Cloud Pub/Sub 的一个全局 Topic。我们在 AWS EKS 上部署的 Django 消费者应用,通过一个长期运行的 Pod,从对应的 Subscription 中拉取并处理这些消息。

graph TD
    subgraph Google Cloud Platform
        Producer[外部生产者] -->|发布消息| PubSubTopic(Pub/Sub Topic)
        PubSubTopic --> PubSubSubscription(Pub/Sub Subscription)
    end

    subgraph AWS
        subgraph EKS Cluster
            Deployment(K8s Deployment)
            Deployment --> Pod1(Django Pod 1)
            Deployment --> Pod2(Django Pod 2)
            Deployment --> PodN(Django Pod N...)
        end
    end

    subgraph Authentication Flow
        EKS_OIDC[EKS OIDC Provider] -.->|1. 身份提供商| GCP_WIF(GCP Workload Identity Federation)
        K8s_SA(K8s ServiceAccount) -.->|2. 声明身份| GCP_WIF
        GCP_WIF -.->|3. 签发临时Token| Pod1
    end

    Pod1 -->|4. 使用GCP Token拉取消息| PubSubSubscription
    Pod2 -->|4. 使用GCP Token拉取消息| PubSubSubscription
    PodN -->|4. 使用GCP Token拉取消息| PubSubSubscription

核心实现:从认证到消费

步骤一:配置跨云身份认证 (Workload Identity Federation)

这是整个方案中最关键、也是最容易出错的部分。我们的目标是让 EKS 中的 Pod 能够扮演 (impersonate) 一个 Google Cloud Service Account (GCSA),而无需在 Pod 中存储任何静态密钥文件。

  1. 在 EKS 端获取 OIDC Provider 信息:
    每个 EKS 集群都有一个 OIDC Issuer URL,它是我们建立信任关系的基础。

    # 替换 YOUR_CLUSTER_NAME 和 YOUR_REGION
    aws eks describe-cluster --name YOUR_CLUSTER_NAME --region YOUR_REGION --query "cluster.identity.oidc.issuer" --output text
    # 输出示例: https://oidc.eks.us-west-2.amazonaws.com/id/EXAMPLED539D4633E53BF8DE7E7A559B
  2. 在 Google Cloud 端创建 Workload Identity Pool 和 Provider:

    # 替换 YOUR_GCP_PROJECT_ID
    GCP_PROJECT_ID="YOUR_GCP_PROJECT_ID"
    gcloud config set project $GCP_PROJECT_ID
    
    # 创建一个身份池
    gcloud iam workload-identity-pools create "eks-identity-pool" \
      --project="${GCP_PROJECT_ID}" \
      --location="global" \
      --display-name="EKS Identity Pool"
    
    # 获取身份池的全名
    WORKLOAD_IDENTITY_POOL_ID=$(gcloud iam workload-identity-pools describe "eks-identity-pool" \
      --project="${GCP_PROJECT_ID}" \
      --location="global" \
      --format="value(name)")
    
    # 创建 OIDC Provider,并将其与 EKS OIDC Issuer 关联
    # 这里的 OIDC_ISSUER_URL 是上一步从 AWS 获取的
    OIDC_ISSUER_URL="https://oidc.eks.us-west-2.amazonaws.com/id/EXAMPLED539D4633E53BF8DE7E7A559B"
    gcloud iam workload-identity-pools providers create-oidc "eks-oidc-provider" \
      --project="${GCP_PROJECT_ID}" \
      --location="global" \
      --workload-identity-pool="eks-identity-pool" \
      --display-name="EKS OIDC Provider" \
      --attribute-mapping="google.subject=assertion.sub" \
      --issuer-uri="${OIDC_ISSUER_URL}"

    这里的 attribute-mapping 是关键,它将 OIDC Token 中的 sub (subject) 字段映射为 Google Cloud 识别的主体。对于 EKS,这个字段的格式是 system:serviceaccount:<namespace>:<serviceaccount-name>

  3. 创建 Google Cloud Service Account (GCSA) 并授予 Pub/Sub 权限:

    # 创建一个用于消费消息的 GCSA
    gcloud iam service-accounts create "pubsub-consumer-sa" \
      --project="${GCP_PROJECT_ID}" \
      --display-name="Pub/Sub Consumer SA"
    
    # 授予 GCSA 消费 Pub/Sub 消息的权限
    # 这里的 YOUR_SUBSCRIPTION_ID 是你要消费的订阅名
    GCP_SUBSCRIPTION_ID="YOUR_SUBSCRIPTION_ID"
    gcloud pubsub subscriptions add-iam-policy-binding "${GCP_SUBSCRIPTION_ID}" \
      --member="serviceAccount:pubsub-consumer-sa@${GCP_PROJECT_ID}.iam.gserviceaccount.com" \
      --role="roles/pubsub.subscriber"
  4. 建立 K8s ServiceAccount (KSA) 和 GCSA 之间的信任关系:

    # 允许 EKS 中的特定 KSA 模拟 GCSA
    # 这里的 K8S_NAMESPACE 和 K8S_SA_NAME 对应你将在 Kubernetes 中创建的 ServiceAccount
    K8S_NAMESPACE="default"
    K8S_SA_NAME="django-consumer-sa"
    
    gcloud iam service-accounts add-iam-policy-binding "pubsub-consumer-sa@${GCP_PROJECT_ID}.iam.gserviceaccount.com" \
      --project="${GCP_PROJECT_ID}" \
      --role="roles/iam.workloadIdentityUser" \
      --member="principalSet://iam.googleapis.com/${WORKLOAD_IDENTITY_POOL_ID}/subject/system:serviceaccount:${K8S_NAMESPACE}:${K8S_SA_NAME}"

    这条命令的含义是:允许来自我们创建的 eks-identity-pool 中,且主体名称完全匹配 system:serviceaccount:default:django-consumer-sa 的身份,扮演 pubsub-consumer-sa 这个 GCSA。

步骤二:构建生产级的 Django 消费者

我们将创建一个 Django Management Command 作为常驻进程来消费消息。这种方式比视图或 Celery 任务更适合这种专用的、需要长期运行的后台工作。

  1. 项目结构:

    my_django_project/
    ├── myapp/
    │   ├── management/
    │   │   ├── commands/
    │   │   │   ├── __init__.py
    │   │   │   └── consume_pubsub.py
    │   ├── __init__.py
    │   ├── models.py
    │   └── ...
    ├── my_django_project/
    │   ├── settings.py
    │   └── ...
    └── manage.py
  2. 配置文件 settings.py:
    从不硬编码配置。所有配置都应通过环境变量注入。

    # my_django_project/settings.py
    import os
    from pathlib import Path
    
    # ... (其他 Django 配置)
    
    # Google Cloud Pub/Sub 配置
    GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID")
    GCP_PUBSUB_SUBSCRIPTION_ID = os.environ.get("GCP_PUBSUB_SUBSCRIPTION_ID")
    
    # 用于 Workload Identity Federation 的配置文件路径
    # 这个文件将由 Kubernetes 自动挂载到 Pod 中
    GOOGLE_APPLICATION_CREDENTIALS = os.environ.get("GOOGLE_APPLICATION_CREDENTIALS")
    
    # 消费者配置
    CONSUMER_MAX_MESSAGES = int(os.environ.get("CONSUMER_MAX_MESSAGES", 10))
    CONSUMER_MAX_LEASE_DURATION = int(os.environ.get("CONSUMER_MAX_LEASE_DURATION", 60)) # seconds
  3. 消费者命令 consume_pubsub.py:
    这是代码的核心。它必须健壮,处理各种异常,并正确地确认 (ack) 或否定确认 (nack) 消息。

    # myapp/management/commands/consume_pubsub.py
    
    import logging
    import json
    from concurrent.futures import TimeoutError
    from django.core.management.base import BaseCommand
    from django.conf import settings
    from google.cloud import pubsub_v1
    from google.api_core import exceptions as google_exceptions
    
    # 配置一个专用的 logger
    logger = logging.getLogger("pubsub_consumer")
    
    class Command(BaseCommand):
        help = "Starts a long-running process to consume messages from Google Cloud Pub/Sub"
    
        def handle(self, *args, **options):
            if not all([settings.GCP_PROJECT_ID, settings.GCP_PUBSUB_SUBSCRIPTION_ID]):
                logger.error("GCP_PROJECT_ID and GCP_PUBSUB_SUBSCRIPTION_ID must be set in environment variables.")
                return
    
            subscriber = pubsub_v1.SubscriberClient()
            subscription_path = subscriber.subscription_path(settings.GCP_PROJECT_ID, settings.GCP_PUBSUB_SUBSCRIPTION_ID)
    
            logger.info(f"Starting consumer for subscription: {subscription_path}")
            logger.info(f"Using credentials from: {settings.GOOGLE_APPLICATION_CREDENTIALS or 'default location'}")
    
            # flow_control 控制客户端如何管理消息流
            # 这里的配置意味着客户端最多同时处理10条消息
            flow_control = pubsub_v1.types.FlowControl(max_messages=settings.CONSUMER_MAX_MESSAGES)
    
            # 使用 streaming_pull 来高效地接收消息
            streaming_pull_future = subscriber.subscribe(
                subscription_path,
                callback=self.process_message,
                flow_control=flow_control
            )
    
            self.stdout.write(self.style.SUCCESS(f"Listening for messages on {subscription_path}..."))
    
            try:
                # result() 是一个阻塞调用,它会一直运行直到 future 被取消或出现异常
                streaming_pull_future.result()
            except TimeoutError:
                streaming_pull_future.cancel()
                streaming_pull_future.result() # 等待清理完成
                logger.warning("Consumer stopped due to timeout.")
            except google_exceptions.GoogleAPICallError as e:
                logger.error(f"Google API Error: {e}", exc_info=True)
                streaming_pull_future.cancel()
            except KeyboardInterrupt:
                logger.info("Graceful shutdown initiated by user.")
                streaming_pull_future.cancel()
            except Exception as e:
                logger.critical(f"An unexpected error occurred: {e}", exc_info=True)
                streaming_pull_future.cancel()
            finally:
                subscriber.close()
                logger.info("Subscriber client closed. Exiting.")
    
        def process_message(self, message: pubsub_v1.subscriber.message.Message) -> None:
            """
            Callback function to process a single Pub/Sub message.
            业务逻辑的核心。必须保证幂等性。
            """
            try:
                # 这里的 message.data 是 bytes 类型
                data_str = message.data.decode("utf-8")
                payload = json.loads(data_str)
                message_id = message.message_id
                
                logger.info(f"Received message ID: {message_id} with data: {payload}")
    
                # =========================================================
                # 在这里实现你的核心业务逻辑
                # 例如:更新数据库,调用其他服务等
                # 这个过程必须是幂等的,因为 Pub/Sub 保证至少一次的投递语义
                # self.handle_business_logic(payload)
                # =========================================================
    
                # 如果业务逻辑成功,必须确认消息
                logger.info(f"Acknowledging message ID: {message_id}")
                message.ack()
    
            except json.JSONDecodeError as e:
                logger.error(f"Failed to decode message data: {message.data}. Error: {e}")
                # 消息格式错误,我们不希望它被重试,直接确认
                message.ack()
            except Exception as e:
                # 任何其他的业务逻辑异常,我们否定确认消息
                # Pub/Sub 会在稍后重新投递这个消息
                logger.error(f"Error processing message ID {message.message_id}: {e}", exc_info=True)
                message.nack()
    
    # def handle_business_logic(self, payload):
    #     # 模拟数据库操作
    #     from myapp.models import ProcessedEvent
    #     event_id = payload.get('event_id')
    #     if not event_id:
    #         raise ValueError("Missing event_id in payload")
    #
    #     # 幂等性检查
    #     if ProcessedEvent.objects.filter(event_id=event_id).exists():
    #         logger.warning(f"Event {event_id} already processed. Skipping.")
    #         return
    #
    #     ProcessedEvent.objects.create(event_id=event_id, data=payload)
    #     logger.info(f"Successfully processed and stored event {event_id}")
    

    这里的坑在于:message.ack()message.nack() 的调用时机。ack() 表示成功处理,消息将不再被投递。nack() 表示临时失败,消息将很快被重新投递。如果是一个无法修复的错误(比如JSON格式错误),直接ack()掉,避免无限重试循环,这种消息通常被称为“毒丸消息”。

步骤三:容器化与 Kubernetes 部署

  1. Dockerfile:
    一个生产级的 Dockerfile 应该使用多阶段构建,减小最终镜像的体积,并以非 root 用户运行。

    # --- Build Stage ---
    FROM python:3.10-slim-bullseye AS builder
    
    ENV PYTHONDONTWRITEBYTECODE 1
    ENV PYTHONUNBUFFERED 1
    
    WORKDIR /app
    
    COPY requirements.txt .
    # 使用虚拟环境,并只安装生产依赖
    RUN python -m venv /opt/venv
    ENV PATH="/opt/venv/bin:$PATH"
    RUN pip install --no-cache-dir -r requirements.txt
    
    # --- Final Stage ---
    FROM python:3.10-slim-bullseye AS final
    
    # 创建一个非 root 用户来运行应用
    RUN groupadd -r django && useradd -r -g django django
    
    WORKDIR /home/django/app
    
    # 从构建阶段拷贝虚拟环境和代码
    COPY --from=builder /opt/venv /opt/venv
    COPY . .
    
    # 更改文件所有权
    RUN chown -R django:django /home/django/app
    
    USER django
    
    ENV PATH="/opt/venv/bin:$PATH"
    
    # 容器启动时执行的命令
    CMD ["python", "manage.py", "consume_pubsub"]

    requirements.txt 应包含 django, google-cloud-pubsub, google-auth 等库。

  2. Kubernetes Manifests (deployment.yaml):
    这是将所有部分串联起来的最后一步。

    apiVersion: v1
    kind: ServiceAccount
    metadata:
      name: django-consumer-sa
      namespace: default
      # 这个注解是 EKS IRSA 的机制,它会让 EKS 的 Pod Identity Webhook
      # 注入 AWS 相关的环境变量和 token,用于后续交换 GCP token。
      # 但在这个场景下,我们主要依赖 K8s projected token,所以这个注解不是必须的,
      # 但保留它以防未来需要与 AWS 服务交互。
      annotations:
        eks.amazonaws.com/role-arn: "arn:aws:iam::YOUR_AWS_ACCOUNT_ID:role/YourEksRoleForOtherPurposes" # 可选
    
    ---
    apiVersion: apps/v1
    kind: Deployment
    metadata:
      name: django-pubsub-consumer
      namespace: default
    spec:
      replicas: 2 # 从2个副本开始,可以根据负载进行 HPA
      selector:
        matchLabels:
          app: django-consumer
      template:
        metadata:
          labels:
            app: django-consumer
        spec:
          serviceAccountName: django-consumer-sa
          containers:
            - name: django-consumer-app
              image: YOUR_DOCKER_REGISTRY/django-consumer:latest
              imagePullPolicy: Always
              resources:
                requests:
                  cpu: "250m"
                  memory: "256Mi"
                limits:
                  cpu: "500m"
                  memory: "512Mi"
              env:
                - name: DJANGO_SETTINGS_MODULE
                  value: "my_django_project.settings"
                - name: DATABASE_URL
                  valueFrom:
                    secretKeyRef:
                      name: django-db-secret
                      key: url
                # 关键环境变量,注入 GCP 配置
                - name: GCP_PROJECT_ID
                  value: "your-gcp-project-id"
                - name: GCP_PUBSUB_SUBSCRIPTION_ID
                  value: "your-subscription-id"
                # 这两个环境变量由 Kubelet 自动设置,用于 Workload Identity Federation
                - name: GOOGLE_APPLICATION_CREDENTIALS
                  value: "/var/run/secrets/sts.googleapis.com/service-account-token/token"
                - name: GOOGLE_CLOUD_PROJECT
                  value: "your-gcp-project-id"
              # 挂载由 Kubelet 投影的服务账号令牌,用于与 GCP STS 交换
              volumeMounts:
              - name: gcp-sa-token
                mountPath: /var/run/secrets/sts.googleapis.com/service-account-token
                readOnly: true
          volumes:
          - name: gcp-sa-token
            projected:
              sources:
              - serviceAccountToken:
                  path: token
                  # audience 必须与 GCP Workload Identity Pool Provider 中配置的受众匹配
                  # 这里使用 Workload Identity Pool 的全名
                  audience: "//iam.googleapis.com/projects/YOUR_GCP_PROJECT_NUMBER/locations/global/workloadIdentityPools/eks-identity-pool/providers/eks-oidc-provider"
                  expirationSeconds: 3600

    这里的坑在于 volumesvolumeMounts 部分。我们使用了一个 projected volume 来挂载一个为特定 audience 生成的 Kubernetes Service Account Token。Google Cloud 的客户端库会自动检测到 GOOGLE_APPLICATION_CREDENTIALS 环境变量,读取这个 Token,然后用它与 Google 的 STS (Security Token Service) API 通信,最终换取一个可以访问 Pub/Sub 的短期 GCP Access Token。这个过程对应用程序是完全透明的。

架构的局限性与未来展望

此方案并非银弹。它引入了跨云的复杂性。运维团队需要同时具备 AWS 和 GCP 的专业知识,尤其是在排查认证问题时。AWS 到 GCP 的数据出口会产生费用,对于消息量巨大的场景,这笔开销必须被精确计算和评估。此外,跨云的网络延迟也是一个不可忽视的因素,这个架构更适合对延迟不敏感的异步任务,而非需要实时响应的场景。

未来的优化路径是清晰的。可以引入 KEDA (Kubernetes-based Event-Driven Autoscaling),根据 Google Cloud Pub/Sub 订阅中的未确认消息数量来动态伸缩消费者 Pod 的数量,实现更精细化的成本和性能控制。在可观测性方面,Django 应用需要暴露 Prometheus 指标,例如消息处理的延迟、成功率和失败率,并与 Kubernetes 的监控系统集成,从而构建一个完整的、可观测的、具备跨云韧性的事件处理系统。


  目录