php editor Xinyi introduces you to a common error: deserialization/parsing error KafkaProtobuf Python. When using the KafkaProtobuf Python library, you may encounter issues with deserialization or parsing errors. This could be due to a mismatch between the serialization format of the message and the consumer code, or because the message is malformed. Solutions to this include checking the compatibility of the message's serialization format and consumer code, as well as ensuring that the message is well-formatted. In this article, we will detail the cause and solution of this problem, hoping to help you solve similar errors.
func NewProducer(kafkaBrokerURL string, kafkaSchemaRegistryUrl string) { producerConfig := getKafkaProducerConfig(config.EnvConfig) producer, err := confluent_kafka.NewProducer(producerConfig) if err != nil { log.WithFields(log.Fields{"err": err}).Error("Failed to create Kafka Producer") log.Panicf("Unable to create Kafka Producer") } client, err := schemaregistry.NewClient(schemaregistry.NewConfig(kafkaSchemaRegistryUrl)) if err != nil { log.WithFields(log.Fields{"err": err}).Error("Failed to create Kafka Client") log.Panicf("Unable to create Kafka Client") } serializer, err := protobuf.NewSerializer(client, serde.ValueSerde, protobuf.NewSerializerConfig()) if err != nil { log.WithFields(log.Fields{"err": err}).Error("Failed to create Kafka Serializer") log.Panicf("Unable to create Kafka Serializer") } KafkaProducerInstance = &KafkaProducer{ producer: producer, serializer: serializer, } log.Info("Created Kafka Producer and Serializer") }
func producerHelper[kdt KafkaMesageDataTypes](message kdt, topicName string) { deliveryChan := make(chan confluent_kafka.Event) payload, err := KafkaProducerInstance.serializer.Serialize(topicName, &message) if err != nil { log.Errorf("Failed to serialize payload: %v\n", err) close(deliveryChan) return } err = KafkaProducerInstance.producer.Produce(&confluent_kafka.Message{ TopicPartition: confluent_kafka.TopicPartition{Topic: &topicName, Partition: confluent_kafka.PartitionAny}, Value: payload, }, deliveryChan) if err != nil { log.Errorf("Failed to Produce: %v\n", err) close(deliveryChan) return } e := <-deliveryChan m := e.(*confluent_kafka.Message) if m.TopicPartition.Error != nil { log.Errorf("Delivery failed: %v\n", m.TopicPartition.Error) close(deliveryChan) return } else { log.Infof("Delivered message to topic %s [%d] at offset %v\n", *m.TopicPartition.Topic, m.TopicPartition.Partition, m.TopicPartition.Offset) } close(deliveryChan) }
from confluent_kafka import Consumer, KafkaError import KafkaDiagnoseResult_pb2 # replace with your generated module name from google.protobuf.message import DecodeError # Kafka consumer configuration conf = { 'bootstrap.servers': "localhost:9092/v3/", # Replace with your Kafka server address 'group.id': "myGroup", 'auto.offset.reset': 'earliest' } # Create a consumer instance consumer = Consumer(conf) # Subscribe to a topic from confluent_kafka import Consumer, KafkaError import KafkaDiagnoseResult_pb2 from google.protobuf.message import DecodeError # Kafka consumer configuration conf = { 'bootstrap.servers': "localhost:9092/v3/", 'group.id': "myGroup", 'auto.offset.reset': 'earliest' } # Create a consumer instance consumer = Consumer(conf) # Subscribe to a topic consumer.subscribe(['diagnosis']) try: while True: msg = consumer.poll(1.0) if msg is None: continue if msg.error(): if msg.error().code() == KafkaError._PARTITION_EOF: # End of partition event continue else: print(msg.error()) break # Deserialize the message try: data = KafkaDiagnoseResult_pb2.KafkaDiagnoseRequest() data.ParseFromString(msg.value()) except DecodeError as e: print(f"Error parsing message: {e}") print(f"Raw message data: {msg.value()}") print("Received message: ", data) except KeyboardInterrupt: pass finally: consumer.close()
Error parsing message
I'm trying to debug it but can't.
proton
to generate the pb2 file. thanks for your help.
Thanks
I can get the message in its original format:
Original message data: b'\x00\x00\x00\x00\x02\x02\x08\n$1775100a-1a47-48b2-93b7-b7a331be59b4\x12\tcompleted'
print(" Decode 1: ", dict_str) print("Decode 2: ", ast.literal_eval(dict_str))
Output of the above code:
Unparsed Message: b'\x00\x00\x00\x00\x02\x02\x08\n$ccb0ad7e-abb2-4af6-90d1-187381f9d47e\x12\tcompleted' Decode 1: $ccb0ad7e-abb2-4af6-90d1-187381f9d47e completed Inner Exception Here source code string cannot contain null bytes
Your Go client is using the schema registry for serialization, which means your Python code must do the same. These records are "not just Protobuf" because the schema ID is also encoded in bytes, so the regular Protobuf parser will fail.
The repository has sample code for using Protobuf with registry integration
https://github.com/confluenceinc /confluence-kafka-python/blob/master/examples/protobuf_consumer.py
The above is the detailed content of Deserialization/parsing errors KafkaProtobuf Python. For more information, please follow other related articles on the PHP Chinese website!