我有一个使用Python 3.8.2使用concurrent.futures.ThreadPoolExecutor对页面进行爬网的脚本。从本质上讲,它会在一个页面上抓取链接,使用sqlalchemy将它们存储在sqlite中,然后再移动到下一个页面
我有一个问题,但脚本永远不会完成。我已经确保使用两个print语句完成所有进程,但是脚本只是挂起,从未完成。关于如何处理并发性和sqlite会话,我是否遗漏了什么
from sqlalchemy import create_engine, Column, String
from sqlalchemy.orm import scoped_session, sessionmaker
from sqlalchemy.ext.declarative import declarative_base
def crawl(link):
print('Starting: {}'.format(link))
session = Session()
html = requests.get(url, timeout=10)
soup = BeautifulSoup(html.text, 'lxml')
links = [entry.get('href') for entry in soup.find_all('a', clazz)]
for link in links:
data = {
'type': self.type,
'status': self.status,
'url': link
}
if not session.query(exists().where(Table.url == link)).scalar():
d = DataEntry(**data)
session.add(d)
session.commit()
print('Finished: {}'.format(link))
def main():
links = ['www.link1.com', 'www.link2', ....]
with futures.ThreadPoolExecutor(max_workers=4) as executor:
the_futures = [executor.submit(crawl_for_listings, task) for task in tasks]
for future in the_futures:
try:
result = future.result()
except Exception as e:
print('Thread threw exception:', e)
if __name__ == "__main__":
engine = create_engine("sqlite:///database.sql")
Base = declarative_base()
class Links(Base):
__tablename__ = 'links'
url = Column(String, primary_key=True)
type = Column(String)
status = Column(String)
Base.metadata.create_all(engine)
session_factory = sessionmaker(bind=engine)
Session = scoped_session(session_factory)
main()
Session.remove()
您对
submit
的调用应该是:不是:
在第一种情况下,您将向
submit
传递对函数及其参数的引用。在第二种情况下,您首先调用函数,然后将该调用的返回值传递给submit
,该返回值看起来是None
。然后,您应该保存返回的future
对象,并且可以在线程出现时测试线程的完成情况,因此:或更“Pythonically”:
还请注意,我正在使用上下文管理器创建变量
executor
,以便在块终止时完成任何必要的清理(调用shutdown
,这将等待所有未来完成,但我在退出块之前显式地等待未来完成)如果您关心结果是否按创建顺序返回(在这种情况下,您不会这样做,因为返回的结果总是
None
):然而,当您想要获得所有结果时,上面的
executor.map
函数并不是那么方便,并且一个或多个线程可能会引发异常,因为您将无法从引发异常的第一个线程之外的线程检索结果(即使假设您使用try/except
块以获取结果)。当您调用的函数接受的不是一个参数时,使用它也更复杂。因此,在这些情况下,最好使用期货:有了以上这些,我仍然不知道你的程序为什么没有终止。有一点是肯定的:您不是多线程。
相关问题 更多 >
编程相关推荐