このプロジェクトは springboot kafak 統合プロジェクトであるため、springboot で kafak 消費アノテーション @KafkaListener を使用します
最初に、コンマで区切られた application.properties を構成します。 複数のトピック。
方法: Spring の SpEl 式を使用してトピックを次のように構成します: @KafkaListener(topics = “#{’${topics}’.split(’ ,’ )}”)
プログラムを実行すると、コンソールの出力結果は次のようになります。
これは、次のとおりです。コンシューマ スレッドのみを開くため、すべてのトピックとパーティションがこのスレッドに割り当てられます。
複数のコンシューマ スレッドを開いてこれらのトピックを消費したい場合は、@KafkaListener アノテーションのパラメータ concurrency を追加します。値には、必要なコンシューマの数を指定できます (消費はパーティションの数は、開いているすべてのトピックのパーティションの合計数以下である必要があります)
プログラムを実行すると、コンソールの出力は次のようになります。
#コンシューマが変更されたトピックを利用できるように、プログラムの実行中にトピックを変更するにはどうすればよいですか?
ans: 試してみると、@KafkaListener アノテーションを使用してこの要件を達成することはできません。プログラムが開始されると、プログラムは @KafkaListener アノテーション情報に基づいてコンシューマーを初期化します。指定されたトピックを消費します。プログラムの実行中にトピックが変更された場合、コンシューマはコンシューマ構成を変更してトピックを再サブスクライブすることはできません。
ただし、妥協策として、トピックのマッチングに @KafkaListener の topicPattern パラメーターを使用することもできます。
Kafka ネイティブ クライアントの依存関係を使用し、コンシューマーを手動で初期化し、@KafkaListener を使用する代わりにコンシューマー スレッドを開始します。
コンシューマ スレッドでは、各サイクルが構成、データベース、またはその他の構成ソースから最新のトピック情報を取得し、前のトピックと比較し、変更が発生した場合はトピックを再サブスクライブするか、コンシューマを初期化します。
kafka クライアントの依存関係を追加します (このテスト サーバーの kafka バージョン: 2.12-2.4.0)
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.3.0</version> </dependency>
@Service @Slf4j public class KafkaConsumers implements InitializingBean { /** * 消费者 */ private static KafkaConsumer<String, String> consumer; /** * topic */ private List<String> topicList; public static String getNewTopic() { try { return org.apache.commons.io.FileUtils.readLines(new File("D:/topic.txt"), "utf-8").get(0); } catch (IOException e) { e.printStackTrace(); } return null; } /** * 初始化消费者(配置写死是为了快速测试,请大家使用配置文件) * * @param topicList * @return */ public KafkaConsumer<String, String> getInitConsumer(List<String> topicList) { //配置信息 Properties props = new Properties(); //kafka服务器地址 props.put("bootstrap.servers", "192.168.9.185:9092"); //必须指定消费者组 props.put("group.id", "haha"); //设置数据key和value的序列化处理类 props.put("key.deserializer", StringDeserializer.class); props.put("value.deserializer", StringDeserializer.class); //创建消息者实例 KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); //订阅topic的消息 consumer.subscribe(topicList); return consumer; } /** * 开启消费者线程 * 异常请自己根据需求自己处理 */ @Override public void afterPropertiesSet() { // 初始化topic topicList = Splitter.on(",").splitToList(Objects.requireNonNull(getNewTopic())); if (org.apache.commons.collections.CollectionUtils.isNotEmpty(topicList)) { consumer = getInitConsumer(topicList); // 开启一个消费者线程 new Thread(() -> { while (true) { // 模拟从配置源中获取最新的topic(字符串,逗号隔开) final List<String> newTopic = Splitter.on(",").splitToList(Objects.requireNonNull(getNewTopic())); // 如果topic发生变化 if (!topicList.equals(newTopic)) { log.info("topic 发生变化:newTopic:{},oldTopic:{}-------------------------", newTopic, topicList); // method one:重新订阅topic: topicList = newTopic; consumer.subscribe(newTopic); // method two:关闭原来的消费者,重新初始化一个消费者 //consumer.close(); //topicList = newTopic; //consumer = getInitConsumer(newTopic); continue; } ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord<String, String> record : records) { System.out.println("key:" + record.key() + "" + ",value:" + record.value()); } } }).start(); } } }
それについて話しましょうコードの 72:
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
上記のコード行は、Kafka のブローカーが 100 ミリ秒以内にデータを返すのを待ちます。スーパーマーケット パラメーターは、利用可能なデータがあるかどうかに関係なく、ポーリングが返されるまでの時間を指定します。
トピックを変更した後、トピックを再サブスクライブする前に、このポーリングによって取得されたメッセージが処理されるまで待機し、while (true) ループ中にトピックの変更を検出する必要があります。
poll() メソッド 1 回のプルで取得されるメッセージのデフォルト数は、次の図に示すように 500 で、kafka クライアントのソース コードで設定されます。
この構成をカスタマイズする場合は、初期化時に
実行結果 (テスト トピック) を追加できます。 Consumer まったくデータがありません)
注: KafkaConsumer はスレッド安全ではありません。複数のコンシューマを開くために 1 つの KafkaConsumer インスタンスを使用しないでください。複数のコンシューマを開くには、新しいコンシューマが必要ですKafkaConsumer インスタンス。
以上がspringboot+kafka で @KafkaListener を使用して複数のトピックを動的に指定する方法の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。