我使用python异步队列来下载网络中的文件,但是当我启动3个worker来使用队列时,只有一个工作

2024-05-16 12:56:10 发布

您现在位置:Python中文网/ 问答频道 /正文

我是新使用的python aysnc函数。 我想启动3个工人批量下载文件在网络。以及我使用的是pythonaysnc队列函数。但当我启动程序时,只有一个工人在运行。在

import requests
import json
import common
import asyncio
import time


def http_get(index,type,tag):
    page_res = requests.get('{}/{}/{}/{}'.format(common.es_url,index,type,tag), headers={
            'Content-Type': 'application/json'
        })
    return page_res.status_code, page_res.content


def http_post(index, type, tag, o):
    resp = requests.post('{}/{}/{}/{}'.format(common.es_url, index, type, tag), headers={
        'Content-Type': 'application/json'
    }, data=json.dumps(o))
    return resp


def get_pubchem_tag():
    status_code, content = http_get('configure', '_doc', 'pubchem_tag')
    # print(page_res.status_code)

    if status_code == 404:
        bak_status_code, bak_content = http_get('configure_bak', '_doc', 'pubchem_tag')
        if bak_status_code == 404:
            o = {
                'key': 'pubchem_page',
                'record':0
            }
            http_post('configure', '_doc', 'pubchem_tag', o)
            http_post('configure_bak', '_doc', 'pubchem_tag', o)
            return o['record']
        else:
            bak_content_obj = json.loads(bak_content)
            http_post('configure', '_doc', 'pubchem_tag', bak_content_obj['_source'])
            return bak_content_obj['_source']['record']
    else:
        content_obj = json.loads(content)
        return content_obj['_source']['record']


async def get_pubchem_worker(name, queue):
    while True:
        page = await queue.get()
        url = 'https://pubchem.ncbi.nlm.nih.gov/rest/pug_view/data/compound/{}/JSON/?response_type=display'.format(page)

        html_resp = requests.get(url)
        if html_resp.status_code in {200, 201}:
            obj = json.loads(html_resp.content)
            o = {
                "key": obj['Record']['RecordNumber'],
                "value": json.dumps(obj['Record'])
            }
            r = http_post('html_compounds', '_doc', obj['Record']['RecordNumber'], o)
            if r.status_code in {200, 201}:
                o = {
                    'record': page
                }
                r = http_post('configure', '_doc', 'pubchem_tag', o)
                if r.status_code not in {200, 201}:
                    http_post('configure', '_doc', 'pubchem_tag', o)
                r = http_post('configure_bak', '_doc', 'pubchem_tag', o)
                if r.status_code not in {200, 201}:
                    http_post('configure_bak', '_doc', 'pubchem_tag', o)
                print(f'{name} has get pubchem record, id is {page}')
            else:
                o = {
                    "key": page,
                    "exception": "Pubchem put es exception"
                }
                http_post('pubchem_exception', '_doc', page, o)
                print(f'{name} got exception in Pubchem put es process, id is {page}, exception is {r.content}')

        else:
            o = {
                "key": page,
                "exception": "Pubchem get http exception"
            }
            http_post('pubchem_exception', '_doc', page, o)
            print(f'{name} got exception in Pubchem get HTTP process, id is {page}, exception is {html_resp.content}')

        queue.task_done()


async def multiprocess_get_pubchem():
    page = get_pubchem_tag()
    queue = asyncio.Queue()

    for i in range(1, 10):
        queue.put_nowait(page+i)

    workers = []
    for i in range(3):
        worker = asyncio.create_task(get_pubchem_worker(f'worker-{i}', queue))
        workers.append(worker)

    started_at = time.monotonic()
    await queue.join()
    total_execute_time = time.monotonic() - started_at
    print(f'Total execute time is {total_execute_time:.2f}')


if __name__ == '__main__':
    asyncio.run(multiprocess_get_pubchem())

结果如下:

^{pr2}$

我正在跟随网站的教程 https://docs.python.org/dev/library/asyncio.html

这个问题让我很困惑,但是当我删除get_pubchem_worker函数中的细节(虽然为True)时,其他两个工作线程将开始工作,但当执行一次时,它们都将阻塞。 有谁能帮我解决这个问题吗?非常感谢


当我添加

await asyncio.sleep(0.1)

以前

queue.task_done()

另一个工人开始工作了! 当触发await asyncio.sleep(0.1)时,我创建的worker是否互相监视?在


Tags: jsonobjhttpgetdocconfiguretagstatus