如何在GAE上处理大文件?

2024-04-26 13:07:25 发布

您现在位置:Python中文网/ 问答频道 /正文

我正在寻找一个强大和快速的方式来处理处理谷歌应用引擎中的大文件。在

其工作原理如下(最后简化了工作流程):

  1. 客户发送一个CSV文件,我们的服务器将逐行处理。在
  2. 上传文件后,NDB数据存储Uploads中会添加一个条目,其中包含CSV名称、文件路径(到Google存储)和一些基本信息。然后,创建一个任务,称为“预处理”。在
  3. 预处理任务将循环CSV文件的所有行(可能是数百万行),并将为每一行添加一个NDB条目到UploadEntries模型中,包括CSV id、行、要提取/处理的数据以及该行是否已开始处理和结束处理的一些指标(布尔值)(“正在处理”、“是否完成”)
  4. 预处理任务结束后,它会将信息更新到客户端“将处理XXX行”
  5. 调用Uploads.next()next方法将:
    • is_treating和{}为false时搜索{}
    • 将在Redis数据存储中为找到的下一行添加任务。(使用Redis数据存储是因为这里的工作是在不由Google管理的服务器上进行的)
    • 还将在任务Process-healthcheck中创建一个新条目(此任务在5分钟后运行,并检查7)是否已正确执行。如果没有,则认为Redis/Outside服务器出现故障,与7)相同,没有结果(“error”)。在
    • 然后,它将该条目的UploadEntries.is_treating更新为True。在
  6. 外部服务器将处理数据,并通过向服务器上的端点发出POST请求来返回结果。在
  7. 该端点更新数据存储中的UploadEntries条目(包括“is_treating”和“is_done”),并调用Uploads.next()开始下一行。在
  8. 在上传.下一步,当搜索下一个条目时,没有返回任何结果,我认为该文件将被最终处理,并调用任务post-process,该任务将用处理后的数据重建CSV,并将其返回给客户。在

以下几点要记住:

  1. 真正起作用的服务器都在googleappengine之外,这就是为什么我必须开发Redis。在
  2. 当前的处理方式为我提供了处理并行条目的灵活性:在5)中,Uploads.next()方法包含一个limit参数,允许我并行搜索n进程。50可以是1,50。在
  3. 我不能直接将pre-processing任务中的所有行直接添加到Redis,因为在这种情况下,下一个客户将不得不等待第一个文件完成处理,这将花费太长时间

但是这个系统有各种各样的问题,这就是为什么我要求助于你:

  1. 有时候,这个系统太快了,数据存储还没有正确更新,当调用Uploads.next()时,返回的条目已经被处理了(只是entry.is_treating = True还没有被推送到数据库)
  2. Redis或者我的服务器(我不知道)有时候会丢失任务,或者在处理之后没有发出POST请求,所以任务永远不会转到is_done = True。这就是为什么我必须实现一个Healcheck系统,以确保不管发生什么情况,都能正确地处理这条线。这有双重好处:任务的名称包含csv ID和行。使每个文件唯一。如果数据存储不是最新的并且同一个任务运行了两次,那么healthcheck的创建将失败,因为相同的名称已经存在,让我知道存在并发问题,所以我忽略该任务,因为这意味着数据存储还不是最新的。在

我最初考虑过在一个独立的进程中逐行运行该文件,但这有一个很大的缺点,即不能并行运行多行。此外,Google将一个任务的运行时间限制为24小时目标(不是默认值),当文件非常大时,它可以运行超过24小时

对于信息,如果有用的话,我使用的是Python


为了简化工作流程,以下是我试图以最佳方式实现的目标:

  • 处理一个大文件,运行多个并行进程,每行一个。在
  • 使用Redis将工作发送到外部服务器。完成后,外部服务器通过POST请求将结果返回给主服务器
  • 然后,主服务器更新关于该行的信息,并转到下一行

如果有人能有更好的方法来做这件事,我会非常感激的。我真的相信我不是第一个做这种工作的人,我很肯定我做得不对。在

(我认为Stackoverflow是堆栈交换中发布此类问题的最佳部分,因为这是一个算法问题,但也有可能我没有看到一个更好的网络。如果是这样的话,我很抱歉)。在


Tags: 文件csv数据服务器名称redis信息客户
1条回答
网友
1楼 · 发布于 2024-04-26 13:07:25

The servers that does the real work are outside of Google AppEngine

您考虑过使用Google Cloud Dataflow来处理大文件吗? 它是一个托管服务,将为您处理文件拆分和处理。在

根据最初的想法,这里有一个概要过程:

  • 用户使用signed urls或blobstore API将文件直接上载到google云存储
  • 来自AppEngine的请求启动一个小型计算引擎实例,该实例启动一个阻塞请求(BlockingDataflowPipelineRunner)以启动数据流任务。(由于沙盒和阻塞I/O问题,恐怕它需要一个计算实例)。在
  • 当数据流任务完成时,计算引擎实例被解除阻塞,并在pubsub中发布一条消息。在
  • pubsub消息调用AppEngine服务上的webhook,该服务将任务状态从“进行中”更改为“完成”,以便用户可以获取结果。在

相关问题 更多 >