Heim > Backend-Entwicklung > Golang > Fall (II) – KisFlow-Golang-Stream-Echtzeit-Computing – Flow-Parallelbetrieb

Fall (II) – KisFlow-Golang-Stream-Echtzeit-Computing – Flow-Parallelbetrieb

PHPz
Freigeben: 2024-07-16 17:28:10
Original
825 Leute haben es durchsucht

Case (II) - KisFlow-Golang Stream Real-Time Computing - Flow Parallel Operation

Github: https://github.com/aceld/kis-flow
Dokument: https://github.com/aceld/kis-flow/wiki


Teil1-Übersicht
Teil 2.1 – Projektaufbau / Grundmodule
Teil 2.2 – Projektaufbau / Grundmodule
Teil3-Datenstrom
Teil 4 – Funktionsplanung
Teil5-Stecker
Teil 6 – Konfigurationsimport und -export
Teil 7 – KisFlow-Aktion
Teil8-Cache/Params Daten-Caching und Datenparameter
Teil 9 – Mehrere Kopien von Flow
Teil 10 – Prometheus-Metrikstatistik
Teil 11 – Adaptive Registrierung von FaaS-Parametertypen basierend auf Reflexion


Fall 1 – Schnellstart
Case2-Flow Parallelbetrieb
Fall 3 – Anwendung von KisFlow in Multi-Goroutinen
Case4-KisFlow in Message Queue (MQ)-Anwendungen


Laden Sie die KisFlow-Quelle herunter

$go get github.com/aceld/kis-flow
Nach dem Login kopieren

KisFlow-Entwicklerdokumentation

Beispiel für einen Quellcode

https://github.com/aceld/kis-flow-usage/tree/main/8-connector

KisFlow kann die Kombination zweier Flüsse über einen Connector erreichen

Anhand der Kombination der folgenden beiden Abläufe behandelt diese Einführung die Schnittstelle und Verwendung des Connectors.

Datenflussdiagramm

Flow Diagram

Falleinführung

Angenommen, ein Schüler hat vier Eigenschaften:

Student ID: stu_id
Credit 1: score_1
Credit 2: score_2
Credit 3: score_3
Nach dem Login kopieren

Definieren Sie Flow1: CalStuAvgScore-1-2, um die durchschnittliche Punktzahl eines Schülers für Credit 1 (score_1) und Credit 2 (score_2) (avg_score_1_2) zu berechnen.
Definieren Sie Flow2: CalStuAvgScore-3, um die durchschnittliche Punktzahl eines Studenten für Credit 3 (score_3) und avg_score_1_2 zu berechnen, was dem Durchschnitt von Credit 1, Credit 2 und Credit 3 entspricht. Der Durchschnitt von Credit 1 und Credit 2 wird von Flow1 bereitgestellt.

Fluss1

Flow1 besteht aus 4 Funktionen:

V (Funktion: VerifyStu) zur Überprüfung der Gültigkeit von StuId
C (Funktion: AvgStuScore12) zur Berechnung der durchschnittlichen Punktzahl von Credit 1 und Credit 2
S (Funktion: SaveScoreAvg12) zum Speichern von avg_score_1_2 in Redis
E (Funktion: PrintStuAvgScore), um die durchschnittliche Punktzahl von Credit 1 und Credit 2 zu drucken.

Fluss2

Flow2 besteht aus 4 Funktionen:

V (Funktion: VerifyStu) zur Überprüfung der Gültigkeit von StuId
L (Funktion: LoadScoreAvg12), um die durchschnittliche Punktzahl des aktuellen Studenten für Credit 1 und Credit 2 (avg_score_1_2) zu lesen, berechnet von Flow1
C (Funktion: AvgStuScore3) zur Berechnung der durchschnittlichen Punktzahl von Credit 3 und der durchschnittlichen Punktzahl von Credit 1 und Credit 2
E (Funktion: PrintStuAvgScore), um die durchschnittliche Punktzahl von Credit 1, Credit 2 und Credit 3 zu drucken.

conf/func/func-AvgStuScore-3.yml

kistype: func
fname: AvgStuScore3
fmode: Calculate
source:
    name: SourceStuScore
    must:
        - stu_id
Nach dem Login kopieren

conf/func/func-LoadScoreAvg-1-2.yml

kistype: func
fname: LoadScoreAvg12
fmode: Load
source:
    name: SourceStuScore
    must:
        - stu_id
option:
    cname: Score12Cache
Nach dem Login kopieren

Grundlegendes Datenprotokoll

stu_proto.go

package main

type StuScore1_2 struct {
    StuId  int `json:"stu_id"`
    Score1 int `json:"score_1"`
    Score2 int `json:"score_2"`
}

type StuScoreAvg struct {
    StuId    int     `json:"stu_id"`
    AvgScore float64 `json:"avg_score"`
}

type StuScore3 struct {
    StuId      int     `json:"stu_id"`
    AvgScore12 float64 `json:"avg_score_1_2"` // score_1, score_2 avg
    Score3     int     `json:"score_3"`
}

Nach dem Login kopieren

Connector-Init

Der in diesem Projekt definierte Connector, Score12Cache, ist eine mit Redis verknüpfte Linkressource. Dieser Connector erfordert eine Initialisierungsmethode zum Herstellen einer Verbindung, wenn KisFlow startet.

conn_init.go

package main

import (
    "context"
    "fmt"
    "github.com/aceld/kis-flow/kis"
    "github.com/aceld/kis-flow/log"
    "github.com/go-redis/redis/v8"
)

// type ConnInit func(conn Connector) error

func InitScore12Cache(connector kis.Connector) error {
    fmt.Println("===> Call Connector InitScore12Cache")

    // init Redis Conn Client
    rdb := redis.NewClient(&redis.Options{
        Addr:     connector.GetConfig().AddrString, // Redis-Server address
        Password: "",                               // password
        DB:       0,                                // select db
    })

    // Ping test
    pong, err := rdb.Ping(context.Background()).Result()
    if err != nil {
        log.Logger().ErrorF("Failed to connect to Redis: %v", err)
        return err
    }
    fmt.Println("Connected to Redis:", pong)

    // set rdb to connector
    connector.SetMetaData("rdb", rdb)

    return nil
}
Nach dem Login kopieren

Hier wird die erfolgreich verbundene Redis-Instanz in der Cache-Variablen „rdb“ des Connectors gespeichert.

    // set rdb to connector
    connector.SetMetaData("rdb", rdb)
Nach dem Login kopieren

FaaS-Implementierung

Funktion(V): VerifyStu

faas_stu_verify.go

package main

import (
    "context"
    "fmt"
    "github.com/aceld/kis-flow/kis"
    "github.com/aceld/kis-flow/serialize"
)

type VerifyStuIn struct {
    serialize.DefaultSerialize
    StuId int `json:"stu_id"`
}

func VerifyStu(ctx context.Context, flow kis.Flow, rows []*VerifyStuIn) error {
    fmt.Printf("->Call Func VerifyStu\n")

    for _, stu := range rows {
        // Filter out invalid data
        if stu.StuId < 0 || stu.StuId > 999 {
            // Terminate the current Flow process, subsequent functions of the current Flow will not be executed
            return flow.Next(kis.ActionAbort)
        }
    }

    return flow.Next(kis.ActionDataReuse)
}

Nach dem Login kopieren

VerifyStu() wird zur Validierung von Daten verwendet. Entsprechen die Daten nicht den Anforderungen, wird der aktuelle Datenfluss abgebrochen. Schließlich werden die Daten wiederverwendet und über flow.Next(kis.ActionDataReuse) an die nächste Ebene übergeben.

Funktion(C): AvgStuScore12

faas_avg_score_1_2.go

package main

import (
    "context"
    "fmt"
    "github.com/aceld/kis-flow/kis"
    "github.com/aceld/kis-flow/serialize"
)

type AvgStuScoreIn_1_2 struct {
    serialize.DefaultSerialize
    StuScore1_2
}

type AvgStuScoreOut_1_2 struct {
    serialize.DefaultSerialize
    StuScoreAvg
}

func AvgStuScore12(ctx context.Context, flow kis.Flow, rows []*AvgStuScoreIn_1_2) error {
    fmt.Printf("->Call Func AvgStuScore12\n")

    for _, row := range rows {

        out := AvgStuScoreOut_1_2{
            StuScoreAvg: StuScoreAvg{
                StuId:    row.StuId,
                AvgScore: float64(row.Score1+row.Score2) / 2,
            },
        }

        // Submit result data
        _ = flow.CommitRow(out)
    }

    return flow.Next()
}

Nach dem Login kopieren

AvgStuScore12() berechnet die durchschnittliche Punktzahl von Score_1 und Score_2, was zu avg_score.

führt

Funktion(en): SaveScoreAvg12

faas_save_score_avg_1_2.go

package main

import (
    "context"
    "fmt"
    "github.com/aceld/kis-flow/kis"
    "github.com/aceld/kis-flow/serialize"
    "github.com/go-redis/redis/v8"
    "strconv"
)

type SaveStuScoreIn struct {
    serialize.DefaultSerialize
    StuScoreAvg
}

func BatchSetStuScores(ctx context.Context, conn kis.Connector, rows []*SaveStuScoreIn) error {

    var rdb *redis.Client

    // Get Redis Client
    rdb = conn.GetMetaData("rdb").(*redis.Client)

    // Set data to redis
    pipe := rdb.Pipeline()

    for _, score := range rows {
        // make key
        key := conn.GetConfig().Key + strconv.Itoa(score.StuId)

        pipe.HMSet(context.Background(), key, map[string]interface{}{
            "avg_score": score.AvgScore,
        })
    }

    _, err := pipe.Exec(ctx)
    if err != nil {
        return err
    }

    return nil
}

func SaveScoreAvg12(ctx context.Context, flow kis.Flow, rows []*SaveStuScoreIn) error {
    fmt.Printf("->Call Func SaveScoreAvg12\n")

    conn, err := flow.GetConnector()
    if err != nil {
        fmt.Printf("SaveScoreAvg12(): GetConnector err = %s\n", err.Error())
        return err
    }

    if BatchSetStuScores(ctx, conn, rows) != nil {
        fmt.Printf("SaveScoreAvg12(): BatchSetStuScores err = %s\n", err.Error())
        return err
    }

    return flow.Next(kis.ActionDataReuse)
}

Nach dem Login kopieren

SaveScoreAvg12() speichert die Daten in Redis über den gebundenen Connector und verwendet dabei den im Connector konfigurierten Schlüssel. Abschließend werden die Quelldaten transparent an die nächste Funktion übertragen.

Funktion(E): PrintStuAvgScore

faas_stu_score_avg_print.go

package main

import (
    "context"
    "fmt"
    "github.com/aceld/kis-flow/kis"
    "github.com/aceld/kis-flow/serialize"
)

type PrintStuAvgScoreIn struct {
    serialize.DefaultSerialize
    StuId    int     `json:"stu_id"`
    AvgScore float64 `json:"avg_score"`
}

func PrintStuAvgScore(ctx context.Context, flow kis.Flow, rows []*PrintStuAvgScoreIn) error {
    fmt.Printf("->Call Func PrintStuAvgScore, in Flow[%s]\n", flow.GetName())

    for _, row := range rows {
        fmt.Printf("stuid: [%+v], avg score: [%+v]\n", row.StuId, row.AvgScore)
    }

    return flow.Next()
}

Nach dem Login kopieren

PrintStuAvgScore() druckt die durchschnittliche Punktzahl des aktuellen Schülers.

Funktion(L): LoadScoreAvg12

faas_load_score_avg_1_2.go

package main

import (
    "context"
    "fmt"
    "github.com/aceld/kis-flow/kis"
    "github.com/aceld/kis-flow/serialize"
    "github.com/go-redis/redis/v8"
    "strconv"
)

type LoadStuScoreIn struct {
    serialize.DefaultSerialize
    StuScore3
}

type LoadStuScoreOut struct {
    serialize.DefaultSerialize
    StuScore3
}

func GetStuScoresByStuId(ctx context.Context, conn kis.Connector, stuId int) (float64, error) {

    var rdb *redis.Client

    // Get Redis Client
    rdb = conn.GetMetaData("rdb").(*redis.Client)

    // make key
    key := conn.GetConfig().Key + strconv.Itoa(stuId)

    // get data from redis
    result, err := rdb.HGetAll(ctx, key).Result()
    if err != nil {
        return 0, err
    }

    // get value
    avgScoreStr, ok := result["avg_score"]
    if !ok {
        return 0, fmt.Errorf("avg_score not found for stuId: %d", stuId)
    }

    // parse to float64
    avgScore, err := strconv.ParseFloat(avgScoreStr, 64)
    if err != nil {
        return 0, err
    }

    return avgScore, nil
}

func LoadScoreAvg12(ctx context.Context, flow kis.Flow, rows []*LoadStuScoreIn) error {
    fmt.Printf("->Call Func LoadScoreAvg12\n")

    conn, err := flow.GetConnector()
    if err != nil {
        fmt.Printf("LoadScoreAvg12(): GetConnector err = %s\n", err.Error())
        return err
    }

    for _, row := range rows {
        stuScoreAvg1_2, err := GetStuScoresByStuId(ctx, conn, row.StuId)
        if err != nil {
            fmt.Printf("LoadScoreAvg12(): GetStuScoresByStuId err = %s\n", err.Error())
            return err
        }

        out := LoadStuScoreOut{
            StuScore3: StuScore3{
                StuId:      row.StuId,
                Score3:     row.Score3,
                AvgScore12: stuScoreAvg1_2, // avg score of score1 and score2 (load from redis)
            },
        }

        // commit result
        _ = flow.CommitRow(out)
    }

    return flow.Next()
}

Nach dem Login kopieren

LoadScoreAvg12() reads the average score of score_1 and score_2 from Redis through the linked resource Redis of the bound Connector using the key configured in the Connector. It then sends the source data from upstream, along with the newly read average score of score1 and score2, to the next layer.

Function(C): AvgStuScore3

faas_stu_score_avg_3.go

package main

import (
    "context"
    "fmt"
    "github.com/aceld/kis-flow/kis"
    "github.com/aceld/kis-flow/serialize"
)

type AvgStuScore3In struct {
    serialize.DefaultSerialize
    StuScore3
}

type AvgStuScore3Out struct {
    serialize.DefaultSerialize
    StuScoreAvg
}

func AvgStuScore3(ctx context.Context, flow kis.Flow, rows []*AvgStuScore3In) error {
    fmt.Printf("->Call Func AvgStuScore3\n")

    for _, row := range rows {

        out := AvgStuScore3Out{
            StuScoreAvg: StuScoreAvg{
                StuId:    row.StuId,
                AvgScore: (float64(row.Score3) + row.AvgScore12*2) / 3,
            },
        }

        // Submit result data
        _ = flow.CommitRow(out)
    }

    return flow.Next()
}

Nach dem Login kopieren

AvgStuScore3() recalculates the average score of three scores by adding score_3 and the average score of score_1 and score_2, resulting in the final average score avg_score.

Register FaaS & CaaSInit/CaaS (Register Function/Connector)

main.go

func init() {
    // Register functions
    kis.Pool().FaaS("VerifyStu", VerifyStu)
    kis.Pool().FaaS("AvgStuScore12", AvgStuScore12)
    kis.Pool().FaaS("SaveScoreAvg12", SaveScoreAvg12)
    kis.Pool().FaaS("PrintStuAvgScore", PrintStuAvgScore)
    kis.Pool().FaaS("LoadScoreAvg12", LoadScoreAvg12)
    kis.Pool().FaaS("AvgStuScore3", AvgStuScore3)

    // Register connectors
    kis.Pool().CaaSInit("Score12Cache", InitScore12Cache)
}

Nach dem Login kopieren

Main Process

main.go

package main

import (
    "context"
    "github.com/aceld/kis-flow/file"
    "github.com/aceld/kis-flow/kis"
    "sync"
)

func RunFlowCalStuAvgScore12(ctx context.Context, flow kis.Flow) error {

    // Commit data
    _ = flow.CommitRow(`{"stu_id":101, "score_1":100, "score_2":90}`)
    _ = flow.CommitRow(`{"stu_id":102, "score_1":100, "score_2":80}`)

    // Run the flow
    if err := flow.Run(ctx); err != nil {
        return err
    }

    return nil
}

func RunFlowCalStuAvgScore3(ctx context.Context, flow kis.Flow) error {

    // Commit data
    _ = flow.CommitRow(`{"stu_id":101, "score_3": 80}`)
    _ = flow.CommitRow(`{"stu_id":102, "score_3": 70}`)

    // Run the flow
    if err := flow.Run(ctx); err != nil {
        return err
    }

    return nil
}

func main() {
    ctx := context.Background()

    // Load Configuration from file
    if err := file.ConfigImportYaml("conf/"); err != nil {
        panic(err)
    }

    var wg sync.WaitGroup
    wg.Add(2)

    go func() {
        // Run flow1 concurrently
        defer wg.Done()

        flow1 := kis.Pool().GetFlow("CalStuAvgScore12")
        if flow1 == nil {
            panic("flow1 is nil")
        }

        if err := RunFlowCalStuAvgScore12(ctx, flow1); err != nil {
            panic(err)
        }
    }()

    go func() {
        // Run flow2 concurrently
        defer wg.Done()

        flow2 := kis.Pool().GetFlow("CalStuAvgScore3")
        if flow2 == nil {
            panic("flow2 is nil")
        }

        if err := RunFlowCalStuAvgScore3(ctx, flow2); err != nil {
            panic(err)
        }
    }()

    wg.Wait()

    return
}
Nach dem Login kopieren

Two Goroutines are launched concurrently to execute Flow1 and Flow2, calculating the final average scores for student 101 and student 102.

Execution Results

===> Call Connector InitScore12Cache
Connected to Redis: PONG
Add FlowRouter FlowName=CalStuAvgScore12
===> Call Connector InitScore12Cache
Connected to Redis: PONG
Add FlowRouter FlowName=CalStuAvgScore3
->Call Func VerifyStu
->Call Func VerifyStu
->Call Func AvgStuScore12
->Call Func LoadScoreAvg12
->Call Func SaveScoreAvg12
->Call Func PrintStuAvgScore, in Flow[CalStuAvgScore12]
stuid: [101], avg score: [95]
stuid: [102], avg score: [90]
->Call Func AvgStuScore3
->Call Func PrintStuAvgScore, in Flow[CalStuAvgScore3]
stuid: [101], avg score: [90]
stuid: [102], avg score: [83.33333333333333]
Nach dem Login kopieren

In Flow[CalStuAvgScore3], we observe the final computed average scores for scores 1, 2, and 3.


Author: Aceld
GitHub: https://github.com/aceld

KisFlow Open Source Project Address: https://github.com/aceld/kis-flow

Document: https://github.com/aceld/kis-flow/wiki


Part1-OverView
Part2.1-Project Construction / Basic Modules
Part2.2-Project Construction / Basic Modules
Part3-Data Stream
Part4-Function Scheduling
Part5-Connector
Part6-Configuration Import and Export
Part7-KisFlow Action
Part8-Cache/Params Data Caching and Data Parameters
Part9-Multiple Copies of Flow
Part10-Prometheus Metrics Statistics
Part11-Adaptive Registration of FaaS Parameter Types Based on Reflection


Case1-Quick Start
Case2-Flow Parallel Operation
Case3-Application of KisFlow in Multi-Goroutines
Case4-KisFlow in Message Queue (MQ) Applications

Das obige ist der detaillierte Inhalt vonFall (II) – KisFlow-Golang-Stream-Echtzeit-Computing – Flow-Parallelbetrieb. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Quelle:dev.to
Erklärung dieser Website
Der Inhalt dieses Artikels wird freiwillig von Internetnutzern beigesteuert und das Urheberrecht liegt beim ursprünglichen Autor. Diese Website übernimmt keine entsprechende rechtliche Verantwortung. Wenn Sie Inhalte finden, bei denen der Verdacht eines Plagiats oder einer Rechtsverletzung besteht, wenden Sie sich bitte an admin@php.cn
Beliebte Tutorials
Mehr>
Neueste Downloads
Mehr>
Web-Effekte
Quellcode der Website
Website-Materialien
Frontend-Vorlage