
本文介绍使用 loguru 为多条数据处理流水线(如 “store1_sales”、“store1_warehouses”)配置独立日志文件的方法,通过 `bind()` 绑定上下文标签 + 自定义 filter 实现按执行组自动路由日志到指定文件,并支持差异化轮转策略。
在构建分布式或模块化的数据处理管道(如 ETL 流程)时,将日志按业务维度(例如按“店铺+业务类型”分组:store1_sales、store1_warehouses、store2_sales)隔离存储,不仅能提升故障定位效率,还便于后续按管道做日志归档、监控与审计。Loguru 原生不提供“命名 logger”概念,但其 bind() + filter 机制可优雅实现等效效果——即为每个执行组创建逻辑上独立的日志入口,同时复用同一全局 logger 实例。
核心思路是:
✅ 为每类管道预注册一个专属 handler(指向唯一文件路径 + 独立 rotation/retention 策略);
✅ 该 handler 仅接收携带特定上下文标签(如 "task": "store1-sales")的日志记录;
✅ 各模块使用 logger.bind(task="...") 获取专属 logger 实例,调用 .info() 等方法时自动注入标签。
以下为完整实践示例:
# config_logger.py —— 日志初始化(建议在项目入口统一执行一次)
from loguru import logger
import os
LOG_DIR = "logs"
os.makedirs(LOG_DIR, exist_ok=True)
# 为 store1_sales 管道配置专属 handler
logger.add(
sink=os.path.join(LOG_DIR, "store1_sales.log"),
level="INFO",
rotation="1 week", # 每周轮转
retention="90 days", # 保留90天
compression="zip", # 归档压缩
filter=lambda record: record["extra"].get("task") == "store1-sales",
serialize=False, # 非 JSON 格式(更易读),如需结构化可设 True
)
# 为 store1_warehouses 配置另一 handler
logger.add(
sink=os.path.join(LOG_DIR, "store1_warehouses.log"),
level="INFO",
rotation="500 MB", # 按大小轮转
retention="30 days",
filter=lambda record: record["extra"].get("task") == "store1-warehouses",
)
# 为 store2_sales 配置第三 handler
logger.add(
sink=os.path.join(LOG_DIR, "store2_sales.log"),
level="INFO",
rotation="100 MB",
retention="14 days",
filter=lambda record: record["extra"].get("task") == "store2-sales",
)在各业务模块中(无论物理路径如何),只需导入 logger 并绑定任务标识即可:
# extract/store1_sales.py
from loguru import logger
# 绑定当前管道标识 → 后续所有日志自动匹配对应 handler
log = logger.bind(task="store1-sales")
def store1_extract_sales():
log.info("Starting sales extraction for Store 1")
# ... actual logic
log.success("Sales extraction completed")
# transform/store1_sales.py
from loguru import logger
log = logger.bind(task="store1-sales") # 复用相同 task 标签
def store1_convert_sales():
log.debug("Applying currency conversion")
log.info("Sales conversion finished")⚠️ 关键注意事项: 所有 logger.add(...) 必须在任何 bind() 调用之前完成(通常放在应用启动阶段); filter 函数中务必使用 record["extra"].get("key") 安全取值,避免 KeyError; 若需 JSON 序列化(如对接 ELK),请将 serialize=True,此时日志内容会自动转为字典格式,extra 字段仍保留; 不同 handler 可设置完全不同的级别(如 store2_sales 设为 DEBUG,而 store1_warehouses 仅 WARNING),实现精细化控制; 若管道数量动态变化(如从配置文件加载),可用循环批量注册 handler,保持扩展性。
最终,每个管道的日志将严格写入各自文件,互不干扰,且轮转策略(时间/大小/保留期)完全解耦——真正实现“一管道一世界”的日志治理范式。










