mit6.824笔记

环境配置

环境:Ubuntu 20.04

配置go语言环境

1
apt install golang

在vscode里面安装go的插件

lab1

.so文件是go里面的插件。在本lab中mrsequential.go是主程序。每次运行主程序的时候选择一个插件,即具体的map-reduce过程。

go build -buildmode=plugin ../mrapps/wc.go是吧wc.go编译成插件(动态链接库)

阅读mrsequential.go

os.Args 是一个字符串切片,包含了程序运行时的所有命令行参数,包括程序名本身。程序需要至少两个参数:xxx.so 插件文件和至少一个输入文件。

如果合法,则加载插件中的map和reduce方法。

创建一个中间输出的键值对切片。

对于所有输入的文件,将其分别打开并全部读入,调用插件中的map函数,生成中间键值对,并将键值对的内容全部加入先前创建的中间输出切片。

如果要将一个切片的所有元素追加到另一个切片中,可以使用 ... 操作符。这个操作符告诉编译器将切片展开为一个个的元素。如果不使用 ...,则会尝试将整个切片作为一个元素添加到目标切片中。

与真实的MapReduce不同,这里的mrsequential.go将所有的中间数据都存储在一个切片中,而不是被切分成N*M个。

对key进行排序。

对于每一个中间输出的键值对,检查有多少个key相同,把相同的key数量统计为j,并将这j个相同key的键值对的value存入一个字符串切片,将这个相同的key和字符串切片传入自定义的reduce方法中,最后键值对结果保存到mr-out-0中。

具体的loadPlugin函数(即加载map和reduce方法的函数),用Lookup来寻找文件中的方法,返回该方法的引用。

1
mapf := xmapf.(func(string, string) []mr.KeyValue)

这行代码通过类型断言将 xmapf 变量转换成具体的函数类型 func(string, string) []mr.KeyValue。这意味着程序预期找到的 "Map" 函数应该接受两个 string 类型的参数,并返回一个 mr.KeyValue 类型的切片。

Lookup返回一个空接口类型 interface{} 。可以理解空接口类型就像java中的object,所有类都继承他,但是具体是啥需要自行解释(断言),是一种多态的体现。如果断言错误就会出现恐慌(panic)。

阅读wc.go

实现了map和reduce方法。map读入文件名和以字符串形式传入的文件内容,实际上并没有用到文件名。

在map方法中,首先创建了一个判断分隔符的函数,将其作为回调函数来分割文件内容,返回一个个索引和单词组成的切片。遍历切片,将所有的单词用“单词: 1”的键值对存储在切片中。返回切片。

reduce方法直接返回了values的长度。

任务

实现一个分布式 MapReduce,它由两个程序(coordinatorworker)组成。只有一个协调程序和一个或多个并行执行的工作程序。在真实系统中,worker会运行在多台不同的机器上,但在本lab中,你将在一台机器上运行所有workerworker将通过 RPC 与协调程序对话。每个 worker 进程将循环向coordinator请求任务,从一个或多个文件中读取任务输入,执行任务,将任务输出写入一个或多个文件,然后再次向coordinator请求新任务。coordinator应该注意到,如果某个 worker 在合理的时间内(本lab使用 10 秒)没有完成任务,就会将相同的任务交给不同的 worker

提供的代码位于main/mrcoordinator.gomain/mrworker.go,最后实现的代码放在 mr/coordinator.go, mr/worker.gomr/rpc.go

阅读

首先阅读一下源码。其中main/mrcoordinator.gomain/mrworker.go是不能修改的,mrcoordinator.go用于调用自己编写的 mr/coordinator.gomrworker.go用于开启一个worker进程,具体实现在mr/rpc.go

mrcoordinator.go

调用了mr/coordinator.go,将所有文件名称传入MakeCoordinator方法,同时传入10,代表10个reduce任务。当Done返回true的时候结束。

mrworker.go

加载插件,生成map和reduce方法,传给自己创建的worker。

coordinator.go

创建了一个Coordinator类,主要包含serverDone方法。server方法用于开启一个接受RPC请求的线程。Done方法用于判断工作是否结束。MakeCoordinator函数被mrcoordinator.go调用,返回一个Coordinator{}的实例。

rpc.go

RPC相关的一些定义,自定义时需要首字母大写。包含一个coordinatorSock()函数,用于在/var/tmp/目录下创建一个套接字文件,名为5840-mr-UserId。这个函数并不直接创建,而是返回这样一个路径下命名的字符串。

worker.go

在其中定义了键值对的结构体。

创建了一个将不同键通过对其哈希值取模分配到N个reduce任务的方法。

最主要的是main/mrworker.go会调用的Worker方法,传入map和reduce方法,主要是在这里写代码。

思路

按照原论文的习惯,我就称为worker和master了

新worker加入

对于woker来说,可以每次一上线就注册到master当中,调用master的join方法(待编写)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
func (w *workerTask) CallMasterStart() {
args := ArgsToMaster{}
args.Id = os.Getuid()
fmt.Printf("id %v\n", args.Id)
//args.msgType = join
reply := ArgsFromMaster{}
ok := call("Coordinator.WorkerJoin", &args, &reply)
if ok {
// 应该会返回master给返回worker的任务,待详细设计
//fmt.Printf("reply.msg %v\n", reply.Msg)
splits := strings.Split(reply.Msg, "=")
w.nReduce, _ = strconv.Atoi(splits[1])
w.filename = splits[0]
w.taskType, _ = strconv.Atoi(splits[2])
//fmt.Printf("reply.msg %v, nReduce %v\n", w.filename, w.nReduce)

} else {
fmt.Printf("call failed!\n")
}
}

join方法里面最好是能建立一个socket到worker,不知道可不可以,如果不可以的话,master怎么给woker派任务?(用reply的值返回就行,socket开销太大)所以在worker中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
func (c *Coordinator) WorkerJoin(args *ArgsToMaster, reply *ArgsFromMaster) error {
// 由于是并发的,所以需要考虑线程安全
c.mu.Lock()
c.workerList = append(c.workerList, args.Id)
// 回传一个任务地址回去
for i, taskItem := range c.taskList {
//fmt.Println(taskItem)
if taskItem.taskStatus == ready {
reply.Msg = taskItem.taskAddr + "=" + strconv.Itoa(c.nReducer) + "=" + strconv.Itoa(int(taskItem.taskType))
c.taskList[i].taskStatus = assign
c.taskList[i].taskWorker = args.Id
// 如果找到了就要解锁,不然其他线程获取不到锁
c.mu.Unlock()
return nil
}
}
c.mu.Unlock()
reply.Msg = ""
return nil
}

master维护一个任务列表,每个任务包含任务类型,文件地址,任务状态,分配到的wokerid

1
2
3
4
5
6
type task struct {
taskType TaskType //任务类型
taskAddr string //文件地址
taskStatus TaskStatus //任务状态
taskWorker int //分配给的worker编号
}

worker收到reply之后,应该在本地读取任务并运行

报错:

1
gob: type mismatch: no fields matched compiling decoder for ArgsToMaster

搜了一下说客户端所传的参数类型和服务端不一致

原来是因为之前修改了结构体名称后没有重新运行master。。。。。

master执行逻辑

按理说master给worker分了任务就已经可以了,但是,master有要求如下:

如果工作者在合理的时间内(在这个实验中,使用十秒)没有完成其任务,协调者应该注意到,并将相同的任务分配给不同的工作者。

所以master应当给每个任务维护一个计时器,只要10s到了任务还没有end,那这个任务就应该把状态改为ready。任务新增计时器的字段,并且每分配一次任务就要初始化一次计时器

在master的任务列表初始化结束后,开启一个新的携程,专门用于检查是否超时。此后每次任务完成回给master的时候,master都应该检查id是不是和列表中的相同。

原本修改的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 用来检查有没有任务超时了
func (c *Coordinator) testTimeOut() {
for !c.Done() {
for i, taskItem := range c.taskList {
//fmt.Println("正在检查:", c.taskList[i])
if c.taskList[i].taskStatus == assign && time.Since(c.taskList[i].taskStart) >= 10*time.Second {
//fmt.Println("已到达")
fmt.Println("删除了:", taskItem.taskAddr, time.Since(taskItem.taskStart), "现在时间:", time.Now(), "任务开始时间", c.taskList[i].taskStart)
c.taskList[i].taskStatus = ready
c.taskList[i].taskWorker = -1
}
}
}
}

但是存在一个问题!就是在锁的机制上。简单地说,c里面的数据在多线程中是不安全的,每一个访问和修改都应该要加锁才对。

因此,在每一个if判断前后,都应该加上一个锁。但是这样是否就存在一个问题:所有的步骤都卡在这个锁上,降低了并行的效率,成为性能瓶颈。虽然在master上,这个锁并不一定会太影响,但是诸如检查任务是否超时的场景会经常请求访问。

所以我考虑不用之前定义的sync.Mutex,而是用读写锁RWMutex,这样至少可以多个同时读。最后写出来像这样:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// 用来检查有没有任务超时了
func (c *Coordinator) testTimeOut() {
for !c.Done() {
for i := range c.taskList {
//fmt.Println("正在检查:", c.taskList[i])
c.mu.RLock()
if c.taskList[i].taskStatus == assign && time.Since(c.taskList[i].taskStart) >= 10*time.Second {
fmt.Println("已到达")
c.mu.RUnlock()
c.mu.Lock()
fmt.Println("删除了:", c.taskList[i].taskAddr, time.Since(c.taskList[i].taskStart), "现在时间:", time.Now(), "任务开始时间", c.taskList[i].taskStart)
c.taskList[i].taskStatus = ready
c.taskList[i].taskWorker = -1
c.mu.Unlock()
c.mu.RLock() // 为了和最后的c.mu.RUnlock()配对,不然会出错
}
c.mu.RUnlock()
}
}
}

gpt 给了一版更好的,我也觉得更好看,但是我没有用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
func (c *Coordinator) testTimeOut() {
for !c.Done() {
for i := range c.taskList {
c.mu.RLock()
task := c.taskList[i] // 获取任务
taskStatus := task.taskStatus
taskStart := task.taskStart
c.mu.RUnlock()

if taskStatus == assign && time.Since(taskStart) >= 10*time.Second {
// 超时处理逻辑
c.mu.Lock()
if c.taskList[i].taskStatus == assign { // 双重检查避免竞态条件
fmt.Println("超时任务:", c.taskList[i].taskAddr, time.Since(taskStart))
c.taskList[i].taskStatus = ready
c.taskList[i].taskWorker = -1
}
c.mu.Unlock()
}
}
}
}

在Done里面,我们可以简单的把任务队列过一遍,如果所有的任务都已经变成finish,那么done返回true

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// main/mrcoordinator.go calls Done() periodically to find out
// if the entire job has finished.
func (c *Coordinator) Done() bool {
ret := true

// Your code here.
// 遍历任务队列,如果存在为finish的,就返回false
c.mu.RLock()
for _, task := range c.taskList {
if task.taskStatus != finish {
ret = false
}
}
c.mu.RUnlock()

return ret
}

worker执行逻辑

现在worker知道了任务类型和文件地址,应该使用map或者reduce函数进行操作,并将结果存到文件中。但仍然存在一个问题:我们并不是调完一次map或者reduce任务,worker就挂了,而是持续存在。因此,需要每执行完一个任务,就去找master要任务。