Pratique de développement de coroutines asynchrones : création d'un système de file d'attente de messages hautes performances
Avec le développement d'Internet, le système de file d'attente de messages est devenu un élément clé dans la construction d'un système distribué hautes performances et évolutif. Lors de la création d'un système de file d'attente de messages, l'application de coroutines asynchrones peut améliorer efficacement les performances et l'évolutivité du système. Cet article présentera le développement pratique de coroutines asynchrones, en prenant comme exemple la création d'un système de file d'attente de messages hautes performances, et fournira des exemples de code spécifiques.
1.1 Légères : les coroutines asynchrones n'ont pas besoin de créer des threads supplémentaires, et seul un petit nombre de coroutines doivent être créées pour obtenir une concurrence à grande échelle. Cela réduit considérablement la consommation des ressources système.
1.2 Efficacité : les coroutines asynchrones utilisent des E/S non bloquantes et des mécanismes pilotés par les événements pour obtenir une planification et un traitement efficaces des tâches avec une surcharge extrêmement faible et ne sont pas soumises à la surcharge du changement de contexte.
1.3 Évolutivité : les coroutines asynchrones peuvent se développer automatiquement à mesure que la charge du système augmente, sans qu'il soit nécessaire d'ajuster manuellement des paramètres tels que la taille du pool de threads.
import asyncio message_queue = [] subscriptions = {} async def publish(channel, message): message_queue.append((channel, message)) await notify_subscribers() async def notify_subscribers(): while message_queue: channel, message = message_queue.pop(0) for subscriber in subscriptions.get(channel, []): asyncio.ensure_future(subscriber(message)) async def subscribe(channel, callback): if channel not in subscriptions: subscriptions[channel] = [] subscriptions[channel].append(callback) async def consumer(message): print("Received message:", message) async def main(): await subscribe("channel1", consumer) await publish("channel1", "hello world") if __name__ == "__main__": asyncio.run(main())
Dans le code ci-dessus, nous utilisons une liste message_queue
pour stocker les messages publiés et un dictionnaire d'abonnements.
pour stocker les abonnés et les chaînes correspondantes. La fonction publish
permet de publier des messages, la fonction notify_subscribers
permet de notifier les abonnés, la fonction subscribe
permet de s'abonner à une chaîne, et consumer< La fonction /code> sert d'exemple de consommateur. <code>message_queue
列表来存储发布的消息,使用一个字典subscriptions
来存储订阅者和对应的通道。publish
函数用于发布消息,notify_subscribers
函数用于通知订阅者,subscribe
函数用于订阅某个通道,consumer
函数作为一个示例的消费者。
在main
函数中,我们首先使用subscribe
函数订阅了channel1
通道,并将consumer
函数指定为订阅者。然后我们使用publish
函数发布了一条消息到channel1
通道,notify_subscribers
会自动地将消息发送给订阅者。
下面是一个基于异步I/O和协程池的消息队列系统的优化示例代码:
import asyncio from concurrent.futures import ThreadPoolExecutor message_queue = [] subscriptions = {} executor = ThreadPoolExecutor() async def publish(channel, message): message_queue.append((channel, message)) await notify_subscribers() async def notify_subscribers(): while message_queue: channel, message = message_queue.pop(0) for subscriber in subscriptions.get(channel, []): await execute(subscriber(message)) async def execute(callback): loop = asyncio.get_running_loop() await loop.run_in_executor(executor, callback) async def subscribe(channel, callback): if channel not in subscriptions: subscriptions[channel] = [] subscriptions[channel].append(callback) async def consumer(message): print("Received message:", message) async def main(): await subscribe("channel1", consumer) await publish("channel1", "hello world") if __name__ == "__main__": asyncio.run(main())
在优化示例代码中,我们使用executor
来创建一个协程池,并通过execute
main
, on s'abonne d'abord au canal channel1
en utilisant la fonction subscribe
et on précise le consommateur
fonction pour les abonnés. Ensuite, nous utilisons la fonction publish
pour publier un message sur le canal channel1
, et notify_subscribers
enverra automatiquement le message aux abonnés.
exécuteur
pour créer un pool de coroutines, Et placez la fonction de rappel dans le pool de coroutines pour exécution via la fonction execute
. Cela peut éviter un changement de contexte excessif, exécuter des fonctions de rappel simultanément et améliorer les capacités de traitement des messages. 🎜🎜Bien sûr, dans le système de file d'attente de messages actuel, il peut être encore optimisé et étendu, par exemple en introduisant la persistance des messages, le mécanisme de confirmation des messages, l'expansion horizontale, etc. 🎜🎜🎜Résumé🎜Cet article présente le développement pratique de coroutines asynchrones, en prenant comme exemple la création d'un système de file d'attente de messages hautes performances, et fournit des exemples de code spécifiques. Les coroutines asynchrones peuvent permettre une planification et un traitement efficaces des tâches avec une surcharge extrêmement faible, et peuvent améliorer efficacement les performances et l'évolutivité du système. En combinant des technologies telles que les E/S asynchrones et les pools de coroutines, nous pouvons optimiser et étendre davantage le système de file d'attente de messages pour l'adapter aux différents scénarios et besoins d'application. 🎜🎜Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!