###첫 번째 단계, 동시 소비###
먼저 코드를 살펴보세요. 중요한 점은 ConcurrentKafkaListenerContainerFactory를 사용하고 Factory.setConcurrency(4)를 설정한다는 것입니다. (제 주제에는 속도를 높이기 위해 4개의 파티션이 있습니다.) 4로 설정, 즉 4개의 KafkaMessageListenerContainers가 있음)
@Bean KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); factory.setConcurrency(4); factory.setBatchListener(true); factory.getContainerProperties().setPollTimeout(3000); return factory; }
application.properties에 spring.kafka.listener.concurrency=3을 직접 추가한 다음 동시 소비를 위해 @KafkaListener를 사용할 수도 있습니다. .
###두 번째 단계는 일괄 소비###
그 다음은 일괄 소비입니다. 핵심은 Factory.setBatchListener(true);
and propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50);
하나는 일괄 소비를 활성화하는 것이고, 다른 하나는 일괄 소비가 각각 소비할 수 있는 최대 메시지 레코드 수를 설정하는 것입니다. 시간.
우리가 설정한 ConsumerConfig.MAX_POLL_RECORDS_CONFIG가 50이라는 점에 유의하는 것이 중요합니다. 이는 50개의 메시지에 도달하지 않아도 계속 대기한다는 의미는 아닙니다. 공식적인 설명은 "poll()에 대한 단일 호출에서 반환되는 최대 레코드 수"입니다. 즉, 50은 한 번의 폴링에서 반환되는 최대 레코드 수를 나타냅니다.
시작 로그를 보면 max.poll.interval.ms = 300000이라는 것을 알 수 있는데, 이는 max.poll.interval.ms 간격마다 한 번씩 poll을 호출한다는 의미입니다. 각 설문조사는 최대 50개의 레코드를 반환합니다.
max.poll.interval.ms 공식 설명은 "소비자 그룹 관리를 사용할 때 poll() 호출 간의 최대 지연입니다. 이는 더 많은 레코드를 가져오기 전에 소비자가 유휴 상태일 수 있는 시간의 상한을 지정합니다. 이 시간 초과가 만료되기 전에 poll()이 호출되지 않으면 소비자는 실패한 것으로 간주되고 그룹은 파티션을 다른 구성원에게 재할당하기 위해 재조정됩니다. ";
@Bean KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); factory.setConcurrency(4); factory.setBatchListener(true); factory.getContainerProperties().setPollTimeout(3000); return factory; } @Bean public MapconsumerConfigs() { Map propsMap = new HashMap<>(); propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, propsConfig.getBroker()); propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, propsConfig.getEnableAutoCommit()); propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100"); propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000"); propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, propsConfig.getGroupId()); propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, propsConfig.getAutoOffsetReset()); propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50); return propsMap; }
시작 로그 스크린샷
max.poll 정보 .records 및 max.poll.interval.ms의 공식 설명 스크린샷:
###세 번째 단계, 파티션 소비###
파티션이 하나만 있는 주제의 경우 파티션 소비가 필요하지 않습니다. 의미가 없습니다. 다음 예는 2개의 파티션이 있는 경우입니다(내 전체 코드에는 4개의 listeningPartitionX 메소드가 있고 내 주제에는 4개의 파티션 세트가 있습니다). 독자는 자신의 상황에 따라 이를 조정할 수 있습니다.
위 내용은 @KafkaListener를 사용하여 Spring Boot에서 일괄적으로 메시지를 동시에 수신하는 방법의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!