Heim > Backend-Entwicklung > Python-Tutorial > Praktischer Leitfaden zum Kafka-Protokoll

Praktischer Leitfaden zum Kafka-Protokoll

Susan Sarandon
Freigeben: 2024-12-28 17:11:14
Original
440 Leute haben es durchsucht

Ich habe ziemlich viel mit dem Apache Kafka-Protokoll auf der niedrigen Ebene gearbeitet. Es war nicht einfach, damit anzufangen, indem ich nur der offiziellen Anleitung folgte, und ich habe den Code oft gelesen. Mit diesem Beitrag möchte ich Ihnen einen Vorsprung verschaffen, indem ich Sie Schritt für Schritt von primitiven Werten zu sinnvollen Anliegen begleite.

In diesem Beitrag:

  1. Entdecken Sie den Kafka-Protokollcode und das Protokoll in Aktion mit Wireshark.
  2. Lernen Sie, wie man primitive Werte liest und schreibt.
  3. Kombinieren Sie Grundelemente, um sinnvolle Anfragen auszuführen.

Wir werden Python als Programmiersprache verwenden. Der Code ist jedoch keinerlei Abhängigkeiten und lässt sich leicht auf die Sprache Ihrer Wahl übertragen.

Einführung

Apache Kafka verfügt über ein benutzerdefiniertes Binärprotokoll, das mit verschiedenen Datentypen, optionalen Feldern usw. versioniert ist. Leider verwendet es kein bekanntes Serialisierungsformat wie Protobuf. Das Protokollnachrichtenschema wird in JSON beschrieben. Aus dieser Beschreibung wird der eigentliche Java-Code generiert, der die Serialisierung und Deserialisierung durchführt.

Wenn Sie sich in der Java-Welt befinden, können Sie die offizielle Client-Bibliothek verwenden. Wenn Sie jedoch eine andere Plattform verwenden, sind Sie auf Implementierungen von Drittanbietern angewiesen. Es gibt sie, aber sie konzentrieren sich hauptsächlich auf den Produzenten und Verbraucher und selten auf einige Aspekte des Admin-Clients. Wenn Sie etwas anderes tun müssen, sind Sie auf sich allein gestellt.

Dieser Beitrag wird Ihnen helfen, mit dem Hacken des Kafka-Protokolls zu beginnen. (Wenn Sie nach einer fertigen Python-(De-)Serialisierungsbibliothek für das Kafka-Protokoll suchen, schauen Sie sich Kio1 an. Für Rust schauen Sie sich die Bibliothek an, an der ich arbeite.)

Den Code aus diesem Beitrag und einige weitere ähnliche Tests finden Sie in diesem Repository auf Github.

Protokollübersicht

Die offizielle Protokollbeschreibung finden Sie auf dieser Seite. Ich empfehle Ihnen, sich damit vertraut zu machen und zumindest die Abschnitte „Vorbereitungen“ und „Das Protokoll“ zu lesen.

Hier sind einige Highlights. Das Kafka-Protokoll ist ein TCP-basiertes binäres Anforderungs-Antwort-Protokoll:

  • TCP-basiert: Der Kafka-Broker lauscht an Ports auf dem TCP-Stack (was einige Vorteile wie Bestellgarantien bietet).
  • Binär: Nachrichten werden in binärer Form codiert und erfordern eine spezielle Serialisierung und Deserialisierung gemäß vordefinierten Schemata.
  • Anfrage-Antwort: Der Austausch wird vom Client initiiert, der Server ist passiv und antwortet nur auf Anfragen.

Jeder API-Nachrichtentyp besteht aus dem Anforderungs- und Antwortpaar und wird durch einen numerischen Wert namens API-Schlüssel identifiziert. Beispielsweise verfügen Produce und Fetch, die charakteristischsten Kafka-RPCs, entsprechend über die API-Schlüssel 0 und 1. Heutzutage gibt es fast 90 API-Nachrichtentypen (einige davon sind zwischen Brokern und nicht zwischen Client und Broker).

Anfragen und Antworten werden durch versionierte Schemata beschrieben. Die Versionierung ermöglicht die Weiterentwicklung des Protokolls, beispielsweise das Hinzufügen oder Entfernen von Feldern oder das Ändern ihres Datentyps.

Erste Schritte

Hier sind einige Dinge, die Sie tun können, um mit dem Kafka-Protokoll zu arbeiten.

Lernen Sie den Kafka-Protokollcode

Der Kafka-Code ist (praktisch) die Quelle der Wahrheit über das Protokoll. Schauen Sie sich den Kafka-Code von Github an und wechseln Sie zu der Version, an der Sie interessiert sind (z. B. 3.8.0):

git clone git@github.com:apache/kafka.git
git checkout 3.8.0
Nach dem Login kopieren
Nach dem Login kopieren
Nach dem Login kopieren
Nach dem Login kopieren
Nach dem Login kopieren
Nach dem Login kopieren
Nach dem Login kopieren

Sie finden die API-Nachrichtendefinitionen in JSON unter „clients/src/main/resources/common/message“. Jede JSON-Datei enthält die Definition eines Nachrichtentyps2 mit allen seinen Versionen. clients/src/main/resources/common/message/README.md gibt einen guten Überblick über das Schemadefinitionsformat. Achten Sie auf Dinge wie Standardwerte, flexible Versionen und getaggte Felder.

Schauen Sie sich neben den konkreten API-Nachrichtentypen, die Sie interessieren, auch „clients/src/main/resources/common/message/RequestHeader.json“ und „ResponseHeader.json“ an, in denen die Header beschrieben werden, die bei jedem Anfrage-Antwort-Austausch verwendet werden .

Lassen Sie uns den Codegenerator ausführen:

./gradlew processMessages
Nach dem Login kopieren
Nach dem Login kopieren
Nach dem Login kopieren
Nach dem Login kopieren
Nach dem Login kopieren
Nach dem Login kopieren

Jetzt finden Sie die generierten Klassen in client/src/generated/java/org/apache/kafka/common/message.

Schauen Sie sich client/src/generated/java/org/apache/kafka/common/message/ApiMessageType.java an. Dieses Dienstprogramm:

  • beschreibt den gesamten Satz vorhandener API-Nachrichtentypen zusammen mit ihren Schemata und Versionen;
  • ordnet API-Nachrichtenversionen den Anforderungs- und Antwortheaderversionen in den Funktionen requestHeaderVersion und ResponseHeaderVersion zu.

Andere Dateien werden eins-zu-eins aus den entsprechenden Schema-JSONs generiert (manchmal mit dem Daten-Postfix, das ist eine Kompatibilitätssache). In diesen Dateien finden Sie:

  1. Die versionierten Schemadefinitionen SCHEMA_0, SCHEMA_1 usw. Manchmal bleiben Schemata zwischen den Versionen gleich. Das ist normal und bedeutet, dass sich nur das Anfrage-Antwort-Gegenstück geändert hat.
  2. Lese- und Schreibmethoden, mit denen Sie die Grundwahrheit für die Protokollserialisierung und -deserialisierung finden können.

Achten Sie auf die inneren Klassen, da diese die komplexe Struktur der Nachricht darstellen.

Führen Sie Kafka in Docker aus

Das Ausführen von Kafka in Docker ist eine bequeme Möglichkeit, einen Broker zum Testen des Protokolls oder zum Erfassen des Netzwerkaustauschs zum Laufen zu bringen. Seit Version 3.7.0 erstellt das Kafka-Team offizielle Docker-Images, die Sie wie folgt ausführen können:

docker run --rm -ti -p 9092:9092 apache/kafka:3.8.0
Nach dem Login kopieren
Nach dem Login kopieren
Nach dem Login kopieren
Nach dem Login kopieren
Nach dem Login kopieren

Wenn Sie an älteren Versionen interessiert sind, durchsuchen Sie Docker Hub nach anderen Bildern. Dies ist jedoch möglicherweise nicht erforderlich, da das Kafka-Protokoll abwärts- und vorwärtskompatibel ist: Neue Broker erkennen die alten Protokollversionen problemlos und die alten Clients können mit neueren Brokern kommunizieren.

Wenn Sie dies lesen, haben Sie wahrscheinlich bereits die Kafka-Befehlszeilentools auf Ihrem Computer, aber für alle Fälle können Sie sie auch in Docker ausführen. Führen Sie beispielsweise Folgendes aus, um ein Thema zu erstellen:

git clone git@github.com:apache/kafka.git
git checkout 3.8.0
Nach dem Login kopieren
Nach dem Login kopieren
Nach dem Login kopieren
Nach dem Login kopieren
Nach dem Login kopieren
Nach dem Login kopieren
Nach dem Login kopieren

Überprüfen Sie das Protokoll mit Wireshark

Nachdem wir uns mit dem Kafka-Code vertraut gemacht haben, schauen wir uns das Protokoll in Aktion an. Wireshark ist ein weit verbreitetes Tool für solche Inspektionen. Es kann das Kafka-Protokoll analysieren (und unterstützt die neuesten Versionen, wenn Ihre Version aktuell genug ist).

Ich habe Wireshark aus Quellen der Version 4.5.0 erstellt, da mein Betriebssystempaket alt ist und das Kafka-Protokoll nicht mit neuen Versionen analysieren kann. Wireshark 4.5.0 sollte größtenteils Kafka 3.7-Protokollversionen unterstützen. Sie können jedoch die verfügbare Version ausprobieren und sehen, wie sie für Sie funktioniert.

Lassen Sie uns Wireshark auf der Loopback-Schnittstelle mit dem Port 9092-Erfassungsfilter (1) und dem Kafka-Anzeigefilter (2) ausführen:

Kafka protocol practical guide

Erstellen Sie ein Thema und sehen Sie, was Wireshark uns zeigt:

./gradlew processMessages
Nach dem Login kopieren
Nach dem Login kopieren
Nach dem Login kopieren
Nach dem Login kopieren
Nach dem Login kopieren
Nach dem Login kopieren

Kafka protocol practical guide

Der Anzeigefilter entfernt alles Unwichtige und hinterlässt nur Kafka-Anfragen und -Antworten. Da Wireshark die meisten Nachrichtenversionen im Protokoll versteht (natürlich abhängig von der Wireshark-Version), können Sie bequem einen Blick in die Struktur jeder Nachricht werfen. Wireshark zeigt auch die entsprechenden Bytes an.

Wireshark ist ein großartiges Debugging-Tool, das Ihnen helfen kann, zu verstehen, wie das Protokoll in einem bestimmten Fall funktioniert und was mit Ihrer Implementierung nicht stimmt.

Lesen und Schreiben von Grundwerten

Das Protokoll definiert eine Reihe primitiver Typen, deren vollständige Beschreibung Sie hier finden. Lassen Sie uns den Lese- und Schreibcode für sie implementieren. Alle Funktionen finden Sie in dieser Datei, schauen Sie sich auch die entsprechende Testdatei an.

Ganzzahlwerte fester Länge: INT8, INT16, INT32, INT64 und UINT16

Dies sind Ganzzahlen mit bekannter fester Länge: 1, 2, 4 oder 8 Bytes. Natürlich finden Sie solche Felder im gesamten Protokoll häufig. In diesem Kurs können Sie sehen, wie (trivial) ihr Lesen und Schreiben in Kafka umgesetzt wird.

Lassen Sie uns zunächst die Funktion zum Lesen einer genauen Anzahl von Bytes aus einem Puffer definieren3:

docker run --rm -ti -p 9092:9092 apache/kafka:3.8.0
Nach dem Login kopieren
Nach dem Login kopieren
Nach dem Login kopieren
Nach dem Login kopieren
Nach dem Login kopieren

Der BinaryIO-Typhinweis in Python stellt ein Objekt dar, von dem Bytes gelesen und in das sie geschrieben werden können. Es verfügt über Methoden wie Lesen, Schreiben, Tell (zum Abrufen der aktuellen Position) und Suchen (zum Ändern der Position).

Jetzt können wir das Lesen von INT8:
implementieren

docker run --rm -ti --net=host apache/kafka:3.8.0 \
  /opt/kafka/bin/kafka-topics.sh \
  --bootstrap-server localhost:9092 --create \
  --topic test-topic1 --partitions 2
Nach dem Login kopieren
Nach dem Login kopieren
Nach dem Login kopieren
Nach dem Login kopieren

Kafka verwendet die Big-Endian-Bytereihenfolge (AKA-Netzwerk), daher byteorder="big".

Jetzt schreibe:

git clone git@github.com:apache/kafka.git
git checkout 3.8.0
Nach dem Login kopieren
Nach dem Login kopieren
Nach dem Login kopieren
Nach dem Login kopieren
Nach dem Login kopieren
Nach dem Login kopieren
Nach dem Login kopieren

Ich werde dies nicht für INT16, INT32 und INT64 wiederholen: Die einzigen signifikanten Unterschiede sind die Anzahl der Bytes (2, 4 bzw. 8) und überprüfte Bereiche ([-(2**15), 2* *15 - 1], [-(2**31), 2**31 - 1] und [-(2**63), 2**63 - 1] entsprechend).

UINT16 ähnelt INT16:

./gradlew processMessages
Nach dem Login kopieren
Nach dem Login kopieren
Nach dem Login kopieren
Nach dem Login kopieren
Nach dem Login kopieren
Nach dem Login kopieren

Beachten Sie hier „signed=False“.

BOOLEAN

BOOLEAN ist im Wesentlichen INT8 mit zusätzlicher Logik: == 0 bedeutet falsch, != 0 bedeutet wahr.

docker run --rm -ti -p 9092:9092 apache/kafka:3.8.0
Nach dem Login kopieren
Nach dem Login kopieren
Nach dem Login kopieren
Nach dem Login kopieren
Nach dem Login kopieren

Sie können ein Beispiel für BOOLEAN im Feld „allowAutoTopicCreation“ der generierten Klasse „MetadataRequestData“ sehen.

FLOAT64

FLOAT64 ist ein 64-Bit-IEEE 754-Wert mit doppelter Genauigkeit. Python verfügt nicht über to_bytes und from_bytes für float wie für int. Stattdessen verwenden wir das Strukturmodul aus der Standardbibliothek.

docker run --rm -ti --net=host apache/kafka:3.8.0 \
  /opt/kafka/bin/kafka-topics.sh \
  --bootstrap-server localhost:9092 --create \
  --topic test-topic1 --partitions 2
Nach dem Login kopieren
Nach dem Login kopieren
Nach dem Login kopieren
Nach dem Login kopieren

>d bedeutet „ein doppelter Wert in der Big-Endian-Byte-Reihenfolge“.

UNSIGNED_VARINT: Ganzzahlwerte variabler Länge

Ganzzahlen variabler Länge sind ein Ansatz, der die Verwendung von weniger Bits pro Wert ermöglicht, wenn die Werte klein sind. Kafka verwendet den Varint-Ansatz von Protocol Buffers. Die Idee ist einfach:

Jedes Byte in der Variante hat ein Fortsetzungsbit, das angibt, ob das darauf folgende Byte Teil der Variante ist. Dies ist das höchstwertige Bit (MSB) des Bytes (manchmal auch Vorzeichenbit genannt). Die unteren 7 Bits sind eine Nutzlast; Die resultierende Ganzzahl wird durch Aneinanderfügen der 7-Bit-Nutzlasten ihrer konstituierenden Bytes erstellt.

Weitere Informationen finden Sie in der Protobuf-Spezifikation und der Kafka-Implementierung (Lesen, Schreiben).

Dieser Typ wird nicht in den Protokollfeldern per se verwendet, sondern für die unten beschriebenen kompakten Sammlungen.

Lasst es uns umsetzen. Zur Sicherheit erhalten wir einige Beispiele direkt von der Quelle der Wahrheit, der ByteUtils-Klasse von Kafka:

/opt/kafka/bin/kafka-topics.sh \
  --bootstrap-server localhost:9092 --create \
  --topic test-topic1 --partitions 2
Nach dem Login kopieren
Nach dem Login kopieren
Nach dem Login kopieren

Wenn wir das ausführen, erhalten wir:

def read_exact(buffer: BinaryIO, num_bytes: int) -> bytes:
    value = buffer.read(num_bytes)
    if len(value) != num_bytes:
        raise ValueError(f"Buffer underflow: expected {num_bytes}, got {len(value)}")
    return value
Nach dem Login kopieren
Nach dem Login kopieren

Lassen Sie uns dies auf eine wahrscheinlich nicht leistungsstärkste, aber unkomplizierte Art und Weise umsetzen:

def read_int8(buffer: BinaryIO) -> int:
    return int.from_bytes(read_exact(buffer, 1), byteorder="big", signed=True)
Nach dem Login kopieren
Nach dem Login kopieren

UUID

UUID sind 128-Bit-Werte, die zur eindeutigen Identifizierung von Entitäten verwendet werden. Sie werden beispielsweise verwendet, um Themen-IDs in CreateTopicsResponse zu übergeben.

Sie können sehen, wie sie im Kafka-Code gelesen und geschrieben werden. Es ist einfach zu reproduzieren:

def write_int8(value: int, buffer: BinaryIO) -> None:
    if -(2**7) <= value <= 2**7 - 1:
        buffer.write(value.to_bytes(1, byteorder="big", signed=True))
    else:
        raise ValueError(f"Value {value} is out of range for INT8")
Nach dem Login kopieren

Beachten Sie, dass Kafka null/None als Null-UUID behandelt, daher machen wir hier dasselbe.

Saiten

Das Kafka-Protokoll verfügt über 4 Arten von Zeichenfolgen:

compact non-compact
nullable COMPACT_NULLABLE_STRING NULLABLE_STRING
non-nullable COMPACT_STRING STRING

Kompaktheit gibt an, ob die Stringlänge mit INT16 oder mit UNSIGNED_VARINT kodiert ist. Dies hängt von der Nachrichtenversion ab (sie wurde etwa 2017 eingeführt). Die Nullfähigkeit gibt an, ob der Wert null sein kann. Dies hängt vom Zweck der Nachricht und auch von der Version ab (manchmal werden Zeichenfolgenfelder während der Protokollentwicklung optional).

Strings sind im Protokoll allgegenwärtig. Sehen Sie sich beispielsweise den Feldnamen in der generierten Klasse MetadataRequestData.MetadataRequestTopic.

an

Strings werden ziemlich einfach codiert: Zuerst kommt die Länge und dann der UTF-8-codierte Text. Die maximal zulässige Länge beträgt 32767 Byte. Null-Strings haben die Länge -1 und natürlich keinen Körper.

Da der einzige Unterschied zwischen kompakt und nicht kompakt darin besteht, wie die Stringlänge codiert wird, können wir eine Funktion für beide Modi haben.

Beginnen wir mit dem Lesen und Schreiben von nullbaren Zeichenfolgen:

git clone git@github.com:apache/kafka.git
git checkout 3.8.0
Nach dem Login kopieren
Nach dem Login kopieren
Nach dem Login kopieren
Nach dem Login kopieren
Nach dem Login kopieren
Nach dem Login kopieren
Nach dem Login kopieren

Auf diesen können nicht nullbare Zeichenfolgenfunktionen aufgebaut werden:

./gradlew processMessages
Nach dem Login kopieren
Nach dem Login kopieren
Nach dem Login kopieren
Nach dem Login kopieren
Nach dem Login kopieren
Nach dem Login kopieren

Byte-Arrays

Byte-Arrays sind Strings sehr ähnlich. Sie haben die gleiche potenzielle Nullbarkeit und Kompaktheit:

compact non-compact
nullable COMPACT_NULLABLE_BYTES NULLABLE_BYTES
non-nullable COMPACT_BYTES BYTES

Sie werden auch auf die gleiche Weise kodiert: Länge Körper. Natürlich wird der Body nicht als UTF-8-String behandelt, sondern als undurchsichtiges Byte-Array. Die maximale Länge eines Byte-Arrays beträgt 2147483647;

Ein Beispiel für Bytes in den Feldmetadaten finden Sie in der generierten Klasse JoinGroupRequestData.JoinGroupRequestProtocol.

git clone git@github.com:apache/kafka.git
git checkout 3.8.0
Nach dem Login kopieren
Nach dem Login kopieren
Nach dem Login kopieren
Nach dem Login kopieren
Nach dem Login kopieren
Nach dem Login kopieren
Nach dem Login kopieren

Wie Sie sehen, ist der Unterschied zwischen diesen Funktionen und den entsprechenden Funktionen für Strings gering.

Andere Arrays

Das Protokoll unterstützt Arrays anderer Typen als Bytes: Zeichenfolgen, Zahlen, Strukturen (jedoch keine verschachtelten Arrays): ARRAY und COMPACT_ARRAY. Die Kompaktheit ist die gleiche wie bei Byte-Arrays und Strings.

Nullbarkeit wird aus irgendeinem Grund in der Protokollspezifikation nicht explizit erwähnt. Arrays können jedoch nullbar sein. Dies wird durch nullableVersions in den Schemadefinitionen gesteuert, wie hier.

Angesichts der Tatsache, dass wir read_array_length und write_array_length bereits implementiert haben, implementieren wir nun die Lese- und Schreibfunktionen:

./gradlew processMessages
Nach dem Login kopieren
Nach dem Login kopieren
Nach dem Login kopieren
Nach dem Login kopieren
Nach dem Login kopieren
Nach dem Login kopieren

AUFZEICHNUNGEN

RECORDS kodieren Kafka-Datensätze. Die Struktur ist ziemlich komplex und ich werde sie in diesem Handbuch nicht beschreiben (lassen Sie es mir jedoch bitte in den Kommentaren wissen ?️, wenn Sie sie haben möchten.) Der Einfachheit halber können wir Datensätze als NULLABLE_BYTES oder COMPACT_NULLABLE_BYTES behandeln ( abhängig von der Nachrichtenversion).

Markierte Felder

Getaggte Felder sind eine Erweiterung des Kafka-Protokolls, die das Anhängen optionaler Daten an Nachrichten ermöglicht. Die Idee ist zweifach:

  1. Wenn der Kunde des Dienstes das markierte Feld nicht versteht, speichert er es als unbekannt und ignoriert es.
  2. Wenn ein Feld selten verwendet wird, kann sein Standardwert bei der Übertragung übersprungen werden.

Schauen Sie sich zum Beispiel dieses Feld an. Es hat taggedVersions, das angibt, seit welcher Version dieses Feld getaggt ist (in den meisten Fällen handelt es sich um dieselbe Version, als das Feld hinzugefügt wurde).

Ein markiertes Feld besteht aus:

  1. Das Tag vom Typ UNSIGNED_VARINT.
  2. Die Daten vom Typ COMPACT_BYTES.

Weitere Details zu markierten Feldern finden Sie in KIP-482.

Lassen Sie uns Folgendes umsetzen:

docker run --rm -ti -p 9092:9092 apache/kafka:3.8.0
Nach dem Login kopieren
Nach dem Login kopieren
Nach dem Login kopieren
Nach dem Login kopieren
Nach dem Login kopieren

Hier tragen sie den Titel „unbekannt“. Bekannte Felder müssen innerhalb ihrer Strukturen so gestaltet werden.

Nachrichtenstruktur

Die Nachrichtenstruktur auf hoher Ebene ist sehr einfach. Laut Spezifikation:

docker run --rm -ti --net=host apache/kafka:3.8.0 \
  /opt/kafka/bin/kafka-topics.sh \
  --bootstrap-server localhost:9092 --create \
  --topic test-topic1 --partitions 2
Nach dem Login kopieren
Nach dem Login kopieren
Nach dem Login kopieren
Nach dem Login kopieren

Das heißt, es handelt sich um eine Nachricht selbst, der ihre Größe in Bytes vorangestellt ist. Sowohl Anforderungs- als auch Antwortnachrichten bestehen aus dem Header, direkt gefolgt vom Textkörper. Aus irgendeinem Grund ist dies nicht explizit dokumentiert4, aber können Sie mir vertrauen? oder überprüfen Sie den Code.

Anforderungs- und Antwortheader

Der Anforderungsheader existiert in drei Versionen: 0, 1 und 2. Sie werden im Protokoll wie folgt angegeben:

/opt/kafka/bin/kafka-topics.sh \
  --bootstrap-server localhost:9092 --create \
  --topic test-topic1 --partitions 2
Nach dem Login kopieren
Nach dem Login kopieren
Nach dem Login kopieren

TAG_BUFFER sind die zuvor erwähnten getaggten Felder.

Lassen Sie uns sie als Python-Datenklassen implementieren:

git clone git@github.com:apache/kafka.git
git checkout 3.8.0
Nach dem Login kopieren
Nach dem Login kopieren
Nach dem Login kopieren
Nach dem Login kopieren
Nach dem Login kopieren
Nach dem Login kopieren
Nach dem Login kopieren

Wie Sie sehen können, gibt es in Version 2 einige getaggte Felder, es gibt jedoch keine erwarteten bekannten Felder. Wenn ein markiertes Feld fälschlicherweise an den Broker gesendet wird, wird es ignoriert.

Der Antwortheader existiert in zwei Versionen: 0 und 1. Sie werden im Protokoll angegeben als:

./gradlew processMessages
Nach dem Login kopieren
Nach dem Login kopieren
Nach dem Login kopieren
Nach dem Login kopieren
Nach dem Login kopieren
Nach dem Login kopieren

Lasst uns sie auch umsetzen:

docker run --rm -ti -p 9092:9092 apache/kafka:3.8.0
Nach dem Login kopieren
Nach dem Login kopieren
Nach dem Login kopieren
Nach dem Login kopieren
Nach dem Login kopieren

Wir implementieren kein Lesen für die Anforderungsheader und kein Schreiben für die Antwortheader. Dies dient der Kürze: Wir werden in unseren Beispielen keine Antwortheader senden und keine Anforderungsheader empfangen, da wir nicht serverseitig programmieren. Wenn Sie jedoch auch an der Serverseite interessiert sind, müssen Sie beide Funktionen implementieren (was unkompliziert sein sollte).

Korrelations-ID

Beachten Sie insbesondere das Feld „korrelation_id“ in den Anforderungs- und Antwortheadern. Das Protokoll unterstützt Pipelining: Der Client kann mehr als eine ausstehende Anfrage pro Verbindung haben. Die Korrelations-ID ermöglicht es, Antworten auf Anfragen abzugleichen.

Auswahl der Header-Version

Welche Version verwendet werden muss, hängt vom API-Schlüssel und der Nachrichtenversion ab. Es ist derzeit nicht explizit im Protokollleitfaden dokumentiert5.
Verwenden Sie die Funktionen „requestHeaderVersion“ und „responseHeaderVersion“ in der generierten Klasse ApiMessageType als Referenz.

Senden von Anfragen und Empfangen von Antworten

Mit all diesem Wissen und Code können wir nun endlich eine ApiVersions-Anfrage senden und eine Antwort erhalten und lesen. ApiVersions ist normalerweise die erste Anfrage, die der Client sendet. Der Zweck besteht darin, die vom Broker unterstützten API-Versionen und Funktionen zu finden. Wir implementieren die neueste Version 3.

In der Protokollspezifikation ist es wie folgt definiert:

docker run --rm -ti --net=host apache/kafka:3.8.0 \
  /opt/kafka/bin/kafka-topics.sh \
  --bootstrap-server localhost:9092 --create \
  --topic test-topic1 --partitions 2
Nach dem Login kopieren
Nach dem Login kopieren
Nach dem Login kopieren
Nach dem Login kopieren

Lassen Sie uns die Datenklasse erstellen:

/opt/kafka/bin/kafka-topics.sh \
  --bootstrap-server localhost:9092 --create \
  --topic test-topic1 --partitions 2
Nach dem Login kopieren
Nach dem Login kopieren
Nach dem Login kopieren

Und die Antwort:

def read_exact(buffer: BinaryIO, num_bytes: int) -> bytes:
    value = buffer.read(num_bytes)
    if len(value) != num_bytes:
        raise ValueError(f"Buffer underflow: expected {num_bytes}, got {len(value)}")
    return value
Nach dem Login kopieren
Nach dem Login kopieren

[api_keys] bedeutet „ein Array von api_keys“, wobei api_keys die zwei Zeilen unten definierte Struktur ist.

Konvertieren in Python-Datenklassen:

def read_int8(buffer: BinaryIO) -> int:
    return int.from_bytes(read_exact(buffer, 1), byteorder="big", signed=True)
Nach dem Login kopieren
Nach dem Login kopieren

Wenn wir über Arrays sprechen, müssen wir wissen, ob wir kompakte oder nicht kompakte Arrays benötigen. Um dies herauszufinden, werfen wir einen Blick auf die Schemadefinition in ApiVersionsRequest.json. Zu sehen ist „flexibleVersions“: „3“, was bedeutet, dass ab Version 3 kompakte Arrays verwendet werden (mehr dazu in README.md im Schema-Verzeichnis). Da wir hier mit Version 3 arbeiten, verwenden wir kompakte Arrays.

Nachdem wir die Anforderungs- und Antwortklassen implementiert haben, können wir diese Anforderungen senden und empfangen. Für diese ApiVersions v3 benötigen wir den v2-Anfrageheader und den v0-Antwortheader (überprüfen Sie die generierte ApiMessageType.java). Den API-Schlüssel (18) finden Sie in ApiVersionsRequest.json oder in der Protokollspezifikation.

git clone git@github.com:apache/kafka.git
git checkout 3.8.0
Nach dem Login kopieren
Nach dem Login kopieren
Nach dem Login kopieren
Nach dem Login kopieren
Nach dem Login kopieren
Nach dem Login kopieren
Nach dem Login kopieren

Wenn Sie diesen Code ausführen, werden der Antwortheader und die Nachricht in der Konsole gedruckt. Herzlichen Glückwunsch, Sie haben einen korrekten Netzwerkaustausch mit dem Kafka-Broker durchgeführt!

Sie werden feststellen, dass drei getaggte Felder in _unknownTaggedFields eingefügt wurden. Die Lese- und Schreibmethoden der generierten ApiVersionsResponseData-Klasse sowie die Nachrichtendefinition in ApiVersionsResponse.json helfen Ihnen bei der Interpretation. Betrachten Sie diese Hausaufgabe?


  1. In meinem Hauptjob haben wir eine Open-Source-Bibliothek Kio entwickelt. Es ermöglicht uns, problemlos beliebige Kafka-API-Aufrufe von Python aus durchzuführen. Der Serialisierungs-/Deserialisierungscode wird, wie in Kafka selbst, aus den JSON-Protokolldefinitionen generiert. Der generierte Code wird strengen Tests unterzogen, einschließlich Eigenschaftstests mit echtem Java-Kafka-Code. ↩

  2. Oder „Nachricht“, wenn Sie möchten: Einige der Schemata sind nicht für API, aber z.B. für Daten auf der Festplatte. ↩

  3. Die Funktion read_exact hat den Nachteil, dass sie die Daten dupliziert, wenn sich der zugrunde liegende Puffer bereits im Speicher befindet. Für Bildungszwecke ist es jedoch praktischer. ↩

  4. Ich habe eine PR erstellt, um das Problem zu beheben. ↩

  5. Ich habe noch einmal eine PR erstellt, um das Problem zu beheben. ↩

Das obige ist der detaillierte Inhalt vonPraktischer Leitfaden zum Kafka-Protokoll. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Quelle:dev.to
Erklärung dieser Website
Der Inhalt dieses Artikels wird freiwillig von Internetnutzern beigesteuert und das Urheberrecht liegt beim ursprünglichen Autor. Diese Website übernimmt keine entsprechende rechtliche Verantwortung. Wenn Sie Inhalte finden, bei denen der Verdacht eines Plagiats oder einer Rechtsverletzung besteht, wenden Sie sich bitte an admin@php.cn
Neueste Artikel des Autors
Beliebte Tutorials
Mehr>
Neueste Downloads
Mehr>
Web-Effekte
Quellcode der Website
Website-Materialien
Frontend-Vorlage