


Développement Java : comment utiliser Apache Kafka Streams pour le traitement et le calcul de flux en temps réel
Développement Java : Comment utiliser Apache Kafka Streams pour le traitement et le calcul de flux en temps réel
Introduction :
Avec l'essor du Big Data et de l'informatique en temps réel, Apache Kafka Streams, en tant que moteur de traitement de flux, est en train d'être utilisé de plus en plus Utilisé par les développeurs. Il fournit un moyen simple mais puissant de gérer des données de streaming en temps réel et d'effectuer des traitements et des calculs de flux complexes. Cet article explique comment utiliser Apache Kafka Streams pour le traitement et le calcul de flux en temps réel, y compris la configuration de l'environnement, l'écriture de code et des exemples de démonstrations.
1. Préparation :
- Installer et configurer Apache Kafka : Vous devez télécharger et installer Apache Kafka et démarrer le cluster Apache Kafka. Pour une installation et une configuration détaillées, veuillez vous référer à la documentation officielle d'Apache Kafka.
- Introduire des dépendances : introduisez les dépendances liées à Kafka Streams dans le projet Java. Par exemple, en utilisant Maven, vous pouvez ajouter les dépendances suivantes dans les informations de connexion au cluster pom du projet. Voici un exemple de code simple :
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-streams</artifactId> <version>2.8.1</version> </dependency>
- Après avoir créé une application Kafka Streams, vous devez ajouter une logique de traitement et de calcul de flux spécifique. Prenant un exemple simple, nous supposons que nous recevons un message sous forme de chaîne d'un sujet Kafka nommé "input-topic", effectuons un calcul de longueur sur le message, puis envoyons le résultat à un sujet Kafka nommé "output-topic" . Voici un exemple de code :
import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.Topology; import java.util.Properties; public class KafkaStreamsApp { public static void main(String[] args) { Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); StreamsBuilder builder = new StreamsBuilder(); // 在这里添加流处理和计算逻辑 Topology topology = builder.build(); KafkaStreams streams = new KafkaStreams(topology, props); streams.start(); // 添加Shutdown Hook,确保应用程序在关闭时能够优雅地停止 Runtime.getRuntime().addShutdownHook(new Thread(streams::close)); } }
Copier après la connexion Dans l'exemple de code ci-dessus, un objet KStream est d'abord créé à partir du sujet d'entrée, puis chaque message est divisé en mots à l'aide de l'opération flatMapValues et compté statistiquement. Enfin, les résultats sont envoyés au sujet de sortie.
- 3. Exemple de démonstration :
- Afin de vérifier nos applications de traitement de flux et de calcul en temps réel, vous pouvez utiliser l'outil de ligne de commande Kafka pour envoyer des messages et afficher les résultats. Voici les étapes pour un exemple de démonstration :
Exécutez les commandes suivantes dans la ligne de commande pour créer des sujets Kafka nommés "sujet d'entrée" et "sujet de sortie" :
import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KTable; import java.util.Arrays; public class KafkaStreamsApp { // 省略其他代码... public static void main(String[] args) { // 省略其他代码... KStream<String, String> inputStream = builder.stream("input-topic"); KTable<String, Long> wordCounts = inputStream .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\W+"))) .groupBy((key, word) -> word) .count(); wordCounts.toStream().to("output-topic"); // 省略其他代码... } }
Envoyer un message Vers le sujet d'entrée :
- Exécutez la commande suivante dans la ligne de commande pour envoyer des messages à "input-topic" :
bin/kafka-topics.sh --create --topic input-topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1 bin/kafka-topics.sh --create --topic output-topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
Copier après la connexionbin/kafka-console-consumer.sh --topic output-topic --from- début --bootstrap -server localhost : 9092
bin/kafka-console-producer.sh --topic input-topic --bootstrap-server localhost:9092 >hello world >apache kafka streams >real-time processing >``` 3. 查看结果: 在命令行中执行以下命令,从"output-topic"中消费结果消息:
- temps réel : 1
- traitement : 1
apache : 1 kafka : 1
bonjour : 2
monde : 1可以看到,输出的结果是单词及其对应的计数值:
Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Outils d'IA chauds

Undresser.AI Undress
Application basée sur l'IA pour créer des photos de nu réalistes

AI Clothes Remover
Outil d'IA en ligne pour supprimer les vêtements des photos.

Undress AI Tool
Images de déshabillage gratuites

Clothoff.io
Dissolvant de vêtements AI

Video Face Swap
Échangez les visages dans n'importe quelle vidéo sans effort grâce à notre outil d'échange de visage AI entièrement gratuit !

Article chaud

Outils chauds

Bloc-notes++7.3.1
Éditeur de code facile à utiliser et gratuit

SublimeText3 version chinoise
Version chinoise, très simple à utiliser

Envoyer Studio 13.0.1
Puissant environnement de développement intégré PHP

Dreamweaver CS6
Outils de développement Web visuel

SublimeText3 version Mac
Logiciel d'édition de code au niveau de Dieu (SublimeText3)

Guide du nombre parfait en Java. Nous discutons ici de la définition, comment vérifier le nombre parfait en Java ?, des exemples d'implémentation de code.

Guide de Weka en Java. Nous discutons ici de l'introduction, de la façon d'utiliser Weka Java, du type de plate-forme et des avantages avec des exemples.

Guide du nombre de Smith en Java. Nous discutons ici de la définition, comment vérifier le numéro Smith en Java ? exemple avec implémentation de code.

Dans cet article, nous avons conservé les questions d'entretien Java Spring les plus posées avec leurs réponses détaillées. Pour que vous puissiez réussir l'interview.

Java 8 présente l'API Stream, fournissant un moyen puissant et expressif de traiter les collections de données. Cependant, une question courante lors de l'utilisation du flux est: comment se casser ou revenir d'une opération FOREAK? Les boucles traditionnelles permettent une interruption ou un retour précoce, mais la méthode Foreach de Stream ne prend pas directement en charge cette méthode. Cet article expliquera les raisons et explorera des méthodes alternatives pour la mise en œuvre de terminaison prématurée dans les systèmes de traitement de flux. Lire plus approfondie: Améliorations de l'API Java Stream Comprendre le flux Forach La méthode foreach est une opération terminale qui effectue une opération sur chaque élément du flux. Son intention de conception est

Guide de TimeStamp to Date en Java. Ici, nous discutons également de l'introduction et de la façon de convertir l'horodatage en date en Java avec des exemples.

Les capsules sont des figures géométriques tridimensionnelles, composées d'un cylindre et d'un hémisphère aux deux extrémités. Le volume de la capsule peut être calculé en ajoutant le volume du cylindre et le volume de l'hémisphère aux deux extrémités. Ce tutoriel discutera de la façon de calculer le volume d'une capsule donnée en Java en utilisant différentes méthodes. Formule de volume de capsule La formule du volume de la capsule est la suivante: Volume de capsule = volume cylindrique volume de deux hémisphères volume dans, R: Le rayon de l'hémisphère. H: La hauteur du cylindre (à l'exclusion de l'hémisphère). Exemple 1 entrer Rayon = 5 unités Hauteur = 10 unités Sortir Volume = 1570,8 unités cubes expliquer Calculer le volume à l'aide de la formule: Volume = π × r2 × h (4

Java est un langage de programmation populaire qui peut être appris aussi bien par les développeurs débutants que par les développeurs expérimentés. Ce didacticiel commence par les concepts de base et progresse vers des sujets avancés. Après avoir installé le kit de développement Java, vous pouvez vous entraîner à la programmation en créant un simple programme « Hello, World ! ». Une fois que vous avez compris le code, utilisez l'invite de commande pour compiler et exécuter le programme, et « Hello, World ! » s'affichera sur la console. L'apprentissage de Java commence votre parcours de programmation et, à mesure que votre maîtrise s'approfondit, vous pouvez créer des applications plus complexes.
