
本文介绍如何基于列数据动态生成±1信号:当当前值达到滚动均值的指定倍数时翻转信号,并确保每次重置后滚动窗口至少累积指定最小长度才允许下一次触发。
在时间序列分析或量化信号处理中,常需构造“自适应状态指示器”——其值(如 1 或 -1)随数据局部统计特征动态切换。本教程解决一个典型场景:对 DataFrame 的每列独立计算带重置逻辑的滚动均值信号,满足以下核心约束:
- 初始信号为 1;
- 每次重置由当前值 ≥ multiple × 当前滚动窗口均值 触发;
- 重置仅在滚动窗口已累积至少 min_count 个观测值后才被允许;
- 一旦重置发生,累计和(acc)与计数(num_obs)清零,从下一行为起点重建滚动窗口;
- 信号翻转(1 ↔ -1),并持续保持直至下一次有效重置。
该逻辑无法直接通过 pandas.DataFrame.rolling().mean() 实现,因其依赖状态感知的前向迭代(即当前决策影响后续窗口起始),而非无状态滑动计算。因此,我们采用显式循环 + 累积状态管理的方式,兼顾可读性与性能。
✅ 基础实现(纯 Python + Pandas)
import pandas as pd
def rolling_mean_signal(col, start=1, multiple=2, min_count=4):
"""
生成滚动均值触发的±1信号序列
Parameters:
-----------
col : pd.Series
输入列数据
start : int, default 1
初始信号值(1 或 -1)
multiple : float, default 2
触发重置的倍数阈值
min_count : int, default 4
允许重置所需的最小窗口长度
Yields:
-------
int : 当前行对应的信号值(1 或 -1)
"""
curr = start
num_obs = 0
acc = 0.0
for v in col:
acc += v
num_obs += 1
if num_obs < min_count:
yield curr
continue
mean_val = acc / num_obs
if v >= multiple * mean_val:
curr *= -1
num_obs = 0
acc = 0.0
yield curr
# 示例应用
df = pd.DataFrame({
"A": [0.1, 0.1, 0.15, 0.1, 0.1, 0.7, 0.1, 0.1, 0.5, 1, 0.1, 0.1],
"B": [0.1, 0.1, 0.4, 0.1, 0.8, 0.1, 0.1, 0.1, 0.1, 0.1, 0.9, 0.1],
})
df["signal_A"] = list(rolling_mean_signal(df["A"]))
df["signal_B"] = list(rolling_mean_signal(df["B"]))
print(df[["A", "B", "signal_A", "signal_B"]])? 关键点说明: acc 和 num_obs 是跨行维持的状态变量,模拟“动态滚动窗口”的累积过程; 重置后 num_obs=0 表示窗口清空,下一行将作为新窗口的第 1 个观测; yield 实现惰性生成,内存友好,适用于大数据流。
⚡ 高性能优化(Numba 加速)
对万行级以上数据,纯 Python 循环可能成为瓶颈。使用 numba.njit 可实现 JIT 编译加速(通常提升 5–20 倍):
from numba import njit
import numpy as np
@njit
def rolling_mean_signal_numba(col, start=1, multiple=2, min_count=4):
n = len(col)
out = np.empty(n, dtype=np.int8)
curr = np.int8(start)
num_obs = 0
acc = 0.0
for i in range(n):
v = col[i]
acc += v
num_obs += 1
if num_obs < min_count:
out[i] = curr
continue
mean_val = acc / num_obs
if v >= multiple * mean_val:
curr *= -1
num_obs = 0
acc = 0.0
out[i] = curr
return out
# 应用加速版(注意传入 .values)
df["signal_A_fast"] = rolling_mean_signal_numba(df["A"].values)
df["signal_B_fast"] = rolling_mean_signal_numba(df["B"].values)⚠️ 注意事项:
- Numba 不支持 Pandas 对象,务必传入 .values(np.ndarray);
- dtype=np.int8 足够表示 ±1,节省内存;
- 首次调用会触发编译,后续调用即达峰值性能。
? 总结与扩展建议
- 该方案本质是在线状态机:每步输入一个值,输出当前状态,并更新内部状态;
- 可轻松扩展为多级阈值(如 v >= 3*m 触发 curr *= -2)、滞后重置(延迟 1 行生效)或混合条件(如同时检查标准差);
- 若需支持 NaN 安全处理,可在循环内添加 if np.isnan(v): ... 分支;
- 对多列批量处理,推荐用 df.apply(lambda s: pd.Series(rolling_mean_signal(s))) 封装,保持接口一致性。
通过结合清晰的状态逻辑与可选的底层加速,本方法在可维护性与执行效率间取得良好平衡,适用于实时信号生成、异常检测触发及规则引擎开发等场景。










