首頁 後端開發 Python教學 Python使用 Beanstalkd 做非同步任務處理的方法

Python使用 Beanstalkd 做非同步任務處理的方法

Apr 24, 2018 pm 01:35 PM
python 任務

這篇文章主要介紹了Python使用 Beanstalkd 做非同步任務處理的方法,現在分享給大家,也給大家做個參考。一起來看看吧

使用Beanstalkd 作為訊息佇列服務,然後結合Python 的裝飾器語法實作一個簡單的非同步任務處理工具.

最終效果

#定義任務:

from xxxxx.job_queue import JobQueue

queue = JobQueue()

@queue.task('task_tube_one')
def task_one(arg1, arg2, arg3):
 # do task
登入後複製

提交任務:

task_one.put(arg1="a", arg2="b", arg3="c")
登入後複製

然後就可以由後台的work 執行緒去執行這些任務了。

實作過程

1、了解Beanstalk Server

#Beanstalk is a simple, fast work queue. https://github.com /kr/beanstalkd

Beanstalk 是一個C 語言實作的訊息佇列服務。它提供了通用的接口,最初設計的目的是透過非同步運行耗時的任務來減少大量Web應​​用程式中的頁面延遲。針對不同的語言,有不同的 Beanstalkd Client 實作。 Python 裡就有 beanstalkc 等。我就是利用 beanstalkc 來作為與 beanstalkd server 溝通的工具。

2、任務非同步執行實作原理

#beanstalkd 只能進行字串的任務調度。為了讓程式支援提交函數和參數,然後由woker執行函數並攜帶參數。需要一個中間層來將函數與傳遞的參數註冊。

實作主要包含3個部分:

Subscriber: 負責將函數註冊到 beanstalk 的一個tube上,實作很簡單,註冊函數名稱和函數本身的對應關係。 (也就意味著同一個分組(tube)下不能有相同函數名存在)。資料儲存在類別變數裡。

class Subscriber(object):
 FUN_MAP = defaultdict(dict)

 def __init__(self, func, tube):
  logger.info('register func:{} to tube:{}.'.format(func.__name__, tube))
  Subscriber.FUN_MAP[tube][func.__name__] = func
登入後複製

JobQueue: 方便將一個普通函數轉換為具有Putter 能力的裝飾器

class JobQueue(object):
 @classmethod
 def task(cls, tube):
  def wrapper(func):
   Subscriber(func, tube)
   return Putter(func, tube)

  return wrapper
登入後複製

Putter: 將函數名稱、函數參數、指定的分組組合為一個對象,然後json 序列化為字串,最後透過beanstalkc 推送到beanstalkd 佇列。

class Putter(object):
 def __init__(self, func, tube):
  self.func = func
  self.tube = tube

 # 直接调用返回
 def __call__(self, *args, **kwargs):
  return self.func(*args, **kwargs)

 # 推给离线队列
 def put(self, **kwargs):
  args = {
   'func_name': self.func.__name__,
   'tube': self.tube,
   'kwargs': kwargs
  }
  logger.info('put job:{} to queue'.format(args))
  beanstalk = beanstalkc.Connection(host=BEANSTALK_CONFIG['host'], port=BEANSTALK_CONFIG['port'])
  try:
   beanstalk.use(self.tube)
   job_id = beanstalk.put(json.dumps(args))
   return job_id
  finally:
   beanstalk.close()
登入後複製

Worker: 從beanstalkd 佇列中取出字串,然後透過json.loads 反序列化為對象,獲得函數名稱、參數和tube 。最後從 Subscriber 得到 函數名稱對應的函數程式碼,然後傳遞參數執行函數。

class Worker(object):
 worker_id = 0

 def __init__(self, tubes):
  self.beanstalk = beanstalkc.Connection(host=BEANSTALK_CONFIG['host'], port=BEANSTALK_CONFIG['port'])
  self.tubes = tubes
  self.reserve_timeout = 20
  self.timeout_limit = 1000
  self.kick_period = 600
  self.signal_shutdown = False
  self.release_delay = 0
  self.age = 0
  self.signal_shutdown = False
  signal.signal(signal.SIGTERM, lambda signum, frame: self.graceful_shutdown())
  Worker.worker_id += 1
  import_module_by_str('pear.web.controllers.controller_crawler')

 def subscribe(self):
  if isinstance(self.tubes, list):
   for tube in self.tubes:
    if tube not in Subscriber.FUN_MAP.keys():
     logger.error('tube:{} not register!'.format(tube))
     continue
    self.beanstalk.watch(tube)
  else:
   if self.tubes not in Subscriber.FUN_MAP.keys():
    logger.error('tube:{} not register!'.format(self.tubes))
    return
   self.beanstalk.watch(self.tubes)

 def run(self):
  self.subscribe()
  while True:
   if self.signal_shutdown:
    break
   if self.signal_shutdown:
    logger.info("graceful shutdown")
    break
   job = self.beanstalk.reserve(timeout=self.reserve_timeout) # 阻塞获取任务,最长等待 timeout
   if not job:
    continue
   try:
    self.on_job(job)
    self.delete_job(job)
   except beanstalkc.CommandFailed as e:
    logger.warning(e, exc_info=1)
   except Exception as e:
    logger.error(e)
    kicks = job.stats()['kicks']
    if kicks < 3:
     self.bury_job(job)
    else:
     message = json.loads(job.body)
     logger.error("Kicks reach max. Delete the job", extra={&#39;body&#39;: message})
     self.delete_job(job)

 @classmethod
 def on_job(cls, job):
  start = time.time()
  msg = json.loads(job.body)
  logger.info(msg)
  tube = msg.get(&#39;tube&#39;)
  func_name = msg.get(&#39;func_name&#39;)
  try:
   func = Subscriber.FUN_MAP[tube][func_name]
   kwargs = msg.get(&#39;kwargs&#39;)
   func(**kwargs)
   logger.info(u&#39;{}-{}&#39;.format(func, kwargs))
  except Exception as e:
   logger.error(e.message, exc_info=True)
  cost = time.time() - start
  logger.info(&#39;{} cost {}s&#39;.format(func_name, cost))

 @classmethod
 def delete_job(cls, job):
  try:
   job.delete()
  except beanstalkc.CommandFailed as e:
   logger.warning(e, exc_info=1)

 @classmethod
 def bury_job(cls, job):
  try:
   job.bury()
  except beanstalkc.CommandFailed as e:
   logger.warning(e, exc_info=1)

 def graceful_shutdown(self):
  self.signal_shutdown = True
登入後複製

寫上面程式碼的時候,發現一個問題:

透過Subscriber 註冊函數名稱和函數本身的對應關係,是在一個Python解釋器,也就是在一個進程裡運行的,而Worker 又是異步在另外的進程運行,怎麼樣才能讓Worker 也能拿到和Putter 一樣的Subscriber。最後發現透過 Python 的裝飾器機制可以解決這個問題。

就是這句解決了Subscriber 的問題

import_module_by_str(&#39;pear.web.controllers.controller_crawler&#39;)
登入後複製

# import_module_by_str 的实现
def import_module_by_str(module_name):
 if isinstance(module_name, unicode):
  module_name = str(module_name)
 __import__(module_name)
登入後複製

執行import_module_by_str 時,會呼叫__import__ 動態載入類別和函數。將使用了 JobQueue 的函數所在模組載入到記憶體之後。當 執行 Woker 時,Python 解釋器就會先執行 @修飾的裝飾器程式碼,也會把 Subscriber 中的對應關係載入到記憶體。

實際使用可以看https://github.com/jiyangg/Pear/blob/master/pear/jobs/job_queue.py

相關建議:

php-beanstalkd訊息佇列類別實例詳解

以上是Python使用 Beanstalkd 做非同步任務處理的方法的詳細內容。更多資訊請關注PHP中文網其他相關文章!

本網站聲明
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn

熱AI工具

Undresser.AI Undress

Undresser.AI Undress

人工智慧驅動的應用程序,用於創建逼真的裸體照片

AI Clothes Remover

AI Clothes Remover

用於從照片中去除衣服的線上人工智慧工具。

Undress AI Tool

Undress AI Tool

免費脫衣圖片

Clothoff.io

Clothoff.io

AI脫衣器

AI Hentai Generator

AI Hentai Generator

免費產生 AI 無盡。

熱門文章

R.E.P.O.能量晶體解釋及其做什麼(黃色晶體)
3 週前 By 尊渡假赌尊渡假赌尊渡假赌
R.E.P.O.最佳圖形設置
3 週前 By 尊渡假赌尊渡假赌尊渡假赌
R.E.P.O.如果您聽不到任何人,如何修復音頻
3 週前 By 尊渡假赌尊渡假赌尊渡假赌
WWE 2K25:如何解鎖Myrise中的所有內容
4 週前 By 尊渡假赌尊渡假赌尊渡假赌

熱工具

記事本++7.3.1

記事本++7.3.1

好用且免費的程式碼編輯器

SublimeText3漢化版

SublimeText3漢化版

中文版,非常好用

禪工作室 13.0.1

禪工作室 13.0.1

強大的PHP整合開發環境

Dreamweaver CS6

Dreamweaver CS6

視覺化網頁開發工具

SublimeText3 Mac版

SublimeText3 Mac版

神級程式碼編輯軟體(SublimeText3)

mysql 是否要付費 mysql 是否要付費 Apr 08, 2025 pm 05:36 PM

MySQL 有免費的社區版和收費的企業版。社區版可免費使用和修改,但支持有限,適合穩定性要求不高、技術能力強的應用。企業版提供全面商業支持,適合需要穩定可靠、高性能數據庫且願意為支持買單的應用。選擇版本時考慮的因素包括應用關鍵性、預算和技術技能。沒有完美的選項,只有最合適的方案,需根據具體情況謹慎選擇。

mysql安裝後怎麼使用 mysql安裝後怎麼使用 Apr 08, 2025 am 11:48 AM

文章介紹了MySQL數據庫的上手操作。首先,需安裝MySQL客戶端,如MySQLWorkbench或命令行客戶端。 1.使用mysql-uroot-p命令連接服務器,並使用root賬戶密碼登錄;2.使用CREATEDATABASE創建數據庫,USE選擇數據庫;3.使用CREATETABLE創建表,定義字段及數據類型;4.使用INSERTINTO插入數據,SELECT查詢數據,UPDATE更新數據,DELETE刪除數據。熟練掌握這些步驟,並學習處理常見問題和優化數據庫性能,才能高效使用MySQL。

如何針對高負載應用程序優化 MySQL 性能? 如何針對高負載應用程序優化 MySQL 性能? Apr 08, 2025 pm 06:03 PM

MySQL數據庫性能優化指南在資源密集型應用中,MySQL數據庫扮演著至關重要的角色,負責管理海量事務。然而,隨著應用規模的擴大,數據庫性能瓶頸往往成為製約因素。本文將探討一系列行之有效的MySQL性能優化策略,確保您的應用在高負載下依然保持高效響應。我們將結合實際案例,深入講解索引、查詢優化、數據庫設計以及緩存等關鍵技術。 1.數據庫架構設計優化合理的數據庫架構是MySQL性能優化的基石。以下是一些核心原則:選擇合適的數據類型選擇最小的、符合需求的數據類型,既能節省存儲空間,又能提升數據處理速度

HadiDB:Python 中的輕量級、可水平擴展的數據庫 HadiDB:Python 中的輕量級、可水平擴展的數據庫 Apr 08, 2025 pm 06:12 PM

HadiDB:輕量級、高水平可擴展的Python數據庫HadiDB(hadidb)是一個用Python編寫的輕量級數據庫,具備高度水平的可擴展性。安裝HadiDB使用pip安裝:pipinstallhadidb用戶管理創建用戶:createuser()方法創建一個新用戶。 authentication()方法驗證用戶身份。 fromhadidb.operationimportuseruser_obj=user("admin","admin")user_obj.

mysql 需要互聯網嗎 mysql 需要互聯網嗎 Apr 08, 2025 pm 02:18 PM

MySQL 可在無需網絡連接的情況下運行,進行基本的數據存儲和管理。但是,對於與其他系統交互、遠程訪問或使用高級功能(如復制和集群)的情況,則需要網絡連接。此外,安全措施(如防火牆)、性能優化(選擇合適的網絡連接)和數據備份對於連接到互聯網的 MySQL 數據庫至關重要。

Navicat查看MongoDB數據庫密碼的方法 Navicat查看MongoDB數據庫密碼的方法 Apr 08, 2025 pm 09:39 PM

直接通過 Navicat 查看 MongoDB 密碼是不可能的,因為它以哈希值形式存儲。取回丟失密碼的方法:1. 重置密碼;2. 檢查配置文件(可能包含哈希值);3. 檢查代碼(可能硬編碼密碼)。

mysql workbench 可以連接到 mariadb 嗎 mysql workbench 可以連接到 mariadb 嗎 Apr 08, 2025 pm 02:33 PM

MySQL Workbench 可以連接 MariaDB,前提是配置正確。首先選擇 "MariaDB" 作為連接器類型。在連接配置中,正確設置 HOST、PORT、USER、PASSWORD 和 DATABASE。測試連接時,檢查 MariaDB 服務是否啟動,用戶名和密碼是否正確,端口號是否正確,防火牆是否允許連接,以及數據庫是否存在。高級用法中,使用連接池技術優化性能。常見錯誤包括權限不足、網絡連接問題等,調試錯誤時仔細分析錯誤信息和使用調試工具。優化網絡配置可以提升性能

mysql 需要服務器嗎 mysql 需要服務器嗎 Apr 08, 2025 pm 02:12 PM

對於生產環境,通常需要一台服務器來運行 MySQL,原因包括性能、可靠性、安全性和可擴展性。服務器通常擁有更強大的硬件、冗餘配置和更嚴格的安全措施。對於小型、低負載應用,可在本地機器運行 MySQL,但需謹慎考慮資源消耗、安全風險和維護成本。如需更高的可靠性和安全性,應將 MySQL 部署到雲服務器或其他服務器上。選擇合適的服務器配置需要根據應用負載和數據量進行評估。

See all articles