Script hangs on the join() method even though the queue is still running

Posted 2024-04-25 23:13:25

I'm trying to understand asynchronous development in Python, so I wrote a small web parser that uses asyncio and aiohttp. The problem I've run into is that my script hangs on the .join() method even though the async queue is full. I actually suspect aiohttp.ClientSession, because when I don't make a GET request inside the consumer method, I have no problem at all. So what am I doing wrong? Any ideas?

import asyncio
import aiohttp
import random
from bs4 import BeautifulSoup


class OlxParser:

    def __init__(self):
        self.loop = asyncio.get_event_loop()
        self._queue = asyncio.Queue()
        self._run_loop = True
        self._sess = None
        self._url = 'https://www.olx.kz/elektronika/telefony-i-aksesuary/mobilnye-telefony-smartfony/alma-ata/?search%5Bprivate_business%5D=private'
        self._headers = {
            'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/50.0.2661.102 Safari/537.36'
        }
        self._urls = []
        self._sleep_range = (1, 3)
        self.data = []

    async def _sleep(self):
        sleep_time = random.randint(*self._sleep_range)
        await asyncio.sleep(sleep_time)

    async def _consume(self):
        while True:
            url = await self._queue.get()
            html = None
            print('Pass')
            if url not in self._urls:
                self._urls.append(url)
                print('Fetching:', url)
                async with self._sess.get(url, allow_redirects=False, headers=self._headers) as res:
                    if res.status == 200:
                        html = await res.text()
                    else:
                        print('Status:', res.status)
                        print('Error url:', url)
            if html is not None:
                soup = BeautifulSoup(html, 'html.parser')
                price_label = soup.find('div', {'class': 'price-label'})
                price = price_label.find('strong')
                self.data.append(price.get_text())
                # await self._sleep()
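            # note: if anything above raises (e.g. price_label comes back None),
            # task_done() is never reached and this consumer task dies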
            self._queue.task_done()

    async def _produce(self, page_num):
        url = self._url
        if page_num > 1:
            url += '&page={}'.format(page_num)
        print('Fetching:', url)
        html = None
        async with self._sess.get(url, allow_redirects=False, headers=self._headers) as res:
            if res.status == 200:
                html = await res.text()
            else:
                print('Fetching has stopped at page number:', str(page_num))
                self._run_loop = False
        if html is not None:
            soup = BeautifulSoup(html, 'html.parser')
            table = soup.find('table', {'id': 'offers_table'})
            links = table.find_all('a', {'class': 'detailsLink'})
            for link in links:
                await self._queue.put(link['href'])
        # await self._sleep()

    async def run(self):
        consumer = asyncio.ensure_future(self._consume())
        page_num = 1
        async with aiohttp.ClientSession(loop=self.loop) as sess:
            self._sess = sess
            while self._run_loop:
                await self._produce(page_num)
                page_num += 1
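        # Queue.join() blocks until task_done() has been called once
        # for every item that was put() on the queue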
        await self._queue.join()
        consumer.cancel()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.loop.close()
        return exc_type is None


if __name__ == '__main__':
    with OlxParser() as obj:
        obj.loop.run_until_complete(obj.run())
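
For context, Queue.join() returns only after task_done() has been called for every item that was put() on the queue, so a consumer that raises an exception before reaching task_done() dies silently and leaves join() blocked forever. Below is a minimal sketch of the same producer/consumer pattern with task_done() protected by try/finally; the consume/main coroutines and the integer items are illustrative stand-ins, not part of the script above.

import asyncio


async def consume(queue):
    # Runs forever: each get() must be balanced by exactly one task_done(),
    # otherwise join() on the producer side never unblocks.
    while True:
        item = await queue.get()
        try:
            if item % 2:                   # stand-in for the real per-item work
                raise ValueError('parse failed')
            print('processed:', item)
        except Exception as exc:
            print('skipped:', item, '->', exc)
        finally:
            queue.task_done()              # reached even when processing raises


async def main():
    queue = asyncio.Queue()
    consumer = asyncio.create_task(consume(queue))
    for item in range(5):
        await queue.put(item)
    await queue.join()                     # returns once all 5 items are done
    consumer.cancel()                      # consume() loops forever; stop it


if __name__ == '__main__':
    asyncio.run(main())

Without the try/finally, the first item that raises would kill consume() and main() would hang on join(), which matches the symptom described above.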

Tags: run, self, loop, url, get, async, if, def
