在Python中创建可中断的进程

3 投票
4 回答
1497 浏览
提问于 2025-04-16 09:25

我正在写一个Python脚本,用来处理一个很大的(但结构简单的)CSV文件。

这个处理过程可能需要一些时间。我希望能在解析CSV的时候中断一下,这样我可以在之后的某个时间继续。

目前我有这样的代码,它是一个更大类的一部分:(还没完成)

编辑:

我修改了一些代码。不过系统会处理超过300万行数据。

def parseData(self)
    reader = csv.reader(open(self.file))
    for id, title, disc in reader:
        print "%-5s %-50s %s" % (id, title, disc)
        l = LegacyData()
        l.old_id = int(id)
        l.name = title
        l.disc_number = disc
        l.parsed = False
        l.save()

这是旧的代码。

def parseData(self):
        #first line start
        fields = self.data.next()
        for row in self.data:
            items = zip(fields, row)
            item = {}
            for (name, value) in items:
                item[name] = value.strip()
            self.save(item)

谢谢大家。

4 个回答

1

我会这样做:

把实际处理的代码放在一个类里面,然后在这个类上实现 Pickle 协议(可以参考这个链接:http://docs.python.org/library/pickle.html)。简单来说,就是写好 __getstate____setstate__ 这两个函数。

这个类会接收文件名,保持打开的文件和 CSV 读取器实例作为类的成员。__getstate__ 方法会保存当前的文件位置,而 __setstate__ 方法会重新打开文件,把指针移动到正确的位置,并创建一个新的读取器。

我会在 __iter__ 方法中执行实际的工作,这个方法会在处理完每一行后把结果交给外部函数。

这个外部函数会运行一个“主循环”,监控输入是否有中断(比如网络连接、键盘输入、文件系统中特定文件的状态等等)。如果一切正常,它就会调用处理器的下一个迭代。如果发生中断,它会把处理器的状态保存到磁盘上的一个特定文件里。

当程序启动时,只需要检查是否有保存的执行状态,如果有,就用 Pickle 恢复执行对象,并继续主循环。

下面是一些(未经测试的)代码,思路其实很简单:

from cPickle import load, dump
import csv
import os, sys

SAVEFILE = "running.pkl"
STOPNOWFILE = "stop.now"

class Processor(object):
    def __init__(self, filename):
        self.file = open(filename, "rt")
        self.reader = csv.reader(self.file)
    def __iter__(self):
        for line in self.reader():
            # do stuff
            yield None
    def __getstate__(self):
        return (self.file.name, self.file.tell())
    def __setstate__(self, state):
        self.file = open(state[0],"rt")
        self.file.seek(state[1])
        self.reader = csv.reader(self.File)

def check_for_interrupts():
    # Use your imagination here!  
    # One simple thing would e to check for the existence of an specific file
    # on disk.
    # But you go all the way up to instantiate a tcp server and listen to 
    # interruptions on the network
    if os.path.exists(STOPNOWFILE): 
        return True
    return False

def main():
    if os.path.exists(SAVEFILE):
        with open(SAVEFILE) as savefile:
            processor = load(savefile)
        os.unlink(savefile)
    else:
        #Assumes the name of the .csv file to be passed on the command line
        processor = Processor(sys.argv[1])
    for line in processor:
        if check_for_interrupts():
            with open(SAVEFILE, "wb") as savefile:
                dump(processor)
            break

if __name__ == "__main__":
    main()
1

你可以用 signal 来捕捉事件。这是一个示例,展示了一个解析器如何在 Windows 系统上捕捉到 CTRL-C 的操作并停止解析:

import signal, tme, sys

def onInterupt(signum, frame):
    raise Interupted()

try:
    #windows
    signal.signal(signal.CTRL_C_EVENT, onInterupt)
except:
    pass

class Interupted(Exception): pass
class InteruptableParser(object):

    def __init__(self, previous_parsed_lines=0):
        self.parsed_lines = previous_parsed_lines

    def _parse(self, line):
        # do stuff
        time.sleep(1) #mock up
        self.parsed_lines += 1
        print 'parsed %d' % self.parsed_lines

   def parse(self, filelike):
        for line in filelike:
            try:
                self._parse(line)
            except Interupted:
                print 'caught interupt'
                self.save()
                print 'exiting ...'
                sys.exit(0)

    def save(self):
        # do what you need to save state
        # like write the parse_lines to a file maybe
        pass

parser = InteruptableParser()
parser.parse([1,2,3])

不过我现在在用 Linux,没法测试这个。

2

如果你在Linux系统下,按下Ctrl-Z可以暂停正在运行的程序。然后输入“fg”就可以把它重新调出来,并从你暂停的地方继续运行。

撰写回答