Getting percentile points from a large list

2 votes
5 answers
1644 views
Asked 2025-04-21 07:45

I have a very large list of data (over 45 million entries), all of them numbers: [78,0,5,150,9000,5,......,25,9,78422...]

I can easily find the maximum and minimum of these numbers, count them, and compute their sum:

file_handle = open('huge_data_file.txt', 'r')
sum_values = 0
min_value = None
max_value = None

for i, line in enumerate(file_handle):
    value = int(line)  # int() tolerates the trailing newline
    if min_value is None or value < min_value:
        min_value = value
    if max_value is None or value > max_value:
        max_value = value
    sum_values += value
average_value = float(sum_values) / (i + 1)  # i is the last index, so there are i + 1 values

However, none of these are what I actually need. I need a list of 10 numbers such that the number of data points falling between consecutive numbers is equal. For example:

decile points [0,30,120,325,912,1570,2522,5002,7025,78422], where the number of data points between 0 and 30, or between 30 and 120, is roughly 4.5 million.

How can I do this?

=============================

Edit:

I know the data needs to be sorted. The problem is that I can't fit it all in one variable because there isn't enough memory; I have to read it sequentially from a generator (file_handle).

5 Answers

1

My code is a proposal for finding the result without using much memory. In testing, it found one quantile value in 7 minutes 51 seconds on a data set of 45 million entries.

import random
from bisect import bisect_left

class data:
    def __init__(self, values):
        random.shuffle(values)
        self.values = values
    def __iter__(self):
        for i in self.values:
            yield i
    def __len__(self):
        return len(self.values)
    def sortedValue(self, percentile):
        val = list(self)
        val.sort()
        num = int(len(self)*percentile)
        return val[num]

def init():
    numbers = data([x for x in range(1,1000000)])
    print(seekPercentile(numbers, 0.1))
    print(numbers.sortedValue(0.1))


def seekPercentile(numbers, percentile):
    lower, upper = minmax(numbers)
    maximum = upper
    approx = _approxPercentile(numbers, lower, upper, percentile)
    return neighbor(approx, numbers, maximum)

def minmax(numbers):
    minimum = float("inf")
    maximum = float("-inf")

    for num in numbers:
        if num>maximum:
            maximum = num
        if num<minimum:
            minimum = num
    return minimum, maximum

def neighbor(approx, numbers, maximum):
    # One more pass over the data: return the actual value closest to approx
    dif = maximum
    result = None
    for num in numbers:
        if abs(approx-num)<dif:
            result = num
            dif = abs(approx-num)
    return result

def _approxPercentile(numbers, lower, upper, percentile):
    # Slice [lower, upper] into magicNumber buckets, count how many values
    # fall below each boundary, then narrow the bracket around the percentile
    middles = []
    less = []
    magicNumber = 10000
    step = (upper - lower)/magicNumber
    for i in range(1, magicNumber-1):
        middles.append(lower + i * step)
        less.append(0)

    for num in numbers:
        index = bisect_left(middles, num)
        if index < len(less):
            less[index] += 1

    summing = 0
    for index, testVal in enumerate(middles):
        summing += less[index]
        if summing/len(numbers) < percentile:
            print(" Change lower from "+str(lower)+" to "+ str(testVal))
            lower = testVal

        if summing/len(numbers) > percentile:
            print(" Change upper from "+str(upper)+" to "+ str(testVal))
            upper = testVal
            break


    precision = 0.01
    if (lower+precision)>upper:
        return lower
    else:
        return _approxPercentile(numbers, lower, upper, percentile)

init()

I have made some changes to my code, and I now think this approach works at least to some degree, even if it is not optimal.

1

Here is an on-disk partition sort implemented in pure Python. It is slow and ugly, but it works, and hopefully each step is reasonably clear (the merge really is ugly code!).

#!/usr/bin/env python
import os


def get_next_int_from_file(f):
    l = f.readline()
    if not l:
        return None
    return int(l.strip())


MAX_SAMPLES_PER_PARTITION = 1000000
PARTITION_FILENAME = "_{}.txt"

# Partition data set
part_id = 0
eof = False

with open("data.txt", "r") as fin:
    while not eof:
        print "Creating partition {}".format(part_id)
        with open(PARTITION_FILENAME.format(part_id), "w") as fout:
            for _ in range(MAX_SAMPLES_PER_PARTITION):
                line = fin.readline()
                if not line:
                    eof = True
                    break
                fout.write(line)
        part_id += 1

num_partitions = part_id


# Sort each partition
for part_id in range(num_partitions):
    print "Reading unsorted partition {}".format(part_id)
    with open(PARTITION_FILENAME.format(part_id), "r") as fin:
        samples = [int(line.strip()) for line in fin.readlines()]
    print "Disk-Deleting unsorted {}".format(part_id)
    os.remove(PARTITION_FILENAME.format(part_id))
    print "In-memory sorting partition {}".format(part_id)
    samples.sort()
    print "Writing sorted partition {}".format(part_id)
    with open(PARTITION_FILENAME.format(part_id), "w") as fout:
        fout.writelines(["{}\n".format(sample) for sample in samples])


# Merge-sort the partitions
# NB This is a very inefficient implementation!
print "Merging sorted partitions"
part_files = []
part_next_int = []
num_lines_out = 0

# Setup data structures for the merge
for part_id in range(num_partitions):
    fin = open(PARTITION_FILENAME.format(part_id), "r")
    next_int = get_next_int_from_file(fin)
    if next_int is None:
        fin.close()
        continue
    part_files.append(fin)
    part_next_int.append(next_int)

with open("data_sorted.txt", "w") as fout:
    while part_files:
        # Find the smallest number across all files
        min_number = None
        min_idx = None
        for idx in range(len(part_files)):
            if min_number is None or part_next_int[idx] < min_number:
                min_number = part_next_int[idx]
                min_idx = idx
        # Now add that number, and move the relevent file along
        fout.write("{}\n".format(min_number))
        num_lines_out += 1
        if num_lines_out % MAX_SAMPLES_PER_PARTITION == 0:
            print "Merged samples: {}".format(num_lines_out)

        next_int = get_next_int_from_file(part_files[min_idx])
        if next_int is None:
            # This partition is finished: close its file and drop it
            part_files[min_idx].close()
            del part_files[min_idx]
            del part_next_int[min_idx]
        else:
            part_next_int[min_idx] = next_int

# Cleanup partition files
for part_id in range(num_partitions):
    os.remove(PARTITION_FILENAME.format(part_id))
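
Incidentally, the k-way merge step can also be delegated to the standard library: heapq.merge lazily merges sorted iterables without loading them into memory. A minimal sketch that could replace the merge section above, reusing PARTITION_FILENAME and num_partitions from that code:

import heapq

def int_lines(path):
    # Lazily yield the integers stored one per line in a sorted partition file
    with open(path) as f:
        for line in f:
            yield int(line)

parts = [int_lines(PARTITION_FILENAME.format(i)) for i in range(num_partitions)]
with open("data_sorted.txt", "w") as fout:
    for value in heapq.merge(*parts):
        fout.write("{}\n".format(value))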
1

Here is a fairly simple numpy approach:

import numpy as np

# example data (produced by numpy but converted to a simple list)
datalist = list(np.random.randint(0, 10000000, 45000000))

# converted back to numpy array (start here with your data)
arr = np.array(datalist)
np.percentile(arr, 10), np.percentile(arr, 20), np.percentile(arr, 30)
# ref: 
# http://docs.scipy.org/doc/numpy-dev/reference/generated/numpy.percentile.html

You can also try this:

arr.sort()

# And then select the 10%, 20% etc. value; add some check for an equal amount
# of numbers within a bin and then calculate the average, exercise for reader :-)

Note, however, that np.percentile gets slow if you call it many times. So really, the best approach is to sort the array once and then pick out the elements you need yourself, as sketched below.
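
A minimal sketch of that idea, continuing with the arr from above (and note that np.percentile also accepts a list of percentiles, so a single call can return every boundary):

arr.sort()   # sort once, in place
n = len(arr)
deciles = [arr[n * k // 10] for k in range(1, 10)]

# Alternatively, a single np.percentile call for all the boundaries:
deciles_np = np.percentile(arr, [10, 20, 30, 40, 50, 60, 70, 80, 90])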

1

As you mentioned in the comments, you want a solution that can handle larger data sets, ones that may not fit in memory. You can put the data into an SQLite3 database. Even if your data set is 10GB and you only have 8GB of RAM, SQLite3 can still sort the data and return the results to you in order.

The SQLite3 database gives you a generator that yields the sorted data incrementally. You could also consider other database solutions and go beyond the limits of Python.
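
A minimal sketch of this idea with Python's built-in sqlite3 module (the file names and the one-column table are my assumptions, not part of the answer):

import sqlite3

conn = sqlite3.connect("numbers.db")   # hypothetical database file
conn.execute("CREATE TABLE IF NOT EXISTS nums (value INTEGER)")
with open("huge_data_file.txt") as fh:
    conn.executemany("INSERT INTO nums VALUES (?)",
                     ((int(line),) for line in fh))
conn.execute("CREATE INDEX IF NOT EXISTS idx_value ON nums(value)")
conn.commit()

(total,) = conn.execute("SELECT COUNT(*) FROM nums").fetchone()
# SQLite sorts on disk; pull one value at each decile boundary
deciles = [
    conn.execute("SELECT value FROM nums ORDER BY value LIMIT 1 OFFSET ?",
                 (total * k // 10,)).fetchone()[0]
    for k in range(1, 10)
]
print(deciles)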

2

If you are happy with an approximate result, there is a nice (and fairly straightforward to implement) algorithm for computing quantiles from streaming data. It is described in detail in the paper "Space-Efficient Online Computation of Quantile Summaries" by Greenwald and Khanna.
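
That paper is the rigorous route; purely to illustrate the streaming-approximation idea with something simpler, here is a reservoir-sampling sketch (a different, cruder technique than Greenwald-Khanna, and the sample size of 100000 is an arbitrary assumption to tune):

import random

def reservoir_sample(stream, k=100000):
    # Keep a uniform random sample of k items from a stream of unknown length
    sample = []
    for n, value in enumerate(stream):
        if n < k:
            sample.append(value)
        else:
            j = random.randrange(n + 1)
            if j < k:  # item survives with probability k / (n + 1)
                sample[j] = value
    return sample

with open('huge_data_file.txt') as fh:
    sample = sorted(reservoir_sample(int(line) for line in fh))

# Approximate decile boundaries read straight off the sorted sample
deciles = [sample[len(sample) * i // 10] for i in range(1, 10)]
print(deciles)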
