LangChain 源码深度解析:架构、核心组件与实现原理
一、引言
1.1 为什么需要深入理解LangChain源码?
在当今人工智能领域,大型语言模型(LLMs)如GPT-4、Claude等展现出了强大的自然语言处理能力。然而,要将这些模型应用于实际场景,开发者往往面临诸多挑战,如上下文管理、工具调用、多模态交互等。LangChain作为一个开源框架,提供了一套灵活且强大的工具集,帮助开发者构建基于LLMs的复杂应用。
深入理解LangChain源码有以下几个重要原因:
- 定制化需求:不同的应用场景可能需要对LangChain的核心组件进行定制或扩展,例如自定义提示模板、记忆策略或代理决策逻辑。
- 性能优化:通过分析源码,可以识别性能瓶颈并进行针对性优化,例如缓存机制、异步处理或并行计算。
- 故障排查:当应用出现问题时,源码级别的理解能够帮助快速定位和解决问题。
- 技术创新:了解LangChain的实现原理可以启发开发者在其基础上进行技术创新,开发新的组件或应用模式。
1.2 本文目标与范围
本文旨在从源码层面深入解析LangChain的核心架构、组件实现和工作原理。我们将:
- 分析LangChain的整体架构设计和模块划分
- 深入探讨核心组件如Chains、Memory、Prompt Templates、Agents和Tools的实现原理
- 解析LangChain与LLMs的集成机制和交互流程
- 探讨异步处理、缓存策略和并行计算等高级特性
- 通过具体案例展示如何扩展和定制LangChain
本文将基于LangChain的最新稳定版本(截至2023年10月)进行分析,重点关注核心功能和实现原理,而非具体的API使用方法。对于一些复杂的实现细节,我们将提供简化的示例代码以帮助理解。
1.3 预备知识
为了更好地理解本文内容,读者需要具备以下知识:
- 熟悉Python编程语言,包括类、继承、装饰器等高级特性
- 了解大型语言模型(LLMs)的基本原理和应用
- 具备基本的自然语言处理(NLP)知识
- 熟悉异步编程和并发处理的概念
二、LangChain架构概述
2.1 整体架构设计
LangChain的架构设计遵循模块化、可扩展的原则,主要由以下几个核心层组成:
- LLM接口层:负责与各种大型语言模型(如OpenAI、Hugging Face等)进行交互,提供统一的调用接口。
- 核心组件层:包含Chains、Memory、Prompt Templates、Agents和Tools等核心组件,实现了LLM应用的基本功能单元。
- 集成层:提供与外部数据源(如向量数据库、文件系统等)和工具(如搜索引擎、计算器等)的集成能力。
- 应用层:基于上述组件构建的具体应用,如聊天机器人、文档问答系统等。
这种分层设计使得LangChain具有良好的可扩展性和灵活性,开发者可以根据需要替换或扩展任意层的功能。
2.2 核心模块划分
LangChain的源码主要分为以下几个核心模块:
langchain/
├── chains/ # 链组件,用于定义和管理处理流程
├── memory/ # 记忆组件,用于管理对话和交互的上下文
├── prompts/ # 提示模板组件,用于生成向LLM提交的提示
├── agents/ # 代理组件,用于实现智能决策和工具调用
├── tools/ # 工具组件,封装各种外部功能
├── llms/ # LLM接口层,支持多种LLM提供商
├── embeddings/ # 嵌入模型接口,用于文本向量化
├── vectorstores/ # 向量数据库接口,用于语义搜索
├── document_loaders/ # 文档加载器,用于处理各种格式的文档
├── text_splitter/ # 文本分割器,用于将长文本分割为小块
├── callbacks/ # 回调系统,用于监控和记录执行过程
└── utils/ # 工具函数和辅助类
接下来,我们将深入分析每个模块的核心功能和实现原理。
三、LLM接口层实现
3.1 LLM抽象基类
LangChain通过定义抽象基类来统一不同LLM提供商的接口,使得开发者可以无缝切换不同的模型。核心抽象基类位于langchain/llms/base.py
:
# langchain/llms/base.pyfrom abc import ABC, abstractmethod
from typing import Any, List, Mapping, Optional, Unionclass LLM(ABC):"""LLM抽象基类,定义了所有LLM实现必须遵循的接口"""@property@abstractmethoddef _llm_type(self) -> str:"""返回LLM类型的字符串标识"""pass@abstractmethoddef _call(self, prompt: str, stop: Optional[List[str]] = None) -> str:"""执行LLM调用的核心方法"""pass@propertydef _identifying_params(self) -> Mapping[str, Any]:"""返回标识LLM实例的参数"""return {}def __str__(self) -> str:"""返回LLM的字符串表示"""return f"{self._llm_type()}: {self._identifying_params}"
所有具体的LLM实现都必须继承这个抽象基类,并实现_llm_type
和_call
方法。_llm_type
返回LLM的类型标识(如"openai"、"huggingface"等),而_call
方法则实现了实际的模型调用逻辑。
3.2 OpenAI LLM实现
以OpenAI LLM实现为例,我们来看具体的实现细节:
# langchain/llms/openai.pyimport openai
from typing import Any, Dict, List, Optional, Unionfrom langchain.llms.base import LLM
from langchain.utils import get_from_dict_or_envclass OpenAI(LLM):"""OpenAI LLM实现"""# 模型名称,默认为text-davinci-003model_name: str = "text-davinci-003"# 温度参数,控制输出的随机性temperature: float = 0.7# 最大生成token数max_tokens: int = 2049# 顶部概率采样参数top_p: float = 1# 频率惩罚参数frequency_penalty: float = 0# 存在惩罚参数presence_penalty: float = 0# API密钥openai_api_key: Optional[str] = None@propertydef _llm_type(self) -> str:"""返回LLM类型标识"""return "openai"def _call(self, prompt: str, stop: Optional[List[str]] = None) -> str:"""执行OpenAI API调用"""# 获取API密钥,优先从实例属性获取,否则从环境变量获取openai_api_key = get_from_dict_or_env(self.__dict__, "openai_api_key", "OPENAI_API_KEY")# 构建API调用参数params = {"model": self.model_name,"prompt": prompt,"temperature": self.temperature,"max_tokens": self.max_tokens,"top_p": self.top_p,"frequency_penalty": self.frequency_penalty,"presence_penalty": self.presence_penalty,}# 如果提供了停止词,添加到参数中if stop is not None:params["stop"] = stop# 执行API调用response = openai.Completion.create(api_key=openai_api_key,**params)# 提取并返回生成的文本return response.choices[0].text.strip()@propertydef _identifying_params(self) -> Dict[str, Any]:"""返回标识LLM实例的参数"""return {"model_name": self.model_name,"temperature": self.temperature,"max_tokens": self.max_tokens,"top_p": self.top_p,"frequency_penalty": self.frequency_penalty,"presence_penalty": self.presence_penalty,}
从这个实现中可以看出,OpenAI类继承了LLM抽象基类,并实现了必要的方法。_call
方法负责构建API请求参数并执行实际的API调用,处理响应并返回生成的文本。
3.3 LLM调用流程
当使用LangChain调用LLM时,整个流程大致如下:
- 用户通过LangChain API创建LLM实例(如OpenAI、HuggingFace等)
- 用户构建提示文本并调用LLM实例
- LLM实例将提示文本和其他参数传递给
_call
方法 -
_call
方法根据具体的LLM提供商实现,构建并发送API请求 - 接收API响应并处理结果
- 将处理后的结果返回给用户
这种设计使得LangChain能够支持多种LLM提供商,用户可以根据需要轻松切换不同的模型,而无需修改业务逻辑代码。
3.4 异步支持
LangChain还提供了对异步LLM调用的支持,通过定义异步方法:
# langchain/llms/base.pyimport asyncio
from typing import AsyncGeneratorclass LLM(ABC):# ... 其他方法 ...async def agenerate(self, prompts: List[str], stop: Optional[List[str]] = None) -> LLMResult:"""异步生成多个提示的响应"""tasks = [self._agenerate_one(prompt, stop=stop) for prompt in prompts]results = await asyncio.gather(*tasks)return LLMResult(generations=[[res] for res in results])async def _agenerate_one(self, prompt: str, stop: Optional[List[str]] = None) -> Generation:"""异步生成单个提示的响应"""text = await self._acall(prompt, stop=stop)return Generation(text=text)@abstractmethodasync def _acall(self, prompt: str, stop: Optional[List[str]] = None) -> str:"""异步调用LLM的核心方法,由子类实现"""pass
具体的LLM实现需要实现_acall
方法,提供异步调用能力。例如,OpenAI的异步实现:
# langchain/llms/openai.pyclass OpenAI(LLM):# ... 其他方法 ...async def _acall(self, prompt: str, stop: Optional[List[str]] = None) -> str:"""异步执行OpenAI API调用"""openai_api_key = get_from_dict_or_env(self.__dict__, "openai_api_key", "OPENAI_API_KEY")params = {"model": self.model_name,"prompt": prompt,"temperature": self.temperature,"max_tokens": self.max_tokens,"top_p": self.top_p,"frequency_penalty": self.frequency_penalty,"presence_penalty": self.presence_penalty,}if stop is not None:params["stop"] = stop# 异步执行API调用response = await openai.Completion.acreate(api_key=openai_api_key,**params)return response.choices[0].text.strip()
这种异步支持使得LangChain能够高效处理多个并发的LLM请求,提高系统的吞吐量和响应性能。
四、Chain组件实现原理
4.1 Chain抽象基类
Chain是LangChain中最核心的组件之一,用于定义和管理一系列处理步骤。Chain的抽象基类定义在langchain/chains/base.py
:
# langchain/chains/base.pyfrom abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional, Tuple, Unionfrom langchain.callbacks.base import BaseCallbackManager
from langchain.callbacks.manager import (AsyncCallbackManagerForChainRun,CallbackManagerForChainRun,Callbacks,
)class Chain(ABC):"""Chain抽象基类,定义了所有Chain实现必须遵循的接口"""# 回调管理器,用于记录和监控Chain的执行过程callback_manager: Optional[BaseCallbackManager] = None@property@abstractmethoddef input_keys(self) -> List[str]:"""返回Chain期望的输入键列表"""pass@property@abstractmethoddef output_keys(self) -> List[str]:"""返回Chain产生的输出键列表"""pass@abstractmethoddef _call(self, inputs: Dict[str, Any],run_manager: Optional[CallbackManagerForChainRun] = None) -> Dict[str, Any]:"""执行Chain的核心方法,由子类实现"""passasync def _acall(self, inputs: Dict[str, Any],run_manager: Optional[AsyncCallbackManagerForChainRun] = None) -> Dict[str, Any]:"""异步执行Chain的方法,默认同步实现,可被子类重写"""return self._call(inputs, run_manager=run_manager)def run(self, callbacks: Callbacks = None,**kwargs: Any) -> Union[str, Dict[str, Any]]:"""便捷方法,用于直接运行Chain并获取结果"""# 检查输入键是否匹配if set(kwargs.keys()) != set(self.input_keys):raise ValueError(f"输入键不匹配。期望: {self.input_keys}, 实际: {list(kwargs.keys())}")# 执行Chainoutput = self(kwargs, callbacks=callbacks)# 如果只有一个输出键,直接返回该值if len(self.output_keys) == 1:return output[self.output_keys[0]]else:return outputasync def arun(self, callbacks: Callbacks = None,**kwargs: Any) -> Union[str, Dict[str, Any]]:"""异步便捷方法,用于直接运行Chain并获取结果"""if set(kwargs.keys()) != set(self.input_keys):raise ValueError(f"输入键不匹配。期望: {self.input_keys}, 实际: {list(kwargs.keys())}")output = await self.acall(kwargs, callbacks=callbacks)if len(self.output_keys) == 1:return output[self.output_keys[0]]else:return output# 其他方法...
所有具体的Chain实现都必须继承这个抽象基类,并实现input_keys
、output_keys
和_call
方法。input_keys
和output_keys
分别定义了Chain的输入和输出键,而_call
方法则实现了Chain的核心处理逻辑。
4.2 SimpleSequentialChain实现分析
SimpleSequentialChain
是一个简单的顺序链实现,它按顺序执行多个子Chain,前一个Chain的输出作为后一个Chain的输入。下面是其核心实现:
# langchain/chains/sequential.pyfrom typing import Any, Dict, List, Optionalfrom langchain.chains.base import Chainclass SimpleSequentialChain(Chain):"""简单顺序链,按顺序执行多个Chain,前一个Chain的输出作为后一个Chain的输入"""# 子Chain列表chains: List[Chain]# 是否只返回最后一个Chain的输出return_only_final_output: bool = False@propertydef input_keys(self) -> List[str]:"""返回第一个Chain的输入键"""return self.chains[0].input_keys@propertydef output_keys(self) -> List[str]:"""返回最后一个Chain的输出键"""return self.chains[-1].output_keysdef _call(self, inputs: Dict[str, Any],run_manager: Optional[CallbackManagerForChainRun] = None) -> Dict[str, Any]:"""执行顺序链"""current_inputs = inputsintermediate_outputs = {}# 依次执行每个子Chainfor i, chain in enumerate(self.chains):# 为当前Chain创建回调管理器if run_manager:new_run_manager = run_manager.get_child()else:new_run_manager = None# 执行当前Chainoutputs = chain(current_inputs, callbacks=new_run_manager)# 记录中间输出if not self.return_only_final_output:for k, v in outputs.items():intermediate_outputs[f"{chain.__class__.__name__}_{k}"] = v# 更新当前输入为当前Chain的输出current_inputs = outputs# 返回最终输出或所有中间输出if self.return_only_final_output:return current_inputselse:return intermediate_outputsasync def _acall(self, inputs: Dict[str, Any],run_manager: Optional[AsyncCallbackManagerForChainRun] = None) -> Dict[str, Any]:"""异步执行顺序链"""current_inputs = inputsintermediate_outputs = {}for i, chain in enumerate(self.chains):if run_manager:new_run_manager = run_manager.get_child()else:new_run_manager = Noneoutputs = await chain.acall(current_inputs, callbacks=new_run_manager)if not self.return_only_final_output:for k, v in outputs.items():intermediate_outputs[f"{chain.__class__.__name__}_{k}"] = vcurrent_inputs = outputsif self.return_only_final_output:return current_inputselse:return intermediate_outputs
从这个实现可以看出,SimpleSequentialChain
通过遍历子Chain列表,依次执行每个Chain,并将前一个Chain的输出作为后一个Chain的输入。这种设计使得多个Chain可以灵活组合,形成复杂的处理流程。
4.3 LLMChain实现分析
LLMChain
是LangChain中最常用的Chain之一,它将提示模板和LLM结合起来,用于生成文本。下面是其核心实现:
# langchain/chains/llm.pyfrom typing import Any, Dict, List, Optional, Unionfrom langchain.base_language import BaseLanguageModel
from langchain.callbacks.manager import (AsyncCallbackManagerForChainRun,CallbackManagerForChainRun,
)
from langchain.chains.base import Chain
from langchain.prompts.base import BasePromptTemplateclass LLMChain(Chain):"""LLMChain将提示模板和LLM结合,用于生成文本"""# 提示模板prompt: BasePromptTemplate# 语言模型llm: BaseLanguageModel# 输出键名称output_key: str = "text"@propertydef input_keys(self) -> List[str]:"""返回提示模板所需的输入键"""return self.prompt.input_variables@propertydef output_keys(self) -> List[str]:"""返回输出键"""return [self.output_key]def _call(self, inputs: Dict[str, Any],run_manager: Optional[CallbackManagerForChainRun] = None) -> Dict[str, str]:"""执行LLMChain,生成文本"""# 格式化提示prompt_value = self.prompt.format_prompt(**inputs)# 记录提示信息if run_manager:run_manager.on_text("Prompt after formatting:\n", end="\n", verbose=self.verbose)run_manager.on_text(prompt_value.to_string(), color="green", end="\n", verbose=self.verbose)# 调用LLM生成文本response = self.llm.generate_prompt([prompt_value],callbacks=run_manager.get_child() if run_manager else None)# 提取生成的文本text = response.generations[0][0].text# 记录生成的文本if run_manager:run_manager.on_text("Generated response:", end="\n", verbose=self.verbose)run_manager.on_text(text, color="yellow", end="\n", verbose=self.verbose)return {self.output_key: text}async def _acall(self, inputs: Dict[str, Any],run_manager: Optional[AsyncCallbackManagerForChainRun] = None) -> Dict[str, str]:"""异步执行LLMChain,生成文本"""prompt_value = self.prompt.format_prompt(**inputs)if run_manager:await run_manager.on_text("Prompt after formatting:\n", end="\n", verbose=self.verbose)await run_manager.on_text(prompt_value.to_string(), color="green", end="\n", verbose=self.verbose)response = await self.llm.agenerate_prompt([prompt_value],callbacks=run_manager.get_child() if run_manager else None)text = response.generations[0][0].textif run_manager:await run_manager.on_text("Generated response:", end="\n", verbose=self.verbose)await run_manager.on_text(text, color="yellow", end="\n", verbose=self.verbose)return {self.output_key: text}
LLMChain
的核心逻辑是将输入变量传递给提示模板,生成最终的提示文本,然后将该提示文本传递给LLM进行处理,最后将生成的结果作为输出返回。这种设计使得提示模板和LLM可以独立配置和替换,提高了代码的灵活性和可复用性。
4.4 链式组合机制
LangChain提供了多种方式来组合不同的Chain,形成复杂的处理流程:
- SequentialChain:允许定义多个Chain,并指定它们之间的输入输出关系
- SimpleSequentialChain:简化版的顺序链,前一个Chain的输出直接作为后一个Chain的输入
- RouterChain:根据输入内容动态选择合适的子Chain
- TransformChain:用于对数据进行转换和处理
这些组合机制使得开发者可以根据具体需求,灵活构建各种复杂的处理流程。例如,一个典型的文档问答系统可能包含以下Chain的组合:
- 文档加载Chain:负责加载和解析文档
- 文本分割Chain:将长文档分割为小块
- 嵌入生成Chain:将文本转换为向量表示
- 向量检索Chain:在向量数据库中检索相关内容
- 问答Chain:结合检索结果和用户问题生成答案
通过合理组合不同的Chain,开发者可以构建出功能强大、灵活可扩展的LLM应用。
五、Memory组件实现原理
5.1 Memory抽象基类
Memory组件负责在Chain执行过程中存储和检索上下文信息,使得Chain能够"记住"之前的交互内容。Memory的抽象基类定义在langchain/memory/base.py
:
# langchain/memory/base.pyfrom abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional, Tuplefrom langchain.schema import BaseMessageclass BaseMemory(ABC):"""Memory抽象基类,定义了所有Memory实现必须遵循的接口"""@property@abstractmethoddef memory_variables(self) -> List[str]:"""返回Memory中存储的变量名称列表"""pass@abstractmethoddef load_memory_variables(self, inputs: Dict[str, Any]) -> Dict[str, Any]:"""根据输入加载Memory中的变量"""pass@abstractmethoddef save_context(self, inputs: Dict[str, Any], outputs: Dict[str, Any]) -> None:"""保存上下文信息到Memory中"""passdef clear(self) -> None:"""清除Memory中的所有信息,默认实现为空"""pass
所有具体的Memory实现都必须继承这个抽象基类,并实现memory_variables
、load_memory_variables
和save_context
方法。memory_variables
返回Memory中存储的变量名称,load_memory_variables
根据当前输入加载相应的上下文信息,save_context
则将新的上下文信息保存到Memory中。
5.2 ConversationBufferMemory实现分析
ConversationBufferMemory
是最简单也是最常用的Memory实现之一,它以文本形式存储整个对话历史。下面是其核心实现:
# langchain/memory/buffer.pyfrom typing import Any, Dict, List, Optional, Tuplefrom langchain.memory.base import BaseMemory
from langchain.schema import BaseMessage, HumanMessage, AIMessageclass ConversationBufferMemory(BaseMemory):"""对话缓冲区Memory,存储整个对话历史"""# 对话历史chat_memory: BaseChatMemory# 是否返回聊天历史作为字符串return_messages: bool = False# 输入键名称input_key: Optional[str] = None# 输出键名称output_key: Optional[str] = None@propertydef memory_variables(self) -> List[str]:"""返回Memory中存储的变量名称"""return ["history"]def load_memory_variables(self, inputs: Dict[str, Any]) -> Dict[str, Any]:"""加载对话历史"""messages = self.chat_memory.messagesif self.return_messages:# 直接返回消息列表history = messageselse:# 将消息转换为字符串history = get_buffer_string(messages)return {"history": history}def save_context(self, inputs: Dict[str, Any], outputs: Dict[str, Any]) -> None:"""保存上下文信息到对话历史"""# 获取用户输入input_str = inputs[self.input_key] if self.input_key else inputs[list(inputs.keys())[0]]# 添加用户消息到对话历史self.chat_memory.add_user_message(input_str)# 获取模型输出output_str = outputs[self.output_key] if self.output_key else outputs[list(outputs.keys())[0]]# 添加模型回复到对话历史self.chat_memory.add_ai_message(output_str)def clear(self) -> None:"""清除对话历史"""self.chat_memory.clear()
ConversationBufferMemory
使用chat_memory
属性来存储对话历史,支持以消息列表或字符串形式返回历史记录。save_context
方法将用户输入和模型输出添加到对话历史中,而load_memory_variables
方法则负责加载这些历史记录。
5.3 ConversationSummaryMemory实现分析
ConversationSummaryMemory
会维护对话的摘要,而不是存储完整的对话历史,这在处理长对话时非常有用。下面是其核心实现:
# langchain/memory/summary.pyfrom typing import Any, Dict, List, Optional, Tuplefrom langchain.base_language import BaseLanguageModel
from langchain.chains import LLMChain
from langchain.memory.base import BaseMemory
from langchain.prompts import PromptTemplate
from langchain.schema import BaseMessage, HumanMessage, AIMessageSUMMARY_PROMPT = """总结以下对话:{history}总结:"""class ConversationSummaryMemory(BaseMemory):"""对话摘要Memory,维护对话的摘要"""# 语言模型,用于生成摘要llm: BaseLanguageModel# 对话历史chat_memory: BaseChatMemory# 摘要summary: str = ""# 摘要提示模板prompt: PromptTemplate = PromptTemplate(input_variables=["history"], template=SUMMARY_PROMPT)# 输入键名称input_key: Optional[str] = None# 输出键名称output_key: Optional[str] = None@propertydef memory_variables(self) -> List[str]:"""返回Memory中存储的变量名称"""return ["history"]def load_memory_variables(self, inputs: Dict[str, Any]) -> Dict[str, Any]:"""加载对话摘要"""return {"history": self.summary}def save_context(self, inputs: Dict[str, Any], outputs: Dict[str, Any]) -> None:"""保存上下文信息并更新摘要"""# 保存用户输入和模型输出到对话历史input_str = inputs[self.input_key] if self.input_key else inputs[list(inputs.keys())[0]]self.chat_memory.add_user_message(input_str)output_str = outputs[self.output_key] if self.output_key else outputs[list(outputs.keys())[0]]self.chat_memory.add_ai_message(output_str)# 获取最新的对话历史messages = self.chat_memory.messagesmessage_text = get_buffer_string(messages)# 创建LLMChain用于生成摘要chain = LLMChain(llm=self.llm, prompt=self.prompt)# 生成新的摘要self.summary = chain.run(history=message_text)def clear(self) -> None:"""清除对话历史和摘要"""self.chat_memory.clear()self.summary = ""
ConversationSummaryMemory
使用LLM来生成对话的摘要,每次有新的对话内容时,它会更新摘要。这种方法可以有效地减少内存占用,同时保留对话的关键信息。
5.4 其他Memory实现
除了上述两种Memory实现,LangChain还提供了多种其他Memory类型,包括:
- ConversationBufferWindowMemory:存储最近的N条对话消息,适用于限制对话历史长度的场景
- ConversationTokenBufferMemory:基于token数量限制对话历史长度,而不是消息数量
- EntityMemory:跟踪对话中提到的实体及其相关信息
- VectorStoreRetrieverMemory:使用向量数据库存储和检索对话历史,支持语义检索
这些Memory实现可以根据具体应用场景选择和组合使用,以满足不同的需求。
5.5 Memory在Chain中的集成
Memory组件通常与Chain一起使用,为Chain提供上下文信息。下面是一个简单的示例,展示如何在LLMChain中使用ConversationBufferMemory:
from langchain.llms import OpenAI
from langchain.chains import LLMChain
from langchain.prompts import PromptTemplate
from langchain.memory import ConversationBufferMemory# 创建提示模板
prompt = PromptTemplate(input_variables=["input", "history"],template="""你是一个智能助手,帮助用户解决问题。以下是之前的对话历史:{history}用户提问: {input}助手回答:"""
)# 创建Memory
memory = ConversationBufferMemory(input_key="input", output_key="text")# 创建LLMChain并集成Memory
chain = LLMChain(llm=OpenAI(),prompt=prompt,memory=memory,verbose=True
)# 第一次调用
result = chain.run("你好,我是小明。")
print(result)# 第二次调用,Memory会自动包含第一次的对话历史
result = chain.run("我今天感觉很累,有什么建议吗?")
print(result)
在这个示例中,Memory会自动保存每次对话的上下文,并在下次调用时提供给Chain,使得对话能够保持连贯性。
六、Prompt Templates组件实现原理
6.1 PromptTemplate基类
Prompt Templates负责生成向LLM提交的提示文本,它允许将动态变量插入到固定的提示模板中。核心基类定义在langchain/prompts/base.py
:
# langchain/prompts/base.pyfrom abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional, Unionfrom langchain.schema import BaseMessageclass BasePromptTemplate(ABC):"""提示模板抽象基类,定义了所有提示模板必须遵循的接口"""@property@abstractmethoddef input_variables(self) -> List[str]:"""返回提示模板期望的输入变量列表"""pass@abstractmethoddef format_prompt(self, **kwargs: Any) -> PromptValue:"""格式化提示模板,返回PromptValue对象"""pass@abstractmethoddef format(self, **kwargs: Any) -> str:"""格式化提示模板,返回字符串"""passdef _get_prompt_dict(self) -> Dict[str, Any]:"""返回提示模板的字典表示"""return {"input_variables": self.input_variables,"template": self.format(),}
所有具体的提示模板实现都必须继承这个抽象基类,并实现input_variables
、format_prompt
和format
方法。input_variables
返回模板期望的输入变量列表,format_prompt
和format
方法用于将变量值填充到模板中,生成最终的提示文本。
6.2 PromptTemplate实现分析
PromptTemplate
是最常用的提示模板实现,它使用Python的字符串格式化语法:
# langchain/prompts/prompt.pyfrom typing import Any, Dict, List, Optional, Unionfrom langchain.prompts.base import BasePromptTemplate, PromptValue
from langchain.schema import BaseMessage, HumanMessageclass PromptTemplate(BasePromptTemplate):"""基于字符串格式化的提示模板"""# 模板字符串template: str# 输入变量列表input_variables: List[str]# 模板格式,默认为f-stringtemplate_format: str = "f-string"# 是否验证模板validate_template: bool = Truedef __init__(self,input_variables: List[str],template: str,template_format: str = "f-string",validate_template: bool = True,**kwargs: Any,):"""初始化PromptTemplate"""super().__init__(**kwargs)self.input_variables = input_variablesself.template = templateself.template_format = template_format# 验证模板if validate_template:self._validate_template()def _validate_template(self) -> None:"""验证模板格式是否正确"""if self.template_format == "f-string":# 简单验证f-string格式try:# 使用空字典进行格式化,检查是否有未定义的变量self.format(**{v: "" for v in self.input_variables})except Exception as e:raise ValueError(f"模板验证失败: {e}")# 其他格式的验证...def format_prompt(self, **kwargs: Any) -> PromptValue:"""格式化提示模板"""# 检查所有必需的输入变量都已提供missing_vars = set(self.input_variables) - set(kwargs.keys())if missing_vars:raise ValueError(f"缺少必需的输入变量: {missing_vars}")# 格式化模板formatted = self.format(**kwargs)# 返回PromptValue对象return StringPromptValue(text=formatted)def format(self, **kwargs: Any) -> str:"""格式化模板字符串"""if self.template_format == "f-string":# 使用f-string格式化return self.template.format(**kwargs)elif self.template_format == "jinja2":# 使用Jinja2格式化from jinja2 import Templatereturn Template(self.template).render(**kwargs)else:raise ValueError(f"不支持的模板格式: {self.template_format}")def _get_prompt_dict(self) -> Dict[str, Any]:"""返回提示模板的字典表示"""return {"input_variables": self.input_variables,"template": self.template,"template_format": self.template_format,}
PromptTemplate
支持多种格式化语法,默认使用Python的f-string格式。它会验证输入变量是否齐全,并将变量值正确地填充到模板中。
6.3 FewShotPromptTemplate实现分析
FewShotPromptTemplate
用于生成包含示例的提示,这在进行少样本学习时非常有用:
# langchain/prompts/few_shot.pyfrom typing import Any, Dict, List, Optional, Unionfrom langchain.prompts.base import BasePromptTemplate, PromptValue
from langchain.prompts.example_selector.base import BaseExampleSelector
from langchain.prompts.prompt import PromptTemplate
from langchain.schema import BaseMessage, HumanMessageclass FewShotPromptTemplate(BasePromptTemplate):"""少样本提示模板,用于生成包含示例的提示"""# 示例选择器或示例列表examples: Union[List[Dict[str, Any]], BaseExampleSelector]# 示例格式提示example_prompt: PromptTemplate# 连接示例的字符串example_separator: str = "\n\n"# 前缀文本prefix: str = ""# 后缀文本suffix: str = ""# 输入变量input_variables: List[str]# 模板格式template_format: str = "f-string"def format_prompt(self, **kwargs: Any) -> PromptValue:"""格式化提示模板"""# 获取示例if isinstance(self.examples, BaseExampleSelector):examples = self.examples.select_examples(kwargs)else:examples = self.examples# 格式化每个示例example_strings = [self.example_prompt.format(**example) for example in examples]# 连接示例example_string = self.example_separator.join(example_strings)# 构建完整的提示pieces = [self.prefix, example_string, self.suffix]full_prompt = "\n\n".join([p for p in pieces if p])# 格式化最终提示formatted_prompt = full_prompt.format(**kwargs)return StringPromptValue(text=formatted_prompt)def format(self, **kwargs: Any) -> str:"""格式化提示模板,返回字符串"""return self.format_prompt(**kwargs).to_string()def _get_prompt_dict(self) -> Dict[str, Any]:"""返回提示模板的字典表示"""return {"examples": self.examples,"example_prompt": self.example_prompt,"example_separator": self.example_separator,"prefix": self.prefix,"suffix": self.suffix,"input_variables": self.input_variables,"template_format": self.template_format,}
FewShotPromptTemplate
允许在提示中包含多个示例,这些示例可以帮助LLM更好地理解任务要求。它可以通过BaseExampleSelector
动态选择示例,也可以使用预定义的示例列表。
6.4 其他提示模板实现
LangChain还提供了多种其他提示模板实现,包括:
- ChatPromptTemplate:专为聊天模型设计的提示模板
- PipelinePromptTemplate:组合多个提示模板
- PromptTemplate.from_template:从字符串快速创建提示模板
- load_prompt:从文件加载提示模板
这些提示模板实现提供了丰富的功能,使得开发者可以根据具体需求灵活构建复杂的提示。
6.5 提示模板的高级特性
除了基本的变量替换功能,LangChain的提示模板还支持一些高级特性:
- 动态示例选择:通过
BaseExampleSelector
接口,可以根据当前输入动态选择最相关的示例 - 模板验证:可以验证模板格式是否正确,确保所有必需的变量都被提供
- 多语言支持:支持多种模板格式语法,如f-string、Jinja2等
- 序列化和反序列化:可以将提示模板保存到文件或从文件加载
这些高级特性使得提示模板更加灵活和强大,能够适应各种复杂的应用场景。
七、Agents组件实现原理
7.1 Agent抽象基类
Agents是LangChain中最复杂也是最强大的组件之一,它允许LLM根据任务需求自主选择和使用工具。Agent的抽象基类定义在langchain/agents/base.py
:
# langchain/agents/base.pyfrom abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional, Tuple, Unionfrom langchain.base_language import BaseLanguageModel
from langchain.callbacks.base import BaseCallbackManager
from langchain.chains.base import Chain
from langchain.prompts.base import BasePromptTemplate
from langchain.schema import AgentAction, AgentFinish, OutputParserExceptionclass Agent(ABC):"""Agent抽象基类,定义了所有Agent必须遵循的接口"""# 语言模型llm: BaseLanguageModel# 工具列表tools: List[BaseTool]# 提示模板prompt: BasePromptTemplate# 回调管理器callback_manager: Optional[BaseCallbackManager] = None@classmethod@abstractmethoddef create_prompt(cls, tools: List[BaseTool]) -> BasePromptTemplate:"""创建Agent使用的提示模板"""pass@classmethod@abstractmethoddef _get_default_output_parser(cls, **kwargs: Any) -> AgentOutputParser:"""获取默认的输出解析器"""pass@property@abstractmethoddef observation_prefix(self) -> str:"""观察结果的前缀"""pass@property@abstractmethoddef llm_prefix(self) -> str:"""LLM思考的前缀"""pass@abstractmethoddef _construct_scratchpad(self, intermediate_steps: List[Tuple[AgentAction, str]]) -> str:"""构建思考过程的草稿本"""pass@classmethod@abstractmethoddef from_llm_and_tools(cls,llm: BaseLanguageModel,tools: List[BaseTool],callback_manager: Optional[BaseCallbackManager] = None,**kwargs: Any,) -> "Agent":"""从LLM和工具列表创建Agent"""pass@abstractmethoddef plan(self,intermediate_steps: List[Tuple[AgentAction, str]],callbacks: Optional[BaseCallbackManager] = None,**kwargs: Any,) -> Union[AgentAction, AgentFinish]:"""根据中间步骤和输入计划下一步行动"""passasync def aplan(self,intermediate_steps: List[Tuple[AgentAction, str]],callbacks: Optional[BaseCallbackManager] = None,**kwargs: Any,) -> Union[AgentAction, AgentFinish]:"""异步根据中间步骤和输入计划下一步行动"""return self.plan(intermediate_steps, callbacks=callbacks, **kwargs)
所有具体的Agent实现都必须继承这个抽象基类,并实现上述抽象方法。这些方法定义了Agent如何创建提示模板、解析LLM输出、规划下一步行动等核心功能。
7.2 AgentOutputParser抽象基类
AgentOutputParser负责解析LLM的输出,将其转换为AgentAction或AgentFinish对象:
# langchain/agents/agent.pyfrom abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional, Tuple, Unionfrom langchain.schema import AgentAction, AgentFinishclass AgentOutputParser(ABC):"""Agent输出解析器抽象基类"""@abstractmethoddef parse(self, text: str) -> Union[AgentAction, AgentFinish]:"""解析LLM输出"""passdef get_format_instructions(self) -> str:"""获取格式说明"""return ""
不同类型的Agent通常需要不同的输出解析器,以正确解析LLM的输出。
7.3 ZeroShotAgent实现分析
ZeroShotAgent
是一种常用的Agent实现,它可以在没有预定义示例的情况下使用工具:
# langchain/agents/zero_shot.pyfrom typing import Any, Dict, List, Optional, Tuple, Unionfrom langchain.agents.agent import Agent, AgentOutputParser
from langchain.agents.agent_types import AgentType
from langchain.base_language import BaseLanguageModel
from langchain.callbacks.base import BaseCallbackManager
from langchain.chains import LLMChain
from langchain.prompts import BasePromptTemplate, PromptTemplate
from langchain.schema import AgentAction, AgentFinish, OutputParserExceptionclass ZeroShotAgent(Agent):"""零样本Agent,能够在没有预定义示例的情况下使用工具"""@classmethoddef create_prompt(cls,tools: List[BaseTool],prefix: str = DEFAULT_PREFIX,suffix: str = DEFAULT_SUFFIX,format_instructions: str = FORMAT_INSTRUCTIONS,input_variables: Optional[List[str]] = None,) -> BasePromptTemplate:"""创建提示模板"""tool_strings = "\n".join([f"{tool.name}: {tool.description}" for tool in tools])tool_names = ", ".join([tool.name for tool in tools])format_instructions = format_instructions.format(tool_names=tool_names)template = "\n\n".join([prefix, tool_strings, format_instructions, suffix])if input_variables is None:input_variables = ["input", "agent_scratchpad"]return PromptTemplate(template=template, input_variables=input_variables)@classmethoddef _get_default_output_parser(cls, **kwargs: Any) -> AgentOutputParser:"""获取默认的输出解析器"""return ZeroShotAgentOutputParser()@propertydef observation_prefix(self) -> str:"""观察结果的前缀"""return "Observation: "@propertydef llm_prefix(self) -> str:"""LLM思考的前缀"""return "Thought: "def _construct_scratchpad(self, intermediate_steps: List[Tuple[AgentAction, str]]) -> str:"""构建思考过程的草稿本"""thoughts = ""for action, observation in intermediate_steps:thoughts += f"{action.log}\n{self.observation_prefix}{observation}\n{self.llm_prefix}"return thoughts@classmethoddef from_llm_and_tools(cls,llm: BaseLanguageModel,tools: List[BaseTool],callback_manager: Optional[BaseCallbackManager] = None,**kwargs: Any,) -> "ZeroShotAgent":"""从LLM和工具列表创建Agent"""cls._validate_tools(tools)prompt = cls.create_prompt(tools, **kwargs)output_parser = kwargs.pop("output_parser", cls._get_default_output_parser())return cls(llm_chain=LLMChain(llm=llm,prompt=prompt,callback_manager=callback_manager,),tools=tools,output_parser=output_parser,**kwargs,)def plan(self,intermediate_steps: List[Tuple[AgentAction, str]],callbacks: Optional[BaseCallbackManager] = None,**kwargs: Any,) -> Union[AgentAction, AgentFinish]:"""根据中间步骤和输入计划下一步行动"""# 构建思考草稿本thoughts = self._construct_scratchpad(intermediate_steps)# 构建输入inputs = {"input": kwargs["input"],"agent_scratchpad": thoughts,**{k: v for k, v in kwargs.items() if k != "input"},}# 调用LLM生成下一步行动output = self.llm_chain.predict(callbacks=callbacks, **inputs)# 解析LLM输出try:return self.output_parser.parse(output)except OutputParserException as e:return AgentFinish(return_values={"output": str(e)},log=output,)
ZeroShotAgent
的核心是通过提示模板告诉LLM有哪些工具可用以及如何使用它们,然后LLM根据输入自主决定是否使用工具以及使用哪个工具。
7.4 AgentExecutor实现分析
AgentExecutor
负责执行Agent,管理工具调用和中间步骤:
# langchain/agents/agent_executor.pyfrom typing import Any, Dict, List, Optional, Tuple, Unionfrom langchain.agents.agent import Agent, AgentOutputParser
from langchain.agents.tools import BaseTool
from langchain.base_language import BaseLanguageModel
from langchain.callbacks.base import BaseCallbackManager
from langchain.chains.base import Chain
from langchain.schema import AgentAction, AgentFinish, OutputParserExceptionclass AgentExecutor(Chain):"""Agent执行器,负责执行Agent并管理工具调用"""# Agentagent: Agent# 工具列表tools: List[BaseTool]# 最大迭代次数max_iterations: Optional[int] = 15# 是否返回中间步骤return_intermediate_steps: bool = False# 工具名称到工具的映射tool_names: List[str]# 工具名称到工具的映射name_to_tool_map: Dict[str, BaseTool]@propertydef input_keys(self) -> List[str]:"""返回期望的输入键"""return self.agent.input_keys@propertydef output_keys(self) -> List[str]:"""返回产生的输出键"""return self.agent.output_keys@classmethoddef from_agent_and_tools(cls,agent: Agent,tools: List[BaseTool],callback_manager: Optional[BaseCallbackManager] = None,**kwargs: Any,) -> "AgentExecutor":"""从Agent和工具列表创建执行器"""name_to_tool_map = {tool.name: tool for tool in tools}tool_names = list(name_to_tool_map.keys())return cls(agent=agent,tools=tools,name_to_tool_map=name_to_tool_map,tool_names=tool_names,callback_manager=callback_manager,**kwargs,)def _call(self,inputs: Dict[str, Any],run_manager: Optional[CallbackManagerForChainRun] = None,) -> Dict[str, Any]:"""执行Agent"""# 准备输入intermediate_steps: List[Tuple[AgentAction, str]] = []agent_inputs = inputs# 获取终止消息stop = agent_inputs.get("stop", None)# 主循环iterations = 0while self._should_continue(iterations, intermediate_steps):# 调用Agent规划下一步行动next_step_output = self._take_next_step(agent_inputs, intermediate_steps, run_manager=run_manager)# 如果Agent返回完成,直接返回结果if isinstance(next_step_output, AgentFinish):if self.return_intermediate_steps:return {**next_step_output.return_values,"intermediate_steps": intermediate_steps,}else:return next_step_output.return_values# 否则,执行工具并记录结果action = next_step_outputif run_manager:run_manager.on_agent_action(action)# 执行工具observation = self._call_tool(action, run_manager=run_manager)# 记录中间步骤intermediate_steps.append((action, observation))# 更新迭代次数iterations += 1# 如果达到最大迭代次数,返回错误output = self.agent.return_stopped_response(self.agent.llm_chain.llm, intermediate_steps, False)if self.return_intermediate_steps:return {**output.return_values, "intermediate_steps": intermediate_steps}else:return output.return_valuesdef _take_next_step(self,inputs: Dict[str, Any],intermediate_steps: List[Tuple[AgentAction, str]],run_manager: Optional[CallbackManagerForChainRun] = None,) -> Union[AgentAction, AgentFinish]:"""获取下一步行动"""# 调用Agent规划下一步return self.agent.plan(intermediate_steps,callbacks=run_manager.get_child() if run_manager else None,**inputs,)def _call_tool(self,action: AgentAction,run_manager: Optional[CallbackManagerForChainRun] = None,) -> str:"""调用工具并返回观察结果"""if action.tool not in self.name_to_tool_map:return (f"错误: 工具 '{action.tool}' 不存在。可用工具: {', '.join(self.tool_names)}")# 获取工具tool = self.name_to_tool_map[action.tool]# 执行工具observation = tool.run(action.tool_input,verbose=self.verbose,callbacks=run_manager.get_child() if run_manager else None,)return observationdef _should_continue(self, iterations: int, intermediate_steps: List[Tuple[AgentAction, str]]) -> bool:"""判断是否应该继续执行"""if self.max_iterations is not None and iterations >= self.max_iterations:return Falsereturn True
AgentExecutor
实现了Agent的执行逻辑,包括循环调用Agent规划下一步行动、执行工具、记录中间步骤等。它确保了整个执行过程的可控性和可观测性。
7.5 其他Agent实现
LangChain提供了多种Agent实现,包括:
- ReAct Agent:基于ReAct框架的Agent,能够在思考和行动之间进行循环
- SelfAskWithSearch Agent:专门用于知识检索的Agent,能够递归地提出问题
- Chat Agent:专为聊天模型设计的Agent,能够处理对话格式的输入和输出
- Structured Tools Agent:支持结构化工具输入的Agent,能够处理复杂的工具参数
这些Agent实现适用于不同的应用场景,开发者可以根据具体需求选择合适的Agent类型。
7.6 Agent的工作流程
当使用AgentExecutor执行一个任务时,整个工作流程大致如下:
- 用户提供输入,AgentExecutor将其传递给Agent
- Agent根据输入和当前状态,决定是直接返回结果还是调用工具
- 如果决定调用工具,Agent会生成一个AgentAction,描述要调用的工具和参数
- AgentExecutor执行该工具,并获取观察结果
- AgentExecutor将观察结果添加到中间步骤列表中,并将其反馈给Agent
- Agent根据新的中间步骤,再次决定下一步行动
- 重复步骤3-6,直到Agent返回AgentFinish,表示任务完成
- AgentExecutor返回最终结果
这种设计使得Agent能够根据任务需求,灵活地选择和使用工具,从而完成复杂的任务。
八、Tools组件实现原理
8.1 Tool抽象基类
Tools是LangChain中用于执行特定任务的组件,它封装了各种外部功能,使Agent能够扩展其能力。Tool的抽象基类定义在langchain/tools/base.py
:
# langchain/tools/base.pyfrom abc import ABC, abstractmethod
from typing import Any, Callable, Dict, List, Optional, Sequence, Type, Unionfrom langchain.callbacks.base import BaseCallbackManager
from langchain.schema import BaseToolclass Tool(BaseTool, ABC):"""Tool抽象基类,定义了所有工具必须遵循的接口"""# 工具名称name: str# 工具描述description: str# 是否异步支持is_async: bool = Falsedef __init__(self,name: str,description: str,func: Optional[Callable[..., str]] = None,coroutine: Optional[Callable[..., str]] = None,**kwargs: Any,):"""初始化工具"""super().__init__(**kwargs)self.name = nameself.description = descriptionself.func = funcself.coroutine = coroutine@abstractmethoddef _run(self, *args: Any, **kwargs: Any) -> str:"""同步运行工具的核心方法"""passasync def _arun(self, *args: Any, **kwargs: Any) -> str:"""异步运行工具的核心方法,默认调用同步方法"""return self._run(*args, **kwargs)def run(self,tool_input: Union[str, Dict],verbose: bool = False,callbacks: Optional[BaseCallbackManager] = None,) -> str:"""运行工具"""# 处理输入if isinstance(tool_input, str):parsed_input = (tool_input,)kwargs = {}else:parsed_input = ()kwargs = tool_input# 执行工具if self.is_async:# 如果是异步工具,使用asyncio运行import asyncioreturn asyncio.run(self._arun(*parsed_input, **kwargs))else:return self._run(*parsed_input, **kwargs)async def arun(self,tool_input: Union[str, Dict],verbose: bool = False,callbacks: Optional[BaseCallbackManager] = None,) -> str:"""异步运行工具"""if isinstance(tool_input, str):parsed_input = (tool_input,)kwargs = {}else:parsed_input = ()kwargs = tool_inputreturn await self._arun(*parsed_input, **kwargs)
所有具体的Tool实现都必须继承这个抽象基类,并实现_run
方法(如果支持异步,还需实现_arun
方法)。_run
方法定义了工具的核心功能,接收输入参数并返回执行结果。
8.2 内置工具示例
LangChain提供了多种内置工具,以下是一些常见的示例:
8.2.1 PythonREPLTool
# langchain/tools/python/tool.pyclass PythonREPLTool(BaseTool):"""Python REPL工具,允许执行Python代码"""name = "python_repl"description = ("一个Python shell。适合执行Python代码。""输入应该是有效的Python代码。""如果您期望输出,应该打印它。")def __init__(self):super().__init__()self.repl = PythonREPL()def _run(self, code: str) -> str:"""执行Python代码并返回结果"""return self.repl.run(code)async def _arun(self, code: str) -> str:"""异步执行Python代码并返回结果"""return await asyncio.to_thread(self._run, code)
8.2.2 SerpAPIWrapper
# langchain/tools/serpapi.pyclass SerpAPIWrapper(BaseTool):"""SerpAPI工具,用于搜索引擎查询"""name = "serpapi"description = ("一个搜索引擎。适合回答关于当前事件的问题。""输入应该是一个搜索查询。")def __init__(self, serpapi_api_key: Optional[str] = None):super().__init__()self.serpapi_api_key = serpapi_api_key or os.environ.get("SERPAPI_API_KEY")if not self.serpapi_api_key:raise ValueError("需要设置SERPAPI_API_KEY环境变量或提供serpapi_api_key参数")def _run(self, query: str) -> str:"""执行搜索查询并返回结果"""params = {"q": query,"api_key": self.serpapi_api_key,"engine": "google","gl": "us","hl": "en",}response = requests.get("https://serpapi.com/search", params=params)response.raise_for_status()return response.json()async def _arun(self, query: str) -> str:"""异步执行搜索查询并返回结果"""return await asyncio.to_thread(self._run, query)
8.2.3 CalculatorTool
# langchain/tools/calculator.pyclass CalculatorTool(BaseTool):"""计算器工具,用于执行数学计算"""name = "calculator"description = ("一个计算器。适合执行复杂的数学计算。""输入应该是一个数学表达式。")def _run(self, query: str) -> str:"""执行数学计算并返回结果"""try:# 使用ast来安全地评估数学表达式return str(eval(query, {"__builtins__": None}, {}))except Exception as e:return f"错误: {str(e)}"async def _arun(self, query: str) -> str:"""异步执行数学计算并返回结果"""return await asyncio.to_thread(self._run, query)
8.3 自定义工具
开发者可以根据需要创建自定义工具,只需继承BaseTool
类并实现_run
方法。以下是一个简单的示例:
from langchain.tools import BaseToolclass GetWeatherTool(BaseTool):"""获取天气信息的工具"""name = "get_weather"description = ("获取指定城市的当前天气信息。""输入应该是城市名称。")def _run(self, city: str) -> str:"""获取并返回城市的天气信息"""# 实际实现中,这里会调用天气API# 为简化示例,我们返回模拟数据weather_data = {"北京": "晴朗,25°C","上海": "多云,28°C","广州": "小雨,27°C","深圳": "多云,29°C",}if city in weather_data:return f"{city}的当前天气是{weather_data[city]}"else:return f"抱歉,没有找到{city}的天气信息"async def _arun(self, city: str) -> str:"""异步获取并返回城市的天气信息"""return await asyncio.to_thread(self._run, city)
8.4 工具的参数验证
LangChain支持对工具的输入参数进行验证,确保输入符合工具的要求。可以通过在工具类中定义args_schema
属性来实现:
from langchain.tools import BaseTool
from pydantic import BaseModel, Fieldclass GetWeatherInput(BaseModel):"""获取天气工具的输入参数"""city: str = Field(..., description="城市名称")class GetWeatherTool(BaseTool):"""获取天气信息的工具"""name = "get_weather"description = ("获取指定城市的当前天气信息。""输入应该是城市名称。")args_schema: Type[BaseModel] = GetWeatherInputdef _run(self, city: str) -> str:"""获取并返回城市的天气信息"""# 实际实现中,这里会调用天气APIweather_data = {"北京": "晴朗,25°C","上海": "多云,28°C","广州": "小雨,27°C","深圳": "多云,29°C",}if city in weather_data:return f"{city}的当前天气是{weather_data[city]}"else:return f"抱歉,没有找到{city}的天气信息"
通过这种方式,工具会自动验证输入参数,并在输入不符合要求时抛出异常。
8.5 工具的异步支持
对于支持异步操作的工具,可以实现_arun
方法,并将is_async
属性设置为True
:
from langchain.tools import BaseToolclass AsyncHTTPRequestTool(BaseTool):"""异步HTTP请求工具"""name = "http_request"description = ("发送HTTP请求并获取响应。""输入应该是一个包含URL和可选参数的字典。")is_async: bool = Trueasync def _arun(self, params: dict) -> str:"""异步发送HTTP请求并返回响应"""url = params.get("url")method = params.get("method", "GET")headers = params.get("headers", {})data = params.get("data", {})if not url:return "错误: 缺少URL参数"async with aiohttp.ClientSession() as session:async with session.request(method, url, headers=headers, json=data) as response:return await response.text()
这样,当AgentExecutor调用这个工具时,会自动使用异步方式执行,提高系统的并发处理能力。
九、异步处理与并发
9.1 异步支持概述
LangChain对异步处理提供了全面支持,使得开发者能够高效地处理多个并发请求。这种支持主要体现在以下几个方面:
- LLM调用的异步接口
- 工具执行的异步支持
- 链和代理的异步执行
- 回调系统的异步扩展
通过这些异步特性,LangChain能够充分利用现代硬件的多核性能,提高应用程序的吞吐量和响应速度。
9.2 异步LLM调用
如前面章节所述,LLM接口定义了异步方法:
# langchain/llms/base.pyclass LLM(ABC):# ... 其他方法 ...async def agenerate(self, prompts: List[str], stop: Optional[List[str]] = None) -> LLMResult:"""异步生成多个提示的响应"""passasync def _acall(self, prompt: str, stop: Optional[List[str]] = None) -> str:"""异步调用LLM的核心方法"""pass
具体的LLM实现需要提供这些异步方法的具体实现。例如,OpenAI的异步实现:
# langchain/llms/openai.pyclass OpenAI(LLM):# ... 其他方法 ...async def _acall(self, prompt: str, stop: Optional[List[str]] = None) -> str:"""异步执行OpenAI API调用"""params = {"model": self.model_name,"prompt": prompt,"temperature": self.temperature,"max_tokens": self.max_tokens,"top_p": self.top_p,