Kafka ialah baris gilir mesej teragih sumber terbuka yang sering digunakan untuk membina aplikasi pemprosesan aliran data masa nyata dalam aplikasi data besar. Golang ialah bahasa pengaturcaraan yang dibangunkan oleh Google dan terkenal dengan kesesuaian yang cekap, perpustakaan yang berkuasa dan ekosistemnya. Jadi, bagaimana menggunakan Golang untuk digabungkan dengan Kafka?
Pertama, kita perlu mengimport pakej github.com/Shopify/sarama. Ini ialah perpustakaan pelanggan Golang yang menyokong Kafka. Semasa proses pemasangan, anda perlu menjalankan arahan berikut:
go get github.com/Shopify/sarama
Seterusnya, kita perlu mencipta pengeluar. Mula-mula, buat konfigurasi:
config := sarama.NewConfig() config.Producer.RequiredAcks = sarama.WaitForAll config.Producer.Retry.Max = 5 config.Producer.Return.Successes = true
Di sini kami tetapkan pengeluar untuk menunggu semua ACK, cuba sehingga 5 percubaan semula dan kembalikan mesej kejayaan kepada pengeluar selepas kejayaan.
Seterusnya, kita perlu mencipta contoh pengeluar:
producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config) if err != nil { panic(err) } defer producer.Close()
Kami perlu menentukan alamat broker Kafka sebagai titik akhir perkhidmatan untuk menyambung ke Kafka. Di sini kami menyambung ke pelayan Kafka tempatan. Kami juga memanggil kaedah .Close()
untuk memastikan pembersihan apabila pengeluar keluar.
Kini kami bersedia untuk mula menerbitkan mesej ke topik Kafka:
msg := &sarama.ProducerMessage{ Topic: "test", Value: sarama.StringEncoder("Hello World!"), } part, offset, err := producer.SendMessage(msg) if err != nil { fmt.Printf("Error publishing message: %v", err) } else { fmt.Printf("Message published successfully. Partition: %v, Offset: %v\n", part, offset) }
Dalam contoh ini, kami menerbitkan mesej kepada topik yang dinamakan "ujian". Jika tiada ralat, ia mencetak partition dan offset yang berjaya diterbitkan.
Kini kami telah mencipta penerbit yang menerbitkan mesej kepada Kafka. Seterusnya, mari kita lihat cara mencipta pengguna.
Pertama, kita perlu mencipta konfigurasi pengguna:
config := sarama.NewConfig() config.Consumer.Return.Errors = true
Di sini kami menetapkan ralat penerimaan.
Seterusnya, kami perlu mencipta contoh pengguna:
consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, config) if err != nil { panic(err) } defer consumer.Close()
Di sini kami juga menyatakan alamat broker Kafka. Kita juga perlu memanggil kaedah .Close()
untuk memastikan pengguna membersihkan apabila ia keluar.
Kini kami bersedia untuk membaca mesej daripada topik Kafka:
partitionConsumer, err := consumer.ConsumePartition("test", 0, sarama.OffsetOldest) if err != nil { panic(err) } defer partitionConsumer.Close() for { select { case msg := <-partitionConsumer.Messages(): fmt.Printf("Received message from partition %d with offset %d: %s = %s\n", msg.Partition, msg.Offset, string(msg.Key), string(msg.Value)) case err := <-partitionConsumer.Errors(): fmt.Println("Error: ", err.Error()) } }
Dalam contoh ini, kami melanggan topik bernama "ujian". Kemudian kita membaca offset partition pertama. Kami kemudian membaca mesej daripada partition itu secara tak terhingga dalam satu gelung. Kenyataan select
dalam gelung akan sentiasa mendengar mesej dan saluran ralat dan mencetaknya masing-masing.
Setakat ini, kami telah memperkenalkan cara menggunakan Golang dan Kafka untuk digabungkan. Dengan contoh mudah ini, anda sepatutnya telah menguasai penggunaan asas Golang dan Kafka.
Atas ialah kandungan terperinci Cara menggunakan Golang dan Kafka bersama. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!