51. LangChain技术指南
10. LangChain Expression Language (LCEL)
1. LangChain基础概念
1.1 LangChain简介
LangChain是一个用于开发由大语言模型(LLM)驱动的应用程序的开源框架,由Harrison Chase于2022年10月创建。它提供了一套标准化的接口和工具,使开发者能够轻松构建复杂的AI应用。
核心定位:
- 应用开发框架:不是模型本身,而是连接和编排LLM的工具
- 抽象层:统一不同LLM提供商的API接口
- 组件化设计:提供可复用的模块化组件
- 端到端解决方案:从数据处理到模型调用的完整链路
发展历程:
| 时间 | 里程碑 | 说明 |
|---|---|---|
| 2022.10 | 项目启动 | Harrison Chase创建LangChain |
| 2023.01 | 快速增长 | GitHub Star突破10K |
| 2023.04 | 融资A轮 | 获得1000万美元投资 |
| 2023.08 | LangSmith发布 | 推出调试和监控平台 |
| 2024.01 | LCEL成熟 | Expression Language成为主流 |
技术栈支持:
- Python版本:最成熟,功能最全
- JavaScript/TypeScript版本:适合前端和Node.js应用
- 其他语言:社区支持Go、Java等版本
1.2 核心特性与优势
1. 模型无关性(Model Agnostic)
LangChain支持多种LLM提供商,开发者可以轻松切换:
| 提供商类型 | 支持的模型 | 特点 |
|---|---|---|
| OpenAI | GPT-4, GPT-3.5 | 性能强大,API稳定 |
| Anthropic | Claude 3 | 长上下文,安全性高 |
| Gemini, PaLM | 多模态能力强 | |
| 开源模型 | LLaMA, Mistral | 可本地部署,成本低 |
| 国内模型 | 文心一言、通义千问 | 中文理解好 |
2. 组件化架构
模型层"] --> B["Prompts
提示工程"] B --> C["Chains
链式调用"] C --> D["Agents
智能代理"] E["Memory
记忆系统"] --> C F["Data Connection
数据连接"] --> C G["Callbacks
回调机制"] --> C end style A fill:#e1f5ff style B fill:#fff4e1 style C fill:#e8f5e9 style D fill:#fce4ec style E fill:#f3e5f5 style F fill:#e0f2f1 style G fill:#fff9c4
3. 数据增强能力
- RAG(检索增强生成):结合外部知识库提升准确性
- 向量数据库集成:支持Pinecone、Chroma、FAISS等
- 文档处理:自动加载、分割、索引各类文档
4. 可观测性
- LangSmith:专业的调试和监控平台
- Tracing:完整的调用链追踪
- Metrics:性能指标收集和分析
5. 生产就绪
- 错误处理:完善的异常捕获和重试机制
- 流式输出:支持实时响应
- 缓存机制:减少重复调用成本
- 并发控制:高效处理大量请求
1.3 应用场景
1. 智能问答系统
- 企业知识库问答:基于内部文档的智能检索
- 客户服务机器人:自动回答常见问题
- 技术支持助手:代码问题诊断和解决
2. 文档处理
- 文档总结:自动提取关键信息
- 内容生成:根据模板生成报告、邮件
- 翻译服务:多语言文档翻译
3. 数据分析
- SQL生成:自然语言转SQL查询
- 数据可视化:自动生成图表和报表
- 商业智能:数据洞察和趋势分析
4. 代码辅助
- 代码生成:根据需求生成代码
- 代码审查:自动发现潜在问题
- 文档生成:自动生成API文档
5. 工作流自动化
- 邮件处理:自动分类和回复
- 会议纪要:自动总结会议内容
- 任务管理:智能任务分配和跟踪
1.4 生态系统
核心项目:
核心框架"] --> B["LangSmith
调试平台"] A --> C["LangServe
部署工具"] A --> D["LangGraph
图状态管理"] E["LangChain Hub
提示模板库"] --> A F["LangChain Templates
应用模板"] --> A end style A fill:#4CAF50 style B fill:#2196F3 style C fill:#FF9800 style D fill:#9C27B0 style E fill:#00BCD4 style F fill:#E91E63
1. LangChain Core
- 核心抽象和接口定义
- 基础组件实现
- 最小依赖,高性能
2. LangSmith
- 调试工具:可视化调用链
- 性能监控:延迟、成本追踪
- 数据集管理:测试用例管理
- 协作功能:团队共享和评审
3. LangServe
- 快速部署:一键将Chain转为REST API
- 自动文档:生成OpenAPI规范
- 流式支持:Server-Sent Events
- 客户端SDK:自动生成调用代码
4. LangGraph
- 状态图管理:复杂工作流编排
- 循环控制:支持条件分支和循环
- 持久化:状态保存和恢复
- 人机协作:支持人工介入
5. LangChain Hub
- 提示模板库:社区共享的优质提示
- 版本管理:提示模板版本控制
- 评分系统:社区评价和推荐
集成生态:
| 类别 | 集成工具 | 用途 |
|---|---|---|
| 向量数据库 | Pinecone, Chroma, Weaviate, Milvus | 向量存储和检索 |
| 文档加载 | PyPDF, Docx, Markdown, Web Scraper | 多格式文档处理 |
| 模型提供商 | OpenAI, Anthropic, Cohere, HuggingFace | LLM调用 |
| 工具集成 | Google Search, Wikipedia, Wolfram Alpha | 外部工具调用 |
| 监控平台 | LangSmith, Weights & Biases, MLflow | 性能监控 |
社区资源:
- GitHub:50K+ Stars,活跃的开源社区
- Discord:技术交流和问题解答
- 文档:详细的官方文档和教程
- 博客:最佳实践和案例分享
1.5 小白入门指南
什么是LangChain?用人话说
简单理解:LangChain就像是一个"乐高积木盒",里面有各种积木块(组件),你可以把它们拼起来,做出一个能和AI对话、能回答问题、能帮你处理文档的智能助手。
打个比方:
- 如果你想做一个能回答公司内部问题的机器人
- 传统方式:你需要自己写很多代码,处理各种细节
- 用LangChain:就像搭积木一样,把"文档读取"、“AI模型”、“记忆"这些积木拼起来就行了
核心概念通俗讲解
1. Models(模型)- 就是AI的大脑
# 最简单的例子
from langchain_openai import ChatOpenAI
# 创建一个AI大脑
model = ChatOpenAI(model="gpt-3.5-turbo")
# 让AI回答问题
response = model.invoke("什么是Python?")
print(response.content)
理解要点:
ChatOpenAI:这是一个类,代表OpenAI的AI模型model="gpt-3.5-turbo":选择哪个AI模型(就像选择用哪个版本的大脑)invoke():这是一个方法,意思是"调用”,就是让AI工作response.content:AI的回答内容
2. Prompts(提示)- 就是你怎么问AI
from langchain.prompts import PromptTemplate
# 创建一个提示模板(就像填空题)
template = "你是一个{role},请回答:{question}"
prompt = PromptTemplate(
input_variables=["role", "question"], # 需要填的空
template=template
)
# 填空
formatted = prompt.format(
role="Python老师",
question="什么是变量?"
)
print(formatted)
# 输出:你是一个Python老师,请回答:什么是变量?
理解要点:
PromptTemplate:提示模板类,用来创建可重复使用的问题格式input_variables:需要填的空(变量)format():填空的方法
3. Chains(链)- 把多个步骤串起来
# ✅ 新版本(LangChain 1.2.0+)使用 LCEL
from langchain_core.output_parsers import StrOutputParser
# 创建一个链(使用 | 操作符连接)
chain = prompt | model | StrOutputParser()
# 一次性完成:填空 → 问AI → 得到答案
result = chain.invoke({
"role": "数学老师",
"question": "1+1等于几?"
})
print(result) # 直接输出字符串
理解要点:
- LCEL(LangChain Expression Language):新版本的链式语法
|操作符:把组件连接起来(像管道一样)StrOutputParser():把 AI 的回答转成纯文本- 好处:代码更简洁,性能更好
4. Memory(记忆)- 让AI记住之前说过的话
# ✅ 新版本(LangChain 1.2.0+)
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.runnables.history import RunnableWithMessageHistory
from langchain_community.chat_message_histories import ChatMessageHistory
# 创建记忆存储
store = {}
def get_session_history(session_id: str):
if session_id not in store:
store[session_id] = ChatMessageHistory()
return store[session_id]
# 创建带记忆的对话链
prompt = ChatPromptTemplate.from_messages([
("system", "你是一个友好的助手"),
MessagesPlaceholder(variable_name="history"),
("human", "{input}")
])
model = ChatOpenAI(model="gpt-4.1-mini")
chain = prompt | model
# 包装成带记忆的链
chain_with_history = RunnableWithMessageHistory(
chain,
get_session_history,
input_messages_key="input",
history_messages_key="history"
)
# 使用(会自动记住对话)
response1 = chain_with_history.invoke(
{"input": "我叫张三"},
config={"configurable": {"session_id": "user123"}}
)
response2 = chain_with_history.invoke(
{"input": "我叫什么名字?"},
config={"configurable": {"session_id": "user123"}}
)
# AI会回答:你叫张三
理解要点:
RunnableWithMessageHistory:新版本的记忆管理MessagesPlaceholder:在提示中预留对话历史的位置session_id:用来区分不同用户的对话- AI能记住之前的对话内容
5. Agents(代理)- 会自己思考和使用工具的AI
# ✅ 新版本(LangChain 1.2.0+)
from langchain.agents import create_tool_calling_agent, AgentExecutor
from langchain_core.tools import tool
from langchain import hub
# 定义工具(使用装饰器)
@tool
def calculator(expression: str) -> str:
"""计算数学表达式"""
try:
return str(eval(expression))
except:
return "计算错误"
tools = [calculator]
# 获取提示模板
prompt = hub.pull("hwchase17/openai-tools-agent")
# 创建 Agent
agent = create_tool_calling_agent(model, tools, prompt)
agent_executor = AgentExecutor(agent=agent, tools=tools, verbose=True)
# 使用
result = agent_executor.invoke({
"input": "123 * 456 等于多少?"
})
print(result["output"])
# Agent会:
# 1. 思考:这是数学问题,需要用计算器
# 2. 使用计算器工具
# 3. 得到结果:56088
# 4. 回答用户
理解要点:
@tool装饰器:定义工具的新方式(更简洁)create_tool_calling_agent:新版本的 Agent 创建方法hub.pull():从 LangChain Hub 获取提示模板AgentExecutor:执行器,运行 Agent
6. RAG(检索增强生成)- 让AI基于你的文档回答
# ✅ 新版本(LangChain 1.2.0+)
from langchain_community.document_loaders import TextLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_openai import OpenAIEmbeddings
from langchain_community.vectorstores import Chroma
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
# 步骤1:加载文档
loader = TextLoader("company_handbook.txt")
documents = loader.load()
# 步骤2:切分文档
splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200)
chunks = splitter.split_documents(documents)
# 步骤3:向量化并存储
embeddings = OpenAIEmbeddings()
vectorstore = Chroma.from_documents(chunks, embeddings)
retriever = vectorstore.as_retriever()
# 步骤4:创建 RAG 链(使用 LCEL)
template = """基于以下上下文回答问题:
上下文:{context}
问题:{question}
答案:"""
prompt = ChatPromptTemplate.from_template(template)
rag_chain = (
{"context": retriever, "question": RunnablePassthrough()}
| prompt
| model
| StrOutputParser()
)
# 步骤5:提问
answer = rag_chain.invoke("公司的年假政策是什么?")
print(answer)
# AI会:
# 1. 在文档中搜索相关内容
# 2. 基于找到的内容回答
# 3. 不会瞎编,因为有依据
理解要点:
retriever:检索器,搜索相关文档RunnablePassthrough():传递输入到下一步- LCEL 语法:用
|和字典组合组件 - 更灵活、性能更好
常用类和方法速查表关
| 类名 | 用途 | 常用方法 |
|---|---|---|
ChatOpenAI | OpenAI聊天模型 | invoke(), stream(), batch() |
OpenAI | OpenAI文本模型 | invoke(), predict() |
OpenAIEmbeddings | 文本向量化 | embed_query(), embed_documents() |
提示相关:
| 类名 | 用途 | 常用方法 |
|---|---|---|
PromptTemplate | 提示模板 | format(), format_messages() |
ChatPromptTemplate | 聊天提示模板 | from_messages(), format_messages() |
FewShotPromptTemplate | 少样本提示 | format() |
记忆相关:
| 类名 | 用途 | 常用方法 |
|---|---|---|
ConversationBufferMemory | 完整对话记忆 | save_context(), load_memory_variables() |
ConversationSummaryMemory | 总结式记忆 | save_context(), clear() |
VectorStoreRetrieverMemory | 向量检索记忆 | save_context(), load_memory_variables() |
链相关:
| 类名 | 用途 | 常用方法 | 状态 |
|---|---|---|---|
| LCEL (推荐) | 新版链式语法 | | 操作符, invoke() | ✅ 最新 |
RunnablePassthrough | 传递数据 | invoke() | ✅ 最新 |
RunnableParallel | 并行执行 | invoke() | ✅ 最新 |
LLMChain | 基础链 | invoke(), run() | ⚠️ 已弃用 |
ConversationChain | 对话链 | predict() | ⚠️ 已弃用 |
RetrievalQA | 检索问答链 | invoke() | ⚠️ 已弃用 |
数据处理相关:
| 类名 | 用途 | 常用方法 |
|---|---|---|
TextLoader | 文本加载 | load() |
PyPDFLoader | PDF加载 | load(), load_and_split() |
RecursiveCharacterTextSplitter | 文本分割 | split_text(), split_documents() |
Chroma | 向量数据库 | from_documents(), similarity_search() |
最简单的完整示例
# 1. 安装(在命令行运行)
# pip install langchain langchain-openai
# 2. 设置API密钥(在代码开头)
import os
os.environ["OPENAI_API_KEY"] = "你的API密钥"
# 3. 导入需要的类
from langchain_openai import ChatOpenAI
from langchain.prompts import ChatPromptTemplate
from langchain.schema.output_parser import StrOutputParser
# 4. 创建组件
model = ChatOpenAI(model="gpt-3.5-turbo")
prompt = ChatPromptTemplate.from_template("用简单的话解释:{topic}")
output_parser = StrOutputParser()
# 5. 组合成链(使用 | 操作符)
chain = prompt | model | output_parser
# 6. 使用
result = chain.invoke({"topic": "什么是机器学习"})
print(result)
这个例子做了什么?
- 创建了一个AI模型
- 创建了一个提示模板
- 创建了一个输出解析器(把AI的回答变成纯文本)
- 用
|把它们连起来(就像水管一样) - 输入问题,得到答案
常见错误和解决方法
错误1:API密钥错误
# ❌ 错误
model = ChatOpenAI() # 没有设置API密钥
# ✅ 正确
import os
os.environ["OPENAI_API_KEY"] = "sk-..."
model = ChatOpenAI()
错误2:忘记调用方法
# ❌ 错误
result = chain # 忘记调用
# ✅ 正确
result = chain.invoke({"topic": "Python"})
错误3:变量名不匹配
# ❌ 错误
prompt = PromptTemplate(
input_variables=["question"], # 定义的是question
template="解释{topic}" # 但模板用的是topic
)
# ✅ 正确
prompt = PromptTemplate(
input_variables=["topic"], # 名字要一致
template="解释{topic}"
)
学习路径建议
第1周:基础概念
- 学会使用
ChatOpenAI调用AI - 学会使用
PromptTemplate创建提示 - 学会使用
LLMChain组合它们
第2周:记忆和对话
- 学会使用
ConversationChain - 理解不同的Memory类型
- 做一个简单的聊天机器人
第3周:文档处理
- 学会加载和分割文档
- 学会使用向量数据库
- 做一个文档问答系统
第4周:高级功能
- 学会使用Agent
- 学会自定义工具
- 做一个能使用工具的AI助手
实用技巧
技巧1:调试时开启verbose
chain = LLMChain(llm=model, prompt=prompt, verbose=True)
# verbose=True 会打印详细信息,方便调试
技巧2:使用流式输出
for chunk in model.stream("写一首诗"):
print(chunk.content, end="", flush=True)
# 实时显示AI的输出,不用等全部生成完
技巧3:控制成本
from langchain.callbacks import get_openai_callback
with get_openai_callback() as cb:
result = chain.invoke({"topic": "AI"})
print(f"花费: ${cb.total_cost}")
print(f"Token数: {cb.total_tokens}")
技巧4:错误重试
model = ChatOpenAI(
max_retries=3, # 失败自动重试3次
timeout=30 # 30秒超时
)
2. 核心架构与组件
2.1 整体架构设计
LangChain采用分层架构设计,从底层到上层依次为:
链式调用"] B2["Agents
智能代理"] B3["LCEL
表达式语言"] end subgraph "组件层" C1["Models
模型"] C2["Prompts
提示"] C3["Memory
记忆"] C4["Tools
工具"] C5["Retrievers
检索器"] end subgraph "基础设施层" D1["Vector Stores
向量库"] D2["Document Loaders
文档加载"] D3["Text Splitters
文本分割"] D4["Callbacks
回调"] end A1 --> B1 A2 --> B2 A3 --> B3 A4 --> B1 B1 --> C1 B1 --> C2 B2 --> C3 B2 --> C4 B3 --> C5 C1 --> D4 C3 --> D1 C5 --> D2 C5 --> D3 style A1 fill:#e3f2fd style A2 fill:#e3f2fd style A3 fill:#e3f2fd style A4 fill:#e3f2fd style B1 fill:#fff3e0 style B2 fill:#fff3e0 style B3 fill:#fff3e0 style C1 fill:#e8f5e9 style C2 fill:#e8f5e9 style C3 fill:#e8f5e9 style C4 fill:#e8f5e9 style C5 fill:#e8f5e9 style D1 fill:#f3e5f5 style D2 fill:#f3e5f5 style D3 fill:#f3e5f5 style D4 fill:#f3e5f5
架构特点:
- 松耦合设计:各组件独立,可单独使用或组合
- 接口标准化:统一的抽象接口,易于扩展
- 可组合性:通过LCEL灵活组合各种组件
- 可观测性:内置回调机制,全链路追踪
2.2 核心组件概览
小白理解:LangChain就像一个工具箱,里面有6大类工具,每类工具负责不同的事情。
1. Models(模型层)- AI的大脑
通俗解释:这是真正干活的AI,就像你请的一个聪明助手。
负责与各种LLM交互的抽象层:
| 组件 | 功能 | 典型用途 | 小白理解 |
|---|---|---|---|
| LLMs | 文本输入输出模型 | 文本生成、补全 | 给它一句话,它接着往下写 |
| Chat Models | 对话式模型 | 多轮对话、角色扮演 | 能记住上下文的聊天机器人 |
| Embeddings | 文本向量化 | 语义搜索、相似度计算 | 把文字变成数字,让电脑能理解 |
代码示例:
# LLM - 文本补全
from langchain_openai import OpenAI
llm = OpenAI()
result = llm.invoke("从前有座山,") # AI会接着写故事
# Chat Model - 对话
from langchain_openai import ChatOpenAI
chat = ChatOpenAI()
result = chat.invoke("你好") # AI会回复你
# Embeddings - 向量化
from langchain_openai import OpenAIEmbeddings
embeddings = OpenAIEmbeddings()
vector = embeddings.embed_query("机器学习") # 变成一串数字
2. Prompts(提示工程)- 怎么问AI
通俗解释:就像你问问题的方式,问得好,AI回答得就好。
管理和优化模型输入:
- PromptTemplate:参数化提示模板(就像填空题)
- ChatPromptTemplate:对话提示模板(多轮对话的问题格式)
- FewShotPromptTemplate:少样本学习模板(给AI看几个例子)
- OutputParser:结构化输出解析(让AI按格式回答)
代码示例:
from langchain.prompts import PromptTemplate
# 创建模板(就像填空题)
template = PromptTemplate(
input_variables=["name", "job"],
template="你好{name},你是做{job}的吗?"
)
# 填空
question = template.format(name="张三", job="程序员")
# 结果:你好张三,你是做程序员的吗?
3. Memory(记忆系统)- AI的笔记本
通俗解释:让AI记住之前说过的话,不会一转头就忘。
维护对话上下文和历史:
- ConversationBufferMemory:完整对话历史(全记住)
- ConversationSummaryMemory:总结式记忆(记重点)
- VectorStoreMemory:向量化记忆检索(智能搜索记忆)
- EntityMemory:实体关系记忆(记住人名、地名等)
代码示例:
from langchain_community.chat_message_histories import ChatMessageHistory
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.runnables.history import RunnableWithMessageHistory
# 创建记忆存储
message_history = ChatMessageHistory()
# 保存对话
message_history.add_user_message("我叫张三")
message_history.add_ai_message("你好张三,很高兴认识你!")
# 查看记忆
print("对话历史:")
for msg in message_history.messages:
print(f"{msg.type}: {msg.content}")
4. Chains(链式调用)- 把步骤串起来
通俗解释:就像流水线,一个步骤接一个步骤自动完成。
组合多个组件形成工作流:
- LLMChain:基础链,模型+提示(最简单的组合)
- SequentialChain:顺序执行多个链(一步步来)
- RouterChain:条件路由选择(根据情况选择不同路径)
- TransformChain:数据转换链(处理数据)
代码示例:
from langchain_core.output_parsers import StrOutputParser
# 1、创建大模型实例
chat_model = ChatOpenAI(model="gpt-4o-mini")
# 2、原始字符串模板
template = "桌上有{number}个苹果,四个桃子和 3 本书,一共有几个水果?"
prompt = PromptTemplate.from_template(template)
# 3、创建链式调用(使用LCEL语法,替代已弃用的LLMChain)
parser = StrOutputParser()
chain = prompt | chat_model | parser
# 4、调用chain,返回结果
result = chain.invoke({"number": 2})
print(result)
5. Agents(智能代理)- 会思考的AI
通俗解释:不只是回答问题,还会自己决定要不要用工具,怎么用。
具有决策能力的自主系统:
- ReAct Agent:推理-行动循环(边思考边行动)
- OpenAI Functions Agent:函数调用代理(精确控制)
- Conversational Agent:对话式代理(聊天机器人)
- Plan-and-Execute Agent:规划执行代理(先计划再执行)
代码示例:
from langchain.agents import create_agent
from langchain_core.tools import tool
@tool
def get_weather(city: str) -> str:
"""获取指定城市的天气"""
return f"{city}今天晴,气温 25 度"
agent = create_agent(
"gpt-4o-mini",
tools=[get_weather],
system_prompt="你是一个有帮助的助手",
)
result = agent.invoke(
{"messages": [{"role": "user", "content": "北京天气怎么样?"}]}
)
print(result["messages"][-1].content)
6. Data Connection(数据连接)- 处理文档
通俗解释:读取文件、切分文档、搜索内容的工具集。
处理外部数据源:
- Document Loaders:加载各类文档(读文件)
- Text Splitters:智能文本分割(切成小块)
- Vector Stores:向量数据库集成(存储和搜索)
- Retrievers:检索策略实现(找相关内容)
代码示例:
from langchain_community.document_loaders import TextLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
# 1. 加载文档
loader = TextLoader("document.txt")
docs = loader.load()
# 2. 切分文档
splitter = RecursiveCharacterTextSplitter(chunk_size=1000)
chunks = splitter.split_documents(docs)
# 3. 向量化存储
from langchain_community.vectorstores import Chroma
vectorstore = Chroma.from_documents(chunks, embeddings)
# 4. 搜索
results = vectorstore.similarity_search("机器学习")
组件关系图解:
用户问题
↓
Prompts(格式化问题)
↓
Models(AI思考)
↓
Memory(记住对话)
↓
Chains(组合步骤)
↓
Agents(决策和使用工具)
↓
Data Connection(搜索文档)
↓
返回答案
2.3 数据流与处理流程
典型RAG应用的数据流:
Agent执行流程:
分析任务"] Think --> Decide{"需要使用
工具吗?"} Decide -->|是| SelectTool["选择合适工具"] SelectTool --> ExecuteTool["执行工具"] ExecuteTool --> Observe["观察结果"] Observe --> Think Decide -->|否| Generate["生成最终答案"] Generate --> End["返回结果"] style Start fill:#e1f5ff style Think fill:#fff4e1 style Decide fill:#ffe1e1 style SelectTool fill:#e8f5e9 style ExecuteTool fill:#f3e5f5 style Observe fill:#fff9c4 style Generate fill:#e0f2f1 style End fill:#e1f5ff
2.4 模块化设计原则
1. 单一职责原则
每个组件只负责一个明确的功能:
- Document Loader:只负责加载文档
- Text Splitter:只负责分割文本
- Embeddings:只负责向量化
2. 接口抽象
所有组件都实现标准接口:
# 伪代码示例
class BaseModel:
def invoke(self, input): pass
def batch(self, inputs): pass
def stream(self, input): pass
class BaseRetriever:
def get_relevant_documents(self, query): pass
def aget_relevant_documents(self, query): pass # 异步版本
3. 可组合性
通过LCEL实现灵活组合:
# 组合示例(伪代码)
chain = (
{"context": retriever, "question": RunnablePassthrough()}
| prompt
| model
| output_parser
)
4. 可扩展性
- 自定义组件:继承基类实现自定义逻辑
- 插件机制:通过回调扩展功能
- 中间件模式:在调用链中插入处理逻辑
5. 可测试性
- Mock支持:可以模拟各个组件
- 单元测试:每个组件可独立测试
- 集成测试:完整链路测试
组件交互模式:
| 模式 | 说明 | 适用场景 |
|---|---|---|
| Pipeline | 顺序执行 | 数据处理流程 |
| Router | 条件分支 | 多策略选择 |
| Parallel | 并行执行 | 多路检索 |
| Fallback | 失败重试 | 容错处理 |
| Map-Reduce | 分布处理 | 大规模数据 |
3. Models模型层
3.1 LLM大语言模型
LLM是最基础的模型类型,接收文本输入,返回文本输出。
核心特点:
- 无状态:每次调用独立,不保留上下文
- 文本补全:根据输入生成后续文本
- 单轮交互:适合一次性任务
支持的模型提供商:
| 提供商 | 模型示例 | 特点 | 成本 |
|---|---|---|---|
| OpenAI | GPT-4, GPT-3.5-turbo | 性能强,生态好 | 中高 |
| Anthropic | Claude 3 Opus/Sonnet | 长上下文,安全 | 中高 |
| Gemini Pro | 多模态 | 中 | |
| Cohere | Command | 企业级 | 中 |
| HuggingFace | LLaMA, Mistral | 开源,可本地部署 | 低 |
| 国内 | 文心一言、通义千问 | 中文优化 | 低中 |
基本使用:
from langchain_openai import OpenAI
# 初始化模型
llm = OpenAI(
model_name="gpt-3.5-turbo-instruct",
temperature=0.7, # 控制随机性
max_tokens=256, # 最大输出长度
)
# 单次调用
response = llm.invoke("解释什么是量子计算")
# 批量调用
responses = llm.batch([
"什么是机器学习?",
"什么是深度学习?"
])
# 流式输出
for chunk in llm.stream("写一首关于春天的诗"):
print(chunk, end="", flush=True)
重要参数配置:
| 参数 | 说明 | 推荐值 | 影响 |
|---|---|---|---|
| temperature | 随机性控制 | 0.0-1.0 | 越高越随机 |
| max_tokens | 最大输出长度 | 根据需求 | 影响成本 |
| top_p | 核采样 | 0.9-1.0 | 控制多样性 |
| frequency_penalty | 重复惩罚 | 0.0-2.0 | 减少重复 |
| presence_penalty | 主题惩罚 | 0.0-2.0 | 鼓励新主题 |
3.2 Chat Models聊天模型
Chat Models专为对话场景设计,支持多轮对话和角色设定。
消息类型:
系统消息"] --> B["定义AI角色和行为"] C["HumanMessage
用户消息"] --> D["用户输入"] E["AIMessage
AI消息"] --> F["模型回复"] G["FunctionMessage
函数消息"] --> H["工具调用结果"] style A fill:#e3f2fd style C fill:#fff3e0 style E fill:#e8f5e9 style G fill:#f3e5f5
基本使用:
from langchain_openai import ChatOpenAI
from langchain_core.messages import SystemMessage, HumanMessage, AIMessage
# 初始化聊天模型
chat = ChatOpenAI(
model="gpt-4o-mini", # 使用更经济的模型
temperature=0.7
)
# 构建消息列表
messages = [
SystemMessage(content="你是一个专业的Python编程助手"),
HumanMessage(content="如何实现单例模式?"),
]
# 调用模型
response = chat.invoke(messages)
print(response.content)
# 多轮对话
messages.append(AIMessage(content=response.content))
messages.append(HumanMessage(content="能给个代码示例吗?"))
response = chat.invoke(messages)
print(response.content)
与LLM的对比:
| 特性 | LLM | Chat Models |
|---|---|---|
| 输入格式 | 纯文本字符串 | 消息列表 |
| 角色支持 | 无 | System/Human/AI |
| 多轮对话 | 需手动拼接 | 原生支持 |
| 函数调用 | 不支持 | 支持 |
| 适用场景 | 文本生成、补全 | 对话、助手 |
3.3 Embeddings嵌入模型
Embeddings将文本转换为向量表示,用于语义搜索和相似度计算。
核心概念:
- 向量维度:通常为768、1536等
- 语义相似度:向量距离反映文本相似度
- 批量处理:支持批量向量化提升效率
常用Embeddings模型:
| 模型 | 维度 | 特点 | 适用场景 |
|---|---|---|---|
| OpenAI text-embedding-3 | 1536/3072 | 性能好,成本低 | 通用场景 |
| Cohere embed-v3 | 1024 | 多语言支持 | 国际化应用 |
| HuggingFace BAAI/bge | 768 | 开源,中文好 | 本地部署 |
| Sentence Transformers | 384/768 | 轻量级 | 资源受限 |
基本使用:
from langchain_openai import OpenAIEmbeddings
# 初始化
embeddings = OpenAIEmbeddings(
model="text-embedding-3-small"
)
# 单个文本向量化
vector = embeddings.embed_query("机器学习是什么?")
print(f"向量维度: {len(vector)}")
# 批量向量化
texts = ["文本1", "文本2", "文本3"]
vectors = embeddings.embed_documents(texts)
# 计算相似度
from numpy import dot
from numpy.linalg import norm
def cosine_similarity(v1, v2):
return dot(v1, v2) / (norm(v1) * norm(v2))
query_vec = embeddings.embed_query("深度学习")
doc_vec = embeddings.embed_query("神经网络")
similarity = cosine_similarity(query_vec, doc_vec)
print(f"相似度: {similarity}")
性能优化:
- 批量处理:一次处理多个文本
- 缓存机制:避免重复向量化
- 异步调用:提高并发性能
- 本地模型:减少API调用成本
3.4 模型集成与配置
统一接口设计:
所有模型都实现相同的核心方法:
# 标准接口
class BaseLanguageModel:
def invoke(self, input):
"""同步调用"""
pass
async def ainvoke(self, input):
"""异步调用"""
pass
def batch(self, inputs):
"""批量调用"""
pass
def stream(self, input):
"""流式输出"""
pass
模型切换:
# 方式1:直接替换
from langchain_openai import ChatOpenAI
from langchain_anthropic import ChatAnthropic
# OpenAI模型
model = ChatOpenAI(model="gpt-4")
# 切换到Anthropic(接口完全相同)
model = ChatAnthropic(model="claude-3-opus")
# 方式2:配置化
def get_model(provider: str):
if provider == "openai":
return ChatOpenAI(model="gpt-4")
elif provider == "anthropic":
return ChatAnthropic(model="claude-3-opus")
# ... 其他提供商
环境变量配置:
# OpenAI
export OPENAI_API_KEY="sk-..."
export OPENAI_API_BASE="https://api.openai.com/v1"
# Anthropic
export ANTHROPIC_API_KEY="sk-ant-..."
# HuggingFace
export HUGGINGFACEHUB_API_TOKEN="hf_..."
高级配置:
from langchain_openai import ChatOpenAI
model = ChatOpenAI(
model="gpt-4",
temperature=0.7,
max_tokens=2000,
timeout=60, # 超时设置
max_retries=3, # 重试次数
request_timeout=30, # 请求超时
streaming=True, # 启用流式
verbose=True, # 调试模式
)
成本控制策略:
| 策略 | 实现方式 | 效果 |
|---|---|---|
| 模型降级 | 优先用便宜模型 | 成本降低50-90% |
| 缓存 | 相同输入返回缓存 | 减少重复调用 |
| 批量处理 | 合并多个请求 | 提高吞吐量 |
| Token限制 | 设置max_tokens | 控制单次成本 |
| 本地模型 | 使用开源模型 | 零API成本 |
错误处理:
from langchain_community.callbacks import get_openai_callback
from tenacity import retry, stop_after_attempt, wait_exponential
# 成本追踪
with get_openai_callback() as cb:
response = model.invoke("你好")
print(f"Tokens: {cb.total_tokens}")
print(f"Cost: ${cb.total_cost}")
# 自动重试
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=4, max=10)
)
def call_model_with_retry(prompt):
return model.invoke(prompt)
result = call_model_with_retry("你好")
print(result.content)
4. Prompts提示工程
4.1 Prompt Templates提示模板
PromptTemplate是参数化的提示字符串,支持动态变量替换。
基础用法:
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser,JsonOutputParser
# 创建提示模板
prompt = ChatPromptTemplate.from_messages([
("system", "你是世界级的技术文档编写者。"),
("user", "{input}")
])
# 使用输出解析器
# output_parser = StrOutputParser()
output_parser = JsonOutputParser()
# 将其添加到上一个链中
# chain = prompt | llm
chain = prompt | llm | output_parser
# 调用它并提出同样的问题。答案是一个字符串,而不是ChatMessage
# chain.invoke({"input": "LangChain是什么?"})
chain.invoke({"input": "LangChain是什么? 用JSON格式回复,问题用question,回答用answer"})
ChatPromptTemplate(对话模板):
from langchain_core.prompts import ChatPromptTemplate, SystemMessagePromptTemplate, HumanMessagePromptTemplate
from langchain_openai import ChatOpenAI
from langchain_core.output_parsers import StrOutputParser
# 初始化模型
llm = ChatOpenAI(model="gpt-4o-mini")
# 系统消息模板
system_template = "你是一个{expertise}专家,擅长{skill}。"
system_prompt = SystemMessagePromptTemplate.from_template(system_template)
# 用户消息模板
human_template = "{user_input}"
human_prompt = HumanMessagePromptTemplate.from_template(human_template)
# 组合
chat_prompt = ChatPromptTemplate.from_messages([
system_prompt,
human_prompt
])
# 创建输出解析器
output_parser = StrOutputParser()
# 创建链
chain = chat_prompt | llm | output_parser
# 使用
result = chain.invoke({
"expertise": "数据分析",
"skill": "Python和SQL",
"user_input": "如何分析用户行为数据?"
})
print(result)
模板类型对比:
| 模板类型 | 适用场景 | 特点 |
|---|---|---|
| PromptTemplate | 简单文本生成 | 单一字符串 |
| ChatPromptTemplate | 对话应用 | 多消息类型 |
| FewShotPromptTemplate | 少样本学习 | 包含示例 |
| PipelinePromptTemplate | 复杂组合 | 模板嵌套 |
4.2 Few-shot Prompting少样本提示
Few-shot通过示例引导模型输出,提高准确性和一致性。
基本结构:
from langchain_core.prompts import FewShotPromptTemplate, PromptTemplate
# 定义示例
examples = [
{
"input": "happy",
"output": "sad"
},
{
"input": "tall",
"output": "short"
},
{
"input": "hot",
"output": "cold"
}
]
# 示例格式化模板
example_template = """
Input: {input}
Output: {output}
"""
example_prompt = PromptTemplate(
input_variables=["input", "output"],
template=example_template
)
# Few-shot模板
few_shot_prompt = FewShotPromptTemplate(
examples=examples,
example_prompt=example_prompt,
prefix="给出以下词的反义词:",
suffix="Input: {word}\nOutput:",
input_variables=["word"]
)
# 使用
print(few_shot_prompt.format(word="big"))
动态示例选择:
from langchain.prompts.example_selector import SemanticSimilarityExampleSelector
from langchain_openai import OpenAIEmbeddings
from langchain_community.vectorstores import Chroma
# 创建示例选择器
example_selector = SemanticSimilarityExampleSelector.from_examples(
examples,
OpenAIEmbeddings(),
Chroma,
k=2 # 选择最相似的2个示例
)
# 使用选择器
few_shot_prompt = FewShotPromptTemplate(
example_selector=example_selector,
example_prompt=example_prompt,
prefix="给出以下词的反义词:",
suffix="Input: {word}\nOutput:",
input_variables=["word"]
)
Few-shot策略:
| 策略 | 说明 | 适用场景 |
|---|---|---|
| 固定示例 | 使用预定义示例 | 任务明确 |
| 语义相似 | 根据输入选择相似示例 | 多样化输入 |
| 长度限制 | 控制示例数量 | Token限制 |
| 难度递进 | 从简单到复杂 | 复杂任务 |
4.3 Output Parsers输出解析器
OutputParser将模型输出转换为结构化数据。
常用解析器类型:
Pydantic模型"] A --> C["StructuredOutputParser
字典结构"] A --> D["CommaSeparatedListOutputParser
逗号分隔列表"] A --> E["DatetimeOutputParser
日期时间"] A --> F["EnumOutputParser
枚举类型"] A --> G["JsonOutputParser
JSON格式"] style A fill:#e3f2fd style B fill:#fff3e0 style C fill:#e8f5e9 style D fill:#f3e5f5 style E fill:#fff9c4 style F fill:#e0f2f1 style G fill:#fce4ec
PydanticOutputParser示例:
from langchain_core.output_parsers import PydanticOutputParser
from langchain_core.prompts import PromptTemplate
from pydantic import BaseModel, Field
# 定义数据模型
class Person(BaseModel):
name: str = Field(description="人物姓名")
age: int = Field(description="年龄")
occupation: str = Field(description="职业")
# 创建解析器
parser = PydanticOutputParser(pydantic_object=Person)
# 获取格式说明
format_instructions = parser.get_format_instructions()
# 创建提示
prompt = PromptTemplate(
template="提取以下文本中的人物信息:\n{text}\n\n{format_instructions}",
input_variables=["text"],
partial_variables={"format_instructions": format_instructions}
)
# 使用
text = "张三今年30岁,是一名软件工程师。"
formatted_prompt = prompt.format(text=text)
response = model.invoke(formatted_prompt)
person = parser.parse(response.content)
print(person.name) # 张三
print(person.age) # 30
StructuredOutputParser示例:
from langchain_core.output_parsers import StructuredOutputParser, ResponseSchema
# 定义响应模式
response_schemas = [
ResponseSchema(name="answer", description="问题的答案"),
ResponseSchema(name="confidence", description="置信度(0-100)"),
ResponseSchema(name="source", description="信息来源")
]
parser = StructuredOutputParser.from_response_schemas(response_schemas)
# 使用
format_instructions = parser.get_format_instructions()
# ... 构建提示并调用模型
result = parser.parse(response.content)
print(result["answer"])
print(result["confidence"])
错误处理:
from langchain.output_parsers import OutputFixingParser
# 原始解析器
base_parser = PydanticOutputParser(pydantic_object=Person)
# 包装修复解析器
fixing_parser = OutputFixingParser.from_llm(
parser=base_parser,
llm=model
)
# 即使输出格式有问题,也会尝试修复
try:
result = fixing_parser.parse(malformed_output)
except Exception as e:
print(f"解析失败: {e}")
4.4 提示优化策略
1. 清晰的指令
# ❌ 不好的提示
"写点关于AI的东西"
# ✅ 好的提示
"""
请写一篇500字的文章,介绍人工智能在医疗领域的应用。
要求:
1. 包含至少3个具体应用场景
2. 使用通俗易懂的语言
3. 结构清晰,分段合理
"""
2. 角色设定
system_message = """
你是一个资深的Python开发工程师,拥有10年以上的开发经验。
你的特点:
- 代码风格遵循PEP 8规范
- 注重代码可读性和可维护性
- 善于使用设计模式解决问题
- 会考虑性能和安全性
"""
3. 上下文提供
template = """
背景信息:
{context}
基于以上背景,请回答:
{question}
要求:
- 答案要基于提供的背景信息
- 如果背景信息不足,请明确指出
- 保持客观和准确
"""
4. 输出格式约束
template = """
请分析以下代码的时间复杂度:
{code}
请按以下格式输出:
1. 时间复杂度:O(?)
2. 空间复杂度:O(?)
3. 分析过程:[详细说明]
4. 优化建议:[如果有]
"""
5. Chain of Thought(思维链)
template = """
问题:{question}
请按以下步骤思考:
1. 理解问题:明确问题要求
2. 分析思路:列出可能的解决方案
3. 选择方案:选择最优方案并说明理由
4. 详细步骤:给出具体实现步骤
5. 最终答案:给出最终结果
让我们一步步思考:
"""
提示优化检查清单:
| 检查项 | 说明 | 重要性 |
|---|---|---|
| 明确目标 | 清楚说明期望输出 | ⭐⭐⭐⭐⭐ |
| 提供上下文 | 给出必要背景信息 | ⭐⭐⭐⭐ |
| 格式约束 | 指定输出格式 | ⭐⭐⭐⭐ |
| 示例引导 | 提供Few-shot示例 | ⭐⭐⭐ |
| 角色设定 | 定义AI角色 | ⭐⭐⭐ |
| 思维引导 | 使用CoT技巧 | ⭐⭐⭐ |
| 约束条件 | 明确限制和要求 | ⭐⭐⭐ |
A/B测试提示:
from langchain.evaluation import load_evaluator
# 定义两个提示版本
prompt_a = PromptTemplate(template="简单提示:{question}")
prompt_b = PromptTemplate(template="详细提示:{question}\n请详细解释。")
# 评估器
evaluator = load_evaluator("criteria", criteria="helpfulness")
# 测试
test_questions = ["什么是机器学习?", "如何学习Python?"]
for question in test_questions:
response_a = model.invoke(prompt_a.format(question=question))
response_b = model.invoke(prompt_b.format(question=question))
score_a = evaluator.evaluate_strings(
prediction=response_a.content,
input=question
)
score_b = evaluator.evaluate_strings(
prediction=response_b.content,
input=question
)
print(f"Prompt A: {score_a['score']}")
print(f"Prompt B: {score_b['score']}")
5. Memory记忆系统
5.1 记忆类型与机制
Memory使AI能够记住对话历史和上下文,实现连贯的多轮交互。
记忆类型分类:
Short-term"] A --> C["长期记忆
Long-term"] B --> B1["ConversationBufferMemory
完整历史"] B --> B2["ConversationBufferWindowMemory
滑动窗口"] B --> B3["ConversationTokenBufferMemory
Token限制"] C --> C1["ConversationSummaryMemory
总结式"] C --> C2["VectorStoreMemory
向量检索"] C --> C3["EntityMemory
实体关系"] style A fill:#e3f2fd style B fill:#fff3e0 style C fill:#e8f5e9 style B1 fill:#f3e5f5 style B2 fill:#f3e5f5 style B3 fill:#f3e5f5 style C1 fill:#fff9c4 style C2 fill:#fff9c4 style C3 fill:#fff9c4
记忆机制对比:
| 类型 | 存储方式 | 优点 | 缺点 | 适用场景 |
|---|---|---|---|---|
| Buffer | 完整历史 | 信息完整 | 占用大 | 短对话 |
| Window | 最近N轮 | 控制大小 | 丢失历史 | 固定长度 |
| Token Buffer | Token限制 | 精确控制 | 需计算 | 成本敏感 |
| Summary | 总结压缩 | 节省空间 | 信息损失 | 长对话 |
| Vector Store | 向量检索 | 智能检索 | 需向量化 | 大量历史 |
5.2 对话历史管理
ConversationBufferMemory(完整历史):
from langchain.memory import ConversationBufferMemory
from langchain.chains import ConversationChain
# 创建记忆
memory = ConversationBufferMemory()
# 创建对话链
conversation = ConversationChain(
llm=model,
memory=memory,
verbose=True
)
# 多轮对话
conversation.predict(input="你好,我叫张三")
conversation.predict(input="我的名字是什么?") # 能记住之前的信息
# 查看历史
print(memory.load_memory_variables({}))
ConversationBufferWindowMemory(滑动窗口):
from langchain.memory import ConversationBufferWindowMemory
# 只保留最近3轮对话
memory = ConversationBufferWindowMemory(k=3)
conversation = ConversationChain(
llm=model,
memory=memory
)
# 第4轮对话时,第1轮会被丢弃
ConversationSummaryMemory(总结式):
from langchain.memory import ConversationSummaryMemory
# 自动总结历史对话
memory = ConversationSummaryMemory(llm=model)
conversation = ConversationChain(
llm=model,
memory=memory
)
# 对话会被自动总结,节省Token
conversation.predict(input="介绍一下机器学习")
conversation.predict(input="深度学习呢?")
# 查看总结
print(memory.buffer) # 显示总结后的内容
自定义记忆变量:
from langchain.memory import ConversationBufferMemory
memory = ConversationBufferMemory(
memory_key="chat_history", # 自定义键名
return_messages=True, # 返回消息对象而非字符串
input_key="question", # 输入键
output_key="answer" # 输出键
)
5.3 向量存储记忆
VectorStoreRetrieverMemory实现智能检索:
from langchain.memory import VectorStoreRetrieverMemory
from langchain_openai import OpenAIEmbeddings
from langchain_community.vectorstores import Chroma
# 创建向量存储
embeddings = OpenAIEmbeddings()
vectorstore = Chroma(embedding_function=embeddings)
# 创建检索器记忆
memory = VectorStoreRetrieverMemory(
retriever=vectorstore.as_retriever(search_kwargs={"k": 3})
)
# 保存对话
memory.save_context(
{"input": "我喜欢吃披萨"},
{"output": "好的,我记住了你喜欢披萨"}
)
memory.save_context(
{"input": "我的爱好是游泳"},
{"output": "明白了,游泳是你的爱好"}
)
# 智能检索相关记忆
relevant_memories = memory.load_memory_variables(
{"input": "我喜欢什么食物?"}
)
print(relevant_memories) # 会检索到披萨相关的记忆
EntityMemory(实体关系记忆):
from langchain.memory import ConversationEntityMemory
# 自动提取和记忆实体
memory = ConversationEntityMemory(llm=model)
conversation = ConversationChain(
llm=model,
memory=memory
)
conversation.predict(input="张三在北京工作,是一名工程师")
conversation.predict(input="李四是张三的同事")
# 查看实体信息
print(memory.entity_store.store)
# 输出:{'张三': '在北京工作的工程师', '李四': '张三的同事'}
5.4 记忆优化与性能
5.4.1 记忆压缩(Memory Compression)深度解析
记忆压缩是大模型长对话系统的核心。它的本质是将原始对话转化为更高维度的抽象信息。
1. 实现方式:总结式压缩 (Summarization)
- 触发机制:当对话 Token 达到限制(如 4000 Token)时触发。
- 执行过程:系统将“旧总结 + 新对话片段”发送给 LLM,要求生成“更新后的总结”。
- Prompt 示例:
“请基于之前的对话摘要和最新的对话内容,生成一份新的摘要,保留关键事实(人名、日期、决定、偏好)。”
2. 实现方式:实体提取 (Entity Extraction)
- 原理:不总结全文,只提取关键实体及其属性。
- 效果:将 “张三昨天在上海开会,讨论了关于 A 项目的预算” 压缩为
{"张三": {"位置": "上海", "关联项目": "A", "动作": "讨论预算"}}。
5.4.2 细节丢失与补偿机制
记忆压缩必然是“有损”的。细节丢失是其天然属性,但工业界通过以下策略进行补偿:
1. 混合记忆 (Hybrid Memory) - 细节补位
- 策略:Summary (长线背景) + Buffer (短线细节)。
- 原理:总结只负责“很久以前”的事,而“最近 5-10 轮”的对话保持原样。这样大模型既能知道全局背景,又能精准回应爸爸刚刚说的话。
2. 关键信息标记 (Selective Preservation)
- 原理:在总结时,通过 Prompt 强制要求保留某些细节。
- 技巧:使用
SummaryBufferMemory,它会设置一个阈值,只有超过阈值的旧记忆才会被总结,未超过的部分保留原文。
3. 向量找回 (Vector Recovery)
- 原理:将原始对话切片存入向量数据库。
- 效果:即使总结里丢了“某次发票的具体金额”,大模型也可以通过语义检索,从向量库里把那段原始对话“捞”回来。
| 压缩策略 | 细节丢失程度 | 性能开销 | 适用场景 |
|---|---|---|---|
| 滑动窗口 | 极高(直接丢弃) | 极低 | 固定步长的对话 |
| 全文总结 | 中等(丢失语气/琐碎细节) | 高 | 业务背景复杂的长对话 |
| 实体提取 | 低(保留关键事实) | 中 | 知识密集型任务 |
| 混合策略 | 最低(工业界首选) | 中 | 复杂 Agent/长期伴侣 AI |
1. 混合记忆策略:
from langchain.memory import CombinedMemory, ConversationBufferMemory, ConversationSummaryMemory
# 组合多种记忆
short_term = ConversationBufferWindowMemory(k=5, memory_key="recent")
long_term = ConversationSummaryMemory(llm=model, memory_key="summary")
memory = CombinedMemory(memories=[short_term, long_term])
2. 记忆持久化:
import json
# 保存记忆
memory_data = memory.load_memory_variables({})
with open("memory.json", "w") as f:
json.dump(memory_data, f)
# 加载记忆
with open("memory.json", "r") as f:
loaded_data = json.load(f)
# 恢复记忆状态
3. 记忆清理:
# 清空记忆
memory.clear()
# 选择性删除
memory.chat_memory.messages = memory.chat_memory.messages[-10:] # 只保留最近10条
性能优化建议:
| 优化点 | 方法 | 效果 |
|---|---|---|
| Token控制 | 使用TokenBuffer或Summary | 降低成本50-80% |
| 异步处理 | 异步保存记忆 | 提升响应速度 |
| 批量操作 | 批量保存/检索 | 减少IO次数 |
| 缓存 | 缓存常用记忆 | 加快检索 |
| 定期清理 | 删除过期记忆 | 控制存储 |
6. Chains链式调用
6.1 Chain基础概念
⚠️ 重要提示:传统的 LLMChain、SequentialChain 等已在 LangChain 1.0+ 被 LCEL (LangChain Expression Language) 取代。
LCEL 的优势:
- 更简洁:使用
|操作符连接组件 - 更高效:自动优化执行流程
- 更灵活:支持并行、分支、流式等高级功能
- 类型安全:更好的类型提示
LCEL 执行流程:
✅ 基础 LCEL 链:
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_openai import ChatOpenAI
# 创建组件
model = ChatOpenAI(model="gpt-3.5-turbo")
prompt = ChatPromptTemplate.from_template("为{product}写一句广告语")
output_parser = StrOutputParser()
# 使用 | 操作符组合(LCEL)
chain = prompt | model | output_parser
# 执行
result = chain.invoke({"product": "智能手表"})
print(result)
# 批量执行
results = chain.batch([
{"product": "智能手表"},
{"product": "无线耳机"}
])
# 流式输出
for chunk in chain.stream({"product": "智能音箱"}):
print(chunk, end="", flush=True)
6.2 常用Chain类型
1. 顺序链(使用 LCEL):
from langchain_core.runnables import RunnablePassthrough
# 步骤1:生成大纲
outline_prompt = ChatPromptTemplate.from_template("为{topic}生成文章大纲")
outline_chain = outline_prompt | model | StrOutputParser()
# 步骤2:根据大纲写文章
article_prompt = ChatPromptTemplate.from_template(
"主题:{topic}\n大纲:{outline}\n\n请写一篇文章:"
)
# 组合成顺序链
sequential_chain = (
{"topic": RunnablePassthrough()}
| RunnablePassthrough.assign(outline=outline_chain)
| article_prompt
| model
| StrOutputParser()
)
# 执行
result = sequential_chain.invoke("人工智能的未来")
print(result)
2. 并行链(使用 LCEL):
from langchain_core.runnables import RunnableParallel
# 定义多个并行任务
summary_chain = (
ChatPromptTemplate.from_template("总结:{text}")
| model
| StrOutputParser()
)
keywords_chain = (
ChatPromptTemplate.from_template("提取关键词:{text}")
| model
| StrOutputParser()
)
sentiment_chain = (
ChatPromptTemplate.from_template("分析情感:{text}")
| model
| StrOutputParser()
)
# 并行执行
parallel_chain = RunnableParallel(
summary=summary_chain,
keywords=keywords_chain,
sentiment=sentiment_chain
)
# 执行
result = parallel_chain.invoke({"text": "今天天气真好,心情很愉快!"})
print(result)
# 输出:{'summary': '...', 'keywords': '...', 'sentiment': '...'}
3. 条件分支链(使用 LCEL):
from langchain_core.runnables import RunnableBranch
# 定义不同的处理链
physics_chain = (
ChatPromptTemplate.from_template("作为物理学专家回答:{question}")
| model
| StrOutputParser()
)
math_chain = (
ChatPromptTemplate.from_template("作为数学专家回答:{question}")
| model
| StrOutputParser()
)
general_chain = (
ChatPromptTemplate.from_template("回答:{question}")
| model
| StrOutputParser()
)
# 创建分支
def route_question(input_dict):
question = input_dict["question"].lower()
if "物理" in question or "力" in question:
return "physics"
elif "数学" in question or "计算" in question:
return "math"
else:
return "general"
branch_chain = RunnableBranch(
(lambda x: route_question(x) == "physics", physics_chain),
(lambda x: route_question(x) == "math", math_chain),
general_chain # 默认分支
)
# 执行
result = branch_chain.invoke({"question": "什么是牛顿第一定律?"})
print(result)
# 创建路由链
router_chain = MultiPromptChain.from_prompts(
llm=model,
prompt_infos=prompt_infos
)
# 自动路由到合适的专家
result = router_chain.invoke({"input": "什么是牛顿第一定律?"}) # 路由到physics
3. TransformChain(转换链):
from langchain.chains import TransformChain
def transform_func(inputs: dict) -> dict:
text = inputs["text"]
# 自定义转换逻辑
processed_text = text.upper()
return {"processed_text": processed_text}
transform_chain = TransformChain(
input_variables=["text"],
output_variables=["processed_text"],
transform=transform_func
)
# 与LLMChain组合
combined_chain = SequentialChain(
chains=[transform_chain, llm_chain],
input_variables=["text"],
output_variables=["result"]
)
Chain类型对比:
| Chain类型 | 特点 | 适用场景 | 复杂度 |
|---|---|---|---|
| LLMChain | 基础链 | 单步任务 | ⭐ |
| SequentialChain | 顺序执行 | 多步流程 | ⭐⭐ |
| RouterChain | 条件分支 | 多策略选择 | ⭐⭐⭐ |
| TransformChain | 数据转换 | 预处理/后处理 | ⭐⭐ |
| MapReduceChain | 并行处理 | 大规模数据 | ⭐⭐⭐⭐ |
6.3 自定义Chain开发
继承Chain基类:
from langchain.chains.base import Chain
from typing import Dict, List
class CustomChain(Chain):
"""自定义Chain示例"""
llm: Any
prompt: PromptTemplate
@property
def input_keys(self) -> List[str]:
"""定义输入键"""
return ["input_text"]
@property
def output_keys(self) -> List[str]:
"""定义输出键"""
return ["output_text", "metadata"]
def _call(self, inputs: Dict[str, str]) -> Dict[str, str]:
"""核心执行逻辑"""
input_text = inputs["input_text"]
# 自定义处理
formatted_prompt = self.prompt.format(text=input_text)
response = self.llm.invoke(formatted_prompt)
# 返回结果
return {
"output_text": response.content,
"metadata": {"length": len(response.content)}
}
6.4 Chain组合与编排
使用LCEL组合Chain:
from langchain_core.runnables import RunnablePassthrough
# 方式1:管道操作符
chain = prompt | model | output_parser
# 方式2:并行执行
from langchain_core.runnables import RunnableParallel
chain = RunnableParallel(
summary=summary_chain,
keywords=keyword_chain
)
# 方式3:条件分支
from langchain_core.runnables import RunnableBranch
branch = RunnableBranch(
(lambda x: len(x["text"]) < 100, short_chain),
(lambda x: len(x["text"]) < 500, medium_chain),
long_chain # 默认
)
错误处理与重试:
from langchain_core.runnables import RunnableWithFallbacks
# 主Chain失败时使用备用Chain
chain_with_fallback = primary_chain.with_fallbacks(
[backup_chain_1, backup_chain_2]
)
# 自动重试
from tenacity import retry, stop_after_attempt
@retry(stop=stop_after_attempt(3))
def call_chain_with_retry(input_data):
return chain.invoke(input_data)
7. Agents智能代理
7.1 Agent架构原理
Agent是具有自主决策能力的系统,能够根据任务选择和使用工具。
核心组成:
决策中心"] B --> C{"需要工具?"} C -->|是| D["Tool Selection
工具选择"] D --> E["Tool Execution
工具执行"] E --> F["Observation
观察结果"] F --> B C -->|否| G["Final Answer
最终答案"] end H["Tools
工具集"] --> D I["Memory
记忆"] --> B J["LLM
语言模型"] --> B style A fill:#e3f2fd style B fill:#fff3e0 style C fill:#ffe1e1 style D fill:#e8f5e9 style E fill:#f3e5f5 style F fill:#fff9c4 style G fill:#e0f2f1 style H fill:#fce4ec style I fill:#e1bee7 style J fill:#c5e1a5
ReAct模式(Reasoning + Acting):
Thought: 我需要了解当前天气
Action: search_weather
Action Input: {"city": "北京"}
Observation: 北京今天晴天,温度25度
Thought: 我已经获得了天气信息,可以回答了
Final Answer: 北京今天是晴天,温度25度
7.2 Agent类型与选择
常用Agent类型:
| Agent类型 | 特点 | 适用场景 | 模型要求 |
|---|---|---|---|
| Zero-shot ReAct | 无需示例 | 通用任务 | GPT-3.5+ |
| Structured Chat | 结构化输入 | 复杂参数 | GPT-4 |
| OpenAI Functions | 函数调用 | 精确控制 | GPT-3.5-turbo+ |
| Conversational | 对话式 | 多轮交互 | 任意 |
| Self-ask with Search | 自问自答 | 需要推理 | GPT-3+ |
| Plan-and-Execute | 规划执行 | 复杂任务 | GPT-4 |
创建Agent:
from langchain.agents import create_openai_functions_agent, AgentExecutor
from langchain.tools import Tool
from langchain import hub
# 1. 定义工具
def search_tool(query: str) -> str:
"""搜索工具"""
return f"搜索结果:{query}"
def calculator_tool(expression: str) -> str:
"""计算器工具"""
return str(eval(expression))
tools = [
Tool(
name="Search",
func=search_tool,
description="用于搜索信息"
),
Tool(
name="Calculator",
func=calculator_tool,
description="用于数学计算"
)
]
# 2. 获取提示模板
prompt = hub.pull("hwchase17/openai-functions-agent")
# 3. 创建Agent
agent = create_openai_functions_agent(
llm=model,
tools=tools,
prompt=prompt
)
# 4. 创建执行器
agent_executor = AgentExecutor(
agent=agent,
tools=tools,
verbose=True,
max_iterations=5, # 最大迭代次数
max_execution_time=60, # 最大执行时间
handle_parsing_errors=True # 处理解析错误
)
# 5. 执行
result = agent_executor.invoke({
"input": "北京的人口是多少?然后计算人口的平方根"
})
7.3 Tools工具集成
工具定义方式:
方式1:使用@tool装饰器:
from langchain.tools import tool
@tool
def get_word_length(word: str) -> int:
"""返回单词的长度"""
return len(word)
@tool
def multiply(a: int, b: int) -> int:
"""计算两个数的乘积"""
return a * b
方式2:使用Tool类:
from langchain.tools import Tool
def custom_function(input_str: str) -> str:
# 自定义逻辑
return f"处理结果:{input_str}"
tool = Tool(
name="CustomTool",
func=custom_function,
description="这是一个自定义工具,用于..."
)
方式3:使用StructuredTool(复杂参数):
from langchain.tools import StructuredTool
from pydantic import BaseModel, Field
class SearchInput(BaseModel):
query: str = Field(description="搜索查询")
max_results: int = Field(default=5, description="最大结果数")
def search_function(query: str, max_results: int = 5) -> str:
return f"搜索 '{query}',返回 {max_results} 条结果"
search_tool = StructuredTool.from_function(
func=search_function,
name="Search",
description="搜索工具",
args_schema=SearchInput
)
内置工具集成:
from langchain.tools import WikipediaQueryRun, DuckDuckGoSearchRun
from langchain.utilities import WikipediaAPIWrapper
# Wikipedia工具
wikipedia = WikipediaQueryRun(api_wrapper=WikipediaAPIWrapper())
# DuckDuckGo搜索
search = DuckDuckGoSearchRun()
# Python REPL
from langchain.tools import PythonREPLTool
python_repl = PythonREPLTool()
tools = [wikipedia, search, python_repl]
工具最佳实践:
| 实践 | 说明 | 重要性 |
|---|---|---|
| 清晰描述 | 准确描述工具功能 | ⭐⭐⭐⭐⭐ |
| 参数验证 | 使用Pydantic验证 | ⭐⭐⭐⭐ |
| 错误处理 | 捕获并返回友好错误 | ⭐⭐⭐⭐ |
| 超时控制 | 设置执行超时 | ⭐⭐⭐ |
| 日志记录 | 记录工具调用 | ⭐⭐⭐ |
7.4 Agent执行流程
详细执行流程:
Agent配置优化:
agent_executor = AgentExecutor(
agent=agent,
tools=tools,
verbose=True,
# 迭代控制
max_iterations=10, # 最大迭代次数
max_execution_time=120, # 最大执行时间(秒)
early_stopping_method="generate", # 提前停止策略
# 错误处理
handle_parsing_errors=True, # 自动处理解析错误
return_intermediate_steps=True, # 返回中间步骤
# 记忆
memory=memory, # 添加记忆
# 回调
callbacks=[callback_handler] # 添加回调
)
自定义Agent逻辑:
from langchain.agents import BaseSingleActionAgent
from typing import List, Tuple, Any
class CustomAgent(BaseSingleActionAgent):
"""自定义Agent"""
tools: List[Tool]
llm: Any
@property
def input_keys(self):
return ["input"]
def plan(
self,
intermediate_steps: List[Tuple[AgentAction, str]],
**kwargs: Any
) -> Union[AgentAction, AgentFinish]:
"""决策逻辑"""
# 自定义决策过程
if len(intermediate_steps) > 5:
return AgentFinish(
return_values={"output": "达到最大步数"},
log="停止执行"
)
# 选择工具
tool_name = self._select_tool(kwargs["input"])
return AgentAction(
tool=tool_name,
tool_input=kwargs["input"],
log=f"使用工具: {tool_name}"
)
async def aplan(self, *args, **kwargs):
"""异步版本"""
return self.plan(*args, **kwargs)
Agent调试技巧:
# 1. 启用详细输出
agent_executor = AgentExecutor(agent=agent, tools=tools, verbose=True)
# 2. 获取中间步骤
result = agent_executor.invoke(
{"input": "问题"},
return_intermediate_steps=True
)
for step in result["intermediate_steps"]:
print(f"Action: {step[0]}")
print(f"Observation: {step[1]}")
# 3. 使用回调追踪
from langchain.callbacks import StdOutCallbackHandler
agent_executor.invoke(
{"input": "问题"},
callbacks=[StdOutCallbackHandler()]
)
8. Data Connection数据连接
8.1 Document Loaders文档加载器
Document Loaders负责从各种数据源加载文档。
支持的数据源:
| 类别 | 加载器 | 支持格式 |
|---|---|---|
| 文件 | TextLoader, PDFLoader, DocxLoader | txt, pdf, docx |
| 网页 | WebBaseLoader, SeleniumURLLoader | html, 动态网页 |
| 数据库 | SQLDatabaseLoader, MongoDBLoader | SQL, NoSQL |
| API | GitHubLoader, NotionLoader | GitHub, Notion |
| 云存储 | S3Loader, GCSLoader | AWS S3, GCS |
基本使用:
from langchain_community.document_loaders import TextLoader, PyPDFLoader, WebBaseLoader
# 1. 加载文本文件
loader = TextLoader("document.txt", encoding="utf-8")
documents = loader.load()
# 2. 加载PDF
pdf_loader = PyPDFLoader("document.pdf")
pages = pdf_loader.load_and_split()
# 3. 加载网页
web_loader = WebBaseLoader("https://example.com")
web_docs = web_loader.load()
# 4. 批量加载目录
from langchain_community.document_loaders import DirectoryLoader
loader = DirectoryLoader(
"./docs",
glob="**/*.md", # 匹配所有markdown文件
loader_cls=TextLoader
)
docs = loader.load()
Document结构:
from langchain.schema import Document
# Document对象包含两个主要属性
doc = Document(
page_content="文档内容...", # 文本内容
metadata={ # 元数据
"source": "document.pdf",
"page": 1,
"author": "张三"
}
)
print(doc.page_content)
print(doc.metadata)
8.2 Text Splitters文本分割
Text Splitters将长文档分割成小块,适应模型上下文限制。
分割策略:
字符分割"] A --> C["RecursiveCharacterTextSplitter
递归分割"] A --> D["TokenTextSplitter
Token分割"] A --> E["MarkdownTextSplitter
Markdown分割"] A --> F["CodeTextSplitter
代码分割"] style A fill:#e3f2fd style B fill:#fff3e0 style C fill:#e8f5e9 style D fill:#f3e5f5 style E fill:#fff9c4 style F fill:#e0f2f1
RecursiveCharacterTextSplitter(推荐):
from langchain.text_splitter import RecursiveCharacterTextSplitter
# 创建分割器
splitter = RecursiveCharacterTextSplitter(
chunk_size=1000, # 每块大小
chunk_overlap=200, # 重叠部分
length_function=len, # 长度计算函数
separators=["\n\n", "\n", " ", ""] # 分隔符优先级
)
# 分割文档
chunks = splitter.split_documents(documents)
# 分割文本
texts = splitter.split_text("长文本...")
TokenTextSplitter(精确控制):
from langchain.text_splitter import TokenTextSplitter
# 按Token分割
token_splitter = TokenTextSplitter(
chunk_size=512, # Token数量
chunk_overlap=50
)
chunks = token_splitter.split_documents(documents)
代码分割:
from langchain.text_splitter import Language, RecursiveCharacterTextSplitter
# Python代码分割
python_splitter = RecursiveCharacterTextSplitter.from_language(
language=Language.PYTHON,
chunk_size=500,
chunk_overlap=50
)
# JavaScript代码分割
js_splitter = RecursiveCharacterTextSplitter.from_language(
language=Language.JS,
chunk_size=500
)
分割参数优化:
| 参数 | 推荐值 | 说明 |
|---|---|---|
| chunk_size | 500-1500 | 根据模型上下文调整 |
| chunk_overlap | 10-20% | 保持上下文连贯性 |
| separators | 自然分隔符 | 优先段落、句子 |
8.3 Vector Stores向量数据库
Vector Stores存储和检索文档向量。
主流向量数据库对比:
| 数据库 | 类型 | 特点 | 适用场景 |
|---|---|---|---|
| Chroma | 本地/云 | 轻量级,易用 | 开发测试 |
| Pinecone | 云服务 | 高性能,托管 | 生产环境 |
| Weaviate | 本地/云 | 功能丰富 | 企业应用 |
| FAISS | 本地 | Facebook开源 | 本地部署 |
| Milvus | 本地/云 | 大规模 | 海量数据 |
Chroma使用示例:
from langchain_community.vectorstores import Chroma
from langchain_openai import OpenAIEmbeddings
# 创建向量存储
embeddings = OpenAIEmbeddings()
vectorstore = Chroma.from_documents(
documents=chunks,
embedding=embeddings,
persist_directory="./chroma_db" # 持久化目录
)
# 相似度搜索
results = vectorstore.similarity_search(
"机器学习是什么?",
k=3 # 返回top 3
)
# 带分数的搜索
results_with_scores = vectorstore.similarity_search_with_score(
"机器学习是什么?",
k=3
)
for doc, score in results_with_scores:
print(f"Score: {score}")
print(f"Content: {doc.page_content}")
Pinecone使用示例:
from langchain_community.vectorstores import Pinecone
import pinecone
# 初始化Pinecone
pinecone.init(
api_key="your-api-key",
environment="us-west1-gcp"
)
# 创建索引
index_name = "langchain-demo"
vectorstore = Pinecone.from_documents(
documents=chunks,
embedding=embeddings,
index_name=index_name
)
# 搜索
results = vectorstore.similarity_search("查询文本", k=5)
8.4 Retrievers检索器
Retrievers提供统一的检索接口。
检索器类型:
# 1. 向量存储检索器
retriever = vectorstore.as_retriever(
search_type="similarity", # 相似度搜索
search_kwargs={"k": 5} # 返回5个结果
)
# 2. MMR检索(最大边际相关性)
retriever = vectorstore.as_retriever(
search_type="mmr", # 平衡相关性和多样性
search_kwargs={"k": 5, "fetch_k": 20}
)
# 3. 相似度阈值检索
retriever = vectorstore.as_retriever(
search_type="similarity_score_threshold",
search_kwargs={"score_threshold": 0.8} # 只返回分数>0.8的
)
自定义检索器:
from langchain.schema import BaseRetriever, Document
class CustomRetriever(BaseRetriever):
"""自定义检索器"""
def _get_relevant_documents(self, query: str) -> List[Document]:
"""同步检索"""
# 自定义检索逻辑
return [Document(page_content="结果...")]
async def _aget_relevant_documents(self, query: str) -> List[Document]:
"""异步检索"""
return self._get_relevant_documents(query)
混合检索:
from langchain.retrievers import EnsembleRetriever
# 组合多个检索器
ensemble_retriever = EnsembleRetriever(
retrievers=[vector_retriever, bm25_retriever],
weights=[0.7, 0.3] # 权重分配
)
results = ensemble_retriever.get_relevant_documents("查询")
9. RAG检索增强生成
9.1 RAG架构原理
RAG(Retrieval-Augmented Generation)通过检索外部知识增强LLM能力。
RAG工作流程:
RAG vs Fine-tuning:
| 对比项 | RAG | Fine-tuning |
|---|---|---|
| 知识更新 | 实时更新 | 需要重新训练 |
| 成本 | 低 | 高 |
| 可解释性 | 高(可追溯来源) | 低 |
| 准确性 | 依赖检索质量 | 依赖训练数据 |
| 适用场景 | 知识密集型 | 特定领域 |
9.2 文档索引构建
完整索引构建流程:
from langchain_community.document_loaders import DirectoryLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_openai import OpenAIEmbeddings
from langchain_community.vectorstores import Chroma
# 1. 加载文档
loader = DirectoryLoader(
"./knowledge_base",
glob="**/*.md",
show_progress=True
)
documents = loader.load()
# 2. 文本分割
text_splitter = RecursiveCharacterTextSplitter(
chunk_size=1000,
chunk_overlap=200,
separators=["\n\n", "\n", "。", "!", "?", ";", " ", ""]
)
chunks = text_splitter.split_documents(documents)
# 3. 添加元数据
for i, chunk in enumerate(chunks):
chunk.metadata["chunk_id"] = i
chunk.metadata["source_type"] = "markdown"
# 4. 向量化并存储
embeddings = OpenAIEmbeddings()
vectorstore = Chroma.from_documents(
documents=chunks,
embedding=embeddings,
persist_directory="./vectorstore",
collection_name="knowledge_base"
)
print(f"索引构建完成,共 {len(chunks)} 个文档块")
增量更新索引:
# 添加新文档
new_docs = loader.load_new_documents()
new_chunks = text_splitter.split_documents(new_docs)
vectorstore.add_documents(new_chunks)
# 删除文档
vectorstore.delete(ids=["doc_id_1", "doc_id_2"])
# 更新文档
vectorstore.update_document(doc_id="doc_id", document=new_doc)
9.3 检索策略优化
1. 基础RAG实现:
from langchain.chains import RetrievalQA
# 创建检索器
retriever = vectorstore.as_retriever(
search_type="similarity",
search_kwargs={"k": 4}
)
# 创建RAG链
qa_chain = RetrievalQA.from_chain_type(
llm=model,
chain_type="stuff", # 将所有文档塞入上下文
retriever=retriever,
return_source_documents=True # 返回来源文档
)
# 查询
result = qa_chain.invoke({"query": "什么是机器学习?"})
print(result["result"])
print(result["source_documents"])
2. 高级检索策略:
多查询检索(Multi-Query):
from langchain.retrievers.multi_query import MultiQueryRetriever
# 自动生成多个查询变体
multi_query_retriever = MultiQueryRetriever.from_llm(
retriever=retriever,
llm=model
)
# 单个问题生成多个查询,提高召回率
results = multi_query_retriever.get_relevant_documents(
"机器学习的应用"
)
上下文压缩检索:
from langchain.retrievers import ContextualCompressionRetriever
from langchain.retrievers.document_compressors import LLMChainExtractor
# 使用LLM提取相关部分
compressor = LLMChainExtractor.from_llm(model)
compression_retriever = ContextualCompressionRetriever(
base_compressor=compressor,
base_retriever=retriever
)
# 只返回相关的文档片段
compressed_docs = compression_retriever.get_relevant_documents(
"机器学习的定义"
)
父文档检索:
from langchain.retrievers import ParentDocumentRetriever
from langchain.storage import InMemoryStore
# 小块检索,大块返回
parent_splitter = RecursiveCharacterTextSplitter(chunk_size=2000)
child_splitter = RecursiveCharacterTextSplitter(chunk_size=400)
store = InMemoryStore()
parent_retriever = ParentDocumentRetriever(
vectorstore=vectorstore,
docstore=store,
child_splitter=child_splitter,
parent_splitter=parent_splitter
)
3. 重排序(Reranking):
from langchain.retrievers.document_compressors import CohereRerank
# 使用Cohere重排序
reranker = CohereRerank(model="rerank-english-v2.0", top_n=3)
rerank_retriever = ContextualCompressionRetriever(
base_compressor=reranker,
base_retriever=retriever
)
# 检索后重新排序,提高精确度
results = rerank_retriever.get_relevant_documents("查询")
检索策略对比:
| 策略 | 优点 | 缺点 | 适用场景 |
|---|---|---|---|
| 基础检索 | 简单快速 | 可能不准确 | 简单问答 |
| 多查询 | 提高召回率 | 增加成本 | 复杂查询 |
| 压缩检索 | 减少噪音 | 额外LLM调用 | 长文档 |
| 重排序 | 提高精确度 | 需要额外服务 | 高质量要求 |
| 混合检索 | 平衡性能 | 复杂度高 | 生产环境 |
9.4 RAG性能调优
1. 提示工程优化:
from langchain.prompts import PromptTemplate
# 自定义RAG提示
template = """
你是一个专业的问答助手。请基于以下上下文回答问题。
上下文信息:
{context}
问题:{question}
回答要求:
1. 只基于上下文信息回答
2. 如果上下文中没有相关信息,明确说明
3. 引用具体的上下文片段
4. 保持客观和准确
回答:
"""
prompt = PromptTemplate(
template=template,
input_variables=["context", "question"]
)
qa_chain = RetrievalQA.from_chain_type(
llm=model,
retriever=retriever,
chain_type_kwargs={"prompt": prompt}
)
2. 分块策略优化:
# 策略1:固定大小 + 重叠
splitter1 = RecursiveCharacterTextSplitter(
chunk_size=1000,
chunk_overlap=200
)
# 策略2:语义分块
from langchain.text_splitter import SemanticChunker
splitter2 = SemanticChunker(
embeddings=embeddings,
breakpoint_threshold_type="percentile" # 基于语义相似度分块
)
# 策略3:根据文档结构分块
from langchain.text_splitter import MarkdownHeaderTextSplitter
markdown_splitter = MarkdownHeaderTextSplitter(
headers_to_split_on=[
("#", "Header 1"),
("##", "Header 2"),
("###", "Header 3"),
]
)
3. 检索参数调优:
# 实验不同的k值
for k in [3, 5, 7, 10]:
retriever = vectorstore.as_retriever(
search_kwargs={"k": k}
)
# 评估性能
evaluate_retriever(retriever, test_queries)
# 实验不同的检索类型
search_types = ["similarity", "mmr", "similarity_score_threshold"]
for search_type in search_types:
retriever = vectorstore.as_retriever(
search_type=search_type,
search_kwargs={"k": 5}
)
# 评估性能
4. 缓存策略:
from langchain.cache import InMemoryCache, SQLiteCache
from langchain.globals import set_llm_cache
# 内存缓存
set_llm_cache(InMemoryCache())
# 持久化缓存
set_llm_cache(SQLiteCache(database_path=".langchain.db"))
# 相同查询直接返回缓存结果,节省成本
5. 评估指标:
from langchain.evaluation import load_evaluator
# 相关性评估
relevance_evaluator = load_evaluator("criteria", criteria="relevance")
# 准确性评估
accuracy_evaluator = load_evaluator("criteria", criteria="correctness")
# 评估RAG输出
def evaluate_rag(question, answer, context):
relevance_score = relevance_evaluator.evaluate_strings(
prediction=answer,
input=question,
reference=context
)
accuracy_score = accuracy_evaluator.evaluate_strings(
prediction=answer,
input=question
)
return {
"relevance": relevance_score["score"],
"accuracy": accuracy_score["score"]
}
RAG优化检查清单:
| 优化项 | 目标 | 方法 |
|---|---|---|
| 检索质量 | 提高相关性 | 优化分块、重排序 |
| 响应速度 | 降低延迟 | 缓存、并行检索 |
| 成本控制 | 减少Token | 压缩上下文 |
| 准确性 | 提高正确率 | 提示优化、多查询 |
| 可扩展性 | 支持大规模 | 分布式向量库 |
10. LangChain Expression Language (LCEL)
10.1 LCEL语法基础
LCEL是LangChain的声明式编程语言,用于组合和编排组件。
核心优势:
- 简洁性:用管道操作符
|连接组件 - 可组合性:任意组件可以自由组合
- 流式支持:原生支持流式输出
- 并行执行:自动并行化独立操作
- 类型安全:编译时类型检查
基本语法:
from langchain_core.runnables import RunnablePassthrough
from langchain_openai import ChatOpenAI
from langchain.prompts import ChatPromptTemplate
from langchain.schema.output_parser import StrOutputParser
# 创建组件
prompt = ChatPromptTemplate.from_template("讲一个关于{topic}的笑话")
model = ChatOpenAI()
output_parser = StrOutputParser()
# 使用管道操作符组合
chain = prompt | model | output_parser
# 执行
result = chain.invoke({"topic": "程序员"})
print(result)
Runnable接口:
所有LCEL组件都实现Runnable接口:
class Runnable:
def invoke(self, input):
"""同步调用"""
pass
async def ainvoke(self, input):
"""异步调用"""
pass
def batch(self, inputs):
"""批量调用"""
pass
def stream(self, input):
"""流式调用"""
pass
async def astream(self, input):
"""异步流式调用"""
pass
10.2 链式操作符
1. 管道操作符 |:
# 顺序执行
chain = component1 | component2 | component3
# 等价于
def chain(input):
output1 = component1.invoke(input)
output2 = component2.invoke(output1)
output3 = component3.invoke(output2)
return output3
2. RunnablePassthrough(透传):
from langchain_core.runnables import RunnablePassthrough
# 保留原始输入
chain = {
"context": retriever,
"question": RunnablePassthrough() # 透传question
} | prompt | model
# 调用
chain.invoke("什么是机器学习?")
# question会被透传到prompt
3. RunnableLambda(自定义函数):
from langchain_core.runnables import RunnableLambda
def custom_transform(input_dict):
"""自定义转换逻辑"""
return {
"processed": input_dict["text"].upper(),
"length": len(input_dict["text"])
}
chain = (
RunnableLambda(custom_transform)
| prompt
| model
)
4. 字典组合:
# 并行执行多个组件
chain = {
"summary": summary_chain,
"keywords": keyword_chain,
"sentiment": sentiment_chain
} | final_processor
# 输入会同时发送给三个chain
result = chain.invoke({"text": "文本内容"})
# 输出: {"summary": "...", "keywords": [...], "sentiment": "..."}
10.3 并行与分支
1. RunnableParallel(并行执行):
from langchain_core.runnables import RunnableParallel
# 显式并行
parallel_chain = RunnableParallel(
summary=summary_chain,
translation=translation_chain,
analysis=analysis_chain
)
# 三个chain并行执行
results = parallel_chain.invoke({"text": "输入文本"})
2. RunnableBranch(条件分支):
from langchain_core.runnables import RunnableBranch
# 根据条件选择不同的处理链
branch = RunnableBranch(
(lambda x: len(x["text"]) < 100, short_text_chain),
(lambda x: len(x["text"]) < 500, medium_text_chain),
long_text_chain # 默认分支
)
chain = branch | output_parser
3. 动态路由:
def route_by_language(input_dict):
"""根据语言路由"""
language = detect_language(input_dict["text"])
if language == "zh":
return chinese_chain
elif language == "en":
return english_chain
else:
return default_chain
router = RunnableLambda(route_by_language)
chain = router | output_parser
4. Map操作:
from langchain_core.runnables import RunnableMap
# 对列表中每个元素应用chain
map_chain = RunnableMap(
lambda item: process_chain.invoke(item)
)
# 批量处理
items = ["item1", "item2", "item3"]
results = map_chain.batch(items)
10.4 LCEL最佳实践
1. 复杂链的构建:
# 构建一个完整的RAG链
from langchain_core.runnables import RunnablePassthrough
# 检索链
retrieval_chain = {
"context": retriever | format_docs, # 检索并格式化
"question": RunnablePassthrough()
}
# 完整链
rag_chain = (
retrieval_chain
| prompt
| model
| StrOutputParser()
)
# 使用
answer = rag_chain.invoke("什么是量子计算?")
2. 错误处理:
from langchain_core.runnables import RunnableWithFallbacks
# 主链失败时使用备用链
chain_with_fallback = primary_chain.with_fallbacks(
[backup_chain_1, backup_chain_2],
exceptions_to_handle=(ValueError, TimeoutError)
)
# 自动重试
chain_with_retry = chain.with_retry(
stop_after_attempt=3,
wait_exponential_jitter=True
)
3. 流式输出:
# 流式处理
for chunk in chain.stream({"question": "解释深度学习"}):
print(chunk, end="", flush=True)
# 异步流式
async for chunk in chain.astream({"question": "解释深度学习"}):
print(chunk, end="", flush=True)
4. 批量处理:
# 批量调用
questions = [
{"question": "什么是AI?"},
{"question": "什么是ML?"},
{"question": "什么是DL?"}
]
# 并行处理多个输入
results = chain.batch(questions)
# 配置并发数
results = chain.batch(questions, config={"max_concurrency": 5})
5. 配置传递:
from langchain_core.runnables import RunnableConfig
# 传递配置
config = RunnableConfig(
tags=["production", "v1"],
metadata={"user_id": "123"},
callbacks=[callback_handler]
)
result = chain.invoke(input_data, config=config)
LCEL vs 传统Chain对比:
| 特性 | LCEL | 传统Chain |
|---|---|---|
| 语法 | 声明式,简洁 | 命令式,冗长 |
| 组合性 | 高度灵活 | 相对固定 |
| 流式支持 | 原生支持 | 需要额外配置 |
| 并行执行 | 自动优化 | 手动实现 |
| 类型安全 | 编译时检查 | 运行时检查 |
| 调试 | 更容易追踪 | 相对困难 |
LCEL使用建议:
- ✅ 新项目优先使用LCEL
- ✅ 复杂链用LCEL更清晰
- ✅ 需要流式输出时使用LCEL
- ⚠️ 简单场景两者都可以
- ⚠️ 旧项目迁移需要评估成本
11. 实战应用场景
11.1 智能问答系统
场景描述:基于企业知识库的智能问答系统,支持多轮对话和上下文理解。
完整实现:
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_community.vectorstores import Chroma
from langchain.memory import ConversationBufferMemory
from langchain.chains import ConversationalRetrievalChain
class IntelligentQASystem:
def __init__(self, knowledge_base_path):
# 初始化组件
self.llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0)
self.embeddings = OpenAIEmbeddings()
# 加载向量数据库
self.vectorstore = Chroma(
persist_directory=knowledge_base_path,
embedding_function=self.embeddings
)
# 配置记忆
self.memory = ConversationBufferMemory(
memory_key="chat_history",
return_messages=True,
output_key="answer"
)
# 创建对话检索链
self.qa_chain = ConversationalRetrievalChain.from_llm(
llm=self.llm,
retriever=self.vectorstore.as_retriever(
search_kwargs={"k": 4}
),
memory=self.memory,
return_source_documents=True,
verbose=True
)
def ask(self, question: str):
"""提问"""
result = self.qa_chain.invoke({"question": question})
return {
"answer": result["answer"],
"sources": [
{
"content": doc.page_content[:200],
"metadata": doc.metadata
}
for doc in result["source_documents"]
]
}
def clear_history(self):
"""清除对话历史"""
self.memory.clear()
# 使用示例
qa_system = IntelligentQASystem("./knowledge_base")
# 多轮对话
print(qa_system.ask("公司的年假政策是什么?"))
print(qa_system.ask("那病假呢?")) # 能理解上下文
print(qa_system.ask("需要提前多久申请?")) # 继续追问
优化点:
- 添加意图识别,路由到不同的专家系统
- 集成反馈机制,持续优化答案质量
- 支持多模态输入(图片、语音)
11.2 文档分析与总结
场景描述:自动分析长文档,生成结构化总结和关键信息提取。
实现方案:
from langchain.chains.summarize import load_summarize_chain
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import PyPDFLoader
class DocumentAnalyzer:
def __init__(self):
self.llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0)
self.text_splitter = RecursiveCharacterTextSplitter(
chunk_size=2000,
chunk_overlap=200
)
def analyze_document(self, file_path):
"""分析文档"""
# 1. 加载文档
loader = PyPDFLoader(file_path)
documents = loader.load()
# 2. 分割文档
chunks = self.text_splitter.split_documents(documents)
# 3. 生成总结
summary = self._generate_summary(chunks)
# 4. 提取关键信息
key_points = self._extract_key_points(chunks)
# 5. 情感分析
sentiment = self._analyze_sentiment(summary)
return {
"summary": summary,
"key_points": key_points,
"sentiment": sentiment,
"page_count": len(documents),
"word_count": sum(len(doc.page_content.split()) for doc in documents)
}
def _generate_summary(self, chunks):
"""生成总结"""
# Map-Reduce策略
chain = load_summarize_chain(
self.llm,
chain_type="map_reduce",
verbose=True
)
return chain.run(chunks)
def _extract_key_points(self, chunks):
"""提取关键点"""
prompt = ChatPromptTemplate.from_template(
"从以下文本中提取3-5个关键点:\n\n{text}"
)
chain = prompt | self.llm | StrOutputParser()
# 对每个chunk提取关键点
all_points = []
for chunk in chunks[:3]: # 只处理前3个chunk
points = chain.invoke({"text": chunk.page_content})
all_points.append(points)
return all_points
def _analyze_sentiment(self, text):
"""情感分析"""
prompt = ChatPromptTemplate.from_template(
"分析以下文本的情感倾向(积极/中性/消极):\n\n{text}"
)
chain = prompt | self.llm | StrOutputParser()
return chain.invoke({"text": text})
# 使用
analyzer = DocumentAnalyzer()
result = analyzer.analyze_document("report.pdf")
print(f"总结: {result['summary']}")
print(f"关键点: {result['key_points']}")
11.3 代码生成助手
场景描述:根据自然语言描述生成代码,支持多种编程语言。
实现方案:
from langchain.prompts import ChatPromptTemplate
from langchain.output_parsers import PydanticOutputParser
from pydantic import BaseModel, Field
class CodeOutput(BaseModel):
code: str = Field(description="生成的代码")
explanation: str = Field(description="代码说明")
language: str = Field(description="编程语言")
dependencies: list[str] = Field(description="依赖库")
class CodeAssistant:
def __init__(self):
self.llm = ChatOpenAI(model="gpt-4", temperature=0)
self.parser = PydanticOutputParser(pydantic_object=CodeOutput)
def generate_code(self, description: str, language: str = "python"):
"""生成代码"""
prompt = ChatPromptTemplate.from_messages([
("system", """你是一个专业的编程助手。
根据用户描述生成高质量代码。
要求:
1. 代码要简洁、高效
2. 遵循最佳实践
3. 添加必要的注释
4. 考虑边界情况
{format_instructions}
"""),
("human", "语言: {language}\n需求: {description}")
])
chain = prompt | self.llm | self.parser
result = chain.invoke({
"language": language,
"description": description,
"format_instructions": self.parser.get_format_instructions()
})
return result
def review_code(self, code: str):
"""代码审查"""
prompt = ChatPromptTemplate.from_template(
"""请审查以下代码,指出潜在问题和改进建议:
{code}
请从以下方面分析:
1. 代码质量
2. 性能问题
3. 安全隐患
4. 最佳实践
"""
)
chain = prompt | self.llm | StrOutputParser()
return chain.invoke({"code": code})
def explain_code(self, code: str):
"""解释代码"""
prompt = ChatPromptTemplate.from_template(
"""请详细解释以下代码的功能和实现原理:
{code}
请包括:
1. 整体功能
2. 关键步骤
3. 算法复杂度
4. 使用场景
"""
)
chain = prompt | self.llm | StrOutputParser()
return chain.invoke({"code": code})
# 使用示例
assistant = CodeAssistant()
# 生成代码
result = assistant.generate_code(
"实现一个二分查找算法",
language="python"
)
print(f"代码:\n{result.code}")
print(f"说明: {result.explanation}")
print(f"依赖: {result.dependencies}")
# 审查代码
review = assistant.review_code(result.code)
print(f"审查意见: {review}")
11.4 多模态应用
场景描述:处理图片、文本、音频等多种模态的数据。
图片理解应用:
from langchain_openai import ChatOpenAI
from langchain.schema.messages import HumanMessage
import base64
class MultimodalAssistant:
def __init__(self):
self.llm = ChatOpenAI(
model="gpt-4-vision-preview",
max_tokens=1024
)
def analyze_image(self, image_path: str, question: str):
"""分析图片"""
# 读取并编码图片
with open(image_path, "rb") as image_file:
image_data = base64.b64encode(image_file.read()).decode()
# 构建消息
message = HumanMessage(
content=[
{"type": "text", "text": question},
{
"type": "image_url",
"image_url": {
"url": f"data:image/jpeg;base64,{image_data}"
}
}
]
)
# 调用模型
response = self.llm.invoke([message])
return response.content
def compare_images(self, image1_path: str, image2_path: str):
"""比较两张图片"""
# 编码两张图片
images = []
for path in [image1_path, image2_path]:
with open(path, "rb") as f:
data = base64.b64encode(f.read()).decode()
images.append(data)
# 构建消息
message = HumanMessage(
content=[
{"type": "text", "text": "比较这两张图片的异同"},
{
"type": "image_url",
"image_url": {"url": f"data:image/jpeg;base64,{images[0]}"}
},
{
"type": "image_url",
"image_url": {"url": f"data:image/jpeg;base64,{images[1]}"}
}
]
)
response = self.llm.invoke([message])
return response.content
def extract_text_from_image(self, image_path: str):
"""从图片中提取文字(OCR)"""
return self.analyze_image(
image_path,
"请提取图片中的所有文字内容"
)
def describe_chart(self, chart_path: str):
"""描述图表"""
return self.analyze_image(
chart_path,
"请详细描述这个图表,包括类型、数据趋势和关键发现"
)
# 使用示例
assistant = MultimodalAssistant()
# 分析图片
result = assistant.analyze_image(
"product.jpg",
"这个产品有什么特点?"
)
print(result)
# OCR
text = assistant.extract_text_from_image("document.jpg")
print(f"提取的文字: {text}")
# 图表分析
analysis = assistant.describe_chart("sales_chart.png")
print(f"图表分析: {analysis}")
应用场景总结:
| 应用 | 核心技术 | 难点 | 价值 |
|---|---|---|---|
| 智能问答 | RAG + Memory | 上下文理解 | 提升服务效率 |
| 文档分析 | Summarization | 长文本处理 | 快速获取信息 |
| 代码助手 | Code Generation | 代码质量 | 提高开发效率 |
| 多模态 | Vision API | 模态融合 | 丰富交互方式 |
12. 性能优化与调试
12.1 性能监控与分析
关键性能指标(KPI):
| 指标 | 目标值 | 监控方法 |
|---|---|---|
| 响应时间 | < 3秒 | 计时器 |
| Token使用 | 最小化 | OpenAI Callback |
| 成本 | 预算内 | 成本追踪 |
| 准确率 | > 90% | 人工评估 |
| 错误率 | < 1% | 错误日志 |
性能监控实现:
import time
from langchain.callbacks import get_openai_callback
from functools import wraps
class PerformanceTracker:
def __init__(self):
self.metrics = {
"calls": [],
"total_tokens": 0,
"total_cost": 0.0,
"total_time": 0.0,
"errors": 0
}
def track(self, func):
"""性能追踪装饰器"""
@wraps(func)
def wrapper(*args, **kwargs):
start_time = time.time()
try:
with get_openai_callback() as cb:
result = func(*args, **kwargs)
# 记录指标
call_metrics = {
"timestamp": time.time(),
"duration": time.time() - start_time,
"tokens": cb.total_tokens,
"cost": cb.total_cost,
"success": True
}
self.metrics["calls"].append(call_metrics)
self.metrics["total_tokens"] += cb.total_tokens
self.metrics["total_cost"] += cb.total_cost
self.metrics["total_time"] += call_metrics["duration"]
return result
except Exception as e:
self.metrics["errors"] += 1
call_metrics = {
"timestamp": time.time(),
"duration": time.time() - start_time,
"error": str(e),
"success": False
}
self.metrics["calls"].append(call_metrics)
raise
return wrapper
def get_stats(self):
"""获取统计信息"""
total_calls = len(self.metrics["calls"])
successful_calls = sum(1 for c in self.metrics["calls"] if c.get("success"))
return {
"总调用次数": total_calls,
"成功次数": successful_calls,
"失败次数": self.metrics["errors"],
"成功率": f"{successful_calls/max(total_calls,1)*100:.2f}%",
"总Token数": self.metrics["total_tokens"],
"总成本": f"${self.metrics['total_cost']:.4f}",
"总耗时": f"{self.metrics['total_time']:.2f}秒",
"平均延迟": f"{self.metrics['total_time']/max(total_calls,1):.2f}秒",
"平均Token": int(self.metrics["total_tokens"]/max(successful_calls,1))
}
def get_slow_calls(self, threshold=5.0):
"""获取慢调用"""
return [
call for call in self.metrics["calls"]
if call.get("duration", 0) > threshold
]
# 使用示例
tracker = PerformanceTracker()
@tracker.track
def process_query(query):
return chain.invoke({"question": query})
# 执行查询
for query in queries:
process_query(query)
# 查看统计
print(tracker.get_stats())
# 查看慢调用
slow_calls = tracker.get_slow_calls(threshold=3.0)
print(f"慢调用数量: {len(slow_calls)}")
12.2 缓存策略
多层缓存架构:
from langchain.cache import InMemoryCache, SQLiteCache
from langchain.globals import set_llm_cache
import hashlib
import json
from functools import lru_cache
class MultiLevelCache:
def __init__(self):
# L1: 内存缓存(最快)
self.memory_cache = {}
# L2: LRU缓存
self.lru_size = 100
# L3: 持久化缓存
set_llm_cache(SQLiteCache(database_path=".langchain_cache.db"))
def get_cache_key(self, input_data):
"""生成缓存键"""
# 将输入序列化为JSON并计算哈希
serialized = json.dumps(input_data, sort_keys=True)
return hashlib.sha256(serialized.encode()).hexdigest()
@lru_cache(maxsize=100)
def _lru_get(self, key):
"""LRU缓存层"""
return self.memory_cache.get(key)
def get(self, input_data):
"""获取缓存"""
key = self.get_cache_key(input_data)
# L1: 内存缓存
if key in self.memory_cache:
return self.memory_cache[key]
# L2: LRU缓存
cached = self._lru_get(key)
if cached:
return cached
return None
def set(self, input_data, result):
"""设置缓存"""
key = self.get_cache_key(input_data)
# 写入所有缓存层
self.memory_cache[key] = result
self._lru_get.cache_clear() # 清除LRU缓存以更新
def clear(self):
"""清除缓存"""
self.memory_cache.clear()
self._lru_get.cache_clear()
# 使用缓存
cache = MultiLevelCache()
def cached_invoke(chain, input_data):
# 检查缓存
cached_result = cache.get(input_data)
if cached_result:
print("缓存命中!")
return cached_result
# 调用chain
result = chain.invoke(input_data)
# 保存缓存
cache.set(input_data, result)
return result
语义缓存:
from langchain_openai import OpenAIEmbeddings
from sklearn.metrics.pairwise import cosine_similarity
import numpy as np
class SemanticCache:
"""基于语义相似度的缓存"""
def __init__(self, similarity_threshold=0.95):
self.embeddings = OpenAIEmbeddings()
self.cache = [] # [(embedding, query, result)]
self.threshold = similarity_threshold
def get(self, query: str):
"""查找语义相似的缓存"""
if not self.cache:
return None
# 计算查询的向量
query_embedding = self.embeddings.embed_query(query)
# 计算与所有缓存的相似度
for cached_embedding, cached_query, cached_result in self.cache:
similarity = cosine_similarity(
[query_embedding],
[cached_embedding]
)[0][0]
if similarity >= self.threshold:
print(f"语义缓存命中!相似度: {similarity:.3f}")
print(f"原查询: {cached_query}")
return cached_result
return None
def set(self, query: str, result):
"""保存缓存"""
embedding = self.embeddings.embed_query(query)
self.cache.append((embedding, query, result))
# 限制缓存大小
if len(self.cache) > 100:
self.cache.pop(0)
# 使用
semantic_cache = SemanticCache()
def semantic_cached_invoke(query):
# 检查语义缓存
cached = semantic_cache.get(query)
if cached:
return cached
# 调用模型
result = chain.invoke({"question": query})
# 保存缓存
semantic_cache.set(query, result)
return result
# 测试
result1 = semantic_cached_invoke("什么是机器学习?")
result2 = semantic_cached_invoke("机器学习是什么?") # 语义相似,命中缓存
12.3 并发与异步处理
异步调用:
import asyncio
from langchain_openai import ChatOpenAI
async def process_queries_async(queries):
"""异步处理多个查询"""
llm = ChatOpenAI()
# 创建异步任务
tasks = [llm.ainvoke(query) for query in queries]
# 并发执行
results = await asyncio.gather(*tasks)
return results
# 使用
queries = ["问题1", "问题2", "问题3"]
results = asyncio.run(process_queries_async(queries))
并发控制:
from concurrent.futures import ThreadPoolExecutor, as_completed
import asyncio
from asyncio import Semaphore
class ConcurrencyController:
def __init__(self, max_concurrent=5):
self.max_concurrent = max_concurrent
self.semaphore = Semaphore(max_concurrent)
async def process_with_limit(self, func, items):
"""限制并发数"""
async def limited_func(item):
async with self.semaphore:
return await func(item)
tasks = [limited_func(item) for item in items]
return await asyncio.gather(*tasks)
# 使用
controller = ConcurrencyController(max_concurrent=3)
async def process_item(item):
# 处理单个项目
return await chain.ainvoke(item)
# 限制并发为3
results = await controller.process_with_limit(process_item, items)
批量处理优化:
def batch_process_optimized(items, batch_size=10):
"""优化的批量处理"""
results = []
# 分批处理
for i in range(0, len(items), batch_size):
batch = items[i:i+batch_size]
# 批量调用
batch_results = chain.batch(batch)
results.extend(batch_results)
# 避免速率限制
time.sleep(1)
return results
12.4 调试技巧
1. 详细日志:
import logging
# 配置LangChain日志
logging.basicConfig(level=logging.DEBUG)
langchain_logger = logging.getLogger("langchain")
langchain_logger.setLevel(logging.DEBUG)
# 启用详细输出
chain = LLMChain(llm=model, prompt=prompt, verbose=True)
2. 断点调试:
from langchain.callbacks import StdOutCallbackHandler
class DebugCallbackHandler(StdOutCallbackHandler):
"""调试回调"""
def on_llm_start(self, serialized, prompts, **kwargs):
print("\n=== LLM Start ===")
print(f"Prompt: {prompts[0]}")
# 可以在这里设置断点
import pdb; pdb.set_trace()
def on_llm_end(self, response, **kwargs):
print("\n=== LLM End ===")
print(f"Response: {response.generations[0][0].text}")
# 使用
chain.invoke(input, callbacks=[DebugCallbackHandler()])
3. 中间步骤追踪:
# Agent中间步骤
result = agent_executor.invoke(
{"input": "问题"},
return_intermediate_steps=True
)
print("中间步骤:")
for i, (action, observation) in enumerate(result["intermediate_steps"]):
print(f"\n步骤 {i+1}:")
print(f" Action: {action.tool}")
print(f" Input: {action.tool_input}")
print(f" Output: {observation}")
4. 单元测试:
import pytest
from unittest.mock import Mock, patch
def test_chain():
"""测试chain"""
# Mock LLM
mock_llm = Mock()
mock_llm.invoke.return_value = "测试响应"
# 创建chain
chain = LLMChain(llm=mock_llm, prompt=prompt)
# 测试
result = chain.invoke({"input": "测试"})
# 断言
assert result is not None
mock_llm.invoke.assert_called_once()
def test_retriever():
"""测试检索器"""
# Mock向量存储
mock_vectorstore = Mock()
mock_vectorstore.similarity_search.return_value = [
Document(page_content="测试文档")
]
retriever = mock_vectorstore.as_retriever()
docs = retriever.get_relevant_documents("查询")
assert len(docs) > 0
5. 性能分析:
import cProfile
import pstats
def profile_chain(chain, input_data):
"""性能分析"""
profiler = cProfile.Profile()
profiler.enable()
# 执行
result = chain.invoke(input_data)
profiler.disable()
# 输出统计
stats = pstats.Stats(profiler)
stats.sort_stats('cumulative')
stats.print_stats(10) # 显示前10个最耗时的函数
return result
调试检查清单:
| 问题类型 | 调试方法 | 工具 |
|---|---|---|
| 输出不符合预期 | 检查提示模板 | verbose=True |
| 性能慢 | 性能分析 | cProfile |
| 成本高 | Token追踪 | OpenAI Callback |
| 错误频繁 | 错误日志 | logging |
| 检索不准 | 中间步骤追踪 | return_intermediate_steps |
13. 生产部署
13.1 部署架构设计
典型部署架构:
Nginx/ALB"] B2["API网关
Kong/AWS API Gateway"] end subgraph "应用层" C1["LangChain服务
FastAPI/Flask"] C2["缓存服务
Redis"] C3["队列服务
Celery/RabbitMQ"] end subgraph "数据层" D1["向量数据库
Pinecone/Milvus"] D2["关系数据库
PostgreSQL"] D3["对象存储
S3/OSS"] end subgraph "外部服务" E1["LLM API
OpenAI/Anthropic"] E2["监控服务
LangSmith"] end A1 --> B1 A2 --> B1 A3 --> B2 B1 --> C1 B2 --> C1 C1 --> C2 C1 --> C3 C1 --> D1 C1 --> D2 C1 --> D3 C1 --> E1 C1 --> E2 style A1 fill:#e3f2fd style A2 fill:#e3f2fd style A3 fill:#e3f2fd style B1 fill:#fff3e0 style B2 fill:#fff3e0 style C1 fill:#e8f5e9 style C2 fill:#e8f5e9 style C3 fill:#e8f5e9 style D1 fill:#f3e5f5 style D2 fill:#f3e5f5 style D3 fill:#f3e5f5 style E1 fill:#fff9c4 style E2 fill:#fff9c4
使用LangServe部署:
from fastapi import FastAPI
from langserve import add_routes
from langchain_openai import ChatOpenAI
from langchain.prompts import ChatPromptTemplate
# 创建FastAPI应用
app = FastAPI(
title="LangChain API",
version="1.0",
description="LangChain服务API"
)
# 创建chain
model = ChatOpenAI()
prompt = ChatPromptTemplate.from_template("回答问题: {question}")
chain = prompt | model
# 添加路由
add_routes(
app,
chain,
path="/chat",
enable_feedback_endpoint=True, # 启用反馈
enable_public_trace_link_endpoint=True, # 启用追踪
)
# 健康检查
@app.get("/health")
async def health_check():
return {"status": "healthy"}
# 运行
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
Docker部署:
# Dockerfile
FROM python:3.11-slim
WORKDIR /app
# 安装依赖
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# 复制代码
COPY . .
# 暴露端口
EXPOSE 8000
# 启动命令
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]
# docker-compose.yml
version: '3.8'
services:
langchain-api:
build: .
ports:
- "8000:8000"
environment:
- OPENAI_API_KEY=${OPENAI_API_KEY}
- REDIS_URL=redis://redis:6379
depends_on:
- redis
- postgres
restart: unless-stopped
redis:
image: redis:7-alpine
ports:
- "6379:6379"
volumes:
- redis_data:/data
postgres:
image: postgres:15-alpine
environment:
- POSTGRES_DB=langchain
- POSTGRES_USER=user
- POSTGRES_PASSWORD=password
volumes:
- postgres_data:/var/lib/postgresql/data
nginx:
image: nginx:alpine
ports:
- "80:80"
volumes:
- ./nginx.conf:/etc/nginx/nginx.conf
depends_on:
- langchain-api
volumes:
redis_data:
postgres_data:
13.2 安全与权限控制
API密钥管理:
from fastapi import FastAPI, HTTPException, Depends, Security
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
import os
from typing import Optional
security = HTTPBearer()
class APIKeyManager:
def __init__(self):
# 从环境变量或数据库加载API密钥
self.valid_keys = set(os.getenv("API_KEYS", "").split(","))
def verify_key(self, credentials: HTTPAuthorizationCredentials = Security(security)):
"""验证API密钥"""
token = credentials.credentials
if token not in self.valid_keys:
raise HTTPException(
status_code=401,
detail="Invalid API key"
)
return token
api_key_manager = APIKeyManager()
@app.post("/chat")
async def chat(
request: dict,
api_key: str = Depends(api_key_manager.verify_key)
):
"""需要API密钥的端点"""
# 处理请求
pass
速率限制:
from fastapi import Request
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded
# 创建限流器
limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
@app.post("/chat")
@limiter.limit("10/minute") # 每分钟10次
async def chat(request: Request):
"""限流的端点"""
pass
输入验证:
from pydantic import BaseModel, Field, validator
from typing import Optional
class ChatRequest(BaseModel):
question: str = Field(..., min_length=1, max_length=1000)
session_id: Optional[str] = None
temperature: float = Field(default=0.7, ge=0.0, le=2.0)
@validator('question')
def validate_question(cls, v):
# 检查恶意输入
forbidden_words = ['<script>', 'DROP TABLE']
if any(word in v.upper() for word in forbidden_words):
raise ValueError('Invalid input detected')
return v
@app.post("/chat")
async def chat(request: ChatRequest):
"""验证输入的端点"""
# 输入已经过验证
pass
内容过滤:
class ContentFilter:
"""内容过滤器"""
def __init__(self):
self.sensitive_patterns = [
r'\b\d{3}-\d{2}-\d{4}\b', # SSN
r'\b\d{16}\b', # 信用卡号
r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b' # 邮箱
]
def filter_input(self, text: str) -> str:
"""过滤输入"""
import re
for pattern in self.sensitive_patterns:
text = re.sub(pattern, '[REDACTED]', text)
return text
def filter_output(self, text: str) -> str:
"""过滤输出"""
# 检查是否包含敏感信息
return self.filter_input(text)
content_filter = ContentFilter()
@app.post("/chat")
async def chat(request: ChatRequest):
# 过滤输入
filtered_question = content_filter.filter_input(request.question)
# 处理
response = chain.invoke({"question": filtered_question})
# 过滤输出
filtered_response = content_filter.filter_output(response)
return {"answer": filtered_response}
13.3 监控与日志
结构化日志:
import logging
import json
from datetime import datetime
class StructuredLogger:
def __init__(self, name):
self.logger = logging.getLogger(name)
self.logger.setLevel(logging.INFO)
# JSON格式handler
handler = logging.StreamHandler()
handler.setFormatter(self.JSONFormatter())
self.logger.addHandler(handler)
class JSONFormatter(logging.Formatter):
def format(self, record):
log_data = {
"timestamp": datetime.utcnow().isoformat(),
"level": record.levelname,
"message": record.getMessage(),
"module": record.module,
"function": record.funcName,
}
# 添加额外字段
if hasattr(record, 'user_id'):
log_data['user_id'] = record.user_id
if hasattr(record, 'request_id'):
log_data['request_id'] = record.request_id
return json.dumps(log_data)
def info(self, message, **kwargs):
extra = {k: v for k, v in kwargs.items()}
self.logger.info(message, extra=extra)
def error(self, message, **kwargs):
extra = {k: v for k, v in kwargs.items()}
self.logger.error(message, extra=extra)
# 使用
logger = StructuredLogger("langchain_api")
@app.post("/chat")
async def chat(request: ChatRequest):
request_id = str(uuid.uuid4())
logger.info(
"Chat request received",
request_id=request_id,
question_length=len(request.question)
)
try:
response = chain.invoke({"question": request.question})
logger.info(
"Chat request completed",
request_id=request_id,
response_length=len(response)
)
return {"answer": response}
except Exception as e:
logger.error(
"Chat request failed",
request_id=request_id,
error=str(e)
)
raise
Prometheus指标:
from prometheus_client import Counter, Histogram, Gauge, generate_latest
from fastapi import Response
# 定义指标
request_count = Counter(
'langchain_requests_total',
'Total number of requests',
['endpoint', 'status']
)
request_duration = Histogram(
'langchain_request_duration_seconds',
'Request duration in seconds',
['endpoint']
)
active_requests = Gauge(
'langchain_active_requests',
'Number of active requests'
)
token_usage = Counter(
'langchain_tokens_total',
'Total tokens used',
['model']
)
# 中间件
@app.middleware("http")
async def metrics_middleware(request: Request, call_next):
active_requests.inc()
start_time = time.time()
try:
response = await call_next(request)
# 记录指标
duration = time.time() - start_time
request_duration.labels(endpoint=request.url.path).observe(duration)
request_count.labels(
endpoint=request.url.path,
status=response.status_code
).inc()
return response
finally:
active_requests.dec()
# 指标端点
@app.get("/metrics")
async def metrics():
return Response(
content=generate_latest(),
media_type="text/plain"
)
13.4 成本优化
智能模型选择:
class CostOptimizer:
"""成本优化器"""
def __init__(self):
self.models = {
"cheap": ChatOpenAI(model="gpt-3.5-turbo"),
"expensive": ChatOpenAI(model="gpt-4")
}
self.cost_per_1k_tokens = {
"gpt-3.5-turbo": 0.002,
"gpt-4": 0.03
}
def select_model(self, task_complexity: str):
"""根据任务复杂度选择模型"""
if task_complexity == "simple":
return self.models["cheap"]
else:
return self.models["expensive"]
def estimate_cost(self, text: str, model: str):
"""估算成本"""
token_count = len(text.split()) * 1.3 # 粗略估算
cost = (token_count / 1000) * self.cost_per_1k_tokens[model]
return cost
def should_use_cache(self, estimated_cost: float):
"""判断是否应该使用缓存"""
return estimated_cost > 0.01 # 成本超过1分钱就缓存
optimizer = CostOptimizer()
@app.post("/chat")
async def chat(request: ChatRequest):
# 评估任务复杂度
complexity = "simple" if len(request.question) < 100 else "complex"
# 选择模型
model = optimizer.select_model(complexity)
# 估算成本
estimated_cost = optimizer.estimate_cost(request.question, model.model_name)
# 决定是否缓存
use_cache = optimizer.should_use_cache(estimated_cost)
# 执行
if use_cache:
# 使用缓存逻辑
pass
response = model.invoke(request.question)
return {"answer": response}
预算控制:
class BudgetController:
"""预算控制器"""
def __init__(self, daily_budget: float):
self.daily_budget = daily_budget
self.current_spending = 0.0
self.last_reset = datetime.now().date()
def check_budget(self, estimated_cost: float) -> bool:
"""检查预算"""
# 每天重置
if datetime.now().date() > self.last_reset:
self.current_spending = 0.0
self.last_reset = datetime.now().date()
# 检查是否超预算
if self.current_spending + estimated_cost > self.daily_budget:
return False
return True
def record_spending(self, actual_cost: float):
"""记录支出"""
self.current_spending += actual_cost
def get_remaining_budget(self) -> float:
"""获取剩余预算"""
return self.daily_budget - self.current_spending
budget_controller = BudgetController(daily_budget=100.0)
@app.post("/chat")
async def chat(request: ChatRequest):
# 估算成本
estimated_cost = 0.01 # 估算值
# 检查预算
if not budget_controller.check_budget(estimated_cost):
raise HTTPException(
status_code=429,
detail="Daily budget exceeded"
)
# 执行请求
with get_openai_callback() as cb:
response = chain.invoke({"question": request.question})
# 记录实际成本
budget_controller.record_spending(cb.total_cost)
return {"answer": response}
部署检查清单:
| 检查项 | 说明 | 优先级 |
|---|---|---|
| 环境变量 | API密钥等敏感信息 | ⭐⭐⭐⭐⭐ |
| 错误处理 | 完善的异常捕获 | ⭐⭐⭐⭐⭐ |
| 日志记录 | 结构化日志 | ⭐⭐⭐⭐ |
| 监控告警 | Prometheus + Grafana | ⭐⭐⭐⭐ |
| 速率限制 | 防止滥用 | ⭐⭐⭐⭐ |
| 缓存策略 | 降低成本 | ⭐⭐⭐ |
| 负载均衡 | 高可用 | ⭐⭐⭐ |
| 备份恢复 | 数据安全 | ⭐⭐⭐ |
14. 高级特性
14.1 Callbacks回调机制
Callbacks提供了钩子函数,可以在LangChain执行的各个阶段插入自定义逻辑。
回调事件类型:
from langchain.callbacks.base import BaseCallbackHandler
from langchain.schema import LLMResult
class CustomCallbackHandler(BaseCallbackHandler):
"""自定义回调处理器"""
def on_llm_start(self, serialized, prompts, **kwargs):
"""LLM开始调用"""
print(f"LLM开始: {prompts[0][:50]}...")
def on_llm_end(self, response: LLMResult, **kwargs):
"""LLM调用结束"""
print(f"LLM结束: {response.generations[0][0].text[:50]}...")
def on_llm_error(self, error, **kwargs):
"""LLM调用错误"""
print(f"LLM错误: {error}")
def on_chain_start(self, serialized, inputs, **kwargs):
"""Chain开始"""
print(f"Chain开始: {serialized.get('name', 'Unknown')}")
def on_chain_end(self, outputs, **kwargs):
"""Chain结束"""
print(f"Chain结束: {outputs}")
def on_chain_error(self, error, **kwargs):
"""Chain错误"""
print(f"Chain错误: {error}")
def on_tool_start(self, serialized, input_str, **kwargs):
"""Tool开始"""
print(f"Tool开始: {serialized['name']}")
def on_tool_end(self, output, **kwargs):
"""Tool结束"""
print(f"Tool结束: {output[:50]}...")
def on_tool_error(self, error, **kwargs):
"""Tool错误"""
print(f"Tool错误: {error}")
def on_agent_action(self, action, **kwargs):
"""Agent执行动作"""
print(f"Agent动作: {action.tool} - {action.tool_input}")
def on_agent_finish(self, finish, **kwargs):
"""Agent完成"""
print(f"Agent完成: {finish.return_values}")
# 使用回调
handler = CustomCallbackHandler()
chain.invoke(input_data, callbacks=[handler])
实用回调示例:
1. 成本追踪回调:
class CostTrackingCallback(BaseCallbackHandler):
"""追踪成本"""
def __init__(self):
self.total_tokens = 0
self.total_cost = 0.0
self.cost_per_1k = {
"gpt-3.5-turbo": 0.002,
"gpt-4": 0.03
}
def on_llm_end(self, response: LLMResult, **kwargs):
# 计算token数
if response.llm_output:
tokens = response.llm_output.get("token_usage", {})
total_tokens = tokens.get("total_tokens", 0)
model = response.llm_output.get("model_name", "gpt-3.5-turbo")
# 计算成本
cost = (total_tokens / 1000) * self.cost_per_1k.get(model, 0.002)
self.total_tokens += total_tokens
self.total_cost += cost
def get_summary(self):
return {
"total_tokens": self.total_tokens,
"total_cost": f"${self.total_cost:.4f}"
}
2. 延迟监控回调:
import time
class LatencyCallback(BaseCallbackHandler):
"""监控延迟"""
def __init__(self):
self.start_times = {}
self.latencies = []
def on_llm_start(self, serialized, prompts, **kwargs):
run_id = kwargs.get("run_id")
self.start_times[run_id] = time.time()
def on_llm_end(self, response: LLMResult, **kwargs):
run_id = kwargs.get("run_id")
if run_id in self.start_times:
latency = time.time() - self.start_times[run_id]
self.latencies.append(latency)
if latency > 5.0:
print(f"⚠️ 慢查询警告: {latency:.2f}秒")
def get_stats(self):
if not self.latencies:
return {}
return {
"avg_latency": sum(self.latencies) / len(self.latencies),
"max_latency": max(self.latencies),
"min_latency": min(self.latencies)
}
14.2 Streaming流式输出
流式输出提供实时响应,改善用户体验。
基础流式输出:
from langchain_openai import ChatOpenAI
# 启用流式
model = ChatOpenAI(streaming=True)
# 流式调用
for chunk in model.stream("写一首诗"):
print(chunk.content, end="", flush=True)
带回调的流式输出:
from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler
class CustomStreamingCallback(StreamingStdOutCallbackHandler):
"""自定义流式回调"""
def __init__(self):
super().__init__()
self.tokens = []
def on_llm_new_token(self, token: str, **kwargs):
"""接收新token"""
self.tokens.append(token)
print(token, end="", flush=True)
# 可以在这里添加自定义逻辑
# 例如:发送到WebSocket
def on_llm_end(self, response: LLMResult, **kwargs):
"""流式结束"""
print("\n[流式输出完成]")
# 使用
callback = CustomStreamingCallback()
model = ChatOpenAI(streaming=True, callbacks=[callback])
model.invoke("讲个故事")
LCEL流式输出:
from langchain_core.runnables import RunnablePassthrough
# 创建流式chain
chain = (
{"context": retriever, "question": RunnablePassthrough()}
| prompt
| model
| StrOutputParser()
)
# 流式调用
for chunk in chain.stream("什么是机器学习?"):
print(chunk, end="", flush=True)
异步流式输出:
import asyncio
async def async_stream():
"""异步流式输出"""
async for chunk in chain.astream("解释量子计算"):
print(chunk, end="", flush=True)
await asyncio.sleep(0.01) # 控制输出速度
# 运行
asyncio.run(async_stream())
WebSocket流式输出:
from fastapi import WebSocket
from fastapi.responses import StreamingResponse
@app.websocket("/ws/chat")
async def websocket_chat(websocket: WebSocket):
await websocket.accept()
try:
while True:
# 接收消息
data = await websocket.receive_text()
# 流式响应
async for chunk in chain.astream({"question": data}):
await websocket.send_text(chunk)
# 发送结束标记
await websocket.send_text("[DONE]")
except Exception as e:
await websocket.close()
# HTTP流式端点
@app.post("/chat/stream")
async def chat_stream(request: ChatRequest):
async def generate():
async for chunk in chain.astream({"question": request.question}):
yield f"data: {chunk}\n\n"
yield "data: [DONE]\n\n"
return StreamingResponse(
generate(),
media_type="text/event-stream"
)
14.3 Multi-Agent系统
Multi-Agent实现多个Agent协作完成复杂任务。
基础Multi-Agent架构:
from langchain.agents import AgentExecutor, create_openai_functions_agent
from langchain.tools import Tool
class MultiAgentSystem:
"""多Agent系统"""
def __init__(self):
self.llm = ChatOpenAI(model="gpt-4")
# 创建专家Agent
self.research_agent = self._create_research_agent()
self.writing_agent = self._create_writing_agent()
self.review_agent = self._create_review_agent()
def _create_research_agent(self):
"""研究Agent"""
tools = [
Tool(
name="Search",
func=self.search_tool,
description="搜索信息"
)
]
prompt = hub.pull("hwchase17/openai-functions-agent")
agent = create_openai_functions_agent(self.llm, tools, prompt)
return AgentExecutor(agent=agent, tools=tools)
def _create_writing_agent(self):
"""写作Agent"""
tools = [
Tool(
name="Write",
func=self.write_tool,
description="写作内容"
)
]
prompt = hub.pull("hwchase17/openai-functions-agent")
agent = create_openai_functions_agent(self.llm, tools, prompt)
return AgentExecutor(agent=agent, tools=tools)
def _create_review_agent(self):
"""审查Agent"""
tools = [
Tool(
name="Review",
func=self.review_tool,
description="审查内容"
)
]
prompt = hub.pull("hwchase17/openai-functions-agent")
agent = create_openai_functions_agent(self.llm, tools, prompt)
return AgentExecutor(agent=agent, tools=tools)
def execute_task(self, task: str):
"""执行任务"""
# 1. 研究阶段
research_result = self.research_agent.invoke({
"input": f"研究以下主题: {task}"
})
# 2. 写作阶段
writing_result = self.writing_agent.invoke({
"input": f"基于以下研究写文章: {research_result['output']}"
})
# 3. 审查阶段
review_result = self.review_agent.invoke({
"input": f"审查以下文章: {writing_result['output']}"
})
return review_result["output"]
def search_tool(self, query: str) -> str:
"""搜索工具"""
# 实现搜索逻辑
return f"搜索结果: {query}"
def write_tool(self, content: str) -> str:
"""写作工具"""
# 实现写作逻辑
return f"写作内容: {content}"
def review_tool(self, content: str) -> str:
"""审查工具"""
# 实现审查逻辑
return f"审查结果: {content}"
# 使用
multi_agent = MultiAgentSystem()
result = multi_agent.execute_task("人工智能的未来")
使用LangGraph构建复杂工作流:
from langgraph.graph import StateGraph, END
class AgentState(TypedDict):
"""Agent状态"""
task: str
research_done: bool
writing_done: bool
review_done: bool
final_output: str
def research_node(state: AgentState):
"""研究节点"""
# 执行研究
result = research_agent.invoke({"input": state["task"]})
return {"research_done": True, "research_result": result}
def writing_node(state: AgentState):
"""写作节点"""
# 执行写作
result = writing_agent.invoke({"input": state["research_result"]})
return {"writing_done": True, "writing_result": result}
def review_node(state: AgentState):
"""审查节点"""
# 执行审查
result = review_agent.invoke({"input": state["writing_result"]})
return {"review_done": True, "final_output": result}
# 构建图
workflow = StateGraph(AgentState)
# 添加节点
workflow.add_node("research", research_node)
workflow.add_node("writing", writing_node)
workflow.add_node("review", review_node)
# 添加边
workflow.add_edge("research", "writing")
workflow.add_edge("writing", "review")
workflow.add_edge("review", END)
# 设置入口
workflow.set_entry_point("research")
# 编译
app = workflow.compile()
# 执行
result = app.invoke({"task": "写一篇关于AI的文章"})
14.4 自定义组件开发
自定义Retriever:
from langchain.schema import BaseRetriever, Document
from typing import List
class CustomRetriever(BaseRetriever):
"""自定义检索器"""
def __init__(self, data_source):
self.data_source = data_source
def _get_relevant_documents(self, query: str) -> List[Document]:
"""同步检索"""
# 自定义检索逻辑
results = self.data_source.search(query)
return [
Document(
page_content=result["content"],
metadata=result["metadata"]
)
for result in results
]
async def _aget_relevant_documents(self, query: str) -> List[Document]:
"""异步检索"""
# 异步检索逻辑
return self._get_relevant_documents(query)
自定义Tool:
from langchain.tools import BaseTool
from pydantic import Field
class CustomTool(BaseTool):
"""自定义工具"""
name: str = "custom_tool"
description: str = "这是一个自定义工具"
# 自定义参数
api_key: str = Field(description="API密钥")
def _run(self, query: str) -> str:
"""同步执行"""
# 实现工具逻辑
return f"处理结果: {query}"
async def _arun(self, query: str) -> str:
"""异步执行"""
# 异步实现
return self._run(query)
自定义OutputParser:
from langchain.schema import BaseOutputParser
class CustomOutputParser(BaseOutputParser):
"""自定义输出解析器"""
def parse(self, text: str) -> dict:
"""解析输出"""
# 自定义解析逻辑
lines = text.strip().split("\n")
return {
"title": lines[0] if lines else "",
"content": "\n".join(lines[1:]) if len(lines) > 1 else ""
}
def get_format_instructions(self) -> str:
"""格式说明"""
return """
请按以下格式输出:
第一行:标题
后续行:内容
"""
自定义Memory:
from langchain.memory import BaseMemory
from pydantic import Field
class CustomMemory(BaseMemory):
"""自定义记忆"""
messages: List[str] = Field(default_factory=list)
memory_key: str = "history"
@property
def memory_variables(self) -> List[str]:
"""记忆变量"""
return [self.memory_key]
def load_memory_variables(self, inputs: dict) -> dict:
"""加载记忆"""
return {self.memory_key: "\n".join(self.messages)}
def save_context(self, inputs: dict, outputs: dict):
"""保存上下文"""
self.messages.append(f"Q: {inputs.get('input', '')}")
self.messages.append(f"A: {outputs.get('output', '')}")
# 限制记忆大小
if len(self.messages) > 10:
self.messages = self.messages[-10:]
def clear(self):
"""清除记忆"""
self.messages = []
15. 常见问题与解决方案
15.1 模型调用问题
Q1: API调用超时
问题:模型调用经常超时
解决方案:
from langchain_openai import ChatOpenAI
# 增加超时时间
model = ChatOpenAI(
timeout=60, # 60秒超时
max_retries=3, # 自动重试3次
request_timeout=30 # 请求超时30秒
)
# 使用异步避免阻塞
async def call_with_timeout():
try:
result = await asyncio.wait_for(
model.ainvoke("问题"),
timeout=30.0
)
return result
except asyncio.TimeoutError:
return "请求超时,请稍后重试"
Q2: 速率限制错误
问题:频繁遇到Rate Limit错误
解决方案:
from tenacity import retry, wait_exponential, stop_after_attempt
@retry(
wait=wait_exponential(multiplier=1, min=4, max=60),
stop=stop_after_attempt(5)
)
def call_with_retry():
return model.invoke("问题")
# 或使用队列控制速率
import asyncio
from asyncio import Semaphore
class RateLimiter:
def __init__(self, max_per_minute=60):
self.semaphore = Semaphore(max_per_minute)
self.reset_time = time.time() + 60
async def acquire(self):
# 每分钟重置
if time.time() > self.reset_time:
self.semaphore = Semaphore(self.max_per_minute)
self.reset_time = time.time() + 60
await self.semaphore.acquire()
15.2 内存管理问题
Q3: 内存占用过高
问题:长时间运行后内存占用过高
解决方案:
# 1. 使用ConversationSummaryMemory
from langchain.memory import ConversationSummaryMemory
memory = ConversationSummaryMemory(llm=model)
# 2. 定期清理
if len(memory.chat_memory.messages) > 100:
memory.chat_memory.messages = memory.chat_memory.messages[-50:]
# 3. 使用向量存储记忆
from langchain.memory import VectorStoreRetrieverMemory
memory = VectorStoreRetrieverMemory(
retriever=vectorstore.as_retriever(search_kwargs={"k": 3})
)
15.3 性能瓶颈
Q4: 检索速度慢
问题:向量检索速度慢
解决方案:
# 1. 使用更快的向量数据库
from langchain_community.vectorstores import FAISS
vectorstore = FAISS.from_documents(documents, embeddings)
# 2. 减少检索数量
retriever = vectorstore.as_retriever(search_kwargs={"k": 3}) # 只检索3个
# 3. 使用索引优化
vectorstore.save_local("faiss_index") # 保存索引
vectorstore = FAISS.load_local("faiss_index", embeddings) # 加载索引
15.4 错误处理
Q5: 解析错误
问题:OutputParser经常解析失败
解决方案:
from langchain.output_parsers import OutputFixingParser
# 使用修复解析器
base_parser = PydanticOutputParser(pydantic_object=MyModel)
fixing_parser = OutputFixingParser.from_llm(
parser=base_parser,
llm=model
)
# 自动修复格式错误
try:
result = fixing_parser.parse(output)
except Exception as e:
# 降级处理
result = {"error": str(e), "raw_output": output}
常见问题速查表:
| 问题 | 原因 | 解决方案 |
|---|---|---|
| API超时 | 网络慢/模型慢 | 增加超时、异步调用 |
| 速率限制 | 调用过频 | 重试机制、速率控制 |
| 内存溢出 | 记忆过多 | 使用Summary/清理 |
| 检索慢 | 数据量大 | 优化索引、减少k值 |
| 解析失败 | 格式不对 | 使用FixingParser |
| 成本高 | 模型选择不当 | 智能降级、缓存 |
16. 面试题精选
16.1 基础概念题
Q1: 什么是LangChain?它解决了什么问题?
答案: LangChain是一个用于开发由大语言模型驱动的应用程序的开源框架。它主要解决以下问题:
- 模型抽象:统一不同LLM提供商的API接口,实现模型无关性
- 组件化开发:提供可复用的模块(Prompts、Chains、Agents等)
- 数据增强:通过RAG技术结合外部知识库
- 工作流编排:简化复杂AI应用的开发流程
- 可观测性:内置调试和监控能力
核心价值:降低LLM应用开发门槛,提高开发效率,实现快速迭代。
Q2: LangChain的核心组件有哪些?各自的作用是什么?
答案:
| 组件 | 作用 | 典型应用 |
|---|---|---|
| Models | 模型抽象层 | 统一调用不同LLM |
| Prompts | 提示管理 | 模板化、参数化提示 |
| Memory | 记忆系统 | 维护对话上下文 |
| Chains | 链式调用 | 组合多个步骤 |
| Agents | 智能代理 | 自主决策和工具使用 |
| Data Connection | 数据连接 | 文档加载和检索 |
| Callbacks | 回调机制 | 监控和日志 |
关键点:这些组件可以独立使用,也可以组合使用,体现了模块化设计思想。
Q3: LLM和Chat Models有什么区别?
答案:
LLM(Language Model):
- 输入:纯文本字符串
- 输出:文本补全
- 特点:无状态,单轮交互
- 适用:文本生成、补全任务
Chat Models:
- 输入:消息列表(System/Human/AI Message)
- 输出:AI消息
- 特点:支持角色设定,多轮对话
- 适用:对话系统、助手应用
选择建议:
- 简单文本生成 → LLM
- 对话应用 → Chat Models
- 需要函数调用 → Chat Models(支持Function Calling)
Q4: 什么是RAG?为什么需要RAG?
答案:
**RAG(Retrieval-Augmented Generation)**是检索增强生成技术,通过检索外部知识库来增强LLM的回答能力。
为什么需要RAG:
- 知识时效性:LLM训练数据有截止日期,RAG可以获取最新信息
- 领域知识:LLM可能不了解特定领域或企业内部知识
- 减少幻觉:基于检索到的事实回答,提高准确性
- 可追溯性:可以引用具体来源,增强可信度
- 成本效益:相比Fine-tuning,RAG更新知识成本更低
核心流程:
用户问题 → 向量化 → 检索相关文档 → 构建提示 → LLM生成答案
Q5: Memory在LangChain中的作用是什么?有哪些类型?
答案:
Memory的作用:
- 维护对话历史和上下文
- 实现连贯的多轮交互
- 记住用户偏好和信息
主要类型:
ConversationBufferMemory:完整保存所有对话
- 优点:信息完整
- 缺点:占用空间大
ConversationBufferWindowMemory:只保留最近N轮
- 优点:控制大小
- 缺点:丢失早期信息
ConversationSummaryMemory:总结压缩历史
- 优点:节省Token
- 缺点:可能丢失细节
VectorStoreMemory:向量化存储,智能检索
- 优点:大规模历史管理
- 缺点:需要向量化成本
选择策略:
- 短对话 → Buffer
- 长对话 → Summary或Vector Store
- 成本敏感 → Window或Summary
16.2 架构设计题
Q6: 如何设计一个企业级的RAG问答系统?
答案:
系统架构:
用户层 → API网关 → 应用层 → 数据层
↓
缓存层 → 向量数据库
↓
监控层 → LLM服务
关键设计要点:
数据层:
- 文档预处理:清洗、格式化
- 智能分块:根据文档结构分割
- 元数据管理:记录来源、时间、版本
- 增量更新:支持文档的增删改
检索层:
- 混合检索:向量检索 + 关键词检索
- 重排序:使用Reranker提高精确度
- 缓存策略:热点问题缓存
- 多路召回:提高召回率
生成层:
- 提示优化:结构化提示模板
- 上下文压缩:只保留相关部分
- 流式输出:提升用户体验
- 答案验证:检查答案质量
监控层:
- 性能监控:延迟、吞吐量
- 成本追踪:Token使用量
- 质量评估:准确率、相关性
- 异常告警:错误率监控
安全层:
- 权限控制:基于角色的访问控制
- 内容过滤:敏感信息过滤
- 审计日志:记录所有操作
- 数据加密:传输和存储加密
技术选型:
- 向量数据库:Pinecone(云)或Milvus(自建)
- LLM:GPT-4(高质量)+ GPT-3.5(成本优化)
- 缓存:Redis
- 监控:LangSmith + Prometheus
Q7: Agent和Chain有什么区别?各自适用什么场景?
答案:
Chain(链):
- 特点:预定义的执行流程,确定性
- 执行方式:按固定顺序执行
- 适用场景:
- 流程明确的任务
- 需要可预测的行为
- 对成本敏感的场景
Agent(代理):
- 特点:自主决策,动态选择工具
- 执行方式:根据任务动态规划
- 适用场景:
- 复杂、开放式任务
- 需要多步推理
- 工具选择不确定
对比示例:
| 任务 | 推荐方案 | 理由 |
|---|---|---|
| 文档总结 | Chain | 流程固定 |
| 数据分析 | Agent | 需要动态选择工具 |
| 翻译 | Chain | 单一任务 |
| 复杂问答 | Agent | 可能需要搜索、计算等 |
混合使用:
- Agent内部可以使用Chain
- Chain可以调用Agent作为一个步骤
Q8: 如何优化LangChain应用的性能?
答案:
1. 模型层优化:
- 模型选择:根据任务复杂度选择合适模型
- 简单任务:GPT-3.5-turbo
- 复杂任务:GPT-4
- 参数调优:
- temperature:降低随机性提高稳定性
- max_tokens:限制输出长度控制成本
- 批量处理:合并多个请求减少调用次数
2. 检索优化:
- 索引优化:
- 合理的分块大小(500-1500字符)
- 适当的重叠(10-20%)
- 检索策略:
- 使用MMR平衡相关性和多样性
- 重排序提高精确度
- 缓存:
- 向量缓存:避免重复向量化
- 结果缓存:缓存常见问题答案
3. 并发优化:
# 异步调用
async def process_batch(questions):
tasks = [chain.ainvoke(q) for q in questions]
return await asyncio.gather(*tasks)
# 并行检索
from langchain_core.runnables import RunnableParallel
parallel_chain = RunnableParallel(
summary=summary_chain,
keywords=keyword_chain
)
4. 成本优化:
- 智能降级:优先使用便宜模型,必要时升级
- 提示压缩:减少不必要的上下文
- 缓存策略:避免重复调用
- Token限制:设置合理的max_tokens
5. 监控优化:
- 使用LangSmith追踪性能瓶颈
- 设置告警阈值
- 定期分析日志优化
性能指标:
- 响应时间:< 3秒
- Token使用:优化到最小
- 准确率:> 90%
- 成本:控制在预算内
Q9: 如何处理LangChain应用中的错误和异常?
答案:
1. 模型调用错误:
from tenacity import retry, stop_after_attempt, wait_exponential
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=4, max=10)
)
def call_llm_with_retry(prompt):
try:
return model.invoke(prompt)
except Exception as e:
logger.error(f"LLM调用失败: {e}")
raise
# 使用Fallback
from langchain_core.runnables import RunnableWithFallbacks
chain_with_fallback = primary_chain.with_fallbacks(
[backup_chain]
)
2. 解析错误:
from langchain.output_parsers import OutputFixingParser
# 自动修复解析错误
fixing_parser = OutputFixingParser.from_llm(
parser=base_parser,
llm=model
)
try:
result = fixing_parser.parse(output)
except Exception as e:
# 降级处理
result = fallback_parse(output)
3. 超时处理:
# 设置超时
model = ChatOpenAI(
timeout=30, # 30秒超时
max_retries=3
)
# Agent超时
agent_executor = AgentExecutor(
agent=agent,
tools=tools,
max_execution_time=60 # 60秒超时
)
4. 资源限制:
# Token限制
from langchain.callbacks import get_openai_callback
with get_openai_callback() as cb:
result = chain.invoke(input)
if cb.total_tokens > 10000:
logger.warning("Token使用过多")
# 速率限制
from langchain.llms import OpenAI
llm = OpenAI(
max_tokens_per_minute=90000,
max_requests_per_minute=3500
)
5. 错误监控:
from langchain.callbacks import StdOutCallbackHandler
class ErrorTrackingHandler(StdOutCallbackHandler):
def on_llm_error(self, error, **kwargs):
logger.error(f"LLM错误: {error}")
# 发送告警
send_alert(error)
def on_chain_error(self, error, **kwargs):
logger.error(f"Chain错误: {error}")
# 使用
chain.invoke(input, callbacks=[ErrorTrackingHandler()])
错误处理最佳实践:
- 优雅降级:主服务失败时使用备用方案
- 详细日志:记录完整的错误上下文
- 用户友好:向用户展示友好的错误信息
- 自动恢复:实现重试和自动修复机制
- 监控告警:及时发现和处理问题
16.3 实战应用题
Q10: 如何实现一个支持多轮对话的智能客服系统?
答案:
系统设计:
from langchain.memory import ConversationBufferWindowMemory
from langchain.chains import ConversationChain
from langchain_openai import ChatOpenAI
class CustomerServiceBot:
def __init__(self):
# 1. 初始化模型
self.llm = ChatOpenAI(
model="gpt-3.5-turbo",
temperature=0.7
)
# 2. 配置记忆(保留最近10轮对话)
self.memory = ConversationBufferWindowMemory(
k=10,
return_messages=True
)
# 3. 系统提示
self.system_prompt = """
你是一个专业的客服助手。
职责:
- 友好、耐心地回答用户问题
- 如果不确定,诚实告知并提供替代方案
- 记住对话历史,提供连贯的服务
"""
# 4. 创建对话链
self.conversation = ConversationChain(
llm=self.llm,
memory=self.memory,
verbose=True
)
def chat(self, user_input: str) -> str:
"""处理用户输入"""
try:
response = self.conversation.predict(input=user_input)
return response
except Exception as e:
return "抱歉,系统出现问题,请稍后再试。"
def reset(self):
"""重置对话"""
self.memory.clear()
# 使用
bot = CustomerServiceBot()
print(bot.chat("你好,我想咨询退款问题"))
print(bot.chat("我的订单号是12345"))
print(bot.chat("什么时候能退款?"))
增强功能:
- 意图识别:
from langchain.chains.router import MultiPromptChain
# 根据意图路由到不同的处理链
intent_chains = {
"退款": refund_chain,
"咨询": inquiry_chain,
"投诉": complaint_chain
}
- 知识库集成:
# 结合RAG检索FAQ
retriever = vectorstore.as_retriever()
qa_chain = RetrievalQA.from_chain_type(
llm=self.llm,
retriever=retriever
)
- 情感分析:
def analyze_sentiment(text):
# 检测用户情绪
if is_negative(text):
# 升级到人工客服
escalate_to_human()
Q11: 如何构建一个代码审查助手?
答案:
完整实现:
from langchain.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI
from langchain.output_parsers import PydanticOutputParser
from pydantic import BaseModel, Field
# 1. 定义输出结构
class CodeReview(BaseModel):
issues: list[str] = Field(description="发现的问题列表")
suggestions: list[str] = Field(description="改进建议")
security_concerns: list[str] = Field(description="安全隐患")
performance_tips: list[str] = Field(description="性能优化建议")
rating: int = Field(description="代码质量评分(1-10)")
# 2. 创建解析器
parser = PydanticOutputParser(pydantic_object=CodeReview)
# 3. 构建提示
prompt = ChatPromptTemplate.from_messages([
("system", """你是一个资深的代码审查专家。
请从以下维度审查代码:
1. 代码质量和可读性
2. 潜在的Bug和错误
3. 安全漏洞
4. 性能问题
5. 最佳实践
{format_instructions}
"""),
("human", "请审查以下代码:\n\n{code}")
])
# 4. 创建链
model = ChatOpenAI(model="gpt-4", temperature=0)
chain = prompt | model | parser
# 5. 使用
code = """
def calculate_total(items):
total = 0
for item in items:
total = total + item['price']
return total
"""
review = chain.invoke({
"code": code,
"format_instructions": parser.get_format_instructions()
})
print(f"评分: {review.rating}/10")
print(f"问题: {review.issues}")
print(f"建议: {review.suggestions}")
高级功能:
- 多语言支持:
language_prompts = {
"python": python_review_prompt,
"javascript": js_review_prompt,
"java": java_review_prompt
}
def review_code(code, language):
prompt = language_prompts[language]
return chain.invoke({"code": code, "prompt": prompt})
- 增量审查:
def review_diff(old_code, new_code):
"""只审查变更部分"""
diff = generate_diff(old_code, new_code)
return chain.invoke({"code": diff})
- 自动修复:
def auto_fix(code, issues):
"""根据问题自动生成修复建议"""
fix_prompt = f"""
代码:{code}
问题:{issues}
请提供修复后的代码。
"""
return model.invoke(fix_prompt)
Q12: 如何实现一个文档问答系统,支持多种文档格式?
答案:
系统架构:
from langchain_community.document_loaders import (
PyPDFLoader, Docx2txtLoader, TextLoader, UnstructuredMarkdownLoader
)
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_openai import OpenAIEmbeddings
from langchain_community.vectorstores import Chroma
from langchain.chains import RetrievalQA
class DocumentQASystem:
def __init__(self):
self.embeddings = OpenAIEmbeddings()
self.vectorstore = None
self.qa_chain = None
# 文档加载器映射
self.loaders = {
'.pdf': PyPDFLoader,
'.docx': Docx2txtLoader,
'.txt': TextLoader,
'.md': UnstructuredMarkdownLoader
}
def load_documents(self, file_paths: list[str]):
"""加载多种格式文档"""
all_docs = []
for file_path in file_paths:
# 根据文件扩展名选择加载器
ext = os.path.splitext(file_path)[1]
loader_class = self.loaders.get(ext)
if loader_class:
loader = loader_class(file_path)
docs = loader.load()
all_docs.extend(docs)
else:
print(f"不支持的文件格式: {ext}")
return all_docs
def build_index(self, documents):
"""构建向量索引"""
# 文本分割
text_splitter = RecursiveCharacterTextSplitter(
chunk_size=1000,
chunk_overlap=200
)
chunks = text_splitter.split_documents(documents)
# 创建向量存储
self.vectorstore = Chroma.from_documents(
documents=chunks,
embedding=self.embeddings,
persist_directory="./doc_qa_db"
)
# 创建QA链
self.qa_chain = RetrievalQA.from_chain_type(
llm=ChatOpenAI(model="gpt-3.5-turbo"),
retriever=self.vectorstore.as_retriever(
search_kwargs={"k": 4}
),
return_source_documents=True
)
def query(self, question: str):
"""查询"""
if not self.qa_chain:
return "请先加载文档并构建索引"
result = self.qa_chain.invoke({"query": question})
return {
"answer": result["result"],
"sources": [
{
"content": doc.page_content[:200],
"source": doc.metadata.get("source", "未知")
}
for doc in result["source_documents"]
]
}
# 使用示例
qa_system = DocumentQASystem()
# 加载文档
docs = qa_system.load_documents([
"manual.pdf",
"guide.docx",
"readme.md"
])
# 构建索引
qa_system.build_index(docs)
# 查询
result = qa_system.query("如何安装软件?")
print(result["answer"])
print("来源:", result["sources"])
优化点:
- 增量更新:
def add_documents(self, new_docs):
"""增量添加文档"""
chunks = self.text_splitter.split_documents(new_docs)
self.vectorstore.add_documents(chunks)
- 多语言支持:
def detect_language(text):
# 检测文档语言
return langdetect.detect(text)
def query_multilingual(question, language):
# 根据语言选择合适的模型
if language == "zh":
llm = ChatOpenAI(model="gpt-3.5-turbo")
else:
llm = ChatOpenAI(model="gpt-3.5-turbo")
- 答案质量评估:
def evaluate_answer(question, answer, sources):
"""评估答案质量"""
evaluator = load_evaluator("criteria", criteria="relevance")
score = evaluator.evaluate_strings(
prediction=answer,
input=question,
reference="\n".join([s["content"] for s in sources])
)
return score["score"]
16.4 性能优化题
Q13: 如何降低LangChain应用的Token成本?
答案:
1. 模型选择策略:
class CostOptimizedChain:
def __init__(self):
# 便宜模型用于简单任务
self.cheap_model = ChatOpenAI(model="gpt-3.5-turbo")
# 贵模型用于复杂任务
self.expensive_model = ChatOpenAI(model="gpt-4")
def invoke(self, input_text):
# 根据复杂度选择模型
if self.is_simple_task(input_text):
return self.cheap_model.invoke(input_text)
else:
return self.expensive_model.invoke(input_text)
def is_simple_task(self, text):
"""判断任务复杂度"""
# 简单规则:长度、关键词等
return len(text) < 100 and not any(
keyword in text for keyword in ["分析", "推理", "复杂"]
)
2. 提示压缩:
from langchain.prompts import PromptTemplate
# ❌ 冗长的提示
bad_prompt = """
你是一个非常专业的助手,拥有丰富的经验...
(大量无用描述)
请回答:{question}
"""
# ✅ 精简的提示
good_prompt = """
角色:专业助手
任务:{question}
要求:简洁准确
"""
# 动态压缩上下文
def compress_context(context, max_tokens=500):
"""压缩上下文到指定Token数"""
if count_tokens(context) > max_tokens:
# 使用总结模型压缩
summary_chain = load_summarize_chain(llm, chain_type="stuff")
return summary_chain.run([Document(page_content=context)])
return context
3. 缓存策略:
from langchain.cache import SQLiteCache
from langchain.globals import set_llm_cache
import hashlib
# 全局缓存
set_llm_cache(SQLiteCache(database_path=".langchain.db"))
# 自定义缓存
class SmartCache:
def __init__(self):
self.cache = {}
def get_cache_key(self, prompt):
"""生成缓存键"""
return hashlib.md5(prompt.encode()).hexdigest()
def get(self, prompt):
key = self.get_cache_key(prompt)
return self.cache.get(key)
def set(self, prompt, result):
key = self.get_cache_key(prompt)
self.cache[key] = result
cache = SmartCache()
def cached_invoke(prompt):
# 检查缓存
cached_result = cache.get(prompt)
if cached_result:
return cached_result
# 调用模型
result = model.invoke(prompt)
# 保存缓存
cache.set(prompt, result)
return result
4. 批量处理:
# ❌ 逐个处理
for question in questions:
answer = model.invoke(question) # 多次API调用
# ✅ 批量处理
answers = model.batch(questions) # 一次API调用
5. Token计数和限制:
from langchain.callbacks import get_openai_callback
import tiktoken
def count_tokens(text, model="gpt-3.5-turbo"):
"""精确计算Token数"""
encoding = tiktoken.encoding_for_model(model)
return len(encoding.encode(text))
def optimize_prompt(prompt, max_tokens=1000):
"""优化提示长度"""
if count_tokens(prompt) > max_tokens:
# 截断或总结
return prompt[:max_tokens]
return prompt
# 追踪成本
with get_openai_callback() as cb:
result = chain.invoke(input)
print(f"使用Token: {cb.total_tokens}")
print(f"成本: ${cb.total_cost:.4f}")
# 设置预算告警
if cb.total_cost > 1.0:
logger.warning("成本超过预算!")
成本优化检查清单:
| 优化项 | 方法 | 预期节省 |
|---|---|---|
| 模型选择 | 简单任务用便宜模型 | 50-80% |
| 提示优化 | 删除冗余内容 | 20-40% |
| 缓存 | 缓存常见查询 | 30-70% |
| 批量处理 | 合并请求 | 10-20% |
| 上下文压缩 | 总结长文本 | 30-50% |
Q14: 如何提高RAG系统的检索准确率?
答案:
1. 优化文档分块:
# 策略1:语义分块
from langchain.text_splitter import SemanticChunker
semantic_splitter = SemanticChunker(
embeddings=embeddings,
breakpoint_threshold_type="percentile",
breakpoint_threshold_amount=95
)
# 策略2:根据文档结构分块
from langchain.text_splitter import MarkdownHeaderTextSplitter
markdown_splitter = MarkdownHeaderTextSplitter(
headers_to_split_on=[
("#", "Header 1"),
("##", "Header 2"),
]
)
# 策略3:混合分块
def hybrid_split(documents):
# 先按结构分割
structural_chunks = markdown_splitter.split_documents(documents)
# 再按大小分割
final_chunks = text_splitter.split_documents(structural_chunks)
return final_chunks
2. 混合检索:
from langchain.retrievers import EnsembleRetriever
from langchain_community.retrievers import BM25Retriever
# 向量检索
vector_retriever = vectorstore.as_retriever(search_kwargs={"k": 5})
# BM25关键词检索
bm25_retriever = BM25Retriever.from_documents(documents)
bm25_retriever.k = 5
# 混合检索
ensemble_retriever = EnsembleRetriever(
retrievers=[vector_retriever, bm25_retriever],
weights=[0.7, 0.3] # 向量70%,关键词30%
)
results = ensemble_retriever.get_relevant_documents("查询")
3. 重排序:
from langchain.retrievers.document_compressors import CohereRerank
from langchain.retrievers import ContextualCompressionRetriever
# 初始检索更多候选
base_retriever = vectorstore.as_retriever(search_kwargs={"k": 20})
# Cohere重排序
reranker = CohereRerank(model="rerank-english-v2.0", top_n=5)
# 组合
compression_retriever = ContextualCompressionRetriever(
base_compressor=reranker,
base_retriever=base_retriever
)
# 检索 → 重排序 → 返回top 5
results = compression_retriever.get_relevant_documents("查询")
4. 查询优化:
from langchain.retrievers.multi_query import MultiQueryRetriever
# 自动生成多个查询变体
multi_query_retriever = MultiQueryRetriever.from_llm(
retriever=base_retriever,
llm=model
)
# 单个查询 → 生成3-5个变体 → 分别检索 → 合并去重
results = multi_query_retriever.get_relevant_documents(
"机器学习的应用"
)
# 可能生成的变体:
# - "机器学习有哪些应用场景?"
# - "ML在实际中如何使用?"
# - "机器学习的实践案例"
5. 元数据过滤:
# 添加丰富的元数据
for doc in documents:
doc.metadata.update({
"source": "manual.pdf",
"category": "技术文档",
"date": "2024-01-01",
"language": "zh",
"importance": "high"
})
# 基于元数据过滤
retriever = vectorstore.as_retriever(
search_kwargs={
"k": 5,
"filter": {
"category": "技术文档",
"language": "zh"
}
}
)
6. 评估和迭代:
from langchain.evaluation import load_evaluator
def evaluate_retrieval(retriever, test_cases):
"""评估检索质量"""
evaluator = load_evaluator("criteria", criteria="relevance")
scores = []
for query, expected_answer in test_cases:
# 检索
docs = retriever.get_relevant_documents(query)
context = "\n".join([d.page_content for d in docs])
# 评估
score = evaluator.evaluate_strings(
prediction=context,
input=query,
reference=expected_answer
)
scores.append(score["score"])
return sum(scores) / len(scores)
# A/B测试不同策略
strategies = {
"基础": base_retriever,
"混合": ensemble_retriever,
"重排序": compression_retriever
}
for name, retriever in strategies.items():
score = evaluate_retrieval(retriever, test_cases)
print(f"{name}策略得分: {score}")
检索优化总结:
| 技术 | 提升点 | 复杂度 | 成本 |
|---|---|---|---|
| 语义分块 | 上下文完整性 | 中 | 低 |
| 混合检索 | 召回率 | 中 | 低 |
| 重排序 | 精确率 | 低 | 中 |
| 多查询 | 召回率 | 低 | 中 |
| 元数据过滤 | 精确率 | 低 | 低 |
Q15: 如何监控和调试LangChain应用?
答案:
1. 使用LangSmith:
import os
# 配置LangSmith
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_API_KEY"] = "your-api-key"
os.environ["LANGCHAIN_PROJECT"] = "my-project"
# 自动追踪所有调用
result = chain.invoke(input)
# 在LangSmith平台查看完整调用链
2. 自定义回调:
from langchain.callbacks.base import BaseCallbackHandler
class DetailedCallbackHandler(BaseCallbackHandler):
"""详细的回调处理器"""
def on_llm_start(self, serialized, prompts, **kwargs):
"""LLM开始"""
print(f"[LLM Start] Prompts: {prompts[0][:100]}...")
def on_llm_end(self, response, **kwargs):
"""LLM结束"""
print(f"[LLM End] Response: {response.generations[0][0].text[:100]}...")
def on_llm_error(self, error, **kwargs):
"""LLM错误"""
logger.error(f"[LLM Error] {error}")
def on_chain_start(self, serialized, inputs, **kwargs):
"""Chain开始"""
print(f"[Chain Start] Inputs: {inputs}")
def on_chain_end(self, outputs, **kwargs):
"""Chain结束"""
print(f"[Chain End] Outputs: {outputs}")
def on_tool_start(self, serialized, input_str, **kwargs):
"""Tool开始"""
print(f"[Tool Start] {serialized['name']}: {input_str}")
def on_tool_end(self, output, **kwargs):
"""Tool结束"""
print(f"[Tool End] Output: {output}")
# 使用
chain.invoke(input, callbacks=[DetailedCallbackHandler()])
3. 性能监控:
import time
from langchain.callbacks import get_openai_callback
class PerformanceMonitor:
def __init__(self):
self.metrics = {
"total_calls": 0,
"total_tokens": 0,
"total_cost": 0.0,
"total_time": 0.0,
"errors": 0
}
def track_call(self, func):
"""装饰器:追踪调用"""
def wrapper(*args, **kwargs):
start_time = time.time()
try:
with get_openai_callback() as cb:
result = func(*args, **kwargs)
# 更新指标
self.metrics["total_calls"] += 1
self.metrics["total_tokens"] += cb.total_tokens
self.metrics["total_cost"] += cb.total_cost
self.metrics["total_time"] += time.time() - start_time
return result
except Exception as e:
self.metrics["errors"] += 1
raise e
return wrapper
def get_report(self):
"""生成报告"""
return {
"调用次数": self.metrics["total_calls"],
"总Token数": self.metrics["total_tokens"],
"总成本": f"${self.metrics['total_cost']:.4f}",
"平均延迟": f"{self.metrics['total_time'] / max(self.metrics['total_calls'], 1):.2f}s",
"错误率": f"{self.metrics['errors'] / max(self.metrics['total_calls'], 1) * 100:.2f}%"
}
# 使用
monitor = PerformanceMonitor()
@monitor.track_call
def process_query(query):
return chain.invoke(query)
# 查看报告
print(monitor.get_report())
4. 日志记录:
import logging
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('langchain.log'),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
# 记录关键信息
def logged_invoke(chain, input_data):
logger.info(f"Input: {input_data}")
try:
result = chain.invoke(input_data)
logger.info(f"Output: {result}")
return result
except Exception as e:
logger.error(f"Error: {e}", exc_info=True)
raise
5. 调试技巧:
# 1. 启用详细输出
chain = LLMChain(llm=model, prompt=prompt, verbose=True)
# 2. 检查中间步骤
result = agent_executor.invoke(
{"input": "问题"},
return_intermediate_steps=True
)
for step in result["intermediate_steps"]:
print(f"Action: {step[0]}")
print(f"Observation: {step[1]}")
# 3. 单元测试
def test_chain():
test_input = {"question": "测试问题"}
result = chain.invoke(test_input)
assert result is not None
assert len(result["text"]) > 0
# 4. 模拟测试
from unittest.mock import Mock
mock_llm = Mock()
mock_llm.invoke.return_value = "模拟响应"
test_chain = LLMChain(llm=mock_llm, prompt=prompt)
监控最佳实践:
- 生产环境必须启用LangSmith
- 设置告警阈值(延迟、成本、错误率)
- 定期分析日志,发现优化点
- A/B测试不同策略
- 建立性能基准,持续改进
总结
LangChain作为LLM应用开发的主流框架,提供了完整的工具链和最佳实践。通过本文档的学习,你应该掌握:
核心能力:
- ✅ 理解LangChain的架构和核心组件
- ✅ 掌握Models、Prompts、Memory、Chains、Agents的使用
- ✅ 实现RAG检索增强生成系统
- ✅ 构建生产级的AI应用
- ✅ 优化性能和控制成本
实战技能:
- 智能问答系统开发
- 文档分析和处理
- 代码辅助工具
- 工作流自动化
- 多Agent协作系统
最佳实践:
- 模块化设计,组件复用
- 提示工程优化
- 错误处理和容错
- 性能监控和调优
- 成本控制策略
持续学习:
- 关注LangChain官方文档更新
- 参与社区讨论和贡献
- 实践不同场景的应用
- 探索新的模型和工具
- 分享经验和最佳实践
LangChain生态系统在快速发展,新功能和工具不断涌现。保持学习和实践,才能充分发挥LLM的潜力,构建真正有价值的AI应用。
参考资源:
- 官方文档:https://python.langchain.com/
- GitHub仓库:https://github.com/langchain-ai/langchain
- LangSmith平台:https://smith.langchain.com/
- 社区Discord:https://discord.gg/langchain
- 博客和教程:https://blog.langchain.dev/
文档版本:v1.0 最后更新:2024年12月