
本文介绍如何为 pandes dataframe 的每列构建一个动态符号指示器(1 或 -1),该指示器在当前值 ≥ 指定倍数 × 当前滚动均值时翻转,并强制要求每次重置前滚动窗口至少包含指定最小观测数。
在时间序列分析或信号检测类任务中,常需根据局部统计量(如滚动均值)的动态变化触发状态切换。本教程解决一个典型场景:对每一列独立维护一个累积滚动均值窗口,当某行值满足 value ≥ multiple × current_rolling_mean 且窗口长度已达最小阈值(min_count)时,重置窗口并翻转指示器符号(+1 ↔ −1)。该逻辑无法直接通过 pd.Series.rolling().mean() 实现,因其依赖状态记忆(当前符号、累计和、计数),需逐行迭代处理。
以下是一个清晰、可复用的纯 Python 实现:
import pandas as pd
def rolling_mean_indicator(col, start=1, multiple=2, min_count=4):
"""
生成滚动均值触发的符号指示器序列。
Parameters:
-----------
col : pd.Series or array-like
输入列数据
start : int, default 1
初始指示器值(1 或 -1)
multiple : float, default 2
触发翻转的倍数阈值
min_count : int, default 4
允许触发重置所需的最小连续观测数
Yields:
-------
int : 当前行对应的指示器值(1 或 -1)
"""
curr = start
num_obs = 0
acc = 0.0
for v in col:
acc += v
num_obs += 1
if num_obs < min_count:
yield curr
continue
mean_val = acc / num_obs
if v >= multiple * mean_val:
curr *= -1
num_obs = 0
acc = 0.0
yield curr
# 示例数据
df = pd.DataFrame({
"A": [0.1, 0.1, 0.15, 0.1, 0.1, 0.7, 0.1, 0.1, 0.5, 1, 0.1, 0.1],
"B": [0.1, 0.1, 0.4, 0.1, 0.8, 0.1, 0.1, 0.1, 0.1, 0.1, 0.9, 0.1],
})
# 应用函数
df["A_ind"] = list(rolling_mean_indicator(df["A"]))
df["B_ind"] = list(rolling_mean_indicator(df["B"]))
print(df[["A", "B", "A_ind", "B_ind"]])输出结果与预期一致:
A B A_ind B_ind 0 0.10 0.1 1 1 1 0.10 0.1 1 1 2 0.15 0.4 1 1 3 0.10 0.1 1 1 4 0.10 0.8 1 -1 5 0.70 0.1 -1 -1 6 0.10 0.1 -1 -1 7 0.10 0.1 -1 -1 8 0.50 0.1 -1 -1 9 1.00 0.1 1 -1 10 0.10 0.9 1 1 11 0.10 0.1 1 1
关键逻辑说明:
- 窗口是累积型(非滑动窗):从上一次重置后首行开始累加,直到触发条件才清空;
- min_count 是硬性约束:即使满足 v ≥ multiple × mean,若 num_obs
- 重置即清零 acc 和 num_obs,下一行重新开始累积(而非跳过);
- 指示器仅在重置发生时翻转,其余时间保持当前值。
性能优化建议(大数据集必选):
对万行以上数据,推荐使用 numba JIT 加速。只需添加 @njit 装饰器并返回 NumPy 数组:
from numba import njit
import numpy as np
@njit
def rolling_mean_indicator_numba(col, start=1, multiple=2, min_count=4):
curr = start
num_obs = 0
acc = 0.0
out = np.empty(len(col), dtype=np.int8)
for i in range(len(col)):
v = col[i]
acc += v
num_obs += 1
if num_obs < min_count:
out[i] = curr
continue
mean_val = acc / num_obs
if v >= multiple * mean_val:
curr *= -1
num_obs = 0
acc = 0.0
out[i] = curr
return out
# 使用方式(注意传入 .values)
df["A_ind_fast"] = rolling_mean_indicator_numba(df["A"].values)
df["B_ind_fast"] = rolling_mean_indicator_numba(df["B"].values)此方案兼顾可读性与工程实用性,适用于金融信号生成、异常脉冲检测、自适应阈值控制系统等场景。注意:该逻辑本质为在线单次遍历算法,不支持向量化回溯,但正因如此,它天然适配流式数据处理。










