我正在寻找一个强大和快速的方式来处理处理谷歌应用引擎中的大文件。在
其工作原理如下(最后简化了工作流程):
Uploads
中会添加一个条目,其中包含CSV名称、文件路径(到Google存储)和一些基本信息。然后,创建一个任务,称为“预处理”。在UploadEntries
模型中,包括CSV id、行、要提取/处理的数据以及该行是否已开始处理和结束处理的一些指标(布尔值)(“正在处理”、“是否完成”)Uploads.next()
。next
方法将:
is_treating
和{Process-healthcheck
中创建一个新条目(此任务在5分钟后运行,并检查7)是否已正确执行。如果没有,则认为Redis/Outside服务器出现故障,与7)相同,没有结果(“error”)。在UploadEntries.is_treating
更新为True。在UploadEntries
条目(包括“is_treating
”和“is_done
”),并调用Uploads.next()
开始下一行。在post-process
,该任务将用处理后的数据重建CSV,并将其返回给客户。在以下几点要记住:
Uploads.next()
方法包含一个limit
参数,允许我并行搜索n
进程。50可以是1,50。在pre-processing
任务中的所有行直接添加到Redis,因为在这种情况下,下一个客户将不得不等待第一个文件完成处理,这将花费太长时间但是这个系统有各种各样的问题,这就是为什么我要求助于你:
Uploads.next()
时,返回的条目已经被处理了(只是entry.is_treating = True
还没有被推送到数据库)is_done = True
。这就是为什么我必须实现一个Healcheck系统,以确保不管发生什么情况,都能正确地处理这条线。这有双重好处:任务的名称包含csv ID和行。使每个文件唯一。如果数据存储不是最新的并且同一个任务运行了两次,那么healthcheck的创建将失败,因为相同的名称已经存在,让我知道存在并发问题,所以我忽略该任务,因为这意味着数据存储还不是最新的。在我最初考虑过在一个独立的进程中逐行运行该文件,但这有一个很大的缺点,即不能并行运行多行。此外,Google将一个任务的运行时间限制为24小时目标(不是默认值),当文件非常大时,它可以运行超过24小时
对于信息,如果有用的话,我使用的是Python
为了简化工作流程,以下是我试图以最佳方式实现的目标:
如果有人能有更好的方法来做这件事,我会非常感激的。我真的相信我不是第一个做这种工作的人,我很肯定我做得不对。在
(我认为Stackoverflow是堆栈交换中发布此类问题的最佳部分,因为这是一个算法问题,但也有可能我没有看到一个更好的网络。如果是这样的话,我很抱歉)。在
您考虑过使用Google Cloud Dataflow来处理大文件吗? 它是一个托管服务,将为您处理文件拆分和处理。在
根据最初的想法,这里有一个概要过程:
BlockingDataflowPipelineRunner
)以启动数据流任务。(由于沙盒和阻塞I/O问题,恐怕它需要一个计算实例)。在相关问题 更多 >
编程相关推荐