
引言:多文件加载与自定义需求
在数据分析工作中,我们经常需要处理存储在多个文件中的数据,例如按产品、日期或区域划分的 CSV 文件。一个常见的需求是,在将这些文件合并成一个统一的 DataFrame 时,能够为每条记录添加一个标识其来源的列,例如文件名称或从文件名中提取的特定信息(如产品ID)。
考虑以下场景:您有一系列 CSV 文件,命名模式为 data_product_1.csv, data_product_2.csv 等,它们结构相同。您希望将所有数据合并到一个 Polars DataFrame 中,并额外添加一列 product_code,其值应从文件名中提取,例如 product_1、product_2。
直接使用 polars.read_csv("data_*.csv") 可以将所有文件合并,但这种方法不提供在加载过程中添加自定义列的机制。虽然可以逐个文件加载、添加列再合并,但这可能无法充分利用 Polars 的并行处理优势,尤其是在文件数量众多时。
Polars 解决方案:结合惰性计算与并行处理
为了高效地解决上述问题,Polars 提供了 scan_csv(或 scan_parquet 等)结合 LazyFrame 的方式,允许我们对每个文件进行预处理,然后并行地收集结果。
1. 准备示例数据
首先,我们创建几个示例 CSV 文件,以便后续代码能够运行。
多奥淘宝客程序免费版拥有淘宝客站点的基本功能,手动更新少,管理简单等优点,适合刚接触网站的淘客们,或者是兼职做淘客们。同样拥有VIP版的模板引擎技 术、强大的文件缓存机制,但没有VIP版的伪原创跟自定义URL等多项创新的搜索引擎优化技术,除此之外也是一款高效的API数据系统实现无人值守全自动 化运行的淘宝客网站程序。4月3日淘宝联盟重新开放淘宝API申请,新用户也可使用了
import polars as pl
from pathlib import Path
# 创建一个临时目录来存放CSV文件
temp_dir = Path("temp_data")
temp_dir.mkdir(exist_ok=True)
# 创建示例CSV文件
data_product_1 = pl.DataFrame({
"data": ["2000-01-01", "2000-01-02"],
"value": [1, 2]
})
data_product_1.write_csv(temp_dir / "data_product_1.csv")
data_product_2 = pl.DataFrame({
"data": ["2000-01-01", "2000-01-02"],
"value": [3, 4]
})
data_product_2.write_csv(temp_dir / "data_product_2.csv")
data_product_3 = pl.DataFrame({
"data": ["2000-01-01", "2000-01-02"],
"value": [5, 6]
})
data_product_3.write_csv(temp_dir / "data_product_3.csv")
print("示例CSV文件已创建在 'temp_data' 目录下。")2. 核心实现:使用 scan_csv 和 concat
该方法的核心思想是:
- 惰性扫描: 使用 pl.scan_csv() 而不是 pl.read_csv()。scan_csv 不会立即读取文件内容,而是返回一个 LazyFrame 对象,它代表了未来要执行的计算计划。
- 逐文件转换: 对每个 LazyFrame 应用 with_columns() 方法,添加基于文件名的自定义列。
- 并行合并与收集: 使用 pl.concat() 将所有 LazyFrame 合并,然后调用 .collect() 触发实际的数据读取和计算。Polars 可以在 collect() 阶段并行处理这些独立的 LazyFrame。
import polars as pl
from pathlib import Path
# 假设文件位于当前目录或指定目录
# 如果文件在 'temp_data' 目录下,则路径应为 Path("temp_data")
data_directory = Path("temp_data")
# 获取所有匹配的文件路径
csv_files = list(data_directory.glob("data_*.csv"))
# 创建 LazyFrame 列表,并为每个 LazyFrame 添加 product_code 列
lazy_frames = []
for f_path in csv_files:
# 提取文件名作为 product_code
# f_path.stem 获取不带扩展名的文件名 (e.g., "data_product_1")
# .replace("data_", "") 进一步提取 "product_1"
product_code = f_path.stem.replace("data_", "")
# 使用 scan_csv 创建 LazyFrame
# 使用 with_columns 添加 product_code 列
lf = pl.scan_csv(f_path).with_columns(
pl.lit(product_code).alias("product_code")
)
lazy_frames.append(lf)
# 使用 pl.concat 合并所有 LazyFrame,然后使用 .collect() 触发计算
# 默认情况下,pl.concat 会并行处理 LazyFrame
if lazy_frames:
final_df = pl.concat(lazy_frames).collect()
print(final_df)
else:
print("未找到匹配的CSV文件。")
# 清理示例数据
import shutil
if temp_dir.exists():
shutil.rmtree(temp_dir)
print("\n示例数据目录 'temp_data' 已删除。")输出示例:
shape: (6, 3) ┌────────────┬───────┬──────────────┐ │ data ┆ value ┆ product_code │ │ --- ┆ --- ┆ --- │ │ str ┆ i64 ┆ str │ ╞════════════╪═══════╪══════════════╡ │ 2000-01-01 ┆ 1 ┆ product_1 │ │ 2000-01-02 ┆ 2 ┆ product_1 │ │ 2000-01-01 ┆ 3 ┆ product_2 │ │ 2000-01-02 ┆ 4 ┆ product_2 │ │ 2000-01-01 ┆ 5 ┆ product_3 │ │ 2000-01-02 ┆ 6 ┆ product_3 │ └────────────┴───────┴──────────────┘
3. 简化版本(列表推导式)
上述 for 循环可以通过列表推导式进一步简化,代码更加紧凑:
import polars as pl
from pathlib import Path
data_directory = Path("temp_data")
# 重新创建示例数据以确保代码可运行
temp_dir = Path("temp_data")
temp_dir.mkdir(exist_ok=True)
data_product_1 = pl.DataFrame({"data": ["2000-01-01", "2000-01-02"], "value": [1, 2]})
data_product_1.write_csv(temp_dir / "data_product_1.csv")
data_product_2 = pl.DataFrame({"data": ["2000-01-01", "2000-01-02"], "value": [3, 4]})
data_product_2.write_csv(temp_dir / "data_product_2.csv")
data_product_3 = pl.DataFrame({"data": ["2000-01-01", "2000-01-02"], "value": [5, 6]})
data_product_3.write_csv(temp_dir / "data_product_3.csv")
lazy_frames = [
pl.scan_csv(f_path).with_columns(
pl.lit(f_path.stem.replace("data_", "")).alias("product_code")
)
for f_path in data_directory.glob("data_*.csv")
]
if lazy_frames:
final_df = pl.concat(lazy_frames).collect()
print(final_df)
else:
print("未找到匹配的CSV文件。")
# 清理示例数据
import shutil
if temp_dir.exists():
shutil.rmtree(temp_dir)关键概念与优势
- 惰性计算 (LazyFrame): pl.scan_csv() 返回的是 LazyFrame。这意味着 Polars 只是构建了一个计算计划,而没有立即执行数据读取和转换。所有操作都被“记录”下来,直到调用 .collect() 时才一次性执行。
- 优化与并行化: 由于 Polars 知道整个计算图,它可以在 .collect() 阶段对操作进行优化,并利用多核处理器并行读取和处理多个文件。这对于处理大量文件或大型文件时,能显著提高性能。
- 灵活性: 这种方法允许在每个文件的 LazyFrame 上应用任意的 Polars 表达式 (with_columns, filter, select 等),从而实现高度定制化的预处理逻辑,而无需在内存中加载整个文件。
- 内存效率: 对于非常大的文件,逐个文件加载到 LazyFrame 并进行转换,可以避免一次性将所有数据加载到内存中,从而减少内存压力。
注意事项
- 文件路径: 确保 Path().glob("data_*.csv") 或 data_directory.glob("data_*.csv") 能够正确找到您的文件。
- 文件名解析: f_path.stem.replace("data_", "") 是一种简单的文件名解析方式。如果您的文件名模式更复杂,可能需要使用正则表达式 (re 模块) 来提取所需信息。
- 错误处理: 在生产环境中,您可能需要添加错误处理机制,例如使用 try-except 块来处理文件不存在或格式错误的情况。
- 数据类型: pl.lit() 创建的字面量列的数据类型将根据输入自动推断。如果需要特定类型,可以使用 pl.lit(value).cast(pl.String) 等进行强制转换。
- 替代方案(DuckDB): 值得一提的是,其他数据处理工具如 DuckDB 提供了直接在 read_csv_auto 函数中通过 filename=true 参数添加文件名列的功能。Polars 目前尚未在 read_csv 或 scan_csv 中内置此功能,但通过上述 LazyFrame 的组合使用,可以灵活地实现相同的效果。
总结
通过巧妙地结合 Polars 的 scan_csv、LazyFrame 和 concat 方法,我们能够高效且灵活地处理多文件数据加载场景。这种方法不仅允许在合并前对每个文件进行自定义转换,还充分利用了 Polars 的并行处理能力,从而在处理大规模数据集时提供了卓越的性能和内存效率。掌握这一模式,将极大地提升您在 Polars 中处理复杂数据管道的能力。









