目次
説明
最もよくある質問の要約
究極の方法
アイデア
実装
コード
ホームページ Java &#&チュートリアル springboot+kafka で @KafkaListener を使用して複数のトピックを動的に指定する方法

springboot+kafka で @KafkaListener を使用して複数のトピックを動的に指定する方法

May 20, 2023 pm 08:58 PM
springboot kafka @kafkalistener

説明

このプロジェクトは springboot kafak 統合プロジェクトであるため、springboot で kafak 消費アノテーション @KafkaListener を使用します

最初に、コンマで区切られた application.properties を構成します。 複数のトピック。

springboot+kafka で @KafkaListener を使用して複数のトピックを動的に指定する方法

方法: Spring の SpEl 式を使用してトピックを次のように構成します: @KafkaListener(topics = “#{’${topics}’.split(’ ,’ )}”)

springboot+kafka で @KafkaListener を使用して複数のトピックを動的に指定する方法

プログラムを実行すると、コンソールの出力結果は次のようになります。

springboot+kafka で @KafkaListener を使用して複数のトピックを動的に指定する方法

これは、次のとおりです。コンシューマ スレッドのみを開くため、すべてのトピックとパーティションがこのスレッドに割り当てられます。

複数のコンシューマ スレッドを開いてこれらのトピックを消費したい場合は、@KafkaListener アノテーションのパラメータ concurrency を追加します。値には、必要なコンシューマの数を指定できます (消費はパーティションの数は、開いているすべてのトピックのパーティションの合計数以下である必要があります)

springboot+kafka で @KafkaListener を使用して複数のトピックを動的に指定する方法

プログラムを実行すると、コンソールの出力は次のようになります。

springboot+kafka で @KafkaListener を使用して複数のトピックを動的に指定する方法

最もよくある質問の要約

#コンシューマが変更されたトピックを利用できるように、プログラムの実行中にトピックを変更するにはどうすればよいですか?

ans: 試してみると、@KafkaListener アノテーションを使用してこの要件を達成することはできません。プログラムが開始されると、プログラムは @KafkaListener アノテーション情報に基づいてコンシューマーを初期化します。指定されたトピックを消費します。プログラムの実行中にトピックが変更された場合、コンシューマはコンシューマ構成を変更してトピックを再サブスクライブすることはできません。

ただし、妥協策として、トピックのマッチングに @KafkaListener の topicPattern パラメーターを使用することもできます。

究極の方法

アイデア

Kafka ネイティブ クライアントの依存関係を使用し、コンシューマーを手動で初期化し、@KafkaListener を使用する代わりにコンシューマー スレッドを開始します。

コンシューマ スレッドでは、各サイクルが構成、データベース、またはその他の構成ソースから最新のトピック情報を取得し、前のトピックと比較し、変更が発生した場合はトピックを再サブスクライブするか、コンシューマを初期化します。

実装

kafka クライアントの依存関係を追加します (このテスト サーバーの kafka バージョン: 2.12-2.4.0)

<dependency>
	<groupId>org.apache.kafka</groupId>
	<artifactId>kafka-clients</artifactId>
	<version>2.3.0</version>
</dependency>
ログイン後にコピー

コード

@Service
@Slf4j
public class KafkaConsumers implements InitializingBean {

    /**
     * 消费者
     */
    private static KafkaConsumer<String, String> consumer;
    /**
     * topic
     */
    private List<String> topicList;

    public static String getNewTopic() {
        try {
            return org.apache.commons.io.FileUtils.readLines(new File("D:/topic.txt"), "utf-8").get(0);
        } catch (IOException e) {
            e.printStackTrace();
        }
        return null;
    }

    /**
     * 初始化消费者(配置写死是为了快速测试,请大家使用配置文件)
     *
     * @param topicList
     * @return
     */
    public KafkaConsumer<String, String> getInitConsumer(List<String> topicList) {
        //配置信息
        Properties props = new Properties();
        //kafka服务器地址
        props.put("bootstrap.servers", "192.168.9.185:9092");
        //必须指定消费者组
        props.put("group.id", "haha");
        //设置数据key和value的序列化处理类
        props.put("key.deserializer", StringDeserializer.class);
        props.put("value.deserializer", StringDeserializer.class);
        //创建消息者实例
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        //订阅topic的消息
        consumer.subscribe(topicList);
        return consumer;
    }

    /**
     * 开启消费者线程
     * 异常请自己根据需求自己处理
     */
    @Override
    public void afterPropertiesSet() {
        // 初始化topic
        topicList = Splitter.on(",").splitToList(Objects.requireNonNull(getNewTopic()));
        if (org.apache.commons.collections.CollectionUtils.isNotEmpty(topicList)) {
            consumer = getInitConsumer(topicList);
            // 开启一个消费者线程
            new Thread(() -> {
                while (true) {
                    // 模拟从配置源中获取最新的topic(字符串,逗号隔开)
                    final List<String> newTopic = Splitter.on(",").splitToList(Objects.requireNonNull(getNewTopic()));
                    // 如果topic发生变化
                    if (!topicList.equals(newTopic)) {
                        log.info("topic 发生变化:newTopic:{},oldTopic:{}-------------------------", newTopic, topicList);
                        // method one:重新订阅topic:
                        topicList = newTopic;
                        consumer.subscribe(newTopic);
                        // method two:关闭原来的消费者,重新初始化一个消费者
                        //consumer.close();
                        //topicList = newTopic;
                        //consumer = getInitConsumer(newTopic);
                        continue;
                    }
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                    for (ConsumerRecord<String, String> record : records) {
                        System.out.println("key:" + record.key() + "" + ",value:" + record.value());
                    }
                }
            }).start();
        }
    }
}
ログイン後にコピー

それについて話しましょうコードの 72:

ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
ログイン後にコピー

上記のコード行は、Kafka のブローカーが 100 ミリ秒以内にデータを返すのを待ちます。スーパーマーケット パラメーターは、利用可能なデータがあるかどうかに関係なく、ポーリングが返されるまでの時間を指定します。

トピックを変更した後、トピックを再サブスクライブする前に、このポーリングによって取得されたメッセージが処理されるまで待機し、while (true) ループ中にトピックの変更を検出する必要があります。

poll() メソッド 1 回のプルで取得されるメッセージのデフォルト数は、次の図に示すように 500 で、kafka クライアントのソース コードで設定されます。

springboot+kafka で @KafkaListener を使用して複数のトピックを動的に指定する方法

この構成をカスタマイズする場合は、初期化時に

springboot+kafka で @KafkaListener を使用して複数のトピックを動的に指定する方法

実行結果 (テスト トピック) を追加できます。 Consumer まったくデータがありません)

springboot+kafka で @KafkaListener を使用して複数のトピックを動的に指定する方法

注: KafkaConsumer はスレッド安全ではありません。複数のコンシューマを開くために 1 つの KafkaConsumer インスタンスを使用しないでください。複数のコンシューマを開くには、新しいコンシューマが必要ですKafkaConsumer インスタンス。

以上がspringboot+kafka で @KafkaListener を使用して複数のトピックを動的に指定する方法の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

このウェブサイトの声明
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。

ホットAIツール

Undresser.AI Undress

Undresser.AI Undress

リアルなヌード写真を作成する AI 搭載アプリ

AI Clothes Remover

AI Clothes Remover

写真から衣服を削除するオンライン AI ツール。

Undress AI Tool

Undress AI Tool

脱衣画像を無料で

Clothoff.io

Clothoff.io

AI衣類リムーバー

AI Hentai Generator

AI Hentai Generator

AIヘンタイを無料で生成します。

ホットツール

メモ帳++7.3.1

メモ帳++7.3.1

使いやすく無料のコードエディター

SublimeText3 中国語版

SublimeText3 中国語版

中国語版、とても使いやすい

ゼンドスタジオ 13.0.1

ゼンドスタジオ 13.0.1

強力な PHP 統合開発環境

ドリームウィーバー CS6

ドリームウィーバー CS6

ビジュアル Web 開発ツール

SublimeText3 Mac版

SublimeText3 Mac版

神レベルのコード編集ソフト(SublimeText3)

PHP と Kafka を使用してリアルタイム株価分析を実装する方法 PHP と Kafka を使用してリアルタイム株価分析を実装する方法 Jun 28, 2023 am 10:04 AM

インターネットとテクノロジーの発展に伴い、デジタル投資への関心が高まっています。多くの投資家は、より高い投資収益率を得ることを期待して、投資戦略を模索し、研究し続けています。株式取引では、リアルタイムの株式分析が意思決定に非常に重要であり、Kafka のリアルタイム メッセージ キューと PHP テクノロジの使用は効率的かつ実用的な手段です。 1. Kafka の概要 Kafka は、LinkedIn によって開発された高スループットの分散型パブリッシュおよびサブスクライブ メッセージング システムです。 Kafka の主な機能は次のとおりです。

SpringBootとSpringMVCの比較と差異分析 SpringBootとSpringMVCの比較と差異分析 Dec 29, 2023 am 11:02 AM

SpringBoot と SpringMVC はどちらも Java 開発で一般的に使用されるフレームワークですが、それらの間には明らかな違いがいくつかあります。この記事では、これら 2 つのフレームワークの機能と使用法を調べ、その違いを比較します。まず、SpringBoot について学びましょう。 SpringBoot は、Spring フレームワークに基づいたアプリケーションの作成と展開を簡素化するために、Pivo​​tal チームによって開発されました。スタンドアロンの実行可能ファイルを構築するための高速かつ軽量な方法を提供します。

React と Apache Kafka を使用してリアルタイム データ処理アプリケーションを構築する方法 React と Apache Kafka を使用してリアルタイム データ処理アプリケーションを構築する方法 Sep 27, 2023 pm 02:25 PM

React と Apache Kafka を使用してリアルタイム データ処理アプリケーションを構築する方法 はじめに: ビッグ データとリアルタイム データ処理の台頭により、リアルタイム データ処理アプリケーションの構築が多くの開発者の追求となっています。人気のあるフロントエンド フレームワークである React と、高性能分散メッセージング システムである Apache Kafka を組み合わせることで、リアルタイム データ処理アプリケーションを構築できます。この記事では、React と Apache Kafka を使用してリアルタイム データ処理アプリケーションを構築する方法を紹介します。

SpringBoot+Dubbo+Nacos開発実践チュートリアル SpringBoot+Dubbo+Nacos開発実践チュートリアル Aug 15, 2023 pm 04:49 PM

この記事では、dubbo+nacos+Spring Boot の実際の開発について詳しく説明する例を書きます。この記事では理論的な知識はあまり取り上げませんが、dubbo を nacos と統合して開発環境を迅速に構築する方法を説明する最も簡単な例を書きます。

Kafkaを探索するための可視化ツール5選 Kafkaを探索するための可視化ツール5選 Feb 01, 2024 am 08:03 AM

Kafka 視覚化ツールの 5 つのオプション ApacheKafka は、大量のリアルタイム データを処理できる分散ストリーム処理プラットフォームです。これは、リアルタイム データ パイプライン、メッセージ キュー、イベント駆動型アプリケーションの構築に広く使用されています。 Kafka の視覚化ツールは、ユーザーが Kafka クラスターを監視および管理し、Kafka データ フローをより深く理解するのに役立ちます。以下は、5 つの人気のある Kafka 視覚化ツールの紹介です。 ConfluentControlCenterConfluent

Kafka 視覚化ツールの比較分析: 最適なツールを選択するには? Kafka 視覚化ツールの比較分析: 最適なツールを選択するには? Jan 05, 2024 pm 12:15 PM

適切な Kafka 視覚化ツールを選択するにはどうすればよいですか? 5 つのツールの比較分析 はじめに: Kafka は、ビッグ データの分野で広く使用されている、高性能、高スループットの分散メッセージ キュー システムです。 Kafka の人気に伴い、Kafka クラスターを簡単に監視および管理するためのビジュアル ツールを必要とする企業や開発者が増えています。この記事では、読者がニーズに合ったツールを選択できるように、一般的に使用される 5 つの Kafka 視覚化ツールを紹介し、その特徴と機能を比較します。 1.カフカマネージャー

Rocky Linux に Apache Kafka をインストールするにはどうすればよいですか? Rocky Linux に Apache Kafka をインストールするにはどうすればよいですか? Mar 01, 2024 pm 10:37 PM

RockyLinux に ApacheKafka をインストールするには、次の手順に従います。 システムの更新: まず、RockyLinux システムが最新であることを確認し、次のコマンドを実行してシステム パッケージを更新します: sudoyumupdate Java のインストール: ApacheKafka は Java に依存しているため、最初に JavaDevelopmentKit (JDK) をインストールします)。 OpenJDK は、次のコマンドを使用してインストールできます。 sudoyuminstalljava-1.8.0-openjdk-devel ダウンロードして解凍します。 ApacheKafka 公式 Web サイト () にアクセスして、最新のバイナリ パッケージをダウンロードします。安定したバージョンを選択してください

go-zero と Kafka+Avro の実践: 高性能対話型データ処理システムの構築 go-zero と Kafka+Avro の実践: 高性能対話型データ処理システムの構築 Jun 23, 2023 am 09:04 AM

近年、ビッグ データと活発なオープン ソース コミュニティの台頭により、ますます多くの企業が増大するデータ ニーズを満たすために高性能の対話型データ処理システムを探し始めています。このテクノロジー アップグレードの波の中で、go-zero と Kafka+Avro はますます多くの企業に注目され、採用されています。 go-zero は、Golang 言語をベースに開発されたマイクロサービス フレームワークで、高いパフォーマンス、使いやすさ、拡張の容易さ、メンテナンスの容易さという特徴を備えており、企業が効率的なマイクロサービス アプリケーション システムを迅速に構築できるように設計されています。その急速な成長

See all articles