public class Message implements Serializable { private static final long serialVersionUID = 8445773977080406428L; //主题名字 private String topic; //消息扩展信息,Tag,keys,延迟级别都存在这里 private Map<String, String> properties; //消息体,字节数组 private byte[] body; //设置消息的key, public void setKeys(String keys) {} //设置topic public void setTopic(String topic) {} //延迟级别 public int setDelayTimeLevel(int level) {} //消息过滤的标记 public void setTags(String tags) {} //扩展信息存放在此 public void putUserProperty(final String name, final String value) {} }
The message is the children. These children have their own characteristics but also have commonalities. Two children sent by the same parent can go to the same place, or they can go to different places.
First of all, each child message has an attribute topic, which we mentioned above, is a waiting hall. After the children come in, they walk to the designated area of their designated waiting hall (don’t they also take the high-speed train at the designated platform when they go out), and sit in the message queue seats Wait, wait for the trip.
Broker has one or more topics, and messages will be stored in the message queue in the topic, waiting to be consumed.
Child news, there is also a Body attribute, this is his ability, he can draw, he can sing, he Whatever you do is recorded in the Body attribute. When you go out, the place where value is reflected is also the Body attribute.
Body is the message body, and consumers will perform corresponding operations based on the message body.
We mentioned this tag in the previous section. It is a mark. Some children carry drawing boards and cameras on their backs, and some cruise ships will specially find it. These kids pull away and complete their mission.
You can set tag attributes for messages, and consumers can choose messages containing specific tag attributes for consumption.
The key is the name of each child’s message. If you want to find a child, just call him by name.
Set the Key for the sent message, and you can search for messages based on this Key later. For example, if the message is abnormal or the message is lost, it will be very convenient to search.
Of course, some children don’t rush to leave when they come. They have thought about it before coming. It will take 30 minutes to have a meal, so they will wait when they come. Picked up after 30 minutes.
Setting the delay level can specify how long the message can be consumed.
Every parent who sends their children hopes to send them to the waiting hall, and they don’t want their children to be lost, this At this time, the waiting hall needs some guarantee mechanisms.
That is to say, after the parents send the child to the waiting hall, If you fail to sit in the message queue seat, the staff will arrange to try again to see if there are seats available. The default number of retries is 2 times, that is to say, the message child has a total of 3 opportunities to find a seat.
Looking at the source code, I specially added comments so that it can be roughly understood.
//这里取到了重试的次数 int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1; int times = 0; String[] brokersSent = new String[timesTotal]; for (; times < timesTotal; times++) { String lastBrokerName = null == mq ? null : mq.getBrokerName(); //获取消息队列 MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName); if (mqSelected != null) { mq = mqSelected; brokersSent[times] = mq.getBrokerName(); try { beginTimestampPrev = System.currentTimeMillis(); if (times > 0) { //Reset topic with namespace during resend. msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic())); } long costTime = beginTimestampPrev - beginTimestampFirst; if (timeout < costTime) { callTimeout = true; break; } //发送消息 sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime); ... } catch (RemotingException e) { ... continue; } catch (MQClientException e) { ... continue; } catch (MQBrokerException e) { ... continue; } catch (InterruptedException e) { //可以看到只有InterruptedException抛出了异常,其他的exception都会继续重试 throw e; } } else { break; } }
The retry code is as above. In this sendDefaultImpl
method, it will try to send the message three times. If it fails, the corresponding error will be thrown.
If there are multiple Broker waiting halls, the service staff will arrange to message the child to choose one relatively less crowded, comparison Easy to enter to enter. Of course, we will not enter those that have been closed, have had power outages, and have no service capabilities. MQ Client will maintain a Broker's sending delay information, and based on this information will select a Broker with a relatively low delay to send the message. Will actively eliminate those Brokers that are down, unavailable or have a higher sending delay level.
Selecting
Broker is selecting message queue
. The corresponding code is as follows: Here we will first determine whether the
is turned on. This switch is turned off by default. If it is turned on, lower delay## will be selected first. #Broker.
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) { //判断发送延迟容错开关是否开启 if (this.sendLatencyFaultEnable) { try { //选择一个延迟上可以接受,并且和上次发送相同的Broker int index = tpInfo.getSendWhichQueue().incrementAndGet(); for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) { int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size(); if (pos < 0) pos = 0; MessageQueue mq = tpInfo.getMessageQueueList().get(pos); //若是Broker的延迟时间可以接受,则返回这个Broker if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) return mq; } //若是第一步没能选中一个Broker,就选择一个延迟较低的Broker final String notBestBroker = latencyFaultTolerance.pickOneAtLeast(); int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker); if (writeQueueNums > 0) { final MessageQueue mq = tpInfo.selectOneMessageQueue(); if (notBestBroker != null) { mq.setBrokerName(notBestBroker); mq.setQueueId(tpInfo.getSendWhichQueue().incrementAndGet() % writeQueueNums); } return mq; } else { latencyFaultTolerance.remove(notBestBroker); } } catch (Exception e) { log.error("Error occurred when selecting message queue", e); } //若是前边都没选中一个Broker,就随机选一个Broker return tpInfo.selectOneMessageQueue(); } return tpInfo.selectOneMessageQueue(lastBrokerName); }
off state, the code executed is as follows: In order to evenly distribute the pressure on the Broker, it will choose A different Broker
from before.public MessageQueue selectOneMessageQueue(final String lastBrokerName) { //若是没有上次的Brokername做参考,就随机选一个 if (lastBrokerName == null) { return selectOneMessageQueue(); } else { //如果有,那么就选一个其他的Broker for (int i = 0; i < this.messageQueueList.size(); i++) { int index = this.sendWhichQueue.incrementAndGet(); int pos = Math.abs(index) % this.messageQueueList.size(); if (pos < 0) pos = 0; MessageQueue mq = this.messageQueueList.get(pos); //这里判断遇上一个使用的Broker不是同一个 if (!mq.getBrokerName().equals(lastBrokerName)) { return mq; } } //若是上边的都没选中,那么就随机选一个 return selectOneMessageQueue(); } }
Main Hall There is a Deputy Hall. Generally speaking, children will enter the main hall, and then after a while, the card should be busy believing the machine (the art of shadow clone) , and then let the clone enter the deputy hall, so that when the main hall has a power outage and stops working, the clone in the deputy hall will be fine as long as it completes the task. Generally speaking, it is the message from the main hall that the child goes to take the boat to complete the task.
The above is the detailed content of Analysis of Java development RocketMQ producer high availability example. For more information, please follow other related articles on the PHP Chinese website!