Maison > Périphériques technologiques > IA > Deuxième page d'Alibaba : le consommateur RocketMQ récupère un lot de messages, mais certains d'entre eux ne parviennent pas à être consommés. Comment mettre à jour le décalage ?

Deuxième page d'Alibaba : le consommateur RocketMQ récupère un lot de messages, mais certains d'entre eux ne parviennent pas à être consommés. Comment mettre à jour le décalage ?

WBOY
Libérer: 2023-04-12 23:28:14
avant
1036 Les gens l'ont consulté

Bonjour à tous, je suis frère Jun.

Une question a récemment été posée à un lecteur lors d'un entretien. Si un consommateur extrait un lot de messages, par exemple 100, et que le 100ème message est consommé avec succès, mais que le 50ème message échoue, comment le décalage sera-t-il mis à jour ? Concernant ce problème, parlons aujourd'hui de la façon de sauvegarder le décalage si un lot de messages ne parvient pas à être consommé.

1 Extraction de messages

1.1 Encapsulation des demandes d'extraction

En prenant le mode push de RocketMQ comme exemple, le code de démarrage du consommateur RocketMQ est le suivant :

public static void main(String[] args) throws InterruptedException, MQClientException {
 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");

 consumer.subscribe("TopicTest", "*");
 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

 consumer.setConsumeTimestamp("20181109221800");
 consumer.registerMessageListener(new MessageListenerConcurrently() {

@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context){
 try{
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
 }catch (Exception e){
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
 }
 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
 });
 consumer.start();
}
Copier après la connexion

Le DefaultMQPushConsumer ci-dessus est un consommateur en mode push, et la méthode de démarrage est start. Une fois le consommateur démarré, le thread de rééquilibrage (RebalanceService) sera déclenché. La tâche de ce thread est de rééquilibrer en continu dans une boucle infinie et enfin d'encapsuler la demande d'extraction du message dans pullRequestQueue. Le diagramme de classes UML impliqué dans ce processus est le suivant :

Deuxième page dAlibaba : le consommateur RocketMQ récupère un lot de messages, mais certains dentre eux ne parviennent pas à être consommés. Comment mettre à jour le décalage ?

1.2 Traitement des demandes d'extraction

Après avoir encapsulé la demande d'extraction PullRequest, RocketMQ obtiendra en permanence la demande d'extraction de message de pullRequestQueue pour traitement. Le diagramme de classes UML est le suivant :

Deuxième page dAlibaba : le consommateur RocketMQ récupère un lot de messages, mais certains dentre eux ne parviennent pas à être consommés. Comment mettre à jour le décalage ?

La méthode de saisie pour extraire les messages est une boucle infinie, et le code est le suivant :

//PullMessageService
public void run(){
 log.info(this.getServiceName() + " service started");

 while (!this.isStopped()) {
try {
 PullRequest pullRequest = this.pullRequestQueue.take();
 this.pullMessage(pullRequest);
} catch (InterruptedException ignored) {
} catch (Exception e) {
 log.error("Pull Message Service Run Method exception", e);
}
 }

 log.info(this.getServiceName() + " service end");
}
Copier après la connexion

Une fois le message extrait ici, il est soumis à la fonction de rappel PullCallback pour traitement.

Le message extrait est d'abord placé dans msgTreeMap dans ProcessQueue, puis encapsulé dans la classe de thread ConsumeRequest pour être traité. Après avoir rationalisé le code, la logique de traitement ConsumeRequest est la suivante :

//ConsumeMessageConcurrentlyService.java
public void run(){
 MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;
 ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue);
 ConsumeConcurrentlyStatus status = null;
 try {
//1.执行消费逻辑,这里的逻辑是在文章开头的代码中定义的
status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
 } catch (Throwable e) {
 }
 if (!processQueue.isDropped()) {
//2.处理消费结果
ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
 } else {
log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs);
 }
}
Copier après la connexion

2 Traitement des résultats de consommation

2.1 Messages simultanés

Le code de traitement simultané des résultats de consommation est simplifié comme suit :

//ConsumeMessageConcurrentlyService.java
public void processConsumeResult(
 final ConsumeConcurrentlyStatus status,
 final ConsumeConcurrentlyContext context,
 final ConsumeRequest consumeRequest
){
 int ackIndex = context.getAckIndex();
 switch (status) {
case CONSUME_SUCCESS:
 if (ackIndex >= consumeRequest.getMsgs().size()) {
ackIndex = consumeRequest.getMsgs().size() - 1;
 }
 int ok = ackIndex + 1;
 int failed = consumeRequest.getMsgs().size() - ok;
 break;
case RECONSUME_LATER:
 break;
default:
 break;
 }

 switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:
 for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
 }
 break;
case CLUSTERING:
 List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
 for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
MessageExt msg = consumeRequest.getMsgs().get(i);
boolean result = this.sendMessageBack(msg, context);
if (!result) {
 msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
 msgBackFailed.add(msg);
}
 }

 if (!msgBackFailed.isEmpty()) {
consumeRequest.getMsgs().removeAll(msgBackFailed);
 }
 break;
default:
 break;
 }

 long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
 if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
 }
}
Copier après la connexion

Comme le montre le code ci-dessus , si le message est traité La logique est série. Par exemple, le code au début de l'article utilise une boucle for pour traiter les messages. Si le traitement d'un certain message échoue, quittez directement la boucle et attribuez la variable ackIndex de ConsumeConcurrentlyContext. à la position du message ayant échoué dans la liste des messages, de sorte que les messages suivant le premier message ayant échoué ne seront plus traités et seront envoyés au courtier pour attendre une nouvelle extraction. Le code est le suivant :

public static void main(String[] args) throws InterruptedException, MQClientException {
 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");

 consumer.subscribe("TopicTest", "*");
 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

 consumer.setConsumeTimestamp("20181109221800");
 consumer.registerMessageListener(new MessageListenerConcurrently() {

@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context){
 for (int i = 0; i < msgs.size(); i++) {
try{
 System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
}catch (Exception e){
 context.setAckIndex(i);
 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
 }
 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
 });
 consumer.start();
}
Copier après la connexion

Le message consommé avec succès est supprimé du msgTreeMap dans ProcessQueue et le plus petit décalage (firstKey) dans le msgTreeMap est renvoyé pour mise à jour. Remarque : Le décalage du mode cluster est stocké du côté du courtier. Pour mettre à jour le décalage, un message doit être envoyé au courtier. Cependant, le décalage du mode de diffusion est stocké du côté du consommateur et seul le décalage local doit être mis à jour.

Si la logique de traitement des messages est parallèle, cela n'a aucun sens d'attribuer une valeur à ackIndex après l'échec du traitement du message, car plusieurs messages peuvent échouer et l'attribution d'une valeur à la variable ackIndex n'est pas exacte. Le meilleur moyen consiste à attribuer une valeur de 0 à ackIndex et à consommer à nouveau l'intégralité du lot de messages, ce qui peut entraîner d'autres problèmes.

2.2 Messages séquentiels

Pour les messages séquentiels, après avoir pris le message de msgTreeMap, il doit d'abord être placé sur consumerMsgOrderlyTreeMap Lors de la mise à jour du décalage, le décalage maximum du message (lastKey) est extrait de consumerMsgOrderlyTreeMap.

3 Résumé

Retour à la question initiale, si un lot de messages est consommé dans l'ordre, il est impossible que le 100ème message soit consommé avec succès, mais le 50ème message échoue, car lorsque le 50ème message échoue, la boucle il faut sortir et la consommation ne doit plus continuer.

S'il s'agit d'une consommation simultanée, si cette situation se produit, il est recommandé de consommer à nouveau l'intégralité du lot de messages, c'est-à-dire d'attribuer une valeur de 0 à ackIndex, afin que des problèmes tels que le monde souterrain doivent être pris en compte.

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Étiquettes associées:
source:51cto.com
Déclaration de ce site Web
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn
Tutoriels populaires
Plus>
Derniers téléchargements
Plus>
effets Web
Code source du site Web
Matériel du site Web
Modèle frontal