Python异常默认不跨线程传播,因各线程有独立栈和异常上下文;子线程未捕获异常时仅触发threading.excepthook,默认打印traceback后退出,不会通知主线程。

Python 中的异常默认不会跨线程传播。主线程无法直接捕获子线程中发生的异常,子线程崩溃也不会中断主线程或其他线程。
为什么异常不自动传播?
每个线程有独立的执行栈和异常处理上下文。当子线程抛出未捕获异常时,Python 会调用 threading.excepthook(默认打印 traceback 并退出该线程),但不会向上通知创建它的线程。
手动捕获并传递异常的常用方法
需要显式将异常信息从子线程“带出来”,常见做法有:
-
使用
queue.Queue:子线程把异常对象(或 (type, value, traceback) 元组)放入队列,主线程定时检查并重新抛出 -
保存到共享变量 + 标志位:用
threading.Event或普通变量标记失败,并将sys.exc_info()结果存入线程安全容器(如threading.local()或加锁的 dict) -
使用
concurrent.futures.ThreadPoolExecutor:调用future.result()时,若子线程出错,会原样抛出该异常(推荐,封装了传播逻辑)
ThreadPoolExecutor 是最简洁的方案
它内部通过 _result 和 _exception 属性保存执行结果或异常,并在 result() 调用时触发重抛:
立即学习“Python免费学习笔记(深入)”;
from concurrent.futures import ThreadPoolExecutor
import time
<p>def risky_task():
time.sleep(0.1)
raise ValueError("子线程出错了")</p><p>with ThreadPoolExecutor() as executor:
future = executor.submit(risky_task)
try:
future.result() # 这里会抛出 ValueError
except ValueError as e:
print(f"捕获到:{e}") # 输出:捕获到:子线程出错了</p>自定义 excepthook 可用于日志或调试
如果不想让线程静默退出,可设置全局钩子:
import threading
import sys
<p>def custom_hook(args):
print(f"[线程异常] {args.thread.name}: {args.exc_value}")</p><p>threading.excepthook = custom_hook</p><p>def bad_func():
raise RuntimeError("boom")</p><p>threading.Thread(target=bad_func).start() # 触发 custom_hook</p>注意:这仅用于记录,不能替代异常传播逻辑。









