ホームページ > データベース > Redis > Redis で遅延キューを実装する方法

Redis で遅延キューを実装する方法

WBOY
リリース: 2023-05-26 16:44:23
転載
2400 人が閲覧しました

    Redis は遅延キューを実装します

    Redis 遅延キュー

    Redis は順序付きセット (ZSet) を介して遅延メッセージ キューを実装します。はい、ZSet には遅延実行時間を保存するために使用できるスコア プロパティ。

    ただし、タスクをチェックするために無限ループが必要となり、システム リソースが消費されます

    class RedisDelayQueue(object):
        """Simple Queue with Redis Backend
        dq = RedisDelayQueue('delay:commtrans')
        dq.put( 5 ,{'info':'测试 5555','time': timestamp_to_datetime_str(t + 5)})
        print(dq.get())
        """
        def __init__(self, name, namespace='queue'):
            """The default connection parameters are: host='localhost', port=6379, db=0"""
            self.__db = get_redis_engine(database_name='spdb')
            self.key = '%s:%s' % (namespace, name)
        def qsize(self):
            """Return the approximate size of the queue."""
            return self.__db.zcard(self.key)
        def empty(self):
            """Return True if the queue is empty, False otherwise."""
            return self.qsize() == 0
        def rem(self, value):
            return self.__db.zrem(self.key, value)
        def get(self):
            # 获取任务,以0和当前时间为区间,返回一条在当前区间的记录
            items = self.__db.zrangebyscore(self.key, 0, int(time.time()), 0, 1)
            if items:
                item = items[0]
                if self.rem(item):  # 解决并发问题  如能删就让谁取走
                    return json.loads(item)
            return None
        def put(self, interval, item):
            """:param interval 延时秒数"""
            # 以时间作为score,对任务队列按时间戳从小到大排序
            """Put item into the queue."""
            d = json.dumps(item)
            return self.__db.zadd(self.key, {d: int(time.time()) + int(interval)})
    ログイン後にコピー

    遅延キューを実装するための Redis の最適化計画

    遅延キューのアプリケーション

    最近の開発部門の新しいプロジェクトでは、重要な機能の 1 つはインテリジェント プッシュです。これは、次のビジネス シナリオなど、ユーザーの行動に基づいて特定の時点で対応するリマインダー メッセージをユーザーにプッシュすることです。

    • ## ユーザーがリチャージ項目をクリックして 30 分以内にリチャージしないと、未完了のリチャージのリマインダーがユーザーにプッシュされます。

    • ユーザーが最後に読書を行ってから 2 時間後に、続きを読むリマインダーをユーザーにプッシュします。

    • ユーザーがアプリを新規登録または終了してから N 分後に、適切な推奨メッセージをユーザーにプッシュします。

    上記のシナリオに共通する特徴は、イベントがトリガーされた後、特定のイベントを実行する前に一定期間遅延することです。イベントがトリガーした場合、上記のロジックは、指定された時点 (イベント トリガー時点の遅延時間長) で特定のタスクを実行することと同等であることがわかります。

    このような要件を達成するには、通常、遅延キューが使用されます。作成される遅延メッセージには、タスクの遅延時間やタスクの実行時点などの情報が含まれる必要があります。タスクが時間条件を満たし、実行する必要がある場合、メッセージは消費になります。つまり、キュー内のメッセージが消費される時点を指定できます。

    遅延キューの実装

    スタンドアロン環境では、

    JDK には、DelayQueue# などの遅延キュー機能を実装できる多くのコンポーネントがすでに付属しています。 # #、TimerScheduledExecutorService およびその他のコンポーネントを使用すると、遅延タスクを簡単に作成できます。ただし、上記のコンポーネントを使用するには、通常、タスクをメモリに保存する必要があります。サービスの再起動時にタスクが失われます。さらに、タスクのサイズはメモリによって制限されるため、長期的なメモリ使用量が発生し、柔軟性に欠けます。通常は、単一プロセスのクライアント プログラムまたはタスクの量が多くないプロジェクトに適しています。要件。 分散環境では、

    JDK

    独自のコンポーネントを使用するだけでは遅延キューを確実かつ効率的に実装できないため、通常はサードパーティのミドルウェアまたはフレームワークを導入する必要があります。 たとえば、一般的な古典的なタスク スケジューリング フレームワーク

    Quartz

    またはこのフレームワーク xxl-job に基づく他のフレームワーク。これらのフレームワークの主な機能は、スケジュールされたタスクを実装することです。または定期的なタスク RedisRabbitMQ が広く使用されていなかったときのタスク。たとえば、タイムアウトや未払いの注文キャンセルなどの一般的な機能はスケジュールされたタスクによって実装され、定期的なポーリングは注文が処理されたかどうかを判断し、実行をトリガーする時点に達します。 ただし、スケジュールされたタスクには一定の周期性が必要であるため、定期スキャンの間隔を制御するのは困難です。間隔が短すぎると、無意味なスキャンが大量に発生し、システムの負荷が増大します。長すぎると、大きい場合、1 回のスキャンで処理されるスタック レコードの数が多すぎる可能性があります。

    さらに、

    RabbitMQ

    TTL## を介してメッセージを実装するなど、MQ を使用して遅延キューを作成することも一般的な方法です。 # およびデッドレターキュー 配信された MQ メッセージが簡単に削除または変更できないこと、つまりタスクのキャンセルやタスク実行時点の変更が実現できないことを考慮した配信の遅延。メッセージは簡単に重複排除できないため、プロジェクトでの遅延キューの実装に MQ を使用することは選択しませんでした。 Redis

    zset のデータ構造は遅延キューの効果も実現でき、より柔軟であり、MQ が実現できるいくつかのことを実現できます。機能を実行できないため、プロジェクトは最終的に Redis を使用して遅延キューを実装し、それを最適化してカプセル化します。 実装原則は、zset

    score 属性を使用することです。redis は、要素を zset に作成します。 score に従ってコレクションを小さいものから大きいものに並べ替え、次のコマンドに示すように、zadd コマンドを使用して要素を zset に追加します。 value 値は遅延です。タスク メッセージの場合、メッセージ形式は業務に応じて定義できます。score 値は、タスクの実行時点 (13 桁のミリ秒のタイムスタンプなど) です。 <div class="code" style="position:relative; padding:0px; margin:0px;"><pre class="brush:java;">zadd delayqueue 1614608094000 taskinfo</pre><div class="contentsignin">ログイン後にコピー</div></div>タスクが追加された後、タスクを取得するロジックは、score 値が

    zset

    からの現在のタイムスタンプより小さい要素をフィルターするだけで済みます。結果は現在の時刻ノードです。実行する必要があるタスクは、次のコマンドに示すように、zrangebyscore コマンドを通じて取得されます。timestamp は現在のタイムスタンプ、limit を使用すると、一度に取得されるレコード数が大きくなりすぎないように、毎回のレコード数のプル数を制限できます。 <div class="code" style="position:relative; padding:0px; margin:0px;"><pre class="brush:java;">zrangebyscore delayqueue 0 timestamp limit 0 1000</pre><div class="contentsignin">ログイン後にコピー</div></div><p>在实际实现过程中,从<code>zset中获取到当前需要执行的任务后,需要先确保将任务对应的元素从zset中删除,删除成功后才允许执行任务逻辑,这样是为了在分布式环境下,当存在多个线程获取到同一任务后,利用redis删除操作的原子性,确保只有一个线程能够删除成功并执行任务,防止重复执行。

    实际任务的执行通常会再将其发送至MQ异步处理,将“获取任务”与“执行任务”两者分离解耦,更加灵活,“获取任务”只负责拿到当前时间需要执行的任务,并不真正运行任务业务逻辑,因此只需相对少量的执行线程即可,而实际的任务执行逻辑则由MQ消费者承担,方便调控负载能力。

    整体过程如下图所示。

    Redis で遅延キューを実装する方法

    采用zset做延时队列的另一个好处是可以实现任务的取消和任务执行时间点的更改,只需要将任务信息从zset中删除,便可取消任务,同时由于zset拥有集合去重的特性,只需再次写入同一个任务信息,但是value值设置为不同的执行时间点,便可更改任务执行时间,实现单个任务执行时间的动态调整。

    了解实现原理后,再进行具体编程实现。创建延时任务较为简便,准备好任务消息和执行时间点,写入zset即可。获取延时任务最简单的方案是通过定时任务,周期性地执行上述逻辑,如下代码所示。

    @XxlScheduled(cron = "0/5 * * * * ?", name = "scan business1 delayqueue")
    public void scanBusiness1() {
    	// 某业务逻辑的zset延迟队列对应的key
    	String zsetKey = "delayqueue:business1";
    	while (true) {
    		// 筛选score值小于当前时间戳的元素,一次最多拉取1000条
    		Set<String> tasks = stringRedisTemplate.opsForZSet().rangeByScore(zsetKey, 0, System.currentTimeMillis(), 0, 1000);
    		if (CollectionUtils.isEmpty(tasks)) {
    			// 当前时间下已没有需要执行的任务,结束本次扫描
    			return;
    		}
    		for (String task : tasks) {
    			// 先删除,再执行,确保多线程环境下执行的唯一性
    			Boolean delete = stringRedisTemplate.delete(task);
    			if (delete) {
    				// 删除成功后,将其再发送到指定MQ异步处理,将“获取任务”与“执行任务”分离解耦
    				rabbitTemplate.convertAndSend("exchange_business1", "routekey_business1", task);
    			}
    		}
    	}
    }
    ログイン後にコピー

    上述方案使用xxl-job做分布式定时任务,间隔5秒执行一次,代码借助spring提供的api来完成redisMQ的操作。

    由于是分布式定时任务,每次执行只有一个线程在获取任务,机器利用率低,当数据规模较大时,单靠一个线程无法满足吞吐量要求,因此这种方案只适用于小规模数据量级别。

    此处间隔时间也可适当调整,例如缩短为1秒,调整所需考虑原则在上文已提到:间隔太短会造成很多无意义的扫描,且增大系统压力,太长又会造成执行时间误差太大。

    为了提升整体吞吐量,考虑不使用分布式定时任务,对集群内每台机器(或实例)均设置独立的定时任务,同时采用多个zset队列,以数字后缀区分。

    假设有Mzset队列,创建延时消息时选取消息的某个ID字段,计算hash值再对M取余,根据余数决定发送到对应数字后缀的zset队列中(分散消息,此处ID字段选取需要考虑做到均匀分布,不要造成数据倾斜)。

    队列数量M的选取需要考虑机器数量N,理想情况下有多少台机器就定义多少个队列,保持MN基本相等即可。

    因为队列太少,会造成机器对队列的竞争访问处理,队列太多又会导致任务得不到及时的处理。

    为了灵活应对集群机器数量的变化,建议将队列数量配置为动态可调的,例如采用分布式配置中心的方案。

    每台机器在触发定时任务时,需要通过适当的负载均衡来决定从哪个队列拉取消息,负载均衡的好坏也会影响整个集群的效率,如果负载分布不均可能会导致多台机器竞争处理同一队列,降低效率。

    一个简单实用的做法是利用redis的自增操作再对队列数量取余即可,只要保持队列数量和机器数量基本相等,这种做法在很大程度上就可以保证不会有多台机器竞争同一队列。

    至于每台机器从对应zset中的任务获取逻辑,仍然和前面代码一致。以上方式简化实现代码如下所示。

    @Scheduled(cron = "0/5 * * * * ?")
    public void scanBusiness1() {
    	// 队列数量M,考虑动态配置,保持和机器数量基本一致
    	int M = 10;
    	// redis自增key,用于负载均衡
    	String incrKey = "incrkey:delayqueue:business1";
    	// 每台机器执行时,从不同的zset中拉取消息,尽量确保不同机器访问不同zset
    	String zsetKey = "delayqueue:business1:" + (stringRedisTemplate.opsForValue().increment(incrKey) % M);
    	while (true) {
    		// 此处逻辑和前面代码一致,省略。。。
    	}
    }
    ログイン後にコピー

    上述方案和第一种方案的主要的不同点在于zsetKey的获取上,这里是根据负载均衡算法算出来的,确保每台机器访问不同zset并拉取消息,同时定时任务采用spring提供的进程内注解@Scheduled,集群内每台机器都会间隔5秒执行,因此相比之前的方案,能够较为明显地提升整个集群的吞吐量。

    但是这种方案的步骤相对更为复杂,需要动态配置队列数量,同时在创建延时任务时需要选择合适的消息ID字段来决定发送的目标zset队列,此处还要考虑均匀分布,整体实现要考虑的因素较多。

    上面一种方案已经能够较好地满足整体吞吐量要求,但其缺点是步骤相对复杂,因此项目中没有采用这种方案,而是采用下面一种也能满足吞吐量要求,步骤相对简单,又方便通用化的方案。

    该方案不使用定时任务,而是单独启动后台线程,在线程中执行永久循环,每次循环逻辑为:从目标zset中获取score值小于当前时间戳的元素集合中的score最小的那个元素,相当于获取当前时间点需要执行且执行时间点最早的那个任务,如果获取不到,表示当前时间点下暂无需要执行的任务,则线程休眠100ms(可视情况调整),否则,对获取到的元素进行处理,在分布式多线程环境下,仍然需要先删除成功才能进行处理。

    此外,考虑到每个线程获取元素后都需要再次访问redis尝试删除操作,为了避免多线程争抢浪费资源,降低效率,这里采用lua脚本将获取和删除操作原子化。lua脚本逻辑代码如下所示。

    local zsetKey = &#39;delayqueue&#39;
    local timestamp = 1614608094000
    local items = redis.call(&#39;zrangebyscore&#39;,zsetKey,0,timestamp,&#39;limit&#39;,0,1)
    if #items == 0 then
        return &#39;&#39;
    else
        redis.call(&#39;zremrangebyrank&#39;,zsetKey,0,0)
        return items[1]
    end
    ログイン後にコピー

    其中timestamp为当前时间戳,通过在zrangebyscore命令中指定limit为1来获取score最小的元素,若获取不到,即结果集长度为0,则返回空字符串,否则,通过zremrangebyrank命令删除头部元素,即score最小的元素,也就是之前获取到的那个元素,由于redis内部保证lua脚本的原子性,上述获取并删除的操作能够运行无误。具体JAVA实现中还对其进行了多线程操作的封装和通用化的抽象,使不同业务都能够使用该组件实现延时队列。具体实现代码如下所示。

    /**
     * 基于ZSET实现消息延迟处理,score存储执行时间点,到达时间点即会向指定队列发送该消息;
     * 定义一个继承本类的bean即可;
     */
    public abstract class AbstractDelayedMsgScanTrigger implements Runnable, DisposableBean {
    	private static final RedisScript<String> TRY_GET_AND_DEL_SCRIPT;
    	static {
    		// 获取并删除的lua脚本,使用spring提供的api
    		String sb = "local items = redis.call(&#39;zrangebyscore&#39;,KEYS[1],0,ARGV[1],&#39;limit&#39;,0,1)\n" +
    				"if #items == 0 then\n" +
    				"\treturn &#39;&#39;\n" +
    				"else\n" +
    				"\tredis.call(&#39;zremrangebyrank&#39;,KEYS[1],0,0)\n" +
    				"\treturn items[1]\n" +
    				"end";
    		// 自有工具类,只要能创建出spring包下的 RedisScript 的实现类对象均可
    		TRY_GET_AND_DEL_SCRIPT = RedisScriptHelper.createScript(sb, String.class);
    	}
    	private final ThreadPoolExecutor EXECUTOR = new ThreadPoolExecutor(getThreadNum(), getThreadNum(),
    			0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), new NamedThreadFactory(getThreadNamePrefix()));
    	private volatile boolean quit = false;
    	@Autowired
    	private StringRedisTemplate stringRedisTemplate;
    	@Autowired
    	private RabbitTemplate rabbitTemplate;
    	@PostConstruct
    	public void startScan() {
    		// bean构建完成后,启动若干执行线程
    		int threadNum = getThreadNum();
    		for (int i = 0; i < threadNum; i++) {
    			EXECUTOR.execute(this);
    		}
    	}
    	@Override
    	public void run() {
    		while (!quit) {
    			try {
    				// 循环,采用lua获取当前需要执行的任务并将其从redis中删除
    				String msg = stringRedisTemplate.execute(TRY_GET_AND_DEL_SCRIPT,
    						Lists.newArrayList(getDelayedMsgSourceKey()), String.valueOf(System.currentTimeMillis()));
    				if (StringUtils.isNotBlank(msg)) {
    					// 消息不为空,表示获取任务成功,将其再发送到指定MQ异步处理,将“获取任务”与“执行任务”分离解耦
    					rabbitTemplate.convertAndSend(getSendExchange(), getSendRoutingKey(), msg);
    				} else {
    					// 获取不到任务,表示当前时间点下暂无需要执行的任务,则线程休眠1S(可视情况调整)
    					SleepUtils.sleepSeconds(1);
    				}
    			} catch (Exception e) {
    				Logs.MSG.error("delayed msg scan error, sourceKey:{}", getDelayedMsgSourceKey(), e);
    			}
    		}
    	}
    	@Override
    	public void destroy() throws Exception {
    		quit = true;
    	}
    	public void setQuit(boolean quit) {
    		this.quit = quit;
    	}
    	/**
    	 * 获取消息的工作线程数量
    	 */
    	protected abstract int getThreadNum();
    	/**
    	 * 线程名称前缀,方便问题定位
    	 */
    	protected abstract String getThreadNamePrefix();
    	/**
    	 * 存放延迟消息的ZSET队列名
    	 */
    	protected abstract String getDelayedMsgSourceKey();
    	/**
    	 * 消息到达执行时间点时将其通过指定 exchange 发送到实时消费队列中
    	 */
    	protected abstract String getSendExchange();
    	/**
    	 * 消息到达执行时间点时将其通过指定 routingKey 发送到实时消费队列中
    	 */
    	protected abstract String getSendRoutingKey();
    }
    ログイン後にコピー

    在具体业务应用中,只需定义一个继承上述类的bean即可,需要实现的方法主要是提供一些配置,比如该业务对应的zset延时队列名称,同时工作拉取消息的线程数量,由于采用rabbitMq,因此这里需要提供exchangeroutingKey

    实际使用中只需向该zset队列中添加消息,并将score设为该任务需要执行的时间点(此处为13位毫秒时间戳),则到该时间点后,上述组件便会将该消息从zset中取出并删除,再将其通过指定的路由发送到实时MQ消费队列中,由消费者负责执行任务业务逻辑。目前该组件在项目中正常平稳运行。

    注意:

    本文结合项目中的实际需求介绍了延时队列的应用场景,分析了延时队列的多种实现,重点讲述了利用redis实现延时队列的原理,对其实现方案进行比较与优化,并将最终方案实际运用于项目需求中。

    以上がRedis で遅延キューを実装する方法の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

    関連ラベル:
    ソース:yisu.com
    このウェブサイトの声明
    この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。
    人気のチュートリアル
    詳細>
    最新のダウンロード
    詳細>
    ウェブエフェクト
    公式サイト
    サイト素材
    フロントエンドテンプレート