
本文介绍使用python的fileinput模块逐行读取超大csv文件,避免内存溢出,并结合数据分块策略(如按年份、国家或字段范围)实现灵活筛选与排序。
在处理GB级CSV文件时,一次性加载全部数据到内存(如用pandas.read_csv()或csv.reader配合list())极易导致内存耗尽或程序崩溃。此时,流式分块处理(streaming + chunking) 是更稳健的选择。核心思路是:不“分割文件”为物理子文件,而是逻辑分块——即在逐行读取过程中,依据业务规则(如年份区间、国家分组、数值范围等)将记录动态归类到不同容器中,再分别排序。
✅ 推荐方案:fileinput + 字典分块 + 懒排序
fileinput 模块专为大型文本文件设计,支持内存友好的逐行迭代,且自动处理编码、换行符等细节。以下是一个完整示例,将数据按「年份区间」分为4个逻辑块(如1950–1974、1975–1999、2000–2014、2015–2023),并支持用户输入查询条件后对指定块排序:
import fileinput
from collections import defaultdict
import re
# 定义4个年份区间(可按需调整)
RANGES = [
(1950, 1974, "mid_20th"),
(1975, 1999, "late_20th"),
(2000, 2014, "early_21st"),
(2015, 2023, "recent")
]
# 初始化分块存储:{block_name: [(country, abv, year, expectancy), ...]}
blocks = {name: [] for _, _, name in RANGES}
# 流式读取CSV(跳过表头)
with fileinput.input('life_expectancy.csv', encoding='utf-8') as f:
next(f, None) # 跳过首行(表头)
for line_num, line in enumerate(f, start=2):
line = line.strip()
if not line: # 跳过空行
continue
try:
parts = line.split(',')
# 安全提取字段(防索引越界/类型错误)
if len(parts) < 4:
print(f"警告:第{line_num}行字段不足4个,已跳过")
continue
country = parts[0].strip('"\' ') # 去除引号和空格
abv = parts[1].strip('"\' ')
year = int(parts[2])
expectancy = float(parts[3])
# 按年份归入对应块
assigned = False
for start, end, block_name in RANGES:
if start <= year <= end:
blocks[block_name].append((country, abv, year, expectancy))
assigned = True
break
if not assigned:
print(f"提示:第{line_num}行年份 {year} 不在预设区间内,已忽略")
except (ValueError, IndexError) as e:
print(f"解析错误(第{line_num}行):{e}")
# 用户交互:选择块并排序
print("\n可用数据块:", list(blocks.keys()))
target_block = input("请输入要排序的块名(如 'recent'): ").strip()
if target_block not in blocks:
print("无效块名!")
else:
# 示例:按预期寿命降序排序
sorted_data = sorted(blocks[target_block], key=lambda x: x[3], reverse=True)
print(f"\n【{target_block} 块 - 按预期寿命排序前5条】")
for i, (c, a, y, e) in enumerate(sorted_data[:5], 1):
print(f"{i}. {c} ({a}), {y}: {e:.2f} 年")⚠️ 关键注意事项
- 永远校验字段数量与类型:split(',') 可能因CSV中含逗号(如 "United States, Inc.")而失效。生产环境强烈建议改用 csv.DictReader 配合 fileinput 或 pandas.read_csv(chunksize=N)。
- 内存权衡:上述代码将每个块全部载入内存后再排序。若单块仍过大,可进一步改为「分块排序+外部归并」(如用heapq.merge),或借助数据库(SQLite临时表)。
- 编码与BOM:确保encoding='utf-8-sig'以自动去除UTF-8 BOM头。
- 性能优化:对高频查询场景,可将分块结果持久化为Parquet或SQLite,后续直接索引查询。
✅ 总结
真正的“分割”不是物理切分文件,而是基于业务维度的逻辑分流。fileinput 提供了轻量、可靠的流式入口,配合字典分块与sorted()函数,即可在有限内存下完成超大CSV的灵活分治与排序。对于更复杂需求(多维聚合、模糊匹配),建议逐步过渡到Dask或Polars等分布式/高性能库。










