Python , 子线程中函数超时,且不使用信号和thread.join
我想给一个在子线程中调用的函数加个超时限制。因为我不能使用信号,因为信号必须在主线程中使用。还有,我也不能用 thread.join(time_out)
,因为这个函数有时候几秒钟就能执行完,而在这种情况下,线程总是会等到 time_out
。
有没有其他的方法呢?
来源:
thread.join
: 在Python中使用线程的超时函数不起作用signal
: 如果执行时间太长的超时函数
1 个回答
0
你可以使用一个不太为人所知的 ctyps
小技巧,来针对特定的线程引发一个 TimeoutError
错误。我用这种方法做了一个 non_blocking
超时脚本,并刚刚在 GitHub 上发布了:https://github.com/levimluke/PyTimeoutAfter
这个实现起来非常简单,但从技术上来说有点复杂。
def raise_caller(self):
ret = ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_long(self._caller_thread._ident), ctypes.py_object(self._exception))
if ret == 0:
raise ValueError("Invalid thread ID")
elif ret > 1:
ctypes.pythonapi.PyThreadState_SetAsyncExc(self._caller_thread._ident, NULL)
raise SystemError("PyThreadState_SetAsyncExc failed")
https://github.com/levimluke/PyTimeoutAfter/blob/main/timeout_after.py#L47
我使用了一个类对象,并保存了调用线程,这样我就可以在定时的子线程中从父类中引发一个异常。