分散式事務涉及多個微服務,其中每個服務執行事務的一部分。例如,電子商務平台可能涉及支付、庫存和訂單管理等服務。這些服務需要協同工作才能完成交易。但是,如果其中一項服務失敗會發生什麼情況?
想像一個電子商務應用程序,在下訂單期間會發生以下步驟:
如果在扣款之後、建立訂單之前庫存服務失敗,系統最終會處於不一致的狀態。客戶已付費,但未下訂單。
為了處理此類故障,可以考慮使用具有兩階段提交協定的分散式事務。然而,這引入了幾個問題:
在分散式系統中,事務通常跨越多個微服務。確保所有服務要么成功完成,要么根本沒有完成是具有挑戰性的。處理此問題的傳統方法(使用具有兩階段提交的分散式事務)可能會因高延遲、緊密耦合和可用性降低等問題而出現問題。
Saga 模式提供了一種更靈活的方法。 Saga 模式不是嘗試將事務作為單一單元執行,而是將事務分解為可以獨立執行的較小的、孤立的步驟。每個步驟都是一個本地事務,更新資料庫,然後觸發下一步。如果某個步驟失敗,系統會執行補償操作以撤銷先前步驟所做的更改,確保系統可以回到一致的狀態。
Saga 模式本質上是一系列依序執行的較小事務。其工作原理如下:
實現Saga模式主要有兩種方式:編排和編排。
在編舞傳奇中,沒有中央協調員。相反,Saga 中涉及的每個服務都會偵聽事件並根據先前步驟的結果決定何時採取行動。這種方法是分散的,允許服務獨立運作。其工作原理如下:
編排的優點:
編舞的挑戰:
在 Orchestration Saga 中,中央編排器控制事務流。協調器決定步驟的順序並處理服務之間的通訊。其工作原理如下:
編排的優點:
編排的挑戰:
讓我們考慮電子商務場景並使用Saga模式來實現它。
在我們的咖啡購買場景中,每項服務都代表一次本地交易。咖啡服務充當這個傳奇的協調者,協調其他服務來完成購買。
以下是這個傳奇如何運作的詳細說明:
在我的傳奇實作中,每個 SagaItemBuilder 代表我們分散式事務流程中的一個步驟。 ActionBuilder 定義了要執行的操作,包括主操作和發生錯誤時執行的回溯操作。 ActionBuilder 封裝了三部分資訊:
component :要呼叫的方法所在的 Bean 實例。
method :要呼叫的方法的名稱。
args :要傳遞給方法的參數。
ActionBuilder
public class ActionBuilder { private Object component; private String method; private Object[] args; public static ActionBuilder builder() { return new ActionBuilder(); } public ActionBuilder component(Object component) { this.component = component; return this; } public ActionBuilder method(String method) { this.method = method; return this; } public ActionBuilder args(Object... args) { this.args = args; return this; } public Object getComponent() { return component; } public String getMethod() { return method; } public Object[] getArgs() { return args; } }
SagaItemBuilder
import java.util.HashMap; import java.util.Map; import java.util.Objects; public class SagaItemBuilder { private ActionBuilder action; private Map<Class<? extends Exception>, ActionBuilder> onBehaviour; public static SagaItemBuilder builder() { return new SagaItemBuilder(); } public SagaItemBuilder action(ActionBuilder action) { this.action = action; return this; } public SagaItemBuilder onBehaviour(Class<? extends Exception> exception, ActionBuilder action) { if (Objects.isNull(onBehaviour)) onBehaviour = new HashMap<>(); onBehaviour.put(exception, action); return this; } public ActionBuilder getAction() { return action; } public Map<Class<? extends Exception>, ActionBuilder> getBehaviour() { return onBehaviour; } }
場景
import java.util.ArrayList; import java.util.List; public class Scenarios { List<SagaItemBuilder> scenarios; public static Scenarios builder() { return new Scenarios(); } public Scenarios scenario(SagaItemBuilder sagaItemBuilder) { if (scenarios == null) scenarios = new ArrayList<>(); scenarios.add(sagaItemBuilder); return this; } public List<SagaItemBuilder> getScenario() { return scenarios; } }
以下是我如何提交分發事務。
package com.example.demo.saga; import com.example.demo.saga.exception.CanNotRollbackException; import com.example.demo.saga.exception.RollBackException; import com.example.demo.saga.pojo.ActionBuilder; import com.example.demo.saga.pojo.SagaItemBuilder; import com.example.demo.saga.pojo.Scenarios; import org.springframework.stereotype.Component; import java.lang.reflect.Method; import java.util.Map; import java.util.Set; @Component public class DTC { public boolean commit(Scenarios scenarios) throws Exception { validate(scenarios); for (int i = 0; i < scenarios.getScenario().size(); i++) { SagaItemBuilder scenario = scenarios.getScenario().get(i); ActionBuilder action = scenario.getAction(); Object bean = action.getComponent(); String method = action.getMethod(); Object[] args = action.getArgs(); try { invoke(bean, method, args); } catch (Exception e) { rollback(scenarios, i, e); return false; } } return true; } private void rollback(Scenarios scenarios, Integer failStep, Exception currentStepFailException) { for (int i = failStep; i >= 0; i--) { SagaItemBuilder scenario = scenarios.getScenario().get(i); Map<Class<? extends Exception>, ActionBuilder> behaviours = scenario.getBehaviour(); Set<Class<? extends Exception>> exceptions = behaviours.keySet(); ActionBuilder actionWhenException = null; if (failStep == i) { for(Class<? extends Exception> exception: exceptions) { if (exception.isInstance(currentStepFailException)) { actionWhenException = behaviours.get(exception); } } if (actionWhenException == null) actionWhenException = behaviours.get(RollBackException.class); } else { actionWhenException = behaviours.get(RollBackException.class); } Object bean = actionWhenException.getComponent(); String method = actionWhenException.getMethod(); Object[] args = actionWhenException.getArgs(); try { invoke(bean, method, args); } catch (Exception e) { throw new CanNotRollbackException("Error in %s belong to %s. Can not rollback transaction".formatted(method, bean.getClass())); } } } private void validate(Scenarios scenarios) throws Exception { for (int i = 0; i < scenarios.getScenario().size(); i++) { SagaItemBuilder scenario = scenarios.getScenario().get(i); ActionBuilder action = scenario.getAction(); if (action.getComponent() == null) throw new Exception("Missing bean in scenario"); if (action.getMethod() == null) throw new Exception("Missing method in scenario"); Map<Class<? extends Exception>, ActionBuilder> behaviours = scenario.getBehaviour(); Set<Class<? extends Exception>> exceptions = behaviours.keySet(); if (exceptions.contains(null)) throw new Exception("Exception can not be null in scenario has method %s, bean %s " .formatted(action.getMethod(), action.getComponent().getClass())); if (!exceptions.contains(RollBackException.class)) throw new Exception("Missing default RollBackException in scenario has method %s, bean %s " .formatted(action.getMethod(), action.getComponent().getClass())); } } public String invoke(Object bean, String methodName, Object... args) throws Exception { try { Class<?>[] paramTypes = new Class[args.length]; for (int i = 0; i < args.length; i++) { paramTypes[i] = parameterType(args[i]); } Method method = bean.getClass().getDeclaredMethod(methodName, paramTypes); Object result = method.invoke(bean, args); return result != null ? result.toString() : null; } catch (Exception e) { throw e; } } private static Class<?> parameterType (Object o) { if (o instanceof Integer) { return int.class; } else if (o instanceof Boolean) { return boolean.class; } else if (o instanceof Double) { return double.class; } else if (o instanceof Float) { return float.class; } else if (o instanceof Long) { return long.class; } else if (o instanceof Short) { return short.class; } else if (o instanceof Byte) { return byte.class; } else if (o instanceof Character) { return char.class; } else { return o.getClass(); } } }
我有 3 個呼叫外部服務的服務:BillingService、OrderService、PaymentService。
訂單服務
package com.example.demo.service; import org.springframework.stereotype.Service; @Service public class OrderService { public String prepareOrder(String name, int number) { System.out.println("Prepare order for %s with order id %d ".formatted(name, number)); return "Prepare order for %s with order id %d ".formatted(name, number); } public void Rollback_prepareOrder_NullPointException() { System.out.println("Rollback prepareOrder because NullPointException"); } public void Rollback_prepareOrder_RollBackException() { System.out.println("Rollback prepareOrder because RollBackException"); } }
計費服務
package com.example.demo.service; import org.springframework.stereotype.Service; @Service public class BillingService { public String prepareBilling(String name, int number) { System.out.println("Prepare billing for %s with order id %d ".formatted(name, number)); return "Prepare billing for %s with order id %d ".formatted(name, number); } public String createBilling(String name, int number) { System.out.println("Create billing for %s with order id %d ".formatted(name, number)); return "Create billing for %s with order id %d ".formatted(name, number); } public void Rollback_prepareBilling_NullPointException() { System.out.println("Rollback prepareBilling because NullPointException"); } public void Rollback_prepareBilling_ArrayIndexOutOfBoundsException() { System.out.println("Rollback prepareBilling because ArrayIndexOutOfBoundsException"); } public void Rollback_prepareBilling_RollBackException() { System.out.println("Rollback prepareBilling because RollBackException"); } public void Rollback_createBilling_NullPointException() { System.out.println("Rollback createBilling because NullPointException"); } public void Rollback_createBilling_ArrayIndexOutOfBoundsException() { System.out.println("Rollback createBilling because ArrayIndexOutOfBoundsException"); } public void Rollback_createBilling_RollBackException() { System.out.println("Rollback createBilling because RollBackException"); } }
付款服務
package com.example.demo.service; import org.springframework.stereotype.Service; @Service public class PaymentService { public String createPayment() { System.out.println("Create payment"); return "Create payment"; } public void Rollback_createPayment_NullPointException() { System.out.println("Rollback createPayment because NullPointException"); } public void Rollback_createPayment_RollBackException() { System.out.println("Rollback createPayment because RollBackException"); } }
在 Coffee Service 中,我如下實作它,建立一個場景,然後提交它。
package com.example.demo.service; import com.example.demo.saga.DTC; import com.example.demo.saga.exception.RollBackException; import com.example.demo.saga.pojo.ActionBuilder; import com.example.demo.saga.pojo.SagaItemBuilder; import com.example.demo.saga.pojo.Scenarios; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @Service public class CoffeeService { @Autowired private OrderService orderService; @Autowired private BillingService billingService; @Autowired private PaymentService paymentService; @Autowired private DTC dtc; public String test() throws Exception { Scenarios scenarios = Scenarios.builder() .scenario( SagaItemBuilder.builder() .action(ActionBuilder.builder().component(orderService).method("prepareOrder").args("tuanh.net", 123)) .onBehaviour(NullPointerException.class, ActionBuilder.builder().component(orderService).method("Rollback_prepareOrder_NullPointException").args()) .onBehaviour(RollBackException.class, ActionBuilder.builder().component(orderService).method("Rollback_prepareOrder_RollBackException").args()) ).scenario( SagaItemBuilder.builder() .action(ActionBuilder.builder().component(billingService).method("prepareBilling").args("tuanh.net", 123)) .onBehaviour(NullPointerException.class, ActionBuilder.builder().component(billingService).method("Rollback_prepareBilling_NullPointException").args()) .onBehaviour(RollBackException.class, ActionBuilder.builder().component(billingService).method("Rollback_prepareBilling_RollBackException").args()) ).scenario( SagaItemBuilder.builder() .action(ActionBuilder.builder().component(billingService).method("createBilling").args("tuanh.net", 123)) .onBehaviour(NullPointerException.class, ActionBuilder.builder().component(billingService).method("Rollback_createBilling_ArrayIndexOutOfBoundsException").args()) .onBehaviour(RollBackException.class, ActionBuilder.builder().component(billingService).method("Rollback_createBilling_RollBackException").args()) ).scenario( SagaItemBuilder.builder() .action(ActionBuilder.builder().component(paymentService).method("createPayment").args()) .onBehaviour(NullPointerException.class, ActionBuilder.builder().component(paymentService).method("Rollback_createPayment_NullPointException").args()) .onBehaviour(RollBackException.class, ActionBuilder.builder().component(paymentService).method("Rollback_createPayment_RollBackException").args()) ); dtc.commit(scenarios); return "ok"; } }
當我在建立帳單時例外。
public String createBilling(String name, int number) { throw new NullPointerException(); }
結果
2024-08-24T14:21:45.445+07:00 INFO 19736 --- [demo] [main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port 8080 (http) with context path '/' 2024-08-24T14:21:45.450+07:00 INFO 19736 --- [demo] [main] com.example.demo.DemoApplication : Started DemoApplication in 1.052 seconds (process running for 1.498) 2024-08-24T14:21:47.756+07:00 INFO 19736 --- [demo] [nio-8080-exec-1] o.a.c.c.C.[Tomcat].[localhost].[/] : Initializing Spring DispatcherServlet 'dispatcherServlet' 2024-08-24T14:21:47.756+07:00 INFO 19736 --- [demo] [nio-8080-exec-1] o.s.web.servlet.DispatcherServlet : Initializing Servlet 'dispatcherServlet' 2024-08-24T14:21:47.757+07:00 INFO 19736 --- [demo] [nio-8080-exec-1] o.s.web.servlet.DispatcherServlet : Completed initialization in 1 ms Prepare order for tuanh.net with order id 123 Prepare billing for tuanh.net with order id 123 Rollback createBilling because RollBackException Rollback prepareBilling because RollBackException Rollback prepareOrder because RollBackException
查看我的 GitHub 儲存庫
總之,Saga 模式透過將分散式事務分解為更小的、可管理的步驟,為管理分散式事務提供了一個強大的解決方案。編排和編排之間的選擇取決於系統的特定需求和架構。編排提供鬆散耦合和彈性,而編排提供集中控制和更輕鬆的監控。透過使用 Saga 模式仔細設計系統,您可以在分散式微服務架構中實現一致性、可用性和靈活性。
如果您對在系統中實現 Saga 模式有任何疑問或需要進一步說明,請隨時在下面發表評論!
閱讀更多文章:Saga 模式如何解決分散式交易問題:方法與實際範例
以上是Saga 模式如何解決分散式事務問題:方法和實際範例的詳細內容。更多資訊請關注PHP中文網其他相關文章!