ホームページ > バックエンド開発 > Golang > Go で Kafka プロデューサーとコンシューマーを構築する

Go で Kafka プロデューサーとコンシューマーを構築する

Barbara Streisand
リリース: 2025-01-03 19:48:43
オリジナル
824 人が閲覧しました

Building a Kafka Producer and Consumer in Go

Apache Kafka は、リアルタイム データ パイプラインとストリーミング アプリケーションの構築に使用される強力な分散ストリーミング プラットフォームです。このブログ投稿では、Golang を使用した Kafka プロデューサーとコンシューマーのセットアップについて説明します。

前提条件

始める前に、次のものがマシンにインストールされていることを確認してください:

  • 進む (1.16 以降)

  • Docker (ローカルで Kafka を実行するため)

  • カフカ

Docker を使用した Kafka のセットアップ

Kafka をすばやくセットアップするには、Docker を使用します。プロジェクト ディレクトリに docker-compose.yml ファイルを作成します:

yamlCopy codeversion: '3.7'
services:
  zookeeper:
    image: wurstmeister/zookeeper:3.4.6
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka:2.13-2.7.0
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    depends_on:
      - zookeeper
ログイン後にコピー

次のコマンドを実行して、Kafka と Zookeeper を起動します。

docker-compose up -d
ログイン後にコピー

Go で Kafka プロデューサーを作成する

まず、新しい Go モジュールを初期化します。

go mod init kafka-example
ログイン後にコピー

kafka-go ライブラリをインストールします:

go get github.com/segmentio/kafka-go
ログイン後にコピー

ここで、Producer.go ファイルを作成し、次のコードを追加します。

package main

import (
    "context"
    "fmt"
    "github.com/segmentio/kafka-go"
    "log"
    "time"
)

func main() {
    writer := kafka.Writer{
        Addr:     kafka.TCP("localhost:9092"),
        Topic:    "example-topic",
        Balancer: &kafka.LeastBytes{},
    }

    defer writer.Close()

    for i := 0; i < 10; i++ {
        msg := kafka.Message{
            Key:   []byte(fmt.Sprintf("Key-%d", i)),
            Value: []byte(fmt.Sprintf("Hello Kafka %d", i)),
        }

        err := writer.WriteMessages(context.Background(), msg)
        if err != nil {
            log.Fatal("could not write message " + err.Error())
        }

        time.Sleep(1 * time.Second)
        fmt.Printf("Produced message: %s\n", msg.Value)
    }
}
ログイン後にコピー

このコードは、example-topic トピックに 10 個のメッセージを送信する Kafka プロデューサを設定します。

プロデューサーを実行します:

go run producer.go
ログイン後にコピー

メッセージが生成されたことを示す出力が表示されるはずです。

Go で Kafka コンシューマを作成する

ファイル Consumer.go を作成し、次のコードを追加します:

package main

import (
    "context"
    "fmt"
    "github.com/segmentio/kafka-go"
    "log"
)

func main() {
    reader := kafka.NewReader(kafka.ReaderConfig{
        Brokers: []string{"localhost:9092"},
        Topic:   "example-topic",
        GroupID: "example-group",
    })

    defer reader.Close()

    for {
        msg, err := reader.ReadMessage(context.Background())
        if err != nil {
            log.Fatal("could not read message " + err.Error())
        }
        fmt.Printf("Consumed message: %s\n", msg.Value)
    }
}
ログイン後にコピー

このコンシューマーは、example-topic トピックからメッセージを読み取り、コンソールに出力します。

コンシューマを実行します:

go run consumer.go
ログイン後にコピー

メッセージが消費されたことを示す出力が表示されるはずです。

結論

このブログ投稿では、Golang を使用して Kafka プロデューサーとコンシューマーを設定する方法を説明しました。この簡単な例は、メッセージの生成と消費の基本を示していますが、Kafka の機能はこれをはるかに超えています。 Kafka を使用すると、堅牢でスケーラブルなリアルタイム データ処理システムを構築できます。

メッセージの分割、キーベースのメッセージ配信、他のシステムとの統合など、より高度な機能を自由に探索してください。コーディングを楽しんでください!


それだけです!このブログ投稿では、Go で Kafka を使用するための簡潔な紹介を提供します。これは、リアルタイム データ処理を始めようとしている開発者に最適です。

以上がGo で Kafka プロデューサーとコンシューマーを構築するの詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

ソース:dev.to
このウェブサイトの声明
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。
著者別の最新記事
人気のチュートリアル
詳細>
最新のダウンロード
詳細>
ウェブエフェクト
公式サイト
サイト素材
フロントエンドテンプレート