0

0

反序列化/解析错误 KafkaProtobuf Python

PHPz

PHPz

发布时间:2024-02-09 23:48:09

|

1576人浏览过

|

来源于stackoverflow

转载

反序列化/解析错误 kafkaprotobuf python

php小编新一为你介绍一种常见的错误:反序列化/解析错误 KafkaProtobuf Python。在使用KafkaProtobuf Python库时,可能会遇到反序列化或解析错误的问题。这可能是由于消息的序列化格式与消费者代码不匹配,或者是由于消息的格式有误导致的。解决这个问题的方法包括检查消息的序列化格式和消费者代码的兼容性,以及确保消息的格式正确。在本文中,我们将详细介绍这个问题的原因和解决方法,希望能帮助你解决类似的错误。

问题内容

序列化代码(Go 语言)

1。制片人

func NewProducer(kafkaBrokerURL string, kafkaSchemaRegistryUrl string) {
    producerConfig := getKafkaProducerConfig(config.EnvConfig)

    producer, err := confluent_kafka.NewProducer(producerConfig)
    if err != nil {
        log.WithFields(log.Fields{"err": err}).Error("Failed to create Kafka Producer")
        log.Panicf("Unable to create Kafka Producer")
    }

    client, err := schemaregistry.NewClient(schemaregistry.NewConfig(kafkaSchemaRegistryUrl))
    if err != nil {
        log.WithFields(log.Fields{"err": err}).Error("Failed to create Kafka Client")
        log.Panicf("Unable to create Kafka Client")
    }

    serializer, err := protobuf.NewSerializer(client, serde.ValueSerde, protobuf.NewSerializerConfig())
    if err != nil {
        log.WithFields(log.Fields{"err": err}).Error("Failed to create Kafka Serializer")
        log.Panicf("Unable to create Kafka Serializer")
    }

    KafkaProducerInstance = &KafkaProducer{
        producer:   producer,
        serializer: serializer,
    }

    log.Info("Created Kafka Producer and Serializer")
}

2.发送Kafka消息

func producerHelper[kdt KafkaMesageDataTypes](message kdt, topicName string) {
    deliveryChan := make(chan confluent_kafka.Event)
    payload, err := KafkaProducerInstance.serializer.Serialize(topicName, &message)
    if err != nil {
        log.Errorf("Failed to serialize payload: %v\n", err)
        close(deliveryChan)
        return
    }

    err = KafkaProducerInstance.producer.Produce(&confluent_kafka.Message{
        TopicPartition: confluent_kafka.TopicPartition{Topic: &topicName, Partition: confluent_kafka.PartitionAny},
        Value:          payload,
    }, deliveryChan)

    if err != nil {
        log.Errorf("Failed to Produce: %v\n", err)
        close(deliveryChan)
        return
    }

    e := <-deliveryChan
    m := e.(*confluent_kafka.Message)

    if m.TopicPartition.Error != nil {
        log.Errorf("Delivery failed: %v\n", m.TopicPartition.Error)
        close(deliveryChan)
        return
    } else {
        log.Infof("Delivered message to topic %s [%d] at offset %v\n",
            *m.TopicPartition.Topic, m.TopicPartition.Partition, m.TopicPartition.Offset)
    }

    close(deliveryChan)
}

尝试使用消息(Diff,Python 中的应用程序)

from confluent_kafka import Consumer, KafkaError
import KafkaDiagnoseResult_pb2  # replace with your generated module name
from google.protobuf.message import DecodeError

# Kafka consumer configuration
conf = {
    'bootstrap.servers': "localhost:9092/v3/",  # Replace with your Kafka server address
    'group.id': "myGroup",
    'auto.offset.reset': 'earliest'
}

# Create a consumer instance
consumer = Consumer(conf)

# Subscribe to a topic
from confluent_kafka import Consumer, KafkaError
import KafkaDiagnoseResult_pb2
from google.protobuf.message import DecodeError

# Kafka consumer configuration
conf = {
    'bootstrap.servers': "localhost:9092/v3/",
    'group.id': "myGroup",
    'auto.offset.reset': 'earliest'
}

# Create a consumer instance
consumer = Consumer(conf)

# Subscribe to a topic
consumer.subscribe(['diagnosis']) 
try:
    while True:
        msg = consumer.poll(1.0)

        if msg is None:
            continue
        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                # End of partition event
                continue
            else:
                print(msg.error())
                break

        # Deserialize the message
        try:
            data = KafkaDiagnoseResult_pb2.KafkaDiagnoseRequest() 
            data.ParseFromString(msg.value())
        except DecodeError as e:
            print(f"Error parsing message: {e}")
            print(f"Raw message data: {msg.value()}")

        print("Received message: ", data)

except KeyboardInterrupt:
    pass
finally:
    consumer.close()

错误

解析消息时出错

我正在尝试调试它,但无法。

  1. 两个应用程序中的 proto 文件是相同的
  2. 我使用 proton 生成 pb2 文件。

感谢您的帮助。

立即学习Python免费学习笔记(深入)”;

谢谢

NatAgent
NatAgent

AI数据情报监测与分析平台

下载

我可以获取原始格式的消息:

原始格式消息。

原始消息数据:b'\x00\x00\x00\x00\x02\x02\x08\n$1775100a-1a47-48b2-93b7-b7a331be59b4\x12\tcompleted'

  • 我尝试使用 UTF-8 对其进行解码,但失败了,因为并未读取所有字段。
print(" Decode 1: ", dict_str)
   print("Decode 2: ", ast.literal_eval(dict_str))

以上代码的输出:

Unparsed Message:  b'\x00\x00\x00\x00\x02\x02\x08\n$ccb0ad7e-abb2-4af6-90d1-187381f9d47e\x12\tcompleted'
 Decode 1:  
$ccb0ad7e-abb2-4af6-90d1-187381f9d47e   completed
Inner Exception Here source code string cannot contain null bytes

解决方法

您的 Go 客户端正在使用架构注册表进行序列化,这意味着您的 Python 代码也必须执行相同的操作。这些记录“不仅仅是 Protobuf”,因为模式 ID 也以字节编码,因此常规 Protobuf 解析器将失败。

存储库中有用于通过注册表集成使用 Protobuf 的示例代码

https://github.com/confluenceinc /confluence-kafka-python/blob/master/examples/protobuf_consumer.py

python速学教程(入门到精通)
python速学教程(入门到精通)

python怎么学习?python怎么入门?python在哪学?python怎么学才快?不用担心,这里为大家提供了python速学教程(入门到精通),有需要的小伙伴保存下载就能学习啦!

下载

相关标签:

本站声明:本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn

热门AI工具

更多
DeepSeek
DeepSeek

幻方量化公司旗下的开源大模型平台

豆包大模型
豆包大模型

字节跳动自主研发的一系列大型语言模型

通义千问
通义千问

阿里巴巴推出的全能AI助手

腾讯元宝
腾讯元宝

腾讯混元平台推出的AI助手

文心一言
文心一言

文心一言是百度开发的AI聊天机器人,通过对话可以生成各种形式的内容。

讯飞写作
讯飞写作

基于讯飞星火大模型的AI写作工具,可以快速生成新闻稿件、品宣文案、工作总结、心得体会等各种文文稿

即梦AI
即梦AI

一站式AI创作平台,免费AI图片和视频生成。

ChatGPT
ChatGPT

最最强大的AI聊天机器人程序,ChatGPT不单是聊天机器人,还能进行撰写邮件、视频脚本、文案、翻译、代码等任务。

相关专题

更多
kafka消费者组有什么作用
kafka消费者组有什么作用

kafka消费者组的作用:1、负载均衡;2、容错性;3、广播模式;4、灵活性;5、自动故障转移和领导者选举;6、动态扩展性;7、顺序保证;8、数据压缩;9、事务性支持。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

168

2024.01.12

kafka消费组的作用是什么
kafka消费组的作用是什么

kafka消费组的作用:1、负载均衡;2、容错性;3、灵活性;4、高可用性;5、扩展性;6、顺序保证;7、数据压缩;8、事务性支持。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

151

2024.02.23

rabbitmq和kafka有什么区别
rabbitmq和kafka有什么区别

rabbitmq和kafka的区别:1、语言与平台;2、消息传递模型;3、可靠性;4、性能与吞吐量;5、集群与负载均衡;6、消费模型;7、用途与场景;8、社区与生态系统;9、监控与管理;10、其他特性。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

202

2024.02.23

github中文官网入口 github中文版官网网页进入
github中文官网入口 github中文版官网网页进入

github中文官网入口https://docs.github.com/zh/get-started,GitHub 是一种基于云的平台,可在其中存储、共享并与他人一起编写代码。 通过将代码存储在GitHub 上的“存储库”中,你可以: “展示或共享”你的工作。 持续“跟踪和管理”对代码的更改。

1013

2026.01.21

http与https有哪些区别
http与https有哪些区别

http与https的区别:1、协议安全性;2、连接方式;3、证书管理;4、连接状态;5、端口号;6、资源消耗;7、兼容性。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

2113

2024.08.16

java入门学习合集
java入门学习合集

本专题整合了java入门学习指南、初学者项目实战、入门到精通等等内容,阅读专题下面的文章了解更多详细学习方法。

1

2026.01.29

java配置环境变量教程合集
java配置环境变量教程合集

本专题整合了java配置环境变量设置、步骤、安装jdk、避免冲突等等相关内容,阅读专题下面的文章了解更多详细操作。

2

2026.01.29

java成品学习网站推荐大全
java成品学习网站推荐大全

本专题整合了java成品网站、在线成品网站源码、源码入口等等相关内容,阅读专题下面的文章了解更多详细推荐内容。

0

2026.01.29

Java字符串处理使用教程合集
Java字符串处理使用教程合集

本专题整合了Java字符串截取、处理、使用、实战等等教程内容,阅读专题下面的文章了解详细操作教程。

0

2026.01.29

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号