
本文详解如何在 LangChain 的流式(streaming)RAG 应用中正确保存用户-模型对话记忆,解决 save_context 无法直接接入流式链的痛点,提供可落地的自定义 RunnableCollector 方案与生产级注意事项。
本文详解如何在 langchain 的流式(streaming)rag 应用中正确保存用户-模型对话记忆,解决 `save_context` 无法直接接入流式链的痛点,提供可落地的自定义 `runnablecollector` 方案与生产级注意事项。
在 LangChain 构建的流式 RAG 应用中,一个常见但棘手的问题是:如何在 stream() 调用结束后,自动将用户提问与模型生成的完整回答存入对话记忆(如 ConversationBufferMemory)? 原生链式 API(如 | StrOutputParser() | model.stream())不支持在流结束时触发回调,save_context 也无法直接嵌入流式节点——因为流输出是分块(chunk)的 Iterator[str],而非最终聚合的完整字符串。
直接在链末尾添加 RunnableLambda(_save_context) 会失败:它接收的是单个 chunk(如 "Hello"),而非完整答案("Hello, you can buy beer at local breweries or supermarkets."),导致记忆内容残缺、语义断裂。
✅ 正确解法:自定义 RunnableCollector
核心思路是创建一个继承自 RunnableLambda 的流式收集器,在 _transform 方法中遍历全部 chunk,累积拼接为最终结果,并在流终止后调用 save_context:
from langchain_core.runnables import RunnableLambda
from langchain_core.runnables.utils import Input, Output, AddableDict
from langchain_core.callbacks import CallbackManagerForChainRun
from langchain_core.runnables.config import RunnableConfig
from typing import Iterator, Any, Callable, Dict, cast
class RunnableCollector(RunnableLambda):
def __init__(self, func: Callable[[Any], None], **kwargs):
super().__init__(func=lambda x: None, **kwargs)
self._save_func = func # 保存外部回调函数
def _transform(
self,
input: Iterator[Input],
run_manager: CallbackManagerForChainRun,
config: RunnableConfig,
**kwargs: Any,
) -> Iterator[Output]:
final_output = None
chunks = []
# 1. 消费全部流式 chunk 并缓存
for chunk in input:
chunks.append(chunk)
yield chunk # 实时透传,保证前端可流式渲染
# 2. 合并所有 chunk → 构建完整输出
if not chunks:
final_output = ""
elif isinstance(chunks[0], str):
final_output = "".join(chunks)
elif hasattr(chunks[0], "content"): # LangChain Message 类型(如 AIMessageChunk)
final_output = "".join(c.content for c in chunks if hasattr(c, "content"))
else:
final_output = chunks[-1] # 回退策略:取最后一个 chunk
# 3. 在流结束时执行记忆保存
try:
self._save_func(final_output)
except Exception as e:
print(f"[Warning] Failed to save memory: {e}")
# 4. (可选)返回最终结果供下游使用(如日志、审计)
yield final_output? 集成到你的 RAG 链中
基于你原始代码结构,只需在 answer_chain 末尾插入 RunnableCollector,并传入记忆保存逻辑:
def search(session_id: str, query: str) -> Iterator[str]:
memory = _get_memory_with_session_id(session_id)
# ... [preparation_chain 保持不变] ...
def _save_to_memory(full_answer: str):
# 注意:inputs 是原始查询,需从上下文提取或显式传入
memory.save_context({"question": query}, {"answer": full_answer})
answer_chain = (
{"docs": RunnablePassthrough()}
| {
"context": lambda x: _combine_documents(x["docs"]),
"question": itemgetter("question"),
}
| ANSWER_PROMPT
| model
| StrOutputParser()
| RunnableCollector(_save_to_memory) # ✅ 关键注入点
)
return answer_chain.stream({"question": query})? 为什么不用 on_end 回调?
LangChain 当前(v0.3.x)的 CallbackManager 对 stream() 的 on_end 支持不稳定,且无法直接获取聚合后的输出值。RunnableCollector 是更可控、可调试、与链深度集成的方案。
⚠️ 关键注意事项
- 内存实例必须跨请求复用:确保 get_memory(session_id) 返回的是同一个内存实例(例如使用 LRUCache 或全局字典缓存),否则每次调用都会新建空内存,导致历史丢失。
- chat_history 加载需及时:在 preparation_chain.invoke() 前,应先调用 memory.load_memory_variables({}) 触发历史加载(你代码中已做,很好)。
- 避免重复保存:若链被多次调用(如重试、重放),需在 _save_to_memory 中加入幂等判断(如检查是否已存在相同 question timestamp)。
- 类型兼容性:RunnableCollector 中的 final_output 合并逻辑需适配你的 LLM 输出类型(str / AIMessageChunk / dict)。建议统一转为 str 再存入 save_context。
- 性能考量:stream() 本身为低延迟设计,RunnableCollector 的累积操作在内存中完成,开销极小;但若答案超长(>100KB),需考虑流式写入数据库替代内存拼接。
✅ 总结
LangChain 流式记忆持久化没有“开箱即用”的银弹,但通过自定义 RunnableCollector,你可以在不破坏流式体验的前提下,精准捕获完整响应并写入记忆。该方案:
- ✅ 兼容任意 stream() 链(LLM、Parser、Tool 调用均可);
- ✅ 保持前端实时流式渲染能力;
- ✅ 显式控制保存时机与数据结构;
- ✅ 易于单元测试与错误隔离。
最后提醒:LangChain 官方仍在推进 issue #11945 的原生支持,建议关注 RunnableWithMessageHistory 与 stream_log 的演进,未来或可简化此流程。当前阶段,RunnableCollector 是最稳健、最透明的工程实践。










