
本文介绍在 polars 中处理流式逐行数据生成场景的最优方案,重点推荐基于 lazyframe 的 `sink_csv` 流式写入、`batched` 批量构造及 `map_elements` 向量化预处理,避免低效的逐行 vstack 或重复 list 追加。
在实际数据工程中,常遇到外部系统(如传感器、API 流、日志解析器)以逐行方式持续产出原始记录,需经 decompose() 等逻辑提取结构化特征,并高效累积为 Polars DataFrame 后定期落盘(如 CSV)。此时,传统“每行建 DataFrame + vstack”或“纯 Python list 累积再转 DataFrame”的做法存在明显性能瓶颈:前者因频繁内存分配与元数据重建导致 O(n²) 时间复杂度;后者虽内存友好,但缺乏 Polars 原生向量化优势,且手动管理 flush 逻辑易出错。
✅ 推荐方案一:LazyFrame + sink_csv(首选,真正流式)
Polars 的 LazyFrame 支持从可迭代对象(包括生成器)直接构建,并通过 sink_csv() 实现零拷贝、分批、内存可控的流式写入,无需显式维护中间 DataFrame:
import polars as pl
def generation_mechanism():
for row in external_data_stream(): # 如 requests.iter_lines()、kafka consumer 等
yield row
# 直接从生成器构建 LazyFrame(不触发计算)
lf = pl.LazyFrame(generation_mechanism(), schema=["raw_row"])
# 使用 map_batches + vectorized decompose(关键优化)
def decompose_batch(df: pl.DataFrame) -> pl.DataFrame:
# 假设 decompose 可向量化:输入 Series,输出 struct 列
return df.select(
pl.col("raw_row")
.map_elements(lambda x: (x["id"] * 2, x["value"].upper()),
return_dtype=pl.Struct({"feature_a": pl.Int64, "feature_b": pl.String}))
.struct.unnest()
)
lf = lf.map_batches(decompose_batch, streamable=True)
lf.sink_csv("output.csv", batch_size=10_000) # 自动按 10k 行分块写入⚠️ 注意:map_batches 中的函数需标记 streamable=True 且避免非流式操作(如 sort, join),确保流式执行;decompose 应尽可能向量化(用 pl.col().str.xxx / pl.col().dt.xxx 替代 map_elements)。
✅ 推荐方案二:itertools.batched + 批量构造(兼容性强,适合复杂 decompose)
若 decompose() 逻辑难以向量化,可借助 Python 标准库 batched 分组,再对每批数据统一处理,显著减少 DataFrame 构造次数:
from itertools import batched
import polars as pl
flush_threshold = 500
for batch in batched(generation_mechanism(), flush_threshold):
# 批量应用 decompose(仍为逐行,但仅调用 N/batch_size 次)
processed = [decompose(row) for row in batch]
# 一次性构造 DataFrame(高效!)
df = pl.DataFrame(
processed,
schema={"feature_a": pl.Int64, "feature_b": pl.String}
)
# 追加写入(注意:CSV 不支持原生追加,需用 'a' 模式并确保无 header)
with open("output.csv", "a") as f:
if f.tell() == 0: # 首次写入添加 header
df.write_csv(f, include_header=True)
else:
df.write_csv(f, include_header=False)❌ 应避免的方案
- 逐行 vstack:data = data.vstack(new_row_df) 触发全量内存复制,时间复杂度随行数平方增长,大数据集下极慢。
- 纯 Python list 累积:虽内存稳定,但 pl.DataFrame({"col": list}) 在大数据量时序列化开销大,且丧失 Polars 延迟执行与查询优化能力。
? 总结建议:
- 优先使用 LazyFrame.sink_csv() —— 它是 Polars 官方为流式场景设计的终极解法,内存恒定、无需手动 flush、自动批处理;
- 若需兼容旧版 Polars(
- 持续重构 decompose() 为 Polars 原生表达式(如 pl.col("x").str.extract(r"(\d+)")),彻底摆脱 Python 循环,获得数量级性能提升。










