As enterprise-level application architectures become increasingly complex, message transmission has become a crucial component. This is when Kafka comes to the fore. Kafka is an efficient and reliable distributed message queue that supports message publishing and subscription. It is a modern enterprise-level messaging system with very high throughput and low latency. In Kafka's API, although the official client provides multiple languages, Golang has become more and more widely used in recent years, so this article uses Golang as the implementation language to explain how to use Golang to implement Kafka.
1. Dependencies
Before you start, you need to download the required dependencies:
The specific usage method is as follows:
go get github.com/Shopify/sarama
go get github.com/ pkg/errors
2. Create a producer
Before introducing Kafka’s API, you need to create a producer instance first. The code of the producer is as follows:
package main import ( "fmt" "time" "github.com/pkg/errors" "github.com/Shopify/sarama" ) func main() { config := sarama.NewConfig() config.Producer.Partitioner = sarama.NewRandomPartitioner config.Producer.RequiredAcks = sarama.WaitForAll config.Producer.Return.Successes = true producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config) if err != nil { panic(errors.Wrap(err, "failed to create producer")) } defer producer.Close() for i := 0; i < 10; i++ { message := &sarama.ProducerMessage{ Topic: "test_topic", Value: sarama.StringEncoder(fmt.Sprintf("test message %d", i)), } partition, offset, err := producer.SendMessage(message) if err != nil { fmt.Println(errors.Wrapf(err, "failed to send message: %s", message)) } else { fmt.Printf("message sent to partition %d at offset %d ", partition, offset) } time.Sleep(500 * time.Millisecond) // 延迟发送 } }
The code mainly does the following things:
3. Create a consumer
Secondly, you need to create a consumer instance. The consumer code is as follows:
package main import ( "context" "fmt" "os" "os/signal" "github.com/Shopify/sarama" "github.com/pkg/errors" ) func main() { config := sarama.NewConfig() config.Consumer.Return.Errors = true consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, config) if err != nil { panic(errors.Wrap(err, "failed to create consumer")) } defer consumer.Close() signals := make(chan os.Signal, 1) signal.Notify(signals, os.Interrupt) partitions, err := consumer.Partitions("test_topic") if err != nil { panic(errors.Wrapf(err, "failed to read partitions for topic: test_topic")) } ctx, cancel := context.WithCancel(context.Background()) for _, partition := range partitions { go func(partition int32) { partitionConsumer, err := consumer.ConsumePartition("test_topic", partition, sarama.OffsetOldest) if err != nil { fmt.Printf("failed to create partition consumer for partition %d: %s ", partition, err) return } defer partitionConsumer.Close() for { select { case msg := <-partitionConsumer.Messages(): fmt.Printf("Consumed message from partition %d at offset %d: %s ", msg.Partition, msg.Offset, msg.Value) case <-signals: cancel() return case err := <-partitionConsumer.Errors(): fmt.Printf("Consumed error from partition %d: %s ", partition, err) case <-ctx.Done(): return } } }(partition) } <-signals fmt.Println("Shutting down consumer") }
The code mainly does the following things:
4. Summary
Above, we used Golang to implement the producer and consumer parts of Kafka. As one of the important components of realizing a distributed system, Kafka can solve messages The system has problems in high concurrency and distributed environments, and Kafka also has good support documentation and a stable community, making it stress-free to apply in actual development.
The above is the detailed content of Implement kafka with golang. For more information, please follow other related articles on the PHP Chinese website!