editor php Xigua hari ini membawakan anda artikel tentang "faktor replikasi tidak sah" dalam pelanggan Kafka Go. Kafka ialah platform pemprosesan strim teragih berprestasi tinggi, berskala, manakala Go ialah bahasa pengaturcaraan yang ringkas dan cekap. Artikel ini akan menumpukan pada masalah "faktor replikasi tidak sah" yang berlaku dalam klien Kafka Go, meneroka punca dan penyelesaiannya serta membantu pembaca memahami dengan lebih baik dan menangani cabaran teknikal biasa ini. Faktor replikasi yang tidak sah boleh menyebabkan ketidakkonsistenan data dan kemerosotan prestasi, jadi penting bagi pengguna Kafka memahami cara menangani isu ini. Mari kita meneroka secara mendalam bersama-sama!
Saya baru mengenali Kafka dan cuba memulakan projek saya. Saya mempunyai ini dalam docker-compose.yml saya
version: '3' services: zookeeper: image: confluentinc/cp-zookeeper:7.3.0 container_name: zookeeper environment: ZOOKEEPER_CLIENT_PORT: 2181 ZOOKEEPER_TICK_TIME: 2000 broker: image: confluentinc/cp-kafka:7.3.0 container_name: broker depends_on: - zookeeper ports: - 9092:9092 environment: KAFKA_BROKER_ID: 1 KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092 KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
Kemudian saya menjalankan fail main.go saya dengan pengeluar dan pengguna serta beberapa topik olok-olok.
package main import ( "fmt" "log" "time" "github.com/confluentinc/confluent-kafka-go/kafka" ) func main() { topic := "HVSE" p, err := kafka.NewProducer(&kafka.ConfigMap{ "bootstrap.servers": "localhost:9092", "client.id": "foo", "acks": "all", }) go func() { consumer, err := kafka.NewConsumer(&kafka.ConfigMap{ "bootstrap.servers": "localhost:9092", "group.id": "foo", "auto.offset.reset": "smallest", }) if err != nil { log.Fatal(err) } err = consumer.Subscribe(topic, nil) if err != nil { log.Fatal(err) } for { ev := consumer.Poll(100) // fmt.Println(ev) switch e := ev.(type) { case *kafka.Message: fmt.Printf("consumed message from queue: %s\n", string(e.Value)) case *kafka.Error: fmt.Printf("%v\n", e) // return // default: // fmt.Printf("Ignored %v\n", e) } } }() deliverch := make(chan kafka.Event, 10000) for { err = p.Produce(&kafka.Message{ TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny}, Value: []byte("FOO"), }, deliverch, ) if err != nil { log.Fatal(err) } <- deliverch time.Sleep(time.Second * 1) } }
Jika saya menyahkomen nilai lalai saya akan masuk ke dalamnya.
Jika tidak, saya mendapat ralat ini dalam konsol.
2023/09/26 13:45:05 Broker: Invalid replication factor exit status 1
Bekas kafka dan Zookeeper saya sedang berjalan.
Saya menukar fail docker-compose.yml tetapi ia tidak membantu. Saya mendapati pengguna saya.Events() tiada tetapi saya tidak faham mengapa ini berlaku
Saya menyalin kod anda dan ia betul tetapi masalah dalam compose.yml hanya melaraskan ADVERTISED_LISTENERS
bukan ini:
KAFKA_ADVERTISED_LISTENERS:PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
Gunakan ini:
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
Atas ialah kandungan terperinci 'Faktor replikasi tidak sah' dalam pelanggan Convergence Kafka Go. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!