So verwenden Sie Beanstalkd in Python für die asynchrone Aufgabenverarbeitung

不言
Freigeben: 2018-04-24 13:35:09
Original
2952 Leute haben es durchsucht

In diesem Artikel wird hauptsächlich die Methode zur Verwendung von Beanstalkd für die asynchrone Aufgabenverarbeitung in Python vorgestellt. Jetzt teile ich ihn mit Ihnen und gebe ihn als Referenz. Schauen wir uns das gemeinsam an

Verwenden Sie Beanstalkd als Nachrichtenwarteschlangendienst und kombinieren Sie es dann mit der Dekorator-Syntax von Python, um ein einfaches asynchrones Aufgabenverarbeitungstool zu implementieren.

Endeffekt

Aufgabe definieren:

from xxxxx.job_queue import JobQueue

queue = JobQueue()

@queue.task('task_tube_one')
def task_one(arg1, arg2, arg3):
 # do task
Nach dem Login kopieren

Aufgabe senden:

task_one.put(arg1="a", arg2="b", arg3="c")
Nach dem Login kopieren

Dann kann der Hintergrundarbeitsthread diese Aufgaben ausführen.

Implementierungsprozess

Beanstalk Server verstehen

Beanstalk ist eine einfache, schnelle Arbeitswarteschlange /kr/beanstalkd

Beanstalk ist ein in der Sprache C implementierter Nachrichtenwarteschlangendienst. Es bietet eine gemeinsame Schnittstelle und wurde ursprünglich entwickelt, um die Seitenlatenz in großen Webanwendungen durch die asynchrone Ausführung zeitaufwändiger Aufgaben zu reduzieren. Es gibt verschiedene Beanstalkd-Client-Implementierungen für verschiedene Sprachen. Es gibt Beanstalkc usw. in Python. Ich verwende Beanstalkc als Tool zur Kommunikation mit dem Beanstalkd-Server.

2. Implementierungsprinzip der asynchronen Aufgabenausführung

beanstalkd kann nur eine String-Aufgabenplanung durchführen. Damit das Programm das Senden von Funktionen und Parametern unterstützt, wird die Funktion dann vom Worker ausgeführt und die Parameter werden übertragen. Um Funktionen mit übergebenen Parametern zu registrieren, ist eine Zwischenschicht erforderlich.

Die Implementierung besteht hauptsächlich aus 3 Teilen:

Abonnent: Verantwortlich für die Registrierung der Funktion in einer Tube in Beanstalk. Die Implementierung ist sehr einfach und registriert die Entsprechung zwischen dem Funktionsnamen und der Funktion selbst. (Das bedeutet, dass derselbe Funktionsname nicht in derselben Gruppe (Röhre) existieren kann). Daten werden in Klassenvariablen gespeichert.

class Subscriber(object):
 FUN_MAP = defaultdict(dict)

 def __init__(self, func, tube):
  logger.info('register func:{} to tube:{}.'.format(func.__name__, tube))
  Subscriber.FUN_MAP[tube][func.__name__] = func
Nach dem Login kopieren

JobQueue: Praktisch, um eine gewöhnliche Funktion in einen Decorator mit Putter-Fähigkeit umzuwandeln

class JobQueue(object):
 @classmethod
 def task(cls, tube):
  def wrapper(func):
   Subscriber(func, tube)
   return Putter(func, tube)

  return wrapper
Nach dem Login kopieren

Putter: Kombinieren Sie den Funktionsnamen, die Funktionsparameter und die angegebene Gruppierung zu einem Objekt, serialisieren Sie dann JSON in einen String und schieben Sie ihn schließlich über Beanstalkc in die Beanstalkd-Warteschlange.

class Putter(object):
 def __init__(self, func, tube):
  self.func = func
  self.tube = tube

 # 直接调用返回
 def __call__(self, *args, **kwargs):
  return self.func(*args, **kwargs)

 # 推给离线队列
 def put(self, **kwargs):
  args = {
   'func_name': self.func.__name__,
   'tube': self.tube,
   'kwargs': kwargs
  }
  logger.info('put job:{} to queue'.format(args))
  beanstalk = beanstalkc.Connection(host=BEANSTALK_CONFIG['host'], port=BEANSTALK_CONFIG['port'])
  try:
   beanstalk.use(self.tube)
   job_id = beanstalk.put(json.dumps(args))
   return job_id
  finally:
   beanstalk.close()
Nach dem Login kopieren

Worker: Nehmen Sie die Zeichenfolge aus der Beanstalkd-Warteschlange und deserialisieren Sie sie dann über json.loads in ein Objekt, um den Funktionsnamen zu erhalten und Parameter und Rohr. Schließlich wird der dem Funktionsnamen entsprechende Funktionscode vom Abonnenten abgerufen und anschließend werden die Parameter zum Ausführen der Funktion übergeben.

class Worker(object):
 worker_id = 0

 def __init__(self, tubes):
  self.beanstalk = beanstalkc.Connection(host=BEANSTALK_CONFIG['host'], port=BEANSTALK_CONFIG['port'])
  self.tubes = tubes
  self.reserve_timeout = 20
  self.timeout_limit = 1000
  self.kick_period = 600
  self.signal_shutdown = False
  self.release_delay = 0
  self.age = 0
  self.signal_shutdown = False
  signal.signal(signal.SIGTERM, lambda signum, frame: self.graceful_shutdown())
  Worker.worker_id += 1
  import_module_by_str('pear.web.controllers.controller_crawler')

 def subscribe(self):
  if isinstance(self.tubes, list):
   for tube in self.tubes:
    if tube not in Subscriber.FUN_MAP.keys():
     logger.error('tube:{} not register!'.format(tube))
     continue
    self.beanstalk.watch(tube)
  else:
   if self.tubes not in Subscriber.FUN_MAP.keys():
    logger.error('tube:{} not register!'.format(self.tubes))
    return
   self.beanstalk.watch(self.tubes)

 def run(self):
  self.subscribe()
  while True:
   if self.signal_shutdown:
    break
   if self.signal_shutdown:
    logger.info("graceful shutdown")
    break
   job = self.beanstalk.reserve(timeout=self.reserve_timeout) # 阻塞获取任务,最长等待 timeout
   if not job:
    continue
   try:
    self.on_job(job)
    self.delete_job(job)
   except beanstalkc.CommandFailed as e:
    logger.warning(e, exc_info=1)
   except Exception as e:
    logger.error(e)
    kicks = job.stats()['kicks']
    if kicks < 3:
     self.bury_job(job)
    else:
     message = json.loads(job.body)
     logger.error("Kicks reach max. Delete the job", extra={&#39;body&#39;: message})
     self.delete_job(job)

 @classmethod
 def on_job(cls, job):
  start = time.time()
  msg = json.loads(job.body)
  logger.info(msg)
  tube = msg.get(&#39;tube&#39;)
  func_name = msg.get(&#39;func_name&#39;)
  try:
   func = Subscriber.FUN_MAP[tube][func_name]
   kwargs = msg.get(&#39;kwargs&#39;)
   func(**kwargs)
   logger.info(u&#39;{}-{}&#39;.format(func, kwargs))
  except Exception as e:
   logger.error(e.message, exc_info=True)
  cost = time.time() - start
  logger.info(&#39;{} cost {}s&#39;.format(func_name, cost))

 @classmethod
 def delete_job(cls, job):
  try:
   job.delete()
  except beanstalkc.CommandFailed as e:
   logger.warning(e, exc_info=1)

 @classmethod
 def bury_job(cls, job):
  try:
   job.bury()
  except beanstalkc.CommandFailed as e:
   logger.warning(e, exc_info=1)

 def graceful_shutdown(self):
  self.signal_shutdown = True
Nach dem Login kopieren

Beim Schreiben des obigen Codes habe ich ein Problem festgestellt:

Registrieren Sie die Funktion über Subscriber Die entsprechende Beziehung zwischen dem Namen und der Funktion selbst besteht in einem Python-Interpreter, das heißt, sie wird in einem Prozess ausgeführt, und der Worker wird asynchron in einem anderen Prozess ausgeführt. Wie kann der Worker auch denselben Abonnenten wie Putter erhalten? Schließlich habe ich festgestellt, dass dieses Problem durch den Dekoratormechanismus von Python gelöst werden kann.

Dieser Satz löste das Abonnentenproblem

import_module_by_str(&#39;pear.web.controllers.controller_crawler&#39;)
Nach dem Login kopieren

# import_module_by_str 的实现
def import_module_by_str(module_name):
 if isinstance(module_name, unicode):
  module_name = str(module_name)
 __import__(module_name)
Nach dem Login kopieren

Wenn import_module_by_str ausgeführt wird, wird __import__ aufgerufen, um Klassen und Funktionen dynamisch zu laden. Nach dem Laden des Moduls mit der Funktion mithilfe von JobQueue in den Speicher. Beim Ausführen von Woker führt der Python-Interpreter zunächst den mit @ dekorierten Dekoratorcode aus und lädt die entsprechende Beziehung im Abonnenten in den Speicher.

Informationen zur tatsächlichen Verwendung finden Sie unter https://github.com/jiyangg/Pear/blob/master/pear/jobs/job_queue.py

Verwandte Empfehlungen:

Detaillierte Erläuterung der Instanz der PHP-Beanstalkd-Nachrichtenwarteschlangenklasse

Das obige ist der detaillierte Inhalt vonSo verwenden Sie Beanstalkd in Python für die asynchrone Aufgabenverarbeitung. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Verwandte Etiketten:
Quelle:php.cn
Erklärung dieser Website
Der Inhalt dieses Artikels wird freiwillig von Internetnutzern beigesteuert und das Urheberrecht liegt beim ursprünglichen Autor. Diese Website übernimmt keine entsprechende rechtliche Verantwortung. Wenn Sie Inhalte finden, bei denen der Verdacht eines Plagiats oder einer Rechtsverletzung besteht, wenden Sie sich bitte an admin@php.cn
Beliebte Tutorials
Mehr>
Neueste Downloads
Mehr>
Web-Effekte
Quellcode der Website
Website-Materialien
Frontend-Vorlage