1.問題背景
對於訊息佇列的監聽,我們一般使用Java寫一個獨立的程序,在Linux伺服器上運行。程式啟動後,透過訊息佇列客戶端接收訊息,放入一個執行緒池進行非同步處理,並發的快速處理。
那麼問題來了,當我們修改程式後,需要重新啟動任務的時候,如何保證訊息的不遺失呢?
正常來說,訂閱者程式關閉後,訊息會在發送者隊列中堆積,等待訂閱者下次訂閱消費,所以未接收的訊息是不會遺失的。唯一可能遺失的訊息,就是在關閉的一瞬間,已經從佇列中取出但還沒有處理完畢的訊息。
因此我們需要一套平滑關閉的機制,確保在重啟的時候,訊息可以正常處理完成。
2.問題分析
平滑關閉的思路如下:
在關閉程式時,首先關閉訊息訂閱,這個時候訊息都在發送者佇列中
關閉本地訊息處理執行緒池(等待本地執行緒池中的訊息處理完畢)
程式退出
關閉訊息訂閱:一般訊息佇列的客戶端都提供關閉連線的方法,具體可以自行查看api
關閉執行緒池:Java的ThreadPoolExecutor執行緒池提供shutdown()和shutdownNow( )兩個方法,差異是前者會等待線程池中的消息都處理完畢,後者直接停止線程的執行並返回list集合。因為我們需要使用shutdown()方法進行關閉,並通過isTerminated(),方法判斷線程池是否已經關閉.
那麼問題又來了,我們如何通知到程序,需要執行關閉操作呢?
在Linux中,我們可以用kill -9 pid關閉進程,除了-9之外,我們可以透過 kill -l查看kill 指令的其它信號量,例如使用12) SIGUSR2 信號量
我們可以在Java程式啟動時,註冊對應的信號量,對信號量進行監聽,在收到對應的kill操作時,執行相關的業務操作。
偽代碼如下
//注册linux kill信号量 kill -12Signal sig = new Signal("USR2"); Signal.handle(sig, new SignalHandler() { @Override public void handle(Signal signal) { //关闭订阅者 //关闭线程池 //退出 } });
下面透過一個demo模擬相關邏輯操作
先模擬一個生產者,每秒生產5個訊息
然後模擬一個訂閱者,收到訊息後交給線程池進行處理,線程池固定4個線程,每個訊息處理時間1秒,這樣線程池每秒會積壓1個訊息。
package com.lujianing.demo;import sun.misc.Signal;import sun.misc.SignalHandler;import java.util.concurrent.*;/** * @author lujianing01@58.com * @Description: * @date 2016/11/14 */public class MsgClient { //模拟消息队列订阅者 同时4个线程处理 private static final ThreadPoolExecutor THREAD_POOL = (ThreadPoolExecutor) Executors.newFixedThreadPool(4); //模拟消息队列生产者 private static final ScheduledExecutorService SCHEDULED_EXECUTOR_SERVICE = Executors.newSingleThreadScheduledExecutor(); //用于判断是否关闭订阅 private static volatile boolean isClose = false; public static void main(String[] args) throws InterruptedException { BlockingQueue <String> queue = new ArrayBlockingQueue<String>(100); producer(queue); consumer(queue); } //模拟消息队列生产者 private static void producer(final BlockingQueue queue){ //每200毫秒向队列中放入一个消息 SCHEDULED_EXECUTOR_SERVICE.scheduleAtFixedRate(new Runnable() { public void run() { queue.offer(""); } }, 0L, 200L, TimeUnit.MILLISECONDS); } //模拟消息队列消费者 生产者每秒生产5个 消费者4个线程消费1个1秒 每秒积压1个 private static void consumer(final BlockingQueue queue) throws InterruptedException { while (!isClose){ getPoolBacklogSize(); //从队列中拿到消息 final String msg = (String)queue.take(); //放入线程池处理 if(!THREAD_POOL.isShutdown()) { THREAD_POOL.execute(new Runnable() { public void run() { try { //System.out.println(msg); TimeUnit.MILLISECONDS.sleep(1000L); } catch (InterruptedException e) { e.printStackTrace(); } } }); } } } //查看线程池堆积消息个数 private static long getPoolBacklogSize(){ long backlog = THREAD_POOL.getTaskCount()- THREAD_POOL.getCompletedTaskCount(); System.out.println(String.format("[%s]THREAD_POOL backlog:%s",System.currentTimeMillis(),backlog)); return backlog; } static { String osName = System.getProperty("os.name").toLowerCase(); if(osName != null && osName.indexOf("window") == -1) { //注册linux kill信号量 kill -12 Signal sig = new Signal("USR2"); Signal.handle(sig, new SignalHandler() { @Override public void handle(Signal signal) { System.out.println("收到kill消息,执行关闭操作"); //关闭订阅消费 isClose = true; //关闭线程池,等待线程池积压消息处理 THREAD_POOL.shutdown(); //判断线程池是否关闭 while (!THREAD_POOL.isTerminated()) { try { //每200毫秒 判断线程池积压数量 getPoolBacklogSize(); TimeUnit.MILLISECONDS.sleep(200L); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println("订阅者关闭,线程池处理完毕"); System.exit(0); } }); } } }
當我們在服務上運行時,透過控制台可以看到相關的輸出訊息,demo中輸出了線程池的積壓訊息個數
java -cp /home/work/lujianing/msg-queue-client/* com.lujianing.demo.MsgClient
另打開一個終端,透過ps命令查看進程號,或透過nohup啟動Java程序拿到流程id
ps -fe|grep MsgClient
當我們執行kill -12 pid的時候可以看到關閉業務邏輯
3.問題總結
3.問題總結