
本文介绍如何在 pandas 中实现真正灵活的滚动窗口计算——既能按列名(如 `df["a"]`)访问数据,又不受输入/输出列数必须一致的限制,避开 `rolling().apply(raw=true)` 的局限性。
Pandas 原生的 DataFrame.rolling().apply() 在处理复杂滚动逻辑时存在两个关键限制:一是启用 raw=True 时传入的是 numpy.ndarray,丢失列名和索引信息;二是要求返回值形状必须与原始 DataFrame 列数对齐(即 axis=1 下每行输出长度固定),无法自由生成任意数量的新列。
幸运的是,numpy.lib.stride_tricks.sliding_window_view 提供了一种更底层、更可控的替代方案。它能在不复制内存的前提下,将二维数组切分为连续的滑动窗口视图(view),再结合轻量级临时 DataFrame 构造(copy=False),即可在保持高性能的同时,完全保留列语义和输出自由度。
以下是一个完整、可复用的实现示例:
import pandas as pd
import numpy as np
from numpy.lib.stride_tricks import sliding_window_view
# 构造示例数据
df = pd.DataFrame({
"A": range(10),
"B": range(10, 20),
"C": range(20, 30)
})
# 定义滚动窗口大小(行数 × 列数)
window_rows = 2
window_cols = df.shape[1] # 覆盖全部列
# 使用 sliding_window_view 获取 (n_windows, window_rows, n_cols) 形状的视图
# 注意:仅沿行轴滑动,因此 window_shape = (window_rows, window_cols)
windows = sliding_window_view(df.values, window_shape=(window_rows, window_cols))
# 初始化结果列表(首行为 NaN 占位,对应窗口未就绪位置)
results = [tuple([np.nan] * 4)] # 假设输出 4 列:D, E, F, G
# 遍历每个窗口(注意:sliding_window_view 返回的是 view,非 copy)
for window_2d in windows:
# window_2d shape: (2, 3) → 构造临时 DataFrame(copy=False 避免内存开销)
temp_df = pd.DataFrame(window_2d, columns=df.columns, copy=False)
# ✅ 现在可自由使用列名进行计算!
D_val = temp_df["A"].sum() # 如:A 列窗口和
E_val = (temp_df["A"] + temp_df["B"]).mean() # A+B 的均值
F_val = (temp_df["C"] - 1).prod() # C-1 的连乘
G_val = (temp_df["B"] * 2).sum() # B×2 的和
results.append((D_val, E_val, F_val, G_val))
# 合并结果到原 DataFrame
result_df = pd.DataFrame(results, columns=["D", "E", "F", "G"])
df_final = pd.concat([df, result_df], axis=1)
print(df_final)✅ 优势总结:
- 列名友好:每个窗口都封装为标准 pd.DataFrame,支持 .loc, ["col"], query() 等全部 Pandas 语法;
- 输出自由:返回任意长度元组或字典,轻松映射到新列,无需与输入列数对齐;
- 内存高效:sliding_window_view 是零拷贝视图,copy=False 进一步避免中间 DataFrame 冗余复制;
- 可扩展性强:可轻松嵌入自定义函数、条件分支、多步计算,甚至调用 scikit-learn 模型预测。
⚠️ 注意事项:
- sliding_window_view 要求 NumPy ≥ 1.20;旧版本可降级使用 np.lib.stride_tricks.as_strided(需手动计算 strides,更易出错);
- 窗口起始位置默认从第 0 行开始,长度为 len(df) - window_rows + 1,因此结果比原 DataFrame 少 window_rows - 1 行 —— 示例中通过首行填充 NaN 对齐,你也可根据业务选择 min_periods 或 center=True 等策略;
- 若需并行加速(如窗口计算极重),可结合 concurrent.futures.ThreadPoolExecutor 对 windows 进行批处理,但需注意 GIL 和 I/O 密集型场景的收益边界。
该方法虽含显式循环,但其内核是向量化 NumPy 视图操作,实际性能远超纯 Python 循环,且代码清晰、调试友好、维护成本低,是替代 rolling().apply() 复杂定制需求的工业级实践方案。










