Java を使用して Apache Kafka と KSQL に基づくストリーム処理アプリケーションを開発する方法
Java を使用して Apache Kafka と KSQL に基づくストリーム処理アプリケーションを開発する方法
ストリーム処理は、リアルタイム データ ストリームを処理し、データを処理できるテクノロジです。到着したらすぐに分析して処理します。 Apache Kafka は、スケーラブルなストリーム処理アプリケーションを効率的に構築するために使用できる分散ストリーム処理プラットフォームです。 KSQL は、SQL クエリとリアルタイム ストリーム データの変換に使用できるオープン ソースのストリーム データ処理エンジンです。この記事では、Java を使用して Apache Kafka と KSQL に基づくストリーム処理アプリケーションを開発する方法を紹介します。
1. 環境セットアップ
始める前に、ローカルの Kafka および KSQL 環境をセットアップする必要があります。まず、Java JDK、Apache Kafka、Confluent Platform をダウンロードしてインストールする必要があります。次に、次のコマンドを使用して Kafka と KSQL を起動します。
- Start ZooKeeper:
bin/zookeeper-server-start.sh config/zookeeper.properties - Start Kafka Broker :
bin/kafka-server-start.sh config/server.properties - KSQL サーバーの起動:
bin/ksql-server-start.sh config/ksql-server.properties
2. Kafka トピックと KSQL テーブルの作成
Java コードを書き始める前に、Kafka トピックを作成し、そこにリアルタイム データを書き込む必要があります。次のコマンドを使用して、「example-topic」という名前のトピックを作成できます:
bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic example-topic --partitions 1 --replication-factor 1
次に、リアルタイム データのクエリと変換を行うためのテーブルを KSQL に作成する必要があります。次のコマンドを使用して、KSQL ターミナルで「example-table」という名前のテーブルを作成できます:
CREATE TABLE example_table (key VARCHAR, value VARCHAR) WITH (kafka_topic='example-topic', value_format='json) ', key='key');
3. Java コードの実装
Java コードを書き始める前に、Kafka と KSQL への依存関係を追加する必要があります。 Maven または Gradle 構成ファイルに次の依存関係を追加できます:
Maven:
<groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.5.0</version>
<groupId>io.confluent</groupId>
<artifactId>ksql-serde</artifactId>
<version>0.10.0</version>
Gradle:
実装 'org.apache.kafka:kafka-clients:2.5.0'
実装 'io。 confluent:ksql-serde:0.10.0'
次に、ストリーム処理アプリケーションを実装する Java コードを記述します。以下は簡単なサンプル コードです:
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.Producer.*;
import org.apache .kafka.common.*;
import org.apache.kafka.connect.json.JsonDeserializer;
import org.apache.kafka.connect.json.JsonSerializer;
import org.apache.kafka.streams .*;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.processor.Wall ClockTimestampExtractor;
import org.apache.kafka.streams.state.* ;
import java.util.*;
import java.util.concurrent.*;
public class StreamProcessingApp {
public static void main(String[] args) { Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-processing-app"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); StreamsBuilder builder = new StreamsBuilder(); // Step 1: Read from Kafka topic KStream<String, String> stream = builder.stream("example-topic"); // Step 2: Transform and process the data stream.mapValues(value -> value.toUpperCase()) .filter((key, value) -> value.startsWith("A")) .to("processed-topic"); // Step 3: Create a Kafka producer to send data to another topic Properties producerProps = new Properties(); producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps); // Step 4: Consume and process the data from the processed topic KStream<String, String> processedStream = builder.stream("processed-topic"); processedStream.foreach((key, value) -> { // Process the data here System.out.println("Key: " + key + ", Value: " + value); }); KafkaStreams streams = new KafkaStreams(builder.build(), props); streams.start(); }
}
上記のコード「example-topic」トピックからリアルタイム データを読み取り、そのデータを大文字に変換し、文字「A」で始まるデータを「processed-topic」トピックに書き込む、単純なストリーム処理アプリケーションを作成します。同時に、「processed-topic」トピック内のデータも消費して処理します。
4. アプリケーションの実行
Java コードを作成した後、次のコマンドを使用してアプリケーションをコンパイルし、実行できます:
javac StreamProcessingApp.java
java StreamProcessingApp
これで、Apache Kafka と KSQL に基づくストリーム処理アプリケーションの開発に成功し、Java コードを通じてデータの読み取り、変換、処理、書き込みを実装できました。実際のニーズに応じてコードを変更および拡張し、ビジネス ニーズを満たすことができます。この記事がお役に立てば幸いです!
以上がJava を使用して Apache Kafka と KSQL に基づくストリーム処理アプリケーションを開発する方法の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

ホットAIツール

Undresser.AI Undress
リアルなヌード写真を作成する AI 搭載アプリ

AI Clothes Remover
写真から衣服を削除するオンライン AI ツール。

Undress AI Tool
脱衣画像を無料で

Clothoff.io
AI衣類リムーバー

AI Hentai Generator
AIヘンタイを無料で生成します。

人気の記事

ホットツール

メモ帳++7.3.1
使いやすく無料のコードエディター

SublimeText3 中国語版
中国語版、とても使いやすい

ゼンドスタジオ 13.0.1
強力な PHP 統合開発環境

ドリームウィーバー CS6
ビジュアル Web 開発ツール

SublimeText3 Mac版
神レベルのコード編集ソフト(SublimeText3)

ホットトピック











Java業界には5つの雇用方向がありますが、どれがあなたに適していますか? Java は、ソフトウェア開発の分野で広く使用されているプログラミング言語として、常に人気があります。 Java の強力なクロスプラットフォーム性と豊富な開発フレームワークにより、Java 開発者にはさまざまな業界で幅広い雇用の機会があります。 Java 業界には、JavaWeb 開発、モバイル アプリケーション開発、ビッグ データ開発、組み込み開発、クラウド コンピューティング開発の 5 つの主要な雇用方向があります。それぞれの方向に特徴と利点がありますので、以下では 5 つの方向について説明します。

Java 開発者にとって重要: 最適な逆コンパイル ツールを推奨します。特定のコード サンプルが必要です。 はじめに: Java 開発プロセスでは、既存の Java クラスを逆コンパイルする必要がある状況によく遭遇します。逆コンパイルは、他の人のコードを理解して学習したり、修復や最適化を行うのに役立ちます。この記事では、いくつかの最高の Java 逆コンパイル ツールを推奨し、読者がこれらのツールをよりよく学習して使用できるように、いくつかの具体的なコード例を提供します。 1. JD-GUIJD-GUI は非常に人気のあるオープンソースです

IoT テクノロジーの発展に伴い、インターネットに接続し、インターネットを介して通信および対話できるデバイスがますます増えています。 IoT アプリケーションの開発では、メッセージ キュー テレメトリ トランスポート プロトコル (MQTT) が軽量の通信プロトコルとして広く使用されています。この記事では、Java開発の実務経験を活かしてMQTTによるIoT機能を実装する方法を紹介します。 1. MQT とは何ですか? QTT は、パブリッシュ/サブスクライブ モデルに基づくメッセージ送信プロトコルです。シンプルな設計と低いオーバーヘッドを備えており、少量のデータを迅速に送信するアプリケーション シナリオに適しています。

Java 開発スキルが明らかに: データの暗号化と復号化機能の実装 現在の情報化時代において、データのセキュリティは非常に重要な問題となっています。機密データのセキュリティを保護するために、多くのアプリケーションは暗号化アルゴリズムを使用してデータを暗号化します。 Java は非常に人気のあるプログラミング言語として、暗号化テクノロジとツールの豊富なライブラリも提供します。この記事では、開発者がデータのセキュリティをより適切に保護できるように、Java 開発でデータの暗号化および復号化機能を実装するためのいくつかのテクニックを紹介します。 1. データ暗号化アルゴリズムの選択 Java は多くのデータ暗号化アルゴリズムをサポートしています

Java は非常に人気のあるプログラミング言語として、常に誰からも好まれてきました。私が初めて Java 開発を学び始めたとき、メッセージ サブスクリプション システムを構築する方法という問題に遭遇したことがあります。この記事では、他の Java 初心者の役に立つことを願って、メッセージ サブスクリプション システムをゼロから構築した私の経験を共有します。ステップ 1: 適切なメッセージ キューを選択する メッセージ サブスクリプション システムを構築するには、まず適切なメッセージ キューを選択する必要があります。現在市場でよく使われているメッセージ キューには、ActiveMQ などがあります。

Javaはソフトウェア開発の分野で広く使われているプログラミング言語で、その豊富なライブラリと強力な機能を利用してさまざまなアプリケーションを開発できます。画像の圧縮とトリミングは、Web およびモバイル アプリケーション開発における一般的な要件です。この記事では、開発者が画像圧縮およびトリミング機能を実装するのに役立つ Java 開発テクニックをいくつか紹介します。まず、画像圧縮の実装について説明します。 Web アプリケーションでは、多くの場合、画像をネットワーク経由で送信する必要があります。画像が大きすぎると、読み込みに時間がかかり、より多くの帯域幅が使用されます。したがって、私たちは

Java 開発の実践経験の共有: ロギングと分析機能の構築 まとめ: ロギングはソフトウェア開発の重要な部分であり、問題の追跡と解決、アプリケーションの実行状況の把握に役立ちます。この記事では、Java 開発において効率的なロギングおよび分析機能を構築する方法を紹介します。ログの重要性、適切なログ フレームワークの選択方法、ロガーの設定と使用方法について説明し、一般的なログ分析のヒントをいくつか紹介します。キーワード: Java開発、ロギング、ログ分析、ログフレームワーク

Java開発の実践経験の共有:分散ログ収集機能の構築 はじめに: インターネットの急速な発展と大規模データの出現に伴い、分散システムの適用はますます広がっています。分散システムでは、ログの収集と分析は非常に重要な部分です。この記事では、Java 開発で分散ログ収集機能を構築した経験を共有し、読者の参考になれば幸いです。 1. 背景の紹介 分散システムでは、各ノードが大量のログ情報を生成します。これらのログ情報は、システム パフォーマンスの監視、トラブルシューティング、データ分析に役立ちます。
