The GIL and IO-bound threads in C extensions (HDF5)



I have a sampling application that acquires 250,000 samples per second, buffers them in memory, and eventually appends them to an HDFStore provided by pandas. In general, this works well. However, I have a thread that runs to continually empty the data acquisition device (DAQ), and it needs to run on a fairly regular basis; a deviation of about one second breaks things. Below is an extreme case of the timings observed. Start indicates when a DAQ read starts, Finish indicates when it completes, and IO indicates an HDF write (both DAQ and IO happen in separate threads).

Start        : 2016-04-07 12:28:22.241303
IO (1)       : 2016-04-07 12:28:22.241303
Finish       : 2016-04-07 12:28:46.573440 (0.16 Hz, 24331.26 ms)
IO Done (1)  : 2016-04-07 12:28:46.573440 (24332.39 ms)

As you can see, this particular write took 24 seconds to execute (a typical write is around 40 ms). The disk I am writing to is under no load, so this delay should not be caused by contention (utilization is around 7% while running). I have disabled indexing for the HDFStore writes. My application runs many other threads, all of which print status strings, and the IO task appears to block every one of them. I have spent a fair amount of time stepping through the code to figure out where it slows down, and it is always inside a method provided by a C extension, which leads to my questions:

  1. Can Python (I am using 3.5) preempt execution inside a C extension? Concurrency: Are Python extensions written in C/C++ affected by the Global Interpreter Lock? seems to indicate that it does not unless the extension explicitly yields.
  2. Does pandas' HDF5 C code implement any yielding for I/O? If so, does that mean the delay is caused by a CPU-bound task? I have disabled indexing.
  3. Any suggestions for getting consistent timing? I am considering moving the HDF5 code into another process. That only helps to a point, though, since I cannot really tolerate ~20 second writes anyway, especially when they are unpredictable.

Here is an example you can run to see the problem:

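(The snippet was garbled in this copy; the following is a minimal reconstruction, assuming the original matched the second answer's modified version with the logging stripped back out. The print formats are inferred from the sample output below.)

import pandas as pd
import numpy as np
from timeit import default_timer as timer
import datetime
import random
import threading
import time


def write_samples(store, samples, overwrite):
    # The first write replaces the table, later ones append; indexing stays off.
    frame = pd.DataFrame(samples, dtype='float64')
    if overwrite:
        store.put("df", frame, format='table', index=False)
    else:
        store.append("df", frame, format='table', index=False)


def begin_io():
    # Keep writing random blocks and report how long each write took.
    fname = "data/tab" + str(random.randint(0, 100)) + ".h5"
    with pd.HDFStore(fname, mode='w', complevel=0) as store:
        counter = 0
        while True:
            data = np.random.rand(50000, 1)
            start_time = timer()
            write_samples(store, data, counter == 0)
            end_time = timer()
            print("IO Done      : %s (%.2f ms, %d)" % (
                datetime.datetime.now(),
                (end_time - start_time) * 1000,
                counter))
            counter += 1


def dummy_thread():
    # Report how long each 10 ms sleep really took; a long gap means this
    # thread was blocked by something else (here: the HDF5 write).
    previous = timer()
    while True:
        time.sleep(0.01)
        now = timer()
        print("Dummy Thread  : %s (%d ms)" % (
            datetime.datetime.now(), (now - previous) * 1000))
        previous = now


if __name__ == '__main__':
    threading.Thread(target=dummy_thread, daemon=True).start()
    begin_io()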

You will get output similar to the following:

IO Done      : 2016-04-08 10:51:14.100479 (3.63 ms, 470)
Dummy Thread  : 2016-04-08 10:51:14.101484 (12 ms)
IO Done      : 2016-04-08 10:51:14.104475 (3.01 ms, 471)
Dummy Thread  : 2016-04-08 10:51:14.576640 (475 ms)
IO Done      : 2016-04-08 10:51:14.576640 (472.00 ms, 472)
Dummy Thread  : 2016-04-08 10:51:14.897756 (321 ms)
IO Done      : 2016-04-08 10:51:14.898782 (320.79 ms, 473)
IO Done      : 2016-04-08 10:51:14.901772 (3.29 ms, 474)
IO Done      : 2016-04-08 10:51:14.905773 (2.84 ms, 475)
IO Done      : 2016-04-08 10:51:14.908775 (2.96 ms, 476)
Dummy Thread  : 2016-04-08 10:51:14.909777 (11 ms)

2 Answers

The answer is no, these writers do not release the GIL. See the documentation here. I know you are not actually trying to write with multiple threads, but this should clue you in: strong locks are held while a write happens, to prevent multiple writers. Both PyTables and h5py do this as part of the HDF5 standard.

You could have a look at SWMR, though pandas does not support it directly. The PyTables docs here and here point to solutions. These generally involve having a separate process pulling data off a queue and writing it.

In any event, this is generally a much more scalable pattern; a minimal sketch of it follows.
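A minimal sketch of the queue-and-writer pattern, assuming the samples arrive as numpy arrays; the file name, queue size, and block count here are made up for illustration:

import multiprocessing as mp

import numpy as np
import pandas as pd


def writer_process(queue):
    # The only process that touches HDF5: it pulls blocks off the queue and
    # appends them, so the library's write lock never stalls acquisition.
    with pd.HDFStore("data/queued.h5", mode='w', complevel=0) as store:
        first = True
        while True:
            samples = queue.get()
            if samples is None:              # poison pill -> clean shutdown
                break
            frame = pd.DataFrame(samples, dtype='float64')
            if first:
                store.put("df", frame, format='table', index=False)
                first = False
            else:
                store.append("df", frame, format='table', index=False)


if __name__ == '__main__':
    queue = mp.Queue(maxsize=100)   # bounded, so a slow disk backpressures the producer
    writer = mp.Process(target=writer_process, args=(queue,))
    writer.start()
    for _ in range(10):
        # Producer side: enqueueing is cheap and never blocks on HDF5.
        queue.put(np.random.rand(50000, 1))
    queue.put(None)
    writer.join()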

Thanks for the working code. I modified it to get some insight, and then created a modified version using multiprocessing.

Modified threaded version

All the modifications are just to get more information; there is no conceptual change. Everything goes into one file, mthread.py, and is commented part by part.

The usual imports:

import pandas as pd
import numpy as np
from timeit import default_timer as timer
import datetime
import random
import threading
import logging

write_samples got some logging:

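(This snippet was garbled in this copy; restored from the multiprocessing version later in this answer, where write_samples is unchanged.)

def write_samples(store, samples, overwrite):
    wslog = logging.getLogger("write_samples")
    wslog.info("starting")
    frame = pd.DataFrame(samples, dtype='float64')

    if overwrite:
        store.put("df", frame, format='table', index=False)
    else:
        store.append("df", frame, format='table', index=False)
    wslog.info("finished")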

begin_io got a maximum duration; exceeding that time results in a WARNING log entry:

def begin_io(maxduration=500):
    iolog = logging.getLogger("begin_io")
    iolog.info("starting")
    try:
        fname = "data/tab" + str(random.randint(0, 100)) + ".h5"
        iolog.debug("opening store %s", fname)
        with pd.HDFStore(fname, mode='w', complevel=0) as store:
            iolog.debug("store %s open", fname)

            counter = 0
            while True:
                data = np.random.rand(50000, 1)
                start_time = timer()
                write_samples(store, data, counter == 0)
                end_time = timer()
                duration = (end_time - start_time) * 1000
                iolog.debug("IO Done      : %s (%.2f ms, %d)",
                            datetime.datetime.now(),
                            duration,
                            counter)
                if duration > maxduration:
                    iolog.warning("Long duration %s", duration)
                counter += 1
    except Exception:
        iolog.exception("oops")
    finally:
        iolog.info("finished")

dummy_thread was modified to stop properly, and it also warns if an iteration takes too long:

def dummy_thread(pill2kill, maxduration=500):
    dtlog = logging.getLogger("dummy_thread")
    dtlog.info("starting")
    try:
        previous = timer()
        while not pill2kill.wait(0.01):
            now = timer()
            duration = (now - previous) * 1000
            dtlog.info("Dummy Thread  : %s (%d ms)",
                       datetime.datetime.now(),
                       duration)
            if duration > maxduration:
                dtlog.warning("Long duration %s", duration)
            previous = now
        dtlog.debug("stopped looping.")
    except Exception:
        dtlog.exception("oops")
    finally:
        dtlog.info("finished")

Finally, we call it all. Feel free to modify the log levels: WARNING shows only the excessive times, while INFO and DEBUG tell much more.

if __name__ == '__main__':
    logformat = '%(asctime)-15s [%(levelname)s] - %(name)s: %(message)s'
    logging.basicConfig(format=logformat,
                        level=logging.WARNING)

    pill2kill = threading.Event()
    t = threading.Thread(target=dummy_thread, args=(pill2kill, 500))
    t.start()
    try:
        begin_io(500)
    finally:
        pill2kill.set()
        t.join()

Running the code, I get results just as you described:

2016-04-08 15:29:11,428 [WARNING] - begin_io: Long duration 5169.03591156
2016-04-08 15:29:11,429 [WARNING] - dummy_thread: Long duration 5161.45706177
2016-04-08 15:29:27,305 [WARNING] - begin_io: Long duration 1447.40581512
2016-04-08 15:29:27,306 [WARNING] - dummy_thread: Long duration 1450.75201988
2016-04-08 15:29:32,893 [WARNING] - begin_io: Long duration 1610.98194122
2016-04-08 15:29:32,894 [WARNING] - dummy_thread: Long duration 1612.98394203
2016-04-08 15:29:34,930 [WARNING] - begin_io: Long duration 823.182821274
2016-04-08 15:29:34,930 [WARNING] - dummy_thread: Long duration 815.275907516
2016-04-08 15:29:43,640 [WARNING] - begin_io: Long duration 510.369062424
2016-04-08 15:29:43,640 [WARNING] - dummy_thread: Long duration 511.776924133

From these values it is clear that while begin_io is very busy and delayed (probably while data is being written to disk), dummy_thread is delayed by almost exactly the same amount of time.

Multiprocessing version - works well

I modified the code to run in multiple processes, and since then it really does not block dummy_thread.

2016-04-08 15:38:12,487 [WARNING] - begin_io: Long duration 755.397796631
2016-04-08 15:38:14,127 [WARNING] - begin_io: Long duration 1434.60512161
2016-04-08 15:38:15,725 [WARNING] - begin_io: Long duration 848.396062851
2016-04-08 15:38:24,290 [WARNING] - begin_io: Long duration 1129.17089462
2016-04-08 15:38:25,609 [WARNING] - begin_io: Long duration 1059.08918381
2016-04-08 15:38:31,165 [WARNING] - begin_io: Long duration 646.969079971
2016-04-08 15:38:37,273 [WARNING] - begin_io: Long duration 1699.17201996
2016-04-08 15:38:43,788 [WARNING] - begin_io: Long duration 1555.341959
2016-04-08 15:38:47,765 [WARNING] - begin_io: Long duration 639.196872711
2016-04-08 15:38:54,269 [WARNING] - begin_io: Long duration 1690.57011604
2016-04-08 15:39:06,397 [WARNING] - begin_io: Long duration 1998.33416939
2016-04-08 15:39:16,980 [WARNING] - begin_io: Long duration 2558.51006508
2016-04-08 15:39:21,688 [WARNING] - begin_io: Long duration 1132.73501396
2016-04-08 15:39:26,450 [WARNING] - begin_io: Long duration 876.784801483
2016-04-08 15:39:29,809 [WARNING] - begin_io: Long duration 709.135055542
2016-04-08 15:39:31,748 [WARNING] - begin_io: Long duration 677.506923676
2016-04-08 15:39:41,854 [WARNING] - begin_io: Long duration 770.184993744

The multiprocessing code follows:

import pandas as pd
import numpy as np
from timeit import default_timer as timer
import datetime
import random
import multiprocessing
import time
import logging


def write_samples(store, samples, overwrite):
    wslog = logging.getLogger("write_samples")
    wslog.info("starting")
    frame = pd.DataFrame(samples, dtype='float64')

    if overwrite:
        store.put("df", frame, format='table', index=False)
    else:
        store.append("df", frame, format='table', index=False)
    wslog.info("finished")


def begin_io(pill2kill, maxduration=500):
    iolog = logging.getLogger("begin_io")
    iolog.info("starting")
    try:
        fname = "data/tab" + str(random.randint(0, 100)) + ".h5"
        iolog.debug("opening store %s", fname)
        with pd.HDFStore(fname, mode='w', complevel=0) as store:
            iolog.debug("store %s open", fname)

            counter = 0
            while not pill2kill.wait(0):
                data = np.random.rand(50000, 1)
                start_time = timer()
                write_samples(store, data, counter == 0)
                end_time = timer()
                duration = (end_time - start_time) * 1000
                iolog.debug( "IO Done      : %s (%.2f ms, %d)",
                            datetime.datetime.now(),
                            duration,
                            counter)
                if duration > maxduration:
                    iolog.warning("Long duration %s", duration)
                counter += 1
    except Exception:
        iolog.exception("oops")
    finally:
        iolog.info("finished")


def dummy_thread(pill2kill, maxduration=500):
    dtlog = logging.getLogger("dummy_thread")
    dtlog.info("starting")
    try:
        previous = timer()
        while not pill2kill.wait(0.01):
            now = timer()
            duration = (now - previous) * 1000
            dtlog.info( "Dummy Thread  : %s (%d ms)",
                       datetime.datetime.now(),
                       duration)
            if duration > maxduration:
                dtlog.warning("Long duration %s", duration)
            previous = now
        dtlog.debug("stopped looping.")
    except Exception:
        dtlog.exception("oops")
    finally:
        dtlog.info("finished")


if __name__ == '__main__':
    logformat = '%(asctime)-15s [%(levelname)s] - %(name)s: %(message)s'
    logging.basicConfig(format=logformat,
                        level=logging.WARNING)
    pill2kill = multiprocessing.Event()
    dp = multiprocessing.Process(target=dummy_thread, args=(pill2kill, 500,))
    dp.start()
    try:
        p = multiprocessing.Process(target=begin_io, args=(pill2kill, 500,))
        p.start()
        time.sleep(100)
    finally:
        pill2kill.set()
        dp.join()
        p.join()

Conclusions

Writing data to an HDF5 file really does block other threads, so a multiprocessing version is needed.

If you expect dummy_thread to do some real work (such as collecting the data to store) and to send that data on to the HDF5 serializer, you will have to do some sort of message passing - using multiprocessing.Queue, Pipe, or possibly ZeroMQ (e.g. a PUSH-PULL socket pair). With ZeroMQ you could even save the data on another computer.
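A rough sketch of the ZeroMQ variant (a PUSH-PULL pair), assuming pyzmq is installed; the port and host are arbitrary, and send_pyobj/recv_pyobj pickle each array, so at 250,000 samples per second you would more likely send raw buffers:

import numpy as np
import zmq


def producer():
    # Acquisition process: PUSH hands each block to whichever peer pulls it.
    ctx = zmq.Context()
    push = ctx.socket(zmq.PUSH)
    push.bind("tcp://*:5556")
    while True:
        push.send_pyobj(np.random.rand(50000, 1))


def consumer():
    # Writer process, possibly on another machine: PULL receives the blocks;
    # append them to an HDFStore here the same way begin_io() does above.
    ctx = zmq.Context()
    pull = ctx.socket(zmq.PULL)
    pull.connect("tcp://localhost:5556")
    while True:
        samples = pull.recv_pyobj()
        print("received block", samples.shape)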

EDIT/WARNING: this code can sometimes fail to save data; I wrote it to measure performance, not to be waterproof. When pressing Ctrl-C during processing, I sometimes get a corrupted file. I consider that problem out of the scope of this question (it should be solved by stopping the running process carefully).
