Golang と Kafka を一緒に使用する方法
Kafka は、ビッグ データ アプリケーションでリアルタイム データ ストリーム処理アプリケーションを構築するためによく使用されるオープン ソースの分散メッセージ キューです。 Golang は Google によって開発されたプログラミング言語であり、その効率的な同時実行性、強力なライブラリとエコシステムで知られています。では、Golang を使用して Kafka と組み合わせるにはどうすればよいでしょうか?
まず、github.com/Shopify/sarama パッケージをインポートする必要があります。これは、Kafka をサポートする Golang クライアント ライブラリです。インストール プロセス中に、次のコマンドを実行する必要があります:
go get github.com/Shopify/sarama
次に、プロデューサーを作成する必要があります。まず、構成を作成します。
config := sarama.NewConfig() config.Producer.RequiredAcks = sarama.WaitForAll config.Producer.Retry.Max = 5 config.Producer.Return.Successes = true
ここでは、すべての ACK を待機し、最大 5 回まで再試行し、成功後にプロデューサーに成功メッセージを返すようにプロデューサーを設定します。
次に、プロデューサー インスタンスを作成する必要があります:
producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config) if err != nil { panic(err) } defer producer.Close()
Kafka に接続するサービス エンドポイントとして Kafka ブローカー アドレスを指定する必要があります。ここではローカルの Kafka サーバーに接続しています。また、.Close()
メソッドを呼び出して、プロデューサの終了時にクリーンアップを確実に行います。
これで、Kafka トピックへのメッセージのパブリッシュを開始する準備が整いました:
msg := &sarama.ProducerMessage{ Topic: "test", Value: sarama.StringEncoder("Hello World!"), } part, offset, err := producer.SendMessage(msg) if err != nil { fmt.Printf("Error publishing message: %v", err) } else { fmt.Printf("Message published successfully. Partition: %v, Offset: %v\n", part, offset) }
この例では、「test」という名前のトピックにメッセージをパブリッシュします。エラーがない場合は、正常にパブリッシュされたパーティションとオフセットが出力されます。
これで、Kafka にメッセージを発行するプロデューサーが作成されました。次に、コンシューマを作成する方法を見てみましょう。
最初に、コンシューマ設定を作成する必要があります:
config := sarama.NewConfig() config.Consumer.Return.Errors = true
ここで受信エラーを設定します。
次に、コンシューマ インスタンスを作成する必要があります:
consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, config) if err != nil { panic(err) } defer consumer.Close()
ここでは、Kafka ブローカー アドレスも指定します。また、コンシューマーが終了時に確実にクリーンアップするように、.Close()
メソッドを呼び出す必要があります。
これで、Kafka トピックからメッセージを読み取る準備が整いました。
partitionConsumer, err := consumer.ConsumePartition("test", 0, sarama.OffsetOldest) if err != nil { panic(err) } defer partitionConsumer.Close() for { select { case msg := <-partitionConsumer.Messages(): fmt.Printf("Received message from partition %d with offset %d: %s = %s\n", msg.Partition, msg.Offset, string(msg.Key), string(msg.Value)) case err := <-partitionConsumer.Errors(): fmt.Println("Error: ", err.Error()) } }
この例では、「test」という名前のトピックをサブスクライブします。次に、最初のパーティションのオフセットを読み取ります。次に、ループ内でそのパーティションからメッセージを無限に読み取ります。ループ内の select
ステートメントは、常にメッセージ チャネルとエラー チャネルを監視し、それぞれを出力します。
ここまで、Golang と Kafka を組み合わせて使用する方法を紹介しました。この簡単な例で、Golang と Kafka の基本的な使い方をマスターしたはずです。
以上がGolang と Kafka を一緒に使用する方法の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

ホットAIツール

Undresser.AI Undress
リアルなヌード写真を作成する AI 搭載アプリ

AI Clothes Remover
写真から衣服を削除するオンライン AI ツール。

Undress AI Tool
脱衣画像を無料で

Clothoff.io
AI衣類リムーバー

AI Hentai Generator
AIヘンタイを無料で生成します。

人気の記事

ホットツール

メモ帳++7.3.1
使いやすく無料のコードエディター

SublimeText3 中国語版
中国語版、とても使いやすい

ゼンドスタジオ 13.0.1
強力な PHP 統合開発環境

ドリームウィーバー CS6
ビジュアル Web 開発ツール

SublimeText3 Mac版
神レベルのコード編集ソフト(SublimeText3)

ホットトピック











この記事では、Goのパッケージインポートメカニズム:名前付きインポート(例:インポート "fmt&quot;)および空白のインポート(例:_&quot; fmt&quot;)について説明しています。 名前付きインポートはパッケージのコンテンツにアクセス可能になり、空白のインポートはtのみを実行します

この記事では、Webアプリケーションでのページ間データ転送のためのBeegoのnewflash()関数について説明します。 newflash()を使用して、コントローラー間で一時的なメッセージ(成功、エラー、警告)を表示し、セッションメカニズムを活用することに焦点を当てています。 リミア

この記事では、MySQLクエリの結果をGO structスライスに効率的に変換することを詳しく説明しています。 データベース/SQLのスキャン方法を使用して、手動で解析することを避けて強調しています。 DBタグとロブを使用した構造フィールドマッピングのベストプラクティス

この記事では、ユニットテストのためにGOのモックとスタブを作成することを示しています。 インターフェイスの使用を強調し、模擬実装の例を提供し、模擬フォーカスを維持し、アサーションライブラリを使用するなどのベストプラクティスについて説明します。 articl

この記事では、GENICSのGOのカスタムタイプの制約について説明します。 インターフェイスがジェネリック関数の最小タイプ要件をどのように定義するかを詳しく説明し、タイプの安全性とコードの再利用性を改善します。 この記事では、制限とベストプラクティスについても説明しています

この記事では、goで効率的なファイルの書き込みを詳しく説明し、os.writefile(小さなファイルに適している)とos.openfileおよびbuffered write(大規模ファイルに最適)と比較します。 延期エラー処理、Deferを使用し、特定のエラーをチェックすることを強調します。

この記事では、GOでユニットテストを書くことで、ベストプラクティス、モッキングテクニック、効率的なテスト管理のためのツールについて説明します。

この記事では、トレースツールを使用してGOアプリケーションの実行フローを分析します。 手動および自動計装技術について説明し、Jaeger、Zipkin、Opentelemetryなどのツールを比較し、効果的なデータの視覚化を強調しています
