- Published on
High-Reliability Design Patterns for AI Agents: Sharding and Scatter Retrieval
- Authors
- Name
- 俞凡
This series covers design patterns that improve the reliability of modern agentic systems, presenting each concept intuitively, breaking down its purpose, and then implementing a simple working version to show how it fits into real-world agent systems. The series consists of 14 articles; this is the 11th. Original article: Building the 14 Key Pillars of Agentic AI
Optimizing an agentic solution requires software engineering that ensures components coordinate, run in parallel, and interact with the system efficiently. Examples include speculative execution, which processes predictable queries ahead of time to reduce latency, and redundant execution, which runs the same agent multiple times to guard against single points of failure. Other patterns that improve the reliability of modern agentic systems include:
- Parallel tool use: an agent executes independent API calls simultaneously to hide I/O latency.
- Hierarchical agents: a manager decomposes a task into smaller steps handled by worker agents.
- Competitive agent ensembles: multiple agents propose answers and the system selects the best one.
- Redundant execution: two or more agents solve the same task to detect errors and improve reliability.
- Parallel and hybrid retrieval: multiple retrieval strategies run together to improve context quality.
- Multi-hop retrieval: an agent gathers deeper, more relevant information through iterative retrieval steps.
And there are many more.
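Redundant execution, for instance, can be sketched in a few lines of plain Python. The two "agents" below are illustrative stand-in functions, not real LLM calls: the same task runs on both concurrently, and their answers are compared to detect disagreement.

```python
from concurrent.futures import ThreadPoolExecutor

# Two illustrative "agents" that solve the same task independently.
# In a real system these would be separate LLM calls or services.
def agent_a(task: str) -> str:
    return task.strip().lower()

def agent_b(task: str) -> str:
    return task.lower().strip()

def redundant_execute(task: str) -> dict:
    """Run the task on both agents in parallel and compare the results."""
    with ThreadPoolExecutor(max_workers=2) as pool:
        futures = [pool.submit(agent, task) for agent in (agent_a, agent_b)]
        results = [f.result() for f in futures]
    agreed = results[0] == results[1]
    # On disagreement, a real system would retry, vote, or escalate to a human
    return {"answer": results[0], "agreed": agreed}

print(redundant_execute("  Hello World  "))
```

When the agents disagree, the mismatch itself is the signal: it flags the task for a retry, a majority vote, or human review.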
All of the theory and code lives in the GitHub repository: 🤖 Agentic Parallelism: A Practical Guide 🚀
The repository is organized as follows:
agentic-parallelism/
├── 01_parallel_tool_use.ipynb
├── 02_parallel_hypothesis.ipynb
...
├── 06_competitive_agent_ensembles.ipynb
├── 07_agent_assembly_line.ipynb
├── 08_decentralized_blackboard.ipynb
...
├── 13_parallel_context_preprocessing.ipynb
└── 14_parallel_multi_hop_retrieval.ipynb
Sharding and Scatter Retrieval
As a knowledge base grows from thousands of documents to millions or billions, a single monolithic vector store becomes the main bottleneck: search latency increases, and the index becomes hard to manage and update.
Scatter retrieval
The architectural answer is sharding with scatter retrieval. The core idea: instead of one massive index, split (shard) the knowledge base into multiple smaller, independent vector stores.
Shards can be organized along any logical partition, such as topic, date, or data source. When a user query arrives, a central coordinator "scatters" it to all shards, which search in parallel; the results are then gathered and re-ranked to find the globally best documents.
We will build a simulated two-shard system (engineering vs. marketing) and compare it with a monolithic system to see the benefits in both latency and answer quality.
First we need to create the knowledge-base shards: two separate document lists, each containing domain-specific information.
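Before touching any vector store, the scatter-gather flow can be sketched with toy shards: plain lists of strings scored by word overlap, a deliberately simplistic stand-in for embedding similarity.

```python
from concurrent.futures import ThreadPoolExecutor

# Toy shards: each is just a list of documents (real shards would be vector indexes)
SHARDS = {
    "eng": ["api endpoint returns status", "firmware update notes"],
    "mkt": ["press release: a game changer", "product page copy"],
}

def score(query: str, doc: str) -> int:
    """Toy relevance score: number of shared words (stand-in for vector similarity)."""
    return len(set(query.lower().split()) & set(doc.lower().split()))

def search_shard(name: str, query: str, k: int = 1):
    """Search one shard locally and return its top-k (doc, score) pairs."""
    ranked = sorted(SHARDS[name], key=lambda d: score(query, d), reverse=True)
    return [(d, score(query, d)) for d in ranked[:k]]

def scatter_gather(query: str, k: int = 2):
    """Scatter the query to every shard in parallel, then gather and re-rank globally."""
    with ThreadPoolExecutor(max_workers=len(SHARDS)) as pool:
        futures = [pool.submit(search_shard, name, query) for name in SHARDS]
        hits = [hit for f in futures for hit in f.result()]
    # Global re-rank: merge per-shard results into one ordered list
    hits.sort(key=lambda pair: pair[1], reverse=True)
    return [doc for doc, _ in hits[:k]]

print(scatter_gather("status of the api endpoint"))
```

The structure is the same regardless of shard count: each shard ranks locally and in parallel, and only the small per-shard top-k lists are merged centrally.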
from langchain_core.documents import Document
# Document list simulating the "Engineering" knowledge-base shard
eng_docs = [
Document(page_content="The QuantumLeap V3 processor utilizes a 3nm process node and features a dedicated AI accelerator core with 128 tensor units. API endpoint `/api/v3/status` provides real-time thermal throttling data.", metadata={"source": "eng-kb"}),
Document(page_content="Firmware update v2.1 for the Aura Smart Ring optimizes the photoplethysmography (PPG) sensor algorithm for more accurate sleep stage detection. The update is deployed via the mobile app.", metadata={"source": "eng-kb"}),
Document(page_content="The Smart Mug's heating element is a nickel-chromium coil controlled by a PID controller. It maintains temperature within +/- 1 degree Celsius. Battery polling is done via the `getBattery` function.", metadata={"source": "eng-kb"})
]
# Document list simulating the "Marketing" knowledge-base shard
mkt_docs = [
Document(page_content="Press Release: Unveiling the QuantumLeap V3, the AI processor that redefines speed. 'It's a game-changer for creative professionals,' says CEO Jane Doe. Available Q4.", metadata={"source": "mkt-kb"}),
Document(page_content="Product Page: The Aura Smart Ring is your personal wellness companion. Crafted from aerospace-grade titanium, it empowers you to unlock your full potential by understanding your body's signals.", metadata={"source": "mkt-kb"}),
Document(page_content="Blog Post: 'Five Ways Our Smart Mug Supercharges Your Morning Routine.' The perfect temperature, from the first sip to the last, means your coffee is always perfect.", metadata={"source": "mkt-kb"})
]
The two knowledge domains are deliberately distinct. Technical specifications for the QuantumLeap V3 exist only in eng_docs, while information about its market positioning exists only in mkt_docs. This separation lets us test whether the system can correctly retrieve from both sources.
Next, create two vector-store shards from the documents.
from langchain_community.vectorstores import FAISS

# Create two independent FAISS vector stores
# (`embeddings` is the embedding model initialized earlier in the series)
eng_vectorstore = FAISS.from_documents(eng_docs, embedding=embeddings)
mkt_vectorstore = FAISS.from_documents(mkt_docs, embedding=embeddings)

# Then create a retriever for each shard
eng_retriever = eng_vectorstore.as_retriever(search_kwargs={"k": 2})
mkt_retriever = mkt_vectorstore.as_retriever(search_kwargs={"k": 2})

print(f"Knowledge Base shards created: Engineering KB ({len(eng_docs)} docs), Marketing KB ({len(mkt_docs)} docs).")
With the two shards, eng_retriever and mkt_retriever, we now have a distributed knowledge base in which each retriever operates only on its own small, specialized index.
Next we build the sharded RAG system. Its core is a LangGraph node that scatters the query to both shards and then gathers the results.
from typing import TypedDict, List
from concurrent.futures import ThreadPoolExecutor
import time

class ShardedRAGState(TypedDict):
    question: str
    retrieved_docs: List[Document]
    final_answer: str

def parallel_retrieval_node(state: ShardedRAGState):
    """The core of the pattern: scatter the query to all shards in parallel and gather the results."""
    print("--- [Meta-Retriever] Scattering query to Engineering and Marketing shards in parallel... ---")
    # Run both retrieval calls concurrently with a ThreadPoolExecutor
    with ThreadPoolExecutor(max_workers=2) as executor:
        # A small helper that adds a simulated delay to each shard search,
        # approximating the time a real search over a smaller index takes
        def p_retrieval(retriever):
            time.sleep(0.5)
            return retriever.invoke(state['question'])
        # Submit both retrieval tasks to the executor
        futures = [executor.submit(p_retrieval, retriever) for retriever in [eng_retriever, mkt_retriever]]
        all_docs = []
        for future in futures:
            all_docs.extend(future.result())
    # The "gather" step: merge and de-duplicate the results from all shards.
    # A real system would apply a more sophisticated re-ranking step here.
    unique_docs = list({doc.page_content: doc for doc in all_docs}.values())
    print(f"--- [Meta-Retriever] Gathered {len(unique_docs)} unique documents from 2 shards. ---")
    return {"retrieved_docs": unique_docs}

# Assemble the full graph with a generation node
from langgraph.graph import StateGraph, END

workflow = StateGraph(ShardedRAGState)
workflow.add_node("parallel_retrieval", parallel_retrieval_node)
workflow.add_node("generate_answer", generation_node)  # generation_node is defined in an earlier installment
workflow.set_entry_point("parallel_retrieval")
workflow.add_edge("parallel_retrieval", "generate_answer")
workflow.add_edge("generate_answer", END)
sharded_rag_app = workflow.compile()
Scatter retrieval
The scatter is performed by the ThreadPoolExecutor, which sends the same query to eng_retriever and mkt_retriever simultaneously. On gather, the results collected from the futures are de-duplicated, ensuring comprehensive, unified content from the distributed knowledge base.
For the final comparison, we run both the monolithic RAG (simulated with higher latency) and the sharded RAG on a query that needs information from both shards to be answered completely.
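The one-line de-duplication in the gather step relies on Python dicts preserving insertion order (guaranteed since Python 3.7): keying by content keeps exactly one copy of each document. A standalone illustration, using (content, source) tuples as stand-ins for Document objects:

```python
# Stand-in documents: (content, source) pairs as returned by two shards
all_docs = [
    ("QuantumLeap V3 press release", "mkt-kb"),
    ("API /api/v3/status spec", "eng-kb"),
    ("QuantumLeap V3 press release", "mkt-kb"),  # duplicate across shards
]

# Same trick as in parallel_retrieval_node: a dict keyed by content keeps
# exactly one entry per unique content, in first-seen order
unique_docs = list({content: (content, src) for content, src in all_docs}.values())
print(unique_docs)  # two entries, duplicate removed
```

The dict comprehension overwrites duplicate keys with the later value, but since duplicates share the same content this is harmless, and key order still follows the first occurrence.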
# The query combines a strong marketing phrase ('game-changer') with a specific technical question (the API status endpoint)
user_query = "I heard the new QuantumLeap V3 is a 'game-changer for creative professionals'. Can you tell me more about it, and is there an API endpoint to check its status?"
# --- Run the monolithic RAG (the baseline single-index chain built earlier, simulated with higher latency) ---
print("--- [MONOLITHIC RAG] Starting run... ---")
start_time = time.time()
monolithic_answer = monolithic_rag_chain.invoke(user_query)
monolithic_time = time.time() - start_time
# --- Run the sharded RAG ---
print("\n--- [SHARDED RAG] Starting run... ---")
start_time = time.time()
inputs = {"question": user_query}
sharded_result = sharded_rag_app.invoke(inputs)  # run the sharded graph before stopping the timer
sharded_time = time.time() - start_time
# --- Final analysis ---
print("\n" + "="*60)
print(" ACCURACY & RECALL ANALYSIS")
print("="*60 + "\n")
print("="*60)
print(" PERFORMANCE ANALYSIS")
print("="*60 + "\n")
print(f"Monolithic RAG Total Time: {monolithic_time:.2f} seconds")
print(f"Sharded RAG Total Time: {sharded_time:.2f} seconds\n")
latency_improvement = ((monolithic_time - sharded_time) / monolithic_time) * 100
print(f"Latency Improvement: {latency_improvement:.0f}%\n")
The results:
#### Output ####
============================================================
ACCURACY & RECALL ANALYSIS
============================================================
**Monolithic System:** Retrieved 3 documents. While it found the two correct documents, it also retrieved an irrelevant document about the 'Aura Smart Ring'. The strong semantic similarity of 'empowers you to unlock your full potential' to 'game-changer for creative professionals' pulled in this unrelated document. This noise can degrade the quality of the final answer.
**Sharded System:** Retrieved 2 documents. The parallel search was more precise. The marketing shard found the press release, and the engineering shard found the technical specs. It correctly ignored all irrelevant documents from other product lines. This resulted in a cleaner, more focused context for the generator.
**Conclusion:** The sharded architecture improved retrieval precision by isolating knowledge domains. This prevents context pollution from irrelevant but semantically similar documents, leading to a more accurate and trustworthy final answer.
============================================================
PERFORMANCE ANALYSIS
============================================================
Monolithic RAG Total Time: 6.89 seconds
Sharded RAG Total Time: 4.95 seconds
Latency Improvement: 28%
The final analysis reveals two advantages of the sharding and scatter-retrieval architecture.
- The marketing component of the query was resolved cleanly by the marketing shard, while the technical component matched the engineering shard. The monolithic index, which serves everything, was distracted by semantically similar but irrelevant marketing text, degrading context quality.
- The sharded system also ran 28% faster, because the parallel queries execute against smaller, domain-scoped indexes. This design scales well: monolithic retrieval slows down as the corpus grows, while sharded latency stays stable, bounded only by the size of the largest shard.
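That scaling argument can be sanity-checked with a toy timing model. The linear per-document search cost below is an assumption for illustration only; real ANN indexes scale sub-linearly, but the parallel-vs-serial relationship it demonstrates still holds.

```python
import time
from concurrent.futures import ThreadPoolExecutor

# Toy model: searching an index of n docs takes n * COST seconds
COST = 0.0002  # simulated per-document search cost

def search(num_docs: int) -> float:
    time.sleep(num_docs * COST)
    return num_docs * COST

def monolithic(total: int) -> float:
    """Time a single search over one big index."""
    start = time.time()
    search(total)
    return time.time() - start

def sharded(shard_sizes: list) -> float:
    """Time searching all shards in parallel; latency tracks the largest shard."""
    start = time.time()
    with ThreadPoolExecutor(max_workers=len(shard_sizes)) as pool:
        list(pool.map(search, shard_sizes))
    return time.time() - start

# 4000 docs in one index vs. the same corpus split across four 1000-doc shards
t_mono = monolithic(4000)
t_shard = sharded([1000, 1000, 1000, 1000])
print(f"monolithic: {t_mono:.2f}s, sharded: {t_shard:.2f}s")
```

Doubling the corpus doubles the monolithic search time in this model, while the sharded time stays flat as long as new documents land in new shards.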