首页 > 后端开发 > Golang > 案例(二)-KisFlow-Golang流实时计算-流并行运算

案例(二)-KisFlow-Golang流实时计算-流并行运算

PHPz
发布: 2024-07-16 17:28:10
原创
825 人浏览过

Case (II) - KisFlow-Golang Stream Real-Time Computing - Flow Parallel Operation

Github:https://github.com/aceld/kis-flow
文档:https://github.com/aceld/kis-flow/wiki


第 1 部分-概览
Part2.1-项目构建/基础模块
Part2.2-项目构建/基础模块
第三部分-数据流
Part4-功能调度
第5部分-连接器
Part6-配置导入导出
Part7-KisFlow 动作
Part8-Cache/Params 数据缓存和数据参数
Part9-流程多份
Part10-Prometheus Metrics 统计
Part11-基于反射的FaaS参数类型自适应注册


案例1-快速入门
Case2-Flow并行操作
Case3-KisFlow在多Goroutine中的应用
案例4-消息队列(MQ)应用中的KisFlow


下载 KisFlow 源代码

$go get github.com/aceld/kis-flow
登录后复制

KisFlow 开发者文档

源代码示例

https://github.com/aceld/kis-flow-usage/tree/main/8-connector

KisFlow 可以通过 Connector 实现两个流的组合

结合以下两个流程,本介绍将涵盖 Connector 的接口和使用方法。

数据流程图

Flow Diagram

案例介绍

假设一个学生有四个属性:

Student ID: stu_id
Credit 1: score_1
Credit 2: score_2
Credit 3: score_3
登录后复制

定义 Flow1:CalStuAvgScore-1-2,计算学生学分 1 (score_1) 和学分 2 (score_2) 的平均分 (avg_score_1_2)。
定义 Flow2:CalStuAvgScore-3,计算学生的学分 3 的平均分 (score_3) 和 avg_score_1_2,即学分 1、学分 2 和学分 3 的平均值。学分 1 和学分 2 的平均值由 Flow1 提供。

流程1

Flow1 由 4 个函数组成:

V(函数:VerifyStu)验证StuId的有效性
C(函数:AvgStuScore12)计算学分1和学分2的平均分
S(函数:SaveScoreAvg12)将 avg_score_1_2 存储在 Redis
E(函数:PrintStuAvgScore)打印学分1和学分2的平均分。

流程2

Flow2 由 4 个函数组成:

V(函数:VerifyStu)验证StuId的有效性
L(函数:LoadScoreAvg12)读取Flow1计算出的当前学生学分1和学分2的平均分(avg_score_1_2)
C(函数:AvgStuScore3)计算学分3的平均分以及学分1和学分2的平均分
E(函数:PrintStuAvgScore)打印学分1、学分2、学分3的平均分。

conf/func/func-AvgStuScore-3.yml

kistype: func
fname: AvgStuScore3
fmode: Calculate
source:
    name: SourceStuScore
    must:
        - stu_id
登录后复制

conf/func/func-LoadScoreAvg-1-2.yml

kistype: func
fname: LoadScoreAvg12
fmode: Load
source:
    name: SourceStuScore
    must:
        - stu_id
option:
    cname: Score12Cache
登录后复制

基本数据协议

stu_proto.go

package main

type StuScore1_2 struct {
    StuId  int `json:"stu_id"`
    Score1 int `json:"score_1"`
    Score2 int `json:"score_2"`
}

type StuScoreAvg struct {
    StuId    int     `json:"stu_id"`
    AvgScore float64 `json:"avg_score"`
}

type StuScore3 struct {
    StuId      int     `json:"stu_id"`
    AvgScore12 float64 `json:"avg_score_1_2"` // score_1, score_2 avg
    Score3     int     `json:"score_3"`
}

登录后复制

连接器初始化

本项目中定义的Connector Score12Cache是​​与Redis关联的链接资源。此连接器需要一个初始化方法,用于在 KisFlow 启动时建立连接。

conn_init.go

package main

import (
    "context"
    "fmt"
    "github.com/aceld/kis-flow/kis"
    "github.com/aceld/kis-flow/log"
    "github.com/go-redis/redis/v8"
)

// type ConnInit func(conn Connector) error

func InitScore12Cache(connector kis.Connector) error {
    fmt.Println("===> Call Connector InitScore12Cache")

    // init Redis Conn Client
    rdb := redis.NewClient(&redis.Options{
        Addr:     connector.GetConfig().AddrString, // Redis-Server address
        Password: "",                               // password
        DB:       0,                                // select db
    })

    // Ping test
    pong, err := rdb.Ping(context.Background()).Result()
    if err != nil {
        log.Logger().ErrorF("Failed to connect to Redis: %v", err)
        return err
    }
    fmt.Println("Connected to Redis:", pong)

    // set rdb to connector
    connector.SetMetaData("rdb", rdb)

    return nil
}
登录后复制

这里,成功连接的Redis实例存储在连接器的缓存变量“rdb”中。

    // set rdb to connector
    connector.SetMetaData("rdb", rdb)
登录后复制

FaaS 实施

功能(五):VerifyStu

faas_stu_verify.go

package main

import (
    "context"
    "fmt"
    "github.com/aceld/kis-flow/kis"
    "github.com/aceld/kis-flow/serialize"
)

type VerifyStuIn struct {
    serialize.DefaultSerialize
    StuId int `json:"stu_id"`
}

func VerifyStu(ctx context.Context, flow kis.Flow, rows []*VerifyStuIn) error {
    fmt.Printf("->Call Func VerifyStu\n")

    for _, stu := range rows {
        // Filter out invalid data
        if stu.StuId < 0 || stu.StuId > 999 {
            // Terminate the current Flow process, subsequent functions of the current Flow will not be executed
            return flow.Next(kis.ActionAbort)
        }
    }

    return flow.Next(kis.ActionDataReuse)
}

登录后复制

VerifyStu() 用于验证数据。如果数据不满足要求,则终止当前数据流。最后,数据被复用,通过flow.Next(kis.ActionDataReuse)传递到下一层。

函数(C):AvgStuScore12

faas_avg_score_1_2.go

package main

import (
    "context"
    "fmt"
    "github.com/aceld/kis-flow/kis"
    "github.com/aceld/kis-flow/serialize"
)

type AvgStuScoreIn_1_2 struct {
    serialize.DefaultSerialize
    StuScore1_2
}

type AvgStuScoreOut_1_2 struct {
    serialize.DefaultSerialize
    StuScoreAvg
}

func AvgStuScore12(ctx context.Context, flow kis.Flow, rows []*AvgStuScoreIn_1_2) error {
    fmt.Printf("->Call Func AvgStuScore12\n")

    for _, row := range rows {

        out := AvgStuScoreOut_1_2{
            StuScoreAvg: StuScoreAvg{
                StuId:    row.StuId,
                AvgScore: float64(row.Score1+row.Score2) / 2,
            },
        }

        // Submit result data
        _ = flow.CommitRow(out)
    }

    return flow.Next()
}

登录后复制

AvgStuScore12() 计算score_1 和score_2 的平均分,得到avg_score。

函数(S):保存ScoreAvg12

faas_save_score_avg_1_2.go

package main

import (
    "context"
    "fmt"
    "github.com/aceld/kis-flow/kis"
    "github.com/aceld/kis-flow/serialize"
    "github.com/go-redis/redis/v8"
    "strconv"
)

type SaveStuScoreIn struct {
    serialize.DefaultSerialize
    StuScoreAvg
}

func BatchSetStuScores(ctx context.Context, conn kis.Connector, rows []*SaveStuScoreIn) error {

    var rdb *redis.Client

    // Get Redis Client
    rdb = conn.GetMetaData("rdb").(*redis.Client)

    // Set data to redis
    pipe := rdb.Pipeline()

    for _, score := range rows {
        // make key
        key := conn.GetConfig().Key + strconv.Itoa(score.StuId)

        pipe.HMSet(context.Background(), key, map[string]interface{}{
            "avg_score": score.AvgScore,
        })
    }

    _, err := pipe.Exec(ctx)
    if err != nil {
        return err
    }

    return nil
}

func SaveScoreAvg12(ctx context.Context, flow kis.Flow, rows []*SaveStuScoreIn) error {
    fmt.Printf("->Call Func SaveScoreAvg12\n")

    conn, err := flow.GetConnector()
    if err != nil {
        fmt.Printf("SaveScoreAvg12(): GetConnector err = %s\n", err.Error())
        return err
    }

    if BatchSetStuScores(ctx, conn, rows) != nil {
        fmt.Printf("SaveScoreAvg12(): BatchSetStuScores err = %s\n", err.Error())
        return err
    }

    return flow.Next(kis.ActionDataReuse)
}

登录后复制

SaveScoreAvg12()通过绑定的Connector将数据存储到Redis中,使用的是Connector中配置的key。最后将源数据透传给下一个函数。

函数(E):PrintStuAvgScore

faas_stu_score_avg_print.go

package main

import (
    "context"
    "fmt"
    "github.com/aceld/kis-flow/kis"
    "github.com/aceld/kis-flow/serialize"
)

type PrintStuAvgScoreIn struct {
    serialize.DefaultSerialize
    StuId    int     `json:"stu_id"`
    AvgScore float64 `json:"avg_score"`
}

func PrintStuAvgScore(ctx context.Context, flow kis.Flow, rows []*PrintStuAvgScoreIn) error {
    fmt.Printf("->Call Func PrintStuAvgScore, in Flow[%s]\n", flow.GetName())

    for _, row := range rows {
        fmt.Printf("stuid: [%+v], avg score: [%+v]\n", row.StuId, row.AvgScore)
    }

    return flow.Next()
}

登录后复制

PrintStuAvgScore() 打印当前学生的平均分数。

函数(L):LoadScoreAvg12

faas_load_score_avg_1_2.go

package main

import (
    "context"
    "fmt"
    "github.com/aceld/kis-flow/kis"
    "github.com/aceld/kis-flow/serialize"
    "github.com/go-redis/redis/v8"
    "strconv"
)

type LoadStuScoreIn struct {
    serialize.DefaultSerialize
    StuScore3
}

type LoadStuScoreOut struct {
    serialize.DefaultSerialize
    StuScore3
}

func GetStuScoresByStuId(ctx context.Context, conn kis.Connector, stuId int) (float64, error) {

    var rdb *redis.Client

    // Get Redis Client
    rdb = conn.GetMetaData("rdb").(*redis.Client)

    // make key
    key := conn.GetConfig().Key + strconv.Itoa(stuId)

    // get data from redis
    result, err := rdb.HGetAll(ctx, key).Result()
    if err != nil {
        return 0, err
    }

    // get value
    avgScoreStr, ok := result["avg_score"]
    if !ok {
        return 0, fmt.Errorf("avg_score not found for stuId: %d", stuId)
    }

    // parse to float64
    avgScore, err := strconv.ParseFloat(avgScoreStr, 64)
    if err != nil {
        return 0, err
    }

    return avgScore, nil
}

func LoadScoreAvg12(ctx context.Context, flow kis.Flow, rows []*LoadStuScoreIn) error {
    fmt.Printf("->Call Func LoadScoreAvg12\n")

    conn, err := flow.GetConnector()
    if err != nil {
        fmt.Printf("LoadScoreAvg12(): GetConnector err = %s\n", err.Error())
        return err
    }

    for _, row := range rows {
        stuScoreAvg1_2, err := GetStuScoresByStuId(ctx, conn, row.StuId)
        if err != nil {
            fmt.Printf("LoadScoreAvg12(): GetStuScoresByStuId err = %s\n", err.Error())
            return err
        }

        out := LoadStuScoreOut{
            StuScore3: StuScore3{
                StuId:      row.StuId,
                Score3:     row.Score3,
                AvgScore12: stuScoreAvg1_2, // avg score of score1 and score2 (load from redis)
            },
        }

        // commit result
        _ = flow.CommitRow(out)
    }

    return flow.Next()
}

登录后复制

LoadScoreAvg12() reads the average score of score_1 and score_2 from Redis through the linked resource Redis of the bound Connector using the key configured in the Connector. It then sends the source data from upstream, along with the newly read average score of score1 and score2, to the next layer.

Function(C): AvgStuScore3

faas_stu_score_avg_3.go

package main

import (
    "context"
    "fmt"
    "github.com/aceld/kis-flow/kis"
    "github.com/aceld/kis-flow/serialize"
)

type AvgStuScore3In struct {
    serialize.DefaultSerialize
    StuScore3
}

type AvgStuScore3Out struct {
    serialize.DefaultSerialize
    StuScoreAvg
}

func AvgStuScore3(ctx context.Context, flow kis.Flow, rows []*AvgStuScore3In) error {
    fmt.Printf("->Call Func AvgStuScore3\n")

    for _, row := range rows {

        out := AvgStuScore3Out{
            StuScoreAvg: StuScoreAvg{
                StuId:    row.StuId,
                AvgScore: (float64(row.Score3) + row.AvgScore12*2) / 3,
            },
        }

        // Submit result data
        _ = flow.CommitRow(out)
    }

    return flow.Next()
}

登录后复制

AvgStuScore3() recalculates the average score of three scores by adding score_3 and the average score of score_1 and score_2, resulting in the final average score avg_score.

Register FaaS & CaaSInit/CaaS (Register Function/Connector)

main.go

func init() {
    // Register functions
    kis.Pool().FaaS("VerifyStu", VerifyStu)
    kis.Pool().FaaS("AvgStuScore12", AvgStuScore12)
    kis.Pool().FaaS("SaveScoreAvg12", SaveScoreAvg12)
    kis.Pool().FaaS("PrintStuAvgScore", PrintStuAvgScore)
    kis.Pool().FaaS("LoadScoreAvg12", LoadScoreAvg12)
    kis.Pool().FaaS("AvgStuScore3", AvgStuScore3)

    // Register connectors
    kis.Pool().CaaSInit("Score12Cache", InitScore12Cache)
}

登录后复制

Main Process

main.go

package main

import (
    "context"
    "github.com/aceld/kis-flow/file"
    "github.com/aceld/kis-flow/kis"
    "sync"
)

func RunFlowCalStuAvgScore12(ctx context.Context, flow kis.Flow) error {

    // Commit data
    _ = flow.CommitRow(`{"stu_id":101, "score_1":100, "score_2":90}`)
    _ = flow.CommitRow(`{"stu_id":102, "score_1":100, "score_2":80}`)

    // Run the flow
    if err := flow.Run(ctx); err != nil {
        return err
    }

    return nil
}

func RunFlowCalStuAvgScore3(ctx context.Context, flow kis.Flow) error {

    // Commit data
    _ = flow.CommitRow(`{"stu_id":101, "score_3": 80}`)
    _ = flow.CommitRow(`{"stu_id":102, "score_3": 70}`)

    // Run the flow
    if err := flow.Run(ctx); err != nil {
        return err
    }

    return nil
}

func main() {
    ctx := context.Background()

    // Load Configuration from file
    if err := file.ConfigImportYaml("conf/"); err != nil {
        panic(err)
    }

    var wg sync.WaitGroup
    wg.Add(2)

    go func() {
        // Run flow1 concurrently
        defer wg.Done()

        flow1 := kis.Pool().GetFlow("CalStuAvgScore12")
        if flow1 == nil {
            panic("flow1 is nil")
        }

        if err := RunFlowCalStuAvgScore12(ctx, flow1); err != nil {
            panic(err)
        }
    }()

    go func() {
        // Run flow2 concurrently
        defer wg.Done()

        flow2 := kis.Pool().GetFlow("CalStuAvgScore3")
        if flow2 == nil {
            panic("flow2 is nil")
        }

        if err := RunFlowCalStuAvgScore3(ctx, flow2); err != nil {
            panic(err)
        }
    }()

    wg.Wait()

    return
}
登录后复制

Two Goroutines are launched concurrently to execute Flow1 and Flow2, calculating the final average scores for student 101 and student 102.

Execution Results

===> Call Connector InitScore12Cache
Connected to Redis: PONG
Add FlowRouter FlowName=CalStuAvgScore12
===> Call Connector InitScore12Cache
Connected to Redis: PONG
Add FlowRouter FlowName=CalStuAvgScore3
->Call Func VerifyStu
->Call Func VerifyStu
->Call Func AvgStuScore12
->Call Func LoadScoreAvg12
->Call Func SaveScoreAvg12
->Call Func PrintStuAvgScore, in Flow[CalStuAvgScore12]
stuid: [101], avg score: [95]
stuid: [102], avg score: [90]
->Call Func AvgStuScore3
->Call Func PrintStuAvgScore, in Flow[CalStuAvgScore3]
stuid: [101], avg score: [90]
stuid: [102], avg score: [83.33333333333333]
登录后复制

In Flow[CalStuAvgScore3], we observe the final computed average scores for scores 1, 2, and 3.


Author: Aceld
GitHub: https://github.com/aceld

KisFlow Open Source Project Address: https://github.com/aceld/kis-flow

Document: https://github.com/aceld/kis-flow/wiki


Part1-OverView
Part2.1-Project Construction / Basic Modules
Part2.2-Project Construction / Basic Modules
Part3-Data Stream
Part4-Function Scheduling
Part5-Connector
Part6-Configuration Import and Export
Part7-KisFlow Action
Part8-Cache/Params Data Caching and Data Parameters
Part9-Multiple Copies of Flow
Part10-Prometheus Metrics Statistics
Part11-Adaptive Registration of FaaS Parameter Types Based on Reflection


Case1-Quick Start
Case2-Flow Parallel Operation
Case3-Application of KisFlow in Multi-Goroutines
Case4-KisFlow in Message Queue (MQ) Applications

以上是案例(二)-KisFlow-Golang流实时计算-流并行运算的详细内容。更多信息请关注PHP中文网其他相关文章!

来源:dev.to
本站声明
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn
热门教程
更多>
最新下载
更多>
网站特效
网站源码
网站素材
前端模板