springboot+kafka で @KafkaListener を使用して複数のトピックを動的に指定する方法
説明
このプロジェクトは springboot kafak 統合プロジェクトであるため、springboot で kafak 消費アノテーション @KafkaListener を使用します
最初に、コンマで区切られた application.properties を構成します。 複数のトピック。
方法: Spring の SpEl 式を使用してトピックを次のように構成します: @KafkaListener(topics = “#{’${topics}’.split(’ ,’ )}”)
プログラムを実行すると、コンソールの出力結果は次のようになります。
これは、次のとおりです。コンシューマ スレッドのみを開くため、すべてのトピックとパーティションがこのスレッドに割り当てられます。
複数のコンシューマ スレッドを開いてこれらのトピックを消費したい場合は、@KafkaListener アノテーションのパラメータ concurrency を追加します。値には、必要なコンシューマの数を指定できます (消費はパーティションの数は、開いているすべてのトピックのパーティションの合計数以下である必要があります)
プログラムを実行すると、コンソールの出力は次のようになります。
最もよくある質問の要約
#コンシューマが変更されたトピックを利用できるように、プログラムの実行中にトピックを変更するにはどうすればよいですか?
ans: 試してみると、@KafkaListener アノテーションを使用してこの要件を達成することはできません。プログラムが開始されると、プログラムは @KafkaListener アノテーション情報に基づいてコンシューマーを初期化します。指定されたトピックを消費します。プログラムの実行中にトピックが変更された場合、コンシューマはコンシューマ構成を変更してトピックを再サブスクライブすることはできません。
ただし、妥協策として、トピックのマッチングに @KafkaListener の topicPattern パラメーターを使用することもできます。
究極の方法
アイデア
Kafka ネイティブ クライアントの依存関係を使用し、コンシューマーを手動で初期化し、@KafkaListener を使用する代わりにコンシューマー スレッドを開始します。
コンシューマ スレッドでは、各サイクルが構成、データベース、またはその他の構成ソースから最新のトピック情報を取得し、前のトピックと比較し、変更が発生した場合はトピックを再サブスクライブするか、コンシューマを初期化します。
実装
kafka クライアントの依存関係を追加します (このテスト サーバーの kafka バージョン: 2.12-2.4.0)
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.3.0</version> </dependency>
コード
@Service @Slf4j public class KafkaConsumers implements InitializingBean { /** * 消费者 */ private static KafkaConsumer<String, String> consumer; /** * topic */ private List<String> topicList; public static String getNewTopic() { try { return org.apache.commons.io.FileUtils.readLines(new File("D:/topic.txt"), "utf-8").get(0); } catch (IOException e) { e.printStackTrace(); } return null; } /** * 初始化消费者(配置写死是为了快速测试,请大家使用配置文件) * * @param topicList * @return */ public KafkaConsumer<String, String> getInitConsumer(List<String> topicList) { //配置信息 Properties props = new Properties(); //kafka服务器地址 props.put("bootstrap.servers", "192.168.9.185:9092"); //必须指定消费者组 props.put("group.id", "haha"); //设置数据key和value的序列化处理类 props.put("key.deserializer", StringDeserializer.class); props.put("value.deserializer", StringDeserializer.class); //创建消息者实例 KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); //订阅topic的消息 consumer.subscribe(topicList); return consumer; } /** * 开启消费者线程 * 异常请自己根据需求自己处理 */ @Override public void afterPropertiesSet() { // 初始化topic topicList = Splitter.on(",").splitToList(Objects.requireNonNull(getNewTopic())); if (org.apache.commons.collections.CollectionUtils.isNotEmpty(topicList)) { consumer = getInitConsumer(topicList); // 开启一个消费者线程 new Thread(() -> { while (true) { // 模拟从配置源中获取最新的topic(字符串,逗号隔开) final List<String> newTopic = Splitter.on(",").splitToList(Objects.requireNonNull(getNewTopic())); // 如果topic发生变化 if (!topicList.equals(newTopic)) { log.info("topic 发生变化:newTopic:{},oldTopic:{}-------------------------", newTopic, topicList); // method one:重新订阅topic: topicList = newTopic; consumer.subscribe(newTopic); // method two:关闭原来的消费者,重新初始化一个消费者 //consumer.close(); //topicList = newTopic; //consumer = getInitConsumer(newTopic); continue; } ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord<String, String> record : records) { System.out.println("key:" + record.key() + "" + ",value:" + record.value()); } } }).start(); } } }
それについて話しましょうコードの 72:
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
上記のコード行は、Kafka のブローカーが 100 ミリ秒以内にデータを返すのを待ちます。スーパーマーケット パラメーターは、利用可能なデータがあるかどうかに関係なく、ポーリングが返されるまでの時間を指定します。
トピックを変更した後、トピックを再サブスクライブする前に、このポーリングによって取得されたメッセージが処理されるまで待機し、while (true) ループ中にトピックの変更を検出する必要があります。
poll() メソッド 1 回のプルで取得されるメッセージのデフォルト数は、次の図に示すように 500 で、kafka クライアントのソース コードで設定されます。
この構成をカスタマイズする場合は、初期化時に
実行結果 (テスト トピック) を追加できます。 Consumer まったくデータがありません)
注: KafkaConsumer はスレッド安全ではありません。複数のコンシューマを開くために 1 つの KafkaConsumer インスタンスを使用しないでください。複数のコンシューマを開くには、新しいコンシューマが必要ですKafkaConsumer インスタンス。
以上がspringboot+kafka で @KafkaListener を使用して複数のトピックを動的に指定する方法の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

ホットAIツール

Undresser.AI Undress
リアルなヌード写真を作成する AI 搭載アプリ

AI Clothes Remover
写真から衣服を削除するオンライン AI ツール。

Undress AI Tool
脱衣画像を無料で

Clothoff.io
AI衣類リムーバー

AI Hentai Generator
AIヘンタイを無料で生成します。

人気の記事

ホットツール

メモ帳++7.3.1
使いやすく無料のコードエディター

SublimeText3 中国語版
中国語版、とても使いやすい

ゼンドスタジオ 13.0.1
強力な PHP 統合開発環境

ドリームウィーバー CS6
ビジュアル Web 開発ツール

SublimeText3 Mac版
神レベルのコード編集ソフト(SublimeText3)

ホットトピック









インターネットとテクノロジーの発展に伴い、デジタル投資への関心が高まっています。多くの投資家は、より高い投資収益率を得ることを期待して、投資戦略を模索し、研究し続けています。株式取引では、リアルタイムの株式分析が意思決定に非常に重要であり、Kafka のリアルタイム メッセージ キューと PHP テクノロジの使用は効率的かつ実用的な手段です。 1. Kafka の概要 Kafka は、LinkedIn によって開発された高スループットの分散型パブリッシュおよびサブスクライブ メッセージング システムです。 Kafka の主な機能は次のとおりです。

SpringBoot と SpringMVC はどちらも Java 開発で一般的に使用されるフレームワークですが、それらの間には明らかな違いがいくつかあります。この記事では、これら 2 つのフレームワークの機能と使用法を調べ、その違いを比較します。まず、SpringBoot について学びましょう。 SpringBoot は、Spring フレームワークに基づいたアプリケーションの作成と展開を簡素化するために、Pivotal チームによって開発されました。スタンドアロンの実行可能ファイルを構築するための高速かつ軽量な方法を提供します。

React と Apache Kafka を使用してリアルタイム データ処理アプリケーションを構築する方法 はじめに: ビッグ データとリアルタイム データ処理の台頭により、リアルタイム データ処理アプリケーションの構築が多くの開発者の追求となっています。人気のあるフロントエンド フレームワークである React と、高性能分散メッセージング システムである Apache Kafka を組み合わせることで、リアルタイム データ処理アプリケーションを構築できます。この記事では、React と Apache Kafka を使用してリアルタイム データ処理アプリケーションを構築する方法を紹介します。

この記事では、dubbo+nacos+Spring Boot の実際の開発について詳しく説明する例を書きます。この記事では理論的な知識はあまり取り上げませんが、dubbo を nacos と統合して開発環境を迅速に構築する方法を説明する最も簡単な例を書きます。

Kafka 視覚化ツールの 5 つのオプション ApacheKafka は、大量のリアルタイム データを処理できる分散ストリーム処理プラットフォームです。これは、リアルタイム データ パイプライン、メッセージ キュー、イベント駆動型アプリケーションの構築に広く使用されています。 Kafka の視覚化ツールは、ユーザーが Kafka クラスターを監視および管理し、Kafka データ フローをより深く理解するのに役立ちます。以下は、5 つの人気のある Kafka 視覚化ツールの紹介です。 ConfluentControlCenterConfluent

適切な Kafka 視覚化ツールを選択するにはどうすればよいですか? 5 つのツールの比較分析 はじめに: Kafka は、ビッグ データの分野で広く使用されている、高性能、高スループットの分散メッセージ キュー システムです。 Kafka の人気に伴い、Kafka クラスターを簡単に監視および管理するためのビジュアル ツールを必要とする企業や開発者が増えています。この記事では、読者がニーズに合ったツールを選択できるように、一般的に使用される 5 つの Kafka 視覚化ツールを紹介し、その特徴と機能を比較します。 1.カフカマネージャー

RockyLinux に ApacheKafka をインストールするには、次の手順に従います。 システムの更新: まず、RockyLinux システムが最新であることを確認し、次のコマンドを実行してシステム パッケージを更新します: sudoyumupdate Java のインストール: ApacheKafka は Java に依存しているため、最初に JavaDevelopmentKit (JDK) をインストールします)。 OpenJDK は、次のコマンドを使用してインストールできます。 sudoyuminstalljava-1.8.0-openjdk-devel ダウンロードして解凍します。 ApacheKafka 公式 Web サイト () にアクセスして、最新のバイナリ パッケージをダウンロードします。安定したバージョンを選択してください

近年、ビッグ データと活発なオープン ソース コミュニティの台頭により、ますます多くの企業が増大するデータ ニーズを満たすために高性能の対話型データ処理システムを探し始めています。このテクノロジー アップグレードの波の中で、go-zero と Kafka+Avro はますます多くの企業に注目され、採用されています。 go-zero は、Golang 言語をベースに開発されたマイクロサービス フレームワークで、高いパフォーマンス、使いやすさ、拡張の容易さ、メンテナンスの容易さという特徴を備えており、企業が効率的なマイクロサービス アプリケーション システムを迅速に構築できるように設計されています。その急速な成長
