Maison développement back-end Tutoriel Python Comment utiliser Beanstalkd en Python pour le traitement des tâches asynchrones

Comment utiliser Beanstalkd en Python pour le traitement des tâches asynchrones

Apr 24, 2018 pm 01:35 PM
python 任务

Cet article présente principalement la méthode d'utilisation de Beanstalkd pour le traitement des tâches asynchrones en Python. Maintenant, je le partage avec vous et le donne comme référence. Jetons un coup d'œil ensemble

Utilisez Beanstalkd comme service de file d'attente de messages, puis combinez-le avec la syntaxe du décorateur de Python pour implémenter un outil de traitement de tâches asynchrone simple.

Effet final

Définir la tâche :

from xxxxx.job_queue import JobQueue

queue = JobQueue()

@queue.task('task_tube_one')
def task_one(arg1, arg2, arg3):
 # do task
Copier après la connexion

Soumettre la tâche :

task_one.put(arg1="a", arg2="b", arg3="c")
Copier après la connexion

Ensuite, le fil de travail en arrière-plan peut effectuer ces tâches.

Processus de mise en œuvre

1. Comprendre le serveur Beanstalk

Beanstalk est une file d'attente de travail simple et rapide https://github.com. /kr/beanstalkd

Beanstalk est un service de file d'attente de messages implémenté en langage C. Il fournit une interface commune et a été initialement conçu pour réduire la latence des pages dans les applications Web à grande échelle en exécutant des tâches chronophages de manière asynchrone. Il existe différentes implémentations de Beanstalkd Client pour différentes langues. Il existe des beanstalkc et ainsi de suite en Python. J'utilise beanstalkc comme outil pour communiquer avec le serveur beanstalkd.

2. Principe de mise en œuvre de l'exécution asynchrone des tâches

beanstalkd ne peut effectuer que la planification de tâches de chaîne. Pour que le programme prenne en charge la soumission de fonctions et de paramètres, la fonction est ensuite exécutée par le travailleur et les paramètres sont transportés. Une couche intermédiaire est nécessaire pour enregistrer les fonctions avec les paramètres transmis.

L'implémentation comprend principalement 3 parties :

Abonné : responsable de l'enregistrement de la fonction sur un tube dans beanstalk. L'implémentation est très simple, enregistrant la relation correspondante entre le nom de la fonction et la fonction elle-même. . (Cela signifie que le même nom de fonction ne peut pas exister dans le même groupe (tube)). Les données sont stockées dans des variables de classe.

class Subscriber(object):
 FUN_MAP = defaultdict(dict)

 def __init__(self, func, tube):
  logger.info('register func:{} to tube:{}.'.format(func.__name__, tube))
  Subscriber.FUN_MAP[tube][func.__name__] = func
Copier après la connexion

JobQueue : convertit facilement une fonction ordinaire en un décorateur avec la capacité Putter

class JobQueue(object):
 @classmethod
 def task(cls, tube):
  def wrapper(func):
   Subscriber(func, tube)
   return Putter(func, tube)

  return wrapper
Copier après la connexion

Putter : combinez le nom de la fonction, les paramètres de la fonction et le regroupement spécifié dans un objet, puis sérialisez json dans une chaîne, et enfin poussez-le vers la file d'attente beanstalkd via beanstalkc.

class Putter(object):
 def __init__(self, func, tube):
  self.func = func
  self.tube = tube

 # 直接调用返回
 def __call__(self, *args, **kwargs):
  return self.func(*args, **kwargs)

 # 推给离线队列
 def put(self, **kwargs):
  args = {
   'func_name': self.func.__name__,
   'tube': self.tube,
   'kwargs': kwargs
  }
  logger.info('put job:{} to queue'.format(args))
  beanstalk = beanstalkc.Connection(host=BEANSTALK_CONFIG['host'], port=BEANSTALK_CONFIG['port'])
  try:
   beanstalk.use(self.tube)
   job_id = beanstalk.put(json.dumps(args))
   return job_id
  finally:
   beanstalk.close()
Copier après la connexion

Worker : prenez la chaîne de la file d'attente beanstalkd, puis désérialisez-la en un objet via json.loads pour obtenir le nom de la fonction et paramètres et tube. Enfin, le code de fonction correspondant au nom de la fonction est obtenu de l'abonné, puis les paramètres sont transmis pour exécuter la fonction.

class Worker(object):
 worker_id = 0

 def __init__(self, tubes):
  self.beanstalk = beanstalkc.Connection(host=BEANSTALK_CONFIG['host'], port=BEANSTALK_CONFIG['port'])
  self.tubes = tubes
  self.reserve_timeout = 20
  self.timeout_limit = 1000
  self.kick_period = 600
  self.signal_shutdown = False
  self.release_delay = 0
  self.age = 0
  self.signal_shutdown = False
  signal.signal(signal.SIGTERM, lambda signum, frame: self.graceful_shutdown())
  Worker.worker_id += 1
  import_module_by_str('pear.web.controllers.controller_crawler')

 def subscribe(self):
  if isinstance(self.tubes, list):
   for tube in self.tubes:
    if tube not in Subscriber.FUN_MAP.keys():
     logger.error('tube:{} not register!'.format(tube))
     continue
    self.beanstalk.watch(tube)
  else:
   if self.tubes not in Subscriber.FUN_MAP.keys():
    logger.error('tube:{} not register!'.format(self.tubes))
    return
   self.beanstalk.watch(self.tubes)

 def run(self):
  self.subscribe()
  while True:
   if self.signal_shutdown:
    break
   if self.signal_shutdown:
    logger.info("graceful shutdown")
    break
   job = self.beanstalk.reserve(timeout=self.reserve_timeout) # 阻塞获取任务,最长等待 timeout
   if not job:
    continue
   try:
    self.on_job(job)
    self.delete_job(job)
   except beanstalkc.CommandFailed as e:
    logger.warning(e, exc_info=1)
   except Exception as e:
    logger.error(e)
    kicks = job.stats()['kicks']
    if kicks < 3:
     self.bury_job(job)
    else:
     message = json.loads(job.body)
     logger.error("Kicks reach max. Delete the job", extra={&#39;body&#39;: message})
     self.delete_job(job)

 @classmethod
 def on_job(cls, job):
  start = time.time()
  msg = json.loads(job.body)
  logger.info(msg)
  tube = msg.get(&#39;tube&#39;)
  func_name = msg.get(&#39;func_name&#39;)
  try:
   func = Subscriber.FUN_MAP[tube][func_name]
   kwargs = msg.get(&#39;kwargs&#39;)
   func(**kwargs)
   logger.info(u&#39;{}-{}&#39;.format(func, kwargs))
  except Exception as e:
   logger.error(e.message, exc_info=True)
  cost = time.time() - start
  logger.info(&#39;{} cost {}s&#39;.format(func_name, cost))

 @classmethod
 def delete_job(cls, job):
  try:
   job.delete()
  except beanstalkc.CommandFailed as e:
   logger.warning(e, exc_info=1)

 @classmethod
 def bury_job(cls, job):
  try:
   job.bury()
  except beanstalkc.CommandFailed as e:
   logger.warning(e, exc_info=1)

 def graceful_shutdown(self):
  self.signal_shutdown = True
Copier après la connexion

Lors de l'écriture du code ci-dessus, j'ai trouvé un problème :

Enregistrer la fonction via l'abonné La relation correspondante entre le nom et la fonction elle-même est dans un interpréteur Python, c'est-à-dire qu'elle s'exécute dans un processus et que le Worker s'exécute de manière asynchrone dans un autre processus. Comment le Worker peut-il également obtenir le même abonné que Putter ? Enfin, j'ai découvert que ce problème pouvait être résolu grâce au mécanisme de décoration de Python.

Cette phrase a résolu le problème de l'abonné

import_module_by_str(&#39;pear.web.controllers.controller_crawler&#39;)
Copier après la connexion

# import_module_by_str 的实现
def import_module_by_str(module_name):
 if isinstance(module_name, unicode):
  module_name = str(module_name)
 __import__(module_name)
Copier après la connexion

Lorsque import_module_by_str est exécuté, __import__ est appelé pour charger dynamiquement les classes et les fonctions. Après avoir chargé le module contenant la fonction utilisant JobQueue en mémoire. Lors de l'exécution de Woker, l'interpréteur Python exécutera d'abord le code du décorateur @-decorated et chargera la relation correspondante dans Subscriber en mémoire.

Pour une utilisation réelle, veuillez consulter https://github.com/jiyangg/Pear/blob/master/pear/jobs/job_queue.py

Recommandations associées :

Explication détaillée de l'instance de classe de file d'attente de messages php-beanstalkd

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Déclaration de ce site Web
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn

Outils d'IA chauds

Undresser.AI Undress

Undresser.AI Undress

Application basée sur l'IA pour créer des photos de nu réalistes

AI Clothes Remover

AI Clothes Remover

Outil d'IA en ligne pour supprimer les vêtements des photos.

Undress AI Tool

Undress AI Tool

Images de déshabillage gratuites

Clothoff.io

Clothoff.io

Dissolvant de vêtements AI

AI Hentai Generator

AI Hentai Generator

Générez AI Hentai gratuitement.

Article chaud

R.E.P.O. Crystals d'énergie expliqués et ce qu'ils font (cristal jaune)
3 Il y a quelques semaines By 尊渡假赌尊渡假赌尊渡假赌
R.E.P.O. Meilleurs paramètres graphiques
3 Il y a quelques semaines By 尊渡假赌尊渡假赌尊渡假赌
R.E.P.O. Comment réparer l'audio si vous n'entendez personne
3 Il y a quelques semaines By 尊渡假赌尊渡假赌尊渡假赌
WWE 2K25: Comment déverrouiller tout dans Myrise
4 Il y a quelques semaines By 尊渡假赌尊渡假赌尊渡假赌

Outils chauds

Bloc-notes++7.3.1

Bloc-notes++7.3.1

Éditeur de code facile à utiliser et gratuit

SublimeText3 version chinoise

SublimeText3 version chinoise

Version chinoise, très simple à utiliser

Envoyer Studio 13.0.1

Envoyer Studio 13.0.1

Puissant environnement de développement intégré PHP

Dreamweaver CS6

Dreamweaver CS6

Outils de développement Web visuel

SublimeText3 version Mac

SublimeText3 version Mac

Logiciel d'édition de code au niveau de Dieu (SublimeText3)

MySQL doit-il payer MySQL doit-il payer Apr 08, 2025 pm 05:36 PM

MySQL a une version communautaire gratuite et une version d'entreprise payante. La version communautaire peut être utilisée et modifiée gratuitement, mais le support est limité et convient aux applications avec des exigences de stabilité faibles et des capacités techniques solides. L'Enterprise Edition fournit une prise en charge commerciale complète pour les applications qui nécessitent une base de données stable, fiable et haute performance et disposées à payer pour le soutien. Les facteurs pris en compte lors du choix d'une version comprennent la criticité des applications, la budgétisation et les compétences techniques. Il n'y a pas d'option parfaite, seulement l'option la plus appropriée, et vous devez choisir soigneusement en fonction de la situation spécifique.

Comment utiliser MySQL après l'installation Comment utiliser MySQL après l'installation Apr 08, 2025 am 11:48 AM

L'article présente le fonctionnement de la base de données MySQL. Tout d'abord, vous devez installer un client MySQL, tel que MySQLWorkBench ou le client de ligne de commande. 1. Utilisez la commande MySQL-UROot-P pour vous connecter au serveur et connecter avec le mot de passe du compte racine; 2. Utilisez Createdatabase pour créer une base de données et utilisez Sélectionner une base de données; 3. Utilisez CreateTable pour créer une table, définissez des champs et des types de données; 4. Utilisez InsertInto pour insérer des données, remettre en question les données, mettre à jour les données par mise à jour et supprimer les données par Supprimer. Ce n'est qu'en maîtrisant ces étapes, en apprenant à faire face à des problèmes courants et à l'optimisation des performances de la base de données que vous pouvez utiliser efficacement MySQL.

Comment optimiser les performances MySQL pour les applications de haute charge? Comment optimiser les performances MySQL pour les applications de haute charge? Apr 08, 2025 pm 06:03 PM

Guide d'optimisation des performances de la base de données MySQL dans les applications à forte intensité de ressources, la base de données MySQL joue un rôle crucial et est responsable de la gestion des transactions massives. Cependant, à mesure que l'échelle de l'application se développe, les goulots d'étranglement des performances de la base de données deviennent souvent une contrainte. Cet article explorera une série de stratégies efficaces d'optimisation des performances MySQL pour garantir que votre application reste efficace et réactive dans des charges élevées. Nous combinerons des cas réels pour expliquer les technologies clés approfondies telles que l'indexation, l'optimisation des requêtes, la conception de la base de données et la mise en cache. 1. La conception de l'architecture de la base de données et l'architecture optimisée de la base de données sont la pierre angulaire de l'optimisation des performances MySQL. Voici quelques principes de base: sélectionner le bon type de données et sélectionner le plus petit type de données qui répond aux besoins peut non seulement économiser un espace de stockage, mais également améliorer la vitesse de traitement des données.

MySQL a-t-il besoin d'Internet MySQL a-t-il besoin d'Internet Apr 08, 2025 pm 02:18 PM

MySQL peut s'exécuter sans connexions réseau pour le stockage et la gestion des données de base. Cependant, la connexion réseau est requise pour l'interaction avec d'autres systèmes, l'accès à distance ou l'utilisation de fonctionnalités avancées telles que la réplication et le clustering. De plus, les mesures de sécurité (telles que les pare-feu), l'optimisation des performances (choisissez la bonne connexion réseau) et la sauvegarde des données sont essentielles pour se connecter à Internet.

HaDIDB: une base de données légère et évolutive horizontalement dans Python HaDIDB: une base de données légère et évolutive horizontalement dans Python Apr 08, 2025 pm 06:12 PM

HaDIDB: Une base de données Python évolutive de haut niveau légère HaDIDB (HaDIDB) est une base de données légère écrite en Python, avec un niveau élevé d'évolutivité. Installez HaDIDB à l'aide de l'installation PIP: PiPinStallHaDIDB User Management Créer un utilisateur: CreateUser () pour créer un nouvel utilisateur. La méthode Authentication () authentifie l'identité de l'utilisateur. FromHadidb.OperationMportUserUser_OBJ = User ("Admin", "Admin") User_OBJ.

Méthode de Navicat pour afficher le mot de passe de la base de données MongoDB Méthode de Navicat pour afficher le mot de passe de la base de données MongoDB Apr 08, 2025 pm 09:39 PM

Il est impossible de visualiser le mot de passe MongoDB directement via NAVICAT car il est stocké sous forme de valeurs de hachage. Comment récupérer les mots de passe perdus: 1. Réinitialiser les mots de passe; 2. Vérifiez les fichiers de configuration (peut contenir des valeurs de hachage); 3. Vérifiez les codes (May Code Hardcode).

MySQL Workbench peut-il se connecter à MariaDB MySQL Workbench peut-il se connecter à MariaDB Apr 08, 2025 pm 02:33 PM

MySQL Workbench peut se connecter à MARIADB, à condition que la configuration soit correcte. Sélectionnez d'abord "MariADB" comme type de connecteur. Dans la configuration de la connexion, définissez correctement l'hôte, le port, l'utilisateur, le mot de passe et la base de données. Lorsque vous testez la connexion, vérifiez que le service MARIADB est démarré, si le nom d'utilisateur et le mot de passe sont corrects, si le numéro de port est correct, si le pare-feu autorise les connexions et si la base de données existe. Dans une utilisation avancée, utilisez la technologie de mise en commun des connexions pour optimiser les performances. Les erreurs courantes incluent des autorisations insuffisantes, des problèmes de connexion réseau, etc. Lors des erreurs de débogage, analysez soigneusement les informations d'erreur et utilisez des outils de débogage. L'optimisation de la configuration du réseau peut améliorer les performances

MySQL a-t-il besoin d'un serveur MySQL a-t-il besoin d'un serveur Apr 08, 2025 pm 02:12 PM

Pour les environnements de production, un serveur est généralement nécessaire pour exécuter MySQL, pour des raisons, notamment les performances, la fiabilité, la sécurité et l'évolutivité. Les serveurs ont généralement un matériel plus puissant, des configurations redondantes et des mesures de sécurité plus strictes. Pour les petites applications à faible charge, MySQL peut être exécutée sur des machines locales, mais la consommation de ressources, les risques de sécurité et les coûts de maintenance doivent être soigneusement pris en considération. Pour une plus grande fiabilité et sécurité, MySQL doit être déployé sur le cloud ou d'autres serveurs. Le choix de la configuration du serveur approprié nécessite une évaluation en fonction de la charge d'application et du volume de données.

See all articles