In actual projects, we try to avoid distributed transactions. However, sometimes it is really necessary to do some service splitting, which will lead to distributed transaction problems.
At the same time, distributed transactions are a common interview topic. You can practice with this case so that you have something concrete to talk about in an interview.
Here is a business example: when a user claims a coupon, the number of times the user may still claim it needs to be decremented, and then a record of the claim needs to be saved.
A message queue could have been used here, adding the claim record asynchronously. However, the requirement is that users must be able to view their claim records immediately after claiming, so we introduced Atomikos to implement distributed transactions.
A distributed transaction is a transaction that spans multiple computers or databases, between which there may be network delays, failures, or inconsistencies. A distributed transaction must still guarantee the atomicity, consistency, isolation, and durability of all its operations in order to keep the data correct and complete.
There are two main types of distributed transaction protocols: 2PC (Two-Phase Commit) and 3PC (Three-Phase Commit).
2PC is currently the most commonly used distributed transaction protocol. Its process is divided into two phases: the prepare phase and the commit phase. In the prepare phase, the transaction coordinator sends a prepare request to all participants; each participant executes its local transaction up to the prepared state and returns the result to the coordinator. In the commit phase, if all participants prepared successfully, the coordinator sends a commit request to all participants, and they commit their local transactions. Otherwise, the coordinator sends a rollback request to all participants, and they roll back their local transactions.
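The two phases can be sketched in plain Java. This is only an in-memory illustration, not how Atomikos or any real transaction manager is implemented: the Participant interface, InMemoryParticipant, and TwoPhaseCoordinator are hypothetical names, and failure handling such as timeouts and coordinator crashes is left out.

```java
import java.util.List;

/** Hypothetical participant interface, for illustration only. */
interface Participant {
    boolean prepare();   // phase 1: do the work, hold it, and vote yes (true) or no (false)
    void commit();       // phase 2: make the prepared work permanent
    void rollback();     // phase 2: undo the prepared work
}

/** In-memory participant that records its state instead of touching a real database. */
class InMemoryParticipant implements Participant {
    private final boolean voteYes;
    String state = "INIT";

    InMemoryParticipant(boolean voteYes) {
        this.voteYes = voteYes;
    }

    @Override
    public boolean prepare() {
        state = voteYes ? "PREPARED" : "FAILED";
        return voteYes;
    }

    @Override
    public void commit() {
        state = "COMMITTED";
    }

    @Override
    public void rollback() {
        state = "ROLLED_BACK";
    }
}

class TwoPhaseCoordinator {
    /** Returns true if the global transaction committed, false if it was rolled back. */
    static boolean run(List<? extends Participant> participants) {
        // Phase 1 (prepare): ask every participant and collect the votes.
        boolean allPrepared = true;
        for (Participant p : participants) {
            boolean vote = p.prepare();
            allPrepared = allPrepared && vote;
        }
        // Phase 2 (commit): commit everywhere only if every vote was yes, otherwise roll back everywhere.
        for (Participant p : participants) {
            if (allPrepared) {
                p.commit();
            } else {
                p.rollback();
            }
        }
        return allPrepared;
    }
}
```

Calling TwoPhaseCoordinator.run with two participants that both vote yes leaves both in the COMMITTED state; if either votes no, both end up ROLLED_BACK.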
3PC is an improved version of 2PC that adds a pre-commit phase, giving three phases: CanCommit, PreCommit, and DoCommit. In the first phase, the coordinator asks the participants whether they can commit. If all participants agree, the coordinator moves on to the pre-commit phase and then the commit phase; otherwise the transaction is rolled back.
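For comparison, here is a minimal in-memory sketch of the three phases as 3PC is commonly described (CanCommit, PreCommit, DoCommit). The interface and class names are hypothetical. The participant timeouts that real 3PC relies on to reduce blocking are deliberately omitted, so this only shows the order of the phases.

```java
import java.util.List;

/** Hypothetical participant interface for a three-phase commit sketch. */
interface ThreePhaseParticipant {
    boolean canCommit();  // phase 1: vote only, no work is held yet
    boolean preCommit();  // phase 2: execute and hold the work, then acknowledge
    void doCommit();      // phase 3: make the work permanent
    void abort();         // undo whatever has been done so far
}

/** In-memory participant that records the last state it reached. */
class RecordingParticipant implements ThreePhaseParticipant {
    private final boolean vote;
    String state = "INIT";

    RecordingParticipant(boolean vote) {
        this.vote = vote;
    }

    @Override
    public boolean canCommit() {
        return vote;
    }

    @Override
    public boolean preCommit() {
        state = "PRE_COMMITTED";
        return true;
    }

    @Override
    public void doCommit() {
        state = "COMMITTED";
    }

    @Override
    public void abort() {
        state = "ABORTED";
    }
}

class ThreePhaseCoordinator {
    /** Returns true if the transaction committed, false if it was aborted. */
    static boolean run(List<? extends ThreePhaseParticipant> participants) {
        // Phase 1 (CanCommit): any "no" vote aborts before any work is held.
        for (ThreePhaseParticipant p : participants) {
            if (!p.canCommit()) {
                return abortAll(participants);
            }
        }
        // Phase 2 (PreCommit): participants execute and hold their work.
        for (ThreePhaseParticipant p : participants) {
            if (!p.preCommit()) {
                return abortAll(participants);
            }
        }
        // Phase 3 (DoCommit): everyone acknowledged, so commit everywhere.
        for (ThreePhaseParticipant p : participants) {
            p.doCommit();
        }
        return true;
    }

    private static boolean abortAll(List<? extends ThreePhaseParticipant> participants) {
        for (ThreePhaseParticipant p : participants) {
            p.abort();
        }
        return false;
    }
}
```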
Implementation options for distributed transactions include JTA, described below.
JTA (Java Transaction API) is the transaction programming interface specification of J2EE and the Java mapping of the XA protocol. It mainly defines:
A transaction manager interface, javax.transaction.TransactionManager, which defines operations such as beginning, committing, and rolling back a transaction.
A resource interface that conforms to the XA specification, javax.transaction.xa.XAResource. A resource that wants to take part in JTA transactions must implement the XAResource interface, including the two-phase commit methods that it defines.
If an application uses the JTA interface to implement transactions, it needs a container that implements JTA at runtime. Generally this is a J2EE container, such as JBoss, WebSphere, or another application server.
However, there are also standalone frameworks that implement JTA. For example, Atomikos and Bitronix provide JTA implementations in the form of jar packages. This lets us run applications that use JTA transactions on servers such as Tomcat or Jetty.
Unlike local transactions, JTA transactions are external (global) transactions and can provide transactionality across multiple resources. JTA drives two-phase commit through the XAResource implemented by each resource. Interested readers can look at the methods of this interface: in addition to commit() and rollback(), there are also end(), forget(), isSameRM(), prepare(), and so on. From these methods alone, you can imagine the complexity of implementing two-phase transactions in JTA.
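To get a feel for the XAResource methods, here is a toy implementation that only records which methods are called, together with a small driver that calls them in the order a transaction manager would typically use for a single resource. LoggingXAResource, SimpleXid, and XaCallSequenceDemo are hypothetical names for this sketch. A real resource (for example a JDBC XA connection) does actual work in each method, and a real transaction manager such as Atomikos also handles recovery and multiple resources.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;

/** Toy resource that only records which XAResource methods are called on it. */
class LoggingXAResource implements XAResource {
    final List<String> calls = new ArrayList<>();
    private int timeoutSeconds = 0;

    @Override public void start(Xid xid, int flags) throws XAException { calls.add("start"); }
    @Override public void end(Xid xid, int flags) throws XAException { calls.add("end"); }
    @Override public int prepare(Xid xid) throws XAException { calls.add("prepare"); return XA_OK; }
    @Override public void commit(Xid xid, boolean onePhase) throws XAException { calls.add("commit"); }
    @Override public void rollback(Xid xid) throws XAException { calls.add("rollback"); }
    @Override public void forget(Xid xid) throws XAException { calls.add("forget"); }
    @Override public Xid[] recover(int flag) throws XAException { return new Xid[0]; }
    @Override public boolean isSameRM(XAResource other) throws XAException { return other == this; }
    @Override public int getTransactionTimeout() throws XAException { return timeoutSeconds; }
    @Override public boolean setTransactionTimeout(int seconds) throws XAException {
        timeoutSeconds = seconds;
        return true;
    }
}

/** Minimal transaction branch identifier: global transaction id plus branch qualifier. */
class SimpleXid implements Xid {
    private final byte[] globalId;
    private final byte[] branchId;

    SimpleXid(String globalId, String branchId) {
        this.globalId = globalId.getBytes(StandardCharsets.UTF_8);
        this.branchId = branchId.getBytes(StandardCharsets.UTF_8);
    }

    @Override public int getFormatId() { return 1; }
    @Override public byte[] getGlobalTransactionId() { return globalId.clone(); }
    @Override public byte[] getBranchQualifier() { return branchId.clone(); }
}

class XaCallSequenceDemo {
    /** Drives one resource through start, end, prepare, commit and returns the recorded calls. */
    static List<String> runTwoPhase(LoggingXAResource resource) {
        Xid xid = new SimpleXid("global-tx-1", "branch-1");
        try {
            resource.start(xid, XAResource.TMNOFLAGS);   // associate the work with the transaction branch
            // ... the application's SQL would run here ...
            resource.end(xid, XAResource.TMSUCCESS);     // the work on this branch is finished
            if (resource.prepare(xid) == XAResource.XA_OK) {
                resource.commit(xid, false);             // false = second phase of a two-phase commit
            }
        } catch (XAException e) {
            // In this sketch any XAException (for example a failed prepare) leads to rollback.
            try {
                resource.rollback(xid);
            } catch (XAException ignored) {
                // nothing more to do in a toy example
            }
        }
        return resource.calls;
    }
}
```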
XA is a distributed transaction architecture (or protocol) proposed by the X/Open organization. The XA architecture mainly defines the interface between the (global) Transaction Manager and the (local) Resource Manager. The XA interface is a bidirectional system interface that forms a communication bridge between the Transaction Manager and one or more Resource Managers. In other words, in a transaction based on XA, we can manage a transaction across multiple resources, for example a system that accesses multiple databases, or both databases and resources such as message middleware. In this way, a transaction across multiple databases and message middleware is either committed everywhere or rolled back everywhere. The XA specification is not a Java specification but a general one. Currently, various databases and many message middleware products support the XA specification.
JTA is a specification that meets the XA specification and is used for Java development. Therefore, when we say that we use JTA to implement distributed transactions, we actually mean to use JTA specifications to implement transactions with multiple databases, message middleware and other resources in the system.
Atomikos is a very popular open source transaction manager that can be embedded into your Spring Boot application. The Tomcat application server does not implement the JTA specification, so when Tomcat is used as the application server, a third-party transaction manager is needed to act as the global transaction manager. Atomikos fills this role: it integrates transaction management into the application itself, so the application does not depend on the application server for it.
It’s useless to talk about a bunch of theories, show me the code.
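Technology stack: Spring Boot, MyBatis, Atomikos, MySQL
If you follow the code in this article, pay attention to your MySQL version: the pom uses mysql-connector-java 8.0.16 with the com.mysql.cj.jdbc.Driver driver class, so check that this connector is compatible with your MySQL server.
First create two databases (my-db_0 and my-db_1), and then create a table in each database.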
In database my-db_0:
CREATE TABLE `t_user_0` (
  `id` bigint NOT NULL AUTO_INCREMENT,
  `user_name` varchar(32) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
  `age` int NOT NULL,
  `gender` int NOT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=21 DEFAULT CHARSET=utf8;
In database my-db_1:
CREATE TABLE `t_user_1` (
  `id` bigint NOT NULL AUTO_INCREMENT,
  `user_name` varchar(32) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
  `age` int NOT NULL,
  `gender` int NOT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=14 DEFAULT CHARSET=utf8;
This is only meant to demonstrate distributed transactions, so don't worry about the specific meaning of the tables.
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.tian</groupId>
    <artifactId>spring-boot-atomikos</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.0.RELEASE</version>
    </parent>

    <name>spring-boot-atomikos</name>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <!-- MyBatis dependency -->
        <dependency>
            <groupId>org.mybatis.spring.boot</groupId>
            <artifactId>mybatis-spring-boot-starter</artifactId>
            <version>1.3.1</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- MySQL dependency -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.16</version>
        </dependency>
        <!-- Distributed transactions -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-jta-atomikos</artifactId>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- This plugin is needed to make the generated jar executable -->
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
        <resources>
            <resource>
                <directory>src/main/java</directory>
                <excludes>
                    <exclude>**/*.java</exclude>
                </excludes>
            </resource>
            <resource>
                <!-- Include the XML files in the build -->
                <directory>src/main/resources</directory>
                <includes>
                    <include>**/*.*</include>
                </includes>
            </resource>
        </resources>
    </build>
</project>
server.port=9001
spring.application.name=atomikos-demo

spring.datasource.user0.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.user0.url=jdbc:mysql://localhost:3306/my-db_0?characterEncoding=utf8&useSSL=false&serverTimezone=GMT%2B8&allowPublicKeyRetrieval=true
spring.datasource.user0.user=root
spring.datasource.user0.password=123456

spring.datasource.user1.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.user1.url=jdbc:mysql://localhost:3306/my-db_1?characterEncoding=utf8&useSSL=false&serverTimezone=GMT%2B8&allowPublicKeyRetrieval=true
spring.datasource.user1.user=root
spring.datasource.user1.password=123456

mybatis.mapperLocations=classpath:/com/tian/mapper/*/*.xml
mybatis.typeAliasesPackage=com.tian.entity
mybatis.configuration.cache-enabled=true
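import javax.sql.DataSource;
import javax.sql.XADataSource;

import com.mysql.cj.jdbc.MysqlXADataSource;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.jta.atomikos.AtomikosDataSourceBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.DependsOn;
import org.springframework.context.annotation.Primary;

/**
 * @author tianwc  WeChat official accounts: java后端技术全栈, 面试专栏
 * @version 1.0.0
 * @date 2023-05-11 19:38
 * Blog: <a href="http://woaijava.cc/">blog link</a>
 * <p>
 * Configures the two data sources
 */
@Configuration
public class DataSourceConfig {

    // Register this object in the Spring container (managed by Spring)
    @Bean
    // Bind the spring.datasource.user0.* settings from the application configuration to this object
    @ConfigurationProperties(prefix = "spring.datasource.user0")
    public XADataSource getDataSource0() {
        // Create the XA data source
        return new MysqlXADataSource();
    }

    /**
     * Creates the Atomikos data source.
     * @DependsOn("getDataSource0") makes this bean load after the bean named getDataSource0 is instantiated.
     */
    @Bean
    @DependsOn("getDataSource0")
    @Primary
    public DataSource dataSourcePre(@Qualifier("getDataSource0") XADataSource xaDataSource) {
        // The AtomikosDataSourceBean used here is the one provided by Spring Boot
        AtomikosDataSourceBean atomikosDataSourceBean = new AtomikosDataSourceBean();
        atomikosDataSourceBean.setXaDataSource(xaDataSource);
        atomikosDataSourceBean.setMaxPoolSize(20);
        return atomikosDataSourceBean;
    }

    @Bean
    @ConfigurationProperties(prefix = "spring.datasource.user1")
    public XADataSource getDataSource1() {
        // Create the XA data source
        return new MysqlXADataSource();
    }

    @Bean
    @DependsOn("getDataSource1")
    public DataSource dataSourceSit(@Qualifier("getDataSource1") XADataSource xaDataSource) {
        // The AtomikosDataSourceBean used here is the one provided by Spring Boot
        AtomikosDataSourceBean atomikosDataSourceBean = new AtomikosDataSourceBean();
        atomikosDataSourceBean.setXaDataSource(xaDataSource);
        return atomikosDataSourceBean;
    }
}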
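import javax.sql.DataSource;

import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.mybatis.spring.SqlSessionTemplate;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;

@Configuration
@MapperScan(basePackages = {"com.tian.mapper.user0"}, sqlSessionTemplateRef = "preSqlSessionTemplate")
public class MybatisPreConfig {

    @Autowired
    @Qualifier("dataSourcePre")
    private DataSource dataSource;

    /**
     * Creates the SqlSessionFactory
     */
    @Bean
    @Primary
    public SqlSessionFactory preSqlSessionFactory() throws Exception {
        SqlSessionFactoryBean bean = new SqlSessionFactoryBean();
        bean.setDataSource(dataSource);
        bean.setMapperLocations(new PathMatchingResourcePatternResolver()
                .getResources("classpath*:com/tian/mapper/user0/*.xml"));
        return bean.getObject();
    }

    /**
     * Creates the SqlSessionTemplate from the SqlSessionFactory
     */
    @Bean
    @Primary
    public SqlSessionTemplate preSqlSessionTemplate(@Qualifier("preSqlSessionFactory") SqlSessionFactory sqlSessionFactory) {
        // SqlSessionTemplate is thread-safe and can be shared by multiple DAOs
        return new SqlSessionTemplate(sqlSessionFactory);
    }
}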
The configuration class for the second data source is basically the same. It is wired to the dataSourceSit data source instead, and the mapper XML scan path is changed to:
("classpath*:com/tian/mapper/user1/*.xml")
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.tian.mapper.user0.User0Mapper">
    <!-- -->
    <cache eviction="LRU" flushInterval="10000" size="1024" />

    <resultMap id="BaseResultMap" type="com.tian.entity.User0">
        <id column="id" jdbcType="BIGINT" property="id" />
        <result column="user_name" jdbcType="VARCHAR" property="userName" />
        <result column="age" jdbcType="INTEGER" property="age" />
        <result column="gender" jdbcType="INTEGER" property="gender" />
    </resultMap>

    <sql id="Base_Column_List">
        id, user_name, age, gender
    </sql>

    <insert id="insert" parameterType="com.tian.entity.User0">
        insert into t_user_0 (id, user_name, age, gender)
        values (#{id,jdbcType=BIGINT}, #{userName,jdbcType=VARCHAR}, #{age,jdbcType=INTEGER}, #{gender,jdbcType=INTEGER})
    </insert>
</mapper>
The other mapper XML is basically the same, so I won't post it here.
The corresponding mapper interface is also very simple. Here is one:
public interface User0Mapper {
    int insert(User0 record);
}
/**
 * @author tianwc  WeChat official accounts: java后端技术全栈, 面试专栏
 * @version 1.0.0
 * @date 2023-05-11 19:38
 * Blog: <a href="http://woaijava.cc/">blog link</a>
 * <p>
 * Simulates three scenarios: normal, a manufactured exception, and a database exception
 */
@Service
public class UserServiceImpl implements UserService {

    @Resource
    private User0Mapper user0Mapper;
    @Resource
    private User1Mapper user1Mapper;

    /**
     * Normal logic: insert data into both databases
     */
    @Transactional
    @Override
    public int transaction1() throws Exception {
        User1 user1 = new User1();
        user1.setUserName("22222");
        user1.setAge(11);
        user1.setGender(0);
        user1Mapper.add(user1);
        System.out.println("---------------------------");
        // The other data source (t_user_0)
        User0 user0 = new User0();
        user0.setUserName("111111");
        user0.setAge(11);
        user0.setGender(0);
        user0Mapper.insert(user0);
        return 1;
    }

    /**
     * Normal logic: insert data into both databases,
     * then an exception occurs after the inserts
     */
    @Transactional
    @Override
    public int transaction2() throws Exception {
        User1 user1 = new User1();
        user1.setUserName("22222");
        user1.setAge(11);
        user1.setGender(0);
        user1Mapper.add(user1);
        System.out.println("---------------------------");
        // The other data source (t_user_0)
        User0 user0 = new User0();
        user0.setUserName("111111");
        user0.setAge(11);
        user0.setGender(0);
        user0Mapper.insert(user0);
        // Deliberately trigger an exception
        int a = 1 / 0;
        return 1;
    }

    /**
     * The first insert succeeds, the second insert fails
     */
    @Transactional
    @Override
    public int transaction3() throws Exception {
        User1 user1 = new User1();
        user1.setUserName("22222");
        user1.setAge(11);
        user1.setGender(0);
        user1Mapper.add(user1);
        System.out.println("---------------------------");
        // The other data source (t_user_0)
        User0 user0 = new User0();
        // Deliberately too long for user_name varchar(32), to simulate a failed insert so that the earlier insert is rolled back
        user0.setUserName("111110000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000001");
        user0.setAge(11);
        user0.setGender(0);
        user0Mapper.insert(user0);
        return 1;
    }
}
@RestController
@RequestMapping("/user")
public class UserController {

    @Resource
    private UserService userService;

    @PostMapping("/test1")
    public CommonResult test1() {
        int i = 0;
        try {
            i = userService.transaction1();
            return CommonResult.success(i);
        } catch (Exception e) {
            // The exception is only printed here, so the response below still reports success with i = 0
            e.printStackTrace();
        }
        return CommonResult.success(i);
    }

    @PostMapping("/test2")
    public CommonResult test2() {
        int i = 0;
        try {
            i = userService.transaction2();
            return CommonResult.success(i);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return CommonResult.success(i);
    }

    @PostMapping("/test3")
    public CommonResult test3() {
        int i = 0;
        try {
            i = userService.transaction3();
            return CommonResult.success(i);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return CommonResult.success(i);
    }
}
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;

/**
 * @author tianwc  WeChat official accounts: java后端技术全栈, 面试专栏
 * @version 1.0.0
 * @date 2023-05-11 19:38
 * Blog: <a href="http://woaijava.cc/">blog link</a>
 * <p>
 * Application entry point
 */
@SpringBootApplication(exclude = {DataSourceAutoConfiguration.class})
//@ComponentScan(basePackages = {"com.tian"})
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}
Start the project and test the following three endpoints in turn. The controller maps them with @PostMapping, so send POST requests:
http://localhost:9001/user/test1
Result: a new row is added to the table in each of the two databases.
http://localhost:9001/user/test2
Result: a division-by-zero exception (java.lang.ArithmeticException) is thrown, and no new data is added to either database.
http://localhost:9001/user/test3
Result: a data-too-long exception for the user_name field is thrown, and no new data is added to either database.
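The rollback in test2 relies on Spring's default rule for @Transactional: a transaction is rolled back for unchecked exceptions (RuntimeException and Error) but not for checked exceptions, unless rollbackFor is configured. The following plain-Java sketch mirrors that rule and checks that the exception raised by dividing by zero is an unchecked one. RollbackRuleCheck and its methods are hypothetical helper names for illustration, not part of the project or of Spring.

```java
class RollbackRuleCheck {

    /** Mirrors Spring's default rollback rule: only RuntimeException and Error trigger rollback. */
    static boolean rollsBackByDefault(Throwable t) {
        return t instanceof RuntimeException || t instanceof Error;
    }

    /** Reproduces the manufactured failure from transaction2 and returns the exception it raises. */
    static Throwable divideByZero() {
        try {
            int zero = 0;
            int a = 1 / zero;   // integer division by zero throws ArithmeticException
            return null;        // not reached
        } catch (ArithmeticException e) {
            return e;
        }
    }
}
```

Because ArithmeticException extends RuntimeException, rollsBackByDefault(divideByZero()) is true. A checked exception such as java.io.IOException would not trigger rollback by default, which is worth remembering because the service methods here are declared with throws Exception.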
Okay, now we have implemented distributed transactions.