首頁 > Java > java教程 > 主體

Java訊息佇列任務的平滑關閉

高洛峰
發布: 2016-11-17 12:46:53
原創
1306 人瀏覽過

1.問題背景

對於訊息佇列的監聽,我們一般使用Java寫一個獨立的程序,在Linux伺服器上運行。程式啟動後,透過訊息佇列客戶端接收訊息,放入一個執行緒池進行非同步處理,並發的快速處理。

那麼問題來了,當我們修改程式後,需要重新啟動任務的時候,如何保證訊息的不遺失呢?

正常來說,訂閱者程式關閉後,訊息會在發送者隊列中堆積,等待訂閱者下次訂閱消費,所以未接收的訊息是不會遺失的。唯一可能遺失的訊息,就是在關閉的一瞬間,已經從佇列中取出但還沒有處理完畢的訊息。

因此我們需要一套平滑關閉的機制,確保在重啟的時候,訊息可以正常處理完成。

2.問題分析

平滑關閉的思路如下:

在關閉程式時,首先關閉訊息訂閱,這個時候訊息都在發送者佇列中

關閉本地訊息處理執行緒池(等待本地執行緒池中的訊息處理完畢)

程式退出

關閉訊息訂閱:一般訊息佇列的客戶端都提供關閉連線的方法,具體可以自行查看api

關閉執行緒池:Java的ThreadPoolExecutor執行緒池提供shutdown()和shutdownNow( )兩個方法,差異是前者會等待線程池中的消息都處理完畢,後者直接停止線程的執行並返回list集合。因為我們需要使用shutdown()方法進行關閉,並通過isTerminated(),方法判斷線程池是否已經關閉.

那麼問題又來了,我們如何通知到程序,需要執行關閉操作呢?

在Linux中,我們可以用kill -9 pid關閉進程,除了-9之外,我們可以透過 kill -l查看kill 指令的其它信號量,例如使用12) SIGUSR2 信號量

我們可以在Java程式啟動時,註冊對應的信號量,對信號量進行監聽,在收到對應的kill操作時,執行相關的業務操作。

偽代碼如下

 //注册linux kill信号量  kill -12Signal sig = new Signal("USR2");
Signal.handle(sig, new SignalHandler() {    @Override
    public void handle(Signal signal) {        //关闭订阅者
        //关闭线程池
        //退出
    }
});
登入後複製

下面透過一個demo模擬相關邏輯操作

先模擬一個生產者,每秒生產5個訊息

然後模擬一個訂閱者,收到訊息後交給線程池進行處理,線程池固定4個線程,每個訊息處理時間1秒,這樣線程池每秒會積壓1個訊息。

package com.lujianing.demo;import sun.misc.Signal;import sun.misc.SignalHandler;import java.util.concurrent.*;/**
 * @author lujianing01@58.com
 * @Description:
 * @date 2016/11/14
 */public class MsgClient {    //模拟消息队列订阅者 同时4个线程处理
    private static final ThreadPoolExecutor THREAD_POOL = (ThreadPoolExecutor) Executors.newFixedThreadPool(4);    //模拟消息队列生产者
    private static final ScheduledExecutorService SCHEDULED_EXECUTOR_SERVICE = Executors.newSingleThreadScheduledExecutor();    //用于判断是否关闭订阅
    private static volatile boolean isClose = false;    public static void main(String[] args) throws InterruptedException {
        BlockingQueue <String> queue = new ArrayBlockingQueue<String>(100);
        producer(queue);
        consumer(queue);
    }    //模拟消息队列生产者
    private static void producer(final BlockingQueue  queue){        //每200毫秒向队列中放入一个消息
        SCHEDULED_EXECUTOR_SERVICE.scheduleAtFixedRate(new Runnable() {            public void run() {
                queue.offer("");
            }
        }, 0L, 200L, TimeUnit.MILLISECONDS);
    }    //模拟消息队列消费者 生产者每秒生产5个   消费者4个线程消费1个1秒  每秒积压1个
    private static void consumer(final BlockingQueue queue) throws InterruptedException {        while (!isClose){
            getPoolBacklogSize();            //从队列中拿到消息
            final String msg = (String)queue.take();            //放入线程池处理
            if(!THREAD_POOL.isShutdown()) {
                THREAD_POOL.execute(new Runnable() {                    public void run() {                        try {                            //System.out.println(msg);
                            TimeUnit.MILLISECONDS.sleep(1000L);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                });
            }
        }
    }    //查看线程池堆积消息个数
    private static long getPoolBacklogSize(){        long backlog = THREAD_POOL.getTaskCount()- THREAD_POOL.getCompletedTaskCount();
        System.out.println(String.format("[%s]THREAD_POOL backlog:%s",System.currentTimeMillis(),backlog));        return backlog;
    }    static {
        String osName = System.getProperty("os.name").toLowerCase();        if(osName != null && osName.indexOf("window") == -1) {            //注册linux kill信号量  kill -12
            Signal sig = new Signal("USR2");
            Signal.handle(sig, new SignalHandler() {                @Override
                public void handle(Signal signal) {
                    System.out.println("收到kill消息,执行关闭操作");                    //关闭订阅消费
                    isClose = true;                    //关闭线程池,等待线程池积压消息处理
                    THREAD_POOL.shutdown();                    //判断线程池是否关闭
                    while (!THREAD_POOL.isTerminated()) {                        try {                            //每200毫秒 判断线程池积压数量
                            getPoolBacklogSize();
                            TimeUnit.MILLISECONDS.sleep(200L);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    System.out.println("订阅者关闭,线程池处理完毕");
                    System.exit(0);
                }
            });
        }
    }
}
登入後複製

當我們在服務上運行時,透過控制台可以看到相關的輸出訊息,demo中輸出了線程池的積壓訊息個數

java -cp /home/work/lujianing/msg-queue-client/* com.lujianing.demo.MsgClient
登入後複製

Java訊息佇列任務的平滑關閉

另打開一個終端,透過ps命令查看進程號,或透過nohup啟動Java程序拿到流程id

ps -fe|grep MsgClient
登入後複製

Java訊息佇列任務的平滑關閉

當我們執行kill -12 pid的時候可以看到關閉業務邏輯

Java訊息佇列任務的平滑關閉

3.問題總結

3.問題總結

在部門的實際業務中,在部門的實際業務中,訊息佇列的訊息量還挺大的,某些業務高峰時每秒有幾百的訊息量,因此對訊息的處理要保證速度,避免訊息積壓,也可以透過負載解決單一訂閱節點的壓力。 🎜🎜在某些業務場景中,對訊息的完整性要求不那麼高,那麼就不用考慮重啟時的一點損耗。反之,就需要好好思考設計了。 🎜🎜🎜🎜
相關標籤:
來源:php.cn
本網站聲明
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn
熱門教學
更多>
最新下載
更多>
網站特效
網站源碼
網站素材
前端模板
關於我們 免責聲明 Sitemap
PHP中文網:公益線上PHP培訓,幫助PHP學習者快速成長!