Python 3.4 concurrent.futures.Executor gives no way to pause and resume threads
I am using concurrent.futures.ThreadPoolExecutor for multithreaded work, mostly calling an HTTP service. I want to control the execution of the threads: pause execution while the server is down, and resume it once the server is back up.
The trigger for "server is down" is a check for a file at a specific location; if that file is missing, I need to pause execution.
Now, the concurrent.futures.Executor.shutdown() method signals the executor that it should free the resources it is using once the currently pending futures are done executing.
But when I call the executor's shutdown() method, it does not stop the threads right away; the executor only shuts down after all of the submitted work has finished.
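For example, here is a minimal, self-contained demonstration of what I am seeing (sleepy_task is just a stand-in for my HTTP calls):

import time
import concurrent.futures

def sleepy_task(n):
    # Stand-in for a slow HTTP request.
    time.sleep(n)
    return n

executor = concurrent.futures.ThreadPoolExecutor(max_workers=2)
futures = [executor.submit(sleepy_task, 3) for _ in range(4)]

start = time.time()
executor.shutdown()  # blocks until all four submitted tasks have run to completion
print("shutdown() returned after %.1f seconds" % (time.time() - start))  # ~6 s, not immediately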
In fact, the only reason I call shutdown() is that I could not find any pause/resume functionality in concurrent.futures. So, as a workaround, I remove each URL from the list once its thread has finished executing; that way I can pass the remaining list back in and call the same method again.
Here is the code:
import concurrent.futures
import urllib.request
import os.path
import datetime
import sys
import pathlib
from errno import ENOENT, EACCES, EPERM
import time
import threading

import settings  # project module holding the web server configuration

listOfFilesFromDirectory = []
webroot = settings.configuration.WEBSERVER_WEBROOT
WEBSERVER_PORT = settings.configuration.WEBSERVER_PORT
shutdown = False

# populate the list with the URLs read from a file
def triggerMethod(path):
    try:
        for line in open(path):
            listOfFilesFromDirectory.append(line)
    except IOError as err:
        if err.errno == ENOENT:
            # logging.critical("document.txt file is missing")
            print("document.txt file is missing")
        elif err.errno in (EACCES, EPERM):
            # logging.critical("You are not allowed to read document.txt")
            print("You are not allowed to read document.txt")
        else:
            raise

# called to stop the threads and restart after a 100-second sleep,
# since the list always holds the URLs that have not been executed yet
def stopExecutor(executor):
    filePath = r"C:\logs\serverStopLog.txt"
    while not shutdown:
        time.sleep(5)
        if os.path.isfile(filePath):
            executor.shutdown()
            time.sleep(100)
            runRegressionInMultipleThreads()
            break

def load_url(url, timeout):
    conn = urllib.request.urlopen('http://localhost:' + WEBSERVER_PORT + "/" + url, timeout=timeout)
    return conn.info()

def trigegerFunc():
    global shutdown
    # We can use a with statement to ensure threads are cleaned up promptly
    with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor:
        # Start the load operations and mark each future with its URL
        future_to_url = {executor.submit(load_url, url, 60): url for url in listOfFilesFromDirectory}
        t = threading.Thread(target=stopExecutor, args=(executor,))
        t.start()
        for future in concurrent.futures.as_completed(future_to_url):
            url = future_to_url[future]
            try:
                data = future.result()
            except Exception as exc:
                print('%r generated an exception: %s' % (url, exc))
                listOfFilesFromDirectory.remove(url)
            else:
                if data:
                    if "200" in data:
                        listOfFilesFromDirectory.remove(url)
                    else:
                        listOfFilesFromDirectory.remove(url)
                else:
                    listOfFilesFromDirectory.remove(url)
        shutdown = True
        t.join()

triggerMethod(r"C:\inetpub\wwwroot")
trigegerFunc()
1 Answer
In Python you cannot cancel or pause/resume threads. executor.shutdown() does exactly what the documentation you quoted says it does:
Signal the executor that it should free any resources that it is using when the currently pending futures are done executing.
Note the bolded part: the executor only shuts down once every task that is currently executing has finished.
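As a quick illustration (slow_task here is only a stand-in), even shutdown(wait=False) merely stops new work from being scheduled; a task that is already running keeps going:

import time
import concurrent.futures

def slow_task():
    # Stand-in for an in-flight urlopen() call.
    time.sleep(3)
    return "finished anyway"

executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
future = executor.submit(slow_task)
executor.shutdown(wait=False)  # returns immediately, but does not interrupt slow_task
print(future.result())         # still prints "finished anyway" after ~3 seconds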
If you want finer-grained control, you need to run the urllib call in a separate process, like this (a condensed version of your script):
import time
import os.path
import threading
import urllib.request
import multiprocessing
import concurrent.futures
from multiprocessing import cpu_count

# WEBSERVER_PORT and listOfFilesFromDirectory come from your original script.
shutdown = False
should_cancel = False

def stopTasks():
    global should_cancel
    filePath = r"C:\logs\serverStopLog.txt"
    while not shutdown:
        time.sleep(5)
        if os.path.isfile(filePath):
            should_cancel = True
            break

def _load_url(url, timeout, q):
    conn = urllib.request.urlopen('http://localhost:' + WEBSERVER_PORT +
                                  "/" + url, timeout=timeout)
    q.put(conn.info())

def load_url(url, timeout):
    q = multiprocessing.Queue()
    p = multiprocessing.Process(target=_load_url, args=(url, timeout, q))
    p.start()
    while p.is_alive():
        time.sleep(.5)
        if should_cancel:
            p.terminate()  # This will actually kill the process, cancelling the operation
            break  # You could return something here that indicates it was cancelled, too.
    else:
        # We'll only enter this if we didn't `break` above.
        out = q.get()
        p.join()
        return out

def triggerFunc():
    global shutdown
    with concurrent.futures.ThreadPoolExecutor(max_workers=cpu_count()) as executor:
        # Start the load operations and mark each future with its URL
        future_to_url = {executor.submit(load_url, url, 60):
                         url for url in listOfFilesFromDirectory}
        t = threading.Thread(target=stopTasks)
        t.start()
        for future in concurrent.futures.as_completed(future_to_url):
            info = future.result()
            print("done: {}".format(info))
            # other stuff you do
        shutdown = True
        t.join()

if __name__ == "__main__":
    triggerFunc()
Because we can kill a child process by sending it a SIGTERM, we are able to truly cancel the urlopen operation while it is still in progress.
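As a quick standalone check of that behaviour (blocking_work and the 2-second delay below are just illustrative), terminating the child really does interrupt it mid-call:

import time
import multiprocessing

def blocking_work():
    # Stand-in for a long urlopen() call.
    time.sleep(60)

if __name__ == "__main__":
    p = multiprocessing.Process(target=blocking_work)
    p.start()
    time.sleep(2)
    p.terminate()  # SIGTERM on POSIX; TerminateProcess() on Windows
    p.join()
    print("child exited with code", p.exitcode)  # negative signal number on POSIX

The child never gets to finish its 60-second sleep, which is exactly what lets the load_url wrapper above return early once should_cancel is set.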