Rumah pembangunan bahagian belakang Tutorial Python Python使用 Beanstalkd 做异步任务处理的方法

Python使用 Beanstalkd 做异步任务处理的方法

Apr 24, 2018 pm 01:35 PM
python Tugasan

这篇文章主要介绍了Python使用 Beanstalkd 做异步任务处理的方法,现在分享给大家,也给大家做个参考。一起过来看看吧

使用 Beanstalkd 作为消息队列服务,然后结合 Python 的装饰器语法实现一个简单的异步任务处理工具.

最终效果

定义任务:

from xxxxx.job_queue import JobQueue

queue = JobQueue()

@queue.task('task_tube_one')
def task_one(arg1, arg2, arg3):
 # do task
Salin selepas log masuk

提交任务:

task_one.put(arg1="a", arg2="b", arg3="c")
Salin selepas log masuk

然后就可以由后台的 work 线程去执行这些任务了。

实现过程

1、了解 Beanstalk Server

Beanstalk is a simple, fast work queue. https://github.com/kr/beanstalkd

Beanstalk 是一个 C 语言实现的消息队列服务。 它提供了通用的接口,最初设计的目的是通过异步运行耗时的任务来减少大量Web应用程序中的页面延迟。针对不同的语言,有不同的 Beanstalkd Client 实现。 Python 里就有 beanstalkc 等。我就是利用 beanstalkc 来作为与 beanstalkd server 通信的工具。

2、任务异步执行实现原理

beanstalkd 只能进行字符串的任务调度。为了让程序支持提交函数和参数,然后由woker执行函数并携带参数。需要一个中间层来将函数与传递的参数注册。

实现主要包括3个部分:

Subscriber: 负责将函数注册到 beanstalk 的一个tube上,实现很简单,注册函数名和函数本身的对应关系。(也就意味着同一个分组(tube)下不能有相同函数名存在)。数据存储在类变量里。

class Subscriber(object):
 FUN_MAP = defaultdict(dict)

 def __init__(self, func, tube):
  logger.info('register func:{} to tube:{}.'.format(func.__name__, tube))
  Subscriber.FUN_MAP[tube][func.__name__] = func
Salin selepas log masuk

JobQueue: 方便将一个普通函数转换为具有 Putter 能力的装饰器

class JobQueue(object):
 @classmethod
 def task(cls, tube):
  def wrapper(func):
   Subscriber(func, tube)
   return Putter(func, tube)

  return wrapper
Salin selepas log masuk

Putter: 将函数名、函数参数、指定的分组组合为一个对象,然后 json 序列化为字符串,最后通过 beanstalkc 推送到beanstalkd 队列。

class Putter(object):
 def __init__(self, func, tube):
  self.func = func
  self.tube = tube

 # 直接调用返回
 def __call__(self, *args, **kwargs):
  return self.func(*args, **kwargs)

 # 推给离线队列
 def put(self, **kwargs):
  args = {
   'func_name': self.func.__name__,
   'tube': self.tube,
   'kwargs': kwargs
  }
  logger.info('put job:{} to queue'.format(args))
  beanstalk = beanstalkc.Connection(host=BEANSTALK_CONFIG['host'], port=BEANSTALK_CONFIG['port'])
  try:
   beanstalk.use(self.tube)
   job_id = beanstalk.put(json.dumps(args))
   return job_id
  finally:
   beanstalk.close()
Salin selepas log masuk

Worker: 从 beanstalkd 队列中取出字符串,然后通过 json.loads 反序列化为对象,获得 函数名、参数和tube。最后从 Subscriber 中获得 函数名对应的函数代码,然后传递参数执行函数。

class Worker(object):
 worker_id = 0

 def __init__(self, tubes):
  self.beanstalk = beanstalkc.Connection(host=BEANSTALK_CONFIG['host'], port=BEANSTALK_CONFIG['port'])
  self.tubes = tubes
  self.reserve_timeout = 20
  self.timeout_limit = 1000
  self.kick_period = 600
  self.signal_shutdown = False
  self.release_delay = 0
  self.age = 0
  self.signal_shutdown = False
  signal.signal(signal.SIGTERM, lambda signum, frame: self.graceful_shutdown())
  Worker.worker_id += 1
  import_module_by_str('pear.web.controllers.controller_crawler')

 def subscribe(self):
  if isinstance(self.tubes, list):
   for tube in self.tubes:
    if tube not in Subscriber.FUN_MAP.keys():
     logger.error('tube:{} not register!'.format(tube))
     continue
    self.beanstalk.watch(tube)
  else:
   if self.tubes not in Subscriber.FUN_MAP.keys():
    logger.error('tube:{} not register!'.format(self.tubes))
    return
   self.beanstalk.watch(self.tubes)

 def run(self):
  self.subscribe()
  while True:
   if self.signal_shutdown:
    break
   if self.signal_shutdown:
    logger.info("graceful shutdown")
    break
   job = self.beanstalk.reserve(timeout=self.reserve_timeout) # 阻塞获取任务,最长等待 timeout
   if not job:
    continue
   try:
    self.on_job(job)
    self.delete_job(job)
   except beanstalkc.CommandFailed as e:
    logger.warning(e, exc_info=1)
   except Exception as e:
    logger.error(e)
    kicks = job.stats()['kicks']
    if kicks < 3:
     self.bury_job(job)
    else:
     message = json.loads(job.body)
     logger.error("Kicks reach max. Delete the job", extra={&#39;body&#39;: message})
     self.delete_job(job)

 @classmethod
 def on_job(cls, job):
  start = time.time()
  msg = json.loads(job.body)
  logger.info(msg)
  tube = msg.get(&#39;tube&#39;)
  func_name = msg.get(&#39;func_name&#39;)
  try:
   func = Subscriber.FUN_MAP[tube][func_name]
   kwargs = msg.get(&#39;kwargs&#39;)
   func(**kwargs)
   logger.info(u&#39;{}-{}&#39;.format(func, kwargs))
  except Exception as e:
   logger.error(e.message, exc_info=True)
  cost = time.time() - start
  logger.info(&#39;{} cost {}s&#39;.format(func_name, cost))

 @classmethod
 def delete_job(cls, job):
  try:
   job.delete()
  except beanstalkc.CommandFailed as e:
   logger.warning(e, exc_info=1)

 @classmethod
 def bury_job(cls, job):
  try:
   job.bury()
  except beanstalkc.CommandFailed as e:
   logger.warning(e, exc_info=1)

 def graceful_shutdown(self):
  self.signal_shutdown = True
Salin selepas log masuk

写上面代码的时候,发现一个问题:

通过 Subscriber 注册函数名和函数本身的对应关系,是在一个Python解释器,也就是在一个进程里运行的,而 Worker 又是异步在另外的进程运行,怎么样才能让 Worker 也能拿到和 Putter 一样的 Subscriber。最后发现通过 Python 的装饰器机制可以解决这个问题。

就是这句解决了 Subscriber 的问题

import_module_by_str(&#39;pear.web.controllers.controller_crawler&#39;)
Salin selepas log masuk

# import_module_by_str 的实现
def import_module_by_str(module_name):
 if isinstance(module_name, unicode):
  module_name = str(module_name)
 __import__(module_name)
Salin selepas log masuk

执行 import_module_by_str 时, 会调用 __import__ 动态加载类和函数。将使用了 JobQueue 的函数所在模块加载到内存之后。当 运行 Woker 时,Python 解释器就会先执行 @修饰的装饰器代码,也就会把 Subscriber 中的对应关系加载到内存。

实际使用可以看 https://github.com/jiyangg/Pear/blob/master/pear/jobs/job_queue.py

相关推荐:

php-beanstalkd消息队列类实例详解

Atas ialah kandungan terperinci Python使用 Beanstalkd 做异步任务处理的方法. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!

Kenyataan Laman Web ini
Kandungan artikel ini disumbangkan secara sukarela oleh netizen, dan hak cipta adalah milik pengarang asal. Laman web ini tidak memikul tanggungjawab undang-undang yang sepadan. Jika anda menemui sebarang kandungan yang disyaki plagiarisme atau pelanggaran, sila hubungi admin@php.cn

Alat AI Hot

Undresser.AI Undress

Undresser.AI Undress

Apl berkuasa AI untuk mencipta foto bogel yang realistik

AI Clothes Remover

AI Clothes Remover

Alat AI dalam talian untuk mengeluarkan pakaian daripada foto.

Undress AI Tool

Undress AI Tool

Gambar buka pakaian secara percuma

Clothoff.io

Clothoff.io

Penyingkiran pakaian AI

AI Hentai Generator

AI Hentai Generator

Menjana ai hentai secara percuma.

Artikel Panas

R.E.P.O. Kristal tenaga dijelaskan dan apa yang mereka lakukan (kristal kuning)
3 minggu yang lalu By 尊渡假赌尊渡假赌尊渡假赌
R.E.P.O. Tetapan grafik terbaik
3 minggu yang lalu By 尊渡假赌尊渡假赌尊渡假赌
R.E.P.O. Cara Memperbaiki Audio Jika anda tidak dapat mendengar sesiapa
3 minggu yang lalu By 尊渡假赌尊渡假赌尊渡假赌
WWE 2K25: Cara Membuka Segala -galanya Di Myrise
4 minggu yang lalu By 尊渡假赌尊渡假赌尊渡假赌

Alat panas

Notepad++7.3.1

Notepad++7.3.1

Editor kod yang mudah digunakan dan percuma

SublimeText3 versi Cina

SublimeText3 versi Cina

Versi Cina, sangat mudah digunakan

Hantar Studio 13.0.1

Hantar Studio 13.0.1

Persekitaran pembangunan bersepadu PHP yang berkuasa

Dreamweaver CS6

Dreamweaver CS6

Alat pembangunan web visual

SublimeText3 versi Mac

SublimeText3 versi Mac

Perisian penyuntingan kod peringkat Tuhan (SublimeText3)

Adakah Mysql perlu membayar Adakah Mysql perlu membayar Apr 08, 2025 pm 05:36 PM

MySQL mempunyai versi komuniti percuma dan versi perusahaan berbayar. Versi komuniti boleh digunakan dan diubahsuai secara percuma, tetapi sokongannya terhad dan sesuai untuk aplikasi dengan keperluan kestabilan yang rendah dan keupayaan teknikal yang kuat. Edisi Enterprise menyediakan sokongan komersil yang komprehensif untuk aplikasi yang memerlukan pangkalan data yang stabil, boleh dipercayai, berprestasi tinggi dan bersedia membayar sokongan. Faktor yang dipertimbangkan apabila memilih versi termasuk kritikal aplikasi, belanjawan, dan kemahiran teknikal. Tidak ada pilihan yang sempurna, hanya pilihan yang paling sesuai, dan anda perlu memilih dengan teliti mengikut keadaan tertentu.

Cara Menggunakan MySQL Selepas Pemasangan Cara Menggunakan MySQL Selepas Pemasangan Apr 08, 2025 am 11:48 AM

Artikel ini memperkenalkan operasi pangkalan data MySQL. Pertama, anda perlu memasang klien MySQL, seperti MySqlworkbench atau Command Line Client. 1. Gunakan perintah MySQL-Uroot-P untuk menyambung ke pelayan dan log masuk dengan kata laluan akaun root; 2. Gunakan CreateTatabase untuk membuat pangkalan data, dan gunakan Pilih pangkalan data; 3. Gunakan createtable untuk membuat jadual, menentukan medan dan jenis data; 4. Gunakan InsertInto untuk memasukkan data, data pertanyaan, kemas kini data dengan kemas kini, dan padam data dengan padam. Hanya dengan menguasai langkah -langkah ini, belajar menangani masalah biasa dan mengoptimumkan prestasi pangkalan data anda boleh menggunakan MySQL dengan cekap.

Bagaimana untuk mengoptimumkan prestasi MySQL untuk aplikasi beban tinggi? Bagaimana untuk mengoptimumkan prestasi MySQL untuk aplikasi beban tinggi? Apr 08, 2025 pm 06:03 PM

Panduan Pengoptimuman Prestasi Pangkalan Data MySQL Dalam aplikasi yang berintensifkan sumber, pangkalan data MySQL memainkan peranan penting dan bertanggungjawab untuk menguruskan urus niaga besar-besaran. Walau bagaimanapun, apabila skala aplikasi berkembang, kemunculan prestasi pangkalan data sering menjadi kekangan. Artikel ini akan meneroka satu siri strategi pengoptimuman prestasi MySQL yang berkesan untuk memastikan aplikasi anda tetap cekap dan responsif di bawah beban tinggi. Kami akan menggabungkan kes-kes sebenar untuk menerangkan teknologi utama yang mendalam seperti pengindeksan, pengoptimuman pertanyaan, reka bentuk pangkalan data dan caching. 1. Reka bentuk seni bina pangkalan data dan seni bina pangkalan data yang dioptimumkan adalah asas pengoptimuman prestasi MySQL. Berikut adalah beberapa prinsip teras: Memilih jenis data yang betul dan memilih jenis data terkecil yang memenuhi keperluan bukan sahaja dapat menjimatkan ruang penyimpanan, tetapi juga meningkatkan kelajuan pemprosesan data.

Adakah mysql memerlukan internet Adakah mysql memerlukan internet Apr 08, 2025 pm 02:18 PM

MySQL boleh berjalan tanpa sambungan rangkaian untuk penyimpanan dan pengurusan data asas. Walau bagaimanapun, sambungan rangkaian diperlukan untuk interaksi dengan sistem lain, akses jauh, atau menggunakan ciri -ciri canggih seperti replikasi dan clustering. Di samping itu, langkah -langkah keselamatan (seperti firewall), pengoptimuman prestasi (pilih sambungan rangkaian yang betul), dan sandaran data adalah penting untuk menyambung ke Internet.

Hadidb: Pangkalan data yang ringan dan berskala mendatar di Python Hadidb: Pangkalan data yang ringan dan berskala mendatar di Python Apr 08, 2025 pm 06:12 PM

Hadidb: Pangkalan data Python yang ringan, tinggi, Hadidb (Hadidb) adalah pangkalan data ringan yang ditulis dalam Python, dengan tahap skalabilitas yang tinggi. Pasang HadIdb menggunakan pemasangan PIP: Pengurusan Pengguna PipInstallHadidB Buat Pengguna: CreateUser () Kaedah untuk membuat pengguna baru. Kaedah pengesahan () mengesahkan identiti pengguna. dariHadidb.OperationImportuserer_Obj = user ("admin", "admin") user_obj.

Kaedah Navicat untuk melihat kata laluan pangkalan data MongoDB Kaedah Navicat untuk melihat kata laluan pangkalan data MongoDB Apr 08, 2025 pm 09:39 PM

Tidak mustahil untuk melihat kata laluan MongoDB secara langsung melalui Navicat kerana ia disimpan sebagai nilai hash. Cara mendapatkan kata laluan yang hilang: 1. Tetapkan semula kata laluan; 2. Periksa fail konfigurasi (mungkin mengandungi nilai hash); 3. Semak Kod (boleh kata laluan Hardcode).

Bolehkah Mysql Workbench menyambung ke Mariadb Bolehkah Mysql Workbench menyambung ke Mariadb Apr 08, 2025 pm 02:33 PM

MySQL Workbench boleh menyambung ke MariaDB, dengan syarat bahawa konfigurasi adalah betul. Mula -mula pilih "MariaDB" sebagai jenis penyambung. Dalam konfigurasi sambungan, tetapkan host, port, pengguna, kata laluan, dan pangkalan data dengan betul. Apabila menguji sambungan, periksa bahawa perkhidmatan MariaDB dimulakan, sama ada nama pengguna dan kata laluan betul, sama ada nombor port betul, sama ada firewall membenarkan sambungan, dan sama ada pangkalan data itu wujud. Dalam penggunaan lanjutan, gunakan teknologi penyatuan sambungan untuk mengoptimumkan prestasi. Kesilapan biasa termasuk kebenaran yang tidak mencukupi, masalah sambungan rangkaian, dan lain -lain. Apabila kesilapan debugging, dengan teliti menganalisis maklumat ralat dan gunakan alat penyahpepijatan. Mengoptimumkan konfigurasi rangkaian dapat meningkatkan prestasi

Adakah mysql memerlukan pelayan Adakah mysql memerlukan pelayan Apr 08, 2025 pm 02:12 PM

Untuk persekitaran pengeluaran, pelayan biasanya diperlukan untuk menjalankan MySQL, atas alasan termasuk prestasi, kebolehpercayaan, keselamatan, dan skalabilitas. Pelayan biasanya mempunyai perkakasan yang lebih kuat, konfigurasi berlebihan dan langkah keselamatan yang lebih ketat. Untuk aplikasi kecil, rendah, MySQL boleh dijalankan pada mesin tempatan, tetapi penggunaan sumber, risiko keselamatan dan kos penyelenggaraan perlu dipertimbangkan dengan teliti. Untuk kebolehpercayaan dan keselamatan yang lebih besar, MySQL harus digunakan di awan atau pelayan lain. Memilih konfigurasi pelayan yang sesuai memerlukan penilaian berdasarkan beban aplikasi dan jumlah data.

See all articles