pythonsceleri在env上运行一组函数而不重叠

2024-04-25 04:07:29 发布

您现在位置:Python中文网/ 问答频道 /正文

尝试用芹菜处理以下案件:

示例:

  • 有一组不同的测试(每个测试都是一个带参数的dict)

  • 有5个不同的环境来运行这些测试。

我可以在5个EVN上同时执行一个测试(多处理)。现在正试着吃芹菜。在

问题是测试是如何执行的,它们不应该在一个env中重叠。在

因此,一个env应该在同一时间只运行一个测试,但是Celery和worker和队列可以执行它们全部it-total mashup。我尝试使用“Canvas primitives”并在循环中添加“test+evns”。在

比如:

tests_set_list =  [{test1}, {test2}, {test3}]
envs_list = [{alpha}, {bravo}, {charlie}, {delta}, {echo}]

变体1: 在这种情况下,所有的任务都被添加到队列中,最后只执行一个,没有任何顺序,因此它们重叠。在

^{pr2}$

变量2:-->;TypeError:“AsyncResult”对象不可订阅 在这种情况下,任务只添加第一次迭代,然后停止。在

""" This loop create env inst for each task. Works in group
    Works bad, did not add task in worker queue. 
"""
list_of_groups = []
for test_item in tests_set_list:
    tasks_group_by_env = []
    for env_item in synced_envs:
        r_key = 'tests_run.bunch_test_execute_env_q_task.{0}.{1}'.format(test_item['test_folder_name'],
                                                                          env_item['env_codename'])
        task = bunch_test_execute_env_q_task(env_item, [test_item], branch)
        tasks_group_by_env.append(task)
    groups = group(tasks_group_by_env)
    list_of_groups.append(groups)

for group_item in list_of_groups:
    # job = group(*group_item)
    job = group_item
    job.apply_async(queue='tests_run')

变量3:-->;类型错误:无序类型:float()>;=str() 但无论如何,任务被添加到队列中,但执行时重叠。在

""" This loop use one test item and iter envs from list of synced envs to add groups """
list_of_groups = []
for test_item in tests_set_list:
    r_key = 'tests_run.bunch_test_execute_env_q_task.{0}.{1}'
    lazy_group = group(bunch_test_execute_env_q_task.apply_async(
          args=[env_item['env_codename']],
          kwargs={'env_item': env_item,
                  'test_item': [test_item],
                  'tkn_branch': branch,
                  },
          queue='tests_run',
          routing_key=r_key.format(test_item['pattern_folder_name'],
                                   env_item['env_codename']))for env_item in envs_list)
    # list_of_tasks.append(task)
    list_of_groups.append(lazy_group)

all_groups = group(*list_of_groups)
all_groups()

总而言之,我想达到的目标是: 每个测试都有单独的测试环境。这是一项单独的工作:

job_1 = alpha(test1)
job_2 = bravo(test1)
job_3 = charlie(test1)
job_4 = delta(test1)
job_5 = echo(test1)
...
job_n+1 = alpha(test_n+1)
job_n+1 = bravo(test_n+1)
job_n+1 = charlie(test_n+1)
job_n+1 = delta(test_n+1)
job_n+1 = echo(test_n+1)

这看起来可以通过为每个测试环境添加单独的worker来实现,因此它们永远不会相互重叠,因为在不同的进程中运行worker,但这并不优雅,因为环境可以随时间变化。在


Tags: ofintestenvfortaskgroupjob
1条回答
网友
1楼 · 发布于 2024-04-25 04:07:29

简单的答案是使用队列:

在py上的样子: -这只是为了向现有的辅助进程添加队列。如何增加工人-见芹菜妖魔化博士。在

app.control.add_consumer(
    queue='alpha',
    exchange = 'tests_run',
    exchange_type = 'direct',
    routing_key   = 'alpha.*',
    destination = ['alpha@tentacle'])

然后,如果要同时执行5个不同的任务而不重叠(例如使用5个diff env),则只需添加以下任务:

queue=“理想队列”

^{pr2}$

这是什么意思? 你可以让工人代表你的每一个工作环境:

  • 阿尔法,贝塔,查理,德尔塔,回声

所以在for循环中执行一个任务,每个任务都使用您想要的env。所有任务都将在其工作线程上运行,因为工作线程附加了唯一的队列。所以你需要5个工人+5个队列。这就成功了。当然,您可以在每个任务流程中包括偶数线程,并使更多线程化。在

相关问题 更多 >