从大文件读取并绘制数据

4 投票
2 回答
3123 浏览
提问于 2025-04-16 14:37

我们有一些比较大的文件,总共大约1到1.5GB(主要是日志文件),里面的数据很容易转换成csv格式,之后这些数据会用来生成一系列的图像。

目前,我们使用bash脚本把原始数据转成csv文件,里面只包含需要绘制图表的数字,然后再把这个csv文件输入到gnuplot脚本中。不过,这个过程非常慢。我尝试通过用一个awk命令替换一些管道中的cuttr等命令来加快bash脚本的速度,虽然这样确实提高了一些速度,但整体上还是很慢。

所以,我开始觉得可能有更好的工具来处理这个过程。我现在考虑用python加numpy或者R来重写这个过程。我的一个朋友建议我使用JVM,如果我选择这样做,我会用clojure,但我不太确定JVM的表现如何。

我在处理这类问题方面经验不多,所以如果有任何建议该怎么进行就太好了。谢谢。

补充:另外,我还想把生成的中间数据(也就是csv文件)存储到磁盘上,这样如果我想要不同样子的图表,就不需要重新生成数据。

补充 2:原始数据文件每行有一条记录,字段之间用一个分隔符(|)分开。并不是所有字段都是数字。我需要在输出的csv中包含的每个字段都是通过对输入记录应用某种公式得到的,这个公式可能会用到输入数据中的多个字段。输出的csv每行会有3到4个字段,我需要绘制1-2、1-3、1-4字段的图表(可能是柱状图)。希望这样能让你更清楚。

补充 3:我对@adirau的脚本做了一些修改,似乎效果不错。我已经进展到读取数据、发送到一组处理线程(伪处理,把线程名附加到数据上),然后通过另一个收集器线程把它们聚合到一个输出文件中。

PS:我不太确定这个问题的标签是否合适,欢迎随意纠正。

2 个回答

1

我觉得用Python加上Numpy是最有效率的方法,既快又容易实现。Numpy经过高度优化,所以性能还不错,而Python则能让算法的实现变得简单。

这个组合应该很适合你的情况,只要你在加载文件到内存时进行一些优化。你可以尝试找到一个平衡点,处理的数据块不要太大,但也要足够大,以减少读取和写入的次数,因为这会拖慢程序的速度。

如果你觉得还需要更快(我真心怀疑),你可以用Cython来加速那些比较慢的部分。

4

听起来用Python是个不错的选择,因为它有一个很好的线程处理接口(不过具体实现可能有点问题),还有matplotlib和pylab这两个库。我希望你能提供更多的细节,但也许这可以作为你开始的一个好点子:matplotlib: 使用线程进行异步绘图

我建议用一个线程来处理大量的磁盘读写操作,然后用一个队列把数据同步到多个线程中进行处理(如果你的记录长度是固定的,可以通过预先计算读取的偏移量来加快速度,只把偏移量传给线程池)。在处理磁盘读写的线程中,我会使用内存映射(mmap)来读取数据源文件,先读取预定义的字节数,再多读一次,以确保能抓取到当前数据源行的最后几个字节;这个字节数应该接近你平均行长度。接下来就是通过队列把数据送入线程池进行处理和绘图;我对你具体要绘制的内容没有很清晰的了解,但希望这些建议能对你有所帮助。

补充:可以使用file.readlines([sizehint])一次性读取多行;不过可能速度不太快,因为文档说它内部使用的是readline()。

补充:这里有个简单的代码框架

import threading
from collections import deque
import sys
import mmap


class processor(Thread):
    """
        processor gets a batch of data at time from the diskio thread
    """
    def __init__(self,q):
        Thread.__init__(self,name="plotter")
        self._queue = q
    def run(self):
        #get batched data 
        while True:
            #we wait for a batch
            dataloop = self.feed(self._queue.get())
            try:
                while True:
                    self.plot(dataloop.next())
            except StopIteration:
                pass
            #sanitizer exceptions following, maybe

    def parseline(self,line):
        """ return a data struct ready for plotting """
        raise NotImplementedError

    def feed(self,databuf):
        #we yield one-at-time datastruct ready-to-go for plotting
        for line in databuf:
            yield self.parseline(line)

    def plot(self,data):
        """integrate
        https://www.esclab.tw/wiki/index.php/Matplotlib#Asynchronous_plotting_with_threads
        maybe
        """
class sharedq(object):
    """i dont recall where i got this implementation from 
    you may write a better one"""
    def __init__(self,maxsize=8192):
        self.queue = deque()
        self.barrier = threading.RLock()
        self.read_c = threading.Condition(self.barrier)
        self.write_c = threading.Condition(self.barrier)
        self.msz = maxsize
    def put(self,item):
        self.barrier.acquire()
        while len(self.queue) >= self.msz:
            self.write_c.wait()
        self.queue.append(item)
        self.read_c.notify()
        self.barrier.release()
    def get(self):
        self.barrier.acquire()
        while not self.queue:
            self.read_c.wait()
        item = self.queue.popleft()
        self.write_c.notify()
        self.barrier.release()
        return item



q = sharedq()
#sizehint for readine lines
numbytes=1024
for i in xrange(8):
    p = processor(q)
    p.start()
for fn in sys.argv[1:]
    with open(fn, "r+b") as f:
        #you may want a better sizehint here
        map = mmap.mmap(f.fileno(), 0)
        #insert a loop here, i forgot
        q.put(map.readlines(numbytes))

#some cleanup code may be desirable

撰写回答