public class Message implements Serializable { private static final long serialVersionUID = 8445773977080406428L; //主题名字 private String topic; //消息扩展信息,Tag,keys,延迟级别都存在这里 private Map<String, String> properties; //消息体,字节数组 private byte[] body; //设置消息的key, public void setKeys(String keys) {} //设置topic public void setTopic(String topic) {} //延迟级别 public int setDelayTimeLevel(int level) {} //消息过滤的标记 public void setTags(String tags) {} //扩展信息存放在此 public void putUserProperty(final String name, final String value) {} }
Die Botschaft ist, dass diese Kinder ihre eigenen Eigenschaften, aber auch Gemeinsamkeiten haben. Zwei Kinder, die von demselben Elternteil geschickt werden, können an denselben Ort oder an verschiedene Orte gehen.
Zunächst hat jede untergeordnete Nachricht ein Attribut Thema, das wir oben erwähnt haben, ist eine Wartehalle. Nachdem die Kinder hereingekommen waren, gingen sie zu dem gekennzeichneten Bereich ihrer vorgesehenen Wartehalle (nehmen sie normalerweise nicht den Zug und die Hochgeschwindigkeitsbahn am dafür vorgesehenen Bahnsteig?), setzten sich in die NachrichtenwarteschlangePlätze, und warteten auf ihre Reise.
Broker hat ein oder mehrere Themen und Nachrichten werden in der Nachrichtenwarteschlange im Thema gespeichert und warten darauf, verbraucht zu werden.
Kindernachrichten haben auch ein Körperattribut, nämlich seine Fähigkeiten. Er kann zeichnen, er kann singen und er kann tun, was er will, was in diesem Körperattribut aufgezeichnet ist. Wenn Sie ausgehen, ist der Ort, an dem sich der Wert widerspiegelt, auch das Körperattribut.
Body ist der Nachrichtentext, und Verbraucher führen entsprechende Vorgänge basierend auf dem Nachrichtentext aus.
Wir haben diesen Tag im vorherigen Abschnitt erwähnt, es ist ein Zeichen Manche Kinder tragen Zeichenbretter und Kameras und einige Kreuzfahrtschiffe finden diese Kinder speziell und nehmen sie mit, um ihre Aufgaben zu erledigen.
Sie können Tag-Attribute für Nachrichten festlegen und Verbraucher können Nachrichten mit bestimmten Tag-Attributen zum Konsum auswählen.
Schlüssel ist der Name der Nachricht jedes Kindes. Wenn Sie ein Kind finden möchten, nennen Sie es einfach beim Namen.
Legen Sie den Schlüssel für die gesendete Nachricht fest, und Sie können in Zukunft anhand dieses Schlüssels nach Nachrichten suchen. Wenn die Nachricht beispielsweise abnormal ist oder verloren geht, ist die Suche sehr praktisch.
Natürlich haben einige Kinder es nicht eilig zu gehen. Sie haben darüber nachgedacht, bevor sie kommen. Es wird 30 Minuten dauern, bis sie etwas essen können, also warten sie 30 Minuten, bevor sie abgeholt werden.
Stellen Sie die Verzögerungsstufe ein, um festzulegen, wie lange die Nachricht konsumiert werden kann.
Alle Eltern, die ihre Kinder schicken, möchten ihre Kinder in die Wartehalle schicken, und sie möchten nicht, dass ihre Kinder „verloren“ gehen. Zu diesem Zeitpunkt benötigt die Wartehalle einige Garantiemechanismen. 2.1 Der Client gewährleistet eine hohe Verfügbarkeit des Produzenten
2 Mal, was bedeutet, dass das Nachrichtenkind insgesamt 3 Möglichkeiten hat, einen Sitzplatz zu finden. Sehen Sie sich den Quellcode an. Ich habe speziell Kommentare hinzugefügt, damit Sie ihn grob verstehen können.
//这里取到了重试的次数 int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1; int times = 0; String[] brokersSent = new String[timesTotal]; for (; times < timesTotal; times++) { String lastBrokerName = null == mq ? null : mq.getBrokerName(); //获取消息队列 MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName); if (mqSelected != null) { mq = mqSelected; brokersSent[times] = mq.getBrokerName(); try { beginTimestampPrev = System.currentTimeMillis(); if (times > 0) { //Reset topic with namespace during resend. msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic())); } long costTime = beginTimestampPrev - beginTimestampFirst; if (timeout < costTime) { callTimeout = true; break; } //发送消息 sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime); ... } catch (RemotingException e) { ... continue; } catch (MQClientException e) { ... continue; } catch (MQBrokerException e) { ... continue; } catch (InterruptedException e) { //可以看到只有InterruptedException抛出了异常,其他的exception都会继续重试 throw e; } } else { break; } }
sendDefaultImpl
-Methode wird dreimal versucht, die Nachricht zu senden. 2.1.2 KundenfehlertoleranzsendDefaultImpl
方法中,会尝试发送三次消息,若是都失败,才会抛出对应的错误。
若是有多个Broker候车大厅的时候,服务人员会安排消息孩子选择一个相对不拥挤,比较容易进入的来进入。当然那些已经关闭的,停电的,没有服务能力的,我们是不会进的。
MQ Client会维护一个Broker的发送延迟信息,根据这个信息会选择一个相对延迟较低的Broker来发送消息。会主动剔除哪些已经宕机,不可用或发送延迟级别较高的Broker.
选择Broker
就是在选择message queue
Wenn es mehrere Wartehallen für Makler gibt, wird das Servicepersonal dafür sorgen, dass das Kind eine Nachricht erhält, damit es eine auswählen kann, die
und einfacher zu betreten ist. Natürlich werden wir diejenigen nicht erfassen, die geschlossen sind, Stromausfälle haben ,
keine Servicemöglichkeiten haben. MQ-Client verwaltet die Sendeverzögerungsinformationen eines Brokers und wählt basierend auf diesen Informationen einen Broker mit einer relativ geringen Verzögerung zum Senden der Nachricht aus. Eliminiert aktiv diejenigen Broker, die ausgefallen sind, nicht verfügbar sind oder eine hohe Sendeverzögerung haben. Die Auswahl von Broker
bedeutet die Auswahl von Nachrichtenwarteschlange
. Der entsprechende Code lautet wie folgt:
Hier ermitteln wir zunächst, ob der Verzögerungstoleranzschalter aktiviert ist. Wenn er aktiviert ist, erhält der Broker mit der geringeren
Latenzpublic MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) { //判断发送延迟容错开关是否开启 if (this.sendLatencyFaultEnable) { try { //选择一个延迟上可以接受,并且和上次发送相同的Broker int index = tpInfo.getSendWhichQueue().incrementAndGet(); for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) { int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size(); if (pos < 0) pos = 0; MessageQueue mq = tpInfo.getMessageQueueList().get(pos); //若是Broker的延迟时间可以接受,则返回这个Broker if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) return mq; } //若是第一步没能选中一个Broker,就选择一个延迟较低的Broker final String notBestBroker = latencyFaultTolerance.pickOneAtLeast(); int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker); if (writeQueueNums > 0) { final MessageQueue mq = tpInfo.selectOneMessageQueue(); if (notBestBroker != null) { mq.setBrokerName(notBestBroker); mq.setQueueId(tpInfo.getSendWhichQueue().incrementAndGet() % writeQueueNums); } return mq; } else { latencyFaultTolerance.remove(notBestBroker); } } catch (Exception e) { log.error("Error occurred when selecting message queue", e); } //若是前边都没选中一个Broker,就随机选一个Broker return tpInfo.selectOneMessageQueue(); } return tpInfo.selectOneMessageQueue(lastBrokerName); }
Aber wenn sich der Verzögerungsfehlertoleranzschalter im Aus-Zustand befindet, lautet der ausgeführte Code wie folgt: Um den Druck auf den Broker gleichmäßig zu verteilen, wird ein anderer Broker ausgewählt.
public MessageQueue selectOneMessageQueue(final String lastBrokerName) { //若是没有上次的Brokername做参考,就随机选一个 if (lastBrokerName == null) { return selectOneMessageQueue(); } else { //如果有,那么就选一个其他的Broker for (int i = 0; i < this.messageQueueList.size(); i++) { int index = this.sendWhichQueue.incrementAndGet(); int pos = Math.abs(index) % this.messageQueueList.size(); if (pos < 0) pos = 0; MessageQueue mq = this.messageQueueList.get(pos); //这里判断遇上一个使用的Broker不是同一个 if (!mq.getBrokerName().equals(lastBrokerName)) { return mq; } } //若是上边的都没选中,那么就随机选一个 return selectOneMessageQueue(); } }
Das obige ist der detaillierte Inhalt vonAnalyse des Hochverfügbarkeitsbeispiels eines RocketMQ-Produzenten für die Java-Entwicklung. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!