Dalam artikel ini, editor PHP Xiaoxin akan memperkenalkan anda kepada konsep penting dalam pendaftaran seni bina Kafka: proksi. Dalam Kafka, broker ialah komponen teras yang bertanggungjawab untuk mengurus dan memproses aliran mesej. Walau bagaimanapun, broker tidak boleh mengesahkan rekod, yang bermaksud bahawa sebaik sahaja rekod ditulis kepada broker, ia tidak boleh disahkan atau diubah. Ciri ini mungkin mempunyai kesan pada beberapa senario penggunaan dan keselamatan tertentu, jadi anda perlu memberi perhatian kepada perkara ini apabila menggunakan Kafka. Seterusnya, kami akan menerangkan secara terperinci mengapa ejen tidak boleh mengesahkan rekod dan masalah yang mungkin timbul.
Saya sedang mengesahkan skema menggunakan pendaftaran skema Kafka. Masalahnya, walaupun saya memasukkan skema yang betul, saya masih mendapat ralat Broker: Broker failed to verify record.
confluence.value.schema.validation ditetapkan kepada benar supaya skema nilai boleh disemak pada peringkat ejen semasa.
Skema yang saya sediakan dan data yang saya hantar adalah seperti berikut.
{ "$schema": "http://json-schema.org/draft-07/schema#", "additionalProperties": false, "description": "Sample schema to help you get started.", "properties": { "test": { "type": "string" } }, "title": "schema_test", "type": "object" }
{"test": "test1"}
Selain itu, saya menggunakan go untuk menghantar data, dan kod untuk data adalah seperti berikut.
// main
func main() {
kafka.ProduceData("schema_test", []byte(`{"test": "test1"}`))
}
<code>// kafka func ProduceData(topic string, data []byte) { conf := ReadConfig() p, err := kafka.NewProducer(&conf) if err != nil { fmt.Printf("Failed to create producer: %s", err) os.Exit(1) } defer p.Close() // Go-routine to handle message delivery reports and // possibly other event types (errors, stats, etc) go func() { for e := range p.Events() { switch ev := e.(type) { case *kafka.Message: if ev.TopicPartition.Error != nil { fmt.Printf("Failed to deliver message: %v\n", ev.TopicPartition) } else { fmt.Printf("Produced event to topic %s: key = %-10s value = %s\n", *ev.TopicPartition.Topic, string(ev.Key), string(ev.Value)) } } } }() p.Produce(&kafka.Message{ TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny}, Value: data, }, nil) // Wait for all messages to be delivered p.Flush(15 * 1000) } </code>
Nampaknya terdapat salah faham tentang cara broker benar-benar mengesahkan data. Ia berfungsi seperti yang diharapkan. Anda memerlukan Skema ID. Anda hanya menghantar JSON biasa tentang topik, tanpa ID. Skema pada pendaftaran tidak penting, hanya IDnya.
Daripada dokumentasi
Secara lebih khusus, corak yang anda tambahkan pada pendaftaran hanyalah satu daripada banyak "versi" yang mungkin wujud pada tema (cth. topic-value
). Setiap versi mempunyai ID unik. Pengesahan bukan sahaja menggunakan versi terkini; ID dikodkan pada bahagian klien.
Lihat contoh Confluence menggunakan penjanaan skema JSON (yang sepatutnya melakukan pengesahan rekod).
Pengesahan bahagian broker hanyalah untuk mengelakkan data bersiri yang salah atau "pil racun" seperti yang anda lakukan sekarang.
Atas ialah kandungan terperinci Kafka Schema Registry - Broker: Broker tidak boleh mengesahkan rekod. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!