Cet article vous présentera l'utilisation avancée de Redis - file d'attente de messages et présentera la file d'attente différée dans Redis. J'espère qu'il vous sera utile !
En parlant de middleware de file d'attente de messages, nous pensons tous à RabbitMQ, RocketMQ et Kafka pour implémenter des fonctions de messagerie asynchrone pour les applications. Il s’agit de middlewares de file d’attente de messages spécialisés avec plus de fonctionnalités que nous ne pouvons en comprendre.
Ces middlewares de messagerie sont compliqués à utiliser, comme RabbitMQ. Avant d'envoyer un message, vous devez créer un échange et une file d'attente, puis lier l'échange et la file d'attente via certaines règles. Lors de l'envoi d'un message, vous devez formuler un routage. . -key, contrôle également le message d’en-tête. Ceci s'adresse uniquement aux producteurs. Les consommateurs doivent également suivre à nouveau la série d'étapes fastidieuses ci-dessus avant de consommer les messages.
Donc, pour ceux qui n'exigent pas une fiabilité à 100 % et souhaitent implémenter des exigences simples en matière de file d'attente de messages, nous pouvons utiliser Redis pour nous libérer des étapes fastidieuses du middleware de file d'attente de messages.
La file d'attente de messages de Redis n'est pas une file d'attente de messages professionnelle. Elle ne dispose pas de nombreuses fonctionnalités avancées dans la file d'attente de messages et n'a pas non plus de garantie d'accusé de réception. Si vous recherchez la fiabilité des messages, veuillez vous tourner vers le middleware MQ professionnel. [Recommandations associées : Tutoriel vidéo Redis]
À partir de la file d'attente de messages asynchrone la plus simple, la structure de données de liste de Redis est couramment utilisée comme file d'attente de messages asynchrone et est mise en file d'attente via lrpush/lpush rpop/. lpop pour sortir de la file d'attente.
Pour l'opération pop, lorsque la file d'attente des messages est vide, le client tombera dans une boucle infinie de pop, provoquant de nombreuses interrogations vides qui lui feront perdre la vie, obligeant le client à Le CPU est augmenté, et le QPS de Redis est également augmenté.
La solution au problème ci-dessus consiste à utiliser blpop/brpop de la structure de liste pour retirer la file d'attente, où le préfixe b représente le blocage, le blocage de la lecture. Pour bloquer les lectures, il entrera en état de veille lorsqu'il n'y a aucune donnée dans la file d'attente et se réveillera dès que les données arriveront. Résout parfaitement le problème ci-dessus.
La solution bloquant la lecture semble parfaite, mais elle entraîne immédiatement un autre problème : la connexion inactive. Si le thread continue de se bloquer quelque part, la connexion client Redis devient une connexion inactive. Si le temps d'inactivité est trop long, le serveur Redis se déconnectera activement pour réduire l'occupation des ressources inutilisées. À ce moment, blpop/brpop lèvera une exception.
Nous devons donc être prudents lors de l'écriture des consommateurs de clients (applications), faire attention à la détection des exceptions et réessayer.
Dans les verrous distribués Redis, il existe généralement trois stratégies pour gérer les échecs de verrouillage :
Lancez une exception directement et le frontal rappelle à l'utilisateur s'il doit continuer l'opération ;
// 生产\ public void delay(T msg) {\ TaskItem task = new TaskItem();\ task.id = UUID.randomUUID().toString(); // 分配唯一的 uuid\ task.msg = msg;\ String s = JSON.toJSONString(task); // fastjson 序列化\ jedis.zadd(queueKey, System.currentTimeMillis() + 5000, s); // 塞入延时队列 ,5s 后再试\ }\ // 消费\ public void loop() {\ while (!Thread.interrupted()) {\ // zrangeByScore参数中0, System.currentTimeMills()代表从redis中去score范围在0到系统当前时间的数据, 0,1表示从0开始取1个 拓展传入的score为-inf, +inf 分别表示zset中的最大值和最小值,当你不知道zset中的score最值时就可以使用inf作为参数变量\ Set values = jedis.zrangeByScore(queueKey, 0, System.currentTimeMillis(), 0, 1);\ if (values.isEmpty()) {\ try {\ Thread.sleep(500); // 歇会继续\ }\ catch (InterruptedException e) {\ break;\ }\ continue;\ }\ String s = values.iterator().next(); //消费队列\ if (jedis.zrem(queueKey, s) > 0) { // 抢到了,要考虑到多线程下锁争抢的情况,只有rem成功代表成功的消费了一条消息。\ TaskItem task = JSON.parseObject(s, TaskType); // fastjson 反序列化\ this.handleMsg(task.msg);\ }\ }\ }
Introduction à la programmation ! !
Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!