View a markdown version of this page

使用 ElastiCache for Valkey 实现语义缓存 - Amazon ElastiCache

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

使用 ElastiCache for Valkey 实现语义缓存

以下演练展示了如何使用 ElastiCache 带有 Amazon Bedrock 的 Valkey 实现直读语义缓存。

步骤 1:创建 ElastiCache 适用于 Valkey 的集群

使用以下命令 ElastiCache 创建 8.2 或更高版本的 Valkey 集群: AWS CLI

aws elasticache create-replication-group \ --replication-group-id "valkey-semantic-cache" \ --cache-node-type cache.r7g.large \ --engine valkey \ --engine-version 8.2 \ --num-node-groups 1 \ --replicas-per-node-group 1

步骤 2:Connect 连接到集群并配置嵌入式组件

通过在 Amazon EC2 实例上运行的应用程序代码,连接到 ElastiCache 集群并设置嵌入模型:

from valkey.cluster import ValkeyCluster from langchain_aws import BedrockEmbeddings # Connect to ElastiCache for Valkey valkey_client = ValkeyCluster( host="mycluster.xxxxxx.clustercfg.use1.cache.amazonaws.com", # Your cluster endpoint port=6379, decode_responses=False ) # Set up Amazon Bedrock Titan embeddings embeddings = BedrockEmbeddings( model_id="amazon.titan-embed-text-v2:0", region_name="us-east-1" )

将主机值替换为 ElastiCache 集群的配置终端节点。有关查找集群终端节点的说明,请参阅访问您的 ElastiCache 集群

步骤 3:为语义缓存创建向量索引

配置使用带有余弦距离 ValkeyStore 的 HNSW 索引自动嵌入查询,以进行矢量搜索:

from langgraph_checkpoint_aws import ValkeyStore from hashlib import md5 store = ValkeyStore( client=valkey_client, index={ "collection_name": "semantic_cache", "embed": embeddings, "fields": ["query"], # Fields to vectorize "index_type": "HNSW", # Vector search algorithm "distance_metric": "COSINE", # Similarity metric "dims": 1024 # Titan V2 produces 1024-d vectors } ) store.setup() def cache_key_for_query(query: str): """Generate a deterministic cache key for a query.""" return md5(query.encode("utf-8")).hexdigest()
注意

ElastiCache for Valkey 使用索引来提供快速而准确的矢量搜索。该FT.CREATE命令创建基础索引。有关更多信息,请参阅向量搜索 ElastiCache

第 4 步:实现缓存搜索和更新功能

创建函数以在缓存中搜索语义上相似的查询并存储新的查询-响应对:

def search_cache(user_message: str, k: int = 3, min_similarity: float = 0.8): """Look up a semantically similar cached response from ElastiCache.""" hits = store.search( namespace="semantic-cache", query=user_message, limit=k ) if not hits: return None # Sort by similarity score (highest first) hits = sorted(hits, key=lambda h: h["score"], reverse=True) top_hit = hits[0] score = top_hit["score"] if score < min_similarity: return None # Below similarity threshold return top_hit["value"]["answer"] # Return cached answer def store_cache(user_message: str, result_message: str): """Store a new query-response pair in the semantic cache.""" key = cache_key_for_query(user_message) store.put( namespace="semantic-cache", key=key, value={ "query": user_message, "answer": result_message } )

第 5 步:实现直读缓存模式

将缓存集成到应用程序的请求处理中:

import time def handle_query(user_message: str) -> dict: """Handle a user query with read-through semantic cache.""" start = time.time() # Step 1: Search the semantic cache cached_response = search_cache(user_message, min_similarity=0.8) if cached_response: # Cache hit - return cached response elapsed = (time.time() - start) * 1000 return { "response": cached_response, "source": "cache", "latency_ms": round(elapsed, 1), } # Step 2: Cache miss - invoke LLM llm_response = invoke_llm(user_message) # Your LLM invocation function # Step 3: Store the response in cache for future reuse store_cache(user_message, llm_response) elapsed = (time.time() - start) * 1000 return { "response": llm_response, "source": "llm", "latency_ms": round(elapsed, 1), }

底层 Valkey 命令

下表显示了用于实现语义缓存的 Valkey 命令:

操作 Valkey 命令 典型延迟
创建索引 FT.CREATE semantic_cache SCHEMA query TEXT answer TEXT embedding VECTOR HNSW 6 TYPE FLOAT32 DIM 1024 DISTANCE_METRIC COSINE One-time 设置
缓存查询 FT.SEARCH semantic_cache "*=>[KNN 3 @embedding $query_vec]" PARAMS 2 query_vec [bytes] DIALECT 2 微秒
商店响应 HSET cache:{hash} query "..." answer "..." embedding [bytes] 微秒
设置 TTL EXPIRE cache:{hash} 82800 微秒
法学硕士推理(错过) 对 Amazon Bedrock 的外部 API 调用 500—6000 毫秒