l'éditeur php Xigua vous propose aujourd'hui un article sur le "facteur de réplication invalide" dans le client Kafka Go. Kafka est une plate-forme de traitement de flux distribuée hautes performances et évolutive, tandis que Go est un langage de programmation concis et efficace. Cet article se concentrera sur le problème du « facteur de réplication invalide » qui se produit dans le client Kafka Go, explorera ses causes et ses solutions, et aidera les lecteurs à mieux comprendre et gérer ce défi technique courant. Des facteurs de réplication non valides peuvent entraîner des incohérences de données et une dégradation des performances. Il est donc important que les utilisateurs de Kafka comprennent comment résoudre ce problème. Explorons ensemble en profondeur !
Je suis nouveau sur Kafka et j'essaie de démarrer mon projet. J'ai ça dans mon docker-compose.yml
version: '3' services: zookeeper: image: confluentinc/cp-zookeeper:7.3.0 container_name: zookeeper environment: ZOOKEEPER_CLIENT_PORT: 2181 ZOOKEEPER_TICK_TIME: 2000 broker: image: confluentinc/cp-kafka:7.3.0 container_name: broker depends_on: - zookeeper ports: - 9092:9092 environment: KAFKA_BROKER_ID: 1 KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092 KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
Ensuite, je lance mon fichier main.go avec les producteurs et les consommateurs et quelques sujets fictifs.
package main import ( "fmt" "log" "time" "github.com/confluentinc/confluent-kafka-go/kafka" ) func main() { topic := "HVSE" p, err := kafka.NewProducer(&kafka.ConfigMap{ "bootstrap.servers": "localhost:9092", "client.id": "foo", "acks": "all", }) go func() { consumer, err := kafka.NewConsumer(&kafka.ConfigMap{ "bootstrap.servers": "localhost:9092", "group.id": "foo", "auto.offset.reset": "smallest", }) if err != nil { log.Fatal(err) } err = consumer.Subscribe(topic, nil) if err != nil { log.Fatal(err) } for { ev := consumer.Poll(100) // fmt.Println(ev) switch e := ev.(type) { case *kafka.Message: fmt.Printf("consumed message from queue: %s\n", string(e.Value)) case *kafka.Error: fmt.Printf("%v\n", e) // return // default: // fmt.Printf("Ignored %v\n", e) } } }() deliverch := make(chan kafka.Event, 10000) for { err = p.Produce(&kafka.Message{ TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny}, Value: []byte("FOO"), }, deliverch, ) if err != nil { log.Fatal(err) } <- deliverch time.Sleep(time.Second * 1) } }
Si je décommente la valeur par défaut, j'y entre.
Sinon, j'obtiens cette erreur dans la console.
2023/09/26 13:45:05 Broker: Invalid replication factor exit status 1
Mes conteneurs Kafka et Zookeeper fonctionnent.
J'ai modifié le fichier docker-compose.yml mais cela n'a pas aidé. J'ai trouvé que mon consumer.Events() est nul mais je ne comprends pas pourquoi cela se produit
J'ai copié votre code et il est correct mais le problème dans compose.yml n'est ajusté qu'ADVERTISED_LISTENERS
au lieu de ceci :
KAFKA_ADVERTISED_LISTENERS:PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
Utilisez ceci :
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!