无法从另一个线程停止计时器(nidaqmxpython和回调的简短示例)

2024-04-25 09:52:33 发布

您现在位置:Python中文网/ 问答频道 /正文

我在这个论坛上看到了关于这个话题的其他问题,但没有一个能帮助我理解如何处理这个问题。在我看来,它们中的大多数似乎都是相当复杂和冗长的代码。我相信我正在做一件相当简单的事。我希望有人能帮忙!下面是详细的解释,然后是我当前的代码

注意:请不要删除此问题。我对以下内容进行了很多思考,并仔细阅读了相关的文章,但没有任何效果。我还认为发布这篇文章是有意义的,因为它部分与一个更一般的问题有关:如何在后台运行回调的同时进行实时绘图(见最后的摘要),这可以概括为我的总体目标

设置和目标:国家仪器采集模块(这与litlle有关)NI cDAQ9178,通过nidaqmx-python接口,由NI维护的文件包here。其中输入了一些模拟信号,目标是以一定的采样率(约1000 Hz)连续采集(直到我决定停止采集),同时实时绘制信号。绘图几乎不需要如此频繁地刷新(10Hz的刷新率甚至可以)。我在conda虚拟环境中使用Windows10和Python3.7,编辑是在PyCharm中完成的。理想情况下,PyCharm和任何终端都应该工作

情况:nidaqmx-python提供高级函数,允许注册回调(根据自己的意愿进行定义),每次一定数量的样本(在我的例子中是100个,但这并不严格)填充PC缓冲区时调用回调。其思想是,下面定义的回调在该点读取缓冲区,并执行一些操作(在我的例子中,一些低通过滤,为了简洁起见,我将其取出,一些存储到全局变量data,可能还进行打印-请参见下文)

问题:我一直在胡闹,让实时打印数据的任何东西都包含在回调中,但matplotlib的情况非常糟糕,因为回调使用的线程不是主线程,matplotlib不喜欢从主线程之外的任何地方调用。我已经在谷歌上搜索了其他为实时绘图而优化的库(而且,我一直在想,希望是线程安全的),但这并不是那么容易:我无法让vispy工作,甚至无法让pyqtgraph安装,只是举几个例子。然后我在互联网上看到了一些帖子,上面写着人们用matplotlib管理相当不错的实时动画,尽管它是在考虑出版而不是这些应用程序的情况下开发的;所以我想让我们试一试

我的目标:由于我无法让matplotlib在回调内部完成工作,所以我做了以下工作(这是您在下面看到的代码):在回调之后,在任务以task.start()(这是特定于nidaqmx-python)启动之后,我只创建一个while循环,它绘制全局变量buffer。我认为这是一个很好的技巧:看,buffer每隔0.1秒左右通过回调更新(称之为更新)(无所谓),另一方面,while循环反复打印buffer变量,每次打印前都会擦除,有效地生成类似于实时的打印

注意:我非常清楚绘图部分并没有它所能做的那么好(我可能应该使用matplotlib的ax API和subplots,更不用说动画了),但我现在不在乎。我稍后会处理这个问题,并对其进行改进,使其更高效

我想要的:这实际上就是我想要的。。。除了,为了停止它,我在while循环中引入了try:except:语句,如下面的代码所示。当然,按CTRL+C键确实会打破循环。。。但它是n还会中断整个正在运行的脚本,并给我留下以下错误:forrtl: error (200): program aborting due to control-C event,在PyCharm中,从终端运行时的精度如下:

Image              PC                Routine            Line        Source
libifcoremd.dll    00007FFECF413B58  Unknown               Unknown  Unknown
KERNELBASE.dll     00007FFF219F60A3  Unknown               Unknown  Unknown
KERNEL32.DLL       00007FFF23847BD4  Unknown               Unknown  Unknown
ntdll.dll          00007FFF240CCED1  Unknown               Unknown  Unknown
QObject::~QObject: Timers cannot be stopped from another thread

不便之处在于,我别无选择,只能关闭python外壳(再次思考PyCharm),并且我无法访问宝贵的变量data,其中包含。。。嗯,我的数据

猜测:显然,回调不喜欢在这种情况下停止。nidaqmx_python任务应使用task.stop()停止。我尝试将task.stop()放在键盘中断except:之后,但这没有帮助,因为CTRL+C会在顶部停止脚本/而不是中断while循环。我相信需要一些更精确的方法来停止我的任务。我已经想了好几天了,但想不出一种方法来同时完成这两件事:一个我可以停止的任务,同时实时绘制。请注意,如果没有绘图,在按ENTER键时很容易停止任务:只需在末尾写入即可

input('Press ENTER to stop task')
task.stop()

但当然,仅仅做上述操作不允许我包括实时绘图部分

摘要:我无法从连续读取数据的回调调用matplotlib,因此我在一个单独的块中编写了一个while循环用于实时绘图,但是如果不得到上述错误(我认为这会导致回调从另一个线程停止),我将无法停止该while循环

我希望我是清楚的,如果不是,请问

代码:我已将其清理干净,使其尽可能接近显示问题的MWE,尽管我当然意识到你们中的大多数人没有NI daq可供使用和连接以运行此功能。无论如何这是:

import matplotlib.pyplot as plt
import numpy as np

import nidaqmx
from nidaqmx import stream_readers
from nidaqmx import constants

sfreq = 1000
bufsize = 100

with nidaqmx.Task() as task:

    # Here we set up the task ... nevermind
    task.ai_channels.add_ai_voltage_chan("cDAQ2Mod1/ai1")
    task.timing.cfg_samp_clk_timing(rate=sfreq, sample_mode=constants.AcquisitionType.CONTINUOUS,
                                    samps_per_chan=bufsize)
    # Here we define a stream to be read continuously
    stream = stream_readers.AnalogMultiChannelReader(task.in_stream)

    data = np.zeros((1, 0))  # initializing an empty numpy array for my total data
    buffer = np.zeros((1, bufsize))  # defined so that global buffer can be written to by the callback

    # This is my callback to read data continuously
    def reading_task_callback(task_idx, event_type, num_samples, callback_data):  # bufsize is passed to num_samples when this is called
        global data
        global buffer

        buffer = np.zeros((1, num_samples))

        # This is the reading part
        stream.read_many_sample(buffer, num_samples, timeout=constants.WAIT_INFINITELY)
        data = np.append(data, buffer, axis=1)  # appends buffered data to variable data

        return 0  # Absolutely needed for this callback to be well defined (see nidaqmx doc).

    # Here is the heavy lifting I believe: the above callback is registered
    task.register_every_n_samples_acquired_into_buffer_event(bufsize, reading_task_callback)
    task.start()  # The task is started (callback called periodically)

    print('Acquiring sensor data. Press CTRL+C to stop the run.\n')  # This should work ...

    fig = plt.figure()
    try:
        while True:
            # Poor's man plot updating
            plt.clf()
            plt.plot(buffer.T)
            plt.show()
            plt.pause(0.01)  # 100 Hz refresh rate
    except KeyboardInterrupt:  # stop loop with CTRL+C ... or so I thought :-(
        plt.close(fig)
        pass

    task.stop()  # I believe I never get to this part after pressing CTRL+C ...

    # Some prints at the end ... nevermind
    print('Total number of acquired samples: ', len(data.T),'\n')
    print('Sampling frequency: ', sfreq, 'Hz\n')
    print('Buffer size: ', bufsize, '\n')
    print('Acquisition duration: ', len(data.T)/sfreq, 's\n')

如有任何意见,将不胜感激。提前谢谢各位

编辑:在下面被接受的答案之后,我重写了上面的代码,并提出了以下内容,这些内容现在可以正常工作了(对不起,这次我还没有整理好,有些行与当前问题无关):

# Stream read from a task that is set up to read continuously
import matplotlib.pyplot as plt
import numpy as np

import nidaqmx
from nidaqmx import stream_readers
from nidaqmx import constants

from scipy import signal

import threading

running = True

sfreq = 1000
bufsize = 100
bufsizeb = 100

global task

def askUser():  # it might be better to put this outside of task
    global running
    input("Press return to stop.")
    running = False

def main():
    global running

    global data
    global buffer
    global data_filt
    global buffer_filt

    global b
    global z

    print('Acquiring sensor data...')

    with nidaqmx.Task() as task:  # maybe we can use target as above

        thread = threading.Thread(target=askUser)
        thread.start()

        task.ai_channels.add_ai_voltage_chan("cDAQ2Mod1/ai1")
        task.timing.cfg_samp_clk_timing(rate=sfreq, sample_mode=constants.AcquisitionType.CONTINUOUS,
                                        samps_per_chan=bufsize)
        # unclear samps_per_chan is needed here above or why it would be different than bufsize
        stream = stream_readers.AnalogMultiChannelReader(task.in_stream)

        data = np.zeros((1, 0))  # probably not the most elegant way of initializing an empty numpy array
        buffer = np.zeros((1, bufsizeb))  # defined so that global buffer can be written in the callback
        data_filt = np.zeros((1, 0))  # probably not the most elegant way of initializing an empty numpy array
        buffer_filt = np.zeros((1, bufsizeb))  # defined so that global buffer can be written in the callback

        b = signal.firwin(150, 0.004)
        z = signal.lfilter_zi(b, 1)

        def reading_task_callback(task_idx, event_type, num_samples, callback_data):  # bufsizeb is passed to num_samples
            global data
            global buffer
            global data_filt
            global buffer_filt
            global z
            global b

            if running:
                # It may be wiser to read slightly more than num_samples here, to make sure one does not miss any sample,
                # see: https://documentation.help/NI-DAQmx-Key-Concepts/contCAcqGen.html
                buffer = np.zeros((1, num_samples))
                stream.read_many_sample(buffer, num_samples, timeout=constants.WAIT_INFINITELY)
                data = np.append(data, buffer, axis=1)  # appends buffered data to variable data

                # IIR Filtering, low-pass
                buffer_filt = np.zeros((1, num_samples))
                for i, x in enumerate(np.squeeze(buffer)):  # squeeze required for x to be just a scalar (which lfilter likes)
                    buffer_filt[0,i], z = signal.lfilter(b, 1, [x], zi=z)

                data_filt = np.append(data_filt, buffer_filt, axis=1)  # appends buffered filtered data to variable data_filt

            return 0  # Absolutely needed for this callback to be well defined (see nidaqmx doc).

        task.register_every_n_samples_acquired_into_buffer_event(bufsizeb, reading_task_callback)  # bufsizeb instead

        task.start()
        while running:  # this is perfect: it "stops" the console just like sleep in a way that the task does not stop
            plt.clf()
            plt.plot(buffer.T)
            plt.draw()
            plt.pause(0.01)  # 100 Hz refresh rate
        # plt.close(fig)  # maybe no need to close it for now

        # task.join()  # this is for threads I guess ... (seems useless to my case?)

        # Some prints at the end ...
    print('Total number of acquired samples:', len(data.T))
    print('Sampling frequency:', sfreq, 'Hz')
    print('Buffer size:', bufsize)
    print('Acquisition duration:', len(data.T)/sfreq, 's')

if __name__ == '__main__':
    main()

请注意,我根本不需要task.stop(),因为连续采集任务处理此包的方式是读取task.start()之后的任何代码行,这不是一个sleep或类似的东西,会使任务停止(至少这是我的理解)


Tags: thetoimporttaskdataisbuffernp
1条回答
网友
1楼 · 发布于 2024-04-25 09:52:33

我做的第一件事就是去掉键盘中断循环。我用一个全局变量running替换它,另一个线程在从返回时将变量设置为False

def askUser():
  global running
  input("Press return to stop.")
  running = False

然后,在while loop之前,创建了一个将执行此函数的新线程

askUserThread = threading.Thread(target=askUser)
askUserThread.start()

对于while循环,去掉try{}语句:

while running:
  plt.clf()
  plt.plot(buffer.T)
  plt.draw()          # Note: this got changed because .show wasn't working.
  plt.pause(0.01)

这对我来说仍然不起作用,因为我必须关闭绘图窗口才能显示新的绘图窗口。所以从this answer,我把它从.show改为.draw

我的最终代码有点不同(因为我对随机数据进行了采样),但这里就是

# sampling.py
# by Preston Hager

import matplotlib.pyplot as plt
import numpy as np

import threading

sfreq = 1000
bufsize = 100

running = True

data = np.zeros((1, 0))  # initializing an empty numpy array for my total data
buffer = np.zeros((1, bufsize))  # defined so that global buffer can be written to by the callback

def askUser():
    global running

    input("Press return to stop.")
    running = False

def readingTask():
    global data
    global buffer

    while running:
        buffer = np.random.rand(1, bufsize)
        # This is the reading part
        data = np.append(data, buffer, axis=1)  # appends buffered data to variable data

def main():
    global running

    print('Acquiring sensor data.')

    thread = threading.Thread(target=askUser)
    thread.start()
    task = threading.Thread(target=readingTask)
    task.start()

    fig = plt.figure()
    while running:
        # Poor's man plot updating
        plt.clf()
        plt.plot(buffer.T)
        plt.draw()
        plt.pause(0.01)  # 100 Hz refresh rate
    plt.close(fig)

    task.join()

    # Some prints at the end ... nevermind
    print('Total number of acquired samples:', len(data.T))
    print('Sampling frequency:', sfreq, 'Hz')
    print('Buffer size:', bufsize)
    print('Acquisition duration:', len(data.T)/sfreq, 's')

if __name__ == '__main__':
    main()

相关问题 更多 >