python:Windows下的SIGALRM等效信号

21 投票
3 回答
39725 浏览
提问于 2025-04-17 07:54

我有一个装饰器:

def timed_out(timeout):
    def decorate(f):
        if not hasattr(signal, "SIGALRM"):
            return f

        def handler(signum, frame):
            raise TimedOutExc()

        @functools.wraps(f)
        def new_f(*args, **kwargs):
            old = signal.signal(signal.SIGALRM, handler)
            signal.alarm(timeout)
            try:
                result = f(*args, **kwargs)
            finally:
                signal.signal(signal.SIGALRM, old)
            signal.alarm(0)
            return result

        new_f.func_name = f.func_name
        return new_f

    return decorate

不过,这段代码只在Linux系统上有效,因为在Windows上没有SIGALRM这个东西。那么,有什么简单的方法可以让这段代码在Windows上也能运行呢?

3 个回答

0

关于最初的问题:python:Windows上SIGALRM的等效实现

在Windows上,有一种方法可以创建类似SIGALRM的事件。这种方法利用多媒体定时器来触发事件。这个定时器是一个通用的定时器,可以触发一个单独的事件(periodic=False),也可以触发多次事件(n_reps >= 1),或者在外部停止事件的情况下持续运行(通过调用.stop()函数)。

这个定时器可以非常准确地产生事件,这些事件的时间间隔不依赖于Python的运行环境。在这里,时间间隔是以秒为单位来指定的(intervalresolution)。

import ctypes as ct
from ctypes.wintypes import UINT, DWORD
from typing import Callable, Optional

class mmPeriodicTimer:
    """Periodic timer based upon the Windows Multimedia library
    This produces a timed callback with high precision ( ~ < 1ms difference)
    executed outside of a threaded structure (based on Windows runtime ).
    The advantage of this is that the Python GIL limitation is avoided,
    and an aligned "lag" time between all Python processes is not generated;

    The disadvantage is that since this is thread independent, the callback
    procedure must complete its tasks before the next callback is executed 
    otherwise collisions may occur

    This is based on the example:
    https://stackoverflow.com/questions/10717589/how-to-implement-high-speed-consistent-sampling
    """

    timeproc = ct.WINFUNCTYPE(None, ct.c_uint, ct.c_uint, DWORD, DWORD, DWORD)
    timeSetEvent = ct.windll.winmm.timeSetEvent
    timeKillEvent = ct.windll.winmm.timeKillEvent

    def Tick(self):
        self.i_reps += 1
        self.tickFunc()

        if not self.periodic:
            self.stop()

        if self.n_reps is not None and self.i_reps >= self.n_reps:
            self.stop()

    def CallBack(self, uID, uMsg, dwUser, dw1, dw2):
        if self.running:
            self.Tick()

    def __init__(
        self,
        interval: int,
        tickfunc: Callable,
        resolution: Optional[int] = 0,
        stopFunc: Optional[Callable] = None,
        periodic: Optional[bool] = True,
        n_reps: Optional[int] = None,
    ):
        self.interval = UINT(int(interval * 1000))
        self.resolution = UINT(int(resolution * 1000))
        self.tickFunc = tickfunc
        self.stopFunc = stopFunc
        self.periodic = periodic
        self.n_reps = n_reps
        if not self.periodic and self.n_reps is not None:
            raise ValueError("n_reps must be None if periodic is False")

        self.i_reps = 0
        self.id = None
        self.running = False
        self.calbckfn = self.timeproc(self.CallBack)

    def start(self, instant: bool = False):
        if not self.running:
            self.running = True
            if instant:
                self.Tick()

            self.id = self.timeSetEvent(
                self.interval,
                self.resolution,
                self.calbckfn,
                ct.c_ulong(0),
                ct.c_uint(self.periodic),
            )

    def stop(self):
        if self.running:
            self.timeKillEvent(self.id)
            self.running = False

            if self.stopFunc:
                self.stopFunc()

单个事件示例

import time
from datetime import datetime

def my_func():
    print("current time=", datetime.now())

t = mmPeriodicTimer(interval=1, tickfunc=my_func, periodic=False)
t.start()

输出: 当前时间= 2024-03-24 22:42:16.475416


多个事件示例(然后停止):

t = mmPeriodicTimer(interval=1, tickfunc=my_func, n_reps=5)
t.start()

输出:

current time= 2024-03-24 22:42:11.474459
current time= 2024-03-24 22:42:12.475201
current time= 2024-03-24 22:42:13.474462
current time= 2024-03-24 22:42:14.475113
current time= 2024-03-24 22:42:15.474802

在给定时间后手动终止示例:

t = mmPeriodicTimer(interval=1, tickfunc=my_func, periodic=True)
t.start()
time.sleep(5)
t.stop()

输出:

current time= 2024-03-24 22:46:32.864423
current time= 2024-03-24 22:46:33.865319
current time= 2024-03-24 22:46:34.864554
current time= 2024-03-24 22:46:35.864762
current time= 2024-03-24 22:46:36.864778

原始来源: 如何实现高速、一致的采样?

0

这个答案只适用于Cygwin..

...而且在最近的Cygwin版本中可能不再有效...

我觉得这个超时装饰器的代码非常实用。(我最初是在这个问题的回答中找到的:如何限制函数调用的执行时间?

为了在Windows上使用它,我使用的是安装在Cygwin里的Python。

我运行setup-x86_64.exe,然后从Python文件夹中选择python3这个包。(或者,如果你更喜欢Python 2,可以选择python这个包。)

为了把python3重命名为python2,我在Cygwin命令提示符下定义了一个别名

alias python=python3

因为我不太常用这个功能,所以可能不会把它放进.bashrc文件里或其他地方。

相关问题: Python信号在Cygwin上也不工作?

12

这段代码看起来不是特别好,但我需要在不同平台上做类似的事情,所以我想到了使用一个单独的线程。基于信号的系统在所有平台上都不太可靠。

可以把这个类用装饰器包裹起来,或者做成一个 with 上下文管理器。

你的情况可能会有所不同。

#!/usr/bin/env python2.7
import time, threading

class Ticker(threading.Thread):
  """A very simple thread that merely blocks for :attr:`interval` and sets a
  :class:`threading.Event` when the :attr:`interval` has elapsed. It then waits
  for the caller to unset this event before looping again.

  Example use::

    t = Ticker(1.0) # make a ticker
    t.start() # start the ticker in a new thread
    try:
      while t.evt.wait(): # hang out til the time has elapsed
        t.evt.clear() # tell the ticker to loop again
        print time.time(), "FIRING!"
    except:
      t.stop() # tell the thread to stop
      t.join() # wait til the thread actually dies

  """
  # SIGALRM based timing proved to be unreliable on various python installs,
  # so we use a simple thread that blocks on sleep and sets a threading.Event
  # when the timer expires, it does this forever.
  def __init__(self, interval):
    super(Ticker, self).__init__()
    self.interval = interval
    self.evt = threading.Event()
    self.evt.clear()
    self.should_run = threading.Event()
    self.should_run.set()

  def stop(self):
    """Stop the this thread. You probably want to call :meth:`join` immediately
    afterwards
    """
    self.should_run.clear()

  def consume(self):
    was_set = self.evt.is_set()
    if was_set:
      self.evt.clear()
    return was_set

  def run(self):
    """The internal main method of this thread. Block for :attr:`interval`
    seconds before setting :attr:`Ticker.evt`

    .. warning::
      Do not call this directly!  Instead call :meth:`start`.
    """
    while self.should_run.is_set():
      time.sleep(self.interval)
      self.evt.set()

撰写回答