Github: https://github.com/aceld/kis-flow
Document: https://github.com/aceld/kis-flow/wiki
Part1-OverView
Part2.1-Project Construction / Basic Modules
Part2.2-Project Construction / Basic Modules
Part3-Data Stream
Part4-Function Scheduling
Part5-Connector
Part6-Configuration Import and Export
Part7-KisFlow Action
Part8-Cache/Params Data Caching and Data Parameters
Part9-Multiple Copies of Flow
Part10-Prometheus Metrics Statistics
Part11-Adaptive Registration of FaaS Parameter Types Based on Reflection
Case1-Quick Start
Case2-Flow Parallel Operation
Case3-Application of KisFlow in Multi-Goroutines
Case4-KisFlow in Message Queue (MQ) Applications
$ go get github.com/aceld/kis-flow
KisFlow Developer Documentation
https://github.com/aceld/kis-flow-usage/tree/main/8-connector
KisFlow Can Achieve the Combination of Two Flows via Connector
This case combines the following two Flows to introduce the Connector's interface and usage.
Assume a student has four attributes:
Student ID: stu_id
Credit 1: score_1
Credit 2: score_2
Credit 3: score_3
Define Flow1 (CalStuAvgScore12) to calculate a student's average of Credit 1 (score_1) and Credit 2 (score_2), stored as avg_score_1_2.
Define Flow2 (CalStuAvgScore3) to combine Credit 3 (score_3) with avg_score_1_2, which yields the student's average across Credit 1, Credit 2, and Credit 3. Flow2 does not recompute the average of Credit 1 and Credit 2; it reads the value produced by Flow1.
Flow1 consists of 4 functions:
V (Function: VerifyStu) to verify the validity of StuId
C (Function: AvgStuScore12) to calculate the average score of Credit 1 and Credit 2
S (Function: SaveScoreAvg12) to store avg_score_1_2 in Redis
E (Function: PrintStuAvgScore) to print the average score of Credit 1 and Credit 2.
Flow2 consists of 4 functions:
V (Function: VerifyStu) to verify the validity of StuId
L (Function: LoadScoreAvg12) to read the current student's average score of Credit 1 and Credit 2 (avg_score_1_2) calculated by Flow1
C (Function: AvgStuScore3) to calculate the average score of Credit 3 and the average score of Credit 1 and Credit 2
E (Function: PrintStuAvgScore) to print the average score of Credit 1, Credit 2, and Credit 3.
conf/func/func-AvgStuScore-3.yml
kistype: func
fname: AvgStuScore3
fmode: Calculate
source:
  name: SourceStuScore
  must:
    - stu_id
conf/func/func-LoadScoreAvg-1-2.yml
kistype: func
fname: LoadScoreAvg12
fmode: Load
source:
  name: SourceStuScore
  must:
    - stu_id
option:
  cname: Score12Cache
stu_proto.go
package main

type StuScore1_2 struct {
	StuId  int `json:"stu_id"`
	Score1 int `json:"score_1"`
	Score2 int `json:"score_2"`
}

type StuScoreAvg struct {
	StuId    int     `json:"stu_id"`
	AvgScore float64 `json:"avg_score"`
}

type StuScore3 struct {
	StuId      int     `json:"stu_id"`
	AvgScore12 float64 `json:"avg_score_1_2"` // score_1, score_2 avg
	Score3     int     `json:"score_3"`
}

The Connector defined in this project, Score12Cache, is a connection resource backed by Redis. It needs an initialization method that establishes the connection when KisFlow starts.
conn_init.go
package main

import (
	"context"
	"fmt"

	"github.com/aceld/kis-flow/kis"
	"github.com/aceld/kis-flow/log"
	"github.com/go-redis/redis/v8"
)

// type ConnInit func(conn Connector) error

func InitScore12Cache(connector kis.Connector) error {
	fmt.Println("===> Call Connector InitScore12Cache")

	// init Redis Conn Client
	rdb := redis.NewClient(&redis.Options{
		Addr:     connector.GetConfig().AddrString, // Redis-Server address
		Password: "",                               // password
		DB:       0,                                // select db
	})

	// Ping test
	pong, err := rdb.Ping(context.Background()).Result()
	if err != nil {
		log.Logger().ErrorF("Failed to connect to Redis: %v", err)
		return err
	}
	fmt.Println("Connected to Redis:", pong)

	// set rdb to connector
	connector.SetMetaData("rdb", rdb)

	return nil
}

Here, the connected Redis client is stored in the connector's metadata under the key "rdb", so the functions bound to this Connector can retrieve it later:

// set rdb to connector
connector.SetMetaData("rdb", rdb)
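The functions later in this case read the client back with an unchecked type assertion, which panics if the key is missing or holds another type. A minimal sketch of a checked lookup follows. The names `fakeClient`, `metaStore`, and `clientFromMeta` are hypothetical stand-ins used only for illustration; they are not part of KisFlow or go-redis.

```go
package main

import (
	"errors"
	"fmt"
)

// fakeClient is a hypothetical stand-in for *redis.Client.
type fakeClient struct{ addr string }

// metaStore is a hypothetical stand-in for the connector's metadata storage.
type metaStore map[string]interface{}

// clientFromMeta returns the client stored under key, or an error when the
// key is missing or holds a value of a different type.
func clientFromMeta(meta metaStore, key string) (*fakeClient, error) {
	v, ok := meta[key]
	if !ok {
		return nil, errors.New("metadata key not found: " + key)
	}
	c, ok := v.(*fakeClient)
	if !ok {
		return nil, fmt.Errorf("metadata %q has unexpected type %T", key, v)
	}
	return c, nil
}

func main() {
	meta := metaStore{"rdb": &fakeClient{addr: "127.0.0.1:6379"}}
	c, err := clientFromMeta(meta, "rdb")
	if err != nil {
		fmt.Println("lookup failed:", err)
		return
	}
	fmt.Println("client address:", c.addr)
}
```

The comma-ok form of the type assertion turns a possible panic into an ordinary error that the calling function can return to the Flow.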
faas_stu_verify.go
package main

import (
	"context"
	"fmt"

	"github.com/aceld/kis-flow/kis"
	"github.com/aceld/kis-flow/serialize"
)

type VerifyStuIn struct {
	serialize.DefaultSerialize
	StuId int `json:"stu_id"`
}

func VerifyStu(ctx context.Context, flow kis.Flow, rows []*VerifyStuIn) error {
	fmt.Printf("->Call Func VerifyStu\n")

	for _, stu := range rows {
		// Filter out invalid data
		if stu.StuId < 0 || stu.StuId > 999 {
			// Terminate the current Flow process, subsequent functions of the current Flow will not be executed
			return flow.Next(kis.ActionAbort)
		}
	}

	return flow.Next(kis.ActionDataReuse)
}

VerifyStu() validates the incoming data. If any row has a StuId outside the range 0 to 999, the current Flow is aborted and its remaining functions do not run. Otherwise the input data is reused and passed to the next function through flow.Next(kis.ActionDataReuse).
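The range check itself can be tried without KisFlow. The sketch below mirrors the condition used in VerifyStu(); `validStuID` and `allValid` are hypothetical helper names introduced here for illustration.

```go
package main

import "fmt"

// validStuID mirrors the check in VerifyStu: IDs below 0 or above 999 are invalid.
func validStuID(id int) bool {
	return id >= 0 && id <= 999
}

// allValid reports whether every ID in the batch is valid. VerifyStu aborts
// the whole Flow as soon as one row fails this check.
func allValid(ids []int) bool {
	for _, id := range ids {
		if !validStuID(id) {
			return false
		}
	}
	return true
}

func main() {
	fmt.Println(allValid([]int{101, 102}))  // true
	fmt.Println(allValid([]int{101, 1000})) // false
}
```

Note that one invalid row stops the whole batch; it is not filtered out while the other rows continue.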
faas_avg_score_1_2.go
package main

import (
	"context"
	"fmt"

	"github.com/aceld/kis-flow/kis"
	"github.com/aceld/kis-flow/serialize"
)

type AvgStuScoreIn_1_2 struct {
	serialize.DefaultSerialize
	StuScore1_2
}

type AvgStuScoreOut_1_2 struct {
	serialize.DefaultSerialize
	StuScoreAvg
}

func AvgStuScore12(ctx context.Context, flow kis.Flow, rows []*AvgStuScoreIn_1_2) error {
	fmt.Printf("->Call Func AvgStuScore12\n")

	for _, row := range rows {
		out := AvgStuScoreOut_1_2{
			StuScoreAvg: StuScoreAvg{
				StuId:    row.StuId,
				AvgScore: float64(row.Score1+row.Score2) / 2,
			},
		}

		// Submit result data
		_ = flow.CommitRow(out)
	}

	return flow.Next()
}
AvgStuScore12() calculates the average score of score_1 and score_2, resulting in avg_score.
faas_save_score_avg_1_2.go
package main

import (
	"context"
	"fmt"
	"strconv"

	"github.com/aceld/kis-flow/kis"
	"github.com/aceld/kis-flow/serialize"
	"github.com/go-redis/redis/v8"
)

type SaveStuScoreIn struct {
	serialize.DefaultSerialize
	StuScoreAvg
}

func BatchSetStuScores(ctx context.Context, conn kis.Connector, rows []*SaveStuScoreIn) error {
	// Get the Redis client stored by InitScore12Cache
	rdb := conn.GetMetaData("rdb").(*redis.Client)

	// Queue one HMSET per student, then send them all in a single pipeline
	pipe := rdb.Pipeline()

	for _, score := range rows {
		// make key
		key := conn.GetConfig().Key + strconv.Itoa(score.StuId)

		pipe.HMSet(ctx, key, map[string]interface{}{
			"avg_score": score.AvgScore,
		})
	}

	_, err := pipe.Exec(ctx)
	return err
}

func SaveScoreAvg12(ctx context.Context, flow kis.Flow, rows []*SaveStuScoreIn) error {
	fmt.Printf("->Call Func SaveScoreAvg12\n")

	conn, err := flow.GetConnector()
	if err != nil {
		fmt.Printf("SaveScoreAvg12(): GetConnector err = %s\n", err.Error())
		return err
	}

	// Capture the returned error so it can be logged and propagated
	if err := BatchSetStuScores(ctx, conn, rows); err != nil {
		fmt.Printf("SaveScoreAvg12(): BatchSetStuScores err = %s\n", err.Error())
		return err
	}

	return flow.Next(kis.ActionDataReuse)
}

SaveScoreAvg12() stores the data in Redis through the bound Connector. Each key is the key prefix configured in the Connector followed by the student ID. Finally, the source data is passed through unchanged to the next function.
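The key layout can be sketched on its own. In the example below, `scoreKey` is a hypothetical helper and the prefix "stu_score12_avg_" is an assumed value chosen for illustration; the real prefix comes from the Connector configuration, which is not shown in this article.

```go
package main

import (
	"fmt"
	"strconv"
)

// scoreKey builds the Redis key for one student: the configured prefix
// followed by the decimal student ID.
func scoreKey(prefix string, stuID int) string {
	return prefix + strconv.Itoa(stuID)
}

func main() {
	// "stu_score12_avg_" is an assumed prefix, not taken from the real config.
	fmt.Println(scoreKey("stu_score12_avg_", 101)) // stu_score12_avg_101
}
```

Because the save and load functions build the key the same way from the same Connector configuration, Flow2 can find what Flow1 wrote.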
faas_stu_score_avg_print.go
package main

import (
	"context"
	"fmt"

	"github.com/aceld/kis-flow/kis"
	"github.com/aceld/kis-flow/serialize"
)

type PrintStuAvgScoreIn struct {
	serialize.DefaultSerialize
	StuId    int     `json:"stu_id"`
	AvgScore float64 `json:"avg_score"`
}

func PrintStuAvgScore(ctx context.Context, flow kis.Flow, rows []*PrintStuAvgScoreIn) error {
	fmt.Printf("->Call Func PrintStuAvgScore, in Flow[%s]\n", flow.GetName())

	for _, row := range rows {
		fmt.Printf("stuid: [%+v], avg score: [%+v]\n", row.StuId, row.AvgScore)
	}

	return flow.Next()
}
PrintStuAvgScore() prints the average score of the current student.
faas_load_score_avg_1_2.go
package main

import (
	"context"
	"fmt"
	"strconv"

	"github.com/aceld/kis-flow/kis"
	"github.com/aceld/kis-flow/serialize"
	"github.com/go-redis/redis/v8"
)

type LoadStuScoreIn struct {
	serialize.DefaultSerialize
	StuScore3
}

type LoadStuScoreOut struct {
	serialize.DefaultSerialize
	StuScore3
}

func GetStuScoresByStuId(ctx context.Context, conn kis.Connector, stuId int) (float64, error) {
	// Get the Redis client stored by InitScore12Cache
	rdb := conn.GetMetaData("rdb").(*redis.Client)

	// make key
	key := conn.GetConfig().Key + strconv.Itoa(stuId)

	// get data from redis
	result, err := rdb.HGetAll(ctx, key).Result()
	if err != nil {
		return 0, err
	}

	// get value
	avgScoreStr, ok := result["avg_score"]
	if !ok {
		return 0, fmt.Errorf("avg_score not found for stuId: %d", stuId)
	}

	// parse to float64
	avgScore, err := strconv.ParseFloat(avgScoreStr, 64)
	if err != nil {
		return 0, err
	}

	return avgScore, nil
}

func LoadScoreAvg12(ctx context.Context, flow kis.Flow, rows []*LoadStuScoreIn) error {
	fmt.Printf("->Call Func LoadScoreAvg12\n")

	conn, err := flow.GetConnector()
	if err != nil {
		fmt.Printf("LoadScoreAvg12(): GetConnector err = %s\n", err.Error())
		return err
	}

	for _, row := range rows {
		stuScoreAvg1_2, err := GetStuScoresByStuId(ctx, conn, row.StuId)
		if err != nil {
			fmt.Printf("LoadScoreAvg12(): GetStuScoresByStuId err = %s\n", err.Error())
			return err
		}

		out := LoadStuScoreOut{
			StuScore3: StuScore3{
				StuId:      row.StuId,
				Score3:     row.Score3,
				AvgScore12: stuScoreAvg1_2, // avg score of score1 and score2 (load from redis)
			},
		}

		// commit result
		_ = flow.CommitRow(out)
	}

	return flow.Next()
}

LoadScoreAvg12() reads the average of score_1 and score_2 from Redis through the bound Connector, using the key prefix configured in the Connector. It then sends the upstream source data, together with the newly loaded average, to the next function.
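The lookup-and-parse step can be exercised without a Redis server, since HGetAll returns the hash as a map of strings. The sketch below assumes that shape; `avgFromHash` is a hypothetical helper that is not part of the original code.

```go
package main

import (
	"errors"
	"fmt"
	"strconv"
)

// avgFromHash extracts and parses the "avg_score" field from a hash that has
// already been read into a map. A key that does not exist in Redis yields an
// empty map, which is reported here as an error.
func avgFromHash(fields map[string]string) (float64, error) {
	s, ok := fields["avg_score"]
	if !ok {
		return 0, errors.New("avg_score not found")
	}
	return strconv.ParseFloat(s, 64)
}

func main() {
	avg, err := avgFromHash(map[string]string{"avg_score": "95"})
	fmt.Println(avg, err) // 95 <nil>

	_, err = avgFromHash(map[string]string{})
	fmt.Println(err) // avg_score not found
}
```

The empty-map case matters here: it is what Flow2 sees when it asks for a student whose average Flow1 has not yet saved.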
faas_stu_score_avg_3.go
package main

import (
	"context"
	"fmt"

	"github.com/aceld/kis-flow/kis"
	"github.com/aceld/kis-flow/serialize"
)

type AvgStuScore3In struct {
	serialize.DefaultSerialize
	StuScore3
}

type AvgStuScore3Out struct {
	serialize.DefaultSerialize
	StuScoreAvg
}

func AvgStuScore3(ctx context.Context, flow kis.Flow, rows []*AvgStuScore3In) error {
	fmt.Printf("->Call Func AvgStuScore3\n")

	for _, row := range rows {
		out := AvgStuScore3Out{
			StuScoreAvg: StuScoreAvg{
				StuId:    row.StuId,
				AvgScore: (float64(row.Score3) + row.AvgScore12*2) / 3,
			},
		}

		// Submit result data
		_ = flow.CommitRow(out)
	}

	return flow.Next()
}

AvgStuScore3() computes the average of all three scores from score_3 and the average of score_1 and score_2. Because avg_score_1_2 stands for two scores, it is multiplied by 2 before score_3 is added and the sum is divided by 3. The result is the final average, avg_score.
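The arithmetic can be checked in isolation with the sample data used later in this case. `combineAvg` is a hypothetical helper that applies the same formula as AvgStuScore3().

```go
package main

import "fmt"

// combineAvg folds a third score into an existing two-score average:
// (avg12*2 + score3) / 3 equals (score1 + score2 + score3) / 3.
func combineAvg(avg12 float64, score3 int) float64 {
	return (avg12*2 + float64(score3)) / 3
}

func main() {
	// Student 101: scores 100 and 90 average 95; the third score is 80.
	fmt.Println(combineAvg(95, 80)) // 90
	// Student 102: scores 100 and 80 average 90; the third score is 70.
	fmt.Println(combineAvg(90, 70)) // 83.33333333333333
}
```

Both values agree with the averages printed by Flow CalStuAvgScore3 in the run output.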
main.go
func init() {
	// Register functions
	kis.Pool().FaaS("VerifyStu", VerifyStu)
	kis.Pool().FaaS("AvgStuScore12", AvgStuScore12)
	kis.Pool().FaaS("SaveScoreAvg12", SaveScoreAvg12)
	kis.Pool().FaaS("PrintStuAvgScore", PrintStuAvgScore)
	kis.Pool().FaaS("LoadScoreAvg12", LoadScoreAvg12)
	kis.Pool().FaaS("AvgStuScore3", AvgStuScore3)

	// Register connectors
	kis.Pool().CaaSInit("Score12Cache", InitScore12Cache)
}
main.go
package main

import (
	"context"
	"sync"

	"github.com/aceld/kis-flow/file"
	"github.com/aceld/kis-flow/kis"
)

func RunFlowCalStuAvgScore12(ctx context.Context, flow kis.Flow) error {
	// Commit data
	_ = flow.CommitRow(`{"stu_id":101, "score_1":100, "score_2":90}`)
	_ = flow.CommitRow(`{"stu_id":102, "score_1":100, "score_2":80}`)

	// Run the flow
	return flow.Run(ctx)
}

func RunFlowCalStuAvgScore3(ctx context.Context, flow kis.Flow) error {
	// Commit data
	_ = flow.CommitRow(`{"stu_id":101, "score_3": 80}`)
	_ = flow.CommitRow(`{"stu_id":102, "score_3": 70}`)

	// Run the flow
	return flow.Run(ctx)
}

func main() {
	ctx := context.Background()

	// Load Configuration from file
	if err := file.ConfigImportYaml("conf/"); err != nil {
		panic(err)
	}

	var wg sync.WaitGroup
	wg.Add(2)

	go func() {
		// Run flow1 concurrently
		defer wg.Done()

		flow1 := kis.Pool().GetFlow("CalStuAvgScore12")
		if flow1 == nil {
			panic("flow1 is nil")
		}

		if err := RunFlowCalStuAvgScore12(ctx, flow1); err != nil {
			panic(err)
		}
	}()

	go func() {
		// Run flow2 concurrently
		defer wg.Done()

		flow2 := kis.Pool().GetFlow("CalStuAvgScore3")
		if flow2 == nil {
			panic("flow2 is nil")
		}

		if err := RunFlowCalStuAvgScore3(ctx, flow2); err != nil {
			panic(err)
		}
	}()

	wg.Wait()
}

Two goroutines run Flow1 and Flow2 concurrently to compute the final average scores for students 101 and 102. Nothing in main() orders the two Flows: Flow2's LoadScoreAvg12 reads the value that Flow1's SaveScoreAvg12 writes to Redis, so if Redis holds no earlier result for a student, GetStuScoresByStuId returns an "avg_score not found" error. In the output below, LoadScoreAvg12 is called before SaveScoreAvg12, which suggests the averages were already in Redis from an earlier run.

===> Call Connector InitScore12Cache
Connected to Redis: PONG
Add FlowRouter FlowName=CalStuAvgScore12
===> Call Connector InitScore12Cache
Connected to Redis: PONG
Add FlowRouter FlowName=CalStuAvgScore3
->Call Func VerifyStu
->Call Func VerifyStu
->Call Func AvgStuScore12
->Call Func LoadScoreAvg12
->Call Func SaveScoreAvg12
->Call Func PrintStuAvgScore, in Flow[CalStuAvgScore12]
stuid: [101], avg score: [95]
stuid: [102], avg score: [90]
->Call Func AvgStuScore3
->Call Func PrintStuAvgScore, in Flow[CalStuAvgScore3]
stuid: [101], avg score: [90]
stuid: [102], avg score: [83.33333333333333]
In Flow[CalStuAvgScore3], we observe the final computed average scores for scores 1, 2, and 3.
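Since Flow2 reads what Flow1 writes, a first run against an empty Redis may need Flow1 to finish before Flow2 starts. The sketch below shows one way to order a writer and a reader. It is not KisFlow code: `store` is a hypothetical in-memory stand-in for the Redis cache, and `runOrdered` is a hypothetical helper.

```go
package main

import (
	"fmt"
	"sync"
)

// store is a hypothetical in-memory stand-in for the Redis cache.
type store struct {
	mu sync.Mutex
	m  map[int]float64
}

func (s *store) set(id int, v float64) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.m[id] = v
}

func (s *store) get(id int) (float64, bool) {
	s.mu.Lock()
	defer s.mu.Unlock()
	v, ok := s.m[id]
	return v, ok
}

// runOrdered runs writer in its own goroutine, waits for it to finish,
// and only then runs reader.
func runOrdered(writer, reader func()) {
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		writer()
	}()
	wg.Wait()
	reader()
}

func main() {
	s := &store{m: map[int]float64{}}
	runOrdered(
		func() { s.set(101, 95) }, // plays the role of Flow1 saving avg_score_1_2
		func() { // plays the role of Flow2 loading it
			v, ok := s.get(101)
			fmt.Println(v, ok) // 95 true
		},
	)
}
```

In a real deployment the two Flows often run in separate processes or at different times, in which case the reader has to handle a missing value rather than rely on in-process ordering.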
Author: Aceld
GitHub: https://github.com/aceld