Rumah > pembangunan bahagian belakang > Tutorial Python > Panduan praktikal protokol Kafka

Panduan praktikal protokol Kafka

Susan Sarandon
Lepaskan: 2024-12-28 17:11:14
asal
415 orang telah melayarinya

Saya bekerja dengan protokol Apache Kafka pada tahap rendah agak sedikit. Bukan mudah untuk mula melakukan ini mengikut panduan rasmi sahaja dan saya banyak membaca kod itu. Dengan siaran ini, saya ingin memberi anda permulaan yang baik dengan membimbing anda langkah demi langkah daripada nilai primitif kepada permintaan yang bermakna.

Dalam siaran ini:

  1. Terokai kod protokol Kafka dan protokol dalam tindakan dengan Wireshark.
  2. Ketahui cara membaca dan menulis nilai primitif.
  3. Gabungkan primitif untuk melaksanakan permintaan yang bermakna.

Kami akan menggunakan Python sebagai bahasa pengaturcaraan. Walau bagaimanapun, kod tersebut akan menjadi kebergantungan sifar dan mudah dibawa ke bahasa pilihan anda.

Pengenalan

Apache Kafka mempunyai protokol binari tersuai yang berversi, dengan pelbagai jenis data, medan pilihan, dll. Malangnya, ia tidak menggunakan format bersiri yang terkenal seperti Protobuf. Skema mesej protokol diterangkan dalam JSON. Kod Java sebenar yang melakukan penyirian dan penyahserikatan dijana daripada penerangan ini.

Apabila anda berada di dunia Java, anda boleh menggunakan perpustakaan pelanggan rasmi. Tetapi jika anda menggunakan platform lain, anda bergantung pada pelaksanaan pihak ketiga. Mereka wujud, tetapi mereka memfokuskan kebanyakannya pada pengeluar dan pengguna, jarang pada beberapa aspek pelanggan pentadbir. Jika anda perlu melakukan perkara lain, anda bersendirian.

Siaran ini akan membantu anda mula menggodam protokol Kafka. (Jika anda sedang mencari perpustakaan Python (de-)serialization sedia untuk protokol Kafka, lihat Kio1. Untuk Rust, lihat perpustakaan yang sedang saya usahakan.)

Anda boleh mencari kod daripada siaran ini dan beberapa lagi ujian seperti dalam repositori ini di Github.

Gambaran keseluruhan protokol

Anda boleh mendapatkan penerangan protokol rasmi di halaman ini. Saya menggalakkan anda membiasakan diri dengannya, sekurang-kurangnya baca bahagian "Preliminaries" dan "The Protocol".

Berikut ialah beberapa sorotan. Protokol Kafka ialah protokol permintaan-tindak balas binari berasaskan TCP:

  • Berasaskan TCP: broker Kafka mendengar di port pada tindanan TCP (yang memberikan beberapa faedah seperti jaminan pesanan).
  • Perduaan: mesej dikodkan dalam bentuk binari dan memerlukan penyiaran dan penyahsirilan khas mengikut skema yang telah ditetapkan.
  • Permintaan-tindak balas: pertukaran dimulakan oleh pelanggan, pelayan adalah pasif dan hanya membalas permintaan.

Setiap jenis mesej API terdiri daripada pasangan permintaan dan respons dan dikenal pasti dengan nilai angka yang dipanggil kunci API. Contohnya, Produce dan Fetch, RPC Kafka yang paling berciri, mempunyai kunci API 0 dan 1 yang sepadan. Pada masa kini, terdapat hampir 90 jenis mesej API (sesetengahnya adalah antara broker, bukan klien-broker).

Permintaan dan respons diterangkan oleh skema versi. Pemberian versi membolehkan evolusi protokol, contohnya, menambah atau mengalih keluar medan atau menukar jenis datanya.

Langkah pertama

Berikut ialah beberapa perkara yang boleh anda lakukan untuk mula bekerja dengan protokol Kafka.

Ketahui kod protokol Kafka

Kod Kafka ialah sumber kebenaran (secara praktikal) tentang protokol. Lihat kod Kafka daripada Github dan tukar kepada keluaran yang anda minati (mis. 3.8.0):

git clone git@github.com:apache/kafka.git
git checkout 3.8.0
Salin selepas log masuk
Salin selepas log masuk
Salin selepas log masuk
Salin selepas log masuk
Salin selepas log masuk
Salin selepas log masuk
Salin selepas log masuk

Anda boleh menemui definisi mesej API dalam JSON dalam klien/src/main/resources/common/message. Setiap fail JSON mengandungi takrifan satu mesej2 jenis dengan semua versinya. clients/src/main/resources/common/message/README.md memberikan gambaran keseluruhan yang baik tentang format definisi skema. Beri perhatian kepada perkara seperti nilai lalai, versi fleksibel dan medan berteg.

Selain daripada jenis mesej API konkrit yang anda minati, lihat klien/src/main/resources/common/message/RequestHeader.json dan ResponseHeader.json, yang menerangkan pengepala yang digunakan dalam setiap pertukaran permintaan-tindak balas .

Mari jalankan penjana kod:

./gradlew processMessages
Salin selepas log masuk
Salin selepas log masuk
Salin selepas log masuk
Salin selepas log masuk
Salin selepas log masuk
Salin selepas log masuk

Kini anda boleh mencari kelas yang dijana dalam klien/src/generated/java/org/apache/kafka/common/message.

Sila lihat klien/src/generated/java/org/apache/kafka/common/message/ApiMessageType.java. Utiliti ini:

  • menerangkan keseluruhan set jenis mesej API sedia ada bersama-sama dengan skema dan versinya;
  • memetakan versi mesej API kepada versi pengepala permintaan dan respons dalam fungsi requestHeaderVersion dan responseHeaderVersion.

Fail lain dijana satu-sama-satu daripada JSON skema yang sepadan (kadang-kadang dengan postfix Data, ia adalah perkara keserasian). Dalam fail ini anda akan dapati:

  1. Takrif skema versi SCHEMA_0, SCHEMA_1, dsb. Kadangkala skema kekal sama antara versi. Ini adalah perkara biasa dan bermakna hanya rakan sejawatan permintaan yang berubah.
  2. kaedah baca dan tulis di mana anda boleh mencari kebenaran asas untuk pensirilan protokol dan penyahserikatan.

Beri perhatian kepada kelas dalaman serta ia mewakili struktur kompleks mesej.

Jalankan Kafka di Docker

Menjalankan Kafka dalam Docker ialah cara yang mudah untuk membolehkan broker menjalankan ujian protokol atau menangkap pertukaran rangkaian. Sejak versi 3.7.0, pasukan Kafka membina imej Docker rasmi, yang boleh anda jalankan sebagai:

docker run --rm -ti -p 9092:9092 apache/kafka:3.8.0
Salin selepas log masuk
Salin selepas log masuk
Salin selepas log masuk
Salin selepas log masuk
Salin selepas log masuk

Jika anda berminat dengan versi lama, cari Docker Hub untuk imej lain. Walau bagaimanapun, ini mungkin tidak diperlukan memandangkan protokol Kafka adalah serasi ke belakang dan ke hadapan: broker baharu akan mengenali versi protokol lama dengan baik dan pelanggan lama boleh berkomunikasi dengan broker baharu.

Jika anda membaca ini, anda mungkin sudah mempunyai alatan baris arahan Kafka pada mesin anda, tetapi untuk berjaga-jaga, anda juga boleh menjalankannya dalam Docker. Contohnya, jalankan ini untuk mencipta topik:

git clone git@github.com:apache/kafka.git
git checkout 3.8.0
Salin selepas log masuk
Salin selepas log masuk
Salin selepas log masuk
Salin selepas log masuk
Salin selepas log masuk
Salin selepas log masuk
Salin selepas log masuk

Periksa protokol dengan Wireshark

Setelah membiasakan diri dengan kod Kafka, mari lihat protokol dalam tindakan. Wireshark ialah alat yang digunakan secara meluas untuk pemeriksaan sedemikian. Ia boleh membedah protokol Kafka (dan menyokong versi terkini jika versi anda cukup segar).

Saya membina Wireshark daripada sumber versi 4.5.0, kerana pakej sistem pengendalian saya sudah lama dan tidak dapat membedah protokol Kafka dengan versi baharu. Wireshark 4.5.0 sepatutnya menyokong versi protokol Kafka 3.7. Walau bagaimanapun, anda boleh mencuba versi yang tersedia dan melihat cara ia berfungsi untuk anda.

Mari jalankan Wireshark pada antara muka gelung balik dengan penapis tangkapan port 9092 (1) dan penapis paparan kafka (2):

Kafka protocol practical guide

Buat topik dan lihat perkara yang ditunjukkan oleh Wireshark kepada kami:

./gradlew processMessages
Salin selepas log masuk
Salin selepas log masuk
Salin selepas log masuk
Salin selepas log masuk
Salin selepas log masuk
Salin selepas log masuk

Kafka protocol practical guide

Penapis paparan mengalih keluar semua yang tidak berkaitan dan hanya meninggalkan permintaan dan respons Kafka. Memandangkan Wireshark memahami kebanyakan versi mesej dalam protokol (bergantung pada versi Wireshark, sudah tentu), anda boleh melihat dengan mudah struktur setiap mesej. Wireshark juga akan menunjukkan bait yang sepadan.

Wireshark ialah alat penyahpepijatan yang hebat yang boleh membantu anda memahami cara protokol berfungsi dalam kes tertentu dan apa yang salah dengan pelaksanaan anda.

Membaca dan menulis nilai primitif

Protokol mentakrifkan beberapa jenis primitif, yang penerangan penuh boleh anda temui di sini. Mari kita laksanakan kod baca dan tulis untuk mereka. Anda boleh menemui semua fungsi dalam fail ini, lihat juga fail ujian yang sepadan.

Nilai integer panjang tetap: INT8, INT16, INT32, INT64 dan UINT16

Ini ialah nombor integer dengan panjang tetap yang diketahui: 1, 2, 4 atau 8 bait. Sememangnya, anda boleh menemui medan sedemikian banyak sepanjang protokol. Dalam kelas ini, anda mungkin melihat bagaimana (secara remeh) pembacaan dan penulisan mereka dilaksanakan dalam Kafka.

Mari kita tentukan dahulu fungsi untuk membaca bilangan bait yang tepat daripada penimbal3:

docker run --rm -ti -p 9092:9092 apache/kafka:3.8.0
Salin selepas log masuk
Salin selepas log masuk
Salin selepas log masuk
Salin selepas log masuk
Salin selepas log masuk

Petunjuk jenis BinaryIO dalam Python mewakili objek dari mana bait boleh dibaca dan untuknya ia boleh ditulis. Ia mempunyai kaedah seperti membaca, menulis, memberitahu (untuk mendapatkan kedudukan semasa), mencari (untuk menukar kedudukan).

Kini kita boleh melaksanakan bacaan INT8:

docker run --rm -ti --net=host apache/kafka:3.8.0 \
  /opt/kafka/bin/kafka-topics.sh \
  --bootstrap-server localhost:9092 --create \
  --topic test-topic1 --partitions 2
Salin selepas log masuk
Salin selepas log masuk
Salin selepas log masuk
Salin selepas log masuk

Kafka menggunakan pesanan bait big-endian (rangkaian AKA), oleh itu byteorder="big".

Sekarang menulis:

git clone git@github.com:apache/kafka.git
git checkout 3.8.0
Salin selepas log masuk
Salin selepas log masuk
Salin selepas log masuk
Salin selepas log masuk
Salin selepas log masuk
Salin selepas log masuk
Salin selepas log masuk

Saya tidak akan mengulangi ini untuk INT16, INT32 dan INT64: satu-satunya perbezaan ketara ialah bilangan bait (2, 4 dan 8 yang sepadan) dan julat yang diperiksa ([-(2**15), 2* *15 - 1], [-(2**31), 2**31 - 1] dan [-(2**63), 2**63 - 1] sepadan).

UINT16 serupa dengan INT16:

./gradlew processMessages
Salin selepas log masuk
Salin selepas log masuk
Salin selepas log masuk
Salin selepas log masuk
Salin selepas log masuk
Salin selepas log masuk

Perhatikan tandatangan=Salah di sini.

BOOLEAN

BOOLEAN pada asasnya ialah INT8 dengan logik tambahan: == 0 bermaksud palsu, != 0 bermaksud benar.

docker run --rm -ti -p 9092:9092 apache/kafka:3.8.0
Salin selepas log masuk
Salin selepas log masuk
Salin selepas log masuk
Salin selepas log masuk
Salin selepas log masuk

Anda boleh melihat contoh BOOLEAN dalam medan allowAutoTopicCreation kelas MetadataRequestData yang dijana.

TERApung64

FLOAT64 ialah nilai IEEE 754 64-bit berketepatan ganda. Python tidak mempunyai to_bytes dan from_bytes untuk float seperti yang ada untuk int. Jadi sebaliknya kita akan menggunakan modul struct daripada perpustakaan standard.

docker run --rm -ti --net=host apache/kafka:3.8.0 \
  /opt/kafka/bin/kafka-topics.sh \
  --bootstrap-server localhost:9092 --create \
  --topic test-topic1 --partitions 2
Salin selepas log masuk
Salin selepas log masuk
Salin selepas log masuk
Salin selepas log masuk

>d bermaksud "nilai berganda dalam susunan bait besar-endian".

UNSIGNED_VARINT: Nilai integer panjang boleh ubah

Integer panjang boleh ubah ialah pendekatan yang membenarkan penggunaan bit yang lebih sedikit bagi setiap nilai apabila nilainya kecil. Kafka menggunakan pendekatan varian dari Penampan Protokol. Ideanya mudah sahaja:

Setiap bait dalam varian mempunyai bit kesinambungan yang menunjukkan jika bait yang mengikutinya adalah sebahagian daripada varian. Ini ialah bit paling ketara (MSB) bagi bait (kadangkala juga dipanggil bit tanda). 7 bit yang lebih rendah adalah muatan; integer yang terhasil dibina dengan menambahkan bersama muatan 7-bit bait konstituennya.

Anda boleh menyemak spesifikasi Protobuf dan pelaksanaan Kafka (baca, tulis) untuk butiran.

Jenis ini tidak digunakan dalam medan protokol semata-mata, tetapi ia digunakan untuk koleksi padat yang diterangkan di bawah.

Mari kita laksanakan. Untuk keyakinan, kami mendapat beberapa contoh terus daripada sumber kebenaran, kelas ByteUtils Kafka:

/opt/kafka/bin/kafka-topics.sh \
  --bootstrap-server localhost:9092 --create \
  --topic test-topic1 --partitions 2
Salin selepas log masuk
Salin selepas log masuk
Salin selepas log masuk

Menjalankan ini, kita akan mendapat:

def read_exact(buffer: BinaryIO, num_bytes: int) -> bytes:
    value = buffer.read(num_bytes)
    if len(value) != num_bytes:
        raise ValueError(f"Buffer underflow: expected {num_bytes}, got {len(value)}")
    return value
Salin selepas log masuk
Salin selepas log masuk

Mari kita laksanakan ini mungkin bukan yang paling berprestasi, tetapi dengan cara yang mudah:

def read_int8(buffer: BinaryIO) -> int:
    return int.from_bytes(read_exact(buffer, 1), byteorder="big", signed=True)
Salin selepas log masuk
Salin selepas log masuk

UUID

UUID ialah nilai 128-bit yang digunakan untuk mengenal pasti entiti secara unik. Contohnya, ia digunakan untuk menghantar ID topik dalam CreateTopicsResponse.

Anda boleh melihat cara ia dibaca dan ditulis dalam kod Kafka. Ia mudah untuk menghasilkan semula:

def write_int8(value: int, buffer: BinaryIO) -> None:
    if -(2**7) <= value <= 2**7 - 1:
        buffer.write(value.to_bytes(1, byteorder="big", signed=True))
    else:
        raise ValueError(f"Value {value} is out of range for INT8")
Salin selepas log masuk

Perhatikan bahawa Kafka menganggap null/None sebagai UUID sifar, jadi kami melakukan perkara yang sama di sini.

rentetan

Protokol Kafka mempunyai 4 jenis rentetan:

compact non-compact
nullable COMPACT_NULLABLE_STRING NULLABLE_STRING
non-nullable COMPACT_STRING STRING

Kekompakan menunjukkan sama ada panjang rentetan dikodkan dengan INT16 atau dengan UNSIGNED_VARINT. Ia bergantung pada versi mesej (ia diperkenalkan sekitar 2017). Kebolehnullan ialah sama ada nilai boleh menjadi batal. Ia bergantung pada tujuan mesej dan versi juga (kadangkala medan rentetan menjadi pilihan semasa evolusi protokol).

String ada di mana-mana dalam protokol. Sebagai contoh, lihat nama medan dalam kelas yang dijana MetadataRequestData.MetadataRequestTopic.

Rentetan dikodkan dengan agak mudah: mula-mula mengikut panjang dan kemudian badan berkod UTF-8. Panjang maksimum yang dibenarkan ialah 32767 bait. Rentetan null mempunyai panjang -1 dan jelas tiada badan.

Memandangkan satu-satunya perbezaan antara padat dan tidak padat ialah cara panjang rentetan dikodkan, kita boleh mempunyai satu fungsi untuk kedua-dua mod.

Mari kita mulakan dengan membaca dan menulis rentetan nullable:

git clone git@github.com:apache/kafka.git
git checkout 3.8.0
Salin selepas log masuk
Salin selepas log masuk
Salin selepas log masuk
Salin selepas log masuk
Salin selepas log masuk
Salin selepas log masuk
Salin selepas log masuk

Fungsi rentetan yang tidak boleh dibatalkan boleh dibina di atas ini:

./gradlew processMessages
Salin selepas log masuk
Salin selepas log masuk
Salin selepas log masuk
Salin selepas log masuk
Salin selepas log masuk
Salin selepas log masuk

Tatasusunan bait

Tatasusunan bait sangat serupa dengan rentetan. Mereka mempunyai potensi kebolehbatalan dan kekompakan yang sama:

compact non-compact
nullable COMPACT_NULLABLE_BYTES NULLABLE_BYTES
non-nullable COMPACT_BYTES BYTES

Ia juga dikodkan dengan cara yang sama: badan panjang. Sememangnya, badan tidak dianggap sebagai rentetan UTF-8, tetapi sebagai tatasusunan bait legap. Panjang maksimum tatasusunan bait ialah 2147483647;

Anda boleh mencari contoh bait dalam metadata medan dalam kelas yang dijana JoinGroupRequestData.JoinGroupRequestProtocol.

git clone git@github.com:apache/kafka.git
git checkout 3.8.0
Salin selepas log masuk
Salin selepas log masuk
Salin selepas log masuk
Salin selepas log masuk
Salin selepas log masuk
Salin selepas log masuk
Salin selepas log masuk

Seperti yang anda lihat, perbezaan antara fungsi ini dan fungsi yang sepadan untuk rentetan adalah kecil.

Tatasusunan lain

Protokol menyokong tatasusunan jenis selain bait: rentetan, nombor, struct (tetapi bukan tatasusunan bersarang): ARRAY dan COMPACT_ARRAY. Kekompakan adalah sama seperti dalam tatasusunan bait dan rentetan.

Kebolehbatalan tidak disebut secara eksplisit dalam spesifikasi protokol atas sebab tertentu. Walau bagaimanapun, tatasusunan boleh menjadi nullable. Ini dikawal oleh nullableVersions dalam definisi skema, seperti di sini.

Memandangkan kita telah melaksanakan read_array_length dan write_array_length, mari laksanakan fungsi pembaca dan penulis:

./gradlew processMessages
Salin selepas log masuk
Salin selepas log masuk
Salin selepas log masuk
Salin selepas log masuk
Salin selepas log masuk
Salin selepas log masuk

REKOD

REKOD mengekod rekod Kafka. Strukturnya agak rumit dan saya tidak akan menerangkannya dalam panduan ini (namun, sila beritahu saya dalam ulasan ? ️ jika anda ingin memilikinya.) Untuk memudahkan, kami boleh menganggap rekod sebagai NULLABLE_BYTES atau COMPACT_NULLABLE_BYTES ( bergantung pada versi mesej).

Medan bertanda

Medan bertanda ialah sambungan kepada protokol Kafka yang membenarkan data pilihan dilampirkan pada mesej. Ideanya ada dua:

  1. Jika pelanggan perkhidmatan tidak memahami medan yang ditandakan, ia akan menyimpannya sebagai tidak diketahui dan mengabaikannya.
  2. Jika medan jarang digunakan, nilai lalainya boleh dilangkau daripada pemindahan.

Sebagai contoh, lihat di medan ini. Ia mempunyai taggedVersions, yang menyatakan sejak versi mana medan ini ditandakan (dalam kebanyakan kes, ia adalah versi yang sama apabila medan itu ditambahkan).

Medan bertanda terdiri daripada:

  1. Teg jenis UNSIGNED_VARINT.
  2. Data jenis COMPACT_BYTES.

Anda boleh mendapatkan butiran lanjut tentang medan berteg dalam KIP-482.

Mari kita laksanakan:

docker run --rm -ti -p 9092:9092 apache/kafka:3.8.0
Salin selepas log masuk
Salin selepas log masuk
Salin selepas log masuk
Salin selepas log masuk
Salin selepas log masuk

Di sini mereka diberi tajuk "tidak diketahui". Medan yang diketahui perlu dibuat di dalam strukturnya.

Struktur mesej

Struktur mesej peringkat tinggi sangat mudah. Mengikut spesifikasi:

docker run --rm -ti --net=host apache/kafka:3.8.0 \
  /opt/kafka/bin/kafka-topics.sh \
  --bootstrap-server localhost:9092 --create \
  --topic test-topic1 --partitions 2
Salin selepas log masuk
Salin selepas log masuk
Salin selepas log masuk
Salin selepas log masuk

Iaitu, mesej itu sendiri didahului oleh saiznya dalam bait. Kedua-dua mesej permintaan dan respons terdiri daripada pengepala serta-merta diikuti oleh badan. Atas sebab tertentu, ini tidak didokumenkan secara eksplisit4, tetapi anda boleh mempercayai saya? atau semak kod.

Tajuk permintaan dan respons

Pengepala permintaan wujud dalam tiga versi: 0, 1 dan 2. Ia dinyatakan dalam protokol sebagai:

/opt/kafka/bin/kafka-topics.sh \
  --bootstrap-server localhost:9092 --create \
  --topic test-topic1 --partitions 2
Salin selepas log masuk
Salin selepas log masuk
Salin selepas log masuk

TAG_BUFFER ialah medan berteg yang dinyatakan sebelum ini.

Mari kita laksanakannya sebagai kelas data Python:

git clone git@github.com:apache/kafka.git
git checkout 3.8.0
Salin selepas log masuk
Salin selepas log masuk
Salin selepas log masuk
Salin selepas log masuk
Salin selepas log masuk
Salin selepas log masuk
Salin selepas log masuk

Seperti yang anda lihat, terdapat beberapa medan berteg dalam versi 2, tiada medan yang dijangka diketahui. Jika beberapa medan teg dihantar secara tersilap kepada broker, ia akan diabaikan.

Pengepala respons wujud dalam dua versi: 0 dan 1. Ia dinyatakan dalam protokol sebagai:

./gradlew processMessages
Salin selepas log masuk
Salin selepas log masuk
Salin selepas log masuk
Salin selepas log masuk
Salin selepas log masuk
Salin selepas log masuk

Mari kita juga melaksanakannya:

docker run --rm -ti -p 9092:9092 apache/kafka:3.8.0
Salin selepas log masuk
Salin selepas log masuk
Salin selepas log masuk
Salin selepas log masuk
Salin selepas log masuk

Kami tidak melaksanakan baca untuk pengepala permintaan dan tulis untuk respons. Ini adalah untuk ringkas: kami tidak akan menghantar pengepala respons dan menerima permintaan dalam contoh kami kerana kami tidak memprogramkan bahagian pelayan. Walau bagaimanapun, jika anda berminat dengan bahagian pelayan juga, anda perlu melaksanakan kedua-dua fungsi (yang sepatutnya mudah).

ID Korelasi

Perhatikan terutamanya medan correlation_id dalam pengepala permintaan dan respons. Protokol menyokong saluran paip: pelanggan boleh mempunyai lebih daripada satu permintaan tertunggak bagi setiap sambungan. ID korelasi membenarkannya memadankan respons kepada permintaan.

Pemilihan versi pengepala

Versi yang mesti digunakan ialah fungsi kunci API dan versi mesej. Pada masa ini ia tidak didokumenkan dalam panduan protokol secara eksplisit5.
Gunakan fungsi requestHeaderVersion dan responseHeaderVersion dalam kelas yang dijana ApiMessageType sebagai rujukan.

Menghantar permintaan dan menerima respons

Sekarang, dengan semua pengetahuan dan kod ini, mari akhirnya menghantar permintaan ApiVersions dan menerima serta membaca jawapan. ApiVersions biasanya permintaan pertama yang dihantar oleh pelanggan. Tujuannya adalah untuk mencari versi dan ciri API yang disokong oleh broker. Kami melaksanakan versi terkini 3.

Dalam spesifikasi protokol, ia ditakrifkan sebagai:

docker run --rm -ti --net=host apache/kafka:3.8.0 \
  /opt/kafka/bin/kafka-topics.sh \
  --bootstrap-server localhost:9092 --create \
  --topic test-topic1 --partitions 2
Salin selepas log masuk
Salin selepas log masuk
Salin selepas log masuk
Salin selepas log masuk

Jom buat kelas data:

/opt/kafka/bin/kafka-topics.sh \
  --bootstrap-server localhost:9092 --create \
  --topic test-topic1 --partitions 2
Salin selepas log masuk
Salin selepas log masuk
Salin selepas log masuk

Dan jawapannya:

def read_exact(buffer: BinaryIO, num_bytes: int) -> bytes:
    value = buffer.read(num_bytes)
    if len(value) != num_bytes:
        raise ValueError(f"Buffer underflow: expected {num_bytes}, got {len(value)}")
    return value
Salin selepas log masuk
Salin selepas log masuk

[api_keys] bermaksud "susunan api_keys", dengan api_keys ialah struktur yang ditakrifkan dua baris di bawah.

Menukar ini kepada kelas data Python:

def read_int8(buffer: BinaryIO) -> int:
    return int.from_bytes(read_exact(buffer, 1), byteorder="big", signed=True)
Salin selepas log masuk
Salin selepas log masuk

Apabila kita bercakap tentang tatasusunan, kita perlu tahu sama ada kita memerlukan tatasusunan yang padat atau tidak padat. Untuk mengetahui perkara ini, mari kita lihat definisi skema dalam ApiVersionsRequest.json. Anda boleh melihat "flexibleVersions": "3", yang bermaksud tatasusunan padat digunakan bermula dari versi 3 (lebih lanjut mengenai ini dalam README.md dalam direktori skema). Memandangkan kami sedang bekerja dengan versi 3 di sini, kami menggunakan tatasusunan padat.

Setelah kelas permintaan dan respons dilaksanakan, kami boleh menghantar dan menerima permintaan ini. Untuk ApiVersions v3 ini kita memerlukan pengepala permintaan v2 dan pengepala respons v0 (semak ApiMessageType.java yang dijana). Kunci API (18) boleh anda temui dalam ApiVersionsRequest.json atau dalam spesifikasi protokol.

git clone git@github.com:apache/kafka.git
git checkout 3.8.0
Salin selepas log masuk
Salin selepas log masuk
Salin selepas log masuk
Salin selepas log masuk
Salin selepas log masuk
Salin selepas log masuk
Salin selepas log masuk

Jika anda menjalankan kod ini, anda akan melihat pengepala respons dan mesej dicetak dalam konsol. Tahniah, anda telah melakukan pertukaran rangkaian yang betul dengan broker Kafka!

Anda akan melihat tiga medan bertanda diletakkan dalam _unknownTaggedFields. Kaedah baca dan tulis kelas ApiVersionsResponseData yang dihasilkan dan juga definisi mesej dalam ApiVersionsResponse.json akan membantu anda mentafsirnya. Pertimbangkan kerja rumah ini ?


  1. Dalam pekerjaan saya, kami membangunkan perpustakaan sumber terbuka Kio. Ia membolehkan kami melakukan panggilan API Kafka sewenang-wenangnya daripada Python dengan mudah. Kod bersiri/deserialisasi, seperti dalam Kafka sendiri, dihasilkan daripada takrifan protokol JSON. Kod yang dijana diuji dengan teliti, termasuk ujian sifat terhadap kod Java Kafka sebenar. ↩

  2. Atau "mesej" jika anda suka: sesetengah skema bukan untuk API, tetapi mis. untuk data pada cakera. ↩

  3. Fungsi read_exact mempunyai kelemahan iaitu ia menduplikasi data apabila penimbal asas sudah berada dalam ingatan. Walau bagaimanapun, ia lebih mudah untuk tujuan pendidikan. ↩

  4. Saya membuat PR untuk membetulkan perkara ini. ↩

  5. Sekali lagi, saya membuat PR untuk membetulkan perkara ini. ↩

Atas ialah kandungan terperinci Panduan praktikal protokol Kafka. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!

sumber:dev.to
Kenyataan Laman Web ini
Kandungan artikel ini disumbangkan secara sukarela oleh netizen, dan hak cipta adalah milik pengarang asal. Laman web ini tidak memikul tanggungjawab undang-undang yang sepadan. Jika anda menemui sebarang kandungan yang disyaki plagiarisme atau pelanggaran, sila hubungi admin@php.cn
Artikel terbaru oleh pengarang
Tutorial Popular
Lagi>
Muat turun terkini
Lagi>
kesan web
Kod sumber laman web
Bahan laman web
Templat hujung hadapan