Saya bekerja dengan protokol Apache Kafka pada tahap rendah agak sedikit. Bukan mudah untuk mula melakukan ini mengikut panduan rasmi sahaja dan saya banyak membaca kod itu. Dengan siaran ini, saya ingin memberi anda permulaan yang baik dengan membimbing anda langkah demi langkah daripada nilai primitif kepada permintaan yang bermakna.
Dalam siaran ini:
Kami akan menggunakan Python sebagai bahasa pengaturcaraan. Walau bagaimanapun, kod tersebut akan menjadi kebergantungan sifar dan mudah dibawa ke bahasa pilihan anda.
Apache Kafka mempunyai protokol binari tersuai yang berversi, dengan pelbagai jenis data, medan pilihan, dll. Malangnya, ia tidak menggunakan format bersiri yang terkenal seperti Protobuf. Skema mesej protokol diterangkan dalam JSON. Kod Java sebenar yang melakukan penyirian dan penyahserikatan dijana daripada penerangan ini.
Apabila anda berada di dunia Java, anda boleh menggunakan perpustakaan pelanggan rasmi. Tetapi jika anda menggunakan platform lain, anda bergantung pada pelaksanaan pihak ketiga. Mereka wujud, tetapi mereka memfokuskan kebanyakannya pada pengeluar dan pengguna, jarang pada beberapa aspek pelanggan pentadbir. Jika anda perlu melakukan perkara lain, anda bersendirian.
Siaran ini akan membantu anda mula menggodam protokol Kafka. (Jika anda sedang mencari perpustakaan Python (de-)serialization sedia untuk protokol Kafka, lihat Kio1. Untuk Rust, lihat perpustakaan yang sedang saya usahakan.)
Anda boleh mencari kod daripada siaran ini dan beberapa lagi ujian seperti dalam repositori ini di Github.
Anda boleh mendapatkan penerangan protokol rasmi di halaman ini. Saya menggalakkan anda membiasakan diri dengannya, sekurang-kurangnya baca bahagian "Preliminaries" dan "The Protocol".
Berikut ialah beberapa sorotan. Protokol Kafka ialah protokol permintaan-tindak balas binari berasaskan TCP:
Setiap jenis mesej API terdiri daripada pasangan permintaan dan respons dan dikenal pasti dengan nilai angka yang dipanggil kunci API. Contohnya, Produce dan Fetch, RPC Kafka yang paling berciri, mempunyai kunci API 0 dan 1 yang sepadan. Pada masa kini, terdapat hampir 90 jenis mesej API (sesetengahnya adalah antara broker, bukan klien-broker).
Permintaan dan respons diterangkan oleh skema versi. Pemberian versi membolehkan evolusi protokol, contohnya, menambah atau mengalih keluar medan atau menukar jenis datanya.
Berikut ialah beberapa perkara yang boleh anda lakukan untuk mula bekerja dengan protokol Kafka.
Kod Kafka ialah sumber kebenaran (secara praktikal) tentang protokol. Lihat kod Kafka daripada Github dan tukar kepada keluaran yang anda minati (mis. 3.8.0):
git clone git@github.com:apache/kafka.git git checkout 3.8.0
Anda boleh menemui definisi mesej API dalam JSON dalam klien/src/main/resources/common/message. Setiap fail JSON mengandungi takrifan satu mesej2 jenis dengan semua versinya. clients/src/main/resources/common/message/README.md memberikan gambaran keseluruhan yang baik tentang format definisi skema. Beri perhatian kepada perkara seperti nilai lalai, versi fleksibel dan medan berteg.
Selain daripada jenis mesej API konkrit yang anda minati, lihat klien/src/main/resources/common/message/RequestHeader.json dan ResponseHeader.json, yang menerangkan pengepala yang digunakan dalam setiap pertukaran permintaan-tindak balas .
Mari jalankan penjana kod:
./gradlew processMessages
Kini anda boleh mencari kelas yang dijana dalam klien/src/generated/java/org/apache/kafka/common/message.
Sila lihat klien/src/generated/java/org/apache/kafka/common/message/ApiMessageType.java. Utiliti ini:
Fail lain dijana satu-sama-satu daripada JSON skema yang sepadan (kadang-kadang dengan postfix Data, ia adalah perkara keserasian). Dalam fail ini anda akan dapati:
Beri perhatian kepada kelas dalaman serta ia mewakili struktur kompleks mesej.
Menjalankan Kafka dalam Docker ialah cara yang mudah untuk membolehkan broker menjalankan ujian protokol atau menangkap pertukaran rangkaian. Sejak versi 3.7.0, pasukan Kafka membina imej Docker rasmi, yang boleh anda jalankan sebagai:
docker run --rm -ti -p 9092:9092 apache/kafka:3.8.0
Jika anda berminat dengan versi lama, cari Docker Hub untuk imej lain. Walau bagaimanapun, ini mungkin tidak diperlukan memandangkan protokol Kafka adalah serasi ke belakang dan ke hadapan: broker baharu akan mengenali versi protokol lama dengan baik dan pelanggan lama boleh berkomunikasi dengan broker baharu.
Jika anda membaca ini, anda mungkin sudah mempunyai alatan baris arahan Kafka pada mesin anda, tetapi untuk berjaga-jaga, anda juga boleh menjalankannya dalam Docker. Contohnya, jalankan ini untuk mencipta topik:
git clone git@github.com:apache/kafka.git git checkout 3.8.0
Setelah membiasakan diri dengan kod Kafka, mari lihat protokol dalam tindakan. Wireshark ialah alat yang digunakan secara meluas untuk pemeriksaan sedemikian. Ia boleh membedah protokol Kafka (dan menyokong versi terkini jika versi anda cukup segar).
Saya membina Wireshark daripada sumber versi 4.5.0, kerana pakej sistem pengendalian saya sudah lama dan tidak dapat membedah protokol Kafka dengan versi baharu. Wireshark 4.5.0 sepatutnya menyokong versi protokol Kafka 3.7. Walau bagaimanapun, anda boleh mencuba versi yang tersedia dan melihat cara ia berfungsi untuk anda.
Mari jalankan Wireshark pada antara muka gelung balik dengan penapis tangkapan port 9092 (1) dan penapis paparan kafka (2):
Buat topik dan lihat perkara yang ditunjukkan oleh Wireshark kepada kami:
./gradlew processMessages
Penapis paparan mengalih keluar semua yang tidak berkaitan dan hanya meninggalkan permintaan dan respons Kafka. Memandangkan Wireshark memahami kebanyakan versi mesej dalam protokol (bergantung pada versi Wireshark, sudah tentu), anda boleh melihat dengan mudah struktur setiap mesej. Wireshark juga akan menunjukkan bait yang sepadan.
Wireshark ialah alat penyahpepijatan yang hebat yang boleh membantu anda memahami cara protokol berfungsi dalam kes tertentu dan apa yang salah dengan pelaksanaan anda.
Protokol mentakrifkan beberapa jenis primitif, yang penerangan penuh boleh anda temui di sini. Mari kita laksanakan kod baca dan tulis untuk mereka. Anda boleh menemui semua fungsi dalam fail ini, lihat juga fail ujian yang sepadan.
Ini ialah nombor integer dengan panjang tetap yang diketahui: 1, 2, 4 atau 8 bait. Sememangnya, anda boleh menemui medan sedemikian banyak sepanjang protokol. Dalam kelas ini, anda mungkin melihat bagaimana (secara remeh) pembacaan dan penulisan mereka dilaksanakan dalam Kafka.
Mari kita tentukan dahulu fungsi untuk membaca bilangan bait yang tepat daripada penimbal3:
docker run --rm -ti -p 9092:9092 apache/kafka:3.8.0
Petunjuk jenis BinaryIO dalam Python mewakili objek dari mana bait boleh dibaca dan untuknya ia boleh ditulis. Ia mempunyai kaedah seperti membaca, menulis, memberitahu (untuk mendapatkan kedudukan semasa), mencari (untuk menukar kedudukan).
Kini kita boleh melaksanakan bacaan INT8:
docker run --rm -ti --net=host apache/kafka:3.8.0 \ /opt/kafka/bin/kafka-topics.sh \ --bootstrap-server localhost:9092 --create \ --topic test-topic1 --partitions 2
Kafka menggunakan pesanan bait big-endian (rangkaian AKA), oleh itu byteorder="big".
Sekarang menulis:
git clone git@github.com:apache/kafka.git git checkout 3.8.0
Saya tidak akan mengulangi ini untuk INT16, INT32 dan INT64: satu-satunya perbezaan ketara ialah bilangan bait (2, 4 dan 8 yang sepadan) dan julat yang diperiksa ([-(2**15), 2* *15 - 1], [-(2**31), 2**31 - 1] dan [-(2**63), 2**63 - 1] sepadan).
UINT16 serupa dengan INT16:
./gradlew processMessages
Perhatikan tandatangan=Salah di sini.
BOOLEAN pada asasnya ialah INT8 dengan logik tambahan: == 0 bermaksud palsu, != 0 bermaksud benar.
docker run --rm -ti -p 9092:9092 apache/kafka:3.8.0
Anda boleh melihat contoh BOOLEAN dalam medan allowAutoTopicCreation kelas MetadataRequestData yang dijana.
FLOAT64 ialah nilai IEEE 754 64-bit berketepatan ganda. Python tidak mempunyai to_bytes dan from_bytes untuk float seperti yang ada untuk int. Jadi sebaliknya kita akan menggunakan modul struct daripada perpustakaan standard.
docker run --rm -ti --net=host apache/kafka:3.8.0 \ /opt/kafka/bin/kafka-topics.sh \ --bootstrap-server localhost:9092 --create \ --topic test-topic1 --partitions 2
>d bermaksud "nilai berganda dalam susunan bait besar-endian".
Integer panjang boleh ubah ialah pendekatan yang membenarkan penggunaan bit yang lebih sedikit bagi setiap nilai apabila nilainya kecil. Kafka menggunakan pendekatan varian dari Penampan Protokol. Ideanya mudah sahaja:
Setiap bait dalam varian mempunyai bit kesinambungan yang menunjukkan jika bait yang mengikutinya adalah sebahagian daripada varian. Ini ialah bit paling ketara (MSB) bagi bait (kadangkala juga dipanggil bit tanda). 7 bit yang lebih rendah adalah muatan; integer yang terhasil dibina dengan menambahkan bersama muatan 7-bit bait konstituennya.
Anda boleh menyemak spesifikasi Protobuf dan pelaksanaan Kafka (baca, tulis) untuk butiran.
Jenis ini tidak digunakan dalam medan protokol semata-mata, tetapi ia digunakan untuk koleksi padat yang diterangkan di bawah.
Mari kita laksanakan. Untuk keyakinan, kami mendapat beberapa contoh terus daripada sumber kebenaran, kelas ByteUtils Kafka:
/opt/kafka/bin/kafka-topics.sh \ --bootstrap-server localhost:9092 --create \ --topic test-topic1 --partitions 2
Menjalankan ini, kita akan mendapat:
def read_exact(buffer: BinaryIO, num_bytes: int) -> bytes: value = buffer.read(num_bytes) if len(value) != num_bytes: raise ValueError(f"Buffer underflow: expected {num_bytes}, got {len(value)}") return value
Mari kita laksanakan ini mungkin bukan yang paling berprestasi, tetapi dengan cara yang mudah:
def read_int8(buffer: BinaryIO) -> int: return int.from_bytes(read_exact(buffer, 1), byteorder="big", signed=True)
UUID ialah nilai 128-bit yang digunakan untuk mengenal pasti entiti secara unik. Contohnya, ia digunakan untuk menghantar ID topik dalam CreateTopicsResponse.
Anda boleh melihat cara ia dibaca dan ditulis dalam kod Kafka. Ia mudah untuk menghasilkan semula:
def write_int8(value: int, buffer: BinaryIO) -> None: if -(2**7) <= value <= 2**7 - 1: buffer.write(value.to_bytes(1, byteorder="big", signed=True)) else: raise ValueError(f"Value {value} is out of range for INT8")
Perhatikan bahawa Kafka menganggap null/None sebagai UUID sifar, jadi kami melakukan perkara yang sama di sini.
Protokol Kafka mempunyai 4 jenis rentetan:
compact | non-compact | |
---|---|---|
nullable | COMPACT_NULLABLE_STRING | NULLABLE_STRING |
non-nullable | COMPACT_STRING | STRING |
Kekompakan menunjukkan sama ada panjang rentetan dikodkan dengan INT16 atau dengan UNSIGNED_VARINT. Ia bergantung pada versi mesej (ia diperkenalkan sekitar 2017). Kebolehnullan ialah sama ada nilai boleh menjadi batal. Ia bergantung pada tujuan mesej dan versi juga (kadangkala medan rentetan menjadi pilihan semasa evolusi protokol).
String ada di mana-mana dalam protokol. Sebagai contoh, lihat nama medan dalam kelas yang dijana MetadataRequestData.MetadataRequestTopic.
Rentetan dikodkan dengan agak mudah: mula-mula mengikut panjang dan kemudian badan berkod UTF-8. Panjang maksimum yang dibenarkan ialah 32767 bait. Rentetan null mempunyai panjang -1 dan jelas tiada badan.
Memandangkan satu-satunya perbezaan antara padat dan tidak padat ialah cara panjang rentetan dikodkan, kita boleh mempunyai satu fungsi untuk kedua-dua mod.
Mari kita mulakan dengan membaca dan menulis rentetan nullable:
git clone git@github.com:apache/kafka.git git checkout 3.8.0
Fungsi rentetan yang tidak boleh dibatalkan boleh dibina di atas ini:
./gradlew processMessages
Tatasusunan bait sangat serupa dengan rentetan. Mereka mempunyai potensi kebolehbatalan dan kekompakan yang sama:
compact | non-compact | |
---|---|---|
nullable | COMPACT_NULLABLE_BYTES | NULLABLE_BYTES |
non-nullable | COMPACT_BYTES | BYTES |
Ia juga dikodkan dengan cara yang sama: badan panjang. Sememangnya, badan tidak dianggap sebagai rentetan UTF-8, tetapi sebagai tatasusunan bait legap. Panjang maksimum tatasusunan bait ialah 2147483647;
Anda boleh mencari contoh bait dalam metadata medan dalam kelas yang dijana JoinGroupRequestData.JoinGroupRequestProtocol.
git clone git@github.com:apache/kafka.git git checkout 3.8.0
Seperti yang anda lihat, perbezaan antara fungsi ini dan fungsi yang sepadan untuk rentetan adalah kecil.
Protokol menyokong tatasusunan jenis selain bait: rentetan, nombor, struct (tetapi bukan tatasusunan bersarang): ARRAY dan COMPACT_ARRAY. Kekompakan adalah sama seperti dalam tatasusunan bait dan rentetan.
Kebolehbatalan tidak disebut secara eksplisit dalam spesifikasi protokol atas sebab tertentu. Walau bagaimanapun, tatasusunan boleh menjadi nullable. Ini dikawal oleh nullableVersions dalam definisi skema, seperti di sini.
Memandangkan kita telah melaksanakan read_array_length dan write_array_length, mari laksanakan fungsi pembaca dan penulis:
./gradlew processMessages
REKOD mengekod rekod Kafka. Strukturnya agak rumit dan saya tidak akan menerangkannya dalam panduan ini (namun, sila beritahu saya dalam ulasan ? ️ jika anda ingin memilikinya.) Untuk memudahkan, kami boleh menganggap rekod sebagai NULLABLE_BYTES atau COMPACT_NULLABLE_BYTES ( bergantung pada versi mesej).
Medan bertanda ialah sambungan kepada protokol Kafka yang membenarkan data pilihan dilampirkan pada mesej. Ideanya ada dua:
Sebagai contoh, lihat di medan ini. Ia mempunyai taggedVersions, yang menyatakan sejak versi mana medan ini ditandakan (dalam kebanyakan kes, ia adalah versi yang sama apabila medan itu ditambahkan).
Medan bertanda terdiri daripada:
Anda boleh mendapatkan butiran lanjut tentang medan berteg dalam KIP-482.
Mari kita laksanakan:
docker run --rm -ti -p 9092:9092 apache/kafka:3.8.0
Di sini mereka diberi tajuk "tidak diketahui". Medan yang diketahui perlu dibuat di dalam strukturnya.
Struktur mesej peringkat tinggi sangat mudah. Mengikut spesifikasi:
docker run --rm -ti --net=host apache/kafka:3.8.0 \ /opt/kafka/bin/kafka-topics.sh \ --bootstrap-server localhost:9092 --create \ --topic test-topic1 --partitions 2
Iaitu, mesej itu sendiri didahului oleh saiznya dalam bait. Kedua-dua mesej permintaan dan respons terdiri daripada pengepala serta-merta diikuti oleh badan. Atas sebab tertentu, ini tidak didokumenkan secara eksplisit4, tetapi anda boleh mempercayai saya? atau semak kod.
Pengepala permintaan wujud dalam tiga versi: 0, 1 dan 2. Ia dinyatakan dalam protokol sebagai:
/opt/kafka/bin/kafka-topics.sh \ --bootstrap-server localhost:9092 --create \ --topic test-topic1 --partitions 2
TAG_BUFFER ialah medan berteg yang dinyatakan sebelum ini.
Mari kita laksanakannya sebagai kelas data Python:
git clone git@github.com:apache/kafka.git git checkout 3.8.0
Seperti yang anda lihat, terdapat beberapa medan berteg dalam versi 2, tiada medan yang dijangka diketahui. Jika beberapa medan teg dihantar secara tersilap kepada broker, ia akan diabaikan.
Pengepala respons wujud dalam dua versi: 0 dan 1. Ia dinyatakan dalam protokol sebagai:
./gradlew processMessages
Mari kita juga melaksanakannya:
docker run --rm -ti -p 9092:9092 apache/kafka:3.8.0
Kami tidak melaksanakan baca untuk pengepala permintaan dan tulis untuk respons. Ini adalah untuk ringkas: kami tidak akan menghantar pengepala respons dan menerima permintaan dalam contoh kami kerana kami tidak memprogramkan bahagian pelayan. Walau bagaimanapun, jika anda berminat dengan bahagian pelayan juga, anda perlu melaksanakan kedua-dua fungsi (yang sepatutnya mudah).
Perhatikan terutamanya medan correlation_id dalam pengepala permintaan dan respons. Protokol menyokong saluran paip: pelanggan boleh mempunyai lebih daripada satu permintaan tertunggak bagi setiap sambungan. ID korelasi membenarkannya memadankan respons kepada permintaan.
Versi yang mesti digunakan ialah fungsi kunci API dan versi mesej. Pada masa ini ia tidak didokumenkan dalam panduan protokol secara eksplisit5.
Gunakan fungsi requestHeaderVersion dan responseHeaderVersion dalam kelas yang dijana ApiMessageType sebagai rujukan.
Sekarang, dengan semua pengetahuan dan kod ini, mari akhirnya menghantar permintaan ApiVersions dan menerima serta membaca jawapan. ApiVersions biasanya permintaan pertama yang dihantar oleh pelanggan. Tujuannya adalah untuk mencari versi dan ciri API yang disokong oleh broker. Kami melaksanakan versi terkini 3.
Dalam spesifikasi protokol, ia ditakrifkan sebagai:
docker run --rm -ti --net=host apache/kafka:3.8.0 \ /opt/kafka/bin/kafka-topics.sh \ --bootstrap-server localhost:9092 --create \ --topic test-topic1 --partitions 2
Jom buat kelas data:
/opt/kafka/bin/kafka-topics.sh \ --bootstrap-server localhost:9092 --create \ --topic test-topic1 --partitions 2
Dan jawapannya:
def read_exact(buffer: BinaryIO, num_bytes: int) -> bytes: value = buffer.read(num_bytes) if len(value) != num_bytes: raise ValueError(f"Buffer underflow: expected {num_bytes}, got {len(value)}") return value
[api_keys] bermaksud "susunan api_keys", dengan api_keys ialah struktur yang ditakrifkan dua baris di bawah.
Menukar ini kepada kelas data Python:
def read_int8(buffer: BinaryIO) -> int: return int.from_bytes(read_exact(buffer, 1), byteorder="big", signed=True)
Apabila kita bercakap tentang tatasusunan, kita perlu tahu sama ada kita memerlukan tatasusunan yang padat atau tidak padat. Untuk mengetahui perkara ini, mari kita lihat definisi skema dalam ApiVersionsRequest.json. Anda boleh melihat "flexibleVersions": "3", yang bermaksud tatasusunan padat digunakan bermula dari versi 3 (lebih lanjut mengenai ini dalam README.md dalam direktori skema). Memandangkan kami sedang bekerja dengan versi 3 di sini, kami menggunakan tatasusunan padat.
Setelah kelas permintaan dan respons dilaksanakan, kami boleh menghantar dan menerima permintaan ini. Untuk ApiVersions v3 ini kita memerlukan pengepala permintaan v2 dan pengepala respons v0 (semak ApiMessageType.java yang dijana). Kunci API (18) boleh anda temui dalam ApiVersionsRequest.json atau dalam spesifikasi protokol.
git clone git@github.com:apache/kafka.git git checkout 3.8.0
Jika anda menjalankan kod ini, anda akan melihat pengepala respons dan mesej dicetak dalam konsol. Tahniah, anda telah melakukan pertukaran rangkaian yang betul dengan broker Kafka!
Anda akan melihat tiga medan bertanda diletakkan dalam _unknownTaggedFields. Kaedah baca dan tulis kelas ApiVersionsResponseData yang dihasilkan dan juga definisi mesej dalam ApiVersionsResponse.json akan membantu anda mentafsirnya. Pertimbangkan kerja rumah ini ?
Dalam pekerjaan saya, kami membangunkan perpustakaan sumber terbuka Kio. Ia membolehkan kami melakukan panggilan API Kafka sewenang-wenangnya daripada Python dengan mudah. Kod bersiri/deserialisasi, seperti dalam Kafka sendiri, dihasilkan daripada takrifan protokol JSON. Kod yang dijana diuji dengan teliti, termasuk ujian sifat terhadap kod Java Kafka sebenar. ↩
Atau "mesej" jika anda suka: sesetengah skema bukan untuk API, tetapi mis. untuk data pada cakera. ↩
Fungsi read_exact mempunyai kelemahan iaitu ia menduplikasi data apabila penimbal asas sudah berada dalam ingatan. Walau bagaimanapun, ia lebih mudah untuk tujuan pendidikan. ↩
Saya membuat PR untuk membetulkan perkara ini. ↩
Sekali lagi, saya membuat PR untuk membetulkan perkara ini. ↩
Atas ialah kandungan terperinci Panduan praktikal protokol Kafka. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!