我是新使用的python aysnc函数。 我想启动3个工人批量下载文件在网络。以及我使用的是pythonaysnc队列函数。但当我启动程序时,只有一个工人在运行。在
import requests
import json
import common
import asyncio
import time
def http_get(index,type,tag):
page_res = requests.get('{}/{}/{}/{}'.format(common.es_url,index,type,tag), headers={
'Content-Type': 'application/json'
})
return page_res.status_code, page_res.content
def http_post(index, type, tag, o):
resp = requests.post('{}/{}/{}/{}'.format(common.es_url, index, type, tag), headers={
'Content-Type': 'application/json'
}, data=json.dumps(o))
return resp
def get_pubchem_tag():
status_code, content = http_get('configure', '_doc', 'pubchem_tag')
# print(page_res.status_code)
if status_code == 404:
bak_status_code, bak_content = http_get('configure_bak', '_doc', 'pubchem_tag')
if bak_status_code == 404:
o = {
'key': 'pubchem_page',
'record':0
}
http_post('configure', '_doc', 'pubchem_tag', o)
http_post('configure_bak', '_doc', 'pubchem_tag', o)
return o['record']
else:
bak_content_obj = json.loads(bak_content)
http_post('configure', '_doc', 'pubchem_tag', bak_content_obj['_source'])
return bak_content_obj['_source']['record']
else:
content_obj = json.loads(content)
return content_obj['_source']['record']
async def get_pubchem_worker(name, queue):
while True:
page = await queue.get()
url = 'https://pubchem.ncbi.nlm.nih.gov/rest/pug_view/data/compound/{}/JSON/?response_type=display'.format(page)
html_resp = requests.get(url)
if html_resp.status_code in {200, 201}:
obj = json.loads(html_resp.content)
o = {
"key": obj['Record']['RecordNumber'],
"value": json.dumps(obj['Record'])
}
r = http_post('html_compounds', '_doc', obj['Record']['RecordNumber'], o)
if r.status_code in {200, 201}:
o = {
'record': page
}
r = http_post('configure', '_doc', 'pubchem_tag', o)
if r.status_code not in {200, 201}:
http_post('configure', '_doc', 'pubchem_tag', o)
r = http_post('configure_bak', '_doc', 'pubchem_tag', o)
if r.status_code not in {200, 201}:
http_post('configure_bak', '_doc', 'pubchem_tag', o)
print(f'{name} has get pubchem record, id is {page}')
else:
o = {
"key": page,
"exception": "Pubchem put es exception"
}
http_post('pubchem_exception', '_doc', page, o)
print(f'{name} got exception in Pubchem put es process, id is {page}, exception is {r.content}')
else:
o = {
"key": page,
"exception": "Pubchem get http exception"
}
http_post('pubchem_exception', '_doc', page, o)
print(f'{name} got exception in Pubchem get HTTP process, id is {page}, exception is {html_resp.content}')
queue.task_done()
async def multiprocess_get_pubchem():
page = get_pubchem_tag()
queue = asyncio.Queue()
for i in range(1, 10):
queue.put_nowait(page+i)
workers = []
for i in range(3):
worker = asyncio.create_task(get_pubchem_worker(f'worker-{i}', queue))
workers.append(worker)
started_at = time.monotonic()
await queue.join()
total_execute_time = time.monotonic() - started_at
print(f'Total execute time is {total_execute_time:.2f}')
if __name__ == '__main__':
asyncio.run(multiprocess_get_pubchem())
结果如下:
^{pr2}$我正在跟随网站的教程 https://docs.python.org/dev/library/asyncio.html
这个问题让我很困惑,但是当我删除get_pubchem_worker函数中的细节(虽然为True)时,其他两个工作线程将开始工作,但当执行一次时,它们都将阻塞。 有谁能帮我解决这个问题吗?非常感谢
当我添加
await asyncio.sleep(0.1)
以前
queue.task_done()
另一个工人开始工作了!
当触发await asyncio.sleep(0.1)
时,我创建的worker是否互相监视?在
目前没有回答
相关问题 更多 >
编程相关推荐