Heim > Java > javaLernprogramm > Analyse des Hochverfügbarkeitsbeispiels eines RocketMQ-Produzenten für die Java-Entwicklung

Analyse des Hochverfügbarkeitsbeispiels eines RocketMQ-Produzenten für die Java-Entwicklung

WBOY
Freigeben: 2023-04-23 23:28:06
nach vorne
1040 Leute haben es durchsucht

    1 Botschaft

    public class Message implements Serializable {
        private static final long serialVersionUID = 8445773977080406428L;
        //主题名字
        private String topic;
        //消息扩展信息,Tag,keys,延迟级别都存在这里
        private Map<String, String> properties;
        //消息体,字节数组
        private byte[] body;
        //设置消息的key,
        public void setKeys(String keys) {}
        //设置topic
        public void setTopic(String topic) {}
        //延迟级别
        public int setDelayTimeLevel(int level) {}
        //消息过滤的标记
        public void setTags(String tags) {}
        //扩展信息存放在此
        public void putUserProperty(final String name, final String value) {}
    }
    Nach dem Login kopieren

    Die Botschaft ist, dass diese Kinder ihre eigenen Eigenschaften, aber auch Gemeinsamkeiten haben. Zwei Kinder, die von demselben Elternteil geschickt werden, können an denselben Ort oder an verschiedene Orte gehen.

    1.1 Thema

    Zunächst hat jede untergeordnete Nachricht ein Attribut Thema, das wir oben erwähnt haben, ist eine Wartehalle. Nachdem die Kinder hereingekommen waren, gingen sie zu dem gekennzeichneten Bereich ihrer vorgesehenen Wartehalle (nehmen sie normalerweise nicht den Zug und die Hochgeschwindigkeitsbahn am dafür vorgesehenen Bahnsteig?), setzten sich in die NachrichtenwarteschlangePlätze, und warteten auf ihre Reise.

    Broker hat ein oder mehrere Themen und Nachrichten werden in der Nachrichtenwarteschlange im Thema gespeichert und warten darauf, verbraucht zu werden.

    1.2 Körper

    Kindernachrichten haben auch ein Körperattribut, nämlich seine Fähigkeiten. Er kann zeichnen, er kann singen und er kann tun, was er will, was in diesem Körperattribut aufgezeichnet ist. Wenn Sie ausgehen, ist der Ort, an dem sich der Wert widerspiegelt, auch das Körperattribut.

    Body ist der Nachrichtentext, und Verbraucher führen entsprechende Vorgänge basierend auf dem Nachrichtentext aus.

    1.3 Tag

    Wir haben diesen Tag im vorherigen Abschnitt erwähnt, es ist ein Zeichen Manche Kinder tragen Zeichenbretter und Kameras und einige Kreuzfahrtschiffe finden diese Kinder speziell und nehmen sie mit, um ihre Aufgaben zu erledigen.

    Sie können Tag-Attribute für Nachrichten festlegen und Verbraucher können Nachrichten mit bestimmten Tag-Attributen zum Konsum auswählen.

    1.4 Schlüssel

    Schlüssel ist der Name der Nachricht jedes Kindes. Wenn Sie ein Kind finden möchten, nennen Sie es einfach beim Namen.

    Legen Sie den Schlüssel für die gesendete Nachricht fest, und Sie können in Zukunft anhand dieses Schlüssels nach Nachrichten suchen. Wenn die Nachricht beispielsweise abnormal ist oder verloren geht, ist die Suche sehr praktisch.

    1,5 Verspätungsstufe

    Natürlich haben einige Kinder es nicht eilig zu gehen. Sie haben darüber nachgedacht, bevor sie kommen. Es wird 30 Minuten dauern, bis sie etwas essen können, also warten sie 30 Minuten, bevor sie abgeholt werden.

    Stellen Sie die Verzögerungsstufe ein, um festzulegen, wie lange die Nachricht konsumiert werden kann.

    2 Hersteller mit hoher Verfügbarkeit

    Alle Eltern, die ihre Kinder schicken, möchten ihre Kinder in die Wartehalle schicken, und sie möchten nicht, dass ihre Kinder „verloren“ gehen. Zu diesem Zeitpunkt benötigt die Wartehalle einige Garantiemechanismen. 2.1 Der Client gewährleistet eine hohe Verfügbarkeit des Produzenten

    2.1.1 Wiederholungsmechanismus

    Das heißt, nachdem der Elternteil das Kind geschickt hatte, konnte das Kind nach dem Betreten der Wartehalle „nicht auf dem Platz in der Nachrichtenwarteschlange sitzen“
    , und es funktionierte zu diesem Zeitpunkt. Das Personal wird einen erneuten Versuch veranlassen, um zu prüfen, ob Plätze verfügbar sind. Die Standardanzahl der Wiederholungsversuche beträgt

    2 Mal, was bedeutet, dass das Nachrichtenkind insgesamt 3 Möglichkeiten hat, einen Sitzplatz zu finden. Sehen Sie sich den Quellcode an. Ich habe speziell Kommentare hinzugefügt, damit Sie ihn grob verstehen können.

    //这里取到了重试的次数
    int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
    int times = 0;
    String[] brokersSent = new String[timesTotal];
    for (; times < timesTotal; times++) {
        String lastBrokerName = null == mq ? null : mq.getBrokerName();
        //获取消息队列
        MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
        if (mqSelected != null) {
            mq = mqSelected;
            brokersSent[times] = mq.getBrokerName();
            try {
                beginTimestampPrev = System.currentTimeMillis();
                if (times > 0) {
                    //Reset topic with namespace during resend.
                    msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));
                }
                long costTime = beginTimestampPrev - beginTimestampFirst;
                if (timeout < costTime) {
                    callTimeout = true;
                    break;
                }
                //发送消息
                sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
                ...
            } catch (RemotingException e) {
                ...
                continue;
            } catch (MQClientException e) {
                ...
                continue;
            } catch (MQBrokerException e) {
                ...
                continue;
            } catch (InterruptedException e) {
                //可以看到只有InterruptedException抛出了异常,其他的exception都会继续重试
                throw e;
            }
        } else {
            break;
        }
    }
    Nach dem Login kopieren

    Der Wiederholungscode ist wie oben. Bei dieser sendDefaultImpl-Methode wird dreimal versucht, die Nachricht zu senden.

    2.1.2 Kundenfehlertoleranz

    sendDefaultImpl方法中,会尝试发送三次消息,若是都失败,才会抛出对应的错误。

    2.1.2 客户端容错

    若是有多个Broker候车大厅的时候,服务人员会安排消息孩子选择一个相对不拥挤比较容易进入的来进入。当然那些已经关闭的停电的没有服务能力的,我们是不会进的。

    MQ Client会维护一个Broker的发送延迟信息,根据这个信息会选择一个相对延迟较低的Broker来发送消息。会主动剔除哪些已经宕机,不可用或发送延迟级别较高的Broker.

    选择Broker就是在选择message queueWenn es mehrere Wartehallen für Makler gibt, wird das Servicepersonal dafür sorgen, dass das Kind eine Nachricht erhält, damit es eine auswählen kann, die

    relativ weniger überfüllt

    und einfacher zu betreten ist. Natürlich werden wir diejenigen nicht erfassen, die geschlossen sind, Stromausfälle haben ,

    keine Servicemöglichkeiten haben

    . MQ-Client verwaltet die Sendeverzögerungsinformationen eines Brokers und wählt basierend auf diesen Informationen einen Broker mit einer relativ geringen Verzögerung zum Senden der Nachricht aus. Eliminiert aktiv diejenigen Broker, die ausgefallen sind, nicht verfügbar sind oder eine hohe Sendeverzögerung haben. Die Auswahl von Broker bedeutet die Auswahl von Nachrichtenwarteschlange. Der entsprechende Code lautet wie folgt:

    Hier ermitteln wir zunächst, ob der Verzögerungstoleranzschalter aktiviert ist. Wenn er aktiviert ist, erhält der Broker mit der geringeren

    Latenz

    Priorität.

    public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
        //判断发送延迟容错开关是否开启
        if (this.sendLatencyFaultEnable) {
            try {
                //选择一个延迟上可以接受,并且和上次发送相同的Broker
                int index = tpInfo.getSendWhichQueue().incrementAndGet();
                for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                    int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                    if (pos < 0)
                        pos = 0;
                    MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                    //若是Broker的延迟时间可以接受,则返回这个Broker
                    if (latencyFaultTolerance.isAvailable(mq.getBrokerName()))
                        return mq;
                }
                //若是第一步没能选中一个Broker,就选择一个延迟较低的Broker
                final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
                int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
                if (writeQueueNums > 0) {
                    final MessageQueue mq = tpInfo.selectOneMessageQueue();
                    if (notBestBroker != null) {
                        mq.setBrokerName(notBestBroker);
                        mq.setQueueId(tpInfo.getSendWhichQueue().incrementAndGet() % writeQueueNums);
                    }
                    return mq;
                } else {
                    latencyFaultTolerance.remove(notBestBroker);
                }
            } catch (Exception e) {
                log.error("Error occurred when selecting message queue", e);
            }
            //若是前边都没选中一个Broker,就随机选一个Broker
            return tpInfo.selectOneMessageQueue();
        }
        return tpInfo.selectOneMessageQueue(lastBrokerName);
    }
    Nach dem Login kopieren

    Aber wenn sich der Verzögerungsfehlertoleranzschalter im Aus-Zustand befindet, lautet der ausgeführte Code wie folgt: Um den Druck auf den Broker gleichmäßig zu verteilen, wird ein anderer Broker ausgewählt.

    public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
        //若是没有上次的Brokername做参考,就随机选一个
        if (lastBrokerName == null) {
            return selectOneMessageQueue();
        } else {
            //如果有,那么就选一个其他的Broker
            for (int i = 0; i < this.messageQueueList.size(); i++) {
                int index = this.sendWhichQueue.incrementAndGet();
                int pos = Math.abs(index) % this.messageQueueList.size();
                if (pos < 0)
                    pos = 0;
                MessageQueue mq = this.messageQueueList.get(pos);
                //这里判断遇上一个使用的Broker不是同一个
                if (!mq.getBrokerName().equals(lastBrokerName)) {
                    return mq;
                }
            }
            //若是上边的都没选中,那么就随机选一个
            return selectOneMessageQueue();
        }
    }
    Nach dem Login kopieren
    2.2 Maklerseite sorgt für hohe Verfügbarkeit für Produzenten 🎜🎜Maklerwartehalle Um 🎜Nachrichtenkinder genau zu empfangen🎜, gibt es mindestens zwei Hallen, eine 🎜Haupthalle🎜und eine 🎜Stellvertreterhalle🎜 Im Allgemeinen Kinder Jeder 🎜betritt die Haupthalle🎜 und führt dann einen einzigen Vorgang aus, um die Maschine zu informieren (Schattenklontechnik) und lässt dann den Klon in die Stellvertreterhalle. Auf diese Weise wird, wenn in der Haupthalle ein Stromausfall auftritt und dieser nicht mehr funktioniert, Der Klon in der Stellvertreterhalle muss nur Sobald Sie die Aufgabe erledigt haben, ist alles in Ordnung. Im Allgemeinen ist es die Botschaft aus der Haupthalle, dass das Kind das Boot nimmt, um die Aufgabe zu erledigen. 🎜

    Das obige ist der detaillierte Inhalt vonAnalyse des Hochverfügbarkeitsbeispiels eines RocketMQ-Produzenten für die Java-Entwicklung. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

    Verwandte Etiketten:
    Quelle:yisu.com
    Erklärung dieser Website
    Der Inhalt dieses Artikels wird freiwillig von Internetnutzern beigesteuert und das Urheberrecht liegt beim ursprünglichen Autor. Diese Website übernimmt keine entsprechende rechtliche Verantwortung. Wenn Sie Inhalte finden, bei denen der Verdacht eines Plagiats oder einer Rechtsverletzung besteht, wenden Sie sich bitte an admin@php.cn
    Beliebte Tutorials
    Mehr>
    Neueste Downloads
    Mehr>
    Web-Effekte
    Quellcode der Website
    Website-Materialien
    Frontend-Vorlage