public class Message implements Serializable { private static final long serialVersionUID = 8445773977080406428L; //主题名字 private String topic; //消息扩展信息,Tag,keys,延迟级别都存在这里 private Map<String, String> properties; //消息体,字节数组 private byte[] body; //设置消息的key, public void setKeys(String keys) {} //设置topic public void setTopic(String topic) {} //延迟级别 public int setDelayTimeLevel(int level) {} //消息过滤的标记 public void setTags(String tags) {} //扩展信息存放在此 public void putUserProperty(final String name, final String value) {} }
Le message, ce sont les enfants. Ces enfants ont leurs propres caractéristiques mais ont aussi des points communs. Deux enfants envoyés par le même parent peuvent aller au même endroit, ou bien ils peuvent aller dans des endroits différents.
Tout d'abord, chaque message enfant a un attribut topic, dont nous avons parlé plus haut, c'est une salle d'attente. Une fois que les enfants sont entrés, ils ont marché jusqu'à la zone désignée de leur salle d'attente désignée (ne prennent-ils pas habituellement le train et le train à grande vitesse sur le quai désigné ?), se sont assis dans la file d'attente des messagessièges, et attendit leur voyage.
Le courtier a un ou plusieurs sujets, et les messages seront stockés dans la file d'attente des messages du sujet, en attente d'être consommés.
Child News a également un attribut Corps, qui est sa capacité Il peut dessiner, il peut chanter et il peut faire ce qu'il veut, qui est enregistré dans cet attribut Corps. Lorsque vous sortez, l'endroit où la valeur se reflète est également l'attribut Corps.
Body est le corps du message et les consommateurs effectueront les opérations correspondantes en fonction du corps du message.
Nous avons mentionné cette étiquette dans la section précédente, c'est une marque Certains enfants portent des planches à dessin et des appareils photo, et certains bateaux de croisière trouveront spécialement ces enfants et les emmèneront pour accomplir leurs tâches.
Vous pouvez définir des attributs de balise pour les messages et les consommateurs peuvent choisir des messages contenant des attributs de balise spécifiques à consommer.
key est le nom du message de chaque enfant. Si vous voulez retrouver un enfant, appelez-le simplement par son nom.
Définissez la clé pour le message envoyé et vous pourrez rechercher des messages basés sur cette clé à l'avenir. Par exemple, si le message est anormal ou si le message est perdu, il sera très pratique de le rechercher.
Bien sûr, certains enfants ne sont pas pressés de partir. Ils y ont réfléchi avant de venir. Il leur faudra 30 minutes pour prendre un repas, ils attendront donc 30 minutes avant d'être récupérés.
Définissez le niveau de délai pour spécifier la durée pendant laquelle le message peut être consommé.
Chaque parent qui envoie ses enfants veut envoyer ses enfants dans la salle d'attente, et ils ne veulent pas que leurs enfants soient perdus En ce moment, la salle d'attente a besoin de mécanismes de garantie.
C'est-à-dire qu'après que le parent a envoyé l'enfant, après être entré dans la salle d'attente, l'enfant n'a pas réussi à s'asseoir dans la file d'attente des messages , et cela fonctionnait à ce moment-là. Le personnel fera un nouvel essai pour voir s'il y a des places disponibles. Le nombre de tentatives par défaut est de 2 fois, ce qui signifie que l'enfant qui reçoit le message a un total de 3 occasions de trouver une place.
Regardez le code source, j'ai spécialement ajouté des commentaires, pour que vous puissiez le comprendre à peu près.
//这里取到了重试的次数 int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1; int times = 0; String[] brokersSent = new String[timesTotal]; for (; times < timesTotal; times++) { String lastBrokerName = null == mq ? null : mq.getBrokerName(); //获取消息队列 MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName); if (mqSelected != null) { mq = mqSelected; brokersSent[times] = mq.getBrokerName(); try { beginTimestampPrev = System.currentTimeMillis(); if (times > 0) { //Reset topic with namespace during resend. msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic())); } long costTime = beginTimestampPrev - beginTimestampFirst; if (timeout < costTime) { callTimeout = true; break; } //发送消息 sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime); ... } catch (RemotingException e) { ... continue; } catch (MQClientException e) { ... continue; } catch (MQBrokerException e) { ... continue; } catch (InterruptedException e) { //可以看到只有InterruptedException抛出了异常,其他的exception都会继续重试 throw e; } } else { break; } }
Le code de nouvelle tentative est comme ci-dessus. Dans cette méthode sendDefaultImpl
, il essaiera d'envoyer le message trois fois. S'il échoue, l'erreur correspondante sera générée. sendDefaultImpl
方法中,会尝试发送三次消息,若是都失败,才会抛出对应的错误。
若是有多个Broker候车大厅的时候,服务人员会安排消息孩子选择一个相对不拥挤,比较容易进入的来进入。当然那些已经关闭的,停电的,没有服务能力的,我们是不会进的。
MQ Client会维护一个Broker的发送延迟信息,根据这个信息会选择一个相对延迟较低的Broker来发送消息。会主动剔除哪些已经宕机,不可用或发送延迟级别较高的Broker.
选择Broker
就是在选择message queue
S'il y a plusieurs salles d'attente des courtiers, le personnel de service s'arrangera pour envoyer un message à l'enfant afin qu'il en choisisse une relativement moins fréquentée et plus facile d'accès. Bien entendu, nous n'entrerons pas dans ceux qui ont été fermés
,hors tension, n'ont aucune capacité de service.
Le client MQ conservera les informations sur le délai d'envoi d'un courtier et, sur la base de ces informations, il sélectionnera un courtier avec un délai relativement faible pour envoyer le message. Éliminera activement les courtiers qui sont en panne, indisponibles ou qui ont un niveau de retard d'envoi élevé.Sélectionner Courtier
revient à sélectionner file d'attente des messages
. Le code correspondant est le suivant :
est désactivé par défaut S'il est activé, le courtier avec une latence inférieure sera prioritaire.
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) { //判断发送延迟容错开关是否开启 if (this.sendLatencyFaultEnable) { try { //选择一个延迟上可以接受,并且和上次发送相同的Broker int index = tpInfo.getSendWhichQueue().incrementAndGet(); for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) { int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size(); if (pos < 0) pos = 0; MessageQueue mq = tpInfo.getMessageQueueList().get(pos); //若是Broker的延迟时间可以接受,则返回这个Broker if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) return mq; } //若是第一步没能选中一个Broker,就选择一个延迟较低的Broker final String notBestBroker = latencyFaultTolerance.pickOneAtLeast(); int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker); if (writeQueueNums > 0) { final MessageQueue mq = tpInfo.selectOneMessageQueue(); if (notBestBroker != null) { mq.setBrokerName(notBestBroker); mq.setQueueId(tpInfo.getSendWhichQueue().incrementAndGet() % writeQueueNums); } return mq; } else { latencyFaultTolerance.remove(notBestBroker); } } catch (Exception e) { log.error("Error occurred when selecting message queue", e); } //若是前边都没选中一个Broker,就随机选一个Broker return tpInfo.selectOneMessageQueue(); } return tpInfo.selectOneMessageQueue(lastBrokerName); }
public MessageQueue selectOneMessageQueue(final String lastBrokerName) { //若是没有上次的Brokername做参考,就随机选一个 if (lastBrokerName == null) { return selectOneMessageQueue(); } else { //如果有,那么就选一个其他的Broker for (int i = 0; i < this.messageQueueList.size(); i++) { int index = this.sendWhichQueue.incrementAndGet(); int pos = Math.abs(index) % this.messageQueueList.size(); if (pos < 0) pos = 0; MessageQueue mq = this.messageQueueList.get(pos); //这里判断遇上一个使用的Broker不是同一个 if (!mq.getBrokerName().equals(lastBrokerName)) { return mq; } } //若是上边的都没选中,那么就随机选一个 return selectOneMessageQueue(); } }
Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!