Maison > Java > javaDidacticiel > le corps du texte

Spring Boot+MyBatis+Atomikos+MySQL (avec code source)

Libérer: 2023-08-15 16:12:53
avant
710 Les gens l'ont consulté

Dans les projets réels, nous essayons d'éviter les transactions distribuées. Cependant, il est parfois vraiment nécessaire de procéder à un fractionnement des services, ce qui entraînera des problèmes de transactions distribuées.

Dans le même temps, des transactions distribuées sont également demandées sur le marché lors des entretiens. Vous pouvez vous entraîner avec ce cas et vous pouvez parler 123 lors de l'entretien.

Voici une châtaigne commerciale : lorsqu'un utilisateur reçoit un coupon, le nombre de fois où l'utilisateur reçoit le coupon doit être déduit, puis un enregistrement de l'utilisateur recevant le coupon est enregistré.

Spring Boot+MyBatis+Atomikos+MySQL (avec code source)
Avant la scission
Spring Boot+MyBatis+Atomikos+MySQL (avec code source)
Après la scission

À l'origine, vous pouvez utiliser la méthode de file d'attente de messages ici et utiliser asynchrone pour ajouter des enregistrements de collection d'utilisateurs. Cependant, l'exigence ici est que les utilisateurs doivent pouvoir consulter leurs enregistrements de collecte immédiatement après les avoir reçus, nous avons donc introduit Atomikos ici pour implémenter les problèmes de transactions distribuées.

Transactions distribuées

Les transactions distribuées sont des transactions qui s'étendent sur plusieurs ordinateurs ou bases de données, et il peut y avoir des retards, des pannes ou des incohérences réseau entre ces ordinateurs ou bases de données. Les transactions distribuées doivent garantir l'atomicité, la cohérence, l'isolement et la durabilité de toutes les opérations afin de garantir l'exactitude et l'intégrité des données.

Quels sont les protocoles de transactions distribuées ?

Il existe deux principaux types de protocoles de transaction distribués : 2PC (Two-Phase Commit) et 3PC (Three-Phase Commit).

2PC est actuellement le protocole de transaction distribué le plus couramment utilisé, et son processus est divisé en deux étapes : l'étape de préparation et l'étape de soumission. Au cours de la phase de préparation, le coordinateur de transactions envoie des demandes de préparation à tous les participants, et les participants exécutent des transactions locales à l'état de préparation et renvoient les résultats de préparation au coordinateur de transactions. Lors de la phase de validation, si tous les participants s'exécutent avec succès, le coordinateur de transaction émet une demande de validation à tous les participants et les participants valident la transaction locale. Sinon, le coordinateur de transaction émet une demande d'annulation à tous les participants et les participants annulent la transaction locale. transaction.

3PC est une version améliorée de 2PC, qui ajoute une étape de préparation et de soumission basée sur 2PC. Dans la phase de préparation à la soumission, le coordinateur demande aux participants s'ils peuvent soumettre. Si le participant renvoie son consentement, celui-ci sera soumis directement lors de la phase de soumission, sinon il sera annulé lors de la phase de soumission.

Quelles sont les solutions courantes pour les transactions distribuées ?

Les solutions de mise en œuvre de solutions de transactions distribuées comprennent :

  • Solutions de transactions distribuées basées sur des files d'attente de messages (telles que la solution open source de RocketMQ)
  • Solutions de transactions distribuées basées sur des cadres de transactions distribuées (tels que Seata, TCC -Transaction et autres frameworks)
  • Solutions de transactions distribuées basées sur le protocole XA (comme JTA, etc.)
  • Solutions de transactions distribuées basées sur une cohérence éventuelle des messages fiables (comme le middleware de transactions distribuées GTS d'Alibaba)
  • Distribué solution de transaction basée sur le principe CAP (comme le mode de sourcing d'événements dans l'architecture CQRS)

Qu'est-ce que JTA ?

JTA (Java Transaction API) est la spécification de l'interface de programmation de J2EE. C'est l'implémentation JAVA du protocole XA. Il définit principalement :

Une interface de gestionnaire de transactions javax.transaction .TransactionManager, définit les opérations de démarrage, de validation, de retrait, etc. de la transaction. javax.transaction.TransactionManager,定义了有关事务的开始、提交、撤回等>操作。

一个满足XA规范的资源定义接口javax.transaction.xa.XAResource,一种资源如果要支持JTA事务,就需要让它的资源实现该XAResource

Une interface de définition de ressources qui répond aux spécifications XA, si une ressource souhaite prendre en charge les transactions JTA, ses ressources doivent implémenter le <code style="font-size: 14px;padding: 2px 4px;border-radius: 4px;margin-right: 2px;marge gauche : 2px;couleur d'arrière-plan : rgba(27, 31, 35, 0,05);famille de polices : " operator mono consolas monaco menlo monospace de mot couleur break-all rgb>XAResource et implémente l'interface liée à la soumission en deux phases définie par cette interface. Si nous avons une application qui utilise l'interface JTA pour implémenter les transactions, lorsque l'application est en cours d'exécution, elle a besoin d'un conteneur qui implémente JTA. Généralement, il s'agit d'un conteneur J2EE, tel que JBoss, Websphere et d'autres serveurs d'applications.

Cependant, il existe également des frameworks indépendants qui implémentent JTA. Par exemple, Atomikos et bitronix fournissent tous des frameworks d'implémentation JTA sous la forme de packages jar. De cette manière, nous pouvons exécuter des systèmes d'application qui utilisent JTA pour implémenter des transactions sur des serveurs tels que Tomcat ou Jetty. 🎜

Comme mentionné ci-dessus dans la différence entre les transactions locales et les transactions externes, les transactions JTA sont des transactions externes et peuvent être utilisées pour implémenter la transactionnalité sur plusieurs ressources. C'est exactement ce qu'il fait avec chaque ressource XAResource来进行两阶段提交的控制。感兴趣的同学可以看看这个接口的方法,除了commit, rollback等方法以外,还有end(), forget(), isSameRM(), prepare() et plus encore. À partir de ces seules interfaces, vous pouvez imaginer la complexité de JTA dans la mise en œuvre de transactions en deux phases.

Qu'est-ce que XA ?

XA est une architecture (ou protocole) de transactions distribuées proposée par l'organisation X/Open. L'architecture XA définit principalement l'interface entre le Transaction Manager (global) et le Resource Manager (local). L'interface XA est une interface système bidirectionnelle qui forme un pont de communication entre le Transaction Manager et un ou plusieurs Resource Managers. En d'autres termes, dans une transaction basée sur XA, nous pouvons effectuer une gestion des transactions sur plusieurs ressources. Par exemple, un système accède à plusieurs bases de données, ou accède à la fois aux bases de données et aux ressources telles que le middleware de messages. De cette manière, nous pouvons directement implémenter toutes les transactions soumises ou annulées dans plusieurs bases de données et middleware de messages. La spécification XA n'est pas une spécification Java, mais une spécification universelle. Actuellement, diverses bases de données et de nombreux middlewares de messages prennent en charge la spécification XA.

JTA est une spécification pour le développement Java qui répond à la spécification XA. Par conséquent, lorsque nous disons que nous utilisons JTA pour implémenter des transactions distribuées, nous entendons en fait utiliser les spécifications JTA pour implémenter des transactions avec plusieurs bases de données, middleware de messages et autres ressources du système.

Qu'est-ce qu'Atomikos

Atomikos est un gestionnaire de transactions open source très populaire et peut être intégré à votre application Spring Boot. Le serveur d'applications Tomcat n'implémente pas la spécification JTA. Lorsque vous utilisez Tomcat comme serveur d'applications, vous devez utiliser une classe de gestionnaire de transactions tierce comme gestionnaire de transactions global, et le framework Atomikos le fait, en intégrant la gestion des transactions dans l'application. Ne dépend pas du serveur d'applications.

Spring Boot intègre Atomikos

Ça ne sert à rien de parler d'un tas de théories, montre-moi le code.

Pile technologique : Spring Boot+MyBatis+Atomikos+MySQL

Si vous suivez le code de cet article, faites attention à votre version MySQL.

Construisez d'abord deux bases de données (my-db_0 et my-db_1), puis créez une table dans chaque base de données.

Dans la base de données my-db_0 :

CREATE TABLE `t_user_0` (
`id` bigint NOT NULL AUTO_INCREMENT,
`user_name` varchar(32) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
`age` int NOT NULL,
`gender` int NOT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=21 DEFAULT CHARSET=utf8;
Copier après la connexion

Dans la base de données my-db_1 :

CREATE TABLE `t_user_1` (
`id` bigint NOT NULL AUTO_INCREMENT,
`user_name` varchar(32) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
`age` int NOT NULL,
`gender` int NOT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=14 DEFAULT CHARSET=utf8;
Copier après la connexion

Ceci est juste pour démontrer les transactions distribuées, ne vous inquiétez pas de la signification spécifique de la table. Structure globale du projet

Source de données
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.tian</groupId>
    <artifactId>spring-boot-atomikos</artifactId>
    <version>1.0-SNAPSHOT</version>

    <packaging>jar</packaging>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.0.RELEASE</version>
    </parent>

    <name>spring-boot-atomikos</name>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <!-- mybatis依赖 -->
        <dependency>
            <groupId>org.mybatis.spring.boot</groupId>
            <artifactId>mybatis-spring-boot-starter</artifactId>
            <version>1.3.1</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- mysql依赖 -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.16</version>
        </dependency>
        <!--分布式事务-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-jta-atomikos</artifactId>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <!-- 要使生成的jar可运行,需要加入此插件 -->
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
        <resources>
            <resource>
                <directory>src/main/java</directory>
                <excludes>
                    <exclude>**/*.java</exclude>
                </excludes>
            </resource>
            <resource>
                <!-- 编译xml文件 -->
                <directory>src/main/resources</directory>
                <includes>
                    <include>**/*.*</include>
                </includes>
            </resource>
        </resources>
    </build>
</project>
Copier après la connexion

Spring Boot+MyBatis+Atomikos+MySQL (avec code source)MyBatis Scan
server.port=9001
spring.application.name=atomikos-demo

spring.datasource.user0.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.user0.url=jdbc:mysql://localhost:3306/my-db_0?characterEncoding=utf8&useSSL=false&serverTimezone=GMT%2B8&allowPublicKeyRetrieval=true
spring.datasource.user0.user=root
spring.datasource.user0.password=123456

spring.datasource.user1.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.user1.url=jdbc:mysql://localhost:3306/my-db_1?characterEncoding=utf8&useSSL=false&serverTimezone=GMT%2B8&allowPublicKeyRetrieval=true
spring.datasource.user1.user=root
spring.datasource.user1.password=123456

mybatis.mapperLocations=classpath:/com/tian/mapper/*/*.xml
mybatis.typeAliasesPackage=com.tian.entity
mybatis.configuration.cache-enabled=true
Copier après la connexion

L'autre est fondamentalement le même, c'est-à-dire que le chemin d'analyse est modifié en :
/**
 * @author tianwc  公众号:java后端技术全栈、面试专栏
 * @version 1.0.0
 * @date 2023年05月11日 19:38
 * 博客地址:<a href="http://woaijava.cc/">博客地址</a>
 * <p>
 * 配置好两个数据源
 */
@Configuration
public class DataSourceConfig {

    // 将这个对象放入spring容器中(交给Spring管理)
    @Bean
    // 读取 application.yml 中的配置参数映射成为一个对象
    @ConfigurationProperties(prefix = "spring.datasource.user0")
    public XADataSource getDataSource0() {
        // 创建XA连接池
        return new MysqlXADataSource();
    }

    /**
     * 创建Atomikos数据源
     * 注解@DependsOn("druidXADataSourcePre"),在名为druidXADataSourcePre的bean实例化后加载当前bean
     */
    @Bean
    @DependsOn("getDataSource0")
    @Primary
    public DataSource dataSourcePre(@Qualifier("getDataSource0") XADataSource xaDataSource) {
        //这里的AtomikosDataSourceBean使用的是spring提供的
        AtomikosDataSourceBean atomikosDataSourceBean = new AtomikosDataSourceBean();
        atomikosDataSourceBean.setXaDataSource(xaDataSource);
        atomikosDataSourceBean.setMaxPoolSize(20);
        return atomikosDataSourceBean;
    }


    @Bean
    @ConfigurationProperties(prefix = "spring.datasource.user1")
    public XADataSource getDataSource1() {
        // 创建XA连接池
        return new MysqlXADataSource();
    }

    @Bean
    @DependsOn("getDataSource1")
    public DataSource dataSourceSit(@Qualifier("getDataSource1") XADataSource xaDataSource) {
        //这里的AtomikosDataSourceBean使用的是spring提供的
        AtomikosDataSourceBean atomikosDataSourceBean = new AtomikosDataSourceBean();
        atomikosDataSourceBean.setXaDataSource(xaDataSource);
        return atomikosDataSourceBean;
    }
}
Copier après la connexion
mapper.xml

@Configuration
@MapperScan(basePackages = {"com.tian.mapper.user0"}, sqlSessionTemplateRef = "preSqlSessionTemplate")
public class MybatisPreConfig {

    @Autowired
    @Qualifier("dataSourcePre")
    private DataSource dataSource;

    /**
     * 创建 SqlSessionFactory
     */
    @Bean
    @Primary
    public SqlSessionFactory preSqlSessionFactory() throws Exception{
        SqlSessionFactoryBean bean = new SqlSessionFactoryBean();
        bean.setDataSource(dataSource);
        bean.setMapperLocations(new PathMatchingResourcePatternResolver().
                getResources("classpath*:com/tian/mapper/user0/*.xml"));
        return bean.getObject();
    }

    /**
     * 通过 SqlSessionFactory 来创建 SqlSessionTemplate
     */
    @Bean
    @Primary
    public SqlSessionTemplate preSqlSessionTemplate(@Qualifier("preSqlSessionFactory") SqlSessionFactory sqlSessionFactory){
        // SqlSessionTemplate是线程安全的,可以被多个DAO所共享使用
        return new SqlSessionTemplate(sqlSessionFactory);
    }
}
Copier après la connexion
L'autre est fondamentalement le même, et il est publié ici. L'interface du mappeur correspondante est également très simple. En voici une :

("classpath*:com/tian/mapper/user1/*.xml")
Copier après la connexion

service

<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.tian.mapper.user0.User0Mapper">

    <!-- -->
    <cache eviction="LRU" flushInterval="10000" size="1024"  />

    <resultMap id="BaseResultMap" type="com.tian.entity.User0">
        <id column="id" jdbcType="BIGINT" property="id" />
        <result column="user_name" jdbcType="VARCHAR" property="userName" />
        <result column="age" jdbcType="INTEGER" property="age" />
        <result column="gender" jdbcType="INTEGER" property="gender" />
    </resultMap>

    <sql id="Base_Column_List">
        id, user_name, age, gender
    </sql>
    <insert id="insert" parameterType="com.tian.entity.User0">
        insert into t_user_0 (id, user_name,age, gender)
        values (#{id,jdbcType=BIGINT}, #{userName,jdbcType=VARCHAR},#{age,jdbcType=INTEGER},#{gender,jdbcType=INTEGER})
    </insert>

</mapper>
Copier après la connexion

controller

public interface User0Mapper {

    int insert(User0 record);
}
Copier après la connexion

project startup class
/**
 * @author tianwc  公众号:java后端技术全栈、面试专栏
 * @version 1.0.0
 * @date 2023年05月11日 19:38
 * 博客地址:<a href="http://woaijava.cc/">博客地址</a>
 * <p>
 * 模拟三种场景:正常、制造异常、数据库异常
 */
@Service
public class UserServiceImpl implements UserService {

    @Resource
    private User0Mapper user0Mapper;
    @Resource
    private User1Mapper user1Mapper;
    /**
     * 正常逻辑 同时对两个数据库进行 插入数据
     */
    @Transactional
    @Override
    public int transaction1() throws Exception {
        User1 user1 = new User1();
        user1.setUserName("22222");
        user1.setAge(11);
        user1.setGender(0);
        user1Mapper.add(user1);
        System.out.println("---------------------------");
        // sit(数据源1)
        User0 user0 = new User0();
        user0.setUserName("111111");
        user0.setAge(11);
        user0.setGender(0);
        user0Mapper.insert(user0);
        return 1;
    }
    /**
     * 正常逻辑 同时对两个数据库进行 插入数据
     * 数据插入完后  出现异常
     */
    @Transactional
    @Override
    public int transaction2() throws Exception {
        User1 user1 = new User1();
        user1.setUserName("22222");
        user1.setAge(11);
        user1.setGender(0);
        user1Mapper.add(user1);
        System.out.println("---------------------------");
        // sit(数据源1)
        User0 user0 = new User0();
        user0.setUserName("111111");
        user0.setAge(11);
        user0.setGender(0);
        user0Mapper.insert(user0);
        //认为制造一个异常
        int a=1/0;
        return 1;
    }

    /**
     * 第一个数据插入成功  第二个数据插入失败
     */
    @Transactional
    @Override
    public int transaction3() throws Exception {
        User1 user1 = new User1();
        user1.setUserName("22222");
        user1.setAge(11);
        user1.setGender(0);
        user1Mapper.add(user1);
        System.out.println("---------------------------");
        // sit(数据源1)
        User0 user0 = new User0();
       //故意搞长点,模拟插入失败 让前面的数据回滚 user0.setUserName("111110000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000001");
        user0.setAge(11);
        user0.setGender(0);
        user0Mapper.insert(user0);
        return 1;
    }
}
Copier après la connexion
.

Test

Démarrez le projet, testez respectivement les trois suivants : http :/ /localhost:9001/user/test1 Résultat : Dans les deux bases de données, une nouvelle table de données est ajoutéehttp://localhost:9001/user/test2 Résultat : une exception indiquant que le diviseur ne peut pas être nul est levée et aucune nouvelle donnée n'est ajoutée à l'une ou l'autre base de données.

http://localhost:9001/user/test3 Résultat : une exception de valeur de champ de données trop longue est levée et aucune nouvelle donnée n'est ajoutée à l'une ou l'autre des bases de données.

D'accord, nous avons maintenant implémenté des transactions distribuées.

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Étiquettes associées:
source:Java后端技术全栈
Déclaration de ce site Web
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn
Tutoriels populaires
Plus>
Derniers téléchargements
Plus>
effets Web
Code source du site Web
Matériel du site Web
Modèle frontal