Maison > développement back-end > Golang > Traitement de flux en temps réel à l'aide de Kafka et Flink dans Beego

Traitement de flux en temps réel à l'aide de Kafka et Flink dans Beego

WBOY
Libérer: 2023-06-22 16:18:33
original
1510 Les gens l'ont consulté

Avec l'avènement de l'ère du big data, nous avons souvent besoin de traiter et d'analyser des données en temps réel. La technologie de traitement de flux en temps réel est devenue une méthode courante pour traiter des données en temps réel à grande échelle en raison de ses hautes performances, de sa grande évolutivité et de sa faible latence. Dans la technologie de traitement de flux en temps réel, Kafka et Flink sont des composants courants et ont été largement utilisés dans de nombreux systèmes de traitement de données au niveau de l'entreprise. Dans cet article, nous expliquerons comment utiliser Kafka et Flink dans Beego pour le traitement de flux en temps réel.

1. Introduction à Kafka

Apache Kafka est une plateforme de traitement de flux distribuée. Il découple les données en un flux (données en streaming) et les distribue sur plusieurs nœuds, offrant ainsi des performances élevées, une haute disponibilité, une évolutivité élevée et certaines fonctionnalités avancées, telles que la garantie Exactly-Once. Le rôle principal de Kafka est celui d'un système de messagerie fiable pouvant être utilisé pour résoudre des problèmes de communication entre plusieurs composants dans des systèmes distribués et une transmission fiable des messages.

2. Introduction à Flink

Flink est un framework de traitement de flux Big Data distribué et hautes performances, piloté par des événements. Il prend en charge le traitement de flux et par lots, possède des capacités de traitement de requêtes et de flux de type SQL, prend en charge l'informatique en streaming hautement composable et offre une prise en charge riche en fenêtres et en stockage de données.

3. Kafka dans Beego

L'utilisation de Kafka dans Beego est principalement divisée en deux parties, à savoir le consommateur Kafka et le producteur Kafka.

  1. Kafka Producer

L'utilisation de Kafka Producer dans Beego peut facilement envoyer des données au cluster Kafka. Voici un exemple de la façon d'utiliser Kafka Producer dans Beego :

import (
    "github.com/Shopify/sarama"
)

func main() {
    // 创建 kafka 生产者
    producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, nil)

    if err != nil {
        // 处理错误情况
        panic(err)
    }

    // 创建 Kafka 消息
    msg := &sarama.ProducerMessage{
        Topic: "test",
        Value: sarama.StringEncoder("Hello, World!"),
    }

    // 发送消息
    partition, offset, err := producer.SendMessage(msg)

    if err != nil {
        // 处理错误情况
        panic(err)
    }

    fmt.Printf("消息已发送到分区 %d 的偏移量 %d 中
", partition, offset)

    // 关闭 Kafka 生产者
    producer.Close()
}
Copier après la connexion
  1. Kafka Consumer

L'utilisation de consommateurs Kafka dans Beego peut. Obtenez facilement des données du cluster Kafka. Voici un exemple d'utilisation des consommateurs Kafka dans Beego :

import (
    "github.com/Shopify/sarama"
)

func main() {
    // 创建 kafka 消费者
    consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, nil)

    if err != nil {
        // 处理错误情况
        panic(err)
    }

    // 订阅 Topic
    partitions, err := consumer.Partitions("test")

    if err != nil {
        // 处理错误情况
        panic(err)
    }

    for _, partition := range partitions {
        // 从分区的开头读取数据
        partitionConsumer, _ := consumer.ConsumePartition("test", partition, sarama.OffsetOldest)

        // 处理数据
        go func(partitionConsumer sarama.PartitionConsumer) {
            for {
                select {
                case msg := <-partitionConsumer.Messages():
                    // 处理消息
                    fmt.Printf("收到消息: %v", string(msg.Value))
                }
            }
        }(partitionConsumer)
    }

    // 关闭 Kafka 消费者
    defer consumer.Close()
}
Copier après la connexion

4. L'utilisation de Flink dans Beego peut être effectuée directement via l'API Java de Flink et l'intégralité. Le processus est terminé grâce à l'interaction Cgo entre Java et Go. Vous trouverez ci-dessous un exemple simple de Flink où la fréquence de chaque mot de texte Socket est calculée via un traitement de flux en temps réel. Dans cet exemple, nous lisons le flux de données texte donné dans Flink, puis utilisons les opérateurs de Flink pour opérer sur le flux de données et enfin générons les résultats sur la console.

Créer une source de données texte Socket
  1. import org.apache.flink.streaming.api.functions.source.SourceFunction;
    
    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.net.Socket;
    
    public class SocketTextStreamFunction implements SourceFunction<String> {
        private final String hostname;
        private final int port;
    
        public SocketTextStreamFunction(String hostname, int port) {
            this.hostname = hostname;
            this.port = port;
        }
    
        public void run(SourceContext<String> context) throws Exception {
            Socket socket = new Socket(hostname, port);
            BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
            String line;
            while ((line = reader.readLine()) != null) {
                context.collect(line);
            }
            reader.close();
            socket.close();
        }
    
        public void cancel() {}
    }
    Copier après la connexion
Calculer la fréquence de chaque mot
  1. import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    import org.apache.flink.util.Collector;
    
    public class SocketTextStreamWordCount {
        public static void main(String[] args) throws Exception {
            String hostname = "localhost";
            int port = 9999;
    
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            // 从 Socket 中读取数据流
            DataStream<String> text = env.addSource(new SocketTextStreamFunction(hostname, port));
    
            // 计算每个单词的出现频率
            DataStream<Tuple2<String, Integer>> wordCounts = text
                    .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                            String[] words = value.toLowerCase().split("\W+");
                            for (String word : words) {
                                out.collect(new Tuple2<String, Integer>(word, 1));
                            }
                        }
                    })
                    .keyBy(0)
                    .timeWindow(Time.seconds(5))
                    .apply(new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple, TimeWindow>() {
                        public void apply(Tuple key, TimeWindow window, Iterable<Tuple2<String, Integer>> input, Collector<Tuple2<String, Integer>> out) throws Exception {
                            int sum = 0;
                            for (Tuple2<String, Integer> t : input) {
                                sum += t.f1;
                            }
                            out.collect(new Tuple2<String, Integer>((String) key.getField(0), sum));
                        }
                    });
    
            // 打印到控制台
            wordCounts.print();
    
            env.execute("Socket Text Stream Word Count");
        }
    }
    Copier après la connexion
    5. Conclusion

    Cet article présente comment utiliser Kafka et Flink dans Beego pour le traitement de flux en temps réel. Kafka peut être utilisé comme système de messagerie fiable et peut être utilisé pour résoudre des problèmes de communication entre plusieurs composants dans des systèmes distribués et une transmission fiable de messages. Flink est un framework de traitement de flux de Big Data distribué et hautes performances, piloté par les événements. Dans les applications pratiques, nous pouvons choisir en toute flexibilité d'utiliser des technologies telles que Kafka et Flink en fonction de besoins spécifiques pour résoudre les défis du traitement de données en temps réel à grande échelle.

    Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Étiquettes associées:
source:php.cn
Déclaration de ce site Web
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn
Tutoriels populaires
Plus>
Derniers téléchargements
Plus>
effets Web
Code source du site Web
Matériel du site Web
Modèle frontal