MapReduce: Simplified Data Processing on Large Clusters

摘要

MapReduce是用于大规模数据的新的编程模型,map对KV数据进行处理,生成中间KV结果,reduce对中间结果按照KEY进行分组操作。

简介

随着互联网的发展,数据量出现了迸发式的增长,如何对如此大规模的数据进行分布式存储?如何基于这些数据进行并行计算?如何处理计算过程中的故障问题?

为了解决基于大数据量、普通PC机器的大规模计算问题,可以将所有的计算单元抽象为map和reduce连个原子操作的集合。通过用户自定义的map和reduce的函数模型,我们可以实现执行单元的并发执行,以及良好的容错机制。

编程模型

计算的操作对象都是KV模型,用户只需要编写map和reduce函数即可实现编程。

示例

  • map函数
1
2
3
4
5
map(String key, String value):
// key: document name
// value: document contents
for each word w in value:
EmitIntermediate(w, "1");
  • reduce函数
1
2
3
4
5
6
7
reduce(String key, Iterator values):
// key: a word
// values: a list of counts
int result = 0;
for each v in values:
result += ParseInt(v);
Emit(AsString(result));

实现

执行流程

执行流程

  1. 讲输入进行分片,分片的基本单位是64MB
  2. 程序fork出一个master和几个worker,master分配任务给worker,master挑选空闲的worker,分配map或者reduce任务进行执行
  3. 执行map的worker读取分片的输入,应用用户的map程序,生成的中间KV结果集缓存在内存中。
  4. 内存中的中间结果定期的按照HASH KEY的分区写到本地磁盘。
  5. master唤醒reduce的worker,通过RPC读取map相应位置的中间结果;reduce会首先对中间结果进行排序操作,如果内存不足,会导致外排。
  6. reduce worker遍历中间结果,应用用户的reduce函数。
  7. 所有的map和reduce tasks执行结束,master将控制权交给用户程序。

Master数据结构

master管理map和reduce task的状态,以及worker machine的状态。

master将map产生的中间结果的位置,传递给reduce worker。

容错

worker failure

master定期ping各个worker机器,如果在规定时间内不返回的话,认为此机器宕机了。

  • 任何执行完成的map task会重置为idle状态,被重新执行。
  • 任何在执行中的map和reduce task会被重置为idle状态。
  • 已经执行完成的reduce task不会被重新执行。

master failure

可以通过周期性的做检查点,来恢复master故障,但是考虑到master只有一个,很少出现故障,因此如果master宕机,这里选择放弃当前任务的策略。

出错的处理

如果用户的map和reduce函数是确定性函数,则其结果集也是确定的。

不会存在map的中间结果被重复执行的情况。master会忽略相同的map输出结果。

也不会存在多个相同的reduce操作同时进行,因为reduce的结果会先生成临时文件,然后执行rename操作,rename操作是原子的,如果多个相同的reduce执行rename操作,只有一个会成功。依赖的GFS底层的原子rename操作。

本地计算

带宽是mapreduce集群非常重要的一个因素,尽量讲map下放到和其分片数据在一台机器上的worker。尽量减少数据在网络的传输。GFS中的数据存在3个副本,master会尽可能利用副本所在的机器分配worker。

任务粒度

已经将map分为M片,reduce分为R片。应该让M和R远远大于worker机器,这样每个worker可以分配更多的任务,任务粒度更低,任务调度更加准确,更有利于集群机器的负载均衡。

备份任务

很多作业的最终任务集中在几个大的reduce上,而这几个大的reduce可能因为机器原因往往导致执行特别慢。

我们提供备份任务的机制,当MR任务进入最后的快要结束状态,master备份各个task的状态,并利用其它worker去执行未完成的task,这样,不管是主的任务还是备份任务,只要有一个完成,整个任务就算结束了。

经过我们的实际统计的数据,可以知道,关闭backup策略后,执行时间普遍增加44%.

优化

分区函数

默认的分区函数是hash(key)的方式,一般来说,数据会比较均匀,当然也存在特殊情况。MR库提供特殊的分区函数,如hash(Hostname(urlkey))

中间结果排序

保证每个partiton的key按照升序排列,有利于后续需要排序的操作。

Combiner函数

允许每个partition进行提前合并,在map之后,在reduce之前,比如单词统计中,(“the”, 1)肯定有很多,可以先统计成(“the”, N), 再传递给reduce

副作用

尽量减少手动生成临时文件,因为无法保证多个文件的原子性提交。

跳过错误记录

由于程序或者数据问题,引起部分task异常的时候,可以选择性的忽略对这些数据的操作。

本地执行

由于分布式系统的不确定性,提供了本地执行的方式,所有的任务在一台机器上顺序执行,并提供了类似gdb的工具进行调试。

个人观点

mapreduce开启了一种新的对于打数据量进行分析的编程方法,但是存在几点不足。

  1. 对同一份数据的不同分析,需要写不同的map和reduce任务
  2. 场景基本是离线分析,不适用于用户交互场景

所以目前出现了各种SQL On Hadoop的技术,SOH技术可以解决第一个问题, 将SQL转化为对应的MR job。
但是对于第二个问题,需要彻底颠覆目前的计算框架以及底层存储结构GFS。

这也是目前Google F1出现的原因吧, HATP可能才是正确的方向。

本文标题:MapReduce: Simplified Data Processing on Large Clusters

文章作者:Louis

发布时间:2017年09月18日 - 15:09

最后更新:2017年09月25日 - 14:09

原始链接:/2017/09/18/mapreduce/

许可协议: Louis-非商业性使用-禁止演绎 4.0 国际 转载请保留原文链接及作者。