从大文件读取并绘制数据
我们有一些比较大的文件,总共大约1到1.5GB(主要是日志文件),里面的数据很容易转换成csv格式,之后这些数据会用来生成一系列的图像。
目前,我们使用bash脚本把原始数据转成csv文件,里面只包含需要绘制图表的数字,然后再把这个csv文件输入到gnuplot脚本中。不过,这个过程非常慢。我尝试通过用一个awk
命令替换一些管道中的cut
、tr
等命令来加快bash脚本的速度,虽然这样确实提高了一些速度,但整体上还是很慢。
所以,我开始觉得可能有更好的工具来处理这个过程。我现在考虑用python加numpy或者R来重写这个过程。我的一个朋友建议我使用JVM,如果我选择这样做,我会用clojure,但我不太确定JVM的表现如何。
我在处理这类问题方面经验不多,所以如果有任何建议该怎么进行就太好了。谢谢。
补充:另外,我还想把生成的中间数据(也就是csv文件)存储到磁盘上,这样如果我想要不同样子的图表,就不需要重新生成数据。
补充 2:原始数据文件每行有一条记录,字段之间用一个分隔符(|
)分开。并不是所有字段都是数字。我需要在输出的csv中包含的每个字段都是通过对输入记录应用某种公式得到的,这个公式可能会用到输入数据中的多个字段。输出的csv每行会有3到4个字段,我需要绘制1-2、1-3、1-4字段的图表(可能是柱状图)。希望这样能让你更清楚。
补充 3:我对@adirau的脚本做了一些修改,似乎效果不错。我已经进展到读取数据、发送到一组处理线程(伪处理,把线程名附加到数据上),然后通过另一个收集器线程把它们聚合到一个输出文件中。
PS:我不太确定这个问题的标签是否合适,欢迎随意纠正。
2 个回答
我觉得用Python加上Numpy是最有效率的方法,既快又容易实现。Numpy经过高度优化,所以性能还不错,而Python则能让算法的实现变得简单。
这个组合应该很适合你的情况,只要你在加载文件到内存时进行一些优化。你可以尝试找到一个平衡点,处理的数据块不要太大,但也要足够大,以减少读取和写入的次数,因为这会拖慢程序的速度。
如果你觉得还需要更快(我真心怀疑),你可以用Cython来加速那些比较慢的部分。
听起来用Python是个不错的选择,因为它有一个很好的线程处理接口(不过具体实现可能有点问题),还有matplotlib和pylab这两个库。我希望你能提供更多的细节,但也许这可以作为你开始的一个好点子:matplotlib: 使用线程进行异步绘图。
我建议用一个线程来处理大量的磁盘读写操作,然后用一个队列把数据同步到多个线程中进行处理(如果你的记录长度是固定的,可以通过预先计算读取的偏移量来加快速度,只把偏移量传给线程池)。在处理磁盘读写的线程中,我会使用内存映射(mmap)来读取数据源文件,先读取预定义的字节数,再多读一次,以确保能抓取到当前数据源行的最后几个字节;这个字节数应该接近你平均行长度。接下来就是通过队列把数据送入线程池进行处理和绘图;我对你具体要绘制的内容没有很清晰的了解,但希望这些建议能对你有所帮助。
补充:可以使用file.readlines([sizehint])一次性读取多行;不过可能速度不太快,因为文档说它内部使用的是readline()。
补充:这里有个简单的代码框架
import threading
from collections import deque
import sys
import mmap
class processor(Thread):
"""
processor gets a batch of data at time from the diskio thread
"""
def __init__(self,q):
Thread.__init__(self,name="plotter")
self._queue = q
def run(self):
#get batched data
while True:
#we wait for a batch
dataloop = self.feed(self._queue.get())
try:
while True:
self.plot(dataloop.next())
except StopIteration:
pass
#sanitizer exceptions following, maybe
def parseline(self,line):
""" return a data struct ready for plotting """
raise NotImplementedError
def feed(self,databuf):
#we yield one-at-time datastruct ready-to-go for plotting
for line in databuf:
yield self.parseline(line)
def plot(self,data):
"""integrate
https://www.esclab.tw/wiki/index.php/Matplotlib#Asynchronous_plotting_with_threads
maybe
"""
class sharedq(object):
"""i dont recall where i got this implementation from
you may write a better one"""
def __init__(self,maxsize=8192):
self.queue = deque()
self.barrier = threading.RLock()
self.read_c = threading.Condition(self.barrier)
self.write_c = threading.Condition(self.barrier)
self.msz = maxsize
def put(self,item):
self.barrier.acquire()
while len(self.queue) >= self.msz:
self.write_c.wait()
self.queue.append(item)
self.read_c.notify()
self.barrier.release()
def get(self):
self.barrier.acquire()
while not self.queue:
self.read_c.wait()
item = self.queue.popleft()
self.write_c.notify()
self.barrier.release()
return item
q = sharedq()
#sizehint for readine lines
numbytes=1024
for i in xrange(8):
p = processor(q)
p.start()
for fn in sys.argv[1:]
with open(fn, "r+b") as f:
#you may want a better sizehint here
map = mmap.mmap(f.fileno(), 0)
#insert a loop here, i forgot
q.put(map.readlines(numbytes))
#some cleanup code may be desirable