Java を使用して Apache Kafka と KSQL に基づくストリーム処理アプリケーションを開発する方法
ストリーム処理は、リアルタイム データ ストリームを処理し、データを処理できるテクノロジです。到着したらすぐに分析して処理します。 Apache Kafka は、スケーラブルなストリーム処理アプリケーションを効率的に構築するために使用できる分散ストリーム処理プラットフォームです。 KSQL は、SQL クエリとリアルタイム ストリーム データの変換に使用できるオープン ソースのストリーム データ処理エンジンです。この記事では、Java を使用して Apache Kafka と KSQL に基づくストリーム処理アプリケーションを開発する方法を紹介します。
1. 環境セットアップ
始める前に、ローカルの Kafka および KSQL 環境をセットアップする必要があります。まず、Java JDK、Apache Kafka、Confluent Platform をダウンロードしてインストールする必要があります。次に、次のコマンドを使用して Kafka と KSQL を起動します。
2. Kafka トピックと KSQL テーブルの作成
Java コードを書き始める前に、Kafka トピックを作成し、そこにリアルタイム データを書き込む必要があります。次のコマンドを使用して、「example-topic」という名前のトピックを作成できます:
bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic example-topic --partitions 1 --replication-factor 1
次に、リアルタイム データのクエリと変換を行うためのテーブルを KSQL に作成する必要があります。次のコマンドを使用して、KSQL ターミナルで「example-table」という名前のテーブルを作成できます:
CREATE TABLE example_table (key VARCHAR, value VARCHAR) WITH (kafka_topic='example-topic', value_format='json) ', key='key');
3. Java コードの実装
Java コードを書き始める前に、Kafka と KSQL への依存関係を追加する必要があります。 Maven または Gradle 構成ファイルに次の依存関係を追加できます:
Maven:
<groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.5.0</version>
<groupId>io.confluent</groupId>
<artifactId>ksql-serde</artifactId>
<version>0.10.0</version>
Gradle:
実装 'org.apache.kafka:kafka-clients:2.5.0'
実装 'io。 confluent:ksql-serde:0.10.0'
次に、ストリーム処理アプリケーションを実装する Java コードを記述します。以下は簡単なサンプル コードです:
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.Producer.*;
import org.apache .kafka.common.*;
import org.apache.kafka.connect.json.JsonDeserializer;
import org.apache.kafka.connect.json.JsonSerializer;
import org.apache.kafka.streams .*;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.processor.Wall ClockTimestampExtractor;
import org.apache.kafka.streams.state.* ;
import java.util.*;
import java.util.concurrent.*;
public class StreamProcessingApp {
public static void main(String[] args) { Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-processing-app"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); StreamsBuilder builder = new StreamsBuilder(); // Step 1: Read from Kafka topic KStream<String, String> stream = builder.stream("example-topic"); // Step 2: Transform and process the data stream.mapValues(value -> value.toUpperCase()) .filter((key, value) -> value.startsWith("A")) .to("processed-topic"); // Step 3: Create a Kafka producer to send data to another topic Properties producerProps = new Properties(); producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps); // Step 4: Consume and process the data from the processed topic KStream<String, String> processedStream = builder.stream("processed-topic"); processedStream.foreach((key, value) -> { // Process the data here System.out.println("Key: " + key + ", Value: " + value); }); KafkaStreams streams = new KafkaStreams(builder.build(), props); streams.start(); }
}
上記のコード「example-topic」トピックからリアルタイム データを読み取り、そのデータを大文字に変換し、文字「A」で始まるデータを「processed-topic」トピックに書き込む、単純なストリーム処理アプリケーションを作成します。同時に、「processed-topic」トピック内のデータも消費して処理します。
4. アプリケーションの実行
Java コードを作成した後、次のコマンドを使用してアプリケーションをコンパイルし、実行できます:
javac StreamProcessingApp.java
java StreamProcessingApp
これで、Apache Kafka と KSQL に基づくストリーム処理アプリケーションの開発に成功し、Java コードを通じてデータの読み取り、変換、処理、書き込みを実装できました。実際のニーズに応じてコードを変更および拡張し、ビジネス ニーズを満たすことができます。この記事がお役に立てば幸いです!
以上がJava を使用して Apache Kafka と KSQL に基づくストリーム処理アプリケーションを開発する方法の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。