백엔드 개발 파이썬 튜토리얼 비동기 작업 처리를 위해 Python에서 Beanstalkd를 사용하는 방법

비동기 작업 처리를 위해 Python에서 Beanstalkd를 사용하는 방법

Apr 24, 2018 pm 01:35 PM
python

이 글에서는 Python에서 비동기 작업 처리를 위해 Beanstalkd를 사용하는 방법을 주로 소개하고 참고용으로 올려드립니다. 함께 살펴보겠습니다

Beanstalkd를 메시지 큐 서비스로 사용한 다음 이를 Python의 데코레이터 구문과 결합하여 간단한 비동기 작업 처리 도구를 구현합니다.

최종 효과

작업 정의:

from xxxxx.job_queue import JobQueue

queue = JobQueue()

@queue.task('task_tube_one')
def task_one(arg1, arg2, arg3):
 # do task
로그인 후 복사

작업 제출:

task_one.put(arg1="a", arg2="b", arg3="c")
로그인 후 복사

그러면 백그라운드 작업 스레드가 이러한 작업을 수행할 수 있습니다.

구현 과정

1. Beanstalk 서버 이해

Beanstalk는 간단하고 빠른 작업 대기열입니다. https://github.com/kr/beanstalkd

Beanstalk는 C 언어로 구현된 메시지 대기열 서비스입니다. 이는 공통 인터페이스를 제공하며 원래 시간이 많이 걸리는 작업을 비동기식으로 실행하여 대규모 웹 애플리케이션에서 페이지 대기 시간을 줄이도록 설계되었습니다. 언어마다 다른 Beanstalkd 클라이언트 구현이 있습니다. Python에는 beanstalkc 등이 있습니다. 나는 beanstalkd 서버와 통신하기 위한 도구로 beanstalkc를 사용합니다.

2. 비동기 작업 실행 구현 원칙

beanstalkd는 문자열 작업 스케줄링만 수행할 수 있습니다. 프로그램이 함수와 매개변수 제출을 지원하기 위해 워커에 의해 함수가 실행되고 매개변수가 전달됩니다. 전달된 매개변수로 함수를 등록하려면 중간 계층이 필요합니다.

구현은 주로 세 부분으로 구성됩니다.

구독자: 콩나무 튜브에 함수를 등록하는 역할을 담당합니다. 구현은 매우 간단하며 함수 이름과 함수 자체 사이의 해당 관계를 등록합니다. (같은 그룹(튜브) 내에서는 같은 기능명이 존재할 수 없다는 뜻입니다.) 데이터는 클래스 변수에 저장됩니다.

class Subscriber(object):
 FUN_MAP = defaultdict(dict)

 def __init__(self, func, tube):
  logger.info('register func:{} to tube:{}.'.format(func.__name__, tube))
  Subscriber.FUN_MAP[tube][func.__name__] = func
로그인 후 복사

JobQueue: Putter 기능을 사용하여 일반 함수를 데코레이터로 편리하게 변환합니다.

class JobQueue(object):
 @classmethod
 def task(cls, tube):
  def wrapper(func):
   Subscriber(func, tube)
   return Putter(func, tube)

  return wrapper
로그인 후 복사

Putter: 함수 이름, 함수 매개변수 및 지정된 그룹을 객체로 결합한 다음 json 직렬화 문자열로 변환하고 마지막으로 beanstalkc를 통해 beanstalkd 대기열로 푸시됩니다.

class Putter(object):
 def __init__(self, func, tube):
  self.func = func
  self.tube = tube

 # 直接调用返回
 def __call__(self, *args, **kwargs):
  return self.func(*args, **kwargs)

 # 推给离线队列
 def put(self, **kwargs):
  args = {
   'func_name': self.func.__name__,
   'tube': self.tube,
   'kwargs': kwargs
  }
  logger.info('put job:{} to queue'.format(args))
  beanstalk = beanstalkc.Connection(host=BEANSTALK_CONFIG['host'], port=BEANSTALK_CONFIG['port'])
  try:
   beanstalk.use(self.tube)
   job_id = beanstalk.put(json.dumps(args))
   return job_id
  finally:
   beanstalk.close()
로그인 후 복사

Worker: beanstalkd 대기열에서 문자열을 가져온 다음 json.loads를 통해 객체로 역직렬화하여 함수 이름, 매개변수 및 튜브를 가져옵니다. 마지막으로 구독자로부터 함수 이름에 해당하는 함수 코드를 얻은 후 매개 변수를 전달하여 함수를 실행합니다.

class Worker(object):
 worker_id = 0

 def __init__(self, tubes):
  self.beanstalk = beanstalkc.Connection(host=BEANSTALK_CONFIG['host'], port=BEANSTALK_CONFIG['port'])
  self.tubes = tubes
  self.reserve_timeout = 20
  self.timeout_limit = 1000
  self.kick_period = 600
  self.signal_shutdown = False
  self.release_delay = 0
  self.age = 0
  self.signal_shutdown = False
  signal.signal(signal.SIGTERM, lambda signum, frame: self.graceful_shutdown())
  Worker.worker_id += 1
  import_module_by_str('pear.web.controllers.controller_crawler')

 def subscribe(self):
  if isinstance(self.tubes, list):
   for tube in self.tubes:
    if tube not in Subscriber.FUN_MAP.keys():
     logger.error('tube:{} not register!'.format(tube))
     continue
    self.beanstalk.watch(tube)
  else:
   if self.tubes not in Subscriber.FUN_MAP.keys():
    logger.error('tube:{} not register!'.format(self.tubes))
    return
   self.beanstalk.watch(self.tubes)

 def run(self):
  self.subscribe()
  while True:
   if self.signal_shutdown:
    break
   if self.signal_shutdown:
    logger.info("graceful shutdown")
    break
   job = self.beanstalk.reserve(timeout=self.reserve_timeout) # 阻塞获取任务,最长等待 timeout
   if not job:
    continue
   try:
    self.on_job(job)
    self.delete_job(job)
   except beanstalkc.CommandFailed as e:
    logger.warning(e, exc_info=1)
   except Exception as e:
    logger.error(e)
    kicks = job.stats()['kicks']
    if kicks < 3:
     self.bury_job(job)
    else:
     message = json.loads(job.body)
     logger.error("Kicks reach max. Delete the job", extra={&#39;body&#39;: message})
     self.delete_job(job)

 @classmethod
 def on_job(cls, job):
  start = time.time()
  msg = json.loads(job.body)
  logger.info(msg)
  tube = msg.get(&#39;tube&#39;)
  func_name = msg.get(&#39;func_name&#39;)
  try:
   func = Subscriber.FUN_MAP[tube][func_name]
   kwargs = msg.get(&#39;kwargs&#39;)
   func(**kwargs)
   logger.info(u&#39;{}-{}&#39;.format(func, kwargs))
  except Exception as e:
   logger.error(e.message, exc_info=True)
  cost = time.time() - start
  logger.info(&#39;{} cost {}s&#39;.format(func_name, cost))

 @classmethod
 def delete_job(cls, job):
  try:
   job.delete()
  except beanstalkc.CommandFailed as e:
   logger.warning(e, exc_info=1)

 @classmethod
 def bury_job(cls, job):
  try:
   job.bury()
  except beanstalkc.CommandFailed as e:
   logger.warning(e, exc_info=1)

 def graceful_shutdown(self):
  self.signal_shutdown = True
로그인 후 복사

위 코드를 작성하다가 문제를 발견했습니다.

Subscriber를 통해 등록한 함수 이름과 함수 자체의 해당 관계는 Python 인터프리터, 즉 작업자는 다른 프로세스에서 비동기적으로 실행되며 어떻게 작업자가 Putter와 동일한 구독자를 얻을 수 있습니까? 마지막으로 저는 이 문제가 Python의 데코레이터 메커니즘을 통해 해결될 수 있음을 발견했습니다.

구독자 문제를 해결한 문장입니다

import_module_by_str(&#39;pear.web.controllers.controller_crawler&#39;)
로그인 후 복사

# import_module_by_str 的实现
def import_module_by_str(module_name):
 if isinstance(module_name, unicode):
  module_name = str(module_name)
 __import__(module_name)
로그인 후 복사

import_module_by_str이 실행되면 __import__를 호출하여 클래스와 함수를 동적으로 로드합니다. JobQueue를 사용하여 함수가 포함된 모듈을 메모리에 로드한 후. Woker를 실행할 때 Python 인터프리터는 먼저 @ 장식 데코레이터 코드를 실행하고 Subscriber의 해당 관계를 메모리에 로드합니다.

실제 사용법은 https://github.com/jiyangg/Pear/blob/master/pear/jobs/job_queue.py

관련 권장 사항:

php-beanstalkd 메시지 큐 클래스 인스턴스 자세한 설명을 참조하세요.

위 내용은 비동기 작업 처리를 위해 Python에서 Beanstalkd를 사용하는 방법의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

본 웹사이트의 성명
본 글의 내용은 네티즌들의 자발적인 기여로 작성되었으며, 저작권은 원저작자에게 있습니다. 본 사이트는 이에 상응하는 법적 책임을 지지 않습니다. 표절이나 침해가 의심되는 콘텐츠를 발견한 경우 admin@php.cn으로 문의하세요.

핫 AI 도구

Undresser.AI Undress

Undresser.AI Undress

사실적인 누드 사진을 만들기 위한 AI 기반 앱

AI Clothes Remover

AI Clothes Remover

사진에서 옷을 제거하는 온라인 AI 도구입니다.

Undress AI Tool

Undress AI Tool

무료로 이미지를 벗다

Clothoff.io

Clothoff.io

AI 옷 제거제

AI Hentai Generator

AI Hentai Generator

AI Hentai를 무료로 생성하십시오.

인기 기사

R.E.P.O. 에너지 결정과 그들이하는 일 (노란색 크리스탈)
3 몇 주 전 By 尊渡假赌尊渡假赌尊渡假赌
R.E.P.O. 최고의 그래픽 설정
3 몇 주 전 By 尊渡假赌尊渡假赌尊渡假赌
R.E.P.O. 아무도들을 수없는 경우 오디오를 수정하는 방법
3 몇 주 전 By 尊渡假赌尊渡假赌尊渡假赌
WWE 2K25 : Myrise에서 모든 것을 잠금 해제하는 방법
4 몇 주 전 By 尊渡假赌尊渡假赌尊渡假赌

뜨거운 도구

메모장++7.3.1

메모장++7.3.1

사용하기 쉬운 무료 코드 편집기

SublimeText3 중국어 버전

SublimeText3 중국어 버전

중국어 버전, 사용하기 매우 쉽습니다.

스튜디오 13.0.1 보내기

스튜디오 13.0.1 보내기

강력한 PHP 통합 개발 환경

드림위버 CS6

드림위버 CS6

시각적 웹 개발 도구

SublimeText3 Mac 버전

SublimeText3 Mac 버전

신 수준의 코드 편집 소프트웨어(SublimeText3)

MySQL은 지불해야합니다 MySQL은 지불해야합니다 Apr 08, 2025 pm 05:36 PM

MySQL에는 무료 커뮤니티 버전과 유료 엔터프라이즈 버전이 있습니다. 커뮤니티 버전은 무료로 사용 및 수정할 수 있지만 지원은 제한되어 있으며 안정성이 낮은 응용 프로그램에 적합하며 기술 기능이 강합니다. Enterprise Edition은 안정적이고 신뢰할 수있는 고성능 데이터베이스가 필요하고 지원 비용을 기꺼이 지불하는 응용 프로그램에 대한 포괄적 인 상업적 지원을 제공합니다. 버전을 선택할 때 고려 된 요소에는 응용 프로그램 중요도, 예산 책정 및 기술 기술이 포함됩니다. 완벽한 옵션은없고 가장 적합한 옵션 만 있으므로 특정 상황에 따라 신중하게 선택해야합니다.

설치 후 MySQL을 사용하는 방법 설치 후 MySQL을 사용하는 방법 Apr 08, 2025 am 11:48 AM

이 기사는 MySQL 데이터베이스의 작동을 소개합니다. 먼저 MySQLworkBench 또는 명령 줄 클라이언트와 같은 MySQL 클라이언트를 설치해야합니다. 1. MySQL-Uroot-P 명령을 사용하여 서버에 연결하고 루트 계정 암호로 로그인하십시오. 2. CreateABase를 사용하여 데이터베이스를 작성하고 데이터베이스를 선택하십시오. 3. CreateTable을 사용하여 테이블을 만들고 필드 및 데이터 유형을 정의하십시오. 4. InsertInto를 사용하여 데이터를 삽입하고 데이터를 쿼리하고 업데이트를 통해 데이터를 업데이트하고 DELETE를 통해 데이터를 삭제하십시오. 이러한 단계를 마스터하고 일반적인 문제를 처리하는 법을 배우고 데이터베이스 성능을 최적화하면 MySQL을 효율적으로 사용할 수 있습니다.

고로드 애플리케이션의 MySQL 성능을 최적화하는 방법은 무엇입니까? 고로드 애플리케이션의 MySQL 성능을 최적화하는 방법은 무엇입니까? Apr 08, 2025 pm 06:03 PM

MySQL 데이터베이스 성능 최적화 안내서 리소스 집약적 응용 프로그램에서 MySQL 데이터베이스는 중요한 역할을 수행하며 대규모 트랜잭션 관리를 담당합니다. 그러나 응용 프로그램 규모가 확장됨에 따라 데이터베이스 성능 병목 현상은 종종 제약이됩니다. 이 기사는 일련의 효과적인 MySQL 성능 최적화 전략을 탐색하여 응용 프로그램이 고 부하에서 효율적이고 반응이 유지되도록합니다. 실제 사례를 결합하여 인덱싱, 쿼리 최적화, 데이터베이스 설계 및 캐싱과 같은 심층적 인 주요 기술을 설명합니다. 1. 데이터베이스 아키텍처 설계 및 최적화 된 데이터베이스 아키텍처는 MySQL 성능 최적화의 초석입니다. 몇 가지 핵심 원칙은 다음과 같습니다. 올바른 데이터 유형을 선택하고 요구 사항을 충족하는 가장 작은 데이터 유형을 선택하면 저장 공간을 절약 할 수있을뿐만 아니라 데이터 처리 속도를 향상시킬 수 있습니다.

MySQL은 인터넷이 필요합니까? MySQL은 인터넷이 필요합니까? Apr 08, 2025 pm 02:18 PM

MySQL은 기본 데이터 저장 및 관리를위한 네트워크 연결없이 실행할 수 있습니다. 그러나 다른 시스템과의 상호 작용, 원격 액세스 또는 복제 및 클러스터링과 같은 고급 기능을 사용하려면 네트워크 연결이 필요합니다. 또한 보안 측정 (예 : 방화벽), 성능 최적화 (올바른 네트워크 연결 선택) 및 데이터 백업은 인터넷에 연결하는 데 중요합니다.

hadidb : 파이썬의 가볍고 수평 확장 가능한 데이터베이스 hadidb : 파이썬의 가볍고 수평 확장 가능한 데이터베이스 Apr 08, 2025 pm 06:12 PM

HADIDB : 가볍고 높은 수준의 확장 가능한 Python 데이터베이스 HadIDB (HADIDB)는 파이썬으로 작성된 경량 데이터베이스이며 확장 수준이 높습니다. PIP 설치를 사용하여 HADIDB 설치 : PIPINSTALLHADIDB 사용자 관리 사용자 만들기 사용자 : createUser () 메소드를 작성하여 새 사용자를 만듭니다. Authentication () 메소드는 사용자의 신원을 인증합니다. Fromhadidb.operationimportuseruser_obj = user ( "admin", "admin") user_obj.

MongoDB 데이터베이스 비밀번호를 보는 Navicat의 방법 MongoDB 데이터베이스 비밀번호를 보는 Navicat의 방법 Apr 08, 2025 pm 09:39 PM

해시 값으로 저장되기 때문에 MongoDB 비밀번호를 Navicat을 통해 직접 보는 것은 불가능합니다. 분실 된 비밀번호 검색 방법 : 1. 비밀번호 재설정; 2. 구성 파일 확인 (해시 값이 포함될 수 있음); 3. 코드를 점검하십시오 (암호 하드 코드 메일).

MySQL Workbench가 Mariadb에 연결할 수 있습니다 MySQL Workbench가 Mariadb에 연결할 수 있습니다 Apr 08, 2025 pm 02:33 PM

MySQL Workbench는 구성이 올바른 경우 MariadB에 연결할 수 있습니다. 먼저 커넥터 유형으로 "mariadb"를 선택하십시오. 연결 구성에서 호스트, 포트, 사용자, 비밀번호 및 데이터베이스를 올바르게 설정하십시오. 연결을 테스트 할 때는 마리아드 브 서비스가 시작되었는지, 사용자 이름과 비밀번호가 올바른지, 포트 번호가 올바른지, 방화벽이 연결을 허용하는지 및 데이터베이스가 존재하는지 여부를 확인하십시오. 고급 사용에서 연결 풀링 기술을 사용하여 성능을 최적화하십시오. 일반적인 오류에는 불충분 한 권한, 네트워크 연결 문제 등이 포함됩니다. 오류를 디버깅 할 때 오류 정보를 신중하게 분석하고 디버깅 도구를 사용하십시오. 네트워크 구성을 최적화하면 성능이 향상 될 수 있습니다

MySQL에는 서버가 필요합니까? MySQL에는 서버가 필요합니까? Apr 08, 2025 pm 02:12 PM

생산 환경의 경우 성능, 신뢰성, 보안 및 확장 성을 포함한 이유로 서버는 일반적으로 MySQL을 실행해야합니다. 서버에는 일반적으로보다 강력한 하드웨어, 중복 구성 및 엄격한 보안 조치가 있습니다. 소규모 저하 애플리케이션의 경우 MySQL이 로컬 컴퓨터에서 실행할 수 있지만 자원 소비, 보안 위험 및 유지 보수 비용은 신중하게 고려되어야합니다. 신뢰성과 보안을 높이려면 MySQL을 클라우드 또는 기타 서버에 배포해야합니다. 적절한 서버 구성을 선택하려면 응용 프로그램 부하 및 데이터 볼륨을 기반으로 평가가 필요합니다.

See all articles