Heim Java javaLernprogramm Reibungsloses Herunterfahren von Java-Nachrichtenwarteschlangenaufgaben

Reibungsloses Herunterfahren von Java-Nachrichtenwarteschlangenaufgaben

Nov 17, 2016 pm 12:46 PM
java

1. Problemhintergrund

Für die Überwachung der Nachrichtenwarteschlange verwenden wir im Allgemeinen Java, um ein separates Programm zu schreiben und es auf dem Linux-Server auszuführen. Nachdem das Programm gestartet wurde, werden Nachrichten über den Nachrichtenwarteschlangen-Client empfangen und zur asynchronen Verarbeitung in einen Thread-Pool gestellt, was eine schnelle gleichzeitige Verarbeitung ermöglicht.

Dann stellt sich die Frage: Wenn wir das Programm ändern und die Aufgabe neu starten müssen, wie stellen wir sicher, dass die Nachricht nicht verloren geht?

Normalerweise sammeln sich Nachrichten nach dem Schließen des Abonnentenprogramms in der Absenderwarteschlange an und warten auf den nächsten Abonnementverbrauch des Abonnenten, sodass nicht empfangene Nachrichten nicht verloren gehen. Die einzigen Nachrichten, die verloren gehen können, sind Nachrichten, die aus der Warteschlange genommen, aber zum Zeitpunkt des Herunterfahrens noch nicht verarbeitet wurden.

Daher benötigen wir einen reibungslosen Shutdown-Mechanismus, um sicherzustellen, dass Nachrichten beim Neustart normal verarbeitet werden können.

2. Problemanalyse

Die Idee des reibungslosen Herunterfahrens ist wie folgt:

Schließen Sie beim Schließen des Programms zunächst das Nachrichtenabonnement befinden sich alle in der Absenderwarteschlange

Schließen Sie den lokalen Nachrichtenverarbeitungs-Thread-Pool (warten Sie, bis die Nachrichten im lokalen Thread-Pool verarbeitet werden)

Das Programm wird beendet

Nachricht schließen Abonnement: Im Allgemeinen stellen Nachrichtenwarteschlangen-Clients Methoden zum Schließen von Verbindungen bereit. Weitere Informationen finden Sie in der API

zum Schließen des Thread-Pools: Der ThreadPoolExecutor von Java bietet zwei Methoden: Shutdown() und ShutdownNow() Der Unterschied besteht darin, dass ersteres auf die Verarbeitung aller Nachrichten im Thread-Pool wartet, während letzteres direkt die Ausführung des Threads stoppt und die Listensammlung zurückgibt. Denn wir müssen die Methode „shutdown()“ zum Herunterfahren verwenden und die Methode „isTerminated()“ verwenden, um festzustellen, ob der Thread-Pool geschlossen wurde.

Dann stellt sich erneut die Frage: Wie benachrichtigen wir das Programm darüber? Muss ein Shutdown-Vorgang durchgeführt werden?

Unter Linux können wir kill -9 pid verwenden, um den Prozess herunterzufahren. Zusätzlich zu -9 können wir kill -l verwenden, um andere Semaphoren des Kill-Befehls anzuzeigen , wie zum Beispiel mit 12) SIGUSR2-Semaphor

Wir können das entsprechende Semaphor registrieren, wenn das Java-Programm startet, das Semaphor überwachen und verwandte Geschäftsvorgänge ausführen, wenn wir den entsprechenden Kill-Vorgang empfangen.

Der Pseudocode lautet wie folgt

 //注册linux kill信号量  kill -12Signal sig = new Signal("USR2");
Signal.handle(sig, new SignalHandler() {    @Override
    public void handle(Signal signal) {        //关闭订阅者
        //关闭线程池
        //退出
    }
});
Nach dem Login kopieren

Das Folgende simuliert verwandte logische Operationen durch eine Demo

Simulieren Sie zunächst einen Produzenten, der 5 Nachrichten pro Sekunde produziert

Simulieren Sie dann einen Abonnenten und übergeben Sie ihn nach Erhalt an den Thread-Pool zur Verarbeitung. Der Thread-Pool verfügt über 4 feste Threads und die Verarbeitungszeit jeder Nachricht beträgt 1 Sekunde Nachricht pro Sekunde.

package com.lujianing.demo;import sun.misc.Signal;import sun.misc.SignalHandler;import java.util.concurrent.*;/**
 * @author lujianing01@58.com
 * @Description:
 * @date 2016/11/14
 */public class MsgClient {    //模拟消息队列订阅者 同时4个线程处理
    private static final ThreadPoolExecutor THREAD_POOL = (ThreadPoolExecutor) Executors.newFixedThreadPool(4);    //模拟消息队列生产者
    private static final ScheduledExecutorService SCHEDULED_EXECUTOR_SERVICE = Executors.newSingleThreadScheduledExecutor();    //用于判断是否关闭订阅
    private static volatile boolean isClose = false;    public static void main(String[] args) throws InterruptedException {
        BlockingQueue <String> queue = new ArrayBlockingQueue<String>(100);
        producer(queue);
        consumer(queue);
    }    //模拟消息队列生产者
    private static void producer(final BlockingQueue  queue){        //每200毫秒向队列中放入一个消息
        SCHEDULED_EXECUTOR_SERVICE.scheduleAtFixedRate(new Runnable() {            public void run() {
                queue.offer("");
            }
        }, 0L, 200L, TimeUnit.MILLISECONDS);
    }    //模拟消息队列消费者 生产者每秒生产5个   消费者4个线程消费1个1秒  每秒积压1个
    private static void consumer(final BlockingQueue queue) throws InterruptedException {        while (!isClose){
            getPoolBacklogSize();            //从队列中拿到消息
            final String msg = (String)queue.take();            //放入线程池处理
            if(!THREAD_POOL.isShutdown()) {
                THREAD_POOL.execute(new Runnable() {                    public void run() {                        try {                            //System.out.println(msg);
                            TimeUnit.MILLISECONDS.sleep(1000L);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                });
            }
        }
    }    //查看线程池堆积消息个数
    private static long getPoolBacklogSize(){        long backlog = THREAD_POOL.getTaskCount()- THREAD_POOL.getCompletedTaskCount();
        System.out.println(String.format("[%s]THREAD_POOL backlog:%s",System.currentTimeMillis(),backlog));        return backlog;
    }    static {
        String osName = System.getProperty("os.name").toLowerCase();        if(osName != null && osName.indexOf("window") == -1) {            //注册linux kill信号量  kill -12
            Signal sig = new Signal("USR2");
            Signal.handle(sig, new SignalHandler() {                @Override
                public void handle(Signal signal) {
                    System.out.println("收到kill消息,执行关闭操作");                    //关闭订阅消费
                    isClose = true;                    //关闭线程池,等待线程池积压消息处理
                    THREAD_POOL.shutdown();                    //判断线程池是否关闭
                    while (!THREAD_POOL.isTerminated()) {                        try {                            //每200毫秒 判断线程池积压数量
                            getPoolBacklogSize();
                            TimeUnit.MILLISECONDS.sleep(200L);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    System.out.println("订阅者关闭,线程池处理完毕");
                    System.exit(0);
                }
            });
        }
    }
}
Nach dem Login kopieren

Wenn wir den Dienst ausführen, können wir die relevanten Ausgabeinformationen über die Konsole sehen. Die Demo gibt die Anzahl der Backlog-Nachrichten im Thread-Pool aus

java -cp /home/work/lujianing/msg-queue-client/* com.lujianing.demo.MsgClient
Nach dem Login kopieren

Reibungsloses Herunterfahren von Java-Nachrichtenwarteschlangenaufgaben

Öffnen Sie ein anderes Terminal und überprüfen Sie die Prozessnummer über den Befehl ps oder starten Sie den Java-Prozess über nohup, um die Prozess-ID zu erhalten

ps -fe|grep MsgClient
Nach dem Login kopieren

Reibungsloses Herunterfahren von Java-Nachrichtenwarteschlangenaufgaben

Wenn wir ausführen Beim Töten von -12 pid können Sie die Geschäftslogik zum Herunterfahren sehen

Reibungsloses Herunterfahren von Java-Nachrichtenwarteschlangenaufgaben

3 Problemzusammenfassung

Im eigentlichen Geschäft der Abteilung Das Nachrichtenvolumen der Nachrichtenwarteschlange ist bei einigen Geschäftsspitzen immer noch ziemlich groß. Daher muss die Verarbeitungsgeschwindigkeit der Nachrichten sichergestellt werden, um den Druck auf einen einzelnen Abonnementknoten zu vermeiden durch Last.

In einigen Geschäftsszenarien sind die Anforderungen an die Nachrichtenintegrität nicht so hoch, sodass der Verlust beim Neustart nicht berücksichtigt werden muss. Im Gegenteil, es erfordert sorgfältiges Denken und Design.


Erklärung dieser Website
Der Inhalt dieses Artikels wird freiwillig von Internetnutzern beigesteuert und das Urheberrecht liegt beim ursprünglichen Autor. Diese Website übernimmt keine entsprechende rechtliche Verantwortung. Wenn Sie Inhalte finden, bei denen der Verdacht eines Plagiats oder einer Rechtsverletzung besteht, wenden Sie sich bitte an admin@php.cn

Heiße KI -Werkzeuge

Undresser.AI Undress

Undresser.AI Undress

KI-gestützte App zum Erstellen realistischer Aktfotos

AI Clothes Remover

AI Clothes Remover

Online-KI-Tool zum Entfernen von Kleidung aus Fotos.

Undress AI Tool

Undress AI Tool

Ausziehbilder kostenlos

Clothoff.io

Clothoff.io

KI-Kleiderentferner

Video Face Swap

Video Face Swap

Tauschen Sie Gesichter in jedem Video mühelos mit unserem völlig kostenlosen KI-Gesichtstausch-Tool aus!

Heiße Werkzeuge

Notepad++7.3.1

Notepad++7.3.1

Einfach zu bedienender und kostenloser Code-Editor

SublimeText3 chinesische Version

SublimeText3 chinesische Version

Chinesische Version, sehr einfach zu bedienen

Senden Sie Studio 13.0.1

Senden Sie Studio 13.0.1

Leistungsstarke integrierte PHP-Entwicklungsumgebung

Dreamweaver CS6

Dreamweaver CS6

Visuelle Webentwicklungstools

SublimeText3 Mac-Version

SublimeText3 Mac-Version

Codebearbeitungssoftware auf Gottesniveau (SublimeText3)

Perfekte Zahl in Java Perfekte Zahl in Java Aug 30, 2024 pm 04:28 PM

Leitfaden zur perfekten Zahl in Java. Hier besprechen wir die Definition, Wie prüft man die perfekte Zahl in Java?, Beispiele mit Code-Implementierung.

Weka in Java Weka in Java Aug 30, 2024 pm 04:28 PM

Leitfaden für Weka in Java. Hier besprechen wir die Einführung, die Verwendung von Weka Java, die Art der Plattform und die Vorteile anhand von Beispielen.

Smith-Nummer in Java Smith-Nummer in Java Aug 30, 2024 pm 04:28 PM

Leitfaden zur Smith-Zahl in Java. Hier besprechen wir die Definition: Wie überprüft man die Smith-Nummer in Java? Beispiel mit Code-Implementierung.

Fragen zum Java Spring-Interview Fragen zum Java Spring-Interview Aug 30, 2024 pm 04:29 PM

In diesem Artikel haben wir die am häufigsten gestellten Fragen zu Java Spring-Interviews mit ihren detaillierten Antworten zusammengestellt. Damit Sie das Interview knacken können.

Brechen oder aus Java 8 Stream foreach zurückkehren? Brechen oder aus Java 8 Stream foreach zurückkehren? Feb 07, 2025 pm 12:09 PM

Java 8 führt die Stream -API ein und bietet eine leistungsstarke und ausdrucksstarke Möglichkeit, Datensammlungen zu verarbeiten. Eine häufige Frage bei der Verwendung von Stream lautet jedoch: Wie kann man von einem Foreach -Betrieb brechen oder zurückkehren? Herkömmliche Schleifen ermöglichen eine frühzeitige Unterbrechung oder Rückkehr, aber die Stream's foreach -Methode unterstützt diese Methode nicht direkt. In diesem Artikel werden die Gründe erläutert und alternative Methoden zur Implementierung vorzeitiger Beendigung in Strahlverarbeitungssystemen erforscht. Weitere Lektüre: Java Stream API -Verbesserungen Stream foreach verstehen Die Foreach -Methode ist ein Terminalbetrieb, der einen Vorgang für jedes Element im Stream ausführt. Seine Designabsicht ist

Zeitstempel für Datum in Java Zeitstempel für Datum in Java Aug 30, 2024 pm 04:28 PM

Anleitung zum TimeStamp to Date in Java. Hier diskutieren wir auch die Einführung und wie man Zeitstempel in Java in ein Datum konvertiert, zusammen mit Beispielen.

Java -Programm, um das Kapselvolumen zu finden Java -Programm, um das Kapselvolumen zu finden Feb 07, 2025 am 11:37 AM

Kapseln sind dreidimensionale geometrische Figuren, die aus einem Zylinder und einer Hemisphäre an beiden Enden bestehen. Das Volumen der Kapsel kann berechnet werden, indem das Volumen des Zylinders und das Volumen der Hemisphäre an beiden Enden hinzugefügt werden. In diesem Tutorial wird erörtert, wie das Volumen einer bestimmten Kapsel in Java mit verschiedenen Methoden berechnet wird. Kapselvolumenformel Die Formel für das Kapselvolumen lautet wie folgt: Kapselvolumen = zylindrisches Volumenvolumen Zwei Hemisphäre Volumen In, R: Der Radius der Hemisphäre. H: Die Höhe des Zylinders (ohne die Hemisphäre). Beispiel 1 eingeben Radius = 5 Einheiten Höhe = 10 Einheiten Ausgabe Volumen = 1570,8 Kubikeinheiten erklären Berechnen Sie das Volumen mithilfe der Formel: Volumen = π × R2 × H (4

Gestalten Sie die Zukunft: Java-Programmierung für absolute Anfänger Gestalten Sie die Zukunft: Java-Programmierung für absolute Anfänger Oct 13, 2024 pm 01:32 PM

Java ist eine beliebte Programmiersprache, die sowohl von Anfängern als auch von erfahrenen Entwicklern erlernt werden kann. Dieses Tutorial beginnt mit grundlegenden Konzepten und geht dann weiter zu fortgeschrittenen Themen. Nach der Installation des Java Development Kit können Sie das Programmieren üben, indem Sie ein einfaches „Hello, World!“-Programm erstellen. Nachdem Sie den Code verstanden haben, verwenden Sie die Eingabeaufforderung, um das Programm zu kompilieren und auszuführen. Auf der Konsole wird „Hello, World!“ ausgegeben. Mit dem Erlernen von Java beginnt Ihre Programmierreise, und wenn Sie Ihre Kenntnisse vertiefen, können Sie komplexere Anwendungen erstellen.

See all articles