处理大型文件(9.1GB)并提高处理速度 -- Python

13 投票
7 回答
8505 浏览
提问于 2025-04-16 09:49

我有一个9GB的推文文本文件,格式如下:

T      'time and date'
U      'name of user in the form of a URL'
W      Actual tweet

总共有600万用户和超过6000万条推文。我使用itertools.izip()每次读取3行,然后根据用户名把它们写入文件。但是这个过程太慢了(已经花了26个小时了)。有什么办法可以加快速度吗?

为了完整性,我贴上代码:

s='the existing folder which will have all the files'
with open('path to file') as f:
 for line1,line2,line3 in itertools.izip_longest(*[f]*3):
            if(line1!='\n' and line2!='\n' and line3!='\n'):
     line1=line1.split('\t')
     line2=line2.split('\t')
     line3=line3.split('\t')
     if(not(re.search(r'No Post Title',line1[1]))):
         url=urlparse(line3[1].strip('\n')).path.strip('/')

  if(url==''):
   file=open(s+'junk','a')
   file.write(line1[1])
   file.close()
  else:
   file=open(s+url,'a')
   file.write(line1[1])
   file.close()

我的目标是对这些小文本进行主题建模(也就是说,对每个用户的所有推文运行lda,因此需要为每个用户准备一个单独的文件),但这个过程耗时太长了。

更新:我按照用户S.Lott的建议,使用了以下代码:

import re
from urlparse import urlparse
import os 
def getUser(result):
    result=result.split('\n')
    u,w=result[0],result[1]
    path=urlparse(u).path.strip('/')
    if(path==''):
        f=open('path to junk','a')
        f.write('its Junk !!')
        f.close()
    else:
        result="{0}\n{1}\n{2}\n".format(u,w,path)
        writeIntoFile(result)
def writeIntoFile(result):
    tweet=result.split('\n')
    users = {}
    directory='path to directory'
    u, w, user = tweet[0],tweet[1],tweet[2]
    if user not in users :
        if(os.path.isfile(some_directory+user)):
            if(len(users)>64):
                lru,aFile,u=min(users.values())
                aFile.close()
                users.pop(u)
            users[user]=open(some_directory+user,'a')
            users[user].write(w+'\n')
            #users[user].flush
        elif (not(os.path.isfile(some_directory+user))):
            if len(users)>64:
                lru,aFile,u=min(users.values())
                aFile.close()
                users.pop(u)

            users[user]=open(some_directory+user,'w')
            users[user].write(w+'\n')
    for u in users:
        users[u].close()
import sys
s=open(sys.argv[1],'r')
tweet={}
for l in s:
    r_type,content=l.split('\t')
    if r_type in tweet:
    u,w=tweet.get('U',''),tweet.get('W','')
            if(not(re.search(r'No Post Title',u))):
                result="{0}{1}".format(u,w)
                getUser(result)
                tweet={}
        tweet[r_type]=content

显然,这段代码基本上是他建议的内容的镜像,并且他也很友好地分享了。最开始速度非常快,但后来变慢了。我发布了更新后的代码,希望能得到更多关于如何加快速度的建议。如果我从sys.stdin读取,就出现了一个我无法解决的导入错误。因此,为了节省时间并继续进行,我简单地使用了这个,希望它能正常工作。谢谢。

7 个回答

1

对于这么多信息,我建议使用数据库(比如MySQL、PostgreSQL、SQLite等)。这些数据库是专门为你正在做的事情优化过的。

所以,和其说把数据添加到一个文件里,不如直接在一个表格里加一行(可以是“垃圾”表格或者“好”表格),把网址和相关数据放进去(同一个网址可以出现多次)。这样做肯定能加快写入的速度。

现在这种做法会浪费时间,因为你的输入文件是从硬盘的一个地方读取,而你在不同的地方写数据:硬盘的读写头需要来回移动,这样很慢。而且,创建新文件也需要时间。如果你能主要从输入文件读取数据,让数据库来处理数据的缓存和写入优化,处理速度肯定会更快。

3

你大部分时间都在处理输入输出(I/O)。这里有一些解决办法:

  • 进行更大的输入输出操作,比如说可以一次性读取512K的数据到一个缓冲区,等到这个缓冲区至少有256K的数据时再进行写入。
  • 尽量避免频繁打开和关闭文件。
  • 使用多个线程来读取文件,也就是把文件分成几个部分,每个线程处理自己的一部分。
24

这就是为什么你的操作系统有多进程管道的原因。

collapse.py sometweetfile | filter.py | user_id.py | user_split.py -d some_directory

collapse.py

import sys
with open("source","r") as theFile:
    tweet = {}
    for line in theFile:
        rec_type, content = line.split('\t')
        if rec_type in tweet:
            t, u, w = tweet.get('T',''), tweet.get('U',''), tweet.get('W','')
            result=  "{0}\t{1}\t{2}".format( t, u, w )
            sys.stdout.write( result )
            tweet= {}
        tweet[rec_type]= content
    t, u, w = tweet.get('T',''), tweet.get('U',''), tweet.get('W','')
    result=  "{0}\t{1}\t{2}".format( t, u, w )
    sys.stdout.write( result )

filter.py

import sys
for tweet in sys.stdin:
    t, u, w = tweet.split('\t')
    if 'No Post Title' in t:
        continue
    sys.stdout.write( tweet )

user_id.py

import sys
import urllib
for tweet in sys.stdin:
    t, u, w = tweet.split('\t')
    path=urlparse(w).path.strip('/')
    result= "{0}\t{1}\t{2}\t{3}".format( t, u, w, path )
    sys.stdout.write( result )

user_split.py

users = {}
for tweet in sys.stdin:
    t, u, w, user = tweet.split('\t')
    if user not in users:
        # May run afoul of open file limits...
        users[user]= open(some_directory+user,"w")
    users[user].write( tweet )
    users[user].flush( tweet )
for u in users:
    users[u].close()

哇,你会说。这代码真多。

没错。但是,这些代码会分散到你所有的处理核心上,同时运行。还有,当你通过管道把标准输出连接到标准输入时,其实只是一个共享的缓冲区:并没有实际的输入输出发生。

用这种方式做事情快得惊人。这就是为什么*Nix* 操作系统是这样工作的。这是你想要实现真正速度的方式。


这是LRU算法,供你参考。

    if user not in users:
        # Only keep a limited number of files open
        if len(users) > 64: # or whatever your OS limit is.
            lru, aFile, u = min( users.values() )
            aFile.close()
            users.pop(u)
        users[user]= [ tolu, open(some_directory+user,"w"), user ]
    tolu += 1
    users[user][1].write( tweet )
    users[user][1].flush() # may not be necessary
    users[user][0]= tolu

撰写回答