Home > Java > javaTutorial > body text

Smooth shutdown of Java message queue tasks

高洛峰
Release: 2016-11-17 12:46:53
Original
1333 people have browsed it

1. Problem background

For message queue monitoring, we generally use Java to write a separate program and run it on the Linux server. After the program is started, messages are received through the message queue client and put into a thread pool for asynchronous processing, allowing for fast concurrent processing.

Then the question is, when we modify the program and need to restart the task, how to ensure that the message is not lost?

Normally, after the subscriber program is closed, messages will accumulate in the sender queue, waiting for the subscriber to subscribe and consume next time, so unreceived messages will not be lost. The only messages that may be lost are messages that have been taken out of the queue but not yet processed at the moment of shutdown.

So we need a smooth shutdown mechanism to ensure that messages can be processed normally when restarting.

2. Problem analysis

The idea of ​​smooth shutdown is as follows:

When closing the program, first close the message subscription. At this time, the messages are all in the sender queue

Close the local message processing thread pool (waiting for the local thread pool) The message is processed)

The program exits

Close message subscription: Generally, message queue clients provide methods to close connections. For details, you can view the API by yourself

Close the thread pool: Java's ThreadPoolExecutor thread pool provides shutdown() and shutdownNow( ) two methods, the difference is that the former will wait for the messages in the thread pool to be processed, while the latter directly stops the execution of the thread and returns the list collection. Because we need to use the shutdown() method to shut down, and use the isTerminated() method to determine whether the thread pool has been closed.

Then the question comes again, how do we notify the program that a shutdown operation needs to be performed?

In Linux , we can use kill -9 pid to shut down the process. In addition to -9, we can use kill -l to view other semaphores of the kill command, such as using 12) SIGUSR2 semaphore

We can register the corresponding signal when the Java program starts semaphore, monitor the semaphore, and perform relevant business operations when receiving the corresponding kill operation.

The pseudo code is as follows

 //注册linux kill信号量  kill -12Signal sig = new Signal("USR2");
Signal.handle(sig, new SignalHandler() {    @Override
    public void handle(Signal signal) {        //关闭订阅者
        //关闭线程池
        //退出
    }
});
Copy after login

The following simulates the relevant logical operations through a demo

First simulate a producer, producing 5 messages per second

Then simulate a subscriber, and after receiving the message, hand it over to the thread pool for processing, the thread The pool has a fixed number of 4 threads, and each message processing time is 1 second, so that the thread pool will backlog 1 message per second.

package com.lujianing.demo;import sun.misc.Signal;import sun.misc.SignalHandler;import java.util.concurrent.*;/**
 * @author lujianing01@58.com
 * @Description:
 * @date 2016/11/14
 */public class MsgClient {    //模拟消息队列订阅者 同时4个线程处理
    private static final ThreadPoolExecutor THREAD_POOL = (ThreadPoolExecutor) Executors.newFixedThreadPool(4);    //模拟消息队列生产者
    private static final ScheduledExecutorService SCHEDULED_EXECUTOR_SERVICE = Executors.newSingleThreadScheduledExecutor();    //用于判断是否关闭订阅
    private static volatile boolean isClose = false;    public static void main(String[] args) throws InterruptedException {
        BlockingQueue <String> queue = new ArrayBlockingQueue<String>(100);
        producer(queue);
        consumer(queue);
    }    //模拟消息队列生产者
    private static void producer(final BlockingQueue  queue){        //每200毫秒向队列中放入一个消息
        SCHEDULED_EXECUTOR_SERVICE.scheduleAtFixedRate(new Runnable() {            public void run() {
                queue.offer("");
            }
        }, 0L, 200L, TimeUnit.MILLISECONDS);
    }    //模拟消息队列消费者 生产者每秒生产5个   消费者4个线程消费1个1秒  每秒积压1个
    private static void consumer(final BlockingQueue queue) throws InterruptedException {        while (!isClose){
            getPoolBacklogSize();            //从队列中拿到消息
            final String msg = (String)queue.take();            //放入线程池处理
            if(!THREAD_POOL.isShutdown()) {
                THREAD_POOL.execute(new Runnable() {                    public void run() {                        try {                            //System.out.println(msg);
                            TimeUnit.MILLISECONDS.sleep(1000L);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                });
            }
        }
    }    //查看线程池堆积消息个数
    private static long getPoolBacklogSize(){        long backlog = THREAD_POOL.getTaskCount()- THREAD_POOL.getCompletedTaskCount();
        System.out.println(String.format("[%s]THREAD_POOL backlog:%s",System.currentTimeMillis(),backlog));        return backlog;
    }    static {
        String osName = System.getProperty("os.name").toLowerCase();        if(osName != null && osName.indexOf("window") == -1) {            //注册linux kill信号量  kill -12
            Signal sig = new Signal("USR2");
            Signal.handle(sig, new SignalHandler() {                @Override
                public void handle(Signal signal) {
                    System.out.println("收到kill消息,执行关闭操作");                    //关闭订阅消费
                    isClose = true;                    //关闭线程池,等待线程池积压消息处理
                    THREAD_POOL.shutdown();                    //判断线程池是否关闭
                    while (!THREAD_POOL.isTerminated()) {                        try {                            //每200毫秒 判断线程池积压数量
                            getPoolBacklogSize();
                            TimeUnit.MILLISECONDS.sleep(200L);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    System.out.println("订阅者关闭,线程池处理完毕");
                    System.exit(0);
                }
            });
        }
    }
}
Copy after login

When we run on the service, we can see the relevant output information through the console. The demo outputs the number of backlog messages in the thread pool

java -cp /home/work/lujianing/msg-queue-client/* com.lujianing.demo.MsgClient
Copy after login

Smooth shutdown of Java message queue tasks

Open another terminal and check the process number through the ps command , or start the Java process through nohup to get the process id

ps -fe|grep MsgClient
Copy after login

Smooth shutdown of Java message queue tasks

When we execute kill -12 pid, we can see the shutdown business logic

Smooth shutdown of Java message queue tasks

3. Problem summary

In the actual business of the department, The message volume of the message queue is still quite large. There are hundreds of messages per second during some business peaks. Therefore, the processing speed of messages must be ensured to avoid message backlog. The pressure on a single subscription node can also be solved through load.

In some business scenarios, the requirements for message integrity are not so high, so there is no need to consider the loss during restart. On the contrary, it requires careful thinking and design.


Related labels:
source:php.cn
Statement of this Website
The content of this article is voluntarily contributed by netizens, and the copyright belongs to the original author. This site does not assume corresponding legal responsibility. If you find any content suspected of plagiarism or infringement, please contact admin@php.cn
Popular Tutorials
More>
Latest Downloads
More>
Web Effects
Website Source Code
Website Materials
Front End Template