Heim > Java > javaLernprogramm > Erstellen Sie Stream-Verarbeitungsanwendungen mit Spring Boot und Apache Kafka Streams

Erstellen Sie Stream-Verarbeitungsanwendungen mit Spring Boot und Apache Kafka Streams

WBOY
Freigeben: 2023-06-23 08:32:22
Original
1571 Leute haben es durchsucht

Mit dem Aufkommen des Big-Data-Zeitalters beginnen immer mehr Unternehmen, der Stream-Processing-Technologie Aufmerksamkeit zu schenken, um den Anforderungen der Echtzeit-Datenverarbeitung und -analyse gerecht zu werden. Apache Kafka ist ein skalierbares verteiltes Nachrichtenwarteschlangensystem mit hohem Durchsatz, das zum De-facto-Standard im Bereich der Stream-Verarbeitung geworden ist. Spring Boot ist ein Tool zur schnellen Entwicklung von Spring-Anwendungen, mit dem wir Stream-Verarbeitungsanwendungen schneller und einfacher erstellen können. In diesem Artikel wird erläutert, wie Sie mithilfe von Spring Boot und Apache Kafka Streams eine Stream-Verarbeitungsanwendung erstellen. Außerdem werden die Vor- und Nachteile dieser beiden Tools sowie die Optimierung der Anwendungsleistung erörtert.

  1. Erstellen Sie ein Kafka-Thema.

Bevor wir mit der Erstellung der Anwendung beginnen, müssen wir zunächst ein Kafka-Thema erstellen. In diesem Artikel erstellen wir ein Thema namens „Benutzerklicks“, in dem Benutzerklickereignisse auf der Website gespeichert werden.

Führen Sie den folgenden Befehl in der Befehlszeile aus:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic user-clicks
Nach dem Login kopieren

Dadurch wird ein Thema namens „user-clicks“ auf dem Kafka-Server mit nur einer Partition und einer lokalen Kopie erstellt.

  1. Erstellen einer Spring Boot-Anwendung

Als nächstes erstellen wir eine Basisanwendung mit Spring Boot. In Spring Boot können wir Spring Initializr verwenden, um schnell eine Basisanwendung zu erstellen. Stellen Sie beim Erstellen der Anwendung sicher, dass Sie die folgenden Abhängigkeiten auswählen:

  • Spring Kafka
  • Spring Web

Nach dem Erstellen der Anwendung fügen wir die folgenden Abhängigkeiten hinzu:

<dependency>
   <groupId>org.apache.kafka</groupId>
   <artifactId>kafka-streams</artifactId>
   <version>2.6.0</version>
</dependency>
Nach dem Login kopieren

Dadurch erhalten wir die Kafka-Stream-Verarbeitungs-API.

  1. Implementieren der Kafka-Stream-Verarbeitung

Jetzt können wir mit dem Schreiben von Kafka-Stream-Verarbeitungscode beginnen. Beim Erstellen der Anwendung haben wir eine Controller-Klasse namens „UserController“ definiert. Jetzt fügen wir der Controller-Klasse einen POST-Request-Handler namens „clicks“ hinzu. Dieser Handler ruft die Klickereignisse des Benutzers aus der POST-Anfrage ab und sendet sie an ein Kafka-Thema namens „user-clicks“. Der Code lautet wie folgt:

@RestController
public class UserController {

   private final KafkaTemplate<String, String> kafkaTemplate;

   @Autowired
   public UserController(KafkaTemplate<String, String> kafkaTemplate) {
       this.kafkaTemplate = kafkaTemplate;
   }

   @PostMapping("/clicks")
   public void clicks(@RequestBody String click) {
       kafkaTemplate.send("user-clicks", click);
   }
}
Nach dem Login kopieren

Im obigen Code verwenden wir die Abhängigkeitsinjektionsfunktion von Spring, um ein KafkaTemplate-Objekt mit dem Namen „kafkaTemplate“ zu injizieren. Mit diesem Objekt können Nachrichten an Kafka-Themen gesendet werden.

  1. Erstellen Sie eine Kafka-Streaming-Topologie.

Als Nächstes erstellen wir eine Kafka-Streaming-Topologie für die Verarbeitung von Klickereignissen, die vom Thema „Benutzerklicks“ empfangen werden. In unserem Beispiel verwenden wir die Kafka Streams API, um die Stream-Verarbeitungstopologie zu implementieren.

In der Spring Boot-Anwendung erstellen wir eine Klasse namens „UserClicksStream“, die die Kafka Streams API zur Verarbeitung von Klickereignissen verwendet. Der Code lautet wie folgt:

@Configuration
@EnableKafkaStreams
public class UserClicksStream {

   @Value("${spring.kafka.bootstrap-servers}")
   private String bootstrapServers;

   @Bean
   public KStream<String, String> kStream(StreamsBuilder builder) {

       KStream<String, String> stream = builder.stream("user-clicks");

       stream.foreach((key, value) -> {
           System.out.println("Received: " + value);
       });

       return stream;
   }

   @Bean
   public KafkaStreams kafkaStreams(StreamsBuilder builder) {
       Properties props = new Properties();
       props.put(StreamsConfig.APPLICATION_ID_CONFIG, "user-clicks-stream");
       props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
       return new KafkaStreams(builder.build(), props);
   }
}
Nach dem Login kopieren

Im obigen Code verwenden wir die Abhängigkeitsinjektionsfunktion von Spring, um ein StreamsBuilder-Objekt mit dem Namen „StreamsBuilder“ zu injizieren. Dieses Objekt wird zum Erstellen einer Kafka-Stream-Verarbeitungstopologie verwendet.

In der kStream-Methode erstellen wir ein KStream-Objekt aus dem Thema „Benutzerklicks“ und drucken die empfangenen Ereignisse mithilfe der foreach-Methode aus. froeach ist eine Terminaloperation, die wir in späteren Schritten verwenden werden.

In der kafkaStreams-Methode erstellen wir eine Anwendung mit dem Namen „user-clicks-stream“ und geben die Adresse des Kafka-Servers an. Diese Anwendung führt automatisch die Stream-Verarbeitungsvorgänge aus, die wir in der vorherigen Topologie definiert haben.

  1. Führen Sie die Anwendung aus

Jetzt haben wir den gesamten Code für die Anwendung geschrieben. Bevor wir die Anwendung ausführen, müssen wir den Kafka-Server starten.

Führen Sie den folgenden Befehl in der Befehlszeile aus:

bin/kafka-server-start.sh config/server.properties
Nach dem Login kopieren

Dadurch wird der Kafka-Server gestartet. Jetzt können wir mit unserer Bewerbung beginnen.

Führen Sie den folgenden Befehl in der Befehlszeile aus:

mvn spring-boot:run
Nach dem Login kopieren

Dadurch wird unsere Anwendung gestartet. Jetzt können wir mit jedem HTTP-Client wie cURL oder Postman eine POST-Anfrage an die Anwendung senden. Jede Anfrage generiert ein Klickereignis und druckt es in der Konsole aus.

Wenn wir weitere Vorgänge in der Topologie ausführen möchten (z. B. Aggregation, Fensterberechnung usw.), können wir andere von der Kafka Streams API bereitgestellte Vorgänge zum Erstellen der Topologie verwenden.

  1. Zusammenfassung

Das Erstellen von Stream-Verarbeitungsanwendungen mit Spring Boot und Apache Kafka Streams ist eine schnelle und bequeme Möglichkeit, uns dabei zu helfen, Echtzeitdaten einfacher zu verarbeiten. Wir müssen jedoch auf einige Aspekte der Optimierungsleistung achten, z. B. Topologiedesign, Puffergröße, Stream-Verarbeitungszeit usw. Wenn wir diese Probleme verstehen, können wir effizientere Stream-Verarbeitungsanwendungen erstellen.

Das obige ist der detaillierte Inhalt vonErstellen Sie Stream-Verarbeitungsanwendungen mit Spring Boot und Apache Kafka Streams. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Verwandte Etiketten:
Quelle:php.cn
Erklärung dieser Website
Der Inhalt dieses Artikels wird freiwillig von Internetnutzern beigesteuert und das Urheberrecht liegt beim ursprünglichen Autor. Diese Website übernimmt keine entsprechende rechtliche Verantwortung. Wenn Sie Inhalte finden, bei denen der Verdacht eines Plagiats oder einer Rechtsverletzung besteht, wenden Sie sich bitte an admin@php.cn
Beliebte Tutorials
Mehr>
Neueste Downloads
Mehr>
Web-Effekte
Quellcode der Website
Website-Materialien
Frontend-Vorlage