我应该学习/使用MapReduce还是其他并行化方法来完成这个任务?

11 投票
3 回答
1148 浏览
提问于 2025-04-16 07:24

我和我在谷歌的朋友聊过后,想要实现一种工作/任务模型来更新我的数据集。

这个数据集是一个第三方服务的数据的镜像,所以为了更新数据,我需要多次远程调用他们的API。我觉得大部分时间都会花在等待这个第三方服务的响应上。我想通过并行处理这些请求,加快速度,同时更好地利用我的计算时间,这样可以同时保持多个请求处于打开状态,等待各自的响应。

在我解释我的具体数据集和问题之前,我想先明确我想要的答案:

  1. 这个流程适合用MapReduce来并行处理吗?
  2. 如果适合,在亚马逊的MapReduce模块上运行是否划算?因为它按小时收费,完成工作后会向上取整小时数。我不太确定什么算作一个“工作”,所以不太清楚我会怎么收费。
  3. 如果不适合,有没有其他系统或模式我应该使用?并且有没有什么库可以帮助我在Python中实现这个(在AWS上,使用EC2 + EBS)?
  4. 你觉得我设计的这个工作流程有什么问题吗?

好了,现在进入细节:

这个数据集包含了用户的收藏项目和关注的其他用户。目标是能够更新每个用户的队列——也就是用户在加载页面时会看到的项目列表,这个列表是基于她关注的用户的收藏项目。但在我处理数据并更新用户的队列之前,我需要确保我有最新的数据,这就是API调用的作用。

我可以进行两个调用:

  • 获取关注的用户——返回被请求用户关注的所有用户,
  • 获取收藏项目——返回被请求用户的所有收藏项目。

在我为正在更新的用户调用获取关注的用户后,我需要更新每个被关注用户的收藏项目。只有当所有被关注用户的收藏项目都返回后,我才能开始处理那个原始用户的队列。这个流程看起来是这样的:

更新用户X的队列

这个流程中的工作包括:

  • 开始更新用户的队列——启动这个过程,获取被更新用户关注的用户,存储这些用户,然后为每个用户创建获取收藏的任务。
  • 获取用户的收藏——请求并存储指定用户的收藏列表,从第三方服务获取。
  • 计算用户的新队列——在所有数据都获取后,处理新的队列,然后将结果存储在应用层使用的缓存中。

所以,我的问题再次是:

  1. 这个流程适合用MapReduce来并行处理吗?我不确定它是否允许我为用户X启动这个过程,获取所有相关数据,然后在所有数据都处理完后再处理用户X的队列。
  2. 如果适合,在亚马逊的MapReduce模块上运行是否划算?如果我使用他们的模块,是否有关于我可以等待的开放API请求的“线程”数量的限制?
  3. 如果不适合,有没有其他系统或模式我应该使用?并且有没有什么库可以帮助我在Python中实现这个(在AWS上,使用EC2 + EBS)?
  4. 你觉得我设计的这个工作流程有什么问题吗?

谢谢你们的阅读,我期待和大家讨论。

编辑,回应JimR:

感谢你的详细回复。在我写下原始问题后,我开始倾向于不使用MapReduce。我还没有完全决定如何构建这个,但我开始觉得MapReduce更适合分布式/并行计算负载,而我只是想并行化HTTP请求。

我原本的“归约”任务,即处理所有获取的数据并将其转化为结果,并不是那么计算密集。我很确定这最终会变成每个用户执行一两秒的大SQL查询。

所以,我现在倾向于:

  • 一个非MapReduce的工作/任务模型,使用Python编写。我的一个谷歌朋友推荐我学习Python,因为它开销小且扩展性好。
  • 使用亚马逊EC2作为计算层。我想这意味着我还需要一个EBS切片来存储我的数据库。
  • 可能使用亚马逊的简单消息队列。听起来这个亚马逊的工具是为了跟踪任务队列,将结果从一个任务转移到另一个任务的输入,并优雅地处理失败的任务。它非常便宜。可能值得实现,而不是自定义一个任务队列系统。

3 个回答

0

我正在处理一个类似的问题,需要找到解决办法。我也在研究MapReduce,并使用亚马逊的弹性MapReduce服务。

我很相信MapReduce能解决这个问题。现在让我卡住的地方是实现部分,因为我不确定我的归约器(reducer)是否真的需要做什么。

我会根据我对你(和我自己)问题的理解来回答你的问题,希望能对你有所帮助。

  1. 是的,我认为这很合适。你可以考虑利用弹性MapReduce服务的多步骤选项。你可以用一个步骤来获取用户关注的人,然后用另一个步骤来为这些关注者编制一个曲目列表,第二个步骤的归约器可能就是用来建立缓存的。

  2. 这取决于你的数据集有多大,以及你会多频繁地运行它。如果不知道数据集有多大(或者将来会变得多大),很难说这样做是否划算。最开始,这可能会非常划算,因为你不需要管理自己的Hadoop集群,也不需要支付EC2实例的费用(假设你使用的是这些),让它们一直在线。一旦你开始长时间处理这些数据,使用亚马逊的MapReduce服务可能就不太划算了,因为你会一直有节点在线。

一个作业基本上就是你的MapReduce任务。它可以由多个步骤组成(每个MapReduce任务就是一个步骤)。一旦你的数据处理完成,所有步骤都结束了,你的作业就完成了。所以你实际上是在为Hadoop集群中每个节点的CPU时间付费。也就是说,费用是T*n,其中T是处理数据所需的时间(以小时计),n是你告诉亚马逊启动的节点数量。

希望这能帮到你,祝你好运。我很想知道你最终是如何实现你的映射器(Mappers)和归约器(Reducers)的,因为我正在解决一个非常相似的问题,我也不确定我的方法是否真的是最好的。

5

你描述的工作可能适合用队列,或者队列和工作服务器的组合。其实,它也可以作为一系列的MapReduce步骤来实现。

如果你想用工作服务器,我推荐看看Gearman。虽然它的文档不是特别好,但演示资料做得很不错,而且Python模块也比较容易理解。

简单来说,你在工作服务器上创建一些函数,然后客户端通过API来调用这些函数。函数可以同步调用,也可以异步调用。在你的例子中,你可能想要异步添加“开始更新”的任务。这个任务会处理一些准备工作,然后异步调用“获取关注用户”的任务。这个任务会获取用户信息,然后再调用“更新关注用户”的任务。这个任务会一次性提交所有“获取用户A的收藏”和朋友的任务,并同步等待所有任务的结果。当所有任务都完成后,它会调用“计算新队列”的任务。

这种仅使用工作服务器的方法一开始可能不太稳定,因为确保你能正确处理错误、服务器宕机和数据持久化会比较麻烦。

对于队列,SQS是个明显的选择。它非常稳定,从EC2访问速度也很快,而且价格便宜。对于刚入门的人来说,设置和维护比其他队列简单多了。

基本上,你会把消息放到队列里,就像上面提交任务给工作服务器一样,不过你可能不会做任何同步操作。你会异步调用“获取用户A的收藏”等等,然后有一条消息来检查这些任务是否都完成。你需要某种持久化存储(比如你熟悉的SQL数据库,或者如果你想完全使用AWS,可以用亚马逊的SimpleDB)来跟踪工作是否完成——在SQS中你无法检查任务的进度(虽然在其他队列中可以)。那条检查是否完成的消息会进行检查——如果任务没有全部完成,就不做任何事情,然后这条消息会在几分钟后重试(根据visibility_timeout)。否则,你可以把下一条消息放到队列里。

这种仅使用队列的方法应该是稳定的,只要你不小心消费了队列消息而没有完成工作。用SQS犯这样的错误是比较难的——你真的得努力去做。不要使用自动消费的队列或协议——如果出错了,你可能无法确保把替代消息放回队列。

在这种情况下,队列和工作服务器的组合可能会很有用。你可以不需要持久化存储来检查任务进度——工作服务器会让你跟踪任务的进展。你的“获取用户收藏”的消息可以把所有“获取用户A/B/C的收藏”任务放到工作服务器里。然后,把一条“检查所有收藏获取是否完成”的消息放到队列里,附上需要完成的任务列表(以及足够的信息来重启任何神秘消失的任务)。

额外加分:

把这个做成MapReduce应该相对简单。

你第一个任务的输入是所有用户的列表。映射步骤会处理每个用户,获取他们关注的用户,并输出每个用户及其关注用户的行:

"UserX" "UserA"
"UserX" "UserB"
"UserX" "UserC"

一个身份归约步骤不会改变这个结果。这将形成第二个任务的输入。第二个任务的映射步骤会获取每行的收藏(你可能想用memcached来防止通过API获取用户X/用户A组合和用户Y/用户A的收藏),并为每个收藏输出一行:

"UserX" "UserA" "Favourite1"
"UserX" "UserA" "Favourite2"
"UserX" "UserA" "Favourite3"
"UserX" "UserB" "Favourite4"

这个任务的归约步骤会将其转换为:

 "UserX" [("UserA", "Favourite1"), ("UserA", "Favourite2"), ("UserA", "Favourite3"), ("UserB", "Favourite4")]

此时,你可能会有另一个MapReduce任务来更新数据库中每个用户的这些值,或者你可以使用一些与Hadoop相关的工具,比如Pig、Hive和HBase来管理你的数据库。

我建议使用Cloudera的Hadoop发行版的ec2管理命令来创建和拆除你的Hadoop集群(他们的AMI上已经设置好了Python),并使用像Dumbo(在PyPI上)这样的工具来创建你的MapReduce任务,因为它允许你在本地/开发机器上测试你的MapReduce任务,而不需要访问Hadoop。

祝你好运!

1

看起来我们决定使用 Node.jsSeq 这个流程控制库。把我的流程图转成代码的框架非常简单,现在只需要把代码补充完整,连接到正确的API上就行了。

谢谢大家的回答,真的帮我找到了我想要的解决方案。

撰写回答