使用python多处理队列写入搁置fi

2024-04-25 10:03:54 发布

您现在位置:Python中文网/ 问答频道 /正文

我遇到了python多处理的问题。在

我有一个.csv文件(~10gb)。我想从中读取数据,转换数据,然后将其保存到python shelve文件中。数据转换很慢,所以我考虑多处理。在

我创建了两个多进程队列,第一个用于保存从csv读取的数据,第二个用于保存转换后的数据。这两个队列工作正常;但是,问题是从第二个队列向搁置写入数据的过程在几个小时后停止。在

有人遇到同样的问题吗?谢谢你的建议!在

我的代码在这里:

def read_from_csv(csv_path, csv_queue):
    count = 0
    fh =  open(csv_path, 'r')
    for line in fh:
        try:
            text, ID = line.split('|')
            text = text.split()
            ID = ID.strip()
            data = (ID, text)
            csv_queue.put(data, True)
            count += 1

            n = csv_queue.qsize()
            if n > 0 and  n % 10000 == 0:
                logger.info('Queue size is {}'.format(n))

            # Let the function sleep for a while to prevent the queue becoming too long
            if n >= 100000:
                logger.info('CSV queue sleeps for 100 sec')
                time.sleep(100)
            if count % 10000 == 0:
                logger.info('Read {} lines'.format(count))
        except:
            pass
    logger.info('Done feeding rows into the Queue')



def entities_extraction(csv_queue, shelve_queue):
    count = 0
    while not csv_queue.empty():
        try:
            item = csv_queue.get(True, 300)
        except:
            logger.info('Not able to fetch data from queue. Quitting')
            break

        # some function here
        # result = ......

        shelve_queue.put(result, True)
        count += 1

        n = shelve_queue.qsize()
        if count % 10000 == 0:
            logger.info('Process {} lines'.format(count))

        # Let the function sleep for a while to prevent the queue becoming too long
        if n > 100000:
            logger.info('Shelve queue sleeps for 100 sec')
            time.sleep(100)


def write_to_shelve(shelve_file, csv_queue, shelve_queue):


    sh = shelve.open(shelve_file, writeback=True)
    count = 0

    while True:
        if count % 10000 == 0:
            logger.info("Written {} records".format(count))

        n = shelve_queue.qsize()
        if n > 0 and n % 10000 == 0:
            logger.info('Shelve queue size is {}'.format(n))
        #if count > 50000:
        #    print('stop')
        #    break
        try:
            item = shelve_queue.get(True, 600)
            id, token = item
            sh[id] = token
            count += 1

        except:
            logger.info("Not able to fetch data from queue. Quitting")
            #time.sleep(5)
            break
    sh.close()



if __name__ == "__main__":

    t0 = time.time()
    csv_path =  
    shelve_path =  

    csv_queue = mp.Manager().Queue()
    shelve_queue = mp.Manager().Queue()


    process_to_launch = 7
    pool = mp.Pool(process_to_launch)

    pool.apply(read_from_csv, (csv_path, csv_queue,))

    jobs = [pool.apply(entities_extraction, (csv_queue, shelve_queue,)) for i in range(5)]

    result = pool.apply(write_to_shelve, (shelve_path, csv_queue, shelve_queue,))

    #sh.close()
    print(time.time() - t0)


pool.close()
pool.join()

部分日志信息是:

^{pr2}$

Tags: csvto数据pathinfotrueformatfor