运行异步任务的简单包

async-task-processor的Python项目详细描述


用于在可配置的工作线程之间分配任务。

功能

  • 将任务简单定义为正常函数。
  • simpleprocessor用于简单的任务。
  • 周期处理器用于周期性任务。
  • tarantoolprocessor用于监听tarantool队列并在数据到达时触发任务。
  • 能够在出错时重试(最大重试次数和重试倒计时选项)。
  • 能够将任务作为自选项绑定到工作函数。
  • 能够实现自己的任务处理器。
  • 能够使用处理器制作控制api(可以管理您的工人)

待办事项

  • []测试
  • []控制台实用程序
  • []斯芬克斯文件

安装

通常使用pip:

pip install async-task-processor

用法示例

周期性任务处理器示例:

importtimefromasync_task_processorimportATPfromasync_task_processor.processorsimportPeriodicProcessorfromexamplesimportlogger# first test functiondeftest_func_one(sleep_time,word):"""

    :type sleep_time: int
    :type word: str
    :return:
    """logger.info('start working')time.sleep(sleep_time)logger.info('Job is done. Word is: %s'%word)# second test functiondeftest_func_second(sleep_time,word):"""

    :type sleep_time: int
    :type word: str
    :return:
    """logger.info('start working')time.sleep(sleep_time)logger.info('Job is done. Word is: %s'%word)# third function with exceptiondeftest_func_bad(self,sleep_time,word):"""

    :type self: async_task_processor.Task
    :type sleep_time: int
    :type word: str
    :return:
    """logger.info('start working')try:a=1/0exceptZeroDivisionError:# optionally you can overload max_retries and retry_countdown hereself.retry()time.sleep(sleep_time)logger.info('Job is done. Word is: %s'%word)atp=ATP(asyncio_debug=True)task_processor=PeriodicProcessor(atp=atp)# Add function to task processortask_processor.add_task(test_func_one,args=[5,'first hello world'],max_workers=5,timeout=1,max_retries=5,retry_countdown=1)# Add one more function to task processortask_processor.add_task(test_func_second,args=[3,'second hello world'],max_workers=5,timeout=1,max_retries=5,retry_countdown=1)# Add one more bad function with exception. This function will raise exception and will retry it,# then when retries exceeded, workers of this func will stop one by one with exception MaxRetriesExceeded# bind option make Task as self argumenttask_processor.add_task(test_func_bad,args=[3,'second hello world'],bind=True,max_workers=2,timeout=1,max_retries=3,retry_countdown=3)# Start async-task-processoratp.start()

tarantool任务处理器示例:

importasyncioimporttimeimportasynctntimportasynctnt_queuefromasync_task_processorimportATPfromasync_task_processor.processorsimportTarantoolProcessorfromexamplesimportloggerTARANTOOL_QUEUE='test_queue'TARANTOOL_HOST='localhost'TARANTOOL_PORT=3301TARANTOOL_USER=NoneTARANTOOL_PASS=Nonedefput_messages_to_tarantool(messages_count=1,tube_name='test_queue',host='localhost',port=3301,user=None,password=None):"""Put some test messages to tarantool queue

    :param messages_count: messages number to put in queue
    :param tube_name: tarantool queue name
    :type tube_name: str
    :param host: tarantool host
    :param port: tarantool port
    :param user: tarantool user
    :param password: tarantool password
    :return:
    """asyncdefput_jobs():conn=asynctnt.Connection(host=host,port=port,username=user,password=password)awaitconn.connect()queue=asynctnt_queue.Queue(conn)tube=queue.tube(tube_name)[awaittube.put(dict(num=i,first_name='Jon',last_name='Smith'))foriinrange(messages_count)]awaitconn.disconnect()loop=asyncio.get_event_loop()loop.run_until_complete(asyncio.ensure_future(put_jobs()))loop.close()# Let's put 100 messages to tarantoolput_messages_to_tarantool(messages_count=100,tube_name=TARANTOOL_QUEUE,host=TARANTOOL_HOST,port=TARANTOOL_PORT,user=TARANTOOL_USER,password=TARANTOOL_PASS)# Test functiondeftest_func(self,sleep_time,word):"""

    :type self: async_task_processor.TarantoolTask
    :type sleep_time: int
    :type word: str
    :return:
    """logger.info('start working')time.sleep(sleep_time)logger.info('Job is done. Word is %s. Data is %s. '%(word,self.data))atp=ATP(asyncio_debug=True)task_processor=TarantoolProcessor(atp=atp,host=TARANTOOL_HOST,port=TARANTOOL_PORT,user=TARANTOOL_USER,password=TARANTOOL_PASS,connection_max_retries=3,connection_retry_countdown=3)# Add function to task processor. Tarantool data from queue will be in `self` argument in function. 20 parallel workers# will be started.task_processor.add_task(foo=test_func,queue=TARANTOOL_QUEUE,args=[1,'hello world'],bind=True,max_workers=20,max_retries=5,retry_countdown=1)# Start async-task-processoratp.start()

tarantool任务处理器示例,能够通过tarantool缩放工作线程:

importasyncioimportimportlibimportsocketimportsysimporttimeimportasynctntimportasynctnt_queueimporttarantoolfromasync_task_processorimportATPfromasync_task_processor.processorsimportTarantoolProcessorfromexamplesimportloggerTARANTOOL_QUEUE='test_queue'TARANTOOL_HOST='localhost'TARANTOOL_PORT=3301TARANTOOL_USER=NoneTARANTOOL_PASS=Nonedefput_messages_to_tarantool(messages_count=1,tube_name='test_queue',host='localhost',port=3301,user=None,password=None):"""Put some test messages to tarantool queue

    :param messages_count: messages number to put in queue
    :param tube_name: tarantool queue name
    :type tube_name: str
    :param host: tarantool host
    :param port: tarantool port
    :param user: tarantool user
    :param password: tarantool password
    :return:
    """asyncdefput_jobs():conn=asynctnt.Connection(host=host,port=port,username=user,password=password)awaitconn.connect()tube=asynctnt_queue.Queue(conn).tube(tube_name)[awaittube.put(dict(num=i,first_name='Jon',last_name='Smith'))foriinrange(messages_count)]awaitconn.disconnect()loop=asyncio.get_event_loop()loop.run_until_complete(asyncio.ensure_future(put_jobs()))loop.close()# Let's put 100 messages to tarantoolput_messages_to_tarantool(messages_count=100,tube_name=TARANTOOL_QUEUE,host=TARANTOOL_HOST,port=TARANTOOL_PORT,user=TARANTOOL_USER,password=TARANTOOL_PASS)# Create tube in queue for manage workersdefcreate_tube(tube_name):try:t=tarantool.connect(host=TARANTOOL_HOST,port=TARANTOOL_PORT,user=TARANTOOL_USER,password=TARANTOOL_PASS)t.call("queue.create_tube",(tube_name,'fifo',{'if_not_exists':True}))excepttarantool.error.DatabaseErrorase:ife.args[0]==32:passelse:raise# Test functiondeftest_func(self,sleep_time,word):"""

    :type self: async_task_processor.TarantoolTask
    :type sleep_time: int
    :type word: str
    :return:
    """logger.info('Start working')time.sleep(sleep_time)logger.info('Job is done. Word is %s. Data is %s. '%(word,self.data))# Function for import functionsdeffunc_import(foo_path):path_list=foo_path.split('.')func_name=path_list.pop()m=importlib.import_module('.'.join(path_list))ifpath_listelsesys.modules[__name__]func=getattr(m,func_name)returnfunc# Function for manage workersdefadd_task(self,tp):"""

    :type self: async_task_processor.primitives.TarantoolTask
    :type tp: TarantoolProcessor
    :return:
    """ifself.data['command']=='stop':tp.stop(name=self.data['foo'],workers_count=self.data['max_workers'],leave_last=False)self.app.logger.info("%d workers was deleted from task %s"%(self.data['max_workers'],self.data['foo']))elifself.data['command']=='start':tp.add_task(foo=func_import(self.data['foo']),queue=TARANTOOL_QUEUE,args=[1,'message from new worker'],bind=True,max_workers=self.data['max_workers'],name=self.data['foo'])self.app.logger.info("Added %d workers for task %s"%(self.data['max_workers'],self.data['foo']))elifself.data['command']=='info':[logger.info(task.as_json())fortaskinself.app.tasks]else:self.app.logger.info("Unknown command %s"%self.data['command'])# get host ipip=[lforlin([ipforipinsocket.gethostbyname_ex(socket.gethostname())[2]ifnotip.startswith("127.")][:1],[[(s.connect(('8.8.8.8',53)),s.getsockname()[0],s.close())forsin[socket.socket(socket.AF_INET,socket.SOCK_DGRAM)]][0][1]])ifl][0][0].replace('.','_')# manage tube namecontrol_tube_name='control_queue_%s'%iplogger.info("control tube is %s"%control_tube_name)# create tube for manage workerscreate_tube(control_tube_name)atp=ATP(asyncio_debug=True,logger=logger)task_processor=TarantoolProcessor(atp=atp,host=TARANTOOL_HOST,port=TARANTOOL_PORT,user=TARANTOOL_USER,password=TARANTOOL_PASS,connection_max_retries=3,connection_retry_countdown=3)# Add function to task processor. Tarantool data from queue will be in `self` argument in function. 20 parallel workers# will be started.task_processor.add_task(foo=test_func,queue=TARANTOOL_QUEUE,args=[1,'hello world'],bind=True,max_workers=20,max_retries=5,retry_countdown=1)# Add task for listen manage tube commands. In this case if you start your app on different hosts,# you would control all host, because ip in control queue and different queues will be created for each host.# You can try to manage workers from tarantool console. Example command:# queue.tube.control_queue_<your ip>:put({ foo='test_func', command = 'start', max_workers = 2})task_processor.add_task(foo=add_task,queue=control_tube_name,args=[task_processor],bind=True)# Start async-task-processoratp.start()

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
java GridLayout超出了它的大小   java为什么SSHJ的最大写入大小是32KB?   部署后,java无法查看网站。war文件到tomcat   java如何使用gradle将javafx应用程序部署为可执行jar或exe?   java比较HashMap中的键和值   使用注入java和spring boot的RestTemplate类进行单元测试   java如何在eclipse中找到maven项目的原型?   java继承和实用程序方法及变量   java在Joptionpane中显示多行?   当location==null时,java会更改按钮的可见性   java为什么Google Drive getExportLinks返回空值?   java在使用自定义消息完成for循环后引发异常   curl如何使用docker运行undertow java应用程序