Rumah > pembangunan bahagian belakang > Golang > Pemprosesan data masa nyata menggunakan Kafka dan Spark Streaming di Beego

Pemprosesan data masa nyata menggunakan Kafka dan Spark Streaming di Beego

PHPz
Lepaskan: 2023-06-22 08:44:28
asal
1188 orang telah melayarinya

Dengan perkembangan berterusan teknologi Internet dan IoT, jumlah data yang dijana dalam pengeluaran dan kehidupan kami semakin meningkat. Data ini memainkan peranan yang sangat penting dalam strategi perniagaan dan membuat keputusan syarikat. Untuk menggunakan data ini dengan lebih baik, pemprosesan data masa nyata telah menjadi bahagian penting dalam kerja harian perusahaan dan institusi penyelidikan saintifik. Dalam artikel ini, kami akan meneroka cara menggunakan Kafka dan Spark Streaming dalam rangka kerja Beego untuk pemprosesan data masa nyata.

1. Apakah itu Kafka

Kafka ialah sistem baris gilir mesej yang diedarkan tinggi yang digunakan untuk memproses data besar-besaran. Kafka menyimpan data mesej dalam berbilang topik dengan cara yang diedarkan dan boleh diambil dan diedarkan dengan cepat. Dalam senario penstriman data, Kafka telah menjadi salah satu sistem pemesejan sumber terbuka yang paling popular dan digunakan secara meluas oleh banyak syarikat teknologi termasuk LinkedIn, Netflix dan Twitter.

2. Apakah itu Spark Streaming

Spark Streaming ialah komponen dalam ekosistem Apache Spark Ia menyediakan rangka kerja pengkomputeran penstriman yang boleh melakukan pemprosesan kumpulan masa nyata bagi strim data. Spark Streaming sangat berskala dan bertolak ansur dengan kesalahan, serta boleh menyokong berbilang sumber data. Spark Streaming boleh digunakan bersama dengan sistem baris gilir mesej seperti Kafka untuk melaksanakan fungsi pengkomputeran penstriman.

3 Gunakan Kafka dan Spark Streaming dalam Beego untuk pemprosesan data masa nyata

Apabila menggunakan rangka kerja Beego untuk pemprosesan data masa nyata, kami boleh menggabungkan Kafka dan Spark Streaming untuk mencapai penerimaan data dan pemprosesan. Berikut ialah proses pemprosesan data masa nyata yang mudah:

1 Gunakan Kafka untuk mewujudkan baris gilir mesej, merangkum data ke dalam mesej dan menghantarnya ke Kafka.
2. Gunakan Spark Streaming untuk membina aplikasi penstriman dan melanggan data dalam baris gilir mesej Kafka.
3. Untuk data yang dilanggan, kami boleh melakukan pelbagai operasi pemprosesan yang kompleks, seperti pembersihan data, pengagregatan data, pengiraan perniagaan, dsb.
4. Keluarkan hasil pemprosesan kepada Kafka atau paparkannya secara visual kepada pengguna.

Di bawah ini kami akan memperkenalkan secara terperinci cara melaksanakan proses di atas.

1. Wujudkan baris gilir mesej Kafka

Pertama, kami perlu memperkenalkan pakej Kafka ke dalam Beego Anda boleh menggunakan pakej sarama dalam bahasa go dan dapatkannya melalui arahan:

pergi dapatkan gopkg.in/Shopify/sarama.v1

Kemudian, wujudkan baris gilir mesej Kafka dalam Beego dan hantar data yang dijana kepada Kafka. Kod sampel adalah seperti berikut:

func initKafka() (err error) {

//配置Kafka连接属性
config := sarama.NewConfig()
config.Producer.RequiredAcks = sarama.WaitForAll
config.Producer.Partitioner = sarama.NewRandomPartitioner
config.Producer.Return.Successes = true
//创建Kafka连接器
client, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)
if err != nil {
    fmt.Println("failed to create producer, err:", err)
    return
}
//异步关闭Kafka
defer client.Close()
//模拟生成数据
for i := 1; i < 5000; i++ {
    id := uint32(i)
    userName := fmt.Sprintf("user:%d", i)
    //数据转为byte格式发送到Kafka
    message := fmt.Sprintf("%d,%s", id, userName)
    msg := &sarama.ProducerMessage{}
    msg.Topic = "test" //topic消息标记
    msg.Value = sarama.StringEncoder(message) //消息数据
    _, _, err := client.SendMessage(msg)
    if err != nil {
        fmt.Println("send message failed:", err)
    }
    time.Sleep(time.Second)
}
return
Salin selepas log masuk

}

Dalam kod di atas, kami menggunakan kaedah SyncProducer dalam pakej Sarama untuk mencipta penyambung Kafka dan menetapkan sifat sambungan yang diperlukan. Kemudian gunakan gelung for untuk menjana data, dan merangkum data yang dijana ke dalam mesej dan menghantarnya ke Kafka.

2. Gunakan Spark Streaming untuk pemprosesan data masa nyata

Apabila menggunakan Spark Streaming untuk pemprosesan data masa nyata, kita perlu memasang dan mengkonfigurasi Spark dan Kafka, yang boleh dipasang melalui arahan berikut:

sudo apt-get install spark

sudo apt-get install zookeeper

sudo apt-get install kafka

Selepas melengkapkan pemasangan, kita perlu memperkenalkan Spark Streaming dalam Pakej Beego:

import org.apache.spark.SparkConf

import org.apache.spark.streaming.{Second, StreamingContext}

import org.apache.spark.kafka.KafkaUtils

Seterusnya, kita perlu memproses strim data. Kod berikut melaksanakan logik untuk menerima data daripada Kafka dan memproses setiap mesej:

func main() {

//创建SparkConf对象
conf := SparkConf().setAppName("test").setMaster("local[2]")
//创建StreamingContext对象,设置1秒钟处理一次
ssc := StreamingContext(conf, Seconds(1))
//从Kafka中订阅test主题中的数据
zkQuorum := "localhost:2181"
group := "test-group"
topics := map[string]int{"test": 1}
directKafkaStream, err := KafkaUtils.CreateDirectStream(ssc, topics, zkQuorum, group)
if err != nil {
    panic(err)
}
lines := directKafkaStream.Map(func(message *sarama.ConsumerMessage) (string, int) {
    //从消息中解析出需要的数据
    data := message.Value
    arr := strings.Split(string(data), ",")
    id, _ := strconv.Atoi(arr[0])
    name := arr[1]
    return name, 1
})
//使用reduceByKey函数对数据进行聚合计算
counts := lines.ReduceByKey(func(a, b int) int {
    return a + b
})
counts.Print() 
//开启流式处理
ssc.Start()
ssc.AwaitTermination()
Salin selepas log masuk

}

Dalam kod di atas, kami Menggunakan SparkConf kaedah dan kaedah StreamingContext untuk mencipta konteks Spark Streaming dan menetapkan selang masa pemprosesan aliran data. Kemudian kami melanggan data dalam baris gilir mesej Kafka, menggunakan kaedah Peta untuk menghuraikan data yang diperlukan daripada mesej yang diterima, dan kemudian menggunakan kaedah ReduceByKey untuk melaksanakan pengiraan pengagregatan data. Akhirnya, hasil pengiraan dicetak ke konsol.

4. Ringkasan

Artikel ini memperkenalkan cara menggunakan Kafka dan Spark Streaming dalam rangka kerja Beego untuk pemprosesan data masa nyata. Dengan mewujudkan baris gilir mesej Kafka dan menggunakan Spark Streaming untuk memproses aliran data, proses pemprosesan data masa nyata yang diperkemas dan cekap boleh dicapai. Kaedah pemprosesan ini telah digunakan secara meluas dalam pelbagai bidang dan menyediakan rujukan penting untuk membuat keputusan korporat.

Atas ialah kandungan terperinci Pemprosesan data masa nyata menggunakan Kafka dan Spark Streaming di Beego. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!

Label berkaitan:
sumber:php.cn
Kenyataan Laman Web ini
Kandungan artikel ini disumbangkan secara sukarela oleh netizen, dan hak cipta adalah milik pengarang asal. Laman web ini tidak memikul tanggungjawab undang-undang yang sepadan. Jika anda menemui sebarang kandungan yang disyaki plagiarisme atau pelanggaran, sila hubungi admin@php.cn
Tutorial Popular
Lagi>
Muat turun terkini
Lagi>
kesan web
Kod sumber laman web
Bahan laman web
Templat hujung hadapan