Le rôle de préservation de la file d'attente de messages (MQ, Message Queue) dans la transmission des données de message offre une garantie de communication des données et une commodité dans le traitement en temps réel. Nous examinerons ici l'implémentation des threads de la file d'attente de messages MQ en Python et le. file d'attente des messages Analyse des avantages
La "file d'attente des messages" est un conteneur qui enregistre les messages lors de leur transmission. Le gestionnaire de files d'attente de messages agit comme intermédiaire lors du relais des messages de sa source vers sa destination. L'objectif principal d'une file d'attente est d'assurer le routage et de garantir la livraison des messages ; si le destinataire n'est pas disponible lorsque le message est envoyé, Message Queue conserve le message jusqu'à ce qu'il puisse être livré avec succès. Je pense que la file d'attente de messages est un composant crucial pour toute architecture ou application. Voici dix raisons :
Exemple de file d'attente de messages Python :
#!/usr/bin/env python import Queue import threading import time queue = Queue.Queue() class ThreadNum(threading.Thread): """没打印一个数字等待1秒,并发打印10个数字需要多少秒?""" def __init__(self, queue): threading.Thread.__init__(self) self.queue = queue def run(self): whileTrue: #消费者端,从队列中获取num num = self.queue.get() print "i'm num %s"%(num) time.sleep(1) #在完成这项工作之后,使用 queue.task_done() 函数向任务已经完成的队列发送一个信号 self.queue.task_done() start = time.time() def main(): #产生一个 threads pool, 并把消息传递给thread函数进行处理,这里开启10个并发 for i in range(10): t = ThreadNum(queue) t.setDaemon(True) t.start() #往队列中填错数据 for num in range(10): queue.put(num) #wait on the queue until everything has been processed queue.join() main() print "Elapsed Time: %s" % (time.time() - start)
Résultats d'exécution :
i'm num 0 i'm num 1 i'm num 2 i'm num 3 i'm num 4 i'm num 5 i'm num 6 i'm num 7 i'm num 8 i'm num 9 Elapsed Time: 1.01399993896
Interprétation :
Les étapes de travail spécifiques sont décrites comme suit :
1, créez une Queue.Queue () et remplissez avec des données.
2, transmettez l'instance de données remplie à la classe de thread, qui est créée en héritant de threading.Thread.
3. Générez un pool de threads démon.
4. Prenez un élément de la file d'attente à chaque fois et utilisez la méthode data and run dans ce fil pour effectuer le travail correspondant.
5. Après avoir terminé ce travail, utilisez la fonction queue.task_done() pour envoyer un signal à la file d'attente indiquant que la tâche est terminée.
6. Effectuer une opération de jointure sur la file d'attente signifie en fait attendre que la file d'attente soit vide avant de quitter le programme principal.
Une chose à noter lors de l'utilisation de ce mode : en définissant le thread démon sur true, le programme se fermera automatiquement après son exécution. L'avantage est que vous pouvez effectuer une opération de jointure sur la file d'attente ou attendre que la file d'attente soit vide avant de sortir.
Les soi-disant files d'attente multiples, la sortie d'une file d'attente peut être utilisée comme entrée d'une autre file d'attente
#!/usr/bin/env python import Queue import threading import time queue = Queue.Queue() out_queue = Queue.Queue() class ThreadNum(threading.Thread): def __init__(self, queue, out_queue): threading.Thread.__init__(self) self.queue = queue self.out_queue = out_queue def run(self): whileTrue: #从队列中取消息 num = self.queue.get() bkeep = num #将bkeep放入队列中 self.out_queue.put(bkeep) #signals to queue job is done self.queue.task_done() class PrintLove(threading.Thread): def __init__(self, out_queue): threading.Thread.__init__(self) self.out_queue = out_queue def run(self): whileTrue: #从队列中获取消息并赋值给bkeep bkeep = self.out_queue.get() keke = "I love " + str(bkeep) print keke, print self.getName() time.sleep(1) #signals to queue job is done self.out_queue.task_done() start = time.time() def main(): #populate queue with data for num in range(10): queue.put(num) #spawn a pool of threads, and pass them queue instance for i in range(5): t = ThreadNum(queue, out_queue) t.setDaemon(True) t.start() for i in range(5): pl = PrintLove(out_queue) pl.setDaemon(True) pl.start() #wait on the queue until everything has been processed queue.join() out_queue.join() main() print "Elapsed Time: %s" % (time.time() - start)
Résultats d'exécution :
I love 0 Thread-6 I love 1 Thread-7 I love 2 Thread-8 I love 3 Thread-9 I love 4 Thread-10 I love 5 Thread-7 I love 6 Thread-6 I love 7 Thread-9 I love 8 Thread-8 I love 9 Thread-10 Elapsed Time: 2.00300002098
Interprétation :
Workflow de la classe ThreadNum
Définir la file d'attente--->Hériter le threading---->Initialiser file d'attente-- -->Définir la fonction d'exécution--->obtenir les données dans la file d'attente---->Traiter les données---->mettre les données dans une autre file d'attente-->Envoyer un signal à la file d'attente pour indiquer à la file d'attente que le traitement est terminé
Flux de travail de la fonction principale :
---> >---> La boucle for est déterminée comme étant démarrée Nombre de threads----> Instancier la classe ThreadNum----> Démarrez le thread et configurez la garde
--- > La boucle for détermine le nombre de threads démarrés----> Instancier la classe PrintLove ---> Démarrez le fil et définissez-le comme garde
---> file d'attente à traiter, puis exécutez la jointure. Autrement dit, quittez le programme principal.
Après avoir compris la mise en œuvre générale de MQ, résumons les avantages des files d'attente de messages :
1 Découplage
2. Redondance
3. Évolutivité
4. Flexibilité et capacité de traitement maximale
5. Récupérabilité
Le mécanisme de redondance fourni par la file d'attente des messages garantit que le message peut être réellement traité, tant qu'un processus lit la file d'attente. Sur cette base, IronMQ offre une garantie « livraison unique ». Quel que soit le nombre de processus recevant des données de la file d'attente, chaque message ne peut être traité qu'une seule fois. Ceci est possible car recevoir un message « s'abonne » simplement au message, le supprimant temporairement de la file d'attente. À moins que le client n'indique explicitement qu'il a terminé le traitement du message, le message sera remis dans la file d'attente et pourra être traité à nouveau après une période de temps configurable.
Dans de nombreux cas, l'ordre dans lequel les données sont traitées est important. La file d'attente des messages est intrinsèquement triée et peut garantir que les données seront traitées dans un ordre spécifique. IronMO garantit que les messages sont traités dans l'ordre FIFO (premier entré, premier sorti), de sorte que la position des messages dans la file d'attente correspond à la position à partir de laquelle ils ont été récupérés.
Dans tout système important, il y aura des éléments qui nécessiteront des temps de traitement différents. Par exemple, charger une image prend moins de temps que appliquer un filtre. Les files d'attente de messages utilisent une couche tampon pour aider les tâches à s'exécuter le plus efficacement possible : les écritures dans la file d'attente sont traitées aussi rapidement que possible, sans être contraintes par un traitement préparatoire à la lecture dans la file d'attente. Cette mise en mémoire tampon permet de contrôler et d'optimiser la vitesse à laquelle les données circulent dans le système.
Dans un système distribué, il est très difficile d'avoir une idée globale de la durée des opérations utilisateur et de leur raison. Les séries de messages peuvent aider à identifier les processus ou les domaines sous-performants en fonction de la fréquence de traitement des messages, là où le flux de données n'est pas suffisamment optimisé.
Souvent, vous ne voulez pas ou n'avez pas besoin de traiter les messages immédiatement. Les files d'attente de messages fournissent un mécanisme de traitement asynchrone qui vous permet de placer un message dans la file d'attente mais de ne pas le traiter immédiatement. Vous pouvez mettre autant de messages que vous le souhaitez dans la file d'attente et les traiter quand vous en avez envie.
Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!