
本文介绍一种基于 ray 线程化 actor(threaded actors)的轻量级方案,通过设置 `max_concurrency` 实现单个 actor 内部方法的并发调用,使数据生成与获取互不阻塞,无需外部队列或复杂引用管理。
在 Ray 分布式应用中,常需多个 Worker 并行生成数据,并由主驱动线程(driver)持续消费。一个典型误区是试图引入跨进程共享队列(如 multiprocessing.Queue 或 Redis),但这在 Ray 中既不必要也不推荐——它破坏了 Actor 封装性,还引入额外序列化、网络和同步开销。
更优雅且符合 Ray 设计哲学的解法是:利用 Actor 的线程化能力(max_concurrency)实现内部并发。默认情况下,Ray Actor 是单线程顺序执行的:若一个方法(如 generate())进入无限循环,其他方法(如 pop_data())将永远无法被调度。而启用 max_concurrency > 1 后,Actor 可在同一实例内并发执行多个远程方法调用,从而让“后台生成”与“前台拉取”真正并行。
以下是完整可运行的实践方案:
import ray
import random
import time
ray.init(ignore_reinit_error=True)
@ray.remote
class DataGenerator:
def __init__(self):
self.data_buffer = []
def generate(self):
# 持续生成数据,不阻塞其他方法调用
while True:
time.sleep(5)
data = random.random() # 注意:使用 random.random() 替代已弃用的 random.rand()
self.data_buffer.append(data)
def pop_data(self):
# 原子性地取出并清空缓冲区
data = self.data_buffer.copy()
self.data_buffer.clear()
return data
# 启动 10 个并发 Actor,每个支持最多 2 个并发方法调用
N_HANDLES = 10
generator_handles = [
DataGenerator.remote().options(max_concurrency=2)
for _ in range(N_HANDLES)
]
# 启动所有生成器(非阻塞)
for handle in generator_handles:
handle.generate.remote()
# 主线程持续拉取 & 处理
all_data = []
while True:
# 并行获取所有 Actor 的当前缓冲数据
results = ray.get([handle.pop_data.remote() for handle in generator_handles])
for batch in results:
all_data.extend(batch)
print(f"Collected {len(all_data)} samples so far")
# ✅ 此处可执行任意耗时计算(如模型推理、聚合分析等)
# 即使耗时数秒,也不会影响 Actor 内部 generate() 的持续运行
# time.sleep(8) # 模拟长耗时任务 —— 完全不影响数据生成!
# 示例:重置用于演示(实际中按需清空)
if len(all_data) >= 100:
all_data.clear()关键要点说明:
- max_concurrency=2 是核心:确保 generate()(长期运行)与 pop_data()(短时响应)能同时执行。值为 2 已足够;若需更多并发控制(如带优先级的采集),可进一步扩展。
- 缓冲区操作需线程安全:虽然 Ray Actor 方法在同一线程内串行执行(除非显式启用 @ray.method(concurrency=True)),但 max_concurrency 启用后,多个方法可能在不同线程中运行。因此 pop_data() 使用 .copy() + .clear() 而非直接赋值,避免竞态;更严格的场景可加 threading.Lock,但本例中因 Actor 方法调度由 Ray 管理,通常已足够安全。
- 避免 ray.wait + 循环轮询旧模式:原方案依赖 timeout=0 轮询对象引用,逻辑复杂且易漏数据;新方案通过定期 ray.get([...]) 批量拉取,简洁、确定性强、吞吐更高。
- 无需外部状态协调:所有状态(data_buffer)封装在 Actor 内部,完全规避了分布式队列的可靠性、反压、序列化等问题。
该模式已在生产级流式数据预处理、实时特征抽取等场景验证有效。只要生成逻辑无副作用、消费逻辑能容忍小延迟(如 5s 间隔),即可作为 Ray 中“轻量级发布-订阅”的标准实践。








