Comment utiliser Redis pour créer une file d'attente de messages
Tout d'abord, Redis est conçu pour être utilisé pour la mise en cache, mais en raison de certaines de ses propres caractéristiques, il peut être utilisé pour les files d'attente de messages. Il dispose de plusieurs API de blocage qui peuvent être utilisées. Ce sont ces API de blocage qui lui donnent la possibilité de créer des files d'attente de messages.
Imaginez simplement que sous l'idée de"la base de données résout tous les problèmes", vos besoins peuvent être satisfaits sans utiliser de files d'attente de messages. Nous stockons toutes les tâches dans la base de données, puis les traitons via une interrogation continue. Bien que cette approche puisse accomplir votre tâche, elle est très rudimentaire. Mais si l'interface de votre base de données fournit une méthode de blocage, vous pouvez éviter les opérations d'interrogation. Votre base de données peut également être utilisée comme file d'attente de messages, mais la base de données actuelle ne dispose pas d'une telle interface.
De plus, d'autres fonctionnalités de la file d'attente de messages telles que FIFO sont également faciles à implémenter. Vous n'avez besoin que d'un objet List pour récupérer les données du début et remplir les données de la fin.
Redis peut être utilisé comme file d'attente de messages grâce à son interface d'objet liste blpop brpop et certaines interfaces de Pub/Sub (publier/s'abonner). Ce sont toutes des versions bloquantes, elles peuvent donc être utilisées comme files d’attente de messages.
Approche prioritaire de Rabbitmq
Il existe de nombreux produits de file d'attente de messages matures, tels que Rabbitmq. Il est relativement simple à utiliser et possède des fonctions relativement riches. Il est tout à fait suffisant dans les situations générales. Mais ce qui est très ennuyeux, c'est qu'il ne prend pas en charge les priorités.
Par exemple, dans la tâche d'envoi d'e-mails, certains utilisateurs privilégiés espèrent que leurs e-mails pourront être envoyés plus rapidement, ou du moins leur donner la priorité par rapport aux utilisateurs ordinaires. Par défaut, Rabbitmq ne peut pas le gérer. Les tâches lancées vers Rabbitmq sont FIFO premier entré, premier sorti. Mais nous pouvons utiliser certaines solutions de contournement pour prendre en charge ces priorités. Créez plusieurs files d'attente et définissez les règles de routage correspondantes pour les consommateurs Rabbitmq.
Par exemple, il existe une telle file d'attente par défaut. Nous utilisons une liste pour simuler [tâche1, tâche2, tâche3], et les consommateurs retirent à tour de rôle les tâches une par une selon le principe FIFO pour les traiter. . Si une tâche hautement prioritaire arrive, elle ne peut être traitée qu'en dernier [tâche1, tâche2, tâche3, higitask1]. Mais si deux files d'attente sont utilisées, une file d'attente haute priorité et une file d'attente prioritaire normale. Priorité normale [tâche1, tâche2, tâche3], priorité élevée [tâche élevée1] Ensuite, nous définissons le routage du consommateur pour lui permettre de récupérer aléatoirement les données de n'importe quelle file d'attente.
Et nous pouvons définir un consommateur qui gère spécifiquement les files d'attente à haute priorité. Il ne traitera pas les données des files d'attente à faible priorité lorsqu'il est inactif. Ceci est similaire au comptoir VIP d'une banque : les clients ordinaires font la queue pour obtenir un numéro à la banque. Lorsqu'un VIP arrive, même s'il ne retire pas de ticket à la machine à numéroter qui se trouve devant les membres ordinaires, il peut toujours accéder directement au canal VIP plus rapidement.
Si vous utilisez Rabbitmq pour prendre en charge les files d'attente de messages prioritaires, tout comme les membres VIP de la même banque mentionnés ci-dessus, passez par différents canaux. Cependant, cette méthode n'utilise qu'une priorité relative et ne peut pas obtenir un contrôle de priorité absolu. Par exemple, j'espère qu'une certaine tâche hautement prioritaire sera traitée en premier dans un sens absolu plutôt que d'autres tâches ordinaires. Dans ce cas, la solution ci-dessus ne fonctionnera pas. . Étant donné que le consommateur de Rabbitmq ne sait traiter "de manière aléatoire" les premières données d'une file d'attente de la file d'attente qui l'intéresse que lorsqu'elle est libre, il ne peut pas contrôler quelle file d'attente il prend en premier. Ou un contrôle de priorité plus fin. Ou il y a plus de 10 priorités définies dans votre système. Il est également difficile d’utiliser Rabbitmq de cette manière.
Mais si vous utilisez Redis comme file d'attente, les exigences ci-dessus peuvent être remplies.
Pourquoi la file d'attente des messages est nécessaire
L'introduction du mécanisme de file d'attente des messages dans le système est une très grande amélioration du système. Par exemple, dans un système Web, après que l'utilisateur a effectué une certaine opération, une notification par courrier électronique doit être envoyée à la boîte aux lettres de l'utilisateur. Vous pouvez utiliser la méthode synchrone pour laisser l'utilisateur attendre que l' e-mail soit envoyé avant de faire un retour à l'utilisateur, mais cela peut amener l'utilisateur à attendre longtemps en raison de l'incertitude du réseau et affecter l'expérience utilisateur. .
Dans certains scénarios, il est impossible d'attendre la fin en utilisant la méthode synchrone, et ces opérations nécessitent beaucoup de temps en arrière-plan. Par exemple, dans un exemple extrême, pour une tâche système de compilation en ligne, la compilation en arrière-plan prend 30 minutes. La conception de ce scénario rend impossible d'attendre de manière synchrone puis de donner un retour. Il doit d'abord renvoyer un retour à l'utilisateur, puis le traitement asynchrone est terminé, puis attendre que le traitement soit terminé avant de renvoyer à l'utilisateur selon le cas. situation.
De plus, la situation dans laquelle la file d'attente de messages est applicable est celle dans laquelle la capacité de traitement du système est limitée. Le mécanisme de file d'attente est d'abord utilisé pour stocker temporairement les tâches, et le système traite ensuite à tour de rôle les tâches en file d'attente. un par un. De cette manière, des tâches hautement concurrentes peuvent être traitées de manière stable même lorsque le débit du système est insuffisant.
La file d'attente des messages peut être utilisée comme mécanisme de file d'attente. Tant que le système doit utiliser le mécanisme de file d'attente, la file d'attente des messages peut être utilisée.
Mise en œuvre de la priorité de la file d'attente des messages Redis
Explication de quelques connaissances de base de Redis
redis> blpop tasklist 0 "im task 01"
Cet exemple utilise la commande blpop pour récupérer les premières données de la liste des tâches de manière bloquante, et le dernier paramètre est le délai d'attente. S'il est défini sur 0, cela signifie attendre indéfiniment. De plus, les données stockées dans redis ne peuvent être que de type string, donc lors du transfert de tâches, seule string peut être transmise. Il nous suffit simplement de sérialiser les données responsables dans une chaîne au format json, puis de les convertir côté consommateur.
Ici, notre exemple de langage utilise python, et la bibliothèque liée à redis utilise redis-py. Si vous avez des compétences en programmation, cela ne devrait poser aucun problème de le basculer vers votre langage préféré.
1. File d'attente FIFO simple
import redis, time def handle(task): print task time.sleep(4) def main(): pool = redis.ConnectionPool(host='localhost', port=6379, db=0) r = redis.Redis(connection_pool=pool) while 1: result = r.brpop('tasklist', 0) handle(result[1]) if name == "main": main()
L'exemple ci-dessus est même le consommateur le plus simple. Nous récupérons en continu les données de la file d'attente Redis via une boucle infinie. S'il n'y a aucune donnée dans la file d'attente, elle y sera bloquée sans délai d'attente. S'il y a des données, elles seront supprimées et exécutées.
Généralement, il s'agira d'une chaîne complexe supprimée. Nous devrons peut-être la formater puis la transmettre à la fonction de traitement, mais par souci de simplicité, notre exemple est une chaîne ordinaire. De plus, la fonction de traitement de l'exemple n'effectue aucun traitement, mais la veille est utilisée pour simuler des opérations fastidieuses.
Nous ouvrons un autre client Redis pour simuler le producteur, et le client intégré suffit. Mettez plus de données dans la file d'attente de la liste des tâches.
redis> lpush tasklist 'im task 01' redis> lpush tasklist 'im task 02' redis> lpush tasklist 'im task 03' redis> lpush tasklist 'im task 04' redis> lpush tasklist 'im task 05'
Puis côté consommateur, vous verrez ces tâches simulées être consommées une à une.
2. File d'attente prioritaire simple
Supposons une exigence simple, qui nécessite uniquement que les tâches hautement prioritaires soient traitées en premier, plutôt que les tâches faiblement prioritaires. L'ordre des autres tâches n'a pas d'importance. Dans ce cas, il suffit de la placer en tête de la file d'attente lorsque nous rencontrons une tâche hautement prioritaire, au lieu de la placer en arrière-plan.
Comme notre file d'attente utilise une liste Redis, elle est facile à mettre en œuvre. Utilisez rpush en cas de priorité élevée. Utilisez lpush en cas de priorité faible. Vous verrez alors que la priorité élevée est toujours exécutée avant la priorité faible. Cependant, l’inconvénient de cette solution est que l’ordre d’exécution des tâches hautement prioritaires est le premier entré, dernier sorti.
redis> lpush tasklist 'im task 01' redis> lpush tasklist 'im task 02' redis> rpush tasklist 'im high task 01' redis> rpush tasklist 'im high task 01' redis> lpush tasklist 'im task 03' redis> rpush tasklist 'im high task 03'
Le code ci-dessus récupérera les données des deux files d'attente 'high_task_queue' et 'low_task_queue' de manière bloquante si la première n'est pas récupérée de la deuxième file d'attente.
def main(): pool = redis.ConnectionPool(host='localhost', port=6379, db=0) r = redis.Redis(connection_pool=pool) while 1: result = r.brpop(['high_task_queue', 'low_task_queue'], 0) handle(result[1])
Grâce au test ci-dessus, nous pouvons voir que la priorité élevée sera exécutée en premier, et le principe FIFO est également garanti entre les priorités élevées.
redis> lpush low_task_queue low001 redis> lpush low_task_queue low002 redis> lpush low_task_queue low003 redis> lpush low_task_queue low004 redis> lpush high_task_queue low001 redis> lpush high_task_queue low002 redis> lpush high_task_queue low003 redis> lpush high_task_queue low004
type de données
triable tel qu'un ensemble trié, il est dommage qu'il n'ait pas de version bloquante de l'interface. Nous ne pouvons donc utiliser le type de liste que pour atteindre cet objectif via d'autres méthodes.Un moyen simple consiste à créer une seule file d'attente et à s'assurer qu'elle est triée par priorité. Utilisez ensuite la méthode recherche binaire
pour trouver l'emplacement approprié pour une tâche et insérez-le dans l'emplacement correspondant via la commande lset. Étant donné que la recherche binaire est relativement rapide et que Redis lui-même est également en mémoire, théoriquement, la vitesse peut être garantie. Mais si la quantité de données est vraiment importante, nous pouvons également l’ajuster de certaines manières.Rappelez-vous notre troisième option, combiner la troisième option réduira considérablement les frais généraux. Par exemple, pour les files d'attente avec un volume de données de 100 000, leurs priorités sont également comprises de manière aléatoire entre 0 et 100 000. Nous pouvons configurer 10 ou 100 files d'attente différentes. Les tâches prioritaires de 0 à 10 000 sont placées dans la file d'attente 1, et les tâches de 10 000 à 20 000 sont placées dans la file d'attente 2. De cette façon, une fois qu'une file d'attente est divisée en différents niveaux, les données d'une seule file d'attente seront considérablement réduites, de sorte que l'efficacité de la correspondance de recherche binaire sera plus élevée. Cependant, les ressources occupées par les données restent fondamentalement inchangées. Cent mille données devraient occuper la même quantité de mémoire. C'est juste qu'il y a plus de files d'attente dans le système.
Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!