
使用 loguru 为数据处理流水线(如 store1_sales、store1_warehouses)创建隔离的日志文件,通过 bind() 绑定任务标识 + 自定义 filter 实现按执行组自动路由日志到专属文件,并支持独立轮转策略。
在构建多管道数据处理系统(如 ETL:extract → transform → load)时,将日志按业务维度(例如 "store1_sales"、"store2_warehouses")物理隔离,不仅能提升故障排查效率,还能实现精细化运维——比如为高频流水线配置更激进的轮转(如每天),而低频任务保留更长日志周期(如30天)。Loguru 原生不提供“命名子 logger”,但可通过 Handler + Filter + bind 三者协同实现等效效果。
核心思路是:
✅ 为每个执行组预先注册一个专用 Handler(指向唯一日志文件 + 独立 rotation/retention);
✅ 使用 filter 函数精准拦截仅属于该组的日志记录(基于 record["extra"] 中的绑定键值);
✅ 在各模块中通过 logger.bind(task="xxx") 创建轻量级上下文化 logger 实例,避免全局污染。
以下为可直接落地的实践方案:
1. 全局初始化(推荐在主入口或 config 模块中执行一次)
from loguru import logger
# 为每个 pipeline 注册独立 handler
logger.add(
"logs/store1_sales.log",
level="INFO",
rotation="1 week",
retention="90 days",
compression="zip",
filter=lambda rec: rec["extra"].get("task") == "store1-sales",
serialize=True,
encoding="utf-8"
)
logger.add(
"logs/store1_warehouses.log",
level="INFO",
rotation="30 days",
retention="180 days",
filter=lambda rec: rec["extra"].get("task") == "store1-warehouses",
serialize=True,
encoding="utf-8"
)
logger.add(
"logs/store2_sales.log",
level="INFO",
rotation="1 day", # 高频流水线启用日粒度轮转
retention="7 days",
filter=lambda rec: rec["extra"].get("task") == "store2-sales",
serialize=True,
encoding="utf-8"
)2. 各业务模块中按需绑定(解耦且线程安全)
# extract/store1/sales.py
from loguru import logger
logger_store1_sales = logger.bind(task="store1-sales")
def store1_extract_sales():
logger_store1_sales.info("Started extracting sales data from API")
# ... processing logic
logger_store1_sales.success("Extraction completed, rows={count}", count=1247)# transform/store1/sales.py
from loguru import logger
logger_store1_sales = logger.bind(task="store1-sales") # 复用相同 task 标识
def store1_convert_sales():
logger_store1_sales.debug("Applying currency conversion and deduplication")
# ... transformation logic
logger_store1_sales.info("Converted {rows} records", rows=1247)# load/store1/sales.py
from loguru import logger
logger_store1_sales = logger.bind(task="store1-sales")
def store1_load_sales():
logger_store1_sales.info("Inserting into warehouse.sales table...")
# ... DB insertion
logger_store1_sales.success("Loaded {n} rows successfully", n=1247)⚠️ 关键注意事项: filter 函数中务必使用 rec["extra"].get("task") 而非 rec["extra"]["task"],避免因未绑定导致 KeyError; 所有同属一个 pipeline 的模块必须使用完全一致的 task 字符串(如统一用 "store1-sales" 而非混用 "store1_sales" 或 "STORE1-SALES"); bind() 返回的是新 logger 实例,无状态共享,天然支持多线程/协程场景; 若需动态创建 handler(如 pipeline 名称来自配置),可用循环注册,但需确保 handler ID 不重复(Loguru 会自动分配,无需手动管理)。
最终,store1_sales.log 将只包含所有 logger.bind(task="store1-sales") 发出的日志,且按周轮转压缩;其他流水线日志互不干扰。这种设计既保持了代码的模块化与路径无关性(各 .py 文件可分散在任意目录),又实现了日志治理的强隔离性与高灵活性。










