首页 > 后端开发 > Golang > 反序列化/解析错误 KafkaProtobuf Python

反序列化/解析错误 KafkaProtobuf Python

PHPz
发布: 2024-02-09 23:48:09
转载
1147 人浏览过

反序列化/解析错误 KafkaProtobuf Python

php小编新一为你介绍一种常见的错误:反序列化/解析错误 KafkaProtobuf Python。在使用KafkaProtobuf Python库时,可能会遇到反序列化或解析错误的问题。这可能是由于消息的序列化格式与消费者代码不匹配,或者是由于消息的格式有误导致的。解决这个问题的方法包括检查消息的序列化格式和消费者代码的兼容性,以及确保消息的格式正确。在本文中,我们将详细介绍这个问题的原因和解决方法,希望能帮助你解决类似的错误。

问题内容

序列化代码(Go 语言)

1。制片人

func NewProducer(kafkaBrokerURL string, kafkaSchemaRegistryUrl string) {
    producerConfig := getKafkaProducerConfig(config.EnvConfig)

    producer, err := confluent_kafka.NewProducer(producerConfig)
    if err != nil {
        log.WithFields(log.Fields{"err": err}).Error("Failed to create Kafka Producer")
        log.Panicf("Unable to create Kafka Producer")
    }

    client, err := schemaregistry.NewClient(schemaregistry.NewConfig(kafkaSchemaRegistryUrl))
    if err != nil {
        log.WithFields(log.Fields{"err": err}).Error("Failed to create Kafka Client")
        log.Panicf("Unable to create Kafka Client")
    }

    serializer, err := protobuf.NewSerializer(client, serde.ValueSerde, protobuf.NewSerializerConfig())
    if err != nil {
        log.WithFields(log.Fields{"err": err}).Error("Failed to create Kafka Serializer")
        log.Panicf("Unable to create Kafka Serializer")
    }

    KafkaProducerInstance = &KafkaProducer{
        producer:   producer,
        serializer: serializer,
    }

    log.Info("Created Kafka Producer and Serializer")
}
登录后复制

2.发送Kafka消息

func producerHelper[kdt KafkaMesageDataTypes](message kdt, topicName string) {
    deliveryChan := make(chan confluent_kafka.Event)
    payload, err := KafkaProducerInstance.serializer.Serialize(topicName, &message)
    if err != nil {
        log.Errorf("Failed to serialize payload: %v\n", err)
        close(deliveryChan)
        return
    }

    err = KafkaProducerInstance.producer.Produce(&confluent_kafka.Message{
        TopicPartition: confluent_kafka.TopicPartition{Topic: &topicName, Partition: confluent_kafka.PartitionAny},
        Value:          payload,
    }, deliveryChan)

    if err != nil {
        log.Errorf("Failed to Produce: %v\n", err)
        close(deliveryChan)
        return
    }

    e := <-deliveryChan
    m := e.(*confluent_kafka.Message)

    if m.TopicPartition.Error != nil {
        log.Errorf("Delivery failed: %v\n", m.TopicPartition.Error)
        close(deliveryChan)
        return
    } else {
        log.Infof("Delivered message to topic %s [%d] at offset %v\n",
            *m.TopicPartition.Topic, m.TopicPartition.Partition, m.TopicPartition.Offset)
    }

    close(deliveryChan)
}
登录后复制

尝试使用消息(Diff,Python 中的应用程序)

from confluent_kafka import Consumer, KafkaError
import KafkaDiagnoseResult_pb2  # replace with your generated module name
from google.protobuf.message import DecodeError

# Kafka consumer configuration
conf = {
    'bootstrap.servers': "localhost:9092/v3/",  # Replace with your Kafka server address
    'group.id': "myGroup",
    'auto.offset.reset': 'earliest'
}

# Create a consumer instance
consumer = Consumer(conf)

# Subscribe to a topic
from confluent_kafka import Consumer, KafkaError
import KafkaDiagnoseResult_pb2
from google.protobuf.message import DecodeError

# Kafka consumer configuration
conf = {
    'bootstrap.servers': "localhost:9092/v3/",
    'group.id': "myGroup",
    'auto.offset.reset': 'earliest'
}

# Create a consumer instance
consumer = Consumer(conf)

# Subscribe to a topic
consumer.subscribe(['diagnosis']) 
try:
    while True:
        msg = consumer.poll(1.0)

        if msg is None:
            continue
        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                # End of partition event
                continue
            else:
                print(msg.error())
                break

        # Deserialize the message
        try:
            data = KafkaDiagnoseResult_pb2.KafkaDiagnoseRequest() 
            data.ParseFromString(msg.value())
        except DecodeError as e:
            print(f"Error parsing message: {e}")
            print(f"Raw message data: {msg.value()}")

        print("Received message: ", data)

except KeyboardInterrupt:
    pass
finally:
    consumer.close()
登录后复制

错误

解析消息时出错

我正在尝试调试它,但无法。

  1. 两个应用程序中的 proto 文件是相同的
  2. 我使用 proton 生成 pb2 文件。

感谢您的帮助。

谢谢

我可以获取原始格式的消息:

原始格式消息。

原始消息数据:b'x00x00x00x00x02x02x08n75100a-1a47-48b2-93b7-b7a331be59b4x12tcompleted'

  • 我尝试使用 UTF-8 对其进行解码,但失败了,因为并未读取所有字段。
print(" Decode 1: ", dict_str)
   print("Decode 2: ", ast.literal_eval(dict_str))
登录后复制

以上代码的输出:

Unparsed Message:  b'\x00\x00\x00\x00\x02\x02\x08\n$ccb0ad7e-abb2-4af6-90d1-187381f9d47e\x12\tcompleted'
 Decode 1:  
$ccb0ad7e-abb2-4af6-90d1-187381f9d47e   completed
Inner Exception Here source code string cannot contain null bytes
登录后复制

解决方法

您的 Go 客户端正在使用架构注册表进行序列化,这意味着您的 Python 代码也必须执行相同的操作。这些记录“不仅仅是 Protobuf”,因为模式 ID 也以字节编码,因此常规 Protobuf 解析器将失败。

存储库中有用于通过注册表集成使用 Protobuf 的示例代码

https://github.com/confluenceinc /confluence-kafka-python/blob/master/examples/protobuf_consumer.py

以上是反序列化/解析错误 KafkaProtobuf Python的详细内容。更多信息请关注PHP中文网其他相关文章!

来源:stackoverflow.com
本站声明
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn
热门教程
更多>
最新下载
更多>
网站特效
网站源码
网站素材
前端模板