Apache Kafka는 게시-구독 모델을 기반으로 하는 메시지 대기열 시스템으로, 안정적이고 효율적이며 확장 가능한 메시지 전달 메커니즘을 제공하며 빅데이터, 실시간 데이터 스트림 처리, 로그 수집 및 기타 분야에서 널리 사용됩니다. Go 언어는 빠르고 분산된 동시 프로그래밍 언어로, 동시성이 높은 시나리오에서 메시지 전달 및 처리를 처리하는 데 적합합니다. 이 기사에서는 전체 가이드 및 코드 예제와 함께 Go에서 메시징을 위해 Apache Kafka를 사용하는 방법을 다룹니다.
1단계: Apache Kafka 설치 및 구성
먼저 Apache Kafka를 설치하고 구성해야 합니다. 공식 웹사이트에서 최신 Kafka 버전을 다운로드하고 압축을 풀고 Kafka 서버를 시작할 수 있습니다:
$ tar -xzf kafka_2.13-2.8.0.tgz $ cd kafka_2.13-2.8.0 $ bin/zookeeper-server-start.sh config/zookeeper.properties
그런 다음 Kafka 서버를 시작합니다:
$ bin/kafka-server-start.sh config/server.properties
다음으로 메시지 저장 및 전달을 위한 Kafka 주제를 생성해야 합니다:
$ bin/kafka-topics.sh --create --topic my_topic --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1
이 명령은 "my_topic"이라는 주제를 생성하고 로컬 노드에 복제 요소와 1개의 파티션을 구성합니다.
2단계: Kafka Go 라이브러리 소개 및 설치
Kafka를 Go 언어로 사용하려면 타사 Kafka Go 라이브러리를 도입해야 합니다. 현재 Go 언어는 공식적으로 Kafka 관련 표준 라이브러리를 제공하지 않지만 커뮤니티의 타사 라이브러리는 이미 매우 성숙하고 안정적입니다.
이 글에서는 사라마 라이브러리를 사용하겠습니다. 다음 명령을 사용하여 설치할 수 있습니다.
$ go get github.com/Shopify/sarama
여기서 sarama 패키지를 소개하고 메시지 전달을 위해 생산자 및 소비자 API를 사용해야 합니다.
3단계: 생산자 API를 사용하여 메시지 보내기
Kafka 생산자 API를 사용하여 Go 언어로 메시지를 보내는 것은 매우 간단합니다. 먼저 Kafka 생산자 개체를 생성해야 합니다.
import ( "log" "github.com/Shopify/sarama" ) func main() { config := sarama.NewConfig() producer, err := sarama.NewSyncProducer([]string{"127.0.0.1:9092"}, config) if err != nil { log.Fatalf("Failed to create producer: %s", err) } defer producer.Close() }
여기에서는 sarama 패키지의 NewSyncProducer() 함수를 사용하여 동기 생산자 개체를 생성하고 Kafka 서버의 주소 및 구성 정보를 지정합니다. 생성이 성공한 후에는 defer 문을 사용하여 프로그램 종료 후 생산자 개체가 닫히는지 확인해야 합니다.
다음으로 Produce() 함수를 사용하여 Kafka 주제에 메시지를 보낼 수 있습니다.
msg := &sarama.ProducerMessage{ Topic: "my_topic", Value: sarama.StringEncoder("hello, kafka"), } partition, offset, err := producer.SendMessage(msg) if err != nil { log.Fatalf("Failed to send message: %s", err) } log.Printf("Message sent to partition %d at offset %d", partition, offset)
여기서 먼저 sarama.ProducerMessage 개체를 만들고 주제 이름과 메시지 내용을 설정한 다음 생산자 개체의 SendMessage( )를 사용합니다. 함수는 대상 주제에 메시지를 보냅니다.
4단계: 소비자 API를 사용하여 주제로부터 메시지 수신
Kafka 소비자 API를 사용하여 Go 언어로 메시지를 수신하는 것도 매우 간단합니다. 먼저 Kafka 소비자 객체를 생성해야 합니다.
config := sarama.NewConfig() consumer, err := sarama.NewConsumer([]string{"127.0.0.1:9092"}, config) if err != nil { log.Fatalf("Failed to create consumer: %s", err) } defer consumer.Close() partitionConsumer, err := consumer.ConsumePartition("my_topic", 0, sarama.OffsetOldest) if err != nil { log.Fatalf("Failed to consume partition: %s", err) } defer partitionConsumer.Close()
여기에서는 sarama 패키지의 NewConsumer() 함수를 사용하여 소비자 객체를 생성하고 Kafka 서버와 연결을 설정합니다. 성공적으로 생성된 후에는 defer 문을 사용하여 프로그램 종료 후 소비자 개체가 닫히도록 해야 합니다.
다음으로 ConsumePartition() 함수를 사용하여 특정 주제와 파티션을 구독하고 메시지의 시작 오프셋을 설정합니다. 이 함수는 PartitionConsumer 개체를 반환하므로, 프로그램이 끝난 후 해당 개체가 닫히도록 defer 문을 사용해야 합니다.
마지막으로 for 루프에서 Consumer.Messages() 함수를 사용하여 메시지를 가져와 처리할 수 있습니다.
for { select { case msg := <-partitionConsumer.Messages(): log.Printf("Received message: %s", string(msg.Value)) case err := <-partitionConsumer.Errors(): log.Fatalf("Error while consuming: %s", err) } }
여기에서는 Messages() 함수를 사용하여 PartitionConsumer 개체에서 메시지를 가져온 다음 다음을 사용합니다. for 루프를 사용하여 처리합니다. Kafka는 동시 메시징 시스템이므로 여러 채널의 메시지 알림을 처리하려면 select 문을 사용해야 합니다. 메시지를 처리한 후에는 Ack() 함수를 사용하여 메시지가 소비되었는지 수동으로 확인해야 합니다.
전체 코드 예제
package main import ( "log" "github.com/Shopify/sarama" ) func main() { config := sarama.NewConfig() producer, err := sarama.NewSyncProducer([]string{"127.0.0.1:9092"}, config) if err != nil { log.Fatalf("Failed to create producer: %s", err) } defer producer.Close() msg := &sarama.ProducerMessage{ Topic: "my_topic", Value: sarama.StringEncoder("hello, kafka"), } partition, offset, err := producer.SendMessage(msg) if err != nil { log.Fatalf("Failed to send message: %s", err) } log.Printf("Message sent to partition %d at offset %d", partition, offset) consumer, err := sarama.NewConsumer([]string{"127.0.0.1:9092"}, config) if err != nil { log.Fatalf("Failed to create consumer: %s", err) } defer consumer.Close() partitionConsumer, err := consumer.ConsumePartition("my_topic", 0, sarama.OffsetOldest) if err != nil { log.Fatalf("Failed to consume partition: %s", err) } defer partitionConsumer.Close() for { select { case msg := <-partitionConsumer.Messages(): log.Printf("Received message: %s", string(msg.Value)) partitionConsumer.MarkOffset(msg, "") case err := <-partitionConsumer.Errors(): log.Fatalf("Error while consuming: %s", err) } } }
요약
이 기사에서는 Go 언어 메시징에 Apache Kafka를 사용하는 방법을 소개하고 전체 설치, 구성, 종속 라이브러리 소개 및 코드 구현을 제공합니다. Kafka는 빅 데이터, 실시간 데이터 스트림 처리, 로그 수집 및 기타 시나리오에서 널리 사용되는 효율적이고 안정적인 메시징 시스템입니다. Kafka를 사용할 때 메시지 소비 완료를 수동으로 확인하고 여러 채널의 메시지 알림을 처리하는 등 몇 가지 핵심 사항에 주의해야 합니다. 이 글이 Kafka와 Go 언어를 사용하여 동시성이 높은 분산 프로그램을 작성하는 데 도움이 되기를 바랍니다.
위 내용은 Go에서 Apache Kafka 사용: 전체 가이드의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!