Avec l'avènement de l'ère du big data, nous avons souvent besoin de traiter et d'analyser des données en temps réel. La technologie de traitement de flux en temps réel est devenue une méthode courante pour traiter des données en temps réel à grande échelle en raison de ses hautes performances, de sa grande évolutivité et de sa faible latence. Dans la technologie de traitement de flux en temps réel, Kafka et Flink sont des composants courants et ont été largement utilisés dans de nombreux systèmes de traitement de données au niveau de l'entreprise. Dans cet article, nous expliquerons comment utiliser Kafka et Flink dans Beego pour le traitement de flux en temps réel.
1. Introduction à Kafka
Apache Kafka est une plateforme de traitement de flux distribuée. Il découple les données en un flux (données en streaming) et les distribue sur plusieurs nœuds, offrant ainsi des performances élevées, une haute disponibilité, une évolutivité élevée et certaines fonctionnalités avancées, telles que la garantie Exactly-Once. Le rôle principal de Kafka est celui d'un système de messagerie fiable pouvant être utilisé pour résoudre des problèmes de communication entre plusieurs composants dans des systèmes distribués et une transmission fiable des messages.
2. Introduction à Flink
Flink est un framework de traitement de flux Big Data distribué et hautes performances, piloté par des événements. Il prend en charge le traitement de flux et par lots, possède des capacités de traitement de requêtes et de flux de type SQL, prend en charge l'informatique en streaming hautement composable et offre une prise en charge riche en fenêtres et en stockage de données.
3. Kafka dans Beego
L'utilisation de Kafka dans Beego est principalement divisée en deux parties, à savoir le consommateur Kafka et le producteur Kafka.
L'utilisation de Kafka Producer dans Beego peut facilement envoyer des données au cluster Kafka. Voici un exemple de la façon d'utiliser Kafka Producer dans Beego :
import ( "github.com/Shopify/sarama" ) func main() { // 创建 kafka 生产者 producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, nil) if err != nil { // 处理错误情况 panic(err) } // 创建 Kafka 消息 msg := &sarama.ProducerMessage{ Topic: "test", Value: sarama.StringEncoder("Hello, World!"), } // 发送消息 partition, offset, err := producer.SendMessage(msg) if err != nil { // 处理错误情况 panic(err) } fmt.Printf("消息已发送到分区 %d 的偏移量 %d 中 ", partition, offset) // 关闭 Kafka 生产者 producer.Close() }
L'utilisation de consommateurs Kafka dans Beego peut. Obtenez facilement des données du cluster Kafka. Voici un exemple d'utilisation des consommateurs Kafka dans Beego :
import ( "github.com/Shopify/sarama" ) func main() { // 创建 kafka 消费者 consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, nil) if err != nil { // 处理错误情况 panic(err) } // 订阅 Topic partitions, err := consumer.Partitions("test") if err != nil { // 处理错误情况 panic(err) } for _, partition := range partitions { // 从分区的开头读取数据 partitionConsumer, _ := consumer.ConsumePartition("test", partition, sarama.OffsetOldest) // 处理数据 go func(partitionConsumer sarama.PartitionConsumer) { for { select { case msg := <-partitionConsumer.Messages(): // 处理消息 fmt.Printf("收到消息: %v", string(msg.Value)) } } }(partitionConsumer) } // 关闭 Kafka 消费者 defer consumer.Close() }
4. L'utilisation de Flink dans Beego peut être effectuée directement via l'API Java de Flink et l'intégralité. Le processus est terminé grâce à l'interaction Cgo entre Java et Go. Vous trouverez ci-dessous un exemple simple de Flink où la fréquence de chaque mot de texte Socket est calculée via un traitement de flux en temps réel. Dans cet exemple, nous lisons le flux de données texte donné dans Flink, puis utilisons les opérateurs de Flink pour opérer sur le flux de données et enfin générons les résultats sur la console.
Créer une source de données texte Socketimport org.apache.flink.streaming.api.functions.source.SourceFunction; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.net.Socket; public class SocketTextStreamFunction implements SourceFunction<String> { private final String hostname; private final int port; public SocketTextStreamFunction(String hostname, int port) { this.hostname = hostname; this.port = port; } public void run(SourceContext<String> context) throws Exception { Socket socket = new Socket(hostname, port); BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream())); String line; while ((line = reader.readLine()) != null) { context.collect(line); } reader.close(); socket.close(); } public void cancel() {} }
import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.windowing.WindowFunction; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.Collector; public class SocketTextStreamWordCount { public static void main(String[] args) throws Exception { String hostname = "localhost"; int port = 9999; final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 从 Socket 中读取数据流 DataStream<String> text = env.addSource(new SocketTextStreamFunction(hostname, port)); // 计算每个单词的出现频率 DataStream<Tuple2<String, Integer>> wordCounts = text .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() { public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception { String[] words = value.toLowerCase().split("\W+"); for (String word : words) { out.collect(new Tuple2<String, Integer>(word, 1)); } } }) .keyBy(0) .timeWindow(Time.seconds(5)) .apply(new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple, TimeWindow>() { public void apply(Tuple key, TimeWindow window, Iterable<Tuple2<String, Integer>> input, Collector<Tuple2<String, Integer>> out) throws Exception { int sum = 0; for (Tuple2<String, Integer> t : input) { sum += t.f1; } out.collect(new Tuple2<String, Integer>((String) key.getField(0), sum)); } }); // 打印到控制台 wordCounts.print(); env.execute("Socket Text Stream Word Count"); } }
Cet article présente comment utiliser Kafka et Flink dans Beego pour le traitement de flux en temps réel. Kafka peut être utilisé comme système de messagerie fiable et peut être utilisé pour résoudre des problèmes de communication entre plusieurs composants dans des systèmes distribués et une transmission fiable de messages. Flink est un framework de traitement de flux de Big Data distribué et hautes performances, piloté par les événements. Dans les applications pratiques, nous pouvons choisir en toute flexibilité d'utiliser des technologies telles que Kafka et Flink en fonction de besoins spécifiques pour résoudre les défis du traitement de données en temps réel à grande échelle.
Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!