环境配置
环境:Ubuntu 20.04
配置go语言环境
1 | apt install golang |
在vscode里面安装go的插件
lab1
.so
文件是go里面的插件。在本lab中mrsequential.go
是主程序。每次运行主程序的时候选择一个插件,即具体的map-reduce过程。
go build -buildmode=plugin ../mrapps/wc.go是吧wc.go编译成插件(动态链接库)
阅读mrsequential.go
os.Args
是一个字符串切片,包含了程序运行时的所有命令行参数,包括程序名本身。程序需要至少两个参数:xxx.so 插件文件和至少一个输入文件。
如果合法,则加载插件中的map和reduce方法。
创建一个中间输出的键值对切片。
对于所有输入的文件,将其分别打开并全部读入,调用插件中的map函数,生成中间键值对,并将键值对的内容全部加入先前创建的中间输出切片。
如果要将一个切片的所有元素追加到另一个切片中,可以使用 ...
操作符。这个操作符告诉编译器将切片展开为一个个的元素。如果不使用 ...
,则会尝试将整个切片作为一个元素添加到目标切片中。
与真实的MapReduce
不同,这里的mrsequential.go
将所有的中间数据都存储在一个切片中,而不是被切分成N*M个。
对key进行排序。
对于每一个中间输出的键值对,检查有多少个key相同,把相同的key数量统计为j,并将这j个相同key的键值对的value存入一个字符串切片,将这个相同的key和字符串切片传入自定义的reduce方法中,最后键值对结果保存到mr-out-0中。
具体的loadPlugin
函数(即加载map和reduce方法的函数),用Lookup来寻找文件中的方法,返回该方法的引用。
1 | mapf := xmapf.(func(string, string) []mr.KeyValue) |
这行代码通过类型断言将 xmapf
变量转换成具体的函数类型 func(string, string) []mr.KeyValue
。这意味着程序预期找到的 "Map"
函数应该接受两个 string
类型的参数,并返回一个 mr.KeyValue
类型的切片。
Lookup返回一个空接口类型
interface{}
。可以理解空接口类型就像java中的object,所有类都继承他,但是具体是啥需要自行解释(断言),是一种多态的体现。如果断言错误就会出现恐慌(panic)。
阅读wc.go
实现了map和reduce方法。map读入文件名和以字符串形式传入的文件内容,实际上并没有用到文件名。
在map方法中,首先创建了一个判断分隔符的函数,将其作为回调函数来分割文件内容,返回一个个索引和单词组成的切片。遍历切片,将所有的单词用“单词: 1”的键值对存储在切片中。返回切片。
reduce方法直接返回了values的长度。
任务
实现一个分布式 MapReduce,它由两个程序(coordinator
和worker
)组成。只有一个协调程序和一个或多个并行执行的工作程序。在真实系统中,worker
会运行在多台不同的机器上,但在本lab中,你将在一台机器上运行所有worker
。worker
将通过 RPC 与协调程序对话。每个 worker
进程将循环向coordinator
请求任务,从一个或多个文件中读取任务输入,执行任务,将任务输出写入一个或多个文件,然后再次向coordinator
请求新任务。coordinator
应该注意到,如果某个 worker
在合理的时间内(本lab使用 10 秒)没有完成任务,就会将相同的任务交给不同的 worker
。
提供的代码位于main/mrcoordinator.go
和main/mrworker.go
,最后实现的代码放在 mr/coordinator.go
, mr/worker.go
和 mr/rpc.go
。
阅读
首先阅读一下源码。其中main/mrcoordinator.go
和main/mrworker.go
是不能修改的,mrcoordinator.go
用于调用自己编写的 mr/coordinator.go
,mrworker.go
用于开启一个worker进程,具体实现在mr/rpc.go
。
mrcoordinator.go
调用了mr/coordinator.go
,将所有文件名称传入MakeCoordinator
方法,同时传入10,代表10个reduce任务。当Done
返回true的时候结束。
mrworker.go
加载插件,生成map和reduce方法,传给自己创建的worker。
coordinator.go
创建了一个Coordinator
类,主要包含server
,Done
方法。server
方法用于开启一个接受RPC请求的线程。Done
方法用于判断工作是否结束。MakeCoordinator
函数被mrcoordinator.go
调用,返回一个Coordinator{}
的实例。
rpc.go
RPC相关的一些定义,自定义时需要首字母大写。包含一个coordinatorSock()
函数,用于在/var/tmp/
目录下创建一个套接字文件,名为5840-mr-UserId
。这个函数并不直接创建,而是返回这样一个路径下命名的字符串。
worker.go
在其中定义了键值对的结构体。
创建了一个将不同键通过对其哈希值取模分配到N个reduce任务的方法。
最主要的是main/mrworker.go
会调用的Worker方法,传入map和reduce方法,主要是在这里写代码。
思路
按照原论文的习惯,我就称为worker和master了
新worker加入
对于woker来说,可以每次一上线就注册到master当中,调用master的join方法(待编写)
1 | func (w *workerTask) CallMasterStart() { |
join方法里面最好是能建立一个socket到worker,不知道可不可以,如果不可以的话,master怎么给woker派任务?(用reply的值返回就行,socket开销太大)所以在worker中: 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20func (c *Coordinator) WorkerJoin(args *ArgsToMaster, reply *ArgsFromMaster) error {
// 由于是并发的,所以需要考虑线程安全
c.mu.Lock()
c.workerList = append(c.workerList, args.Id)
// 回传一个任务地址回去
for i, taskItem := range c.taskList {
//fmt.Println(taskItem)
if taskItem.taskStatus == ready {
reply.Msg = taskItem.taskAddr + "=" + strconv.Itoa(c.nReducer) + "=" + strconv.Itoa(int(taskItem.taskType))
c.taskList[i].taskStatus = assign
c.taskList[i].taskWorker = args.Id
// 如果找到了就要解锁,不然其他线程获取不到锁
c.mu.Unlock()
return nil
}
}
c.mu.Unlock()
reply.Msg = ""
return nil
}
master维护一个任务列表,每个任务包含任务类型,文件地址,任务状态,分配到的wokerid
1 | type task struct { |
worker收到reply之后,应该在本地读取任务并运行
报错:
1 | gob: type mismatch: no fields matched compiling decoder for ArgsToMaster |
搜了一下说客户端所传的参数类型和服务端不一致
原来是因为之前修改了结构体名称后没有重新运行master。。。。。
master执行逻辑
按理说master给worker分了任务就已经可以了,但是,master有要求如下:
如果工作者在合理的时间内(在这个实验中,使用十秒)没有完成其任务,协调者应该注意到,并将相同的任务分配给不同的工作者。
所以master应当给每个任务维护一个计时器,只要10s到了任务还没有end,那这个任务就应该把状态改为ready。任务新增计时器的字段,并且每分配一次任务就要初始化一次计时器
在master的任务列表初始化结束后,开启一个新的携程,专门用于检查是否超时。此后每次任务完成回给master的时候,master都应该检查id是不是和列表中的相同。
原本修改的代码: 1
2
3
4
5
6
7
8
9
10
11
12
13
14// 用来检查有没有任务超时了
func (c *Coordinator) testTimeOut() {
for !c.Done() {
for i, taskItem := range c.taskList {
//fmt.Println("正在检查:", c.taskList[i])
if c.taskList[i].taskStatus == assign && time.Since(c.taskList[i].taskStart) >= 10*time.Second {
//fmt.Println("已到达")
fmt.Println("删除了:", taskItem.taskAddr, time.Since(taskItem.taskStart), "现在时间:", time.Now(), "任务开始时间", c.taskList[i].taskStart)
c.taskList[i].taskStatus = ready
c.taskList[i].taskWorker = -1
}
}
}
}
但是存在一个问题!就是在锁的机制上。简单地说,c里面的数据在多线程中是不安全的,每一个访问和修改都应该要加锁才对。
因此,在每一个if判断前后,都应该加上一个锁。但是这样是否就存在一个问题:所有的步骤都卡在这个锁上,降低了并行的效率,成为性能瓶颈。虽然在master上,这个锁并不一定会太影响,但是诸如检查任务是否超时的场景会经常请求访问。
所以我考虑不用之前定义的sync.Mutex,而是用读写锁RWMutex,这样至少可以多个同时读。最后写出来像这样: 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20// 用来检查有没有任务超时了
func (c *Coordinator) testTimeOut() {
for !c.Done() {
for i := range c.taskList {
//fmt.Println("正在检查:", c.taskList[i])
c.mu.RLock()
if c.taskList[i].taskStatus == assign && time.Since(c.taskList[i].taskStart) >= 10*time.Second {
fmt.Println("已到达")
c.mu.RUnlock()
c.mu.Lock()
fmt.Println("删除了:", c.taskList[i].taskAddr, time.Since(c.taskList[i].taskStart), "现在时间:", time.Now(), "任务开始时间", c.taskList[i].taskStart)
c.taskList[i].taskStatus = ready
c.taskList[i].taskWorker = -1
c.mu.Unlock()
c.mu.RLock() // 为了和最后的c.mu.RUnlock()配对,不然会出错
}
c.mu.RUnlock()
}
}
}
gpt 给了一版更好的,我也觉得更好看,但是我没有用:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 func (c *Coordinator) testTimeOut() {
for !c.Done() {
for i := range c.taskList {
c.mu.RLock()
task := c.taskList[i] // 获取任务
taskStatus := task.taskStatus
taskStart := task.taskStart
c.mu.RUnlock()
if taskStatus == assign && time.Since(taskStart) >= 10*time.Second {
// 超时处理逻辑
c.mu.Lock()
if c.taskList[i].taskStatus == assign { // 双重检查避免竞态条件
fmt.Println("超时任务:", c.taskList[i].taskAddr, time.Since(taskStart))
c.taskList[i].taskStatus = ready
c.taskList[i].taskWorker = -1
}
c.mu.Unlock()
}
}
}
}
在Done里面,我们可以简单的把任务队列过一遍,如果所有的任务都已经变成finish,那么done返回true
1 | // main/mrcoordinator.go calls Done() periodically to find out |
worker执行逻辑
现在worker知道了任务类型和文件地址,应该使用map或者reduce函数进行操作,并将结果存到文件中。但仍然存在一个问题:我们并不是调完一次map或者reduce任务,worker就挂了,而是持续存在。因此,需要每执行完一个任务,就去找master要任务。