Heim > Java > javaLernprogramm > Wie man mit Java eine Stream-Verarbeitungsanwendung basierend auf Apache Kafka und KSQL entwickelt

Wie man mit Java eine Stream-Verarbeitungsanwendung basierend auf Apache Kafka und KSQL entwickelt

WBOY
Freigeben: 2023-09-21 08:23:04
Original
1062 Leute haben es durchsucht

如何使用Java开发一个基于Apache Kafka和KSQL的流处理应用

So entwickeln Sie eine Stream-Verarbeitungsanwendung basierend auf Apache Kafka und KSQL mit Java

Stream-Verarbeitung ist eine Technologie, die Echtzeit-Datenströme verarbeitet, sodass die Daten analysiert und verarbeitet werden können, sobald sie ankommen. Apache Kafka ist eine verteilte Stream-Verarbeitungsplattform, mit der sich effizient skalierbare Stream-Verarbeitungsanwendungen erstellen lassen. KSQL ist eine Open-Source-Stream-Datenverarbeitungs-Engine, die für SQL-Abfragen und die Konvertierung von Echtzeit-Stream-Daten verwendet werden kann. In diesem Artikel stellen wir vor, wie Sie mit Java eine Stream-Verarbeitungsanwendung basierend auf Apache Kafka und KSQL entwickeln.

1. Umgebungseinrichtung
Bevor wir beginnen, müssen wir eine lokale Kafka- und KSQL-Umgebung einrichten. Zuerst müssen wir Java JDK, Apache Kafka und Confluent Platform herunterladen und installieren. Anschließend können wir Kafka und KSQL mit den folgenden Befehlen starten:

  1. ZooKeeper starten:
    bin/zookeeper-server-start.sh config/zookeeper.properties
  2. Kafka Broker starten:
    bin/kafka-server-start.sh config /server.properties
  3. Starten Sie KSQL Server:
    bin/ksql-server-start.sh config/ksql-server.properties

2. Erstellen Sie ein Kafka-Thema und eine KSQL-Tabelle
Bevor wir mit dem Schreiben von Java-Code beginnen, müssen wir zunächst Folgendes tun Erstellen Sie ein Kafka-Thema und schreiben Sie Echtzeitdaten hinein. Mit dem folgenden Befehl können wir ein Thema namens „example-topic“ erstellen:

bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic example-topic --partitions 1 --replication - Faktor 1

Als nächstes müssen wir in KSQL eine Tabelle zum Abfragen und Transformieren von Echtzeitdaten erstellen. Mit dem folgenden Befehl können wir im KSQL-Terminal eine Tabelle mit dem Namen „example-table“ erstellen:

CREATE TABLE example_table (key VARCHAR, value VARCHAR) WITH (kafka_topic='example-topic', value_format='json', key= 'key');

3. Java-Code-Implementierung
Bevor wir mit dem Schreiben von Java-Code beginnen, müssen wir Abhängigkeiten von Kafka und KSQL hinzufügen. Wir können die folgenden Abhängigkeiten in der Maven- oder Gradle-Konfigurationsdatei hinzufügen:

Maven:

<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.5.0</version>
Nach dem Login kopieren


<groupId>io.confluent</groupId>
<artifactId>ksql-serde</artifactId>
<version>0.10.0</version>
Nach dem Login kopieren

Gradle:

implementation 'org.apache.kafka:kafka-clients:2.5.0'
implementation 'io.confluent:ksql-serde:0.10.0'

Als nächstes können wir Java-Code schreiben, um die Stream-Verarbeitungsanwendung zu implementieren. Hier ist ein einfacher Beispielcode:

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.*;
import org.apache.kafka.connect.json.JsonDeserializer;
import org.apache.kafka.connect.json.JsonSerializer;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream . *;
import org.apache.kafka.streams.processor.WallclockTimestampExtractor;
import org.apache.kafka.streams.state.*;
import java.util.*;
import java.util.concurrent.*;

öffentliche Klasse StreamProcessingApp {

public static void main(String[] args) {
    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-processing-app");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

    StreamsBuilder builder = new StreamsBuilder();

    // Step 1: Read from Kafka topic
    KStream<String, String> stream = builder.stream("example-topic");

    // Step 2: Transform and process the data
    stream.mapValues(value -> value.toUpperCase())
          .filter((key, value) -> value.startsWith("A"))
          .to("processed-topic");

    // Step 3: Create a Kafka producer to send data to another topic
    Properties producerProps = new Properties();
    producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
    producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

    KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps);

    // Step 4: Consume and process the data from the processed topic
    KStream<String, String> processedStream = builder.stream("processed-topic");
    processedStream.foreach((key, value) -> {
        // Process the data here
        System.out.println("Key: " + key + ", Value: " + value);
    });

    KafkaStreams streams = new KafkaStreams(builder.build(), props);
    streams.start();
}
Nach dem Login kopieren

}

Der obige Code implementiert eine einfache Stream-Verarbeitungsanwendung, die Echtzeitdaten im Thema „Beispielthema“ liest, die Daten in Großbuchstaben umwandelt und sie mit dem Buchstaben „A“ schreibt. Die Daten am Anfang werden in das Thema „processed-topic“ geschrieben. Gleichzeitig werden auch die Daten im Thema „verarbeitetes Thema“ verbraucht und verarbeitet.

4. Führen Sie die Anwendung aus
Nachdem wir den Java-Code geschrieben haben, können wir die folgenden Befehle verwenden, um die Anwendung zu kompilieren und auszuführen:

javac StreamProcessingApp.java
java StreamProcessingApp

Jetzt haben wir erfolgreich einen Stream basierend auf Apache Kafka entwickelt und KSQL-Verarbeitungsanwendungen und Implementierung des Lesens, Konvertierens, Verarbeitens und Schreibens von Daten durch Java-Code. Sie können den Code entsprechend den tatsächlichen Anforderungen ändern und erweitern, um Ihren Geschäftsanforderungen gerecht zu werden. Ich hoffe, dieser Artikel ist hilfreich für Sie!

Das obige ist der detaillierte Inhalt vonWie man mit Java eine Stream-Verarbeitungsanwendung basierend auf Apache Kafka und KSQL entwickelt. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Verwandte Etiketten:
Quelle:php.cn
Erklärung dieser Website
Der Inhalt dieses Artikels wird freiwillig von Internetnutzern beigesteuert und das Urheberrecht liegt beim ursprünglichen Autor. Diese Website übernimmt keine entsprechende rechtliche Verantwortung. Wenn Sie Inhalte finden, bei denen der Verdacht eines Plagiats oder einer Rechtsverletzung besteht, wenden Sie sich bitte an admin@php.cn
Beliebte Tutorials
Mehr>
Neueste Downloads
Mehr>
Web-Effekte
Quellcode der Website
Website-Materialien
Frontend-Vorlage