处理大型文件(9.1GB)并提高处理速度 -- Python
我有一个9GB的推文文本文件,格式如下:
T 'time and date'
U 'name of user in the form of a URL'
W Actual tweet
总共有600万用户和超过6000万条推文。我使用itertools.izip()每次读取3行,然后根据用户名把它们写入文件。但是这个过程太慢了(已经花了26个小时了)。有什么办法可以加快速度吗?
为了完整性,我贴上代码:
s='the existing folder which will have all the files'
with open('path to file') as f:
for line1,line2,line3 in itertools.izip_longest(*[f]*3):
if(line1!='\n' and line2!='\n' and line3!='\n'):
line1=line1.split('\t')
line2=line2.split('\t')
line3=line3.split('\t')
if(not(re.search(r'No Post Title',line1[1]))):
url=urlparse(line3[1].strip('\n')).path.strip('/')
if(url==''):
file=open(s+'junk','a')
file.write(line1[1])
file.close()
else:
file=open(s+url,'a')
file.write(line1[1])
file.close()
我的目标是对这些小文本进行主题建模(也就是说,对每个用户的所有推文运行lda,因此需要为每个用户准备一个单独的文件),但这个过程耗时太长了。
更新:我按照用户S.Lott的建议,使用了以下代码:
import re
from urlparse import urlparse
import os
def getUser(result):
result=result.split('\n')
u,w=result[0],result[1]
path=urlparse(u).path.strip('/')
if(path==''):
f=open('path to junk','a')
f.write('its Junk !!')
f.close()
else:
result="{0}\n{1}\n{2}\n".format(u,w,path)
writeIntoFile(result)
def writeIntoFile(result):
tweet=result.split('\n')
users = {}
directory='path to directory'
u, w, user = tweet[0],tweet[1],tweet[2]
if user not in users :
if(os.path.isfile(some_directory+user)):
if(len(users)>64):
lru,aFile,u=min(users.values())
aFile.close()
users.pop(u)
users[user]=open(some_directory+user,'a')
users[user].write(w+'\n')
#users[user].flush
elif (not(os.path.isfile(some_directory+user))):
if len(users)>64:
lru,aFile,u=min(users.values())
aFile.close()
users.pop(u)
users[user]=open(some_directory+user,'w')
users[user].write(w+'\n')
for u in users:
users[u].close()
import sys
s=open(sys.argv[1],'r')
tweet={}
for l in s:
r_type,content=l.split('\t')
if r_type in tweet:
u,w=tweet.get('U',''),tweet.get('W','')
if(not(re.search(r'No Post Title',u))):
result="{0}{1}".format(u,w)
getUser(result)
tweet={}
tweet[r_type]=content
显然,这段代码基本上是他建议的内容的镜像,并且他也很友好地分享了。最开始速度非常快,但后来变慢了。我发布了更新后的代码,希望能得到更多关于如何加快速度的建议。如果我从sys.stdin读取,就出现了一个我无法解决的导入错误。因此,为了节省时间并继续进行,我简单地使用了这个,希望它能正常工作。谢谢。
7 个回答
对于这么多信息,我建议使用数据库(比如MySQL、PostgreSQL、SQLite等)。这些数据库是专门为你正在做的事情优化过的。
所以,和其说把数据添加到一个文件里,不如直接在一个表格里加一行(可以是“垃圾”表格或者“好”表格),把网址和相关数据放进去(同一个网址可以出现多次)。这样做肯定能加快写入的速度。
现在这种做法会浪费时间,因为你的输入文件是从硬盘的一个地方读取,而你在不同的地方写数据:硬盘的读写头需要来回移动,这样很慢。而且,创建新文件也需要时间。如果你能主要从输入文件读取数据,让数据库来处理数据的缓存和写入优化,处理速度肯定会更快。
你大部分时间都在处理输入输出(I/O)。这里有一些解决办法:
- 进行更大的输入输出操作,比如说可以一次性读取512K的数据到一个缓冲区,等到这个缓冲区至少有256K的数据时再进行写入。
- 尽量避免频繁打开和关闭文件。
- 使用多个线程来读取文件,也就是把文件分成几个部分,每个线程处理自己的一部分。
这就是为什么你的操作系统有多进程管道的原因。
collapse.py sometweetfile | filter.py | user_id.py | user_split.py -d some_directory
collapse.py
import sys
with open("source","r") as theFile:
tweet = {}
for line in theFile:
rec_type, content = line.split('\t')
if rec_type in tweet:
t, u, w = tweet.get('T',''), tweet.get('U',''), tweet.get('W','')
result= "{0}\t{1}\t{2}".format( t, u, w )
sys.stdout.write( result )
tweet= {}
tweet[rec_type]= content
t, u, w = tweet.get('T',''), tweet.get('U',''), tweet.get('W','')
result= "{0}\t{1}\t{2}".format( t, u, w )
sys.stdout.write( result )
filter.py
import sys
for tweet in sys.stdin:
t, u, w = tweet.split('\t')
if 'No Post Title' in t:
continue
sys.stdout.write( tweet )
user_id.py
import sys
import urllib
for tweet in sys.stdin:
t, u, w = tweet.split('\t')
path=urlparse(w).path.strip('/')
result= "{0}\t{1}\t{2}\t{3}".format( t, u, w, path )
sys.stdout.write( result )
user_split.py
users = {}
for tweet in sys.stdin:
t, u, w, user = tweet.split('\t')
if user not in users:
# May run afoul of open file limits...
users[user]= open(some_directory+user,"w")
users[user].write( tweet )
users[user].flush( tweet )
for u in users:
users[u].close()
哇,你会说。这代码真多。
没错。但是,这些代码会分散到你所有的处理核心上,同时运行。还有,当你通过管道把标准输出连接到标准输入时,其实只是一个共享的缓冲区:并没有实际的输入输出发生。
用这种方式做事情快得惊人。这就是为什么*Nix* 操作系统是这样工作的。这是你想要实现真正速度的方式。
这是LRU算法,供你参考。
if user not in users:
# Only keep a limited number of files open
if len(users) > 64: # or whatever your OS limit is.
lru, aFile, u = min( users.values() )
aFile.close()
users.pop(u)
users[user]= [ tolu, open(some_directory+user,"w"), user ]
tolu += 1
users[user][1].write( tweet )
users[user][1].flush() # may not be necessary
users[user][0]= tolu