'Invalid replication factor' in Convergence Kafka Go client

王林
Release: 2024-02-14 10:30:09
forward
873 people have browsed it

汇合 Kafka Go 客户端中的“无效复制因子”

php editor Xigua today brings you an article about the "invalid replication factor" in the Kafka Go client. Kafka is a high-performance, scalable distributed stream processing platform, while Go is a concise and efficient programming language. This article will focus on the "invalid replication factor" problem that occurs in the Kafka Go client, explore its causes and solutions, and help readers better understand and deal with this common technical challenge. Invalid replication factors can lead to data inconsistencies and performance degradation, so it is important for Kafka users to understand how to deal with this issue. Let’s explore in depth together!

Question content

I am new to Kafka and trying to start my project. I have this

in my docker-compose.yml
version: '3'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.3.0
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-kafka:7.3.0
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - 9092:9092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
Copy after login

Then I run my main.go file with producers and consumers and some mock topics.

package main

import (
    "fmt"
    "log"
    "time"

    "github.com/confluentinc/confluent-kafka-go/kafka"
)



func main()  {
    topic := "HVSE"
    p, err := kafka.NewProducer(&kafka.ConfigMap{
        "bootstrap.servers": "localhost:9092",
        "client.id": "foo",
        "acks": "all",
    })
    go func() {
        consumer, err := kafka.NewConsumer(&kafka.ConfigMap{
            "bootstrap.servers":    "localhost:9092",
            "group.id":             "foo",
            "auto.offset.reset":    "smallest",
        })

        if err != nil {
            log.Fatal(err)
        }

        err = consumer.Subscribe(topic, nil)

        if err != nil {
            log.Fatal(err)
        }

        for {
                ev := consumer.Poll(100)
                // fmt.Println(ev)
                switch e := ev.(type) {
                case *kafka.Message:
                        fmt.Printf("consumed message from queue: %s\n", string(e.Value))
                case *kafka.Error:
                        fmt.Printf("%v\n", e)
                        // return
                // default:
                //      fmt.Printf("Ignored %v\n", e)
                }
        }
    }()

    deliverch := make(chan kafka.Event, 10000)
    for {
        err = p.Produce(&kafka.Message{
            TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
            Value: []byte("FOO"),
        },
            deliverch,
        )
        if err != nil {
            log.Fatal(err)
        }
    
        <- deliverch
        time.Sleep(time.Second * 1)
    }
}
Copy after login

If I uncomment the default value, I get into it.

Otherwise I get this error in the console.

2023/09/26 13:45:05 Broker: Invalid replication factor
exit status 1
Copy after login

My kafka and Zookeeper containers are running.

I changed the docker-compose.yml file but that didn't help. I found that my consumer.Events() is nil but I don't understand why this is happening

WORKAROUND

I copied your code and it is correct but in compose.yml The problem only adjusted ADVERTISED_LISTENERS

instead of this:

KAFKA_ADVERTISED_LISTENERS:PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
Copy after login

use this:

KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
Copy after login

The above is the detailed content of 'Invalid replication factor' in Convergence Kafka Go client. For more information, please follow other related articles on the PHP Chinese website!

source:stackoverflow.com
Statement of this Website
The content of this article is voluntarily contributed by netizens, and the copyright belongs to the original author. This site does not assume corresponding legal responsibility. If you find any content suspected of plagiarism or infringement, please contact admin@php.cn
Popular Tutorials
More>
Latest Downloads
More>
Web Effects
Website Source Code
Website Materials
Front End Template