一个无法回避的架构难题摆在面前:核心业务应用部署在 AWS EKS 上,但要求其消息处理系统必须具备跨云的灾难恢复能力,能够抵御单一云厂商的区域性甚至全局性故障。这个要求直接排除了完全依赖 AWS SQS/SNS 的方案,因为即使它们具备跨区域复制能力,其控制平面依然受限于单一厂商的生态系统。我们需要一个真正独立的、高可用的消息中间件作为系统的“主动脉”。
架构决策:为何选择 Google Cloud Pub/Sub
在设计任何关键系统时,对技术选型的权衡是架构师的核心职责。面对这个挑战,我们评估了两种主流方案。
方案 A: AWS 原生方案 (SQS + SNS)
优势:
- 深度集成: 与 EKS、IAM 等 AWS 服务无缝集成,认证授权逻辑清晰直接,通常使用 IAM Role for Service Accounts (IRSA) 即可。
- 低延迟: 当生产者和消费者都在 AWS 内部时,网络延迟最低。
- 生态成熟: 监控、日志、报警等配套设施在 AWS CloudWatch 中有完整的解决方案。
劣势:
- 厂商锁定: 这是此方案的致命弱点。它无法满足我们抵御单云厂商故障的核心需求。一旦 AWS 的认证服务或消息服务本身出现大范围问题,整个系统将陷入停滞。
- 跨区域复杂性: 虽然 SNS 可以将消息扇出到不同区域的 SQS 队列,但这增加了配置的复杂性和管理成本,且并未从根本上解决对单一厂商控制平面的依赖。
方案 B: 多云方案 (Google Cloud Pub/Sub)
优势:
- 真正的解耦: Pub/Sub 是一个全球性的服务,其 Topic 不局限于某个区域。这为我们提供了与 AWS 基础设施完全隔离的通信层,是实现多云韧性的理想选择。
- 卓越的伸缩性: Pub/Sub 的设计能够轻松处理海量吞吐,其基于拉取 (Pull) 的订阅模型非常适合在 Kubernetes 中进行水平扩展的消费者应用。
- 简单的客户端逻辑: Google Cloud 提供的客户端库封装了复杂的重试、确认和流量控制逻辑。
劣势:
- 认证复杂性: 这是最大的挑战。一个运行在 AWS EKS 上的 Pod 如何安全、无凭证地向 Google Cloud API 进行身份验证?
- 网络成本与延迟: 跨云的数据传输会产生额外的网络出口费用,并且网络延迟相比同云环境会更高。这要求我们评估消息的价值和实时性要求。
最终选择
我们最终选择了方案 B。对于这个核心系统,架构韧性带来的长期价值远超于其带来的认证复杂性和可预期的网络成本。业务的连续性是第一位的。接下来的核心任务,就是攻克 EKS 到 GCP 的安全认证,并构建一个生产级的 Django 消费者。
架构概览
整个系统的交互流程可以被清晰地描绘出来。生产者(可能在任何地方,包括另一个云或本地数据中心)将消息发布到 Google Cloud Pub/Sub 的一个全局 Topic。我们在 AWS EKS 上部署的 Django 消费者应用,通过一个长期运行的 Pod,从对应的 Subscription 中拉取并处理这些消息。
graph TD subgraph Google Cloud Platform Producer[外部生产者] -->|发布消息| PubSubTopic(Pub/Sub Topic) PubSubTopic --> PubSubSubscription(Pub/Sub Subscription) end subgraph AWS subgraph EKS Cluster Deployment(K8s Deployment) Deployment --> Pod1(Django Pod 1) Deployment --> Pod2(Django Pod 2) Deployment --> PodN(Django Pod N...) end end subgraph Authentication Flow EKS_OIDC[EKS OIDC Provider] -.->|1. 身份提供商| GCP_WIF(GCP Workload Identity Federation) K8s_SA(K8s ServiceAccount) -.->|2. 声明身份| GCP_WIF GCP_WIF -.->|3. 签发临时Token| Pod1 end Pod1 -->|4. 使用GCP Token拉取消息| PubSubSubscription Pod2 -->|4. 使用GCP Token拉取消息| PubSubSubscription PodN -->|4. 使用GCP Token拉取消息| PubSubSubscription
核心实现:从认证到消费
步骤一:配置跨云身份认证 (Workload Identity Federation)
这是整个方案中最关键、也是最容易出错的部分。我们的目标是让 EKS 中的 Pod 能够扮演 (impersonate) 一个 Google Cloud Service Account (GCSA),而无需在 Pod 中存储任何静态密钥文件。
在 EKS 端获取 OIDC Provider 信息:
每个 EKS 集群都有一个 OIDC Issuer URL,它是我们建立信任关系的基础。# 替换 YOUR_CLUSTER_NAME 和 YOUR_REGION aws eks describe-cluster --name YOUR_CLUSTER_NAME --region YOUR_REGION --query "cluster.identity.oidc.issuer" --output text # 输出示例: https://oidc.eks.us-west-2.amazonaws.com/id/EXAMPLED539D4633E53BF8DE7E7A559B
在 Google Cloud 端创建 Workload Identity Pool 和 Provider:
# 替换 YOUR_GCP_PROJECT_ID GCP_PROJECT_ID="YOUR_GCP_PROJECT_ID" gcloud config set project $GCP_PROJECT_ID # 创建一个身份池 gcloud iam workload-identity-pools create "eks-identity-pool" \ --project="${GCP_PROJECT_ID}" \ --location="global" \ --display-name="EKS Identity Pool" # 获取身份池的全名 WORKLOAD_IDENTITY_POOL_ID=$(gcloud iam workload-identity-pools describe "eks-identity-pool" \ --project="${GCP_PROJECT_ID}" \ --location="global" \ --format="value(name)") # 创建 OIDC Provider,并将其与 EKS OIDC Issuer 关联 # 这里的 OIDC_ISSUER_URL 是上一步从 AWS 获取的 OIDC_ISSUER_URL="https://oidc.eks.us-west-2.amazonaws.com/id/EXAMPLED539D4633E53BF8DE7E7A559B" gcloud iam workload-identity-pools providers create-oidc "eks-oidc-provider" \ --project="${GCP_PROJECT_ID}" \ --location="global" \ --workload-identity-pool="eks-identity-pool" \ --display-name="EKS OIDC Provider" \ --attribute-mapping="google.subject=assertion.sub" \ --issuer-uri="${OIDC_ISSUER_URL}"
这里的
attribute-mapping
是关键,它将 OIDC Token 中的sub
(subject) 字段映射为 Google Cloud 识别的主体。对于 EKS,这个字段的格式是system:serviceaccount:<namespace>:<serviceaccount-name>
。创建 Google Cloud Service Account (GCSA) 并授予 Pub/Sub 权限:
# 创建一个用于消费消息的 GCSA gcloud iam service-accounts create "pubsub-consumer-sa" \ --project="${GCP_PROJECT_ID}" \ --display-name="Pub/Sub Consumer SA" # 授予 GCSA 消费 Pub/Sub 消息的权限 # 这里的 YOUR_SUBSCRIPTION_ID 是你要消费的订阅名 GCP_SUBSCRIPTION_ID="YOUR_SUBSCRIPTION_ID" gcloud pubsub subscriptions add-iam-policy-binding "${GCP_SUBSCRIPTION_ID}" \ --member="serviceAccount:pubsub-consumer-sa@${GCP_PROJECT_ID}.iam.gserviceaccount.com" \ --role="roles/pubsub.subscriber"
建立 K8s ServiceAccount (KSA) 和 GCSA 之间的信任关系:
# 允许 EKS 中的特定 KSA 模拟 GCSA # 这里的 K8S_NAMESPACE 和 K8S_SA_NAME 对应你将在 Kubernetes 中创建的 ServiceAccount K8S_NAMESPACE="default" K8S_SA_NAME="django-consumer-sa" gcloud iam service-accounts add-iam-policy-binding "pubsub-consumer-sa@${GCP_PROJECT_ID}.iam.gserviceaccount.com" \ --project="${GCP_PROJECT_ID}" \ --role="roles/iam.workloadIdentityUser" \ --member="principalSet://iam.googleapis.com/${WORKLOAD_IDENTITY_POOL_ID}/subject/system:serviceaccount:${K8S_NAMESPACE}:${K8S_SA_NAME}"
这条命令的含义是:允许来自我们创建的
eks-identity-pool
中,且主体名称完全匹配system:serviceaccount:default:django-consumer-sa
的身份,扮演pubsub-consumer-sa
这个 GCSA。
步骤二:构建生产级的 Django 消费者
我们将创建一个 Django Management Command 作为常驻进程来消费消息。这种方式比视图或 Celery 任务更适合这种专用的、需要长期运行的后台工作。
项目结构:
my_django_project/ ├── myapp/ │ ├── management/ │ │ ├── commands/ │ │ │ ├── __init__.py │ │ │ └── consume_pubsub.py │ ├── __init__.py │ ├── models.py │ └── ... ├── my_django_project/ │ ├── settings.py │ └── ... └── manage.py
配置文件
settings.py
:
从不硬编码配置。所有配置都应通过环境变量注入。# my_django_project/settings.py import os from pathlib import Path # ... (其他 Django 配置) # Google Cloud Pub/Sub 配置 GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID") GCP_PUBSUB_SUBSCRIPTION_ID = os.environ.get("GCP_PUBSUB_SUBSCRIPTION_ID") # 用于 Workload Identity Federation 的配置文件路径 # 这个文件将由 Kubernetes 自动挂载到 Pod 中 GOOGLE_APPLICATION_CREDENTIALS = os.environ.get("GOOGLE_APPLICATION_CREDENTIALS") # 消费者配置 CONSUMER_MAX_MESSAGES = int(os.environ.get("CONSUMER_MAX_MESSAGES", 10)) CONSUMER_MAX_LEASE_DURATION = int(os.environ.get("CONSUMER_MAX_LEASE_DURATION", 60)) # seconds
消费者命令
consume_pubsub.py
:
这是代码的核心。它必须健壮,处理各种异常,并正确地确认 (ack) 或否定确认 (nack) 消息。# myapp/management/commands/consume_pubsub.py import logging import json from concurrent.futures import TimeoutError from django.core.management.base import BaseCommand from django.conf import settings from google.cloud import pubsub_v1 from google.api_core import exceptions as google_exceptions # 配置一个专用的 logger logger = logging.getLogger("pubsub_consumer") class Command(BaseCommand): help = "Starts a long-running process to consume messages from Google Cloud Pub/Sub" def handle(self, *args, **options): if not all([settings.GCP_PROJECT_ID, settings.GCP_PUBSUB_SUBSCRIPTION_ID]): logger.error("GCP_PROJECT_ID and GCP_PUBSUB_SUBSCRIPTION_ID must be set in environment variables.") return subscriber = pubsub_v1.SubscriberClient() subscription_path = subscriber.subscription_path(settings.GCP_PROJECT_ID, settings.GCP_PUBSUB_SUBSCRIPTION_ID) logger.info(f"Starting consumer for subscription: {subscription_path}") logger.info(f"Using credentials from: {settings.GOOGLE_APPLICATION_CREDENTIALS or 'default location'}") # flow_control 控制客户端如何管理消息流 # 这里的配置意味着客户端最多同时处理10条消息 flow_control = pubsub_v1.types.FlowControl(max_messages=settings.CONSUMER_MAX_MESSAGES) # 使用 streaming_pull 来高效地接收消息 streaming_pull_future = subscriber.subscribe( subscription_path, callback=self.process_message, flow_control=flow_control ) self.stdout.write(self.style.SUCCESS(f"Listening for messages on {subscription_path}...")) try: # result() 是一个阻塞调用,它会一直运行直到 future 被取消或出现异常 streaming_pull_future.result() except TimeoutError: streaming_pull_future.cancel() streaming_pull_future.result() # 等待清理完成 logger.warning("Consumer stopped due to timeout.") except google_exceptions.GoogleAPICallError as e: logger.error(f"Google API Error: {e}", exc_info=True) streaming_pull_future.cancel() except KeyboardInterrupt: logger.info("Graceful shutdown initiated by user.") streaming_pull_future.cancel() except Exception as e: logger.critical(f"An unexpected error occurred: {e}", exc_info=True) streaming_pull_future.cancel() finally: subscriber.close() logger.info("Subscriber client closed. Exiting.") def process_message(self, message: pubsub_v1.subscriber.message.Message) -> None: """ Callback function to process a single Pub/Sub message. 业务逻辑的核心。必须保证幂等性。 """ try: # 这里的 message.data 是 bytes 类型 data_str = message.data.decode("utf-8") payload = json.loads(data_str) message_id = message.message_id logger.info(f"Received message ID: {message_id} with data: {payload}") # ========================================================= # 在这里实现你的核心业务逻辑 # 例如:更新数据库,调用其他服务等 # 这个过程必须是幂等的,因为 Pub/Sub 保证至少一次的投递语义 # self.handle_business_logic(payload) # ========================================================= # 如果业务逻辑成功,必须确认消息 logger.info(f"Acknowledging message ID: {message_id}") message.ack() except json.JSONDecodeError as e: logger.error(f"Failed to decode message data: {message.data}. Error: {e}") # 消息格式错误,我们不希望它被重试,直接确认 message.ack() except Exception as e: # 任何其他的业务逻辑异常,我们否定确认消息 # Pub/Sub 会在稍后重新投递这个消息 logger.error(f"Error processing message ID {message.message_id}: {e}", exc_info=True) message.nack() # def handle_business_logic(self, payload): # # 模拟数据库操作 # from myapp.models import ProcessedEvent # event_id = payload.get('event_id') # if not event_id: # raise ValueError("Missing event_id in payload") # # # 幂等性检查 # if ProcessedEvent.objects.filter(event_id=event_id).exists(): # logger.warning(f"Event {event_id} already processed. Skipping.") # return # # ProcessedEvent.objects.create(event_id=event_id, data=payload) # logger.info(f"Successfully processed and stored event {event_id}")
这里的坑在于:
message.ack()
和message.nack()
的调用时机。ack()
表示成功处理,消息将不再被投递。nack()
表示临时失败,消息将很快被重新投递。如果是一个无法修复的错误(比如JSON格式错误),直接ack()
掉,避免无限重试循环,这种消息通常被称为“毒丸消息”。
步骤三:容器化与 Kubernetes 部署
Dockerfile:
一个生产级的 Dockerfile 应该使用多阶段构建,减小最终镜像的体积,并以非 root 用户运行。# --- Build Stage --- FROM python:3.10-slim-bullseye AS builder ENV PYTHONDONTWRITEBYTECODE 1 ENV PYTHONUNBUFFERED 1 WORKDIR /app COPY requirements.txt . # 使用虚拟环境,并只安装生产依赖 RUN python -m venv /opt/venv ENV PATH="/opt/venv/bin:$PATH" RUN pip install --no-cache-dir -r requirements.txt # --- Final Stage --- FROM python:3.10-slim-bullseye AS final # 创建一个非 root 用户来运行应用 RUN groupadd -r django && useradd -r -g django django WORKDIR /home/django/app # 从构建阶段拷贝虚拟环境和代码 COPY --from=builder /opt/venv /opt/venv COPY . . # 更改文件所有权 RUN chown -R django:django /home/django/app USER django ENV PATH="/opt/venv/bin:$PATH" # 容器启动时执行的命令 CMD ["python", "manage.py", "consume_pubsub"]
requirements.txt
应包含django
,google-cloud-pubsub
,google-auth
等库。Kubernetes Manifests (
deployment.yaml
):
这是将所有部分串联起来的最后一步。apiVersion: v1 kind: ServiceAccount metadata: name: django-consumer-sa namespace: default # 这个注解是 EKS IRSA 的机制,它会让 EKS 的 Pod Identity Webhook # 注入 AWS 相关的环境变量和 token,用于后续交换 GCP token。 # 但在这个场景下,我们主要依赖 K8s projected token,所以这个注解不是必须的, # 但保留它以防未来需要与 AWS 服务交互。 annotations: eks.amazonaws.com/role-arn: "arn:aws:iam::YOUR_AWS_ACCOUNT_ID:role/YourEksRoleForOtherPurposes" # 可选 --- apiVersion: apps/v1 kind: Deployment metadata: name: django-pubsub-consumer namespace: default spec: replicas: 2 # 从2个副本开始,可以根据负载进行 HPA selector: matchLabels: app: django-consumer template: metadata: labels: app: django-consumer spec: serviceAccountName: django-consumer-sa containers: - name: django-consumer-app image: YOUR_DOCKER_REGISTRY/django-consumer:latest imagePullPolicy: Always resources: requests: cpu: "250m" memory: "256Mi" limits: cpu: "500m" memory: "512Mi" env: - name: DJANGO_SETTINGS_MODULE value: "my_django_project.settings" - name: DATABASE_URL valueFrom: secretKeyRef: name: django-db-secret key: url # 关键环境变量,注入 GCP 配置 - name: GCP_PROJECT_ID value: "your-gcp-project-id" - name: GCP_PUBSUB_SUBSCRIPTION_ID value: "your-subscription-id" # 这两个环境变量由 Kubelet 自动设置,用于 Workload Identity Federation - name: GOOGLE_APPLICATION_CREDENTIALS value: "/var/run/secrets/sts.googleapis.com/service-account-token/token" - name: GOOGLE_CLOUD_PROJECT value: "your-gcp-project-id" # 挂载由 Kubelet 投影的服务账号令牌,用于与 GCP STS 交换 volumeMounts: - name: gcp-sa-token mountPath: /var/run/secrets/sts.googleapis.com/service-account-token readOnly: true volumes: - name: gcp-sa-token projected: sources: - serviceAccountToken: path: token # audience 必须与 GCP Workload Identity Pool Provider 中配置的受众匹配 # 这里使用 Workload Identity Pool 的全名 audience: "//iam.googleapis.com/projects/YOUR_GCP_PROJECT_NUMBER/locations/global/workloadIdentityPools/eks-identity-pool/providers/eks-oidc-provider" expirationSeconds: 3600
这里的坑在于
volumes
和volumeMounts
部分。我们使用了一个projected
volume 来挂载一个为特定audience
生成的 Kubernetes Service Account Token。Google Cloud 的客户端库会自动检测到GOOGLE_APPLICATION_CREDENTIALS
环境变量,读取这个 Token,然后用它与 Google 的 STS (Security Token Service) API 通信,最终换取一个可以访问 Pub/Sub 的短期 GCP Access Token。这个过程对应用程序是完全透明的。
架构的局限性与未来展望
此方案并非银弹。它引入了跨云的复杂性。运维团队需要同时具备 AWS 和 GCP 的专业知识,尤其是在排查认证问题时。AWS 到 GCP 的数据出口会产生费用,对于消息量巨大的场景,这笔开销必须被精确计算和评估。此外,跨云的网络延迟也是一个不可忽视的因素,这个架构更适合对延迟不敏感的异步任务,而非需要实时响应的场景。
未来的优化路径是清晰的。可以引入 KEDA (Kubernetes-based Event-Driven Autoscaling),根据 Google Cloud Pub/Sub 订阅中的未确认消息数量来动态伸缩消费者 Pod 的数量,实现更精细化的成本和性能控制。在可观测性方面,Django 应用需要暴露 Prometheus 指标,例如消息处理的延迟、成功率和失败率,并与 Kubernetes 的监控系统集成,从而构建一个完整的、可观测的、具备跨云韧性的事件处理系统。