0

0

高效管理PyADS通知与大规模数据采集

聖光之護

聖光之護

发布时间:2025-08-11 17:22:29

|

787人浏览过

|

来源于php中文网

原创

高效管理pyads通知与大规模数据采集

本文旨在深入探讨如何利用Python的PyADS库高效地从倍福PLC获取实时数据,特别是针对高频、大规模数据采集场景。我们将重点介绍如何通过面向对象的方法(类)来管理回调函数的内部状态和累积数据,从而避免使用全局变量,并提供性能优化策略,尤其是在处理同类型大量数据时,通过自定义字节解析结合NumPy实现数据转换的显著加速。

PyADS通知机制与状态管理挑战

PyADS库通过其通知(Notification)机制,允许开发者订阅PLC变量的变化,并在变量值更新时触发回调函数。这对于需要高速、实时获取数据的应用场景(例如,每200毫秒或更快)至关重要。然而,回调函数的特性决定了它无法直接返回数据,这给需要累积大量数据或管理内部状态的应用带来了挑战。传统的解决方案可能倾向于使用全局变量,但这通常会导致代码难以维护、可读性差,并可能引发并发问题。

对于需要采集大量数据(例如,每周期1000个值,累积15个信号共100,000个值)的应用,直接在回调函数中处理所有数据并累积,同时避免全局变量,是核心问题。

基于类的PyADS通知管理与数据累积

解决回调函数状态管理问题的“Pythonic”方法是采用面向对象编程,将PyADS连接、通知设置以及数据处理逻辑封装在一个类中。这样,回调函数就可以作为类的方法,自然地访问和修改类的实例变量,从而实现数据的累积和状态的维护,而无需依赖全局变量。

类结构示例

以下是一个将PyADS通知封装到类中的基本结构示例:

import pyads
import ctypes
import numpy as np
import time

# 假设的PLC连接参数和变量定义
PLC_IP = '192.168.1.100'
PLC_AMS_NET_ID = '192.168.1.100.1.1'
LOCAL_AMS_NET_ID = '192.168.1.50.1.1' # 根据实际情况配置本地AMS Net ID

# 示例结构体定义,用于从PLC读取多个DINT数组
structure_def = (
    ('nVar1', pyads.PLCTYPE_DINT, 1000),
    ('nVar2', pyads.PLCTYPE_DINT, 1000),
    ('nVar3', pyads.PLCTYPE_DINT, 1000),
    # ... 可以根据需要添加更多变量
)
SIZE_OF_STRUCTURE = pyads.size_of_structure(structure_def)

class PlcDataManager:
    def __init__(self, plc_ip, plc_ams_net_id, local_ams_net_id):
        self.plc = pyads.Connection(plc_ip, plc_ams_net_id, local_ams_net_id)
        self.data_buffer = [] # 用于累积接收到的数据
        self.notification_handles = {} # 存储通知句柄,便于管理
        self.plc_state_run = False # PLC运行状态标记

        try:
            self.plc.open()
            print(f"成功连接到PLC: {plc_ip}")
            self._setup_notifications()
        except pyads.ADSError as e:
            print(f"连接PLC失败: {e}")
            self.plc = None # 标记连接失败

    def _setup_notifications(self):
        """设置所有ADS通知。"""
        if not self.plc:
            return

        # 1. 订阅数据变量通知
        data_var_name = 'global.sample_structure' # 假设PLC中存在此结构体变量
        attr_data = pyads.NotificationAttrib(SIZE_OF_STRUCTURE)
        try:
            handle_data = self.plc.add_device_notification(
                data_var_name, attr_data, self._on_data_received,
                pyads.ADSTransMode.OnChange, 200 # 每200ms检查一次变化
            )
            self.notification_handles[data_var_name] = handle_data
            print(f"已为 '{data_var_name}' 设置数据通知,句柄: {handle_data}")
        except pyads.ADSError as e:
            print(f"设置数据通知失败: {e}")

        # 2. 订阅PLC状态变化通知 (ADSIGRP_DEVICE_DATA, ADSIOFFS_DEVDATA_ADSSTATE)
        # 这对于监控PLC是否处于运行模式非常有用
        plc_state_addr = (int("0xF100", 16), int("0x0000", 16)) # ADSIGRP_DEVICE_DATA, ADSIOFFS_DEVDATA_ADSSTATE
        attr_state = pyads.NotificationAttrib(ctypes.sizeof(pyads.PLCTYPE_INT))
        try:
            handle_state = self.plc.add_device_notification(
                plc_state_addr, attr_state, self._on_plc_status_change,
                pyads.ADSTransMode.OnChange, 500 # 每500ms检查一次状态变化
            )
            self.notification_handles['plc_status'] = handle_state
            print(f"已为PLC状态设置通知,句柄: {handle_state}")
        except pyads.ADSError as e:
            print(f"设置PLC状态通知失败: {e}")

    def _on_data_received(self, handle, name, timestamp, value):
        """数据变量通知回调函数。"""
        # 使用pyads.dict_from_bytes进行初始转换,适用于混合类型或小批量数据
        # values = pyads.dict_from_bytes(value, structure_def)
        # self.data_buffer.append(values)

        # 对于大规模同类型数据,推荐手动解析以优化性能
        # 假设所有变量都是PLCTYPE_DINT,且长度相同
        dint_size = ctypes.sizeof(pyads.PLCTYPE_DINT)
        num_elements_per_var = structure_def[0][2] # 假设所有变量长度相同
        total_elements = len(structure_def) * num_elements_per_var

        # 将字节数据直接转换为NumPy数组
        # 注意:这里假设字节序与系统匹配,通常ADS是小端序
        # 如果需要,可以使用 np.frombuffer(value, dtype='<i4') 指定小端序32位整数
        raw_data = np.frombuffer(value, dtype=np.int32) 

        # 将一维NumPy数组重新塑形为字典,方便访问
        current_data = {}
        for i, (var_name, _, var_len) in enumerate(structure_def):
            start_idx = i * var_len
            end_idx = start_idx + var_len
            current_data[var_name] = raw_data[start_idx:end_idx]

        self.data_buffer.append(current_data)
        # print(f"接收到数据,当前缓冲区大小: {len(self.data_buffer)}")

    def _on_plc_status_change(self, notification, _):
        """PLC状态变化通知回调函数。"""
        # pyads.parse_notification用于解析原始通知字节数据
        *_, value = self.plc.parse_notification(notification, pyads.PLCTYPE_INT)

        if value == pyads.ADSSTATE_RUN:
            self.plc_state_run = True
            print("PLC进入运行模式 (ADSSTATE_RUN)")
        else:
            self.plc_state_run = False
            print(f"PLC退出运行模式,当前状态: {pyads.ADSSTATE_NAMES.get(value, '未知状态')}")
            # 可以触发错误处理或重连逻辑

    def get_accumulated_data(self):
        """获取并清空累积的数据缓冲区。"""
        data = self.data_buffer[:]
        self.data_buffer.clear()
        return data

    def close(self):
        """关闭PLC连接并移除所有通知。"""
        if self.plc:
            for name, handle in self.notification_handles.items():
                try:
                    self.plc.del_device_notification(handle)
                    print(f"已移除通知: {name} (句柄: {handle})")
                except pyads.ADSError as e:
                    print(f"移除通知 '{name}' 失败: {e}")
            self.plc.close()
            print("PLC连接已关闭。")

# 示例使用
if __name__ == "__main__":
    manager = PlcDataManager(PLC_IP, PLC_AMS_NET_ID, LOCAL_AMS_NET_ID)

    if manager.plc: # 确保PLC连接成功
        print("等待数据...")
        try:
            # 模拟主程序循环,等待数据并进行处理
            for i in range(20): # 运行约4秒 (20 * 200ms)
                time.sleep(0.2) # 模拟主程序等待
                if manager.data_buffer:
                    collected_data = manager.get_accumulated_data()
                    print(f"循环 {i+1}: 收集到 {len(collected_data)} 批数据。")
                    # 这里可以对 collected_data 进行进一步处理,例如存储到文件或数据库
                    # print(f"最新一批数据示例: {collected_data[-1]['nVar1'][0:5]}") # 打印第一批数据的nVar1前5个元素
                else:
                    print(f"循环 {i+1}: 暂无新数据。")

        except KeyboardInterrupt:
            print("\n程序被用户中断。")
        finally:
            manager.close()
    else:
        print("无法运行,PLC连接失败。")

在上述示例中:

Cutout.Pro
Cutout.Pro

AI驱动的视觉设计平台

下载
  • PlcDataManager 类封装了与PLC的连接、通知的设置以及数据缓冲区 self.data_buffer。
  • _on_data_received 方法作为回调函数,它是一个实例方法,可以直接访问 self.data_buffer 并将接收到的数据追加进去。
  • _on_plc_status_change 方法展示了如何使用 pyads.parse_notification 来解析PLC状态变化的通知,这对于监控PLC运行状态非常有用。
  • get_accumulated_data 方法允许主程序在需要时获取并清空累积的数据,实现数据消费与生产的分离。

高性能数据解析与优化

对于大规模、同类型数据的采集,pyads.dict_from_bytes 或默认的变量读取方式可能会因为Python层面的频繁类型转换而成为性能瓶颈。当PLC发送的数据是连续的字节流,且其中包含大量相同类型的元素(如1000个DINT),更高效的方法是:

  1. 直接获取原始字节数据: 在 add_device_notification 中,回调函数接收到的 value 参数即为原始字节数据。
  2. 利用NumPy进行批量转换: Python的NumPy库在处理数值数组方面具有极高的效率。可以将原始字节数据直接转换为NumPy数组,然后根据结构体定义进行切片和重塑。
# 在 _on_data_received 方法中进行优化
# ...
    def _on_data_received(self, handle, name, timestamp, value):
        """数据变量通知回调函数,优化大规模数据解析。"""
        # 假设所有变量都是PLCTYPE_DINT,且长度相同
        # 例如:structure_def = (('nVar1', pyads.PLCTYPE_DINT, 1000), ...)

        # 将字节数据直接转换为NumPy数组
        # dtype='<i4' 表示小端序32位带符号整数 (DINT)
        # 根据实际PLC的字节序和数据类型调整 dtype
        raw_data_np = np.frombuffer(value, dtype='<i4') 

        current_data = {}
        offset = 0
        for var_name, var_type, var_len in structure_def:
            # 获取每个变量的字节大小
            type_size = ctypes.sizeof(var_type)

            # 计算当前变量在NumPy数组中的元素范围
            start_idx = offset
            end_idx = offset + var_len
            current_data[var_name] = raw_data_np[start_idx:end_idx]
            offset += var_len # 更新偏移量

        self.data_buffer.append(current_data)
        # print(f"接收到数据,当前缓冲区大小: {len(self.data_buffer)}")
# ...

这种方法通过NumPy的底层C实现进行批量数据转换,可以带来数量级的性能提升,尤其适用于数据记录和大数据量处理场景。

注意事项与总结

  • 装饰器限制: pyads.notification 装饰器不能直接用于类方法。必须通过 plc.add_device_notification 方法显式地将类方法注册为回调函数。
  • 字节序与数据类型: 在手动解析字节数据时,务必注意PLC的数据字节序(通常为小端序)以及正确的数据类型映射(例如,PyADS的 PLCTYPE_DINT 对应NumPy的 np.int32 或 np.intc,且需考虑字节序)。
  • 错误处理: 监控PLC的状态变化(如 _on_plc_status_change 所示)对于构建健壮的通信应用至关重要,可以在PLC退出运行模式时触发告警或重连机制。
  • 资源管理: 确保在程序结束时正确关闭PLC连接并移除所有通知,释放资源。

通过采用类封装和高性能的数据解析策略,开发者可以构建出更加健壮、高效且易于维护的PyADS应用程序,从而有效地处理来自PLC的高速、大规模数据流。

热门AI工具

更多
DeepSeek
DeepSeek

幻方量化公司旗下的开源大模型平台

豆包大模型
豆包大模型

字节跳动自主研发的一系列大型语言模型

WorkBuddy
WorkBuddy

腾讯云推出的AI原生桌面智能体工作台

腾讯元宝
腾讯元宝

腾讯混元平台推出的AI助手

文心一言
文心一言

文心一言是百度开发的AI聊天机器人,通过对话可以生成各种形式的内容。

讯飞写作
讯飞写作

基于讯飞星火大模型的AI写作工具,可以快速生成新闻稿件、品宣文案、工作总结、心得体会等各种文文稿

即梦AI
即梦AI

一站式AI创作平台,免费AI图片和视频生成。

ChatGPT
ChatGPT

最最强大的AI聊天机器人程序,ChatGPT不单是聊天机器人,还能进行撰写邮件、视频脚本、文案、翻译、代码等任务。

相关专题

更多
数据类型有哪几种
数据类型有哪几种

数据类型有整型、浮点型、字符型、字符串型、布尔型、数组、结构体和枚举等。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

338

2023.10.31

php数据类型
php数据类型

本专题整合了php数据类型相关内容,阅读专题下面的文章了解更多详细内容。

225

2025.10.31

c语言 数据类型
c语言 数据类型

本专题整合了c语言数据类型相关内容,阅读专题下面的文章了解更多详细内容。

138

2026.02.12

数据类型有哪几种
数据类型有哪几种

数据类型有整型、浮点型、字符型、字符串型、布尔型、数组、结构体和枚举等。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

338

2023.10.31

php数据类型
php数据类型

本专题整合了php数据类型相关内容,阅读专题下面的文章了解更多详细内容。

225

2025.10.31

c语言 数据类型
c语言 数据类型

本专题整合了c语言数据类型相关内容,阅读专题下面的文章了解更多详细内容。

138

2026.02.12

go语言 面向对象
go语言 面向对象

本专题整合了go语言面向对象相关内容,阅读专题下面的文章了解更多详细内容。

58

2025.09.05

java面向对象
java面向对象

本专题整合了java面向对象相关内容,阅读专题下面的文章了解更多详细内容。

65

2025.11.27

TypeScript类型系统进阶与大型前端项目实践
TypeScript类型系统进阶与大型前端项目实践

本专题围绕 TypeScript 在大型前端项目中的应用展开,深入讲解类型系统设计与工程化开发方法。内容包括泛型与高级类型、类型推断机制、声明文件编写、模块化结构设计以及代码规范管理。通过真实项目案例分析,帮助开发者构建类型安全、结构清晰、易维护的前端工程体系,提高团队协作效率与代码质量。

49

2026.03.13

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
最新Python教程 从入门到精通
最新Python教程 从入门到精通

共4课时 | 22.5万人学习

Django 教程
Django 教程

共28课时 | 5万人学习

SciPy 教程
SciPy 教程

共10课时 | 1.9万人学习

关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号