假設我們有這麼一個業務場景,在網站下單付款以後,需要通知庫存服務進行出貨處理。
上面業務實作不難,我們只要讓庫存服務提供給相關的給口,下單付款之後只要呼叫庫存服務即可。
後面如果又有新的業務,比如說積分服務,他需要取得下單支付的結果,然後增加用戶的積分。
這個實作也不難,讓積分服務同樣提供一個接口,下單支付之後只要調用庫存服務即可。
如果只有兩個業務需要取得下單付款結果,那麼對程式進行改造也相對容易。然而,隨著業務不斷發展,越來越多的新業務需要進行下訂單並進行支付。
這時我們會發現上面這樣的系統架構存在許多問題:
第一,下單支付業務與其他業務重度耦合,每當有個新業務需要支付結果,就需要改動下單支付的業務。
第二,如果呼叫業務過多,會導致下單支付介面回應時間變長。同步造成下單支付介面回應變長的原因之一是下游介面回應變慢。
第三,如果任一下游介面失敗,可能導致資料不一致的情況。比如說下圖,先呼叫 A,成功之後再呼叫 B,最後再呼叫 C。
如果在呼叫B 介面的發生異常,此時可能就導致下單支付介面回傳失敗,但此時A 介面其實已經呼叫成功,這就代表它內部已經處理下單支付成功的結果。
這樣就會導致 A,B,C 三個下游接口,A 獲取成功獲取支付結果,但是 B,C 沒有拿到,導致三者係統數據不一致的情況。
其實我們仔細想一下,對於下單支付業務來講,它其實不需要關心下游調用結果,只要有某種機制通知能通知到他們就可以了。
講到這裡,這需要引進今天需要介紹發布訂閱機制。
Redis 提供了基於「發布/訂閱」模式的訊息機制,在這種模式下,訊息發布者與訂閱者不需要直接溝通。
如上圖所示,訊息發布者只需要想指定的頻道發布訊息,訂閱該頻道的每個用戶端都可以接受到這個訊息。
使用Redis 發布訂閱此機制,對於上面業務,下單支付業務只需要向支付結果這個頻道發送訊息,其他下游業務訂閱支付結果這個頻道,就能收相應訊息,然後做出業務處理即可。
這樣就可以解耦系統上下游之間呼叫關係。
接下來我們來看下,我們來看看如何使用 Redis 發布訂閱功能。
Redis 中提供了一組命令,可以用於發布訊息,訂閱頻道,取消訂閱以及按照模式訂閱。
首先我們來看下如何發布一則訊息,其實很簡單只要使用publish 指令:
publish channel message
上圖中,我們使用publish 指令向pay_result 這個頻道發送了一則訊息。我們可以看到 redis 向我們返回 0 ,這其實代表當前訂閱者個數,由於此時沒有訂閱,所以返回結果為 0 。
接下來我們使用subscribe 訂閱一個或多個頻道
subscribe channel [channel ...]
如上圖所示,我們訂閱pay_result 這個頻道,當有其他客戶端往這個頻道發送訊息,
#目前訂閱者就會收到訊息。
我們子在使用訂閱指令,需要主要幾點:
第一,客戶端執行訂閱指令之後,就會進入訂閱狀態,之後就只能接收subscribe、psubscribe、unsubscribe、punsubscribe 這四個指令。
第二,新订阅的客户端,是无法收到这个频道之前的消息,这是因为 Redis 并不会对发布的消息持久化的。
相比于很多专业 MQ,比如 kafka、rocketmq 来说, redis 发布订阅功能就显得有点简陋了。如果当前的使用场景可以容忍这些缺点,那么简单优秀的 redis 发布订阅功能值得选择。
除了上面的功能以外的,Redis 还支持模式匹配的订阅方式。简单来说,客户端可以订阅一个带 *
号的模式,如果某些频道的名字与这个模式匹配,那么当其他客户端发送给消息给这些频道时,订阅这个模式的客户端也将会到收到消息。
使用 Redis 订阅模式,我们需要使用一个新的指令 psubscribe。
我们执行下面这个指令:
psubscribe pay.*
那么一旦有其他客户端往 pay 开头的频道,比如 pay_result
、pay_xxx
,我们都可以收到消息。
如果需要取消订阅模式,我们需要使用相应punsubscribe
指令,比如取消上面订阅的模式:
punsubscribe pay.*
聊完 Redis 发布订阅指令,我们来看下 Java Redis 客户端如何使用发布订阅。
下面的例子主要基于 Jedis,maven 版本为:
<dependency> <groupid>redis.clients</groupid> <artifactid>jedis</artifactid> <version>3.1.0</version> </dependency>登入後複製其他 Redis 客户端大同小异。
jedis 发布代码比较简单,只需要调用 Jedis
类的 publish
方法。
// 生产环境千万不要这么使用哦,推荐使用 JedisPool 线程池的方式 Jedis jedis = new Jedis("localhost", 6379); jedis.auth("xxxxx"); jedis.publish("pay_result", "hello world");
订阅的代码就相对复杂了,我们需要继承 JedisPubSub
实现里面的相关方法,一旦有其他客户端往订阅的频道上发送消息,将会调用 JedisPubSub
相应的方法。
private static class MyListener extends JedisPubSub { @Override public void onMessage(String channel, String message) { System.out.println("收到订阅频道:" + channel + " 消息:" + message); } @Override public void onPMessage(String pattern, String channel, String message) { System.out.println("收到具体订阅频道:" + channel + "订阅模式:" + pattern + " 消息:" + message); } }
其次我们需要调用 Jedis
类的 subscribe
方法:
Jedis jedis = new Jedis("localhost", 6379); jedis.auth("xxx"); jedis.subscribe(new MyListener(), "pay_result");
当有其他客户端往 pay_result
频道发送消息时,订阅将会收到消息。
不过需要注意的是,jedis#subscribe
是一个阻塞方法,调用之后将会阻塞主线程的,所以如果需要在正式项目使用需要使用异步线程运行,这里就不演示具体的代码了。
原生 jedis 发布订阅操作,相对来说还是有点复杂。现在我们很多应用已经基于 SpringBoot 开发,使用 spring-boot-starter-data-redis
,可以简化发布订阅开发。
首先我们需要引入相应的 startter 依赖:
<dependency> <groupid>org.springframework.boot</groupid> <artifactid>spring-boot-starter-data-redis</artifactid> <exclusions> <exclusion> <artifactid>lettuce-core</artifactid> <groupid>io.lettuce</groupid> </exclusion> </exclusions> </dependency> <dependency> <groupid>redis.clients</groupid> <artifactid>jedis</artifactid> </dependency>
这里我们使用 Jedis 当做底层连接客户端,所以需要排除 lettuce,然后引入 Jedis 依赖。
然后我们需要创建一个消息接收类,里面需要有方法消费消息:
@Slf4j public class Receiver { private AtomicInteger counter = new AtomicInteger(); public void receiveMessage(String message) { log.info("Received "); counter.incrementAndGet(); } public int getCount() { return counter.get(); } }
接着我们只需要注入 Spring- Redis 相关 Bean,比如:
StringRedisTemplate
,用来操作 Redis 命令
MessageListenerAdapter
,消息监听器,可以在这个类注入我们上面创建消息接受类 Receiver
RedisConnectionFactory
, 创建 Redis 底层连接
@Configuration public class MessageConfiguration { @Bean RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) { RedisMessageListenerContainer container = new RedisMessageListenerContainer(); container.setConnectionFactory(connectionFactory); // 订阅指定频道使用 ChannelTopic // 订阅模式使用 PatternTopic container.addMessageListener(listenerAdapter, new ChannelTopic("pay_result")); return container; } @Bean MessageListenerAdapter listenerAdapter(Receiver receiver) { // 注入 Receiver,指定类中的接受方法 return new MessageListenerAdapter(receiver, "receiveMessage"); } @Bean Receiver receiver() { return new Receiver(); } @Bean StringRedisTemplate template(RedisConnectionFactory connectionFactory) { return new StringRedisTemplate(connectionFactory); } }
最后我们使用 StringRedisTemplate#convertAndSend
发送消息,同时 Receiver
将会收到一条消息。
@SpringBootApplication public class MessagingRedisApplication { public static void main(String[] args) throws InterruptedException { ApplicationContext ctx = SpringApplication.run(MessagingRedisApplication.class, args); StringRedisTemplate template = ctx.getBean(StringRedisTemplate.class); Receiver receiver = ctx.getBean(Receiver.class); while (receiver.getCount() == 0) { template.convertAndSend("pay_result", "Hello from Redis!"); Thread.sleep(500L); } System.exit(0); } }
Redis Sentinel 是 Redis 一套高可用方案,可以在主节点故障的时候,自动将从节点提升为主节点,从而转移故障。
今天这里我们不详细解释 Redis Sentinel 详细原理,主要来看下 Redis Sentinel 如何使用发布订阅机制。
Redis Sentinel 节点主要使用发布订阅机制,实现新节点的发现,以及交换主节点的之间的状态。
如下所示,每一个 Sentinel 节点将会定时向 _sentinel_:hello
频道发送消息,并且每个 Sentinel 都会订阅这个节点。
这样一旦有节点往这个频道发送消息,其他节点就可以立刻收到消息。
这样一旦有的新节点加入,它往这个频道发送消息,其他节点收到之后,判断本地列表并没有这个节点,于是就可以当做新的节点加入本地节点列表。
除此之外,每次往这个频道发送消息内容可以包含节点的状态信息,这样可以作为后面 Sentinel 领导者选举的依据。
以上都是对于 Redis 服务端来讲,对于客户端来讲,我们也可以用到发布订阅机制。
当 Redis Sentinel 进行主节点故障转移,这个过程各个阶段会通过发布订阅对外提供。
对于我们客户端来讲,比较关心切换之后的主节点,这样我们及时切换主节点的连接(旧节点此时已故障,不能再接受操作指令),
客户端可以订阅 +switch-master
频道,一旦 Redis Sentinel 结束了对主节点的故障转移就会发布主节点的的消息。
redission 开源框架提供一些便捷操作 Redis 的方法,其中比较出名的 redission 基于 Redis 的实现分布式锁。
今天我们来看下 Redis 的实现分布式锁中如何使用 Redis 发布订阅机制,提高加锁的性能。
PS:redission 分布式锁实现原理,可以参考之前写过的文章:
可重入分布式锁的实现方式
Redis 分布式锁,看似简单,其实真不简单
首先我们来看下 redission 加锁的方法:
Redisson redisson = .... RLock redissonLock = redisson.getLock("xxxx"); redissonLock.lock();
RLock
继承自 Java 标准的 Lock
接口,调用 lock
方法,如果当前锁已被其他客户端获取,那么当前加锁的线程将会被阻塞,直到其他客户端释放这把锁。
这里其实有个问题,当前阻塞的线程如何感知分布式锁已被释放呢?
这里其实有两种实现方法:
第一钟,定时查询分布时锁的状态,一旦查到锁已被释放(Redis 中不存在这个键值),那么就去加锁。
实现伪码如下:
while (true) { boolean result=lock(); if (!result) { Thread.sleep(N); } }
这种方式实现起来起来简单,不过缺点也比较多。
如果定时任务时间过短,将会导致查询次数过多,其实这些都是无效查询。
如果定时任务休眠时间过长,那又会导致加锁时间过长,导致加锁性能不好。
那么第二种实现方案,就是采用服务通知的机制,当分布式锁被释放之后,客户端可以收到锁释放的消息,然后第一时间再去加锁。
这个服务通知的机制我们可以使用 Redis 发布订阅模式。
当线程加锁失败之后,线程将会订阅 redisson_lock__channel_xxx
(xx 代表锁的名称) 频道,使用异步线程监听消息,然后利用 Java 中 Semaphore
使当前线程进入阻塞。
一旦其他客户端进行解锁,redission 就会往这个redisson_lock__channel_xxx
发送解锁消息。
等异步线程收到消息,将会调用 Semaphore
释放信号量,从而让当前被阻塞的线程唤醒去加锁。
ps:这里只是简单描述了 redission 加锁部分原理,出于篇幅,这里就不再消息解析源码。
感兴趣的小伙伴可以自己看下 redission 加锁的源码。
通过发布订阅机制,被阻塞的线程可以及时被唤醒,减少无效的空转的查询,有效的提高的加锁的效率。
ps: 这种方式,性能确实提高,但是实现起来的复杂度也很高,这部分源码有点东西,快看晕了。
以上是Redis發布訂閱怎麼實現的詳細內容。更多資訊請關注PHP中文網其他相關文章!