
How huey, Python's distributed task queue, implements asynchronous tasks

Jun 13, 2016 am 09:12 AM

In this article we share a lightweight Python task-queue program: it lets huey, a distributed task system for Python, run tasks asynchronously. Interested readers, have a look.

 

 

huey is a lightweight task queue. Its features and broker support are not as powerful as celery's; the emphasis is on staying light, and the code is fairly easy to read.


About huey: (lighter-weight than celery, and nicer to use than mrq or rq!)

a lightweight alternative.

    written in python

    no deps outside stdlib, except redis (or roll your own backend)

    support for django

supports:

    multi-threaded task execution

    scheduled execution at a given time

    periodic execution, like a crontab

    retrying tasks that fail

    task result storage
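To make the "multi-threaded task execution" and "task result storage" bullets above concrete, here is a stdlib-only sketch of the consumer model: worker threads pull jobs off a queue and park return values in a store. This is a toy model for illustration, not huey's actual code:

```python
import queue
import threading

task_queue = queue.Queue()
results = {}  # simulated result store, keyed by task id

def worker():
    # Each worker thread pulls (task_id, func, args) items until it sees None.
    while True:
        item = task_queue.get()
        if item is None:
            break
        task_id, func, args = item
        results[task_id] = func(*args)  # store the return value
        task_queue.task_done()

threads = [threading.Thread(target=worker) for _ in range(4)]
for t in threads:
    t.start()

for i in range(10):
    task_queue.put((i, lambda a, b: a + b, (i, i)))

task_queue.join()          # wait until every enqueued task has been processed
for _ in threads:
    task_queue.put(None)   # signal the workers to exit
for t in threads:
    t.join()

print(sorted(results.items()))
```

The real consumer adds polling, scheduling and error handling on top, but the producer/queue/worker/result-store shape is the same.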


Installation:

The code is as follows:

Installing
huey can be installed very easily using pip.

pip install huey

huey has no dependencies outside the standard library, but currently the only fully-implemented queue backend it ships with requires redis. To use the redis backend, you will need to install the python client.

pip install redis

Using git
If you want to run the very latest, feel free to pull down the repo from github and install by hand.

git clone https://github.com/coleifer/huey.git
cd huey
python setup.py install

You can run the tests using the test-runner:

python setup.py test
The code is as follows:

from huey import RedisHuey, crontab

huey = RedisHuey('my-app', host='redis.myapp.com')

@huey.task()
def add_numbers(a, b):
    return a + b

@huey.periodic_task(crontab(minute='0', hour='3'))
def nightly_backup():
    sync_all_data()

The huey API and its parameters are described in detail below.




When huey runs as a worker, it takes a number of CLI options.

The commonly used ones are:

-l                 the log file to write to.

-w                 the number of workers; a larger -w value raises task-processing capacity.

-p, --periodic     when the huey worker starts, it looks through tasks.py for tasks that need a crontab and spawns a few threads dedicated to them.

-n                 do not start the scheduled periodic crontab execution; periodic tasks then run only when you trigger them yourself.

--threads          you know what this means.
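A hypothetical sketch of how a consumer script could parse flags like these with argparse; the option names follow the list above, everything else (defaults, help text) is assumption:

```python
import argparse

parser = argparse.ArgumentParser(description='toy huey-style consumer')
parser.add_argument('-l', '--logfile', help='path to the log file')
parser.add_argument('-w', '--workers', type=int, default=1,
                    help='number of worker threads')
parser.add_argument('-p', '--periodic', action='store_true', default=True,
                    help='enqueue periodic (crontab) tasks')
parser.add_argument('-n', '--no-periodic', dest='periodic',
                    action='store_false',
                    help='do not enqueue periodic tasks')
parser.add_argument('-b', '--backoff', type=float, default=1.15)
parser.add_argument('-m', '--max-delay', type=float, default=10.0)

# Simulate: huey_consumer.py -w 8 -n -b 1.1
args = parser.parse_args(['-w', '8', '-n', '-b', '1.1'])
print(args.workers, args.periodic, args.backoff)
```

Note how -p/-n share one dest, so the last flag given wins, matching the periodic/no-periodic pairing in the option table.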

The code is as follows:

# Original:
The following table lists the options available for the consumer as well as their default values.

-l, --logfile
Path to file used for logging. When a file is specified, by default Huey will use a rotating file handler (1MB / chunk) with a maximum of 3 backups. You can attach your own handler (huey.logger) as well. The default loglevel is INFO.
-v, --verbose
Verbose logging (equates to DEBUG level). If no logfile is specified and verbose is set, then the consumer will log to the console. This is very useful for testing/debugging.
-q, --quiet
Only log errors. The default loglevel for the consumer is INFO.
-w, --workers
Number of worker threads, the default is 1 thread but for applications that have many I/O bound tasks, increasing this number may lead to greater throughput.
-p, --periodic
Indicate that this consumer process should start a thread dedicated to enqueueing "periodic" tasks (crontab-like functionality). This defaults to True, so should not need to be specified in practice.
-n, --no-periodic
Indicate that this consumer process should not enqueue periodic tasks.
-d, --delay
When using a "polling"-type queue backend, the amount of time to wait between polling the backend. Default is 0.1 seconds.
-m, --max-delay
The maximum amount of time to wait between polling, if using weighted backoff. Default is 10 seconds.
-b, --backoff
The amount to back-off when polling for results. Must be greater than one. Default is 1.15.
-u, --utc
Indicates that the consumer should use UTC time for all tasks, crontabs and scheduling. Default is True, so in practice you should not need to specify this option.
--localtime
Indicates that the consumer should use localtime for all tasks, crontabs and scheduling. Default is False.

Examples

Running the consumer with 8 threads, a logfile for errors only, and a very short polling interval:

huey_consumer.py my.app.huey -l /var/log/app.huey.log -w 8 -b 1.1 -m 1.0
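The -d, -m and -b options work together: every empty poll multiplies the wait by the backoff factor until it hits the max delay. A quick sketch of the resulting schedule using the defaults quoted above (0.1s initial delay, 1.15 backoff, 10s cap):

```python
def polling_delays(initial=0.1, backoff=1.15, max_delay=10.0, polls=40):
    """Yield successive waits for a consumer that keeps finding an empty queue."""
    delay = initial
    for _ in range(polls):
        yield round(delay, 4)
        delay = min(delay * backoff, max_delay)

delays = list(polling_delays())
print(delays[:5], '...', delays[-1])
```

The waits grow geometrically and then flatten at max_delay, which is why the example command above combines a small -b with a small -m to keep polling snappy.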
The huey task queue relies on redis for queue storage, so you need redis-server and redis-py installed in advance. I won't cover how to install them; search for it yourself. First, let's create the huey connection instance:
The code is as follows:

# config.py
from huey import Huey
from huey.backends.redis_backend import RedisBlockingQueue

queue = RedisBlockingQueue('test-queue', host='localhost', port=6379)
huey = Huey(queue)


Next come the tasks themselves, i.e. the functions you want to pull into the task-queue circle. As with celery, rq and mrq, they live in tasks.py.

The code is as follows:

# tasks.py
from config import huey  # import the huey we instantiated in config.py


@huey.task()
def count_beans(num):
    print '-- counted %s beans --' % num
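Conceptually, what a @huey.task()-style decorator does is intercept the call: instead of running the function, it puts the invocation on the queue and hands back a handle. A stdlib-only mock of that idea (an illustration, not huey's actual internals):

```python
import collections

task_queue = collections.deque()

def task(func):
    # Calling the decorated function enqueues a message instead of running it.
    def wrapper(*args, **kwargs):
        task_queue.append((func, args, kwargs))
        return 'enqueued: %s%r' % (func.__name__, args)
    return wrapper

@task
def count_beans(num):
    return '-- counted %s beans --' % num

handle = count_beans(5)             # nothing runs yet; the call is queued
func, args, kwargs = task_queue.popleft()
result = func(*args, **kwargs)      # this is what a consumer/worker would do
print(handle)
print(result)
```

In the real library the message is serialized into redis and the consumer process does the popleft-and-execute part.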

Now a real end-to-end run. main.py acts as the producer and tasks.py as the consumer: main.py feeds in the data.

The code is as follows:

# main.py
from config import huey  # import our "huey" object
from tasks import count_beans  # import our task


if __name__ == '__main__':
    beans = raw_input('How many beans? ')
    count_beans(int(beans))
    print 'Enqueued job to count %s beans' % beans

Ensure you have Redis running locally.

Ensure you have installed huey.

Start the consumer: huey_consumer.py main.huey (notice this is "main.huey" and not "config.huey").

Run the main program: python main.py

As with celery and rq, fetching results requires declaring a result store in your config.py or main code. For now huey only supports redis for this, but given its focus and small footprint, that is quite enough! It only takes a few lines: import the RedisDataStore class and declare the storage address.

The code is as follows:

# config.py
from huey import Huey
from huey.backends.redis_backend import RedisBlockingQueue
from huey.backends.redis_backend import RedisDataStore  # ADD THIS LINE

queue = RedisBlockingQueue('test-queue', host='localhost', port=6379)
result_store = RedisDataStore('results', host='localhost', port=6379)  # ADDED

huey = Huey(queue, result_store=result_store)  # ADDED result store

This time, when we try again in ipython, we find we can retrieve the return value from tasks.py. Even when you fetch it in main.py, it is still pulled out of redis by uuid.

The code is as follows:

>>> from main import count_beans
>>> res = count_beans(100)
>>> res  # what is "res" ?

>>> res.get()  # get the result of this task
'Counted 100 beans'
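The retrieval just shown can be modelled in a few lines: the worker writes the return value into a store under the task's uuid, and res.get() simply looks it up, returning None until it is there. A dict-backed stand-in for the redis result store:

```python
import uuid

class DictDataStore:
    """Stand-in for a RedisDataStore: maps task ids to results."""
    def __init__(self):
        self._data = {}
    def put(self, task_id, value):
        self._data[task_id] = value
    def get(self, task_id):
        return self._data.get(task_id)  # None until the worker stores a value

class AsyncResult:
    """Stand-in for the handle a task call returns."""
    def __init__(self, store, task_id):
        self.store, self.task_id = store, task_id
    def get(self):
        return self.store.get(self.task_id)

store = DictDataStore()
task_id = str(uuid.uuid4())
res = AsyncResult(store, task_id)

print(res.get())                         # None: the worker has not finished yet
store.put(task_id, 'Counted 100 beans')  # the worker writes the return value
print(res.get())
```

The class and method names here are hypothetical; the point is only the uuid-keyed lookup pattern the article describes.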




huey also supports celery-style delayed execution and crontab functionality. These features matter: they let you set your own priorities instead of leaning on the operating system's own crontab.

Usage is simple: just add a delay interval. Looking at huey's source, the default is to execute immediately; whether it actually does, of course, also depends on whether your worker threads are free.

The code is as follows:

>>> import datetime
>>> res = count_beans.schedule(args=(100,), delay=60)
>>> res

>>> res.get()  # this returns None, no data is ready
>>> res.get()  # still no data...
>>> res.get(blocking=True)  # ok, let's just block until its ready
'Counted 100 beans'
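schedule(args=..., delay=60) conceptually parks the task on a time-ordered schedule rather than on the ready queue, and the consumer moves it over once its ETA passes. A heapq sketch of that mechanism (delays compressed so the demo finishes instantly; names are made up):

```python
import heapq
import time

schedule = []  # (eta, task_name) pairs, ordered by eta

def schedule_task(name, delay):
    heapq.heappush(schedule, (time.time() + delay, name))

def pop_due_tasks():
    """Move every task whose ETA has passed off the schedule."""
    due = []
    while schedule and schedule[0][0] <= time.time():
        due.append(heapq.heappop(schedule)[1])
    return due

schedule_task('count_beans', delay=0.2)
schedule_task('nightly_backup', delay=60)
not_yet = pop_due_tasks()   # nothing is due yet
time.sleep(0.25)
ready = pop_due_tasks()     # count_beans' ETA has passed; the backup's has not
print(not_yet, ready)
```

This also explains why res.get() returns None at first: until the ETA passes, the task has not even been handed to a worker.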
Next, retries: huey has retry support too, and it is a very practical thing. If you read my earlier article on celery's retry mechanism, you can already guess how huey does it. Indeed, the concrete function in tasks.py is wrapped in a decorator, and the decorator holds the try/except retry logic. You get the idea.

The code is as follows:

# tasks.py
from datetime import datetime

from config import huey

@huey.task(retries=3, retry_delay=10)
def try_thrice():
    print 'trying....%s' % datetime.now()
    raise Exception('nope')
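Here is a minimal stdlib version of the decorator-plus-try/except idea just described. It is a sketch of the mechanism, not huey's real code, and the retry_delay sleep is only noted in a comment so the demo runs instantly:

```python
import functools

def task(retries=3, retry_delay=10):
    """Toy decorator factory mimicking @huey.task(retries=..., retry_delay=...)."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # One initial attempt plus up to `retries` retries.
            for attempt in range(retries + 1):
                try:
                    return func(*args, **kwargs)
                except Exception:
                    if attempt == retries:
                        raise
                    # A real consumer would sleep retry_delay seconds here.
        return wrapper
    return decorator

calls = []

@task(retries=3, retry_delay=10)
def try_thrice():
    calls.append(1)
    if len(calls) < 3:
        raise Exception('nope')
    return 'ok on attempt %d' % len(calls)

result = try_thrice()   # fails twice, succeeds on the third attempt
print(result)
```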





huey also gives you a chance to change your mind! That is, after you set up a delayed task, if you want to cancel it, just call revoke.

The code is as follows:

# count some beans
res = count_beans(10000000)

res.revoke()

The same applies to tasks that are scheduled in the future:

res = count_beans.schedule(args=(100000,), eta=in_the_future)
res.revoke()
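Revocation can be modelled as a shared set of revoked task ids that the consumer checks before executing anything; this mirrors the behaviour above without claiming to be huey's implementation:

```python
revoked = set()

class TaskHandle:
    """Toy handle for an enqueued task, with a revoke() switch."""
    def __init__(self, task_id, func, args):
        self.task_id, self.func, self.args = task_id, func, args
    def revoke(self):
        revoked.add(self.task_id)

def execute(handle):
    # The consumer skips any task whose id has been revoked.
    if handle.task_id in revoked:
        return None
    return handle.func(*handle.args)

h1 = TaskHandle('t1', lambda n: 'Counted %s beans' % n, (100,))
h2 = TaskHandle('t2', lambda n: 'Counted %s beans' % n, (200,))
h2.revoke()
print(execute(h1))   # runs normally
print(execute(h2))   # skipped: None
```

Because the check happens at execution time, revoking works for both immediate and future-scheduled tasks, exactly as the snippet above shows.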
Periodic crontab tasks use the same decorator style; for example, printing the time every minute:

The code is as follows:

@huey.periodic_task(crontab(minute='*'))
def print_time():
    print datetime.now()

A quick summary of the API:

task() - a transparent decorator that keeps your functions tidy.
periodic_task() - periodic tasks.
crontab() - the crontab schedule for periodic tasks, picked up when the worker starts.
BaseQueue - the task queue.
BaseDataStore - where results go after a task runs; BaseDataStore can be subclassed.

The official huey git repository ships with example code:

main.py

The code is as follows:

from config import huey
from tasks import count_beans


if __name__ == '__main__':
    beans = raw_input('How many beans? ')
    count_beans(int(beans))
    print('Enqueued job to count %s beans' % beans)

tasks.py

The code is as follows:
import random
import time
from huey import crontab
 
from config import huey
 
 
@huey.task()
def count_beans(num):
    print "start..."
    print('-- counted %s beans --' % num)
    time.sleep(3)
    print "end..."
    return 'Counted %s beans' % num
 
@huey.periodic_task(crontab(minute='*/5'))
def every_five_mins():
    print('Consumer prints this every 5 mins')
 
@huey.task(retries=3, retry_delay=10)
def try_thrice():
    if random.randint(1, 3) == 1:
        print('OK')
    else:
        print('About to fail, will retry in 10 seconds')
        raise Exception('Crap something went wrong')
 
@huey.task()
def slow(n):
    time.sleep(n)
    print('slept %s' % n)
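The crontab(minute='*/5') spec used in tasks.py above boils down to a match rule against the current time. A tiny matcher for the minute field only (hypothetical; the real crontab() covers all five fields):

```python
def minute_matches(spec, minute):
    """Support '*', '*/n', and a plain number, for the minute field only."""
    if spec == '*':
        return True
    if spec.startswith('*/'):
        return minute % int(spec[2:]) == 0
    return minute == int(spec)

# Which minutes of the hour does '*/5' fire on?
print([m for m in range(60) if minute_matches('*/5', m)])
```

The consumer's periodic thread wakes once a minute and runs every periodic task whose spec matches the current time, which is why every_five_mins above prints on 0, 5, 10, and so on.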
cons.sh

The code is as follows:
#!/bin/bash
echo "HUEY CONSUMER"
echo "-------------"
echo "In another terminal, run 'python main.py'"
echo "Stop the consumer using Ctrl C"
PYTHONPATH=.:$PYTHONPATH
python ../../huey/bin/huey_consumer.py main.huey --threads=2

You can clone the huey repo yourself: it contains an examples directory, and you can see in it that huey supports django as well, but that is not the point here!

The code is as follows:
[xiaorui@devops /tmp ]$ git clone https://github.com/coleifer/huey.git
Cloning into 'huey'...
remote: Counting objects: 1423, done.
remote: Compressing objects: 100% (9/9), done.
Receiving objects:  34% (497/1423), 388.00 KiB | 29.00 KiB/s
Receiving objects:  34% (498/1423), 628.00 KiB | 22.00 KiB/s
remote: Total 1423 (delta 0), reused 0 (delta 0)
Receiving objects: 100% (1423/1423), 2.24 MiB | 29.00 KiB/s, done.
Resolving deltas: 100% (729/729), done.
Checking connectivity... done.
[xiaorui@devops /tmp ]$cd huey/examples/simple
[xiaorui@devops simple (master)]$ ll
total 40
-rw-r--r--  1 xiaorui  wheel    79B  9  8 08:49 README
-rw-r--r--  1 xiaorui  wheel     0B  9  8 08:49 __init__.py
-rw-r--r--  1 xiaorui  wheel    56B  9  8 08:49 config.py
-rwxr-xr-x  1 xiaorui  wheel   227B  9  8 08:49 cons.sh
-rw-r--r--  1 xiaorui  wheel   205B  9  8 08:49 main.py
-rw-r--r--  1 xiaorui  wheel   607B  9  8 08:49 tasks.py
[xiaorui@devops simple (master)]$