Jika anda perlu mengetahui asas Kafka, seperti ciri utama, komponen dan kelebihannya, saya ada artikel yang membincangkannya di sini. Sila semak dan ikuti langkah-langkah sehingga anda menyelesaikan pemasangan Kafka menggunakan Docker untuk meneruskan bahagian berikut.
Sama seperti contoh dalam artikel tentang menyambungkan Kafka dengan NodeJS, kod sumber ini juga termasuk dua bahagian: memulakan pengeluar untuk menghantar mesej ke Kafka dan menggunakan pengguna untuk melanggan mesej daripada topik.
Saya akan memecahkan kod kepada bahagian yang lebih kecil untuk pemahaman yang lebih baik. Mula-mula, mari kita takrifkan nilai pembolehubah.
package main import ( "fmt" "github.com/confluentinc/confluent-kafka-go/kafka" ) var ( broker = "localhost:9092" groupId = "group-id" topic = "topic-name" )
- Di sini, pakej github.com/confluentinc/confluent-kafka-go/kafka digunakan untuk menyambung ke Kafka.
- broker ialah alamat hos; jika anda menggunakan ZooKeeper, gantikan alamat hos dengan sewajarnya.
- groupId dan topik boleh ditukar mengikut keperluan.
Seterusnya ialah memulakan penerbit.
func startProducer() { p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": broker}) if err != nil { panic(err) } go func() { for e := range p.Events() { switch ev := e.(type) { case *kafka.Message: if ev.TopicPartition.Error != nil { fmt.Printf("Delivery failed: %v\n", ev.TopicPartition) } else { fmt.Printf("Delivered message to %v\n", ev.TopicPartition) } } } }() for _, word := range []string{"message 1", "message 2", "message 3"} { p.Produce(&kafka.Message{ TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny}, Value: []byte(word), }, nil) } }
Kod di atas digunakan untuk menghantar tatasusunan mesej {"mesej 1", "mesej 2", "mesej 3"} kepada topik dan menggunakan pergi rutin untuk mengulangi acara dengan untuk e := julat p.Events() dan mencetak hasil penghantaran, sama ada ia berjaya atau gagal.
Seterusnya ialah mencipta pengguna untuk melanggan topik dan menerima mesej.
func startConsumer() { c, err := kafka.NewConsumer(&kafka.ConfigMap{ "bootstrap.servers": broker, "group.id": groupId, "auto.offset.reset": "earliest", }) if err != nil { panic(err) } c.Subscribe(topic, nil) for { msg, err := c.ReadMessage(-1) if err == nil { fmt.Printf("Message on %s: %s\n", msg.TopicPartition, string(msg.Value)) } else { fmt.Printf("Consumer error: %v (%v)\n", err, msg) break } } c.Close() }
Akhir sekali, kerana ini adalah contoh mudah, panggil fungsi untuk mencipta pengeluar dan pengguna untuk digunakan. Dalam senario dunia sebenar, penggunaan pengeluar dan pengguna biasanya dilakukan pada dua pelayan berbeza dalam sistem perkhidmatan mikro.
func main() { startProducer() startConsumer() }
Selamat mengekod!
Jika anda mendapati kandungan ini membantu, sila lawati artikel asal di blog saya untuk menyokong pengarang dan meneroka kandungan yang lebih menarik.
Sesetengah siri yang mungkin anda rasa menarik:
Atas ialah kandungan terperinci Sambung Kafka dengan Golang. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!