


Alibaba's second page: RocketMQ consumer pulls a batch of messages, but some of them fail to consume. How to update the offset?
Hello everyone, I am Brother Jun.
Recently, a reader was asked a question when participating in an interview. If a consumer pulls a batch of messages, such as 100, the consumption of the 100th message is successful, but the consumption of the 50th message fails. The offset How will it be updated? Regarding this issue, let’s talk today about how to save the offset if a batch of messages fails to be consumed.
1 Pull message
1.1 Encapsulate pull request
Taking RocketMQ push mode as an example, the RocketMQ consumer startup code is as follows:
public static void main(String[] args) throws InterruptedException, MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1"); consumer.subscribe("TopicTest", "*"); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.setConsumeTimestamp("20181109221800"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context){ try{ System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); }catch (Exception e){ return ConsumeConcurrentlyStatus.RECONSUME_LATER; } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); }
The above DefaultMQPushConsumer is a push mode consumer, and the startup method is start. After the consumer is started, the rebalance thread (RebalanceService) will be triggered. The task of this thread is to continuously rebalance in an infinite loop, and finally encapsulate the request to pull the message into the pullRequestQueue. The UML class diagram involved in this process is as follows:
1.2 Processing pull requests
After encapsulating the pull request PullRequest, RocketMQ will not Continuously obtain message pull requests from pullRequestQueue for processing. The UML class diagram is as follows:
The entry method for pulling messages is an infinite loop, the code is as follows:
//PullMessageService public void run(){ log.info(this.getServiceName() + " service started"); while (!this.isStopped()) { try { PullRequest pullRequest = this.pullRequestQueue.take(); this.pullMessage(pullRequest); } catch (InterruptedException ignored) { } catch (Exception e) { log.error("Pull Message Service Run Method exception", e); } } log.info(this.getServiceName() + " service end"); }
After pulling the message here, submit it to PullCallback is handled by this callback function.
The pulled message is first put into msgTreeMap in ProcessQueue, and then encapsulated into the thread class ConsumeRequest for processing. After streamlining the code, the ConsumeRequest processing logic is as follows:
//ConsumeMessageConcurrentlyService.java public void run(){ MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener; ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue); ConsumeConcurrentlyStatus status = null; try { //1.执行消费逻辑,这里的逻辑是在文章开头的代码中定义的 status = listener.consumeMessage(Collections.unmodifiableList(msgs), context); } catch (Throwable e) { } if (!processQueue.isDropped()) { //2.处理消费结果 ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this); } else { log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs); } }
2 Processing consumption results
2.1 Concurrent messages
The code for concurrent message processing consumption results is simplified as follows:
//ConsumeMessageConcurrentlyService.java public void processConsumeResult( final ConsumeConcurrentlyStatus status, final ConsumeConcurrentlyContext context, final ConsumeRequest consumeRequest ){ int ackIndex = context.getAckIndex(); switch (status) { case CONSUME_SUCCESS: if (ackIndex >= consumeRequest.getMsgs().size()) { ackIndex = consumeRequest.getMsgs().size() - 1; } int ok = ackIndex + 1; int failed = consumeRequest.getMsgs().size() - ok; break; case RECONSUME_LATER: break; default: break; } switch (this.defaultMQPushConsumer.getMessageModel()) { case BROADCASTING: for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) { } break; case CLUSTERING: List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size()); for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) { MessageExt msg = consumeRequest.getMsgs().get(i); boolean result = this.sendMessageBack(msg, context); if (!result) { msg.setReconsumeTimes(msg.getReconsumeTimes() + 1); msgBackFailed.add(msg); } } if (!msgBackFailed.isEmpty()) { consumeRequest.getMsgs().removeAll(msgBackFailed); } break; default: break; } long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs()); if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) { this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true); } }
As can be seen from the above code, if the logic of processing messages is serial, for example, the code at the beginning of the article uses a for loop to process messages, then if the processing of a certain message fails, exit the loop directly and give The ackIndex variable of ConsumeConcurrentlyContext is assigned the position of the failed message in the message list, so that the messages following this failed message will no longer be processed and will be sent to the Broker to wait for re-pulling. The code is as follows:
public static void main(String[] args) throws InterruptedException, MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1"); consumer.subscribe("TopicTest", "*"); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.setConsumeTimestamp("20181109221800"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context){ for (int i = 0; i < msgs.size(); i++) { try{ System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); }catch (Exception e){ context.setAckIndex(i); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); }
Successfully consumed messages are removed from msgTreeMap in ProcessQueue, and the smallest offset (firstKey) in msgTreeMap is returned for update. Note: The cluster mode offset is stored on the Broker side. To update the offset, a message needs to be sent to the Broker. However, the broadcast mode offset is stored on the Consumer side and only the local offset needs to be updated.
If the logic of processing messages is parallel, it is meaningless to assign a value to ackIndex after the message processing fails, because multiple messages may fail, and assigning a value to the ackIndex variable is not accurate. The best way is to assign a value of 0 to ackIndex and consume the entire batch of messages again, which may cause other problems.
2.2 Sequential messages
For sequential messages, after taking the message from msgTreeMap, it must first be placed on consumingMsgOrderlyTreeMap. When updating the offset, the largest message offset is taken from consumingMsgOrderlyTreeMap ( lastKey).
3 Summary
Going back to the initial question, if a batch of messages is consumed in order, it is impossible for the 100th message to be consumed successfully, but the 50th message to fail, because When the 50th message fails, the loop should be exited and consumption should not continue.
If it is concurrent consumption, if this situation occurs, it is recommended that the entire batch of messages be consumed again, that is, assign a value of 0 to ackIndex. In this way, issues such as underworld must be considered.
The above is the detailed content of Alibaba's second page: RocketMQ consumer pulls a batch of messages, but some of them fail to consume. How to update the offset?. For more information, please follow other related articles on the PHP Chinese website!

Hot AI Tools

Undresser.AI Undress
AI-powered app for creating realistic nude photos

AI Clothes Remover
Online AI tool for removing clothes from photos.

Undress AI Tool
Undress images for free

Clothoff.io
AI clothes remover

AI Hentai Generator
Generate AI Hentai for free.

Hot Article

Hot Tools

Notepad++7.3.1
Easy-to-use and free code editor

SublimeText3 Chinese version
Chinese version, very easy to use

Zend Studio 13.0.1
Powerful PHP integrated development environment

Dreamweaver CS6
Visual web development tools

SublimeText3 Mac version
God-level code editing software (SublimeText3)

Hot Topics



1. SpringBoot integrates RocketMQ. Integrating RocketMQ in SpringBoot only requires four simple steps: 1. Introduce the relevant dependency org.apache.rocketmqrocketmq-spring-boot-starter2. Add the relevant configuration of RocketMQ rocketmq:consumer:group:springboot_consumer_group# pull once The maximum value of the message. Note that it is the maximum value of the message pulled, not the maximum value consumed. pull-batch-size:10name-server:10.5.103.6:9876pr

Hello everyone, I am Brother Jun. Recently, a reader was asked a question during an interview. If a consumer pulls a batch of messages, such as 100 messages, and the 100th message is successfully consumed, but the 50th message fails, how will the offset be updated? Regarding this issue, let’s talk today about how to save the offset if a batch of messages fails to be consumed. 1 Pulling messages 1.1 Encapsulating pull requests Taking RocketMQ push mode as an example, the RocketMQ consumer startup code is as follows: public static void main(String[] args) throws InterruptedException, MQClie

Application scenarios When implementing RocketMQ consumption, the @RocketMQMessageListener annotation is generally used to define Group, Topic, and selectorExpression (data filtering and selection rules). In order to support dynamic filtering of data, expressions are generally used, and then dynamic switching is performed through apollo or cloudconfig. . Introduce dependency org.apache.rocketmqrocketmq-spring-boot-starter2.0.4 consumer code @RocketMQMessageListener(consumerGroup=&qu

The process of installing RocketMQ in Docker is as follows: Create a Docker network: Execute the following command in the terminal to create a Docker network for communication between containers: dockernetworkcreaterocketmq-network Download the RocketMQ image: Execute the following command in the terminal to download RocketMQ Docker image: dockerpullrocketmqinc/rocketmq Start the NameServer container: Execute the following command in the terminal to start the NameServer container: dockerrun -d --namermqnamesrv --

Implementation Methods of Queue Producer and Consumer Patterns in PHP and MySQL With the rapid development of Internet business, the need to handle a large number of tasks in the system has become more and more urgent. Queues are a common solution to handle tasks efficiently. The implementation of the queue's producer-consumer pattern (Producer-ConsumerPattern) in PHP and MySQL is a common solution. This article will introduce the specific implementation method and provide code examples. producer-consumer pattern

A prevalent synchronization challenge in concurrent computing is known as the producer-consumer problem. Given that multiple threads or processes are designed to coordinate their operations when accessing a shared source; this problem requires complex communication tasks as well as balanced execution. Today's discussion will help to understand the concepts behind this difficulty, while recognizing its importance in contemporary computer science frameworks - particularly in C++ implementation practice. Understanding the Definition and Purpose of the Producer-Consumer Problem Solutions to the challenges posed by the producer-consumer problem come from clearly demarcating responsibilities between those responsible for producing and using information. When producers generate new records themselves, consumers ensure they are used correctly by synchronizing their operations. One must be careful to avoid problems such as race conditions or deadlocks, e.g.

1 message publicclassMessageimplementsSerializable{privatestaticfinallongserialVersionUID=8445773977080406428L;//Topic name privateStringtopic;//Message extension information, Tag, keys, delay level all exist here privateMappproperties;//Message body, byte array privatebyte[]body;//Set the key of the message , publicvoidsetKeys(Stringkeys){}//Set

1. Explain that consumer represents consumption. The interface accepts the general parameter t, calls accept, and performs a series of operations on the parameters, but there is no return value. 2. Instance For Consumer, we need to provide input parameters for consumption. classPerson{StringfirstName;StringlastName;Person(){}Person(StringfirstName,StringlastName){this.firstName=firstName;this.lastName=lastName;}}What can Java be used for? Java is mainly used in: 1.web
