Rumah > pembangunan bahagian belakang > Golang > Kes (IV) - KisFlow-Golang Stream Real- KisFlow dalam Aplikasi Baris Gilir Mesej (MQ)

Kes (IV) - KisFlow-Golang Stream Real- KisFlow dalam Aplikasi Baris Gilir Mesej (MQ)

WBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWB
Lepaskan: 2024-07-18 03:44:47
asal
968 orang telah melayarinya

Case (IV) - KisFlow-Golang Stream Real- KisFlow in Message Queue (MQ) Applications

Github: https://github.com/aceld/kis-flow
Dokumen: https://github.com/acceld/kis-flow/wiki


Bahagian1-Paparan Keseluruhan
Bahagian2.1-Pembinaan Projek / Modul Asas
Bahagian2.2-Pembinaan Projek / Modul Asas
Bahagian3-Strim Data
Bahagian4-Penjadualan Fungsi
Bahagian5-Penyambung
Bahagian6-Konfigurasi Import dan Eksport
Bahagian7-KisFlow Action
Bahagian8-Cache/Params Caching Data dan Parameter Data
Bahagian9-Berbilang Salinan Aliran
Bahagian10-Statistik Metrik Prometheus
Bahagian11-Pendaftaran Adaptif Jenis Parameter FaaS Berdasarkan Refleksi


Kes1-Permulaan Pantas
Kes2-Kendalian Selari Aliran
Kes3-Penggunaan KisFlow dalam Pelbagai Goroutine
Case4-KisFlow dalam Aplikasi Baris Mesej (MQ)

Muat turun Sumber KisFlow

$go get github.com/aceld/kis-flow
Salin selepas log masuk

Dokumentasi Pembangun KisFlow

KisFlow dengan Kafka

Contoh kod sumber

https://github.com/acceld/kis-flow-usage/tree/main/12-with_kafka

Dalam contoh ini, kami menggunakan github.com/segmentio/kafka-go sebagai SDK Pelanggan Kafka pihak ketiga (pembangun boleh memilih alatan Kafka Go yang lain).

package main

import (
    "context"
    "fmt"
    "github.com/aceld/kis-flow/file"
    "github.com/aceld/kis-flow/kis"
    "github.com/segmentio/kafka-go"
    "sync"
    "time"
)

func main() {
    ctx := context.Background()

    // Load Configuration from file
    if err := file.ConfigImportYaml("conf/"); err != nil {
        panic(err)
    }

    // Get the flow
    flowOrg := kis.Pool().GetFlow("CalStuAvgScore")
    if flowOrg == nil {
        panic("flowOrg is nil")
    }

    // Create a new Kafka reader
    reader := kafka.NewReader(kafka.ReaderConfig{
        Brokers:     []string{"localhost:9092"},
        Topic:       "SourceStuScore",
        GroupID:     "group1",
        MinBytes:    10e3,                   // 10KB
        MaxBytes:    10e6,                   // 10MB
        MaxWait:     500 * time.Millisecond, // Maximum wait time
        StartOffset: kafka.FirstOffset,
    })

    defer reader.Close()

    var wg sync.WaitGroup
    for i := 0; i < 5; i++ { // Use 5 consumers to consume in parallel
        wg.Add(1)
        go func() {
            // Fork a new flow for each consumer
            flowCopy := flowOrg.Fork(ctx)

            defer wg.Done()
            for {
                // Read a message from Kafka
                message, err := reader.ReadMessage(ctx)
                if err != nil {
                    fmt.Printf("error reading message: %v\n", err)
                    break
                }

                // Commit the message to the flow
                _ = flowCopy.CommitRow(string(message.Value))

                // Run the flow
                if err := flowCopy.Run(ctx); err != nil {
                    fmt.Println("err: ", err)
                    return
                }
            }
        }()
    }

    wg.Wait()

    return
}

func init() {
    // Register functions
    kis.Pool().FaaS("VerifyStu", VerifyStu)
    kis.Pool().FaaS("AvgStuScore", AvgStuScore)
    kis.Pool().FaaS("PrintStuAvgScore", PrintStuAvgScore)
}
Salin selepas log masuk

KisFlow dengan Nsq

Contoh kod sumber:

https://github.com/aceld/kis-flow-usage/tree/main/13-with_nsq

Pengguna KisFlow ini menggunakan github.com/nsqio/go-nsq sebagai SDK pihak ketiga.

package main

import (
    "context"
    "fmt"
    "github.com/aceld/kis-flow/file"
    "github.com/aceld/kis-flow/kis"
    "github.com/nsqio/go-nsq"
)

func main() {
    ctx := context.Background()

    // Load Configuration from file
    if err := file.ConfigImportYaml("conf/"); err != nil {
        panic(err)
    }

    // Get the flow
    flowOrg := kis.Pool().GetFlow("CalStuAvgScore")
    if flowOrg == nil {
        panic("flowOrg is nil")
    }

    // Create a new NSQ consumer
    config := nsq.NewConfig()
    config.MaxInFlight = 5

    consumer, err := nsq.NewConsumer("SourceStuScore", "channel1", config)
    if err != nil {
        panic(err)
    }

    consumer.AddHandler(nsq.HandlerFunc(func(message *nsq.Message) error {
        // Fork a new flow for each message
        flowCopy := flowOrg.Fork(ctx)

        // Commit the message to the flow
        _ = flowCopy.CommitRow(string(message.Body))

        // Run the flow
        if err := flowCopy.Run(ctx); err != nil {
            fmt.Println("err: ", err)
            return err
        }

        return nil
    }))

    err = consumer.ConnectToNSQLookupd("localhost:4161")
    if err != nil {
        panic(err)
    }

    defer consumer.Stop()

    select {}
}

func init() {
    // Register functions
    kis.Pool().FaaS("VerifyStu", VerifyStu)
    kis.Pool().FaaS("AvgStuScore", AvgStuScore)
    kis.Pool().FaaS("PrintStuAvgScore", PrintStuAvgScore)
}
Salin selepas log masuk

KisFlow dengan RocketMQ

Contoh kod sumber:

https://github.com/aceld/kis-flow-usage/tree/main/14-with_rocketmq

Menggunakan github.com/apache/rocketmq-client-go sebagai SDK pengguna RocketMQ.

package main

import (
    "context"
    "fmt"
    "github.com/aceld/kis-flow/file"
    "github.com/aceld/kis-flow/kis"
    "github.com/apache/rocketmq-client-go/v2"
    "github.com/apache/rocketmq-client-go/v2/consumer"
    "github.com/apache/rocketmq-client-go/v2/primitive"
)

func main() {
    // Load Configuration from file
    if err := file.ConfigImportYaml("conf/"); err != nil {
        panic(err)
    }

    // Get the flow
    myFlow := kis.Pool().GetFlow("CalStuAvgScore")
    if myFlow == nil {
        panic("myFlow is nil")
    }

    // Create a new RocketMQ consumer
    c, err := rocketmq.NewPushConsumer(
        consumer.WithGroupName("group1"),
        consumer.WithNameServer([]string{"localhost:9876"}),
    )
    if err != nil {
        panic(err)
    }

    err = c.Subscribe("SourceStuScore", consumer.MessageSelector{}, func(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {

        for _, msg := range msgs {
            // Commit the message to the flow
            _ = myFlow.CommitRow(string(msg.Body))

        }

        // Run the flow
        if err := myFlow.Run(ctx); err != nil {
            fmt.Println("err: ", err)
            return consumer.ConsumeRetryLater, err
        }

        return consumer.ConsumeSuccess, nil
    })
    if err != nil {
        panic(err)
    }

    err = c.Start()
    if err != nil {
        panic(err)
    }

    defer c.Shutdown()

    select {}
}
Salin selepas log masuk

Pengarang: Acold
GitHub: https://github.com/aceld

Alamat Projek Sumber Terbuka KisFlow: https://github.com/aceld/kis-flow

Dokumen: https://github.com/acceld/kis-flow/wiki


Bahagian1-Paparan Keseluruhan
Bahagian2.1-Pembinaan Projek / Modul Asas
Bahagian2.2-Pembinaan Projek / Modul Asas
Bahagian3-Strim Data
Bahagian4-Penjadualan Fungsi
Bahagian5-Penyambung
Bahagian6-Konfigurasi Import dan Eksport
Bahagian7-KisFlow Action
Bahagian8-Cache/Params Caching Data dan Parameter Data
Bahagian9-Berbilang Salinan Aliran
Bahagian10-Statistik Metrik Prometheus
Bahagian11-Pendaftaran Adaptif Jenis Parameter FaaS Berdasarkan Refleksi


Kes1-Permulaan Pantas
Kes2-Kendalian Selari Aliran
Kes3-Penggunaan KisFlow dalam Pelbagai Goroutine
Case4-KisFlow dalam Aplikasi Baris Mesej (MQ)

Atas ialah kandungan terperinci Kes (IV) - KisFlow-Golang Stream Real- KisFlow dalam Aplikasi Baris Gilir Mesej (MQ). Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!

Kenyataan Laman Web ini
Kandungan artikel ini disumbangkan secara sukarela oleh netizen, dan hak cipta adalah milik pengarang asal. Laman web ini tidak memikul tanggungjawab undang-undang yang sepadan. Jika anda menemui sebarang kandungan yang disyaki plagiarisme atau pelanggaran, sila hubungi admin@php.cn
Tutorial Popular
Lagi>
Muat turun terkini
Lagi>
kesan web
Kod sumber laman web
Bahan laman web
Templat hujung hadapan