> 백엔드 개발 > Golang > Kafka를 Golang과 연결

Kafka를 Golang과 연결

WBOY
풀어 주다: 2024-09-06 22:30:32
원래의
573명이 탐색했습니다.

소개

주요 기능, 구성 요소, 장점 등 Kafka의 기본 사항을 알고 싶다면 여기에서 해당 내용을 다루는 기사를 참조하세요. 이를 검토하고 Docker를 사용하여 Kafka 설치를 완료할 때까지 단계를 따라 다음 섹션을 진행하세요.

Connect Kafka with Golang

Golang을 사용하여 Kafka에 연결

KafkaNodeJS와 연결하는 방법에 대한 기사의 예와 유사하게 이 소스 코드에는 Producer 메시지Kafka로 보내고 소비자를 사용하여 주제. 더 나은 이해를 위해 코드를 더 작은 부분으로 나누겠습니다. 먼저 변수 값을 정의해 보겠습니다.


- 여기에서는
package main

import (
  "fmt"
  "github.com/confluentinc/confluent-kafka-go/kafka"
)

var (
  broker  = "localhost:9092"
  groupId = "group-id"
  topic   = "topic-name"
)
로그인 후 복사
github.com/confluentinc/confluent-kafka-go/kafka

패키지를 사용하여 Kafka에 연결합니다. -

브로커

는 호스트 주소입니다. ZooKeeper를 사용하는 경우 호스트 주소를 그에 맞게 바꾸세요. -

groupId

주제는 필요에 따라 변경될 수 있습니다. 다음은 Producer 초기화입니다.


위 코드는
func startProducer() {
  p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": broker})
  if err != nil {
    panic(err)
  }

  go func() {
    for e := range p.Events() {
      switch ev := e.(type) {
      case *kafka.Message:
        if ev.TopicPartition.Error != nil {
          fmt.Printf("Delivery failed: %v\n", ev.TopicPartition)
        } else {
          fmt.Printf("Delivered message to %v\n", ev.TopicPartition)
        }
      }
    }
  }()

  for _, word := range []string{"message 1", "message 2", "message 3"} {
    p.Produce(&kafka.Message{
      TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
      Value:          []byte(word),
    }, nil)
  }
}
로그인 후 복사

{"message 1", "message 2", "message 3"} 메시지 배열을 주제로 보내는 데 사용되며 go-routine for e := range p.Events()를 사용하여 이벤트를 반복하고 배송 결과를 인쇄합니다. 성공 아니면 실패. 다음은

주제

구독하고 메시지를 받는 소비자를 생성하는 것입니다.

마지막으로 간단한 예제이므로
func startConsumer() {
  c, err := kafka.NewConsumer(&kafka.ConfigMap{
    "bootstrap.servers": broker,
    "group.id":          groupId,
    "auto.offset.reset": "earliest",
  })

  if err != nil {
    panic(err)
  }
  c.Subscribe(topic, nil)

  for {
    msg, err := c.ReadMessage(-1)
    if err == nil {
      fmt.Printf("Message on %s: %s\n", msg.TopicPartition, string(msg.Value))
    } else {
      fmt.Printf("Consumer error: %v (%v)\n", err, msg)
      break
    }
  }

  c.Close()
}
로그인 후 복사
Producer

consumer를 생성하는 함수를 호출해 사용합니다. 실제 시나리오에서 생산자소비자 배포는 일반적으로 마이크로서비스 시스템의 서로 다른 두 서버에서 수행됩니다.

func main() {
  startProducer()
  startConsumer()
}
로그인 후 복사

Connect Kafka with Golang

즐거운 코딩하세요!


이 콘텐츠가 도움이 되셨다면 제 블로그의 원본 기사를 방문하여 작성자를 지원하고 더 흥미로운 콘텐츠를 탐색해 보세요.

Connect Kafka with GolangConnect Kafka with GolangConnect Kafka with GolangConnect Kafka with Golang Connect Kafka with Golang

흥미로울 만한 시리즈:

노드JS
  •  리액트
  • 도커 
  • 쿠버네티스

위 내용은 Kafka를 Golang과 연결의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

원천:dev.to
본 웹사이트의 성명
본 글의 내용은 네티즌들의 자발적인 기여로 작성되었으며, 저작권은 원저작자에게 있습니다. 본 사이트는 이에 상응하는 법적 책임을 지지 않습니다. 표절이나 침해가 의심되는 콘텐츠를 발견한 경우 admin@php.cn으로 문의하세요.
인기 튜토리얼
더>
최신 다운로드
더>
웹 효과
웹사이트 소스 코드
웹사이트 자료
프론트엔드 템플릿