Heim > Backend-Entwicklung > Golang > Echtzeit-Datenverarbeitung mit Kafka und Spark Streaming in Beego

Echtzeit-Datenverarbeitung mit Kafka und Spark Streaming in Beego

PHPz
Freigeben: 2023-06-22 08:44:28
Original
1153 Leute haben es durchsucht

Mit der kontinuierlichen Weiterentwicklung der Internet- und IoT-Technologie nimmt die Menge der in unserer Produktion und unserem Leben erzeugten Daten zu. Diese Daten spielen eine sehr wichtige Rolle in der Geschäftsstrategie und Entscheidungsfindung des Unternehmens. Um diese Daten besser nutzen zu können, ist die Echtzeit-Datenverarbeitung zu einem wichtigen Bestandteil der täglichen Arbeit von Unternehmen und wissenschaftlichen Forschungseinrichtungen geworden. In diesem Artikel untersuchen wir, wie man Kafka und Spark Streaming im Beego-Framework für die Echtzeit-Datenverarbeitung verwendet.

1. Was ist Kafka? Kafka ist ein verteiltes Nachrichtenwarteschlangensystem mit hohem Durchsatz, das zur Verarbeitung großer Datenmengen verwendet wird. Kafka speichert Nachrichtendaten in mehreren Themen verteilt und kann schnell abgerufen und verteilt werden. Im Daten-Streaming-Szenario hat sich Kafka zu einem der beliebtesten Open-Source-Messaging-Systeme entwickelt und wird von vielen Technologieunternehmen, darunter LinkedIn, Netflix und Twitter, häufig verwendet.

2. Was ist Spark Streaming? Spark Streaming ist eine Komponente im Apache Spark-Ökosystem. Es bietet ein Streaming-Computing-Framework, das eine Stapelverarbeitung von Datenströmen in Echtzeit durchführen kann. Spark Streaming ist hoch skalierbar und fehlertolerant und kann mehrere Datenquellen unterstützen. Spark Streaming kann in Verbindung mit Nachrichtenwarteschlangensystemen wie Kafka verwendet werden, um Streaming-Computing-Funktionen zu implementieren.

3. Verwenden Sie Kafka und Spark Streaming in Beego für die Echtzeit-Datenverarbeitung.

Bei Verwendung des Beego-Frameworks für die Echtzeit-Datenverarbeitung können wir Kafka und Spark Streaming kombinieren, um den Datenempfang und die Datenverarbeitung zu erreichen. Das Folgende ist ein einfacher Echtzeit-Datenverarbeitungsprozess:

1. Verwenden Sie Kafka, um eine Nachrichtenwarteschlange einzurichten, die Daten in Nachrichten zu kapseln und sie an Kafka zu senden.

2. Verwenden Sie Spark Streaming, um eine Streaming-Anwendung zu erstellen und Daten in der Kafka-Nachrichtenwarteschlange zu abonnieren.

3. Für die abonnierten Daten können wir verschiedene komplexe Verarbeitungsvorgänge durchführen, wie z. B. Datenbereinigung, Datenaggregation, Geschäftsberechnungen usw.

4. Geben Sie die Verarbeitungsergebnisse an Kafka aus oder zeigen Sie sie dem Benutzer visuell an.


Im Folgenden stellen wir Ihnen im Detail vor, wie Sie den oben genannten Prozess umsetzen.

1. Richten Sie eine Kafka-Nachrichtenwarteschlange ein

Zuerst müssen wir das Kafka-Paket in der Go-Sprache einführen und es über den Befehl abrufen:

go get gopkg.in/Shopify/. sarama.v1

Richten Sie dann eine Kafka-Nachrichtenwarteschlange in Beego ein und senden Sie die generierten Daten an Kafka. Der Beispielcode lautet wie folgt:

func initKafka() (err error) {

//配置Kafka连接属性
config := sarama.NewConfig()
config.Producer.RequiredAcks = sarama.WaitForAll
config.Producer.Partitioner = sarama.NewRandomPartitioner
config.Producer.Return.Successes = true
//创建Kafka连接器
client, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)
if err != nil {
    fmt.Println("failed to create producer, err:", err)
    return
}
//异步关闭Kafka
defer client.Close()
//模拟生成数据
for i := 1; i < 5000; i++ {
    id := uint32(i)
    userName := fmt.Sprintf("user:%d", i)
    //数据转为byte格式发送到Kafka
    message := fmt.Sprintf("%d,%s", id, userName)
    msg := &sarama.ProducerMessage{}
    msg.Topic = "test" //topic消息标记
    msg.Value = sarama.StringEncoder(message) //消息数据
    _, _, err := client.SendMessage(msg)
    if err != nil {
        fmt.Println("send message failed:", err)
    }
    time.Sleep(time.Second)
}
return
Nach dem Login kopieren

}

Im obigen Code verwenden wir die SyncProducer-Methode im Sarama-Paket, um einen Kafka-Connector zu erstellen und die erforderlichen Verbindungseigenschaften festzulegen. Verwenden Sie dann eine for-Schleife, um Daten zu generieren, kapseln Sie die generierten Daten in Nachrichten und senden Sie sie an Kafka.

2. Verwenden Sie Spark Streaming für die Echtzeit-Datenverarbeitung.

Wenn Sie Spark Streaming für die Echtzeit-Datenverarbeitung verwenden, müssen Sie Spark und Kafka installieren und konfigurieren, die über den folgenden Befehl installiert werden können:

sudo apt- get install spark

sudo apt -get install zookeeper

sudo apt-get install kafka

Nach Abschluss der Installation müssen wir das Spark-Streaming-Paket in Beego einführen:

import org.apache.spark.SparkConf

import org.apache.spark.streaming. {Seconds, StreamingContext}

import org.apache.spark.streaming.kafka.KafkaUtils

Als nächstes müssen wir den Datenstrom verarbeiten. Der folgende Code implementiert die Logik zum Empfangen von Daten von Kafka und zum Verarbeiten jeder Nachricht:

func main() {

//创建SparkConf对象
conf := SparkConf().setAppName("test").setMaster("local[2]")
//创建StreamingContext对象,设置1秒钟处理一次
ssc := StreamingContext(conf, Seconds(1))
//从Kafka中订阅test主题中的数据
zkQuorum := "localhost:2181"
group := "test-group"
topics := map[string]int{"test": 1}
directKafkaStream, err := KafkaUtils.CreateDirectStream(ssc, topics, zkQuorum, group)
if err != nil {
    panic(err)
}
lines := directKafkaStream.Map(func(message *sarama.ConsumerMessage) (string, int) {
    //从消息中解析出需要的数据
    data := message.Value
    arr := strings.Split(string(data), ",")
    id, _ := strconv.Atoi(arr[0])
    name := arr[1]
    return name, 1
})
//使用reduceByKey函数对数据进行聚合计算
counts := lines.ReduceByKey(func(a, b int) int {
    return a + b
})
counts.Print() 
//开启流式处理
ssc.Start()
ssc.AwaitTermination()
Nach dem Login kopieren

}

Im obigen Code verwenden wir die SparkConf-Methode und die StreamingContext-Methode, um einen Spark Streaming-Kontext zu erstellen. und legt das Verarbeitungsintervall für den Datenstrom fest. Anschließend abonnieren wir die Daten in der Kafka-Nachrichtenwarteschlange, analysieren mit der Map-Methode die erforderlichen Daten aus der empfangenen Nachricht und führen dann mit der ReduceByKey-Methode Datenaggregationsberechnungen durch. Abschließend werden die Berechnungsergebnisse auf der Konsole ausgegeben.

4. Zusammenfassung

Dieser Artikel stellt vor, wie man Kafka und Spark Streaming im Beego-Framework für die Echtzeit-Datenverarbeitung verwendet. Durch die Einrichtung einer Kafka-Nachrichtenwarteschlange und die Verwendung von Spark Streaming zur Verarbeitung des Datenstroms kann ein optimierter und effizienter Datenverarbeitungsprozess in Echtzeit erreicht werden. Diese Verarbeitungsmethode ist in verschiedenen Bereichen weit verbreitet und stellt eine wichtige Referenz für Unternehmensentscheidungen dar.

Das obige ist der detaillierte Inhalt vonEchtzeit-Datenverarbeitung mit Kafka und Spark Streaming in Beego. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Verwandte Etiketten:
Quelle:php.cn
Erklärung dieser Website
Der Inhalt dieses Artikels wird freiwillig von Internetnutzern beigesteuert und das Urheberrecht liegt beim ursprünglichen Autor. Diese Website übernimmt keine entsprechende rechtliche Verantwortung. Wenn Sie Inhalte finden, bei denen der Verdacht eines Plagiats oder einer Rechtsverletzung besteht, wenden Sie sich bitte an admin@php.cn
Beliebte Tutorials
Mehr>
Neueste Downloads
Mehr>
Web-Effekte
Quellcode der Website
Website-Materialien
Frontend-Vorlage