Redis如何實現延遲隊列?方法介紹
延遲佇列,顧名思義它是一種具有延遲功能的訊息佇列。那麼,是在什麼場景下我才需要這樣的隊列呢?
1. 背景
我們先看看以下業務場景:
- 當訂單一直處於未支付狀態時,如何及時的關閉訂單
- 如何定期檢查處於退款狀態的訂單是否已經退款成功
- 在訂單長時間沒有收到下游系統的狀態通知的時候,如何實現階梯式的同步訂單狀態的策略
- 在系統通知上游系統支付成功終態時,上游系統返回通知失敗,如何進行非同步通知實行分頻率發送:15s 3m 10m 30m 30m 1h 2h 6h 15h
1.1 解決方案
#最簡單的方式,定時掃表 。例如訂單支付失效要求比較高的,每2S掃表一次檢查過期的訂單進行主動關單操作。 優點是簡單,缺點是每分鐘全域掃表,浪費資源,如果遇到表資料訂單量即將過期的訂單量很大,會造成關單延遲。
使用RabbitMq或其他MQ改造實作延遲佇列,優點是,開源,現成的穩定的實作方案,缺點是:MQ是一個訊息中介軟體,如果團隊技術堆疊本來就有MQ,那還好,如果不是,那為了延遲隊列而去部署一套MQ成本有點大
#使用Redis的zset、list的特性,我們可以利用redis來實現一個延遲佇列RedisDelayQueue
#2. 設計目標
- # 即時性:允許存在一定時間的秒級誤差
- 高可用性:支援單機、支援叢集
- 支援訊息刪除:業務會隨時刪除指定訊息
- 訊息可靠性:保證至少被消費一次
- 訊息持久化:基於Redis本身的持久化特性,如果Redis資料遺失,意味著延遲訊息的遺失,不過可以做主備和叢集保證。這個可以考慮後續最佳化將訊息持久化到MangoDB中
3.設計方案
設計主要包含以下幾點:- 將整個Redis當做訊息池,以KV形式儲存訊息
- 使用ZSET做優先權佇列,依照Score維持優先權
- 使用LIST結構,以先進先出的方式消費
- ZSET和LIST儲存訊息地址(對應訊息池的每個KEY)
- 自訂路由對象,儲存ZSET和LIST名稱,以點對點的方式將訊息從ZSET路由到正確的LIST
- 使用定時器維護路由
- 根據TTL規則實作訊息延遲
3.1 設計圖
還是基於有讚的延遲佇列設計,進行最佳化改造及程式碼實作。有讚設計3.2 資料結構
- ZING:DELAY_QUEUE:JOB_POOL
是一個Hash_Table結構,裡面儲存了所有延遲佇列的資訊。 KV結構:K=prefix projectName field = topic jobId V=CONENT;V由客戶端傳入的數據,消費的時候回傳
- ZING:DELAY_QUEUE:BUCKET
延遲隊列的有序集合ZSET,存放K=ID和需要的執行時間戳,根據時間戳排序
- ZING:DELAY_QUEUE:QUEUE
LIST結構,每個Topic一個LIST,list存放的都是目前需要被消費的JOB
圖片僅供參考,基本上可以描述整個流程的執行過程,圖片源自於文末的參考部落格中
3.3 任務的生命週期
- 新增一個JOB,會在
- ZING:DELAY_QUEUE:JOB_POOL
中插入一條數據,記錄了業務方消費方。
ZING:DELAY_QUEUE:BUCKET也會插入一筆記錄,記錄執行的時間戳
搬運執行緒會去 - ZING:DELAY_QUEUE:BUCKET
中尋找哪些執行時間戳的RunTimeMillis比現在的時間小,將這些記錄全部刪除;同時會解析出每個任務的Topic是什麼,然後將這些任務PUSH到TOPIC對應的清單
ZING:DELAY_QUEUE:QUEUE #每個TOPIC的LIST都會有一個監聽線程去批量獲取LIST中的待消費數據,獲取到的數據全部扔給這個TOPIC的消費線程池 - 消費線程池執行會去
- ZING:DELAY_QUEUE:JOB_POOL
尋找資料結構,傳回回呼結構,執行回呼方法。
3.4 設計要點#
3.4.1 基本概念
- JOB:需要非同步處理的任務,是延遲佇列裡的基本單元
- Topic:一組相同類型Job的集合(佇列)。供消費者訂閱
3.4.2 訊息結構
每個JOB必須包含以下幾個屬性
- jobId:Job的唯一識別。用來檢索和刪除指定的Job資訊
- topic:Job類型。可以理解成具體的業務名稱
- delay:Job需要延遲的時間。單位:秒。 (服務端會將其轉換為絕對時間)
- body:Job的內容,供消費者做具體的業務處理,以json格式儲存
- retry:失敗重試次數
- url:通知URL
3.5 設計細節
3.5.1 如何快速消費ZING:DELAY_QUEUE:QUEUE
#最簡單的實作方式就是使用計時器進行秒數掃描,為了確保訊息執行的時效性,可以設定每1S請求Redis一次,判斷隊列中是否有待消費的JOB。但是這樣會存在一個問題,如果queue中一直沒有可消費的JOB,那頻繁的掃描就失去了意義,也浪費了資源,幸好LIST中有一個BLPOP阻塞原語
,如果list中有資料就會立刻返回,如果沒有資料就會一直阻塞在那裡,直到有資料回傳,可以設定阻塞的超時時間,超時會傳回NULL;具體的實作方式及策略會在程式碼中進行具體的實作介紹
3.5.2 避免定時導致的訊息重複搬運及消費
- 使用Redis的分散式鎖定來控制訊息的搬運,從而避免訊息被重複搬運導致的問題
- 使用分散式鎖定來保證計時器的執行頻率
4.核心程式碼實作
4.1 技術說明
#技術堆疊:SpringBoot,Redisson,Redis,分散式鎖,計時器
#注意:本專案沒有實現設計方案中的多Queue消費,只開啟了一個QUEUE,這個待以後優化
4.2 核心實體
#4.2.1 Job新增物件
/** * 消息结构 * * @author 睁眼看世界 * @date 2020年1月15日 */ @Data public class Job implements Serializable { private static final long serialVersionUID = 1L; /** * Job的唯一标识。用来检索和删除指定的Job信息 */ @NotBlank private String jobId; /** * Job类型。可以理解成具体的业务名称 */ @NotBlank private String topic; /** * Job需要延迟的时间。单位:秒。(服务端会将其转换为绝对时间) */ private Long delay; /** * Job的内容,供消费者做具体的业务处理,以json格式存储 */ @NotBlank private String body; /** * 失败重试次数 */ private int retry = 0; /** * 通知URL */ @NotBlank private String url; }
4.2.2 Job刪除物件
#/** * 消息结构 * * @author 睁眼看世界 * @date 2020年1月15日 */ @Data public class JobDie implements Serializable { private static final long serialVersionUID = 1L; /** * Job的唯一标识。用来检索和删除指定的Job信息 */ @NotBlank private String jobId; /** * Job类型。可以理解成具体的业务名称 */ @NotBlank private String topic; }
4.3 搬運執行緒
#/** * 搬运线程 * * @author 睁眼看世界 * @date 2020年1月17日 */ @Slf4j @Component public class CarryJobScheduled { @Autowired private RedissonClient redissonClient; /** * 启动定时开启搬运JOB信息 */ @Scheduled(cron = "*/1 * * * * *") public void carryJobToQueue() { System.out.println("carryJobToQueue --->"); RLock lock = redissonClient.getLock(RedisQueueKey.CARRY_THREAD_LOCK); try { boolean lockFlag = lock.tryLock(LOCK_WAIT_TIME, LOCK_RELEASE_TIME, TimeUnit.SECONDS); if (!lockFlag) { throw new BusinessException(ErrorMessageEnum.ACQUIRE_LOCK_FAIL); } RScoredSortedSet<object> bucketSet = redissonClient.getScoredSortedSet(RD_ZSET_BUCKET_PRE); long now = System.currentTimeMillis(); Collection<object> jobCollection = bucketSet.valueRange(0, false, now, true); List<string> jobList = jobCollection.stream().map(String::valueOf).collect(Collectors.toList()); RList<string> readyQueue = redissonClient.getList(RD_LIST_TOPIC_PRE); readyQueue.addAll(jobList); bucketSet.removeAllAsync(jobList); } catch (InterruptedException e) { log.error("carryJobToQueue error", e); } finally { if (lock != null) { lock.unlock(); } } } }</string></string></object></object>
4.4 消費執行緒
#@Slf4j @Component public class ReadyQueueContext { @Autowired private RedissonClient redissonClient; @Autowired private ConsumerService consumerService; /** * TOPIC消费线程 */ @PostConstruct public void startTopicConsumer() { TaskManager.doTask(this::runTopicThreads, "开启TOPIC消费线程"); } /** * 开启TOPIC消费线程 * 将所有可能出现的异常全部catch住,确保While(true)能够不中断 */ @SuppressWarnings("InfiniteLoopStatement") private void runTopicThreads() { while (true) { RLock lock = null; try { lock = redissonClient.getLock(CONSUMER_TOPIC_LOCK); } catch (Exception e) { log.error("runTopicThreads getLock error", e); } try { if (lock == null) { continue; } // 分布式锁时间比Blpop阻塞时间多1S,避免出现释放锁的时候,锁已经超时释放,unlock报错 boolean lockFlag = lock.tryLock(LOCK_WAIT_TIME, LOCK_RELEASE_TIME, TimeUnit.SECONDS); if (!lockFlag) { continue; } // 1. 获取ReadyQueue中待消费的数据 RBlockingQueue<string> queue = redissonClient.getBlockingQueue(RD_LIST_TOPIC_PRE); String topicId = queue.poll(60, TimeUnit.SECONDS); if (StringUtils.isEmpty(topicId)) { continue; } // 2. 获取job元信息内容 RMap<string> jobPoolMap = redissonClient.getMap(JOB_POOL_KEY); Job job = jobPoolMap.get(topicId); // 3. 消费 FutureTask<boolean> taskResult = TaskManager.doFutureTask(() -> consumerService.consumerMessage(job.getUrl(), job.getBody()), job.getTopic() + "-->消费JobId-->" + job.getJobId()); if (taskResult.get()) { // 3.1 消费成功,删除JobPool和DelayBucket的job信息 jobPoolMap.remove(topicId); } else { int retrySum = job.getRetry() + 1; // 3.2 消费失败,则根据策略重新加入Bucket // 如果重试次数大于5,则将jobPool中的数据删除,持久化到DB if (retrySum > RetryStrategyEnum.RETRY_FIVE.getRetry()) { jobPoolMap.remove(topicId); continue; } job.setRetry(retrySum); long nextTime = job.getDelay() + RetryStrategyEnum.getDelayTime(job.getRetry()) * 1000; log.info("next retryTime is [{}]", DateUtil.long2Str(nextTime)); RScoredSortedSet<object> delayBucket = redissonClient.getScoredSortedSet(RedisQueueKey.RD_ZSET_BUCKET_PRE); delayBucket.add(nextTime, topicId); // 3.3 更新元信息失败次数 jobPoolMap.put(topicId, job); } } catch (Exception e) { log.error("runTopicThreads error", e); } finally { if (lock != null) { try { lock.unlock(); } catch (Exception e) { log.error("runTopicThreads unlock error", e); } } } } } }</object></boolean></string></string>
4.5 新增及刪除JOB
/** * 提供给外部服务的操作接口 * * @author why * @date 2020年1月15日 */ @Slf4j @Service public class RedisDelayQueueServiceImpl implements RedisDelayQueueService { @Autowired private RedissonClient redissonClient; /** * 添加job元信息 * * @param job 元信息 */ @Override public void addJob(Job job) { RLock lock = redissonClient.getLock(ADD_JOB_LOCK + job.getJobId()); try { boolean lockFlag = lock.tryLock(LOCK_WAIT_TIME, LOCK_RELEASE_TIME, TimeUnit.SECONDS); if (!lockFlag) { throw new BusinessException(ErrorMessageEnum.ACQUIRE_LOCK_FAIL); } String topicId = RedisQueueKey.getTopicId(job.getTopic(), job.getJobId()); // 1. 将job添加到 JobPool中 RMap<string> jobPool = redissonClient.getMap(RedisQueueKey.JOB_POOL_KEY); if (jobPool.get(topicId) != null) { throw new BusinessException(ErrorMessageEnum.JOB_ALREADY_EXIST); } jobPool.put(topicId, job); // 2. 将job添加到 DelayBucket中 RScoredSortedSet<object> delayBucket = redissonClient.getScoredSortedSet(RedisQueueKey.RD_ZSET_BUCKET_PRE); delayBucket.add(job.getDelay(), topicId); } catch (InterruptedException e) { log.error("addJob error", e); } finally { if (lock != null) { lock.unlock(); } } } /** * 删除job信息 * * @param job 元信息 */ @Override public void deleteJob(JobDie jobDie) { RLock lock = redissonClient.getLock(DELETE_JOB_LOCK + jobDie.getJobId()); try { boolean lockFlag = lock.tryLock(LOCK_WAIT_TIME, LOCK_RELEASE_TIME, TimeUnit.SECONDS); if (!lockFlag) { throw new BusinessException(ErrorMessageEnum.ACQUIRE_LOCK_FAIL); } String topicId = RedisQueueKey.getTopicId(jobDie.getTopic(), jobDie.getJobId()); RMap<string> jobPool = redissonClient.getMap(RedisQueueKey.JOB_POOL_KEY); jobPool.remove(topicId); RScoredSortedSet<object> delayBucket = redissonClient.getScoredSortedSet(RedisQueueKey.RD_ZSET_BUCKET_PRE); delayBucket.remove(topicId); } catch (InterruptedException e) { log.error("addJob error", e); } finally { if (lock != null) { lock.unlock(); } } } }</object></string></object></string>
#5. 待最佳化的內容
- 目前只有一個Queue隊列存放訊息,當需要消費的消息大量堆積後,會影響訊息通知的時效。改進的辦法是,開啟多個Queue,進行訊息路由,再開啟多個消費執行緒進行消費,提供吞吐量
- 訊息沒有持久化,存在風險,後續會將訊息持久化到MangoDB中
6. 原始碼
#更多詳細原始碼請在下面位址中取得
-
RedisDelayQueue實作
zing-delay-queue(https://gitee.com/whyCodeData/zing-project/tree/master/zing-delay-queue) -
RedissonStarter
redisson-spring-boot-starter(https://gitee.com/whyCodeData/zing-project/tree/master/zing-starter/redisson-spring-boot-starter) -
專案應用程式
zing-pay(https://gitee.com/whyCodeData/zing-pay)
#7.參考
以上是Redis如何實現延遲隊列?方法介紹的詳細內容。更多資訊請關注PHP中文網其他相關文章!

熱AI工具

Undresser.AI Undress
人工智慧驅動的應用程序,用於創建逼真的裸體照片

AI Clothes Remover
用於從照片中去除衣服的線上人工智慧工具。

Undress AI Tool
免費脫衣圖片

Clothoff.io
AI脫衣器

Video Face Swap
使用我們完全免費的人工智慧換臉工具,輕鬆在任何影片中換臉!

熱門文章

熱工具

記事本++7.3.1
好用且免費的程式碼編輯器

SublimeText3漢化版
中文版,非常好用

禪工作室 13.0.1
強大的PHP整合開發環境

Dreamweaver CS6
視覺化網頁開發工具

SublimeText3 Mac版
神級程式碼編輯軟體(SublimeText3)

Redis集群模式通過分片將Redis實例部署到多個服務器,提高可擴展性和可用性。搭建步驟如下:創建奇數個Redis實例,端口不同;創建3個sentinel實例,監控Redis實例並進行故障轉移;配置sentinel配置文件,添加監控Redis實例信息和故障轉移設置;配置Redis實例配置文件,啟用集群模式並指定集群信息文件路徑;創建nodes.conf文件,包含各Redis實例的信息;啟動集群,執行create命令創建集群並指定副本數量;登錄集群執行CLUSTER INFO命令驗證集群狀態;使

如何清空 Redis 數據:使用 FLUSHALL 命令清除所有鍵值。使用 FLUSHDB 命令清除當前選定數據庫的鍵值。使用 SELECT 切換數據庫,再使用 FLUSHDB 清除多個數據庫。使用 DEL 命令刪除特定鍵。使用 redis-cli 工具清空數據。

要從 Redis 讀取隊列,需要獲取隊列名稱、使用 LPOP 命令讀取元素,並處理空隊列。具體步驟如下:獲取隊列名稱:以 "queue:" 前綴命名,如 "queue:my-queue"。使用 LPOP 命令:從隊列頭部彈出元素並返回其值,如 LPOP queue:my-queue。處理空隊列:如果隊列為空,LPOP 返回 nil,可先檢查隊列是否存在再讀取元素。

使用 Redis 指令需要以下步驟:打開 Redis 客戶端。輸入指令(動詞 鍵 值)。提供所需參數(因指令而異)。按 Enter 執行指令。 Redis 返迴響應,指示操作結果(通常為 OK 或 -ERR)。

使用Redis進行鎖操作需要通過SETNX命令獲取鎖,然後使用EXPIRE命令設置過期時間。具體步驟為:(1) 使用SETNX命令嘗試設置一個鍵值對;(2) 使用EXPIRE命令為鎖設置過期時間;(3) 當不再需要鎖時,使用DEL命令刪除該鎖。

在CentOS系統上,您可以通過修改Redis配置文件或使用Redis命令來限制Lua腳本的執行時間,從而防止惡意腳本佔用過多資源。方法一:修改Redis配置文件定位Redis配置文件:Redis配置文件通常位於/etc/redis/redis.conf。編輯配置文件:使用文本編輯器(例如vi或nano)打開配置文件:sudovi/etc/redis/redis.conf設置Lua腳本執行時間限制:在配置文件中添加或修改以下行,設置Lua腳本的最大執行時間(單位:毫秒)

使用 Redis 命令行工具 (redis-cli) 可通過以下步驟管理和操作 Redis:連接到服務器,指定地址和端口。使用命令名稱和參數向服務器發送命令。使用 HELP 命令查看特定命令的幫助信息。使用 QUIT 命令退出命令行工具。

在Debian系統中,readdir系統調用用於讀取目錄內容。如果其性能表現不佳,可嘗試以下優化策略:精簡目錄文件數量:盡可能將大型目錄拆分成多個小型目錄,降低每次readdir調用處理的項目數量。啟用目錄內容緩存:構建緩存機制,定期或在目錄內容變更時更新緩存,減少對readdir的頻繁調用。內存緩存(如Memcached或Redis)或本地緩存(如文件或數據庫)均可考慮。採用高效數據結構:如果自行實現目錄遍歷,選擇更高效的數據結構(例如哈希表而非線性搜索)存儲和訪問目錄信
