涡轮齿轮2的异步作业工人
tgext.asyncjob的Python项目详细描述
关于异步作业
AsyncJob是TurboGears2扩展,用于处理后台/同步作业。 允许在系统执行更多工作时快速向用户返回响应 背景,可用于视频转码、缩略图生成或其他 用户在得到答案之前无法预期所需时间的任务。
要在后台执行任务,只需执行:
from tgext.asyncjob import asyncjob_perform asyncjob_perform(callable, arg1, arg2, kwarg=value)
安装
tgext.asyncjob既可以从pypi安装,也可以从bitbucket安装:
easy_install tgext.asyncjob
应该只对大多数用户有效
启用异步作业
在应用程序config/app_cfg.py中,添加以下行:
import tgext.asyncjob tgext.asyncjob.plugme(base_config)
或者,如果tgext.pluggable可用,则使用可插入应用程序接口:
from tgext.pluggable import plug plug(base_config, 'tgext.asyncjob')
您可以将globals对象本身传递给plug函数:
plug(base_config, 'tgext.asyncjob', app_globals=app_globals)
它将用于存储任务队列,否则AsyncJob将自动检测 调用堆栈帧中的globals对象将对象放在 它被称为。
执行后台任务
要执行后台任务,只需使用tgext.asyncjob.asyncjob\u perform 它从任何有有效请求的上下文调用它将执行可调用的 在后台作为第一个参数传递,并提供参数:
from tgext.asyncjob import asyncjob_perform def background_task(number): print number*2 asyncjob_perform(background_task, 5)
跟踪任务进度
asyncjob跟踪任务状态并允许更新它以实现进度条或其他类型 用户报告长时间运行的操作。每次调用asyncjob\u perform时 为您刚才计划的操作返回一个唯一的id,该id可用于检索 随时显示任务状态。
您可以随时从后台任务本身内部通过调用 asyncjob\u set\u progress(value,data)参数value应为int值 而第二个可选的data参数可以是您稍后可能要返回的任何内容。
要检索进度状态,可以调用asyncjob\u get\u progress传递 要获取其状态的任务的ID。返回值是一个2项元组 第一个条目是数值,第二个条目是您传递的数据 设置asyncjob_progress。 如果任务已完成,则返回none,如果任务尚未启动,则返回(-1,none)
进度跟踪示例:
from tgext.asyncjob import asyncjob_perform, asyncjob_get_progress, asyncjob_set_progress @expose() def controller_method(self): def async_action(): for i in range(5): asyncjob_set_progress(i) time.sleep(1) taskid = asyncjob_perform(async_action) return redirect(url('/state', uid=taskid)) @expose() def state(self, uid): state = asyncjob_get_progress(uid) if not state: return 'Job Completed' elif state[0] < 0: return 'Not yet started' else: return str(state[0])
多进程进程进程跟踪
当使用多个进程为您的web应用程序提供服务时,默认的任务跟踪 系统将无法跟踪不同进程之间的进程状态。
要解决此问题,您必须依赖另一个进度跟踪程序来存储 共享存储中的状态。默认情况下,tgext.asyncjob.tracker.redisdb.RedisProgressTracker 在redis数据库中存储状态。
使用它非常简单:
from tgext.asyncjob import start_async_worker tgext.asyncjob.tracker.redisdb import RedisProgressTracker class Globals(object): def __init__(self): start_async_worker(progress_tacker=RedisProgressTracker(host='localhost'))
RedisProgressTracker类接受host、port、db和timeout参数 它设置数据库连接选项和进度跟踪键的超时(以避免 把钥匙丢在后面)。默认情况下,使用db15,超时时间为1小时。
自定义进度跟踪程序
编写自定义进度跟踪器就像为类提供以下方法一样简单:
class MyProgressTracker(object): def track(self, entryid): pass def remove(self, entryid): pass def set_progress(self, entryid, value, message): pass def get_progress(self, entryid): pass
然后可以将自定义进度跟踪程序作为参数传递给start_async_worker以存储 SQL数据库、MongoDB或任何其他系统上的状态。
参考tgext.asyncjob.tracker.redisdb.RedisProgressTracker或 tgext.asyncjob.tracker.memory.MemoryProgressTracker用于实现示例。
访问数据库
默认情况下,asyncjob自行管理sqlalchemy会话和事务。每个后台任务 被封装在在发生任何异常时恢复的事务。
asyncjob使用它自己的sqlalchemy会话,因此永远不要传递已经绑定到另一个会话的对象。 再问一遍。
开发人员可能需要记住的唯一问题是,在查找对象时 在开始后台任务之前创建的,可能在内部还不可用 数据库。为了避免这个问题,asyncjob提供了asyncjob定时查询来执行查询 查找结果,直到找到结果本身或达到超时(默认为60秒)。
这可用于取回在启动等待的后台任务之前创建的对象 它们将出现在数据库中:
from tgext.asyncjob import asyncjob_perform, asyncjob_timed_query @expose() def controller_method(self): def async_query_action(group_id): group = asyncjob_timed_query(DBSession.query(Group).filter_by(group_id=group_id)).first() group.display_name = 'Prova' g = Group(group_name='test_group') DBSession.add(g) DBSession.flush() asyncjob_perform(async_query_action, g.group_id) return 'OK'
要更改超时,只需将不同的retries和interval参数传递给 异步作业定时查询:
asyncjob_timed_query(DBSession.query(Group).filter_by(group_id=group_id), retries=10, interval=6).first()