2024-04-28 17:31:05 发布
网友
我不熟悉芹菜。我见过的所有例子都是从命令行启动芹菜工人。e、 g组:
$ celery -A proj worker -l info
我正在启动一个关于ElasticBeanstalk的项目,我想让worker成为我的web应用程序的子进程会很好。我尝试过使用多处理,它似乎有效。我想知道这是个好主意,还是可能有一些缺点。在
似乎是个不错的选择,绝对不是唯一的选择,而是一个不错的选择:)
您可能需要研究(您可能已经在做这件事),就是将自动缩放与芹菜队列的大小相关联。因此,只有在队列增长时才会扩大。在
实际上,芹菜内部也有类似的作用,所以没有太大区别。我能想到的唯一困难是处理外部资源(例如数据库连接),这可能是一个问题,但完全取决于您正在使用芹菜做什么。在
如果有人感兴趣的话,我确实通过预配置的运行python3.4的AMI服务器来研究弹性Beanstalk。我在运行Debian Jessie的基于Docker的服务器上遇到了很多问题。可能和端口重新映射有关。Docker是一个黑匣子,我发现它很难使用和调试。幸运的是,AWS的优秀员工在2015年4月8日添加了一个非docker python3.4选项。在
我做了大量的搜索来部署和工作。我看到很多问题没有答案。下面是我部署的非常简单的python3.4/flask/celery进程。在
芹菜你可以只pip安装。您需要使用config命令或container_命令从配置文件安装rabbitmq。我在上传的projectzip中使用了一个脚本,因此需要一个container_命令来使用该脚本(常规的eb config命令发生在项目安装之前)。在
[yourprovot]/.ebextensions/05\u安装_rabbitmq.config文件公司名称:
container_commands: 01RunScript: command: bash ./init_scripts/app_setup.sh
[YourApprot]/init_脚本/app_设置.sh公司名称:
我在做一个烧瓶应用程序,所以我在第一个请求之前启动了workers:
@app.before_first_request def before_first_request(): task_mgr.start_celery()
任务管理器创建celery应用程序对象(我称之为celery,因为flask应用程序对象是app)。对于一个简单的任务管理器来说,空气是非常关键的。任务预取有各种奇怪的行为。这应该是默认值吗?在
任务经理/任务_py经理公司名称:
import celery as celery_module import multiprocessing class WorkerProcess(multiprocessing.Process): def __init__(self): super().__init__(name='celery_worker_process') def run(self): argv = [ 'worker', ' loglevel=WARNING', ' hostname=local', '-Ofair', ] celery.worker_main(argv) def start_celery(): global worker_process multiprocessing.set_start_method('fork') # 'spawn' seems to work also worker_process = WorkerProcess() worker_process.start() def stop_celery(): global worker_process if worker_process: worker_process.terminate() worker_process = None worker_name = 'celery@local' worker_process = None celery = celery_module.Celery() celery.config_from_object('task_mgr.celery_config')
到目前为止,我的配置非常简单:
任务经理/芹菜_配置.py公司名称:
BROKER_URL = 'amqp://' CELERY_RESULT_BACKEND = 'amqp://' CELERY_ACCEPT_CONTENT = ['json'] CELERY_TASK_SERIALIZER = 'json' # 'pickle' warning: can't use datetime in json CELERY_RESULT_SERIALIZER = 'json' # 'pickle' warning: can't use datetime in json CELERY_TASK_RESULT_EXPIRES = 18000 # Results hang around for 5 hours CELERYD_CONCURRENCY = 4
然后,您可以将任务放在任何需要的地方:
from task_mgr.task_mgr import celery import time @celery.task(bind=True) def error_task(self): self.update_state(state='RUNNING') time.sleep(10) raise KeyError('im an error') @celery.task(bind=True) def long_task(self): self.update_state(state='RUNNING') time.sleep(20) return 'long task finished' @celery.task(bind=True) def task_with_status(self, wait): self.update_state(state='RUNNING') for i in range(5): time.sleep(wait) self.update_state( state='PROGRESS', meta={ 'current': i + 1, 'total': 5, 'status': 'progress', 'host': self.request.hostname, } ) time.sleep(wait) return 'finished with wait = ' + str(wait)
我还保留一个任务队列来保存异步结果,以便监视任务:
task_queue = [] def queue_task(task, *args): async_result = task.apply_async(args) task_queue.append( { 'task_name':task.__name__, 'task_args':args, 'async_result':async_result } ) return async_result def get_tasks_info(): tasks = [] for task in task_queue: task_name = task['task_name'] task_args = task['task_args'] async_result = task['async_result'] task_id = async_result.id task_state = async_result.state task_result_info = async_result.info task_result = async_result.result tasks.append( { 'task_name': task_name, 'task_args': task_args, 'task_id': task_id, 'task_state': task_state, 'task_result.info': task_result_info, 'task_result': task_result, } ) return tasks
当然,从需要的地方开始:
from webapp.app import app from flask import url_for, render_template, redirect from webapp import tasks from task_mgr import task_mgr @app.route('/start_all_tasks') def start_all_tasks(): task_mgr.queue_task(tasks.long_task) task_mgr.queue_task(tasks.error_task) for i in range(1, 9): task_mgr.queue_task(tasks.task_with_status, i * 2) return redirect(url_for('task_status')) @app.route('/task_status') def task_status(): current_tasks = task_mgr.get_tasks_info() return render_template( 'parse/task_status.html', tasks=current_tasks )
就这样。如果你需要帮助,尽管我对芹菜的了解还很有限。在
似乎是个不错的选择,绝对不是唯一的选择,而是一个不错的选择:)
您可能需要研究(您可能已经在做这件事),就是将自动缩放与芹菜队列的大小相关联。因此,只有在队列增长时才会扩大。在
实际上,芹菜内部也有类似的作用,所以没有太大区别。我能想到的唯一困难是处理外部资源(例如数据库连接),这可能是一个问题,但完全取决于您正在使用芹菜做什么。在
如果有人感兴趣的话,我确实通过预配置的运行python3.4的AMI服务器来研究弹性Beanstalk。我在运行Debian Jessie的基于Docker的服务器上遇到了很多问题。可能和端口重新映射有关。Docker是一个黑匣子,我发现它很难使用和调试。幸运的是,AWS的优秀员工在2015年4月8日添加了一个非docker python3.4选项。在
我做了大量的搜索来部署和工作。我看到很多问题没有答案。下面是我部署的非常简单的python3.4/flask/celery进程。在
芹菜你可以只pip安装。您需要使用config命令或container_命令从配置文件安装rabbitmq。我在上传的projectzip中使用了一个脚本,因此需要一个container_命令来使用该脚本(常规的eb config命令发生在项目安装之前)。在
[yourprovot]/.ebextensions/05\u安装_rabbitmq.config文件公司名称:
[YourApprot]/init_脚本/app_设置.sh公司名称:
^{pr2}$我在做一个烧瓶应用程序,所以我在第一个请求之前启动了workers:
任务管理器创建celery应用程序对象(我称之为celery,因为flask应用程序对象是app)。对于一个简单的任务管理器来说,空气是非常关键的。任务预取有各种奇怪的行为。这应该是默认值吗?在
任务经理/任务_py经理公司名称:
到目前为止,我的配置非常简单:
任务经理/芹菜_配置.py公司名称:
然后,您可以将任务放在任何需要的地方:
我还保留一个任务队列来保存异步结果,以便监视任务:
当然,从需要的地方开始:
就这样。如果你需要帮助,尽管我对芹菜的了解还很有限。在
相关问题 更多 >
编程相关推荐