I am validating schemas using the Kafka Schema Registry. The problem is that even though I registered the correct schema, I still get the error Broker: Broker failed to validate record.
I set confluent.value.schema.validation to true so that the value's schema is checked at the broker level.
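For reference, broker-side schema validation is a Confluent Server feature and is enabled per topic. A sketch of how it might be turned on (the bootstrap address is an assumption; the topic name comes from this question):

```shell
# Enable value-schema validation on an existing topic (Confluent Server only).
# Assumes the broker is reachable at localhost:9092.
kafka-configs --bootstrap-server localhost:9092 \
  --alter --entity-type topics --entity-name schema_test \
  --add-config confluent.value.schema.validation=true
```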
The schema I registered and the data I sent are as follows.
{
  "$schema": "http://json-schema.org/draft-07/schema#",
  "additionalProperties": false,
  "description": "Sample schema to help you get started.",
  "properties": {
    "test": {
      "type": "string"
    }
  },
  "title": "schema_test",
  "type": "object"
}
{"test": "test1"}
In addition, I use Go to send the data; the producer code is as follows.
// main
func main() {
kafka.ProduceData("schema_test", []byte(`{"test": "test1"}`))
}
<code>// kafka
func ProduceData(topic string, data []byte) {
	conf := ReadConfig()

	p, err := kafka.NewProducer(&conf)
	if err != nil {
		fmt.Printf("Failed to create producer: %s", err)
		os.Exit(1)
	}
	defer p.Close()

	// Go-routine to handle message delivery reports and
	// possibly other event types (errors, stats, etc)
	go func() {
		for e := range p.Events() {
			switch ev := e.(type) {
			case *kafka.Message:
				if ev.TopicPartition.Error != nil {
					fmt.Printf("Failed to deliver message: %v\n", ev.TopicPartition)
				} else {
					fmt.Printf("Produced event to topic %s: key = %-10s value = %s\n",
						*ev.TopicPartition.Topic, string(ev.Key), string(ev.Value))
				}
			}
		}
	}()

	p.Produce(&kafka.Message{
		TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
		Value:          data,
	}, nil)

	// Wait for all messages to be delivered
	p.Flush(15 * 1000)
}
</code>
There seems to be a misunderstanding about how brokers actually validate the data. It works as expected: you need a schema ID. You're just sending plain JSON to the topic, without the ID. The schema in the registry doesn't matter for this check, only its ID.
From the documentation:
More specifically, the schema you add to the registry is just one of many "versions" that may exist for a subject (e.g. topic-value). Each version has a unique ID. Validation doesn't just use the latest version; the ID is encoded on the client side.
See the Confluent example of producing with a JSON schema (which itself does record validation).
Broker-side validation exists just to prevent incorrectly serialized data, or a "poison pill" like you're sending now.