MapReduce 原理及实现 - MIT 6.824 Lab 1

MapReduce 是 Google 的 Jeff Dean 等人在 2004 年提出的大数据处理框架。它将数据拆分为多块,在多个机器上并行处理。原本需要高性能计算资源的数据处理任务,通过 MapReduce 可以放到多个廉价的机器上并行处理。

MIT 6.824 分布式系统课程中,第一个实验便是用 Go 实现 MapReduce。

MapReduce 原理

  1. MapReduce 程序分成一个 master 和多个 workermaster 负责分配和调度任务,而 worker 负责执行被分配的 MapReduce 任务
  2. MapReduce 的输入为 M 个文件或数据块,数据之间没有依赖性;输出为 R 个 Reduce 结果文件
  3. Map 任务阶段,每一个 worker 被分配一个文件,对文件内容执行用户自定义的 Map 函数,输出若干 (key, value) 对,并保存至中间文件
  4. Map 任务全部执行完成后,分配 RReduce 任务,每个 worker 对应一个 Reduce 任务,收集该 Reduce 任务对应的所有中间文件,读取 (key, value) 对,将它们排序后执行 Reduce 函数,最后将结果输出

下图描绘了 MapReduce 的过程:

以计算单词个数为例,它的 Map 和 Reduce 函数可以这么表示:

1
2
3
4
5
6
7
8
Example: word count
  input is thousands of text files
  Map(k, v)
    split v into words
    for each word w
      emit(w, "1")
  Reduce(k, v)
    emit(len(v))

更多的细节在论文 MapReduce: Simplified Data Processing on Large Cluster 中,对 MapReduce 有着很详细的解释。

MapReduce 实现

下面将基于 MIT 6.824 的实验部分,讲述一下我是如何实现基本的 MapReduce 的。现在网上大多数的实验版本还停留在几年前,但 2020 Spring 的实验代码结构有了很大的变化,相对更加自由。

首先直接克隆 MIT 6.824 的项目

1
$ git clone git://g.csail.mit.edu/6.824-golabs-2020 6.824

项目概览:所有的文件都在 src 文件夹下面。按照实验页面描述的可以执行已经给好的 sequential 的 MapReduce。示例是执行 wc.go 的统计单词数的任务。

1
2
3
4
$ cd 6.824/src/main
$ go build -buildmode=plugin ../mrapps/wc.go
$ rm mr-out*
$ go run mrsequential.go wc.so pg*.txt

这个 Lab 是实现一个分布式的 MapReduce,由 masterworker 组成,它们将通过 RPC 进行通讯。

在这个 Lab 中,我们只需要修改 src/mr 目录下的 master.gorpc.goworker.go

需求分析

  1. 只有一个 master 实例,来分配调度 MapReduce 的任务
  2. worker 实例需要不断向 master 请求任务,在完成任务之后向 master 报告任务完成
  3. master 先分配 Map 任务,在所有 Map 任务结束后,再分配 reduce 任务
  4. master 在完成所有任务之后退出,此时如果还有 workermaster 请求任务,则会因为 RPC 请求失败而直接退出
  5. 如果 worker 意外退出,或者长时间未响应,则 master 需要重新分配该任务

实现

这个 Lab 的切入口是 worker.go,我们在 Worker 函数中,需要不断请求任务,master 根据当前状态分配任务。因此 worker 并不需要管理状态,只需要根据 RPC 返回的结果执行相应的任务。

对于 master,我们需要所有输入文件名、Reduce 任务数量 R。因为 Map 任务的数量等于输入文件的数量 M,所以可以维护一个长为 M 的数列来管理所有 Map 任务的状态。同理也可以用长为 R 的数列管理 Reduce 任务。因而主要的 master 数据结构可以如下构造:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
type Master struct {
	files   []string
	nReduce int

	mapTasks    []MapReduceTask
	reduceTasks []MapReduceTask

	mapFinished    bool
	reduceFinished bool

	mutex sync.Mutex
}

对于每个任务,自定义了一个 MapReduceTask 的结构体封装对应的数据。根据上面的需求分析,每个任务需要有:

  • 类型 Type:Map|Reduce|Wait
  • 任务的状态 Status:Unassigned|Assigned|Finished
  • 该任务在原来任务列表里的 Index(方便定位和更新对应的任务)
  • 开始时间 Timestamp,便于判断任务超时
  • Map 任务文件 MapFile,以及 Reduce 任务中间文件列表 ReduceFiles
1
2
3
4
5
6
7
8
type MapReduceTask struct {
	Type        string    // "Map", "Reduce", "Wait"
	Status      int       // "Unassigned", "Assigned", "Finished"
	Index       int       // Index of the task
	Timestamp   time.Time // Start time
	MapFile     string    // File for map task
	ReduceFiles []string  // List of files for reduce task
}

RPC 消息的定义非常简单,workermaster 请求时有两种情况:

  1. 请求新的任务 RequestTask,不需要额外数据
  2. 任务完成 FinishTask,需要将完成的任务 Task 传给 master 以便更新任务列表

master 回复时则只需要指定 Task 以及告诉总的 Reduce 任务数量。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
const (
	RequestTask = iota
	FinishTask
)

type MapReduceArgs struct {
	MessageType int
	Task        MapReduceTask
}

type MapReduceReply struct {
	Task    MapReduceTask
	NReduce int
}

因此 master 的逻辑便是:

  1. Master 结构初始化时,得到所有待处理文件 files 以及 Reduce 结果数量 nReduce

    将待处理文件顺序加入到 mapTasks 中,并初始化每个任务状态为 Unassigned。同样的,将 nReduceTask 加入 reduceTasks

  2. 定义 RPC 处理函数 WorkerCallHandler(args *MapReduceArgs, reply *MapReduceReply) errorworker 的 RPC 请求将由这个函数处理

  3. 判断请求类型 RequestTask/FinishTask

    如果是请求任务,先通过 mapFinished 判断是否 Map 阶段已完成,若未完成,则在 mapTasks 中选择一个 Unassigned 或者超时的任务;对于 Reduce 阶段,也是一样的。如果所有 Map 任务都已经被分配,并且没有完全结束,则返回一个 Wait 任务让 worker 等待

    如果是完成任务,则首先判断 worker 返回的任务 Timestamp 是否一致(因为会有任务超时被重新分配,所以时间戳会不一致)。接着将对应的任务标记为 Finished,并判断是否所有的任务都已经 Finished

需要注意的:

  1. 因为可能有多个 worker 同时请求,所以需要用 Mutex,确保同时只有一个线程在修改 master 变量
  2. 因为要求能够处理 worker 不响应或异常退出的情况,所以 worker 需要先将中间文件写入临时文件,将临时文件名返回 master,若成功返回了便在 master 将其重命名,确保只有成功完成的任务才有合法的文件名。如 Map 的中间结果为 mr-<maptask_id>-<reducetask_id>,而 Reduce 的结果为 mr-out-<reducetask_id>

对于 worker,只需要根据分配的任务执行相应的函数。具体的 MapReduce 任务可以参考 mrsequential.go

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
func Worker(mapf func(string, string) []KeyValue,
	reducef func(string, []string) string) {
	for true {
		args := MapReduceArgs{MessageType: RequestTask}
		reply := MapReduceReply{}

		res := call("Master.WorkerCallHandler", &args, &reply)
		if !res {
			break
		}

		switch reply.Task.Type {
		case "Map":
			doMap(&reply, mapf)
		case "Reduce":
			doReduce(&reply, reducef)
		case "Wait":
			time.Sleep(1 * time.Second)
		}
	}
}

在实现好之后通过 src/main/test-mr.sh 执行所有的测试:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
$ sh test-mr.sh
*** Starting wc test.
2020/09/06 19:39:30 rpc.Register: method "Done" has 1 input parameters; needs exactly three
--- wc test: PASS
*** Starting indexer test.
2020/09/06 19:39:36 rpc.Register: method "Done" has 1 input parameters; needs exactly three
--- indexer test: PASS
*** Starting map parallelism test.
2020/09/06 19:39:39 rpc.Register: method "Done" has 1 input parameters; needs exactly three
--- map parallelism test: PASS
*** Starting reduce parallelism test.
2020/09/06 19:39:47 rpc.Register: method "Done" has 1 input parameters; needs exactly three
--- reduce parallelism test: PASS
*** Starting crash test.
2020/09/06 19:39:55 rpc.Register: method "Done" has 1 input parameters; needs exactly three
--- crash test: PASS
*** PASSED ALL TESTS

参考:

加载评论