构建基于Quarkus的混合搜索服务并集成Pinecone、SQL与自定义Prometheus度量


我们面临一个典型的搜索演进困境。单纯基于SQL LIKE 的文本搜索,在处理同义词、用户意图方面早已力不从心。引入向量搜索是必然选择,但业务的复杂性在于,用户不仅需要语义上的相似,还需要对结构化数据进行精确过滤——例如“价格低于100元”、“红色”、“库存大于0”。这要求我们必须构建一个混合搜索系统,同时查询向量数据库和关系型数据库,并将结果融合。

真正的挑战并非实现这个查询本身,而在于如何确保它在生产环境中是健壮、可观测的。标准的APM指标,如延迟、吞吐量和错误率,根本无法回答关键业务问题:“我们的向量召回率是否足够高?”、“SQL过滤掉了多少向量结果,这是否意味着向量索引已经过时?”、“两个数据源的延迟分布如何,瓶颈在哪一侧?”。如果不能量化这些问题,系统就只是一个黑盒,优化和排障将无从谈起。

我们的目标是构建一个基于Quarkus的混合搜索服务,它不仅要融合Pinecone的向量搜索能力和传统SQL数据库的精确过滤能力,更要将内部关键的运行状态,以自定义的Prometheus度量(Metrics)形式暴露出来,实现真正的深度可观测性。

# 架构与技术栈决策

在真实项目中,技术选型是基于务实的权衡。

  1. Quarkus: 选择它的理由是性能和开发效率。基于GraalVM的本地镜像编译(Native Image)能提供极快的启动速度和更低的内存占用,这对于部署在容器环境中的微服务至关重要。其集成的响应式编程模型和强大的依赖注入,也使得编写高性能、可维护的代码变得简单。最关键的是,它对MicroProfile Metrics(通过Micrometer实现)的一流支持,是我们实现自定义监控的基础。
  2. Pinecone: 作为一个全托管的向量数据库服务,它让我们免于维护复杂的HNSW索引和分布式集群的痛苦。在项目早期,快速验证想法比自建一切更重要。其API简单,支持元数据过滤,是我们混合查询方案的良好起点。
  3. PostgreSQL (SQL): 它是我们系统中“事实的来源(Source of Truth)”。所有产品的权威结构化数据都存储于此。任何向量搜索的结果最终都需要用这里的数据进行丰富和验证。我们使用Quarkus的Panache ORM来简化数据库交互。
  4. Prometheus & Micrometer: Prometheus是云原生时代监控的事实标准。通过在Quarkus应用中引入Micrometer库,我们可以用一种标准化的方式定义和暴露度量,而无需关心具体的监控系统实现。

整个请求流程的架构如下:

sequenceDiagram
    participant Client
    participant QuarkusService as Quarkus 混合搜索服务
    participant VectorEmbedding as 向量转换服务
    participant Pinecone
    participant PostgreSQL
    participant Prometheus

    Client->>+QuarkusService: 发起混合搜索请求 (query, filters)
    QuarkusService->>+VectorEmbedding: 转换查询文本为向量
    VectorEmbedding-->>-QuarkusService: 返回查询向量
    
    Note right of QuarkusService: 开始计时 vector_query_latency
    QuarkusService->>+Pinecone: 使用向量和元数据过滤器查询
    Pinecone-->>-QuarkusService: 返回Top-K候选ID列表
    Note right of QuarkusService: 结束计时, 记录 vector_candidates_total
    
    QuarkusService->>+PostgreSQL: 使用ID列表和用户过滤器查询完整数据
    PostgreSQL-->>-QuarkusService: 返回符合所有条件的最终产品数据
    Note right of QuarkusService: 记录 sql_final_results_total

    QuarkusService->>-Client: 返回融合后的搜索结果
    
    loop 定期Scrape
        Prometheus->>QuarkusService: GET /q/metrics
        QuarkusService-->>Prometheus: 暴露自定义度量
    end

# 依赖与配置

一个稳固的系统始于清晰的依赖管理。在pom.xml中,核心依赖如下:

<!-- pom.xml -->
<dependencies>
    <!-- Quarkus核心与REST支持 -->
    <dependency>
        <groupId>io.quarkus</groupId>
        <artifactId>quarkus-resteasy-reactive-jackson</artifactId>
    </dependency>
    
    <!-- PostgreSQL 客户端与Panache ORM -->
    <dependency>
        <groupId>io.quarkus</groupId>
        <artifactId>quarkus-hibernate-orm-panache</artifactId>
    </dependency>
    <dependency>
        <groupId>io.quarkus</groupId>
        <artifactId>quarkus-jdbc-postgresql</artifactId>
    </dependency>

    <!-- 可观测性: Micrometer与Prometheus Registry -->
    <dependency>
        <groupId>io.quarkus</groupId>
        <artifactId>quarkus-micrometer-registry-prometheus</artifactId>
    </dependency>
    
    <!-- 用于调用Pinecone的REST客户端 -->
    <dependency>
        <groupId>io.quarkus</groupId>
        <artifactId>quarkus-rest-client-reactive-jackson</artifactId>
    </dependency>
</dependencies>

配置是生产环境的命脉。我们使用Quarkus的类型安全配置来管理所有外部依赖的连接信息,避免硬编码。

# application.properties

# PostgreSQL Datasource
quarkus.datasource.db-kind=postgresql
quarkus.datasource.username=user
quarkus.datasource.password=password
quarkus.datasource.jdbc.url=jdbc:postgresql://localhost:5432/search_db
quarkus.hibernate-orm.database.generation=none

# Pinecone REST Client Configuration
# 使用@RegisterRestClient注解的接口会自动读取此前缀的配置
com.mycompany.search.clients.PineconeClient/mp-rest/url=https://your-index-name-and-host.pinecone.io
com.mycompany.search.clients.PineconeClient/mp-rest/scope=javax.inject.Singleton
# 我们将API Key放在Header中,也可以通过其他方式
# 此处的值通常通过环境变量或Secrets Manager注入
pinecone.api.key=${PINECONE_API_KEY}

# Prometheus endpoint is enabled by default at /q/metrics
quarkus.micrometer.export.prometheus.enabled=true

# 核心服务实现:融合查询与度量埋点

这里的代码是系统的核心。它展示了如何编排对Pinecone和PostgreSQL的调用,以及如何在此过程中精确地植入我们的自定义度量。

首先,定义一个SearchService,并注入所有需要的依赖,包括MeterRegistry,它是我们创建自定义度量的入口。

package com.mycompany.search.service;

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.Timer;
import io.quarkus.logging.Log;

import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

// ... imports for domain objects, clients, etc.

@ApplicationScoped
public class HybridSearchService {

    @Inject
    PineconeClient pineconeClient;

    @Inject
    ProductRepository productRepository; // Panache Repository for PostgreSQL

    @Inject
    VectorEmbeddingService vectorEmbeddingService;

    // Micrometer MeterRegistry for creating custom metrics
    private final MeterRegistry registry;

    // --- Custom Metrics Definitions ---
    private final Timer pineconeQueryTimer;
    private final Timer sqlFetchTimer;
    private final Counter totalHybridRequests;
    private final Counter failedRequests;

    public HybridSearchService(MeterRegistry registry) {
        this.registry = registry;

        // 初始化度量。这里的tag可以用来做多维度分析,例如按租户、按场景
        this.pineconeQueryTimer = registry.timer("hybrid_search.pinecone.latency", "stage", "query");
        this.sqlFetchTimer = registry.timer("hybrid_search.sql.latency", "stage", "fetch");
        this.totalHybridRequests = registry.counter("hybrid_search.requests.total");
        this.failedRequests = registry.counter("hybrid_search.requests.failed", "reason", "unknown");
    }

    public List<Product> search(String query, SearchFilters filters) {
        totalHybridRequests.increment();
        try {
            // 1. 获取查询向量
            List<Float> queryVector = vectorEmbeddingService.embed(query);

            // 2. 查询Pinecone获取候选ID
            // 使用Timer.record()可以优雅地记录代码块的执行时间
            List<String> candidateIds = pineconeQueryTimer.record(() -> {
                PineconeQueryRequest request = buildPineconeRequest(queryVector, filters);
                PineconeQueryResponse response = pineconeClient.query(request);
                return response.getMatches().stream()
                        .map(PineconeMatch::getId)
                        .collect(Collectors.toList());
            });

            // 关键业务度量:向量召回的候选集大小
            registry.summary("hybrid_search.pinecone.candidates_size").record(candidateIds.size());
            
            if (candidateIds.isEmpty()) {
                Log.warnf("Pinecone returned no candidates for query: %s", query);
                return Collections.emptyList();
            }

            // 3. 使用ID列表和精确过滤器查询PostgreSQL
            List<Product> finalResults = sqlFetchTimer.record(() ->
                productRepository.findByIdsAndFilters(candidateIds, filters.getMinPrice(), filters.getColor())
            );

            // 关键业务度量:SQL过滤率
            // 这个比率非常重要:如果长期接近0,说明向量索引与SQL数据严重脱节
            double filterRatio = candidateIds.isEmpty() ? 0 : (double) finalResults.size() / candidateIds.size();
            registry.gauge("hybrid_search.sql.filter_ratio", filterRatio);

            return finalResults;

        } catch (Exception e) {
            // 异常处理与度量
            Log.error("Hybrid search failed", e);
            // 可以根据异常类型添加不同的tag,以区分是Pinecone问题还是DB问题
            registry.counter("hybrid_search.requests.failed", "reason", classifyException(e)).increment();
            // 抛出或返回空,取决于业务需求
            throw new SearchServiceException("Failed to perform hybrid search", e);
        }
    }

    private PineconeQueryRequest buildPineconeRequest(List<Float> vector, SearchFilters filters) {
        // ... 构建Pinecone请求的逻辑,可能包含元数据过滤
        // Pinecone的元数据过滤能力有限,复杂的过滤逻辑需要放在SQL层
        return new PineconeQueryRequest(vector, 100); // topK=100
    }
    
    private String classifyException(Exception e) {
        if (e instanceof PineconeClientException) {
            return "pinecone_api_error";
        } else if (e instanceof DatabaseAccessException) {
            return "database_error";
        }
        return "internal_error";
    }
}

这段代码有几个关键的设计考量:

  1. 度量定义与注入: 我们在构造函数中通过MeterRegistry初始化了所有需要的度量。这是一种最佳实践,可以确保度量在服务生命周期开始时就已定义好,避免在每次请求时都去查找或创建,从而减少开销。
  2. 业务度量而非技术度量: 我们监控的不是CPU使用率,而是pinecone.candidates_size(向量召回数)和sql.filter_ratio(SQL过滤率)。这些指标直接反映了混合搜索策略的健康度。例如,当filter_ratio突然下降,SRE或数据科学家就能立刻意识到,可能是向量索引没有及时同步最新的产品状态(如库存、价格变动),从而导致大量无效的召回。
  3. 计时器(Timer)的正确使用: Timer.record()方法简洁地包裹了需要计时的代码块。Micrometer会自动为我们计算总时间、次数、平均值和百分位P95/P99等统计数据,这对于定位性能瓶颈至关重要。
  4. 精细化的错误计数: 我们没有使用一个笼统的errors_total计数器,而是通过classifyException方法为错误打上reason标签。在Prometheus中,我们可以通过sum(rate(hybrid_search_requests_failed_total{reason="pinecone_api_error"}[5m]))这样的查询,精确地告警和分析特定类型的失败。
  5. 日志与度量的协同: 日志记录了错误的细节(例如堆栈跟踪),而度量则提供了聚合的趋势视图。两者结合才能形成完整的可观测性画面。

# 数据访问层与外部客户端

Panache Repository 极大地简化了SQL查询。对于“根据ID列表和条件查询”这种常见模式,代码如下:

package com.mycompany.search.data;

import io.quarkus.hibernate.orm.panache.PanacheRepository;
import io.quarkus.panache.common.Parameters;

import javax.enterprise.context.ApplicationScoped;
import java.util.List;

@ApplicationScoped
public class ProductRepository implements PanacheRepository<Product> {

    public List<Product> findByIdsAndFilters(List<String> ids, Double minPrice, String color) {
        // Panache让复杂的动态查询变得相对简单
        String query = "from Product where id in :ids and price >= :minPrice and color = :color and stock > 0";
        Parameters params = Parameters.with("ids", ids)
                                      .and("minPrice", minPrice)
                                      .and("color", color);
        
        return find(query, params).list();
    }
}

而与Pinecone交互的客户端,我们使用Quarkus的REST Client Reactive,它能将一个Java接口自动实现为HTTP客户端。

package com.mycompany.search.clients;

import org.eclipse.microprofile.rest.client.inject.RegisterRestClient;
import org.jboss.resteasy.reactive.ClientHeaderParam;

import javax.ws.rs.POST;
import javax.ws.rs.Path;

@Path("/query")
@RegisterRestClient(configKey="pinecone-api") // 对应application.properties中的配置前缀
public interface PineconeClient {

    @POST
    @ClientHeaderParam(name = "Api-Key", value = "{getConfigValue('pinecone.api.key')}")
    PineconeQueryResponse query(PineconeQueryRequest request);

    // getConfigValue是MP Config的功能,用于动态获取配置
    default String getConfigValue(String key) {
        return ConfigProvider.getConfig().getValue(key, String.class);
    }
}

// DTOs for Pinecone request/response would be defined elsewhere
// record PineconeQueryRequest(...) {}
// record PineconeQueryResponse(...) {}

# 观察与验证

服务启动后,Quarkus会自动在/q/metrics端点暴露Prometheus格式的度量。访问该端点,可以看到我们自定义的度量输出:

# HELP hybrid_search_pinecone_latency_seconds 
# TYPE hybrid_search_pinecone_latency_seconds summary
hybrid_search_pinecone_latency_seconds_count{stage="query",} 15.0
hybrid_search_pinecone_latency_seconds_sum{stage="query",} 0.843
# HELP hybrid_search_pinecone_latency_seconds_max 
# TYPE hybrid_search_pinecone_latency_seconds_max gauge
hybrid_search_pinecone_latency_seconds_max{stage="query",} 0.098

# HELP hybrid_search_sql_filter_ratio 
# TYPE hybrid_search_sql_filter_ratio gauge
hybrid_search_sql_filter_ratio 0.65

# HELP hybrid_search_requests_failed_total 
# TYPE hybrid_search_requests_failed_total counter
hybrid_search_requests_failed_total{reason="pinecone_api_error",} 2.0
hybrid_search_requests_failed_total{reason="internal_error",} 0.0

# ... 其他度量

有了这些数据,我们可以在Grafana中创建仪表盘,实时监控混合搜索的核心健康指标。例如,我们可以将pinecone.latency的P99值与sql.latency的P99值并排展示,一眼就能看出性能瓶颈。更重要的是,我们可以为sql.filter_ratio设置告警规则,当该值持续低于某个阈值(比如0.2)时,自动触发告警,通知数据团队检查向量索引的质量和时效性。

# 局限性与未来路径

这个实现虽然解决了核心的可观测性问题,但在生产环境中仍有几个方面需要深化。

首先,结果融合逻辑非常基础。当前只是简单地用Pinecone的结果集去过滤SQL,最终排序依赖于SQL的默认排序或Pinecone的得分。一个更高级的系统会采用更复杂的重排(Re-ranking)模型,例如使用Reciprocal Rank Fusion (RRF)算法结合向量相似度得分和业务权重(如销量、评分),来生成一个更优的最终排序。

其次,数据同步链路是隐形的。我们没有讨论如何将PostgreSQL中的数据变更实时同步到Pinecone的向量索引中。在真实场景下,这需要一个健壮的CDC(Change Data Capture)管道,例如使用Debezium。这个管道自身的延迟和失败率,也需要被纳入我们的可观测性体系。

最后,当前的度量还是基于单次请求的。对于更复杂的分析,比如“哪些类型的查询会导致过滤率降低?”,我们需要将这些度量与分布式追踪(Tracing)结合起来,将请求的上下文(如用户ID、查询关键词)作为追踪的属性(attribute/tag),这样才能进行更深度的下钻分析。Quarkus对OpenTelemetry的支持为实现这一点提供了可能。
```


  目录