我正在使用开放语义搜索(OSS),我想使用Flower tool监视它的进程。芹菜需要的工人应该在its website上作为OSS状态给出
The workers will do tasks like analysis and indexing of the queued files. The workers are implemented by etl/tasks.py and will be started automatically on boot by the service opensemanticsearch.
此tasks.py文件如下所示:
#!/usr/bin/python3
# -*- coding: utf-8 -*-
#
# Queue tasks for batch processing and parallel processing
#
# Queue handler
from celery import Celery
# ETL connectors
from etl import ETL
from etl_delete import Delete
from etl_file import Connector_File
from etl_web import Connector_Web
from etl_rss import Connector_RSS
verbose = True
quiet = False
app = Celery('etl.tasks')
app.conf.CELERYD_MAX_TASKS_PER_CHILD = 1
etl_delete = Delete()
etl_web = Connector_Web()
etl_rss = Connector_RSS()
#
# Delete document with URI from index
#
@app.task(name='etl.delete')
def delete(uri):
etl_delete.delete(uri=uri)
#
# Index a file
#
@app.task(name='etl.index_file')
def index_file(filename, wait=0, config=None):
if wait:
time.sleep(wait)
etl_file = Connector_File()
if config:
etl_file.config = config
etl_file.index(filename=filename)
#
# Index file directory
#
@app.task(name='etl.index_filedirectory')
def index_filedirectory(filename):
from etl_filedirectory import Connector_Filedirectory
connector_filedirectory = Connector_Filedirectory()
result = connector_filedirectory.index(filename)
return result
#
# Index a webpage
#
@app.task(name='etl.index_web')
def index_web(uri, wait=0, downloaded_file=False, downloaded_headers=[]):
if wait:
time.sleep(wait)
result = etl_web.index(uri, downloaded_file=downloaded_file, downloaded_headers=downloaded_headers)
return result
#
# Index full website
#
@app.task(name='etl.index_web_crawl')
def index_web_crawl(uri, crawler_type="PATH"):
import etl_web_crawl
result = etl_web_crawl.index(uri, crawler_type)
return result
#
# Index webpages from sitemap
#
@app.task(name='etl.index_sitemap')
def index_sitemap(uri):
from etl_sitemap import Connector_Sitemap
connector_sitemap = Connector_Sitemap()
result = connector_sitemap.index(uri)
return result
#
# Index RSS Feed
#
@app.task(name='etl.index_rss')
def index_rss(uri):
result = etl_rss.index(uri)
return result
#
# Enrich with / run plugins
#
@app.task(name='etl.enrich')
def enrich(plugins, uri, wait=0):
if wait:
time.sleep(wait)
etl = ETL()
etl.read_configfile('/etc/opensemanticsearch/etl')
etl.read_configfile('/etc/opensemanticsearch/enhancer-rdf')
etl.config['plugins'] = plugins.split(',')
filename = uri
# if exist delete protocoll prefix file://
if filename.startswith("file://"):
filename = filename.replace("file://", '', 1)
parameters = etl.config.copy()
parameters['id'] = uri
parameters['filename'] = filename
parameters, data = etl.process (parameters=parameters, data={})
return data
#
# Read command line arguments and start
#
#if running (not imported to use its functions), run main function
if __name__ == "__main__":
from optparse import OptionParser
parser = OptionParser("etl-tasks [options]")
parser.add_option("-q", "--quiet", dest="quiet", action="store_true", default=False, help="Don\'t print status (filenames) while indexing")
parser.add_option("-v", "--verbose", dest="verbose", action="store_true", default=False, help="Print debug messages")
(options, args) = parser.parse_args()
if options.verbose == False or options.verbose==True:
verbose = options.verbose
etl_delete.verbose = options.verbose
etl_web.verbose = options.verbose
etl_rss.verbose = options.verbose
if options.quiet == False or options.quiet==True:
quiet = options.quiet
app.worker_main()
我读了很多关于芹菜的教程,根据我的理解,这行应该可以完成这项工作
celery -A etl.tasks flower
但事实并非如此。结果就是
Error: Unable to load celery application. The module etl was not found.
同样适用于
celery -A etl.tasks worker --loglevel=debug
所以芹菜本身似乎引起了麻烦,而不是花。我还尝试了celery-A etl.index_filedirectory worker-loglevel=debug,但结果相同。
我错过了什么?我必须告诉芹菜哪里可以找到etl.tasks吗?在线调查并没有真正显示出类似的情况,大多数“模块未找到”的错误似乎是在导入数据时发生的。所以这可能是个愚蠢的问题,但我在任何地方都找不到解决办法。我希望你们能帮我。不幸的是,我要到星期一才能回复,对不起。
我遇到了同样的问题,我安装并配置了队列,如下所示,它工作正常。
安装RabbitMQ
MacOS
在
bash_profile
中添加以下行:PATH=$PATH:/usr/local/sbin
然后更新
bash_profile
:sudo source ~/.bash_profile
Linux
sudo apt-get install rabbitmq-server
配置RabbitMQ
启动队列:
sudo rabbitmq-server
在另一个终端中,配置队列:
推出芹菜
我建议进入包含
task.py
的文件夹并使用以下命令:celery -A task worker -l info -Q celery --concurrency 5
注意这个错误意味着两件事:
要检查是否不是后者,请运行:
关于这个问题:
如果它崩溃了,首先修复它(与芹菜不同,您将得到一个详细的错误消息)。
尝试
export PYTHONPATH=<parent directory>
,其中父目录是etl
所在的文件夹。去找芹菜工人,看看能不能解决你的问题。这可能是最常见的芹菜“问题”(不是真正的芹菜,而是一般的Python)。或者,从那个文件夹运行芹菜工人。相关问题 更多 >
编程相关推荐