Cet article présente principalement la méthode d'utilisation de Beanstalkd pour le traitement des tâches asynchrones en Python. Maintenant, je le partage avec vous et le donne comme référence. Jetons un coup d'œil ensemble
Utilisez Beanstalkd comme service de file d'attente de messages, puis combinez-le avec la syntaxe du décorateur de Python pour implémenter un outil de traitement de tâches asynchrone simple.
Effet final
Définir la tâche :
from xxxxx.job_queue import JobQueue queue = JobQueue() @queue.task('task_tube_one') def task_one(arg1, arg2, arg3): # do task
Soumettre la tâche :
task_one.put(arg1="a", arg2="b", arg3="c")
Ensuite, le fil de travail en arrière-plan peut effectuer ces tâches.
Processus de mise en œuvre
1. Comprendre le serveur Beanstalk
Beanstalk est une file d'attente de travail simple et rapide https://github.com. /kr/beanstalkd
Beanstalk est un service de file d'attente de messages implémenté en langage C. Il fournit une interface commune et a été initialement conçu pour réduire la latence des pages dans les applications Web à grande échelle en exécutant des tâches chronophages de manière asynchrone. Il existe différentes implémentations de Beanstalkd Client pour différentes langues. Il existe des beanstalkc et ainsi de suite en Python. J'utilise beanstalkc comme outil pour communiquer avec le serveur beanstalkd.
2. Principe de mise en œuvre de l'exécution asynchrone des tâches
beanstalkd ne peut effectuer que la planification de tâches de chaîne. Pour que le programme prenne en charge la soumission de fonctions et de paramètres, la fonction est ensuite exécutée par le travailleur et les paramètres sont transportés. Une couche intermédiaire est nécessaire pour enregistrer les fonctions avec les paramètres transmis.
L'implémentation comprend principalement 3 parties :
Abonné : responsable de l'enregistrement de la fonction sur un tube dans beanstalk. L'implémentation est très simple, enregistrant la relation correspondante entre le nom de la fonction et la fonction elle-même. . (Cela signifie que le même nom de fonction ne peut pas exister dans le même groupe (tube)). Les données sont stockées dans des variables de classe.
class Subscriber(object): FUN_MAP = defaultdict(dict) def __init__(self, func, tube): logger.info('register func:{} to tube:{}.'.format(func.__name__, tube)) Subscriber.FUN_MAP[tube][func.__name__] = func
JobQueue : convertit facilement une fonction ordinaire en un décorateur avec la capacité Putter
class JobQueue(object): @classmethod def task(cls, tube): def wrapper(func): Subscriber(func, tube) return Putter(func, tube) return wrapper
Putter : combinez le nom de la fonction, les paramètres de la fonction et le regroupement spécifié dans un objet, puis sérialisez json dans une chaîne, et enfin poussez-le vers la file d'attente beanstalkd via beanstalkc.
class Putter(object): def __init__(self, func, tube): self.func = func self.tube = tube # 直接调用返回 def __call__(self, *args, **kwargs): return self.func(*args, **kwargs) # 推给离线队列 def put(self, **kwargs): args = { 'func_name': self.func.__name__, 'tube': self.tube, 'kwargs': kwargs } logger.info('put job:{} to queue'.format(args)) beanstalk = beanstalkc.Connection(host=BEANSTALK_CONFIG['host'], port=BEANSTALK_CONFIG['port']) try: beanstalk.use(self.tube) job_id = beanstalk.put(json.dumps(args)) return job_id finally: beanstalk.close()
Worker : prenez la chaîne de la file d'attente beanstalkd, puis désérialisez-la en un objet via json.loads pour obtenir le nom de la fonction et paramètres et tube. Enfin, le code de fonction correspondant au nom de la fonction est obtenu de l'abonné, puis les paramètres sont transmis pour exécuter la fonction.
class Worker(object): worker_id = 0 def __init__(self, tubes): self.beanstalk = beanstalkc.Connection(host=BEANSTALK_CONFIG['host'], port=BEANSTALK_CONFIG['port']) self.tubes = tubes self.reserve_timeout = 20 self.timeout_limit = 1000 self.kick_period = 600 self.signal_shutdown = False self.release_delay = 0 self.age = 0 self.signal_shutdown = False signal.signal(signal.SIGTERM, lambda signum, frame: self.graceful_shutdown()) Worker.worker_id += 1 import_module_by_str('pear.web.controllers.controller_crawler') def subscribe(self): if isinstance(self.tubes, list): for tube in self.tubes: if tube not in Subscriber.FUN_MAP.keys(): logger.error('tube:{} not register!'.format(tube)) continue self.beanstalk.watch(tube) else: if self.tubes not in Subscriber.FUN_MAP.keys(): logger.error('tube:{} not register!'.format(self.tubes)) return self.beanstalk.watch(self.tubes) def run(self): self.subscribe() while True: if self.signal_shutdown: break if self.signal_shutdown: logger.info("graceful shutdown") break job = self.beanstalk.reserve(timeout=self.reserve_timeout) # 阻塞获取任务,最长等待 timeout if not job: continue try: self.on_job(job) self.delete_job(job) except beanstalkc.CommandFailed as e: logger.warning(e, exc_info=1) except Exception as e: logger.error(e) kicks = job.stats()['kicks'] if kicks < 3: self.bury_job(job) else: message = json.loads(job.body) logger.error("Kicks reach max. Delete the job", extra={'body': message}) self.delete_job(job) @classmethod def on_job(cls, job): start = time.time() msg = json.loads(job.body) logger.info(msg) tube = msg.get('tube') func_name = msg.get('func_name') try: func = Subscriber.FUN_MAP[tube][func_name] kwargs = msg.get('kwargs') func(**kwargs) logger.info(u'{}-{}'.format(func, kwargs)) except Exception as e: logger.error(e.message, exc_info=True) cost = time.time() - start logger.info('{} cost {}s'.format(func_name, cost)) @classmethod def delete_job(cls, job): try: job.delete() except beanstalkc.CommandFailed as e: logger.warning(e, exc_info=1) @classmethod def bury_job(cls, job): try: job.bury() except beanstalkc.CommandFailed as e: logger.warning(e, exc_info=1) def graceful_shutdown(self): self.signal_shutdown = True
Lors de l'écriture du code ci-dessus, j'ai trouvé un problème :
Enregistrer la fonction via l'abonné La relation correspondante entre le nom et la fonction elle-même est dans un interpréteur Python, c'est-à-dire qu'elle s'exécute dans un processus et que le Worker s'exécute de manière asynchrone dans un autre processus. Comment le Worker peut-il également obtenir le même abonné que Putter ? Enfin, j'ai découvert que ce problème pouvait être résolu grâce au mécanisme de décoration de Python.
Cette phrase a résolu le problème de l'abonné
import_module_by_str('pear.web.controllers.controller_crawler')
# import_module_by_str 的实现 def import_module_by_str(module_name): if isinstance(module_name, unicode): module_name = str(module_name) __import__(module_name)
Lorsque import_module_by_str est exécuté, __import__ est appelé pour charger dynamiquement les classes et les fonctions. Après avoir chargé le module contenant la fonction utilisant JobQueue en mémoire. Lors de l'exécution de Woker, l'interpréteur Python exécutera d'abord le code du décorateur @-decorated et chargera la relation correspondante dans Subscriber en mémoire.
Pour une utilisation réelle, veuillez consulter https://github.com/jiyangg/Pear/blob/master/pear/jobs/job_queue.py
Recommandations associées :
Explication détaillée de l'instance de classe de file d'attente de messages php-beanstalkd
Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!