Java消息队列任务的平滑关闭
1.问题背景
对于消息队列的监听,我们一般使用Java写一个独立的程序,在Linux服务器上运行。程序启动后,通过消息队列客户端接收消息,放入一个线程池进行异步处理,并发的快速处理。
那么问题来了,当我们修改程序后,需要重新启动任务的时候,如何保证消息的不丢失呢?
正常来说,订阅者程序关闭后,消息会在发送者队列中堆积,等待订阅者下次订阅消费,所以未接收的消息是不会丢失的。唯一可能丢失的消息,就是在关闭的一瞬间,已经从队列中取出但还没有处理完毕的消息。
因此我们需要一套平滑关闭的机制,保证在重启的时候,消息可以正常处理完成。
2.问题分析
平滑关闭的思路如下:
在关闭程序时,首先关闭消息订阅,这个时候消息都在发送者队列中
关闭本地消息处理线程池(等待本地线程池中的消息处理完毕)
程序退出
关闭消息订阅:一般消息队列的客户端都提供关闭连接的方法,具体可以自行查看api
关闭线程池:Java的ThreadPoolExecutor线程池提供shutdown()和shutdownNow()两个方法,区别是前者会等待线程池中的消息都处理完毕,后者直接停止线程的执行并返回list集合。因为我们需要使用shutdown()方法进行关闭,并通过isTerminated(),方法判断线程池是否已经关闭.
那么问题又来了,我们如何通知到程序,需要执行关闭操作呢?
在Linux中,我们可以用kill -9 pid关闭进程,除了-9之外,我们可以通过 kill -l查看kill 命令的其它信号量,比如使用 12) SIGUSR2 信号量
我们可以在Java程序启动时,注册对应的信号量,对信号量进行监听,在收到对应的kill操作时,执行相关的业务操作。
伪代码如下
//注册linux kill信号量 kill -12Signal sig = new Signal("USR2"); Signal.handle(sig, new SignalHandler() { @Override public void handle(Signal signal) { //关闭订阅者 //关闭线程池 //退出 } });
下面通过一个demo模拟相关逻辑操作
首先模拟一个生产者,每秒生产5个消息
然后模拟一个订阅者,收到消息后交给线程池进行处理,线程池固定4个线程,每个消息处理时间1秒,这样线程池每秒会积压1个消息。
package com.lujianing.demo;import sun.misc.Signal;import sun.misc.SignalHandler;import java.util.concurrent.*;/** * @author lujianing01@58.com * @Description: * @date 2016/11/14 */public class MsgClient { //模拟消息队列订阅者 同时4个线程处理 private static final ThreadPoolExecutor THREAD_POOL = (ThreadPoolExecutor) Executors.newFixedThreadPool(4); //模拟消息队列生产者 private static final ScheduledExecutorService SCHEDULED_EXECUTOR_SERVICE = Executors.newSingleThreadScheduledExecutor(); //用于判断是否关闭订阅 private static volatile boolean isClose = false; public static void main(String[] args) throws InterruptedException { BlockingQueue <String> queue = new ArrayBlockingQueue<String>(100); producer(queue); consumer(queue); } //模拟消息队列生产者 private static void producer(final BlockingQueue queue){ //每200毫秒向队列中放入一个消息 SCHEDULED_EXECUTOR_SERVICE.scheduleAtFixedRate(new Runnable() { public void run() { queue.offer(""); } }, 0L, 200L, TimeUnit.MILLISECONDS); } //模拟消息队列消费者 生产者每秒生产5个 消费者4个线程消费1个1秒 每秒积压1个 private static void consumer(final BlockingQueue queue) throws InterruptedException { while (!isClose){ getPoolBacklogSize(); //从队列中拿到消息 final String msg = (String)queue.take(); //放入线程池处理 if(!THREAD_POOL.isShutdown()) { THREAD_POOL.execute(new Runnable() { public void run() { try { //System.out.println(msg); TimeUnit.MILLISECONDS.sleep(1000L); } catch (InterruptedException e) { e.printStackTrace(); } } }); } } } //查看线程池堆积消息个数 private static long getPoolBacklogSize(){ long backlog = THREAD_POOL.getTaskCount()- THREAD_POOL.getCompletedTaskCount(); System.out.println(String.format("[%s]THREAD_POOL backlog:%s",System.currentTimeMillis(),backlog)); return backlog; } static { String osName = System.getProperty("os.name").toLowerCase(); if(osName != null && osName.indexOf("window") == -1) { //注册linux kill信号量 kill -12 Signal sig = new Signal("USR2"); Signal.handle(sig, new SignalHandler() { @Override public void handle(Signal signal) { System.out.println("收到kill消息,执行关闭操作"); //关闭订阅消费 isClose = true; //关闭线程池,等待线程池积压消息处理 THREAD_POOL.shutdown(); //判断线程池是否关闭 while (!THREAD_POOL.isTerminated()) { try { //每200毫秒 判断线程池积压数量 getPoolBacklogSize(); TimeUnit.MILLISECONDS.sleep(200L); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println("订阅者关闭,线程池处理完毕"); System.exit(0); } }); } } }
当我们在服务上运行时,通过控制台可以看到相关的输出信息,demo中输出了线程池的积压消息个数
java -cp /home/work/lujianing/msg-queue-client/* com.lujianing.demo.MsgClient
另打开一个终端,通过ps命令查看进程号,或者通过nohup启动Java进程拿到进程id
ps -fe|grep MsgClient
当我们执行kill -12 pid的时候 可以看到关闭业务逻辑
3.问题总结
在部门的实际业务中,消息队列的消息量还是挺大的,某些业务高峰时每秒有几百的消息量,因此对消息的处理要保证速度,避免消息积压,也可以通过负载解决单个订阅节点的压力。
在某些业务场景中,对消息的完整性要求不那么高,那么就不用考虑重启时的一点损耗。反之,就需要好好思考和设计了。

热AI工具

Undresser.AI Undress
人工智能驱动的应用程序,用于创建逼真的裸体照片

AI Clothes Remover
用于从照片中去除衣服的在线人工智能工具。

Undress AI Tool
免费脱衣服图片

Clothoff.io
AI脱衣机

Video Face Swap
使用我们完全免费的人工智能换脸工具轻松在任何视频中换脸!

热门文章

热工具

记事本++7.3.1
好用且免费的代码编辑器

SublimeText3汉化版
中文版,非常好用

禅工作室 13.0.1
功能强大的PHP集成开发环境

Dreamweaver CS6
视觉化网页开发工具

SublimeText3 Mac版
神级代码编辑软件(SublimeText3)

热门话题

Java 8引入了Stream API,提供了一种强大且表达力丰富的处理数据集合的方式。然而,使用Stream时,一个常见问题是:如何从forEach操作中中断或返回? 传统循环允许提前中断或返回,但Stream的forEach方法并不直接支持这种方式。本文将解释原因,并探讨在Stream处理系统中实现提前终止的替代方法。 延伸阅读: Java Stream API改进 理解Stream forEach forEach方法是一个终端操作,它对Stream中的每个元素执行一个操作。它的设计意图是处

胶囊是一种三维几何图形,由一个圆柱体和两端各一个半球体组成。胶囊的体积可以通过将圆柱体的体积和两端半球体的体积相加来计算。本教程将讨论如何使用不同的方法在Java中计算给定胶囊的体积。 胶囊体积公式 胶囊体积的公式如下: 胶囊体积 = 圆柱体体积 两个半球体体积 其中, r: 半球体的半径。 h: 圆柱体的高度(不包括半球体)。 例子 1 输入 半径 = 5 单位 高度 = 10 单位 输出 体积 = 1570.8 立方单位 解释 使用公式计算体积: 体积 = π × r2 × h (4

Java是热门编程语言,适合初学者和经验丰富的开发者学习。本教程从基础概念出发,逐步深入讲解高级主题。安装Java开发工具包后,可通过创建简单的“Hello,World!”程序实践编程。理解代码后,使用命令提示符编译并运行程序,控制台上将输出“Hello,World!”。学习Java开启了编程之旅,随着掌握程度加深,可创建更复杂的应用程序。
