竞争对手
futureproof的Python项目详细描述
防弹-防弹并发。未来
^{
FutureProof是一个很薄的包装,它解决了其中的一些问题并添加了一些可用性特性。
特点:
- 监视:默认情况下会记录已完成任务的摘要。
- fail fast:错误导致主线程引发异常并在默认情况下停止。
- 错误策略:用户可以决定是否引发、记录或完全忽略任务上的错误。
- backpressure control:当执行者完成任务时,大量的任务集合会被惰性地消耗掉,极大地减少了内存消耗,提高了这些情况下的响应能力。
当前状态:alpha
API可能会发生更改,任何更改都将记录在更改日志中。
FutureProof设计用于包装ThreadPoolExecutor,但是0.2+版本包含有限的支持processPoolExecutor,但仅适用于Python3.7+。
使用PraceSoPoExcRePotoor一个“OsCurry:句柄已关闭”时,从解释器退出处理程序中打印出错误,这似乎是由工人处理过早关闭造成的。这不会影响任务的执行。
期货有问题吗?什么问题?
让我们看看threadpoolexecutor的规范示例:
importconcurrent.futuresimporturllib.requestURLS=['http://www.foxnews.com/','http://www.cnn.com/','http://europe.wsj.com/','http://www.bbc.co.uk/','http://some-made-up-domain-that-definitely-does-not-exist.com/']# Retrieve a single page and report the URL and contentsdefload_url(url,timeout):withurllib.request.urlopen(url,timeout=timeout)asconn:returnconn.read()# We can use a with statement to ensure threads are cleaned up promptlywithconcurrent.futures.ThreadPoolExecutor(max_workers=5)asexecutor:# Start the load operations and mark each future with its URLfuture_to_url={executor.submit(load_url,url,60):urlforurlinURLS}forfutureinconcurrent.futures.as_completed(future_to_url):url=future_to_url[future]try:data=future.result()exceptExceptionasexc:print('%r generated an exception: %s'%(url,exc))else:print('%r page is %d bytes'%(url,len(data)))
只是重申一下,这是惊人的,事实上,多线程的入口障碍如此之小,这确实证明了brian quinlan和核心python开发人员所做的伟大工作。
但是,我发现有两个问题:
- 样板。我们需要输入一个上下文管理器,调用
submit
手动跟踪未来及其参数,调用as_completed
它实际上返回一个迭代器,调用result
未来记住处理异常。 - 令人惊讶。为什么我们需要得到结果才能提高?如果我们不指望它会上涨呢?我们可能想尽快知道。
如果运行此代码,将得到以下输出(在编写本文时):
'http://some-made-up-domain-that-definitely-does-not-exist.com/' generated an exception: <urlopen error [Errno 8] nodename nor servname provided, or not known>
'http://www.foxnews.com/' page is 248838 bytes
'http://www.bbc.co.uk/' page is 338658 bytes
'http://www.cnn.com/' page is 991167 bytes
'http://europe.wsj.com/' page is 970346 bytes
这是完美的。未来如何比较?
executor=futureproof.FutureProofExecutor(max_workers=5)withfutureproof.TaskManager(executor)astm:forurlinURLS:tm.submit(load_url,url,60)fortaskintm.as_completed():print("%r page is %d bytes"%(task.args[0],len(task.result)))
看起来很相似,有一个执行器和一个task manager。submit
和as_completed
是上面的方法,没有try..except
。如果我们运行它,就会得到:
'http://www.foxnews.com/' page is 248838 bytes
Traceback (most recent call last):
File "/Users/yeray/.pyenv/versions/3.7.3/lib/python3.7/urllib/request.py", line 1317, in do_open
encode_chunked=req.has_header('Transfer-encoding'))
... omitted traceback output ...
socket.gaierror: [Errno 8] nodename nor servname provided, or not known
请注意,futureproof
引发了立即出现的异常,并且一切都停止了,正如您在普通的非线程python中所期望的那样,这并不奇怪。
如果我们更喜欢futureproof
,可以选择使用错误策略记录甚至忽略异常。假设我们要记录异常:
logging.basicConfig(level=logging.INFO,format="[%(asctime)s%(thread)s] %(message)s",datefmt="%H:%M:%S",)executor=futureproof.FutureProofExecutor(max_workers=5)withfutureproof.TaskManager(executor,error_policy="log")astm:forurlinURLS:tm.submit(load_url,url,60)fortaskintm.as_completed():ifnotisinstance(task.result,Exception):print("%r page is %d bytes"%(task.args[0],len(task.result)))
注意,我们添加了一个复选框,以便在没有异常的情况下只打印结果,这将输出:
'http://www.foxnews.com/' page is 251088 bytes
[12:09:15 4350641600] Task Task(fn=<function load_url at 0x1029ef1e0>, args=('http://some-made-up-domain-that-definitely-does-not-exist.com/', 60), kwargs={}, result=URLError(gaierror(8, 'nodename nor servname provided, or not known')),
complete=True) raised an exception
Traceback (most recent call last):
File "/Users/yeray/.pyenv/versions/3.7.3/lib/python3.7/urllib/request.py", line 1317, in do_open
encode_chunked=req.has_header('Transfer-encoding'))
... omitted long traceback ...
File "/Users/yeray/.pyenv/versions/3.7.3/lib/python3.7/urllib/request.py", line 1319, in do_open
raise URLError(err)
urllib.error.URLError: <urlopen error [Errno 8] nodename nor servname provided, or not known>
'http://some-made-up-domain-that-definitely-does-not-exist.com/' generated an exception: <urlopen error [Errno 8] nodename nor servname provided, or not known>
'http://www.bbc.co.uk/' page is 339087 bytes
'http://www.cnn.com/' page is 991167 bytes
[12:09:16 123145404444672] 5 task completed in the last 1.18 second(s)
'http://europe.wsj.com/' page is 970880 bytes
注意,我们只需要配置日志记录并传递适当的错误策略,其他一切都由我们来处理。您还可以选择完全忽略异常并自己管理它们访问result
,这是使用concurrent.futures
时的工作流。
as_completed
?
如果你考虑一下,为什么我们需要as_completed
?
答案是监控和错误处理。
如果我们有大量的url,您不想等到所有的url返回显示输出,这可能需要很长时间。但它确实增加了代码的复杂性。如果您不使用as_completed
,那么这个示例是什么样子的?
withconcurrent.futures.ThreadPoolExecutor(max_workers=5)asexecutor:future_to_url={executor.submit(load_url,url,60):urlforurlinURLS}forfuture,urlinfuture_to_url.items():try:data=future.result()exceptExceptionasexc:print("%r generated an exception: %s"%(url,exc))else:print("%r page is %d bytes"%(url,len(data)))
不过,这可以说是更具可读性,它有一个微妙的区别:在所有的未来都完成之前,没有输出。如果你想象任务需要更长的时间,你就会怀疑事情是否真的在起作用。
让我们比较一下futureproof
版本:
executor=futureproof.FutureProofExecutor(max_workers=5)withfutureproof.TaskManager(executor,error_policy="ignore")astm:forurlinURLS:tm.submit(load_url,url,60)fortaskintm.completed_tasks:ifisinstance(task.result,Exception):print("%r generated an exception: %s"%(task.args[0],task.result))else:print("%r page is %d bytes"%(task.args[0],len(task.result)))
[12:40:28 123145393414144] Starting executor monitor
[12:40:29 123145393414144] 5 task completed in the last 1.01 second(s)
[12:40:29 123145393414144] Shutting down monitor...
'http://www.foxnews.com/' page is 252016 bytes
'http://some-made-up-domain-that-definitely-does-not-exist.com/' generated an exception: <urlopen error [Errno 8] nodename nor servname provided, or not known>
'http://www.cnn.com/' page is 992648 bytes
'http://www.bbc.co.uk/' page is 338987 bytes
'http://europe.wsj.com/' page is 969285 bytes
futureproof
默认记录任务的监视信息,因此你总是知道事情是否正常。请注意任务管理器如何公开completed_tasks
以允许轻松访问结果,而不必手动跟踪未来。最后,如前所述,您还可以完全控制异常处理,因此也不需要为此添加代码。
这些都是相当小的问题,我们可以使用concurrent.futures
手动解决,但是当开始处理更多的任务时,会出现其他问题,请查看examples directory以了解futureproof
和concurrent.futures
在其他更严重的场景中的实际比较。
备选方案
我决不是第一个解决这些问题的人。这里有一些类似的,更稳定的,功能齐全的,尽管有限制许可的替代方案:
- Pebble,lgpl 3.0
- more-executors,平均绩点3.0
futureproof
获得mit许可。