"Can't pickle <type '_csv.reader'>" error when using multiprocessing on Windows

7 votes
3 answers
6031 views
Asked on 2025-04-17 08:24

I am writing a multiprocessing program to process a very large .CSV file in parallel, on Windows.

I found some very good examples that solve a similar problem; see this link.

But when I run it on Windows, I get an error saying that csv.reader cannot be pickled.

I suppose I could open the CSV file in the child process and just pass it the file name from the parent.

However, I would like to pass an already opened CSV file (as the code was meant to do), i.e. to really use a shared object.

Is there a way to do this on Windows, or what is missing?

Here is the code (reposted for readability):

"""A program that reads integer values from a CSV file and writes out their
sums to another CSV file, using multiple processes if desired.
"""

import csv
import multiprocessing
import optparse
import sys

NUM_PROCS = multiprocessing.cpu_count()

def make_cli_parser():
    """Make the command line interface parser."""
    usage = "\n\n".join(["python %prog INPUT_CSV OUTPUT_CSV",
            __doc__,
            """
ARGUMENTS:
    INPUT_CSV: an input CSV file with rows of numbers
    OUTPUT_CSV: an output file that will contain the sums\
"""])
    cli_parser = optparse.OptionParser(usage)
    cli_parser.add_option('-n', '--numprocs', type='int',
            default=NUM_PROCS,
            help="Number of processes to launch [DEFAULT: %default]")
    return cli_parser

class CSVWorker(object):
    def __init__(self, numprocs, infile, outfile):
        self.numprocs = numprocs
        self.infile = open(infile)
        self.outfile = outfile
        self.in_csvfile = csv.reader(self.infile)
        self.inq = multiprocessing.Queue()
        self.outq = multiprocessing.Queue()

        self.pin = multiprocessing.Process(target=self.parse_input_csv, args=())
        self.pout = multiprocessing.Process(target=self.write_output_csv, args=())
        self.ps = [ multiprocessing.Process(target=self.sum_row, args=())
                        for i in range(self.numprocs)]

        self.pin.start()
        self.pout.start()
        for p in self.ps:
            p.start()

        self.pin.join()
        i = 0
        for p in self.ps:
            p.join()
            print "Done", i
            i += 1

        self.pout.join()
        self.infile.close()

    def parse_input_csv(self):
        """Parses the input CSV and yields tuples with the index of the row
        as the first element, and the integers of the row as the second
        element.

        The index is zero-index based.

        The data is then sent over inqueue for the workers to do their
        thing.  At the end the input thread sends a 'STOP' message for each
        worker.
        """
        for i, row in enumerate(self.in_csvfile):
            row = [int(entry) for entry in row]
            self.inq.put((i, row))

        for i in range(self.numprocs):
            self.inq.put("STOP")

    def sum_row(self):
        """
        Workers. Consume inq and produce answers on outq
        """
        tot = 0
        for i, row in iter(self.inq.get, "STOP"):
            self.outq.put((i, sum(row)))
        self.outq.put("STOP")

    def write_output_csv(self):
        """
        Open outgoing csv file then start reading outq for answers
        Since I chose to make sure output was synchronized to the input there
        is some extra goodies to do that.

        Obviously your input has the original row number so this is not
        required.
        """
        cur = 0
        stop = 0
        buffer = {}
        # For some reason csv.writer works badly across threads so open/close
        # and use it all in the same thread or else you'll have the last
        # several rows missing
        outfile = open(self.outfile, "w")
        self.out_csvfile = csv.writer(outfile)

        #Keep running until we see numprocs STOP messages
        for works in range(self.numprocs):
            for i, val in iter(self.outq.get, "STOP"):
                # verify rows are in order, if not save in buffer
                if i != cur:
                    buffer[i] = val
                else:
                    # if it is, write it out and flush any buffered rows that are now in order
                    self.out_csvfile.writerow( [i, val] )
                    cur += 1
                    while cur in buffer:
                        self.out_csvfile.writerow([ cur, buffer[cur] ])
                        del buffer[cur]
                        cur += 1

        outfile.close()

def main(argv):
    cli_parser = make_cli_parser()
    opts, args = cli_parser.parse_args(argv)
    if len(args) != 2:
        cli_parser.error("Please provide an input file and output file.")

    c = CSVWorker(opts.numprocs, args[0], args[1])

if __name__ == '__main__':
    main(sys.argv[1:])

The error I get when running it on Windows is:

Traceback (most recent call last):
  File "C:\Users\ron.berman\Documents\Attribution\ubrShapley\test.py", line 130, in <module>
    main(sys.argv[1:])
  File "C:\Users\ron.berman\Documents\Attribution\ubrShapley\test.py", line 127, in main
    c = CSVWorker(opts.numprocs, args[0], args[1])
  File "C:\Users\ron.berman\Documents\Attribution\ubrShapley\test.py", line 44, in __init__
    self.pin.start()
  File "C:\Python27\lib\multiprocessing\process.py", line 130, in start
    self._popen = Popen(self)
  File "C:\Python27\lib\multiprocessing\forking.py", line 271, in __init__
    dump(process_obj, to_child, HIGHEST_PROTOCOL)
  File "C:\Python27\lib\multiprocessing\forking.py", line 193, in dump
    ForkingPickler(file, protocol).dump(obj)
  File "C:\Python27\lib\pickle.py", line 224, in dump
    self.save(obj)
  File "C:\Python27\lib\pickle.py", line 331, in save
    self.save_reduce(obj=obj, *rv)
  File "C:\Python27\lib\pickle.py", line 419, in save_reduce
    save(state)
  File "C:\Python27\lib\pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Python27\lib\pickle.py", line 649, in save_dict
    self._batch_setitems(obj.iteritems())
  File "C:\Python27\lib\pickle.py", line 681, in _batch_setitems
    save(v)
  File "C:\Python27\lib\pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Python27\lib\multiprocessing\forking.py", line 66, in dispatcher
    self.save_reduce(obj=obj, *rv)
  File "C:\Python27\lib\pickle.py", line 401, in save_reduce
    save(args)
  File "C:\Python27\lib\pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Python27\lib\pickle.py", line 548, in save_tuple
    save(element)
  File "C:\Python27\lib\pickle.py", line 331, in save
    self.save_reduce(obj=obj, *rv)
  File "C:\Python27\lib\pickle.py", line 419, in save_reduce
    save(state)
  File "C:\Python27\lib\pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Python27\lib\pickle.py", line 649, in save_dict
    self._batch_setitems(obj.iteritems())
  File "C:\Python27\lib\pickle.py", line 681, in _batch_setitems
    save(v)
  File "C:\Python27\lib\pickle.py", line 331, in save
    self.save_reduce(obj=obj, *rv)
  File "C:\Python27\lib\pickle.py", line 396, in save_reduce
    save(cls)
  File "C:\Python27\lib\pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Python27\lib\pickle.py", line 753, in save_global
    (obj, module, name))
pickle.PicklingError: Can't pickle <type '_csv.reader'>: it's not the same object as _csv.reader
Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "C:\Python27\lib\multiprocessing\forking.py", line 374, in main
    self = load(from_parent)
  File "C:\Python27\lib\pickle.py", line 1378, in load
    return Unpickler(file).load()
  File "C:\Python27\lib\pickle.py", line 858, in load
    dispatch[key](self)
  File "C:\Python27\lib\pickle.py", line 880, in load_eof
    raise EOFError
EOFError

3 Answers

0

You are trying to speed up a mostly CPU-bound task, which is hard to do in Python because of the GIL. My suggestion: read the data 10,000 rows at a time, process those rows with multiple workers into a second list of 10,000 results, and then write that list out to the output file. For the CPU-intensive processing you have these options:

  • Use processes from the multiprocessing library (a rough sketch of this option follows the list).
  • Install the pyspark library and a JDK. Turn the list of rows into an RDD and use the RDD's map() method to produce the list of output rows.
  • Port your program to C++, C# or Java.
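For illustration, here is a minimal sketch of the first option (chunked processing through a multiprocessing.Pool), written in Python 2.7 style to match the question. The file names, the CHUNK_SIZE value and the sum_row helper are placeholders, and the sketch assumes each row is a list of integers to be summed, as in the original program.

import csv
import multiprocessing
from itertools import islice

CHUNK_SIZE = 10000  # illustrative batch size

def sum_row(row):
    # Runs in a worker process; receives a plain (picklable) list of strings.
    return sum(int(entry) for entry in row)

def process_file(in_name, out_name):
    pool = multiprocessing.Pool()  # one worker per CPU by default
    row_index = 0
    with open(in_name, 'rb') as fin, open(out_name, 'wb') as fout:
        reader = csv.reader(fin)
        writer = csv.writer(fout)
        while True:
            chunk = list(islice(reader, CHUNK_SIZE))  # read 10,000 rows at a time
            if not chunk:
                break
            # pool.map returns results in input order, so output rows stay
            # aligned with input rows without any reordering buffer.
            for total in pool.map(sum_row, chunk):
                writer.writerow([row_index, total])
                row_index += 1
    pool.close()
    pool.join()

if __name__ == '__main__':  # required on Windows: children re-import this module
    process_file('input.csv', 'output.csv')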
2

Because multiprocessing needs to pickle and unpickle objects in order to pass them between processes, and your code relies on passing an instance of CSVWorker to the child processes (that instance is what 'self' refers to), you get this error: the csv reader and the open file cannot be pickled.

You mention that your CSV file is large, so I don't think reading all of the data into a list is an option for you. You therefore need a way to pass a single line of the input CSV to each worker at a time, retrieve the processed line back from each worker, and do all of the file I/O in the main process.

It looks like multiprocessing.Pool would be a better way to write your application. Check the multiprocessing documentation at http://docs.python.org/library/multiprocessing.html and try using a process pool with pool.map to process your CSV. Doing it this way also preserves the order of processing, which eliminates a lot of the bookkeeping logic in your code.
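As a rough sketch of that shape (all file I/O stays in the parent process, and only plain row lists cross the process boundary), again assuming rows of integers to be summed as in the question; the names sum_row, input.csv and output.csv are placeholders and chunksize=100 is just an arbitrary batching hint:

import csv
import multiprocessing

def sum_row(row):
    # Runs in a worker process; gets a plain list of strings, returns an int.
    return sum(int(entry) for entry in row)

def main():
    pool = multiprocessing.Pool(processes=multiprocessing.cpu_count())
    with open('input.csv', 'rb') as fin, open('output.csv', 'wb') as fout:
        writer = csv.writer(fout)
        # imap feeds rows to the workers lazily as they are read and, unlike
        # imap_unordered, yields the results in input order, so no reordering
        # buffer is needed before writing.
        for i, total in enumerate(pool.imap(sum_row, csv.reader(fin), chunksize=100)):
            writer.writerow([i, total])
    pool.close()
    pool.join()

if __name__ == '__main__':
    main()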

5

The problem you are running into is caused by using methods of your CSVWorker class as the process targets; that class has members that simply cannot be pickled, and the open files are never going to work there.

What you want to do is split that class into two: one that coordinates all the worker subprocesses, and one that actually does the computational work. The worker processes should take filenames as arguments and open the individual files as needed, or at least wait until their work methods are called before opening them. They can also take multiprocessing.Queues as arguments or as instance members; those are safe to pass across processes.

To some extent, you are already doing this: your write_output_csv method opens its file inside the subprocess, but your parse_input_csv expects to find an already opened and prepared file as an attribute of self. Do it consistently the other way and you should be in good shape.
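Here is a sketch of that split, still assuming the row-summing task from the question: plain module-level functions receive only picklable arguments (filenames and Queues), and each process opens its own file. The function names and the run() driver are illustrative, not drop-in replacements for the original class.

import csv
import multiprocessing

def parse_input_csv(infile_name, inq, numprocs):
    # Child process: opens the input file itself instead of inheriting a reader.
    with open(infile_name, 'rb') as f:
        for i, row in enumerate(csv.reader(f)):
            inq.put((i, [int(entry) for entry in row]))
    for _ in range(numprocs):
        inq.put("STOP")

def sum_row(inq, outq):
    # Worker: consume inq, produce (index, sum) pairs on outq.
    for i, row in iter(inq.get, "STOP"):
        outq.put((i, sum(row)))
    outq.put("STOP")

def write_output_csv(outfile_name, outq, numprocs):
    # Child process: opens the output file itself and buffers out-of-order rows.
    cur, pending = 0, {}
    with open(outfile_name, 'wb') as f:
        writer = csv.writer(f)
        for _ in range(numprocs):  # keep reading until numprocs STOP messages arrive
            for i, val in iter(outq.get, "STOP"):
                pending[i] = val
                while cur in pending:
                    writer.writerow([cur, pending.pop(cur)])
                    cur += 1

def run(infile, outfile, numprocs=multiprocessing.cpu_count()):
    inq, outq = multiprocessing.Queue(), multiprocessing.Queue()
    pin = multiprocessing.Process(target=parse_input_csv, args=(infile, inq, numprocs))
    pout = multiprocessing.Process(target=write_output_csv, args=(outfile, outq, numprocs))
    workers = [multiprocessing.Process(target=sum_row, args=(inq, outq))
               for _ in range(numprocs)]
    for p in [pin, pout] + workers:
        p.start()
    for p in [pin, pout] + workers:
        p.join()

if __name__ == '__main__':
    run('input.csv', 'output.csv')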
