基于Kotlin与Spring构建处理长时OpenCV任务的Saga模式及其IaC实践


一个看似常规的业务需求摆在了面前:构建一个自动化的视频内容分析与处理管道。具体流程分为三步:视频转码、使用OpenCV进行特征提取(例如人脸识别或物体追踪),最后添加水印并发布。这三个步骤是独立的微服务。问题在于,这是一个全有或全无的操作。如果第三步添加水印失败,我们必须撤销前两步的操作——即删除已转码的文件和分析结果,以避免产生脏数据和存储浪费。

传统的数据库ACID事务在这里完全不适用。首先,处理步骤操作的不是数据库记录,而是文件系统、GPU资源这类“物理”资源。其次,OpenCV的特征提取过程可能持续数分钟甚至数小时,任何形式的分布式锁或两阶段提交(2PC)都会导致系统长时间资源锁定,可用性几乎为零。这正是分布式Saga模式的用武之地,但标准的Saga教程通常只演示如何在不同微服务间补偿数据库记录,而我们面对的,是长时运行、状态复杂且具备物理副作用的任务。

架构决策:编排式 vs. 协调式 Saga

在Saga模式的实现上,主要有两种流派:

  1. 协调式 (Choreography): 各个服务通过监听彼此发布的事件来触发后续操作。服务A完成后发布事件A_DONE,服务B监听到后开始工作。服务B失败则发布B_FAILED,服务A监听到后执行自己的补偿操作。

    • 优点: 服务间高度解耦,没有单点瓶颈。
    • 缺点: 整体业务流程分散在各个服务中,难以追踪和调试。当流程复杂时,事件链会变得非常混乱,形成所谓的“事件风暴”。
  2. 编排式 (Orchestration): 引入一个中心化的“编排器”(Orchestrator)来统一调度。编排器负责调用各个服务,并根据执行结果(成功/失败)来决定是调用下一个服务还是调用前面服务的补偿接口。

    • 优点: 业务流程集中管理,状态清晰,易于监控和排错。
    • 缺点: 引入了额外的编排器服务,可能成为单点故障或性能瓶颈。

在我们的视频处理场景中,任务的执行时间长且状态复杂,清晰地追踪每个视频的处理进度至关重要。如果采用协调式,当一个视频处理失败时,要拼凑出它到底失败在哪一步、哪些补偿操作已执行、哪些待执行,将是一场灾难。因此,编排式Saga是更务实的选择。它牺牲了一部分极致的解耦,换来了确定性和可维护性。

我们将使用Spring Framework和Kotlin构建这个编排器,并利用其生态来简化开发。

架构概览与组件设计

我们的系统由以下几个核心组件构成:

  • Saga Orchestrator Service: 基于Spring Boot + Kotlin实现,负责驱动整个Saga流程。它内部维护一个状态机,记录每个处理任务的当前状态,并与消息队列交互。
  • Transcoding Service: 负责视频转码。
  • CV Analysis Service: 核心处理服务,内嵌OpenCV库,执行耗时的视频分析。这是整个流程中最可能失败、也最需要资源管理的环节。
  • Watermarking Service: 添加水印并完成最终发布。
  • 消息队列 (Message Broker): 我们选择RabbitMQ,利用其Topic交换机和持久化队列来解耦编排器和参与者服务。编排器发送“命令”消息,参与者服务执行后回复“事件”消息。
  • IaC (Terraform): CV Analysis服务是计算密集型的,不适合作为常驻服务。我们将它定义为一个Kubernetes Job,由编排器根据需要动态创建。整个Job的定义、资源请求(如GPU)和配置,都通过Terraform进行管理。

下面是整个Saga流程的Mermaid图示,包括正向执行流和逆向补偿流:

sequenceDiagram
    participant Client
    participant Orchestrator
    participant TranscodingSvc
    participant CVAnalysisSvc
    participant WatermarkingSvc

    Client->>+Orchestrator: startVideoProcessing(videoId)
    Orchestrator->>Orchestrator: 创建Saga实例, 状态: PENDING
    Orchestrator-->>+TranscodingSvc: command:startTranscoding
    TranscodingSvc-->>-Orchestrator: event:transcodingCompleted
    Orchestrator->>Orchestrator: 更新Saga状态: TRANSCODED
    
    Orchestrator-->>+CVAnalysisSvc: command:startAnalysis
    CVAnalysisSvc-->>-Orchestrator: event:analysisFailed
    Orchestrator->>Orchestrator: 更新Saga状态: ANALYSIS_FAILED

    Note right of Orchestrator: 开始补偿流程
    Orchestrator-->>+TranscodingSvc: command:compensateTranscoding (删除文件)
    TranscodingSvc-->>-Orchestrator: event:transcodingCompensated
    Orchestrator->>Orchestrator: 更新Saga状态: FAILED_AND_COMPENSATED
    Orchestrator-->>-Client: processingFailed

核心实现:Kotlin与Spring StateMachine

编排器的核心是Saga状态机。Spring Statemachine是一个强大的工具,可以清晰地定义状态、事件和转换。

1. 定义状态和事件

我们用枚举来定义Saga的所有可能状态和触发转换的事件。

// src/main/kotlin/com/example/saga/orchestrator/state/SagaState.kt
package com.example.saga.orchestrator.state

enum class SagaState {
    NEW,
    TRANSCODING_STARTED,
    TRANSCODING_COMPLETED,
    ANALYSIS_STARTED,
    ANALYSIS_COMPLETED,
    WATERMARKING_STARTED,
    COMPLETED,
    
    // 失败与补偿状态
    TRANSCODING_FAILED,
    ANALYSIS_FAILED,
    WATERMARKING_FAILED,

    COMPENSATING_TRANSCODING,
    COMPENSATING_ANALYSIS,
    
    ROLLBACK_COMPLETED
}
// src/main/kotlin/com/example/saga/orchestrator/state/SagaEvent.kt
package com.example.saga.orchestrator.state

enum class SagaEvent {
    START_PROCESSING,
    
    TRANSCODING_SUCCEEDED,
    TRANSCODING_FAILED,

    ANALYSIS_SUCCEEDED,
    ANALYSIS_FAILED,

    WATERMARKING_SUCCEEDED,
    WATERMARKING_FAILED,

    // 补偿事件
    START_COMPENSATION,
    COMPENSATION_FOR_TRANSCODING_SUCCEEDED,
    COMPENSATION_FOR_ANALYSIS_SUCCEEDED,
}

2. 配置状态机

使用StateMachineConfigurerAdapter来配置状态转换、事件触发和关联的动作(Action)。

// src/main/kotlin/com/example/saga/orchestrator/config/StateMachineConfig.kt
package com.example.saga.orchestrator.config

import com.example.saga.orchestrator.state.SagaEvent
import com.example.saga.orchestrator.state.SagaState
import com.example.saga.orchestrator.action.SagaActions
import org.springframework.context.annotation.Configuration
import org.springframework.statemachine.config.EnableStateMachineFactory
import org.springframework.statemachine.config.StateMachineConfigurerAdapter
import org.springframework.statemachine.config.builders.StateMachineStateConfigurer
import org.springframework.statemachine.config.builders.StateMachineTransitionConfigurer

@Configuration
@EnableStateMachineFactory
class StateMachineConfig(private val sagaActions: SagaActions) : StateMachineConfigurerAdapter<SagaState, SagaEvent>() {

    override fun configure(states: StateMachineStateConfigurer<SagaState, SagaEvent>) {
        states.withStates()
            .initial(SagaState.NEW)
            .states(SagaState.entries.toSet())
    }

    override fun configure(transitions: StateMachineTransitionConfigurer<SagaState, SagaEvent>) {
        transitions
            // 正向流程
            .withExternal().source(SagaState.NEW).target(SagaState.TRANSCODING_STARTED).event(SagaEvent.START_PROCESSING).action(sagaActions.startTranscoding())
            .and()
            .withExternal().source(SagaState.TRANSCODING_STARTED).target(SagaState.TRANSCODING_COMPLETED).event(SagaEvent.TRANSCODING_SUCCEEDED)
            .and()
            .withExternal().source(SagaState.TRANSCODING_COMPLETED).target(SagaState.ANALYSIS_STARTED).event(SagaEvent.ANALYSIS_SUCCEEDED).action(sagaActions.startAnalysis())
            // ... more forward transitions

            // 异常与补偿流程
            .and()
            .withExternal().source(SagaState.TRANSCODING_STARTED).target(SagaState.TRANSCODING_FAILED).event(SagaEvent.TRANSCODING_FAILED)
            .and()
            .withExternal().source(SagaState.ANALYSIS_STARTED).target(SagaState.ANALYSIS_FAILED).event(SagaEvent.ANALYSIS_FAILED).action(sagaActions.compensateTranscoding()) // 失败后立即触发补偿
            .and()
            .withExternal().source(SagaState.ANALYSIS_FAILED).target(SagaState.COMPENSATING_TRANSCODING).event(SagaEvent.START_COMPENSATION) // For clarity, could be merged with above
            .and()
            .withExternal().source(SagaState.COMPENSATING_TRANSCODING).target(SagaState.ROLLBACK_COMPLETED).event(SagaEvent.COMPENSATION_FOR_TRANSCODING_SUCCEEDED)
    }
}

3. 实现Action

Action是状态转换时执行的具体逻辑,比如发送消息到RabbitMQ。

// src/main/kotlin/com/example/saga/orchestrator/action/SagaActions.kt
package com.example.saga.orchestrator.action

import com.example.saga.orchestrator.messaging.CommandPublisher
import com.example.saga.orchestrator.state.SagaEvent
import com.example.saga.orchestrator.state.SagaState
import org.slf4j.LoggerFactory
import org.springframework.statemachine.StateContext
import org.springframework.statemachine.action.Action
import org.springframework.stereotype.Component

const val SAGA_ID_HEADER = "SAGA_ID"
const val VIDEO_ID_HEADER = "VIDEO_ID"

@Component
class SagaActions(private val commandPublisher: CommandPublisher) {

    private val logger = LoggerFactory.getLogger(javaClass)

    fun startTranscoding(): Action<SagaState, SagaEvent> {
        return Action { context ->
            val sagaId = context.stateMachine.id
            val videoId = context.messageHeaders.get(VIDEO_ID_HEADER, String::class.java)
            logger.info("[SagaID: $sagaId] Starting transcoding for video $videoId")
            commandPublisher.sendTranscodingCommand(sagaId, videoId!!)
        }
    }

    fun startAnalysis(): Action<SagaState, SagaEvent> {
        return Action { context ->
            val sagaId = context.stateMachine.id
            val videoId = context.messageHeaders.get(VIDEO_ID_HEADER, String::class.java)
            logger.info("[SagaID: $sagaId] Starting analysis for video $videoId")
            // Here we would trigger the IaC part
            commandPublisher.sendAnalysisCommand(sagaId, videoId!!)
        }
    }

    fun compensateTranscoding(): Action<SagaState, SagaEvent> {
        return Action { context ->
            val sagaId = context.stateMachine.id
            val videoId = context.messageHeaders.get(VIDEO_ID_HEADER, String::class.java)
            logger.warn("[SagaID: $sagaId] Compensating transcoding for video $videoId")
            commandPublisher.sendDeleteTranscodedFileCommand(sagaId, videoId!!)
        }
    }
}

这里的CommandPublisher是一个简单的RabbitMQ生产者封装。事件监听器则负责接收来自参与者服务的回复,并驱动状态机进入下一个状态。

// src/main/kotlin/com/example/saga/orchestrator/messaging/EventListener.kt
package com.example.saga.orchestrator.messaging

import com.example.saga.orchestrator.service.SagaService
import org.springframework.amqp.rabbit.annotation.RabbitListener
import org.springframework.stereotype.Component

@Component
class EventListener(private val sagaService: SagaService) {

    // 假设消息体中包含sagaId和payload
    @RabbitListener(queues = ["q.transcoding.events"])
    fun handleTranscodingEvent(event: TranscodingResultEvent) {
        if (event.success) {
            sagaService.triggerEvent(event.sagaId, SagaEvent.TRANSCODING_SUCCEEDED)
        } else {
            sagaService.triggerEvent(event.sagaId, SagaEvent.TRANSCODING_FAILED)
        }
    }

    @RabbitListener(queues = ["q.analysis.events"])
    fun handleAnalysisEvent(event: AnalysisResultEvent) {
        // ... similar logic
    }
}

关键环节:OpenCV服务的实现与IaC管理

CV Analysis服务是整个架构的重点和难点。它不是一个简单的REST服务。

1. OpenCV处理逻辑 (Kotlin + OpenCV Java Bindings)

首先,我们需要一个能处理视频文件的Kotlin应用。这里使用OpenCV的Java绑定。

// In CV Analysis Service
// src/main/kotlin/com/example/cv/processor/VideoProcessor.kt
package com.example.cv.processor

import org.opencv.core.Mat
import org.opencv.core.MatOfRect
import org.opencv.objdetect.CascadeClassifier
import org.opencv.videoio.VideoCapture
import org.opencv.videoio.Videoio
import org.slf4j.LoggerFactory
import java.nio.file.Paths

// 必须在使用前加载OpenCV原生库
// Nu.pattern.OpenCV.loadShared() in main method is a good way.

class VideoProcessor(private val classifierPath: String) {

    private val logger = LoggerFactory.getLogger(javaClass)

    fun detectFacesInVideo(inputPath: String, outputPath: String): Result<Int> {
        val faceCascade = CascadeClassifier()
        if (!faceCascade.load(classifierPath)) {
            logger.error("Error loading face cascade classifier from $classifierPath")
            return Result.failure(IllegalStateException("Cascade classifier not found"))
        }

        val capture = VideoCapture(inputPath)
        if (!capture.isOpened) {
            logger.error("Error opening video file: $inputPath")
            return Result.failure(IllegalArgumentException("Cannot open video"))
        }

        val frameWidth = capture.get(Videoio.CAP_PROP_FRAME_WIDTH)
        val frameHeight = capture.get(Videoio.CAP_PROP_FRAME_HEIGHT)
        val fps = capture.get(Videoio.CAP_PROP_FPS)

        var frameCount = 0
        var faceCount = 0
        val frame = Mat()
        
        // 此处为简化,没有写输出视频流。生产级代码需要VideoWriter
        // val writer = VideoWriter(outputPath, VideoWriter.fourcc('M', 'J', 'P', 'G'), fps, Size(frameWidth, frameHeight))

        return runCatching {
            while (capture.read(frame)) {
                if (frame.empty()) break

                val faces = MatOfRect()
                faceCascade.detectMultiScale(frame, faces)
                
                faceCount += faces.toArray().size
                frameCount++

                if (frameCount % 100 == 0) {
                    logger.info("Processed $frameCount frames...")
                }
            }
            faceCount
        }.onSuccess {
            logger.info("Successfully processed video. Total faces detected: $it")
        }.onFailure {
            logger.error("Failed during video processing", it)
        }.also {
            capture.release()
            // writer.release()
        }
    }
}

这个服务本质上是一个命令行应用,接收输入文件路径,处理后退出。它通过标准输出或文件报告成功/失败,或者更健壮的方式是,在处理完成后向RabbitMQ发送一条结果消息。

2. 基础设施即代码 (Terraform for Kubernetes Job)

我们不希望CV服务作为Deployment一直运行。它应该在需要时启动,完成后销毁,这样可以有效利用昂贵的GPU资源。Kubernetes Job是完美的模型。我们使用Terraform来定义这个Job。

# modules/cv_job/main.tf

variable "job_name" {
  type        = string
  description = "A unique name for the Kubernetes Job."
}

variable "saga_id" {
  type        = string
  description = "The Saga ID to be passed as an environment variable."
}

variable "video_id" {
  type        = string
  description = "The Video ID to process."
}

variable "image_tag" {
  type        = string
  default     = "latest"
}

variable "input_pvc_claim_name" {
  type = string
  description = "Name of the PVC where input video resides."
}

resource "kubernetes_job" "cv_analysis_job" {
  metadata {
    name      = var.job_name
    namespace = "video-processing"
  }

  spec {
    template {
      metadata {
        name = var.job_name
      }

      spec {
        container {
          name  = "opencv-processor"
          image = "my-registry/cv-analysis-service:${var.image_tag}"
          
          # 传递Saga上下文和任务参数
          env {
            name  = "SAGA_ID"
            value = var.saga_id
          }
          env {
            name  = "VIDEO_ID"
            value = var.video_id
          }
          env {
            name = "RABBITMQ_HOST"
            value = "rabbitmq.default.svc.cluster.local"
          }

          # 挂载包含视频文件的共享存储
          volume_mount {
            name       = "video-storage"
            mount_path = "/data"
          }

          # 请求GPU资源
          resources {
            limits = {
              "nvidia.com/gpu" = "1"
            }
            requests = {
              "nvidia.com/gpu" = "1"
            }
          }
        }

        volume {
            name = "video-storage"
            persistent_volume_claim {
                claim_name = var.input_pvc_claim_name
            }
        }
        
        restart_policy = "Never" # 关键:Saga中的任务不应自动重试,失败应由编排器决定
      }
    }
    
    backoff_limit = 0 # 同样,禁止K8s级别的重试
  }
}

Orchestrator服务在startAnalysis这个Action中,不再是简单地发消息,而是通过调用Terraform CLI或Terraform Cloud/Enterprise的API来应用(apply)上述模板,从而动态地创建一个K8s Job。当Job执行完毕(无论成功或失败),它会向RabbitMQ发送一条包含Saga ID的结果消息,Orchestrator监听到该消息后,继续驱动状态机。

这种方式的优势在于:

  • 资源隔离与弹性: 每个视频分析任务都在独立的、资源受限的环境(Pod)中运行,互不干扰。资源按需分配,用完即释放。
  • 环境一致性: Docker镜像和Terraform定义确保了每次运行的环境都是完全一致的,避免了“在我机器上能跑”的问题。
  • Saga与基础设施的联动: 编排器真正做到了“编排”,不仅是业务逻辑,还包括其依赖的计算基础设施。

单元测试与集成测试的思考

测试这种复杂的分布式系统是个挑战。

  • 状态机单元测试: Spring Statemachine本身提供了测试支持,可以验证在给定当前状态和事件时,状态机是否正确转换到了预期状态,以及是否触发了正确的Action。我们可以用Mockito模拟SagaActions中的外部调用(如commandPublisher)。
  • Action单元测试: 单独测试每个Action,验证它是否构造了正确的命令消息。
  • 集成测试: 这是最关键的。使用Testcontainers库,我们可以在测试环境中启动一个真实的RabbitMQ容器和一个PostgreSQL容器(用于持久化Saga状态)。然后,我们可以编写测试用例,模拟一个完整的Saga流程,包括模拟参与者服务发送回执事件。对于CV服务,可以创建一个模拟版的Docker镜像,它不执行真正的OpenCV处理,而是根据环境变量或接收到的消息,立即返回成功或失败的事件,用于测试补偿逻辑。

一个集成测试的伪代码思路:

  1. 使用Testcontainers启动RabbitMQ和Postgres。
  2. 调用SagaService.startNewSaga(videoId)
  3. 断言Saga状态已持久化,并且一个startTranscoding命令已发送到RabbitMQ。
  4. 手动向q.transcoding.events队列发送一个“成功”消息。
  5. 断言Saga状态机转换到ANALYSIS_STARTED,并且一个startAnalysis命令已发送。
  6. 手动向q.analysis.events队列发送一个“失败”消息。
  7. 断言Saga状态机转换到ANALYSIS_FAILED,并且一个compensateTranscoding命令被发送出去。
  8. 断言最终Saga状态为ROLLBACK_COMPLETED

局限性与未来优化路径

当前的编排式Saga方案虽然解决了核心问题,但并非银弹。它存在一些固有的复杂性和潜在风险。

首先,编排器成为了解所有业务流程细节的“上帝类”。虽然这带来了可见性的好处,但也增加了其自身的复杂度和维护成本。对编排器的任何修改都需要对整个端到端流程有深入的理解。

其次,对Terraform的直接命令行调用是一种比较粗糙的集成方式。在生产环境中,更可靠的做法是通过一个内部的“基础设施API”来抽象化Job的创建,或者使用像Argo Workflows这样的云原生工作流引擎,由Saga编排器通过API与其交互,将基础设施管理与业务流程调度更清晰地分离。

最后,当前的补偿逻辑是线性的。如果补偿操作本身也失败了(例如删除文件时遇到存储系统故障),Saga会陷入一个无法自动恢复的“卡死”状态。这需要引入人工干预的流程,或者设计更复杂的重试与告警机制,例如将失败的补偿任务放入一个“死信补偿队列”等待后续处理。这是一个在任何复杂Saga实现中都无法回避的难题。


  目录