Heim Backend-Entwicklung Python-Tutorial So verwenden Sie Beanstalkd in Python für die asynchrone Aufgabenverarbeitung

So verwenden Sie Beanstalkd in Python für die asynchrone Aufgabenverarbeitung

Apr 24, 2018 pm 01:35 PM
python 任务

In diesem Artikel wird hauptsächlich die Methode zur Verwendung von Beanstalkd für die asynchrone Aufgabenverarbeitung in Python vorgestellt. Jetzt teile ich ihn mit Ihnen und gebe ihn als Referenz. Schauen wir uns das gemeinsam an

Verwenden Sie Beanstalkd als Nachrichtenwarteschlangendienst und kombinieren Sie es dann mit der Dekorator-Syntax von Python, um ein einfaches asynchrones Aufgabenverarbeitungstool zu implementieren.

Endeffekt

Aufgabe definieren:

from xxxxx.job_queue import JobQueue

queue = JobQueue()

@queue.task('task_tube_one')
def task_one(arg1, arg2, arg3):
 # do task
Nach dem Login kopieren

Aufgabe senden:

task_one.put(arg1="a", arg2="b", arg3="c")
Nach dem Login kopieren

Dann kann der Hintergrundarbeitsthread diese Aufgaben ausführen.

Implementierungsprozess

Beanstalk Server verstehen

Beanstalk ist eine einfache, schnelle Arbeitswarteschlange /kr/beanstalkd

Beanstalk ist ein in der Sprache C implementierter Nachrichtenwarteschlangendienst. Es bietet eine gemeinsame Schnittstelle und wurde ursprünglich entwickelt, um die Seitenlatenz in großen Webanwendungen durch die asynchrone Ausführung zeitaufwändiger Aufgaben zu reduzieren. Es gibt verschiedene Beanstalkd-Client-Implementierungen für verschiedene Sprachen. Es gibt Beanstalkc usw. in Python. Ich verwende Beanstalkc als Tool zur Kommunikation mit dem Beanstalkd-Server.

2. Implementierungsprinzip der asynchronen Aufgabenausführung

beanstalkd kann nur eine String-Aufgabenplanung durchführen. Damit das Programm das Senden von Funktionen und Parametern unterstützt, wird die Funktion dann vom Worker ausgeführt und die Parameter werden übertragen. Um Funktionen mit übergebenen Parametern zu registrieren, ist eine Zwischenschicht erforderlich.

Die Implementierung besteht hauptsächlich aus 3 Teilen:

Abonnent: Verantwortlich für die Registrierung der Funktion in einer Tube in Beanstalk. Die Implementierung ist sehr einfach und registriert die Entsprechung zwischen dem Funktionsnamen und der Funktion selbst. (Das bedeutet, dass derselbe Funktionsname nicht in derselben Gruppe (Röhre) existieren kann). Daten werden in Klassenvariablen gespeichert.

class Subscriber(object):
 FUN_MAP = defaultdict(dict)

 def __init__(self, func, tube):
  logger.info('register func:{} to tube:{}.'.format(func.__name__, tube))
  Subscriber.FUN_MAP[tube][func.__name__] = func
Nach dem Login kopieren

JobQueue: Praktisch, um eine gewöhnliche Funktion in einen Decorator mit Putter-Fähigkeit umzuwandeln

class JobQueue(object):
 @classmethod
 def task(cls, tube):
  def wrapper(func):
   Subscriber(func, tube)
   return Putter(func, tube)

  return wrapper
Nach dem Login kopieren

Putter: Kombinieren Sie den Funktionsnamen, die Funktionsparameter und die angegebene Gruppierung zu einem Objekt, serialisieren Sie dann JSON in einen String und schieben Sie ihn schließlich über Beanstalkc in die Beanstalkd-Warteschlange.

class Putter(object):
 def __init__(self, func, tube):
  self.func = func
  self.tube = tube

 # 直接调用返回
 def __call__(self, *args, **kwargs):
  return self.func(*args, **kwargs)

 # 推给离线队列
 def put(self, **kwargs):
  args = {
   'func_name': self.func.__name__,
   'tube': self.tube,
   'kwargs': kwargs
  }
  logger.info('put job:{} to queue'.format(args))
  beanstalk = beanstalkc.Connection(host=BEANSTALK_CONFIG['host'], port=BEANSTALK_CONFIG['port'])
  try:
   beanstalk.use(self.tube)
   job_id = beanstalk.put(json.dumps(args))
   return job_id
  finally:
   beanstalk.close()
Nach dem Login kopieren

Worker: Nehmen Sie die Zeichenfolge aus der Beanstalkd-Warteschlange und deserialisieren Sie sie dann über json.loads in ein Objekt, um den Funktionsnamen zu erhalten und Parameter und Rohr. Schließlich wird der dem Funktionsnamen entsprechende Funktionscode vom Abonnenten abgerufen und anschließend werden die Parameter zum Ausführen der Funktion übergeben.

class Worker(object):
 worker_id = 0

 def __init__(self, tubes):
  self.beanstalk = beanstalkc.Connection(host=BEANSTALK_CONFIG['host'], port=BEANSTALK_CONFIG['port'])
  self.tubes = tubes
  self.reserve_timeout = 20
  self.timeout_limit = 1000
  self.kick_period = 600
  self.signal_shutdown = False
  self.release_delay = 0
  self.age = 0
  self.signal_shutdown = False
  signal.signal(signal.SIGTERM, lambda signum, frame: self.graceful_shutdown())
  Worker.worker_id += 1
  import_module_by_str('pear.web.controllers.controller_crawler')

 def subscribe(self):
  if isinstance(self.tubes, list):
   for tube in self.tubes:
    if tube not in Subscriber.FUN_MAP.keys():
     logger.error('tube:{} not register!'.format(tube))
     continue
    self.beanstalk.watch(tube)
  else:
   if self.tubes not in Subscriber.FUN_MAP.keys():
    logger.error('tube:{} not register!'.format(self.tubes))
    return
   self.beanstalk.watch(self.tubes)

 def run(self):
  self.subscribe()
  while True:
   if self.signal_shutdown:
    break
   if self.signal_shutdown:
    logger.info("graceful shutdown")
    break
   job = self.beanstalk.reserve(timeout=self.reserve_timeout) # 阻塞获取任务,最长等待 timeout
   if not job:
    continue
   try:
    self.on_job(job)
    self.delete_job(job)
   except beanstalkc.CommandFailed as e:
    logger.warning(e, exc_info=1)
   except Exception as e:
    logger.error(e)
    kicks = job.stats()['kicks']
    if kicks < 3:
     self.bury_job(job)
    else:
     message = json.loads(job.body)
     logger.error("Kicks reach max. Delete the job", extra={&#39;body&#39;: message})
     self.delete_job(job)

 @classmethod
 def on_job(cls, job):
  start = time.time()
  msg = json.loads(job.body)
  logger.info(msg)
  tube = msg.get(&#39;tube&#39;)
  func_name = msg.get(&#39;func_name&#39;)
  try:
   func = Subscriber.FUN_MAP[tube][func_name]
   kwargs = msg.get(&#39;kwargs&#39;)
   func(**kwargs)
   logger.info(u&#39;{}-{}&#39;.format(func, kwargs))
  except Exception as e:
   logger.error(e.message, exc_info=True)
  cost = time.time() - start
  logger.info(&#39;{} cost {}s&#39;.format(func_name, cost))

 @classmethod
 def delete_job(cls, job):
  try:
   job.delete()
  except beanstalkc.CommandFailed as e:
   logger.warning(e, exc_info=1)

 @classmethod
 def bury_job(cls, job):
  try:
   job.bury()
  except beanstalkc.CommandFailed as e:
   logger.warning(e, exc_info=1)

 def graceful_shutdown(self):
  self.signal_shutdown = True
Nach dem Login kopieren

Beim Schreiben des obigen Codes habe ich ein Problem festgestellt:

Registrieren Sie die Funktion über Subscriber Die entsprechende Beziehung zwischen dem Namen und der Funktion selbst besteht in einem Python-Interpreter, das heißt, sie wird in einem Prozess ausgeführt, und der Worker wird asynchron in einem anderen Prozess ausgeführt. Wie kann der Worker auch denselben Abonnenten wie Putter erhalten? Schließlich habe ich festgestellt, dass dieses Problem durch den Dekoratormechanismus von Python gelöst werden kann.

Dieser Satz löste das Abonnentenproblem

import_module_by_str(&#39;pear.web.controllers.controller_crawler&#39;)
Nach dem Login kopieren

# import_module_by_str 的实现
def import_module_by_str(module_name):
 if isinstance(module_name, unicode):
  module_name = str(module_name)
 __import__(module_name)
Nach dem Login kopieren

Wenn import_module_by_str ausgeführt wird, wird __import__ aufgerufen, um Klassen und Funktionen dynamisch zu laden. Nach dem Laden des Moduls mit der Funktion mithilfe von JobQueue in den Speicher. Beim Ausführen von Woker führt der Python-Interpreter zunächst den mit @ dekorierten Dekoratorcode aus und lädt die entsprechende Beziehung im Abonnenten in den Speicher.

Informationen zur tatsächlichen Verwendung finden Sie unter https://github.com/jiyangg/Pear/blob/master/pear/jobs/job_queue.py

Verwandte Empfehlungen:

Detaillierte Erläuterung der Instanz der PHP-Beanstalkd-Nachrichtenwarteschlangenklasse

Das obige ist der detaillierte Inhalt vonSo verwenden Sie Beanstalkd in Python für die asynchrone Aufgabenverarbeitung. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Erklärung dieser Website
Der Inhalt dieses Artikels wird freiwillig von Internetnutzern beigesteuert und das Urheberrecht liegt beim ursprünglichen Autor. Diese Website übernimmt keine entsprechende rechtliche Verantwortung. Wenn Sie Inhalte finden, bei denen der Verdacht eines Plagiats oder einer Rechtsverletzung besteht, wenden Sie sich bitte an admin@php.cn

Heiße KI -Werkzeuge

Undresser.AI Undress

Undresser.AI Undress

KI-gestützte App zum Erstellen realistischer Aktfotos

AI Clothes Remover

AI Clothes Remover

Online-KI-Tool zum Entfernen von Kleidung aus Fotos.

Undress AI Tool

Undress AI Tool

Ausziehbilder kostenlos

Clothoff.io

Clothoff.io

KI-Kleiderentferner

AI Hentai Generator

AI Hentai Generator

Erstellen Sie kostenlos Ai Hentai.

Heißer Artikel

R.E.P.O. Energiekristalle erklärten und was sie tun (gelber Kristall)
3 Wochen vor By 尊渡假赌尊渡假赌尊渡假赌
R.E.P.O. Beste grafische Einstellungen
3 Wochen vor By 尊渡假赌尊渡假赌尊渡假赌
R.E.P.O. So reparieren Sie Audio, wenn Sie niemanden hören können
3 Wochen vor By 尊渡假赌尊渡假赌尊渡假赌
WWE 2K25: Wie man alles in Myrise freischaltet
4 Wochen vor By 尊渡假赌尊渡假赌尊渡假赌

Heiße Werkzeuge

Notepad++7.3.1

Notepad++7.3.1

Einfach zu bedienender und kostenloser Code-Editor

SublimeText3 chinesische Version

SublimeText3 chinesische Version

Chinesische Version, sehr einfach zu bedienen

Senden Sie Studio 13.0.1

Senden Sie Studio 13.0.1

Leistungsstarke integrierte PHP-Entwicklungsumgebung

Dreamweaver CS6

Dreamweaver CS6

Visuelle Webentwicklungstools

SublimeText3 Mac-Version

SublimeText3 Mac-Version

Codebearbeitungssoftware auf Gottesniveau (SublimeText3)

Muss MySQL bezahlen? Muss MySQL bezahlen? Apr 08, 2025 pm 05:36 PM

MySQL hat eine kostenlose Community -Version und eine kostenpflichtige Enterprise -Version. Die Community -Version kann kostenlos verwendet und geändert werden, die Unterstützung ist jedoch begrenzt und für Anwendungen mit geringen Stabilitätsanforderungen und starken technischen Funktionen geeignet. Die Enterprise Edition bietet umfassende kommerzielle Unterstützung für Anwendungen, die eine stabile, zuverlässige Hochleistungsdatenbank erfordern und bereit sind, Unterstützung zu bezahlen. Zu den Faktoren, die bei der Auswahl einer Version berücksichtigt werden, gehören Kritikalität, Budgetierung und technische Fähigkeiten von Anwendungen. Es gibt keine perfekte Option, nur die am besten geeignete Option, und Sie müssen die spezifische Situation sorgfältig auswählen.

So verwenden Sie MySQL nach der Installation So verwenden Sie MySQL nach der Installation Apr 08, 2025 am 11:48 AM

Der Artikel führt den Betrieb der MySQL -Datenbank vor. Zunächst müssen Sie einen MySQL -Client wie MySQLworkBench oder Befehlszeilen -Client installieren. 1. Verwenden Sie den Befehl mySQL-uroot-P, um eine Verbindung zum Server herzustellen und sich mit dem Stammkonto-Passwort anzumelden. 2. Verwenden Sie die Erstellung von Createdatabase, um eine Datenbank zu erstellen, und verwenden Sie eine Datenbank aus. 3.. Verwenden Sie CreateTable, um eine Tabelle zu erstellen, Felder und Datentypen zu definieren. 4. Verwenden Sie InsertInto, um Daten einzulegen, Daten abzufragen, Daten nach Aktualisierung zu aktualisieren und Daten nach Löschen zu löschen. Nur indem Sie diese Schritte beherrschen, lernen, mit gemeinsamen Problemen umzugehen und die Datenbankleistung zu optimieren, können Sie MySQL effizient verwenden.

Navicat -Methode zum Anzeigen von MongoDB -Datenbankkennwort Navicat -Methode zum Anzeigen von MongoDB -Datenbankkennwort Apr 08, 2025 pm 09:39 PM

Es ist unmöglich, das MongoDB -Passwort direkt über Navicat anzuzeigen, da es als Hash -Werte gespeichert ist. So rufen Sie verlorene Passwörter ab: 1. Passwörter zurücksetzen; 2. Überprüfen Sie die Konfigurationsdateien (können Hash -Werte enthalten). 3. Überprüfen Sie Codes (May Hardcode -Passwörter).

Braucht MySQL das Internet? Braucht MySQL das Internet? Apr 08, 2025 pm 02:18 PM

MySQL kann ohne Netzwerkverbindungen für die grundlegende Datenspeicherung und -verwaltung ausgeführt werden. Für die Interaktion mit anderen Systemen, Remotezugriff oder Verwendung erweiterte Funktionen wie Replikation und Clustering ist jedoch eine Netzwerkverbindung erforderlich. Darüber hinaus sind Sicherheitsmaßnahmen (wie Firewalls), Leistungsoptimierung (Wählen Sie die richtige Netzwerkverbindung) und die Datensicherung für die Verbindung zum Internet von entscheidender Bedeutung.

Wie optimieren Sie die MySQL-Leistung für Hochlastanwendungen? Wie optimieren Sie die MySQL-Leistung für Hochlastanwendungen? Apr 08, 2025 pm 06:03 PM

Die MySQL-Datenbankleistung Optimierungshandbuch In ressourcenintensiven Anwendungen spielt die MySQL-Datenbank eine entscheidende Rolle und ist für die Verwaltung massiver Transaktionen verantwortlich. Mit der Erweiterung der Anwendung werden jedoch die Datenbankleistung Engpässe häufig zu einer Einschränkung. In diesem Artikel werden eine Reihe effektiver Strategien zur Leistungsoptimierung von MySQL -Leistung untersucht, um sicherzustellen, dass Ihre Anwendung unter hohen Lasten effizient und reaktionsschnell bleibt. Wir werden tatsächliche Fälle kombinieren, um eingehende Schlüsseltechnologien wie Indexierung, Abfrageoptimierung, Datenbankdesign und Caching zu erklären. 1. Das Design der Datenbankarchitektur und die optimierte Datenbankarchitektur sind der Eckpfeiler der MySQL -Leistungsoptimierung. Hier sind einige Kernprinzipien: Die Auswahl des richtigen Datentyps und die Auswahl des kleinsten Datentyps, der den Anforderungen entspricht, kann nicht nur Speicherplatz speichern, sondern auch die Datenverarbeitungsgeschwindigkeit verbessern.

Hadidb: Eine leichte, horizontal skalierbare Datenbank in Python Hadidb: Eine leichte, horizontal skalierbare Datenbank in Python Apr 08, 2025 pm 06:12 PM

Hadidb: Eine leichte, hochrangige skalierbare Python-Datenbank Hadidb (HadIDB) ist eine leichte Datenbank in Python mit einem hohen Maß an Skalierbarkeit. Installieren Sie HadIDB mithilfe der PIP -Installation: PipinstallHadIDB -Benutzerverwaltung erstellen Benutzer: createUser (), um einen neuen Benutzer zu erstellen. Die Authentication () -Methode authentifiziert die Identität des Benutzers. fromHadidb.operationImportUseruser_obj = user ("admin", "admin") user_obj.

Kann sich MySQL Workbench mit Mariadb verbinden? Kann sich MySQL Workbench mit Mariadb verbinden? Apr 08, 2025 pm 02:33 PM

MySQL Workbench kann eine Verbindung zu MariADB herstellen, vorausgesetzt, die Konfiguration ist korrekt. Wählen Sie zuerst "Mariadb" als Anschlusstyp. Stellen Sie in der Verbindungskonfiguration Host, Port, Benutzer, Kennwort und Datenbank korrekt ein. Überprüfen Sie beim Testen der Verbindung, ob der Mariadb -Dienst gestartet wird, ob der Benutzername und das Passwort korrekt sind, ob die Portnummer korrekt ist, ob die Firewall Verbindungen zulässt und ob die Datenbank vorhanden ist. Verwenden Sie in fortschrittlicher Verwendung die Verbindungspooling -Technologie, um die Leistung zu optimieren. Zu den häufigen Fehlern gehören unzureichende Berechtigungen, Probleme mit Netzwerkverbindung usw. Bei Debugging -Fehlern, sorgfältige Analyse von Fehlerinformationen und verwenden Sie Debugging -Tools. Optimierung der Netzwerkkonfiguration kann die Leistung verbessern

Benötigt MySQL einen Server? Benötigt MySQL einen Server? Apr 08, 2025 pm 02:12 PM

Für Produktionsumgebungen ist in der Regel ein Server erforderlich, um MySQL auszuführen, aus Gründen, einschließlich Leistung, Zuverlässigkeit, Sicherheit und Skalierbarkeit. Server haben normalerweise leistungsstärkere Hardware, redundante Konfigurationen und strengere Sicherheitsmaßnahmen. Bei kleinen Anwendungen mit niedriger Last kann MySQL auf lokalen Maschinen ausgeführt werden, aber Ressourcenverbrauch, Sicherheitsrisiken und Wartungskosten müssen sorgfältig berücksichtigt werden. Für eine größere Zuverlässigkeit und Sicherheit sollte MySQL auf Cloud oder anderen Servern bereitgestellt werden. Die Auswahl der entsprechenden Serverkonfiguration erfordert eine Bewertung basierend auf Anwendungslast und Datenvolumen.

See all articles