写入HDF5 fi时获得独占锁

2024-04-19 10:35:36 发布

您现在位置:Python中文网/ 问答频道 /正文

我有下面的代码,它试图将pandas数据帧附加到HDF5存储中。我尝试获取独占文件锁,以便多个进程/线程/作业不会同时写入HDF5文件:

 #--------------------------------------------- MODULE IMPORT ----------------------------------------------------------#
    import os
    import time
    from pandas import HDFStore

 #--------------------------------------------- DEVELOPMENT CODE -------------------------------------------------------#
    class SafeHDF5Store(HDFStore):
        """Implement safe HDFStore by obtaining file lock. Multiple writes will queue if lock is not obtained."""

        def __init__(self, *args, **kwargs):
            """Initialize and obtain file lock."""

            interval   = kwargs.pop('probe_interval', 1)
            self._lock = "%s.lock" % args[0]
            while True:
                try:
                    self._flock = os.open(self._lock, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
                    break
                except IOError:
                    time.sleep(interval)

            HDFStore.__init__(self, *args, **kwargs)

        def __exit__(self, *args, **kwargs):
            """Exit and remove file lock."""

            HDFStore.__exit__(self, *args, **kwargs)
            os.close(self._flock)
            os.remove(self._lock)

    def write_hdf(f, key, df, complib):
        """append pandas dataframe to hdf5.

        Args:
        f       -- File path
        key     -- Store key
        df      -- Pandas dataframe
        complib -- Compress lib 

        NOTE: We use maximum compression w/ zlib.
        """

        with SafeHDF5Store(f, complevel=9, complib=complib) as store:
            df.to_hdf(store, key, format='table', append=True)

接下来,我在一个计算场启动100个工作。他们做一些计算来生成一个数据帧。然后他们尝试将数据帧附加到这个表中。但是,当我试图获取不知道如何解决的锁时,出现了以下错误。有什么建议吗?在

^{pr2}$

当另一个进程尝试创建锁文件时,它似乎已经存在(而持有锁的进程尚未完成删除该文件)。我确实有一个操作系统删除()调用exit()函数以删除文件。在


Tags: 文件数据keyimportselflockpandas进程
1条回答
网友
1楼 · 发布于 2024-04-19 10:35:36

在第29行的__init___()中,只有当存在IOError时,才等待获得锁:

while True:
    try:
        self._flock = os.open(self._lock, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
        break
    except IOError:
        time.sleep(interval)

但是,如果文件已经存在,则会得到一个OSError。 因此,除了这两种类型的错误,继续尝试直到成功:

^{pr2}$

相关问题 更多 >