Dans cet article, l'éditeur PHP Xiaoxin vous présentera un concept important dans le registre de l'architecture Kafka : le proxy. Dans Kafka, le courtier est un composant essentiel responsable de la gestion et du traitement du flux de messages. Cependant, le proxy ne peut pas valider les enregistrements, ce qui signifie qu'une fois qu'un enregistrement est écrit sur le proxy, il ne peut pas être validé ou modifié. Cette fonctionnalité peut avoir un impact sur certains scénarios d'utilisation et sur la sécurité spécifiques, vous devez donc y prêter attention lorsque vous utilisez Kafka. Ensuite, nous expliquerons en détail pourquoi l'agent ne peut pas vérifier le dossier et les problèmes qui peuvent survenir.
Je valide le schéma à l'aide du registre de schémas Kafka. Le problème est que même si j'ai entré le bon schéma, j'obtiens toujours l'erreur Broker: Broker failed to verify record.
confluence.value.schema.validation est défini sur true afin que le schéma de la valeur puisse être vérifié au niveau actuel de l'agent.
Le schéma que j'ai mis en place et les données que j'ai envoyées sont les suivantes.
{ "$schema": "http://json-schema.org/draft-07/schema#", "additionalProperties": false, "description": "Sample schema to help you get started.", "properties": { "test": { "type": "string" } }, "title": "schema_test", "type": "object" }
{"test": "test1"}
De plus, j'utilise go pour envoyer des données, et le code des données est le suivant.
// main
func main() {
kafka.ProduceData("schema_test", []byte(`{"test": "test1"}`))
}
<code>// kafka func ProduceData(topic string, data []byte) { conf := ReadConfig() p, err := kafka.NewProducer(&conf) if err != nil { fmt.Printf("Failed to create producer: %s", err) os.Exit(1) } defer p.Close() // Go-routine to handle message delivery reports and // possibly other event types (errors, stats, etc) go func() { for e := range p.Events() { switch ev := e.(type) { case *kafka.Message: if ev.TopicPartition.Error != nil { fmt.Printf("Failed to deliver message: %v\n", ev.TopicPartition) } else { fmt.Printf("Produced event to topic %s: key = %-10s value = %s\n", *ev.TopicPartition.Topic, string(ev.Key), string(ev.Value)) } } } }() p.Produce(&kafka.Message{ TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny}, Value: data, }, nil) // Wait for all messages to be delivered p.Flush(15 * 1000) } </code>
Il semble y avoir un malentendu sur la manière dont les courtiers vérifient réellement les données. Cela fonctionne comme prévu. Vous avez besoin d'un ID de schéma. Vous envoyez simplement du JSON simple sur le sujet, sans l'ID. Le schéma du registre n'a pas d'importance, seul son identifiant.
D'après la documentation
Plus précisément, le modèle que vous ajoutez au registre n'est qu'une des nombreuses "versions" qui peuvent exister sur le thème (par exemple topic-value
). Chaque version possède un identifiant unique. L'authentification n'utilise pas uniquement la dernière version ; l'ID est codé côté client.
Voir l'exemple Confluence utilisant la génération de schéma JSON (qui devrait lui-même effectuer la validation des enregistrements).
La validation côté courtier sert simplement à éviter les données mal sérialisées ou les « pilules empoisonnées » comme vous le faites actuellement.
Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!