如何用LangGraphRAG为?

摘要:在本系列的开篇,我们利用create_agent工厂函数编写了一个RAG例子,这是一个将指定博文内容作为上下文的QA应用。现在我们使用LangGraph的编程模式重现实现它,并添加如下两个功能: 查询和上下文相关性评估:在利用查询文本从Ve
在本系列的开篇,我们利用create_agent工厂函数编写了一个RAG例子,这是一个将指定博文内容作为上下文的QA应用。现在我们使用LangGraph的编程模式重现实现它,并添加如下两个功能: 查询和上下文相关性评估:在利用查询文本从VectorStore检索出相关内容之后,我们利用LLM评估它们之间是否具有相关性; 重新生成查询:如果没有通过相关性评估,我们会利用LLM重新生成具有更高质量的查询文本,并重启QA流程; 整个流程如下图所示:入口节点“retrieve_context”利用查询文件检索博文内容,并将其作为上下文;如果通过了上述的相关性评估,直接转入“generate_response”节点生成回答,否则转入“regenerate_query”节点重新生成更高质量的查询文本。“regenerate_query”节点完成后再次转向入口节点重启流程。接下来我们分步骤介绍整个应用的实现。 步骤一: 检索上下文 我们首先通过如下的步骤创建作为上下文检索器的Retriever对象:我们利用WebBaseLoader加载博文内容并转换成Document列表,后者被RecursiveCharacterTextSplitter切割之后生成的切片被添加到创建的InMemoryVectorStore中,最后调用InMemoryVectorStore的as_retriever方法得到所需的检索器。 loader = WebBaseLoader( web_paths=("https://www.cnblogs.com/artech/p/inside-asp-net-core-framework.html",), bs_kwargs={"parse_only": SoupStrainer(class_=("postBody"))}, ) documents = loader.load() splitter = RecursiveCharacterTextSplitter( chunk_size=1000, chunk_overlap=200, add_start_index=True, ) retriever = ( InMemoryVectorStore.from_documents( documents=splitter.split_documents(documents), embedding=OpenAIEmbeddings(model="text-embedding-3-small"), ).as_retriever() ) 我们定义了如下的State作为整个执行流程的状态Schema类型。为了让大家了解整个流程的执行情况,我们会将每个步骤的执行信息以日志的形式写入log变量表示的列表。 log =[] class State(TypedDict): query: str context: str result:str generate_times: Annotated[int,operator.add] State类型具有如下四个成员: query:查询文本,可以是用户输入的原始查询文本,也可以是在相关性评估失败后重新生成的查询; context: 根据查询文本检索得到的作为上下文的文本; result:最终的回答; generate_times:如果评估一直失败,“检索-评估-查询"循环将会一直持续下去,generate_times对查询再生成进行计数; 如下所示的retrieve_context为“上下文检索”节点函数,只需要直接调用Retriever对象,将格式化后的检索内容用于更新状态的context成员。在方法返回之前,我们将检索信息写入日志。 def retrieve_context(query: State)->dict: retrieved_docs = retriever.invoke(query["query"]) context = "\n\n".join( (f"来源: {doc.metadata}\n内容: {doc.page_content}") for doc in retrieved_docs ) log.append(f"检索上下文: 检索到 {len(retrieved_docs)} 个相关文档\n") return {"context": context, "result": ""} 步骤二:相关性评估 我们使用LLM评估查询文本与检索内容的相关性,我们利用模型的结果化输出得到一个确定的二元结果(评估成功或者失败),如下所示的QueryEvaluationResult为绑定的结构化输出类型,两个字段成员is_relevant和reason为评估结果和理由。我们使用的模型组件为ChatOpenAI,并调用with_structured_output方法完成结构化输出Schema的绑定。 class QueryEvaluationResult(TypedDict): """对查询与上下文相关性的评估结果""" is_relevant: bool """查询与上下文相关性的判断结果""" reason: str """对判断结果的简要说明""" llm_evaluate = ChatOpenAI(model="gpt-5.2-chat").with_structured_output(QueryEvaluationResult) 如下所示的evaluate_query_relevance为相关性评估函数,返回的字符串表示在评估成功和失败情况下的路由节点。为了因总是评估失败而导致的无限循环,我们将查询文本的最大生成次数设置为3。 def evaluate_query_relevance(state: State) -> Literal["generate_response", "regenerate_query"]: if(state.get("generate_times") > 3): return "generate_response" evaluation_prompt = ( f"Query: {state['query']}\n" f"Context: {state['context']}\n\n" "请判断这个查询与上下文的相关性,返回格式为:{'is_relevant': bool, 'reason': str},其中reason是对判断结果的简要说明。" ) response = llm_evaluate.invoke([HumanMessage(content=evaluation_prompt)]) is_relevant = response["is_relevant"] path = "generate_response" if is_relevant else "regenerate_query" log.append(f"""查询相关性评估: 结果:{"不相关" if not is_relevant else "相关"}, 路由:{path}, 原因:{response['reason']}. """) return path 在使用查询文本和上下文格式化提示词后,我们调用ChatOpenAI对象得到评估结果和理由,并据此返回最终的路由节点名称。在方法返回之前,我们将评估信息写入日志。 步骤三:查询文本再生成 如下所示的查询文本再生成节点函数regenerate_query,会根据当前状态提供的查询文本和上下文构建提示词,并调用另一个ChatOpenAI对象得到由LLM生成的高质量的查询文本。我们在利用返回的字典更新query和generate_times成员之前,会将查询再生成的信息写入日志; llm = ChatOpenAI(model="gpt-5.2-chat") def regenerate_query(state: State) -> dict: regeneration_prompt = ( f"根据以下上下文信息,重新生成一个与查询相关的新查询。\n\n" f"Context: {state['context']}\n\n" f"Original Query: {state['query']}\n\n" "请直接回复新查询的内容,不要任何额外的说明和前后缀。" ) query = llm.invoke([HumanMessage(content=regeneration_prompt)]).content log.append(f"""重新生成查询 原始查询: '{state['query']}', 新查询: '{query}' """) return {"query": query, "generate_times": 1} 步骤四:生成答案 最终得到的答案由如下的节点函数generate_response生成,它利用状态提供的查询和上下文生成提示词调用ChatOpenAI对象(和查询再生成使用的是同一个)。在利用返回的字典将得到的结果写入状态的result成员之前,我们也会将相关执行信息写入日志。 def generate_response(state: State) -> dict: response_prompt = ( f"根据以下上下文信息,使用精炼的语言回答查询,尽量控制在100个字以内。\n\n" f"Context: {state['context']}\n\n" f"Query: {state['query']}\n\n" ) result = llm.invoke([HumanMessage(content=response_prompt)]).content log.append(f"生成最终回答: {result}") return {"result": result} 步骤五:图的构建和编译 我们创建了一个StateGraph对象,并将添加了上面定义的三个节点,其中retrieve_context和generate_response作为入口和完成节点。我们在retrieve_context和generate_respons/regenerate_query节点之间添加了一个“条件边”,条件分支函数为evaluate_query_relevance。regenerate_query和retrieve_context之间的边确保查询再生成后流程再次启动。 builder = (StateGraph(State) .add_node("retrieve_context", retrieve_context) # type: ignore .add_node("generate_response", generate_response) .add_node("regenerate_query", regenerate_query) .set_entry_point("retrieve_context") .set_finish_point("generate_response") .add_conditional_edges("retrieve_context", evaluate_query_relevance) .add_edge("regenerate_query", "retrieve_context") ) agent = builder.compile().with_config(recursion_limit=10) payload = agent.get_graph().draw_mermaid_png() PILImage.open(io.BytesIO(payload)).show() 在将StateGraph编译成Agent之后,我们调用其get_graph()得到Graph对象,并将其转换成PNG图片呈现出来,开篇给出的图片就是最终的呈现效果。 步骤六:调用Agent 我们调用Agent,并问了一个问题:比较一下ASP.NET Core和Express。并输出最终的状态和写入的日志。 result = agent.invoke({"query": "比较一下ASP.NET Core和Express"}) # type: ignore print(f"""最终状态: query:{result['query']} context:{result['context'][:50]} result:{result['result'][:50]} generate_times:{result['generate_times']} """) print("\n\nExecution Log:") for i, entry in enumerate(log): print(f"{i + 1}. {entry}") 最终的输出如下所示。从输出的日志可以清除地看出其执行流程:上下文检索 -> 查询相关性评估(失败)-> 查询再生成 -> 上下文检索 -> 查询相关性评估(失败)-> 查询再生成 -> 上下文检索->查询相关性评估(成功)-> 生成最终答案。 最终状态: query:结合RequestDelegate与Func<RequestDelegate, RequestDelegate>的抽象方式,深入分析ASP.NET Core中间件管道的设计动机及其对异步请求处理的支持机制 context:来源: {'source': 'https://www.cnblogs.com/artech/p/i result:ASP.NET Core将请求处理抽象为RequestDelegate(Func<HttpContext,Task>),统一同步/异步模型;中间件用Func<RequestDelegate,RequestDelegate>包装后续管道,实现职责链组合。该设计简洁、可组合,天然支持异步与管道式扩展。 generate_times:2 Execution Log: 1. 检索上下文: 检索到 4 个相关文档 2. 查询相关性评估: 结果:不相关, 路由:regenerate_query, 原因:查询要求比较 ASP.NET Core 与 Express,而上下文内容仅详细介绍了 ASP.NET Core 的框架原理与实现,没有涉及 Express 或两者的对比,因此相关性不足。. 3. 重新生成查询 原始查询: '比较一下ASP.NET Core和Express', 新查询: '从请求处理管道和中间件设计的角度,对比ASP.NET Core与Express框架的核心架构思想' 4. 检索上下文: 检索到 4 个相关文档 5. 查询相关性评估: 结果:不相关, 路由:regenerate_query, 原因:上下文内容详细介绍了 ASP.NET Core 的请求处理管道与中间件设计,但未涉及 Express 框架或其架构思想,无法支持对 ASP.NET Core 与 Express 的对比分析。. 6. 重新生成查询 原始查询: '从请求处理管道和中间件设计的角度,对比ASP.NET Core与Express框架的核心架构思想', 新查询: '结合RequestDelegate与Func<RequestDelegate, RequestDelegate>的抽象方式,深入分析ASP.NET Core中间件管道的设计动机及其对异步请求处理的支持机制' 7. 检索上下文: 检索到 4 个相关文档 8. 查询相关性评估: 结果:相关, 路由:generate_response, 原因:查询聚焦于RequestDelegate与Func<RequestDelegate, RequestDelegate>在ASP.NET Core中间件管道中的设计动机及异步支持机制,而上下文内容正系统性地解释了RequestDelegate为何采用Func<HttpContext, Task>、中间件为何抽象为Func<RequestDelegate, RequestDelegate>,以及这种设计如何支持异步请求处理,二者高度一致。. 9. 生成最终回答: ASP.NET Core将请求处理抽象为RequestDelegate(Func<HttpContext,Task>),统一同步/异步模型;中间件用Func<RequestDelegate,RequestDelegate>包装后续管道,实现职责链组合。该设计简 洁、可组合,天然支持异步与管道式扩展。 如果运行时设置了LangSmith相关的环境变量,从其提供的Trace不仅仅可以看清执行的流程,还可以获取每个步骤的输入和输出。 附上完整的代码: from dotenv import load_dotenv load_dotenv() from typing import TypedDict,Literal,Annotated from langchain.agents import create_agent from langchain_openai import ChatOpenAI, OpenAIEmbeddings from langchain_core.vectorstores import InMemoryVectorStore from langchain_core.messages import HumanMessage,AIMessage from langchain.agents import AgentState from langchain.tools import tool from bs4.filter import SoupStrainer from langchain_community.document_loaders import WebBaseLoader from langchain_text_splitters import RecursiveCharacterTextSplitter from langgraph.graph import StateGraph from PIL import Image as PILImage import io,operator loader = WebBaseLoader( web_paths=("https://www.cnblogs.com/artech/p/inside-asp-net-core-framework.html",), bs_kwargs={"parse_only": SoupStrainer(class_=("postBody"))}, ) documents = loader.load() splitter = RecursiveCharacterTextSplitter( chunk_size=1000, chunk_overlap=200, add_start_index=True, ) retriever = ( InMemoryVectorStore.from_documents( documents=splitter.split_documents(documents), embedding=OpenAIEmbeddings(model="text-embedding-3-small"), ).as_retriever() ) log =[] class State(TypedDict): query: str context: str result:str generate_times: Annotated[int,operator.add] class QueryEvaluationResult(TypedDict): """对查询与上下文相关性的评估结果""" is_relevant: bool """查询与上下文相关性的判断结果""" reason: str """对判断结果的简要说明""" def retrieve_context(query: State)->dict: retrieved_docs = retriever.invoke(query["query"]) context = "\n\n".join( (f"来源: {doc.metadata}\n内容: {doc.page_content}") for doc in retrieved_docs ) log.append(f"检索上下文: 检索到 {len(retrieved_docs)} 个相关文档\n") return {"context": context, "result": ""} llm_evaluate = ChatOpenAI(model="gpt-5.2-chat").with_structured_output(QueryEvaluationResult) def evaluate_query_relevance(state: State) -> Literal["generate_response", "regenerate_query"]: if(state.get("generate_times") > 3): return "generate_response" evaluation_prompt = ( f"Query: {state['query']}\n" f"Context: {state['context']}\n\n" "请判断这个查询与上下文的相关性,返回格式为:{'is_relevant': bool, 'reason': str},其中reason是对判断结果的简要说明。" ) response = llm_evaluate.invoke([HumanMessage(content=evaluation_prompt)]) is_relevant = response["is_relevant"] path = "generate_response" if is_relevant else "regenerate_query" log.append(f"""查询相关性评估: 结果:{"不相关" if not is_relevant else "相关"}, 路由:{path}, 原因:{response['reason']}. """) return path llm = ChatOpenAI(model="gpt-5.2-chat") def generate_response(state: State) -> dict: response_prompt = ( f"根据以下上下文信息,使用精炼的语言回答查询,尽量控制在100个字以内。\n\n" f"Context: {state['context']}\n\n" f"Query: {state['query']}\n\n" ) result = llm.invoke([HumanMessage(content=response_prompt)]).content log.append(f"生成最终回答: {result}") return {"result": result} def regenerate_query(state: State) -> dict: regeneration_prompt = ( f"根据以下上下文信息,重新生成一个与查询相关的新查询。\n\n" f"Context: {state['context']}\n\n" f"Original Query: {state['query']}\n\n" "请直接回复新查询的内容,不要任何额外的说明和前后缀。" ) query = llm.invoke([HumanMessage(content=regeneration_prompt)]).content log.append(f"""重新生成查询 原始查询: '{state['query']}', 新查询: '{query}' """) return {"query": query, "generate_times": 1} builder = (StateGraph(State) .add_node("retrieve_context", retrieve_context) # type: ignore .add_node("generate_response", generate_response) .add_node("regenerate_query", regenerate_query) .set_entry_point("retrieve_context") .set_finish_point("generate_response") .add_conditional_edges("retrieve_context", evaluate_query_relevance) .add_edge("regenerate_query", "retrieve_context") ) agent = builder.compile().with_config(recursion_limit=10) payload = agent.get_graph(xray=True).draw_mermaid_png() PILImage.open(io.BytesIO(payload)).show() result = agent.invoke({"query": "比较一下ASP.NET Core和Express"}) # type: ignore print(f"""最终状态: query:{result['query']} context:{result['context'][:50]} result:{result['result']} generate_times:{result['generate_times']} """) print("\n\nExecution Log:") for i, entry in enumerate(log): print(f"{i + 1}. {entry}")