


Comment SpringBoot intègre les transactions, les diffusions et les messages séquentiels RocketMQ
Environnement : springboot2.3.9RELEASE + RocketMQ4.8.0
Dépend de
<dependency> <groupid>org.springframework.boot</groupid> <artifactid>spring-boot-starter-web</artifactid> </dependency> <dependency> <groupid>org.apache.rocketmq</groupid> <artifactid>rocketmq-spring-boot-starter</artifactid> <version>2.2.0</version> </dependency>
Fichier de configuration
server: port: 8080 --- rocketmq: nameServer: localhost:9876 producer: group: demo-mq
Message normal
Envoyer
@Resource private RocketMQTemplate rocketMQTemplate ; public void send(String message) { rocketMQTemplate.convertAndSend("test-topic:tag2", MessageBuilder.withPayload(message).build()); }
Recevoir
@RocketMQMessageListener(topic = "test-topic", consumerGroup = "consumer01-group", selectorExpression = "tag1 || tag2") @Component public class ConsumerListener implements RocketMQListener<string> { @Override public void onMessage(String message) { System.out.println("接收到消息:" + message) ; } }</string>
Message séquentiel
Envoyer
@Resource private RocketMQTemplate rocketMQTemplate ; public void sendOrder(String topic, String message, String tags, int id) { rocketMQTemplate.asyncSendOrderly(topic + ":" + tags, MessageBuilder.withPayload(message).build(), "order-" + id, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { System.err.println("msg-id: " + sendResult.getMsgId() + ": " + message +"\tqueueId: " + sendResult.getMessageQueue().getQueueId()) ; } @Override public void onException(Throwable e) { e.printStackTrace() ; } }); }
Voici la clé de hachage qui envoie le message à Dans différentes files d'attente,
@RocketMQMessageListener(topic = "order-topic", consumerGroup = "consumer02-group", selectorExpression = "tag3 || tag4", consumeMode = ConsumeMode.ORDERLY) @Component public class ConsumerOrderListener implements RocketMQListener<string> { @Override public void onMessage(String message) { System.out.println(Thread.currentThread().getName() + " 接收到Order消息:" + message) ; } }</string>
consumeMode = ConsumeMode.ORDERLY indique que le mode message est le mode séquentiel, une file d'attente et un thread.
Résultat
Lorsque consumerMode = ConsumeMode.CONCURRENTLY, le résultat de l'exécution est le suivant :
Mode message cluster/diffusion
Expéditeur
@Resource private RocketMQTemplate rocketMQTemplate ; public void send(String topic, String message, String tags) { rocketMQTemplate.send(topic + ":" + tags, MessageBuilder.withPayload(message).build()) ; }
Mode message cluster
Consum euh
@RocketMQMessageListener(topic = "broad-topic", consumerGroup = "consumer03-group", selectorExpression = "tag6 || tag7", messageModel = MessageModel.CLUSTERING) @Component public class ConsumerBroadListener implements RocketMQListener<string> { @Override public void onMessage(String message) { System.out.println("ConsumerBroadListener1接收到消息:" + message) ; } }</string>
message Model = MessageModel .CLUSTERING
Test
Démarrez deux services dont les ports sont respectivement 8080 et 8081
Service 8080
Service 8081
En mode message cluster, chaque service reçoit une partie du message séparément, réalisant la charge équilibrage
Mode message de diffusion
Côté consommateur
@RocketMQMessageListener(topic = "broad-topic", consumerGroup = "consumer03-group", selectorExpression = "tag6 || tag7", messageModel = MessageModel.BROADCASTING) @Component public class ConsumerBroadListener implements RocketMQListener<string> { @Override public void onMessage(String message) { System.out.println("ConsumerBroadListener1接收到消息:" + message) ; } }</string>
messageModel = MessageModel.BROADCASTING
Test
Démarrer deux services avec les ports 8080 et 8081
Service 8080
Service 8081
Dans le message du cluster mode , chaque service a reçu le même message séparément.
Message de transaction
3 statuts des transactions RocketMQ
TransactionStatus.CommitTransaction : validez le message de transaction, le consommateur peut consommer ce message
TransactionStatus.RollbackTransaction : annulez la transaction, ce qui signifie que le message sera supprimé et non autorisé à consommer.
TransactionStatus.Unknown : Statut intermédiaire, qui représente la nécessité de vérifier la file d'attente des messages pour déterminer le statut.
RocketMQ implémente les messages de transaction en deux phases principales : l'envoi et la soumission normaux des transactions et le processus de compensation des informations sur les transactions. Le processus global est le suivant :
Phase d'envoi et de soumission normale des transactions
1. (Le demi-message fait référence au message que le consommateur ne peut pas consommer temporairement)
2. Le serveur répond au résultat de l'écriture du message et le demi-message est envoyé avec succès
3. Commence à exécuter la transaction locale
4. ou Rollback en fonction de l'état d'exécution de la transaction locale Opération
Processus de compensation des informations sur la transaction
1. Si MQServer ne reçoit pas l'état d'exécution de la transaction locale pendant une longue période, il lancera une demande d'opération de révision de confirmation auprès du producteur
2. Une fois que le producteur a reçu la demande de révision de confirmation, vérifiez l'état d'exécution des transactions locales
3. Exécutez les opérations de validation ou de restauration en fonction des résultats de la vérification
La phase de compensation est principalement utilisée pour résoudre le problème de délai d'attente ou d'échec. lorsque les producteurs envoient des opérations Commit ou Rollback.
Expéditeur
@Resource private RocketMQTemplate rocketMQTemplate ; public void sendTx(String topic, Long id, String tags) { rocketMQTemplate.sendMessageInTransaction(topic + ":" + tags, MessageBuilder.withPayload( new Users(id, UUID.randomUUID().toString().replaceAll("-", ""))). setHeader("BID", UUID.randomUUID().toString().replaceAll("-", "")).build(), UUID.randomUUID().toString().replaceAll("-", "")) ; }
Producteur correspondant auditeur
@RocketMQTransactionListener public class ProducerTxListener implements RocketMQLocalTransactionListener { @Resource private BusinessService bs ; @Override public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) { // 这里执行本地的事务操作,比如保存数据。 try { // 创建一个日志记录表,将这唯一的ID存入数据库中,在下面的check方法中可以根据这个id查询是否有数据 String id = (String) msg.getHeaders().get("BID") ; Users users = new JsonMapper().readValue((byte[])msg.getPayload(), Users.class) ; System.out.println("消息内容:" + users + "\t参与数据:" + arg + "\t本次事务的唯一编号:" + id) ; bs.save(users, new UsersLog(users.getId(), id)) ; } catch (Exception e) { e.printStackTrace() ; return RocketMQLocalTransactionState.ROLLBACK ; } return RocketMQLocalTransactionState.COMMIT ; } @Override public RocketMQLocalTransactionState checkLocalTransaction(Message msg) { // 这里检查本地事务是否执行成功 String id = (String) msg.getHeaders().get("BID") ; System.out.println("执行查询ID为:" + id + " 的数据是否存在") ; UsersLog usersLog = bs.queryUsersLog(id) ; if (usersLog == null) { return RocketMQLocalTransactionState.ROLLBACK ; } return RocketMQLocalTransactionState.COMMIT ; } }
Consumer
@RocketMQMessageListener(topic = "tx-topic", consumerGroup = "consumer05-group", selectorExpression = "tag10") @Component public class ConsumerTxListener implements RocketMQListener<users> { @Override public void onMessage(Users users) { System.out.println("TX接收到消息:" + users) ; } }</users>
Service
@Transactional public boolean save(Users users, UsersLog usersLog) { usersRepository.save(users) ; usersLogRepository.save(usersLog) ; if (users.getId() == 1) { throw new RuntimeException("数据错误") ; } return true ; } public UsersLog queryUsersLog(String bid) { return usersLogRepository.findByBid(bid) ; }
Controller
@GetMapping("/tx/{id}") public Object sendTx(@PathVariable("id")Long id) { ps.sendTx("tx-topic", id, "tag10") ; return "send transaction success" ; }
Test
Après avoir appelé l'interface, la console affiche
On peut le voir dans le journal d'impression. le consommateur ne reçoit le message qu'une fois que tout a été enregistré.
Supprimez les données puis testez si l'ID est 1, une erreur sera signalée.
Il n'y a aucune donnée dans la base de données. . .
Ce n’est pas très compliqué, ça se gère en 2 étapes.
Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Outils d'IA chauds

Undresser.AI Undress
Application basée sur l'IA pour créer des photos de nu réalistes

AI Clothes Remover
Outil d'IA en ligne pour supprimer les vêtements des photos.

Undress AI Tool
Images de déshabillage gratuites

Clothoff.io
Dissolvant de vêtements AI

Video Face Swap
Échangez les visages dans n'importe quelle vidéo sans effort grâce à notre outil d'échange de visage AI entièrement gratuit !

Article chaud

Outils chauds

Bloc-notes++7.3.1
Éditeur de code facile à utiliser et gratuit

SublimeText3 version chinoise
Version chinoise, très simple à utiliser

Envoyer Studio 13.0.1
Puissant environnement de développement intégré PHP

Dreamweaver CS6
Outils de développement Web visuel

SublimeText3 version Mac
Logiciel d'édition de code au niveau de Dieu (SublimeText3)

Sujets chauds





Introduction à Jasypt Jasypt est une bibliothèque Java qui permet à un développeur d'ajouter des fonctionnalités de chiffrement de base à son projet avec un minimum d'effort et ne nécessite pas une compréhension approfondie du fonctionnement du chiffrement. Haute sécurité pour le chiffrement unidirectionnel et bidirectionnel. technologie de cryptage basée sur des normes. Cryptez les mots de passe, le texte, les chiffres, les binaires... Convient pour l'intégration dans des applications basées sur Spring, API ouverte, pour une utilisation avec n'importe quel fournisseur JCE... Ajoutez la dépendance suivante : com.github.ulisesbocchiojasypt-spring-boot-starter2 1.1. Les avantages de Jasypt protègent la sécurité de notre système. Même en cas de fuite du code, la source de données peut être garantie.

Scénario d'utilisation 1. La commande a été passée avec succès mais le paiement n'a pas été effectué dans les 30 minutes. Le paiement a expiré et la commande a été automatiquement annulée 2. La commande a été signée et aucune évaluation n'a été effectuée pendant 7 jours après la signature. Si la commande expire et n'est pas évaluée, le système donne par défaut une note positive. 3. La commande est passée avec succès. Si le commerçant ne reçoit pas la commande pendant 5 minutes, la commande est annulée. 4. Le délai de livraison expire et. un rappel par SMS est envoyé... Pour les scénarios avec des délais longs et de faibles performances en temps réel, nous pouvons utiliser la planification des tâches pour effectuer un traitement d'interrogation régulier. Par exemple : xxl-job Aujourd'hui, nous allons choisir

1. Redis implémente le principe du verrouillage distribué et pourquoi les verrous distribués sont nécessaires. Avant de parler de verrous distribués, il est nécessaire d'expliquer pourquoi les verrous distribués sont nécessaires. Le contraire des verrous distribués est le verrouillage autonome. Lorsque nous écrivons des programmes multithreads, nous évitons les problèmes de données causés par l'utilisation d'une variable partagée en même temps. Nous utilisons généralement un verrou pour exclure mutuellement les variables partagées afin de garantir l'exactitude de celles-ci. les variables partagées. Son champ d’utilisation est dans le même processus. S’il existe plusieurs processus qui doivent exploiter une ressource partagée en même temps, comment peuvent-ils s’exclure mutuellement ? Les applications métier d'aujourd'hui sont généralement une architecture de microservices, ce qui signifie également qu'une application déploiera plusieurs processus si plusieurs processus doivent modifier la même ligne d'enregistrements dans MySQL, afin d'éviter les données sales causées par des opérations dans le désordre, les besoins de distribution. à introduire à ce moment-là. Le style est verrouillé. Vous voulez marquer des points

Springboot lit le fichier, mais ne peut pas accéder au dernier développement après l'avoir empaqueté dans un package jar. Il existe une situation dans laquelle Springboot ne peut pas lire le fichier après l'avoir empaqueté dans un package jar. La raison en est qu'après l'empaquetage, le chemin virtuel du fichier. n’est pas valide et n’est accessible que via le flux Read. Le fichier se trouve sous les ressources publicvoidtest(){Listnames=newArrayList();InputStreamReaderread=null;try{ClassPathResourceresource=newClassPathResource("name.txt");Input

Lorsque Springboot+Mybatis-plus n'utilise pas d'instructions SQL pour effectuer des opérations d'ajout de plusieurs tables, les problèmes que j'ai rencontrés sont décomposés en simulant la réflexion dans l'environnement de test : Créez un objet BrandDTO avec des paramètres pour simuler le passage des paramètres en arrière-plan. qu'il est extrêmement difficile d'effectuer des opérations multi-tables dans Mybatis-plus. Si vous n'utilisez pas d'outils tels que Mybatis-plus-join, vous pouvez uniquement configurer le fichier Mapper.xml correspondant et configurer le ResultMap malodorant et long, puis. écrivez l'instruction SQL correspondante Bien que cette méthode semble lourde, elle est très flexible et nous permet de

SpringBoot et SpringMVC sont tous deux des frameworks couramment utilisés dans le développement Java, mais il existe des différences évidentes entre eux. Cet article explorera les fonctionnalités et les utilisations de ces deux frameworks et comparera leurs différences. Tout d’abord, découvrons SpringBoot. SpringBoot a été développé par l'équipe Pivotal pour simplifier la création et le déploiement d'applications basées sur le framework Spring. Il fournit un moyen rapide et léger de créer des fichiers exécutables autonomes.

1. Personnalisez RedisTemplate1.1, mécanisme de sérialisation par défaut RedisAPI. L'implémentation du cache Redis basée sur l'API utilise le modèle RedisTemplate pour les opérations de mise en cache des données. Ici, ouvrez la classe RedisTemplate et affichez les informations sur le code source de la classe. Déclarer la clé, diverses méthodes de sérialisation de la valeur, la valeur initiale est vide @NullableprivateRedisSe

Dans les projets, certaines informations de configuration sont souvent nécessaires. Ces informations peuvent avoir des configurations différentes dans l'environnement de test et dans l'environnement de production, et peuvent devoir être modifiées ultérieurement en fonction des conditions commerciales réelles. Nous ne pouvons pas coder en dur ces configurations dans le code. Il est préférable de les écrire dans le fichier de configuration. Par exemple, vous pouvez écrire ces informations dans le fichier application.yml. Alors, comment obtenir ou utiliser cette adresse dans le code ? Il existe 2 méthodes. Méthode 1 : Nous pouvons obtenir la valeur correspondant à la clé dans le fichier de configuration (application.yml) via le ${key} annoté avec @Value. Cette méthode convient aux situations où il y a relativement peu de microservices. Méthode 2 : En réalité. projets, Quand les affaires sont compliquées, la logique
