Kafka를 사용하여 전달되고 주문된 메시지를 가져오고 사용하는 방법
Apache Kafka에서 이벤트가 완전히 일관된 순서로 전송되고 소비되도록 하려면 메시지 분할 및 소비자 할당이 작동하는 방식을 이해하는 것이 중요합니다.
Kafka에서 파티션 사용
-
주제 분할:
- Kafka는 메시지를 주제 내의 파티션으로 구성합니다. 각 파티션은 수신되는 메시지의 순서를 유지합니다. 즉, 메시지는 해당 파티션으로 전송된 순서대로 처리됩니다.
- 순서를 보장하려면 동일한 컨텍스트(예: 사용자 ID 또는 트랜잭션 ID)와 관련된 모든 메시지가 동일한 파티션으로 전송되는 것이 중요합니다. 이는 메시지를 보낼 때 파티션 키를 사용하여 수행됩니다. Kafka는 이 키를 사용하여 해시 함수[1][5]를 사용하여 메시지를 보낼 파티션을 결정합니다.
-
메시지 키:
- 메시지를 보낼 때 키를 지정할 수 있습니다. 동일한 키를 가진 모든 메시지는 동일한 파티션으로 전송되므로 생성된 순서와 동일한 순서로 사용됩니다. 예를 들어 사용자 ID를 키로 사용하면 해당 사용자와 관련된 모든 이벤트가 동일한 파티션으로 이동합니다.
소비자 그룹
-
소비자 할당:
- Kafka의 소비자는 소비자 그룹으로 그룹화됩니다. 각 그룹에는 여러 소비자가 있을 수 있지만 각 파티션은 그룹 내에서 한 번에 한 소비자만 읽을 수 있습니다.
- 이는 파티션보다 소비자가 많으면 일부 소비자가 비활성 상태가 된다는 의미입니다. 질서를 유지하고 효율성을 극대화하려면 최소한 그룹 내 소비자 수만큼 파티션을 두는 것이 좋습니다.
-
오프셋 관리:
- Kafka는 파티션 내 각 메시지에 대한 증분 숫자 식별자인 오프셋을 사용하여 각 소비자의 읽기 상태를 저장합니다. 이를 통해 소비자는 장애 발생 시 중단한 부분부터 다시 시작할 수 있습니다.
추가 전략
- 오버로드 방지: 파티션 키를 선택할 때 일부 파티션은 과부하되고 다른 파티션은 충분히 활용되지 않도록 트래픽 분산을 고려하는 것이 중요합니다.
- 복제 및 내결함성: 파티션에 대해 적절한 복제(1보다 큼)를 구성해야 가용성은 물론 장애에 대한 시스템의 복원력도 향상됩니다.
Avro를 사용하여 Kafka에서 메시지 생성 및 소비 시스템을 구현하여 메시지가 순서대로 처리되도록 하고 가능한 오류를 처리하기 위한 전체 예는 다음과 같습니다. 여기에는 Avro 스키마 정의, 생산자 및 소비자 코드, 오류 처리 전략이 포함됩니다.
아브로 계획
먼저 페이로드에 대한 Avro 스키마를 정의합니다. 메시지 구조를 설명하는 user_signed_up.avsc라는 파일을 생성하겠습니다.
{ "type": "record", "name": "UserSignedUp", "namespace": "com.example", "fields": [ { "name": "userId", "type": "int" }, { "name": "userEmail", "type": "string" }, { "name": "timestamp", "type": "string" } // Formato ISO 8601 ] }
키 생성
메시지 생성 및 소비의 순서를 보장하기 위해 메시지 유형-날짜로 구성된 키를 사용합니다(예: user-signed-up-2024-11-04).
프로듀서 카프카
Avro 구성표를 사용하여 Kafka에 메시지를 보내는 Producer의 코드는 다음과 같습니다.
import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringSerializer; import java.io.File; import java.io.IOException; import java.nio.file.Files; import java.util.Properties; public class AvroProducer { private final KafkaProducer<String, byte[]> producer; private final Schema schema; public AvroProducer(String bootstrapServers) throws IOException { Properties properties = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.KafkaAvroSerializer"); // Establecer la propiedad de reintentos, Número de reintentos properties.put(ProducerConfig.RETRIES_CONFIG, 3); // Asegura que todos los réplicas reconozcan la escritura, properties.put(ProducerConfig.ACKS_CONFIG, "all"); // Solo un mensaje a la vez properties.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1); // Habilitar idempotencia, no quiero enviar duplicados properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); this.producer = new KafkaProducer<>(properties); this.schema = new Schema.Parser().parse(new File("src/main/avro/user_signed_up.avsc")); } public void sendMessage(String topic, int userId, String userEmail) { GenericRecord record = new GenericData.Record(schema); record.put("userId", userId); record.put("userEmail", userEmail); record.put("timestamp", java.time.Instant.now().toString()); String key = String.format("user-signed-up-%s", java.time.LocalDate.now()); ProducerRecord<String, byte[]> producerRecord = new ProducerRecord<>(topic, key, serialize(record)); producer.send(producerRecord, (metadata, exception) -> { if (exception != null) { exception.printStackTrace(); **handleFailure(exception, producerRecord); ** } else { System.out.printf("Mensaje enviado a la partición %d con offset %d%n", metadata.partition(), metadata.offset()); } }); } private void handleFailure(Exception exception, ProducerRecord<String, byte[]> producerRecord) { // Log the error for monitoring System.err.println("Error sending message: " + exception.getMessage()); // Implement local persistence as a fallback saveToLocalStorage(producerRecord); // Optionally: Notify an external monitoring system or alert } private void saveToLocalStorage(ProducerRecord<String, byte[]> record) { try { // Persist the failed message to a local file or database for later processing Files.write(new File("failed_messages.log").toPath(), (record.key() + ": " + new String(record.value()) + "\n").getBytes(), StandardOpenOption.CREATE, StandardOpenOption.APPEND); System.out.println("Mensaje guardado localmente para reenvío: " + record.key()); } catch (IOException e) { System.err.println("Error saving to local storage: " + e.getMessage()); } } private byte[] serialize(GenericRecord record) { // Crear un ByteArrayOutputStream para almacenar los bytes serializados ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); // Crear un escritor de datos para el registro Avro DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(record.getSchema()); // Crear un encoder para escribir en el ByteArrayOutputStream Encoder encoder = EncoderFactory.get().binaryEncoder(outputStream, null); try { // Escribir el registro en el encoder datumWriter.write(record, encoder); // Finalizar la escritura encoder.flush(); } catch (IOException e) { throw new AvroSerializationException("Error serializing Avro record", e); } // Devolver los bytes serializados return outputStream.toByteArray(); } public void close() { producer.close(); } }
재시도 고려사항
**재시도를 활성화할 때 제대로 처리되지 않으면 메시지 순서가 바뀔 위험이 있다는 점에 유의하는 것이 중요합니다.
이를 방지하려면:
**max.in.flight.requests.per.connection: 이 속성을 1로 설정하면 메시지가 한 번에 하나씩 전송되고 순서대로 처리되도록 할 수 있습니다. 그러나 이는 성능에 영향을 미칠 수 있습니다.
이러한 구성과 적절한 오류 처리를 통해 Kafka 생산자가 더욱 강력해지고 필요한 순서를 유지하면서 메시지 생성 오류를 처리할 수 있는지 확인할 수 있습니다.
**카프카 소비자
**메시지를 읽고 처리하는 소비자:
import org.apache.avro.Schema; import org.apache.avro.generic.GenericDatumReader; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; import org.apache.avro.io.DecoderFactory; import org.apache.avro.io.DatumReader; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.serialization.StringDeserializer; import java.io.File; import java.io.IOException; import java.util.Collections; import java.util.Properties; public class AvroConsumer { private final KafkaConsumer<String, byte[]> consumer; private final Schema schema; public AvroConsumer(String bootstrapServers, String groupId) throws IOException { Properties properties = new Properties(); properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.KafkaAvroDeserializer"); this.consumer = new KafkaConsumer<>(properties); this.schema = new Schema.Parser().parse(new File("src/main/avro/user_signed_up.avsc")); } public void consume(String topic) { consumer.subscribe(Collections.singletonList(topic)); while (true) { ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord<String, byte[]> record : records) { try { processMessage(record.value()); } catch (Exception e) { handleProcessingError(e, record); } } } } private void processMessage(byte[] data) throws IOException { DatumReader<GenericRecord> reader = new GenericDatumReader<>(schema); var decoder = DecoderFactory.get().binaryDecoder(data, null); GenericRecord record = reader.read(null, decoder); System.out.printf("Consumido mensaje: %s - %s - %s%n", record.get("userId"), record.get("userEmail"), record.get("timestamp")); } private void handleProcessingError(Exception e, ConsumerRecord<String, byte[]> record) { System.err.println("Error processing message: " + e.getMessage()); // Implement logic to save failed messages for later processing saveFailedMessage(record); } private void saveFailedMessage(ConsumerRecord<String, byte[]> record) { try { // Persist the failed message to a local file or database for later processing Files.write(new File("failed_consumed_messages.log").toPath(), (record.key() + ": " + new String(record.value()) + "\n").getBytes(), StandardOpenOption.CREATE, StandardOpenOption.APPEND); System.out.println("Mensaje consumido guardado localmente para re-procesamiento: " + record.key()); } catch (IOException e) { System.err.println("Error saving consumed message to local storage: " + e.getMessage()); } } public void close() { consumer.close(); } }
키의 실제 예
다양한 이벤트와 다양한 파티션이 있는 환경에서 현실적인 키는 다음과 같습니다.
{ "type": "record", "name": "UserSignedUp", "namespace": "com.example", "fields": [ { "name": "userId", "type": "int" }, { "name": "userEmail", "type": "string" }, { "name": "timestamp", "type": "string" } // Formato ISO 8601 ] }
이를 통해 특정 날짜, 특정 유형과 관련된 모든 이벤트를 동일한 파티션으로 전송하여 순서대로 처리할 수 있습니다. 또한 필요한 경우 추가 세부정보(예: 세션 또는 거래 ID)를 포함하여 키를 다양화할 수 있습니다.
Avro를 사용하여 Kafka에서 오류를 처리하고 메시지 순서를 보장하기 위한 이러한 구현 및 전략을 통해 강력하고 효율적인 이벤트 관리 시스템을 구축할 수 있습니다.
이제 좀 더 유능한 Kafka 생산자이자 소비자가 되었습니다.
회로 차단기, 로컬 지속성 및 DLQ를 갖춘 Kafka 프로듀서.
import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringSerializer; import java.io.File; import java.io.IOException; import java.nio.file.Files; import java.util.Properties; public class AvroProducer { private final KafkaProducer<String, byte[]> producer; private final Schema schema; public AvroProducer(String bootstrapServers) throws IOException { Properties properties = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.KafkaAvroSerializer"); // Establecer la propiedad de reintentos, Número de reintentos properties.put(ProducerConfig.RETRIES_CONFIG, 3); // Asegura que todos los réplicas reconozcan la escritura, properties.put(ProducerConfig.ACKS_CONFIG, "all"); // Solo un mensaje a la vez properties.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1); // Habilitar idempotencia, no quiero enviar duplicados properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); this.producer = new KafkaProducer<>(properties); this.schema = new Schema.Parser().parse(new File("src/main/avro/user_signed_up.avsc")); } public void sendMessage(String topic, int userId, String userEmail) { GenericRecord record = new GenericData.Record(schema); record.put("userId", userId); record.put("userEmail", userEmail); record.put("timestamp", java.time.Instant.now().toString()); String key = String.format("user-signed-up-%s", java.time.LocalDate.now()); ProducerRecord<String, byte[]> producerRecord = new ProducerRecord<>(topic, key, serialize(record)); producer.send(producerRecord, (metadata, exception) -> { if (exception != null) { exception.printStackTrace(); **handleFailure(exception, producerRecord); ** } else { System.out.printf("Mensaje enviado a la partición %d con offset %d%n", metadata.partition(), metadata.offset()); } }); } private void handleFailure(Exception exception, ProducerRecord<String, byte[]> producerRecord) { // Log the error for monitoring System.err.println("Error sending message: " + exception.getMessage()); // Implement local persistence as a fallback saveToLocalStorage(producerRecord); // Optionally: Notify an external monitoring system or alert } private void saveToLocalStorage(ProducerRecord<String, byte[]> record) { try { // Persist the failed message to a local file or database for later processing Files.write(new File("failed_messages.log").toPath(), (record.key() + ": " + new String(record.value()) + "\n").getBytes(), StandardOpenOption.CREATE, StandardOpenOption.APPEND); System.out.println("Mensaje guardado localmente para reenvío: " + record.key()); } catch (IOException e) { System.err.println("Error saving to local storage: " + e.getMessage()); } } private byte[] serialize(GenericRecord record) { // Crear un ByteArrayOutputStream para almacenar los bytes serializados ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); // Crear un escritor de datos para el registro Avro DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(record.getSchema()); // Crear un encoder para escribir en el ByteArrayOutputStream Encoder encoder = EncoderFactory.get().binaryEncoder(outputStream, null); try { // Escribir el registro en el encoder datumWriter.write(record, encoder); // Finalizar la escritura encoder.flush(); } catch (IOException e) { throw new AvroSerializationException("Error serializing Avro record", e); } // Devolver los bytes serializados return outputStream.toByteArray(); } public void close() { producer.close(); } }
DLQ 관리 기능을 갖춘 Kafka Consumer.
import org.apache.avro.Schema; import org.apache.avro.generic.GenericDatumReader; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; import org.apache.avro.io.DecoderFactory; import org.apache.avro.io.DatumReader; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.serialization.StringDeserializer; import java.io.File; import java.io.IOException; import java.util.Collections; import java.util.Properties; public class AvroConsumer { private final KafkaConsumer<String, byte[]> consumer; private final Schema schema; public AvroConsumer(String bootstrapServers, String groupId) throws IOException { Properties properties = new Properties(); properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.KafkaAvroDeserializer"); this.consumer = new KafkaConsumer<>(properties); this.schema = new Schema.Parser().parse(new File("src/main/avro/user_signed_up.avsc")); } public void consume(String topic) { consumer.subscribe(Collections.singletonList(topic)); while (true) { ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord<String, byte[]> record : records) { try { processMessage(record.value()); } catch (Exception e) { handleProcessingError(e, record); } } } } private void processMessage(byte[] data) throws IOException { DatumReader<GenericRecord> reader = new GenericDatumReader<>(schema); var decoder = DecoderFactory.get().binaryDecoder(data, null); GenericRecord record = reader.read(null, decoder); System.out.printf("Consumido mensaje: %s - %s - %s%n", record.get("userId"), record.get("userEmail"), record.get("timestamp")); } private void handleProcessingError(Exception e, ConsumerRecord<String, byte[]> record) { System.err.println("Error processing message: " + e.getMessage()); // Implement logic to save failed messages for later processing saveFailedMessage(record); } private void saveFailedMessage(ConsumerRecord<String, byte[]> record) { try { // Persist the failed message to a local file or database for later processing Files.write(new File("failed_consumed_messages.log").toPath(), (record.key() + ": " + new String(record.value()) + "\n").getBytes(), StandardOpenOption.CREATE, StandardOpenOption.APPEND); System.out.println("Mensaje consumido guardado localmente para re-procesamiento: " + record.key()); } catch (IOException e) { System.err.println("Error saving consumed message to local storage: " + e.getMessage()); } } public void close() { consumer.close(); } }
user-signed-up-2024-11-04 order-created-2024-11-04 payment-processed-2024-11-04
코드 설명
회로 차단기:
Resilience4j는 생산자의 차단기 회로를 관리하는 데 사용됩니다. 실패율 임계값과 열린 상태에서의 대기 시간이 구성됩니다.
로컬 지속성 및 DLQ:
실패한 메시지는 로컬 파일(failed_messages.log)과 오류 대기열(dead_letter_queue.log)에 모두 저장됩니다.
오류 처리:
생산자와 소비자에서는 오류가 적절하게 처리되고 기록됩니다.
DLQ 처리:
소비자는 기본 주제의 메시지를 소비한 후 DLQ에 저장된 메시지도 처리합니다.
로그 기록:
System.err 및 System.out 메시지는 오류 및 중요한 이벤트를 기록하는 데 사용됩니다.
최종 고려사항
이 구현을 통해:
이는 탄력적인 방식으로 메시지가 전송되고 처리되도록 보장합니다.
일시적이거나 지속적인 오류에 대해서는 적절한 처리가 제공됩니다.
로직에서는 데드 레터 큐(Dead Letter Queue)를 사용하여 효과적인 복구가 가능합니다.
차단기 회로는 장기간 장애 발생 시 시스템이 포화되는 것을 방지하는 데 도움이 됩니다.
이 접근 방식은 Kafka에서 메시지의 질서 있는 전달을 유지하면서 물리적, 논리적 문제를 처리할 수 있는 강력한 시스템을 만듭니다.
위 내용은 Kafka를 사용하여 전달되고 주문된 메시지를 가져오고 사용하는 방법의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

핫 AI 도구

Undresser.AI Undress
사실적인 누드 사진을 만들기 위한 AI 기반 앱

AI Clothes Remover
사진에서 옷을 제거하는 온라인 AI 도구입니다.

Undress AI Tool
무료로 이미지를 벗다

Clothoff.io
AI 옷 제거제

Video Face Swap
완전히 무료인 AI 얼굴 교환 도구를 사용하여 모든 비디오의 얼굴을 쉽게 바꾸세요!

인기 기사

뜨거운 도구

메모장++7.3.1
사용하기 쉬운 무료 코드 편집기

SublimeText3 중국어 버전
중국어 버전, 사용하기 매우 쉽습니다.

스튜디오 13.0.1 보내기
강력한 PHP 통합 개발 환경

드림위버 CS6
시각적 웹 개발 도구

SublimeText3 Mac 버전
신 수준의 코드 편집 소프트웨어(SublimeText3)

일부 애플리케이션이 제대로 작동하지 않는 회사의 보안 소프트웨어에 대한 문제 해결 및 솔루션. 많은 회사들이 내부 네트워크 보안을 보장하기 위해 보안 소프트웨어를 배포 할 것입니다. ...

많은 응용 프로그램 시나리오에서 정렬을 구현하기 위해 이름으로 이름을 변환하는 솔루션, 사용자는 그룹으로, 특히 하나로 분류해야 할 수도 있습니다.

시스템 도킹의 필드 매핑 처리 시스템 도킹을 수행 할 때 어려운 문제가 발생합니다. 시스템의 인터페이스 필드를 효과적으로 매핑하는 방법 ...

IntellijideAultimate 버전을 사용하여 봄을 시작하십시오 ...

데이터베이스 작업에 MyBatis-Plus 또는 기타 ORM 프레임 워크를 사용하는 경우 엔티티 클래스의 속성 이름을 기반으로 쿼리 조건을 구성해야합니다. 매번 수동으로 ...

Java 객체 및 배열의 변환 : 캐스트 유형 변환의 위험과 올바른 방법에 대한 심층적 인 논의 많은 Java 초보자가 객체를 배열로 변환 할 것입니다 ...

Redis 캐싱 솔루션은 제품 순위 목록의 요구 사항을 어떻게 인식합니까? 개발 과정에서 우리는 종종 a ... 표시와 같은 순위의 요구 사항을 처리해야합니다.

전자 상거래 플랫폼에서 SKU 및 SPU 테이블의 디자인에 대한 자세한 설명이 기사는 전자 상거래 플랫폼에서 SKU 및 SPU의 데이터베이스 설계 문제, 특히 사용자 정의 판매를 처리하는 방법에 대해 논의 할 것입니다 ...
