目次
springboot はデュアル Kafka を構成します
Maven kafka jar を導入し、2 つの kafka を準備します;
yml 構成ファイルを構成します
KafkaConfig クラスの構成
送信ツール クラス MyKafkaProducer
テスト クラス
受信クラス
テスト結果
ホームページ Java &#&チュートリアル Springboot でデュアル Kafka を構成する方法

Springboot でデュアル Kafka を構成する方法

May 10, 2023 pm 06:43 PM
springboot kafka

springboot はデュアル Kafka を構成します

Spring Boot 2.0.8.RELEASE バージョンを使用します

Maven kafka jar を導入し、2 つの kafka を準備します;

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
ログイン後にコピー

yml 構成ファイルを構成します

spring:
  kafka:
    bootstrap-servers: 180.167.180.242:9092 #kafka的访问地址,多个用","隔开
    consumer:
      enable-auto-commit: true
      group-id: kafka #群组ID
  outkafka:
    bootstrap-servers: localhost:9092 #kafka的访问地址,多个用","隔开
    consumer:
      enable-auto-commit: true
      group-id: kafka_1 #群组ID
ログイン後にコピー

KafkaConfig クラスの構成

import java.util.HashMap;
import java.util.Map;
 
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
 
@Configuration
@EnableKafka
public class KafkaConfig {
    @Value("${spring.kafka.bootstrap-servers}")
    private String innerServers;
    @Value("${spring.kafka.consumer.group-id}")
    private String innerGroupid;
    @Value("${spring.kafka.consumer.enable-auto-commit}")
    private String innerEnableAutoCommit;
 
    @Bean
    @Primary//理解为默认优先选择当前容器下的消费者工厂
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }
 
    @Bean//第一个消费者工厂的bean
    public ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }
 
    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, innerServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, innerGroupid);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, innerEnableAutoCommit);
//        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
//        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }
    
    @Bean //生产者工厂配置
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(senderProps());
    }
    
    @Bean //kafka发送消息模板
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<String, String>(producerFactory());
    }
    
    /**
     * 生产者配置方法
     *
     * 生产者有三个必选属性
     * <p>
     * 1.bootstrap.servers broker地址清单,清单不要包含所有的broker地址,
     * 生产者会从给定的broker里查找到其他broker的信息。不过建议至少提供两个broker信息,一旦 其中一个宕机,生产者仍能能够连接到集群上。
     * </p>
     * <p>
     * 2.key.serializer broker希望接收到的消息的键和值都是字节数组。 生产者用对应的类把键对象序列化成字节数组。
     * </p>
     * <p>
     * 3.value.serializer 值得序列化方式
     * </p>
     *
     *
     * @return
     */
    private Map<String, Object> senderProps() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, innerServers);
        /**
         * 当从broker接收到的是临时可恢复的异常时,生产者会向broker重发消息,但是不能无限
         * 制重发,如果重发次数达到限制值,生产者将不会重试并返回错误。
         * 通过retries属性设置。默认情况下生产者会在重试后等待100ms,可以通过 retries.backoff.ms属性进行修改
         */
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        /**
         * 在考虑完成请求之前,生产者要求leader收到的确认数量。这可以控制发送记录的持久性。允许以下设置:
         * <ul>
         * <li>
         * <code> acks = 0 </ code>如果设置为零,则生产者将不会等待来自服务器的任何确认。该记录将立即添加到套接字缓冲区并视为已发送。在这种情况下,无法保证服务器已收到记录,并且
         * <code>retries </ code>配置将不会生效(因为客户端通常不会知道任何故障)。为每条记录返回的偏移量始终设置为-1。
         * <li> <code> acks = 1 </code>
         * 这意味着leader会将记录写入其本地日志,但无需等待所有follower的完全确认即可做出回应。在这种情况下,
         * 如果leader在确认记录后立即失败但在关注者复制之前,则记录将丢失。
         * <li><code> acks = all </code>
         * 这意味着leader将等待完整的同步副本集以确认记录。这保证了只要至少一个同步副本仍然存活,记录就不会丢失。这是最强有力的保证。
         * 这相当于acks = -1设置
         */
        props.put(ProducerConfig.ACKS_CONFIG, "1");
        /**
         * 当有多条消息要被发送到统一分区是,生产者会把他们放到统一批里。kafka通过批次的概念来 提高吞吐量,但是也会在增加延迟。
         */
        // 以下配置当缓存数量达到16kb,就会触发网络请求,发送消息
//        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        // 每条消息在缓存中的最长时间,如果超过这个时间就会忽略batch.size的限制,由客户端立即将消息发送出去
//        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
//        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }
    
    @Value("${spring.outkafka.bootstrap-servers}")
    private String outServers;
    @Value("${spring.outkafka.consumer.group-id}")
    private String outGroupid;
    @Value("${spring.outkafka.consumer.enable-auto-commit}")
    private String outEnableAutoCommit;
    
 
    static {
        
    }
    
    /**
     * 连接第二个kafka集群的配置
     */
    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactoryOutSchedule() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactoryOutSchedule());
        factory.setConcurrency(3);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }
 
    @Bean
    public ConsumerFactory<Integer, String> consumerFactoryOutSchedule() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigsOutSchedule());
    }
 
    /**
     * 连接第二个集群的消费者配置
     */
    @Bean
    public Map<String, Object> consumerConfigsOutSchedule() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, outServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, outGroupid);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, outEnableAutoCommit);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }
    
    @Bean //生产者工厂配置
    public ProducerFactory<String, String> producerOutFactory() {
        return new DefaultKafkaProducerFactory<>(senderOutProps());
    }
    
    @Bean //kafka发送消息模板
    public KafkaTemplate<String, String> kafkaOutTemplate() {
        return new KafkaTemplate<String, String>(producerOutFactory());
    }
    
    /**
     * 生产者配置方法
     *
     * 生产者有三个必选属性
     * <p>
     * 1.bootstrap.servers broker地址清单,清单不要包含所有的broker地址,
     * 生产者会从给定的broker里查找到其他broker的信息。不过建议至少提供两个broker信息,一旦 其中一个宕机,生产者仍能能够连接到集群上。
     * </p>
     * <p>
     * 2.key.serializer broker希望接收到的消息的键和值都是字节数组。 生产者用对应的类把键对象序列化成字节数组。
     * </p>
     * <p>
     * 3.value.serializer 值得序列化方式
     * </p>
     *
     *
     * @return
     */
    private Map<String, Object> senderOutProps() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, outServers);
        /**
         * 当从broker接收到的是临时可恢复的异常时,生产者会向broker重发消息,但是不能无限
         * 制重发,如果重发次数达到限制值,生产者将不会重试并返回错误。
         * 通过retries属性设置。默认情况下生产者会在重试后等待100ms,可以通过 retries.backoff.ms属性进行修改
         */
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        /**
         * 在考虑完成请求之前,生产者要求leader收到的确认数量。这可以控制发送记录的持久性。允许以下设置:
         * <ul>
         * <li>
         * <code> acks = 0 </ code>如果设置为零,则生产者将不会等待来自服务器的任何确认。该记录将立即添加到套接字缓冲区并视为已发送。在这种情况下,无法保证服务器已收到记录,并且
         * <code>retries </ code>配置将不会生效(因为客户端通常不会知道任何故障)。为每条记录返回的偏移量始终设置为-1。
         * <li> <code> acks = 1 </code>
         * 这意味着leader会将记录写入其本地日志,但无需等待所有follower的完全确认即可做出回应。在这种情况下,
         * 如果leader在确认记录后立即失败但在关注者复制之前,则记录将丢失。
         * <li><code> acks = all </code>
         * 这意味着leader将等待完整的同步副本集以确认记录。这保证了只要至少一个同步副本仍然存活,记录就不会丢失。这是最强有力的保证。
         * 这相当于acks = -1设置
         */
        props.put(ProducerConfig.ACKS_CONFIG, "1");
        /**
         * 当有多条消息要被发送到统一分区是,生产者会把他们放到统一批里。kafka通过批次的概念来 提高吞吐量,但是也会在增加延迟。
         */
        // 以下配置当缓存数量达到16kb,就会触发网络请求,发送消息
//        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        // 每条消息在缓存中的最长时间,如果超过这个时间就会忽略batch.size的限制,由客户端立即将消息发送出去
//        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
//        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }
}
ログイン後にコピー

送信ツール クラス MyKafkaProducer

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
 
import lombok.extern.slf4j.Slf4j;
 
/**
 * <p>
 * <b>KafkaProducer Description:</b> kafka生产者
 * </p>
 *
 * @author douzaixing<b>DATE</b> 2019年7月8日 下午4:09:29
 */
@Component // 这个必须加入容器不然,不会执行
@EnableScheduling // 这里是为了测试加入定时调度
@Slf4j
public class MyKafkaProducer {
 
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;
 
    @Autowired
    private KafkaTemplate<String, String> kafkaOutTemplate;
 
    public ListenableFuture<SendResult<String, String>> send(String topic, String key, String json) {
        ListenableFuture<SendResult<String, String>> result = kafkaTemplate.send(topic, key, json);
        log.info("inner kafka send #topic=" + topic + "#key=" + key + "#json=" + json + "#推送成功===========");
        return result;
    }
 
    public ListenableFuture<SendResult<String, String>> sendOut(String topic, String key, String json) {
        ListenableFuture<SendResult<String, String>> result = kafkaOutTemplate.send(topic, key, json);
        log.info("out kafka send #topic=" + topic + "#key=" + key + "#json=" + json + "#推送成功===========");
        return result;
    }
 
}
ログイン後にコピー

テスト クラス

@Slf4j
@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(classes={OesBcServiceApplication.class})
public class MoreKafkaTest {
    
    @Autowired
    private MyKafkaProducer kafkaProducer;
    
    @Test
    public void sendInner() {
        for (int i = 0; i < 1; i++) {
            kafkaProducer.send("inner_test", "douzi" + i, "liyuehua" + i);
            kafkaProducer.sendOut("out_test", "douziout" + i, "fanbingbing" + i);
        }
    }
}
ログイン後にコピー

受信クラス

@Component
@Slf4j
public class KafkaConsumer {  
    @KafkaListener(topics={"inner_test"}, containerFactory="kafkaListenerContainerFactory")
    public void innerlistener(ConsumerRecord<String, String> record) {
        log.info("inner kafka receive #key=" + record.key() + "#value=" + record.value());
    }
    
    @KafkaListener(topics={"out_test"}, containerFactory="kafkaListenerContainerFactoryOutSchedule")
    public void outListener(ConsumerRecord<String, String> record) {
        log.info("out kafka receive #key=" + record.key() + "#value=" + record.value());
    }
}
ログイン後にコピー

テスト結果

07-11 12:41:27.811 情報 [com.wondertek.oes.bc.service.send.MyKafkaProducer] - 内部 Kafka 送信 #topic=inner_test#key=douzi0#json=liyuehua0#プッシュ成功= = =========

07-11 12:41:27.995 情報 [com.wondertek.oes.bc.service.send.KafkaConsumer] - 内部 kafka 受信 #key=douzi0#value = liyuehua0
07-11 12:41:28.005 情報 [com.wondertek.oes.bc.service.send.MyKafkaProducer] - カフカ送信 #topic=out_test#key=douziout0#json=fanbingbing0#プッシュ成功== = ========
07-11 12:41:28.013 情報 [com.wondertek.oes.bc.service.send.KafkaConsumer] - 出力 kafka 受信 #key=douziout0#value=fanbing0

以上がSpringboot でデュアル Kafka を構成する方法の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

このウェブサイトの声明
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。

ホットAIツール

Undresser.AI Undress

Undresser.AI Undress

リアルなヌード写真を作成する AI 搭載アプリ

AI Clothes Remover

AI Clothes Remover

写真から衣服を削除するオンライン AI ツール。

Undress AI Tool

Undress AI Tool

脱衣画像を無料で

Clothoff.io

Clothoff.io

AI衣類リムーバー

AI Hentai Generator

AI Hentai Generator

AIヘンタイを無料で生成します。

ホットツール

メモ帳++7.3.1

メモ帳++7.3.1

使いやすく無料のコードエディター

SublimeText3 中国語版

SublimeText3 中国語版

中国語版、とても使いやすい

ゼンドスタジオ 13.0.1

ゼンドスタジオ 13.0.1

強力な PHP 統合開発環境

ドリームウィーバー CS6

ドリームウィーバー CS6

ビジュアル Web 開発ツール

SublimeText3 Mac版

SublimeText3 Mac版

神レベルのコード編集ソフト(SublimeText3)

PHP と Kafka を使用してリアルタイム株価分析を実装する方法 PHP と Kafka を使用してリアルタイム株価分析を実装する方法 Jun 28, 2023 am 10:04 AM

インターネットとテクノロジーの発展に伴い、デジタル投資への関心が高まっています。多くの投資家は、より高い投資収益率を得ることを期待して、投資戦略を模索し、研究し続けています。株式取引では、リアルタイムの株式分析が意思決定に非常に重要であり、Kafka のリアルタイム メッセージ キューと PHP テクノロジの使用は効率的かつ実用的な手段です。 1. Kafka の概要 Kafka は、LinkedIn によって開発された高スループットの分散型パブリッシュおよびサブスクライブ メッセージング システムです。 Kafka の主な機能は次のとおりです。

SpringBootとSpringMVCの比較と差異分析 SpringBootとSpringMVCの比較と差異分析 Dec 29, 2023 am 11:02 AM

SpringBoot と SpringMVC はどちらも Java 開発で一般的に使用されるフレームワークですが、それらの間には明らかな違いがいくつかあります。この記事では、これら 2 つのフレームワークの機能と使用法を調べ、その違いを比較します。まず、SpringBoot について学びましょう。 SpringBoot は、Spring フレームワークに基づいたアプリケーションの作成と展開を簡素化するために、Pivo​​tal チームによって開発されました。スタンドアロンの実行可能ファイルを構築するための高速かつ軽量な方法を提供します。

React と Apache Kafka を使用してリアルタイム データ処理アプリケーションを構築する方法 React と Apache Kafka を使用してリアルタイム データ処理アプリケーションを構築する方法 Sep 27, 2023 pm 02:25 PM

React と Apache Kafka を使用してリアルタイム データ処理アプリケーションを構築する方法 はじめに: ビッグ データとリアルタイム データ処理の台頭により、リアルタイム データ処理アプリケーションの構築が多くの開発者の追求となっています。人気のあるフロントエンド フレームワークである React と、高性能分散メッセージング システムである Apache Kafka を組み合わせることで、リアルタイム データ処理アプリケーションを構築できます。この記事では、React と Apache Kafka を使用してリアルタイム データ処理アプリケーションを構築する方法を紹介します。

SpringBoot+Dubbo+Nacos開発実践チュートリアル SpringBoot+Dubbo+Nacos開発実践チュートリアル Aug 15, 2023 pm 04:49 PM

この記事では、dubbo+nacos+Spring Boot の実際の開発について詳しく説明する例を書きます。この記事では理論的な知識はあまり取り上げませんが、dubbo を nacos と統合して開発環境を迅速に構築する方法を説明する最も簡単な例を書きます。

Kafkaを探索するための可視化ツール5選 Kafkaを探索するための可視化ツール5選 Feb 01, 2024 am 08:03 AM

Kafka 視覚化ツールの 5 つのオプション ApacheKafka は、大量のリアルタイム データを処理できる分散ストリーム処理プラットフォームです。これは、リアルタイム データ パイプライン、メッセージ キュー、イベント駆動型アプリケーションの構築に広く使用されています。 Kafka の視覚化ツールは、ユーザーが Kafka クラスターを監視および管理し、Kafka データ フローをより深く理解するのに役立ちます。以下は、5 つの人気のある Kafka 視覚化ツールの紹介です。 ConfluentControlCenterConfluent

Kafka 視覚化ツールの比較分析: 最適なツールを選択するには? Kafka 視覚化ツールの比較分析: 最適なツールを選択するには? Jan 05, 2024 pm 12:15 PM

適切な Kafka 視覚化ツールを選択するにはどうすればよいですか? 5 つのツールの比較分析 はじめに: Kafka は、ビッグ データの分野で広く使用されている、高性能、高スループットの分散メッセージ キュー システムです。 Kafka の人気に伴い、Kafka クラスターを簡単に監視および管理するためのビジュアル ツールを必要とする企業や開発者が増えています。この記事では、読者がニーズに合ったツールを選択できるように、一般的に使用される 5 つの Kafka 視覚化ツールを紹介し、その特徴と機能を比較します。 1.カフカマネージャー

Rocky Linux に Apache Kafka をインストールするにはどうすればよいですか? Rocky Linux に Apache Kafka をインストールするにはどうすればよいですか? Mar 01, 2024 pm 10:37 PM

RockyLinux に ApacheKafka をインストールするには、次の手順に従います。 システムの更新: まず、RockyLinux システムが最新であることを確認し、次のコマンドを実行してシステム パッケージを更新します: sudoyumupdate Java のインストール: ApacheKafka は Java に依存しているため、最初に JavaDevelopmentKit (JDK) をインストールします)。 OpenJDK は、次のコマンドを使用してインストールできます。 sudoyuminstalljava-1.8.0-openjdk-devel ダウンロードして解凍します。 ApacheKafka 公式 Web サイト () にアクセスして、最新のバイナリ パッケージをダウンロードします。安定したバージョンを選択してください

go-zero と Kafka+Avro の実践: 高性能対話型データ処理システムの構築 go-zero と Kafka+Avro の実践: 高性能対話型データ処理システムの構築 Jun 23, 2023 am 09:04 AM

近年、ビッグ データと活発なオープン ソース コミュニティの台頭により、ますます多くの企業が増大するデータ ニーズを満たすために高性能の対話型データ処理システムを探し始めています。このテクノロジー アップグレードの波の中で、go-zero と Kafka+Avro はますます多くの企業に注目され、採用されています。 go-zero は、Golang 言語をベースに開発されたマイクロサービス フレームワークで、高いパフォーマンス、使いやすさ、拡張の容易さ、メンテナンスの容易さという特徴を備えており、企業が効率的なマイクロサービス アプリケーション システムを迅速に構築できるように設計されています。その急速な成長

See all articles