最新の Web アプリケーションでは、効率的なメッセージングは非常に重要な部分です。メッセージ キューは、異なるシステム間でメッセージを非同期配信するためのソリューションであり、データ配信と処理効率を最適化できます。 Go 言語では、Beego フレームワークは、Web アプリケーションと API の開発をサポートする非常に人気のある Web フレームワークです。この記事では、効率的なメッセージ配信のために、Beego で kafka を使用してメッセージ キューを実装する方法を検討します。
1. Kafka の概要
Kafka は、分散型のパーティション分割されたマルチコピー メッセージ キュー システムで、元々は LinkedIn によって開発され、後に Apache Software Foundation によって保守されました。 Kafka は主に、大量のリアルタイム データを処理し、高スループットのメッセージングをサポートし、複数のコンシューマーとプロデューサーにわたるさまざまなアプリケーションをサポートするために使用されます。
kafka の中核となる概念は、トピック、パーティション、オフセットです。トピックとはメッセージの分類を指し、各メッセージは特定のトピックに属します。パーティションはトピックのサブセットであり、各パーティションは順序付けされた不変のメッセージ キューです。各パーティションを複数のサーバー間で複製して、同じパーティションを同時に処理する複数のコンシューマをサポートできます。オフセットは、各メッセージを一意に識別する値です。コンシューマは、メッセージの読み取りを開始する特定のオフセットを指定できます。
2. Beego で Kafka を使用する
kafka のインストールは非常に簡単で、公式 Web サイトから圧縮パッケージをダウンロードするだけです。 kafka を解凍して、指定したディレクトリに移動するだけです。この例では、kafka_2.12-2.3.0 バージョンを使用します。
kafka の使用を開始する前に、新しいトピックとパーティションを作成する必要があります。 Kafka 独自の管理ツール (kafka-topics.sh) を使用して、トピックとパーティションを作成できます。コマンド ラインで次のコマンドを実行します。
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
このコマンドは、パーティションが 1 つだけあり、バックアップ番号が 1 の「test」という名前のトピックを作成します。必要に応じてパーティションとバックアップの数を変更できます。
Kafka プロデューサーを作成する手順は次のとおりです:
package main import ( "github.com/Shopify/sarama" ) func main() { // 设置kafka配置 config := sarama.NewConfig() config.Producer.Return.Successes = true // 新建生产者 producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config) if err != nil { panic(err) } // 构造消息 message := &sarama.ProducerMessage{ Topic: "test", Value: sarama.StringEncoder("test message"), } // 发送消息 _, _, err = producer.SendMessage(message) if err != nil { panic(err) } producer.Close() }
その中で、sarama は Go 言語クライアント ライブラリです。 Kafkaクラスターに接続して操作します。上記のコードでは、新しい SyncProducer オブジェクトを作成し、「test」トピックにメッセージを送信します。
kafka コンシューマを作成する手順は次のとおりです。
package main import ( "fmt" "github.com/Shopify/sarama" "log" "os" "os/signal" ) func main() { config := sarama.NewConfig() config.Consumer.Return.Errors = true // 新建一个消费者 consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, config) if err != nil { panic(err) } // 准备订阅话题 topic := "test" partitionList, err := consumer.Partitions(topic) if err != nil { panic(err) } // 启动goroutine处理消息 for _, partition := range partitionList { // 构造一个partitionConsumer pc, err := consumer.ConsumePartition(topic, partition, sarama.OffsetNewest) if err != nil { panic(err) } go func(partitionConsumer sarama.PartitionConsumer) { defer func() { // 关闭consumer if err := partitionConsumer.Close(); err != nil { log.Fatalln(err) } }() for msg := range partitionConsumer.Messages() { fmt.Printf("Partition:%d Offset:%d Key:%s Value:%s ", msg.Partition, msg.Offset, msg.Key, msg.Value) } }(pc) } // 处理中断信号 sigterm := make(chan os.Signal, 1) signal.Notify(sigterm, os.Interrupt) <-sigterm fmt.Println("Shutdown") consumer.Close() }
上記のコードは、新しいコンシューマ オブジェクトを作成し、サブスクライブします。それは「テスト」トピックです。次に、複数のゴルーチンが開始され、異なるパーティションからのメッセージを同時に処理します。メッセージが処理された後、Close() メソッドが呼び出されてコンシューマを閉じます。
3. 概要
この記事では、kafka を使用して Beego にメッセージ キューを実装する方法を紹介しました。これは、高スループットのデータを処理する必要がある Web アプリケーションに役立ちます。 Kafka を使用すると、複数のコンシューマーとプロデューサーの間でメッセージを非同期に配信し、データ転送と処理効率を最大化できます。 Beego アプリケーションを開発していて、効率的なメッセージングが必要な場合は、Kafka が最適な選択肢です。
以上がBeegoでkafkaを使用してメッセージキューを実装するの詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。