Maison > développement back-end > Tutoriel Python > Guide pratique du protocole Kafka

Guide pratique du protocole Kafka

Susan Sarandon
Libérer: 2024-12-28 17:11:14
original
396 Les gens l'ont consulté

J'ai pas mal travaillé avec le protocole Apache Kafka au bas niveau. Ce n'était pas facile de commencer à faire cela en suivant uniquement le guide officiel et j'ai beaucoup lu le code. Avec cet article, je souhaite vous donner une longueur d'avance en vous guidant pas à pas depuis des valeurs primitives jusqu'à des demandes significatives.

Dans cet article :

  1. Explorez le code du protocole Kafka et le protocole en action avec Wireshark.
  2. Apprenez à lire et à écrire des valeurs primitives.
  3. Combinez des primitives pour effectuer des requêtes significatives.

Nous utiliserons Python comme langage de programmation. Cependant, le code sera sans dépendance et facilement portable dans la langue de votre choix.

Introduction

Apache Kafka dispose d'un protocole binaire personnalisé versionné, avec différents types de données, champs facultatifs, etc. Malheureusement, il n'utilise pas de format de sérialisation bien connu comme Protobuf. Le schéma des messages de protocole est décrit en JSON. Le code Java réel qui effectue la sérialisation et la désérialisation est généré à partir de cette description.

Lorsque vous êtes dans le monde Java, vous pouvez utiliser la bibliothèque client officielle. Mais si vous utilisez une autre plateforme, vous comptez sur des implémentations tierces. Ils existent, mais ils se concentrent principalement sur le producteur et le consommateur, rarement sur certains aspects du client administrateur. Si vous devez faire autre chose, vous êtes seul.

Cet article vous aidera à commencer à pirater le protocole Kafka. (Si vous recherchez une bibliothèque de (dé)sérialisation Python prête pour le protocole Kafka, consultez Kio1. Pour Rust, jetez un œil à la bibliothèque sur laquelle je travaille.)

Vous pouvez trouver le code de cet article et d'autres tests similaires dans ce référentiel sur Github.

Aperçu du protocole

Vous pouvez retrouver la description officielle du protocole sur cette page. Je vous encourage à vous en familiariser, lisez au moins les sections "Préliminaires" et "Le Protocole".

Voici quelques faits saillants. Le protocole Kafka est un protocole de requête-réponse binaire basé sur TCP :

  • Basé sur TCP : le courtier Kafka écoute les ports de la pile TCP (ce qui offre certains avantages comme la commande de garanties).
  • Binaire : les messages sont codés sous forme binaire et nécessitent une sérialisation et une désérialisation particulières selon des schémas prédéfinis.
  • Demande-réponse : les échanges sont initiés par le client, le serveur est passif et ne répond qu'aux requêtes.

Chaque type de message API se compose de la paire demande et réponse et est identifié par une valeur numérique appelée clé API. Par exemple, Produce et Fetch, les RPC Kafka les plus caractéristiques, ont respectivement les clés API 0 et 1. Il existe aujourd'hui près de 90 types de messages API (certains d'entre eux sont inter-courtiers et non client-courtier).

Les requêtes et les réponses sont décrites par des schémas versionnés. Le versioning permet l'évolution du protocole, par exemple, l'ajout ou la suppression de champs ou la modification de leur type de données.

Premiers pas

Voici quelques choses que vous pouvez faire pour commencer à travailler avec le protocole Kafka.

Apprendre le code du protocole Kafka

Le code Kafka est la source de vérité (pratiquement) sur le protocole. Consultez le code Kafka sur Github et passez à la version qui vous intéresse (par exemple 3.8.0) :

git clone git@github.com:apache/kafka.git
git checkout 3.8.0
Copier après la connexion
Copier après la connexion
Copier après la connexion
Copier après la connexion
Copier après la connexion
Copier après la connexion
Copier après la connexion

Vous pouvez trouver les définitions des messages API en JSON dans clients/src/main/resources/common/message. Chaque fichier JSON contient la définition d'un type de message2 avec toutes ses versions. clients/src/main/resources/common/message/README.md donne un bon aperçu du format de définition de schéma. Faites attention à des éléments tels que les valeurs par défaut, les versions flexibles et les champs balisés.

Outre les types de messages API concrets qui vous intéressent, jetez un œil à clients/src/main/resources/common/message/RequestHeader.json et ResponseHeader.json, qui décrivent les en-têtes utilisés dans chaque échange requête-réponse. .

Exécutons le générateur de code :

./gradlew processMessages
Copier après la connexion
Copier après la connexion
Copier après la connexion
Copier après la connexion
Copier après la connexion
Copier après la connexion

Vous pouvez maintenant trouver les classes générées dans clients/src/generated/java/org/apache/kafka/common/message.

Jetez un œil à clients/src/generated/java/org/apache/kafka/common/message/ApiMessageType.java. Cet utilitaire :

  • décrit l'ensemble des types de messages API existants ainsi que leurs schémas et versions ;
  • mappe les versions des messages de l'API aux versions d'en-tête de requête et de réponse dans les fonctions requestHeaderVersion et ResponseHeaderVersion.

Les autres fichiers sont générés un à un à partir des schémas JSON correspondants (parfois avec le suffixe Data, c'est une question de compatibilité). Dans ces fichiers vous trouverez :

  1. Les définitions de schéma versionnées SCHEMA_0, SCHEMA_1, etc. Parfois, les schémas restent les mêmes entre les versions. Ceci est normal et signifie que seule la contrepartie requête-réponse a changé.
  2. des méthodes de lecture et d'écriture où vous pouvez trouver la vérité terrain pour la sérialisation et la désérialisation du protocole.

Faites attention aux classes internes car elles représentent la structure complexe du message.

Exécuter Kafka dans Docker

Exécuter Kafka dans Docker est un moyen pratique de faire fonctionner un courtier pour tester le protocole ou capturer l'échange réseau. Depuis la version 3.7.0, l'équipe Kafka crée des images Docker officielles, que vous pouvez exécuter en tant que :

docker run --rm -ti -p 9092:9092 apache/kafka:3.8.0
Copier après la connexion
Copier après la connexion
Copier après la connexion
Copier après la connexion
Copier après la connexion

Si vous êtes intéressé par les anciennes versions, recherchez d'autres images dans Docker Hub. Cependant, cela n'est peut-être pas nécessaire étant donné que le protocole Kafka est compatible en amont et en aval : les nouveaux courtiers reconnaîtront très bien les anciennes versions du protocole et les anciens clients pourront communiquer avec les nouveaux courtiers.

Si vous lisez ceci, vous disposez probablement déjà des outils de ligne de commande Kafka sur votre machine, mais juste au cas où, vous pouvez également les exécuter dans Docker. Par exemple, exécutez ceci pour créer un sujet :

git clone git@github.com:apache/kafka.git
git checkout 3.8.0
Copier après la connexion
Copier après la connexion
Copier après la connexion
Copier après la connexion
Copier après la connexion
Copier après la connexion
Copier après la connexion

Inspecter le protocole avec Wireshark

Après nous être familiarisés avec le code Kafka, regardons le protocole en action. Wireshark est un outil largement utilisé pour de telles inspections. Il peut décortiquer le protocole Kafka (et prend en charge les dernières versions si votre version est suffisamment récente).

J'ai construit Wireshark à partir des sources de la version 4.5.0, car mon package de système d'exploitation est ancien et incapable de disséquer le protocole Kafka avec les nouvelles versions. Wireshark 4.5.0 devrait principalement prendre en charge les versions du protocole Kafka 3.7. Cependant, vous pouvez essayer la version disponible et voir comment elle fonctionne pour vous.

Exécutons Wireshark sur l'interface de bouclage avec le filtre de capture du port 9092 (1) et le filtre d'affichage kafka (2) :

Kafka protocol practical guide

Créez un sujet et voyez ce que Wireshark nous montre :

./gradlew processMessages
Copier après la connexion
Copier après la connexion
Copier après la connexion
Copier après la connexion
Copier après la connexion
Copier après la connexion

Kafka protocol practical guide

Le filtre d'affichage supprime tout ce qui n'est pas pertinent et ne laisse que les requêtes et réponses Kafka. Comme Wireshark comprend la plupart des versions de messages dans le protocole (en fonction de la version de Wireshark, bien sûr), vous pouvez facilement examiner la structure de chaque message. Wireshark affichera également les octets correspondants.

Wireshark est un excellent outil de débogage qui pourrait vous aider à comprendre comment le protocole fonctionne dans un cas particulier et quel est le problème avec votre implémentation.

Lire et écrire des valeurs primitives

Le protocole définit un certain nombre de types primitifs, dont vous pouvez trouver la description complète ici. Implémentons le code de lecture et d'écriture pour eux. Vous pouvez retrouver toutes les fonctions dans ce fichier, consultez également le fichier de test correspondant.

Valeurs entières de longueur fixe : INT8, INT16, INT32, INT64 et UINT16

Ce sont des nombres entiers de longueur fixe connue : 1, 2, 4 ou 8 octets. Naturellement, vous pouvez trouver de tels champs dans tout le protocole. Dans ce cours, vous verrez comment (trivialement) leur lecture et leur écriture sont implémentées dans Kafka.

Définissons d'abord la fonction permettant de lire un nombre exact d'octets à partir d'un tampon3 :

docker run --rm -ti -p 9092:9092 apache/kafka:3.8.0
Copier après la connexion
Copier après la connexion
Copier après la connexion
Copier après la connexion
Copier après la connexion

L'indice de type BinaryIO en Python représente un objet à partir duquel les octets peuvent être lus et dans lequel ils peuvent être écrits. Il a des méthodes comme lire, écrire, dire (pour obtenir la position actuelle), chercher (pour changer la position).

Nous pouvons maintenant implémenter la lecture INT8 :

docker run --rm -ti --net=host apache/kafka:3.8.0 \
  /opt/kafka/bin/kafka-topics.sh \
  --bootstrap-server localhost:9092 --create \
  --topic test-topic1 --partitions 2
Copier après la connexion
Copier après la connexion
Copier après la connexion
Copier après la connexion

Kafka utilise l'ordre des octets big-endian (réseau AKA), d'où byteorder="big".

Maintenant j'écris :

git clone git@github.com:apache/kafka.git
git checkout 3.8.0
Copier après la connexion
Copier après la connexion
Copier après la connexion
Copier après la connexion
Copier après la connexion
Copier après la connexion
Copier après la connexion

Je ne répéterai pas cela pour INT16, INT32 et INT64 : les seules différences significatives sont le nombre d'octets (2, 4 et 8 respectivement) et les plages vérifiées ([-(2**15), 2* *15 - 1], [-(2**31), 2**31 - 1] et [-(2**63), 2**63 - 1] en conséquence).

UINT16 est similaire à INT16 :

./gradlew processMessages
Copier après la connexion
Copier après la connexion
Copier après la connexion
Copier après la connexion
Copier après la connexion
Copier après la connexion

Notez le signé=False ici.

BOOLÉEN

BOOLEAN est essentiellement INT8 avec une logique supplémentaire : == 0 signifie faux, != 0 signifie vrai.

docker run --rm -ti -p 9092:9092 apache/kafka:3.8.0
Copier après la connexion
Copier après la connexion
Copier après la connexion
Copier après la connexion
Copier après la connexion

Vous pouvez voir un exemple de BOOLEAN dans le champ allowAutoTopicCreation de la classe générée MetadataRequestData.

FLOTTEUR64

FLOAT64 est une valeur IEEE 754 64 bits double précision. Python n'a pas to_bytes et from_bytes pour float comme il l'a pour int. Nous utiliserons donc à la place le module struct de la bibliothèque standard.

docker run --rm -ti --net=host apache/kafka:3.8.0 \
  /opt/kafka/bin/kafka-topics.sh \
  --bootstrap-server localhost:9092 --create \
  --topic test-topic1 --partitions 2
Copier après la connexion
Copier après la connexion
Copier après la connexion
Copier après la connexion

>d signifie "une valeur double dans l'ordre des octets big-endian".

UNSIGNED_VARINT : valeurs entières de longueur variable

Les entiers de longueur variable sont une approche qui permet d'utiliser moins de bits par valeur lorsque les valeurs sont petites. Kafka utilise l'approche Varint de Protocol Buffers. L'idée est simple :

Chaque octet du varint a un bit de continuation qui indique si l'octet qui le suit fait partie du varint. Il s'agit du bit de poids fort (MSB) de l'octet (parfois également appelé bit de signe). Les 7 bits inférieurs sont une charge utile ; l'entier résultant est construit en ajoutant ensemble les charges utiles de 7 bits de ses octets constitutifs.

Vous pouvez consulter la spécification Protobuf et l'implémentation de Kafka (lecture, écriture) pour plus de détails.

Ce type n'est pas utilisé dans les champs de protocole en soi, mais il est utilisé pour les collections compactes décrites ci-dessous.

Mettez-le en œuvre. Pour plus de confiance, nous obtenons quelques exemples directement de la source de vérité, la classe ByteUtils de Kafka :

/opt/kafka/bin/kafka-topics.sh \
  --bootstrap-server localhost:9092 --create \
  --topic test-topic1 --partitions 2
Copier après la connexion
Copier après la connexion
Copier après la connexion

En exécutant ceci, nous obtiendrons :

def read_exact(buffer: BinaryIO, num_bytes: int) -> bytes:
    value = buffer.read(num_bytes)
    if len(value) != num_bytes:
        raise ValueError(f"Buffer underflow: expected {num_bytes}, got {len(value)}")
    return value
Copier après la connexion
Copier après la connexion

Implémentons cela de la manière probablement pas la plus performante, mais simple :

def read_int8(buffer: BinaryIO) -> int:
    return int.from_bytes(read_exact(buffer, 1), byteorder="big", signed=True)
Copier après la connexion
Copier après la connexion

UUID

Les UUID sont des valeurs de 128 bits utilisées pour identifier les entités de manière unique. Par exemple, ils sont utilisés pour transmettre des identifiants de sujet dans CreateTopicsResponse.

Vous pouvez voir comment ils sont lus et écrits dans le code Kafka. C'est simple à reproduire :

def write_int8(value: int, buffer: BinaryIO) -> None:
    if -(2**7) <= value <= 2**7 - 1:
        buffer.write(value.to_bytes(1, byteorder="big", signed=True))
    else:
        raise ValueError(f"Value {value} is out of range for INT8")
Copier après la connexion

Notez que Kafka traite null/None comme un UUID nul, nous faisons donc la même chose ici.

Cordes

Le protocole Kafka comporte 4 types de chaînes :

compact non-compact
nullable COMPACT_NULLABLE_STRING NULLABLE_STRING
non-nullable COMPACT_STRING STRING

La compacité indique si la longueur de la chaîne est codée avec INT16 ou avec UNSIGNED_VARINT. Cela dépend de la version du message (il a été introduit vers 2017). La nullité indique si la valeur peut être nulle. Cela dépend de l'objectif du message et de la version également (parfois les champs de chaîne deviennent facultatifs lors de l'évolution du protocole).

Les chaînes sont omniprésentes dans le protocole. Par exemple, consultez le nom du champ dans la classe générée MetadataRequestData.MetadataRequestTopic.

Les chaînes sont codées assez simplement : il y a d'abord la longueur, puis vient le corps codé en UTF-8. La longueur maximale autorisée est de 32 767 octets. Les chaînes nulles ont une longueur de -1 et évidemment pas de corps.

Comme la seule différence entre compact et non compact réside dans la manière dont la longueur de la chaîne est codée, nous pouvons avoir une fonction pour les deux modes.

Commençons par lire et écrire des chaînes nullables :

git clone git@github.com:apache/kafka.git
git checkout 3.8.0
Copier après la connexion
Copier après la connexion
Copier après la connexion
Copier après la connexion
Copier après la connexion
Copier après la connexion
Copier après la connexion

Des fonctions de chaîne non nullables peuvent être construites sur celles-ci :

./gradlew processMessages
Copier après la connexion
Copier après la connexion
Copier après la connexion
Copier après la connexion
Copier après la connexion
Copier après la connexion

Tableaux d'octets

Les tableaux d'octets sont très similaires aux chaînes. Ils ont la même nullité potentielle et la même compacité :

compact non-compact
nullable COMPACT_NULLABLE_BYTES NULLABLE_BYTES
non-nullable COMPACT_BYTES BYTES

Ils sont également codés de la même manière : longueur corps. Naturellement, le corps n’est pas traité comme une chaîne UTF-8, mais comme un tableau d’octets opaque. La longueur maximale d'un tableau d'octets est de 2147483647 ;

Vous pouvez trouver un exemple d'octets dans les métadonnées du champ dans la classe générée JoinGroupRequestData.JoinGroupRequestProtocol.

git clone git@github.com:apache/kafka.git
git checkout 3.8.0
Copier après la connexion
Copier après la connexion
Copier après la connexion
Copier après la connexion
Copier après la connexion
Copier après la connexion
Copier après la connexion

Comme vous pouvez le constater, la différence entre ces fonctions et les fonctions correspondantes pour les chaînes est faible.

Autres tableaux

Le protocole prend en charge les tableaux de types autres que les octets : chaînes, nombres, structures (mais pas les tableaux imbriqués) : ARRAY et COMPACT_ARRAY. La compacité est la même que dans les tableaux d'octets et les chaînes.

La nullité n'est pas explicitement mentionnée dans la spécification du protocole pour une raison quelconque. Cependant, les tableaux peuvent être nullables. Ceci est contrôlé par nullableVersions dans les définitions de schéma, comme ici.

Considérant que nous avons déjà implémenté read_array_length et write_array_length, implémentons les fonctions de lecture et d'écriture :

./gradlew processMessages
Copier après la connexion
Copier après la connexion
Copier après la connexion
Copier après la connexion
Copier après la connexion
Copier après la connexion

ENREGISTREMENTS

ENREGISTREMENTS encodent les enregistrements Kafka. La structure est assez complexe et je ne vais pas la décrire dans ce guide (cependant, merci de me le faire savoir dans les commentaires ?️ si vous souhaitez l'avoir.) Pour plus de simplicité, nous pouvons traiter les enregistrements comme NULLABLE_BYTES ou COMPACT_NULLABLE_BYTES ( selon la version du message).

Champs balisés

Les champs balisés sont une extension du protocole Kafka qui permet de joindre des données facultatives aux messages. L'idée est double :

  1. Si le client du service ne comprend pas le champ balisé, il l'enregistrera comme inconnu et l'ignorera.
  2. Si un champ est rarement utilisé, sa valeur par défaut peut être ignorée lors du transfert.

Jetez par exemple un oeil à ce champ. Il a taggedVersions, qui indique depuis quelle version ce champ est balisé (dans la plupart des cas, c'est la même version lorsque le champ a été ajouté).

Un champ balisé se compose de :

  1. La balise de type UNSIGNED_VARINT.
  2. Les données de type COMPACT_BYTES.

Vous pouvez trouver plus de détails sur les champs balisés dans KIP-482.

Mettez en œuvre :

docker run --rm -ti -p 9092:9092 apache/kafka:3.8.0
Copier après la connexion
Copier après la connexion
Copier après la connexion
Copier après la connexion
Copier après la connexion

Ici, ils sont intitulés « inconnus ». Les champs connus doivent l'être à l'intérieur de leurs structures.

Structure des messages

La structure du message de haut niveau est très simple. Selon le cahier des charges :

docker run --rm -ti --net=host apache/kafka:3.8.0 \
  /opt/kafka/bin/kafka-topics.sh \
  --bootstrap-server localhost:9092 --create \
  --topic test-topic1 --partitions 2
Copier après la connexion
Copier après la connexion
Copier après la connexion
Copier après la connexion

C'est-à-dire qu'il s'agit d'un message lui-même précédé de sa taille en octets. Les messages de demande et de réponse sont constitués de l’en-tête immédiatement suivi du corps. Pour une raison quelconque, cela n'est pas explicitement documenté4, mais vous pouvez me faire confiance ? ou vérifiez le code.

En-tête de demande et de réponse

L'en-tête de requête existe en trois versions : 0, 1 et 2. Elles sont spécifiées dans le protocole comme :

/opt/kafka/bin/kafka-topics.sh \
  --bootstrap-server localhost:9092 --create \
  --topic test-topic1 --partitions 2
Copier après la connexion
Copier après la connexion
Copier après la connexion

TAG_BUFFER correspond aux champs balisés mentionnés précédemment.

Implémentons-les en tant que classes de données Python :

git clone git@github.com:apache/kafka.git
git checkout 3.8.0
Copier après la connexion
Copier après la connexion
Copier après la connexion
Copier après la connexion
Copier après la connexion
Copier après la connexion
Copier après la connexion

Comme vous pouvez le voir, il y a des champs balisés dans la version 2, il n'y a aucun champ connu attendu. Si un champ balisé est envoyé par erreur au courtier, il sera ignoré.

L'en-tête de réponse existe en deux versions : 0 et 1. Elles sont spécifiées dans le protocole comme suit :

./gradlew processMessages
Copier après la connexion
Copier après la connexion
Copier après la connexion
Copier après la connexion
Copier après la connexion
Copier après la connexion

Mettons-les également en œuvre :

docker run --rm -ti -p 9092:9092 apache/kafka:3.8.0
Copier après la connexion
Copier après la connexion
Copier après la connexion
Copier après la connexion
Copier après la connexion

Nous n'implémentons pas de lecture pour les en-têtes de requête et d'écriture pour ceux de réponse. Par souci de concision : nous n'allons pas envoyer d'en-têtes de réponse et recevoir ceux de requête dans nos exemples car nous ne programmons pas le côté serveur. Cependant, si vous êtes également intéressé par le côté serveur, vous devez implémenter les deux fonctions (ce qui devrait être simple).

ID de corrélation

Notez particulièrement le champ corrélation_id dans les en-têtes de requête et de réponse. Le protocole prend en charge le pipeline : le client peut avoir plus d'une requête en attente par connexion. L'ID de corrélation lui permet de faire correspondre les réponses aux demandes.

Sélection de la version d'en-tête

La version à utiliser dépend de la clé API et de la version du message. Ce n'est actuellement pas documenté explicitement dans le guide du protocole5.
Utilisez les fonctions requestHeaderVersion et ResponseHeaderVersion dans la classe générée ApiMessageType comme référence.

Envoi de demandes et réception de réponses

Maintenant, ayant toutes ces connaissances et ce code, envoyons enfin une requête ApiVersions et recevons et lisons une réponse. ApiVersions est normalement la première requête envoyée par le client. Son objectif est de trouver les versions et fonctionnalités de l'API prises en charge par le courtier. Nous implémentons la dernière version 3.

Dans la spécification du protocole, il est défini comme :

docker run --rm -ti --net=host apache/kafka:3.8.0 \
  /opt/kafka/bin/kafka-topics.sh \
  --bootstrap-server localhost:9092 --create \
  --topic test-topic1 --partitions 2
Copier après la connexion
Copier après la connexion
Copier après la connexion
Copier après la connexion

Créons la classe de données :

/opt/kafka/bin/kafka-topics.sh \
  --bootstrap-server localhost:9092 --create \
  --topic test-topic1 --partitions 2
Copier après la connexion
Copier après la connexion
Copier après la connexion

Et la réponse :

def read_exact(buffer: BinaryIO, num_bytes: int) -> bytes:
    value = buffer.read(num_bytes)
    if len(value) != num_bytes:
        raise ValueError(f"Buffer underflow: expected {num_bytes}, got {len(value)}")
    return value
Copier après la connexion
Copier après la connexion

[api_keys] signifie "un tableau d'api_keys", où api_keys est la structure définie deux lignes ci-dessous.

Conversion en classes de données Python :

def read_int8(buffer: BinaryIO) -> int:
    return int.from_bytes(read_exact(buffer, 1), byteorder="big", signed=True)
Copier après la connexion
Copier après la connexion

Quand nous parlons de tableaux, nous devons savoir si nous avons besoin de tableaux compacts ou non compacts. Pour le savoir, jetons un œil à la définition du schéma dans ApiVersionsRequest.json. Vous pouvez voir "flexibleVersions": "3", ce qui signifie que les tableaux compacts sont utilisés à partir de la version 3 (plus d'informations à ce sujet dans README.md dans le répertoire du schéma). Puisque nous travaillons ici avec la version 3, nous utilisons des tableaux compacts.

Une fois les classes de requêtes et de réponses implémentées, nous pouvons envoyer et recevoir ces requêtes. Pour cette ApiVersions v3, nous avons besoin de l'en-tête de requête v2 et de l'en-tête de réponse v0 (vérifiez l'ApiMessageType.java généré). La clé API (18) que vous pouvez trouver dans ApiVersionsRequest.json ou dans la spécification du protocole.

git clone git@github.com:apache/kafka.git
git checkout 3.8.0
Copier après la connexion
Copier après la connexion
Copier après la connexion
Copier après la connexion
Copier après la connexion
Copier après la connexion
Copier après la connexion

Si vous exécutez ce code, vous verrez l'en-tête de réponse et le message imprimés dans la console. Félicitations, vous avez effectué un échange réseau correct avec le courtier Kafka !

Vous remarquerez trois champs balisés placés dans _unknownTaggedFields. Les méthodes de lecture et d'écriture de la classe ApiVersionsResponseData générée ainsi que la définition du message dans ApiVersionsResponse.json vous aideront à les interpréter. Considérez ce devoir ?


  1. Dans mon travail quotidien, nous avons développé une bibliothèque open source Kio. Cela nous permet d'effectuer facilement des appels arbitraires à l'API Kafka depuis Python. Le code de sérialisation/désérialisation, comme dans Kafka lui-même, est généré à partir des définitions du protocole JSON. Le code généré est rigoureusement testé, y compris des tests de propriétés par rapport au vrai code Java Kafka. ↩

  2. Ou "message" si vous préférez : certains schémas ne sont pas destinés à l'API, mais par ex. pour les données sur disque. ↩

  3. La fonction read_exact présente l'inconvénient de dupliquer les données lorsque le tampon sous-jacent est déjà en mémoire. Cependant, c'est plus pratique à des fins éducatives. ↩

  4. J'ai fait un PR pour résoudre ce problème. ↩

  5. Encore une fois, j'ai fait un PR pour résoudre ce problème. ↩

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

source:dev.to
Déclaration de ce site Web
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn
Derniers articles par auteur
Tutoriels populaires
Plus>
Derniers téléchargements
Plus>
effets Web
Code source du site Web
Matériel du site Web
Modèle frontal