我在低階使用過很多 Apache Kafka 協定。僅按照官方指南開始執行此操作並不容易,而且我閱讀了很多程式碼。透過這篇文章,我想一步步指導您從原始值到有意義的請求,為您提供一個良好的開端。
在這篇文章中:
我們將使用Python作為程式語言。但是,程式碼將是零依賴性的,並且可以輕鬆移植到您選擇的語言。
Apache Kafka 有一個自訂的二進位協議,該協議是有版本的,具有各種資料類型、可選欄位等。不幸的是,它沒有使用像 Protobuf 這樣眾所周知的序列化格式。協定訊息架構以 JSON 格式描述。執行序列化和反序列化的實際 Java 程式碼是根據此描述產生的。
當你身處Java世界時,你可以使用官方的客戶端函式庫。但如果您使用其他平台,則需要依賴第三方實作。它們存在,但主要關註生產者和消費者,很少關注管理客戶端的某些方面。如果您需要做其他事情,您就得靠自己了。
這篇文章將幫助您開始破解 Kafka 協定。 (如果您正在為 Kafka 協定尋找現成的 Python(反)序列化函式庫,請查看 Kio1。對於 Rust,請查看我正在開發的程式庫。)
您可以在 Github 上的這個儲存庫中找到這篇文章中的程式碼以及更多類似的測試。
您可以在此頁面找到官方協議說明。我鼓勵您熟悉它,至少閱讀“預備知識”和“協議”部分。
以下是一些亮點。 Kafka 協定是基於 TCP 的二進位請求-回應協定:
每種 API 訊息類型都由請求和回應對組成,並由稱為 API 金鑰的數值進行識別。例如,Kafka 最具特色的 RPC 的 Produce 和 Fetch 對應的 API 金鑰為 0 和 1。如今,API 訊息類型接近 90 種(其中一些是經紀商間的訊息類型,而不是客戶經紀商間的訊息類型)。
請求和回應由版本化模式描述。版本控制允許協定演變,例如新增或刪除欄位或變更其資料類型。
您可以執行以下一些操作來開始使用 Kafka 協定。
Kafka 程式碼是該協定(實際上)的真相來源。從 Github 查看 Kafka 程式碼並切換到您感興趣的版本(例如 3.8.0):
git clone git@github.com:apache/kafka.git git checkout 3.8.0
您可以在clients/src/main/resources/common/message中找到JSON格式的API訊息定義。每個 JSON 檔案包含一條訊息2 類型及其所有版本的定義。 client/src/main/resources/common/message/README.md 很好地概述了模式定義格式。注意預設值、靈活版本和標記欄位等內容。
除了您感興趣的特定API訊息類型之外,還可以查看clients/src/main/resources/common/message/RequestHeader.json和ResponseHeader.json,它們描述了每個請求-回應交換中使用的標頭.
讓我們運行程式碼產生器:
./gradlew processMessages
現在你可以在clients/src/ generated/java/org/apache/kafka/common/message中找到產生的類別。
看看clients/src/ generated/java/org/apache/kafka/common/message/ApiMessageType.java。該實用程式:
其他檔案是從對應的架構 JSON 一對一產生的(有時帶有資料後綴,這是相容性問題)。在這些文件中,您會發現:
注意內部類,它們代表了訊息的複雜結構。
在 Docker 中運行 Kafka 是一種讓代理程式運行來測試協定或捕獲網路交換的便捷方法。從 3.7.0 版本開始,Kafka 團隊建立了官方 Docker 映像,您可以透過以下方式運行:
docker run --rm -ti -p 9092:9092 apache/kafka:3.8.0
如果您對舊版本感興趣,請在 Docker Hub 中搜尋其他映像。然而,考慮到 Kafka 協定是向後和向前相容的,這可能是不需要的:新的代理將很好地識別舊的協定版本,並且舊的客戶端可以與新的代理進行通訊。
如果您閱讀本文,您的電腦上可能已經安裝了 Kafka 命令列工具,但為了以防萬一,您也可以在 Docker 中執行它們。例如,執行此命令來建立主題:
git clone git@github.com:apache/kafka.git git checkout 3.8.0
熟悉了 Kafka 程式碼後,讓我們看看實際的協定。 Wireshark 是此類檢查廣泛使用的工具。它可以剖析 Kafka 協定(如果您的版本足夠新,則支援最新版本)。
我從4.5.0版本的原始碼建置了Wireshark,因為我的作業系統套件很舊,無法用新版本解析Kafka協定。 Wireshark 4.5.0 應該主要支援 Kafka 3.7 協定版本。不過,您可以嘗試可用的版本,看看它如何適合您。
讓我們在環回介面上執行 Wireshark,使用連接埠 9092 擷取過濾器 (1) 和 kafka 顯示過濾器 (2):
建立一個主題,看看 Wireshark 向我們展示了什麼:
./gradlew processMessages
顯示過濾器刪除所有不相關的內容,只留下 Kafka 請求和回應。由於 Wireshark 可以理解協定中的大多數訊息版本(當然取決於 Wireshark 版本),因此您可以輕鬆查看每個訊息的結構。 Wireshark 也會顯示對應的位元組。
Wireshark 是一個很棒的偵錯工具,可以幫助您了解協定在特定情況下如何運作以及您的實作有什麼問題。
該協定定義了許多原始類型,您可以在此處找到完整的描述。讓我們為它們實作讀寫程式碼。你可以在這個檔案中找到所有的函數,也可以查看對應的測試檔案。
這些是已知固定長度的整數:1、2、4 或 8 個位元組。當然,您可以在整個協議中找到很多這樣的欄位。在本課程中,您可能會(簡單地)看到他們的讀寫是如何在 Kafka 中實現的。
我們先定義從緩衝區讀取確切位元組數的函數3:
docker run --rm -ti -p 9092:9092 apache/kafka:3.8.0
Python 中的 BinaryIO 類型提示表示一個可以從中讀取位元組並可以寫入位元組的物件。它有 read、write、tell(用於獲取當前位置)、seek(用於更改位置)等方法。
現在我們可以實作讀取INT8了:
docker run --rm -ti --net=host apache/kafka:3.8.0 \ /opt/kafka/bin/kafka-topics.sh \ --bootstrap-server localhost:9092 --create \ --topic test-topic1 --partitions 2
Kafka 使用 big-endian(又稱網路)位元組排序,因此 byteorder="big"。
現在寫:
git clone git@github.com:apache/kafka.git git checkout 3.8.0
對於INT16、INT32 和INT64,我不會重複這一點:唯一顯著的區別是位元組數(分別為2、4 和8)和檢查範圍([-(2**15), 2* *15 - 1]、[-(2**31)、2**31 - 1] 和[-(2**63)、2**63 - 1]相應地)。
UINT16 與 INT16 類似:
./gradlew processMessages
注意這裡的signed=False。
BOOLEAN 本質上是帶有額外邏輯的 INT8:== 0 表示 false,!= 0 表示 true。
docker run --rm -ti -p 9092:9092 apache/kafka:3.8.0
您可以在MetadataRequestData產生的類別的allowAutoTopicCreation欄位中看到BOOLEAN的範例。
FLOAT64 是雙精確度 64 位元 IEEE 754 值。 Python 不像 int 有用於 float 的 to_bytes 和 from_bytes 。因此,我們將使用標準庫中的 struct 模組。
docker run --rm -ti --net=host apache/kafka:3.8.0 \ /opt/kafka/bin/kafka-topics.sh \ --bootstrap-server localhost:9092 --create \ --topic test-topic1 --partitions 2
>d 表示「大端位元組順序中的雙精確值」。
可變長度整數是一種允許在數值較小時每個值使用較少位數的方法。 Kafka 使用 Protocol Buffers 的 Varint 方法。這個想法很簡單:
varint 中的每個位元組都有一個連續位,指示其後面的位元組是否是 varint 的一部分。這是位元組的最高有效位元 (MSB)(有時也稱為符號位元)。低7位元是有效負載;產生的整數是透過將其組成位元組的 7 位元有效負載附加在一起而建構的。
詳細資訊可以查看Protobuf規格和Kafka實作(讀、寫)。
此類型本身不用於協定字段,但它用於下面描述的緊湊集合。
讓我們實現它。為了信心起見,我們直接從事實來源,Kafka 的 ByteUtils 類別中取得一些範例:
/opt/kafka/bin/kafka-topics.sh \ --bootstrap-server localhost:9092 --create \ --topic test-topic1 --partitions 2
運行這個,我們會得到:
def read_exact(buffer: BinaryIO, num_bytes: int) -> bytes: value = buffer.read(num_bytes) if len(value) != num_bytes: raise ValueError(f"Buffer underflow: expected {num_bytes}, got {len(value)}") return value
讓我們以可能不是最高效但最簡單的方式來實現它:
def read_int8(buffer: BinaryIO) -> int: return int.from_bytes(read_exact(buffer, 1), byteorder="big", signed=True)
UUID 是用於唯一識別實體的 128 位元值。例如,它們用於在 CreateTopicsResponse 中傳遞主題 ID。
你可以看到它們在Kafka程式碼中是如何讀寫的。重現很簡單:
def write_int8(value: int, buffer: BinaryIO) -> None: if -(2**7) <= value <= 2**7 - 1: buffer.write(value.to_bytes(1, byteorder="big", signed=True)) else: raise ValueError(f"Value {value} is out of range for INT8")
請注意,Kafka 將 null/None 視為零 UUID,因此我們在這裡也這樣做。
Kafka協定有4種類型的字串:
compact | non-compact | |
---|---|---|
nullable | COMPACT_NULLABLE_STRING | NULLABLE_STRING |
non-nullable | COMPACT_STRING | STRING |
緊湊性指示字串長度是使用 INT16 還是使用 UNSIGNED_VARINT 編碼。這取決於訊息版本(2017年左右推出)。可空性是指該值是否可以為空。這也取決於訊息的目的和版本(有時字串欄位在協定演變過程中變得可選)。
字串在協定中無所不在。例如,查看產生的類別MetadataRequestData.MetadataRequestTopic中的欄位名稱。
字串的編碼非常簡單:首先是長度,然後是 UTF-8 編碼的正文。允許的最大長度為 32767 位元組。空字串的長度為 -1 並且顯然沒有正文。
由於緊湊型和非緊湊型之間的唯一區別在於字串長度的編碼方式,因此我們可以為兩種模式使用一個函數。
讓我們從讀取和寫入可為空字串開始:
git clone git@github.com:apache/kafka.git git checkout 3.8.0
不可為 null 的字串函數可以建構在這些函數之上:
./gradlew processMessages
位元組數組與字串非常相似。它們具有相同的潛在可空性和緊湊性:
compact | non-compact | |
---|---|---|
nullable | COMPACT_NULLABLE_BYTES | NULLABLE_BYTES |
non-nullable | COMPACT_BYTES | BYTES |
它們也以相同的方式編碼:長度主體。當然,主體不會被視為 UTF-8 字串,而是被視為不透明的位元組數組。位元組數組的最大長度為 2147483647;
您可以在產生的類別JoinGroupRequestData.JoinGroupRequestProtocol中找到字段元資料中位元組的範例。
git clone git@github.com:apache/kafka.git git checkout 3.8.0
如您所見,這些函數與字串對應的函數之間的差異很小。
此協定支援位元組以外類型的陣列:字串、數字、結構(但不包括巢狀陣列):ARRAY 和 COMPACT_ARRAY。 緊湊性與位元組數組和字串相同。
出於某種原因,協議規範中沒有明確提及可空性。但是,數組可以為空。這是由架構定義中的 nullableVersions 控制的,如下所示。
考慮到我們已經實作了 read_array_length 和 write_array_length,讓我們實作 reader 和 writer 函數:
./gradlew processMessages
RECORDS 對 Kafka 記錄進行編碼。這個結構非常複雜,我不會在本指南中描述它(但是,如果您想要它,請在評論中告訴我?️。)為了簡單起見,我們可以將記錄視為NULLABLE_BYTES 或COMPACT_NULLABLE_BYTES (取決於訊息版本)。
標記欄位是 Kafka 協定的擴展,它允許將可選資料附加到訊息中。這個想法是雙重的:
例如,看看這個欄位。它有 taggedVersions,它表示從哪個版本開始標記該欄位(在大多數情況下,它與新增該欄位時的版本相同)。
標記欄位包含:
您可以在 KIP-482 中找到有關標記欄位的更多詳細資訊。
讓我們實現:
docker run --rm -ti -p 9092:9092 apache/kafka:3.8.0
這裡它們的標題是「未知」。已知字段需要在其結構內進行建構。
進階訊息結構非常簡單。依規範:
docker run --rm -ti --net=host apache/kafka:3.8.0 \ /opt/kafka/bin/kafka-topics.sh \ --bootstrap-server localhost:9092 --create \ --topic test-topic1 --partitions 2
也就是說,它是一條訊息本身,前面有其大小(以位元組為單位)。請求和回應訊息均由緊跟其後的標頭組成。由於某種原因,這沒有明確記錄4,但你可以相信我嗎?或查看代碼。
請求頭存在三個版本:0、1、2。它們在協定中指定為:
/opt/kafka/bin/kafka-topics.sh \ --bootstrap-server localhost:9092 --create \ --topic test-topic1 --partitions 2
TAG_BUFFER 就是前面提到的標記欄位。
讓我們將它們實作為 Python 資料類別:
git clone git@github.com:apache/kafka.git git checkout 3.8.0
如您所見,版本 2 中有一些標記字段,沒有預期的已知字段。如果某些標記欄位被錯誤地傳送到代理,它將被忽略。
回應頭存在兩個版本:0和1。它們在協定中指定為:
./gradlew processMessages
讓我們也實現它們:
docker run --rm -ti -p 9092:9092 apache/kafka:3.8.0
我們沒有實作請求標頭的讀取和回應標頭的寫入。這是為了簡潔起見:在我們的範例中,我們不會發送回應標頭並接收請求標頭,因為我們不會對伺服器端進行程式設計。但是,如果您也對伺服器端感興趣,則需要實現這兩個功能(這應該很簡單)。
特別注意請求和回應標頭中的correlation_id 欄位。此協定支援管道:客戶端每個連線可以有多個未完成的請求。相關 ID 允許其將回應與請求進行配對。
必須使用哪個版本是API金鑰和訊息版本的函數。目前協議指南中尚未明確記錄5.
參考生成的類別ApiMessageType中的requestHeaderVersion和responseHeaderVersion函數。
現在,掌握了所有這些知識和程式碼,讓我們最終發送 ApiVersions 請求並接收和讀取回應。 ApiVersions 通常是客戶端發送的第一個請求。其目的是查找代理程式支援的 API 版本和功能。我們實作了最新版本 3。
在協定規範中,定義為:
docker run --rm -ti --net=host apache/kafka:3.8.0 \ /opt/kafka/bin/kafka-topics.sh \ --bootstrap-server localhost:9092 --create \ --topic test-topic1 --partitions 2
讓我們建立資料類別:
/opt/kafka/bin/kafka-topics.sh \ --bootstrap-server localhost:9092 --create \ --topic test-topic1 --partitions 2
回覆:
def read_exact(buffer: BinaryIO, num_bytes: int) -> bytes: value = buffer.read(num_bytes) if len(value) != num_bytes: raise ValueError(f"Buffer underflow: expected {num_bytes}, got {len(value)}") return value
[api_keys] 表示“api_keys 陣列”,其中 api_keys 是下面兩行定義的結構體。
將其轉換為 Python 資料類別:
def read_int8(buffer: BinaryIO) -> int: return int.from_bytes(read_exact(buffer, 1), byteorder="big", signed=True)
當我們談論數組時,我們需要知道我們是否需要緊湊數組或非緊湊數組。為了找到答案,讓我們來看看 ApiVersionsRequest.json 中的架構定義。可以看到 "flexibleVersions": "3 ",這意味著從版本 3 開始使用緊湊數組(更多資訊請參閱 schema 目錄中的 README.md)。由於我們在這裡使用版本 3,因此我們使用緊湊數組。
實現請求和回應類別後,我們可以發送和接收這些請求。對於此 ApiVersions v3,我們需要 v2 請求標頭和 v0 回應標頭(檢查產生的 ApiMessageType.java)。您可以在 ApiVersionsRequest.json 或協定規格中找到 API 金鑰 (18)。
git clone git@github.com:apache/kafka.git git checkout 3.8.0
如果執行此程式碼,您將看到控制台中列印的回應標頭和訊息。恭喜,您已經與 Kafka Broker 進行了正確的網路交換!
您會注意到 _unknownTaggedFields 中放入了三個標記欄位。產生的 ApiVersionsResponseData 類別的讀寫方法以及 ApiVersionsResponse.json 中的訊息定義將幫助您解釋它們。考慮一下這個作業嗎?
在我的日常工作中,我們開發了一個開源程式庫 Kio。它允許我們輕鬆地從 Python 進行任意 Kafka API 呼叫。序列化/反序列化程式碼,就像 Kafka 本身一樣,是根據 JSON 協定定義產生的。產生的程式碼經過嚴格測試,包括針對真實 Java Kafka 程式碼的屬性測試。 ↩
如果您願意,也可以使用「訊息」:某些模式不適用於 API,但例如磁碟上的資料。 ↩
read_exact 函數有一個缺點,即當底層緩衝區已在記憶體中時,它會複製資料。然而,它對於教育目的來說更方便。 ↩
我做了一個 PR 來解決這個問題。 ↩
我再次發布了 PR 來解決這個問題。 ↩
以上是Kafka協議實用指南的詳細內容。更多資訊請關注PHP中文網其他相關文章!