本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
使用 ElastiCache for Valkey 实现语义缓存
以下演练展示了如何使用 ElastiCache 带有 Amazon Bedrock 的 Valkey 实现直读语义缓存。
步骤 1:创建 ElastiCache 适用于 Valkey 的集群
使用以下命令 ElastiCache 创建 8.2 或更高版本的 Valkey 集群: AWS CLI
aws elasticache create-replication-group \ --replication-group-id "valkey-semantic-cache" \ --cache-node-type cache.r7g.large \ --engine valkey \ --engine-version 8.2 \ --num-node-groups 1 \ --replicas-per-node-group 1
步骤 2:Connect 连接到集群并配置嵌入式组件
通过在 Amazon EC2 实例上运行的应用程序代码,连接到 ElastiCache 集群并设置嵌入模型:
from valkey.cluster import ValkeyCluster from langchain_aws import BedrockEmbeddings # Connect to ElastiCache for Valkey valkey_client = ValkeyCluster( host="mycluster.xxxxxx.clustercfg.use1.cache.amazonaws.com", # Your cluster endpoint port=6379, decode_responses=False ) # Set up Amazon Bedrock Titan embeddings embeddings = BedrockEmbeddings( model_id="amazon.titan-embed-text-v2:0", region_name="us-east-1" )
将主机值替换为 ElastiCache 集群的配置终端节点。有关查找集群终端节点的说明,请参阅访问您的 ElastiCache 集群。
步骤 3:为语义缓存创建向量索引
配置使用带有余弦距离 ValkeyStore 的 HNSW 索引自动嵌入查询,以进行矢量搜索:
from langgraph_checkpoint_aws import ValkeyStore from hashlib import md5 store = ValkeyStore( client=valkey_client, index={ "collection_name": "semantic_cache", "embed": embeddings, "fields": ["query"], # Fields to vectorize "index_type": "HNSW", # Vector search algorithm "distance_metric": "COSINE", # Similarity metric "dims": 1024 # Titan V2 produces 1024-d vectors } ) store.setup() def cache_key_for_query(query: str): """Generate a deterministic cache key for a query.""" return md5(query.encode("utf-8")).hexdigest()
注意
ElastiCache for Valkey 使用索引来提供快速而准确的矢量搜索。该FT.CREATE命令创建基础索引。有关更多信息,请参阅向量搜索 ElastiCache。
第 4 步:实现缓存搜索和更新功能
创建函数以在缓存中搜索语义上相似的查询并存储新的查询-响应对:
def search_cache(user_message: str, k: int = 3, min_similarity: float = 0.8): """Look up a semantically similar cached response from ElastiCache.""" hits = store.search( namespace="semantic-cache", query=user_message, limit=k ) if not hits: return None # Sort by similarity score (highest first) hits = sorted(hits, key=lambda h: h["score"], reverse=True) top_hit = hits[0] score = top_hit["score"] if score < min_similarity: return None # Below similarity threshold return top_hit["value"]["answer"] # Return cached answer def store_cache(user_message: str, result_message: str): """Store a new query-response pair in the semantic cache.""" key = cache_key_for_query(user_message) store.put( namespace="semantic-cache", key=key, value={ "query": user_message, "answer": result_message } )
第 5 步:实现直读缓存模式
将缓存集成到应用程序的请求处理中:
import time def handle_query(user_message: str) -> dict: """Handle a user query with read-through semantic cache.""" start = time.time() # Step 1: Search the semantic cache cached_response = search_cache(user_message, min_similarity=0.8) if cached_response: # Cache hit - return cached response elapsed = (time.time() - start) * 1000 return { "response": cached_response, "source": "cache", "latency_ms": round(elapsed, 1), } # Step 2: Cache miss - invoke LLM llm_response = invoke_llm(user_message) # Your LLM invocation function # Step 3: Store the response in cache for future reuse store_cache(user_message, llm_response) elapsed = (time.time() - start) * 1000 return { "response": llm_response, "source": "llm", "latency_ms": round(elapsed, 1), }
底层 Valkey 命令
下表显示了用于实现语义缓存的 Valkey 命令:
| 操作 | Valkey 命令 | 典型延迟 |
|---|---|---|
| 创建索引 | FT.CREATE semantic_cache SCHEMA query TEXT answer TEXT embedding VECTOR HNSW 6 TYPE FLOAT32 DIM 1024 DISTANCE_METRIC COSINE |
One-time 设置 |
| 缓存查询 | FT.SEARCH semantic_cache "*=>[KNN 3 @embedding $query_vec]" PARAMS 2 query_vec [bytes] DIALECT 2 |
微秒 |
| 商店响应 | HSET cache:{hash} query "..." answer "..." embedding [bytes] |
微秒 |
| 设置 TTL | EXPIRE cache:{hash} 82800 |
微秒 |
| 法学硕士推理(错过) | 对 Amazon Bedrock 的外部 API 调用 | 500—6000 毫秒 |