python:以灵活的方式处理深度嵌套数据的有效技术是什么?

2024-04-30 03:21:57 发布

您现在位置:Python中文网/ 问答频道 /正文

我的问题不是关于一个特定的代码片段,而是一个更一般的问题,因此请您耐心等待:

我应该如何组织我正在分析的数据,我应该使用哪些工具来管理它?在

我使用python和numpy来分析数据。因为python文档表明字典在python中是非常优化的,而且由于数据本身是非常结构化的,所以我将其存储在一个深度嵌套的字典中。在

以下是字典的框架:层次结构中的位置定义了元素的性质,每一行新行定义了先例级别中键的内容:

[AS091209M02] [AS091209M01] [AS090901M06] ... 
[100113] [100211] [100128] [100121] 
[R16] [R17] [R03] [R15] [R05] [R04] [R07] ... 
[1263399103] ... 
[ImageSize] [FilePath] [Trials] [Depth] [Frames] [Responses] ... 
[N01] [N04] ... 
[Sequential] [Randomized] 
[Ch1] [Ch2]

编辑:为了更好地解释我的数据集:

^{pr2}$

例如,我执行的操作类型是计算数组的属性(列在Ch1、Ch2下),拾取数组以生成新的集合,例如分析给定个体在不同时间点从区域16(R16)得到的N01的响应,等等

这个结构对我来说很好,而且非常快,正如承诺的那样。我可以很快地分析完整的数据集(而且字典太小了,无法填满我的计算机内存:半千兆字节)。在

我的问题来自于我需要编写字典操作程序的繁琐方式。我经常有这样的代码:

for mk in dic.keys():
    for rgk in dic[mk].keys():
        for nk in dic[mk][rgk].keys():
            for ik in dic[mk][rgk][nk].keys():
                for ek in dic[mk][rgk][nk][ik].keys():
                    #do something

这是丑陋的、笨重的、不可重用的和脆弱的(需要为字典的任何变体重新编码)。在

我尝试过使用递归函数,但是除了最简单的应用程序之外,我还遇到了一些非常讨厌的错误和奇怪的行为,这导致了大量的时间浪费(当我处理深度嵌套的递归函数时,如果我不在ipython中使用pdb进行调试,那是没有帮助的)。最后,我经常使用的唯一递归函数是:

def dicExplorer(dic, depth = -1, stp = 0):
    '''prints the hierarchy of a dictionary.
    if depth not specified, will explore all the dictionary
    '''
    if depth - stp == 0: return
    try : list_keys = dic.keys()
    except AttributeError: return
    stp += 1
    for key in list_keys:
        else: print '+%s> [\'%s\']' %(stp * '---', key)
        dicExplorer(dic[key], depth, stp)

我知道我做错了,因为我的代码很长,很无聊,而且不可重用。我需要使用更好的技术来灵活地操作字典,或者将数据放入某种数据库格式(sqlite?)。我的问题是,由于我(糟糕地)自学编程,我缺乏实践经验和背景知识来欣赏可用的选项。我已经准备好学习新的工具(SQL、面向对象编程),不管怎样都可以完成任务,但是我不愿意把我的时间和精力投入到那些将成为我需求的死胡同上。在

那么你有什么建议来解决这个问题,并且能够以更简洁、灵活和可重用的方式编写我的工具?在

附录:除了使用数据字典的特定子字典执行某些操作外,以下是我为数据集dic或其子字典实现的一些操作示例:

实际上,我有一些递归函数运行良好:

def normalizeSeqDic(dic, norm_dic = {}, legend = ()):
    '''returns a normalized dictionary from a seq_amp_dic. Normalization is performed using the first time point as reference
    '''
    try : 
        list_keys = dic.keys()
        for key in list_keys:
            next_legend = legend + (key,) 
            normalizeSeqDic(dic[key], norm_dic, next_legend)
    except AttributeError:
        # normalization
        # unpack list
        mk, ek, nk, tpk = legend
        #assign values to amplitude dict
        if mk not in norm_dic: norm_dic[mk] = {}
        if ek not in norm_dic[mk]: norm_dic[mk][ek] = {}
        if nk not in norm_dic[mk][ek]: norm_dic[mk][ek][nk] = {}
        if tpk not in norm_dic[mk][ek][nk]: norm_dic[mk][ek][nk][tpk] = {}
        new_array = []
        for x in range(dic.shape[0]):
            new_array.append(dic[x][1:]/dic[x][0])
        new_array = asarray(new_array)
        norm_dic[mk][ek][nk][tpk] = new_array
    return norm_dic

def poolDic(dic):
    '''returns a dic in which all the values are pooled, and root (mk) keys are fused
    these pooled dics can later be combined into another dic
    '''
    pooled_dic = {}
    for mk in dic.keys():
        for ek in dic[mk].keys():
            for nk in dic[mk][ek].keys():
                for tpk in dic[mk][ek][nk].keys():
                    #assign values to amplitude dict
                    if ek not in pooled_dic: pooled_dic[ek] = {}
                    if nk not in pooled_dic[ek]: pooled_dic[ek][nk] = {}
                    if tpk not in pooled_dic[ek][nk]:
                        pooled_dic[ek][nk][tpk] = dic[mk][ek][nk][tpk]
                    else: pooled_dic[ek][nk][tpk]= vstack((pooled_dic[ek][nk][tpk], dic[mk][ek][nk][tpk]))
    return pooled_dic

def timePointsDic(dic):
    '''Determines the timepoints for each individual key at root
    '''
    tp_dic = {}
    for mk in dic.keys():
        tp_list = []
        for rgk in dic[mk].keys():
            tp_list.extend(dic[mk][rgk]['Neuropil'].keys())
        tp_dic[mk]=tuple(sorted(list(set(tp_list))))
    return tp_dic

对于某些操作,我没有找到其他方法,只能将字典展平:

def flattenDic(dic, label):
    '''flattens a dic to produce a list of of tuples containing keys and 'label' values
    '''
    flat_list = []
    for mk in dic.keys():
        for rgk in dic[mk].keys():
            for nk in dic[mk][rgk].keys():
                for ik in dic[mk][rgk][nk].keys():
                    for ek in dic[mk][rgk][nk][ik].keys():
                        flat_list.append((mk, rgk, nk, ik, ek, dic[mk][rgk][nk][ik][ek][label])
    return flat_list

def extractDataSequencePoints(flat_list, mk, nk, tp_list):
        '''produces a list containing arrays of time point values
        time_points is a list of the time points wished (can have 2 or 3 elements)
        '''
        nb_tp = len(tp_list)
        # build tp_seq list
        tp_seq = []
        tp1, tp2, tp3 = [], [], []
        if nk == 'Neuropil':
            tp1.extend(x for x in flat_list if x[0]==mk and x[2] == 'Neuropil' and x[3] == tp_list[0])
            tp2.extend(x for x in flat_list if x[0]==mk and x[2] == 'Neuropil'and  x[3] == tp_list[1])
        else:
            tp1.extend(x for x in flat_list if x[0]==mk and x[2] != 'Neuropil'and x[3] == tp_list[0])
            tp2.extend(x for x in flat_list if x[0]==mk and x[2] != 'Neuropil'and x[3] == tp_list[1])
        if nb_tp == 3:
            if nk == 'Neuropil':
                tp3.extend(x for x in flat_list if x[0]==mk and x[2] == 'Neuropil'and x[3] == tp_list[2])
            else:
                tp3.extend(x for x in flat_list if x[0]==mk and x[2] != 'Neuropil'and x[3] == tp_list[2])
        for x in tp1:
            for y in tp2:
                if x[0:3] == y[0:3] :
                    if nb_tp == 3:
                        for z in tp3:
                            if x[0:3] == z[0:3] :
                                tp_seq.append(asarray([x[4],y[4],z[4]]))
                    else:
                        tp_seq.append(asarray([x[4],y[4]]))
        return tp_seq

Tags: andinnormforif字典keyslist
3条回答

通过替换以下内容,可以使循环看起来更好:

for mk in dic.keys():
    for rgk in dic[mk].keys():
        for nk in dic[mk][rgk].keys():
            for ik in dic[mk][rgk][nk].keys():
                for ek in dic[mk][rgk][nk][ik].keys():
                    #do something

^{pr2}$

因此,您可以使用相对简洁的代码访问所有值。如果您还需要一些密钥,可以执行以下操作:

for (mk, mv) in dic.items():
    # etc.

根据您的需要,您还可以考虑创建并使用带有元组键的单个词典:

dic[(mk, rgk, nv, ik, ek)]

我将分享一些关于这个的想法。代替此功能:

for mk in dic.keys():
    for rgk in dic[mk].keys():
        for nk in dic[mk][rgk].keys():
            for ik in dic[mk][rgk][nk].keys():
                for ek in dic[mk][rgk][nk][ik].keys():
                    #do something

你可以简单地写为:

^{pr2}$

有两种方法。一种是功能性的,第二种是发电机式的。第二个是:

def deep_loop(dic):
    for mk in dic.keys():
        for rgk in dic[mk].keys():
            for nk in dic[mk][rgk].keys():
                for ik in dic[mk][rgk][nk].keys():
                    for ek in dic[mk][rgk][nk][ik].keys():
                        yield ek

这使您能够捕捉浏览字典的逻辑。很容易修改此函数以支持通过结构的不同方式。它取决于你的结构变化的方式,如果只是一个循环的深度或其他不同的东西。你能不能贴一些更高级的例子来说明你对浏览这棵树有什么要求?比如过滤、搜索等。?深度如下所示(未测试)-它将生成一对(键的元组),(value):

def deep_loop(dic, depth):
    if depth == 0:
        yield (), dic
    for subkey, subval in dic.items():
        for ktuple, value in deep_loop(subval, depth-1):
            yield (subkey,)+ktuple, value

现在它变得更简单了:

for (k1,k2,k3,k4), value in deep_loop(dic, 4):
    # do something

还有其他方法可以自定义此项,您可以添加命名元组类型作为deep_loop的参数。Deep_loop可以自动检测命名元组的深度并返回命名元组。在

"I stored it in a deeply nested dictionary"

而且,正如你所看到的,结果并不好。在

还有什么选择?在

  1. 复合键和浅字典。您有一个8部分的密钥: (单个、成像会话、成像区域、文件时间戳、文件属性、图像感兴趣区域、数据格式、采集通道)映射 值数组。在

    { ('AS091209M02', '100113', 'R16', '1263399103', 'Responses', 'N01', 'Sequential', 'Ch1' ): array, 
    ...
    

    问题在于搜索。

  2. 适当的阶级结构。实际上,一个完整的类定义可能有点过头了。

"The type of operations I perform is for instance to compute properties of the arrays (listed under Ch1, Ch2), pick up arrays to make a new collection, for instance analyze responses of N01 from region 16 (R16) of a given individual at different time points, etc."

推荐

首先,使用namedtuple作为最终对象。在

^{pr2}$

或者类似的事情。构建这些命名元组对象的简单列表。然后您可以简单地迭代它们。在

其次,在这个数组对象的主列表上使用许多简单的map reduce操作。在

过滤:

for a in theMasterArrrayList:
    if a.region = 'R16' and interest = 'N01':
        # do something on these items only.

按公用密钥还原:

individual_dict = defaultdict(list)
for a in theMasterArrayList:
    individual_dict[ a.individual ].append( a )

这将在映射中创建一个子集,该子集正好包含所需的项。在

然后,你可以单独做一个句子['AS091209M02',并获得他们所有的数据。您可以对任何(或所有)可用密钥执行此操作。在

region_dict = defaultdict(list)
for a in theMasterArrayList:
    region_dict[ a.region ].append( a )

这不会复制任何数据。它速度快,内存相对紧凑。在

映射(或变换)阵列:

for a in theMasterArrayList:
    someTransformationFunction( a.data )

如果你可以更新一个完整的数组而不破坏列表本身。如果您需要从现有数组创建一个新数组,那么您正在创建一个新的元组。这没什么错,但它是一个新的元组。你最终会得到这样的程序。在

def region_filter( array_list, region_set ):
    for a in array_list:
        if a.region in region_set:
            yield a

def array_map( array_list, someConstant ):
    for a in array_list:
        yield Array( *(a[:8] + (someTranformation( a.data, someConstant ),) )

def some_result( array_list, region, someConstant ):
    for a in array_map( region_filter( array_list, region ), someConstant ):
        yield a

你可以建立转换,减少,映射到更复杂的东西。在

最重要的是从主列表中只创建您需要的字典,这样您就不会做任何超出最低限度的过滤。在

顺便说一句,这可以简单地映射到关系数据库。它会比较慢,但是您可以有多个并发的更新操作。除了多个并发更新外,关系数据库不提供任何高于此的特性。在

相关问题 更多 >