0

0

如何让多个 Ray Worker 并发写入共享数据缓冲区并由主线程安全读取

心靈之曲

心靈之曲

发布时间:2026-02-07 11:57:27

|

121人浏览过

|

来源于php中文网

原创

如何让多个 Ray Worker 并发写入共享数据缓冲区并由主线程安全读取

本文介绍一种基于 ray 线程化 actor(threaded actors)的轻量级方案,通过设置 `max_concurrency` 实现单个 actor 内部方法的并发调用,使数据生成与获取互不阻塞,无需外部队列或复杂引用管理。

在 Ray 分布式应用中,常需多个 Worker 并行生成数据,并由主驱动线程(driver)持续消费。一个典型误区是试图引入跨进程共享队列(如 multiprocessing.Queue 或 Redis),但这在 Ray 中既不必要也不推荐——它破坏了 Actor 封装性,还引入额外序列化、网络和同步开销。

更优雅且符合 Ray 设计哲学的解法是:利用 Actor 的线程化能力(max_concurrency)实现内部并发。默认情况下,Ray Actor 是单线程顺序执行的:若一个方法(如 generate())进入无限循环,其他方法(如 pop_data())将永远无法被调度。而启用 max_concurrency > 1 后,Actor 可在同一实例内并发执行多个远程方法调用,从而让“后台生成”与“前台拉取”真正并行。

以下是完整可运行的实践方案:

Ribbet.ai
Ribbet.ai

免费在线AI图片处理编辑

下载
import ray
import random
import time

ray.init(ignore_reinit_error=True)

@ray.remote
class DataGenerator:
    def __init__(self):
        self.data_buffer = []

    def generate(self):
        # 持续生成数据,不阻塞其他方法调用
        while True:
            time.sleep(5)
            data = random.random()  # 注意:使用 random.random() 替代已弃用的 random.rand()
            self.data_buffer.append(data)

    def pop_data(self):
        # 原子性地取出并清空缓冲区
        data = self.data_buffer.copy()
        self.data_buffer.clear()
        return data

# 启动 10 个并发 Actor,每个支持最多 2 个并发方法调用
N_HANDLES = 10
generator_handles = [
    DataGenerator.remote().options(max_concurrency=2) 
    for _ in range(N_HANDLES)
]

# 启动所有生成器(非阻塞)
for handle in generator_handles:
    handle.generate.remote()

# 主线程持续拉取 & 处理
all_data = []
while True:
    # 并行获取所有 Actor 的当前缓冲数据
    results = ray.get([handle.pop_data.remote() for handle in generator_handles])
    for batch in results:
        all_data.extend(batch)

    print(f"Collected {len(all_data)} samples so far")

    # ✅ 此处可执行任意耗时计算(如模型推理、聚合分析等)
    # 即使耗时数秒,也不会影响 Actor 内部 generate() 的持续运行
    # time.sleep(8)  # 模拟长耗时任务 —— 完全不影响数据生成!

    # 示例:重置用于演示(实际中按需清空)
    if len(all_data) >= 100:
        all_data.clear()

关键要点说明:

  • max_concurrency=2 是核心:确保 generate()(长期运行)与 pop_data()(短时响应)能同时执行。值为 2 已足够;若需更多并发控制(如带优先级的采集),可进一步扩展。
  • 缓冲区操作需线程安全:虽然 Ray Actor 方法在同一线程内串行执行(除非显式启用 @ray.method(concurrency=True)),但 max_concurrency 启用后,多个方法可能在不同线程中运行。因此 pop_data() 使用 .copy() + .clear() 而非直接赋值,避免竞态;更严格的场景可加 threading.Lock,但本例中因 Actor 方法调度由 Ray 管理,通常已足够安全。
  • 避免 ray.wait + 循环轮询旧模式:原方案依赖 timeout=0 轮询对象引用,逻辑复杂且易漏数据;新方案通过定期 ray.get([...]) 批量拉取,简洁、确定性强、吞吐更高。
  • 无需外部状态协调:所有状态(data_buffer)封装在 Actor 内部,完全规避了分布式队列的可靠性、反压、序列化等问题。

该模式已在生产级流式数据预处理、实时特征抽取等场景验证有效。只要生成逻辑无副作用、消费逻辑能容忍小延迟(如 5s 间隔),即可作为 Ray 中“轻量级发布-订阅”的标准实践。

热门AI工具

更多
DeepSeek
DeepSeek

幻方量化公司旗下的开源大模型平台

豆包大模型
豆包大模型

字节跳动自主研发的一系列大型语言模型

通义千问
通义千问

阿里巴巴推出的全能AI助手

腾讯元宝
腾讯元宝

腾讯混元平台推出的AI助手

文心一言
文心一言

文心一言是百度开发的AI聊天机器人,通过对话可以生成各种形式的内容。

讯飞写作
讯飞写作

基于讯飞星火大模型的AI写作工具,可以快速生成新闻稿件、品宣文案、工作总结、心得体会等各种文文稿

即梦AI
即梦AI

一站式AI创作平台,免费AI图片和视频生成。

ChatGPT
ChatGPT

最最强大的AI聊天机器人程序,ChatGPT不单是聊天机器人,还能进行撰写邮件、视频脚本、文案、翻译、代码等任务。

相关专题

更多
什么是分布式
什么是分布式

分布式是一种计算和数据处理的方式,将计算任务或数据分散到多个计算机或节点中进行处理。本专题为大家提供分布式相关的文章、下载、课程内容,供大家免费下载体验。

378

2023.08.11

分布式和微服务的区别
分布式和微服务的区别

分布式和微服务的区别在定义和概念、设计思想、粒度和复杂性、服务边界和自治性、技术栈和部署方式等。本专题为大家提供分布式和微服务相关的文章、下载、课程内容,供大家免费下载体验。

238

2023.10.07

线程和进程的区别
线程和进程的区别

线程和进程的区别:线程是进程的一部分,用于实现并发和并行操作,而线程共享进程的资源,通信更方便快捷,切换开销较小。本专题为大家提供线程和进程区别相关的各种文章、以及下载和课程。

612

2023.08.10

线程和进程的区别
线程和进程的区别

线程和进程的区别:线程是进程的一部分,用于实现并发和并行操作,而线程共享进程的资源,通信更方便快捷,切换开销较小。本专题为大家提供线程和进程区别相关的各种文章、以及下载和课程。

612

2023.08.10

线程和进程的区别
线程和进程的区别

线程和进程的区别:线程是进程的一部分,用于实现并发和并行操作,而线程共享进程的资源,通信更方便快捷,切换开销较小。本专题为大家提供线程和进程区别相关的各种文章、以及下载和课程。

612

2023.08.10

常用的数据库软件
常用的数据库软件

常用的数据库软件有MySQL、Oracle、SQL Server、PostgreSQL、MongoDB、Redis、Cassandra、Hadoop、Spark和Amazon DynamoDB。更多关于数据库软件的内容详情请看本专题下面的文章。php中文网欢迎大家前来学习。

986

2023.11.02

内存数据库有哪些
内存数据库有哪些

内存数据库有Redis、Memcached、Apache Ignite、VoltDB、TimesTen、H2 Database、Aerospike、Oracle TimesTen In-Memory Database、SAP HANA和ache Cassandra。更多关于内存数据库相关问题,详情请看本专题下面的文章。php中文网欢迎大家前来学习。

650

2023.11.14

mongodb和redis哪个读取速度快
mongodb和redis哪个读取速度快

redis 的读取速度比 mongodb 更快。原因包括:1. redis 使用简单的键值存储,而 mongodb 存储 json 格式的数据,需要解析和反序列化。2. redis 使用哈希表快速查找数据,而 mongodb 使用 b-tree 索引。因此,redis 在需要高性能读取操作的应用程序中是一个更好的选择。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

493

2024.04.02

Golang处理数据库错误教程合集
Golang处理数据库错误教程合集

本专题整合了Golang数据库错误处理方法、技巧、管理策略相关内容,阅读专题下面的文章了解更多详细内容。

2

2026.02.06

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
进程与SOCKET
进程与SOCKET

共6课时 | 0.4万人学习

Redis+MySQL数据库面试教程
Redis+MySQL数据库面试教程

共72课时 | 6.6万人学习

关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号