Dalam artikel pertama siri ini, kami memperkenalkan corak SAGA dan menunjukkan cara Orkestrasi yang minimum boleh mengurus urus niaga teragih dengan orkestra pusat.
Mari menjadi nyata! Kali ini, kita akan menyelami pendekatan Koreografi, di mana perkhidmatan menyelaras aliran kerja dengan memancarkan dan menggunakan acara secara autonomi.
Untuk menjadikan ini praktikal, kami akan melaksanakan aliran kerja penjagaan kesihatan berbilang perkhidmatan menggunakan Go dan RabbitMQ. Setiap perkhidmatan akan mempunyai main.go sendiri, menjadikannya mudah untuk skala, ujian dan dijalankan secara bebas.
Koreografi bergantung pada komunikasi terdesentralisasi. Setiap perkhidmatan mendengar peristiwa dan mencetuskan langkah seterusnya dengan memancarkan peristiwa baharu. Tiada orkestra pusat; aliran itu muncul daripada interaksi perkhidmatan individu.
Mari kita lihat semula aliran kerja penjagaan kesihatan kami dari artikel pertama:
Setiap perkhidmatan akan:
Kami akan menggunakan RabbitMQ sebagai baris gilir acara. Jalankannya secara tempatan menggunakan Docker:
docker run --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:4.0.5-management
Akses antara muka pengurusan RabbitMQ di http://localhost:15672 (nama pengguna: tetamu, kata laluan: tetamu).
Kami perlu mengkonfigurasi RabbitMQ untuk menampung acara kami. Berikut ialah contoh fail init.go untuk menyediakan infrastruktur RabbitMQ:
package main import ( "log" "github.com/rabbitmq/amqp091-go" ) func main() { conn, err := amqp091.Dial("amqp://guest:guest@localhost:5672/") if err != nil { log.Fatalf("Failed to connect to RabbitMQ: %v", err) } defer conn.Close() ch, err := conn.Channel() if err != nil { log.Fatalf("Failed to open a channel: %v", err) } defer ch.Close() err = ch.ExchangeDeclare("events", "direct", true, false, false, false, nil) if err != nil { log.Fatalf("Failed to declare an exchange: %v", err) } _, err = ch.QueueDeclare("PatientVerified", true, false, false, false, nil) if err != nil { log.Fatalf("Failed to declare a queue: %v", err) } err = ch.QueueBind("PatientVerified", "PatientVerified", "events", false, nil) if err != nil { log.Fatalf("Failed to bind a queue: %v", err) } }
Kod penuh di sini!
Nota: Dalam tetapan pengeluaran, anda mungkin mahu mengurus persediaan ini menggunakan pendekatan GitOps (cth., dengan Terraform) atau biarkan setiap perkhidmatan mengendalikan baris gilirnya sendiri secara dinamik.
Setiap perkhidmatan akan mempunyai main.go sendiri. Kami juga akan menyertakan tindakan pampasan untuk mengendalikan kegagalan dengan anggun.
Perkhidmatan ini mengesahkan butiran pesakit dan memancarkan acara PatientVerified. Ia juga memberi pampasan dengan memberitahu pesakit jika kegagalan hiliran berlaku.
docker run --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:4.0.5-management
Perkhidmatan ini mendengar PatientVerified dan mengeluarkan ProcedureScheduled. Ia memberi pampasan dengan membatalkan prosedur jika kegagalan hiliran berlaku.
package main import ( "log" "github.com/rabbitmq/amqp091-go" ) func main() { conn, err := amqp091.Dial("amqp://guest:guest@localhost:5672/") if err != nil { log.Fatalf("Failed to connect to RabbitMQ: %v", err) } defer conn.Close() ch, err := conn.Channel() if err != nil { log.Fatalf("Failed to open a channel: %v", err) } defer ch.Close() err = ch.ExchangeDeclare("events", "direct", true, false, false, false, nil) if err != nil { log.Fatalf("Failed to declare an exchange: %v", err) } _, err = ch.QueueDeclare("PatientVerified", true, false, false, false, nil) if err != nil { log.Fatalf("Failed to declare a queue: %v", err) } err = ch.QueueBind("PatientVerified", "PatientVerified", "events", false, nil) if err != nil { log.Fatalf("Failed to bind a queue: %v", err) } }
Sertakan Perkhidmatan Inventori dan pelaksanaan Perkhidmatan Pengebilan, mengikut struktur yang sama seperti di atas. Setiap perkhidmatan mendengar acara sebelumnya dan mengeluarkan yang seterusnya, memastikan logik pampasan disediakan untuk kegagalan.
Kod penuh di sini!
Mulakan RabbitMQ:
// patient/main.go package main import ( "fmt" "log" "github.com/rabbitmq/amqp091-go" "github.com/thegoodapi/saga_tutorial/choreography/common" ) func main() { conn, err := amqp091.Dial("amqp://guest:guest@localhost:5672/") if err != nil { log.Fatalf("Failed to connect to RabbitMQ: %v", err) } defer conn.Close() ch, err := conn.Channel() if err != nil { log.Fatalf("Failed to open a channel: %v", err) } defer ch.Close() go func() { fmt.Println("[PatientService] Waiting for events...") msgs, err := common.ConsumeEvent(ch, "ProcedureScheduleCancelled") if err != nil { log.Fatalf("Failed to consume event: %v", err) } for range msgs { fmt.Println("[PatientService] Processing event: ProcedureScheduleCancelled") if err := notifyProcedureScheduleCancellation(); err != nil { log.Fatalf("Failed to notify patient: %v", err) } } }() common.PublishEvent(ch, "events", "PatientVerified", "Patient details verified") fmt.Println("[PatientService] Event published: PatientVerified") select {} } func notifyProcedureScheduleCancellation() error { fmt.Println("Compensation: Notify patient of procedure cancellation.") return nil }
Jalankan Setiap Perkhidmatan:
Buka terminal berasingan dan jalankan:
// scheduler/main.go package main import ( "fmt" "log" "github.com/rabbitmq/amqp091-go" "github.com/thegoodapi/saga_tutorial/choreography/common" ) func main() { conn, err := amqp091.Dial("amqp://guest:guest@localhost:5672/") if err != nil { log.Fatalf("Failed to connect to RabbitMQ: %v", err) } defer conn.Close() ch, err := conn.Channel() if err != nil { log.Fatalf("Failed to open a channel: %v", err) } defer ch.Close() go func() { fmt.Println("[SchedulerService] Waiting for events...") msgs, err := common.ConsumeEvent(ch, "PatientVerified") if err != nil { log.Fatalf("Failed to consume event: %v", err) } for range msgs { fmt.Println("[SchedulerService] Processing event: PatientVerified") if err := scheduleProcedure(); err != nil { common.PublishEvent(ch, "events", "ProcedureScheduleFailed", "Failed to schedule procedure") fmt.Println("[SchedulerService] Compensation triggered: ProcedureScheduleFailed") } else { common.PublishEvent(ch, "events", "ProcedureScheduled", "Procedure scheduled successfully") fmt.Println("[SchedulerService] Event published: ProcedureScheduled") } } }() select {} } func scheduleProcedure() error { fmt.Println("Step 2: Scheduling procedure...") return nil // or simulate a failure }
Perhatikan Output:
Setiap perkhidmatan memproses peristiwa dalam urutan, mencatat kemajuan aliran kerja.
Jom pecahkan!
Pertama sekali, untuk tujuan artikel ini, kami tidak melaksanakan SuppliesReserveFailed dan ProcedureScheduleFailed,l untuk mengelakkan kerumitan yang tidak berguna.
Kami sedang melaksanakan acara berikut
Langkah (atau transaksi):
Pampasan:
Mengikuti rajah pelaksanaan ini
Rajah ini mewakili pendekatan biasa untuk mendokumentasikan koreografi. Walau bagaimanapun, saya mendapati ia agak sukar untuk difahami dan agak mengecewakan, terutamanya bagi mereka yang tidak biasa dengan pelaksanaan atau coraknya.
Jom pecahkan!
Rajah di atas adalah lebih bertele-tele dan ia memecahkan setiap langkah yang memudahkan untuk memahami perkara yang sedang berlaku.
Ringkasnya:
Perhatikan bahawa kami tidak melaksanakan kegagalan untuk langkah 1, 4, dan 7 demi ringkasnya; bagaimanapun, pendekatannya adalah sama. Setiap kegagalan ini akan mencetuskan penarikan semula langkah-langkah sebelumnya.
Kecekapan adalah penting untuk penyahpepijatan dan pemantauan sistem teragih. Melaksanakan log, metrik dan jejak memastikan pembangun dapat memahami gelagat sistem dan mendiagnosis isu dengan cekap.
Kami akan menyelami kebolehmerhatian dalam koreografi kemudian dalam siri ini, nantikan!
Nantikan artikel seterusnya, di mana kami akan meneroka Orkestrasi!
Lihat repositori penuh untuk siri ini di sini. Jom bincang dalam komen!
Atas ialah kandungan terperinci Transaksi dalam Perkhidmatan Mikro: Bahagian Corak SAGA dengan Koreografi. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!