Table des matières
1 Message
1.1 topic
1.2 Corps
Étiquette 1.3
1.4 key
Niveau de délai 1,5
2 Producteur Haute Disponibilité
2.1 Le client assure la haute disponibilité du producteur
2.1.1 Mécanisme de nouvelle tentative
2.1.2 客户端容错
commutateur de tolérance de délai
Maison Java javaDidacticiel Analyse du développement Java Exemple de haute disponibilité du producteur RocketMQ

Analyse du développement Java Exemple de haute disponibilité du producteur RocketMQ

Apr 23, 2023 pm 11:28 PM
java rocketmq

    1 Message

    public class Message implements Serializable {
        private static final long serialVersionUID = 8445773977080406428L;
        //主题名字
        private String topic;
        //消息扩展信息,Tag,keys,延迟级别都存在这里
        private Map<String, String> properties;
        //消息体,字节数组
        private byte[] body;
        //设置消息的key,
        public void setKeys(String keys) {}
        //设置topic
        public void setTopic(String topic) {}
        //延迟级别
        public int setDelayTimeLevel(int level) {}
        //消息过滤的标记
        public void setTags(String tags) {}
        //扩展信息存放在此
        public void putUserProperty(final String name, final String value) {}
    }
    Copier après la connexion

    Le message, ce sont les enfants. Ces enfants ont leurs propres caractéristiques mais ont aussi des points communs. Deux enfants envoyés par le même parent peuvent aller au même endroit, ou bien ils peuvent aller dans des endroits différents.

    1.1 topic

    Tout d'abord, chaque message enfant a un attribut topic, dont nous avons parlé plus haut, c'est une salle d'attente. Une fois que les enfants sont entrés, ils ont marché jusqu'à la zone désignée de leur salle d'attente désignée (ne prennent-ils pas habituellement le train et le train à grande vitesse sur le quai désigné ?), se sont assis dans la file d'attente des messagessièges, et attendit leur voyage.

    Le courtier a un ou plusieurs sujets, et les messages seront stockés dans la file d'attente des messages du sujet, en attente d'être consommés.

    1.2 Corps

    Child News a également un attribut Corps, qui est sa capacité Il peut dessiner, il peut chanter et il peut faire ce qu'il veut, qui est enregistré dans cet attribut Corps. Lorsque vous sortez, l'endroit où la valeur se reflète est également l'attribut Corps.

    Body est le corps du message et les consommateurs effectueront les opérations correspondantes en fonction du corps du message.

    Étiquette 1.3

    Nous avons mentionné cette étiquette dans la section précédente, c'est une marque Certains enfants portent des planches à dessin et des appareils photo, et certains bateaux de croisière trouveront spécialement ces enfants et les emmèneront pour accomplir leurs tâches.

    Vous pouvez définir des attributs de balise pour les messages et les consommateurs peuvent choisir des messages contenant des attributs de balise spécifiques à consommer.

    1.4 key

    key est le nom du message de chaque enfant. Si vous voulez retrouver un enfant, appelez-le simplement par son nom.

    Définissez la clé pour le message envoyé et vous pourrez rechercher des messages basés sur cette clé à l'avenir. Par exemple, si le message est anormal ou si le message est perdu, il sera très pratique de le rechercher.

    Niveau de délai 1,5

    Bien sûr, certains enfants ne sont pas pressés de partir. Ils y ont réfléchi avant de venir. Il leur faudra 30 minutes pour prendre un repas, ils attendront donc 30 minutes avant d'être récupérés.

    Définissez le niveau de délai pour spécifier la durée pendant laquelle le message peut être consommé.

    2 Producteur Haute Disponibilité

    Chaque parent qui envoie ses enfants veut envoyer ses enfants dans la salle d'attente, et ils ne veulent pas que leurs enfants soient perdus En ce moment, la salle d'attente a besoin de mécanismes de garantie.

    2.1 Le client assure la haute disponibilité du producteur

    2.1.1 Mécanisme de nouvelle tentative

    C'est-à-dire qu'après que le parent a envoyé l'enfant, après être entré dans la salle d'attente, l'enfant n'a pas réussi à s'asseoir dans la file d'attente des messages , et cela fonctionnait à ce moment-là. Le personnel fera un nouvel essai pour voir s'il y a des places disponibles. Le nombre de tentatives par défaut est de 2 fois, ce qui signifie que l'enfant qui reçoit le message a un total de 3 occasions de trouver une place.

    Regardez le code source, j'ai spécialement ajouté des commentaires, pour que vous puissiez le comprendre à peu près.

    //这里取到了重试的次数
    int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
    int times = 0;
    String[] brokersSent = new String[timesTotal];
    for (; times < timesTotal; times++) {
        String lastBrokerName = null == mq ? null : mq.getBrokerName();
        //获取消息队列
        MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
        if (mqSelected != null) {
            mq = mqSelected;
            brokersSent[times] = mq.getBrokerName();
            try {
                beginTimestampPrev = System.currentTimeMillis();
                if (times > 0) {
                    //Reset topic with namespace during resend.
                    msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));
                }
                long costTime = beginTimestampPrev - beginTimestampFirst;
                if (timeout < costTime) {
                    callTimeout = true;
                    break;
                }
                //发送消息
                sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
                ...
            } catch (RemotingException e) {
                ...
                continue;
            } catch (MQClientException e) {
                ...
                continue;
            } catch (MQBrokerException e) {
                ...
                continue;
            } catch (InterruptedException e) {
                //可以看到只有InterruptedException抛出了异常,其他的exception都会继续重试
                throw e;
            }
        } else {
            break;
        }
    }
    Copier après la connexion

    Le code de nouvelle tentative est comme ci-dessus. Dans cette méthode sendDefaultImpl, il essaiera d'envoyer le message trois fois. S'il échoue, l'erreur correspondante sera générée. sendDefaultImpl方法中,会尝试发送三次消息,若是都失败,才会抛出对应的错误。

    2.1.2 客户端容错

    若是有多个Broker候车大厅的时候,服务人员会安排消息孩子选择一个相对不拥挤比较容易进入的来进入。当然那些已经关闭的停电的没有服务能力的,我们是不会进的。

    MQ Client会维护一个Broker的发送延迟信息,根据这个信息会选择一个相对延迟较低的Broker来发送消息。会主动剔除哪些已经宕机,不可用或发送延迟级别较高的Broker.

    选择Broker就是在选择message queue

    2.1.2 Tolérance aux pannes du client

    S'il y a plusieurs salles d'attente des courtiers, le personnel de service s'arrangera pour envoyer un message à l'enfant afin qu'il en choisisse une relativement moins fréquentée et plus facile d'accès. Bien entendu, nous n'entrerons pas dans ceux qui ont été fermés

    ,

    hors tension, n'ont aucune capacité de service.

    Le client MQ conservera les informations sur le délai d'envoi d'un courtier et, sur la base de ces informations, il sélectionnera un courtier avec un délai relativement faible pour envoyer le message. Éliminera activement les courtiers qui sont en panne, indisponibles ou qui ont un niveau de retard d'envoi élevé.

    Sélectionner Courtier revient à sélectionner file d'attente des messages. Le code correspondant est le suivant :

    . Ici, nous allons d'abord déterminer si le

    commutateur de tolérance de délai

    est activé. Ce commutateur

    est désactivé par défaut S'il est activé, le courtier avec une latence inférieure sera prioritaire.

    public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
        //判断发送延迟容错开关是否开启
        if (this.sendLatencyFaultEnable) {
            try {
                //选择一个延迟上可以接受,并且和上次发送相同的Broker
                int index = tpInfo.getSendWhichQueue().incrementAndGet();
                for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                    int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                    if (pos < 0)
                        pos = 0;
                    MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                    //若是Broker的延迟时间可以接受,则返回这个Broker
                    if (latencyFaultTolerance.isAvailable(mq.getBrokerName()))
                        return mq;
                }
                //若是第一步没能选中一个Broker,就选择一个延迟较低的Broker
                final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
                int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
                if (writeQueueNums > 0) {
                    final MessageQueue mq = tpInfo.selectOneMessageQueue();
                    if (notBestBroker != null) {
                        mq.setBrokerName(notBestBroker);
                        mq.setQueueId(tpInfo.getSendWhichQueue().incrementAndGet() % writeQueueNums);
                    }
                    return mq;
                } else {
                    latencyFaultTolerance.remove(notBestBroker);
                }
            } catch (Exception e) {
                log.error("Error occurred when selecting message queue", e);
            }
            //若是前边都没选中一个Broker,就随机选一个Broker
            return tpInfo.selectOneMessageQueue();
        }
        return tpInfo.selectOneMessageQueue(lastBrokerName);
    }
    Copier après la connexion
    Mais lorsque le interrupteur de tolérance de retard est dans l'état off

    , le code exécuté est le suivant : 🎜🎜Afin de répartir uniformément la pression sur le courtier, un autre courtier🎜 sera sélectionné 🎜. 🎜
    public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
        //若是没有上次的Brokername做参考,就随机选一个
        if (lastBrokerName == null) {
            return selectOneMessageQueue();
        } else {
            //如果有,那么就选一个其他的Broker
            for (int i = 0; i < this.messageQueueList.size(); i++) {
                int index = this.sendWhichQueue.incrementAndGet();
                int pos = Math.abs(index) % this.messageQueueList.size();
                if (pos < 0)
                    pos = 0;
                MessageQueue mq = this.messageQueueList.get(pos);
                //这里判断遇上一个使用的Broker不是同一个
                if (!mq.getBrokerName().equals(lastBrokerName)) {
                    return mq;
                }
            }
            //若是上边的都没选中,那么就随机选一个
            return selectOneMessageQueue();
        }
    }
    Copier après la connexion
    🎜2.2 Le côté courtier assure une haute disponibilité aux producteurs 🎜🎜Salle d'attente des courtiers Afin de 🎜recevoir avec précision🎜les messages des enfants, il y aura au moins deux salles, une 🎜salle principale🎜et une 🎜salle adjointe🎜 De manière générale, les enfants. Tout le monde 🎜entre dans le hall principal🎜, puis effectue une seule opération pour informer la machine (technique du clone fantôme), puis laisse le clone entrer dans le hall adjoint de cette façon, lorsque le hall principal est en panne de courant et cesse de fonctionner, le clone dans la salle des adjoints n'a qu'à le faire. Une fois la tâche terminée, tout ira bien. D'une manière générale, c'est le message du hall principal que l'enfant va prendre le bateau pour accomplir la tâche. 🎜

    Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

    Déclaration de ce site Web
    Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn

    Outils d'IA chauds

    Undresser.AI Undress

    Undresser.AI Undress

    Application basée sur l'IA pour créer des photos de nu réalistes

    AI Clothes Remover

    AI Clothes Remover

    Outil d'IA en ligne pour supprimer les vêtements des photos.

    Undress AI Tool

    Undress AI Tool

    Images de déshabillage gratuites

    Clothoff.io

    Clothoff.io

    Dissolvant de vêtements AI

    Video Face Swap

    Video Face Swap

    Échangez les visages dans n'importe quelle vidéo sans effort grâce à notre outil d'échange de visage AI entièrement gratuit !

    Outils chauds

    Bloc-notes++7.3.1

    Bloc-notes++7.3.1

    Éditeur de code facile à utiliser et gratuit

    SublimeText3 version chinoise

    SublimeText3 version chinoise

    Version chinoise, très simple à utiliser

    Envoyer Studio 13.0.1

    Envoyer Studio 13.0.1

    Puissant environnement de développement intégré PHP

    Dreamweaver CS6

    Dreamweaver CS6

    Outils de développement Web visuel

    SublimeText3 version Mac

    SublimeText3 version Mac

    Logiciel d'édition de code au niveau de Dieu (SublimeText3)

    Numéro de Smith en Java Numéro de Smith en Java Aug 30, 2024 pm 04:28 PM

    Guide du nombre de Smith en Java. Nous discutons ici de la définition, comment vérifier le numéro Smith en Java ? exemple avec implémentation de code.

    Questions d'entretien chez Java Spring Questions d'entretien chez Java Spring Aug 30, 2024 pm 04:29 PM

    Dans cet article, nous avons conservé les questions d'entretien Java Spring les plus posées avec leurs réponses détaillées. Pour que vous puissiez réussir l'interview.

    Break or Return of Java 8 Stream Forach? Break or Return of Java 8 Stream Forach? Feb 07, 2025 pm 12:09 PM

    Java 8 présente l'API Stream, fournissant un moyen puissant et expressif de traiter les collections de données. Cependant, une question courante lors de l'utilisation du flux est: comment se casser ou revenir d'une opération FOREAK? Les boucles traditionnelles permettent une interruption ou un retour précoce, mais la méthode Foreach de Stream ne prend pas directement en charge cette méthode. Cet article expliquera les raisons et explorera des méthodes alternatives pour la mise en œuvre de terminaison prématurée dans les systèmes de traitement de flux. Lire plus approfondie: Améliorations de l'API Java Stream Comprendre le flux Forach La méthode foreach est une opération terminale qui effectue une opération sur chaque élément du flux. Son intention de conception est

    Horodatage à ce jour en Java Horodatage à ce jour en Java Aug 30, 2024 pm 04:28 PM

    Guide de TimeStamp to Date en Java. Ici, nous discutons également de l'introduction et de la façon de convertir l'horodatage en date en Java avec des exemples.

    Programme Java pour trouver le volume de la capsule Programme Java pour trouver le volume de la capsule Feb 07, 2025 am 11:37 AM

    Les capsules sont des figures géométriques tridimensionnelles, composées d'un cylindre et d'un hémisphère aux deux extrémités. Le volume de la capsule peut être calculé en ajoutant le volume du cylindre et le volume de l'hémisphère aux deux extrémités. Ce tutoriel discutera de la façon de calculer le volume d'une capsule donnée en Java en utilisant différentes méthodes. Formule de volume de capsule La formule du volume de la capsule est la suivante: Volume de capsule = volume cylindrique volume de deux hémisphères volume dans, R: Le rayon de l'hémisphère. H: La hauteur du cylindre (à l'exclusion de l'hémisphère). Exemple 1 entrer Rayon = 5 unités Hauteur = 10 unités Sortir Volume = 1570,8 unités cubes expliquer Calculer le volume à l'aide de la formule: Volume = π × r2 × h (4

    PHP vs Python: comprendre les différences PHP vs Python: comprendre les différences Apr 11, 2025 am 12:15 AM

    PHP et Python ont chacun leurs propres avantages, et le choix doit être basé sur les exigences du projet. 1.Php convient au développement Web, avec une syntaxe simple et une efficacité d'exécution élevée. 2. Python convient à la science des données et à l'apprentissage automatique, avec une syntaxe concise et des bibliothèques riches.

    PHP: un langage clé pour le développement Web PHP: un langage clé pour le développement Web Apr 13, 2025 am 12:08 AM

    PHP est un langage de script largement utilisé du côté du serveur, particulièrement adapté au développement Web. 1.Php peut intégrer HTML, traiter les demandes et réponses HTTP et prend en charge une variété de bases de données. 2.PHP est utilisé pour générer du contenu Web dynamique, des données de formulaire de traitement, des bases de données d'accès, etc., avec un support communautaire solide et des ressources open source. 3. PHP est une langue interprétée, et le processus d'exécution comprend l'analyse lexicale, l'analyse grammaticale, la compilation et l'exécution. 4.PHP peut être combiné avec MySQL pour les applications avancées telles que les systèmes d'enregistrement des utilisateurs. 5. Lors du débogage de PHP, vous pouvez utiliser des fonctions telles que error_reportting () et var_dump (). 6. Optimiser le code PHP pour utiliser les mécanismes de mise en cache, optimiser les requêtes de base de données et utiliser des fonctions intégrées. 7

    Créer l'avenir : programmation Java pour les débutants absolus Créer l'avenir : programmation Java pour les débutants absolus Oct 13, 2024 pm 01:32 PM

    Java est un langage de programmation populaire qui peut être appris aussi bien par les développeurs débutants que par les développeurs expérimentés. Ce didacticiel commence par les concepts de base et progresse vers des sujets avancés. Après avoir installé le kit de développement Java, vous pouvez vous entraîner à la programmation en créant un simple programme « Hello, World ! ». Une fois que vous avez compris le code, utilisez l'invite de commande pour compiler et exécuter le programme, et « Hello, World ! » s'affichera sur la console. L'apprentissage de Java commence votre parcours de programmation et, à mesure que votre maîtrise s'approfondit, vous pouvez créer des applications plus complexes.

    See all articles