全部读了一遍,除了实践以外都做了笔记
摘要
map:用来处理键值对来生成一组中间键值对
reduce:合并同一相同中间键相关联的中间值
介绍
用途举例:倒排索引、web文档的图形化表示等。
操作简单,但是数据量大,需要多台机器并行计算。
问题:如何并行化计算、分发数据、处理故障
使用用户指定的map和reduce操作,用重新执行作为容错的机制
编程模型
map函数:由用户编写,输入一个对,然后输出一组中间键值对。MapReduce库会把所有具有相同key的中间键值对组合起来,然后发给Reduce函数处理。
reduce函数:也是由用户编写。接受中间键 I 和该键的一组值。它将这些值合并在一起以形成可能数量更小的一组值。通常reduce操作只产生0或1个值。中间值通过迭代器提供给用户的 Reduce 函数。这使我们能够处理太大而无法放入内存的值列表。
例子
统计单词词频:
map把所有的单词作为key,value分别设为1
reduce把相同key的value相加,输出结果
用户编写具体的代码实现填充到mapreduce specification对象中,然后把用户的代码和MapReduce库链接在一起实现。
类型
map
和 reduce
函数的输入和输出类型在概念上是不同的,但在 C++ 实现中,通过字符串来统一传递数据。
用户需要在代码中处理字符串和实际数据类型之间的转换。
更多例子
分布式grep(查找某个单词)
如果某一行满足提供的范式,map函数就把它发出。reduce只是把它拷贝一遍。
URL访问频次统计
map函数统计网站请求日志,并输出为<url,1>,reduce把后面的数值加起来
反向web连接图
在resource界面找到的target的url,经过map函数,输出<target,resource>对。reduce把resource整理成list,输出<target,list(resource)>
每个主机的词向量
词向量总结了一个或多个文件中最重要的单词的出现频率,是由<word,frequency>组成的列表。map从输入文件中读入hostname,输出<hostname,term vector>。reduce函数将每一个hostname的term vector合并并输出。
倒排索引
就是说找到单词的位置。map输出<word, 文件id>,reduce把文件id合成一个list
分布式排序
map函数从每一个记录中提取出key,输出<key,record>对,reduce啥也不变。排序取决于后面提到的分区工具和排序属性(稍后来填坑)
实现
MapReduce接口有许多不同的实现。正确的选择取决于环境。这节描述了一个针对Google广泛使用的计算环境的实现:用交换式以太网连接在一起的大型商用PC集群。
执行概述
通过将输入数据自动划分为一组M个部分,map函数调用分布在多台机器上。输入拆分可以由不同的机器并行处理。通过使用分区函数(例如,hash(key) mod R)将中间密钥空间划分成R个部分,来分布Reduce调用。分区数量(R)和分区函数由用户指定。
MapReduce库首先将输入文件分成M段,每段通常16MB-64MB,然后在集群的机器上开启程序的许多副本(图中fork成worker和master)。
其中有一个特殊的副本叫master,其他的都是被master分配工作的worker。有M个映射任务和R个归约任务要分配。master挑选空闲的worker来分配一个map任务或者一个reduce任务
被分配了map的worker读取相应输入分割的内容。它从输入数据中解析出键/值对,并将每一对传递给用户定义的reduce函数。Map函数产生的中间键/值对在内存中缓冲。
缓存的pair被周期性的写到本地磁盘。由分区函数(前面提到的hash取余)划分成R个区域。这些缓冲对在本地磁盘上的位置被传递回master,主设备负责将这些位置转发给reduce工作器。
当mater通知reduce worker这些位置时,它使用rpc从map workers的本地磁盘读取缓冲数据。当reduce worker读取了所有的中间数据后,它会按照中间键对数据进行排序,以便将所有出现的相同键组合在一起。需要进行排序是因为通常许多不同的键映射到同一个reduce任务,为了确保相同的键的所有值被分组在一起,这有助于 Reduce 阶段对这些键进行集中处理。如果中间数据量太大,不适合内存,就使用外部排序(?)。
如果中间数据量非常大,无法完全放入内存中进行排序,这时候就需要使用外部排序(External Sort)。外部排序 是一种用于处理超大规模数据的排序方法,因为这些数据太大而无法一次性加载到内存中。外部排序的典型做法是将数据分块处理,将每个块单独排序后写回磁盘,最后将所有已排序的块合并起来,形成一个最终有序的数据集。在这个过程中,Reduce 节点会将无法容纳在内存中的数据分批次地进行排序和合并,直到所有数据都已经排序完成。这种排序策略对于处理超大规模数据非常有效。
reduce worker在排好序的中间数据上遍历,对于遇到的每个中间键(唯一的,不会重复发!),它会把键和值都发给reduce函数。reduce函数的输出被附加到这个Reduce分区的最终输出文件中。
当所有的map任务和reduce任务完成后,主机唤醒用户程序。此时,用户程序中的MapReduce调用返回到用户代码
最后用户得到R个输出文件,但是不一定会合并,因为会作为下一个输入(可能)
Master的数据结构
master保存的数据结构:对于每个map和reduce任务,master保存其状态(空闲、进行中、完成)和非空闲状态任务的worker的身份。
master是用来讲map任务产生的中间文件传播到reduce任务的通道。所以master存着已完成的map任务的中间文件的位置和大小,如果有新的map任务完成会继续更新,并且用增量的方式推送到正在执行reduce任务的worker。
错误容忍
worker失效(failure)
master间歇性ping worker。如果一段时间worker没回就是挂了。所有分到这个worker的map任务都会重新召回标记为空闲状态,进行重新调度。reduce也是类似的。就算是已完成的map任务,也会重新执行,因为他们的map中间结果在挂了的worker上。完成的reduce任务不需要重新执行,因为他们输出存储在全局文件系统中。
A挂了之后把map分给B时,所有的reduce worker都会收到这次A原本的工作执行的通知(因为不知道这个任务分给了哪个reduce worker,让所有人都知道要换个地方找)。
mapreduce能够容忍大规模的woker挂掉。
master失效
主节点周期性的写master数据结构的checkpoints。master挂了的话就从最后一个checkpoint开启一个副本。但因为只有一个master,不太可能故障。所以如果master故障的话会终止mapreduce。如果需要,客户端可以检查并重试MapReduce操作。
属于是技术上可以做但是没必要
在故障情况下的语义(?)
系统在出现故障时,计算程序所产生的行为和结果的定义。它探讨的是,系统中某些组件(比如某些节点或任务)如果出现故障,整个系统在处理任务时依然要保证的结果的正确性和一致性。
当用户提供的 map 和 reduce 操作是输入值的确定性函数时,mapreduce会生成与不发生故障的顺序执行整个程序时相同的输出。(就是说故障情况下结果依然是一样的)。这是依赖于map和reduce任务输出的原子提交:每个进行中的任务都将其输出写入私有临时文件。reduce任务产生一个这样的文件,map任务产生R个这样的文件(每个reduce任务一个)。当map任务完成时,worker向master发送一条消息,并在消息中包含R个临时文件的名称。如果主服务器收到已完成的map任务的完成消息,它将忽略该消息(可能原来挂了的复活了)。否则,它在主数据结构中记录R文件的名称。
假设我们有两个 Reduce worker,分别是 A 和 B,同时存在一个 Reduce 任务 R。下面是详细的流程描述,包括一个 worker 挂掉的过程。
- 初始分配任务
- 主节点将 Reduce 任务 R 分配给 worker A 执行。
- worker A 开始执行这个 Reduce 任务,并逐步处理数据,将结果写入到一个临时文件 tempA 中。
- 任务执行过程中发生故障
- 假设 worker A 在执行过程中由于某种原因(例如硬件故障、网络断连等)突然挂掉,它无法完成任务。
- 主节点监控着所有任务的状态,发现 worker A 在规定的时间内没有汇报任务进度或完成情况,于是判断 worker A 失效。
- 主节点决定将 Reduce 任务 R 重新分配给另一个健康的 worker B 来执行。
- 新的任务分配给 worker B
- worker B 接到任务后,开始重新执行 Reduce 任务 R,它可能需要从中间数据中读取并计算,生成自己的临时文件 tempB。
- worker B 最终完成了任务,并生成了临时文件 tempB,准备将其重命名为最终的输出文件 final_output。
- worker A 恢复运行
- 在 worker B 正在执行任务时,之前挂掉的 worker A 突然恢复运行了。
- 恢复后的 worker A 会继续之前中断的工作,并试图完成它的 Reduce 任务。因此,worker A 最终也生成了它自己的临时文件 tempA。
- 两个 worker 竞争写入最终输出
- 现在我们有两个不同的 worker(A 和 B),它们都认为自己完成了 Reduce 任务 R,并且分别生成了两个不同的临时文件 tempA 和 tempB。
- worker A 和 worker B 都会尝试将各自的临时文件重命名为最终的输出文件 final_output。
- 原子重命名操作的应用
- 在执行重命名时,底层文件系统的原子重命名操作起到了关键作用。
- 假设 worker B 先尝试重命名它的临时文件 tempB 为 final_output,由于重命名操作是原子的,worker B 成功地将 tempB 重命名为了 final_output。
- 由于文件系统的原子性,这个重命名操作是不可分割的,意味着此时最终的输出文件 final_output 已经确定下来,包含的是 worker B 的结果。
- 当 worker A 尝试重命名它的临时文件 tempA 为 final_output 时,发现这个名称已经存在,因此它的重命名操作会失败。
- 通过这种机制,最终的文件系统中只会保留一次任务的输出,并且最终的输出文件 final_output 中包含的是某一个 Reduce worker 成功执行后的数据(在这个例子中是 worker B 的数据)。
- 因为他们访问的是同一个全局命名空间,并且能够查看和操作相同的文件。
绝大部分情况下map和reduce是确定性的,这样的话程序相当于是并行的。如果map和/或ruduce是非确定性的,提供较弱但仍然合理的语义。在存在非确定性运算符的情况下,特定 Reduce 任务 R1 的输出相当于一个非确定性程序顺序执行产生的 R1 输出。但是,另一个 Reduce 任务 R2 的输出可能对应于非确定性程序的不同顺序执行产生的 R2 输出。(就是说执行顺序也许不同)
在非确定性操作中,多个 reduce 任务可能会读取来自 map 任务的不同执行结果,因此输出的顺序和内容可能不同,但每个 reduce 任务的输出仍然等价于它在顺序执行中可能得到的结果。
本地性
因为网络资源稀缺,输入数据用Google File System(GFS)管理,存储在机器的本地磁盘。GFS把每个文件被分为64MB的块,并将每个块存储多个副本(通常是3个副本)存到不同机器。master会考虑输入文件的位置,并尝试将一个 map 任务调度到包含相应输入数据副本的机器上。如果不行,master会尝试将 map 任务调度到一个靠近该任务输入数据副本的机器上(例如,调度到与包含数据副本的机器处于同一网络交换机上的 worker 机器)。因此,大部分数据读取是在本地。
任务粒度
map和reduce分别分为M和R个任务,都远大于worker的数量。每个worker执行多个不同的任务,这些任务是相互独立的。这样可以更好的负载均衡,并且可以加速worker挂了的时候的会复苏的,因为挂掉worker的任务可以快速分配到其他woker上。
实际上R和M大小有限制,因为master必须做O(M + R)次调度决策,并且需要在内存中保持O(M ∗ R)的状态信息(虽然每一对M和R的状态信息很小,1B左右)。R经常受用户约束(大概是用户指定的意思),所以一般限制M,让每个任务处理的输入数据大约为16 MB到64 MB,R为worker数量的一个小倍数。如果是2000台worker,那么通常设置M = 200,000和R = 5,000
备份任务
mapreduce中常见的延迟原因之一是straggler(拖延者),即一个机器在完成最后几个map或reduce任务时花费异常长的时间。
可能的原因:硬件问题(例如磁盘读写性能低下)、资源竞争(例如机器上有多个任务同时运行,导致资源不足)、甚至是软件故障(例如处理器缓存被禁用,导致性能急剧下降)
解决策略:设计了一套备份任务执行机制。当MapReduce操作接近完成时,master会调度剩余的正在进行的任务的备份执行。只要主任务或备份任务中的任何一个完成,任务就会被标记为已完成。
改良
感觉就是一些可修改的参数。
分区函数(Partition)
分区函数是对mapper产生的中间文件进行划分,分到reducer上面,默认是hash(key) mod R。通常够用了,但是有的用户想自定义,因为其他函数更有意义,比如有时输出键可能是URL,而我们希望相同hostname的所有条目都位于同一个输出文件中。例如,使用“hash(Hostname(urlkey)) mod R”作为分区函数,可以确保来自同一主机的所有URL条目都被划分到同一个输出文件。
顺序保证
中间键值对会按照键的递增顺序进行处理(MapReduce系统会保证在每个Reduce任务中,这些键值对按照键的递增顺序被排序),即reducer的输入递增。对于以下情况有意义:
当输出文件格式需要支持按键进行高效的随机访问查找时。
当输出的用户觉得数据按键排序更方便时。
合并函数
背景:每个map任务产生的中间件有时有显著重复,且reduce时可交换和可结合的。比如单词计数,每个Map任务会产生成百上千条为<the, 1>
。但是对于reduce来说很浪费,因为全是一样的。因此用户可以事先指定一个combiner函数,在从mapper通过网络发送到reducer之前进行部分合并(减小网络开销)。
通常,Combiner函数和Reduce函数使用的是相同的代码,但是combiner产生的是中间文件,reducer产生的是最终文件。
输入输出类型
MapReduce库支持以多种不同格式读取输入数据:
文本(text)模式:每一行视为一个键/值对,键是文件中的偏移量,值是该行的内容。
按键的顺序存储:多个键/值对会根据键的大小顺序进行排序
比如输入:
``` apple banana apple grape banana apple
1
2
3
4
5
6
7
8
9
10
11
- mapper输出:
- ```
(apple, 1)
(apple, 1)
(apple, 1)
(banana, 1)
(banana, 1)
(grape, 1)
可以用一个reader接口来添加对新输入类型的支持(但是用户通常用预定义的那些就够了)。reader不一定从文件中读取数据,数据库、内存映射的数据结构也可以。
输出类似,同上所述。
副作用
(这里副作用感觉不一定是坏的那种意思)
用户可能希望同时能输出其他辅助文件,比如日志文件、调试信息。但是这个文件的输出(在这里被称为副作用)的原子性和幂等性应当由开发者自己保证。
原子性是指一个操作或一组操作要么完全成功,要么完全失败,强调操作的完整性
幂等性是指一个操作可以被执行多次,但无论执行多少次,最终的结果都是相同的。强调操作的重复性
通常,应用程序会将数据写入临时文件,并在文件完全生成后原子性地重命名该文件。
mapreduce不提供对单个任务生成的多个输出文件的原子两阶段提交的支持。所以如果要输出多个文件并且要保证以上两个性质,那么函数应当是确定性的。
如果某个任务生成多个输出文件(例如
output1.txt
和output2.txt
),并且这两个文件之间有某种依赖关系(如output1.txt
中的内容需要在output2.txt
中引用),那么这个任务必须是确定性的:任务在每次运行时都应生成相同的文件内容,并且文件生成的顺序也应一致。这样,即使MapReduce框架没有支持原子两阶段提交,用户仍然能确保输出的可靠性。
什么是原子两阶段提交: 是一种分布式事务协议,旨在确保在分布式系统中,多个独立的参与者(例如多个数据库、服务器或服务)能够一致地完成某个事务的提交或回滚。这个协议的核心目标是保证分布式事务的原子性,即要么所有参与者都成功提交事务,要么所有参与者都回滚事务。
- 准备阶段(Phase 1 - Prepare Phase):
- 协调者(通常是一个事务管理器)向所有参与者发送一个“准备提交”的请求,询问它们是否可以提交事务。
- 每个参与者(即执行任务的节点)检查自己能否成功提交事务。如果一切正常,参与者返回YES(准备提交);如果有任何问题,参与者返回NO(无法提交)。
- 提交阶段(Phase 2 - Commit Phase):
- 如果所有参与者都返回YES,即所有参与者都准备好了提交事务,那么协调者会发出一个提交(commit)命令,所有参与者将正式提交事务。
- 如果任何一个参与者返回NO,即某个节点无法提交事务,那么协调者会发出一个回滚(rollback)命令,要求所有参与者回滚已做的操作,确保整个事务不产生任何副作用。
跳过坏记录
代码里面的错误有可能会导致map和reduce在某些数据上崩溃,但是有时候这些代码不好改(比如调的别人的库)。有时忽略掉一些数据是可以接受的,比如数据量很大的那种大数据统计,少几个数据的影响很小。mapreduce库里提供了一种可选的执行模式,可以检测到导致确定性崩溃的记录,并跳过这些记录以确保任务能够继续进行。
具体流程:
- 崩溃检测:
- 每个工作进程(即执行Map或Reduce任务的机器)都会安装一个信号处理程序,用于捕捉崩溃信号(如段错误
segmentation fault
或总线错误bus error
)。 - 在每次执行Map或Reduce任务之前,MapReduce框架会记录当前正在处理的记录的序列号。
- 每个工作进程(即执行Map或Reduce任务的机器)都会安装一个信号处理程序,用于捕捉崩溃信号(如段错误
- 记录失败:
- 如果某个Map或Reduce任务由于某个特定的记录而崩溃,信号处理程序会捕获这个崩溃并发送一个UDP数据包给MapReduce的主节点。该数据包包含了导致崩溃的记录的序列号。
- 失败计数和跳过记录:
- MapReduce主节点收到多个崩溃报告后,识别出哪个记录导致了崩溃。如果某个记录多次引发崩溃,主节点会决定跳过该记录,并在重新调度Map或Reduce任务时不再处理该记录。
- 继续执行:
- 跳过导致崩溃的记录后,任务可以继续执行,而不会被这些问题数据阻塞。最终,MapReduce作业可以完成,虽然有些记录被忽略。
本地执行
因为完整的mapreduce过程很难调试map和reduce函数的问题,所以提出了在本地机器上顺序地执行MapReduce操作的所有任务的方式。这样,开发人员可以直接在本地环境中运行程序,并使用调试工具(如gdb)来追踪问题和调试代码。
状态信息
主节点运行一个内部HTTP服务器(所以上面那个跳过坏记录的可以发udp到master),提供实时的状态监控页面,给用户跟踪整个MapReduce作业的进度,还包含了每个任务生成的标准错误和标准输出文件的链接。
计数器
提供了一个计数器功能,允许用户在Map和Reduce函数中对不同的事件进行计数。可以帮助用户在运行MapReduce作业时进行数据验证、性能监控或行为分析。来自各个worker机器的计数器值会定期传递给主节点(通过ping响应传递)。计数器值也会显示在主节点的状态页面上,用户可以实时观察计算进度。计数器会消除相同map或reduce函数的影响。
使用方法:
- 创建计数器:用户通过调用
GetCounter("name")
来创建一个计数器对象,这个对象会以name
作为标识符。 - 增加计数器值:在Map或Reduce函数中,用户可以使用
Increment()
方法增加计数器值。例如,在Word Count程序中,用户可以创建一个计数器来统计大写字母单词的数量。
性能
以大数据中寻找符合特定范式的数据和大数据排序两个例子来测量性能。这两个程序代表了MapReduce用户编写的实际程序的一个大子集——一类程序将数据从一种表示变换到另一种表示,另一类程序从大数据集中提取少量有趣的数据。
集群配置
1800台机器,其他不想赘述(
Grep操作
扫描一个\(10^{10}\)条的100Byte的记录,从中找一个罕见的三字符的范式,出现了92337次。输入被分成大约 64MB 的块(M = 15000),整个输出放在一个文件中(R = 1)
Y轴表示输入数据的扫描速率。随着更多的机器被分配到这个MapReduce计算任务,扫描速率逐渐提高,当分配了1764个工作节点时,扫描速率达到超过30 GB/s的峰值。
随着Map任务的完成,速率开始下降,并在计算开始后的约80秒时降为零,表示所有Map任务都已经完成。整个计算过程大约持续了150秒,包括大约1分钟的启动开销。这个启动开销主要是由于程序传播到所有工作节点,以及与GFS(Google文件系统)交互的延迟——需要打开1000个输入文件并获取本地优化所需的信息。
排序操作
数据:\(10^{10}\)个 100Byte的记录,大约1TB。基于TeraSort基准测试的模型。
一个三行的Map函数从每行文本中提取出10字节的排序键,并将这个键和原始文本行作为中间的键/值对输出。使用了一个内建的Identity函数作为Reduce操作符,这个函数把中间文件原封不动的输出。最终排序后的输出写入到一组双重复制的GFS文件中(即程序的输出是2TB的数据)。
M=15000,R=4000.分区函数内置了关于键分布的知识。在一般的排序程序中,我们会增加一个预处理的MapReduce操作,用来收集一些键的样本,然后利用这些样本的分布计算最终排序阶段的分割点(可以确保每个Reduce任务接收到大致相等的负载)。
shuffle阶段是发生在map之后,是把中间的文件传给reduce的这个过程叫shuffle
备份任务的影响
仅用了前文所提到的备份任务后,时间多了44%
机器失败的影响
故意在计算的几分钟后杀死了1746个工作进程中的200个,相较于正常执行时间,仅增加了5%的时间