Springboot异步消息处理的方法
在工作中经常会碰到需要进行异步消息处理的业务场景,根据消息性质的不同有完全不同的处理方式。
1、消息不独立
不独立的消息通常是有顺序依赖关系,这时消息处理机制将退化为线性队列处理模式,只能由一个消费者去单线程处理消息。
2、消息完全独立
完全独立的消息,可以由多个消费者(线程)并发同时处理,可以达到最大的并发处理能力。
3、消息不完全独立
通常这种情况是,同源消息(来自同一生产者)要求有序,异源消息顺序无关。
这个场景的消息处理会相对复杂点,为了保证同源消息有序,很容易想到对同一来源的消息绑定固定的消费者线程,这样做很简单但存在很大问题。
如果生产者数量很大,绑定线程数可能不够,当然可以复用线程资源,同一线程绑定多个消息来源进行处理,这样做又会有另一个问题:消息源之间的相互影响。
考虑以下场景:
生产者P1产生大量消息进入队列后被分配给消费线程C1处理(C1可能需要处理很长时间),这时生产者P2产生了一个消息,不幸的是也被分配给了消费线程C1处理
那么生产者P2的消息处理将被P1的大量消息给阻塞住,导致了P1和P2之间的相互影响,而且也不能充分利用其它消费线程导致不均衡。
所以,我们必须考虑避免这样的问题。做到消费处理的及时性(尽快)、隔离性(避免相互干扰)、均衡性(最大化并发处理)
在实现中,会有两种模式,比较容易想到的是线程派发模型(PUSH方式),具体做法通常如下:
1. 有一个全局消息派发者,轮询队列取出消息。
2. 根据消息来源,派发给合适的消费线程处理。
派发的算法机制简单的可以类似像基于消息来源的Hash,复杂的可以根据各个消费线程的当前负载,等待队列长短、消息的复杂度进行综合分析选择派发。
简单Hash肯定会碰到上述场景描述的问题,但复杂派发计算很明显实现起来非常麻烦和复杂,效率也不一定好,在均衡性方面也很难做到十分平衡。
第二种模式采用PULL方式,线程按需拉取,具体做法如下:
1. 消息源直接将产生的消息放入对应该源的临时队列中(如下所示每个session代表一个不同的消息来源),再将session置入一个阻塞队列通知线程处理
2. 多个消费线程同时轮询队列,争抢消息(保证只有一个线程取到
3. 检查队列指示器是否正被其他线程处理(实现时需要在线程级别基于同源消息的检测同步)
4. 若未被其他线程处理,则在同步区置处理中指示状态,退出同步区后对临时队列中的消息进行处理
5. 处理完成后,最后再次进入同步区置处理指示状态为空闲
下面用一段代码来描述下消费线程处理流程:
public void run() { try { for (AbstractSession s = squeue.take(); s != null; s = squeue.take()) { // first check any worker is processing this session? // if any other worker thread is processing this event with same session, just ignore it. synchronized (s) { if (!s.isEventProcessing()) { s.setEventProcessing(true); } else { continue; } } // fire events with same session fire(s); // last reset processing flag and quit current thread processing s.setEventProcessing(false); // if remaining events, so re-insert to session queue if (s.getEventQueue().size() > 0 && !s.isEventProcessing()) { squeue.offer(s); } } } catch (InterruptedException e) { LOG.warn(e.getMessage(), e); } }
以上是Springboot异步消息处理的方法的详细内容。更多信息请关注PHP中文网其他相关文章!

热AI工具

Undresser.AI Undress
人工智能驱动的应用程序,用于创建逼真的裸体照片

AI Clothes Remover
用于从照片中去除衣服的在线人工智能工具。

Undress AI Tool
免费脱衣服图片

Clothoff.io
AI脱衣机

AI Hentai Generator
免费生成ai无尽的。

热门文章

热工具

记事本++7.3.1
好用且免费的代码编辑器

SublimeText3汉化版
中文版,非常好用

禅工作室 13.0.1
功能强大的PHP集成开发环境

Dreamweaver CS6
视觉化网页开发工具

SublimeText3 Mac版
神级代码编辑软件(SublimeText3)

热门话题

Jasypt介绍Jasypt是一个java库,它允许开发员以最少的努力为他/她的项目添加基本的加密功能,并且不需要对加密工作原理有深入的了解用于单向和双向加密的高安全性、基于标准的加密技术。加密密码,文本,数字,二进制文件...适合集成到基于Spring的应用程序中,开放API,用于任何JCE提供程序...添加如下依赖:com.github.ulisesbocchiojasypt-spring-boot-starter2.1.1Jasypt好处保护我们的系统安全,即使代码泄露,也可以保证数据源的

使用场景1、下单成功,30分钟未支付。支付超时,自动取消订单2、订单签收,签收后7天未进行评价。订单超时未评价,系统默认好评3、下单成功,商家5分钟未接单,订单取消4、配送超时,推送短信提醒……对于延时比较长的场景、实时性不高的场景,我们可以采用任务调度的方式定时轮询处理。如:xxl-job今天我们采

一、Redis实现分布式锁原理为什么需要分布式锁在聊分布式锁之前,有必要先解释一下,为什么需要分布式锁。与分布式锁相对就的是单机锁,我们在写多线程程序时,避免同时操作一个共享变量产生数据问题,通常会使用一把锁来互斥以保证共享变量的正确性,其使用范围是在同一个进程中。如果换做是多个进程,需要同时操作一个共享资源,如何互斥呢?现在的业务应用通常是微服务架构,这也意味着一个应用会部署多个进程,多个进程如果需要修改MySQL中的同一行记录,为了避免操作乱序导致脏数据,此时就需要引入分布式锁了。想要实现分

springboot读取文件,打成jar包后访问不到最新开发出现一种情况,springboot打成jar包后读取不到文件,原因是打包之后,文件的虚拟路径是无效的,只能通过流去读取。文件在resources下publicvoidtest(){Listnames=newArrayList();InputStreamReaderread=null;try{ClassPathResourceresource=newClassPathResource("name.txt");Input

在Springboot+Mybatis-plus不使用SQL语句进行多表添加操作我所遇到的问题准备工作在测试环境下模拟思维分解一下:创建出一个带有参数的BrandDTO对象模拟对后台传递参数我所遇到的问题我们都知道,在我们使用Mybatis-plus中进行多表操作是极其困难的,如果你不使用Mybatis-plus-join这一类的工具,你只能去配置对应的Mapper.xml文件,配置又臭又长的ResultMap,然后再去写对应的sql语句,这种方法虽然看上去很麻烦,但具有很高的灵活性,可以让我们

SpringBoot和SpringMVC都是Java开发中常用的框架,但它们之间有一些明显的差异。本文将探究这两个框架的特点和用途,并对它们的差异进行比较。首先,我们来了解一下SpringBoot。SpringBoot是由Pivotal团队开发的,它旨在简化基于Spring框架的应用程序的创建和部署。它提供了一种快速、轻量级的方式来构建独立的、可执行

1、自定义RedisTemplate1.1、RedisAPI默认序列化机制基于API的Redis缓存实现是使用RedisTemplate模板进行数据缓存操作的,这里打开RedisTemplate类,查看该类的源码信息publicclassRedisTemplateextendsRedisAccessorimplementsRedisOperations,BeanClassLoaderAware{//声明了key、value的各种序列化方式,初始值为空@NullableprivateRedisSe

在项目中,很多时候需要用到一些配置信息,这些信息在测试环境和生产环境下可能会有不同的配置,后面根据实际业务情况有可能还需要再做修改。我们不能将这些配置在代码中写死,最好是写到配置文件中,比如可以把这些信息写到application.yml文件中。那么,怎么在代码里获取或者使用这个地址呢?有2个方法。方法一:我们可以通过@Value注解的${key}即可获取配置文件(application.yml)中和key对应的value值,这个方法适用于微服务比较少的情形方法二:在实际项目中,遇到业务繁琐,逻
