Maison > développement back-end > Golang > Erreurs de désérialisation/analyse KafkaProtobuf Python

Erreurs de désérialisation/analyse KafkaProtobuf Python

PHPz
Libérer: 2024-02-09 23:48:09
avant
1136 Les gens l'ont consulté

反序列化/解析错误 KafkaProtobuf Python

L'éditeur PHP Xinyi vous présente une erreur courante : l'erreur de désérialisation/analyse KafkaProtobuf Python. Lorsque vous utilisez la bibliothèque Python KafkaProtobuf, vous pouvez rencontrer des problèmes de désérialisation ou d'erreurs d'analyse. Cela peut être dû à une inadéquation entre le format de sérialisation du message et le code du consommateur, ou à un message mal formé. Les solutions à ce problème incluent la vérification de la compatibilité du format de sérialisation du message et du code consommateur, ainsi que la garantie que le message est bien formaté. Dans cet article, nous détaillerons la cause et la solution de ce problème, dans l’espoir de vous aider à résoudre des erreurs similaires.

Contenu de la question

Code de sérialisation (langue Go)

1. Producteur

func NewProducer(kafkaBrokerURL string, kafkaSchemaRegistryUrl string) {
    producerConfig := getKafkaProducerConfig(config.EnvConfig)

    producer, err := confluent_kafka.NewProducer(producerConfig)
    if err != nil {
        log.WithFields(log.Fields{"err": err}).Error("Failed to create Kafka Producer")
        log.Panicf("Unable to create Kafka Producer")
    }

    client, err := schemaregistry.NewClient(schemaregistry.NewConfig(kafkaSchemaRegistryUrl))
    if err != nil {
        log.WithFields(log.Fields{"err": err}).Error("Failed to create Kafka Client")
        log.Panicf("Unable to create Kafka Client")
    }

    serializer, err := protobuf.NewSerializer(client, serde.ValueSerde, protobuf.NewSerializerConfig())
    if err != nil {
        log.WithFields(log.Fields{"err": err}).Error("Failed to create Kafka Serializer")
        log.Panicf("Unable to create Kafka Serializer")
    }

    KafkaProducerInstance = &KafkaProducer{
        producer:   producer,
        serializer: serializer,
    }

    log.Info("Created Kafka Producer and Serializer")
}
Copier après la connexion

2.Envoyer un message à Kafka

func producerHelper[kdt KafkaMesageDataTypes](message kdt, topicName string) {
    deliveryChan := make(chan confluent_kafka.Event)
    payload, err := KafkaProducerInstance.serializer.Serialize(topicName, &message)
    if err != nil {
        log.Errorf("Failed to serialize payload: %v\n", err)
        close(deliveryChan)
        return
    }

    err = KafkaProducerInstance.producer.Produce(&confluent_kafka.Message{
        TopicPartition: confluent_kafka.TopicPartition{Topic: &topicName, Partition: confluent_kafka.PartitionAny},
        Value:          payload,
    }, deliveryChan)

    if err != nil {
        log.Errorf("Failed to Produce: %v\n", err)
        close(deliveryChan)
        return
    }

    e := <-deliveryChan
    m := e.(*confluent_kafka.Message)

    if m.TopicPartition.Error != nil {
        log.Errorf("Delivery failed: %v\n", m.TopicPartition.Error)
        close(deliveryChan)
        return
    } else {
        log.Infof("Delivered message to topic %s [%d] at offset %v\n",
            *m.TopicPartition.Topic, m.TopicPartition.Partition, m.TopicPartition.Offset)
    }

    close(deliveryChan)
}
Copier après la connexion

Essayez d'utiliser Messages (Diff, une application en Python)

from confluent_kafka import Consumer, KafkaError
import KafkaDiagnoseResult_pb2  # replace with your generated module name
from google.protobuf.message import DecodeError

# Kafka consumer configuration
conf = {
    'bootstrap.servers': "localhost:9092/v3/",  # Replace with your Kafka server address
    'group.id': "myGroup",
    'auto.offset.reset': 'earliest'
}

# Create a consumer instance
consumer = Consumer(conf)

# Subscribe to a topic
from confluent_kafka import Consumer, KafkaError
import KafkaDiagnoseResult_pb2
from google.protobuf.message import DecodeError

# Kafka consumer configuration
conf = {
    'bootstrap.servers': "localhost:9092/v3/",
    'group.id': "myGroup",
    'auto.offset.reset': 'earliest'
}

# Create a consumer instance
consumer = Consumer(conf)

# Subscribe to a topic
consumer.subscribe(['diagnosis']) 
try:
    while True:
        msg = consumer.poll(1.0)

        if msg is None:
            continue
        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                # End of partition event
                continue
            else:
                print(msg.error())
                break

        # Deserialize the message
        try:
            data = KafkaDiagnoseResult_pb2.KafkaDiagnoseRequest() 
            data.ParseFromString(msg.value())
        except DecodeError as e:
            print(f"Error parsing message: {e}")
            print(f"Raw message data: {msg.value()}")

        print("Received message: ", data)

except KeyboardInterrupt:
    pass
finally:
    consumer.close()
Copier après la connexion

Erreur

解析消息时出错

J'essaie de le déboguer mais je n'y parviens pas.

  1. Les fichiers proto dans les deux applications sont les mêmes
  2. J'utilise proton pour générer des fichiers pb2.

Merci pour votre aide.

Merci

Je peux recevoir le message dans son format original :

Message au format original.

原始消息数据:b'x00x00x00x00x02x02x08n75100a-1a47-48b2-93b7-b7a331be59b4x12tcompleted'

  • J'ai essayé de le décoder en utilisant UTF-8 mais cela a échoué car tous les champs n'étaient pas lus.
print(" Decode 1: ", dict_str)
   print("Decode 2: ", ast.literal_eval(dict_str))
Copier après la connexion

Sortie du code ci-dessus :

Unparsed Message:  b'\x00\x00\x00\x00\x02\x02\x08\n$ccb0ad7e-abb2-4af6-90d1-187381f9d47e\x12\tcompleted'
 Decode 1:  
$ccb0ad7e-abb2-4af6-90d1-187381f9d47e   completed
Inner Exception Here source code string cannot contain null bytes
Copier après la connexion

Solution de contournement

Votre client Go utilise le registre de schémas pour la sérialisation, ce qui signifie que votre code Python doit faire de même. Ces enregistrements ne sont "pas seulement Protobuf" car l'ID de schéma est également codé en octets, donc l'analyseur Protobuf standard échouera.

Le référentiel contient un exemple de code pour utiliser Protobuf avec l'intégration du registre

https://github.com/confluenceinc/confluence-kafka-python/blob/master/examples/protobuf_consumer.py

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

source:stackoverflow.com
Déclaration de ce site Web
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn
Tutoriels populaires
Plus>
Derniers téléchargements
Plus>
effets Web
Code source du site Web
Matériel du site Web
Modèle frontal