本文介绍如何在 python 中持续接收并本地存储高频(每秒万级)逐笔行情数据,避免内存溢出与 i/o 瓶颈;核心方案是使用内置 sqlite 数据库配合批量写入与事务控制,兼顾实时性、可查询性与资源效率。
本文介绍如何在 python 中持续接收并本地存储高频(每秒万级)逐笔行情数据,避免内存溢出与 i/o 瓶颈;核心方案是使用内置 sqlite 数据库配合批量写入与事务控制,兼顾实时性、可查询性与资源效率。
在金融量化场景中,持续采集 40+ 交易品种(如 EURUSD)的逐笔(tick-by-tick)行情数据是一项典型但极具挑战性的工程任务:数据流从周日持续至周六凌晨,峰值可达每秒 10,000+ 条更新,且需支持任意时刻对已存数据进行即席分析——这意味着存储方案必须同时满足三大硬性要求:内存常驻开销极低、写入吞吐足够高、磁盘文件可随时被安全读取与查询。
传统做法(如 pd.concat() 动态扩展 DataFrame 或逐行追加 JSON)存在根本性缺陷:前者随数据增长线性消耗 RAM 并频繁触发全量序列化(to_pickle),后者虽节省内存但文件体积膨胀、无索引、不可直接查询,且纯文本解析性能差。而 HDF5(h5py/pandas HDFStore)虽适合大矩阵,但对高频率、小记录(单 tick
✅ 最优解:SQLite + 批量事务写入
SQLite 是 Python 标准库 sqlite3 模块原生支持的嵌入式关系型数据库,无需额外服务、零配置部署,天然支持 ACID 事务、B-tree 索引、SQL 查询,并可通过 WAL(Write-Ahead Logging)模式实现读写并发。实测表明,在主流硬件(i9-10900K + NVMe SSD)上,该方案可稳定达成 >160,000 条/秒写入吞吐,内存占用仅约 22.5 MB(远低于同等数据量的 DataFrame 常驻内存),完美匹配您的性能需求。
核心实现要点
-
建表与连接管理
使用 CREATE TABLE IF NOT EXISTS 确保幂等初始化;字段设计精简(避免冗余类型转换),例如:CREATE TABLE ticks ( instrument TEXT NOT NULL, bid REAL NOT NULL, ofr REAL NOT NULL, time_req TIMESTAMP NOT NULL, time_rec TIMESTAMP NOT NULL );时间字段直接存 datetime 对象(SQLite 自动转为 ISO8601 字符串),后续可通过 strftime() 高效过滤。
-
批量缓冲 + 事务提交(关键优化)
绝不逐条 INSERT!而是累积一定数量(如 5000 条)后,调用 executemany() 一次性提交:# 缓冲区:暂存待写入的更新对象列表 buffer = [] def on_tick_update(update): buffer.append(update) if len(buffer) >= 5000: _bulk_insert(buffer) buffer.clear() def _bulk_insert(records): data = [ (r.get_instrument(), r.get_bid_value(), r.get_ofr_value(), r.get_time(), datetime.now()) for r in records ] cursor.executemany( "INSERT INTO ticks VALUES (?, ?, ?, ?, ?)", data ) conn.commit() # 一次事务覆盖全部插入 -
读写分离与即时分析
SQLite 支持多进程/线程并发读取(WAL 模式下甚至可并发写入)。分析脚本可随时打开同一数据库文件执行 SQL 查询,例如:# 实时统计某品种最新 100 笔价差 cursor.execute(""" SELECT bid, ofr, (ofr - bid) AS spread FROM ticks WHERE instrument = ? ORDER BY time_rec DESC LIMIT 100 """, ("EURUSD",))
完整可运行示例(精简版)
import sqlite3
import datetime
class TickRecorder:
def __init__(self, db_path: str):
self.conn = sqlite3.connect(db_path, isolation_level=None) # 自动提交
self.conn.execute("PRAGMA journal_mode = WAL") # 启用 WAL 提升并发
self.conn.execute("""
CREATE TABLE IF NOT EXISTS ticks (
instrument TEXT, bid REAL, ofr REAL,
time_req TIMESTAMP, time_rec TIMESTAMP
)
""")
def append_batch(self, updates: list):
"""高效批量插入(推荐每 1k–5k 条调用一次)"""
data = [
(u.instrument, u.bid, u.ofr, u.time_req, datetime.datetime.now())
for u in updates
]
self.conn.executemany(
"INSERT INTO ticks VALUES (?, ?, ?, ?, ?)", data
)
# 使用示例:模拟接收 40 品种的 tick 流
recorder = TickRecorder("market_data.sqlite3")
buffer = []
for _ in range(100000): # 模拟 10 万 tick
# 此处替换为您的真实数据源回调(如 broker SDK 的 on_tick)
update = InstrumentUpdate(instrument="EURUSD", bid=1.0850, ofr=1.0852)
buffer.append(update)
if len(buffer) >= 5000:
recorder.append_batch(buffer)
buffer.clear()
# 结束时清空剩余缓冲
if buffer:
recorder.append_batch(buffer)注意事项与进阶建议
-
索引加速查询:若需按时间或品种高频筛选,创建复合索引:
CREATE INDEX idx_inst_time ON ticks(instrument, time_rec);
- 分库策略:为避免单库过大(>10GB),可按天自动切换数据库文件(如 ticks_20240317.sqlite3),通过日期前缀路由写入。
- 数据压缩:SQLite 本身不压缩,但可启用 Zstandard 在应用层对 BLOB 字段压缩存储(适用于历史归档)。
- 异常容错:在 append_batch 中包裹 try/except,捕获 sqlite3.DatabaseError 并重试,确保断电/崩溃后数据不丢失(WAL 模式已提供强一致性保障)。
综上,SQLite 并非“退而求其次”的轻量方案,而是针对您场景的工程最优解:它消除了内存膨胀、规避了文件锁争用、提供了开箱即用的查询能力,并以极低的学习与维护成本支撑起专业级行情数据管道。立即采用批量事务写入模式,即可将您的系统从内存瓶颈中彻底解放。










