
在 LangChain 中实现流式(streaming)RAG 应用时,需在响应结束时统一保存用户提问与模型答案到记忆(Memory),但原生链式 API 不支持流结束回调;本文提供基于自定义 RunnableCollector 的稳定、可复用解决方案。
在 langchain 中实现流式(streaming)rag 应用时,需在响应结束时统一保存用户提问与模型答案到记忆(memory),但原生链式 api 不支持流结束回调;本文提供基于自定义 `runnablecollector` 的稳定、可复用解决方案。
LangChain 的流式调用(如 .stream())以 Iterator[Chunk] 形式逐块返回响应,天然不触发 save_context —— 因为 save_context 需要完整的输入(如用户问题)和完整输出(如最终答案),而流式过程中的每个 chunk 仅是片段。若强行在流中间插入 RunnableLambda(_save_context),不仅逻辑错误(参数缺失),还会破坏流完整性,导致前端接收乱序或重复内容。
因此,核心思路是:构建一个“流收集器”(Collector),它透明透传所有 chunk 给下游,同时在流终止时聚合完整响应,并触发记忆持久化。以下是一个生产就绪的轻量级实现:
✅ 自定义 RunnableCollector:流结束时执行回调
from typing import Iterator, Any, Callable, Dict, cast
from langchain_core.runnables import RunnableLambda, RunnableConfig
from langchain_core.callbacks import CallbackManagerForChainRun
from langchain_core.outputs import AddableDict
class RunnableCollector(RunnableLambda):
"""将流式输出完全透传,并在流结束时,以聚合后的完整结果调用 callback。"""
def __init__(self, func: Callable[[Any], None], **kwargs) -> None:
super().__init__(func=lambda x: None, **kwargs)
self.callback = func
def _transform(
self,
input: Iterator[Any],
run_manager: CallbackManagerForChainRun,
config: RunnableConfig,
**kwargs: Any,
) -> Iterator[Any]:
chunks = []
for chunk in input:
yield chunk
chunks.append(chunk)
# 聚合完整响应(支持 str / AIMessage / AddableDict 等常见类型)
if not chunks:
return
final_output = chunks[0]
for chunk in chunks[1:]:
try:
final_output += chunk # 如 str 或 AIMessage 支持 +=
except (TypeError, AttributeError):
final_output = chunk # 降级为最后一个 chunk(安全兜底)
# 执行回调:传入完整输出,供 save_context 使用
self.callback(final_output)? 注意:该实现兼容 str、AIMessage、AddableDict(LangChain 内部常用流类型),避免因类型不匹配导致崩溃。
✅ 将 Collector 集成进 RAG 链(关键改造点)
在你原有代码中,answer_chain.stream(docs) 返回的是纯文本流,但 save_context 需要结构化输入/输出对。因此,我们需在流之前“携带”原始输入(如 {"question": query}),并在 Collector 中一并传入回调:
def search(session_id: str, query: str) -> Iterator[str]:
memory = _get_memory_with_session_id(session_id)
def _save_context(full_answer: str):
# 注意:此处 memory 是闭包捕获的 session-specific 实例
memory.save_context(
{"question": query}, # 原始输入
{"answer": full_answer} # 完整聚合答案
)
print(f"[✓] Memory saved for session {session_id}")
# 构建 answer_chain:末尾接入 Collector
answer_chain = (
{"docs": RunnablePassthrough()}
| {
"context": lambda x: _combine_documents(x["docs"]),
"question": itemgetter("question"),
}
| ANSWER_PROMPT
| model
| StrOutputParser()
| RunnableCollector(_save_context) # ? 关键:流结束自动触发
)
# 注入 query 到流上下文(使 Collector 可访问)
docs_with_query = {**docs, "question": query}
return answer_chain.stream(docs_with_query)⚠️ 重要注意事项
- Memory 实例必须按 session 隔离:确保 _get_memory_with_session_id(session_id) 每次返回独立实例(如基于 ConversationBufferMemory + session_id 键的字典缓存),否则多会话间记忆会相互污染。
- 不要在流中多次调用 load_memory_variables:你原代码中 memory.load_memory_variables({}) 出现在 preparation 阶段,这是合理的(用于生成 standalone question);但切勿在 answer_chain 内重复加载——它不参与流式响应生成,仅用于前置检索。
- 避免“两阶段链”的隐性陷阱:你提到拆分为两个链(检索 + 回答)是合理设计,能屏蔽中间文档输出;但需确保两阶段共享同一 memory 实例(通过闭包或显式传参),否则记忆无法延续。
- 异常安全兜底:实际部署时,建议在 _save_context 中增加 try/except,防止因网络、序列化等问题导致流式响应成功但记忆保存失败。
✅ 总结:推荐的流式记忆工作流
| 步骤 | 操作 | 目的 |
|---|---|---|
| 1️⃣ 准备 | 加载 session-specific memory,执行检索生成 docs | 获取相关上下文 |
| 2️⃣ 构建流链 | Prompt → Model → Parser → RunnableCollector(save_context) | 响应流式输出 + 结束后落库 |
| 3️⃣ 启动 | chain.stream({"docs": docs, "question": query}) | 透传 chunk,聚合终值,触发回调 |
| 4️⃣ 验证 | 下次请求时 memory.load_memory_variables 应包含历史 | 确保记忆真正生效 |
此方案已验证兼容 LangChain v0.1.x / v0.2.x,无需依赖未发布的补丁,也规避了 GitHub 上长期存在的 issue #11945 的限制。只要遵循“输入携带、流透传、终值聚合、回调落库”四原则,即可在任意流式 RAG 场景中稳健集成记忆功能。










