In diesem Artikel wird hauptsächlich die Methode zur Verwendung von Beanstalkd für die asynchrone Aufgabenverarbeitung in Python vorgestellt. Jetzt teile ich ihn mit Ihnen und gebe ihn als Referenz. Schauen wir uns das gemeinsam an
Verwenden Sie Beanstalkd als Nachrichtenwarteschlangendienst und kombinieren Sie es dann mit der Dekorator-Syntax von Python, um ein einfaches asynchrones Aufgabenverarbeitungstool zu implementieren.
Endeffekt
Aufgabe definieren:
from xxxxx.job_queue import JobQueue queue = JobQueue() @queue.task('task_tube_one') def task_one(arg1, arg2, arg3): # do task
Aufgabe senden:
task_one.put(arg1="a", arg2="b", arg3="c")
Dann kann der Hintergrundarbeitsthread diese Aufgaben ausführen.
Implementierungsprozess
Beanstalk Server verstehen
Beanstalk ist eine einfache, schnelle Arbeitswarteschlange /kr/beanstalkd
Beanstalk ist ein in der Sprache C implementierter Nachrichtenwarteschlangendienst. Es bietet eine gemeinsame Schnittstelle und wurde ursprünglich entwickelt, um die Seitenlatenz in großen Webanwendungen durch die asynchrone Ausführung zeitaufwändiger Aufgaben zu reduzieren. Es gibt verschiedene Beanstalkd-Client-Implementierungen für verschiedene Sprachen. Es gibt Beanstalkc usw. in Python. Ich verwende Beanstalkc als Tool zur Kommunikation mit dem Beanstalkd-Server.
2. Implementierungsprinzip der asynchronen Aufgabenausführung
beanstalkd kann nur eine String-Aufgabenplanung durchführen. Damit das Programm das Senden von Funktionen und Parametern unterstützt, wird die Funktion dann vom Worker ausgeführt und die Parameter werden übertragen. Um Funktionen mit übergebenen Parametern zu registrieren, ist eine Zwischenschicht erforderlich.
Die Implementierung besteht hauptsächlich aus 3 Teilen:
Abonnent: Verantwortlich für die Registrierung der Funktion in einer Tube in Beanstalk. Die Implementierung ist sehr einfach und registriert die Entsprechung zwischen dem Funktionsnamen und der Funktion selbst. (Das bedeutet, dass derselbe Funktionsname nicht in derselben Gruppe (Röhre) existieren kann). Daten werden in Klassenvariablen gespeichert.
class Subscriber(object): FUN_MAP = defaultdict(dict) def __init__(self, func, tube): logger.info('register func:{} to tube:{}.'.format(func.__name__, tube)) Subscriber.FUN_MAP[tube][func.__name__] = func
JobQueue: Praktisch, um eine gewöhnliche Funktion in einen Decorator mit Putter-Fähigkeit umzuwandeln
class JobQueue(object): @classmethod def task(cls, tube): def wrapper(func): Subscriber(func, tube) return Putter(func, tube) return wrapper
Putter: Kombinieren Sie den Funktionsnamen, die Funktionsparameter und die angegebene Gruppierung zu einem Objekt, serialisieren Sie dann JSON in einen String und schieben Sie ihn schließlich über Beanstalkc in die Beanstalkd-Warteschlange.
class Putter(object): def __init__(self, func, tube): self.func = func self.tube = tube # 直接调用返回 def __call__(self, *args, **kwargs): return self.func(*args, **kwargs) # 推给离线队列 def put(self, **kwargs): args = { 'func_name': self.func.__name__, 'tube': self.tube, 'kwargs': kwargs } logger.info('put job:{} to queue'.format(args)) beanstalk = beanstalkc.Connection(host=BEANSTALK_CONFIG['host'], port=BEANSTALK_CONFIG['port']) try: beanstalk.use(self.tube) job_id = beanstalk.put(json.dumps(args)) return job_id finally: beanstalk.close()
Worker: Nehmen Sie die Zeichenfolge aus der Beanstalkd-Warteschlange und deserialisieren Sie sie dann über json.loads in ein Objekt, um den Funktionsnamen zu erhalten und Parameter und Rohr. Schließlich wird der dem Funktionsnamen entsprechende Funktionscode vom Abonnenten abgerufen und anschließend werden die Parameter zum Ausführen der Funktion übergeben.
class Worker(object): worker_id = 0 def __init__(self, tubes): self.beanstalk = beanstalkc.Connection(host=BEANSTALK_CONFIG['host'], port=BEANSTALK_CONFIG['port']) self.tubes = tubes self.reserve_timeout = 20 self.timeout_limit = 1000 self.kick_period = 600 self.signal_shutdown = False self.release_delay = 0 self.age = 0 self.signal_shutdown = False signal.signal(signal.SIGTERM, lambda signum, frame: self.graceful_shutdown()) Worker.worker_id += 1 import_module_by_str('pear.web.controllers.controller_crawler') def subscribe(self): if isinstance(self.tubes, list): for tube in self.tubes: if tube not in Subscriber.FUN_MAP.keys(): logger.error('tube:{} not register!'.format(tube)) continue self.beanstalk.watch(tube) else: if self.tubes not in Subscriber.FUN_MAP.keys(): logger.error('tube:{} not register!'.format(self.tubes)) return self.beanstalk.watch(self.tubes) def run(self): self.subscribe() while True: if self.signal_shutdown: break if self.signal_shutdown: logger.info("graceful shutdown") break job = self.beanstalk.reserve(timeout=self.reserve_timeout) # 阻塞获取任务,最长等待 timeout if not job: continue try: self.on_job(job) self.delete_job(job) except beanstalkc.CommandFailed as e: logger.warning(e, exc_info=1) except Exception as e: logger.error(e) kicks = job.stats()['kicks'] if kicks < 3: self.bury_job(job) else: message = json.loads(job.body) logger.error("Kicks reach max. Delete the job", extra={'body': message}) self.delete_job(job) @classmethod def on_job(cls, job): start = time.time() msg = json.loads(job.body) logger.info(msg) tube = msg.get('tube') func_name = msg.get('func_name') try: func = Subscriber.FUN_MAP[tube][func_name] kwargs = msg.get('kwargs') func(**kwargs) logger.info(u'{}-{}'.format(func, kwargs)) except Exception as e: logger.error(e.message, exc_info=True) cost = time.time() - start logger.info('{} cost {}s'.format(func_name, cost)) @classmethod def delete_job(cls, job): try: job.delete() except beanstalkc.CommandFailed as e: logger.warning(e, exc_info=1) @classmethod def bury_job(cls, job): try: job.bury() except beanstalkc.CommandFailed as e: logger.warning(e, exc_info=1) def graceful_shutdown(self): self.signal_shutdown = True
Beim Schreiben des obigen Codes habe ich ein Problem festgestellt:
Registrieren Sie die Funktion über Subscriber Die entsprechende Beziehung zwischen dem Namen und der Funktion selbst besteht in einem Python-Interpreter, das heißt, sie wird in einem Prozess ausgeführt, und der Worker wird asynchron in einem anderen Prozess ausgeführt. Wie kann der Worker auch denselben Abonnenten wie Putter erhalten? Schließlich habe ich festgestellt, dass dieses Problem durch den Dekoratormechanismus von Python gelöst werden kann.
Dieser Satz löste das Abonnentenproblem
import_module_by_str('pear.web.controllers.controller_crawler')
# import_module_by_str 的实现 def import_module_by_str(module_name): if isinstance(module_name, unicode): module_name = str(module_name) __import__(module_name)
Wenn import_module_by_str ausgeführt wird, wird __import__ aufgerufen, um Klassen und Funktionen dynamisch zu laden. Nach dem Laden des Moduls mit der Funktion mithilfe von JobQueue in den Speicher. Beim Ausführen von Woker führt der Python-Interpreter zunächst den mit @ dekorierten Dekoratorcode aus und lädt die entsprechende Beziehung im Abonnenten in den Speicher.
Informationen zur tatsächlichen Verwendung finden Sie unter https://github.com/jiyangg/Pear/blob/master/pear/jobs/job_queue.py
Verwandte Empfehlungen:
Detaillierte Erläuterung der Instanz der PHP-Beanstalkd-Nachrichtenwarteschlangenklasse
Das obige ist der detaillierte Inhalt vonSo verwenden Sie Beanstalkd in Python für die asynchrone Aufgabenverarbeitung. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!