Apache Cassandra 向量存储

本节将指导您设置 CassandraVectorStore 以存储文档嵌入并执行相似性搜索。

什么是 Apache Cassandra?

Apache Cassandra® 是一个真正的开源分布式数据库,以线性可伸缩性、经过验证的容错性和低延迟而闻名,使其成为任务关键型事务数据的完美平台。

其向量相似性搜索 (VSS) 基于 JVector 库,可确保一流的性能和相关性。

Apache Cassandra 中的向量搜索可以简单地完成:

SELECT content FROM table ORDER BY content_vector ANN OF query_embedding;

有关此内容的更多文档可以在 此处 阅读。

此 Spring AI 向量存储旨在适用于全新的 RAG 应用程序,并且能够改造到现有数据和表之上。

该存储还可以用于现有数据库中的非 RAG 用例,例如语义搜索、地理邻近搜索等。

该存储将根据其配置自动创建或增强模式。如果您不希望修改模式,请使用 initializeSchema 配置存储。

使用 spring-boot-autoconfigure 时,initializeSchema 默认为 false,根据 Spring Boot 标准,您必须通过在 application.properties 文件中设置 …​initialize-schema=true 来选择模式创建/修改。

什么是 JVector?

JVector 是一个纯 Java 嵌入式向量搜索引擎。

它通过以下特点在其他 HNSW 向量相似性搜索实现中脱颖而出:

  • 算法快速。JVector 使用受 DiskANN 和相关研究启发的最先进图算法,提供高召回率和低延迟。

  • 实现快速。JVector 使用 Panama SIMD API 来加速索引构建和查询。

  • 内存高效。JVector 使用乘积量化压缩向量,使其在搜索期间可以保留在内存中。

  • 磁盘感知。JVector 的磁盘布局旨在在查询时执行最少必要的 I/O 操作。

  • 并发。索引构建可线性扩展到至少 32 个线程。线程加倍,构建时间减半。

  • 增量。在构建索引时查询索引。添加向量和在搜索结果中找到它之间没有延迟。

  • 易于嵌入。API 旨在方便嵌入,由在生产中使用它的人员设计。

先决条件

  1. 一个 EmbeddingModel 实例来计算文档嵌入。这通常配置为 Spring Bean。有几个选项可用:

    • Transformers Embedding - 在您的本地环境中计算嵌入。默认是通过 ONNX 和 all-MiniLM-L6-v2 Sentence Transformers。这只是工作。

    • 如果您想使用 OpenAI 的 Embeddings - 使用 OpenAI 嵌入端点。您需要在 OpenAI 注册 创建一个帐户,并在 API 密钥 生成 api-key 令牌。

    • 还有更多选择,请参阅 Embeddings API 文档。

  2. 一个 Apache Cassandra 实例,版本 5.0-beta1[style="loweralpha"][style="loweralpha"][style="loweralpha"]

    1. DIY 快速入门

    2. 对于托管服务,Astra DB 提供健康的免费套餐。

依赖项

Spring AI 自动配置、启动器模块的工件名称发生了重大变化。 有关更多信息,请参阅 升级说明

对于依赖管理,我们建议使用 Spring AI BOM,如 依赖管理 部分所述。

将这些依赖项添加到您的项目中:

  • 仅用于 Cassandra 向量存储:

<dependency>
    <groupId>org.springframework.ai</groupId>
    <artifactId>spring-ai-cassandra-store</artifactId>
</dependency>
  • 或者,用于 RAG 应用程序所需的一切(使用默认 ONNX 嵌入模型):

<dependency>
    <groupId>org.springframework.ai</groupId>
    <artifactId>spring-ai-starter-vector-store-cassandra</artifactId>
</dependency>

配置属性

您可以在 Spring Boot 配置中使用以下属性来自定义 Apache Cassandra 向量存储。

属性 默认值

spring.ai.vectorstore.cassandra.keyspace

springframework

spring.ai.vectorstore.cassandra.table

ai_vector_store

spring.ai.vectorstore.cassandra.initialize-schema

false

spring.ai.vectorstore.cassandra.index-name

spring.ai.vectorstore.cassandra.content-column-name

content

spring.ai.vectorstore.cassandra.embedding-column-name

embedding

spring.ai.vectorstore.cassandra.fixed-thread-pool-executor-size

16

用法

基本用法

将 CassandraVectorStore 实例创建为 Spring Bean:

@Bean
public VectorStore vectorStore(CqlSession session, EmbeddingModel embeddingModel) {
    return CassandraVectorStore.builder(embeddingModel)
        .session(session)
        .keyspace("my_keyspace")
        .table("my_vectors")
        .build();
}

一旦有了向量存储实例,就可以添加文档并执行搜索:

// Add documents
vectorStore.add(List.of(
    new Document("1", "content1", Map.of("key1", "value1")),
    new Document("2", "content2", Map.of("key2", "value2"))
));

// Search with filters
List<Document> results = vectorStore.similaritySearch(
    SearchRequest.query("search text")
        .withTopK(5)
        .withSimilarityThreshold(0.7f)
        .withFilterExpression("metadata.key1 == 'value1'")
);

高级配置

对于更复杂的用例,您可以在 Spring Bean 中配置其他设置:

@Bean
public VectorStore vectorStore(CqlSession session, EmbeddingModel embeddingModel) {
    return CassandraVectorStore.builder(embeddingModel)
        .session(session)
        .keyspace("my_keyspace")
        .table("my_vectors")
        // Configure primary keys
        .partitionKeys(List.of(
            new SchemaColumn("id", DataTypes.TEXT),
            new SchemaColumn("category", DataTypes.TEXT)
        ))
        .clusteringKeys(List.of(
            new SchemaColumn("timestamp", DataTypes.TIMESTAMP)
        ))
        // Add metadata columns with optional indexing
        .addMetadataColumns(
            new SchemaColumn("category", DataTypes.TEXT, SchemaColumnTags.INDEXED),
            new SchemaColumn("score", DataTypes.DOUBLE)
        )
        // Customize column names
        .contentColumnName("text")
        .embeddingColumnName("vector")
        // Performance tuning
        .fixedThreadPoolExecutorSize(32)
        // Schema management
        .initializeSchema(true)
        // Custom batching strategy
        .batchingStrategy(new TokenCountBatchingStrategy())
        .build();
}

连接配置

有两种方法可以配置与 Cassandra 的连接:

  • 使用注入的 CqlSession(推荐):

@Bean
public VectorStore vectorStore(CqlSession session, EmbeddingModel embeddingModel) {
    return CassandraVectorStore.builder(embeddingModel)
        .session(session)
        .keyspace("my_keyspace")
        .table("my_vectors")
        .build();
}
  • 直接在构建器中使用连接详细信息:

@Bean
public VectorStore vectorStore(EmbeddingModel embeddingModel) {
    return CassandraVectorStore.builder(embeddingModel)
        .contactPoint(new InetSocketAddress("localhost", 9042))
        .localDatacenter("datacenter1")
        .keyspace("my_keyspace")
        .build();
}

元数据过滤

您可以利用通用、可移植的元数据过滤器与 CassandraVectorStore。为了使元数据列可搜索,它们必须是主键或 SAI 索引。要使非主键列索引,请使用 SchemaColumnTags.INDEXED 配置元数据列。

例如,您可以使用文本表达式语言:

vectorStore.similaritySearch(
    SearchRequest.builder().query("The World")
        .topK(5)
        .filterExpression("country in ['UK', 'NL'] && year >= 2020").build());

或通过编程方式使用表达式 DSL:

Filter.Expression f = new FilterExpressionBuilder()
    .and(
        f.in("country", "UK", "NL"),
        f.gte("year", 2020)
    ).build();

vectorStore.similaritySearch(
    SearchRequest.builder().query("The World")
        .topK(5)
        .filterExpression(f).build());

可移植的过滤器表达式会自动转换为 CQL 查询

高级示例:基于 Wikipedia 数据集的向量存储

以下示例演示如何在现有模式上使用存储。这里我们使用来自 [role="bare"][role="bare"][role="bare"]https://github.com/datastax-labs/colbert-wikipedia-data 项目的模式,该项目提供了完整的 Wikipedia 数据集,已为您向量化。

首先,在 Cassandra 数据库中创建模式:

wget https://s.apache.org/colbert-wikipedia-schema-cql -O colbert-wikipedia-schema.cql
cqlsh -f colbert-wikipedia-schema.cql

然后使用构建器模式配置存储:

@Bean
public VectorStore vectorStore(CqlSession session, EmbeddingModel embeddingModel) {
    List<SchemaColumn> partitionColumns = List.of(
        new SchemaColumn("wiki", DataTypes.TEXT),
        new SchemaColumn("language", DataTypes.TEXT),
        new SchemaColumn("title", DataTypes.TEXT)
    );

    List<SchemaColumn> clusteringColumns = List.of(
        new SchemaColumn("chunk_no", DataTypes.INT),
        new SchemaColumn("bert_embedding_no", DataTypes.INT)
    );

    List<SchemaColumn> extraColumns = List.of(
        new SchemaColumn("revision", DataTypes.INT),
        new SchemaColumn("id", DataTypes.INT)
    );

    return CassandraVectorStore.builder()
        .session(session)
        .embeddingModel(embeddingModel)
        .keyspace("wikidata")
        .table("articles")
        .partitionKeys(partitionColumns)
        .clusteringKeys(clusteringColumns)
        .contentColumnName("body")
        .embeddingColumnName("all_minilm_l6_v2_embedding")
        .indexName("all_minilm_l6_v2_ann")
        .initializeSchema(false)
        .addMetadataColumns(extraColumns)
        .primaryKeyTranslator((List<Object> primaryKeys) -> {
            if (primaryKeys.isEmpty()) {
                return "test§¶0";
            }
            return String.format("%s§¶%s", primaryKeys.get(2), primaryKeys.get(3));
        })
        .documentIdTranslator((id) -> {
            String[] parts = id.split("§¶");
            String title = parts[0];
            int chunk_no = parts.length > 1 ? Integer.parseInt(parts[1]) : 0;
            return List.of("simplewiki", "en", title, chunk_no, 0);
        })
        .build();
}

@Bean
public EmbeddingModel embeddingModel() {
    // default is ONNX all-MiniLM-L6-v2 which is what we want
    return new TransformersEmbeddingModel();
}

加载完整的 Wikipedia 数据集

要加载完整的 Wikipedia 数据集:

  1. 从 [role="bare"][role="bare"][role="bare"]https://s.apache.org/simplewiki-sstable-tar 下载 simplewiki-sstable.tar(这将需要一段时间,文件大小为几十 GB)

  2. 加载数据:

tar -xf simplewiki-sstable.tar -C ${CASSANDRA_DATA}/data/wikidata/articles-*/
nodetool import wikidata articles ${CASSANDRA_DATA}/data/wikidata/articles-*/
  • 如果此表中已有数据,请检查 tarball 的文件在执行 tar 时不会覆盖现有 sstables。

  • nodetool import 的替代方法是重新启动 Cassandra。

  • 如果索引有任何故障,它们将自动重建。

访问本机客户端

Cassandra 向量存储实现通过 getNativeClient() 方法提供对底层本机 Cassandra 客户端 (CqlSession) 的访问:

CassandraVectorStore vectorStore = context.getBean(CassandraVectorStore.class);
Optional<CqlSession> nativeClient = vectorStore.getNativeClient();

if (nativeClient.isPresent()) {
    CqlSession session = nativeClient.get();
    // Use the native client for Cassandra-specific operations
}

本机客户端使您能够访问 Cassandra 特定的功能和操作,这些功能和操作可能不会通过 VectorStore 接口公开。