Heim > Java > javaLernprogramm > Beispielcode für ein Springboot-Projekt zum Konfigurieren mehrerer Kafka

Beispielcode für ein Springboot-Projekt zum Konfigurieren mehrerer Kafka

王林
Freigeben: 2023-05-14 12:28:05
nach vorne
2022 Leute haben es durchsucht

1.spring-kafka

<dependency>
   <groupId>org.springframework.kafka</groupId>
   <artifactId>spring-kafka</artifactId>
   <version>1.3.5.RELEASE</version>
</dependency>
Nach dem Login kopieren

2. Informationen zur Konfigurationsdatei

kafka.bootstrap-servers=localhost:9092
kafka.consumer.group.id=20230321
#可以并发消费的线程数 (通常与partition数量一致)
kafka.consumer.concurrency=10
kafka.consumer.enable.auto.commit=false
        
kafka.bootstrap-servers.pic=localhost:29092
kafka.consumer.group.id.pic=20230322_pic
kafka.consumer.concurrency.pic=10
kafka.consumer.enable.auto.commit.pic=false
Nach dem Login kopieren

3.kafka-Konfigurationsklasse

@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    @Value("${kafka.consumer.group.id}")
    private String groupId;

    @Value("${kafka.consumer.concurrency}")
    private int concurrency;

    @Value("${kafka.consumer.enable.auto.commit}")
    private String autoCommit;

    @Value("${kafka.bootstrap-servers}")
    private String bootstrapServer;


    @Value("${kafka.consumer.group.id.pic}")
    private String groupIdPic;

    @Value("${kafka.consumer.concurrency.pic}")
    private int concurrencyPic;

    @Value("${kafka.consumer.enable.auto.commit.pic}")
    private String autoCommitPic;

    @Value("${kafka.bootstrap-servers.pic}")
    private String bootstrapServerPic;


    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        String bootstrapServers = bootstrapServer;
        Map<String, Object> configProps = new HashMap<>(16);
        configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        configProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);
        return new DefaultKafkaConsumerFactory<>(configProps);
    }

 
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(concurrency);
        factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }




    @Bean
    public ConsumerFactory<String, String> consumerFactoryPic() {
        String bootstrapServers = bootstrapServerPic;
        Map<String, Object> configProps = new HashMap<>(16);
        configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupIdPic);
        configProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommitPic);
        return new DefaultKafkaConsumerFactory<>(configProps);
    }


    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactoryPic() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactoryPic());
        factory.setConcurrency(concurrencyPic);
        factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }
}
Nach dem Login kopieren

4

Das obige ist der detaillierte Inhalt vonBeispielcode für ein Springboot-Projekt zum Konfigurieren mehrerer Kafka. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Verwandte Etiketten:
Quelle:yisu.com
Erklärung dieser Website
Der Inhalt dieses Artikels wird freiwillig von Internetnutzern beigesteuert und das Urheberrecht liegt beim ursprünglichen Autor. Diese Website übernimmt keine entsprechende rechtliche Verantwortung. Wenn Sie Inhalte finden, bei denen der Verdacht eines Plagiats oder einer Rechtsverletzung besteht, wenden Sie sich bitte an admin@php.cn
Beliebte Tutorials
Mehr>
Neueste Downloads
Mehr>
Web-Effekte
Quellcode der Website
Website-Materialien
Frontend-Vorlage