阅读本文前请先仔细阅读Lab 1实验要求并熟悉基础代码,了解Go中的RPC通信。
Lab环境的配置过程中可能会出现若干问题,参照此处解决。
本文只提供相关实现思路,希望可以读者由此获得灵感。
实验说明
- 完成分布式MapReduce系统设计,实现词频统计
- Worker分别调用Map和Reduce Function来读取和处理文件
- Master用来调度和处理失败的Worker
- 主要修改src/mr文件夹中的master.go rpc.go worker.go
- Lab中是所有Map任务完成后才进行Reduce任务
- Worker和Master之间利用RPC进行通信
设计概述
大部分思路来自于https://github.com/yzongyue/6.824-golabs-2020,在此基础上进行修改,十分感谢!
结构说明
- 调用mrmaster.go启动Master,监听是否有人向自身发送RPC请求。
- 调用mrworker.go启动Worker,首先询问Master是否已经分配了全部任务,如果分配完成了就不新建Worker,直接退出,如果未完成,则新建Worker分担任务。
- 此处询问是否分配完成设计是由测试脚本引发的设计,crash.go中在master分配全部任务之后会继续新建Worker向master请求,如果不进行相关设计会warning,但是不影响正确结果
- Worker首先通过RPC通信向Master注册自身,再通过RPC通信向Master请求任务,执行任务,最后通过RPC向Master报告任务完成,此处共需要设计三种RPC请求回答格式。
架构图
master
![master master](/2023/07/10/MIT6-824-2020-Lab1-MapReduce/master.png)
worker
![worker worker](/2023/07/10/MIT6-824-2020-Lab1-MapReduce/worker.png)
数据流图
以nMap = n,nReduce=3为例,其中中间文件的数量为nMap*nReduce=3n个。
![dataflow dataflow](/2023/07/10/MIT6-824-2020-Lab1-MapReduce/dataflow.png)
具体实现
Master
Master的作用就是统筹Worker,分配任务,因此需要为任务构建一个结构体,使用RPC在Master和Worker之间传输,定义如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| type TaskPhase int
const ( MapPhase TaskPhase = 0 ReducePhase TaskPhase = 1 WaitPhase TaskPhase = 2 ExitPhase TaskPhase = 3 )
type Task struct { FileName string NMap int NReduce int ID int Phase TaskPhase }
|
NMap变量由文件数决定,由于在Map阶段需要生成中间文件mr-MapID-BucketID,此处的NMap用作后面Reduce阶段遍历相关中间文件。
Phase变量有4个取值,分别代表当前系统是在哪个任务阶段(可能TaskPhase会造成误解),其中Wait阶段是Worker向Master发出请求,但是Master现在并没有任务发送给他,Worker需要等待,Exit阶段表示当前需要退出。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| const ( TaskReady TaskStatus = 0 TaskQueue TaskStatus = 1 TaskRunning TaskStatus = 2 TaskDone TaskStatus = 3 TaskError TaskStatus = 4 )
type TaskStat struct { WorkerID int Status TaskStatus StartTime time.Time }
type Master struct { files []string NReduce int
mu sync.Mutex NowWorkers int TotalWorkers int ExitWorkers int
Tstate []TaskStat Phase TaskPhase
TaskCh chan Task }
|
初始化过程中可以确定好files和NReduce,files是所需要处理的文件集合,NReduce是生成最后文件的数量,可以理解为桶的数量。
NowWorker为当前分配WorkerID的指针,每次有一个Worker发来一个注册RPC,为其分配一个WorkerID,然后NowWorker自增,TotalWorker自增。
TotalWorker为当前Master中存活的Worker,因为后续test脚本中有crash测试,因此当一个worker失联超过设置的时间后判定为crash,TotalWorker自减。
ExitWorker为当前Master中正常退出的Worker,当Worker发送退出RPC时(为了简化设计,将退出RPC和通知Master任务完成的RPC进行设计合并),ExitWorker自增。
如何判断当前任务全部完成,Master可以退出了( Done )?
- 判断TotalWorkers和ExitWorkers是否相等即可。
TState是Master中较为关键的成员,通过Init函数进行设置。
1 2 3 4 5 6 7 8 9
| func (m *Master) InitMap() { m.Phase = MapPhase m.Tstate = make([]TaskStat, len(m.files)) }
func (m *Master) InitReduce() { m.Phase = ReducePhase m.Tstate = make([]TaskStat, m.NReduce) }
|
TState的长度等于当前需要处理的Task的数量,TState[i]的含义就是第i个task当前的状态,包括分配给的WorkerID,状态以及开始执行的时间。TaskCh是一段有buffer的channel,用作Goroutines之间的数据传输,后续解释。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
| func (m *Master) InitMap() {} func (m *Master) InitReduce() {}
func (m *Master) RegisterWorker(args *RegisterArgs, reply *RegisterReply) error {} func (m *Master) ExitWorker(args *ReportArgs, reply *ReportReply) error {}
func (m *Master) RegisterTask(args *RequestArgs, task *Task) {}
func (m *Master) GetTask(NowTaskID int) Task {}
func (m *Master) Schedule() {} func (m *Master) TickSchedule() {}
func (m *Master) GetOneTask(args *RequestArgs, reply *RequestReply) error {}
func (m *Master) ReportTask_m(args *ReportArgs, reply *ReportReply) error {}
func (m *Master) IsAllDone(args *ReportArgs, reply *ReportReply) error {}
func MakeMaster(files []string, nReduce int) *Master { m := Master{}
m.mu = sync.Mutex{} m.NReduce = nReduce m.files = files
if nReduce > len(files) { m.TaskCh = make(chan Task, nReduce) } else { m.TaskCh = make(chan Task, len(files)) }
m.InitMap() go m.TickSchedule()
m.server() return &m }
|
- 首先调用MakeMaster新建一个Master,并构建一个有buffer的TaskChannel,用作存放需要发送给Worker的Task。
- 接着初始化Map,设置一下相关的参数
- TickSchedule开始周期性调度全部任务,该函数主要实现是利用for循环,在不是ExitPhase的情况下周期性调用Schedule
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
| func (m *Master) Schedule() { m.mu.Lock() defer m.mu.Unlock()
if m.Phase == ExitPhase { return }
if m.Phase == WaitPhase { if m.ExitWorkers == m.TotalWorkers { m.Phase = ExitPhase } return }
AllDone := true for idx, task := range m.Tstate { switch task.Status { case TaskReady: AllDone = false m.TaskCh <- m.GetTask(idx) m.Tstate[idx].Status = TaskQueue case TaskQueue: case TaskRunning: case TaskDone: case TaskError: default: } }
if AllDone { if m.Phase == MapPhase { m.InitReduce() } else { m.Phase = WaitPhase } } }
|
Schedule的大体框架如上,首先Master在调度之前判断一下自身的状态,wait状态是Master已经调度完全部任务了,向所有还存活的Worker发送一个伪任务,告诉他们可以退出了,其余还存活的Worker一直在不停的接受任务,这时收到Master发送过来的fake task后,向Master发送已退出的消息然后退出,Master接收到全部退出消息后,将Phase改为ExitPhase然后退出。
中间的for循环遍历TaskState,将准备好的任务发送到TaskChannel,来让Worker接收。
Worker
首先定义worker结构体
1 2 3 4 5
| type worker struct { workerID int mapf func(string, string) []KeyValue reducef func(string, []string) string }
|
worker的构建主要是通过Worker function
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| func Worker(mapf func(string, string) []KeyValue, reducef func(string, []string) string) {
w := worker{ mapf: mapf, reducef: reducef, }
args := ReportArgs{} reply := ReportReply{}
call("Master.IsAllDone", &args, &reply) if reply.AllFinished { fmt.Printf("all done, no more client\n") return }
w.Register() w.run() fmt.Printf("worker %d exit normally\n", w.workerID) }
|
首先判断一下Master是否分配完了全部的工作,否则就进行worker的注册和执行,Register主要通过RPC通信来和Master通信获取可用的workerID,run用来执行构建好的worker。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| func (w *worker) run() { for { task := w.RequestTask() fmt.Printf("worker %d get task %d, TaskPhase==%d\n", w.workerID, task.ID, task.Phase) if task.Phase == ExitPhase { w.Exit() break } else if task.Phase == WaitPhase { time.Sleep(1 * time.Second) continue } w.doTask(task)
} }
|
run函数是worker中需要重点关注的函数,内部使用一个for循环来不断请求Master获得空闲的Task,此处的RequestTask也是通过RPC通信进行数据的获取。
- 如果当前task处于ExitPhase,则调用Exit向Master进行RPC通信,告诉Master此Worker已退出。
- 如果当前task处于WaitPhase,说明当前Master无任务分配,让Worker睡眠一段时间后重新请求。
如果当前任务可执行,则调用doTask进行任务的执行,doTask流程如下。
1 2 3 4 5 6 7 8 9 10
| func (w *worker) doTask(task Task) { switch task.Phase { case MapPhase: w.doMap(task) case ReducePhase: w.doReduce(task) default: panic("wrong phase") } }
|
RPC设计
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| type RegisterArgs struct { } type RegisterReply struct { WorkerID int }
type RequestArgs struct { WorkerID int } type RequestReply struct { Task_ *Task }
type ReportArgs struct { Done bool TaskID int Phase TaskPhase WorkerID int } type ReportReply struct { AllFinished bool }
|
对于Register RPC,不需要worker传递参数给master,只需要master把当前可用的workerID 回复给worker即可。
对于Request RPC,worker需要传递给master自己的workerID,让master可以更新自己对应的TState中的任务,明确该分配给了了哪个worker,并更新状态,master回复给worker对应的task即可。
对于Report RPC,合并了三种操作,一种是刚调用Worker时询问Master是否分配完了全部的任务,一种是完成一个Task后向Master汇报该任务已完成,最后一种是向Master汇报该worker已退出。
doMap
Master把收到的全部文件设置为一个个的Task,状态保存在TState中,每次每个worker调用doMap就从TaskCh中取出一个Task任务执行mapf(大概就是对一个具体的file做Map操作,返回的键值对仿照mrsequential.go暂时保存到kva中),遍历kva,利用ihash函数将kv.Key对应的Value哈希到对应的桶中(可以使用intermediate[]表示桶的集合,大小为nReduce)
最后遍历桶,将各个桶中的数据写入中间文件,中间文件的命名可以使用mr-mapID-bucketID,写入出错或者写入成功都需要向Master call RPC进行通知。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| func (w *worker) doMap(task Task) { kva := w.mapf(filename, string(content))
intermediate := make([][]KeyValue, task.NReduce) for _, kv := range kva { bucketID := ihash(kv.Key) % task.NReduce intermediate[bucketID] = append(intermediate[bucketID], kv) }
for bucketID, kva := range intermediate { oname := Map2ReduceFileName(task.ID, bucketID) ofile, err := os.Create(oname) if err != nil { w.ReportTask_w(task, false, err) return } enc := json.NewEncoder(ofile) for _, kv := range kva { err := enc.Encode(&kv) if err != nil { w.ReportTask_w(task, false, err) } }
if err := ofile.Close(); err != nil { w.ReportTask_w(task, false, err) } } w.ReportTask_w(task, true, nil) }
|
doReduce
一个具体的Reduce Task是将上面Map生成的中间全部文件mr-i-taskID( 0 ≤ i < NMap ),集合并做reducef操作,保存到最终文件mr-out-taskID中。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| func (w *worker) doReduce(task Task) { maps := make(map[string][]string) for idx := 0; idx < task.NMap; idx++ { filename := Map2ReduceFileName(idx, task.ID) file, err := os.Open(filename) if err != nil { w.ReportTask_w(task, false, err) return }
dec := json.NewDecoder(file) for { var kv KeyValue if err := dec.Decode(&kv); err != nil { break } if _, ok := maps[kv.Key]; !ok { maps[kv.Key] = make([]string, 0, 100) } maps[kv.Key] = append(maps[kv.Key], kv.Value) } }
res := make([]string, 0, 100) for key, values := range maps { res = append(res, fmt.Sprintf("%v %v\n", key, w.reducef(key, values))) }
if err := ioutil.WriteFile(MergeName(task.ID), []byte(strings.Join(res, "")), 0600); err != nil { w.ReportTask_w(task, false, err) return }
w.ReportTask_w(task, true, nil) }
|
测试
![test test](/2023/07/10/MIT6-824-2020-Lab1-MapReduce/test.GIF)