Rumah > pembangunan bahagian belakang > Golang > Pemprosesan strim masa nyata menggunakan Kafka dan Flink dalam Beego

Pemprosesan strim masa nyata menggunakan Kafka dan Flink dalam Beego

WBOY
Lepaskan: 2023-06-22 16:18:33
asal
1510 orang telah melayarinya

Dengan kemunculan era data besar, kami selalunya perlu memproses dan menganalisis data masa nyata. Teknologi pemprosesan strim masa nyata telah menjadi kaedah arus perdana untuk memproses data masa nyata berskala besar kerana prestasinya yang tinggi, berskala tinggi dan kependaman yang rendah. Dalam teknologi pemprosesan strim masa nyata, Kafka dan Flink adalah komponen biasa dan telah digunakan secara meluas dalam banyak sistem pemprosesan data peringkat perusahaan. Dalam artikel ini, kami akan memperkenalkan cara menggunakan Kafka dan Flink dalam Beego untuk pemprosesan strim masa nyata.

1. Pengenalan kepada Kafka

Apache Kafka ialah platform pemprosesan strim teragih. Ia memisahkan data ke dalam strim (data penstriman) dan mengedarkan data merentas berbilang nod, memberikan prestasi tinggi, ketersediaan tinggi, berskala tinggi dan beberapa ciri lanjutan, seperti jaminan Exactly-Once. Peranan utama Kafka adalah sebagai sistem pemesejan yang boleh dipercayai yang boleh digunakan untuk menyelesaikan masalah komunikasi antara pelbagai komponen dalam sistem teragih dan penghantaran mesej yang boleh dipercayai.

2. Pengenalan kepada Flink

Flink ialah rangka kerja pemprosesan strim data besar yang dipacu peristiwa, diedarkan, dan berprestasi tinggi. Ia menyokong pemprosesan strim dan kelompok, mempunyai pertanyaan seperti SQL dan keupayaan pemprosesan strim, menyokong pengkomputeran penstriman yang sangat boleh dikomposisikan, dan mempunyai sokongan storan tetingkap dan data yang kaya.

3. Kafka dalam Beego

Menggunakan Kafka dalam Beego terbahagi kepada dua bahagian, iaitu pengguna Kafka dan pengeluar Kafka.

  1. Pengeluar Kafka

Menggunakan pengeluar Kafka dalam Beego boleh menghantar data dengan mudah ke gugusan Kafka Berikut ialah cara menggunakan pengeluar Kafka dalam Beego Contoh:

import (
    "github.com/Shopify/sarama"
)

func main() {
    // 创建 kafka 生产者
    producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, nil)

    if err != nil {
        // 处理错误情况
        panic(err)
    }

    // 创建 Kafka 消息
    msg := &sarama.ProducerMessage{
        Topic: "test",
        Value: sarama.StringEncoder("Hello, World!"),
    }

    // 发送消息
    partition, offset, err := producer.SendMessage(msg)

    if err != nil {
        // 处理错误情况
        panic(err)
    }

    fmt.Printf("消息已发送到分区 %d 的偏移量 %d 中
", partition, offset)

    // 关闭 Kafka 生产者
    producer.Close()
}
Salin selepas log masuk
  1. Pengguna Kafka

Menggunakan pengguna Kafka dalam Beego boleh mendapatkan data dengan mudah daripada gugusan Kafka Berikut ialah cara menggunakannya dalam Beego Contoh pengguna Kafka:

import (
    "github.com/Shopify/sarama"
)

func main() {
    // 创建 kafka 消费者
    consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, nil)

    if err != nil {
        // 处理错误情况
        panic(err)
    }

    // 订阅 Topic
    partitions, err := consumer.Partitions("test")

    if err != nil {
        // 处理错误情况
        panic(err)
    }

    for _, partition := range partitions {
        // 从分区的开头读取数据
        partitionConsumer, _ := consumer.ConsumePartition("test", partition, sarama.OffsetOldest)

        // 处理数据
        go func(partitionConsumer sarama.PartitionConsumer) {
            for {
                select {
                case msg := <-partitionConsumer.Messages():
                    // 处理消息
                    fmt.Printf("收到消息: %v", string(msg.Value))
                }
            }
        }(partitionConsumer)
    }

    // 关闭 Kafka 消费者
    defer consumer.Close()
}
Salin selepas log masuk

. 4. Flink dalam Beego

Menggunakan Flink dalam Beego boleh dilakukan terus melalui API Java Flink, melalui interaksi Cgo antara Java dan Go Complete keseluruhan proses. Di bawah ialah contoh mudah daripada Flink di mana kekerapan setiap perkataan teks Socket dikira melalui pemprosesan strim masa nyata. Dalam contoh ini, kami membaca aliran data teks yang diberikan ke dalam Flink, kemudian menggunakan operator Flink untuk beroperasi pada aliran data, dan akhirnya mengeluarkan hasilnya ke konsol.

  1. Buat sumber data teks Soket
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.Socket;

public class SocketTextStreamFunction implements SourceFunction<String> {
    private final String hostname;
    private final int port;

    public SocketTextStreamFunction(String hostname, int port) {
        this.hostname = hostname;
        this.port = port;
    }

    public void run(SourceContext<String> context) throws Exception {
        Socket socket = new Socket(hostname, port);
        BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
        String line;
        while ((line = reader.readLine()) != null) {
            context.collect(line);
        }
        reader.close();
        socket.close();
    }

    public void cancel() {}
}
Salin selepas log masuk
  1. Kira kekerapan setiap perkataan
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class SocketTextStreamWordCount {
    public static void main(String[] args) throws Exception {
        String hostname = "localhost";
        int port = 9999;

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 从 Socket 中读取数据流
        DataStream<String> text = env.addSource(new SocketTextStreamFunction(hostname, port));

        // 计算每个单词的出现频率
        DataStream<Tuple2<String, Integer>> wordCounts = text
                .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                        String[] words = value.toLowerCase().split("\W+");
                        for (String word : words) {
                            out.collect(new Tuple2<String, Integer>(word, 1));
                        }
                    }
                })
                .keyBy(0)
                .timeWindow(Time.seconds(5))
                .apply(new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple, TimeWindow>() {
                    public void apply(Tuple key, TimeWindow window, Iterable<Tuple2<String, Integer>> input, Collector<Tuple2<String, Integer>> out) throws Exception {
                        int sum = 0;
                        for (Tuple2<String, Integer> t : input) {
                            sum += t.f1;
                        }
                        out.collect(new Tuple2<String, Integer>((String) key.getField(0), sum));
                    }
                });

        // 打印到控制台
        wordCounts.print();

        env.execute("Socket Text Stream Word Count");
    }
}
Salin selepas log masuk

5

Artikel ini memperkenalkan cara menggunakan Kafka dan Flink dalam Beego untuk pemprosesan strim masa nyata. Kafka boleh digunakan sebagai sistem pemesejan yang boleh dipercayai untuk menyelesaikan masalah komunikasi antara berbilang komponen dalam sistem teragih dan penghantaran mesej yang boleh dipercayai. Flink ialah rangka kerja pemprosesan strim data besar yang didorong oleh peristiwa, diedarkan, dan berprestasi tinggi. Dalam aplikasi praktikal, kami boleh memilih secara fleksibel untuk menggunakan teknologi seperti Kafka dan Flink berdasarkan keperluan khusus untuk menyelesaikan cabaran dalam pemprosesan data masa nyata berskala besar.

Atas ialah kandungan terperinci Pemprosesan strim masa nyata menggunakan Kafka dan Flink dalam Beego. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!

Label berkaitan:
sumber:php.cn
Kenyataan Laman Web ini
Kandungan artikel ini disumbangkan secara sukarela oleh netizen, dan hak cipta adalah milik pengarang asal. Laman web ini tidak memikul tanggungjawab undang-undang yang sepadan. Jika anda menemui sebarang kandungan yang disyaki plagiarisme atau pelanggaran, sila hubungi admin@php.cn
Tutorial Popular
Lagi>
Muat turun terkini
Lagi>
kesan web
Kod sumber laman web
Bahan laman web
Templat hujung hadapan