0

0

如何让多个 Ray Actor 并发写入共享数据缓冲区并由主线程安全读取

花韻仙語

花韻仙語

发布时间:2026-02-07 11:32:35

|

878人浏览过

|

来源于php中文网

原创

如何让多个 Ray Actor 并发写入共享数据缓冲区并由主线程安全读取

本文介绍一种基于 ray 线程化 actor(`max_concurrency`)的轻量级方案,使多个远程 `datagenerator` 实例持续生成数据并存入各自缓冲区,主线程可无阻塞地批量拉取所有缓冲数据,避免传统共享队列的复杂性与同步风险。

在分布式数据生成场景中,常需多个 Ray Actor 并行生产数据,并由主驱动线程统一收集处理。但默认情况下,Ray Actor 是单线程串行执行的:一旦某个方法(如 generate())进入无限循环,其他方法(如 pop_data())将被永久阻塞,导致主线程无法及时获取数据。

核心解法:启用线程化 Actor(Threaded Actor)
通过 .options(max_concurrency=N) 显式允许 Actor 同时处理多个远程调用,即可实现“后台持续生成 + 前台即时读取”的并发模型。只需设置 max_concurrency=2,即允许多至 2 个方法并行执行——一个运行 generate() 循环,另一个响应 pop_data() 请求。

以下是完整、可直接运行的实践代码:

import ray
import random
import time

ray.init(ignore_reinit_error=True)

@ray.remote
class DataGenerator:
    def __init__(self):
        self.data_buffer = []

    def generate(self):
        while True:
            time.sleep(5)  # 模拟耗时数据生成
            data = random.random()  # 注意:原示例中 random.rand() 应为 random.random()
            self.data_buffer.append(data)

    def pop_data(self):
        """原子性获取并清空当前缓冲区"""
        data = self.data_buffer.copy()  # 避免引用共享问题
        self.data_buffer.clear()
        return data

# 启动 10 个并发 Actor,每个支持最多 2 个并发方法调用
N_HANDLES = 10
generator_handles = [
    DataGenerator.remote().options(max_concurrency=2)
    for _ in range(N_HANDLES)
]

# 启动所有生成器(非阻塞)
for handle in generator_handles:
    handle.generate.remote()

# 主线程:周期性拉取全部数据并处理
all_data = []
while True:
    # 并发获取所有 Actor 的当前缓冲数据
    results = ray.get([h.pop_data.remote() for h in generator_handles])

    # 合并数据
    for batch in results:
        all_data.extend(batch)

    print(f"已累积 {len(all_data)} 条数据")

    # ✅ 此处可执行任意计算密集型任务(如模型推理、批处理等)
    # 即使耗时较长,也不会阻塞 Actor 的数据生成
    # time.sleep(30)  # 示例:模拟长耗时处理

    # (可选)重置或限流:防止 all_data 无限增长
    if len(all_data) > 1000:
        all_data = all_data[-500:]  # 保留最新 500 条

关键优势说明

Ribbet.ai
Ribbet.ai

免费在线AI图片处理编辑

下载
  • 零共享状态:无需 threading.Queue、ray.util.queue.Queue 或外部 Redis,规避跨进程/跨节点序列化与锁竞争;
  • 天然隔离:每个 Actor 拥有独立 data_buffer,无并发修改风险;
  • 低延迟读取:pop_data() 始终可立即响应,不受 generate() 循环影响;
  • 弹性伸缩:Actor 数量与 max_concurrency 可按负载独立调整。

⚠️ 注意事项

  • max_concurrency 必须 ≥ 2,否则 pop_data() 调用将永远等待 generate() 结束(而它永不结束);
  • random.random() 替代了原文误写的 random.rand()(后者属于 NumPy,需 import numpy as np);
  • 若需强一致性(如严格 FIFO 全局顺序),本方案不适用——此时应引入中心化队列(如 ray.util.queue.Queue)并配合 ray.wait() 流控;
  • 缓冲区过大会增加内存压力,建议在 pop_data() 中做截断或在主线程中定期清理。

该模式是 Ray 官方推荐的“Actor 内部状态 + 并发访问”典型范式,兼顾简洁性、性能与可维护性,适用于实时数据采集、日志聚合、传感器流预处理等场景。

热门AI工具

更多
DeepSeek
DeepSeek

幻方量化公司旗下的开源大模型平台

豆包大模型
豆包大模型

字节跳动自主研发的一系列大型语言模型

通义千问
通义千问

阿里巴巴推出的全能AI助手

腾讯元宝
腾讯元宝

腾讯混元平台推出的AI助手

文心一言
文心一言

文心一言是百度开发的AI聊天机器人,通过对话可以生成各种形式的内容。

讯飞写作
讯飞写作

基于讯飞星火大模型的AI写作工具,可以快速生成新闻稿件、品宣文案、工作总结、心得体会等各种文文稿

即梦AI
即梦AI

一站式AI创作平台,免费AI图片和视频生成。

ChatGPT
ChatGPT

最最强大的AI聊天机器人程序,ChatGPT不单是聊天机器人,还能进行撰写邮件、视频脚本、文案、翻译、代码等任务。

相关专题

更多
什么是分布式
什么是分布式

分布式是一种计算和数据处理的方式,将计算任务或数据分散到多个计算机或节点中进行处理。本专题为大家提供分布式相关的文章、下载、课程内容,供大家免费下载体验。

378

2023.08.11

分布式和微服务的区别
分布式和微服务的区别

分布式和微服务的区别在定义和概念、设计思想、粒度和复杂性、服务边界和自治性、技术栈和部署方式等。本专题为大家提供分布式和微服务相关的文章、下载、课程内容,供大家免费下载体验。

238

2023.10.07

线程和进程的区别
线程和进程的区别

线程和进程的区别:线程是进程的一部分,用于实现并发和并行操作,而线程共享进程的资源,通信更方便快捷,切换开销较小。本专题为大家提供线程和进程区别相关的各种文章、以及下载和课程。

612

2023.08.10

线程和进程的区别
线程和进程的区别

线程和进程的区别:线程是进程的一部分,用于实现并发和并行操作,而线程共享进程的资源,通信更方便快捷,切换开销较小。本专题为大家提供线程和进程区别相关的各种文章、以及下载和课程。

612

2023.08.10

常用的数据库软件
常用的数据库软件

常用的数据库软件有MySQL、Oracle、SQL Server、PostgreSQL、MongoDB、Redis、Cassandra、Hadoop、Spark和Amazon DynamoDB。更多关于数据库软件的内容详情请看本专题下面的文章。php中文网欢迎大家前来学习。

986

2023.11.02

内存数据库有哪些
内存数据库有哪些

内存数据库有Redis、Memcached、Apache Ignite、VoltDB、TimesTen、H2 Database、Aerospike、Oracle TimesTen In-Memory Database、SAP HANA和ache Cassandra。更多关于内存数据库相关问题,详情请看本专题下面的文章。php中文网欢迎大家前来学习。

650

2023.11.14

mongodb和redis哪个读取速度快
mongodb和redis哪个读取速度快

redis 的读取速度比 mongodb 更快。原因包括:1. redis 使用简单的键值存储,而 mongodb 存储 json 格式的数据,需要解析和反序列化。2. redis 使用哈希表快速查找数据,而 mongodb 使用 b-tree 索引。因此,redis 在需要高性能读取操作的应用程序中是一个更好的选择。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

493

2024.04.02

redis怎么做缓存服务器
redis怎么做缓存服务器

redis 作为缓存服务器的答案:redis 是一款开源、高性能、分布式的键值存储,可作为缓存服务器使用。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

404

2024.04.07

Golang处理数据库错误教程合集
Golang处理数据库错误教程合集

本专题整合了Golang数据库错误处理方法、技巧、管理策略相关内容,阅读专题下面的文章了解更多详细内容。

2

2026.02.06

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
进程与SOCKET
进程与SOCKET

共6课时 | 0.4万人学习

Redis+MySQL数据库面试教程
Redis+MySQL数据库面试教程

共72课时 | 6.6万人学习

关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号