ホームページ > Java > &#&チュートリアル > Java 開発: Apache Kafka Streams を使用してリアルタイムのストリーム処理とコンピューティングを行う方法

Java 開発: Apache Kafka Streams を使用してリアルタイムのストリーム処理とコンピューティングを行う方法

WBOY
リリース: 2023-09-21 12:39:24
オリジナル
1508 人が閲覧しました

Java开发:如何使用Apache Kafka Streams进行实时流处理和计算

Java 開発: Apache Kafka Streams を使用してリアルタイムのストリーム処理とコンピューティングを行う方法

はじめに:
ビッグ データとリアルタイムの台頭によりコンピューティング、Apache Kafka Streams ストリーム処理エンジンとして、ますます多くの開発者によって使用されています。これは、リアルタイム ストリーミング データを処理し、複雑なストリーム処理と計算を実行するためのシンプルかつ強力な方法を提供します。この記事では、環境の構成、コードの記述、サンプル デモンストレーションなど、リアルタイムのストリーム処理とコンピューティングに Apache Kafka Streams を使用する方法を紹介します。

1. 準備:

  1. Apache Kafka のインストールと構成: Apache Kafka をダウンロードしてインストールし、Apache Kafka クラスターを起動する必要があります。インストールと構成の詳細については、Apache Kafka の公式ドキュメントを参照してください。
  2. 依存関係の導入: Kafka Streams 関連の依存関係を Java プロジェクトに導入します。たとえば、Maven を使用して、プロジェクトの pom.xml ファイルに次の依存関係を追加できます。
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>2.8.1</version>
</dependency>
ログイン後にコピー

2. コードを記述します。

  1. Kafka Streams アプリケーションを作成します。
    まず、Kafka Streams アプリケーションを作成し、Kafka クラスターの接続情報を構成する必要があります。以下は簡単なサンプル コードです:
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;

import java.util.Properties;

public class KafkaStreamsApp {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        StreamsBuilder builder = new StreamsBuilder();
        // 在这里添加流处理和计算逻辑

        Topology topology = builder.build();
        KafkaStreams streams = new KafkaStreams(topology, props);
        streams.start();

        // 添加Shutdown Hook,确保应用程序在关闭时能够优雅地停止
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}
ログイン後にコピー
  1. ストリーム処理と計算ロジックを追加します:
    Kafka Streams アプリケーションを作成した後、特定のストリーム処理と計算ロジックを追加する必要があります。簡単な例として、「input-topic」という名前の Kafka トピックから文字列メッセージを受信し、そのメッセージに対して長さの計算を実行し、その結果を「output-topic」という名前の Kafka トピックに送信すると仮定します。以下はサンプル コードです。
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;

import java.util.Arrays;

public class KafkaStreamsApp {

    // 省略其他代码...
    
    public static void main(String[] args) {
        // 省略其他代码...
        
        KStream<String, String> inputStream = builder.stream("input-topic");
        KTable<String, Long> wordCounts = inputStream
                .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\W+")))
                .groupBy((key, word) -> word)
                .count();

        wordCounts.toStream().to("output-topic");

        // 省略其他代码...
    }
}
ログイン後にコピー

上記のサンプル コードでは、最初に入力トピックから KStream オブジェクトが作成され、次に flatMapValues 操作を使用して各メッセージが単語に分割され、統計的なカウントを実行します。最後に、結果が出力トピックに送信されます。

3. デモの例:
リアルタイム ストリーム処理およびコンピューティング アプリケーションを検証するには、Kafka コマンド ライン ツールを使用してメッセージを送信し、結果を表示します。デモンストレーション例の手順は次のとおりです。

  1. 入力トピックと出力トピックを作成します。
    コマンド ラインで次のコマンドを実行して、「input-topic」および「output-topic」という名前の Kafka トピックを作成します。 topic" :
bin/kafka-topics.sh --create --topic input-topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
bin/kafka-topics.sh --create --topic output-topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
ログイン後にコピー
  1. 入力トピックにメッセージを送信します:
    コマンド ラインで次のコマンドを実行して、いくつかのメッセージを「input-topic」に送信します:
bin/kafka-console-producer.sh --topic input-topic --bootstrap-server localhost:9092
>hello world
>apache kafka streams
>real-time processing
>```

3. 查看结果:
在命令行中执行以下命令,从"output-topic"中消费结果消息:
ログイン後にコピー

bin/kafka-console-consumer.sh --topic 出力トピック --from-beginning --bootstrap-server localhost:9092

可以看到,输出的结果是单词及其对应的计数值:
ログイン後にコピー

real-time: 1
processing : 1
apache: 1
kafka: 1
streams: 1
hello: 2
world: 1

结论:
通过上述示例,我们了解了如何使用Apache Kafka Streams进行实时流处理和计算。可以根据实际需求,编写更复杂的流处理和计算逻辑,并通过Kafka命令行工具来验证和查看结果。希望本文对于Java开发人员在实时流处理和计算领域有所帮助。

参考文档:
1. Apache Kafka官方文档:https://kafka.apache.org/documentation/
2. Kafka Streams官方文档:https://kafka.apache.org/documentation/streams/
ログイン後にコピー

以上がJava 開発: Apache Kafka Streams を使用してリアルタイムのストリーム処理とコンピューティングを行う方法の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

ソース:php.cn
このウェブサイトの声明
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。
人気のチュートリアル
詳細>
最新のダウンロード
詳細>
ウェブエフェクト
公式サイト
サイト素材
フロントエンドテンプレート