运行异步任务的简单包
async-task-processor的Python项目详细描述
用于在可配置的工作线程之间分配任务。
功能
- 将任务简单定义为正常函数。
- SimpleProcessor 用于简单的任务。
- PeriodicProcessor 用于周期性任务。
- TarantoolProcessor 用于监听 Tarantool 队列并在数据到达时触发任务。
- 能够在出错时重试(最大重试次数和重试倒计时选项)。
- 能够通过 bind 选项将任务对象作为 self 参数绑定到工作函数。
- 能够实现自己的任务处理器。
- 能够基于处理器构建控制 API(可以管理您的工作线程)。
待办事项
- [ ] 测试
- [ ] 控制台实用程序
- [ ] Sphinx 文档
安装
通常使用pip:
pip install async-task-processor
用法示例
周期性任务处理器示例:
importtimefromasync_task_processorimportATPfromasync_task_processor.processorsimportPeriodicProcessorfromexamplesimportlogger# first test functiondeftest_func_one(sleep_time,word):""" :type sleep_time: int :type word: str :return: """logger.info('start working')time.sleep(sleep_time)logger.info('Job is done. Word is: %s'%word)# second test functiondeftest_func_second(sleep_time,word):""" :type sleep_time: int :type word: str :return: """logger.info('start working')time.sleep(sleep_time)logger.info('Job is done. Word is: %s'%word)# third function with exceptiondeftest_func_bad(self,sleep_time,word):""" :type self: async_task_processor.Task :type sleep_time: int :type word: str :return: """logger.info('start working')try:a=1/0exceptZeroDivisionError:# optionally you can overload max_retries and retry_countdown hereself.retry()time.sleep(sleep_time)logger.info('Job is done. Word is: %s'%word)atp=ATP(asyncio_debug=True)task_processor=PeriodicProcessor(atp=atp)# Add function to task processortask_processor.add_task(test_func_one,args=[5,'first hello world'],max_workers=5,timeout=1,max_retries=5,retry_countdown=1)# Add one more function to task processortask_processor.add_task(test_func_second,args=[3,'second hello world'],max_workers=5,timeout=1,max_retries=5,retry_countdown=1)# Add one more bad function with exception. This function will raise exception and will retry it,# then when retries exceeded, workers of this func will stop one by one with exception MaxRetriesExceeded# bind option make Task as self argumenttask_processor.add_task(test_func_bad,args=[3,'second hello world'],bind=True,max_workers=2,timeout=1,max_retries=3,retry_countdown=3)# Start async-task-processoratp.start()
tarantool任务处理器示例:
import asyncio
import time

import asynctnt
import asynctnt_queue

from async_task_processor import ATP
from async_task_processor.processors import TarantoolProcessor
from examples import logger

TARANTOOL_QUEUE = 'test_queue'
TARANTOOL_HOST = 'localhost'
TARANTOOL_PORT = 3301
TARANTOOL_USER = None
TARANTOOL_PASS = None


def put_messages_to_tarantool(messages_count=1, tube_name='test_queue',
                              host='localhost', port=3301, user=None, password=None):
    """Put some test messages to tarantool queue

    :param messages_count: messages number to put in queue
    :param tube_name: tarantool queue name
    :type tube_name: str
    :param host: tarantool host
    :param port: tarantool port
    :param user: tarantool user
    :param password: tarantool password
    :return:
    """

    async def put_jobs():
        conn = asynctnt.Connection(host=host, port=port, username=user, password=password)
        await conn.connect()
        queue = asynctnt_queue.Queue(conn)
        tube = queue.tube(tube_name)
        # push messages_count sample payloads into the tube
        [await tube.put(dict(num=i, first_name='Jon', last_name='Smith'))
         for i in range(messages_count)]
        await conn.disconnect()

    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.ensure_future(put_jobs()))
    loop.close()


# Let's put 100 messages to tarantool
put_messages_to_tarantool(messages_count=100, tube_name=TARANTOOL_QUEUE,
                          host=TARANTOOL_HOST, port=TARANTOOL_PORT,
                          user=TARANTOOL_USER, password=TARANTOOL_PASS)


# Test function
def test_func(self, sleep_time, word):
    """Consume one queue message; the payload is available as ``self.data``.

    :type self: async_task_processor.TarantoolTask
    :type sleep_time: int
    :type word: str
    :return:
    """
    logger.info('start working')
    time.sleep(sleep_time)
    logger.info('Job is done. Word is %s. Data is %s. ' % (word, self.data))


atp = ATP(asyncio_debug=True)
task_processor = TarantoolProcessor(atp=atp, host=TARANTOOL_HOST, port=TARANTOOL_PORT,
                                    user=TARANTOOL_USER, password=TARANTOOL_PASS,
                                    connection_max_retries=3, connection_retry_countdown=3)

# Add function to task processor. Tarantool data from queue will be in `self` argument in function.
# 20 parallel workers will be started.
task_processor.add_task(foo=test_func, queue=TARANTOOL_QUEUE, args=[1, 'hello world'],
                        bind=True, max_workers=20, max_retries=5, retry_countdown=1)

# Start async-task-processor
atp.start()
tarantool任务处理器示例,能够通过tarantool缩放工作线程:
import asyncio
import importlib
import socket
import sys
import time

import asynctnt
import asynctnt_queue
import tarantool

from async_task_processor import ATP
from async_task_processor.processors import TarantoolProcessor
from examples import logger

TARANTOOL_QUEUE = 'test_queue'
TARANTOOL_HOST = 'localhost'
TARANTOOL_PORT = 3301
TARANTOOL_USER = None
TARANTOOL_PASS = None


def put_messages_to_tarantool(messages_count=1, tube_name='test_queue',
                              host='localhost', port=3301, user=None, password=None):
    """Put some test messages to tarantool queue

    :param messages_count: messages number to put in queue
    :param tube_name: tarantool queue name
    :type tube_name: str
    :param host: tarantool host
    :param port: tarantool port
    :param user: tarantool user
    :param password: tarantool password
    :return:
    """

    async def put_jobs():
        conn = asynctnt.Connection(host=host, port=port, username=user, password=password)
        await conn.connect()
        tube = asynctnt_queue.Queue(conn).tube(tube_name)
        # push messages_count sample payloads into the tube
        [await tube.put(dict(num=i, first_name='Jon', last_name='Smith'))
         for i in range(messages_count)]
        await conn.disconnect()

    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.ensure_future(put_jobs()))
    loop.close()


# Let's put 100 messages to tarantool
put_messages_to_tarantool(messages_count=100, tube_name=TARANTOOL_QUEUE,
                          host=TARANTOOL_HOST, port=TARANTOOL_PORT,
                          user=TARANTOOL_USER, password=TARANTOOL_PASS)


# Create tube in queue for manage workers
def create_tube(tube_name):
    """Create a fifo tube, ignoring the 'already exists' database error (code 32)."""
    try:
        t = tarantool.connect(host=TARANTOOL_HOST, port=TARANTOOL_PORT,
                              user=TARANTOOL_USER, password=TARANTOOL_PASS)
        t.call("queue.create_tube", (tube_name, 'fifo', {'if_not_exists': True}))
    except tarantool.error.DatabaseError as e:
        if e.args[0] == 32:
            pass  # tube already exists — nothing to do
        else:
            raise


# Test function
def test_func(self, sleep_time, word):
    """Consume one queue message; the payload is available as ``self.data``.

    :type self: async_task_processor.TarantoolTask
    :type sleep_time: int
    :type word: str
    :return:
    """
    logger.info('Start working')
    time.sleep(sleep_time)
    logger.info('Job is done. Word is %s. Data is %s. ' % (word, self.data))


# Function for import functions
def func_import(foo_path):
    """Resolve a dotted path to a callable; bare names resolve in this module."""
    path_list = foo_path.split('.')
    func_name = path_list.pop()
    m = importlib.import_module('.'.join(path_list)) if path_list else sys.modules[__name__]
    func = getattr(m, func_name)
    return func


# Function for manage workers
def add_task(self, tp):
    """Control handler: start/stop/info commands arrive via the control tube.

    :type self: async_task_processor.primitives.TarantoolTask
    :type tp: TarantoolProcessor
    :return:
    """
    if self.data['command'] == 'stop':
        tp.stop(name=self.data['foo'], workers_count=self.data['max_workers'], leave_last=False)
        self.app.logger.info("%d workers was deleted from task %s"
                             % (self.data['max_workers'], self.data['foo']))
    elif self.data['command'] == 'start':
        tp.add_task(foo=func_import(self.data['foo']), queue=TARANTOOL_QUEUE,
                    args=[1, 'message from new worker'], bind=True,
                    max_workers=self.data['max_workers'], name=self.data['foo'])
        self.app.logger.info("Added %d workers for task %s"
                             % (self.data['max_workers'], self.data['foo']))
    elif self.data['command'] == 'info':
        [logger.info(task.as_json()) for task in self.app.tasks]
    else:
        self.app.logger.info("Unknown command %s" % self.data['command'])


# get host ip: prefer a non-loopback address from DNS, fall back to the
# address a UDP socket to 8.8.8.8 would use; dots replaced for tube naming
ip = [l for l in ([ip for ip in socket.gethostbyname_ex(socket.gethostname())[2]
                   if not ip.startswith("127.")][:1],
                  [[(s.connect(('8.8.8.8', 53)), s.getsockname()[0], s.close())
                    for s in [socket.socket(socket.AF_INET, socket.SOCK_DGRAM)]][0][1]])
      if l][0][0].replace('.', '_')

# manage tube name
control_tube_name = 'control_queue_%s' % ip
logger.info("control tube is %s" % control_tube_name)

# create tube for manage workers
create_tube(control_tube_name)

atp = ATP(asyncio_debug=True, logger=logger)
task_processor = TarantoolProcessor(atp=atp, host=TARANTOOL_HOST, port=TARANTOOL_PORT,
                                    user=TARANTOOL_USER, password=TARANTOOL_PASS,
                                    connection_max_retries=3, connection_retry_countdown=3)

# Add function to task processor. Tarantool data from queue will be in `self` argument in function.
# 20 parallel workers will be started.
task_processor.add_task(foo=test_func, queue=TARANTOOL_QUEUE, args=[1, 'hello world'],
                        bind=True, max_workers=20, max_retries=5, retry_countdown=1)

# Add task for listen manage tube commands. In this case if you start your app on different hosts,
# you would control all host, because ip in control queue and different queues will be created for each host.
# You can try to manage workers from tarantool console. Example command:
# queue.tube.control_queue_<your ip>:put({ foo='test_func', command = 'start', max_workers = 2})
task_processor.add_task(foo=add_task, queue=control_tube_name, args=[task_processor], bind=True)

# Start async-task-processor
atp.start()