加微信获取完整源码:huice666/danwen668
LangChain+Milvus检索质量纠错系统实战
一、传统 RAG 的局限性
1.1 传统 RAG 的核心假设缺陷
传统 RAG 系统基于一个简单假设:语义相似度高的文档 = 能回答用户问题。这个假设在生产环境中存在三大漏洞:
二值决策的致命问题:
● 检索偏差:查 “Nginx HTTPS” → 返回 Apache 配置(语义相似,场景不符)
● 时效性缺失:新旧版本文档混杂,无法区分 2018 废弃代码 vs 2026 推荐写法
● 记忆污染:错误信息进入向量库后,形成恶性循环,系统越用越错
二、CRAG 架构设计
2.1 核心创新:四步闭环流程
CRAG (Corrective Retrieval-Augmented Generation)
将传统 RAG 的二步走升级为四步闭环:
2.2 状态机流转图
三、核心组件详解
3.1 评估器 (Evaluator) 设计
评估器是 CRAG 的核心,负责快速(非精确)判断检索质量:
评估 Prompt 模板:
grader_prompt = PromptTemplate(
template="""你是文档相关性评估专家。评估以下文档是否能回答查询。
查询:{query}
文档内容:
{document}
评估标准:
- relevant:文档直接包含答案,置信度 > 0.9
- ambiguous:文档部分相关,置信度 0.5-0.9
- incorrect:文档不相关,置信度 < 0.5
返回 JSON 格式:
{{
"verdict": "relevant|ambiguous|incorrect",
"confidence": 0.95,
"reasoning": "简要说明判断理由"
}}""",
input_variables=["query", "document"]
)
3.2 知识精炼 (Knowledge Refinement)
传统 RAG 将整篇文档塞给 LLM,导致:
● Token 浪费
● 无关信息干扰
● 幻觉风险增加
CRAG 采用句子级精炼:
精炼算法实现:
def _refine_documents(self, docs: list, query: str) -> list:
"""基于关键词的句子级精炼"""
refined = []
keywords = [kw.strip() for kw in query.split() if kw.strip()]
for doc in docs:
text = doc.page_content or ""
# 句子切分(支持中英文)
sentences = (
text.replace("。", "。\n")
.replace(". ", ".\n")
.replace("! ", "!\n")
.replace("? ", "?\n")
.split("\n")
)
# 关键词匹配
relevant_sentences = [
s for s in sentences
if any(keyword in s for keyword in keywords)
]
if relevant_sentences:
refined_text = "。".join(relevant_sentences[:3]) # 取前3句
refined.append(Document(
page_content=refined_text,
metadata=doc.metadata
))
return refined if refined else docs # 兜底:返回原文
精炼效果对比:
3.3 动态路由决策
根据评估结果,系统执行不同的纠正策略:
四、Milvus 向量数据库架构
4.1 Schema 设计
核心字段说明:
4.2 混合检索实现
混合检索代码:
results = collection.hybrid_search(
reqs=[
AnnSearchRequest(
data=[dense_vec],
anns_field="dense_embedding",
limit=20
),
AnnSearchRequest(
data=[sparse_vec],
anns_field="sparse_embedding",
limit=20
)
],
rerank=RRFRanker(), # 原生 RRF 融合
output_fields=["text", "metadata"],
expr='metadata["confidence"] > 0.9', # CRAG 置信度过滤
limit=5
)
五、完整代码架构
5.1 系统架构图
5.2 CRAG Middleware 核心实现
class CRAGMiddleware(AgentMiddleware):
"""CRAG 评估与纠正中间件"""
def __init__(self, vector_store: Milvus, agent_id: str):
super().__init__()
self.vector_store = vector_store
self.agent_id = agent_id # 多租户隔离
self.evaluator = init_chat_model("deepseek:deepseek-chat")
self.web_search = TavilySearch(max_results=3)
def wrap_model_call(self, request: ModelRequest, handler) -> ModelResponse:
"""在模型调用前执行 CRAG 流程"""
messages = list(request.messages)
# 提取用户查询
query = self._extract_query(messages)
if not query:
return handler(request)
# 1️⃣ 检索
docs = self._retrieve_from_milvus(query)
# 2️⃣ 评估
verdict = self._evaluate_relevance(query, docs)
# 3️⃣ 纠正
final_context = self._correct_context(query, docs, verdict)
# 4️⃣ 注入上下文
new_messages = self._inject_context(messages, final_context)
new_request = request.override(messages=new_messages)
return handler(new_request)
5.3 数据流时序图
欢迎加入智能体技术交流群
|
|

