Python ThreadPoolExecutor threads never finish

Posted 2024-04-20 14:07:03

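I have a script, running on Python 3.8.2, that crawls pages using concurrent.futures.ThreadPoolExecutor. Essentially, it scrapes the links on a page, stores them in SQLite using SQLAlchemy, and then moves on to the next page.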

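My problem is that the script never finishes. The two print statements confirm that every task runs to completion, yet the script just hangs and never exits. Am I missing something about how to handle concurrency and SQLite sessions?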

import requests
from bs4 import BeautifulSoup
from concurrent import futures
from sqlalchemy import create_engine, exists, Column, String
from sqlalchemy.orm import scoped_session, sessionmaker
from sqlalchemy.ext.declarative import declarative_base


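def crawl(link):
    print('Starting: {}'.format(link))
    session = Session()
    html = requests.get(link, timeout=10)  # was `url`, which is not defined here
    soup = BeautifulSoup(html.text, 'lxml')

    # NOTE: `clazz` (a CSS class filter) is not defined in the posted snippet
    hrefs = [entry.get('href') for entry in soup.find_all('a', clazz)]
    for href in hrefs:  # separate name so the `link` argument is not overwritten
        data = {
            'type': self.type,      # NOTE: `self` is not defined in the posted snippet
            'status': self.status,
            'url': href
        }
        # `Links` is the only model defined below (the post said Table / DataEntry)
        if not session.query(exists().where(Links.url == href)).scalar():
            d = Links(**data)
            session.add(d)
            session.commit()

    print('Finished: {}'.format(link))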

def main():
    links = ['www.link1.com', 'www.link2', ....]
    with futures.ThreadPoolExecutor(max_workers=4) as executor:
        the_futures = [executor.submit(crawl, link) for link in links]
        for future in the_futures:
            try:
                result = future.result()
            except Exception as e:
                print('Thread threw exception:', e)

if __name__ == "__main__":
    engine = create_engine("sqlite:///database.sql")
    Base = declarative_base()

    class Links(Base):
        __tablename__ = 'links'

        url = Column(String, primary_key=True)
        type = Column(String)
        status = Column(String)

    Base.metadata.create_all(engine)

    session_factory = sessionmaker(bind=engine)
    Session = scoped_session(session_factory)

    main()

    Session.remove()

1 Answer
User
#1 · Posted 2024-04-20 14:07:03

Your call to submit should be:

future = executor.submit(crawl, link)

not:

executor.submit(crawl(link))

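In the first case, you pass submit a reference to the function along with its argument. In the second case, you first call the function yourself and then pass the return value of that call, which appears to be None, to submit. You should also save the returned future objects so that you can test for completion of the threads as they finish, like this: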

with futures.ThreadPoolExecutor(max_workers=4) as executor: 
    the_futures = []
    for link in links:
        future = executor.submit(crawl, link)
        the_futures.append(future)
    for future in futures.as_completed(the_futures):
        #print(future.result()) # result is None in this case
        pass

Or, more "Pythonically":

with futures.ThreadPoolExecutor(max_workers=4) as executor: 
    the_futures = [executor.submit(crawl, link) for link in links]
    for future in futures.as_completed(the_futures):
        pass

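Note also that I create the executor variable with a context manager so that any necessary cleanup is done when the block terminates (it calls shutdown, which waits for all futures to complete, although here I explicitly wait for the futures to complete before exiting the block).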
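As a rough illustration of the cleanup described above, the sketch below checks that every future is already done once the with block has exited, even without an explicit wait. The slow_task function and its delay are made up for the example and are not part of the original script.

```python
import time
from concurrent import futures


def slow_task(n):
    # Hypothetical stand-in for crawl(): sleep briefly, then return a value.
    time.sleep(0.05)
    return n * 2


with futures.ThreadPoolExecutor(max_workers=2) as executor:
    the_futures = [executor.submit(slow_task, n) for n in range(4)]
    # No explicit waiting here: leaving the block calls
    # executor.shutdown(wait=True), which blocks until all tasks finish.

all_done = all(future.done() for future in the_futures)
results = [future.result() for future in the_futures]
print(all_done, results)
```

Because the list keeps the futures in submission order, the results come back in that order as well.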

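If you care that results are returned in the order in which the tasks were created (which you don't in this case, since the returned result is always None):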

with futures.ThreadPoolExecutor(max_workers=4) as executor: 
    for result in executor.map(crawl, links):
        #print(result) # None in this case
        pass
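To make the ordering difference concrete, here is a small sketch that runs the same tasks through executor.map and through futures.as_completed. The wait_for function and the delays are assumptions chosen only to make tasks finish at different times.

```python
import time
from concurrent import futures


def wait_for(delay):
    # Hypothetical task: sleep for `delay` seconds and return the delay.
    time.sleep(delay)
    return delay


delays = [0.3, 0.1, 0.2]

with futures.ThreadPoolExecutor(max_workers=3) as executor:
    # map() yields results in the order of the inputs.
    map_order = list(executor.map(wait_for, delays))

    # as_completed() yields each future when it finishes, so the shorter
    # delays usually come out first.
    the_futures = [executor.submit(wait_for, d) for d in delays]
    completion_order = [f.result() for f in futures.as_completed(the_futures)]

print(map_order)
print(completion_order)
```

The map order always matches the input list, while the as_completed order depends on timing and should not be relied on.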

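However, executor.map is less convenient when you want to get all the results and one or more threads might throw an exception: you cannot retrieve results from any thread past the first one that threw an exception (even if you wrap result retrieval in a try/except block). It is also more awkward to use when the function you are calling takes other than a single argument. In those cases it is best to work with futures: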

with futures.ThreadPoolExecutor(max_workers=4) as executor: 
    the_futures = [executor.submit(crawl, link) for link in links]
for future in the_futures:
    try:
        result = future.result() # could throw an exception if the thread threw an exception
        print(result)
    except Exception as e:
        print('Thread threw exception:', e)
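The limitation of executor.map mentioned above can be seen in a short sketch. The work function, which fails for one input, is hypothetical and exists only to trigger an exception in the middle of the sequence.

```python
from concurrent import futures


def work(n):
    # Hypothetical task that fails for one particular input.
    if n == 1:
        raise ValueError('bad input: {}'.format(n))
    return n * 10


inputs = [0, 1, 2, 3]

with futures.ThreadPoolExecutor(max_workers=4) as executor:
    # With map(), iteration stops at the first exception; the results
    # for inputs 2 and 3 can no longer be read from the iterator.
    map_results = []
    try:
        for result in executor.map(work, inputs):
            map_results.append(result)
    except ValueError as e:
        map_error = str(e)

    # With futures, each result or exception is inspected on its own.
    the_futures = [executor.submit(work, n) for n in inputs]
    future_results = []
    for future in the_futures:
        try:
            future_results.append(future.result())
        except ValueError as e:
            future_results.append('error: {}'.format(e))

print(map_results)
print(future_results)
```

With map, only the result for the first input is collected before the exception ends the loop; with futures, the failure is recorded and the remaining results are still available.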

With all of the above said, I still don't know why your program is not terminating. One thing is certain: you were not multithreading.
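The point about not multithreading can be checked with a minimal sketch that records which thread runs the function under each form of the submit call. The crawl function here is a hypothetical stand-in that returns None, as the original appears to, and the link names are placeholders. It demonstrates the difference between the two calls; it does not by itself explain the hang.

```python
import threading
from concurrent import futures

calls = {}  # link -> name of the thread that ran crawl() for it


def crawl(link):
    # Hypothetical stand-in for the real crawl(); returns None.
    calls[link] = threading.current_thread().name


caller = threading.current_thread().name

with futures.ThreadPoolExecutor(max_workers=2) as executor:
    # Callable and argument passed separately: a worker thread runs crawl().
    good = executor.submit(crawl, 'link-a')
    # crawl() runs right here in the calling thread; submit() receives None.
    bad = executor.submit(crawl('link-b'))

ran_in_worker = calls['link-a'] != caller
ran_in_caller = calls['link-b'] == caller
# The worker then tries to call None, which fails with a TypeError
# that is stored on the future.
bad_error = bad.exception()
print(ran_in_worker, ran_in_caller, type(bad_error).__name__)
```

In the second form all the real work happens sequentially in the calling thread, and the pool only receives a non-callable value.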
