php-Editor Xigua bringt Ihnen heute einen Artikel über den „ungültigen Replikationsfaktor“ im Kafka Go-Client. Kafka ist eine leistungsstarke, skalierbare verteilte Stream-Verarbeitungsplattform, während Go eine prägnante und effiziente Programmiersprache ist. Dieser Artikel konzentriert sich auf das Problem des „ungültigen Replikationsfaktors“, das im Kafka Go-Client auftritt, untersucht seine Ursachen und Lösungen und hilft den Lesern, diese häufige technische Herausforderung besser zu verstehen und zu bewältigen. Ungültige Replikationsfaktoren können zu Dateninkonsistenzen und Leistungseinbußen führen. Daher ist es für Kafka-Benutzer wichtig zu verstehen, wie sie mit diesem Problem umgehen können. Lassen Sie uns gemeinsam in die Tiefe gehen!
Ich bin neu bei Kafka und versuche, mein Projekt zu starten. Ich habe dies in meiner docker-compose.yml
version: '3' services: zookeeper: image: confluentinc/cp-zookeeper:7.3.0 container_name: zookeeper environment: ZOOKEEPER_CLIENT_PORT: 2181 ZOOKEEPER_TICK_TIME: 2000 broker: image: confluentinc/cp-kafka:7.3.0 container_name: broker depends_on: - zookeeper ports: - 9092:9092 environment: KAFKA_BROKER_ID: 1 KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092 KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
Dann führe ich meine main.go-Datei mit Produzenten und Konsumenten und einigen Scheinthemen aus.
package main import ( "fmt" "log" "time" "github.com/confluentinc/confluent-kafka-go/kafka" ) func main() { topic := "HVSE" p, err := kafka.NewProducer(&kafka.ConfigMap{ "bootstrap.servers": "localhost:9092", "client.id": "foo", "acks": "all", }) go func() { consumer, err := kafka.NewConsumer(&kafka.ConfigMap{ "bootstrap.servers": "localhost:9092", "group.id": "foo", "auto.offset.reset": "smallest", }) if err != nil { log.Fatal(err) } err = consumer.Subscribe(topic, nil) if err != nil { log.Fatal(err) } for { ev := consumer.Poll(100) // fmt.Println(ev) switch e := ev.(type) { case *kafka.Message: fmt.Printf("consumed message from queue: %s\n", string(e.Value)) case *kafka.Error: fmt.Printf("%v\n", e) // return // default: // fmt.Printf("Ignored %v\n", e) } } }() deliverch := make(chan kafka.Event, 10000) for { err = p.Produce(&kafka.Message{ TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny}, Value: []byte("FOO"), }, deliverch, ) if err != nil { log.Fatal(err) } <- deliverch time.Sleep(time.Second * 1) } }
Wenn ich den Standardwert auskommentiere, komme ich hinein.
Ansonsten erhalte ich diesen Fehler in der Konsole.
2023/09/26 13:45:05 Broker: Invalid replication factor exit status 1
Meine Kafka- und Zookeeper-Container laufen.
Ich habe die Datei docker-compose.yml geändert, aber es hat nicht geholfen. Ich habe festgestellt, dass mein Consumer.Events() Null ist, aber ich verstehe nicht, warum das passiert
Ich habe Ihren Code kopiert und er ist korrekt, aber das Problem in compose.yml wird nur durch ADVERTISED_LISTENERS angepasst
Stattdessen:
KAFKA_ADVERTISED_LISTENERS:PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
Verwenden Sie dies:
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
Das obige ist der detaillierte Inhalt von„Ungültiger Replikationsfaktor' im Convergence Kafka Go-Client. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!