51. LangChain技术指南

1. LangChain基础概念

2. 核心架构与组件

3. Models模型层

4. Prompts提示工程

5. Memory记忆系统

6. Chains链式调用

7. Agents智能代理

8. Data Connection数据连接

9. RAG检索增强生成

10. LangChain Expression Language (LCEL)

11. 实战应用场景

12. 性能优化与调试

13. 生产部署

14. 高级特性

15. 常见问题与解决方案

16. 面试题精选


1. LangChain基础概念

1.1 LangChain简介

LangChain是一个用于开发由大语言模型(LLM)驱动的应用程序的开源框架,由Harrison Chase于2022年10月创建。它提供了一套标准化的接口和工具,使开发者能够轻松构建复杂的AI应用。

核心定位

  • 应用开发框架:不是模型本身,而是连接和编排LLM的工具
  • 抽象层:统一不同LLM提供商的API接口
  • 组件化设计:提供可复用的模块化组件
  • 端到端解决方案:从数据处理到模型调用的完整链路

发展历程

时间里程碑说明
2022.10项目启动Harrison Chase创建LangChain
2023.01快速增长GitHub Star突破10K
2023.04融资A轮获得1000万美元投资
2023.08LangSmith发布推出调试和监控平台
2024.01LCEL成熟Expression Language成为主流

技术栈支持

  • Python版本:最成熟,功能最全
  • JavaScript/TypeScript版本:适合前端和Node.js应用
  • 其他语言:社区支持Go、Java等版本

1.2 核心特性与优势

1. 模型无关性(Model Agnostic)

LangChain支持多种LLM提供商,开发者可以轻松切换:

提供商类型支持的模型特点
OpenAIGPT-4, GPT-3.5性能强大,API稳定
AnthropicClaude 3长上下文,安全性高
GoogleGemini, PaLM多模态能力强
开源模型LLaMA, Mistral可本地部署,成本低
国内模型文心一言、通义千问中文理解好

2. 组件化架构

graph TB subgraph "LangChain核心组件" A["Models
模型层"] --> B["Prompts
提示工程"] B --> C["Chains
链式调用"] C --> D["Agents
智能代理"] E["Memory
记忆系统"] --> C F["Data Connection
数据连接"] --> C G["Callbacks
回调机制"] --> C end style A fill:#e1f5ff style B fill:#fff4e1 style C fill:#e8f5e9 style D fill:#fce4ec style E fill:#f3e5f5 style F fill:#e0f2f1 style G fill:#fff9c4

3. 数据增强能力

  • RAG(检索增强生成):结合外部知识库提升准确性
  • 向量数据库集成:支持Pinecone、Chroma、FAISS等
  • 文档处理:自动加载、分割、索引各类文档

4. 可观测性

  • LangSmith:专业的调试和监控平台
  • Tracing:完整的调用链追踪
  • Metrics:性能指标收集和分析

5. 生产就绪

  • 错误处理:完善的异常捕获和重试机制
  • 流式输出:支持实时响应
  • 缓存机制:减少重复调用成本
  • 并发控制:高效处理大量请求

1.3 应用场景

1. 智能问答系统

  • 企业知识库问答:基于内部文档的智能检索
  • 客户服务机器人:自动回答常见问题
  • 技术支持助手:代码问题诊断和解决

2. 文档处理

  • 文档总结:自动提取关键信息
  • 内容生成:根据模板生成报告、邮件
  • 翻译服务:多语言文档翻译

3. 数据分析

  • SQL生成:自然语言转SQL查询
  • 数据可视化:自动生成图表和报表
  • 商业智能:数据洞察和趋势分析

4. 代码辅助

  • 代码生成:根据需求生成代码
  • 代码审查:自动发现潜在问题
  • 文档生成:自动生成API文档

5. 工作流自动化

  • 邮件处理:自动分类和回复
  • 会议纪要:自动总结会议内容
  • 任务管理:智能任务分配和跟踪

1.4 生态系统

核心项目

graph LR subgraph "LangChain生态" A["LangChain
核心框架"] --> B["LangSmith
调试平台"] A --> C["LangServe
部署工具"] A --> D["LangGraph
图状态管理"] E["LangChain Hub
提示模板库"] --> A F["LangChain Templates
应用模板"] --> A end style A fill:#4CAF50 style B fill:#2196F3 style C fill:#FF9800 style D fill:#9C27B0 style E fill:#00BCD4 style F fill:#E91E63

1. LangChain Core

  • 核心抽象和接口定义
  • 基础组件实现
  • 最小依赖,高性能

2. LangSmith

  • 调试工具:可视化调用链
  • 性能监控:延迟、成本追踪
  • 数据集管理:测试用例管理
  • 协作功能:团队共享和评审

3. LangServe

  • 快速部署:一键将Chain转为REST API
  • 自动文档:生成OpenAPI规范
  • 流式支持:Server-Sent Events
  • 客户端SDK:自动生成调用代码

4. LangGraph

  • 状态图管理:复杂工作流编排
  • 循环控制:支持条件分支和循环
  • 持久化:状态保存和恢复
  • 人机协作:支持人工介入

5. LangChain Hub

  • 提示模板库:社区共享的优质提示
  • 版本管理:提示模板版本控制
  • 评分系统:社区评价和推荐

集成生态

类别集成工具用途
向量数据库Pinecone, Chroma, Weaviate, Milvus向量存储和检索
文档加载PyPDF, Docx, Markdown, Web Scraper多格式文档处理
模型提供商OpenAI, Anthropic, Cohere, HuggingFaceLLM调用
工具集成Google Search, Wikipedia, Wolfram Alpha外部工具调用
监控平台LangSmith, Weights & Biases, MLflow性能监控

社区资源

  • GitHub:50K+ Stars,活跃的开源社区
  • Discord:技术交流和问题解答
  • 文档:详细的官方文档和教程
  • 博客:最佳实践和案例分享

1.5 小白入门指南

什么是LangChain?用人话说

简单理解:LangChain就像是一个"乐高积木盒",里面有各种积木块(组件),你可以把它们拼起来,做出一个能和AI对话、能回答问题、能帮你处理文档的智能助手。

打个比方

  • 如果你想做一个能回答公司内部问题的机器人
  • 传统方式:你需要自己写很多代码,处理各种细节
  • 用LangChain:就像搭积木一样,把"文档读取"、“AI模型”、“记忆"这些积木拼起来就行了

核心概念通俗讲解

1. Models(模型)- 就是AI的大脑

# 最简单的例子
from langchain_openai import ChatOpenAI

# 创建一个AI大脑
model = ChatOpenAI(model="gpt-3.5-turbo")

# 让AI回答问题
response = model.invoke("什么是Python?")
print(response.content)

理解要点

  • ChatOpenAI:这是一个类,代表OpenAI的AI模型
  • model="gpt-3.5-turbo":选择哪个AI模型(就像选择用哪个版本的大脑)
  • invoke():这是一个方法,意思是"调用”,就是让AI工作
  • response.content:AI的回答内容

2. Prompts(提示)- 就是你怎么问AI

from langchain.prompts import PromptTemplate

# 创建一个提示模板(就像填空题)
template = "你是一个{role},请回答:{question}"

prompt = PromptTemplate(
    input_variables=["role", "question"],  # 需要填的空
    template=template
)

# 填空
formatted = prompt.format(
    role="Python老师",
    question="什么是变量?"
)

print(formatted)
# 输出:你是一个Python老师,请回答:什么是变量?

理解要点

  • PromptTemplate:提示模板类,用来创建可重复使用的问题格式
  • input_variables:需要填的空(变量)
  • format():填空的方法

3. Chains(链)- 把多个步骤串起来

# ✅ 新版本(LangChain 1.2.0+)使用 LCEL
from langchain_core.output_parsers import StrOutputParser

# 创建一个链(使用 | 操作符连接)
chain = prompt | model | StrOutputParser()

# 一次性完成:填空 → 问AI → 得到答案
result = chain.invoke({
    "role": "数学老师",
    "question": "1+1等于几?"
})

print(result)  # 直接输出字符串

理解要点

  • LCEL(LangChain Expression Language):新版本的链式语法
  • | 操作符:把组件连接起来(像管道一样)
  • StrOutputParser():把 AI 的回答转成纯文本
  • 好处:代码更简洁,性能更好

4. Memory(记忆)- 让AI记住之前说过的话

# ✅ 新版本(LangChain 1.2.0+)
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.runnables.history import RunnableWithMessageHistory
from langchain_community.chat_message_histories import ChatMessageHistory

# 创建记忆存储
store = {}

def get_session_history(session_id: str):
    if session_id not in store:
        store[session_id] = ChatMessageHistory()
    return store[session_id]

# 创建带记忆的对话链
prompt = ChatPromptTemplate.from_messages([
    ("system", "你是一个友好的助手"),
    MessagesPlaceholder(variable_name="history"),
    ("human", "{input}")
])
model = ChatOpenAI(model="gpt-4.1-mini")
chain = prompt | model

# 包装成带记忆的链
chain_with_history = RunnableWithMessageHistory(
    chain,
    get_session_history,
    input_messages_key="input",
    history_messages_key="history"
)

# 使用(会自动记住对话)
response1 = chain_with_history.invoke(
    {"input": "我叫张三"},
    config={"configurable": {"session_id": "user123"}}
)

response2 = chain_with_history.invoke(
    {"input": "我叫什么名字?"},
    config={"configurable": {"session_id": "user123"}}
)
# AI会回答:你叫张三

理解要点

  • RunnableWithMessageHistory:新版本的记忆管理
  • MessagesPlaceholder:在提示中预留对话历史的位置
  • session_id:用来区分不同用户的对话
  • AI能记住之前的对话内容

5. Agents(代理)- 会自己思考和使用工具的AI

# ✅ 新版本(LangChain 1.2.0+)
from langchain.agents import create_tool_calling_agent, AgentExecutor
from langchain_core.tools import tool
from langchain import hub

# 定义工具(使用装饰器)
@tool
def calculator(expression: str) -> str:
    """计算数学表达式"""
    try:
        return str(eval(expression))
    except:
        return "计算错误"

tools = [calculator]

# 获取提示模板
prompt = hub.pull("hwchase17/openai-tools-agent")

# 创建 Agent
agent = create_tool_calling_agent(model, tools, prompt)
agent_executor = AgentExecutor(agent=agent, tools=tools, verbose=True)

# 使用
result = agent_executor.invoke({
    "input": "123 * 456 等于多少?"
})

print(result["output"])
# Agent会:
# 1. 思考:这是数学问题,需要用计算器
# 2. 使用计算器工具
# 3. 得到结果:56088
# 4. 回答用户

理解要点

  • @tool 装饰器:定义工具的新方式(更简洁)
  • create_tool_calling_agent:新版本的 Agent 创建方法
  • hub.pull():从 LangChain Hub 获取提示模板
  • AgentExecutor:执行器,运行 Agent

6. RAG(检索增强生成)- 让AI基于你的文档回答

# ✅ 新版本(LangChain 1.2.0+)
from langchain_community.document_loaders import TextLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_openai import OpenAIEmbeddings
from langchain_community.vectorstores import Chroma
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough

# 步骤1:加载文档
loader = TextLoader("company_handbook.txt")
documents = loader.load()

# 步骤2:切分文档
splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200)
chunks = splitter.split_documents(documents)

# 步骤3:向量化并存储
embeddings = OpenAIEmbeddings()
vectorstore = Chroma.from_documents(chunks, embeddings)
retriever = vectorstore.as_retriever()

# 步骤4:创建 RAG 链(使用 LCEL)
template = """基于以下上下文回答问题:

上下文:{context}

问题:{question}

答案:"""

prompt = ChatPromptTemplate.from_template(template)

rag_chain = (
    {"context": retriever, "question": RunnablePassthrough()}
    | prompt
    | model
    | StrOutputParser()
)

# 步骤5:提问
answer = rag_chain.invoke("公司的年假政策是什么?")
print(answer)

# AI会:
# 1. 在文档中搜索相关内容
# 2. 基于找到的内容回答
# 3. 不会瞎编,因为有依据

理解要点

  • retriever:检索器,搜索相关文档
  • RunnablePassthrough():传递输入到下一步
  • LCEL 语法:用 | 和字典组合组件
  • 更灵活、性能更好

常用类和方法速查表关

类名用途常用方法
ChatOpenAIOpenAI聊天模型invoke(), stream(), batch()
OpenAIOpenAI文本模型invoke(), predict()
OpenAIEmbeddings文本向量化embed_query(), embed_documents()

提示相关

类名用途常用方法
PromptTemplate提示模板format(), format_messages()
ChatPromptTemplate聊天提示模板from_messages(), format_messages()
FewShotPromptTemplate少样本提示format()

记忆相关

类名用途常用方法
ConversationBufferMemory完整对话记忆save_context(), load_memory_variables()
ConversationSummaryMemory总结式记忆save_context(), clear()
VectorStoreRetrieverMemory向量检索记忆save_context(), load_memory_variables()

链相关

类名用途常用方法状态
LCEL (推荐)新版链式语法| 操作符, invoke()✅ 最新
RunnablePassthrough传递数据invoke()✅ 最新
RunnableParallel并行执行invoke()✅ 最新
LLMChain基础链invoke(), run()⚠️ 已弃用
ConversationChain对话链predict()⚠️ 已弃用
RetrievalQA检索问答链invoke()⚠️ 已弃用

数据处理相关

类名用途常用方法
TextLoader文本加载load()
PyPDFLoaderPDF加载load(), load_and_split()
RecursiveCharacterTextSplitter文本分割split_text(), split_documents()
Chroma向量数据库from_documents(), similarity_search()

最简单的完整示例

# 1. 安装(在命令行运行)
# pip install langchain langchain-openai

# 2. 设置API密钥(在代码开头)
import os
os.environ["OPENAI_API_KEY"] = "你的API密钥"

# 3. 导入需要的类
from langchain_openai import ChatOpenAI
from langchain.prompts import ChatPromptTemplate
from langchain.schema.output_parser import StrOutputParser

# 4. 创建组件
model = ChatOpenAI(model="gpt-3.5-turbo")
prompt = ChatPromptTemplate.from_template("用简单的话解释:{topic}")
output_parser = StrOutputParser()

# 5. 组合成链(使用 | 操作符)
chain = prompt | model | output_parser

# 6. 使用
result = chain.invoke({"topic": "什么是机器学习"})
print(result)

这个例子做了什么?

  1. 创建了一个AI模型
  2. 创建了一个提示模板
  3. 创建了一个输出解析器(把AI的回答变成纯文本)
  4. |把它们连起来(就像水管一样)
  5. 输入问题,得到答案

常见错误和解决方法

错误1:API密钥错误

# ❌ 错误
model = ChatOpenAI()  # 没有设置API密钥

# ✅ 正确
import os
os.environ["OPENAI_API_KEY"] = "sk-..."
model = ChatOpenAI()

错误2:忘记调用方法

# ❌ 错误
result = chain  # 忘记调用

# ✅ 正确
result = chain.invoke({"topic": "Python"})

错误3:变量名不匹配

# ❌ 错误
prompt = PromptTemplate(
    input_variables=["question"],  # 定义的是question
    template="解释{topic}"  # 但模板用的是topic
)

# ✅ 正确
prompt = PromptTemplate(
    input_variables=["topic"],  # 名字要一致
    template="解释{topic}"
)

学习路径建议

第1周:基础概念

  • 学会使用ChatOpenAI调用AI
  • 学会使用PromptTemplate创建提示
  • 学会使用LLMChain组合它们

第2周:记忆和对话

  • 学会使用ConversationChain
  • 理解不同的Memory类型
  • 做一个简单的聊天机器人

第3周:文档处理

  • 学会加载和分割文档
  • 学会使用向量数据库
  • 做一个文档问答系统

第4周:高级功能

  • 学会使用Agent
  • 学会自定义工具
  • 做一个能使用工具的AI助手

实用技巧

技巧1:调试时开启verbose

chain = LLMChain(llm=model, prompt=prompt, verbose=True)
# verbose=True 会打印详细信息,方便调试

技巧2:使用流式输出

for chunk in model.stream("写一首诗"):
    print(chunk.content, end="", flush=True)
# 实时显示AI的输出,不用等全部生成完

技巧3:控制成本

from langchain.callbacks import get_openai_callback

with get_openai_callback() as cb:
    result = chain.invoke({"topic": "AI"})
    print(f"花费: ${cb.total_cost}")
    print(f"Token数: {cb.total_tokens}")

技巧4:错误重试

model = ChatOpenAI(
    max_retries=3,  # 失败自动重试3次
    timeout=30  # 30秒超时
)

2. 核心架构与组件

2.1 整体架构设计

LangChain采用分层架构设计,从底层到上层依次为:

graph TB subgraph "应用层" A1["智能问答"] A2["文档分析"] A3["代码助手"] A4["工作流自动化"] end subgraph "编排层" B1["Chains
链式调用"] B2["Agents
智能代理"] B3["LCEL
表达式语言"] end subgraph "组件层" C1["Models
模型"] C2["Prompts
提示"] C3["Memory
记忆"] C4["Tools
工具"] C5["Retrievers
检索器"] end subgraph "基础设施层" D1["Vector Stores
向量库"] D2["Document Loaders
文档加载"] D3["Text Splitters
文本分割"] D4["Callbacks
回调"] end A1 --> B1 A2 --> B2 A3 --> B3 A4 --> B1 B1 --> C1 B1 --> C2 B2 --> C3 B2 --> C4 B3 --> C5 C1 --> D4 C3 --> D1 C5 --> D2 C5 --> D3 style A1 fill:#e3f2fd style A2 fill:#e3f2fd style A3 fill:#e3f2fd style A4 fill:#e3f2fd style B1 fill:#fff3e0 style B2 fill:#fff3e0 style B3 fill:#fff3e0 style C1 fill:#e8f5e9 style C2 fill:#e8f5e9 style C3 fill:#e8f5e9 style C4 fill:#e8f5e9 style C5 fill:#e8f5e9 style D1 fill:#f3e5f5 style D2 fill:#f3e5f5 style D3 fill:#f3e5f5 style D4 fill:#f3e5f5

架构特点

  1. 松耦合设计:各组件独立,可单独使用或组合
  2. 接口标准化:统一的抽象接口,易于扩展
  3. 可组合性:通过LCEL灵活组合各种组件
  4. 可观测性:内置回调机制,全链路追踪

2.2 核心组件概览

小白理解:LangChain就像一个工具箱,里面有6大类工具,每类工具负责不同的事情。

1. Models(模型层)- AI的大脑

通俗解释:这是真正干活的AI,就像你请的一个聪明助手。

负责与各种LLM交互的抽象层:

组件功能典型用途小白理解
LLMs文本输入输出模型文本生成、补全给它一句话,它接着往下写
Chat Models对话式模型多轮对话、角色扮演能记住上下文的聊天机器人
Embeddings文本向量化语义搜索、相似度计算把文字变成数字,让电脑能理解

代码示例

# LLM - 文本补全
from langchain_openai import OpenAI
llm = OpenAI()
result = llm.invoke("从前有座山,")  # AI会接着写故事

# Chat Model - 对话
from langchain_openai import ChatOpenAI
chat = ChatOpenAI()
result = chat.invoke("你好")  # AI会回复你

# Embeddings - 向量化
from langchain_openai import OpenAIEmbeddings
embeddings = OpenAIEmbeddings()
vector = embeddings.embed_query("机器学习")  # 变成一串数字

2. Prompts(提示工程)- 怎么问AI

通俗解释:就像你问问题的方式,问得好,AI回答得就好。

管理和优化模型输入:

  • PromptTemplate:参数化提示模板(就像填空题)
  • ChatPromptTemplate:对话提示模板(多轮对话的问题格式)
  • FewShotPromptTemplate:少样本学习模板(给AI看几个例子)
  • OutputParser:结构化输出解析(让AI按格式回答)

代码示例

from langchain.prompts import PromptTemplate

# 创建模板(就像填空题)
template = PromptTemplate(
    input_variables=["name", "job"],
    template="你好{name},你是做{job}的吗?"
)

# 填空
question = template.format(name="张三", job="程序员")
# 结果:你好张三,你是做程序员的吗?

3. Memory(记忆系统)- AI的笔记本

通俗解释:让AI记住之前说过的话,不会一转头就忘。

维护对话上下文和历史:

  • ConversationBufferMemory:完整对话历史(全记住)
  • ConversationSummaryMemory:总结式记忆(记重点)
  • VectorStoreMemory:向量化记忆检索(智能搜索记忆)
  • EntityMemory:实体关系记忆(记住人名、地名等)

代码示例

from langchain_community.chat_message_histories import ChatMessageHistory
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.runnables.history import RunnableWithMessageHistory

# 创建记忆存储
message_history = ChatMessageHistory()

# 保存对话
message_history.add_user_message("我叫张三")
message_history.add_ai_message("你好张三,很高兴认识你!")

# 查看记忆
print("对话历史:")
for msg in message_history.messages:
    print(f"{msg.type}: {msg.content}")

4. Chains(链式调用)- 把步骤串起来

通俗解释:就像流水线,一个步骤接一个步骤自动完成。

组合多个组件形成工作流:

  • LLMChain:基础链,模型+提示(最简单的组合)
  • SequentialChain:顺序执行多个链(一步步来)
  • RouterChain:条件路由选择(根据情况选择不同路径)
  • TransformChain:数据转换链(处理数据)

代码示例

from langchain_core.output_parsers import StrOutputParser

# 1、创建大模型实例
chat_model = ChatOpenAI(model="gpt-4o-mini")
# 2、原始字符串模板
template = "桌上有{number}个苹果,四个桃子和 3 本书,一共有几个水果?"
prompt = PromptTemplate.from_template(template)
# 3、创建链式调用(使用LCEL语法,替代已弃用的LLMChain)
parser = StrOutputParser()
chain = prompt | chat_model | parser
# 4、调用chain,返回结果
result = chain.invoke({"number": 2})
print(result)

5. Agents(智能代理)- 会思考的AI

通俗解释:不只是回答问题,还会自己决定要不要用工具,怎么用。

具有决策能力的自主系统:

  • ReAct Agent:推理-行动循环(边思考边行动)
  • OpenAI Functions Agent:函数调用代理(精确控制)
  • Conversational Agent:对话式代理(聊天机器人)
  • Plan-and-Execute Agent:规划执行代理(先计划再执行)

代码示例

from langchain.agents import create_agent
from langchain_core.tools import tool

@tool
def get_weather(city: str) -> str:
    """获取指定城市的天气"""
    return f"{city}今天晴,气温 25 度"

agent = create_agent(
    "gpt-4o-mini",
    tools=[get_weather],
    system_prompt="你是一个有帮助的助手",
)

result = agent.invoke(
    {"messages": [{"role": "user", "content": "北京天气怎么样?"}]}
)
print(result["messages"][-1].content)

6. Data Connection(数据连接)- 处理文档

通俗解释:读取文件、切分文档、搜索内容的工具集。

处理外部数据源:

  • Document Loaders:加载各类文档(读文件)
  • Text Splitters:智能文本分割(切成小块)
  • Vector Stores:向量数据库集成(存储和搜索)
  • Retrievers:检索策略实现(找相关内容)

代码示例

from langchain_community.document_loaders import TextLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter

# 1. 加载文档
loader = TextLoader("document.txt")
docs = loader.load()

# 2. 切分文档
splitter = RecursiveCharacterTextSplitter(chunk_size=1000)
chunks = splitter.split_documents(docs)

# 3. 向量化存储
from langchain_community.vectorstores import Chroma
vectorstore = Chroma.from_documents(chunks, embeddings)

# 4. 搜索
results = vectorstore.similarity_search("机器学习")

组件关系图解

用户问题
   ↓
Prompts(格式化问题)
   ↓
Models(AI思考)
   ↓
Memory(记住对话)
   ↓
Chains(组合步骤)
   ↓
Agents(决策和使用工具)
   ↓
Data Connection(搜索文档)
   ↓
返回答案

2.3 数据流与处理流程

典型RAG应用的数据流

sequenceDiagram participant U as 用户 participant A as 应用层 participant R as Retriever participant V as Vector Store participant M as LLM participant P as Prompt U->>A: 提交问题 A->>R: 触发检索 R->>V: 向量相似度搜索 V-->>R: 返回相关文档 R-->>A: 返回检索结果 A->>P: 构建提示(问题+文档) P->>M: 发送完整提示 M-->>P: 生成回答 P-->>A: 解析输出 A-->>U: 返回最终答案

Agent执行流程

graph TB Start["接收用户输入"] --> Think["Agent思考
分析任务"] Think --> Decide{"需要使用
工具吗?"} Decide -->|是| SelectTool["选择合适工具"] SelectTool --> ExecuteTool["执行工具"] ExecuteTool --> Observe["观察结果"] Observe --> Think Decide -->|否| Generate["生成最终答案"] Generate --> End["返回结果"] style Start fill:#e1f5ff style Think fill:#fff4e1 style Decide fill:#ffe1e1 style SelectTool fill:#e8f5e9 style ExecuteTool fill:#f3e5f5 style Observe fill:#fff9c4 style Generate fill:#e0f2f1 style End fill:#e1f5ff

2.4 模块化设计原则

1. 单一职责原则

每个组件只负责一个明确的功能:

  • Document Loader:只负责加载文档
  • Text Splitter:只负责分割文本
  • Embeddings:只负责向量化

2. 接口抽象

所有组件都实现标准接口:

# 伪代码示例
class BaseModel:
    def invoke(self, input): pass
    def batch(self, inputs): pass
    def stream(self, input): pass
    
class BaseRetriever:
    def get_relevant_documents(self, query): pass
    def aget_relevant_documents(self, query): pass  # 异步版本

3. 可组合性

通过LCEL实现灵活组合:

# 组合示例(伪代码)
chain = (
    {"context": retriever, "question": RunnablePassthrough()}
    | prompt
    | model
    | output_parser
)

4. 可扩展性

  • 自定义组件:继承基类实现自定义逻辑
  • 插件机制:通过回调扩展功能
  • 中间件模式:在调用链中插入处理逻辑

5. 可测试性

  • Mock支持:可以模拟各个组件
  • 单元测试:每个组件可独立测试
  • 集成测试:完整链路测试

组件交互模式

模式说明适用场景
Pipeline顺序执行数据处理流程
Router条件分支多策略选择
Parallel并行执行多路检索
Fallback失败重试容错处理
Map-Reduce分布处理大规模数据

3. Models模型层

3.1 LLM大语言模型

LLM是最基础的模型类型,接收文本输入,返回文本输出。

核心特点

  • 无状态:每次调用独立,不保留上下文
  • 文本补全:根据输入生成后续文本
  • 单轮交互:适合一次性任务

支持的模型提供商

提供商模型示例特点成本
OpenAIGPT-4, GPT-3.5-turbo性能强,生态好中高
AnthropicClaude 3 Opus/Sonnet长上下文,安全中高
GoogleGemini Pro多模态
CohereCommand企业级
HuggingFaceLLaMA, Mistral开源,可本地部署
国内文心一言、通义千问中文优化低中

基本使用

from langchain_openai import OpenAI

# 初始化模型
llm = OpenAI(
    model_name="gpt-3.5-turbo-instruct",
    temperature=0.7,  # 控制随机性
    max_tokens=256,   # 最大输出长度
)

# 单次调用
response = llm.invoke("解释什么是量子计算")

# 批量调用
responses = llm.batch([
    "什么是机器学习?",
    "什么是深度学习?"
])

# 流式输出
for chunk in llm.stream("写一首关于春天的诗"):
    print(chunk, end="", flush=True)

重要参数配置

参数说明推荐值影响
temperature随机性控制0.0-1.0越高越随机
max_tokens最大输出长度根据需求影响成本
top_p核采样0.9-1.0控制多样性
frequency_penalty重复惩罚0.0-2.0减少重复
presence_penalty主题惩罚0.0-2.0鼓励新主题

3.2 Chat Models聊天模型

Chat Models专为对话场景设计,支持多轮对话和角色设定。

消息类型

graph LR A["SystemMessage
系统消息"] --> B["定义AI角色和行为"] C["HumanMessage
用户消息"] --> D["用户输入"] E["AIMessage
AI消息"] --> F["模型回复"] G["FunctionMessage
函数消息"] --> H["工具调用结果"] style A fill:#e3f2fd style C fill:#fff3e0 style E fill:#e8f5e9 style G fill:#f3e5f5

基本使用

from langchain_openai import ChatOpenAI
from langchain_core.messages import SystemMessage, HumanMessage, AIMessage

# 初始化聊天模型
chat = ChatOpenAI(
    model="gpt-4o-mini",  # 使用更经济的模型
    temperature=0.7
)

# 构建消息列表
messages = [
    SystemMessage(content="你是一个专业的Python编程助手"),
    HumanMessage(content="如何实现单例模式?"),
]

# 调用模型
response = chat.invoke(messages)
print(response.content)

# 多轮对话
messages.append(AIMessage(content=response.content))
messages.append(HumanMessage(content="能给个代码示例吗?"))
response = chat.invoke(messages)
print(response.content)

与LLM的对比

特性LLMChat Models
输入格式纯文本字符串消息列表
角色支持System/Human/AI
多轮对话需手动拼接原生支持
函数调用不支持支持
适用场景文本生成、补全对话、助手

3.3 Embeddings嵌入模型

Embeddings将文本转换为向量表示,用于语义搜索和相似度计算。

核心概念

  • 向量维度:通常为768、1536等
  • 语义相似度:向量距离反映文本相似度
  • 批量处理:支持批量向量化提升效率

常用Embeddings模型

模型维度特点适用场景
OpenAI text-embedding-31536/3072性能好,成本低通用场景
Cohere embed-v31024多语言支持国际化应用
HuggingFace BAAI/bge768开源,中文好本地部署
Sentence Transformers384/768轻量级资源受限

基本使用

from langchain_openai import OpenAIEmbeddings

# 初始化
embeddings = OpenAIEmbeddings(
    model="text-embedding-3-small"
)

# 单个文本向量化
vector = embeddings.embed_query("机器学习是什么?")
print(f"向量维度: {len(vector)}")

# 批量向量化
texts = ["文本1", "文本2", "文本3"]
vectors = embeddings.embed_documents(texts)

# 计算相似度
from numpy import dot
from numpy.linalg import norm

def cosine_similarity(v1, v2):
    return dot(v1, v2) / (norm(v1) * norm(v2))

query_vec = embeddings.embed_query("深度学习")
doc_vec = embeddings.embed_query("神经网络")
similarity = cosine_similarity(query_vec, doc_vec)
print(f"相似度: {similarity}")

性能优化

  1. 批量处理:一次处理多个文本
  2. 缓存机制:避免重复向量化
  3. 异步调用:提高并发性能
  4. 本地模型:减少API调用成本

3.4 模型集成与配置

统一接口设计

所有模型都实现相同的核心方法:

# 标准接口
class BaseLanguageModel:
    def invoke(self, input):
        """同步调用"""
        pass
    
    async def ainvoke(self, input):
        """异步调用"""
        pass
    
    def batch(self, inputs):
        """批量调用"""
        pass
    
    def stream(self, input):
        """流式输出"""
        pass

模型切换

# 方式1:直接替换
from langchain_openai import ChatOpenAI
from langchain_anthropic import ChatAnthropic

# OpenAI模型
model = ChatOpenAI(model="gpt-4")

# 切换到Anthropic(接口完全相同)
model = ChatAnthropic(model="claude-3-opus")

# 方式2:配置化
def get_model(provider: str):
    if provider == "openai":
        return ChatOpenAI(model="gpt-4")
    elif provider == "anthropic":
        return ChatAnthropic(model="claude-3-opus")
    # ... 其他提供商

环境变量配置

# OpenAI
export OPENAI_API_KEY="sk-..."
export OPENAI_API_BASE="https://api.openai.com/v1"

# Anthropic
export ANTHROPIC_API_KEY="sk-ant-..."

# HuggingFace
export HUGGINGFACEHUB_API_TOKEN="hf_..."

高级配置

from langchain_openai import ChatOpenAI

model = ChatOpenAI(
    model="gpt-4",
    temperature=0.7,
    max_tokens=2000,
    timeout=60,  # 超时设置
    max_retries=3,  # 重试次数
    request_timeout=30,  # 请求超时
    streaming=True,  # 启用流式
    verbose=True,  # 调试模式
)

成本控制策略

策略实现方式效果
模型降级优先用便宜模型成本降低50-90%
缓存相同输入返回缓存减少重复调用
批量处理合并多个请求提高吞吐量
Token限制设置max_tokens控制单次成本
本地模型使用开源模型零API成本

错误处理

from langchain_community.callbacks import get_openai_callback
from tenacity import retry, stop_after_attempt, wait_exponential

# 成本追踪
with get_openai_callback() as cb:
    response = model.invoke("你好")
    print(f"Tokens: {cb.total_tokens}")
    print(f"Cost: ${cb.total_cost}")

# 自动重试
@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=4, max=10)
)
def call_model_with_retry(prompt):
    return model.invoke(prompt)

result = call_model_with_retry("你好")
print(result.content)

4. Prompts提示工程

4.1 Prompt Templates提示模板

PromptTemplate是参数化的提示字符串,支持动态变量替换。

基础用法


from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser,JsonOutputParser

# 创建提示模板
prompt = ChatPromptTemplate.from_messages([
("system", "你是世界级的技术文档编写者。"),
("user", "{input}")
])
# 使用输出解析器
# output_parser = StrOutputParser()
output_parser = JsonOutputParser()
# 将其添加到上一个链中
# chain = prompt | llm
chain = prompt | llm | output_parser
# 调用它并提出同样的问题。答案是一个字符串,而不是ChatMessage
# chain.invoke({"input": "LangChain是什么?"})
chain.invoke({"input": "LangChain是什么? 用JSON格式回复,问题用question,回答用answer"})

ChatPromptTemplate(对话模板)

from langchain_core.prompts import ChatPromptTemplate, SystemMessagePromptTemplate, HumanMessagePromptTemplate
from langchain_openai import ChatOpenAI
from langchain_core.output_parsers import StrOutputParser

# 初始化模型
llm = ChatOpenAI(model="gpt-4o-mini")

# 系统消息模板
system_template = "你是一个{expertise}专家,擅长{skill}。"
system_prompt = SystemMessagePromptTemplate.from_template(system_template)

# 用户消息模板
human_template = "{user_input}"
human_prompt = HumanMessagePromptTemplate.from_template(human_template)

# 组合
chat_prompt = ChatPromptTemplate.from_messages([
    system_prompt,
    human_prompt
])

# 创建输出解析器
output_parser = StrOutputParser()

# 创建链
chain = chat_prompt | llm | output_parser

# 使用
result = chain.invoke({
    "expertise": "数据分析",
    "skill": "Python和SQL",
    "user_input": "如何分析用户行为数据?"
})

print(result)

模板类型对比

模板类型适用场景特点
PromptTemplate简单文本生成单一字符串
ChatPromptTemplate对话应用多消息类型
FewShotPromptTemplate少样本学习包含示例
PipelinePromptTemplate复杂组合模板嵌套

4.2 Few-shot Prompting少样本提示

Few-shot通过示例引导模型输出,提高准确性和一致性。

基本结构

from langchain_core.prompts import FewShotPromptTemplate, PromptTemplate

# 定义示例
examples = [
    {
        "input": "happy",
        "output": "sad"
    },
    {
        "input": "tall",
        "output": "short"
    },
    {
        "input": "hot",
        "output": "cold"
    }
]

# 示例格式化模板
example_template = """
Input: {input}
Output: {output}
"""

example_prompt = PromptTemplate(
    input_variables=["input", "output"],
    template=example_template
)

# Few-shot模板
few_shot_prompt = FewShotPromptTemplate(
    examples=examples,
    example_prompt=example_prompt,
    prefix="给出以下词的反义词:",
    suffix="Input: {word}\nOutput:",
    input_variables=["word"]
)

# 使用
print(few_shot_prompt.format(word="big"))

动态示例选择

from langchain.prompts.example_selector import SemanticSimilarityExampleSelector
from langchain_openai import OpenAIEmbeddings
from langchain_community.vectorstores import Chroma

# 创建示例选择器
example_selector = SemanticSimilarityExampleSelector.from_examples(
    examples,
    OpenAIEmbeddings(),
    Chroma,
    k=2  # 选择最相似的2个示例
)

# 使用选择器
few_shot_prompt = FewShotPromptTemplate(
    example_selector=example_selector,
    example_prompt=example_prompt,
    prefix="给出以下词的反义词:",
    suffix="Input: {word}\nOutput:",
    input_variables=["word"]
)

Few-shot策略

策略说明适用场景
固定示例使用预定义示例任务明确
语义相似根据输入选择相似示例多样化输入
长度限制控制示例数量Token限制
难度递进从简单到复杂复杂任务

4.3 Output Parsers输出解析器

OutputParser将模型输出转换为结构化数据

常用解析器类型

graph TB A["Output Parsers"] --> B["PydanticOutputParser
Pydantic模型"] A --> C["StructuredOutputParser
字典结构"] A --> D["CommaSeparatedListOutputParser
逗号分隔列表"] A --> E["DatetimeOutputParser
日期时间"] A --> F["EnumOutputParser
枚举类型"] A --> G["JsonOutputParser
JSON格式"] style A fill:#e3f2fd style B fill:#fff3e0 style C fill:#e8f5e9 style D fill:#f3e5f5 style E fill:#fff9c4 style F fill:#e0f2f1 style G fill:#fce4ec

PydanticOutputParser示例

from langchain_core.output_parsers import PydanticOutputParser
from langchain_core.prompts import PromptTemplate
from pydantic import BaseModel, Field

# 定义数据模型
class Person(BaseModel):
    name: str = Field(description="人物姓名")
    age: int = Field(description="年龄")
    occupation: str = Field(description="职业")

# 创建解析器
parser = PydanticOutputParser(pydantic_object=Person)

# 获取格式说明
format_instructions = parser.get_format_instructions()

# 创建提示
prompt = PromptTemplate(
    template="提取以下文本中的人物信息:\n{text}\n\n{format_instructions}",
    input_variables=["text"],
    partial_variables={"format_instructions": format_instructions}
)

# 使用
text = "张三今年30岁,是一名软件工程师。"
formatted_prompt = prompt.format(text=text)
response = model.invoke(formatted_prompt)
person = parser.parse(response.content)

print(person.name)  # 张三
print(person.age)   # 30

StructuredOutputParser示例

from langchain_core.output_parsers import StructuredOutputParser, ResponseSchema

# 定义响应模式
response_schemas = [
    ResponseSchema(name="answer", description="问题的答案"),
    ResponseSchema(name="confidence", description="置信度(0-100)"),
    ResponseSchema(name="source", description="信息来源")
]

parser = StructuredOutputParser.from_response_schemas(response_schemas)

# 使用
format_instructions = parser.get_format_instructions()
# ... 构建提示并调用模型
result = parser.parse(response.content)
print(result["answer"])
print(result["confidence"])

错误处理

from langchain.output_parsers import OutputFixingParser

# 原始解析器
base_parser = PydanticOutputParser(pydantic_object=Person)

# 包装修复解析器
fixing_parser = OutputFixingParser.from_llm(
    parser=base_parser,
    llm=model
)

# 即使输出格式有问题,也会尝试修复
try:
    result = fixing_parser.parse(malformed_output)
except Exception as e:
    print(f"解析失败: {e}")

4.4 提示优化策略

1. 清晰的指令

# ❌ 不好的提示
"写点关于AI的东西"

# ✅ 好的提示
"""
请写一篇500字的文章,介绍人工智能在医疗领域的应用。
要求:
1. 包含至少3个具体应用场景
2. 使用通俗易懂的语言
3. 结构清晰,分段合理
"""

2. 角色设定

system_message = """
你是一个资深的Python开发工程师,拥有10年以上的开发经验。
你的特点:
- 代码风格遵循PEP 8规范
- 注重代码可读性和可维护性
- 善于使用设计模式解决问题
- 会考虑性能和安全性
"""

3. 上下文提供

template = """
背景信息:
{context}

基于以上背景,请回答:
{question}

要求:
- 答案要基于提供的背景信息
- 如果背景信息不足,请明确指出
- 保持客观和准确
"""

4. 输出格式约束

template = """
请分析以下代码的时间复杂度:

{code}

请按以下格式输出:
1. 时间复杂度:O(?)
2. 空间复杂度:O(?)
3. 分析过程:[详细说明]
4. 优化建议:[如果有]
"""

5. Chain of Thought(思维链)

template = """
问题:{question}

请按以下步骤思考:
1. 理解问题:明确问题要求
2. 分析思路:列出可能的解决方案
3. 选择方案:选择最优方案并说明理由
4. 详细步骤:给出具体实现步骤
5. 最终答案:给出最终结果

让我们一步步思考:
"""

提示优化检查清单

检查项说明重要性
明确目标清楚说明期望输出⭐⭐⭐⭐⭐
提供上下文给出必要背景信息⭐⭐⭐⭐
格式约束指定输出格式⭐⭐⭐⭐
示例引导提供Few-shot示例⭐⭐⭐
角色设定定义AI角色⭐⭐⭐
思维引导使用CoT技巧⭐⭐⭐
约束条件明确限制和要求⭐⭐⭐

A/B测试提示

from langchain.evaluation import load_evaluator

# 定义两个提示版本
prompt_a = PromptTemplate(template="简单提示:{question}")
prompt_b = PromptTemplate(template="详细提示:{question}\n请详细解释。")

# 评估器
evaluator = load_evaluator("criteria", criteria="helpfulness")

# 测试
test_questions = ["什么是机器学习?", "如何学习Python?"]
for question in test_questions:
    response_a = model.invoke(prompt_a.format(question=question))
    response_b = model.invoke(prompt_b.format(question=question))
    
    score_a = evaluator.evaluate_strings(
        prediction=response_a.content,
        input=question
    )
    score_b = evaluator.evaluate_strings(
        prediction=response_b.content,
        input=question
    )
    
    print(f"Prompt A: {score_a['score']}")
    print(f"Prompt B: {score_b['score']}")

5. Memory记忆系统

5.1 记忆类型与机制

Memory使AI能够记住对话历史和上下文,实现连贯的多轮交互。

记忆类型分类

graph TB A["Memory类型"] --> B["短期记忆
Short-term"] A --> C["长期记忆
Long-term"] B --> B1["ConversationBufferMemory
完整历史"] B --> B2["ConversationBufferWindowMemory
滑动窗口"] B --> B3["ConversationTokenBufferMemory
Token限制"] C --> C1["ConversationSummaryMemory
总结式"] C --> C2["VectorStoreMemory
向量检索"] C --> C3["EntityMemory
实体关系"] style A fill:#e3f2fd style B fill:#fff3e0 style C fill:#e8f5e9 style B1 fill:#f3e5f5 style B2 fill:#f3e5f5 style B3 fill:#f3e5f5 style C1 fill:#fff9c4 style C2 fill:#fff9c4 style C3 fill:#fff9c4

记忆机制对比

类型存储方式优点缺点适用场景
Buffer完整历史信息完整占用大短对话
Window最近N轮控制大小丢失历史固定长度
Token BufferToken限制精确控制需计算成本敏感
Summary总结压缩节省空间信息损失长对话
Vector Store向量检索智能检索需向量化大量历史

5.2 对话历史管理

ConversationBufferMemory(完整历史)

from langchain.memory import ConversationBufferMemory
from langchain.chains import ConversationChain

# 创建记忆
memory = ConversationBufferMemory()

# 创建对话链
conversation = ConversationChain(
    llm=model,
    memory=memory,
    verbose=True
)

# 多轮对话
conversation.predict(input="你好,我叫张三")
conversation.predict(input="我的名字是什么?")  # 能记住之前的信息

# 查看历史
print(memory.load_memory_variables({}))

ConversationBufferWindowMemory(滑动窗口)

from langchain.memory import ConversationBufferWindowMemory

# 只保留最近3轮对话
memory = ConversationBufferWindowMemory(k=3)

conversation = ConversationChain(
    llm=model,
    memory=memory
)

# 第4轮对话时,第1轮会被丢弃

ConversationSummaryMemory(总结式)

from langchain.memory import ConversationSummaryMemory

# 自动总结历史对话
memory = ConversationSummaryMemory(llm=model)

conversation = ConversationChain(
    llm=model,
    memory=memory
)

# 对话会被自动总结,节省Token
conversation.predict(input="介绍一下机器学习")
conversation.predict(input="深度学习呢?")

# 查看总结
print(memory.buffer)  # 显示总结后的内容

自定义记忆变量

from langchain.memory import ConversationBufferMemory

memory = ConversationBufferMemory(
    memory_key="chat_history",  # 自定义键名
    return_messages=True,  # 返回消息对象而非字符串
    input_key="question",  # 输入键
    output_key="answer"  # 输出键
)

5.3 向量存储记忆

VectorStoreRetrieverMemory实现智能检索

from langchain.memory import VectorStoreRetrieverMemory
from langchain_openai import OpenAIEmbeddings
from langchain_community.vectorstores import Chroma

# 创建向量存储
embeddings = OpenAIEmbeddings()
vectorstore = Chroma(embedding_function=embeddings)

# 创建检索器记忆
memory = VectorStoreRetrieverMemory(
    retriever=vectorstore.as_retriever(search_kwargs={"k": 3})
)

# 保存对话
memory.save_context(
    {"input": "我喜欢吃披萨"},
    {"output": "好的,我记住了你喜欢披萨"}
)

memory.save_context(
    {"input": "我的爱好是游泳"},
    {"output": "明白了,游泳是你的爱好"}
)

# 智能检索相关记忆
relevant_memories = memory.load_memory_variables(
    {"input": "我喜欢什么食物?"}
)
print(relevant_memories)  # 会检索到披萨相关的记忆

EntityMemory(实体关系记忆)

from langchain.memory import ConversationEntityMemory

# 自动提取和记忆实体
memory = ConversationEntityMemory(llm=model)

conversation = ConversationChain(
    llm=model,
    memory=memory
)

conversation.predict(input="张三在北京工作,是一名工程师")
conversation.predict(input="李四是张三的同事")

# 查看实体信息
print(memory.entity_store.store)
# 输出:{'张三': '在北京工作的工程师', '李四': '张三的同事'}

5.4 记忆优化与性能

5.4.1 记忆压缩(Memory Compression)深度解析

记忆压缩是大模型长对话系统的核心。它的本质是将原始对话转化为更高维度的抽象信息

1. 实现方式:总结式压缩 (Summarization)

  • 触发机制:当对话 Token 达到限制(如 4000 Token)时触发。
  • 执行过程:系统将“旧总结 + 新对话片段”发送给 LLM,要求生成“更新后的总结”。
  • Prompt 示例

    “请基于之前的对话摘要和最新的对话内容,生成一份新的摘要,保留关键事实(人名、日期、决定、偏好)。”

2. 实现方式:实体提取 (Entity Extraction)

  • 原理:不总结全文,只提取关键实体及其属性。
  • 效果:将 “张三昨天在上海开会,讨论了关于 A 项目的预算” 压缩为 {"张三": {"位置": "上海", "关联项目": "A", "动作": "讨论预算"}}

5.4.2 细节丢失与补偿机制

记忆压缩必然是“有损”的。细节丢失是其天然属性,但工业界通过以下策略进行补偿:

1. 混合记忆 (Hybrid Memory) - 细节补位

  • 策略Summary (长线背景) + Buffer (短线细节)
  • 原理:总结只负责“很久以前”的事,而“最近 5-10 轮”的对话保持原样。这样大模型既能知道全局背景,又能精准回应爸爸刚刚说的话。

2. 关键信息标记 (Selective Preservation)

  • 原理:在总结时,通过 Prompt 强制要求保留某些细节。
  • 技巧:使用 SummaryBufferMemory,它会设置一个阈值,只有超过阈值的旧记忆才会被总结,未超过的部分保留原文。

3. 向量找回 (Vector Recovery)

  • 原理:将原始对话切片存入向量数据库。
  • 效果:即使总结里丢了“某次发票的具体金额”,大模型也可以通过语义检索,从向量库里把那段原始对话“捞”回来。
压缩策略细节丢失程度性能开销适用场景
滑动窗口极高(直接丢弃)极低固定步长的对话
全文总结中等(丢失语气/琐碎细节)业务背景复杂的长对话
实体提取低(保留关键事实)知识密集型任务
混合策略最低(工业界首选)复杂 Agent/长期伴侣 AI

1. 混合记忆策略

from langchain.memory import CombinedMemory, ConversationBufferMemory, ConversationSummaryMemory

# 组合多种记忆
short_term = ConversationBufferWindowMemory(k=5, memory_key="recent")
long_term = ConversationSummaryMemory(llm=model, memory_key="summary")

memory = CombinedMemory(memories=[short_term, long_term])

2. 记忆持久化

import json

# 保存记忆
memory_data = memory.load_memory_variables({})
with open("memory.json", "w") as f:
    json.dump(memory_data, f)

# 加载记忆
with open("memory.json", "r") as f:
    loaded_data = json.load(f)
    # 恢复记忆状态

3. 记忆清理

# 清空记忆
memory.clear()

# 选择性删除
memory.chat_memory.messages = memory.chat_memory.messages[-10:]  # 只保留最近10条

性能优化建议

优化点方法效果
Token控制使用TokenBuffer或Summary降低成本50-80%
异步处理异步保存记忆提升响应速度
批量操作批量保存/检索减少IO次数
缓存缓存常用记忆加快检索
定期清理删除过期记忆控制存储

6. Chains链式调用

6.1 Chain基础概念

⚠️ 重要提示:传统的 LLMChainSequentialChain 等已在 LangChain 1.0+ 被 LCEL (LangChain Expression Language) 取代。

LCEL 的优势

  • 更简洁:使用 | 操作符连接组件
  • 更高效:自动优化执行流程
  • 更灵活:支持并行、分支、流式等高级功能
  • 类型安全:更好的类型提示

LCEL 执行流程

sequenceDiagram participant U as 用户输入 participant P as Prompt participant M as Model participant O as Output Parser participant R as 结果 U->>P: 原始输入 P->>M: 格式化提示 M->>O: 模型输出 O->>R: 解析结果 Note over P,O: 使用 | 操作符连接

✅ 基础 LCEL 链

from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_openai import ChatOpenAI

# 创建组件
model = ChatOpenAI(model="gpt-3.5-turbo")
prompt = ChatPromptTemplate.from_template("为{product}写一句广告语")
output_parser = StrOutputParser()

# 使用 | 操作符组合(LCEL)
chain = prompt | model | output_parser

# 执行
result = chain.invoke({"product": "智能手表"})
print(result)

# 批量执行
results = chain.batch([
    {"product": "智能手表"},
    {"product": "无线耳机"}
])

# 流式输出
for chunk in chain.stream({"product": "智能音箱"}):
    print(chunk, end="", flush=True)

6.2 常用Chain类型

1. 顺序链(使用 LCEL)

from langchain_core.runnables import RunnablePassthrough

# 步骤1:生成大纲
outline_prompt = ChatPromptTemplate.from_template("为{topic}生成文章大纲")
outline_chain = outline_prompt | model | StrOutputParser()

# 步骤2:根据大纲写文章
article_prompt = ChatPromptTemplate.from_template(
    "主题:{topic}\n大纲:{outline}\n\n请写一篇文章:"
)

# 组合成顺序链
sequential_chain = (
    {"topic": RunnablePassthrough()}
    | RunnablePassthrough.assign(outline=outline_chain)
    | article_prompt
    | model
    | StrOutputParser()
)

# 执行
result = sequential_chain.invoke("人工智能的未来")
print(result)

2. 并行链(使用 LCEL)

from langchain_core.runnables import RunnableParallel

# 定义多个并行任务
summary_chain = (
    ChatPromptTemplate.from_template("总结:{text}")
    | model
    | StrOutputParser()
)

keywords_chain = (
    ChatPromptTemplate.from_template("提取关键词:{text}")
    | model
    | StrOutputParser()
)

sentiment_chain = (
    ChatPromptTemplate.from_template("分析情感:{text}")
    | model
    | StrOutputParser()
)

# 并行执行
parallel_chain = RunnableParallel(
    summary=summary_chain,
    keywords=keywords_chain,
    sentiment=sentiment_chain
)

# 执行
result = parallel_chain.invoke({"text": "今天天气真好,心情很愉快!"})
print(result)
# 输出:{'summary': '...', 'keywords': '...', 'sentiment': '...'}

3. 条件分支链(使用 LCEL)

from langchain_core.runnables import RunnableBranch

# 定义不同的处理链
physics_chain = (
    ChatPromptTemplate.from_template("作为物理学专家回答:{question}")
    | model
    | StrOutputParser()
)

math_chain = (
    ChatPromptTemplate.from_template("作为数学专家回答:{question}")
    | model
    | StrOutputParser()
)

general_chain = (
    ChatPromptTemplate.from_template("回答:{question}")
    | model
    | StrOutputParser()
)

# 创建分支
def route_question(input_dict):
    question = input_dict["question"].lower()
    if "物理" in question or "力" in question:
        return "physics"
    elif "数学" in question or "计算" in question:
        return "math"
    else:
        return "general"

branch_chain = RunnableBranch(
    (lambda x: route_question(x) == "physics", physics_chain),
    (lambda x: route_question(x) == "math", math_chain),
    general_chain  # 默认分支
)

# 执行
result = branch_chain.invoke({"question": "什么是牛顿第一定律?"})
print(result)

# 创建路由链
router_chain = MultiPromptChain.from_prompts(
    llm=model,
    prompt_infos=prompt_infos
)

# 自动路由到合适的专家
result = router_chain.invoke({"input": "什么是牛顿第一定律?"})  # 路由到physics

3. TransformChain(转换链)

from langchain.chains import TransformChain

def transform_func(inputs: dict) -> dict:
    text = inputs["text"]
    # 自定义转换逻辑
    processed_text = text.upper()
    return {"processed_text": processed_text}

transform_chain = TransformChain(
    input_variables=["text"],
    output_variables=["processed_text"],
    transform=transform_func
)

# 与LLMChain组合
combined_chain = SequentialChain(
    chains=[transform_chain, llm_chain],
    input_variables=["text"],
    output_variables=["result"]
)

Chain类型对比

Chain类型特点适用场景复杂度
LLMChain基础链单步任务
SequentialChain顺序执行多步流程⭐⭐
RouterChain条件分支多策略选择⭐⭐⭐
TransformChain数据转换预处理/后处理⭐⭐
MapReduceChain并行处理大规模数据⭐⭐⭐⭐

6.3 自定义Chain开发

继承Chain基类

from langchain.chains.base import Chain
from typing import Dict, List

class CustomChain(Chain):
    """自定义Chain示例"""
    
    llm: Any
    prompt: PromptTemplate
    
    @property
    def input_keys(self) -> List[str]:
        """定义输入键"""
        return ["input_text"]
    
    @property
    def output_keys(self) -> List[str]:
        """定义输出键"""
        return ["output_text", "metadata"]
    
    def _call(self, inputs: Dict[str, str]) -> Dict[str, str]:
        """核心执行逻辑"""
        input_text = inputs["input_text"]
        
        # 自定义处理
        formatted_prompt = self.prompt.format(text=input_text)
        response = self.llm.invoke(formatted_prompt)
        
        # 返回结果
        return {
            "output_text": response.content,
            "metadata": {"length": len(response.content)}
        }

6.4 Chain组合与编排

使用LCEL组合Chain

from langchain_core.runnables import RunnablePassthrough

# 方式1:管道操作符
chain = prompt | model | output_parser

# 方式2:并行执行
from langchain_core.runnables import RunnableParallel

chain = RunnableParallel(
    summary=summary_chain,
    keywords=keyword_chain
)

# 方式3:条件分支
from langchain_core.runnables import RunnableBranch

branch = RunnableBranch(
    (lambda x: len(x["text"]) < 100, short_chain),
    (lambda x: len(x["text"]) < 500, medium_chain),
    long_chain  # 默认
)

错误处理与重试

from langchain_core.runnables import RunnableWithFallbacks

# 主Chain失败时使用备用Chain
chain_with_fallback = primary_chain.with_fallbacks(
    [backup_chain_1, backup_chain_2]
)

# 自动重试
from tenacity import retry, stop_after_attempt

@retry(stop=stop_after_attempt(3))
def call_chain_with_retry(input_data):
    return chain.invoke(input_data)

7. Agents智能代理

7.1 Agent架构原理

Agent是具有自主决策能力的系统,能够根据任务选择和使用工具。

核心组成

graph TB subgraph "Agent架构" A["用户输入"] --> B["Agent
决策中心"] B --> C{"需要工具?"} C -->|是| D["Tool Selection
工具选择"] D --> E["Tool Execution
工具执行"] E --> F["Observation
观察结果"] F --> B C -->|否| G["Final Answer
最终答案"] end H["Tools
工具集"] --> D I["Memory
记忆"] --> B J["LLM
语言模型"] --> B style A fill:#e3f2fd style B fill:#fff3e0 style C fill:#ffe1e1 style D fill:#e8f5e9 style E fill:#f3e5f5 style F fill:#fff9c4 style G fill:#e0f2f1 style H fill:#fce4ec style I fill:#e1bee7 style J fill:#c5e1a5

ReAct模式(Reasoning + Acting)

Thought: 我需要了解当前天气
Action: search_weather
Action Input: {"city": "北京"}
Observation: 北京今天晴天,温度25度

Thought: 我已经获得了天气信息,可以回答了
Final Answer: 北京今天是晴天,温度25度

7.2 Agent类型与选择

常用Agent类型

Agent类型特点适用场景模型要求
Zero-shot ReAct无需示例通用任务GPT-3.5+
Structured Chat结构化输入复杂参数GPT-4
OpenAI Functions函数调用精确控制GPT-3.5-turbo+
Conversational对话式多轮交互任意
Self-ask with Search自问自答需要推理GPT-3+
Plan-and-Execute规划执行复杂任务GPT-4

创建Agent

from langchain.agents import create_openai_functions_agent, AgentExecutor
from langchain.tools import Tool
from langchain import hub

# 1. 定义工具
def search_tool(query: str) -> str:
    """搜索工具"""
    return f"搜索结果:{query}"

def calculator_tool(expression: str) -> str:
    """计算器工具"""
    return str(eval(expression))

tools = [
    Tool(
        name="Search",
        func=search_tool,
        description="用于搜索信息"
    ),
    Tool(
        name="Calculator",
        func=calculator_tool,
        description="用于数学计算"
    )
]

# 2. 获取提示模板
prompt = hub.pull("hwchase17/openai-functions-agent")

# 3. 创建Agent
agent = create_openai_functions_agent(
    llm=model,
    tools=tools,
    prompt=prompt
)

# 4. 创建执行器
agent_executor = AgentExecutor(
    agent=agent,
    tools=tools,
    verbose=True,
    max_iterations=5,  # 最大迭代次数
    max_execution_time=60,  # 最大执行时间
    handle_parsing_errors=True  # 处理解析错误
)

# 5. 执行
result = agent_executor.invoke({
    "input": "北京的人口是多少?然后计算人口的平方根"
})

7.3 Tools工具集成

工具定义方式

方式1:使用@tool装饰器

from langchain.tools import tool

@tool
def get_word_length(word: str) -> int:
    """返回单词的长度"""
    return len(word)

@tool
def multiply(a: int, b: int) -> int:
    """计算两个数的乘积"""
    return a * b

方式2:使用Tool类

from langchain.tools import Tool

def custom_function(input_str: str) -> str:
    # 自定义逻辑
    return f"处理结果:{input_str}"

tool = Tool(
    name="CustomTool",
    func=custom_function,
    description="这是一个自定义工具,用于..."
)

方式3:使用StructuredTool(复杂参数)

from langchain.tools import StructuredTool
from pydantic import BaseModel, Field

class SearchInput(BaseModel):
    query: str = Field(description="搜索查询")
    max_results: int = Field(default=5, description="最大结果数")

def search_function(query: str, max_results: int = 5) -> str:
    return f"搜索 '{query}',返回 {max_results} 条结果"

search_tool = StructuredTool.from_function(
    func=search_function,
    name="Search",
    description="搜索工具",
    args_schema=SearchInput
)

内置工具集成

from langchain.tools import WikipediaQueryRun, DuckDuckGoSearchRun
from langchain.utilities import WikipediaAPIWrapper

# Wikipedia工具
wikipedia = WikipediaQueryRun(api_wrapper=WikipediaAPIWrapper())

# DuckDuckGo搜索
search = DuckDuckGoSearchRun()

# Python REPL
from langchain.tools import PythonREPLTool
python_repl = PythonREPLTool()

tools = [wikipedia, search, python_repl]

工具最佳实践

实践说明重要性
清晰描述准确描述工具功能⭐⭐⭐⭐⭐
参数验证使用Pydantic验证⭐⭐⭐⭐
错误处理捕获并返回友好错误⭐⭐⭐⭐
超时控制设置执行超时⭐⭐⭐
日志记录记录工具调用⭐⭐⭐

7.4 Agent执行流程

详细执行流程

sequenceDiagram participant U as 用户 participant A as Agent participant L as LLM participant T as Tools participant M as Memory U->>A: 提交任务 A->>M: 加载历史 M-->>A: 返回上下文 loop Agent循环 A->>L: 发送提示(任务+工具+历史) L-->>A: 返回决策 alt 需要使用工具 A->>T: 调用工具 T-->>A: 返回结果 A->>M: 保存观察 else 得出最终答案 A->>U: 返回答案 end end

Agent配置优化

agent_executor = AgentExecutor(
    agent=agent,
    tools=tools,
    verbose=True,
    
    # 迭代控制
    max_iterations=10,  # 最大迭代次数
    max_execution_time=120,  # 最大执行时间(秒)
    early_stopping_method="generate",  # 提前停止策略
    
    # 错误处理
    handle_parsing_errors=True,  # 自动处理解析错误
    return_intermediate_steps=True,  # 返回中间步骤
    
    # 记忆
    memory=memory,  # 添加记忆
    
    # 回调
    callbacks=[callback_handler]  # 添加回调
)

自定义Agent逻辑

from langchain.agents import BaseSingleActionAgent
from typing import List, Tuple, Any

class CustomAgent(BaseSingleActionAgent):
    """自定义Agent"""
    
    tools: List[Tool]
    llm: Any
    
    @property
    def input_keys(self):
        return ["input"]
    
    def plan(
        self,
        intermediate_steps: List[Tuple[AgentAction, str]],
        **kwargs: Any
    ) -> Union[AgentAction, AgentFinish]:
        """决策逻辑"""
        # 自定义决策过程
        if len(intermediate_steps) > 5:
            return AgentFinish(
                return_values={"output": "达到最大步数"},
                log="停止执行"
            )
        
        # 选择工具
        tool_name = self._select_tool(kwargs["input"])
        return AgentAction(
            tool=tool_name,
            tool_input=kwargs["input"],
            log=f"使用工具: {tool_name}"
        )
    
    async def aplan(self, *args, **kwargs):
        """异步版本"""
        return self.plan(*args, **kwargs)

Agent调试技巧

# 1. 启用详细输出
agent_executor = AgentExecutor(agent=agent, tools=tools, verbose=True)

# 2. 获取中间步骤
result = agent_executor.invoke(
    {"input": "问题"},
    return_intermediate_steps=True
)
for step in result["intermediate_steps"]:
    print(f"Action: {step[0]}")
    print(f"Observation: {step[1]}")

# 3. 使用回调追踪
from langchain.callbacks import StdOutCallbackHandler

agent_executor.invoke(
    {"input": "问题"},
    callbacks=[StdOutCallbackHandler()]
)

8. Data Connection数据连接

8.1 Document Loaders文档加载器

Document Loaders负责从各种数据源加载文档

支持的数据源

类别加载器支持格式
文件TextLoader, PDFLoader, DocxLoadertxt, pdf, docx
网页WebBaseLoader, SeleniumURLLoaderhtml, 动态网页
数据库SQLDatabaseLoader, MongoDBLoaderSQL, NoSQL
APIGitHubLoader, NotionLoaderGitHub, Notion
云存储S3Loader, GCSLoaderAWS S3, GCS

基本使用

from langchain_community.document_loaders import TextLoader, PyPDFLoader, WebBaseLoader

# 1. 加载文本文件
loader = TextLoader("document.txt", encoding="utf-8")
documents = loader.load()

# 2. 加载PDF
pdf_loader = PyPDFLoader("document.pdf")
pages = pdf_loader.load_and_split()

# 3. 加载网页
web_loader = WebBaseLoader("https://example.com")
web_docs = web_loader.load()

# 4. 批量加载目录
from langchain_community.document_loaders import DirectoryLoader

loader = DirectoryLoader(
    "./docs",
    glob="**/*.md",  # 匹配所有markdown文件
    loader_cls=TextLoader
)
docs = loader.load()

Document结构

from langchain.schema import Document

# Document对象包含两个主要属性
doc = Document(
    page_content="文档内容...",  # 文本内容
    metadata={  # 元数据
        "source": "document.pdf",
        "page": 1,
        "author": "张三"
    }
)

print(doc.page_content)
print(doc.metadata)

8.2 Text Splitters文本分割

Text Splitters将长文档分割成小块,适应模型上下文限制。

分割策略

graph TB A["Text Splitters"] --> B["CharacterTextSplitter
字符分割"] A --> C["RecursiveCharacterTextSplitter
递归分割"] A --> D["TokenTextSplitter
Token分割"] A --> E["MarkdownTextSplitter
Markdown分割"] A --> F["CodeTextSplitter
代码分割"] style A fill:#e3f2fd style B fill:#fff3e0 style C fill:#e8f5e9 style D fill:#f3e5f5 style E fill:#fff9c4 style F fill:#e0f2f1

RecursiveCharacterTextSplitter(推荐)

from langchain.text_splitter import RecursiveCharacterTextSplitter

# 创建分割器
splitter = RecursiveCharacterTextSplitter(
    chunk_size=1000,  # 每块大小
    chunk_overlap=200,  # 重叠部分
    length_function=len,  # 长度计算函数
    separators=["\n\n", "\n", " ", ""]  # 分隔符优先级
)

# 分割文档
chunks = splitter.split_documents(documents)

# 分割文本
texts = splitter.split_text("长文本...")

TokenTextSplitter(精确控制)

from langchain.text_splitter import TokenTextSplitter

# 按Token分割
token_splitter = TokenTextSplitter(
    chunk_size=512,  # Token数量
    chunk_overlap=50
)

chunks = token_splitter.split_documents(documents)

代码分割

from langchain.text_splitter import Language, RecursiveCharacterTextSplitter

# Python代码分割
python_splitter = RecursiveCharacterTextSplitter.from_language(
    language=Language.PYTHON,
    chunk_size=500,
    chunk_overlap=50
)

# JavaScript代码分割
js_splitter = RecursiveCharacterTextSplitter.from_language(
    language=Language.JS,
    chunk_size=500
)

分割参数优化

参数推荐值说明
chunk_size500-1500根据模型上下文调整
chunk_overlap10-20%保持上下文连贯性
separators自然分隔符优先段落、句子

8.3 Vector Stores向量数据库

Vector Stores存储和检索文档向量

主流向量数据库对比

数据库类型特点适用场景
Chroma本地/云轻量级,易用开发测试
Pinecone云服务高性能,托管生产环境
Weaviate本地/云功能丰富企业应用
FAISS本地Facebook开源本地部署
Milvus本地/云大规模海量数据

Chroma使用示例

from langchain_community.vectorstores import Chroma
from langchain_openai import OpenAIEmbeddings

# 创建向量存储
embeddings = OpenAIEmbeddings()
vectorstore = Chroma.from_documents(
    documents=chunks,
    embedding=embeddings,
    persist_directory="./chroma_db"  # 持久化目录
)

# 相似度搜索
results = vectorstore.similarity_search(
    "机器学习是什么?",
    k=3  # 返回top 3
)

# 带分数的搜索
results_with_scores = vectorstore.similarity_search_with_score(
    "机器学习是什么?",
    k=3
)

for doc, score in results_with_scores:
    print(f"Score: {score}")
    print(f"Content: {doc.page_content}")

Pinecone使用示例

from langchain_community.vectorstores import Pinecone
import pinecone

# 初始化Pinecone
pinecone.init(
    api_key="your-api-key",
    environment="us-west1-gcp"
)

# 创建索引
index_name = "langchain-demo"
vectorstore = Pinecone.from_documents(
    documents=chunks,
    embedding=embeddings,
    index_name=index_name
)

# 搜索
results = vectorstore.similarity_search("查询文本", k=5)

8.4 Retrievers检索器

Retrievers提供统一的检索接口

检索器类型

# 1. 向量存储检索器
retriever = vectorstore.as_retriever(
    search_type="similarity",  # 相似度搜索
    search_kwargs={"k": 5}  # 返回5个结果
)

# 2. MMR检索(最大边际相关性)
retriever = vectorstore.as_retriever(
    search_type="mmr",  # 平衡相关性和多样性
    search_kwargs={"k": 5, "fetch_k": 20}
)

# 3. 相似度阈值检索
retriever = vectorstore.as_retriever(
    search_type="similarity_score_threshold",
    search_kwargs={"score_threshold": 0.8}  # 只返回分数>0.8的
)

自定义检索器

from langchain.schema import BaseRetriever, Document

class CustomRetriever(BaseRetriever):
    """自定义检索器"""
    
    def _get_relevant_documents(self, query: str) -> List[Document]:
        """同步检索"""
        # 自定义检索逻辑
        return [Document(page_content="结果...")]
    
    async def _aget_relevant_documents(self, query: str) -> List[Document]:
        """异步检索"""
        return self._get_relevant_documents(query)

混合检索

from langchain.retrievers import EnsembleRetriever

# 组合多个检索器
ensemble_retriever = EnsembleRetriever(
    retrievers=[vector_retriever, bm25_retriever],
    weights=[0.7, 0.3]  # 权重分配
)

results = ensemble_retriever.get_relevant_documents("查询")

9. RAG检索增强生成

9.1 RAG架构原理

RAG(Retrieval-Augmented Generation)通过检索外部知识增强LLM能力

RAG工作流程

graph TB subgraph "离线索引构建" A1["原始文档"] --> A2["文档加载"] A2 --> A3["文本分割"] A3 --> A4["向量化"] A4 --> A5["存入向量库"] end subgraph "在线查询" B1["用户问题"] --> B2["问题向量化"] B2 --> B3["向量检索"] A5 --> B3 B3 --> B4["获取相关文档"] B4 --> B5["构建提示"] B5 --> B6["LLM生成答案"] B6 --> B7["返回结果"] end style A1 fill:#e3f2fd style A5 fill:#fff3e0 style B1 fill:#e8f5e9 style B3 fill:#f3e5f5 style B6 fill:#fff9c4 style B7 fill:#e0f2f1

RAG vs Fine-tuning

对比项RAGFine-tuning
知识更新实时更新需要重新训练
成本
可解释性高(可追溯来源)
准确性依赖检索质量依赖训练数据
适用场景知识密集型特定领域

9.2 文档索引构建

完整索引构建流程

from langchain_community.document_loaders import DirectoryLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_openai import OpenAIEmbeddings
from langchain_community.vectorstores import Chroma

# 1. 加载文档
loader = DirectoryLoader(
    "./knowledge_base",
    glob="**/*.md",
    show_progress=True
)
documents = loader.load()

# 2. 文本分割
text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=1000,
    chunk_overlap=200,
    separators=["\n\n", "\n", "。", "!", "?", ";", " ", ""]
)
chunks = text_splitter.split_documents(documents)

# 3. 添加元数据
for i, chunk in enumerate(chunks):
    chunk.metadata["chunk_id"] = i
    chunk.metadata["source_type"] = "markdown"

# 4. 向量化并存储
embeddings = OpenAIEmbeddings()
vectorstore = Chroma.from_documents(
    documents=chunks,
    embedding=embeddings,
    persist_directory="./vectorstore",
    collection_name="knowledge_base"
)

print(f"索引构建完成,共 {len(chunks)} 个文档块")

增量更新索引

# 添加新文档
new_docs = loader.load_new_documents()
new_chunks = text_splitter.split_documents(new_docs)
vectorstore.add_documents(new_chunks)

# 删除文档
vectorstore.delete(ids=["doc_id_1", "doc_id_2"])

# 更新文档
vectorstore.update_document(doc_id="doc_id", document=new_doc)

9.3 检索策略优化

1. 基础RAG实现

from langchain.chains import RetrievalQA

# 创建检索器
retriever = vectorstore.as_retriever(
    search_type="similarity",
    search_kwargs={"k": 4}
)

# 创建RAG链
qa_chain = RetrievalQA.from_chain_type(
    llm=model,
    chain_type="stuff",  # 将所有文档塞入上下文
    retriever=retriever,
    return_source_documents=True  # 返回来源文档
)

# 查询
result = qa_chain.invoke({"query": "什么是机器学习?"})
print(result["result"])
print(result["source_documents"])

2. 高级检索策略

多查询检索(Multi-Query)

from langchain.retrievers.multi_query import MultiQueryRetriever

# 自动生成多个查询变体
multi_query_retriever = MultiQueryRetriever.from_llm(
    retriever=retriever,
    llm=model
)

# 单个问题生成多个查询,提高召回率
results = multi_query_retriever.get_relevant_documents(
    "机器学习的应用"
)

上下文压缩检索

from langchain.retrievers import ContextualCompressionRetriever
from langchain.retrievers.document_compressors import LLMChainExtractor

# 使用LLM提取相关部分
compressor = LLMChainExtractor.from_llm(model)

compression_retriever = ContextualCompressionRetriever(
    base_compressor=compressor,
    base_retriever=retriever
)

# 只返回相关的文档片段
compressed_docs = compression_retriever.get_relevant_documents(
    "机器学习的定义"
)

父文档检索

from langchain.retrievers import ParentDocumentRetriever
from langchain.storage import InMemoryStore

# 小块检索,大块返回
parent_splitter = RecursiveCharacterTextSplitter(chunk_size=2000)
child_splitter = RecursiveCharacterTextSplitter(chunk_size=400)

store = InMemoryStore()

parent_retriever = ParentDocumentRetriever(
    vectorstore=vectorstore,
    docstore=store,
    child_splitter=child_splitter,
    parent_splitter=parent_splitter
)

3. 重排序(Reranking)

from langchain.retrievers.document_compressors import CohereRerank

# 使用Cohere重排序
reranker = CohereRerank(model="rerank-english-v2.0", top_n=3)

rerank_retriever = ContextualCompressionRetriever(
    base_compressor=reranker,
    base_retriever=retriever
)

# 检索后重新排序,提高精确度
results = rerank_retriever.get_relevant_documents("查询")

检索策略对比

策略优点缺点适用场景
基础检索简单快速可能不准确简单问答
多查询提高召回率增加成本复杂查询
压缩检索减少噪音额外LLM调用长文档
重排序提高精确度需要额外服务高质量要求
混合检索平衡性能复杂度高生产环境

9.4 RAG性能调优

1. 提示工程优化

from langchain.prompts import PromptTemplate

# 自定义RAG提示
template = """
你是一个专业的问答助手。请基于以下上下文回答问题。

上下文信息:
{context}

问题:{question}

回答要求:
1. 只基于上下文信息回答
2. 如果上下文中没有相关信息,明确说明
3. 引用具体的上下文片段
4. 保持客观和准确

回答:
"""

prompt = PromptTemplate(
    template=template,
    input_variables=["context", "question"]
)

qa_chain = RetrievalQA.from_chain_type(
    llm=model,
    retriever=retriever,
    chain_type_kwargs={"prompt": prompt}
)

2. 分块策略优化

# 策略1:固定大小 + 重叠
splitter1 = RecursiveCharacterTextSplitter(
    chunk_size=1000,
    chunk_overlap=200
)

# 策略2:语义分块
from langchain.text_splitter import SemanticChunker

splitter2 = SemanticChunker(
    embeddings=embeddings,
    breakpoint_threshold_type="percentile"  # 基于语义相似度分块
)

# 策略3:根据文档结构分块
from langchain.text_splitter import MarkdownHeaderTextSplitter

markdown_splitter = MarkdownHeaderTextSplitter(
    headers_to_split_on=[
        ("#", "Header 1"),
        ("##", "Header 2"),
        ("###", "Header 3"),
    ]
)

3. 检索参数调优

# 实验不同的k值
for k in [3, 5, 7, 10]:
    retriever = vectorstore.as_retriever(
        search_kwargs={"k": k}
    )
    # 评估性能
    evaluate_retriever(retriever, test_queries)

# 实验不同的检索类型
search_types = ["similarity", "mmr", "similarity_score_threshold"]
for search_type in search_types:
    retriever = vectorstore.as_retriever(
        search_type=search_type,
        search_kwargs={"k": 5}
    )
    # 评估性能

4. 缓存策略

from langchain.cache import InMemoryCache, SQLiteCache
from langchain.globals import set_llm_cache

# 内存缓存
set_llm_cache(InMemoryCache())

# 持久化缓存
set_llm_cache(SQLiteCache(database_path=".langchain.db"))

# 相同查询直接返回缓存结果,节省成本

5. 评估指标

from langchain.evaluation import load_evaluator

# 相关性评估
relevance_evaluator = load_evaluator("criteria", criteria="relevance")

# 准确性评估
accuracy_evaluator = load_evaluator("criteria", criteria="correctness")

# 评估RAG输出
def evaluate_rag(question, answer, context):
    relevance_score = relevance_evaluator.evaluate_strings(
        prediction=answer,
        input=question,
        reference=context
    )
    
    accuracy_score = accuracy_evaluator.evaluate_strings(
        prediction=answer,
        input=question
    )
    
    return {
        "relevance": relevance_score["score"],
        "accuracy": accuracy_score["score"]
    }

RAG优化检查清单

优化项目标方法
检索质量提高相关性优化分块、重排序
响应速度降低延迟缓存、并行检索
成本控制减少Token压缩上下文
准确性提高正确率提示优化、多查询
可扩展性支持大规模分布式向量库

10. LangChain Expression Language (LCEL)

10.1 LCEL语法基础

LCEL是LangChain的声明式编程语言,用于组合和编排组件。

核心优势

  • 简洁性:用管道操作符|连接组件
  • 可组合性:任意组件可以自由组合
  • 流式支持:原生支持流式输出
  • 并行执行:自动并行化独立操作
  • 类型安全:编译时类型检查

基本语法

from langchain_core.runnables import RunnablePassthrough
from langchain_openai import ChatOpenAI
from langchain.prompts import ChatPromptTemplate
from langchain.schema.output_parser import StrOutputParser

# 创建组件
prompt = ChatPromptTemplate.from_template("讲一个关于{topic}的笑话")
model = ChatOpenAI()
output_parser = StrOutputParser()

# 使用管道操作符组合
chain = prompt | model | output_parser

# 执行
result = chain.invoke({"topic": "程序员"})
print(result)

Runnable接口

所有LCEL组件都实现Runnable接口:

class Runnable:
    def invoke(self, input):
        """同步调用"""
        pass
    
    async def ainvoke(self, input):
        """异步调用"""
        pass
    
    def batch(self, inputs):
        """批量调用"""
        pass
    
    def stream(self, input):
        """流式调用"""
        pass
    
    async def astream(self, input):
        """异步流式调用"""
        pass

10.2 链式操作符

1. 管道操作符 |

# 顺序执行
chain = component1 | component2 | component3

# 等价于
def chain(input):
    output1 = component1.invoke(input)
    output2 = component2.invoke(output1)
    output3 = component3.invoke(output2)
    return output3

2. RunnablePassthrough(透传)

from langchain_core.runnables import RunnablePassthrough

# 保留原始输入
chain = {
    "context": retriever,
    "question": RunnablePassthrough()  # 透传question
} | prompt | model

# 调用
chain.invoke("什么是机器学习?")
# question会被透传到prompt

3. RunnableLambda(自定义函数)

from langchain_core.runnables import RunnableLambda

def custom_transform(input_dict):
    """自定义转换逻辑"""
    return {
        "processed": input_dict["text"].upper(),
        "length": len(input_dict["text"])
    }

chain = (
    RunnableLambda(custom_transform)
    | prompt
    | model
)

4. 字典组合

# 并行执行多个组件
chain = {
    "summary": summary_chain,
    "keywords": keyword_chain,
    "sentiment": sentiment_chain
} | final_processor

# 输入会同时发送给三个chain
result = chain.invoke({"text": "文本内容"})
# 输出: {"summary": "...", "keywords": [...], "sentiment": "..."}

10.3 并行与分支

1. RunnableParallel(并行执行)

from langchain_core.runnables import RunnableParallel

# 显式并行
parallel_chain = RunnableParallel(
    summary=summary_chain,
    translation=translation_chain,
    analysis=analysis_chain
)

# 三个chain并行执行
results = parallel_chain.invoke({"text": "输入文本"})

2. RunnableBranch(条件分支)

from langchain_core.runnables import RunnableBranch

# 根据条件选择不同的处理链
branch = RunnableBranch(
    (lambda x: len(x["text"]) < 100, short_text_chain),
    (lambda x: len(x["text"]) < 500, medium_text_chain),
    long_text_chain  # 默认分支
)

chain = branch | output_parser

3. 动态路由

def route_by_language(input_dict):
    """根据语言路由"""
    language = detect_language(input_dict["text"])
    
    if language == "zh":
        return chinese_chain
    elif language == "en":
        return english_chain
    else:
        return default_chain

router = RunnableLambda(route_by_language)
chain = router | output_parser

4. Map操作

from langchain_core.runnables import RunnableMap

# 对列表中每个元素应用chain
map_chain = RunnableMap(
    lambda item: process_chain.invoke(item)
)

# 批量处理
items = ["item1", "item2", "item3"]
results = map_chain.batch(items)

10.4 LCEL最佳实践

1. 复杂链的构建

# 构建一个完整的RAG链
from langchain_core.runnables import RunnablePassthrough

# 检索链
retrieval_chain = {
    "context": retriever | format_docs,  # 检索并格式化
    "question": RunnablePassthrough()
}

# 完整链
rag_chain = (
    retrieval_chain
    | prompt
    | model
    | StrOutputParser()
)

# 使用
answer = rag_chain.invoke("什么是量子计算?")

2. 错误处理

from langchain_core.runnables import RunnableWithFallbacks

# 主链失败时使用备用链
chain_with_fallback = primary_chain.with_fallbacks(
    [backup_chain_1, backup_chain_2],
    exceptions_to_handle=(ValueError, TimeoutError)
)

# 自动重试
chain_with_retry = chain.with_retry(
    stop_after_attempt=3,
    wait_exponential_jitter=True
)

3. 流式输出

# 流式处理
for chunk in chain.stream({"question": "解释深度学习"}):
    print(chunk, end="", flush=True)

# 异步流式
async for chunk in chain.astream({"question": "解释深度学习"}):
    print(chunk, end="", flush=True)

4. 批量处理

# 批量调用
questions = [
    {"question": "什么是AI?"},
    {"question": "什么是ML?"},
    {"question": "什么是DL?"}
]

# 并行处理多个输入
results = chain.batch(questions)

# 配置并发数
results = chain.batch(questions, config={"max_concurrency": 5})

5. 配置传递

from langchain_core.runnables import RunnableConfig

# 传递配置
config = RunnableConfig(
    tags=["production", "v1"],
    metadata={"user_id": "123"},
    callbacks=[callback_handler]
)

result = chain.invoke(input_data, config=config)

LCEL vs 传统Chain对比

特性LCEL传统Chain
语法声明式,简洁命令式,冗长
组合性高度灵活相对固定
流式支持原生支持需要额外配置
并行执行自动优化手动实现
类型安全编译时检查运行时检查
调试更容易追踪相对困难

LCEL使用建议

  • ✅ 新项目优先使用LCEL
  • ✅ 复杂链用LCEL更清晰
  • ✅ 需要流式输出时使用LCEL
  • ⚠️ 简单场景两者都可以
  • ⚠️ 旧项目迁移需要评估成本

11. 实战应用场景

11.1 智能问答系统

场景描述:基于企业知识库的智能问答系统,支持多轮对话和上下文理解。

完整实现

from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_community.vectorstores import Chroma
from langchain.memory import ConversationBufferMemory
from langchain.chains import ConversationalRetrievalChain

class IntelligentQASystem:
    def __init__(self, knowledge_base_path):
        # 初始化组件
        self.llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0)
        self.embeddings = OpenAIEmbeddings()
        
        # 加载向量数据库
        self.vectorstore = Chroma(
            persist_directory=knowledge_base_path,
            embedding_function=self.embeddings
        )
        
        # 配置记忆
        self.memory = ConversationBufferMemory(
            memory_key="chat_history",
            return_messages=True,
            output_key="answer"
        )
        
        # 创建对话检索链
        self.qa_chain = ConversationalRetrievalChain.from_llm(
            llm=self.llm,
            retriever=self.vectorstore.as_retriever(
                search_kwargs={"k": 4}
            ),
            memory=self.memory,
            return_source_documents=True,
            verbose=True
        )
    
    def ask(self, question: str):
        """提问"""
        result = self.qa_chain.invoke({"question": question})
        
        return {
            "answer": result["answer"],
            "sources": [
                {
                    "content": doc.page_content[:200],
                    "metadata": doc.metadata
                }
                for doc in result["source_documents"]
            ]
        }
    
    def clear_history(self):
        """清除对话历史"""
        self.memory.clear()

# 使用示例
qa_system = IntelligentQASystem("./knowledge_base")

# 多轮对话
print(qa_system.ask("公司的年假政策是什么?"))
print(qa_system.ask("那病假呢?"))  # 能理解上下文
print(qa_system.ask("需要提前多久申请?"))  # 继续追问

优化点

  • 添加意图识别,路由到不同的专家系统
  • 集成反馈机制,持续优化答案质量
  • 支持多模态输入(图片、语音)

11.2 文档分析与总结

场景描述:自动分析长文档,生成结构化总结和关键信息提取。

实现方案

from langchain.chains.summarize import load_summarize_chain
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import PyPDFLoader

class DocumentAnalyzer:
    def __init__(self):
        self.llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0)
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=2000,
            chunk_overlap=200
        )
    
    def analyze_document(self, file_path):
        """分析文档"""
        # 1. 加载文档
        loader = PyPDFLoader(file_path)
        documents = loader.load()
        
        # 2. 分割文档
        chunks = self.text_splitter.split_documents(documents)
        
        # 3. 生成总结
        summary = self._generate_summary(chunks)
        
        # 4. 提取关键信息
        key_points = self._extract_key_points(chunks)
        
        # 5. 情感分析
        sentiment = self._analyze_sentiment(summary)
        
        return {
            "summary": summary,
            "key_points": key_points,
            "sentiment": sentiment,
            "page_count": len(documents),
            "word_count": sum(len(doc.page_content.split()) for doc in documents)
        }
    
    def _generate_summary(self, chunks):
        """生成总结"""
        # Map-Reduce策略
        chain = load_summarize_chain(
            self.llm,
            chain_type="map_reduce",
            verbose=True
        )
        return chain.run(chunks)
    
    def _extract_key_points(self, chunks):
        """提取关键点"""
        prompt = ChatPromptTemplate.from_template(
            "从以下文本中提取3-5个关键点:\n\n{text}"
        )
        chain = prompt | self.llm | StrOutputParser()
        
        # 对每个chunk提取关键点
        all_points = []
        for chunk in chunks[:3]:  # 只处理前3个chunk
            points = chain.invoke({"text": chunk.page_content})
            all_points.append(points)
        
        return all_points
    
    def _analyze_sentiment(self, text):
        """情感分析"""
        prompt = ChatPromptTemplate.from_template(
            "分析以下文本的情感倾向(积极/中性/消极):\n\n{text}"
        )
        chain = prompt | self.llm | StrOutputParser()
        return chain.invoke({"text": text})

# 使用
analyzer = DocumentAnalyzer()
result = analyzer.analyze_document("report.pdf")
print(f"总结: {result['summary']}")
print(f"关键点: {result['key_points']}")

11.3 代码生成助手

场景描述:根据自然语言描述生成代码,支持多种编程语言。

实现方案

from langchain.prompts import ChatPromptTemplate
from langchain.output_parsers import PydanticOutputParser
from pydantic import BaseModel, Field

class CodeOutput(BaseModel):
    code: str = Field(description="生成的代码")
    explanation: str = Field(description="代码说明")
    language: str = Field(description="编程语言")
    dependencies: list[str] = Field(description="依赖库")

class CodeAssistant:
    def __init__(self):
        self.llm = ChatOpenAI(model="gpt-4", temperature=0)
        self.parser = PydanticOutputParser(pydantic_object=CodeOutput)
    
    def generate_code(self, description: str, language: str = "python"):
        """生成代码"""
        prompt = ChatPromptTemplate.from_messages([
            ("system", """你是一个专业的编程助手。
            根据用户描述生成高质量代码。
            要求:
            1. 代码要简洁、高效
            2. 遵循最佳实践
            3. 添加必要的注释
            4. 考虑边界情况
            
            {format_instructions}
            """),
            ("human", "语言: {language}\n需求: {description}")
        ])
        
        chain = prompt | self.llm | self.parser
        
        result = chain.invoke({
            "language": language,
            "description": description,
            "format_instructions": self.parser.get_format_instructions()
        })
        
        return result
    
    def review_code(self, code: str):
        """代码审查"""
        prompt = ChatPromptTemplate.from_template(
            """请审查以下代码,指出潜在问题和改进建议:
            
            {code}
            
            请从以下方面分析:
            1. 代码质量
            2. 性能问题
            3. 安全隐患
            4. 最佳实践
            """
        )
        
        chain = prompt | self.llm | StrOutputParser()
        return chain.invoke({"code": code})
    
    def explain_code(self, code: str):
        """解释代码"""
        prompt = ChatPromptTemplate.from_template(
            """请详细解释以下代码的功能和实现原理:
            
            {code}
            
            请包括:
            1. 整体功能
            2. 关键步骤
            3. 算法复杂度
            4. 使用场景
            """
        )
        
        chain = prompt | self.llm | StrOutputParser()
        return chain.invoke({"code": code})

# 使用示例
assistant = CodeAssistant()

# 生成代码
result = assistant.generate_code(
    "实现一个二分查找算法",
    language="python"
)
print(f"代码:\n{result.code}")
print(f"说明: {result.explanation}")
print(f"依赖: {result.dependencies}")

# 审查代码
review = assistant.review_code(result.code)
print(f"审查意见: {review}")

11.4 多模态应用

场景描述:处理图片、文本、音频等多种模态的数据。

图片理解应用

from langchain_openai import ChatOpenAI
from langchain.schema.messages import HumanMessage
import base64

class MultimodalAssistant:
    def __init__(self):
        self.llm = ChatOpenAI(
            model="gpt-4-vision-preview",
            max_tokens=1024
        )
    
    def analyze_image(self, image_path: str, question: str):
        """分析图片"""
        # 读取并编码图片
        with open(image_path, "rb") as image_file:
            image_data = base64.b64encode(image_file.read()).decode()
        
        # 构建消息
        message = HumanMessage(
            content=[
                {"type": "text", "text": question},
                {
                    "type": "image_url",
                    "image_url": {
                        "url": f"data:image/jpeg;base64,{image_data}"
                    }
                }
            ]
        )
        
        # 调用模型
        response = self.llm.invoke([message])
        return response.content
    
    def compare_images(self, image1_path: str, image2_path: str):
        """比较两张图片"""
        # 编码两张图片
        images = []
        for path in [image1_path, image2_path]:
            with open(path, "rb") as f:
                data = base64.b64encode(f.read()).decode()
                images.append(data)
        
        # 构建消息
        message = HumanMessage(
            content=[
                {"type": "text", "text": "比较这两张图片的异同"},
                {
                    "type": "image_url",
                    "image_url": {"url": f"data:image/jpeg;base64,{images[0]}"}
                },
                {
                    "type": "image_url",
                    "image_url": {"url": f"data:image/jpeg;base64,{images[1]}"}
                }
            ]
        )
        
        response = self.llm.invoke([message])
        return response.content
    
    def extract_text_from_image(self, image_path: str):
        """从图片中提取文字(OCR)"""
        return self.analyze_image(
            image_path,
            "请提取图片中的所有文字内容"
        )
    
    def describe_chart(self, chart_path: str):
        """描述图表"""
        return self.analyze_image(
            chart_path,
            "请详细描述这个图表,包括类型、数据趋势和关键发现"
        )

# 使用示例
assistant = MultimodalAssistant()

# 分析图片
result = assistant.analyze_image(
    "product.jpg",
    "这个产品有什么特点?"
)
print(result)

# OCR
text = assistant.extract_text_from_image("document.jpg")
print(f"提取的文字: {text}")

# 图表分析
analysis = assistant.describe_chart("sales_chart.png")
print(f"图表分析: {analysis}")

应用场景总结

应用核心技术难点价值
智能问答RAG + Memory上下文理解提升服务效率
文档分析Summarization长文本处理快速获取信息
代码助手Code Generation代码质量提高开发效率
多模态Vision API模态融合丰富交互方式

12. 性能优化与调试

12.1 性能监控与分析

关键性能指标(KPI)

指标目标值监控方法
响应时间< 3秒计时器
Token使用最小化OpenAI Callback
成本预算内成本追踪
准确率> 90%人工评估
错误率< 1%错误日志

性能监控实现

import time
from langchain.callbacks import get_openai_callback
from functools import wraps

class PerformanceTracker:
    def __init__(self):
        self.metrics = {
            "calls": [],
            "total_tokens": 0,
            "total_cost": 0.0,
            "total_time": 0.0,
            "errors": 0
        }
    
    def track(self, func):
        """性能追踪装饰器"""
        @wraps(func)
        def wrapper(*args, **kwargs):
            start_time = time.time()
            
            try:
                with get_openai_callback() as cb:
                    result = func(*args, **kwargs)
                    
                    # 记录指标
                    call_metrics = {
                        "timestamp": time.time(),
                        "duration": time.time() - start_time,
                        "tokens": cb.total_tokens,
                        "cost": cb.total_cost,
                        "success": True
                    }
                    
                    self.metrics["calls"].append(call_metrics)
                    self.metrics["total_tokens"] += cb.total_tokens
                    self.metrics["total_cost"] += cb.total_cost
                    self.metrics["total_time"] += call_metrics["duration"]
                    
                    return result
                    
            except Exception as e:
                self.metrics["errors"] += 1
                call_metrics = {
                    "timestamp": time.time(),
                    "duration": time.time() - start_time,
                    "error": str(e),
                    "success": False
                }
                self.metrics["calls"].append(call_metrics)
                raise
        
        return wrapper
    
    def get_stats(self):
        """获取统计信息"""
        total_calls = len(self.metrics["calls"])
        successful_calls = sum(1 for c in self.metrics["calls"] if c.get("success"))
        
        return {
            "总调用次数": total_calls,
            "成功次数": successful_calls,
            "失败次数": self.metrics["errors"],
            "成功率": f"{successful_calls/max(total_calls,1)*100:.2f}%",
            "总Token数": self.metrics["total_tokens"],
            "总成本": f"${self.metrics['total_cost']:.4f}",
            "总耗时": f"{self.metrics['total_time']:.2f}秒",
            "平均延迟": f"{self.metrics['total_time']/max(total_calls,1):.2f}秒",
            "平均Token": int(self.metrics["total_tokens"]/max(successful_calls,1))
        }
    
    def get_slow_calls(self, threshold=5.0):
        """获取慢调用"""
        return [
            call for call in self.metrics["calls"]
            if call.get("duration", 0) > threshold
        ]

# 使用示例
tracker = PerformanceTracker()

@tracker.track
def process_query(query):
    return chain.invoke({"question": query})

# 执行查询
for query in queries:
    process_query(query)

# 查看统计
print(tracker.get_stats())

# 查看慢调用
slow_calls = tracker.get_slow_calls(threshold=3.0)
print(f"慢调用数量: {len(slow_calls)}")

12.2 缓存策略

多层缓存架构

from langchain.cache import InMemoryCache, SQLiteCache
from langchain.globals import set_llm_cache
import hashlib
import json
from functools import lru_cache

class MultiLevelCache:
    def __init__(self):
        # L1: 内存缓存(最快)
        self.memory_cache = {}
        
        # L2: LRU缓存
        self.lru_size = 100
        
        # L3: 持久化缓存
        set_llm_cache(SQLiteCache(database_path=".langchain_cache.db"))
    
    def get_cache_key(self, input_data):
        """生成缓存键"""
        # 将输入序列化为JSON并计算哈希
        serialized = json.dumps(input_data, sort_keys=True)
        return hashlib.sha256(serialized.encode()).hexdigest()
    
    @lru_cache(maxsize=100)
    def _lru_get(self, key):
        """LRU缓存层"""
        return self.memory_cache.get(key)
    
    def get(self, input_data):
        """获取缓存"""
        key = self.get_cache_key(input_data)
        
        # L1: 内存缓存
        if key in self.memory_cache:
            return self.memory_cache[key]
        
        # L2: LRU缓存
        cached = self._lru_get(key)
        if cached:
            return cached
        
        return None
    
    def set(self, input_data, result):
        """设置缓存"""
        key = self.get_cache_key(input_data)
        
        # 写入所有缓存层
        self.memory_cache[key] = result
        self._lru_get.cache_clear()  # 清除LRU缓存以更新
    
    def clear(self):
        """清除缓存"""
        self.memory_cache.clear()
        self._lru_get.cache_clear()

# 使用缓存
cache = MultiLevelCache()

def cached_invoke(chain, input_data):
    # 检查缓存
    cached_result = cache.get(input_data)
    if cached_result:
        print("缓存命中!")
        return cached_result
    
    # 调用chain
    result = chain.invoke(input_data)
    
    # 保存缓存
    cache.set(input_data, result)
    return result

语义缓存

from langchain_openai import OpenAIEmbeddings
from sklearn.metrics.pairwise import cosine_similarity
import numpy as np

class SemanticCache:
    """基于语义相似度的缓存"""
    
    def __init__(self, similarity_threshold=0.95):
        self.embeddings = OpenAIEmbeddings()
        self.cache = []  # [(embedding, query, result)]
        self.threshold = similarity_threshold
    
    def get(self, query: str):
        """查找语义相似的缓存"""
        if not self.cache:
            return None
        
        # 计算查询的向量
        query_embedding = self.embeddings.embed_query(query)
        
        # 计算与所有缓存的相似度
        for cached_embedding, cached_query, cached_result in self.cache:
            similarity = cosine_similarity(
                [query_embedding],
                [cached_embedding]
            )[0][0]
            
            if similarity >= self.threshold:
                print(f"语义缓存命中!相似度: {similarity:.3f}")
                print(f"原查询: {cached_query}")
                return cached_result
        
        return None
    
    def set(self, query: str, result):
        """保存缓存"""
        embedding = self.embeddings.embed_query(query)
        self.cache.append((embedding, query, result))
        
        # 限制缓存大小
        if len(self.cache) > 100:
            self.cache.pop(0)

# 使用
semantic_cache = SemanticCache()

def semantic_cached_invoke(query):
    # 检查语义缓存
    cached = semantic_cache.get(query)
    if cached:
        return cached
    
    # 调用模型
    result = chain.invoke({"question": query})
    
    # 保存缓存
    semantic_cache.set(query, result)
    return result

# 测试
result1 = semantic_cached_invoke("什么是机器学习?")
result2 = semantic_cached_invoke("机器学习是什么?")  # 语义相似,命中缓存

12.3 并发与异步处理

异步调用

import asyncio
from langchain_openai import ChatOpenAI

async def process_queries_async(queries):
    """异步处理多个查询"""
    llm = ChatOpenAI()
    
    # 创建异步任务
    tasks = [llm.ainvoke(query) for query in queries]
    
    # 并发执行
    results = await asyncio.gather(*tasks)
    
    return results

# 使用
queries = ["问题1", "问题2", "问题3"]
results = asyncio.run(process_queries_async(queries))

并发控制

from concurrent.futures import ThreadPoolExecutor, as_completed
import asyncio
from asyncio import Semaphore

class ConcurrencyController:
    def __init__(self, max_concurrent=5):
        self.max_concurrent = max_concurrent
        self.semaphore = Semaphore(max_concurrent)
    
    async def process_with_limit(self, func, items):
        """限制并发数"""
        async def limited_func(item):
            async with self.semaphore:
                return await func(item)
        
        tasks = [limited_func(item) for item in items]
        return await asyncio.gather(*tasks)

# 使用
controller = ConcurrencyController(max_concurrent=3)

async def process_item(item):
    # 处理单个项目
    return await chain.ainvoke(item)

# 限制并发为3
results = await controller.process_with_limit(process_item, items)

批量处理优化

def batch_process_optimized(items, batch_size=10):
    """优化的批量处理"""
    results = []
    
    # 分批处理
    for i in range(0, len(items), batch_size):
        batch = items[i:i+batch_size]
        
        # 批量调用
        batch_results = chain.batch(batch)
        results.extend(batch_results)
        
        # 避免速率限制
        time.sleep(1)
    
    return results

12.4 调试技巧

1. 详细日志

import logging

# 配置LangChain日志
logging.basicConfig(level=logging.DEBUG)
langchain_logger = logging.getLogger("langchain")
langchain_logger.setLevel(logging.DEBUG)

# 启用详细输出
chain = LLMChain(llm=model, prompt=prompt, verbose=True)

2. 断点调试

from langchain.callbacks import StdOutCallbackHandler

class DebugCallbackHandler(StdOutCallbackHandler):
    """调试回调"""
    
    def on_llm_start(self, serialized, prompts, **kwargs):
        print("\n=== LLM Start ===")
        print(f"Prompt: {prompts[0]}")
        # 可以在这里设置断点
        import pdb; pdb.set_trace()
    
    def on_llm_end(self, response, **kwargs):
        print("\n=== LLM End ===")
        print(f"Response: {response.generations[0][0].text}")

# 使用
chain.invoke(input, callbacks=[DebugCallbackHandler()])

3. 中间步骤追踪

# Agent中间步骤
result = agent_executor.invoke(
    {"input": "问题"},
    return_intermediate_steps=True
)

print("中间步骤:")
for i, (action, observation) in enumerate(result["intermediate_steps"]):
    print(f"\n步骤 {i+1}:")
    print(f"  Action: {action.tool}")
    print(f"  Input: {action.tool_input}")
    print(f"  Output: {observation}")

4. 单元测试

import pytest
from unittest.mock import Mock, patch

def test_chain():
    """测试chain"""
    # Mock LLM
    mock_llm = Mock()
    mock_llm.invoke.return_value = "测试响应"
    
    # 创建chain
    chain = LLMChain(llm=mock_llm, prompt=prompt)
    
    # 测试
    result = chain.invoke({"input": "测试"})
    
    # 断言
    assert result is not None
    mock_llm.invoke.assert_called_once()

def test_retriever():
    """测试检索器"""
    # Mock向量存储
    mock_vectorstore = Mock()
    mock_vectorstore.similarity_search.return_value = [
        Document(page_content="测试文档")
    ]
    
    retriever = mock_vectorstore.as_retriever()
    docs = retriever.get_relevant_documents("查询")
    
    assert len(docs) > 0

5. 性能分析

import cProfile
import pstats

def profile_chain(chain, input_data):
    """性能分析"""
    profiler = cProfile.Profile()
    profiler.enable()
    
    # 执行
    result = chain.invoke(input_data)
    
    profiler.disable()
    
    # 输出统计
    stats = pstats.Stats(profiler)
    stats.sort_stats('cumulative')
    stats.print_stats(10)  # 显示前10个最耗时的函数
    
    return result

调试检查清单

问题类型调试方法工具
输出不符合预期检查提示模板verbose=True
性能慢性能分析cProfile
成本高Token追踪OpenAI Callback
错误频繁错误日志logging
检索不准中间步骤追踪return_intermediate_steps

13. 生产部署

13.1 部署架构设计

典型部署架构

graph TB subgraph "客户端层" A1["Web前端"] A2["移动端"] A3["API客户端"] end subgraph "接入层" B1["负载均衡
Nginx/ALB"] B2["API网关
Kong/AWS API Gateway"] end subgraph "应用层" C1["LangChain服务
FastAPI/Flask"] C2["缓存服务
Redis"] C3["队列服务
Celery/RabbitMQ"] end subgraph "数据层" D1["向量数据库
Pinecone/Milvus"] D2["关系数据库
PostgreSQL"] D3["对象存储
S3/OSS"] end subgraph "外部服务" E1["LLM API
OpenAI/Anthropic"] E2["监控服务
LangSmith"] end A1 --> B1 A2 --> B1 A3 --> B2 B1 --> C1 B2 --> C1 C1 --> C2 C1 --> C3 C1 --> D1 C1 --> D2 C1 --> D3 C1 --> E1 C1 --> E2 style A1 fill:#e3f2fd style A2 fill:#e3f2fd style A3 fill:#e3f2fd style B1 fill:#fff3e0 style B2 fill:#fff3e0 style C1 fill:#e8f5e9 style C2 fill:#e8f5e9 style C3 fill:#e8f5e9 style D1 fill:#f3e5f5 style D2 fill:#f3e5f5 style D3 fill:#f3e5f5 style E1 fill:#fff9c4 style E2 fill:#fff9c4

使用LangServe部署

from fastapi import FastAPI
from langserve import add_routes
from langchain_openai import ChatOpenAI
from langchain.prompts import ChatPromptTemplate

# 创建FastAPI应用
app = FastAPI(
    title="LangChain API",
    version="1.0",
    description="LangChain服务API"
)

# 创建chain
model = ChatOpenAI()
prompt = ChatPromptTemplate.from_template("回答问题: {question}")
chain = prompt | model

# 添加路由
add_routes(
    app,
    chain,
    path="/chat",
    enable_feedback_endpoint=True,  # 启用反馈
    enable_public_trace_link_endpoint=True,  # 启用追踪
)

# 健康检查
@app.get("/health")
async def health_check():
    return {"status": "healthy"}

# 运行
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

Docker部署

# Dockerfile
FROM python:3.11-slim

WORKDIR /app

# 安装依赖
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# 复制代码
COPY . .

# 暴露端口
EXPOSE 8000

# 启动命令
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]
# docker-compose.yml
version: '3.8'

services:
  langchain-api:
    build: .
    ports:
      - "8000:8000"
    environment:
      - OPENAI_API_KEY=${OPENAI_API_KEY}
      - REDIS_URL=redis://redis:6379
    depends_on:
      - redis
      - postgres
    restart: unless-stopped
  
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    volumes:
      - redis_data:/data
  
  postgres:
    image: postgres:15-alpine
    environment:
      - POSTGRES_DB=langchain
      - POSTGRES_USER=user
      - POSTGRES_PASSWORD=password
    volumes:
      - postgres_data:/var/lib/postgresql/data
  
  nginx:
    image: nginx:alpine
    ports:
      - "80:80"
    volumes:
      - ./nginx.conf:/etc/nginx/nginx.conf
    depends_on:
      - langchain-api

volumes:
  redis_data:
  postgres_data:

13.2 安全与权限控制

API密钥管理

from fastapi import FastAPI, HTTPException, Depends, Security
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
import os
from typing import Optional

security = HTTPBearer()

class APIKeyManager:
    def __init__(self):
        # 从环境变量或数据库加载API密钥
        self.valid_keys = set(os.getenv("API_KEYS", "").split(","))
    
    def verify_key(self, credentials: HTTPAuthorizationCredentials = Security(security)):
        """验证API密钥"""
        token = credentials.credentials
        
        if token not in self.valid_keys:
            raise HTTPException(
                status_code=401,
                detail="Invalid API key"
            )
        
        return token

api_key_manager = APIKeyManager()

@app.post("/chat")
async def chat(
    request: dict,
    api_key: str = Depends(api_key_manager.verify_key)
):
    """需要API密钥的端点"""
    # 处理请求
    pass

速率限制

from fastapi import Request
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded

# 创建限流器
limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)

@app.post("/chat")
@limiter.limit("10/minute")  # 每分钟10次
async def chat(request: Request):
    """限流的端点"""
    pass

输入验证

from pydantic import BaseModel, Field, validator
from typing import Optional

class ChatRequest(BaseModel):
    question: str = Field(..., min_length=1, max_length=1000)
    session_id: Optional[str] = None
    temperature: float = Field(default=0.7, ge=0.0, le=2.0)
    
    @validator('question')
    def validate_question(cls, v):
        # 检查恶意输入
        forbidden_words = ['<script>', 'DROP TABLE']
        if any(word in v.upper() for word in forbidden_words):
            raise ValueError('Invalid input detected')
        return v

@app.post("/chat")
async def chat(request: ChatRequest):
    """验证输入的端点"""
    # 输入已经过验证
    pass

内容过滤

class ContentFilter:
    """内容过滤器"""
    
    def __init__(self):
        self.sensitive_patterns = [
            r'\b\d{3}-\d{2}-\d{4}\b',  # SSN
            r'\b\d{16}\b',  # 信用卡号
            r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b'  # 邮箱
        ]
    
    def filter_input(self, text: str) -> str:
        """过滤输入"""
        import re
        for pattern in self.sensitive_patterns:
            text = re.sub(pattern, '[REDACTED]', text)
        return text
    
    def filter_output(self, text: str) -> str:
        """过滤输出"""
        # 检查是否包含敏感信息
        return self.filter_input(text)

content_filter = ContentFilter()

@app.post("/chat")
async def chat(request: ChatRequest):
    # 过滤输入
    filtered_question = content_filter.filter_input(request.question)
    
    # 处理
    response = chain.invoke({"question": filtered_question})
    
    # 过滤输出
    filtered_response = content_filter.filter_output(response)
    
    return {"answer": filtered_response}

13.3 监控与日志

结构化日志

import logging
import json
from datetime import datetime

class StructuredLogger:
    def __init__(self, name):
        self.logger = logging.getLogger(name)
        self.logger.setLevel(logging.INFO)
        
        # JSON格式handler
        handler = logging.StreamHandler()
        handler.setFormatter(self.JSONFormatter())
        self.logger.addHandler(handler)
    
    class JSONFormatter(logging.Formatter):
        def format(self, record):
            log_data = {
                "timestamp": datetime.utcnow().isoformat(),
                "level": record.levelname,
                "message": record.getMessage(),
                "module": record.module,
                "function": record.funcName,
            }
            
            # 添加额外字段
            if hasattr(record, 'user_id'):
                log_data['user_id'] = record.user_id
            if hasattr(record, 'request_id'):
                log_data['request_id'] = record.request_id
            
            return json.dumps(log_data)
    
    def info(self, message, **kwargs):
        extra = {k: v for k, v in kwargs.items()}
        self.logger.info(message, extra=extra)
    
    def error(self, message, **kwargs):
        extra = {k: v for k, v in kwargs.items()}
        self.logger.error(message, extra=extra)

# 使用
logger = StructuredLogger("langchain_api")

@app.post("/chat")
async def chat(request: ChatRequest):
    request_id = str(uuid.uuid4())
    
    logger.info(
        "Chat request received",
        request_id=request_id,
        question_length=len(request.question)
    )
    
    try:
        response = chain.invoke({"question": request.question})
        
        logger.info(
            "Chat request completed",
            request_id=request_id,
            response_length=len(response)
        )
        
        return {"answer": response}
    
    except Exception as e:
        logger.error(
            "Chat request failed",
            request_id=request_id,
            error=str(e)
        )
        raise

Prometheus指标

from prometheus_client import Counter, Histogram, Gauge, generate_latest
from fastapi import Response

# 定义指标
request_count = Counter(
    'langchain_requests_total',
    'Total number of requests',
    ['endpoint', 'status']
)

request_duration = Histogram(
    'langchain_request_duration_seconds',
    'Request duration in seconds',
    ['endpoint']
)

active_requests = Gauge(
    'langchain_active_requests',
    'Number of active requests'
)

token_usage = Counter(
    'langchain_tokens_total',
    'Total tokens used',
    ['model']
)

# 中间件
@app.middleware("http")
async def metrics_middleware(request: Request, call_next):
    active_requests.inc()
    
    start_time = time.time()
    
    try:
        response = await call_next(request)
        
        # 记录指标
        duration = time.time() - start_time
        request_duration.labels(endpoint=request.url.path).observe(duration)
        request_count.labels(
            endpoint=request.url.path,
            status=response.status_code
        ).inc()
        
        return response
    
    finally:
        active_requests.dec()

# 指标端点
@app.get("/metrics")
async def metrics():
    return Response(
        content=generate_latest(),
        media_type="text/plain"
    )

13.4 成本优化

智能模型选择

class CostOptimizer:
    """成本优化器"""
    
    def __init__(self):
        self.models = {
            "cheap": ChatOpenAI(model="gpt-3.5-turbo"),
            "expensive": ChatOpenAI(model="gpt-4")
        }
        
        self.cost_per_1k_tokens = {
            "gpt-3.5-turbo": 0.002,
            "gpt-4": 0.03
        }
    
    def select_model(self, task_complexity: str):
        """根据任务复杂度选择模型"""
        if task_complexity == "simple":
            return self.models["cheap"]
        else:
            return self.models["expensive"]
    
    def estimate_cost(self, text: str, model: str):
        """估算成本"""
        token_count = len(text.split()) * 1.3  # 粗略估算
        cost = (token_count / 1000) * self.cost_per_1k_tokens[model]
        return cost
    
    def should_use_cache(self, estimated_cost: float):
        """判断是否应该使用缓存"""
        return estimated_cost > 0.01  # 成本超过1分钱就缓存

optimizer = CostOptimizer()

@app.post("/chat")
async def chat(request: ChatRequest):
    # 评估任务复杂度
    complexity = "simple" if len(request.question) < 100 else "complex"
    
    # 选择模型
    model = optimizer.select_model(complexity)
    
    # 估算成本
    estimated_cost = optimizer.estimate_cost(request.question, model.model_name)
    
    # 决定是否缓存
    use_cache = optimizer.should_use_cache(estimated_cost)
    
    # 执行
    if use_cache:
        # 使用缓存逻辑
        pass
    
    response = model.invoke(request.question)
    return {"answer": response}

预算控制

class BudgetController:
    """预算控制器"""
    
    def __init__(self, daily_budget: float):
        self.daily_budget = daily_budget
        self.current_spending = 0.0
        self.last_reset = datetime.now().date()
    
    def check_budget(self, estimated_cost: float) -> bool:
        """检查预算"""
        # 每天重置
        if datetime.now().date() > self.last_reset:
            self.current_spending = 0.0
            self.last_reset = datetime.now().date()
        
        # 检查是否超预算
        if self.current_spending + estimated_cost > self.daily_budget:
            return False
        
        return True
    
    def record_spending(self, actual_cost: float):
        """记录支出"""
        self.current_spending += actual_cost
    
    def get_remaining_budget(self) -> float:
        """获取剩余预算"""
        return self.daily_budget - self.current_spending

budget_controller = BudgetController(daily_budget=100.0)

@app.post("/chat")
async def chat(request: ChatRequest):
    # 估算成本
    estimated_cost = 0.01  # 估算值
    
    # 检查预算
    if not budget_controller.check_budget(estimated_cost):
        raise HTTPException(
            status_code=429,
            detail="Daily budget exceeded"
        )
    
    # 执行请求
    with get_openai_callback() as cb:
        response = chain.invoke({"question": request.question})
        
        # 记录实际成本
        budget_controller.record_spending(cb.total_cost)
    
    return {"answer": response}

部署检查清单

检查项说明优先级
环境变量API密钥等敏感信息⭐⭐⭐⭐⭐
错误处理完善的异常捕获⭐⭐⭐⭐⭐
日志记录结构化日志⭐⭐⭐⭐
监控告警Prometheus + Grafana⭐⭐⭐⭐
速率限制防止滥用⭐⭐⭐⭐
缓存策略降低成本⭐⭐⭐
负载均衡高可用⭐⭐⭐
备份恢复数据安全⭐⭐⭐

14. 高级特性

14.1 Callbacks回调机制

Callbacks提供了钩子函数,可以在LangChain执行的各个阶段插入自定义逻辑。

回调事件类型

from langchain.callbacks.base import BaseCallbackHandler
from langchain.schema import LLMResult

class CustomCallbackHandler(BaseCallbackHandler):
    """自定义回调处理器"""
    
    def on_llm_start(self, serialized, prompts, **kwargs):
        """LLM开始调用"""
        print(f"LLM开始: {prompts[0][:50]}...")
    
    def on_llm_end(self, response: LLMResult, **kwargs):
        """LLM调用结束"""
        print(f"LLM结束: {response.generations[0][0].text[:50]}...")
    
    def on_llm_error(self, error, **kwargs):
        """LLM调用错误"""
        print(f"LLM错误: {error}")
    
    def on_chain_start(self, serialized, inputs, **kwargs):
        """Chain开始"""
        print(f"Chain开始: {serialized.get('name', 'Unknown')}")
    
    def on_chain_end(self, outputs, **kwargs):
        """Chain结束"""
        print(f"Chain结束: {outputs}")
    
    def on_chain_error(self, error, **kwargs):
        """Chain错误"""
        print(f"Chain错误: {error}")
    
    def on_tool_start(self, serialized, input_str, **kwargs):
        """Tool开始"""
        print(f"Tool开始: {serialized['name']}")
    
    def on_tool_end(self, output, **kwargs):
        """Tool结束"""
        print(f"Tool结束: {output[:50]}...")
    
    def on_tool_error(self, error, **kwargs):
        """Tool错误"""
        print(f"Tool错误: {error}")
    
    def on_agent_action(self, action, **kwargs):
        """Agent执行动作"""
        print(f"Agent动作: {action.tool} - {action.tool_input}")
    
    def on_agent_finish(self, finish, **kwargs):
        """Agent完成"""
        print(f"Agent完成: {finish.return_values}")

# 使用回调
handler = CustomCallbackHandler()
chain.invoke(input_data, callbacks=[handler])

实用回调示例

1. 成本追踪回调

class CostTrackingCallback(BaseCallbackHandler):
    """追踪成本"""
    
    def __init__(self):
        self.total_tokens = 0
        self.total_cost = 0.0
        self.cost_per_1k = {
            "gpt-3.5-turbo": 0.002,
            "gpt-4": 0.03
        }
    
    def on_llm_end(self, response: LLMResult, **kwargs):
        # 计算token数
        if response.llm_output:
            tokens = response.llm_output.get("token_usage", {})
            total_tokens = tokens.get("total_tokens", 0)
            model = response.llm_output.get("model_name", "gpt-3.5-turbo")
            
            # 计算成本
            cost = (total_tokens / 1000) * self.cost_per_1k.get(model, 0.002)
            
            self.total_tokens += total_tokens
            self.total_cost += cost
    
    def get_summary(self):
        return {
            "total_tokens": self.total_tokens,
            "total_cost": f"${self.total_cost:.4f}"
        }

2. 延迟监控回调

import time

class LatencyCallback(BaseCallbackHandler):
    """监控延迟"""
    
    def __init__(self):
        self.start_times = {}
        self.latencies = []
    
    def on_llm_start(self, serialized, prompts, **kwargs):
        run_id = kwargs.get("run_id")
        self.start_times[run_id] = time.time()
    
    def on_llm_end(self, response: LLMResult, **kwargs):
        run_id = kwargs.get("run_id")
        if run_id in self.start_times:
            latency = time.time() - self.start_times[run_id]
            self.latencies.append(latency)
            
            if latency > 5.0:
                print(f"⚠️ 慢查询警告: {latency:.2f}秒")
    
    def get_stats(self):
        if not self.latencies:
            return {}
        
        return {
            "avg_latency": sum(self.latencies) / len(self.latencies),
            "max_latency": max(self.latencies),
            "min_latency": min(self.latencies)
        }

14.2 Streaming流式输出

流式输出提供实时响应,改善用户体验。

基础流式输出

from langchain_openai import ChatOpenAI

# 启用流式
model = ChatOpenAI(streaming=True)

# 流式调用
for chunk in model.stream("写一首诗"):
    print(chunk.content, end="", flush=True)

带回调的流式输出

from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler

class CustomStreamingCallback(StreamingStdOutCallbackHandler):
    """自定义流式回调"""
    
    def __init__(self):
        super().__init__()
        self.tokens = []
    
    def on_llm_new_token(self, token: str, **kwargs):
        """接收新token"""
        self.tokens.append(token)
        print(token, end="", flush=True)
        
        # 可以在这里添加自定义逻辑
        # 例如:发送到WebSocket
    
    def on_llm_end(self, response: LLMResult, **kwargs):
        """流式结束"""
        print("\n[流式输出完成]")

# 使用
callback = CustomStreamingCallback()
model = ChatOpenAI(streaming=True, callbacks=[callback])
model.invoke("讲个故事")

LCEL流式输出

from langchain_core.runnables import RunnablePassthrough

# 创建流式chain
chain = (
    {"context": retriever, "question": RunnablePassthrough()}
    | prompt
    | model
    | StrOutputParser()
)

# 流式调用
for chunk in chain.stream("什么是机器学习?"):
    print(chunk, end="", flush=True)

异步流式输出

import asyncio

async def async_stream():
    """异步流式输出"""
    async for chunk in chain.astream("解释量子计算"):
        print(chunk, end="", flush=True)
        await asyncio.sleep(0.01)  # 控制输出速度

# 运行
asyncio.run(async_stream())

WebSocket流式输出

from fastapi import WebSocket
from fastapi.responses import StreamingResponse

@app.websocket("/ws/chat")
async def websocket_chat(websocket: WebSocket):
    await websocket.accept()
    
    try:
        while True:
            # 接收消息
            data = await websocket.receive_text()
            
            # 流式响应
            async for chunk in chain.astream({"question": data}):
                await websocket.send_text(chunk)
            
            # 发送结束标记
            await websocket.send_text("[DONE]")
    
    except Exception as e:
        await websocket.close()

# HTTP流式端点
@app.post("/chat/stream")
async def chat_stream(request: ChatRequest):
    async def generate():
        async for chunk in chain.astream({"question": request.question}):
            yield f"data: {chunk}\n\n"
        yield "data: [DONE]\n\n"
    
    return StreamingResponse(
        generate(),
        media_type="text/event-stream"
    )

14.3 Multi-Agent系统

Multi-Agent实现多个Agent协作完成复杂任务。

基础Multi-Agent架构

from langchain.agents import AgentExecutor, create_openai_functions_agent
from langchain.tools import Tool

class MultiAgentSystem:
    """多Agent系统"""
    
    def __init__(self):
        self.llm = ChatOpenAI(model="gpt-4")
        
        # 创建专家Agent
        self.research_agent = self._create_research_agent()
        self.writing_agent = self._create_writing_agent()
        self.review_agent = self._create_review_agent()
    
    def _create_research_agent(self):
        """研究Agent"""
        tools = [
            Tool(
                name="Search",
                func=self.search_tool,
                description="搜索信息"
            )
        ]
        
        prompt = hub.pull("hwchase17/openai-functions-agent")
        agent = create_openai_functions_agent(self.llm, tools, prompt)
        return AgentExecutor(agent=agent, tools=tools)
    
    def _create_writing_agent(self):
        """写作Agent"""
        tools = [
            Tool(
                name="Write",
                func=self.write_tool,
                description="写作内容"
            )
        ]
        
        prompt = hub.pull("hwchase17/openai-functions-agent")
        agent = create_openai_functions_agent(self.llm, tools, prompt)
        return AgentExecutor(agent=agent, tools=tools)
    
    def _create_review_agent(self):
        """审查Agent"""
        tools = [
            Tool(
                name="Review",
                func=self.review_tool,
                description="审查内容"
            )
        ]
        
        prompt = hub.pull("hwchase17/openai-functions-agent")
        agent = create_openai_functions_agent(self.llm, tools, prompt)
        return AgentExecutor(agent=agent, tools=tools)
    
    def execute_task(self, task: str):
        """执行任务"""
        # 1. 研究阶段
        research_result = self.research_agent.invoke({
            "input": f"研究以下主题: {task}"
        })
        
        # 2. 写作阶段
        writing_result = self.writing_agent.invoke({
            "input": f"基于以下研究写文章: {research_result['output']}"
        })
        
        # 3. 审查阶段
        review_result = self.review_agent.invoke({
            "input": f"审查以下文章: {writing_result['output']}"
        })
        
        return review_result["output"]
    
    def search_tool(self, query: str) -> str:
        """搜索工具"""
        # 实现搜索逻辑
        return f"搜索结果: {query}"
    
    def write_tool(self, content: str) -> str:
        """写作工具"""
        # 实现写作逻辑
        return f"写作内容: {content}"
    
    def review_tool(self, content: str) -> str:
        """审查工具"""
        # 实现审查逻辑
        return f"审查结果: {content}"

# 使用
multi_agent = MultiAgentSystem()
result = multi_agent.execute_task("人工智能的未来")

使用LangGraph构建复杂工作流

from langgraph.graph import StateGraph, END

class AgentState(TypedDict):
    """Agent状态"""
    task: str
    research_done: bool
    writing_done: bool
    review_done: bool
    final_output: str

def research_node(state: AgentState):
    """研究节点"""
    # 执行研究
    result = research_agent.invoke({"input": state["task"]})
    return {"research_done": True, "research_result": result}

def writing_node(state: AgentState):
    """写作节点"""
    # 执行写作
    result = writing_agent.invoke({"input": state["research_result"]})
    return {"writing_done": True, "writing_result": result}

def review_node(state: AgentState):
    """审查节点"""
    # 执行审查
    result = review_agent.invoke({"input": state["writing_result"]})
    return {"review_done": True, "final_output": result}

# 构建图
workflow = StateGraph(AgentState)

# 添加节点
workflow.add_node("research", research_node)
workflow.add_node("writing", writing_node)
workflow.add_node("review", review_node)

# 添加边
workflow.add_edge("research", "writing")
workflow.add_edge("writing", "review")
workflow.add_edge("review", END)

# 设置入口
workflow.set_entry_point("research")

# 编译
app = workflow.compile()

# 执行
result = app.invoke({"task": "写一篇关于AI的文章"})

14.4 自定义组件开发

自定义Retriever

from langchain.schema import BaseRetriever, Document
from typing import List

class CustomRetriever(BaseRetriever):
    """自定义检索器"""
    
    def __init__(self, data_source):
        self.data_source = data_source
    
    def _get_relevant_documents(self, query: str) -> List[Document]:
        """同步检索"""
        # 自定义检索逻辑
        results = self.data_source.search(query)
        
        return [
            Document(
                page_content=result["content"],
                metadata=result["metadata"]
            )
            for result in results
        ]
    
    async def _aget_relevant_documents(self, query: str) -> List[Document]:
        """异步检索"""
        # 异步检索逻辑
        return self._get_relevant_documents(query)

自定义Tool

from langchain.tools import BaseTool
from pydantic import Field

class CustomTool(BaseTool):
    """自定义工具"""
    
    name: str = "custom_tool"
    description: str = "这是一个自定义工具"
    
    # 自定义参数
    api_key: str = Field(description="API密钥")
    
    def _run(self, query: str) -> str:
        """同步执行"""
        # 实现工具逻辑
        return f"处理结果: {query}"
    
    async def _arun(self, query: str) -> str:
        """异步执行"""
        # 异步实现
        return self._run(query)

自定义OutputParser

from langchain.schema import BaseOutputParser

class CustomOutputParser(BaseOutputParser):
    """自定义输出解析器"""
    
    def parse(self, text: str) -> dict:
        """解析输出"""
        # 自定义解析逻辑
        lines = text.strip().split("\n")
        
        return {
            "title": lines[0] if lines else "",
            "content": "\n".join(lines[1:]) if len(lines) > 1 else ""
        }
    
    def get_format_instructions(self) -> str:
        """格式说明"""
        return """
        请按以下格式输出:
        第一行:标题
        后续行:内容
        """

自定义Memory

from langchain.memory import BaseMemory
from pydantic import Field

class CustomMemory(BaseMemory):
    """自定义记忆"""
    
    messages: List[str] = Field(default_factory=list)
    memory_key: str = "history"
    
    @property
    def memory_variables(self) -> List[str]:
        """记忆变量"""
        return [self.memory_key]
    
    def load_memory_variables(self, inputs: dict) -> dict:
        """加载记忆"""
        return {self.memory_key: "\n".join(self.messages)}
    
    def save_context(self, inputs: dict, outputs: dict):
        """保存上下文"""
        self.messages.append(f"Q: {inputs.get('input', '')}")
        self.messages.append(f"A: {outputs.get('output', '')}")
        
        # 限制记忆大小
        if len(self.messages) > 10:
            self.messages = self.messages[-10:]
    
    def clear(self):
        """清除记忆"""
        self.messages = []

15. 常见问题与解决方案

15.1 模型调用问题

Q1: API调用超时

问题:模型调用经常超时

解决方案

from langchain_openai import ChatOpenAI

# 增加超时时间
model = ChatOpenAI(
    timeout=60,  # 60秒超时
    max_retries=3,  # 自动重试3次
    request_timeout=30  # 请求超时30秒
)

# 使用异步避免阻塞
async def call_with_timeout():
    try:
        result = await asyncio.wait_for(
            model.ainvoke("问题"),
            timeout=30.0
        )
        return result
    except asyncio.TimeoutError:
        return "请求超时,请稍后重试"

Q2: 速率限制错误

问题:频繁遇到Rate Limit错误

解决方案

from tenacity import retry, wait_exponential, stop_after_attempt

@retry(
    wait=wait_exponential(multiplier=1, min=4, max=60),
    stop=stop_after_attempt(5)
)
def call_with_retry():
    return model.invoke("问题")

# 或使用队列控制速率
import asyncio
from asyncio import Semaphore

class RateLimiter:
    def __init__(self, max_per_minute=60):
        self.semaphore = Semaphore(max_per_minute)
        self.reset_time = time.time() + 60
    
    async def acquire(self):
        # 每分钟重置
        if time.time() > self.reset_time:
            self.semaphore = Semaphore(self.max_per_minute)
            self.reset_time = time.time() + 60
        
        await self.semaphore.acquire()

15.2 内存管理问题

Q3: 内存占用过高

问题:长时间运行后内存占用过高

解决方案

# 1. 使用ConversationSummaryMemory
from langchain.memory import ConversationSummaryMemory

memory = ConversationSummaryMemory(llm=model)

# 2. 定期清理
if len(memory.chat_memory.messages) > 100:
    memory.chat_memory.messages = memory.chat_memory.messages[-50:]

# 3. 使用向量存储记忆
from langchain.memory import VectorStoreRetrieverMemory

memory = VectorStoreRetrieverMemory(
    retriever=vectorstore.as_retriever(search_kwargs={"k": 3})
)

15.3 性能瓶颈

Q4: 检索速度慢

问题:向量检索速度慢

解决方案

# 1. 使用更快的向量数据库
from langchain_community.vectorstores import FAISS

vectorstore = FAISS.from_documents(documents, embeddings)

# 2. 减少检索数量
retriever = vectorstore.as_retriever(search_kwargs={"k": 3})  # 只检索3个

# 3. 使用索引优化
vectorstore.save_local("faiss_index")  # 保存索引
vectorstore = FAISS.load_local("faiss_index", embeddings)  # 加载索引

15.4 错误处理

Q5: 解析错误

问题:OutputParser经常解析失败

解决方案

from langchain.output_parsers import OutputFixingParser

# 使用修复解析器
base_parser = PydanticOutputParser(pydantic_object=MyModel)
fixing_parser = OutputFixingParser.from_llm(
    parser=base_parser,
    llm=model
)

# 自动修复格式错误
try:
    result = fixing_parser.parse(output)
except Exception as e:
    # 降级处理
    result = {"error": str(e), "raw_output": output}

常见问题速查表

问题原因解决方案
API超时网络慢/模型慢增加超时、异步调用
速率限制调用过频重试机制、速率控制
内存溢出记忆过多使用Summary/清理
检索慢数据量大优化索引、减少k值
解析失败格式不对使用FixingParser
成本高模型选择不当智能降级、缓存

16. 面试题精选

16.1 基础概念题

Q1: 什么是LangChain?它解决了什么问题?

答案: LangChain是一个用于开发由大语言模型驱动的应用程序的开源框架。它主要解决以下问题:

  1. 模型抽象:统一不同LLM提供商的API接口,实现模型无关性
  2. 组件化开发:提供可复用的模块(Prompts、Chains、Agents等)
  3. 数据增强:通过RAG技术结合外部知识库
  4. 工作流编排:简化复杂AI应用的开发流程
  5. 可观测性:内置调试和监控能力

核心价值:降低LLM应用开发门槛,提高开发效率,实现快速迭代。


Q2: LangChain的核心组件有哪些?各自的作用是什么?

答案

组件作用典型应用
Models模型抽象层统一调用不同LLM
Prompts提示管理模板化、参数化提示
Memory记忆系统维护对话上下文
Chains链式调用组合多个步骤
Agents智能代理自主决策和工具使用
Data Connection数据连接文档加载和检索
Callbacks回调机制监控和日志

关键点:这些组件可以独立使用,也可以组合使用,体现了模块化设计思想。


Q3: LLM和Chat Models有什么区别?

答案

LLM(Language Model)

  • 输入:纯文本字符串
  • 输出:文本补全
  • 特点:无状态,单轮交互
  • 适用:文本生成、补全任务

Chat Models

  • 输入:消息列表(System/Human/AI Message)
  • 输出:AI消息
  • 特点:支持角色设定,多轮对话
  • 适用:对话系统、助手应用

选择建议

  • 简单文本生成 → LLM
  • 对话应用 → Chat Models
  • 需要函数调用 → Chat Models(支持Function Calling)

Q4: 什么是RAG?为什么需要RAG?

答案

**RAG(Retrieval-Augmented Generation)**是检索增强生成技术,通过检索外部知识库来增强LLM的回答能力。

为什么需要RAG

  1. 知识时效性:LLM训练数据有截止日期,RAG可以获取最新信息
  2. 领域知识:LLM可能不了解特定领域或企业内部知识
  3. 减少幻觉:基于检索到的事实回答,提高准确性
  4. 可追溯性:可以引用具体来源,增强可信度
  5. 成本效益:相比Fine-tuning,RAG更新知识成本更低

核心流程

用户问题 → 向量化 → 检索相关文档 → 构建提示 → LLM生成答案

Q5: Memory在LangChain中的作用是什么?有哪些类型?

答案

Memory的作用

  • 维护对话历史和上下文
  • 实现连贯的多轮交互
  • 记住用户偏好和信息

主要类型

  1. ConversationBufferMemory:完整保存所有对话

    • 优点:信息完整
    • 缺点:占用空间大
  2. ConversationBufferWindowMemory:只保留最近N轮

    • 优点:控制大小
    • 缺点:丢失早期信息
  3. ConversationSummaryMemory:总结压缩历史

    • 优点:节省Token
    • 缺点:可能丢失细节
  4. VectorStoreMemory:向量化存储,智能检索

    • 优点:大规模历史管理
    • 缺点:需要向量化成本

选择策略

  • 短对话 → Buffer
  • 长对话 → Summary或Vector Store
  • 成本敏感 → Window或Summary

16.2 架构设计题

Q6: 如何设计一个企业级的RAG问答系统?

答案

系统架构

用户层 → API网关 → 应用层 → 数据层
         ↓
      缓存层 → 向量数据库
         ↓
      监控层 → LLM服务

关键设计要点

  1. 数据层

    • 文档预处理:清洗、格式化
    • 智能分块:根据文档结构分割
    • 元数据管理:记录来源、时间、版本
    • 增量更新:支持文档的增删改
  2. 检索层

    • 混合检索:向量检索 + 关键词检索
    • 重排序:使用Reranker提高精确度
    • 缓存策略:热点问题缓存
    • 多路召回:提高召回率
  3. 生成层

    • 提示优化:结构化提示模板
    • 上下文压缩:只保留相关部分
    • 流式输出:提升用户体验
    • 答案验证:检查答案质量
  4. 监控层

    • 性能监控:延迟、吞吐量
    • 成本追踪:Token使用量
    • 质量评估:准确率、相关性
    • 异常告警:错误率监控
  5. 安全层

    • 权限控制:基于角色的访问控制
    • 内容过滤:敏感信息过滤
    • 审计日志:记录所有操作
    • 数据加密:传输和存储加密

技术选型

  • 向量数据库:Pinecone(云)或Milvus(自建)
  • LLM:GPT-4(高质量)+ GPT-3.5(成本优化)
  • 缓存:Redis
  • 监控:LangSmith + Prometheus

Q7: Agent和Chain有什么区别?各自适用什么场景?

答案

Chain(链)

  • 特点:预定义的执行流程,确定性
  • 执行方式:按固定顺序执行
  • 适用场景
    • 流程明确的任务
    • 需要可预测的行为
    • 对成本敏感的场景

Agent(代理)

  • 特点:自主决策,动态选择工具
  • 执行方式:根据任务动态规划
  • 适用场景
    • 复杂、开放式任务
    • 需要多步推理
    • 工具选择不确定

对比示例

任务推荐方案理由
文档总结Chain流程固定
数据分析Agent需要动态选择工具
翻译Chain单一任务
复杂问答Agent可能需要搜索、计算等

混合使用

  • Agent内部可以使用Chain
  • Chain可以调用Agent作为一个步骤

Q8: 如何优化LangChain应用的性能?

答案

1. 模型层优化

  • 模型选择:根据任务复杂度选择合适模型
    • 简单任务:GPT-3.5-turbo
    • 复杂任务:GPT-4
  • 参数调优
    • temperature:降低随机性提高稳定性
    • max_tokens:限制输出长度控制成本
  • 批量处理:合并多个请求减少调用次数

2. 检索优化

  • 索引优化
    • 合理的分块大小(500-1500字符)
    • 适当的重叠(10-20%)
  • 检索策略
    • 使用MMR平衡相关性和多样性
    • 重排序提高精确度
  • 缓存
    • 向量缓存:避免重复向量化
    • 结果缓存:缓存常见问题答案

3. 并发优化

# 异步调用
async def process_batch(questions):
    tasks = [chain.ainvoke(q) for q in questions]
    return await asyncio.gather(*tasks)

# 并行检索
from langchain_core.runnables import RunnableParallel
parallel_chain = RunnableParallel(
    summary=summary_chain,
    keywords=keyword_chain
)

4. 成本优化

  • 智能降级:优先使用便宜模型,必要时升级
  • 提示压缩:减少不必要的上下文
  • 缓存策略:避免重复调用
  • Token限制:设置合理的max_tokens

5. 监控优化

  • 使用LangSmith追踪性能瓶颈
  • 设置告警阈值
  • 定期分析日志优化

性能指标

  • 响应时间:< 3秒
  • Token使用:优化到最小
  • 准确率:> 90%
  • 成本:控制在预算内

Q9: 如何处理LangChain应用中的错误和异常?

答案

1. 模型调用错误

from tenacity import retry, stop_after_attempt, wait_exponential

@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=4, max=10)
)
def call_llm_with_retry(prompt):
    try:
        return model.invoke(prompt)
    except Exception as e:
        logger.error(f"LLM调用失败: {e}")
        raise

# 使用Fallback
from langchain_core.runnables import RunnableWithFallbacks

chain_with_fallback = primary_chain.with_fallbacks(
    [backup_chain]
)

2. 解析错误

from langchain.output_parsers import OutputFixingParser

# 自动修复解析错误
fixing_parser = OutputFixingParser.from_llm(
    parser=base_parser,
    llm=model
)

try:
    result = fixing_parser.parse(output)
except Exception as e:
    # 降级处理
    result = fallback_parse(output)

3. 超时处理

# 设置超时
model = ChatOpenAI(
    timeout=30,  # 30秒超时
    max_retries=3
)

# Agent超时
agent_executor = AgentExecutor(
    agent=agent,
    tools=tools,
    max_execution_time=60  # 60秒超时
)

4. 资源限制

# Token限制
from langchain.callbacks import get_openai_callback

with get_openai_callback() as cb:
    result = chain.invoke(input)
    if cb.total_tokens > 10000:
        logger.warning("Token使用过多")

# 速率限制
from langchain.llms import OpenAI

llm = OpenAI(
    max_tokens_per_minute=90000,
    max_requests_per_minute=3500
)

5. 错误监控

from langchain.callbacks import StdOutCallbackHandler

class ErrorTrackingHandler(StdOutCallbackHandler):
    def on_llm_error(self, error, **kwargs):
        logger.error(f"LLM错误: {error}")
        # 发送告警
        send_alert(error)
    
    def on_chain_error(self, error, **kwargs):
        logger.error(f"Chain错误: {error}")

# 使用
chain.invoke(input, callbacks=[ErrorTrackingHandler()])

错误处理最佳实践

  1. 优雅降级:主服务失败时使用备用方案
  2. 详细日志:记录完整的错误上下文
  3. 用户友好:向用户展示友好的错误信息
  4. 自动恢复:实现重试和自动修复机制
  5. 监控告警:及时发现和处理问题

16.3 实战应用题

Q10: 如何实现一个支持多轮对话的智能客服系统?

答案

系统设计

from langchain.memory import ConversationBufferWindowMemory
from langchain.chains import ConversationChain
from langchain_openai import ChatOpenAI

class CustomerServiceBot:
    def __init__(self):
        # 1. 初始化模型
        self.llm = ChatOpenAI(
            model="gpt-3.5-turbo",
            temperature=0.7
        )
        
        # 2. 配置记忆(保留最近10轮对话)
        self.memory = ConversationBufferWindowMemory(
            k=10,
            return_messages=True
        )
        
        # 3. 系统提示
        self.system_prompt = """
        你是一个专业的客服助手。
        职责:
        - 友好、耐心地回答用户问题
        - 如果不确定,诚实告知并提供替代方案
        - 记住对话历史,提供连贯的服务
        """
        
        # 4. 创建对话链
        self.conversation = ConversationChain(
            llm=self.llm,
            memory=self.memory,
            verbose=True
        )
    
    def chat(self, user_input: str) -> str:
        """处理用户输入"""
        try:
            response = self.conversation.predict(input=user_input)
            return response
        except Exception as e:
            return "抱歉,系统出现问题,请稍后再试。"
    
    def reset(self):
        """重置对话"""
        self.memory.clear()

# 使用
bot = CustomerServiceBot()
print(bot.chat("你好,我想咨询退款问题"))
print(bot.chat("我的订单号是12345"))
print(bot.chat("什么时候能退款?"))

增强功能

  1. 意图识别
from langchain.chains.router import MultiPromptChain

# 根据意图路由到不同的处理链
intent_chains = {
    "退款": refund_chain,
    "咨询": inquiry_chain,
    "投诉": complaint_chain
}
  1. 知识库集成
# 结合RAG检索FAQ
retriever = vectorstore.as_retriever()
qa_chain = RetrievalQA.from_chain_type(
    llm=self.llm,
    retriever=retriever
)
  1. 情感分析
def analyze_sentiment(text):
    # 检测用户情绪
    if is_negative(text):
        # 升级到人工客服
        escalate_to_human()

Q11: 如何构建一个代码审查助手?

答案

完整实现

from langchain.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI
from langchain.output_parsers import PydanticOutputParser
from pydantic import BaseModel, Field

# 1. 定义输出结构
class CodeReview(BaseModel):
    issues: list[str] = Field(description="发现的问题列表")
    suggestions: list[str] = Field(description="改进建议")
    security_concerns: list[str] = Field(description="安全隐患")
    performance_tips: list[str] = Field(description="性能优化建议")
    rating: int = Field(description="代码质量评分(1-10)")

# 2. 创建解析器
parser = PydanticOutputParser(pydantic_object=CodeReview)

# 3. 构建提示
prompt = ChatPromptTemplate.from_messages([
    ("system", """你是一个资深的代码审查专家。
    请从以下维度审查代码:
    1. 代码质量和可读性
    2. 潜在的Bug和错误
    3. 安全漏洞
    4. 性能问题
    5. 最佳实践
    
    {format_instructions}
    """),
    ("human", "请审查以下代码:\n\n{code}")
])

# 4. 创建链
model = ChatOpenAI(model="gpt-4", temperature=0)
chain = prompt | model | parser

# 5. 使用
code = """
def calculate_total(items):
    total = 0
    for item in items:
        total = total + item['price']
    return total
"""

review = chain.invoke({
    "code": code,
    "format_instructions": parser.get_format_instructions()
})

print(f"评分: {review.rating}/10")
print(f"问题: {review.issues}")
print(f"建议: {review.suggestions}")

高级功能

  1. 多语言支持
language_prompts = {
    "python": python_review_prompt,
    "javascript": js_review_prompt,
    "java": java_review_prompt
}

def review_code(code, language):
    prompt = language_prompts[language]
    return chain.invoke({"code": code, "prompt": prompt})
  1. 增量审查
def review_diff(old_code, new_code):
    """只审查变更部分"""
    diff = generate_diff(old_code, new_code)
    return chain.invoke({"code": diff})
  1. 自动修复
def auto_fix(code, issues):
    """根据问题自动生成修复建议"""
    fix_prompt = f"""
    代码:{code}
    问题:{issues}
    请提供修复后的代码。
    """
    return model.invoke(fix_prompt)

Q12: 如何实现一个文档问答系统,支持多种文档格式?

答案

系统架构

from langchain_community.document_loaders import (
    PyPDFLoader, Docx2txtLoader, TextLoader, UnstructuredMarkdownLoader
)
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_openai import OpenAIEmbeddings
from langchain_community.vectorstores import Chroma
from langchain.chains import RetrievalQA

class DocumentQASystem:
    def __init__(self):
        self.embeddings = OpenAIEmbeddings()
        self.vectorstore = None
        self.qa_chain = None
        
        # 文档加载器映射
        self.loaders = {
            '.pdf': PyPDFLoader,
            '.docx': Docx2txtLoader,
            '.txt': TextLoader,
            '.md': UnstructuredMarkdownLoader
        }
    
    def load_documents(self, file_paths: list[str]):
        """加载多种格式文档"""
        all_docs = []
        
        for file_path in file_paths:
            # 根据文件扩展名选择加载器
            ext = os.path.splitext(file_path)[1]
            loader_class = self.loaders.get(ext)
            
            if loader_class:
                loader = loader_class(file_path)
                docs = loader.load()
                all_docs.extend(docs)
            else:
                print(f"不支持的文件格式: {ext}")
        
        return all_docs
    
    def build_index(self, documents):
        """构建向量索引"""
        # 文本分割
        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=1000,
            chunk_overlap=200
        )
        chunks = text_splitter.split_documents(documents)
        
        # 创建向量存储
        self.vectorstore = Chroma.from_documents(
            documents=chunks,
            embedding=self.embeddings,
            persist_directory="./doc_qa_db"
        )
        
        # 创建QA链
        self.qa_chain = RetrievalQA.from_chain_type(
            llm=ChatOpenAI(model="gpt-3.5-turbo"),
            retriever=self.vectorstore.as_retriever(
                search_kwargs={"k": 4}
            ),
            return_source_documents=True
        )
    
    def query(self, question: str):
        """查询"""
        if not self.qa_chain:
            return "请先加载文档并构建索引"
        
        result = self.qa_chain.invoke({"query": question})
        
        return {
            "answer": result["result"],
            "sources": [
                {
                    "content": doc.page_content[:200],
                    "source": doc.metadata.get("source", "未知")
                }
                for doc in result["source_documents"]
            ]
        }

# 使用示例
qa_system = DocumentQASystem()

# 加载文档
docs = qa_system.load_documents([
    "manual.pdf",
    "guide.docx",
    "readme.md"
])

# 构建索引
qa_system.build_index(docs)

# 查询
result = qa_system.query("如何安装软件?")
print(result["answer"])
print("来源:", result["sources"])

优化点

  1. 增量更新
def add_documents(self, new_docs):
    """增量添加文档"""
    chunks = self.text_splitter.split_documents(new_docs)
    self.vectorstore.add_documents(chunks)
  1. 多语言支持
def detect_language(text):
    # 检测文档语言
    return langdetect.detect(text)

def query_multilingual(question, language):
    # 根据语言选择合适的模型
    if language == "zh":
        llm = ChatOpenAI(model="gpt-3.5-turbo")
    else:
        llm = ChatOpenAI(model="gpt-3.5-turbo")
  1. 答案质量评估
def evaluate_answer(question, answer, sources):
    """评估答案质量"""
    evaluator = load_evaluator("criteria", criteria="relevance")
    score = evaluator.evaluate_strings(
        prediction=answer,
        input=question,
        reference="\n".join([s["content"] for s in sources])
    )
    return score["score"]

16.4 性能优化题

Q13: 如何降低LangChain应用的Token成本?

答案

1. 模型选择策略

class CostOptimizedChain:
    def __init__(self):
        # 便宜模型用于简单任务
        self.cheap_model = ChatOpenAI(model="gpt-3.5-turbo")
        # 贵模型用于复杂任务
        self.expensive_model = ChatOpenAI(model="gpt-4")
    
    def invoke(self, input_text):
        # 根据复杂度选择模型
        if self.is_simple_task(input_text):
            return self.cheap_model.invoke(input_text)
        else:
            return self.expensive_model.invoke(input_text)
    
    def is_simple_task(self, text):
        """判断任务复杂度"""
        # 简单规则:长度、关键词等
        return len(text) < 100 and not any(
            keyword in text for keyword in ["分析", "推理", "复杂"]
        )

2. 提示压缩

from langchain.prompts import PromptTemplate

# ❌ 冗长的提示
bad_prompt = """
你是一个非常专业的助手,拥有丰富的经验...
(大量无用描述)
请回答:{question}
"""

# ✅ 精简的提示
good_prompt = """
角色:专业助手
任务:{question}
要求:简洁准确
"""

# 动态压缩上下文
def compress_context(context, max_tokens=500):
    """压缩上下文到指定Token数"""
    if count_tokens(context) > max_tokens:
        # 使用总结模型压缩
        summary_chain = load_summarize_chain(llm, chain_type="stuff")
        return summary_chain.run([Document(page_content=context)])
    return context

3. 缓存策略

from langchain.cache import SQLiteCache
from langchain.globals import set_llm_cache
import hashlib

# 全局缓存
set_llm_cache(SQLiteCache(database_path=".langchain.db"))

# 自定义缓存
class SmartCache:
    def __init__(self):
        self.cache = {}
    
    def get_cache_key(self, prompt):
        """生成缓存键"""
        return hashlib.md5(prompt.encode()).hexdigest()
    
    def get(self, prompt):
        key = self.get_cache_key(prompt)
        return self.cache.get(key)
    
    def set(self, prompt, result):
        key = self.get_cache_key(prompt)
        self.cache[key] = result

cache = SmartCache()

def cached_invoke(prompt):
    # 检查缓存
    cached_result = cache.get(prompt)
    if cached_result:
        return cached_result
    
    # 调用模型
    result = model.invoke(prompt)
    
    # 保存缓存
    cache.set(prompt, result)
    return result

4. 批量处理

# ❌ 逐个处理
for question in questions:
    answer = model.invoke(question)  # 多次API调用

# ✅ 批量处理
answers = model.batch(questions)  # 一次API调用

5. Token计数和限制

from langchain.callbacks import get_openai_callback
import tiktoken

def count_tokens(text, model="gpt-3.5-turbo"):
    """精确计算Token数"""
    encoding = tiktoken.encoding_for_model(model)
    return len(encoding.encode(text))

def optimize_prompt(prompt, max_tokens=1000):
    """优化提示长度"""
    if count_tokens(prompt) > max_tokens:
        # 截断或总结
        return prompt[:max_tokens]
    return prompt

# 追踪成本
with get_openai_callback() as cb:
    result = chain.invoke(input)
    print(f"使用Token: {cb.total_tokens}")
    print(f"成本: ${cb.total_cost:.4f}")
    
    # 设置预算告警
    if cb.total_cost > 1.0:
        logger.warning("成本超过预算!")

成本优化检查清单

优化项方法预期节省
模型选择简单任务用便宜模型50-80%
提示优化删除冗余内容20-40%
缓存缓存常见查询30-70%
批量处理合并请求10-20%
上下文压缩总结长文本30-50%

Q14: 如何提高RAG系统的检索准确率?

答案

1. 优化文档分块

# 策略1:语义分块
from langchain.text_splitter import SemanticChunker

semantic_splitter = SemanticChunker(
    embeddings=embeddings,
    breakpoint_threshold_type="percentile",
    breakpoint_threshold_amount=95
)

# 策略2:根据文档结构分块
from langchain.text_splitter import MarkdownHeaderTextSplitter

markdown_splitter = MarkdownHeaderTextSplitter(
    headers_to_split_on=[
        ("#", "Header 1"),
        ("##", "Header 2"),
    ]
)

# 策略3:混合分块
def hybrid_split(documents):
    # 先按结构分割
    structural_chunks = markdown_splitter.split_documents(documents)
    # 再按大小分割
    final_chunks = text_splitter.split_documents(structural_chunks)
    return final_chunks

2. 混合检索

from langchain.retrievers import EnsembleRetriever
from langchain_community.retrievers import BM25Retriever

# 向量检索
vector_retriever = vectorstore.as_retriever(search_kwargs={"k": 5})

# BM25关键词检索
bm25_retriever = BM25Retriever.from_documents(documents)
bm25_retriever.k = 5

# 混合检索
ensemble_retriever = EnsembleRetriever(
    retrievers=[vector_retriever, bm25_retriever],
    weights=[0.7, 0.3]  # 向量70%,关键词30%
)

results = ensemble_retriever.get_relevant_documents("查询")

3. 重排序

from langchain.retrievers.document_compressors import CohereRerank
from langchain.retrievers import ContextualCompressionRetriever

# 初始检索更多候选
base_retriever = vectorstore.as_retriever(search_kwargs={"k": 20})

# Cohere重排序
reranker = CohereRerank(model="rerank-english-v2.0", top_n=5)

# 组合
compression_retriever = ContextualCompressionRetriever(
    base_compressor=reranker,
    base_retriever=base_retriever
)

# 检索 → 重排序 → 返回top 5
results = compression_retriever.get_relevant_documents("查询")

4. 查询优化

from langchain.retrievers.multi_query import MultiQueryRetriever

# 自动生成多个查询变体
multi_query_retriever = MultiQueryRetriever.from_llm(
    retriever=base_retriever,
    llm=model
)

# 单个查询 → 生成3-5个变体 → 分别检索 → 合并去重
results = multi_query_retriever.get_relevant_documents(
    "机器学习的应用"
)
# 可能生成的变体:
# - "机器学习有哪些应用场景?"
# - "ML在实际中如何使用?"
# - "机器学习的实践案例"

5. 元数据过滤

# 添加丰富的元数据
for doc in documents:
    doc.metadata.update({
        "source": "manual.pdf",
        "category": "技术文档",
        "date": "2024-01-01",
        "language": "zh",
        "importance": "high"
    })

# 基于元数据过滤
retriever = vectorstore.as_retriever(
    search_kwargs={
        "k": 5,
        "filter": {
            "category": "技术文档",
            "language": "zh"
        }
    }
)

6. 评估和迭代

from langchain.evaluation import load_evaluator

def evaluate_retrieval(retriever, test_cases):
    """评估检索质量"""
    evaluator = load_evaluator("criteria", criteria="relevance")
    
    scores = []
    for query, expected_answer in test_cases:
        # 检索
        docs = retriever.get_relevant_documents(query)
        context = "\n".join([d.page_content for d in docs])
        
        # 评估
        score = evaluator.evaluate_strings(
            prediction=context,
            input=query,
            reference=expected_answer
        )
        scores.append(score["score"])
    
    return sum(scores) / len(scores)

# A/B测试不同策略
strategies = {
    "基础": base_retriever,
    "混合": ensemble_retriever,
    "重排序": compression_retriever
}

for name, retriever in strategies.items():
    score = evaluate_retrieval(retriever, test_cases)
    print(f"{name}策略得分: {score}")

检索优化总结

技术提升点复杂度成本
语义分块上下文完整性
混合检索召回率
重排序精确率
多查询召回率
元数据过滤精确率

Q15: 如何监控和调试LangChain应用?

答案

1. 使用LangSmith

import os

# 配置LangSmith
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_API_KEY"] = "your-api-key"
os.environ["LANGCHAIN_PROJECT"] = "my-project"

# 自动追踪所有调用
result = chain.invoke(input)
# 在LangSmith平台查看完整调用链

2. 自定义回调

from langchain.callbacks.base import BaseCallbackHandler

class DetailedCallbackHandler(BaseCallbackHandler):
    """详细的回调处理器"""
    
    def on_llm_start(self, serialized, prompts, **kwargs):
        """LLM开始"""
        print(f"[LLM Start] Prompts: {prompts[0][:100]}...")
    
    def on_llm_end(self, response, **kwargs):
        """LLM结束"""
        print(f"[LLM End] Response: {response.generations[0][0].text[:100]}...")
    
    def on_llm_error(self, error, **kwargs):
        """LLM错误"""
        logger.error(f"[LLM Error] {error}")
    
    def on_chain_start(self, serialized, inputs, **kwargs):
        """Chain开始"""
        print(f"[Chain Start] Inputs: {inputs}")
    
    def on_chain_end(self, outputs, **kwargs):
        """Chain结束"""
        print(f"[Chain End] Outputs: {outputs}")
    
    def on_tool_start(self, serialized, input_str, **kwargs):
        """Tool开始"""
        print(f"[Tool Start] {serialized['name']}: {input_str}")
    
    def on_tool_end(self, output, **kwargs):
        """Tool结束"""
        print(f"[Tool End] Output: {output}")

# 使用
chain.invoke(input, callbacks=[DetailedCallbackHandler()])

3. 性能监控

import time
from langchain.callbacks import get_openai_callback

class PerformanceMonitor:
    def __init__(self):
        self.metrics = {
            "total_calls": 0,
            "total_tokens": 0,
            "total_cost": 0.0,
            "total_time": 0.0,
            "errors": 0
        }
    
    def track_call(self, func):
        """装饰器:追踪调用"""
        def wrapper(*args, **kwargs):
            start_time = time.time()
            
            try:
                with get_openai_callback() as cb:
                    result = func(*args, **kwargs)
                    
                    # 更新指标
                    self.metrics["total_calls"] += 1
                    self.metrics["total_tokens"] += cb.total_tokens
                    self.metrics["total_cost"] += cb.total_cost
                    self.metrics["total_time"] += time.time() - start_time
                    
                    return result
            except Exception as e:
                self.metrics["errors"] += 1
                raise e
        
        return wrapper
    
    def get_report(self):
        """生成报告"""
        return {
            "调用次数": self.metrics["total_calls"],
            "总Token数": self.metrics["total_tokens"],
            "总成本": f"${self.metrics['total_cost']:.4f}",
            "平均延迟": f"{self.metrics['total_time'] / max(self.metrics['total_calls'], 1):.2f}s",
            "错误率": f"{self.metrics['errors'] / max(self.metrics['total_calls'], 1) * 100:.2f}%"
        }

# 使用
monitor = PerformanceMonitor()

@monitor.track_call
def process_query(query):
    return chain.invoke(query)

# 查看报告
print(monitor.get_report())

4. 日志记录

import logging

# 配置日志
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('langchain.log'),
        logging.StreamHandler()
    ]
)

logger = logging.getLogger(__name__)

# 记录关键信息
def logged_invoke(chain, input_data):
    logger.info(f"Input: {input_data}")
    
    try:
        result = chain.invoke(input_data)
        logger.info(f"Output: {result}")
        return result
    except Exception as e:
        logger.error(f"Error: {e}", exc_info=True)
        raise

5. 调试技巧

# 1. 启用详细输出
chain = LLMChain(llm=model, prompt=prompt, verbose=True)

# 2. 检查中间步骤
result = agent_executor.invoke(
    {"input": "问题"},
    return_intermediate_steps=True
)
for step in result["intermediate_steps"]:
    print(f"Action: {step[0]}")
    print(f"Observation: {step[1]}")

# 3. 单元测试
def test_chain():
    test_input = {"question": "测试问题"}
    result = chain.invoke(test_input)
    assert result is not None
    assert len(result["text"]) > 0

# 4. 模拟测试
from unittest.mock import Mock

mock_llm = Mock()
mock_llm.invoke.return_value = "模拟响应"
test_chain = LLMChain(llm=mock_llm, prompt=prompt)

监控最佳实践

  1. 生产环境必须启用LangSmith
  2. 设置告警阈值(延迟、成本、错误率)
  3. 定期分析日志,发现优化点
  4. A/B测试不同策略
  5. 建立性能基准,持续改进

总结

LangChain作为LLM应用开发的主流框架,提供了完整的工具链和最佳实践。通过本文档的学习,你应该掌握:

核心能力

  • ✅ 理解LangChain的架构和核心组件
  • ✅ 掌握Models、Prompts、Memory、Chains、Agents的使用
  • ✅ 实现RAG检索增强生成系统
  • ✅ 构建生产级的AI应用
  • ✅ 优化性能和控制成本

实战技能

  • 智能问答系统开发
  • 文档分析和处理
  • 代码辅助工具
  • 工作流自动化
  • 多Agent协作系统

最佳实践

  • 模块化设计,组件复用
  • 提示工程优化
  • 错误处理和容错
  • 性能监控和调优
  • 成本控制策略

持续学习

  • 关注LangChain官方文档更新
  • 参与社区讨论和贡献
  • 实践不同场景的应用
  • 探索新的模型和工具
  • 分享经验和最佳实践

LangChain生态系统在快速发展,新功能和工具不断涌现。保持学习和实践,才能充分发挥LLM的潜力,构建真正有价值的AI应用。


参考资源

  • 官方文档:https://python.langchain.com/
  • GitHub仓库:https://github.com/langchain-ai/langchain
  • LangSmith平台:https://smith.langchain.com/
  • 社区Discord:https://discord.gg/langchain
  • 博客和教程:https://blog.langchain.dev/

文档版本:v1.0 最后更新:2024年12月