Maison > développement back-end > Golang > le corps du texte

Cas (IV) - KisFlow-Golang Stream Real - KisFlow dans les applications Message Queue (MQ)

WBOY
Libérer: 2024-07-18 03:44:47
original
792 Les gens l'ont consulté

Case (IV) - KisFlow-Golang Stream Real- KisFlow in Message Queue (MQ) Applications

Github : https://github.com/aceld/kis-flow
Document : https://github.com/aceld/kis-flow/wiki


Part1-OverView
Partie2.1-Construction du projet / Modules de base
Partie2.2-Construction du projet / Modules de base
Partie 3-Flux de données
Planification des fonctions Part4
Partie 5-Connecteur
Partie 6-Importation et exportation de configuration
Part7-Action KisFlow
Part8-Cache/Params Mise en cache des données et paramètres de données
Partie 9-Copies multiples du flux
Part10-Statistiques des métriques Prometheus
Partie 11 - Enregistrement adaptatif des types de paramètres FaaS basé sur la réflexion


Cas 1 – Démarrage rapide
Opération parallèle Case2-Flow
Cas3-Application de KisFlow dans Multi-Goroutines
Case4-KisFlow dans les applications de file d'attente de messages (MQ)

Télécharger KisFlow Source

$go get github.com/aceld/kis-flow
Copier après la connexion

Documentation du développeur KisFlow

KisFlow avec Kafka

Exemple de code source

https://github.com/aceld/kis-flow-usage/tree/main/12-with_kafka

Dans cet exemple, nous utilisons github.com/segmentio/kafka-go comme SDK client Kafka tiers (les développeurs peuvent choisir d'autres outils Kafka Go).

package main

import (
    "context"
    "fmt"
    "github.com/aceld/kis-flow/file"
    "github.com/aceld/kis-flow/kis"
    "github.com/segmentio/kafka-go"
    "sync"
    "time"
)

func main() {
    ctx := context.Background()

    // Load Configuration from file
    if err := file.ConfigImportYaml("conf/"); err != nil {
        panic(err)
    }

    // Get the flow
    flowOrg := kis.Pool().GetFlow("CalStuAvgScore")
    if flowOrg == nil {
        panic("flowOrg is nil")
    }

    // Create a new Kafka reader
    reader := kafka.NewReader(kafka.ReaderConfig{
        Brokers:     []string{"localhost:9092"},
        Topic:       "SourceStuScore",
        GroupID:     "group1",
        MinBytes:    10e3,                   // 10KB
        MaxBytes:    10e6,                   // 10MB
        MaxWait:     500 * time.Millisecond, // Maximum wait time
        StartOffset: kafka.FirstOffset,
    })

    defer reader.Close()

    var wg sync.WaitGroup
    for i := 0; i < 5; i++ { // Use 5 consumers to consume in parallel
        wg.Add(1)
        go func() {
            // Fork a new flow for each consumer
            flowCopy := flowOrg.Fork(ctx)

            defer wg.Done()
            for {
                // Read a message from Kafka
                message, err := reader.ReadMessage(ctx)
                if err != nil {
                    fmt.Printf("error reading message: %v\n", err)
                    break
                }

                // Commit the message to the flow
                _ = flowCopy.CommitRow(string(message.Value))

                // Run the flow
                if err := flowCopy.Run(ctx); err != nil {
                    fmt.Println("err: ", err)
                    return
                }
            }
        }()
    }

    wg.Wait()

    return
}

func init() {
    // Register functions
    kis.Pool().FaaS("VerifyStu", VerifyStu)
    kis.Pool().FaaS("AvgStuScore", AvgStuScore)
    kis.Pool().FaaS("PrintStuAvgScore", PrintStuAvgScore)
}
Copier après la connexion

KisFlow avec Nsq

Exemple de code source :

https://github.com/aceld/kis-flow-usage/tree/main/13-with_nsq

Ce consommateur KisFlow utilise github.com/nsqio/go-nsq comme SDK tiers.

package main

import (
    "context"
    "fmt"
    "github.com/aceld/kis-flow/file"
    "github.com/aceld/kis-flow/kis"
    "github.com/nsqio/go-nsq"
)

func main() {
    ctx := context.Background()

    // Load Configuration from file
    if err := file.ConfigImportYaml("conf/"); err != nil {
        panic(err)
    }

    // Get the flow
    flowOrg := kis.Pool().GetFlow("CalStuAvgScore")
    if flowOrg == nil {
        panic("flowOrg is nil")
    }

    // Create a new NSQ consumer
    config := nsq.NewConfig()
    config.MaxInFlight = 5

    consumer, err := nsq.NewConsumer("SourceStuScore", "channel1", config)
    if err != nil {
        panic(err)
    }

    consumer.AddHandler(nsq.HandlerFunc(func(message *nsq.Message) error {
        // Fork a new flow for each message
        flowCopy := flowOrg.Fork(ctx)

        // Commit the message to the flow
        _ = flowCopy.CommitRow(string(message.Body))

        // Run the flow
        if err := flowCopy.Run(ctx); err != nil {
            fmt.Println("err: ", err)
            return err
        }

        return nil
    }))

    err = consumer.ConnectToNSQLookupd("localhost:4161")
    if err != nil {
        panic(err)
    }

    defer consumer.Stop()

    select {}
}

func init() {
    // Register functions
    kis.Pool().FaaS("VerifyStu", VerifyStu)
    kis.Pool().FaaS("AvgStuScore", AvgStuScore)
    kis.Pool().FaaS("PrintStuAvgScore", PrintStuAvgScore)
}
Copier après la connexion

KisFlow avec RocketMQ

Exemple de code source :

https://github.com/aceld/kis-flow-usage/tree/main/14-with_rocketmq

Utilisation de github.com/apache/rocketmq-client-go comme SDK grand public RocketMQ.

package main

import (
    "context"
    "fmt"
    "github.com/aceld/kis-flow/file"
    "github.com/aceld/kis-flow/kis"
    "github.com/apache/rocketmq-client-go/v2"
    "github.com/apache/rocketmq-client-go/v2/consumer"
    "github.com/apache/rocketmq-client-go/v2/primitive"
)

func main() {
    // Load Configuration from file
    if err := file.ConfigImportYaml("conf/"); err != nil {
        panic(err)
    }

    // Get the flow
    myFlow := kis.Pool().GetFlow("CalStuAvgScore")
    if myFlow == nil {
        panic("myFlow is nil")
    }

    // Create a new RocketMQ consumer
    c, err := rocketmq.NewPushConsumer(
        consumer.WithGroupName("group1"),
        consumer.WithNameServer([]string{"localhost:9876"}),
    )
    if err != nil {
        panic(err)
    }

    err = c.Subscribe("SourceStuScore", consumer.MessageSelector{}, func(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {

        for _, msg := range msgs {
            // Commit the message to the flow
            _ = myFlow.CommitRow(string(msg.Body))

        }

        // Run the flow
        if err := myFlow.Run(ctx); err != nil {
            fmt.Println("err: ", err)
            return consumer.ConsumeRetryLater, err
        }

        return consumer.ConsumeSuccess, nil
    })
    if err != nil {
        panic(err)
    }

    err = c.Start()
    if err != nil {
        panic(err)
    }

    defer c.Shutdown()

    select {}
}
Copier après la connexion

Auteur : Aceld
GitHub : https://github.com/aceld

Adresse du projet Open Source KisFlow : https://github.com/aceld/kis-flow

Document : https://github.com/aceld/kis-flow/wiki


Part1-OverView
Partie2.1-Construction du projet / Modules de base
Partie2.2-Construction du projet / Modules de base
Partie 3-Flux de données
Planification des fonctions Part4
Partie 5-Connecteur
Partie 6-Importation et exportation de configuration
Part7-Action KisFlow
Part8-Cache/Params Mise en cache des données et paramètres de données
Partie 9-Copies multiples du flux
Part10-Statistiques des métriques Prometheus
Partie 11 - Enregistrement adaptatif des types de paramètres FaaS basé sur la réflexion


Cas 1 – Démarrage rapide
Opération parallèle Case2-Flow
Cas3-Application de KisFlow dans Multi-Goroutines
Case4-KisFlow dans les applications de file d'attente de messages (MQ)

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

source:dev.to
Déclaration de ce site Web
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn
Tutoriels populaires
Plus>
Derniers téléchargements
Plus>
effets Web
Code source du site Web
Matériel du site Web
Modèle frontal
À propos de nous Clause de non-responsabilité Sitemap
Site Web PHP chinois:Formation PHP en ligne sur le bien-être public,Aidez les apprenants PHP à grandir rapidement!