Celery: module not found

Posted 2024-04-26 09:22:14


I'm using Open Semantic Search (OSS) and want to monitor its processes with the Flower tool. The Celery workers that Flower needs should already be in place, as OSS states on its website:

The workers will do tasks like analysis and indexing of the queued files. The workers are implemented by etl/tasks.py and will be started automatically on boot by the service opensemanticsearch.

This tasks.py file looks like this:

#!/usr/bin/python3
# -*- coding: utf-8 -*-

#
# Queue tasks for batch processing and parallel processing
#

# Queue handler
from celery import Celery

# Standard library (the wait parameters below rely on time.sleep)
import time

# ETL connectors
from etl import ETL
from etl_delete import Delete
from etl_file import Connector_File
from etl_web import Connector_Web
from etl_rss import Connector_RSS


verbose = True
quiet = False

app = Celery('etl.tasks')
app.conf.CELERYD_MAX_TASKS_PER_CHILD = 1

etl_delete = Delete()
etl_web = Connector_Web()
etl_rss = Connector_RSS()


#
# Delete document with URI from index
#

@app.task(name='etl.delete')
def delete(uri):
    etl_delete.delete(uri=uri)


#
# Index a file
#

@app.task(name='etl.index_file')
def index_file(filename, wait=0, config=None):

    if wait:
        time.sleep(wait)

    etl_file = Connector_File()

    if config:
        etl_file.config = config

    etl_file.index(filename=filename)

#
# Index file directory
#

@app.task(name='etl.index_filedirectory')
def index_filedirectory(filename):

    from etl_filedirectory import Connector_Filedirectory

    connector_filedirectory = Connector_Filedirectory()

    result = connector_filedirectory.index(filename)

    return result


#
# Index a webpage
#
@app.task(name='etl.index_web')
def index_web(uri, wait=0, downloaded_file=False, downloaded_headers=[]):

    if wait:
        time.sleep(wait)

    result = etl_web.index(uri, downloaded_file=downloaded_file, downloaded_headers=downloaded_headers)

    return result


#
# Index full website
#

@app.task(name='etl.index_web_crawl')
def index_web_crawl(uri, crawler_type="PATH"):

    import etl_web_crawl

    result = etl_web_crawl.index(uri, crawler_type)

    return result


#
# Index webpages from sitemap
#

@app.task(name='etl.index_sitemap')
def index_sitemap(uri):

    from etl_sitemap import Connector_Sitemap

    connector_sitemap = Connector_Sitemap()

    result = connector_sitemap.index(uri)

    return result


#
# Index RSS Feed
#

@app.task(name='etl.index_rss')
def index_rss(uri):

    result = etl_rss.index(uri)

    return result


#
# Enrich with / run plugins
#

@app.task(name='etl.enrich')
def enrich(plugins, uri, wait=0):

    if wait:
        time.sleep(wait)

    etl = ETL()
    etl.read_configfile('/etc/opensemanticsearch/etl')
    etl.read_configfile('/etc/opensemanticsearch/enhancer-rdf')

    etl.config['plugins'] = plugins.split(',')

    filename = uri

    # if exist delete protocoll prefix file://
    if filename.startswith("file://"):
        filename = filename.replace("file://", '', 1)

    parameters = etl.config.copy()

    parameters['id'] = uri
    parameters['filename'] = filename

    parameters, data = etl.process(parameters=parameters, data={})

    return data


#
# Read command line arguments and start
#

#if running (not imported to use its functions), run main function
if __name__ == "__main__":

    from optparse import OptionParser 

    parser = OptionParser("etl-tasks [options]")
    parser.add_option("-q", "--quiet", dest="quiet", action="store_true", default=False, help="Don\'t print status (filenames) while indexing")
    parser.add_option("-v", "--verbose", dest="verbose", action="store_true", default=False, help="Print debug messages")

    (options, args) = parser.parse_args()

    if options.verbose == False or options.verbose==True:
        verbose = options.verbose
        etl_delete.verbose = options.verbose
        etl_web.verbose = options.verbose
        etl_rss.verbose = options.verbose

    if options.quiet == False or options.quiet==True:
        quiet = options.quiet

    app.worker_main()

I've read a lot of Celery tutorials, and as I understand them, this line should do the job:

celery -A etl.tasks flower

But it doesn't. The result is:

Error: Unable to load celery application. The module etl was not found.

The same goes for

celery -A etl.tasks worker --loglevel=debug

So Celery itself seems to be causing the trouble, not Flower. I also tried celery -A etl.index_filedirectory worker --loglevel=debug, with the same result.

What am I missing? Do I have to tell Celery where to find etl.tasks? Searching online didn't really turn up anything similar; most "module not found" errors seem to happen while importing stuff. So this may be a stupid question, but I couldn't find a solution anywhere. I hope you can help me. Unfortunately, I won't be able to respond until Monday, sorry in advance.
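As far as I understand it, celery -A etl.tasks first does an ordinary Python import of that module, so a quick way to see whether the module is even importable from the current directory is a plain import check (a sketch; it assumes nothing about the OSS install):

```python
import importlib

def can_import(module_name):
    # Mirror what "celery -A etl.tasks" does first: a plain Python import.
    try:
        importlib.import_module(module_name)
        return True
    except ImportError:
        return False

print(can_import("json"))       # stdlib module, always importable: True
print(can_import("etl.tasks"))  # True only when run where the etl package is on sys.path
```

If the second check prints False, Celery will fail with exactly the "module etl was not found" error, regardless of anything Flower does.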


3 Answers

I ran into the same problem. I installed and configured the queue as follows, and it works fine.

Install RabbitMQ

macOS

brew install rabbitmq
sudo vim ~/.bash_profile

Add the following line to your .bash_profile:

export PATH=$PATH:/usr/local/sbin

Then reload your .bash_profile:

source ~/.bash_profile

Linux

sudo apt-get install rabbitmq-server

Configure RabbitMQ

Start the queue:

sudo rabbitmq-server

In another terminal, configure the queue:

sudo rabbitmqctl add_user myuser mypassword
sudo rabbitmqctl add_vhost myvhost
sudo rabbitmqctl set_user_tags myuser mytag
sudo rabbitmqctl set_permissions -p myvhost myuser ".*" ".*" ".*"
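Once the user and vhost exist, Celery has to be pointed at them. A minimal sketch of building the matching broker URL (localhost:5672 is RabbitMQ's default; the final Celery line is commented out because it needs the celery package installed):

```python
from urllib.parse import quote

# Credentials from the rabbitmqctl commands above; quote() guards against
# special characters in the password or vhost name.
user, password, vhost = "myuser", "mypassword", "myvhost"
broker_url = "amqp://%s:%s@localhost:5672/%s" % (
    quote(user, safe=""), quote(password, safe=""), quote(vhost, safe="")
)
print(broker_url)  # -> amqp://myuser:mypassword@localhost:5672/myvhost

# from celery import Celery
# app = Celery('task', broker=broker_url)
```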

Launch Celery

I suggest going into the folder that contains task.py and using the following command:

celery -A task worker -l info -Q celery --concurrency 5

Note that this error can mean two things:

  1. The module is missing.
  2. The module exists but cannot be loaded, e.g. because it contains a SyntaxError.

To check that it's not the latter, run:

python -c "import <myModuleContainingTasksDotPyFile>" 

In the case of this question:

python -c "import etl" 

If it crashes, fix that first (unlike Celery, this will give you a detailed error message).

Try export PYTHONPATH=<parent directory>, where the parent directory is the folder in which etl lives. Then run the Celery worker and see if that solves your problem. This is probably the most common "problem" with Celery (not really Celery's fault, but Python's in general). Alternatively, run the Celery worker from that folder.
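To see the PYTHONPATH point concretely, here is a self-contained sketch that builds a throwaway etl package in a temporary directory (the file contents are made up for illustration) and shows that the import only works once the parent directory is on sys.path, which is exactly what export PYTHONPATH=<parent directory> achieves for the worker process:

```python
import importlib
import os
import sys
import tempfile

# Build a throwaway 'etl' package mimicking the OSS layout (illustrative only).
parent = tempfile.mkdtemp()
pkg = os.path.join(parent, "etl")
os.makedirs(pkg)
open(os.path.join(pkg, "__init__.py"), "w").close()
with open(os.path.join(pkg, "tasks.py"), "w") as f:
    f.write("MARKER = 'loaded'\n")

# Without the parent directory on sys.path, the import fails,
# which is what Celery reports as "The module etl was not found".
try:
    importlib.import_module("etl.tasks")
    print("unexpected: import succeeded")
except ImportError:
    print("import failed, as Celery reports")

# With the parent directory on sys.path (the runtime equivalent of
# export PYTHONPATH=<parent directory>), the import succeeds.
sys.path.insert(0, parent)
importlib.invalidate_caches()
mod = importlib.import_module("etl.tasks")
print(mod.MARKER)  # -> loaded
```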
