python:高效处理灵活深度嵌套数据的技巧有哪些?

11 投票
6 回答
5196 浏览
提问于 2025-04-15 21:01

我的问题不是关于某段具体的代码,而是更一般性的,所以请耐心听我说:

我应该如何组织我正在分析的数据,应该使用哪些工具来管理这些数据呢?

我正在使用Python和Numpy来分析数据。因为Python的文档提到字典在Python中非常高效,而且由于我的数据本身结构很清晰,所以我把它存储在一个层层嵌套的字典里。

这里是字典的一个框架:在层级中的位置决定了元素的性质,每一行新内容代表前一层中一个键的内容:

[AS091209M02] [AS091209M01] [AS090901M06] ... 
[100113] [100211] [100128] [100121] 
[R16] [R17] [R03] [R15] [R05] [R04] [R07] ... 
[1263399103] ... 
[ImageSize] [FilePath] [Trials] [Depth] [Frames] [Responses] ... 
[N01] [N04] ... 
[Sequential] [Randomized] 
[Ch1] [Ch2]

补充说明一下我的数据集:

[individual] ex: [AS091209M02]
[imaging session (date string)] ex: [100113]
[Region imaged] ex: [R16]
[timestamp of file] ex [1263399103]  
[properties of file] ex: [Responses]
[regions of interest in image ] ex [N01]
[format of data] ex [Sequential]
[channel of acquisition: this key indexes an array of values] ex [Ch1]

我进行的操作类型,比如计算数组的属性(在Ch1、Ch2下列出),选择数组来创建一个新集合,比如分析某个个体在不同时间点来自第16区域(R16)的N01的反应等等。

这个结构对我来说效果很好,而且速度很快,正如所承诺的那样。我可以很快分析完整的数据集(而且字典的大小远远不够填满我电脑的内存:只有半个GB)。

我的问题在于,我需要编写字典操作的方式太繁琐了。我经常有一段段代码像这样:

for mk in dic.keys():
    for rgk in dic[mk].keys():
        for nk in dic[mk][rgk].keys():
            for ik in dic[mk][rgk][nk].keys():
                for ek in dic[mk][rgk][nk][ik].keys():
                    #do something

这段代码看起来很丑,繁琐,不可重用,而且不够稳定(每次字典有变化都需要重新编写)。

我尝试使用递归函数,但除了最简单的应用外,我遇到了一些非常棘手的错误和奇怪的行为,浪费了很多时间(在处理深层嵌套的递归函数时,我在ipython中用pdb调试也不太顺利)。最终,我唯一经常使用的递归函数是:

def dicExplorer(dic, depth = -1, stp = 0):
    '''prints the hierarchy of a dictionary.
    if depth not specified, will explore all the dictionary
    '''
    if depth - stp == 0: return
    try : list_keys = dic.keys()
    except AttributeError: return
    stp += 1
    for key in list_keys:
        else: print '+%s> [\'%s\']' %(stp * '---', key)
        dicExplorer(dic[key], depth, stp)

我知道这样做不对,因为我的代码又长又复杂,不能重复使用。我需要使用更好的技术来灵活操作字典,或者把数据放到某种数据库格式中(比如sqlite?)。我的问题是,由于我在编程方面是(不太好)自学的,所以缺乏实践经验和背景知识,无法充分理解可用的选项。我愿意学习新工具(SQL,面向对象编程),无论如何都想完成工作,但我不想把时间和精力投入到对我需求没有帮助的事情上。

那么,你有什么建议来解决这个问题,让我能够以更简洁、灵活和可重用的方式编写我的工具呢?

附加说明:除了对数据字典的某个特定子字典进行操作外,这里有一些我为数据集dic或其子字典实现的操作示例:

实际上,我有一些递归函数效果很好:

def normalizeSeqDic(dic, norm_dic = {}, legend = ()):
    '''returns a normalized dictionary from a seq_amp_dic. Normalization is performed using the first time point as reference
    '''
    try : 
        list_keys = dic.keys()
        for key in list_keys:
            next_legend = legend + (key,) 
            normalizeSeqDic(dic[key], norm_dic, next_legend)
    except AttributeError:
        # normalization
        # unpack list
        mk, ek, nk, tpk = legend
        #assign values to amplitude dict
        if mk not in norm_dic: norm_dic[mk] = {}
        if ek not in norm_dic[mk]: norm_dic[mk][ek] = {}
        if nk not in norm_dic[mk][ek]: norm_dic[mk][ek][nk] = {}
        if tpk not in norm_dic[mk][ek][nk]: norm_dic[mk][ek][nk][tpk] = {}
        new_array = []
        for x in range(dic.shape[0]):
            new_array.append(dic[x][1:]/dic[x][0])
        new_array = asarray(new_array)
        norm_dic[mk][ek][nk][tpk] = new_array
    return norm_dic

def poolDic(dic):
    '''returns a dic in which all the values are pooled, and root (mk) keys are fused
    these pooled dics can later be combined into another dic
    '''
    pooled_dic = {}
    for mk in dic.keys():
        for ek in dic[mk].keys():
            for nk in dic[mk][ek].keys():
                for tpk in dic[mk][ek][nk].keys():
                    #assign values to amplitude dict
                    if ek not in pooled_dic: pooled_dic[ek] = {}
                    if nk not in pooled_dic[ek]: pooled_dic[ek][nk] = {}
                    if tpk not in pooled_dic[ek][nk]:
                        pooled_dic[ek][nk][tpk] = dic[mk][ek][nk][tpk]
                    else: pooled_dic[ek][nk][tpk]= vstack((pooled_dic[ek][nk][tpk], dic[mk][ek][nk][tpk]))
    return pooled_dic

def timePointsDic(dic):
    '''Determines the timepoints for each individual key at root
    '''
    tp_dic = {}
    for mk in dic.keys():
        tp_list = []
        for rgk in dic[mk].keys():
            tp_list.extend(dic[mk][rgk]['Neuropil'].keys())
        tp_dic[mk]=tuple(sorted(list(set(tp_list))))
    return tp_dic

对于某些操作,我发现没有其他方法,只能将字典扁平化:

def flattenDic(dic, label):
    '''flattens a dic to produce a list of of tuples containing keys and 'label' values
    '''
    flat_list = []
    for mk in dic.keys():
        for rgk in dic[mk].keys():
            for nk in dic[mk][rgk].keys():
                for ik in dic[mk][rgk][nk].keys():
                    for ek in dic[mk][rgk][nk][ik].keys():
                        flat_list.append((mk, rgk, nk, ik, ek, dic[mk][rgk][nk][ik][ek][label])
    return flat_list

def extractDataSequencePoints(flat_list, mk, nk, tp_list):
        '''produces a list containing arrays of time point values
        time_points is a list of the time points wished (can have 2 or 3 elements)
        '''
        nb_tp = len(tp_list)
        # build tp_seq list
        tp_seq = []
        tp1, tp2, tp3 = [], [], []
        if nk == 'Neuropil':
            tp1.extend(x for x in flat_list if x[0]==mk and x[2] == 'Neuropil' and x[3] == tp_list[0])
            tp2.extend(x for x in flat_list if x[0]==mk and x[2] == 'Neuropil'and  x[3] == tp_list[1])
        else:
            tp1.extend(x for x in flat_list if x[0]==mk and x[2] != 'Neuropil'and x[3] == tp_list[0])
            tp2.extend(x for x in flat_list if x[0]==mk and x[2] != 'Neuropil'and x[3] == tp_list[1])
        if nb_tp == 3:
            if nk == 'Neuropil':
                tp3.extend(x for x in flat_list if x[0]==mk and x[2] == 'Neuropil'and x[3] == tp_list[2])
            else:
                tp3.extend(x for x in flat_list if x[0]==mk and x[2] != 'Neuropil'and x[3] == tp_list[2])
        for x in tp1:
            for y in tp2:
                if x[0:3] == y[0:3] :
                    if nb_tp == 3:
                        for z in tp3:
                            if x[0:3] == z[0:3] :
                                tp_seq.append(asarray([x[4],y[4],z[4]]))
                    else:
                        tp_seq.append(asarray([x[4],y[4]]))
        return tp_seq

6 个回答

1

我来分享一些想法。首先,有一个函数是这样的:

for mk in dic.keys():
    for rgk in dic[mk].keys():
        for nk in dic[mk][rgk].keys():
            for ik in dic[mk][rgk][nk].keys():
                for ek in dic[mk][rgk][nk][ik].keys():
                    #do something

你可能想简单地写成:

for ek in deep_loop(dic):
    do_something

这里有两种方法。一种是功能性的方法,另一种像生成器的方法。第二种方法是:

def deep_loop(dic):
    for mk in dic.keys():
        for rgk in dic[mk].keys():
            for nk in dic[mk][rgk].keys():
                for ik in dic[mk][rgk][nk].keys():
                    for ek in dic[mk][rgk][nk][ik].keys():
                        yield ek

这样可以让你在遍历字典时捕捉到逻辑。你可以很容易地修改这个函数,以支持不同的遍历方式。这取决于你的结构如何变化,可能只是循环的深度,或者其他的变化。你能不能提供一些更复杂的例子,说明你在遍历树时有什么需求?比如过滤、搜索等等?深度的实现大概是这样的(未经测试)——它会返回一对(键的元组),(值):

def deep_loop(dic, depth):
    if depth == 0:
        yield (), dic
    for subkey, subval in dic.items():
        for ktuple, value in deep_loop(subval, depth-1):
            yield (subkey,)+ktuple, value

现在变得简单多了:

for (k1,k2,k3,k4), value in deep_loop(dic, 4):
    # do something

还有其他方法可以定制这个,你可以把一个命名元组类型作为深度遍历的参数。深度遍历可以自动检测命名元组的深度,并返回这个命名元组。

2

你可以让你的循环看起来更简洁,通过把:

for mk in dic.keys():
    for rgk in dic[mk].keys():
        for nk in dic[mk][rgk].keys():
            for ik in dic[mk][rgk][nk].keys():
                for ek in dic[mk][rgk][nk][ik].keys():
                    #do something

换成

for mv in dic.values():
    for rgv in mv.values():
        for nv in rgv.values():
            for iv in nv.values():
                for ev in iv.values():
                    #do something

这样你就能用比较简短的代码获取所有的值。如果你还需要一些键,可以这样做:

for (mk, mv) in dic.items():
    # etc.

根据你的需求,你也可以考虑创建一个字典,并使用元组作为键:

dic[(mk, rgk, nv, ik, ek)]
12

“我把它存储在一个层次很深的字典里。”

结果你也看到了,这样做并不好。

那有什么替代方案呢?

  1. 使用复合键和一个浅层字典。你可以有一个包含8个部分的键: (个人信息、成像会话、成像区域、文件时间戳、文件属性、图像中的感兴趣区域、数据格式、采集通道),这个键可以对应一个值的数组。

    { ('AS091209M02', '100113', 'R16', '1263399103', 'Responses', 'N01', 'Sequential', 'Ch1' ): array, 
    ...
    

    这样做的问题在于搜索。

  2. 使用合适的类结构。其实,定义一个完整的类可能有点过于复杂。

“我进行的操作类型,比如计算数组的属性(在Ch1和Ch2下列出),选择数组来创建一个新集合,比如分析某个个体在不同时间点的第16个区域(R16)的N01响应等等。”

建议

首先,使用一个namedtuple来定义你的最终对象。

Array = namedtuple( 'Array', 'individual, session, region, timestamp, properties, roi, format, channel, data' )

或者类似的东西。构建一个简单的这些命名元组对象的列表。然后你可以简单地遍历它们。

其次,对这个数组对象的主列表进行许多简单的映射-归约操作。

过滤:

for a in theMasterArrrayList:
    if a.region = 'R16' and interest = 'N01':
        # do something on these items only.

按公共键进行归约:

individual_dict = defaultdict(list)
for a in theMasterArrayList:
    individual_dict[ a.individual ].append( a )

这将创建一个只包含你想要的项目的子集。

然后你可以通过individual_dict['AS091209M02']来获取他们的所有数据。你可以对任何(或所有)可用的键这样做。

region_dict = defaultdict(list)
for a in theMasterArrayList:
    region_dict[ a.region ].append( a )

这样做不会复制任何数据。速度快,占用内存相对较小。

映射(或转换)数组:

for a in theMasterArrayList:
    someTransformationFunction( a.data )

如果数组本身是一个列表,你可以更新这个列表而不破坏整个元组。如果你需要从现有数组创建一个新数组,那就是在创建一个新元组。这没有问题,但它确实是一个新元组。你最终会得到这样的程序。

def region_filter( array_list, region_set ):
    for a in array_list:
        if a.region in region_set:
            yield a

def array_map( array_list, someConstant ):
    for a in array_list:
        yield Array( *(a[:8] + (someTranformation( a.data, someConstant ),) )

def some_result( array_list, region, someConstant ):
    for a in array_map( region_filter( array_list, region ), someConstant ):
        yield a

你可以将转换、归约、映射构建得更复杂。

最重要的是,从主列表中只创建你需要的字典,这样你就不会进行多余的过滤。

顺便说一下,这可以很容易地映射到关系数据库。虽然速度会慢一些,但你可以进行多个并发更新操作。除了多个并发更新外,关系数据库并没有提供比这更多的功能。

撰写回答