I've worked with the Apache Kafka protocol at a low level quite a bit. It wasn't easy to get started using only the official guide, and I read the code a lot. With this post, I want to give you a head start by guiding you step by step from primitive values to meaningful requests.
In this post:
We will use Python as the programming language. However, the code will be zero-dependency and easily portable to the language of your choice.
Apache Kafka has a custom binary protocol that is versioned, with various data types, optional fields, etc. Unfortunately, it doesn't use a well-known serialization format like Protobuf. The protocol message schema is described in JSON. The actual Java code that does serialization and deserialization is generated from this description.
When you're in the Java world, you can use the official client library. But on another platform, you rely on third-party implementations. They exist, but they focus mostly on the producer and consumer and rarely cover some aspects of the admin client. If you need to do something else, you're on your own.
This post will help you start hacking on the Kafka protocol. (If you're looking for a ready-made Python (de)serialization library for the Kafka protocol, check out Kio[1]. For Rust, have a look at the library I'm working on.)
You can find the code from this post, along with some extras such as tests, in this repository on GitHub.
You can find the official protocol description on this page. I encourage you to familiarize yourself with it, or at least read the "Preliminaries" and "The Protocol" sections.
Here are some highlights. The Kafka protocol is a TCP-based binary request-response protocol:
Each API message type consists of a request and response pair and is identified by a numeric value called the API key. For example, Produce and Fetch, the most characteristic Kafka RPCs, have API keys 0 and 1 respectively. Nowadays, there are close to 90 API message types (some of them are inter-broker, not client-broker).
Requests and responses are described by versioned schemas. Versioning allows for protocol evolution, for example, adding or removing fields or changing their data type.
Here are some things you can do to start working with the Kafka protocol.
The Kafka code is the (practical) source of truth about the protocol. Check out the Kafka code from GitHub and switch to the release you're interested in (e.g. 3.8.0):
git clone git@github.com:apache/kafka.git
cd kafka
git checkout 3.8.0
You can find the API message definitions in JSON in clients/src/main/resources/common/message. Each JSON file contains the definition of one message[2] type with all its versions. clients/src/main/resources/common/message/README.md gives a good overview of the schema definition format. Pay attention to details such as default values, flexible versions, and tagged fields.
Apart from the concrete API message types you're interested in, have a look at clients/src/main/resources/common/message/RequestHeader.json and ResponseHeader.json, which describe headers used in each request-response exchange.
Let's run the code generator:
./gradlew processMessages
Now you can find the generated classes in clients/src/generated/java/org/apache/kafka/common/message.
Have a look at clients/src/generated/java/org/apache/kafka/common/message/ApiMessageType.java. This utility:
Other files are generated one-to-one from the corresponding schema JSONs (sometimes with the Data suffix, for compatibility reasons). In these files you'll find:
Pay attention to the inner classes as well, as they represent the complex structure of the message.
Running Kafka in Docker is a convenient way to get a broker running to test the protocol or capture the network exchange. Since version 3.7.0, the Kafka project has published official Docker images, which you can run as:
docker run --rm -ti -p 9092:9092 apache/kafka:3.8.0
If you're interested in older versions, search Docker Hub for other images. However, this may not be needed, considering the Kafka protocol is backward and forward compatible: new brokers recognize old protocol versions just fine, and old clients can communicate with newer brokers.
If you're reading this, you probably already have the Kafka command-line tools on your machine, but just in case, you can also run them in Docker. For example, run this to create a topic:
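docker run --rm -ti --net=host apache/kafka:3.8.0 \
    /opt/kafka/bin/kafka-topics.sh \
    --bootstrap-server localhost:9092 --create \
    --topic test-topic1 --partitions 2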
Having familiarized ourselves with the Kafka code, let's look at the protocol in action. Wireshark is a widely used tool for such inspections. It can dissect the Kafka protocol (and supports the latest versions if your version is fresh enough).
I built Wireshark 4.5.0 from source because my operating system's package is old and can't dissect newer Kafka protocol versions. Wireshark 4.5.0 should mostly support Kafka 3.7 protocol versions. However, you can try the version available to you and see how it works.
Let's run Wireshark on the loopback interface with the capture filter port 9092 and the display filter kafka.
Create a topic and see what Wireshark shows us:
/opt/kafka/bin/kafka-topics.sh \
    --bootstrap-server localhost:9092 --create \
    --topic test-topic1 --partitions 2
The display filter removes everything irrelevant and leaves only Kafka requests and responses. As Wireshark understands most message versions in the protocol (depending on the Wireshark version, of course), you can conveniently look into the structure of each message. Wireshark will also show the corresponding bytes.
Wireshark is a great debugging tool that could help you understand how the protocol works in a particular case and what's wrong with your implementation.
The protocol defines a number of primitive types, whose full description you can find here. Let's implement the read and write code for them. You can find all the functions in this file; also check out the corresponding test file.
These are integer numbers with a known fixed length: 1, 2, 4, or 8 bytes. Naturally, such fields appear throughout the protocol. In this class, you can see how (trivially) Kafka reads and writes them.
Let's first define the function for reading an exact number of bytes from a buffer[3]:
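from typing import BinaryIO


def read_exact(buffer: BinaryIO, num_bytes: int) -> bytes:
    # Fail loudly on truncated input instead of returning fewer bytes.
    value = buffer.read(num_bytes)
    if len(value) != num_bytes:
        raise ValueError(f"Buffer underflow: expected {num_bytes}, got {len(value)}")
    return value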
The BinaryIO type hint in Python represents an object from which bytes can be read and to which they can be written. It has methods like read, write, tell (for getting the current position), and seek (for changing the position).
Now we can implement reading INT8:
def read_int8(buffer: BinaryIO) -> int:
    return int.from_bytes(read_exact(buffer, 1), byteorder="big", signed=True)
Kafka uses the big-endian (AKA network) byte ordering, hence byteorder="big".
Now writing:
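def write_int8(value: int, buffer: BinaryIO) -> None:
    if -(2**7) <= value <= 2**7 - 1:
        buffer.write(value.to_bytes(1, byteorder="big", signed=True))
    else:
        raise ValueError(f"Value {value} is out of range for INT8")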
I won't repeat this for INT16, INT32, and INT64: the only significant differences are the number of bytes (2, 4, and 8 respectively) and the checked ranges ([-(2**15), 2**15 - 1], [-(2**31), 2**31 - 1], and [-(2**63), 2**63 - 1] respectively).
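Since only the width and the range change, the INT8 code generalizes to a pair of width-parameterized helpers. This is a minimal sketch; read_fixed_int, write_fixed_int, and the INT16/INT32/INT64 wrappers are hypothetical names, not necessarily those used in the accompanying repository, and read_exact is repeated so the block runs on its own:

```python
from io import BytesIO
from typing import BinaryIO


def read_exact(buffer: BinaryIO, num_bytes: int) -> bytes:
    value = buffer.read(num_bytes)
    if len(value) != num_bytes:
        raise ValueError(f"Buffer underflow: expected {num_bytes}, got {len(value)}")
    return value


def read_fixed_int(buffer: BinaryIO, num_bytes: int) -> int:
    # Signed big-endian integer of the given width (1, 2, 4, or 8 bytes).
    return int.from_bytes(read_exact(buffer, num_bytes), byteorder="big", signed=True)


def write_fixed_int(value: int, num_bytes: int, buffer: BinaryIO) -> None:
    bits = num_bytes * 8
    # Signed range for this width, e.g. [-(2**15), 2**15 - 1] for INT16.
    if not -(2 ** (bits - 1)) <= value <= 2 ** (bits - 1) - 1:
        raise ValueError(f"Value {value} is out of range for INT{bits}")
    buffer.write(value.to_bytes(num_bytes, byteorder="big", signed=True))


def read_int16(buffer: BinaryIO) -> int:
    return read_fixed_int(buffer, 2)


def write_int16(value: int, buffer: BinaryIO) -> None:
    write_fixed_int(value, 2, buffer)


def read_int32(buffer: BinaryIO) -> int:
    return read_fixed_int(buffer, 4)


def write_int32(value: int, buffer: BinaryIO) -> None:
    write_fixed_int(value, 4, buffer)


def read_int64(buffer: BinaryIO) -> int:
    return read_fixed_int(buffer, 8)


def write_int64(value: int, buffer: BinaryIO) -> None:
    write_fixed_int(value, 8, buffer)
```

For example, writing -2 with write_int16 into an io.BytesIO produces b"\xff\xfe", and read_int16 turns those two bytes back into -2.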
UINT16 is similar to INT16:
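A minimal sketch of UINT16 in the same style as the INT8 functions (read_uint16 and write_uint16 are assumed names); read_exact is repeated so the block runs on its own:

```python
from io import BytesIO
from typing import BinaryIO


def read_exact(buffer: BinaryIO, num_bytes: int) -> bytes:
    value = buffer.read(num_bytes)
    if len(value) != num_bytes:
        raise ValueError(f"Buffer underflow: expected {num_bytes}, got {len(value)}")
    return value


def read_uint16(buffer: BinaryIO) -> int:
    # Two big-endian bytes interpreted as an unsigned value.
    return int.from_bytes(read_exact(buffer, 2), byteorder="big", signed=False)


def write_uint16(value: int, buffer: BinaryIO) -> None:
    if 0 <= value <= 2**16 - 1:
        buffer.write(value.to_bytes(2, byteorder="big", signed=False))
    else:
        raise ValueError(f"Value {value} is out of range for UINT16")
```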
Note the signed=False here.
BOOLEAN is essentially INT8 with extra logic: == 0 means false, != 0 means true.
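A minimal sketch of BOOLEAN on top of the INT8 functions, repeated here so the block runs on its own (read_boolean and write_boolean are assumed names). Like Kafka's generated code, it writes 1 for true; any non-zero byte is read as true:

```python
from io import BytesIO
from typing import BinaryIO


def read_exact(buffer: BinaryIO, num_bytes: int) -> bytes:
    value = buffer.read(num_bytes)
    if len(value) != num_bytes:
        raise ValueError(f"Buffer underflow: expected {num_bytes}, got {len(value)}")
    return value


def read_int8(buffer: BinaryIO) -> int:
    return int.from_bytes(read_exact(buffer, 1), byteorder="big", signed=True)


def write_int8(value: int, buffer: BinaryIO) -> None:
    if -(2**7) <= value <= 2**7 - 1:
        buffer.write(value.to_bytes(1, byteorder="big", signed=True))
    else:
        raise ValueError(f"Value {value} is out of range for INT8")


def read_boolean(buffer: BinaryIO) -> bool:
    # Zero is false; any other value is true.
    return read_int8(buffer) != 0


def write_boolean(value: bool, buffer: BinaryIO) -> None:
    write_int8(1 if value else 0, buffer)
```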
You can see an example of BOOLEAN in the allowAutoTopicCreation field of the MetadataRequestData generated class.
FLOAT64 is a double-precision 64-bit IEEE 754 value. Python doesn't have to_bytes and from_bytes for float like it has for int. So instead we will use the struct module from the standard library.
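A minimal sketch of FLOAT64 with the struct module (read_float64 and write_float64 are assumed names); read_exact is repeated so the block runs on its own:

```python
import struct
from io import BytesIO
from typing import BinaryIO


def read_exact(buffer: BinaryIO, num_bytes: int) -> bytes:
    value = buffer.read(num_bytes)
    if len(value) != num_bytes:
        raise ValueError(f"Buffer underflow: expected {num_bytes}, got {len(value)}")
    return value


def read_float64(buffer: BinaryIO) -> float:
    # ">d": big-endian IEEE 754 double, exactly 8 bytes.
    return struct.unpack(">d", read_exact(buffer, 8))[0]


def write_float64(value: float, buffer: BinaryIO) -> None:
    buffer.write(struct.pack(">d", value))
```

Because ">d" always packs to and unpacks from exactly 8 bytes, reading through read_exact(buffer, 8) keeps truncated input from reaching struct.unpack.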
>d means "a double value in the big-endian byte order".
Variable-length integers let small values take fewer bytes. Kafka uses the varint approach from Protocol Buffers. The idea is simple:
Each byte in the varint has a continuation bit that indicates if the byte that follows it is part of the varint. This is the most significant bit (MSB) of the byte (sometimes also called the sign bit). The lower 7 bits are a payload; the resulting integer is built by appending together the 7-bit payloads of its constituent bytes.
You can check the Protobuf specification and Kafka implementation (read, write) for details.
This type isn't used in the protocol fields per se, but it's used for compact collections described below.
Let's implement it. For confidence, we can check the implementation against examples taken directly from the source of truth, Kafka's ByteUtils class.
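A few reference values, worked out by hand from the 7-bit rule above rather than produced by ByteUtils (so treat them as assumptions to cross-check against Kafka). The table name and the check_example helper are hypothetical; the helper verifies that every byte except the last carries the continuation bit and that the 7-bit payloads reassemble into the value:

```python
# Hypothetical reference table of (value, UNSIGNED_VARINT encoding).
# Each encoding lists the 7-bit groups of the value, least significant first,
# with the MSB (0x80) set on every byte except the last.
UNSIGNED_VARINT_EXAMPLES = [
    (0, b"\x00"),
    (1, b"\x01"),
    (127, b"\x7f"),
    (128, b"\x80\x01"),
    (300, b"\xac\x02"),
    (16383, b"\xff\x7f"),
    (16384, b"\x80\x80\x01"),
    (2**32 - 1, b"\xff\xff\xff\xff\x0f"),
]


def check_example(value: int, encoded: bytes) -> bool:
    # All bytes but the last must carry the continuation bit; the last must not.
    if any((b & 0x80) == 0 for b in encoded[:-1]) or encoded[-1] & 0x80:
        return False
    # Append the 7-bit payloads together, least significant group first.
    decoded = sum((b & 0x7F) << (7 * i) for i, b in enumerate(encoded))
    return decoded == value
```

For instance, 300 is 0b1_0010_1100: its low 7 bits 0b010_1100 (0x2c) with the continuation bit set become 0xac, and the remaining 300 >> 7 = 2 becomes the final byte 0x02.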
Let's implement it in a straightforward, though probably not the most performant, way:
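A straightforward sketch of UNSIGNED_VARINT reading and writing (read_unsigned_varint and write_unsigned_varint are assumed names). It limits values to 32 bits, matching Kafka's ByteUtils, which works with Java ints, so a valid encoding is at most 5 bytes long:

```python
from io import BytesIO
from typing import BinaryIO


def read_unsigned_varint(buffer: BinaryIO) -> int:
    value = 0
    shift = 0
    while True:
        byte = buffer.read(1)
        if not byte:
            raise ValueError("Buffer underflow while reading UNSIGNED_VARINT")
        b = byte[0]
        # The lower 7 bits are payload; earlier bytes hold lower-order groups.
        value |= (b & 0x7F) << shift
        # A clear MSB (continuation bit) marks the last byte.
        if (b & 0x80) == 0:
            return value
        shift += 7
        # A 32-bit value needs at most 5 bytes.
        if shift > 28:
            raise ValueError("UNSIGNED_VARINT is longer than 5 bytes")


def write_unsigned_varint(value: int, buffer: BinaryIO) -> None:
    if not 0 <= value <= 2**32 - 1:
        raise ValueError(f"Value {value} is out of range for UNSIGNED_VARINT")
    # Emit 7 bits at a time, setting the continuation bit while more remain.
    while value > 0x7F:
        buffer.write(bytes([(value & 0x7F) | 0x80]))
        value >>= 7
    buffer.write(bytes([value]))
```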
UUIDs are 128-bit values used to uniquely identify entities. For example, they are used to pass topic IDs in CreateTopicsResponse.
You can see how they are read and written in the Kafka code. It's simple to reproduce:
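A minimal sketch using the standard uuid module (read_uuid, write_uuid, and ZERO_UUID are assumed names). uuid.UUID(bytes=...) and UUID.bytes use the big-endian 16-byte layout, which matches Kafka writing the most significant 64 bits first and then the least significant ones. Mapping the all-zeros UUID back to None on read is a choice of this sketch:

```python
import uuid
from io import BytesIO
from typing import BinaryIO, Optional

ZERO_UUID = uuid.UUID(int=0)


def read_exact(buffer: BinaryIO, num_bytes: int) -> bytes:
    value = buffer.read(num_bytes)
    if len(value) != num_bytes:
        raise ValueError(f"Buffer underflow: expected {num_bytes}, got {len(value)}")
    return value


def read_uuid(buffer: BinaryIO) -> Optional[uuid.UUID]:
    value = uuid.UUID(bytes=read_exact(buffer, 16))
    # The all-zeros UUID stands for "no value".
    return None if value == ZERO_UUID else value


def write_uuid(value: Optional[uuid.UUID], buffer: BinaryIO) -> None:
    # None is written as the all-zeros UUID.
    buffer.write((ZERO_UUID if value is None else value).bytes)
```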
Note that Kafka treats null/None as zero UUID, so we're doing the same here.
The Kafka protocol has 4 types of strings:
| | compact | non-compact |
|---|---|---|
| nullable | COMPACT_NULLABLE_STRING | NULLABLE_STRING |
| non-nullable | COMPACT_STRING | STRING |
Compactness indicates whether the string length is encoded as INT16 or as UNSIGNED_VARINT. It depends on the message version (compact types arrived together with the flexible versions and tagged fields of KIP-482, around 2019). Nullability is whether the value can be null. It depends on the message purpose and version as well (sometimes string fields become optional as the protocol evolves).
Strings are ubiquitous in the protocol. For example, see the field name in the generated class MetadataRequestData.MetadataRequestTopic.
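Strings are encoded pretty straightforwardly: first comes the length, then the UTF-8 encoded body. The maximum allowed length is 32767 bytes. Null strings have a length of -1 and, obviously, no body. In the compact form, the length is stored as N + 1, so an empty string has the encoded length 1 and a null one has 0.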
As the only difference between compact and non-compact is how the string length is encoded, we can have one function for both modes.
Let's start with reading and writing nullable strings:
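A minimal sketch of nullable string reading and writing with a compact flag (the function names, including the read_string_length and write_string_length helpers, are hypothetical). The non-compact form stores the length N as INT16; the compact form stores N + 1 as UNSIGNED_VARINT, so null (N = -1) becomes 0. Minimal varint helpers are included so the block runs on its own:

```python
from io import BytesIO
from typing import BinaryIO, Optional


def read_exact(buffer: BinaryIO, num_bytes: int) -> bytes:
    value = buffer.read(num_bytes)
    if len(value) != num_bytes:
        raise ValueError(f"Buffer underflow: expected {num_bytes}, got {len(value)}")
    return value


def read_unsigned_varint(buffer: BinaryIO) -> int:
    value, shift = 0, 0
    while True:
        b = read_exact(buffer, 1)[0]
        value |= (b & 0x7F) << shift
        if (b & 0x80) == 0:
            return value
        shift += 7
        if shift > 28:
            raise ValueError("UNSIGNED_VARINT is longer than 5 bytes")


def write_unsigned_varint(value: int, buffer: BinaryIO) -> None:
    while value > 0x7F:
        buffer.write(bytes([(value & 0x7F) | 0x80]))
        value >>= 7
    buffer.write(bytes([value]))


def read_string_length(buffer: BinaryIO, compact: bool) -> int:
    if compact:
        # N + 1 as UNSIGNED_VARINT, so 0 decodes to -1 (null).
        return read_unsigned_varint(buffer) - 1
    # Non-compact: N as INT16, -1 means null.
    return int.from_bytes(read_exact(buffer, 2), byteorder="big", signed=True)


def write_string_length(length: int, buffer: BinaryIO, compact: bool) -> None:
    if compact:
        write_unsigned_varint(length + 1, buffer)
    else:
        buffer.write(length.to_bytes(2, byteorder="big", signed=True))


def read_nullable_string(buffer: BinaryIO, compact: bool) -> Optional[str]:
    length = read_string_length(buffer, compact)
    if length == -1:
        return None
    if length < 0:
        raise ValueError(f"Invalid string length {length}")
    return read_exact(buffer, length).decode("utf-8")


def write_nullable_string(value: Optional[str], buffer: BinaryIO, compact: bool) -> None:
    if value is None:
        write_string_length(-1, buffer, compact)
        return
    encoded = value.encode("utf-8")
    if len(encoded) > 32767:
        raise ValueError(f"String is too long: {len(encoded)} bytes")
    write_string_length(len(encoded), buffer, compact)
    buffer.write(encoded)
```

Note that the length counts UTF-8 bytes, not characters: "é" takes two bytes, so its compact encoding starts with 3.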
Non-nullable string functions can be built on top of these:
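A minimal sketch of the non-nullable variants as thin wrappers that reject null (read_string and write_string are assumed names). So that the block runs on its own, it includes condensed nullable helpers that follow the encoding described above: N as INT16 for the non-compact form, N + 1 as UNSIGNED_VARINT for the compact one:

```python
from io import BytesIO
from typing import BinaryIO, Optional


def read_exact(buffer: BinaryIO, num_bytes: int) -> bytes:
    value = buffer.read(num_bytes)
    if len(value) != num_bytes:
        raise ValueError(f"Buffer underflow: expected {num_bytes}, got {len(value)}")
    return value


def read_unsigned_varint(buffer: BinaryIO) -> int:
    value, shift = 0, 0
    while True:
        b = read_exact(buffer, 1)[0]
        value |= (b & 0x7F) << shift
        if (b & 0x80) == 0:
            return value
        shift += 7
        if shift > 28:
            raise ValueError("UNSIGNED_VARINT is longer than 5 bytes")


def write_unsigned_varint(value: int, buffer: BinaryIO) -> None:
    while value > 0x7F:
        buffer.write(bytes([(value & 0x7F) | 0x80]))
        value >>= 7
    buffer.write(bytes([value]))


# Condensed nullable helpers (length -1 means null).
def read_nullable_string(buffer: BinaryIO, compact: bool) -> Optional[str]:
    if compact:
        length = read_unsigned_varint(buffer) - 1
    else:
        length = int.from_bytes(read_exact(buffer, 2), byteorder="big", signed=True)
    if length == -1:
        return None
    if length < 0:
        raise ValueError(f"Invalid string length {length}")
    return read_exact(buffer, length).decode("utf-8")


def write_nullable_string(value: Optional[str], buffer: BinaryIO, compact: bool) -> None:
    encoded = None if value is None else value.encode("utf-8")
    length = -1 if encoded is None else len(encoded)
    if length > 32767:
        raise ValueError(f"String is too long: {length} bytes")
    if compact:
        write_unsigned_varint(length + 1, buffer)
    else:
        buffer.write(length.to_bytes(2, byteorder="big", signed=True))
    if encoded is not None:
        buffer.write(encoded)


def read_string(buffer: BinaryIO, compact: bool) -> str:
    value = read_nullable_string(buffer, compact)
    if value is None:
        raise ValueError("Non-nullable string is null")
    return value


def write_string(value: str, buffer: BinaryIO, compact: bool) -> None:
    if value is None:
        raise ValueError("Non-nullable string cannot be None")
    write_nullable_string(value, buffer, compact)
```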
Byte arrays are very similar to strings. They have the same potential nullability and compactness:
| | compact | non-compact |
|---|---|---|
| nullable | COMPACT_NULLABLE_BYTES | NULLABLE_BYTES |
| non-nullable | COMPACT_BYTES | BYTES |
They are also encoded in the same way: the length, then the body. Naturally, the body is treated not as a UTF-8 string but as an opaque byte array. In the non-compact form, the length is an INT32, so the maximum length of a byte array is 2147483647.
You can find an example of bytes in the field metadata in the generated class JoinGroupRequestData.JoinGroupRequestProtocol.
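A minimal sketch of byte arrays. The names read_array_length and write_array_length come from the text below, but their signatures here are a guess; the other function names are assumptions. Non-compact BYTES use an INT32 length; compact ones use N + 1 as UNSIGNED_VARINT, so null is encoded as -1 and 0 respectively:

```python
from io import BytesIO
from typing import BinaryIO, Optional


def read_exact(buffer: BinaryIO, num_bytes: int) -> bytes:
    value = buffer.read(num_bytes)
    if len(value) != num_bytes:
        raise ValueError(f"Buffer underflow: expected {num_bytes}, got {len(value)}")
    return value


def read_unsigned_varint(buffer: BinaryIO) -> int:
    value, shift = 0, 0
    while True:
        b = read_exact(buffer, 1)[0]
        value |= (b & 0x7F) << shift
        if (b & 0x80) == 0:
            return value
        shift += 7
        if shift > 28:
            raise ValueError("UNSIGNED_VARINT is longer than 5 bytes")


def write_unsigned_varint(value: int, buffer: BinaryIO) -> None:
    while value > 0x7F:
        buffer.write(bytes([(value & 0x7F) | 0x80]))
        value >>= 7
    buffer.write(bytes([value]))


def read_array_length(buffer: BinaryIO, compact: bool) -> int:
    if compact:
        # N + 1 as UNSIGNED_VARINT; 0 decodes to -1 (null).
        return read_unsigned_varint(buffer) - 1
    # Non-compact: N as INT32, -1 means null.
    return int.from_bytes(read_exact(buffer, 4), byteorder="big", signed=True)


def write_array_length(length: int, buffer: BinaryIO, compact: bool) -> None:
    if compact:
        write_unsigned_varint(length + 1, buffer)
    else:
        buffer.write(length.to_bytes(4, byteorder="big", signed=True))


def read_nullable_bytes(buffer: BinaryIO, compact: bool) -> Optional[bytes]:
    length = read_array_length(buffer, compact)
    if length == -1:
        return None
    if length < 0:
        raise ValueError(f"Invalid byte array length {length}")
    return read_exact(buffer, length)


def write_nullable_bytes(value: Optional[bytes], buffer: BinaryIO, compact: bool) -> None:
    if value is None:
        write_array_length(-1, buffer, compact)
    else:
        write_array_length(len(value), buffer, compact)
        buffer.write(value)


def read_bytes(buffer: BinaryIO, compact: bool) -> bytes:
    value = read_nullable_bytes(buffer, compact)
    if value is None:
        raise ValueError("Non-nullable byte array is null")
    return value


def write_bytes(value: bytes, buffer: BinaryIO, compact: bool) -> None:
    if value is None:
        raise ValueError("Non-nullable byte array cannot be None")
    write_nullable_bytes(value, buffer, compact)
```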
As you can see, the difference between these functions and the corresponding functions for strings is small.
Apart from byte arrays, the protocol supports arrays of other types, such as strings, numbers, and structs (but not nested arrays): ARRAY and COMPACT_ARRAY. Compactness works the same way as for byte arrays and strings.
Nullability is not explicitly mentioned in the protocol specification for some reason. However, arrays can be nullable. This is controlled by nullableVersions in the schema definitions, like here.
Considering we already implemented read_array_length and write_array_length, let's implement the reader and writer functions:
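A minimal sketch of generic array reading and writing that takes the element reader or writer as a parameter (names and signatures are assumptions). It uses the same length encoding as byte arrays, INT32 or N + 1 as UNSIGNED_VARINT, repeated here so the block runs on its own; the INT32 element codecs at the end are only there for the usage example:

```python
from io import BytesIO
from typing import BinaryIO, Callable, List, Optional, TypeVar

T = TypeVar("T")


def read_exact(buffer: BinaryIO, num_bytes: int) -> bytes:
    value = buffer.read(num_bytes)
    if len(value) != num_bytes:
        raise ValueError(f"Buffer underflow: expected {num_bytes}, got {len(value)}")
    return value


def read_unsigned_varint(buffer: BinaryIO) -> int:
    value, shift = 0, 0
    while True:
        b = read_exact(buffer, 1)[0]
        value |= (b & 0x7F) << shift
        if (b & 0x80) == 0:
            return value
        shift += 7
        if shift > 28:
            raise ValueError("UNSIGNED_VARINT is longer than 5 bytes")


def write_unsigned_varint(value: int, buffer: BinaryIO) -> None:
    while value > 0x7F:
        buffer.write(bytes([(value & 0x7F) | 0x80]))
        value >>= 7
    buffer.write(bytes([value]))


def read_array_length(buffer: BinaryIO, compact: bool) -> int:
    # Compact: N + 1 as UNSIGNED_VARINT; non-compact: N as INT32. -1 means null.
    if compact:
        return read_unsigned_varint(buffer) - 1
    return int.from_bytes(read_exact(buffer, 4), byteorder="big", signed=True)


def write_array_length(length: int, buffer: BinaryIO, compact: bool) -> None:
    if compact:
        write_unsigned_varint(length + 1, buffer)
    else:
        buffer.write(length.to_bytes(4, byteorder="big", signed=True))


def read_nullable_array(
    buffer: BinaryIO, compact: bool, read_item: Callable[[BinaryIO], T]
) -> Optional[List[T]]:
    length = read_array_length(buffer, compact)
    if length == -1:
        return None
    if length < 0:
        raise ValueError(f"Invalid array length {length}")
    # Elements follow the length back to back, each in its own encoding.
    return [read_item(buffer) for _ in range(length)]


def write_nullable_array(
    values: Optional[List[T]],
    buffer: BinaryIO,
    compact: bool,
    write_item: Callable[[T, BinaryIO], None],
) -> None:
    if values is None:
        write_array_length(-1, buffer, compact)
        return
    write_array_length(len(values), buffer, compact)
    for value in values:
        write_item(value, buffer)


# Element codecs for the usage example: INT32 values.
def read_int32(buffer: BinaryIO) -> int:
    return int.from_bytes(read_exact(buffer, 4), byteorder="big", signed=True)


def write_int32(value: int, buffer: BinaryIO) -> None:
    buffer.write(value.to_bytes(4, byteorder="big", signed=True))
```

Passing the element codec as a function keeps one array implementation for strings, numbers, and structs alike; a struct would simply supply its own read or write function.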
RECORDS encode Kafka records. The structure is pretty complex and I'm not going to describe it in this guide (however, please let me know in the comments if you would like to have it). For simplicity, we can treat records as NULLABLE_BYTES or COMPACT_NULLABLE_BYTES (depending on the message version).
Tagged fields are an extension to the Kafka protocol which allows optional data to be attached to messages. The idea is twofold:
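Have a look, for instance, at this field. It has taggedVersions, which says since which version this field is tagged (in most cases, it's the version in which the field was added).
A tagged field consists of its tag (UNSIGNED_VARINT), the size of its data in bytes (UNSIGNED_VARINT), and the data itself. A structure's tagged-field section starts with the number of tagged fields, also an UNSIGNED_VARINT.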
You can find more details about tagged fields in KIP-482.
Let's implement:
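A minimal sketch that reads and writes a tagged-field section without interpreting the fields, keeping each one as a raw (tag, data) pair (read_unknown_tagged_fields, write_unknown_tagged_fields, and TaggedFields are hypothetical names). The layout follows KIP-482: the number of fields, then for each field its tag, its data size, and the data, with counts, tags, and sizes as UNSIGNED_VARINT:

```python
from io import BytesIO
from typing import BinaryIO, List, Tuple

TaggedFields = List[Tuple[int, bytes]]


def read_exact(buffer: BinaryIO, num_bytes: int) -> bytes:
    value = buffer.read(num_bytes)
    if len(value) != num_bytes:
        raise ValueError(f"Buffer underflow: expected {num_bytes}, got {len(value)}")
    return value


def read_unsigned_varint(buffer: BinaryIO) -> int:
    value, shift = 0, 0
    while True:
        b = read_exact(buffer, 1)[0]
        value |= (b & 0x7F) << shift
        if (b & 0x80) == 0:
            return value
        shift += 7
        if shift > 28:
            raise ValueError("UNSIGNED_VARINT is longer than 5 bytes")


def write_unsigned_varint(value: int, buffer: BinaryIO) -> None:
    while value > 0x7F:
        buffer.write(bytes([(value & 0x7F) | 0x80]))
        value >>= 7
    buffer.write(bytes([value]))


def read_unknown_tagged_fields(buffer: BinaryIO) -> TaggedFields:
    fields = []
    for _ in range(read_unsigned_varint(buffer)):
        tag = read_unsigned_varint(buffer)
        size = read_unsigned_varint(buffer)
        # The data is kept as opaque bytes; interpreting it is up to the caller.
        fields.append((tag, read_exact(buffer, size)))
    return fields


def write_unknown_tagged_fields(fields: TaggedFields, buffer: BinaryIO) -> None:
    write_unsigned_varint(len(fields), buffer)
    for tag, data in fields:
        write_unsigned_varint(tag, buffer)
        write_unsigned_varint(len(data), buffer)
        buffer.write(data)
```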
Here they are called "unknown" because this generic code doesn't interpret them. Known tagged fields need to be handled inside the structures that define them.
The high-level message structure is very straightforward. According to the specification:
RequestOrResponse => Size (RequestMessage | ResponseMessage)
  Size => int32
That is, it's the message itself preceded by its size in bytes. Both request and response messages consist of the header immediately followed by the body. For some reason, this isn't explicitly documented[4], but you can trust me or check the code.
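The size prefix is an INT32 that counts the bytes following it, i.e. header plus body. A minimal sketch of framing and unframing a message over any binary stream (write_framed and read_framed are hypothetical names):

```python
from io import BytesIO
from typing import BinaryIO


def read_exact(buffer: BinaryIO, num_bytes: int) -> bytes:
    value = buffer.read(num_bytes)
    if len(value) != num_bytes:
        raise ValueError(f"Buffer underflow: expected {num_bytes}, got {len(value)}")
    return value


def write_framed(message: bytes, buffer: BinaryIO) -> None:
    # Size (INT32) of the header + body that follow, then the bytes themselves.
    buffer.write(len(message).to_bytes(4, byteorder="big", signed=True))
    buffer.write(message)


def read_framed(buffer: BinaryIO) -> bytes:
    size = int.from_bytes(read_exact(buffer, 4), byteorder="big", signed=True)
    if size < 0:
        raise ValueError(f"Invalid message size {size}")
    return read_exact(buffer, size)
```

Reading the whole frame first and then parsing the header and body from an io.BytesIO over it means a parsing error can't leave the stream positioned in the middle of a message.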
The request header exists in three versions: 0, 1, and 2. They are specified in the protocol as:
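Request Header v0 => request_api_key request_api_version correlation_id
  request_api_key => INT16
  request_api_version => INT16
  correlation_id => INT32

Request Header v1 => request_api_key request_api_version correlation_id client_id
  request_api_key => INT16
  request_api_version => INT16
  correlation_id => INT32
  client_id => NULLABLE_STRING

Request Header v2 => request_api_key request_api_version correlation_id client_id TAG_BUFFER
  request_api_key => INT16
  request_api_version => INT16
  correlation_id => INT32
  client_id => NULLABLE_STRING
TAG_BUFFER is the tagged-field section mentioned earlier.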
Let's implement them as Python data classes:
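A minimal sketch of the three request header versions as dataclasses with a write method (class and helper names are assumptions). Note that client_id stays a non-compact NULLABLE_STRING even in v2, and v2 ends with an empty tagged-field section, a single 0 byte:

```python
from dataclasses import dataclass
from io import BytesIO
from typing import BinaryIO, Optional


def write_int16(value: int, buffer: BinaryIO) -> None:
    buffer.write(value.to_bytes(2, byteorder="big", signed=True))


def write_int32(value: int, buffer: BinaryIO) -> None:
    buffer.write(value.to_bytes(4, byteorder="big", signed=True))


def write_nullable_string(value: Optional[str], buffer: BinaryIO) -> None:
    # Non-compact NULLABLE_STRING: INT16 length (-1 for null), then UTF-8 bytes.
    if value is None:
        write_int16(-1, buffer)
        return
    encoded = value.encode("utf-8")
    write_int16(len(encoded), buffer)
    buffer.write(encoded)


@dataclass
class RequestHeaderV0:
    request_api_key: int
    request_api_version: int
    correlation_id: int

    def write(self, buffer: BinaryIO) -> None:
        write_int16(self.request_api_key, buffer)
        write_int16(self.request_api_version, buffer)
        write_int32(self.correlation_id, buffer)


@dataclass
class RequestHeaderV1(RequestHeaderV0):
    client_id: Optional[str]

    def write(self, buffer: BinaryIO) -> None:
        super().write(buffer)
        write_nullable_string(self.client_id, buffer)


@dataclass
class RequestHeaderV2(RequestHeaderV1):
    def write(self, buffer: BinaryIO) -> None:
        super().write(buffer)
        # Empty TAG_BUFFER: zero tagged fields, i.e. UNSIGNED_VARINT 0.
        buffer.write(b"\x00")
```

Subclassing keeps the sketch short, since each version only appends fields to the previous one; three independent classes would work just as well.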
As you can see, version 2 has a tagged-field section, but no known tagged fields are defined for it. If some tagged field is sent to the broker by mistake, it will be ignored.
The response header exists in two versions: 0 and 1. They are specified in the protocol as:
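Response Header v0 => correlation_id
  correlation_id => INT32

Response Header v1 => correlation_id TAG_BUFFER
  correlation_id => INT32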
Let's also implement them:
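A minimal sketch of the two response header versions with a read class method (class and helper names are assumptions). v1 keeps any tagged fields as raw (tag, data) pairs in a field named after the Java generated code's _unknownTaggedFields:

```python
from dataclasses import dataclass, field
from io import BytesIO
from typing import BinaryIO, List, Tuple


def read_exact(buffer: BinaryIO, num_bytes: int) -> bytes:
    value = buffer.read(num_bytes)
    if len(value) != num_bytes:
        raise ValueError(f"Buffer underflow: expected {num_bytes}, got {len(value)}")
    return value


def read_int32(buffer: BinaryIO) -> int:
    return int.from_bytes(read_exact(buffer, 4), byteorder="big", signed=True)


def read_unsigned_varint(buffer: BinaryIO) -> int:
    value, shift = 0, 0
    while True:
        b = read_exact(buffer, 1)[0]
        value |= (b & 0x7F) << shift
        if (b & 0x80) == 0:
            return value
        shift += 7
        if shift > 28:
            raise ValueError("UNSIGNED_VARINT is longer than 5 bytes")


def read_unknown_tagged_fields(buffer: BinaryIO) -> List[Tuple[int, bytes]]:
    fields = []
    for _ in range(read_unsigned_varint(buffer)):
        tag = read_unsigned_varint(buffer)
        size = read_unsigned_varint(buffer)
        fields.append((tag, read_exact(buffer, size)))
    return fields


@dataclass
class ResponseHeaderV0:
    correlation_id: int

    @classmethod
    def read(cls, buffer: BinaryIO) -> "ResponseHeaderV0":
        return cls(correlation_id=read_int32(buffer))


@dataclass
class ResponseHeaderV1:
    correlation_id: int
    _unknownTaggedFields: List[Tuple[int, bytes]] = field(default_factory=list)

    @classmethod
    def read(cls, buffer: BinaryIO) -> "ResponseHeaderV1":
        correlation_id = read_int32(buffer)
        return cls(correlation_id, read_unknown_tagged_fields(buffer))
```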
We don't implement read for the request headers or write for the response ones. This is for brevity: we're not going to send response headers or receive request headers in our examples, as we're not programming the server side. However, if you're interested in the server side as well, you need to implement both functions (which should be straightforward).
Note particularly the correlation_id field in the request and response headers. The protocol supports pipelining: the client can have more than one outstanding request per connection. The correlation ID allows it to match responses to requests.
Which header version must be used is a function of the API key and the message version. It's not currently documented explicitly in the protocol guide[5].
Use the requestHeaderVersion and responseHeaderVersion functions in the generated class ApiMessageType as the reference.
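As I read the generated ApiMessageType.java, the rule boils down to whether the message version is flexible, plus two special cases: ControlledShutdown v0 uses request header v0, and ApiVersions responses always use response header v0 (see KIP-511). Treat this as a sketch to check against the generated code, not a complete reference; the first_flexible_version parameter (None for APIs without flexible versions) is a hypothetical input you would take from the "flexibleVersions" entry of the schema JSON:

```python
from typing import Optional

CONTROLLED_SHUTDOWN_KEY = 7
API_VERSIONS_KEY = 18


def is_flexible(api_version: int, first_flexible_version: Optional[int]) -> bool:
    return first_flexible_version is not None and api_version >= first_flexible_version


def request_header_version(api_key: int, api_version: int,
                           first_flexible_version: Optional[int]) -> int:
    # ControlledShutdown v0 uses a non-standard header without client_id.
    if api_key == CONTROLLED_SHUTDOWN_KEY and api_version == 0:
        return 0
    return 2 if is_flexible(api_version, first_flexible_version) else 1


def response_header_version(api_key: int, api_version: int,
                            first_flexible_version: Optional[int]) -> int:
    # ApiVersions responses always use header v0, so a client can parse them
    # before it knows which versions the broker supports.
    if api_key == API_VERSIONS_KEY:
        return 0
    return 1 if is_flexible(api_version, first_flexible_version) else 0
```

For ApiVersions v3, with flexible versions starting at 3, this gives request header v2 and response header v0.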
Now, having all this knowledge and code, let's finally send an ApiVersions request, then receive and read the response. ApiVersions is normally the first request a client sends. Its purpose is to find out the API versions and features the broker supports. We implement the latest version, 3.
In the protocol specification, it's defined as:
ApiVersions Request (Version: 3) => client_software_name client_software_version TAG_BUFFER
  client_software_name => COMPACT_STRING
  client_software_version => COMPACT_STRING
Let's make the data class:
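A minimal sketch of the request as a dataclass with a write method (ApiVersionsRequestV3 and the helpers are assumed names). Version 3 is a flexible version, so both strings are COMPACT_STRINGs and the body ends with an empty tagged-field section:

```python
from dataclasses import dataclass
from io import BytesIO
from typing import BinaryIO


def write_unsigned_varint(value: int, buffer: BinaryIO) -> None:
    while value > 0x7F:
        buffer.write(bytes([(value & 0x7F) | 0x80]))
        value >>= 7
    buffer.write(bytes([value]))


def write_compact_string(value: str, buffer: BinaryIO) -> None:
    # COMPACT_STRING: length N + 1 as UNSIGNED_VARINT, then UTF-8 bytes.
    encoded = value.encode("utf-8")
    write_unsigned_varint(len(encoded) + 1, buffer)
    buffer.write(encoded)


@dataclass
class ApiVersionsRequestV3:
    client_software_name: str
    client_software_version: str

    def write(self, buffer: BinaryIO) -> None:
        write_compact_string(self.client_software_name, buffer)
        write_compact_string(self.client_software_version, buffer)
        # TAG_BUFFER with no tagged fields.
        write_unsigned_varint(0, buffer)
```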
And the response:
ApiVersions Response (Version: 3) => error_code [api_keys] throttle_time_ms TAG_BUFFER
  error_code => INT16
  api_keys => api_key min_version max_version TAG_BUFFER
    api_key => INT16
    min_version => INT16
    max_version => INT16
  throttle_time_ms => INT32
[api_keys] means "an array of api_keys", where api_keys is the structure defined two lines below.
Converting this to Python data classes:
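A minimal sketch of the response as dataclasses with a read class method (class names are assumptions; ApiVersionsResponseKeyV3 stands for the api_keys structure). api_keys is read as a COMPACT_ARRAY, and tagged fields are kept raw in _unknownTaggedFields, mirroring the Java generated code:

```python
from dataclasses import dataclass
from io import BytesIO
from typing import BinaryIO, List, Tuple

TaggedFields = List[Tuple[int, bytes]]


def read_exact(buffer: BinaryIO, num_bytes: int) -> bytes:
    value = buffer.read(num_bytes)
    if len(value) != num_bytes:
        raise ValueError(f"Buffer underflow: expected {num_bytes}, got {len(value)}")
    return value


def read_int(buffer: BinaryIO, num_bytes: int) -> int:
    # Signed big-endian INT16 (2 bytes) or INT32 (4 bytes).
    return int.from_bytes(read_exact(buffer, num_bytes), byteorder="big", signed=True)


def read_unsigned_varint(buffer: BinaryIO) -> int:
    value, shift = 0, 0
    while True:
        b = read_exact(buffer, 1)[0]
        value |= (b & 0x7F) << shift
        if (b & 0x80) == 0:
            return value
        shift += 7
        if shift > 28:
            raise ValueError("UNSIGNED_VARINT is longer than 5 bytes")


def read_unknown_tagged_fields(buffer: BinaryIO) -> TaggedFields:
    fields = []
    for _ in range(read_unsigned_varint(buffer)):
        tag = read_unsigned_varint(buffer)
        fields.append((tag, read_exact(buffer, read_unsigned_varint(buffer))))
    return fields


@dataclass
class ApiVersionsResponseKeyV3:
    api_key: int
    min_version: int
    max_version: int
    _unknownTaggedFields: TaggedFields

    @classmethod
    def read(cls, buffer: BinaryIO) -> "ApiVersionsResponseKeyV3":
        api_key = read_int(buffer, 2)
        min_version = read_int(buffer, 2)
        max_version = read_int(buffer, 2)
        return cls(api_key, min_version, max_version, read_unknown_tagged_fields(buffer))


@dataclass
class ApiVersionsResponseV3:
    error_code: int
    api_keys: List[ApiVersionsResponseKeyV3]
    throttle_time_ms: int
    _unknownTaggedFields: TaggedFields

    @classmethod
    def read(cls, buffer: BinaryIO) -> "ApiVersionsResponseV3":
        error_code = read_int(buffer, 2)
        # COMPACT_ARRAY: N + 1 as UNSIGNED_VARINT; this array is not nullable.
        length = read_unsigned_varint(buffer) - 1
        if length < 0:
            raise ValueError("api_keys must not be null")
        api_keys = [ApiVersionsResponseKeyV3.read(buffer) for _ in range(length)]
        throttle_time_ms = read_int(buffer, 4)
        return cls(error_code, api_keys, throttle_time_ms, read_unknown_tagged_fields(buffer))
```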
When we speak about arrays, we need to know whether they are compact or non-compact. To find this out, let's have a look at the schema definition in ApiVersionsRequest.json. You can see "flexibleVersions": "3+", which means that compact arrays are used starting from version 3 (more on this in README.md in the schema directory). Since we're working with version 3 here, we use compact arrays.
Having the request and response classes implemented, we can send and receive these messages. For ApiVersions v3, we need the v2 request header and the v0 response header (check the generated ApiMessageType.java). You can find the API key (18) in ApiVersionsRequest.json or in the protocol specification.
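A compact end-to-end sketch of the exchange over a plain socket. To stay self-contained and short, it inlines simplified encoders and decoders instead of using header and message classes: request header v2 plus an ApiVersions v3 body go out, response header v0 plus an ApiVersions v3 body come back. The function names, the client ID, and the client software name and version strings are assumptions; it expects a broker on localhost:9092, such as the Docker container started earlier:

```python
import socket
from io import BytesIO
from typing import BinaryIO, Dict, List, Tuple

API_VERSIONS_KEY = 18  # From ApiVersionsRequest.json.
API_VERSION = 3


def read_exact(buffer: BinaryIO, num_bytes: int) -> bytes:
    value = buffer.read(num_bytes)
    if len(value) != num_bytes:
        raise ValueError(f"Buffer underflow: expected {num_bytes}, got {len(value)}")
    return value


def read_int(buffer: BinaryIO, num_bytes: int) -> int:
    return int.from_bytes(read_exact(buffer, num_bytes), byteorder="big", signed=True)


def int_bytes(value: int, num_bytes: int) -> bytes:
    return value.to_bytes(num_bytes, byteorder="big", signed=True)


def varint_bytes(value: int) -> bytes:
    out = bytearray()
    while value > 0x7F:
        out.append((value & 0x7F) | 0x80)
        value >>= 7
    out.append(value)
    return bytes(out)


def read_unsigned_varint(buffer: BinaryIO) -> int:
    value, shift = 0, 0
    while True:
        b = read_exact(buffer, 1)[0]
        value |= (b & 0x7F) << shift
        if (b & 0x80) == 0:
            return value
        shift += 7
        if shift > 28:
            raise ValueError("UNSIGNED_VARINT is longer than 5 bytes")


def read_tagged_fields(buffer: BinaryIO) -> List[Tuple[int, bytes]]:
    fields = []
    for _ in range(read_unsigned_varint(buffer)):
        tag = read_unsigned_varint(buffer)
        fields.append((tag, read_exact(buffer, read_unsigned_varint(buffer))))
    return fields


def compact_string_bytes(value: str) -> bytes:
    encoded = value.encode("utf-8")
    return varint_bytes(len(encoded) + 1) + encoded


def build_api_versions_request(correlation_id: int, client_id: str) -> bytes:
    client_id_b = client_id.encode("utf-8")
    # Request header v2; client_id is a non-compact NULLABLE_STRING (INT16 length).
    header = (int_bytes(API_VERSIONS_KEY, 2) + int_bytes(API_VERSION, 2)
              + int_bytes(correlation_id, 4)
              + int_bytes(len(client_id_b), 2) + client_id_b + varint_bytes(0))
    # ApiVersions v3 body: client_software_name, client_software_version, TAG_BUFFER.
    body = compact_string_bytes("kafka-guide") + compact_string_bytes("0.1") + varint_bytes(0)
    # Size prefix: INT32 length of header + body.
    return int_bytes(len(header) + len(body), 4) + header + body


def parse_api_versions_response(payload: bytes) -> Dict[str, object]:
    buffer = BytesIO(payload)
    # Response header v0: only the correlation ID, no tagged fields.
    correlation_id = read_int(buffer, 4)
    error_code = read_int(buffer, 2)
    api_keys = []
    for _ in range(read_unsigned_varint(buffer) - 1):  # COMPACT_ARRAY: N + 1
        api_keys.append((read_int(buffer, 2), read_int(buffer, 2), read_int(buffer, 2)))
        read_tagged_fields(buffer)  # Per-entry tagged fields, ignored here.
    throttle_time_ms = read_int(buffer, 4)
    return {"correlation_id": correlation_id, "error_code": error_code,
            "api_keys": api_keys, "throttle_time_ms": throttle_time_ms,
            "tagged_fields": read_tagged_fields(buffer)}


def api_versions(host: str = "localhost", port: int = 9092) -> Dict[str, object]:
    correlation_id = 1
    with socket.create_connection((host, port)) as sock, sock.makefile("rb") as stream:
        sock.sendall(build_api_versions_request(correlation_id, "test-client"))
        payload = read_exact(stream, read_int(stream, 4))
    response = parse_api_versions_response(payload)
    if response["correlation_id"] != correlation_id:
        raise ValueError("Correlation ID mismatch")
    return response
```

Calling api_versions() against a running broker should return a dict with the correlation ID, the error code (0 on success), the (api_key, min_version, max_version) triples, and the top-level tagged fields.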
If you run this code, you will see the response header and message printed in the console. Congratulations, you've performed a correct network exchange with the Kafka broker!
You will notice three tagged fields put in _unknownTaggedFields. The read and write methods of the generated ApiVersionsResponseData class and the message definition in ApiVersionsResponse.json will help you interpret them. Consider this homework.
[1] In my day job, we developed an open-source library, Kio. It allows us to make arbitrary Kafka API calls from Python easily. The serialization/deserialization code, like in Kafka itself, is generated from the JSON protocol definitions. The generated code is rigorously tested, including property testing against real Java Kafka code.
[2] Or "message", if you like: some of the schemas are not for APIs but, for example, for on-disk data.
[3] The read_exact function has the drawback of duplicating the data when the underlying buffer is already in memory. However, it's more convenient for educational purposes.
[4] I made a PR to fix this.
[5] Again, I made a PR to fix this.