MapReduce 是 Google 的 Jeff Dean 等人在 2004 年提出的大数据处理框架。它将数据拆分为多块,在多个机器上并行处理。原本需要高性能计算资源的数据处理任务,通过 MapReduce 可以放到多个廉价的机器上并行处理。
在 MIT 6.824 分布式系统课程中,第一个实验便是用 Go 实现 MapReduce。
MapReduce 原理
- MapReduce 程序分成一个
master和多个worker,master负责分配和调度任务,而worker负责执行被分配的Map或Reduce任务 - MapReduce 的输入为
M个文件或数据块,数据之间没有依赖性;输出为R个 Reduce 结果文件 - 在
Map任务阶段,每一个worker被分配一个文件,对文件内容执行用户自定义的Map函数,输出若干(key, value)对,并保存至中间文件 - 当
Map任务全部执行完成后,分配R个Reduce任务,每个worker对应一个 Reduce 任务,收集该Reduce任务对应的所有中间文件,读取(key, value)对,将它们排序后执行Reduce函数,最后将结果输出
下图描绘了 MapReduce 的过程:

以计算单词个数为例,它的 Map 和 Reduce 函数可以这么表示:
| |
更多的细节在论文 MapReduce: Simplified Data Processing on Large Cluster 中,对 MapReduce 有着很详细的解释。
MapReduce 实现
下面将基于 MIT 6.824 的实验部分,讲述一下我是如何实现基本的 MapReduce 的。现在网上大多数的实验版本还停留在几年前,但 2020 Spring 的实验代码结构有了很大的变化,相对更加自由。
首先直接克隆 MIT 6.824 的项目:
| |
项目概览:所有的文件都在 src 文件夹下面。按照实验页面描述的可以执行已经给好的 sequential 的 MapReduce。示例是执行 wc.go 的统计单词数的任务。
| |
这个 Lab 是实现一个分布式的 MapReduce,由 master 和 worker 组成,它们将通过 RPC 进行通讯。
在这个 Lab 中,我们只需要修改 src/mr 目录下的 master.go、rpc.go 和 worker.go。
需求分析
- 只有一个
master实例,来分配调度 MapReduce 的任务 worker实例需要不断向master请求任务,在完成任务之后向master报告任务完成master先分配Map任务,在所有Map任务结束后,再分配reduce任务master在完成所有任务之后退出,此时如果还有worker向master请求任务,则会因为 RPC 请求失败而直接退出- 如果
worker意外退出,或者长时间未响应,则master需要重新分配该任务
实现
这个 Lab 的切入口是 worker.go,我们在 Worker 函数中,需要不断请求任务,master 根据当前状态分配任务。因此 worker 并不需要管理状态,只需要根据 RPC 返回的结果执行相应的任务。
对于 master,我们需要所有输入文件名、Reduce 任务数量 R。因为 Map 任务的数量等于输入文件的数量 M,所以可以维护一个长为 M 的数列来管理所有 Map 任务的状态。同理也可以用长为 R 的数列管理 Reduce 任务。因而主要的 master 数据结构可以如下构造:
| |
对于每个任务,自定义了一个 MapReduceTask 的结构体封装对应的数据。根据上面的需求分析,每个任务需要有:
- 类型
Type:Map|Reduce|Wait - 任务的状态
Status:Unassigned|Assigned|Finished - 该任务在原来任务列表里的
Index(方便定位和更新对应的任务) - 开始时间
Timestamp,便于判断任务超时 Map任务文件MapFile,以及 Reduce 任务中间文件列表ReduceFiles
| |
RPC 消息的定义非常简单,worker 向 master 请求时有两种情况:
- 请求新的任务
RequestTask,不需要额外数据 - 任务完成
FinishTask,需要将完成的任务Task传给master以便更新任务列表
而 master 回复时则只需要指定 Task 以及告诉总的 Reduce 任务数量。
| |
因此 master 的逻辑便是:
Master结构初始化时,得到所有待处理文件files以及 Reduce 结果数量nReduce。将待处理文件顺序加入到
mapTasks中,并初始化每个任务状态为Unassigned。同样的,将nReduce个Task加入reduceTasks中定义 RPC 处理函数
WorkerCallHandler(args *MapReduceArgs, reply *MapReduceReply) error,worker的 RPC 请求将由这个函数处理判断请求类型
RequestTask/FinishTask如果是请求任务,先通过
mapFinished判断是否 Map 阶段已完成,若未完成,则在mapTasks中选择一个Unassigned或者超时的任务;对于 Reduce 阶段,也是一样的。如果所有 Map 任务都已经被分配,并且没有完全结束,则返回一个Wait任务让worker等待如果是完成任务,则首先判断
worker返回的任务Timestamp是否一致(因为会有任务超时被重新分配,所以时间戳会不一致)。接着将对应的任务标记为Finished,并判断是否所有的任务都已经Finished。
需要注意的:
- 因为可能有多个
worker同时请求,所以需要用Mutex,确保同时只有一个线程在修改master变量 - 因为要求能够处理
worker不响应或异常退出的情况,所以worker需要先将中间文件写入临时文件,将临时文件名返回master,若成功返回了便在master将其重命名,确保只有成功完成的任务才有合法的文件名。如 Map 的中间结果为mr-<maptask_id>-<reducetask_id>,而 Reduce 的结果为mr-out-<reducetask_id>
对于 worker,只需要根据分配的任务执行相应的函数。具体的 Map 和 Reduce 任务可以参考 mrsequential.go:
| |
在实现好之后通过 src/main/test-mr.sh 执行所有的测试:
| |
参考: