Table of Contents
1 Pull message
1.1 Encapsulate pull request
1.2 Processing pull requests
2 Processing consumption results
2.1 Concurrent messages
2.2 Sequential messages
3 Summary
Home Technology peripherals AI Alibaba's second page: RocketMQ consumer pulls a batch of messages, but some of them fail to consume. How to update the offset?

Alibaba's second page: RocketMQ consumer pulls a batch of messages, but some of them fail to consume. How to update the offset?

Apr 12, 2023 pm 11:28 PM
rocketmq consumer consumer

Hello everyone, I am Brother Jun.

Recently, a reader was asked a question when participating in an interview. If a consumer pulls a batch of messages, such as 100, the consumption of the 100th message is successful, but the consumption of the 50th message fails. The offset How will it be updated? Regarding this issue, let’s talk today about how to save the offset if a batch of messages fails to be consumed.

1 Pull message

1.1 Encapsulate pull request

Taking RocketMQ push mode as an example, the RocketMQ consumer startup code is as follows:

public static void main(String[] args) throws InterruptedException, MQClientException {
 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");

 consumer.subscribe("TopicTest", "*");
 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

 consumer.setConsumeTimestamp("20181109221800");
 consumer.registerMessageListener(new MessageListenerConcurrently() {

@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context){
 try{
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
 }catch (Exception e){
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
 }
 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
 });
 consumer.start();
}
Copy after login

The above DefaultMQPushConsumer is a push mode consumer, and the startup method is start. After the consumer is started, the rebalance thread (RebalanceService) will be triggered. The task of this thread is to continuously rebalance in an infinite loop, and finally encapsulate the request to pull the message into the pullRequestQueue. The UML class diagram involved in this process is as follows:

Alibabas second page: RocketMQ consumer pulls a batch of messages, but some of them fail to consume. How to update the offset?

1.2 Processing pull requests

After encapsulating the pull request PullRequest, RocketMQ will not Continuously obtain message pull requests from pullRequestQueue for processing. The UML class diagram is as follows:

Alibabas second page: RocketMQ consumer pulls a batch of messages, but some of them fail to consume. How to update the offset?

The entry method for pulling messages is an infinite loop, the code is as follows:

//PullMessageService
public void run(){
 log.info(this.getServiceName() + " service started");

 while (!this.isStopped()) {
try {
 PullRequest pullRequest = this.pullRequestQueue.take();
 this.pullMessage(pullRequest);
} catch (InterruptedException ignored) {
} catch (Exception e) {
 log.error("Pull Message Service Run Method exception", e);
}
 }

 log.info(this.getServiceName() + " service end");
}
Copy after login

After pulling the message here, submit it to PullCallback is handled by this callback function.

The pulled message is first put into msgTreeMap in ProcessQueue, and then encapsulated into the thread class ConsumeRequest for processing. After streamlining the code, the ConsumeRequest processing logic is as follows:

//ConsumeMessageConcurrentlyService.java
public void run(){
 MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;
 ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue);
 ConsumeConcurrentlyStatus status = null;
 try {
//1.执行消费逻辑,这里的逻辑是在文章开头的代码中定义的
status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
 } catch (Throwable e) {
 }
 if (!processQueue.isDropped()) {
//2.处理消费结果
ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
 } else {
log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs);
 }
}
Copy after login

2 Processing consumption results

2.1 Concurrent messages

The code for concurrent message processing consumption results is simplified as follows:

//ConsumeMessageConcurrentlyService.java
public void processConsumeResult(
 final ConsumeConcurrentlyStatus status,
 final ConsumeConcurrentlyContext context,
 final ConsumeRequest consumeRequest
){
 int ackIndex = context.getAckIndex();
 switch (status) {
case CONSUME_SUCCESS:
 if (ackIndex >= consumeRequest.getMsgs().size()) {
ackIndex = consumeRequest.getMsgs().size() - 1;
 }
 int ok = ackIndex + 1;
 int failed = consumeRequest.getMsgs().size() - ok;
 break;
case RECONSUME_LATER:
 break;
default:
 break;
 }

 switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:
 for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
 }
 break;
case CLUSTERING:
 List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
 for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
MessageExt msg = consumeRequest.getMsgs().get(i);
boolean result = this.sendMessageBack(msg, context);
if (!result) {
 msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
 msgBackFailed.add(msg);
}
 }

 if (!msgBackFailed.isEmpty()) {
consumeRequest.getMsgs().removeAll(msgBackFailed);
 }
 break;
default:
 break;
 }

 long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
 if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
 }
}
Copy after login

As can be seen from the above code, if the logic of processing messages is serial, for example, the code at the beginning of the article uses a for loop to process messages, then if the processing of a certain message fails, exit the loop directly and give The ackIndex variable of ConsumeConcurrentlyContext is assigned the position of the failed message in the message list, so that the messages following this failed message will no longer be processed and will be sent to the Broker to wait for re-pulling. The code is as follows:

public static void main(String[] args) throws InterruptedException, MQClientException {
 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");

 consumer.subscribe("TopicTest", "*");
 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

 consumer.setConsumeTimestamp("20181109221800");
 consumer.registerMessageListener(new MessageListenerConcurrently() {

@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context){
 for (int i = 0; i < msgs.size(); i++) {
try{
 System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
}catch (Exception e){
 context.setAckIndex(i);
 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
 }
 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
 });
 consumer.start();
}
Copy after login

Successfully consumed messages are removed from msgTreeMap in ProcessQueue, and the smallest offset (firstKey) in msgTreeMap is returned for update. Note: The cluster mode offset is stored on the Broker side. To update the offset, a message needs to be sent to the Broker. However, the broadcast mode offset is stored on the Consumer side and only the local offset needs to be updated.

If the logic of processing messages is parallel, it is meaningless to assign a value to ackIndex after the message processing fails, because multiple messages may fail, and assigning a value to the ackIndex variable is not accurate. The best way is to assign a value of 0 to ackIndex and consume the entire batch of messages again, which may cause other problems.

2.2 Sequential messages

For sequential messages, after taking the message from msgTreeMap, it must first be placed on consumingMsgOrderlyTreeMap. When updating the offset, the largest message offset is taken from consumingMsgOrderlyTreeMap ( lastKey).

3 Summary

Going back to the initial question, if a batch of messages is consumed in order, it is impossible for the 100th message to be consumed successfully, but the 50th message to fail, because When the 50th message fails, the loop should be exited and consumption should not continue.

If it is concurrent consumption, if this situation occurs, it is recommended that the entire batch of messages be consumed again, that is, assign a value of 0 to ackIndex. In this way, issues such as underworld must be considered.

The above is the detailed content of Alibaba's second page: RocketMQ consumer pulls a batch of messages, but some of them fail to consume. How to update the offset?. For more information, please follow other related articles on the PHP Chinese website!

Statement of this Website
The content of this article is voluntarily contributed by netizens, and the copyright belongs to the original author. This site does not assume corresponding legal responsibility. If you find any content suspected of plagiarism or infringement, please contact admin@php.cn

Hot AI Tools

Undresser.AI Undress

Undresser.AI Undress

AI-powered app for creating realistic nude photos

AI Clothes Remover

AI Clothes Remover

Online AI tool for removing clothes from photos.

Undress AI Tool

Undress AI Tool

Undress images for free

Clothoff.io

Clothoff.io

AI clothes remover

AI Hentai Generator

AI Hentai Generator

Generate AI Hentai for free.

Hot Article

R.E.P.O. Energy Crystals Explained and What They Do (Yellow Crystal)
3 weeks ago By 尊渡假赌尊渡假赌尊渡假赌
R.E.P.O. Best Graphic Settings
3 weeks ago By 尊渡假赌尊渡假赌尊渡假赌
R.E.P.O. How to Fix Audio if You Can't Hear Anyone
3 weeks ago By 尊渡假赌尊渡假赌尊渡假赌
WWE 2K25: How To Unlock Everything In MyRise
3 weeks ago By 尊渡假赌尊渡假赌尊渡假赌

Hot Tools

Notepad++7.3.1

Notepad++7.3.1

Easy-to-use and free code editor

SublimeText3 Chinese version

SublimeText3 Chinese version

Chinese version, very easy to use

Zend Studio 13.0.1

Zend Studio 13.0.1

Powerful PHP integrated development environment

Dreamweaver CS6

Dreamweaver CS6

Visual web development tools

SublimeText3 Mac version

SublimeText3 Mac version

God-level code editing software (SublimeText3)

How does SpringBoot integrate RocketMQ? How does SpringBoot integrate RocketMQ? May 14, 2023 am 10:19 AM

1. SpringBoot integrates RocketMQ. Integrating RocketMQ in SpringBoot only requires four simple steps: 1. Introduce the relevant dependency org.apache.rocketmqrocketmq-spring-boot-starter2. Add the relevant configuration of RocketMQ rocketmq:consumer:group:springboot_consumer_group# pull once The maximum value of the message. Note that it is the maximum value of the message pulled, not the maximum value consumed. pull-batch-size:10name-server:10.5.103.6:9876pr

Alibaba's second page: RocketMQ consumer pulls a batch of messages, but some of them fail to consume. How to update the offset? Alibaba's second page: RocketMQ consumer pulls a batch of messages, but some of them fail to consume. How to update the offset? Apr 12, 2023 pm 11:28 PM

Hello everyone, I am Brother Jun. Recently, a reader was asked a question during an interview. If a consumer pulls a batch of messages, such as 100 messages, and the 100th message is successfully consumed, but the 50th message fails, how will the offset be updated? Regarding this issue, let’s talk today about how to save the offset if a batch of messages fails to be consumed. 1 Pulling messages 1.1 Encapsulating pull requests Taking RocketMQ push mode as an example, the RocketMQ consumer startup code is as follows: public static void main(String[] args) throws InterruptedException, MQClie

How to solve the pitfalls encountered when integrating RocketMQ with SpringBoot How to solve the pitfalls encountered when integrating RocketMQ with SpringBoot May 19, 2023 am 11:25 AM

Application scenarios When implementing RocketMQ consumption, the @RocketMQMessageListener annotation is generally used to define Group, Topic, and selectorExpression (data filtering and selection rules). In order to support dynamic filtering of data, expressions are generally used, and then dynamic switching is performed through apollo or cloudconfig. . Introduce dependency org.apache.rocketmqrocketmq-spring-boot-starter2.0.4 consumer code @RocketMQMessageListener(consumerGroup=&qu

Common environment deployment—Docker installation RocketMQ tutorial! Common environment deployment—Docker installation RocketMQ tutorial! Mar 07, 2024 am 09:30 AM

The process of installing RocketMQ in Docker is as follows: Create a Docker network: Execute the following command in the terminal to create a Docker network for communication between containers: dockernetworkcreaterocketmq-network Download the RocketMQ image: Execute the following command in the terminal to download RocketMQ Docker image: dockerpullrocketmqinc/rocketmq Start the NameServer container: Execute the following command in the terminal to start the NameServer container: dockerrun -d --namermqnamesrv --

How to implement queue producer and consumer patterns in PHP and MySQL How to implement queue producer and consumer patterns in PHP and MySQL Oct 15, 2023 pm 02:33 PM

Implementation Methods of Queue Producer and Consumer Patterns in PHP and MySQL With the rapid development of Internet business, the need to handle a large number of tasks in the system has become more and more urgent. Queues are a common solution to handle tasks efficiently. The implementation of the queue's producer-consumer pattern (Producer-ConsumerPattern) in PHP and MySQL is a common solution. This article will introduce the specific implementation method and provide code examples. producer-consumer pattern

Producer-consumer problem and its implementation in C++ Producer-consumer problem and its implementation in C++ Sep 17, 2023 pm 11:09 PM

A prevalent synchronization challenge in concurrent computing is known as the producer-consumer problem. Given that multiple threads or processes are designed to coordinate their operations when accessing a shared source; this problem requires complex communication tasks as well as balanced execution. Today's discussion will help to understand the concepts behind this difficulty, while recognizing its importance in contemporary computer science frameworks - particularly in C++ implementation practice. Understanding the Definition and Purpose of the Producer-Consumer Problem Solutions to the challenges posed by the producer-consumer problem come from clearly demarcating responsibilities between those responsible for producing and using information. When producers generate new records themselves, consumers ensure they are used correctly by synchronizing their operations. One must be careful to avoid problems such as race conditions or deadlocks, e.g.

Analysis of Java development RocketMQ producer high availability example Analysis of Java development RocketMQ producer high availability example Apr 23, 2023 pm 11:28 PM

1 message publicclassMessageimplementsSerializable{privatestaticfinallongserialVersionUID=8445773977080406428L;//Topic name privateStringtopic;//Message extension information, Tag, keys, delay level all exist here privateMappproperties;//Message body, byte array privatebyte[]body;//Set the key of the message , publicvoidsetKeys(Stringkeys){}//Set

What is the role of java Consumer interface What is the role of java Consumer interface Apr 26, 2023 am 11:34 AM

1. Explain that consumer represents consumption. The interface accepts the general parameter t, calls accept, and performs a series of operations on the parameters, but there is no return value. 2. Instance For Consumer, we need to provide input parameters for consumption. classPerson{StringfirstName;StringlastName;Person(){}Person(StringfirstName,StringlastName){this.firstName=firstName;this.lastName=lastName;}}What can Java be used for? Java is mainly used in: 1.web

See all articles