Java를 사용하여 Apache Kafka 및 KSQL 기반의 스트림 처리 애플리케이션을 개발하는 방법
스트림 처리는 데이터가 도착하는 즉시 분석하고 처리할 수 있도록 실시간 데이터 스트림을 처리하는 기술입니다. Apache Kafka는 확장 가능한 스트림 처리 애플리케이션을 효율적으로 구축하는 데 사용할 수 있는 분산 스트림 처리 플랫폼입니다. KSQL은 실시간 스트림 데이터의 SQL 쿼리 및 변환에 사용할 수 있는 오픈 소스 스트림 데이터 처리 엔진입니다. 이 기사에서는 Java를 사용하여 Apache Kafka 및 KSQL 기반의 스트림 처리 애플리케이션을 개발하는 방법을 소개합니다.
1. 환경 설정
시작하기 전에 로컬 Kafka 및 KSQL 환경을 설정해야 합니다. 먼저 Java JDK, Apache Kafka 및 Confluent Platform을 다운로드하여 설치해야 합니다. 그런 다음 다음 명령을 사용하여 Kafka 및 KSQL을 시작할 수 있습니다.
2. Kafka 주제 및 KSQL 테이블 만들기
Java 코드 작성을 시작하기 전에 먼저 해야 할 일이 있습니다. Kafka 주제를 생성하고 여기에 실시간 데이터를 씁니다. 다음 명령을 사용하여 "example-topic"이라는 주제를 생성할 수 있습니다:
bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic example-topic --partitions 1 --replication - 요소 1
다음으로 실시간 데이터를 쿼리하고 변환하기 위해 KSQL에서 테이블을 생성해야 합니다. 다음 명령을 사용하여 KSQL 터미널에서 "example-table"이라는 테이블을 생성할 수 있습니다:
CREATE TABLE example_table (key VARCHAR, value VARCHAR) WITH (kafka_topic='example-topic', value_format='json', key= 'key');
3. Java 코드 구현
Java 코드 작성을 시작하기 전에 Kafka 및 KSQL에 대한 종속성을 추가해야 합니다. Maven 또는 Gradle 구성 파일에 다음 종속성을 추가할 수 있습니다.
Maven:
<groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.5.0</version>
<groupId>io.confluent</groupId> <artifactId>ksql-serde</artifactId> <version>0.10.0</version>
Gradle:
implementation 'org.apache.kafka:kafka-clients:2.5.0'
implementation 'io.confluent:ksql-serde:0.10.0'
다음으로 스트림 처리 애플리케이션을 구현하는 Java 코드를 작성할 수 있습니다. 다음은 간단한 예제 코드입니다.
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.*;
import org.apache.kafka.connect.json.JsonDeserializer;
import org.apache.kafka.connect.json.JsonSerializer;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream . *;
import org.apache.kafka.streams.processor.WallclockTimestampExtractor;
import org.apache.kafka.streams.state.*;
import java.util.*;
import java.util.concurrent.*;
public class StreamProcessingApp {
public static void main(String[] args) { Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-processing-app"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); StreamsBuilder builder = new StreamsBuilder(); // Step 1: Read from Kafka topic KStream<String, String> stream = builder.stream("example-topic"); // Step 2: Transform and process the data stream.mapValues(value -> value.toUpperCase()) .filter((key, value) -> value.startsWith("A")) .to("processed-topic"); // Step 3: Create a Kafka producer to send data to another topic Properties producerProps = new Properties(); producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps); // Step 4: Consume and process the data from the processed topic KStream<String, String> processedStream = builder.stream("processed-topic"); processedStream.foreach((key, value) -> { // Process the data here System.out.println("Key: " + key + ", Value: " + value); }); KafkaStreams streams = new KafkaStreams(builder.build(), props); streams.start(); }
}
위 코드는 "example-topic" 주제의 실시간 데이터를 읽고 데이터를 대문자로 변환한 후 문자 "A"로 쓰는 간단한 스트림 처리 애플리케이션을 구현합니다. 처음의 데이터는 "processed-topic" 주제에 기록됩니다. 동시에 "처리된 주제" 주제의 데이터도 소비하고 처리합니다.
4. 애플리케이션 실행
Java 코드를 작성한 후 다음 명령을 사용하여 애플리케이션을 컴파일하고 실행할 수 있습니다.
javac StreamProcessingApp.java
java StreamProcessingApp
이제 Apache Kafka 기반 스트림을 성공적으로 개발했습니다. KSQL 애플리케이션을 처리하고 Java 코드를 통해 데이터 읽기, 변환, 처리 및 쓰기를 구현합니다. 비즈니스 요구 사항을 충족하기 위해 실제 요구 사항에 따라 코드를 수정하고 확장할 수 있습니다. 이 기사가 도움이 되기를 바랍니다!
위 내용은 Java를 사용하여 Apache Kafka 및 KSQL 기반 스트림 처리 애플리케이션을 개발하는 방법의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!