
本文介绍如何在python中持续接收并本地存储高频(每秒万级)tick级行情数据,同时严格控制内存占用、避免重复读写开销,并支持实时可查——推荐采用内置sqlite数据库配合批量写入与事务优化策略。
本文介绍如何在python中持续接收并本地存储高频(每秒万级)tick级行情数据,同时严格控制内存占用、避免重复读写开销,并支持实时可查——推荐采用内置sqlite数据库配合批量写入与事务优化策略。
在金融量化场景中,持续采集40+交易品种的Tick级行情(如EURUSD),且需支撑周度不间断运行(周日启动至周六凌晨结束),对数据持久化方案提出了严苛要求:既要应对峰值超10,000 ticks/秒的写入压力,又要杜绝pandas.concat()导致的指数级内存增长,也不能使用逐行追加JSON这类低效、无索引、难查询的纯文本方式。直接答案是:SQLite 是兼顾简洁性、可靠性、性能与零依赖的最佳选择——它内置于Python标准库,无需额外服务,天然支持ACID事务、结构化查询与增量写入,且经实测可稳定达到 16万+ records/秒 的写入吞吐(i9 + NVMe SSD环境),内存常驻仅约22.5 MB。
✅ 核心设计原则
- 零内存累积:不将全部历史数据加载进RAM;仅缓存待写入批次(如5000条),满即刷盘。
- 最小I/O开销:禁用单条INSERT(每次commit开销巨大),改用executemany()批量插入 + 显式事务控制。
- 即时可分析:数据落库即可见,任意时刻可用SQL查询(如SELECT * FROM UpdateTable WHERE instrument='EURUSD' AND time_req > '2024-03-17')。
- 无缝扩展性:支持按日分库(如ticks_20240317.sqlite3),避免单文件过大影响备份与分析效率。
? 实现代码(生产就绪版)
import sqlite3
import datetime
from typing import List, Optional
class TickRecorder:
TABLE_NAME = "ticks"
def __init__(self, db_path: str):
self.db_path = db_path
self.conn = sqlite3.connect(db_path)
self.conn.execute("PRAGMA journal_mode = WAL") # 启用WAL模式,提升并发写入性能
self.conn.execute("PRAGMA synchronous = NORMAL") # 平衡安全性与速度
self._create_table()
def _create_table(self):
self.conn.execute(f"""
CREATE TABLE IF NOT EXISTS {self.TABLE_NAME} (
id INTEGER PRIMARY KEY AUTOINCREMENT,
instrument TEXT NOT NULL,
bid REAL NOT NULL,
ofr REAL NOT NULL,
time_req TIMESTAMP NOT NULL,
time_rec TIMESTAMP NOT NULL
)
""")
# 关键:为高频查询字段建立索引
self.conn.execute(f"CREATE INDEX IF NOT EXISTS idx_inst_time ON {self.TABLE_NAME}(instrument, time_req)")
def append_batch(self, updates: List[dict]) -> None:
"""批量写入,强烈推荐!避免单条INSERT"""
if not updates:
return
data = [
(
u["instrument"],
u["bid"],
u["ofr"],
u["time_req"], # 原始时间戳(datetime对象)
datetime.datetime.now() # 接收时间
)
for u in updates
]
self.conn.executemany(
f"INSERT INTO {self.TABLE_NAME} (instrument, bid, ofr, time_req, time_rec) VALUES (?, ?, ?, ?, ?)",
data
)
self.conn.commit() # 一次commit提交整个批次
def query(self, instrument: str, start_time: Optional[datetime.datetime] = None) -> List[tuple]:
"""实时查询示例:获取某品种指定时间后的所有Tick"""
base_sql = f"SELECT * FROM {self.TABLE_NAME} WHERE instrument = ?"
params = [instrument]
if start_time:
base_sql += " AND time_req >= ?"
params.append(start_time)
base_sql += " ORDER BY time_req LIMIT 10000" # 防止全表扫描失控
return self.conn.execute(base_sql, params).fetchall()
def close(self):
self.conn.close()? 使用示例与关键注意事项
# 初始化记录器(自动建库建表)
recorder = TickRecorder("ticks_week1.sqlite3")
# 模拟接收流:每批收集N条Tick后批量写入(非逐条!)
batch = []
for _ in range(10000): # 假设收到1万条更新
# 替换为你的真实数据源逻辑,例如 broker.get_tick()
tick = {
"instrument": "EURUSD",
"bid": 1.08523,
"ofr": 1.08527,
"time_req": datetime.datetime.fromtimestamp(1710672345.123) # 精确到毫秒
}
batch.append(tick)
# 每满5000条触发一次批量写入(可根据磁盘性能调整)
if len(batch) >= 5000:
recorder.append_batch(batch)
batch.clear()
# 周末分析时,直接查询(无需加载全量数据到内存!)
recent_eurusd = recorder.query(
instrument="EURUSD",
start_time=datetime.datetime(2024, 3, 17, 0, 0)
)
print(f"Found {len(recent_eurusd)} EURUSD ticks since Sunday")
recorder.close()⚠️ 关键注意事项:
- 永远不要单条写入:append_record()(单条INSERT)在万级TPS下会成为性能瓶颈,务必使用append_batch()。
- 启用WAL模式:PRAGMA journal_mode = WAL 可显著减少写锁竞争,尤其适合长时运行场景。
- 合理设置批大小:5000–10000条/批在多数SSD上表现最佳;HDD环境可降至1000–2000条以平衡延迟与吞吐。
- 索引至关重要:instrument和time_req组合索引能将范围查询提速百倍,避免全表扫描。
- 分库管理建议:按日期命名数据库(如ticks_20240317.sqlite3),既便于归档,也规避单库过大导致VACUUM耗时问题。
综上,SQLite并非“妥协方案”,而是针对本场景的精准解法:它消除了外部依赖、内存可控、写入极速、查询灵活,且代码简洁可维护。当数据规模进一步扩大(如月度PB级),再平滑迁移至TimescaleDB或专用时序数据库即可,而当前架构已为未来演进预留了清晰路径。










