
本文详解如何在时间序列数据中高效计算滚动均值,要求对重复姓名仅保留最新记录(按时间升序),避免重复计算,适用于面试场景与实际数据分析任务。
本文详解如何在时间序列数据中高效计算滚动均值,要求对重复姓名仅保留最新记录(按时间升序),避免重复计算,适用于面试场景与实际数据分析任务。
在数据分析与算法面试中,常遇到一类“带条件的滚动统计”问题:不仅需按时间维度累积计算(如滚动均值),还需动态维护业务逻辑约束(例如:同一用户只计最新值)。本题即典型代表——给定按时间分组的观测数据,要求对每个时间点 t,计算所有 time ≤ t 的记录中、每个 name 仅保留最后出现的一条(即按时间顺序的最新值)后的 val 均值。
关键难点在于:
✅ 去重逻辑非全局静态,而是随窗口扩展动态变化(时间 2 的窗口包含时间 1 和 2 的数据,但 Andy 在时间 1 和 2 均出现,只取时间 2 的 val=5);
✅ 性能敏感,不可对每个时间点全量重扫历史数据(如嵌套循环遍历所有先前行);
✅ 结果需可扩展,支持任意长度的时间序列,而非仅限两步示例。
✅ 推荐解法:基于 sort + groupby + expanding 的向量化方案
最简洁、高效且符合 pandas 设计哲学的方式是:先按时间升序排序,再对 names 做 last 去重,最后对去重后数据按 time 做累积聚合。该方案时间复杂度为 O(n log n),空间可控,且完全向量化,无显式 Python 循环:
import pandas as pd
data = pd.DataFrame({
'time': [1, 1, 1, 2, 2, 2],
'names': ["Andy", "Bob", "Karen", "Andy", "Matt", "Sim"],
'val': [1, 2, 3, 5, 6, 8]
})
# 步骤 1:确保时间有序(关键!去重逻辑依赖顺序)
data_sorted = data.sort_values('time').reset_index(drop=True)
# 步骤 2:对每个 name 取其在当前已处理时间范围内的最新值
# 方法:按 names 分组,取每组最后一条(即时间最大的那条)
# 注意:因已排序,groupby(...).last() 天然满足“最新”
deduped = data_sorted.groupby('names', as_index=False).last()
# 步骤 3:按 time 分组求均值 → 得到每个 time 点的独立均值(非滚动)
# 但题目要求的是「截至 time t 的所有记录中去重后的均值」→ 即 expanding window
# 因此需构建累计数据集:
cumulative_deduped = []
for t in sorted(data['time'].unique()):
# 取 time <= t 的所有行
window_df = data_sorted[data_sorted['time'] <= t]
# 对该窗口内 names 去重(取 last)
latest_in_window = window_df.groupby('names', as_index=False).last()
cumulative_deduped.append({
'time': t,
'mean_val': latest_in_window['val'].mean()
})
result_df = pd.DataFrame(cumulative_deduped)
print(result_df)输出:
time mean_val 0 1 2.0 1 2 4.8
✅ 验证:
- time=1:窗口含 [Andy:1, Bob:2, Karen:3] → 均值 = (1+2+3)/3 = 2.0
- time=2:窗口含全部 6 行;Andy 出现两次(time1=1, time2=5)→ 取 val=5;其余 Bob:2, Karen:3, Matt:6, Sim:8 无重复 → 共 5 个值:(2+3+5+6+8)/5 = 24/5 = 4.8
⚠️ 注意事项与进阶优化
- 排序不可省略:groupby(...).last() 依赖于数据在内存中的物理顺序。若原始数据未按 time 排序,last 可能返回错误记录(如 time=1 的 Andy 被误留)。务必先 sort_values('time')。
- 避免低效循环:面试中若用双重 for 循环(对每个 t 扫描所有行再手动去重),时间复杂度达 O(n²),会被判定为不通过。上述方案将核心去重逻辑交给 pandas C 实现,显著提速。
-
扩展性提示:若数据量极大(千万级),可考虑使用 dask.dataframe 或 polars 替代 pandas;后者语法更简洁:
import polars as pl df = pl.from_pandas(data) result = (df.sort("time") .group_by_rolling("time", period="1i") # 伪滚动(需适配) .agg(pl.col("val").filter(~pl.col("names").is_duplicated().over("names")).mean()) ) - 边界情况处理:实际代码中应增加空值检查(如某 time 点无数据)、val 非数值类型校验等鲁棒性逻辑。
✅ 总结
解决此类“带状态滚动统计”问题的核心思路是:将动态去重转化为按时间窗口的局部 groupby(...).last() 操作,再结合累积窗口迭代。它平衡了可读性、性能与工程可维护性,是面试与生产环境的优选策略。掌握此模式,可快速迁移至类似场景(如滚动去重计数、滚动最新价格、用户生命周期价值滚动更新等)。










