MapReduce 是 Google 的 Jeff Dean 等人在 2004 年提出的大数据处理框架。它将数据拆分为多块,在多个机器上并行处理。原本需要高性能计算资源的数据处理任务,通过 MapReduce 可以放到多个廉价的机器上并行处理。
在 MIT 6.824 分布式系统课程中,第一个实验便是用 Go 实现 MapReduce。
MapReduce 原理
- MapReduce 程序分成一个
master
和多个worker
,master
负责分配和调度任务,而worker
负责执行被分配的Map
或Reduce
任务 - MapReduce 的输入为
M
个文件或数据块,数据之间没有依赖性;输出为R
个 Reduce 结果文件 - 在
Map
任务阶段,每一个worker
被分配一个文件,对文件内容执行用户自定义的Map
函数,输出若干(key, value)
对,并保存至中间文件 - 当
Map
任务全部执行完成后,分配R
个Reduce
任务,每个worker
对应一个 Reduce 任务,收集该Reduce
任务对应的所有中间文件,读取(key, value)
对,将它们排序后执行Reduce
函数,最后将结果输出
下图描绘了 MapReduce 的过程:
以计算单词个数为例,它的 Map 和 Reduce 函数可以这么表示:
|
|
更多的细节在论文 MapReduce: Simplified Data Processing on Large Cluster 中,对 MapReduce 有着很详细的解释。
MapReduce 实现
下面将基于 MIT 6.824 的实验部分,讲述一下我是如何实现基本的 MapReduce 的。现在网上大多数的实验版本还停留在几年前,但 2020 Spring 的实验代码结构有了很大的变化,相对更加自由。
首先直接克隆 MIT 6.824 的项目:
|
|
项目概览:所有的文件都在 src
文件夹下面。按照实验页面描述的可以执行已经给好的 sequential 的 MapReduce。示例是执行 wc.go
的统计单词数的任务。
|
|
这个 Lab 是实现一个分布式的 MapReduce,由 master
和 worker
组成,它们将通过 RPC 进行通讯。
在这个 Lab 中,我们只需要修改 src/mr
目录下的 master.go
、rpc.go
和 worker.go
。
需求分析
- 只有一个
master
实例,来分配调度 MapReduce 的任务 worker
实例需要不断向master
请求任务,在完成任务之后向master
报告任务完成master
先分配Map
任务,在所有Map
任务结束后,再分配reduce
任务master
在完成所有任务之后退出,此时如果还有worker
向master
请求任务,则会因为 RPC 请求失败而直接退出- 如果
worker
意外退出,或者长时间未响应,则master
需要重新分配该任务
实现
这个 Lab 的切入口是 worker.go
,我们在 Worker
函数中,需要不断请求任务,master
根据当前状态分配任务。因此 worker
并不需要管理状态,只需要根据 RPC 返回的结果执行相应的任务。
对于 master
,我们需要所有输入文件名、Reduce 任务数量 R
。因为 Map
任务的数量等于输入文件的数量 M
,所以可以维护一个长为 M
的数列来管理所有 Map
任务的状态。同理也可以用长为 R
的数列管理 Reduce
任务。因而主要的 master
数据结构可以如下构造:
|
|
对于每个任务,自定义了一个 MapReduceTask
的结构体封装对应的数据。根据上面的需求分析,每个任务需要有:
- 类型
Type
:Map|Reduce|Wait - 任务的状态
Status
:Unassigned|Assigned|Finished - 该任务在原来任务列表里的
Index
(方便定位和更新对应的任务) - 开始时间
Timestamp
,便于判断任务超时 Map
任务文件MapFile
,以及 Reduce 任务中间文件列表ReduceFiles
|
|
RPC 消息的定义非常简单,worker
向 master
请求时有两种情况:
- 请求新的任务
RequestTask
,不需要额外数据 - 任务完成
FinishTask
,需要将完成的任务Task
传给master
以便更新任务列表
而 master
回复时则只需要指定 Task
以及告诉总的 Reduce 任务数量。
|
|
因此 master
的逻辑便是:
Master
结构初始化时,得到所有待处理文件files
以及 Reduce 结果数量nReduce
。将待处理文件顺序加入到
mapTasks
中,并初始化每个任务状态为Unassigned
。同样的,将nReduce
个Task
加入reduceTasks
中定义 RPC 处理函数
WorkerCallHandler(args *MapReduceArgs, reply *MapReduceReply) error
,worker
的 RPC 请求将由这个函数处理判断请求类型
RequestTask/FinishTask
如果是请求任务,先通过
mapFinished
判断是否 Map 阶段已完成,若未完成,则在mapTasks
中选择一个Unassigned
或者超时的任务;对于 Reduce 阶段,也是一样的。如果所有 Map 任务都已经被分配,并且没有完全结束,则返回一个Wait
任务让worker
等待如果是完成任务,则首先判断
worker
返回的任务Timestamp
是否一致(因为会有任务超时被重新分配,所以时间戳会不一致)。接着将对应的任务标记为Finished
,并判断是否所有的任务都已经Finished
。
需要注意的:
- 因为可能有多个
worker
同时请求,所以需要用Mutex
,确保同时只有一个线程在修改master
变量 - 因为要求能够处理
worker
不响应或异常退出的情况,所以worker
需要先将中间文件写入临时文件,将临时文件名返回master
,若成功返回了便在master
将其重命名,确保只有成功完成的任务才有合法的文件名。如 Map 的中间结果为mr-<maptask_id>-<reducetask_id>
,而 Reduce 的结果为mr-out-<reducetask_id>
对于 worker
,只需要根据分配的任务执行相应的函数。具体的 Map
和 Reduce
任务可以参考 mrsequential.go
:
|
|
在实现好之后通过 src/main/test-mr.sh
执行所有的测试:
|
|
参考: