Apache Kafka は、パブリッシュ/サブスクライブ モデルに基づくメッセージ キュー システムです。信頼性が高く、効率的でスケーラブルなメッセージ配信メカニズムを提供し、ビッグ データ、リアルタイム データ ストリーム処理、ログ収集などの分野で広く使用されています。 。 Go 言語は、高速で分散型の同時実行プログラミング言語であり、同時実行性の高いシナリオでのメッセージの受け渡しや処理の処理に当然適しています。この記事では、Go でのメッセージングに Apache Kafka を使用する方法を、完全なガイドとコード例とともに説明します。
ステップ 1: Apache Kafka のインストールと構成
まず、Apache Kafka をインストールして構成する必要があります。最新の Kafka バージョンを公式 Web サイトからダウンロードして解凍し、Kafka サーバーを起動します:
$ tar -xzf kafka_2.13-2.8.0.tgz $ cd kafka_2.13-2.8.0 $ bin/zookeeper-server-start.sh config/zookeeper.properties
次に、Kafka サーバーを起動します:
$ bin/kafka-server-start.sh config/server.properties
次に、Kafka トピックを作成する必要があります ( topic) を使用するには、メッセージの保存と配信に使用します。
$ bin/kafka-topics.sh --create --topic my_topic --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1
このコマンドは、「my_topic」という名前のトピックを作成し、ローカル ノード上にレプリケーション ファクタとパーティションを構成します。
ステップ 2: Kafka Go ライブラリの導入とインストール
Go 言語で Kafka を使用するには、サードパーティの Kafka Go ライブラリを導入する必要があります。現在、Go 言語は公式には Kafka 関連の標準ライブラリを提供していませんが、コミュニティ内のサードパーティ ライブラリはすでに非常に成熟しており、安定しています。
この記事では、sarama ライブラリを使用します。次のコマンドを使用してインストールできます。
$ go get github.com/Shopify/sarama
ここでは、sarama パッケージを導入し、メッセージ パッシングに 2 つの API (プロデューサーとコンシューマー) を使用する必要があります。
ステップ 3: プロデューサー API を使用してメッセージを送信する
Kafka プロデューサー API を使用して Go 言語でメッセージを送信するのは非常に簡単です。まず、Kafka プロデューサー オブジェクトを作成する必要があります。
import ( "log" "github.com/Shopify/sarama" ) func main() { config := sarama.NewConfig() producer, err := sarama.NewSyncProducer([]string{"127.0.0.1:9092"}, config) if err != nil { log.Fatalf("Failed to create producer: %s", err) } defer producer.Close() }
ここでは、sarama パッケージの NewSyncProducer() 関数を使用して同期プロデューサー オブジェクトを作成し、Kafka サーバーのアドレスと構成情報を指定します。作成が成功したら、defer ステートメントを使用して、プログラムの終了後にプロデューサー オブジェクトが確実に閉じられるようにする必要があります。
次に、Produce() 関数を使用して、Kafka トピックにメッセージを送信できます。
msg := &sarama.ProducerMessage{ Topic: "my_topic", Value: sarama.StringEncoder("hello, kafka"), } partition, offset, err := producer.SendMessage(msg) if err != nil { log.Fatalf("Failed to send message: %s", err) } log.Printf("Message sent to partition %d at offset %d", partition, offset)
ここでは、まず sarama.ProducerMessage オブジェクトを作成し、トピック名とメッセージの内容を設定します。次に、プロデューサー オブジェクトの SendMessage() 関数を使用して、メッセージがターゲット トピックに送信されます。
ステップ 4: コンシューマ API を使用してトピックからメッセージを受信する
Kafka コンシューマ API を使用して Go 言語でメッセージを受信することも非常に簡単です。まず、Kafka コンシューマー オブジェクトを作成する必要があります。
config := sarama.NewConfig() consumer, err := sarama.NewConsumer([]string{"127.0.0.1:9092"}, config) if err != nil { log.Fatalf("Failed to create consumer: %s", err) } defer consumer.Close() partitionConsumer, err := consumer.ConsumePartition("my_topic", 0, sarama.OffsetOldest) if err != nil { log.Fatalf("Failed to consume partition: %s", err) } defer partitionConsumer.Close()
ここでは、sarama パッケージの NewConsumer() 関数を使用してコンシューマー オブジェクトを作成し、Kafka サーバーとの接続を確立します。作成が成功したら、defer ステートメントを使用して、プログラムの終了後にコンシューマ オブジェクトが確実に閉じられるようにする必要があります。
次に、ConsumePartition() 関数を使用して特定のトピックとパーティションをサブスクライブし、メッセージの開始オフセットを設定します。この関数は PartitionConsumer オブジェクトを返します。defer ステートメントを使用して、プログラム終了後にオブジェクトが確実に閉じられるようにする必要があります。
最後に、for ループで Consumer.Messages() 関数を使用してメッセージを取得し、処理します。
for { select { case msg := <-partitionConsumer.Messages(): log.Printf("Received message: %s", string(msg.Value)) case err := <-partitionConsumer.Errors(): log.Fatalf("Error while consuming: %s", err) } }
ここでは、Messages() 関数を使用してメッセージを取得します。 PartitionConsumer オブジェクトから取得し、for ループを使用してそれを処理します。 Kafka は高度な同時メッセージング システムであるため、複数のチャネルからのメッセージ通知を処理するには select ステートメントを使用する必要があります。メッセージを処理した後、Ack() 関数を使用して、メッセージが消費されたことを手動で確認する必要があることに注意してください。
完全なコード例
package main import ( "log" "github.com/Shopify/sarama" ) func main() { config := sarama.NewConfig() producer, err := sarama.NewSyncProducer([]string{"127.0.0.1:9092"}, config) if err != nil { log.Fatalf("Failed to create producer: %s", err) } defer producer.Close() msg := &sarama.ProducerMessage{ Topic: "my_topic", Value: sarama.StringEncoder("hello, kafka"), } partition, offset, err := producer.SendMessage(msg) if err != nil { log.Fatalf("Failed to send message: %s", err) } log.Printf("Message sent to partition %d at offset %d", partition, offset) consumer, err := sarama.NewConsumer([]string{"127.0.0.1:9092"}, config) if err != nil { log.Fatalf("Failed to create consumer: %s", err) } defer consumer.Close() partitionConsumer, err := consumer.ConsumePartition("my_topic", 0, sarama.OffsetOldest) if err != nil { log.Fatalf("Failed to consume partition: %s", err) } defer partitionConsumer.Close() for { select { case msg := <-partitionConsumer.Messages(): log.Printf("Received message: %s", string(msg.Value)) partitionConsumer.MarkOffset(msg, "") case err := <-partitionConsumer.Errors(): log.Fatalf("Error while consuming: %s", err) } } }
概要
この記事では、Go 言語でのメッセージングに Apache Kafka を使用する方法を紹介し、完全なインストールと構成を提供し、依存ライブラリを紹介します。そしてコードの実装。 Kafka は、ビッグ データ、リアルタイム データ ストリーム処理、ログ収集、その他のシナリオで広く使用されている効率的で信頼性の高いメッセージング システムです。 Kafka を使用する場合は、メッセージ消費の完了を手動で確認する、複数のチャネルからのメッセージ通知を処理するなど、いくつかの重要な点に注意する必要があります。この記事が、Kafka と Go 言語を使用して同時実行性の高い分散プログラムを作成する際に役立つことを願っています。
以上がGo での Apache Kafka の使用: 完全ガイドの詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。