Apabila seni bina aplikasi peringkat perusahaan menjadi semakin kompleks, pemesejan telah menjadi komponen penting. Ini adalah apabila Kafka menjadi tumpuan. Kafka ialah baris gilir mesej edaran yang cekap dan boleh dipercayai yang menyokong penerbitan dan langganan mesej Ia adalah sistem pemesejan peringkat perusahaan moden dengan daya pemprosesan yang sangat tinggi dan kependaman yang rendah. Dalam API Kafka, walaupun pelanggan rasmi menyediakan berbilang bahasa, Golang telah semakin digunakan secara meluas sejak beberapa tahun kebelakangan ini, jadi artikel ini menggunakan Golang sebagai bahasa pelaksanaan untuk menerangkan cara menggunakan Golang untuk melaksanakan Kafka.
1. Ketergantungan
Sebelum bermula, anda perlu memuat turun kebergantungan yang diperlukan:
Kaedah penggunaan khusus adalah seperti berikut:
go get github.com/Shopify/sarama
go get github.com / pkg/errors
2 Buat pengeluar
Sebelum memperkenalkan API Kafka, anda perlu mencipta contoh pengeluar terlebih dahulu. Kod pengeluar adalah seperti berikut:
package main import ( "fmt" "time" "github.com/pkg/errors" "github.com/Shopify/sarama" ) func main() { config := sarama.NewConfig() config.Producer.Partitioner = sarama.NewRandomPartitioner config.Producer.RequiredAcks = sarama.WaitForAll config.Producer.Return.Successes = true producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config) if err != nil { panic(errors.Wrap(err, "failed to create producer")) } defer producer.Close() for i := 0; i < 10; i++ { message := &sarama.ProducerMessage{ Topic: "test_topic", Value: sarama.StringEncoder(fmt.Sprintf("test message %d", i)), } partition, offset, err := producer.SendMessage(message) if err != nil { fmt.Println(errors.Wrapf(err, "failed to send message: %s", message)) } else { fmt.Printf("message sent to partition %d at offset %d ", partition, offset) } time.Sleep(500 * time.Millisecond) // 延迟发送 } }
Kod terutamanya melakukan perkara berikut:
3. Buat pengguna
Kedua, anda perlu mencipta contoh pengguna. Kod pengguna adalah seperti berikut:
package main import ( "context" "fmt" "os" "os/signal" "github.com/Shopify/sarama" "github.com/pkg/errors" ) func main() { config := sarama.NewConfig() config.Consumer.Return.Errors = true consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, config) if err != nil { panic(errors.Wrap(err, "failed to create consumer")) } defer consumer.Close() signals := make(chan os.Signal, 1) signal.Notify(signals, os.Interrupt) partitions, err := consumer.Partitions("test_topic") if err != nil { panic(errors.Wrapf(err, "failed to read partitions for topic: test_topic")) } ctx, cancel := context.WithCancel(context.Background()) for _, partition := range partitions { go func(partition int32) { partitionConsumer, err := consumer.ConsumePartition("test_topic", partition, sarama.OffsetOldest) if err != nil { fmt.Printf("failed to create partition consumer for partition %d: %s ", partition, err) return } defer partitionConsumer.Close() for { select { case msg := <-partitionConsumer.Messages(): fmt.Printf("Consumed message from partition %d at offset %d: %s ", msg.Partition, msg.Offset, msg.Value) case <-signals: cancel() return case err := <-partitionConsumer.Errors(): fmt.Printf("Consumed error from partition %d: %s ", partition, err) case <-ctx.Done(): return } } }(partition) } <-signals fmt.Println("Shutting down consumer") }
Kod terutamanya melakukan perkara berikut:
4. Ringkasan
Di atas, kami menggunakan Golang untuk melaksanakan bahagian pengeluar dan pengguna Kafka Sebagai salah satu komponen penting dalam merealisasikan sistem teragih, Kafka boleh menyelesaikan mesej The sistem mempunyai masalah dalam persekitaran selaras dan teragih yang tinggi, dan Kafka juga mempunyai dokumentasi sokongan yang baik dan komuniti yang stabil, menjadikannya bebas tekanan untuk digunakan dalam pembangunan sebenar.
Atas ialah kandungan terperinci Laksanakan kafka dengan golang. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!