Apache Kafka ialah sistem baris gilir mesej berdasarkan model publish-subscribe. Ia menyediakan mekanisme penghantaran mesej yang boleh dipercayai, cekap dan berskala serta digunakan secara meluas dalam data besar, pemprosesan aliran data masa nyata, pengumpulan log dan medan lain. . Bahasa Go ialah bahasa pengaturcaraan yang pantas, teragih dan serentak Ia sememangnya sesuai untuk mengendalikan penghantaran dan pemprosesan mesej dalam senario konkurensi tinggi. Dalam artikel ini, kami akan membincangkan cara menggunakan Apache Kafka untuk pemesejan dalam Go, dengan panduan lengkap dan contoh kod.
Langkah Pertama: Pasang dan Konfigurasikan Apache Kafka
Mula-mula, kita perlu memasang dan mengkonfigurasi Apache Kafka. Anda boleh memuat turun versi Kafka terkini dari laman web rasmi, unzip dan mulakan pelayan Kafka:
$ tar -xzf kafka_2.13-2.8.0.tgz $ cd kafka_2.13-2.8.0 $ bin/zookeeper-server-start.sh config/zookeeper.properties
Kemudian mulakan pelayan Kafka:
$ bin/kafka-server-start.sh config/server.properties
Seterusnya, kita perlu mencipta topik Kafka ( topik), gunakan Untuk menyimpan dan menghantar mesej:
$ bin/kafka-topics.sh --create --topic my_topic --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1
Arahan ini akan mencipta topik bernama "my_topic" dan mengkonfigurasi faktor replikasi dan partition pada nod setempat.
Langkah 2: Perkenalkan dan pasang pustaka Kafka Go
Untuk menggunakan Kafka dalam bahasa Go, kami perlu memperkenalkan pustaka Kafka Go pihak ketiga. Pada masa ini, bahasa Go secara rasmi tidak menyediakan perpustakaan standard berkaitan Kafka, tetapi perpustakaan pihak ketiga dalam komuniti sudah sangat matang dan stabil.
Dalam artikel ini, kami akan menggunakan perpustakaan sarama. Anda boleh menggunakan arahan berikut untuk memasang:
$ go get github.com/Shopify/sarama
Di sini kita perlu memperkenalkan pakej sarama dan menggunakan API pengeluar dan pengguna untuk penghantaran mesej.
Langkah 3: Gunakan API pengeluar untuk menghantar mesej
Adalah sangat mudah untuk menggunakan API pengeluar Kafka untuk menghantar mesej dalam bahasa Go. Mula-mula, kita perlu mencipta objek pengeluar Kafka:
import ( "log" "github.com/Shopify/sarama" ) func main() { config := sarama.NewConfig() producer, err := sarama.NewSyncProducer([]string{"127.0.0.1:9092"}, config) if err != nil { log.Fatalf("Failed to create producer: %s", err) } defer producer.Close() }
Di sini, kami menggunakan fungsi NewSyncProducer() dalam pakej sarama untuk mencipta objek pengeluar segerak dan nyatakan alamat dan maklumat konfigurasi pelayan Kafka . Selepas penciptaan berjaya, anda perlu menggunakan pernyataan tangguh untuk memastikan objek pengeluar ditutup selepas program tamat.
Seterusnya, kita boleh menggunakan fungsi Produce() untuk menghantar mesej ke topik Kafka:
msg := &sarama.ProducerMessage{ Topic: "my_topic", Value: sarama.StringEncoder("hello, kafka"), } partition, offset, err := producer.SendMessage(msg) if err != nil { log.Fatalf("Failed to send message: %s", err) } log.Printf("Message sent to partition %d at offset %d", partition, offset)
Di sini, mula-mula buat objek sarama.ProducerMessage, tetapkan nama topik dan kandungan mesej, The mesej kemudiannya dihantar ke topik sasaran menggunakan fungsi SendMessage() objek pengeluar.
Langkah 4: Gunakan API pengguna untuk menerima mesej daripada topik
Adalah sangat mudah untuk menggunakan API pengguna Kafka untuk menerima mesej dalam bahasa Go. Pertama, kita perlu mencipta objek pengguna Kafka:
config := sarama.NewConfig() consumer, err := sarama.NewConsumer([]string{"127.0.0.1:9092"}, config) if err != nil { log.Fatalf("Failed to create consumer: %s", err) } defer consumer.Close() partitionConsumer, err := consumer.ConsumePartition("my_topic", 0, sarama.OffsetOldest) if err != nil { log.Fatalf("Failed to consume partition: %s", err) } defer partitionConsumer.Close()
Di sini, kami menggunakan fungsi NewConsumer() dalam pakej sarama untuk mencipta objek pengguna dan mewujudkan sambungan dengan pelayan Kafka. Selepas penciptaan berjaya, anda perlu menggunakan pernyataan tangguh untuk memastikan objek pengguna ditutup selepas program tamat.
Seterusnya, kami menggunakan fungsi ConsumePartition() untuk melanggan topik dan partition tertentu dan menetapkan offset permulaan mesej. Fungsi ini mengembalikan objek PartitionConsumer, kita perlu menggunakan pernyataan tangguh untuk memastikan ia ditutup selepas program tamat.
Akhir sekali, kita boleh menggunakan fungsi Consumer.Messages() dalam gelung for untuk mendapatkan mesej dan memprosesnya:
for { select { case msg := <-partitionConsumer.Messages(): log.Printf("Received message: %s", string(msg.Value)) case err := <-partitionConsumer.Errors(): log.Fatalf("Error while consuming: %s", err) } }
Di sini, kami menggunakan fungsi Messages() untuk mendapatkan mesej daripada objek PartitionConsumer , dan kemudian gunakan gelung for untuk memprosesnya. Oleh kerana Kafka ialah sistem pemesejan yang sangat serentak, anda perlu menggunakan penyataan terpilih untuk mengendalikan pemberitahuan mesej daripada berbilang saluran. Ambil perhatian bahawa selepas memproses mesej, anda perlu menggunakan fungsi Ack() untuk mengesahkan secara manual bahawa mesej telah digunakan.
Contoh kod penuh
package main import ( "log" "github.com/Shopify/sarama" ) func main() { config := sarama.NewConfig() producer, err := sarama.NewSyncProducer([]string{"127.0.0.1:9092"}, config) if err != nil { log.Fatalf("Failed to create producer: %s", err) } defer producer.Close() msg := &sarama.ProducerMessage{ Topic: "my_topic", Value: sarama.StringEncoder("hello, kafka"), } partition, offset, err := producer.SendMessage(msg) if err != nil { log.Fatalf("Failed to send message: %s", err) } log.Printf("Message sent to partition %d at offset %d", partition, offset) consumer, err := sarama.NewConsumer([]string{"127.0.0.1:9092"}, config) if err != nil { log.Fatalf("Failed to create consumer: %s", err) } defer consumer.Close() partitionConsumer, err := consumer.ConsumePartition("my_topic", 0, sarama.OffsetOldest) if err != nil { log.Fatalf("Failed to consume partition: %s", err) } defer partitionConsumer.Close() for { select { case msg := <-partitionConsumer.Messages(): log.Printf("Received message: %s", string(msg.Value)) partitionConsumer.MarkOffset(msg, "") case err := <-partitionConsumer.Errors(): log.Fatalf("Error while consuming: %s", err) } } }
Ringkasan
Dalam artikel ini, kami memperkenalkan cara menggunakan Apache Kafka untuk pemesejan dalam bahasa Go, dan menyediakan pemasangan dan konfigurasi lengkap, memperkenalkan perpustakaan bergantung dan pelaksanaan kod. Kafka ialah sistem pemesejan yang cekap dan boleh dipercayai yang telah digunakan secara meluas dalam data besar, pemprosesan aliran data masa nyata, pengumpulan log dan senario lain. Apabila menggunakan Kafka, anda perlu memberi perhatian kepada beberapa perkara penting, seperti mengesahkan penyiapan penggunaan mesej secara manual, memproses pemberitahuan mesej daripada berbilang saluran, dsb. Saya harap artikel ini akan membantu anda dalam menulis program yang diedarkan bersamaan tinggi menggunakan bahasa Kafka dan Go.
Atas ialah kandungan terperinci Menggunakan Apache Kafka dalam Go: Panduan Lengkap. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!