Kafka で配信および注文されたメッセージを取得して消費する方法
Apache Kafka でイベントが完全に一貫した順序で送信および消費されるようにするには、メッセージの分割とコンシューマの割り当てがどのように機能するかを理解することが不可欠です。
Kafka でのパーティションの使用
-
トピックの分割:
- Kafka はメッセージをトピック内の パーティション に編成します。各パーティションは、受信したメッセージの順序を維持します。つまり、メッセージはそのパーティションに送信された順序で処理されます。
- 順序を確保するには、同じコンテキスト (ユーザー ID やトランザクション ID など) に関連するすべてのメッセージが同じパーティションに送信されることが重要です。これは、メッセージの送信時にパーティション キーを使用することで実現されます。 Kafka はこのキーを使用して、ハッシュ関数を使用してメッセージを送信するパーティションを決定します[1][5]。
-
メッセージキー:
- メッセージを送信するときに、キーを指定できます。同じキーを持つすべてのメッセージは同じパーティションに送信されるため、メッセージは生成されたのと同じ順序で消費されます。たとえば、ユーザー ID がキーとして使用される場合、そのユーザーに関連するすべてのイベントは同じパーティションに移動します。
消費者団体
-
消費者割り当て:
- Kafka のコンシューマは、コンシューマ グループ にグループ化されます。各グループには複数のコンシューマを含めることができますが、各パーティションを一度に読み取ることができるのは、グループ内の 1 人のコンシューマのみです。
- これは、パーティションより多くのコンシューマがある場合、一部のコンシューマが非アクティブになることを意味します。順序を維持し、効率を最大化するには、少なくともグループ内のコンシューマーの数と同じ数のパーティションを作成することをお勧めします。
-
オフセット管理:
- Kafka は、オフセット を使用して各コンシューマーの読み取り状態を保存します。これは、パーティション内の各メッセージの増分数値識別子です。これにより、消費者は障害が発生した場合に中断したところから再開できます。
追加の戦略
- 過負荷の回避: パーティション キーを選択するときは、一部のパーティションが過負荷になり、他のパーティションが十分に活用されていないことを避けるためにトラフィック分散を考慮することが重要です。
- レプリケーションとフォールト トレランス: パーティションに対して適切なレプリケーション (1 より大きい) を構成してください。これにより、可用性が向上するだけでなく、障害に対するシステムの回復力も向上します。
Avro を使用して Kafka でメッセージの生成と消費のシステムを実装し、メッセージが順序どおりに処理され、起こり得る失敗を処理できるようにするための完全な例を次に示します。これには、Avro スキーマの定義、プロデューサー コードとコンシューマー コード、エラー処理の戦略が含まれます。
アブロスキーム
まず、ペイロードの Avro スキーマを定義します。メッセージの構造を記述する user_signed_up.avsc というファイルを作成します。
{ "type": "record", "name": "UserSignedUp", "namespace": "com.example", "fields": [ { "name": "userId", "type": "int" }, { "name": "userEmail", "type": "string" }, { "name": "timestamp", "type": "string" } // Formato ISO 8601 ] }
鍵の生成
メッセージの生成と消費の順序を確保するために、message-type-date として構造化されたキーを使用します (例: user-signed-up-2024-11-04)。
プロデューサー カフカ
以下は、Avro スキームを使用して Kafka にメッセージを送信する プロデューサー のコードです。
import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringSerializer; import java.io.File; import java.io.IOException; import java.nio.file.Files; import java.util.Properties; public class AvroProducer { private final KafkaProducer<String, byte[]> producer; private final Schema schema; public AvroProducer(String bootstrapServers) throws IOException { Properties properties = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.KafkaAvroSerializer"); // Establecer la propiedad de reintentos, Número de reintentos properties.put(ProducerConfig.RETRIES_CONFIG, 3); // Asegura que todos los réplicas reconozcan la escritura, properties.put(ProducerConfig.ACKS_CONFIG, "all"); // Solo un mensaje a la vez properties.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1); // Habilitar idempotencia, no quiero enviar duplicados properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); this.producer = new KafkaProducer<>(properties); this.schema = new Schema.Parser().parse(new File("src/main/avro/user_signed_up.avsc")); } public void sendMessage(String topic, int userId, String userEmail) { GenericRecord record = new GenericData.Record(schema); record.put("userId", userId); record.put("userEmail", userEmail); record.put("timestamp", java.time.Instant.now().toString()); String key = String.format("user-signed-up-%s", java.time.LocalDate.now()); ProducerRecord<String, byte[]> producerRecord = new ProducerRecord<>(topic, key, serialize(record)); producer.send(producerRecord, (metadata, exception) -> { if (exception != null) { exception.printStackTrace(); **handleFailure(exception, producerRecord); ** } else { System.out.printf("Mensaje enviado a la partición %d con offset %d%n", metadata.partition(), metadata.offset()); } }); } private void handleFailure(Exception exception, ProducerRecord<String, byte[]> producerRecord) { // Log the error for monitoring System.err.println("Error sending message: " + exception.getMessage()); // Implement local persistence as a fallback saveToLocalStorage(producerRecord); // Optionally: Notify an external monitoring system or alert } private void saveToLocalStorage(ProducerRecord<String, byte[]> record) { try { // Persist the failed message to a local file or database for later processing Files.write(new File("failed_messages.log").toPath(), (record.key() + ": " + new String(record.value()) + "\n").getBytes(), StandardOpenOption.CREATE, StandardOpenOption.APPEND); System.out.println("Mensaje guardado localmente para reenvío: " + record.key()); } catch (IOException e) { System.err.println("Error saving to local storage: " + e.getMessage()); } } private byte[] serialize(GenericRecord record) { // Crear un ByteArrayOutputStream para almacenar los bytes serializados ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); // Crear un escritor de datos para el registro Avro DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(record.getSchema()); // Crear un encoder para escribir en el ByteArrayOutputStream Encoder encoder = EncoderFactory.get().binaryEncoder(outputStream, null); try { // Escribir el registro en el encoder datumWriter.write(record, encoder); // Finalizar la escritura encoder.flush(); } catch (IOException e) { throw new AvroSerializationException("Error serializing Avro record", e); } // Devolver los bytes serializados return outputStream.toByteArray(); } public void close() { producer.close(); } }
再試行に関する考慮事項
**再試行を有効にする場合、適切に処理しないとメッセージの順序が変更されるリスクがあることに注意することが重要です。
これを回避するには:
**max.in.flight.requests.per.connection: このプロパティを 1 に設定すると、メッセージが一度に 1 つずつ送信され、順番に処理されるようになります。ただし、これはパフォーマンスに影響を与える可能性があります。
この構成と適切なエラー処理により、Kafka プロデューサがより堅牢になり、必要な順序を維持しながらメッセージ生成の障害を処理できるようになります。
**Kafka コンシューマー
**メッセージを読んで処理するコンシューマ:
import org.apache.avro.Schema; import org.apache.avro.generic.GenericDatumReader; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; import org.apache.avro.io.DecoderFactory; import org.apache.avro.io.DatumReader; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.serialization.StringDeserializer; import java.io.File; import java.io.IOException; import java.util.Collections; import java.util.Properties; public class AvroConsumer { private final KafkaConsumer<String, byte[]> consumer; private final Schema schema; public AvroConsumer(String bootstrapServers, String groupId) throws IOException { Properties properties = new Properties(); properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.KafkaAvroDeserializer"); this.consumer = new KafkaConsumer<>(properties); this.schema = new Schema.Parser().parse(new File("src/main/avro/user_signed_up.avsc")); } public void consume(String topic) { consumer.subscribe(Collections.singletonList(topic)); while (true) { ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord<String, byte[]> record : records) { try { processMessage(record.value()); } catch (Exception e) { handleProcessingError(e, record); } } } } private void processMessage(byte[] data) throws IOException { DatumReader<GenericRecord> reader = new GenericDatumReader<>(schema); var decoder = DecoderFactory.get().binaryDecoder(data, null); GenericRecord record = reader.read(null, decoder); System.out.printf("Consumido mensaje: %s - %s - %s%n", record.get("userId"), record.get("userEmail"), record.get("timestamp")); } private void handleProcessingError(Exception e, ConsumerRecord<String, byte[]> record) { System.err.println("Error processing message: " + e.getMessage()); // Implement logic to save failed messages for later processing saveFailedMessage(record); } private void saveFailedMessage(ConsumerRecord<String, byte[]> record) { try { // Persist the failed message to a local file or database for later processing Files.write(new File("failed_consumed_messages.log").toPath(), (record.key() + ": " + new String(record.value()) + "\n").getBytes(), StandardOpenOption.CREATE, StandardOpenOption.APPEND); System.out.println("Mensaje consumido guardado localmente para re-procesamiento: " + record.key()); } catch (IOException e) { System.err.println("Error saving consumed message to local storage: " + e.getMessage()); } } public void close() { consumer.close(); } }
キーの現実的な例
多くの異なるイベントと多くの異なるパーティションがある環境では、現実的なキーは次のようになります。
{ "type": "record", "name": "UserSignedUp", "namespace": "com.example", "fields": [ { "name": "userId", "type": "int" }, { "name": "userEmail", "type": "string" }, { "name": "timestamp", "type": "string" } // Formato ISO 8601 ] }
これにより、特定の日付の特定のタイプに関連するすべてのイベントを同じパーティションに送信し、順番に処理できます。さらに、必要に応じて詳細 (セッション ID やトランザクション ID など) を含めることでキーを多様化できます。
Avro を使用して Kafka で障害を処理し、メッセージの順序を確保するためのこの実装と戦略により、イベントを管理するための堅牢で効率的なシステムを構築できます。
現在は、より有能な Kafka プロデューサー兼コンシューマーです。
サーキット ブレーカー、ローカル永続性、DLQ を備えた Kafka プロデューサー。
import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringSerializer; import java.io.File; import java.io.IOException; import java.nio.file.Files; import java.util.Properties; public class AvroProducer { private final KafkaProducer<String, byte[]> producer; private final Schema schema; public AvroProducer(String bootstrapServers) throws IOException { Properties properties = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.KafkaAvroSerializer"); // Establecer la propiedad de reintentos, Número de reintentos properties.put(ProducerConfig.RETRIES_CONFIG, 3); // Asegura que todos los réplicas reconozcan la escritura, properties.put(ProducerConfig.ACKS_CONFIG, "all"); // Solo un mensaje a la vez properties.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1); // Habilitar idempotencia, no quiero enviar duplicados properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); this.producer = new KafkaProducer<>(properties); this.schema = new Schema.Parser().parse(new File("src/main/avro/user_signed_up.avsc")); } public void sendMessage(String topic, int userId, String userEmail) { GenericRecord record = new GenericData.Record(schema); record.put("userId", userId); record.put("userEmail", userEmail); record.put("timestamp", java.time.Instant.now().toString()); String key = String.format("user-signed-up-%s", java.time.LocalDate.now()); ProducerRecord<String, byte[]> producerRecord = new ProducerRecord<>(topic, key, serialize(record)); producer.send(producerRecord, (metadata, exception) -> { if (exception != null) { exception.printStackTrace(); **handleFailure(exception, producerRecord); ** } else { System.out.printf("Mensaje enviado a la partición %d con offset %d%n", metadata.partition(), metadata.offset()); } }); } private void handleFailure(Exception exception, ProducerRecord<String, byte[]> producerRecord) { // Log the error for monitoring System.err.println("Error sending message: " + exception.getMessage()); // Implement local persistence as a fallback saveToLocalStorage(producerRecord); // Optionally: Notify an external monitoring system or alert } private void saveToLocalStorage(ProducerRecord<String, byte[]> record) { try { // Persist the failed message to a local file or database for later processing Files.write(new File("failed_messages.log").toPath(), (record.key() + ": " + new String(record.value()) + "\n").getBytes(), StandardOpenOption.CREATE, StandardOpenOption.APPEND); System.out.println("Mensaje guardado localmente para reenvío: " + record.key()); } catch (IOException e) { System.err.println("Error saving to local storage: " + e.getMessage()); } } private byte[] serialize(GenericRecord record) { // Crear un ByteArrayOutputStream para almacenar los bytes serializados ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); // Crear un escritor de datos para el registro Avro DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(record.getSchema()); // Crear un encoder para escribir en el ByteArrayOutputStream Encoder encoder = EncoderFactory.get().binaryEncoder(outputStream, null); try { // Escribir el registro en el encoder datumWriter.write(record, encoder); // Finalizar la escritura encoder.flush(); } catch (IOException e) { throw new AvroSerializationException("Error serializing Avro record", e); } // Devolver los bytes serializados return outputStream.toByteArray(); } public void close() { producer.close(); } }
DLQ 管理を備えた Kafka コンシューマ。
import org.apache.avro.Schema; import org.apache.avro.generic.GenericDatumReader; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; import org.apache.avro.io.DecoderFactory; import org.apache.avro.io.DatumReader; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.serialization.StringDeserializer; import java.io.File; import java.io.IOException; import java.util.Collections; import java.util.Properties; public class AvroConsumer { private final KafkaConsumer<String, byte[]> consumer; private final Schema schema; public AvroConsumer(String bootstrapServers, String groupId) throws IOException { Properties properties = new Properties(); properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.KafkaAvroDeserializer"); this.consumer = new KafkaConsumer<>(properties); this.schema = new Schema.Parser().parse(new File("src/main/avro/user_signed_up.avsc")); } public void consume(String topic) { consumer.subscribe(Collections.singletonList(topic)); while (true) { ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord<String, byte[]> record : records) { try { processMessage(record.value()); } catch (Exception e) { handleProcessingError(e, record); } } } } private void processMessage(byte[] data) throws IOException { DatumReader<GenericRecord> reader = new GenericDatumReader<>(schema); var decoder = DecoderFactory.get().binaryDecoder(data, null); GenericRecord record = reader.read(null, decoder); System.out.printf("Consumido mensaje: %s - %s - %s%n", record.get("userId"), record.get("userEmail"), record.get("timestamp")); } private void handleProcessingError(Exception e, ConsumerRecord<String, byte[]> record) { System.err.println("Error processing message: " + e.getMessage()); // Implement logic to save failed messages for later processing saveFailedMessage(record); } private void saveFailedMessage(ConsumerRecord<String, byte[]> record) { try { // Persist the failed message to a local file or database for later processing Files.write(new File("failed_consumed_messages.log").toPath(), (record.key() + ": " + new String(record.value()) + "\n").getBytes(), StandardOpenOption.CREATE, StandardOpenOption.APPEND); System.out.println("Mensaje consumido guardado localmente para re-procesamiento: " + record.key()); } catch (IOException e) { System.err.println("Error saving consumed message to local storage: " + e.getMessage()); } } public void close() { consumer.close(); } }
user-signed-up-2024-11-04 order-created-2024-11-04 payment-processed-2024-11-04
コードの説明
サーキットブレーカー:
Resilience4j は、プロデューサーのブレーカー回路を管理するために使用されます。故障率の閾値とオープン状態での待ち時間を設定します。
ローカル永続性と DLQ:
失敗したメッセージは、ローカル ファイル (failed_messages.log) とエラー キュー (dead_letter_queue.log) の両方に保存されます。
エラー処理:
プロデューサーとコンシューマーでは、エラーが適切に処理され、ログに記録されます。
DLQ 処理:
コンシューマーは、メイン トピックからのメッセージを消費した後、DLQ に保存されたメッセージも処理します。
ロギング:
System.err および System.out メッセージは、エラーと重要なイベントを記録するために使用されます。
最終的な考慮事項
この実装では:
これにより、メッセージが復元力のある方法で送信および処理されることが保証されます。
一時的または永続的なエラーに対して適切な処理が提供されます。
ロジックにより、Dead Letter Queue を使用することで効果的な回復が可能になります。
ブレーカー回路は、長期にわたる障害が発生した場合にシステムが飽和状態になるのを防ぎます。
このアプローチにより、Kafka でのメッセージの秩序ある配信を維持しながら、物理的および論理的問題を処理できる堅牢なシステムが作成されます。
以上がKafka で配信および注文されたメッセージを取得して消費する方法の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

ホットAIツール

Undresser.AI Undress
リアルなヌード写真を作成する AI 搭載アプリ

AI Clothes Remover
写真から衣服を削除するオンライン AI ツール。

Undress AI Tool
脱衣画像を無料で

Clothoff.io
AI衣類リムーバー

Video Face Swap
完全無料の AI 顔交換ツールを使用して、あらゆるビデオの顔を簡単に交換できます。

人気の記事

ホットツール

メモ帳++7.3.1
使いやすく無料のコードエディター

SublimeText3 中国語版
中国語版、とても使いやすい

ゼンドスタジオ 13.0.1
強力な PHP 統合開発環境

ドリームウィーバー CS6
ビジュアル Web 開発ツール

SublimeText3 Mac版
神レベルのコード編集ソフト(SublimeText3)

ホットトピック











一部のアプリケーションが適切に機能しないようにする会社のセキュリティソフトウェアのトラブルシューティングとソリューション。多くの企業は、内部ネットワークセキュリティを確保するためにセキュリティソフトウェアを展開します。 ...

多くのアプリケーションシナリオでソートを実装するために名前を数値に変換するソリューションでは、ユーザーはグループ、特に1つでソートする必要がある場合があります...

システムドッキングでのフィールドマッピング処理は、システムドッキングを実行する際に難しい問題に遭遇することがよくあります。システムのインターフェイスフィールドを効果的にマッピングする方法A ...

intellijideaultimatiateバージョンを使用してスプリングを開始します...

データベース操作にMyBatis-Plusまたはその他のORMフレームワークを使用する場合、エンティティクラスの属性名に基づいてクエリ条件を構築する必要があることがよくあります。あなたが毎回手動で...

Javaオブジェクトと配列の変換:リスクの詳細な議論と鋳造タイプ変換の正しい方法多くのJava初心者は、オブジェクトのアレイへの変換に遭遇します...

eコマースプラットフォーム上のSKUおよびSPUテーブルの設計の詳細な説明この記事では、eコマースプラットフォームでのSKUとSPUのデータベース設計の問題、特にユーザー定義の販売を扱う方法について説明します。

Redisキャッシュソリューションは、製品ランキングリストの要件をどのように実現しますか?開発プロセス中に、多くの場合、ランキングの要件に対処する必要があります。
