MapReduce论⽂中⽂翻译
译者: alex
摘要
MapReduce是⼀个编程模型,也是⼀个处理和⽣成超⼤数据集的算法模型的相关实现。⽤户⾸先创建⼀个Map函数处理⼀个基于
key/value pair的数据集合,输出中间的基于key/value pair的数据集合;然后再创建⼀个Reduce函数⽤来合并所有的具有相同中间key值的中间value值。现实世界中有很多满⾜上述处理模型的例⼦,本论⽂将详细描述这个模型。
MapReduce架构的程序能够在⼤量的普通配置的计算机上实现并⾏化处理。这个系统在运⾏时只关⼼:如何分割输⼊数据,在⼤量计算机组成的集上的调度,集中计算机的错误处理,管理集中计算机之间必要的通信。采⽤MapReduce架构可以使那些没有并⾏计算和分布式处理系统开发经验的程序员有效利⽤分布式系统的丰富资源。
我们的MapReduce实现运⾏在规模可以灵活调整的由普通机器组成的集上:⼀个典型的MapReduce计算往往由⼏千台机器组成、处理以TB计算的数据。程序员发现这个系统⾮常好⽤:已经实现了数以百
计的MapReduce程序,在Google的集上,每天都有1000多个MapReduce程序在执⾏。
介绍
在过去的5年⾥,包括本⽂作者在内的Google的很多程序员,为了处理海量的原始数据,已经实现了数以百计的、专⽤的计算⽅法。这些计算⽅法⽤来处理⼤量的原始数据,⽐如,⽂档抓取(类似⽹络爬⾍的程序)、Web请求⽇志等等;也为了计算处理各种类型的衍⽣数据,⽐如倒排索引、Web⽂档的图结构的各种表⽰形势、每台主机上⽹络爬⾍抓取的页⾯数量的汇总、每天被请求的最多的查询的集合等等。⼤多数这样的数据处理运算在概念上很容易理解。然⽽由于输⼊的数据量巨⼤,因此要想在可接受的时间内完成运算,只有将这些计算分布在成百上千的主机上。如何处理并⾏计算、如何分发数据、如何处理错误?所有这些问题综合在⼀起,需要⼤量的代码处理,因此也使得原本简单的运算变得难以处理。
为了解决上述复杂的问题,我们设计⼀个新的抽象模型,使⽤这个抽象模型,我们只要表述我们想要执⾏的简单运算即可,⽽不必关⼼并⾏计算、容错、数据分布、负载均衡等复杂的细节,这些问题都被封装在了⼀个库⾥⾯。设计这个抽象模型的灵感来⾃Lisp和许多其他函数式语⾔的Map和Reduce的原语。我们意识到我们⼤多数的运算都包含这样的操作:在输⼊数据的“逻辑”记录上应⽤Map操作得出⼀个中间key/value pair集合,然后在所有具有相同key值的value值上应⽤Reduce操作,从⽽达到合并
中间的数据,得到⼀个想要的结果的⽬的。使⽤MapReduce模型,再结合⽤户实现的Map和Reduce函数,我们就可以⾮常容易的实现⼤规模并⾏化计算;通过MapReduce模型⾃带的“再次执⾏”(re-execution)功能,也提供了初级的容灾实现⽅案。
这个⼯作(实现⼀个MapReduce框架模型)的主要贡献是通过简单的接⼝来实现⾃动的并⾏化和⼤规模的分布式计算,通过使⽤MapReduce模型接⼝实现在⼤量普通的PC机上⾼性能计算。
第⼆部分描述基本的编程模型和⼀些使⽤案例。第三部分描述了⼀个经过裁剪的、适合我们的基于集的计算环境的MapReduce实现。第四部分描述我们认为在MapReduce编程模型中⼀些实⽤的技巧。第五部分对于各种不同的任务,测量我们MapReduce实现的性能。第六部分揭⽰了在Google内部如何使⽤MapReduce作为基础重写我们的索引系统产品,包括其它⼀些使⽤MapReduce的经验。第七部分讨论相关的和未来的⼯作。
编程模型
MapReduce编程模型的原理是:利⽤⼀个输⼊key/value pair集合来产⽣⼀个输出的key/value pair集合。MapReduce库的⽤户⽤两个函数表达这个计算:Map和Reduce。
⽤户⾃定义的Map函数接受⼀个输⼊的key/value pair值,然后产⽣⼀个中间key/value pair值的集合。MapReduce库把所有具有相同中间key值I的中间value值集合在⼀起后传递给reduce函数。
⽤户⾃定义的Reduce函数接受⼀个中间key的值I和相关的⼀个value值的集合。Reduce函数合并这些value值,形成⼀个较⼩的value值的集合。⼀般的,每次Reduce函数调⽤只产⽣0或1个输出value值。通常我们通过⼀个迭代器把中间value值提供给Reduce函数,这样我们就可以处理⽆法全部放⼊内存中的⼤量的value值的集合。
例⼦
例如,计算⼀个⼤的⽂档集合中每个单词出现的次数,下⾯是伪代码段:
map(String key, String value):
// key: document name
// value: document contents
for each word w in value:
EmitIntermediate(w, “1″);
reduce(String key, Iterator values):
/
/ key: a word
// values: a list of counts
int result = 0;
for each v in values:
result += ParseInt(v);
Emit(AsString(result));
Map函数输出⽂档中的每个词、以及这个词的出现次数(在这个简单的例⼦⾥就是1)。Reduce函数把Map函数产⽣的每⼀个特定的词的计数累加起来。
另外,⽤户编写代码,使⽤输⼊和输出⽂件的名字、可选的调节参数来完成⼀个符合MapReduce模型规范的对象,然后调⽤MapReduce 函数,并把这个规范对象传递给它。⽤户的代码和MapReduce库链接在⼀起(⽤C++实现)。附录A包含了这个实例的全部程序代码。
类型
尽管在前⾯例⼦的伪代码中使⽤了以字符串表⽰的输⼊输出值,但是在概念上,⽤户定义的Map和Reduce函数都有相关联的类型:
map(k1,v1) ->list(k2,v2)
reduce(k2,list(v2)) ->list(v2)
⽐如,输⼊的key和value值与输出的key和value值在类型上推导的域不同。此外,中间key和value值与输出key和value值在类型上推导的域相同。
(alex注:原⽂中这个domain的含义不是很清楚,我参考Hadoop、KFS等实现,map和reduce都使⽤了泛型,因此,我把domain翻译成类型推导的域)。
我们的C++中使⽤字符串类型作为⽤户⾃定义函数的输⼊输出,⽤户在⾃⼰的代码中对字符串进⾏适当的类型转换。
更多的例⼦
这⾥还有⼀些有趣的简单例⼦,可以很容易的使⽤MapReduce模型来表⽰:
分布式的Grep:Map函数输出匹配某个模式的⼀⾏,Reduce函数是⼀个恒等函数,即把中间数据复制到输出。
计算URL访问频率:Map函数处理⽇志中web页⾯请求的记录,然后输出(URL,1)。Reduce函数把相同URL的value值都累加起来,产⽣(URL,记录总数)结果。
倒转⽹络链接图:Map函数在源页⾯(source)中搜索所有的链接⽬标(target)并输出为(target,source)。Reduce函数把给定链接⽬标(target)的链接组合成⼀个列表,输出(target,list(source))。
每个主机的检索词向量:检索词向量⽤⼀个(词,频率)列表来概述出现在⽂档或⽂档集中的最重要的⼀些词。Map函数为每⼀个输⼊⽂档输出(主机名,检索词向量),其中主机名来⾃⽂档的URL。Reduce函数接收给定主机的所有⽂档的检索词向量,并把这些检索词向量加在⼀起,丢弃掉低频的检索词,输出⼀个最终的(主机名,检索词向量)。
倒排索引:Map函数分析每个⽂档输出⼀个(词,⽂档号)的列表,Reduce函数的输⼊是⼀个给定词的所有(词,⽂档号),排序所有的⽂档号,输出(词,list(⽂档号))。所有的输出集合形成⼀个简单的倒排索引,它以⼀种简单的算法跟踪词在⽂档中的位置。
分布式排序:Map函数从每个记录提取key,输出(key,record)。Reduce函数不改变任何的值。这个运算依赖分区机制(在4.1描述)和排序属性(在4.2描述)。
实现
MapReduce模型可以有多种不同的实现⽅式。如何正确选择取决于具体的环境。例如,⼀种实现⽅式适⽤于⼩型的共享内存⽅式的机器,另外⼀种实现⽅式则适⽤于⼤型NUMA架构的多处理器的主机,⽽有的实现⽅式更适合⼤型的⽹络连接集。
本章节描述⼀个适⽤于Google内部⼴泛使⽤的运算环境的实现:⽤以太⽹交换机连接、由普通PC机组成的⼤型集。在我们的环境⾥包括:
x86架构、运⾏Linux操作系统、双处理器、2-4GB内存的机器。
普通的⽹络硬件设备,每个机器的带宽为百兆或者千兆,但是远⼩于⽹络的平均带宽的⼀半。 (alex注:这⾥需要⽹络专家解释⼀下了)
集中包含成百上千的机器,因此,机器故障是常态。
存储为廉价的内置IDE硬盘。⼀个内部分布式⽂件系统⽤来管理存储在这些磁盘上的数据。⽂件系统通过数据复制来在不可靠的硬件上保证数据的可靠性和有效性。
⽤户提交⼯作(job)给调度系统。每个⼯作(job)都包含⼀系列的任务(task),调度系统将这些任务调度到集中多台可⽤的机器上。
执⾏概括
通过将Map调⽤的输⼊数据⾃动分割为M个数据⽚段的集合,Map调⽤被分布到多台机器上执⾏。输⼊的数据⽚段能够在不同的机器上并⾏处理。使⽤分区函数将Map调⽤产⽣的中间key值分成R个不同分区(例如,hash(key) mod R),Reduce调⽤也被分布到多台机器上执⾏。分区数量(R)和分区函数由⽤户来指定。
图1展⽰了我们的MapReduce实现中操作的全部流程。当⽤户调⽤MapReduce函数时,将发⽣下⾯的⼀系列动作(下⾯的序号和图1中的序号⼀⼀对应):
⽤户程序⾸先调⽤的MapReduce库将输⼊⽂件分成M个数据⽚度,每个数据⽚段的⼤⼩⼀般从 16MB到64MB(可以通过可选的参数来控制每个数据⽚段的⼤⼩)。然后⽤户程序在机中创建⼤量的程序副本。 (alex:copies of the program还真难翻译)
这些程序副本中的有⼀个特殊的程序–master。副本中其它的程序都是worker程序,由master分配任务。有M个Map任务和R个Reduce任务将被分配,master将⼀个Map任务或Reduce任务分配给⼀个空闲的worker。
被分配了map任务的worker程序读取相关的输⼊数据⽚段,从输⼊的数据⽚段中解析出key/value pair,然后把key/value pair传递给⽤户⾃定义的Map函数,由Map函数⽣成并输出的中间key/value pair,并缓存在内存中。
缓存中的key/value pair通过分区函数分成R个区域,之后周期性的写⼊到本地磁盘上。缓存的key/value pair在本地磁盘上的存储位置将被回传给master,由master负责把这些存储位置再传送给Reduce worker。
当Reduce worker程序接收到master程序发来的数据存储位置信息后,使⽤RPC从Map worker所在主机的磁盘上读取这些缓存数据。当Reduce worker读取了所有的中间数据后,通过对key进⾏排序后使得具有相同key值的数据聚合在⼀起。由于许多不同的key 值会映射到相同的Reduce任务上,因此必须进⾏排序。如果中间数据太⼤⽆法在内存中完成排序,那么就要在外部进⾏排序。
Reduce worker程序遍历排序后的中间数据,对于每⼀个唯⼀的中间key值,Reduce worker程序将这个key值和它相关的中间value 值的集合传递给⽤户⾃定义的Reduce函数。Reduce函数的输出被追加到所属分区的输出⽂件。
当所有的Map和Reduce任务都完成之后,master唤醒⽤户程序。在这个时候,在⽤户程序⾥的对MapReduce调⽤才返回。
在成功完成任务之后,MapReduce的输出存放在R个输出⽂件中(对应每个Reduce任务产⽣⼀个输出⽂件,⽂件名由⽤户指定)。⼀般情况下,⽤户不需要将这R个输出⽂件合并成⼀个⽂件–他们经常把这些⽂件作为另外⼀个MapReduce的输⼊,或者在另外⼀个可以处理多个分割⽂件的分布式应⽤中使⽤。
Master数据结构
Master持有⼀些数据结构,它存储每⼀个Map和Reduce任务的状态(空闲、⼯作中或完成),以及Worker机器(⾮空闲任务的机器)的标识。
Master就像⼀个数据管道,中间⽂件存储区域的位置信息通过这个管道从Map传递到Reduce。因此,对于每个已经完成的Map任
务,master存储了Map任务产⽣的R个中间⽂件存储区域的⼤⼩和位置。当Map任务完成时,Master接收到位置和⼤⼩的更新信息,这些信息被逐步递增的推送给那些正在⼯作的Reduce任务。
容错
因为MapReduce库的设计初衷是使⽤由成百上千的机器组成的集来处理超⼤规模的数据,所以,这个库必须要能很好的处理机器故障。
worker故障
master周期性的ping每个worker。如果在⼀个约定的时间范围内没有收到worker返回的信息,master将把这个worker标记为失效。所有由这个失效的worker完成的Map任务被重设为初始的空闲状态,之后这些任务就可以被安排给其他的worker。同样的,worker失效时正在运⾏的Map或Reduce任务也将被重新置为空闲状态,等待重新调度。
当worker故障时,由于已经完成的Map任务的输出存储在这台机器上,Map任务的输出已不可访问了,因此必须重新执⾏。⽽已经完成的Reduce任务的输出存储在全局⽂件系统上,因此不需要再次执⾏。
网络连接被重设当⼀个Map任务⾸先被worker A执⾏,之后由于worker A失效了⼜被调度到worker B执⾏,这个“重新执⾏”的动作会被通知给所有执⾏Reduce任务的worker。任何还没有从worker A读取数据的Reduce任务将从worker B读取数据。
MapReduce可以处理⼤规模worker失效的情况。⽐如,在⼀个MapReduce操作执⾏期间,在正在运⾏的集上进⾏⽹络维护引起80台机器在⼏分钟内不可访问了,MapReduce master只需要简单的再次执⾏那些不可访问的worker完成的⼯作,之后继续执⾏未完成的任务,直到最终完成这个MapReduce操作。
master失败
⼀个简单的解决办法是让master周期性的将上⾯描述的数据结构(alex注:指3.2节)的写⼊磁盘,即检查点(checkpoint)。如果这个master任务失效了,可以从最后⼀个检查点(checkpoint)开始启动另⼀个master进程。然⽽,由于只有⼀个master进程,master失效后再恢复是⽐较⿇烦的,因此我们现在的实现是如果master失效,就中⽌MapReduce运算。客户可以检查到这个状态,并且可以根据需
要重新执⾏MapReduce操作。
在失效⽅⾯的处理机制
(alex注:原⽂为”semantics in the presence of failures”)
当⽤户提供的Map和Reduce操作是输⼊确定性函数(即相同的输⼊产⽣相同的输出)时,我们的分布式实现在任何情况下的输出都和所有程序没有出现任何错误、顺序的执⾏产⽣的输出是⼀样的。
我们依赖对Map和Reduce任务的输出是原⼦提交的来完成这个特性。每个⼯作中的任务把它的输出写到私有的临时⽂件中。每个Reduce 任务⽣成⼀个这样的⽂件,⽽每个Map任务则⽣成R个这样的⽂件(⼀个Reduce任务对应⼀个⽂件)。当⼀个Map任务完成的时,worker 发送⼀个包含R个临时⽂件名的完成消息给master。如果master从⼀个已经完成的Map任务再次接收到到⼀个完成消息,master将忽略这个消息;否则,master将这R个⽂件的名字记录在数据结构⾥。
当Reduce任务完成时,Reduce worker进程以原⼦的⽅式把临时⽂件重命名为最终的输出⽂件。如果同⼀个Reduce任务在多台机器上执⾏,针对同⼀个最终的输出⽂件将有多个重命名操作执⾏。我们依赖底层⽂件系统提供的重命名操作的原⼦性来保证最终的⽂件系统状态仅仅包含⼀个Reduce任务产⽣的数据。
使⽤MapReduce模型的程序员可以很容易的理解他们程序的⾏为,因为我们绝⼤多数的Map和Reduce操作是确定性的,⽽且存在这样的⼀个事实:我们的失效处理机制等价于⼀个顺序的执⾏的操作。当Map或/和Reduce操作是不确定性的时候,我们提供虽然较弱但是依然合理的处理机制。当使⽤⾮确定操作的时候,⼀个Reduce任务R1的输出等价于⼀个⾮确定性程序顺序执⾏产⽣时的输出。但是,另⼀个Reduce任务R2的输出也许符合⼀个不同的⾮确定顺序程序执⾏产⽣的R2的输出。
考虑Map任务M和Reduce任务R1、R2的情况。我们设定e(Ri)是Ri已经提交的执⾏过程(有且仅有⼀个这样的执⾏过程)。当e(R1)读取了由M⼀次执⾏产⽣的输出,⽽e(R2)读取了由M的另⼀次执⾏产⽣的输出,导致了较弱的失效处理。
存储位置
在我们的计算运⾏环境中,⽹络带宽是⼀个相当匮乏的资源。我们通过尽量把输⼊数据(由GFS管理)存储在集中机器的本地磁盘上来节省⽹络带宽。GFS把每个⽂件按64MB⼀个Block分隔,每个Block保存在多台机器上,环境中就存放了多份拷贝(⼀般是3个拷贝)。MapReduce的master在调度Map任务时会考虑输⼊⽂件的位置信息,尽量将⼀个Map任务调度在包含相关输⼊数据拷贝的机器上执⾏;如果上述努⼒失败了,master将尝试在保存有输⼊数据拷贝的机器附近的机器上执⾏Map任务(例如,分配到⼀个和包含输⼊数据的机器在⼀个switch⾥的worker机器上执⾏)。当在⼀个⾜够⼤的cluster集
上运⾏⼤型MapReduce操作的时候,⼤部分的输⼊数据都能从本地机器读取,因此消耗⾮常少的⽹络带宽。
任务粒度
如前所述,我们把Map拆分成了M个⽚段、把Reduce拆分成R个⽚段执⾏。理想情况下,M和R应当⽐集中worker的机器数量要多得多。在每台worker机器都执⾏⼤量的不同任务能够提⾼集的动态的负载均衡能⼒,并且能够加快故障恢复的速度:失效机器上执⾏的⼤量Map任务都可以分布到所有其他的worker机器上去执⾏。
但是实际上,在我们的具体实现中对M和R的取值都有⼀定的客观限制,因为master必须执⾏O(M+R)次调度,并且在内存中保存O(M*R)个状态(对影响内存使⽤的因素还是⽐较⼩的:O(M*R)块状态,⼤概每对Map任务/Reduce任务1个字节就可以了)。
更进⼀步,R值通常是由⽤户指定的,因为每个Reduce任务最终都会⽣成⼀个独⽴的输出⽂件。实际使⽤时我们也倾向于选择合适的M 值,以使得每⼀个独⽴任务都是处理⼤约16M到64M的输⼊数据(这样,上⾯描写的输⼊数据本地存储优化策略才最有效),另外,我们把R值设置为我们想使⽤的worker机器数量的⼩的倍数。我们通常会⽤这样的⽐例来执⾏MapReduce:M=200000,R=5000,使⽤2000台worker机器。
备⽤任务
影响⼀个MapReduce的总执⾏时间最通常的因素是“落伍者”:在运算过程中,如果有⼀台机器花了很长的时间才完成最后⼏个Map或Reduce任务,导致MapReduce操作总的执⾏时间超过预期。出现“落伍者”的原因⾮常多。⽐如:如果⼀个机器的硬盘出了问题,在读取的时候要经常的进⾏读取纠错操作,导致读取数据的速度从30M/s降低到1M/s。如果cluster的调度系统在这台机器上⼜调度了其他的任务,由于CPU、内存、本地硬盘和⽹络带宽等竞争因素的存在,导致执⾏MapReduce代码的执⾏效率更加缓慢。我们最近遇到的⼀个问题是由于机器的初始化代码有bug,导致关闭了的处理器的缓存:在这些机器上执⾏任务的性能和正常情况相差上百倍。
我们有⼀个通⽤的机制来减少“落伍者”出现的情况。当⼀个MapReduce操作接近完成的时候,master调度备⽤(backup)任务进程来执⾏剩下的、处于处理中状态(in-progress)的任务。⽆论是最初的执⾏进程、还是备⽤(backup)任务进程完成了任务,我们都把这个任务标记成为已经完成。我们调优了这个机制,通常只会占⽤⽐正常操作多⼏个百分点的计算资源。我们发现采⽤这样的机制对于减少超⼤MapReduce操作的总处理时间效果显著。例如,在5.3节描述的排序任务,在关闭掉备⽤任务的情况下要多花44%的时间完成排序任务。
技巧
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论