Ralat penyahserialisasian/penghuraian KafkaProtobuf Python

PHPz
Lepaskan: 2024-02-09 23:48:09
ke hadapan
1100 orang telah melayarinya

反序列化/解析错误 KafkaProtobuf Python

Editor PHP Xinyi memperkenalkan anda kepada ralat biasa: ralat penyahserilan/penghuraian KafkaProtobuf Python. Apabila menggunakan perpustakaan Python KafkaProtobuf, anda mungkin menghadapi masalah dengan ralat penyahserikatan atau penghuraian. Ini mungkin disebabkan oleh ketidakpadanan antara format bersiri mesej dan kod pengguna, atau kerana mesej itu salah bentuk. Penyelesaian kepada masalah ini termasuk menyemak format bersiri mesej dan kod pengguna untuk keserasian, serta memastikan mesej diformat dengan baik. Dalam artikel ini, kami akan memperincikan punca dan penyelesaian masalah ini, dengan harapan dapat membantu anda menyelesaikan ralat yang serupa.

Kandungan soalan

Kod siri (Bahasa Go)

1. Penerbit

func NewProducer(kafkaBrokerURL string, kafkaSchemaRegistryUrl string) {
    producerConfig := getKafkaProducerConfig(config.EnvConfig)

    producer, err := confluent_kafka.NewProducer(producerConfig)
    if err != nil {
        log.WithFields(log.Fields{"err": err}).Error("Failed to create Kafka Producer")
        log.Panicf("Unable to create Kafka Producer")
    }

    client, err := schemaregistry.NewClient(schemaregistry.NewConfig(kafkaSchemaRegistryUrl))
    if err != nil {
        log.WithFields(log.Fields{"err": err}).Error("Failed to create Kafka Client")
        log.Panicf("Unable to create Kafka Client")
    }

    serializer, err := protobuf.NewSerializer(client, serde.ValueSerde, protobuf.NewSerializerConfig())
    if err != nil {
        log.WithFields(log.Fields{"err": err}).Error("Failed to create Kafka Serializer")
        log.Panicf("Unable to create Kafka Serializer")
    }

    KafkaProducerInstance = &KafkaProducer{
        producer:   producer,
        serializer: serializer,
    }

    log.Info("Created Kafka Producer and Serializer")
}
Salin selepas log masuk

2.Hantar mesej Kafka

func producerHelper[kdt KafkaMesageDataTypes](message kdt, topicName string) {
    deliveryChan := make(chan confluent_kafka.Event)
    payload, err := KafkaProducerInstance.serializer.Serialize(topicName, &message)
    if err != nil {
        log.Errorf("Failed to serialize payload: %v\n", err)
        close(deliveryChan)
        return
    }

    err = KafkaProducerInstance.producer.Produce(&confluent_kafka.Message{
        TopicPartition: confluent_kafka.TopicPartition{Topic: &topicName, Partition: confluent_kafka.PartitionAny},
        Value:          payload,
    }, deliveryChan)

    if err != nil {
        log.Errorf("Failed to Produce: %v\n", err)
        close(deliveryChan)
        return
    }

    e := <-deliveryChan
    m := e.(*confluent_kafka.Message)

    if m.TopicPartition.Error != nil {
        log.Errorf("Delivery failed: %v\n", m.TopicPartition.Error)
        close(deliveryChan)
        return
    } else {
        log.Infof("Delivered message to topic %s [%d] at offset %v\n",
            *m.TopicPartition.Topic, m.TopicPartition.Partition, m.TopicPartition.Offset)
    }

    close(deliveryChan)
}
Salin selepas log masuk

Cuba gunakan Messages (Diff, aplikasi dalam Python)

from confluent_kafka import Consumer, KafkaError
import KafkaDiagnoseResult_pb2  # replace with your generated module name
from google.protobuf.message import DecodeError

# Kafka consumer configuration
conf = {
    'bootstrap.servers': "localhost:9092/v3/",  # Replace with your Kafka server address
    'group.id': "myGroup",
    'auto.offset.reset': 'earliest'
}

# Create a consumer instance
consumer = Consumer(conf)

# Subscribe to a topic
from confluent_kafka import Consumer, KafkaError
import KafkaDiagnoseResult_pb2
from google.protobuf.message import DecodeError

# Kafka consumer configuration
conf = {
    'bootstrap.servers': "localhost:9092/v3/",
    'group.id': "myGroup",
    'auto.offset.reset': 'earliest'
}

# Create a consumer instance
consumer = Consumer(conf)

# Subscribe to a topic
consumer.subscribe(['diagnosis']) 
try:
    while True:
        msg = consumer.poll(1.0)

        if msg is None:
            continue
        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                # End of partition event
                continue
            else:
                print(msg.error())
                break

        # Deserialize the message
        try:
            data = KafkaDiagnoseResult_pb2.KafkaDiagnoseRequest() 
            data.ParseFromString(msg.value())
        except DecodeError as e:
            print(f"Error parsing message: {e}")
            print(f"Raw message data: {msg.value()}")

        print("Received message: ", data)

except KeyboardInterrupt:
    pass
finally:
    consumer.close()
Salin selepas log masuk

Ralat

解析消息时出错

Saya cuba nyahpepijat tetapi tidak boleh.

  1. Fail proto dalam kedua-dua aplikasi adalah sama
  2. Saya menggunakan proton untuk menjana fail pb2.

Terima kasih atas bantuan anda.

Terima kasih

Saya boleh mendapatkan mesej dalam format asalnya:

Mesej format asal.

原始消息数据:b'x00x00x00x00x02x02x08n75100a-1a47-48b2-93b7-b7a331be59b4x12tcompleted'

  • Saya cuba menyahkodnya menggunakan UTF-8 tetapi ia gagal kerana tidak semua medan dibaca.
print(" Decode 1: ", dict_str)
   print("Decode 2: ", ast.literal_eval(dict_str))
Salin selepas log masuk

Output kod di atas:

Unparsed Message:  b'\x00\x00\x00\x00\x02\x02\x08\n$ccb0ad7e-abb2-4af6-90d1-187381f9d47e\x12\tcompleted'
 Decode 1:  
$ccb0ad7e-abb2-4af6-90d1-187381f9d47e   completed
Inner Exception Here source code string cannot contain null bytes
Salin selepas log masuk

Penyelesaian

Pelanggan Go anda menggunakan pendaftaran skema untuk bersiri, yang bermaksud kod Python anda mesti melakukan perkara yang sama. Rekod ini "bukan hanya Protobuf" kerana ID skema juga dikodkan dalam bait, jadi penghurai Protobuf biasa akan gagal.

Repositori mempunyai kod sampel untuk menggunakan Protobuf dengan penyepaduan pendaftaran

https://github.com/confluenceinc /confluence-kafka-python/blob/master/examples/protobuf_consumer.py

Atas ialah kandungan terperinci Ralat penyahserialisasian/penghuraian KafkaProtobuf Python. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!

sumber:stackoverflow.com
Kenyataan Laman Web ini
Kandungan artikel ini disumbangkan secara sukarela oleh netizen, dan hak cipta adalah milik pengarang asal. Laman web ini tidak memikul tanggungjawab undang-undang yang sepadan. Jika anda menemui sebarang kandungan yang disyaki plagiarisme atau pelanggaran, sila hubungi admin@php.cn
Tutorial Popular
Lagi>
Muat turun terkini
Lagi>
kesan web
Kod sumber laman web
Bahan laman web
Templat hujung hadapan
Tentang kita Penafian Sitemap
Laman web PHP Cina:Latihan PHP dalam talian kebajikan awam,Bantu pelajar PHP berkembang dengan cepat!