Je peux utiliser confluence cli pour me connecter au cluster kafka de confluence, mais je ne peux pas utiliser la bibliothèque kafka-go de segmentio. J'obtiens l'erreur suivante.
with SASL: SASL handshake failed: EOF
C'est ma fonction dans go
package consumer import ( "context" "fmt" "log" "os" "time" "github.com/segmentio/kafka-go" "github.com/segmentio/kafka-go/sasl/plain" ) func Consume(ctx context.Context) { // create a new logger that outputs to stdout // and has the `kafka reader` prefix l := log.New(os.Stdout, "kafka reader: ", 0) mechanism := plain.Mechanism{ Username: "my-api-key", Password: "my-api-secret", } dialer := &kafka.Dialer{ Timeout: 10 * time.Second, DualStack: true, SASLMechanism: mechanism, } r := kafka.NewReader(kafka.ReaderConfig{ Brokers: []string{brokerAddress}, // brokerAddress given in confluent cloud cluster settings. Topic: []string{"steps"}[0], // assign the logger to the reader Logger: l, Dialer: dialer, }) for { // the `ReadMessage` method blocks until we receive the next event msg, err := r.ReadMessage(ctx) if err != nil { panic("could not read message " + err.Error()) } // after receiving the message, log its value fmt.Println("received: ", string(msg.Value)) } }
J'ai essayé de générer de nouvelles clés, en utilisant le nom d'utilisateur et le mot de passe de mon compte, en réduisant les partitions, mais rien n'a fonctionné.
Il semble que la TLS
版本不被接受,您可以使用 MinVersion
de votre serveur force go-kafka à l'accepter :
dialer := &kafka.Dialer{ Timeout: 10 * time.Second, DualStack: true, SASLMechanism: mechanism, TLS: &tls.Config{ MinVersion: tls.VersionTLS12, }, }
Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!