
本文详解如何在 LangChain 的流式(streaming)RAG 链中正确保存对话历史到内存,解决 save_context 无法直接接入流式链的痛点,提供可落地的自定义 Runnable 组件方案与工程化注意事项。
本文详解如何在 langchain 的流式(streaming)rag 链中正确保存对话历史到内存,解决 `save_context` 无法直接接入流式链的痛点,提供可落地的自定义 runnable 组件方案与工程化注意事项。
在 LangChain 构建流式 RAG 应用时,一个常见却棘手的问题是:如何在 stream() 模式下安全、可靠地将用户提问与模型回答持久化到对话记忆(如 ConversationBufferMemory)中? 原生链式调用(如 | StrOutputParser() | model.stream())不支持在流结束时自动触发 save_context,因为流返回的是 Iterator[Chunk],而非最终聚合结果;而若强行在链尾插入 RunnableLambda(_save_context),其输入参数将为单个文本块(chunk),而非完整的 answer 字符串——导致记忆内容被截断或重复保存。
官方目前尚未提供开箱即用的流式记忆保存机制(GitHub Issue #11945 仍处于 open 状态),因此需通过自定义 Runnable 组件实现“流收集 + 结束回调”语义。核心思路是:构建一个能透传流式输出、同时在流终止时聚合全部 chunk 并执行副作用(如保存记忆)的中间件。
✅ 推荐方案:自定义 RunnableCollector
以下是一个生产就绪的 RunnableCollector 实现,它继承 RunnableLambda,重写 _transform 方法,在迭代完成时自动拼接所有 chunk 并调用指定回调函数:
from langchain_core.runnables import RunnableLambda
from langchain_core.callbacks import CallbackManagerForChainRun
from langchain_core.runnables.config import RunnableConfig
from typing import Iterator, Any, Callable, cast, Dict, Optional
class RunnableCollector(RunnableLambda):
def __init__(
self,
func: Callable[[Any], None],
*args,
**kwargs
) -> None:
super().__init__(func=func, *args, **kwargs)
def _transform(
self,
input: Iterator[Any],
run_manager: CallbackManagerForChainRun,
config: RunnableConfig,
**kwargs: Any,
) -> Iterator[Any]:
final_output = None
chunks = []
# 1. 透传每个 chunk 给下游
for chunk in input:
yield chunk
chunks.append(chunk)
# 2. 流结束后,聚合结果并调用回调
if not chunks:
return
# 尝试智能合并(支持 str / AIMessage / dict 等常见类型)
try:
if isinstance(chunks[0], str):
final_output = "".join(chunks)
elif hasattr(chunks[0], "__add__") and hasattr(chunks[0], "content"):
# 如 AIMessage 支持 + 操作且含 content 属性
final_output = chunks[0]
for c in chunks[1:]:
final_output += c
else:
final_output = chunks[-1] # 默认取最后一个(最安全)
except Exception:
final_output = chunks[-1]
# 3. 执行副作用:保存记忆
if final_output is not None:
self.func(final_output)✅ 集成到你的 RAG 链中(关键修改点)
回到你原始代码中的 answer_chain,只需在 .stream() 前插入 RunnableCollector 即可:
# ✅ 在 answer_chain 末尾添加记忆保存逻辑
def save_memory_context(inputs: Dict[str, Any], answer: str, session_id: str):
memory = _get_memory_with_session_id(session_id)
# inputs 包含 question 和 docs(可选),answer 是完整响应字符串
memory.save_context(
{"question": inputs["question"]},
{"answer": answer}
)
print(f"[✓] Memory saved for session {session_id}")
# 修改后的 answer_chain(注意:需将 session_id 闭包传入)
answer_chain = (
{"docs": RunnablePassthrough()}
| {
"context": lambda x: _combine_documents(x["docs"]),
"question": itemgetter("question"),
}
| ANSWER_PROMPT
| model
| StrOutputParser()
| RunnableCollector(
lambda full_answer: save_memory_context(
docs, # 注意:此处需确保 docs 和 query 在作用域内
full_answer,
session_id
)
)
)⚠️ 重要注意事项:
- 作用域问题:RunnableCollector 的回调函数无法直接访问链外变量(如 session_id, query, docs)。推荐做法是:将 session_id 作为 input 的一部分注入链中(例如通过 RunnablePassthrough.assign(session_id=lambda _: session_id)),并在回调中从 inputs 解构获取。
- 内存实例生命周期:确保 get_memory(session_id) 返回的是同一个内存实例引用,而非每次调用都新建。否则 load_memory_variables() 与 save_context() 将操作不同对象,导致记忆丢失。建议使用单例缓存(如 lru_cache 或全局字典)管理 session-aware memory 实例。
- 流式内容类型兼容性:StrOutputParser() 输出 str,可直接拼接;若使用 AIMessage 输出(如 ChatModel 直出),需改用 AIMessageChunk 合并逻辑,并调用 .content 提取文本。
- 错误防御:在 RunnableCollector._transform 中加入 try/except,避免因 chunk 类型异常导致整个流中断。
✅ 最佳实践总结
| 场景 | 推荐做法 |
|---|---|
| 单次问答流 | 使用 RunnableCollector 聚合 + 回调保存,简洁可控 |
| 多轮会话流 | 将 session_id 作为链输入字段,统一管理 memory 生命周期 |
| 调试与可观测性 | 在 save_memory_context 中打印 memory.load_memory_variables({}) 验证是否生效 |
| 性能敏感场景 | 避免在回调中执行耗时 I/O(如写数据库),可改为异步队列解耦 |
通过以上方案,你既能享受流式响应的低延迟体验,又能确保每一轮人机交互都被准确记录到对话记忆中,为后续的上下文感知 RAG 提供坚实基础。记住:LangChain 的流式能力强大,但记忆持久化需要你主动“收网”——而 RunnableCollector 正是那张可靠的网。










