Appearance
MIT 6.824 Lab1: MapReduce
MapReduce 论文阅读
MapReduce 是一种编程模型和相关的实现,用于处理和生成大数据集。它通过将数据分割并在多台机器上并行处理,大大提高了数据处理的效率。
MapReduce 工作原理
MapReduce 过程主要由两个阶段组成:Map 和 Reduce。
- Map 阶段:
- 输入数据被自动划分为
M
个输入片段 (Input Splits),每个片段通常大小为 16-64MB。 - 这些输入片段可以由不同的机器并行处理。
- Map 函数接收一个
(k1, v1)
键值对作为输入,并输出一个中间的list(k2, v2)
键值对列表。
- 输入数据被自动划分为
- Reduce 阶段:
- Map 阶段产生的中间键空间通过分区函数(例如
hash(key) % R
)划分为R
个部分,以便进行分布式处理。 - Reduce 函数接收一个中间键
k2
和与该键相关的所有中间值列表list(v2)
作为输入,即(k2, list(v2))
。它将这些值合并,并输出聚合后的结果。通常,每次Reduce调用会产生零个或一个最终值,该值被追加到对应的输出文件中。
- Map 阶段产生的中间键空间通过分区函数(例如
MapReduce 架构
MapReduce 架构包含一个主节点 (Master) 和多个工作节点 (Worker)。这是一个经典的主从(Master-Slave)架构模式。
- 输入文件分割: MapReduce 首先将输入文件分割成
M
个片段。 - 任务分配:
- 主节点负责分配
M
个 Map 任务和R
个 Reduce 任务。 - 主节点选择空闲的工作节点,并为每个节点分配一个 Map 或 Reduce 任务。
- 主节点负责分配
- Map 任务执行:
- 被分配 Map 任务的工作节点会读取相应输入片段的内容。
- 从输入数据中解析出键值对,并将每个键值对传递给用户定义的 Map 函数。
- Map 函数产生的中间键值对
(k, v)
会在内存中暂时缓存。 - 这些缓冲数据会定期写入本地磁盘,并根据分区函数(例如
hash(key) % R
)划分成R
个区域。 - 本地磁盘上这些缓冲文件的位置信息会被传回给主节点。
- 数据传输与排序 (Shuffle):
- 当 Reduce 工作节点从主节点接收到这些位置信息时,会使用远程调用从 Map 工作节点的本地磁盘读取缓冲数据。
- Reduce 工作节点读取完所有中间键值对后,会根据中间键进行排序,以便将相同键的所有出现分组在一起。如果数据量太大无法放入内存,则使用外部排序。
- Reduce 任务执行:
- Reduce 工作节点遍历排序后的中间数据,对于遇到的每一个唯一中间键。
- 将键和对应的
list(v)
传递给用户定义的 Reduce 函数。 - Reduce 函数的输出会追加到这个 Reduce 分区的最终输出文件中。
- 任务完成: 所有 Map 任务和 Reduce 任务完成后,主节点唤醒应用程序,MapReduce 计算完成。
- 输出: 完成后的输出文件可在
R
个输出文件中获取,每个 Reduce 任务对应一个文件。这些文件通常不会合并成一个文件,而是作为输入传递给另一个 MapReduce 调用。
主节点数据结构
主节点维护多个数据结构来管理任务状态和信息:
- 任务状态: 对于每个 Map 任务和 Reduce 任务,主节点存储其状态(
idle
,in-progress
,completed
)以及执行该任务的工作机器标识(针对非空闲任务)。 - 中间文件位置: 主节点是传递中间文件区域位置信息的通道。对于每个完成的 Map 任务,主节点会存储该 Map 任务生成的
R
个中间文件区域的位置和大小。当 Map 任务完成后,主节点会接收到这些位置和大小的更新信息,并将这些信息逐步推送给正在执行 Reduce 任务的工作节点。
容错性
MapReduce 具有内置的容错机制:
工作节点故障
- 主节点定期向每个工作节点发送心跳。如果一个工作节点在一段时间内没有回应,主节点会将其标记为失败。
- 该失败工作节点已完成的任何 Map 任务都会重置回空闲状态,以便重新调度到其他工作节点上。这是因为 Map 任务的输出储存在失败机器的本地磁盘上,无法访问,因此需要在失败后重新执行。
- 类似地,任何在该失败工作节点上正在进行的 Map 任务或 Reduce 任务也会重置回空闲状态并可以重新调度。
- 已完成的 Reduce 任务不需要重新执行,因为它们的输出储存在全局文件系统当中。
- 如果一个 Map 任务首先由
worker_A
执行,然后因为worker_A
失败,worker_B
重新执行了它,所有执行 Reduce 任务的 worker 都会收到重复执行的通知。任何尚未从worker_A
读取数据的 Reduce 任务将从worker_B
中读取数据。
主节点故障
- 如果主节点死亡,可以从最后一个检查点的状态重新启动新的副本。
- 然而,鉴于只有一个主节点,故障的可能性相对较小。因此,在实际实现中,如果主节点失败,通常会直接终止 MapReduce 计算,客户端可以检查此条件,如果需要可以重试 MapReduce 操作。
Lab1 实现目标
你需要在 mr/coordinator.go
、mr/worker.go
和 mr/rpc.go
这几个文件中编写你的代码,实现一个简化的 MapReduce 系统。理想情况下,Map (M
) 和 Reduce (R
) 的数量应远大于工作机器的数量。
核心任务和要求
- Coordinator (协调器):
- 负责分配 Map 和 Reduce 任务给 Worker。
- 处理 Worker 失败的情况:例如,如果一个 Worker 在 10 秒内没有完成任务,协调器会重新分配给另一个 Worker。
- 当所有任务完成后,协调器需要退出。
- Worker (工作器):
- 循环地向协调器请求任务。
- 执行 Map 或 Reduce 函数。
- 将结果写入文件。
- 当协调器指示所有任务完成后,Worker 也需要退出。
- RPC 通信: 实现一个基于 RPC 的通信机制,让 Worker 可以和 Coordinator 交互。
- Map 阶段中间文件:
- Map 阶段需要将中间键分成
nReduce
个桶(partitions)。 - 每个 Mapper 生成
nReduce
个中间文件,每个文件对应一个 Reduce 桶。
- Map 阶段需要将中间键分成
- Reduce 任务输出: Reduce 任务的输出需要写入到
mr-out-X
文件中,并遵循特定的格式。 - 并发处理: 确保你的实现能够处理并发,并考虑使用锁来保护共享数据。
- 中间 KV 存储: 可以使用 Go 的
encoding/json
包来存储中间的key/value
对。 - 测试:
test-mr.sh
脚本会用来测试你的实现,确保它产生正确的输出,支持并行,并能从 Worker 崩溃中恢复。
个人实现
1. 定义数据模型与服务契约
开始编码前,首先要定义好系统的核心“实体”和它们之间的“通信契约”。
数据模型 (Data Models): 从论文中可以得出,正在运行中的任务主要有两种类型:MapTask 和 ReduceTask。由于本作业通过 RPC 进行通信,Worker 节点本身不主动执行操作,而是向 Coordinator(协调器,后文称主节点)请求任务。因此,Worker 的状态由主节点告知,主节点会指示 Worker 执行 MapTask、ReduceTask,或者进入 WaitingTask(等待任务)或 ExitingTask(退出)状态。基于此,我们定义了以下任务类型:
go
type TaskType int
const (
MapTask TaskType = iota
ReduceTask
WaitingTask
ExitingTask
)
对于任务本身的定义,除了 TaskType
,还需要一个唯一的 TaskId
来标识任务,任务需要处理的 File
列表,Reduce 任务的总数 NReduce
,以及 Map 任务的总数 NMap
。
为了更好地管理任务状态,我们定义了 TaskStateInfo
结构体。它不仅包含了任务类型和指向任务本身的指针,还记录了任务的 StartTime
(用于实现超时重试机制)和执行该任务的 WorkerId
。
go
type Task struct {
TaskType TaskType // 任务种类
TaskId int // 任务id
File []string // 任务处理的文件列表
NReduce int // Reduce任务的数量
NMap int // Map任务的数量
}
// TaskStateInfo 任务状态信息
type TaskStateInfo struct {
TaskType TaskType // 任务类型
StartTime time.Time // 任务开始时间用于检测超时
Task *Task // 任务信息
WorkerId int // 任务分配给的Worker ID
}
服务契约 (RPC Contracts): 基于上述数据模型,我们设计 RPC 接口。
Worker 请求任务时,需要提供其 WorkerId
:
go
type RequestTaskArgs struct {
WorkerId int // Worker的ID
}
Coordinator 的响应 RequestTaskReply
需要包含任务的类型 (TaskType
)、任务ID (TaskId
)、待处理的文件列表 (File
)、Reduce 任务总数 (NReduce
) 和 Map 任务总数 (NMap
):
go
type RequestTaskReply struct {
TaskType TaskType // 任务类型:Map, Reduce, Waiting, Exiting
TaskId int // 任务ID
File []string // 对于Map任务,是输入文件;对于Reduce任务,是中间文件
NReduce int // Reduce任务的数量
NMap int // Map任务的数量
}
当 Worker 完成任务后,通过 FinishTaskArgs
告知 Coordinator 其 WorkerId
、完成的任务类型 (TaskType
) 和任务ID (TaskId
)。Coordinator 的响应 FinishTaskReply
可以为空结构体:
go
type FinishTaskArgs struct {
WorkerId int // Worker的ID
TaskType TaskType // 完成的任务类型
TaskId int // 完成的任务ID
}
type FinishTaskReply struct {
}
这样,RPC 的请求与响应数据结构就设计完成了。
2. Worker组件实现
Worker 是一个无状态的计算组件,其行为模式很简单:在一个循环中不断向 Coordinator 请求任务,执行任务,然后上报结果。
在动手实现 Worker 之前,我们先分析了实验提供的 mrsequential.go
这个单机版 MapReduce 实现。它清晰地展示了 Map 和 Reduce 两个阶段的完整流程:
- Map阶段逻辑: 读取输入文件 -> 调用用户定义的
mapf
函数 -> 将产生的键值对追加到intermediate
列表中。
go
intermediate := []mr.KeyValue{}
for _, filename := range os.Args[2:] {
file, err := os.Open(filename)
if err != nil {
log.Fatalf("cannot open %v\", filename)
}
content, err := ioutil.ReadAll(file)
if err != nil {
log.Fatalf("cannot read %v\", filename)
}
file.Close()
kva := mapf(filename, string(content))
intermediate = append(intermediate, kva...)
}
这部分代码负责读取所有输入文件,并将内容传递给 mapf
函数,收集所有产生的中间键值对。
- Reduce阶段逻辑: 对
intermediate
列表中的键值对按键进行排序 -> 遍历排序后的列表,对具有相同键的键值对进行分组 -> 将每个键及其对应的值列表传递给用户定义的reducef
函数 -> 将reducef
的输出写入到最终的输出文件中。
go
sort.Sort(ByKey(intermediate))
oname := "mr-out-0"
ofile, _ := os.Create(oname)
//
// call Reduce on each distinct key in intermediate[],
// and print the result to mr-out-0.
//
i := 0
for i < len(intermediate) {
j := i + 1
for j < len(intermediate) && intermediate[j].Key == intermediate[i].Key {
j++
}
values := []string{}
for k := i; k < j; k++ {
values = append(values, intermediate[k].Value)
}
output := reducef(intermediate[i].Key, values)
// this is the correct format for each line of Reduce output.
fmt.Fprintf(ofile, "%v %v\n", intermediate[i].Key, output)
i = j
}
ofile.Close()
}
这部分代码负责对中间键值对进行排序和聚合,然后调用 reducef
函数处理,并将结果写入输出文件。
理解了单机版的实现后,我们开始编写分布式 Worker。 Worker 的主要逻辑是:
- 生成一个唯一的
workerId
(这里使用进程ID)。 - 进入一个无限循环,在循环中: a. 向 Coordinator 请求任务。 b. 根据收到的任务类型执行相应的操作(Map、Reduce、等待或退出)。 c. 任务完成后,通知 Coordinator。
go
func Worker(mapf func(string, string) []KeyValue,
reducef func(string, []string) string) {
// 根据进程生成一个简单的Worker ID
workerId := os.Getpid()
log.Printf("Worker %d 启动", workerId)
// Worker主循环
for {
// 1. 向Coordinator请求任务
task := requestTask(workerId)
// 2. 根据任务类型处理
switch task.TaskType {
case MapTask:
log.Printf("Worker %d 收到Map任务 %d", workerId, task.TaskId)
doMapTask(task, mapf)
case ReduceTask:
log.Printf("Worker %d 收到Reduce任务 %d", workerId, task.TaskId)
doReduceTask(task, reducef)
case WaitingTask:
log.Printf("Worker %d 没有任务,等待中...", workerId)
time.Sleep(time.Second) // 等待1秒后重试
continue
case ExitingTask:
log.Printf("Worker %d 收到退出信号", workerId)
return
}
// 3. 完成任务后通知Coordinator
finishTask(workerId, task.TaskType, task.TaskId)
}
}
requestTask
函数负责向 Coordinator 发起 RPC 请求以获取任务:
go
func requestTask(workerId int) *RequestTaskReply {
// 1. 准备RPC参数
args := RequestTaskArgs{
WorkerId: workerId,
}
// 2. 准备接收回复的结构
reply := RequestTaskReply{}
// 3. 发起RPC调用
ok := call("Coordinator.RequestTask", &args, &reply)
if !ok {
log.Printf("Worker %d 请求任务失败", workerId)
// 如果RPC失败,说明Coordinator可能已经退出,Worker也应该退出
reply.TaskType = ExitingTask
}
return &reply
}
finishTask
函数负责在任务完成后通知 Coordinator:
go
func finishTask(workerId int, taskType TaskType, taskId int) {
// 1. 准备RPC参数
args := FinishTaskArgs{
WorkerId: workerId,
TaskType: taskType,
TaskId: taskId,
}
// 2. 准备接收回复的结构
reply := FinishTaskReply{}
// 3. 发起RPC调用
ok := call("Coordinator.FinishTask", &args, &reply)
if !ok {
log.Printf("Worker %d 通知任务完成失败", workerId)
}
}
doMapTask实现要点: doMapTask
函数负责执行 Map 任务。为了应对 Worker 可能随时崩溃的情况,其实现需要保证操作的原子性。
- 执行Map: 读取 Coordinator 分配的输入文件,调用用户提供的
mapf
函数,得到内存中的中间键值对列表。 - 创建临时输出: 为
nReduce
个 Reduce 分区创建nReduce
个临时文件。直接写入最终命名的文件是危险的,因为写入过程中进程可能崩溃,导致文件损坏或不完整。使用临时文件可以避免这个问题。 - 分区与写入: 遍历内存中的键值对,根据
ihash(key) % nReduce
决定每个键值对所属的 Reduce 分区,并使用encoding/json
包将键值对编码后写入对应的临时文件。这样可以确保具有相同键的键值对最终会被同一个 Reduce 任务处理。如果存在 M 个 Map 任务和 R 个 Reduce 任务,此阶段总共会产生 M * R 个中间文件。 - 原子性提交: 所有数据成功写入所有临时文件后,通过
os.Rename
操作将每个临时文件重命名为最终的中间文件名(格式如mr-X-Y
,其中 X 是 Map 任务 ID,Y 是 Reduce 任务 ID)。os.Rename
在大多数文件系统上是原子操作,这确保了只有当所有数据都完整写入后,文件才对后续的 Reduce 任务可见。如果重命名失败,只会留下一个无用的临时文件,不会影响任务的重试。
go
func doMapTask(task *RequestTaskReply, mapf func(string, string) []KeyValue) {
log.Printf("开始执行Map任务 %d,处理文件: %s", task.TaskId, task.File[0])
// 1. 打开并读取输入文件
filename := task.File[0]
file, err := os.Open(filename)
if err != nil {
log.Fatalf("Worker打开文件 %s 失败: %v", filename, err)
}
defer file.Close()
// 2. 读取文件内容
content, err := ioutil.ReadAll(file)
if err != nil {
log.Fatalf("Worker读取文件 %s 失败: %v", filename, err)
}
// 3. 调用mapf获取键值对
log.Printf("调用Map函数处理文件 %s", filename)
intermediate := mapf(filename, string(content))
log.Printf("Map函数返回 %d 个键值对", len(intermediate))
// 4. 创建nReduce个临时文件,用于存储中间结果
nReduce := task.NReduce
outFiles := make([]*os.File, nReduce)
fileEncs := make([]*json.Encoder, nReduce)
// 5. 为每个Reduce任务创建临时文件(在当前目录创建)
for outindex := 0; outindex < nReduce; outindex++ {
// 在当前目录创建临时文件
outFiles[outindex], err = ioutil.TempFile(".", "mr-tmp-*")
if err != nil {
log.Fatalf("Worker创建临时文件失败: %v", err)
}
fileEncs[outindex] = json.NewEncoder(outFiles[outindex])
}
// 6. 将键值对根据hash分配到不同的临时文件中
for _, kv := range intermediate {
// 计算这个key应该分配给哪个reduce任务
outindex := ihash(kv.Key) % nReduce
// 将键值对编码为JSON并写入对应的临时文件
err := fileEncs[outindex].Encode(&kv)
if err != nil {
log.Fatalf("Worker编码KeyValue失败: %v", err)
}
}
// 7. 关闭临时文件并原子重命名为最终的中间文件
for outindex, file := range outFiles {
originalPath := file.Name() // 获取临时文件的原始路径
file.Close()
// 构建最终文件名: mr-X-Y (X是Map任务ID,Y是Reduce任务ID)
outname := fmt.Sprintf("mr-%d-%d", task.TaskId, outindex)
// 原子重命名操作,确保文件完整性
err := os.Rename(originalPath, outname)
if err != nil {
log.Fatalf("Worker重命名文件 %s 到 %s 失败: %v", originalPath, outname, err)
}
}
log.Printf("Map任务 %d 完成,生成了 %d 个中间文件", task.TaskId, nReduce)
}
doReduceTask实现要点: doReduceTask
函数负责执行 Reduce 任务,同样需要保证输出的原子性。
- 拉取数据(Shuffle): 遍历所有 Map 任务的 ID(从 0 到
NMap
),根据命名约定(mr-mapID-reduceID
,其中reduceID
是当前 Reduce 任务的 ID)拼接出需要读取的中间文件名,并打开这些文件。如果某个文件无法打开(可能由于对应的 Map 任务失败或尚未完成),记录警告并跳过该文件,继续处理其他文件。 - 解码与聚合: 从所有成功打开的中间文件中解码出 JSON 格式的键值对,并将它们全部加载到内存中的
intermediate
列表。 - 排序: 对内存中的
intermediate
列表按键(Key)进行排序,以便将具有相同键的键值对聚合在一起。 - 执行Reduce: 遍历排序后的列表,对于每个唯一的键,收集其所有的值(Value),然后将键和对应的值列表传递给用户定义的
reducef
函数。 - 原子性提交: 与 Map 任务类似,先将
reducef
函数的输出写入一个临时文件。在写入完成后,调用tempFile.Sync()
确保数据已刷到磁盘,然后关闭临时文件。最后,通过os.Rename
原子性地将临时文件重命名为最终的输出文件(格式如mr-out-X
,其中 X 是 Reduce 任务 ID)。
go
func doReduceTask(task *RequestTaskReply, reducef func(string, []string) string) {
log.Printf("开始执行Reduce任务 %d,需要处理 %d 个Map任务的输出", task.TaskId, task.NMap)
// 1. 读取所有相关的中间文件 mr-X-Y (Y是当前Reduce任务的ID)
intermediate := []KeyValue{}
for mapIndex := 0; mapIndex < task.NMap; mapIndex++ {
// 构建中间文件名:mr-mapIndex-reduceIndex
filename := fmt.Sprintf("mr-%d-%d", mapIndex, task.TaskId)
file, err := os.Open(filename)
if err != nil {
log.Printf("警告:无法打开中间文件 %s: %v", filename, err)
continue // 继续处理其他文件
}
// 解码JSON数据
dec := json.NewDecoder(file)
for {
var kv KeyValue
if err := dec.Decode(&kv); err != nil {
break // 文件读取完毕或发生错误
}
intermediate = append(intermediate, kv)
}
file.Close()
}
log.Printf("Reduce任务 %d 共读取到 %d 个键值对", task.TaskId, len(intermediate))
// 2. 按Key排序
sort.Sort(ByKey(intermediate))
// 3. 创建临时输出文件(在当前目录创建)
tempFile, err := ioutil.TempFile(".", "mr-out-tmp-*")
if err != nil {
log.Fatalf("Reduce任务 %d 创建临时文件失败: %v", task.TaskId, err)
}
defer tempFile.Close() // 确保临时文件最终被关闭
// 4. 对相同的Key进行Reduce操作并写入临时文件
i := 0
for i < len(intermediate) {
j := i + 1
// 找到所有相同Key的值
for j < len(intermediate) && intermediate[j].Key == intermediate[i].Key {
j++
}
// 收集所有相同Key的Value
values := []string{}
for k := i; k < j; k++ {
values = append(values, intermediate[k].Value)
}
// 调用Reduce函数
output := reducef(intermediate[i].Key, values)
// 写入输出文件(格式:key value)
fmt.Fprintf(tempFile, "%v %v\n", intermediate[i].Key, output)
i = j
}
// 5. 确保数据写入磁盘
err = tempFile.Sync()
if err != nil {
log.Fatalf("Reduce任务 %d 同步临时文件失败: %v", task.TaskId, err)
}
originalPath := tempFile.Name() // 获取临时文件的原始路径
tempFile.Close() // 关闭文件后才能重命名
// 6. 原子重命名为最终输出文件
outname := fmt.Sprintf("mr-out-%d", task.TaskId)
err = os.Rename(originalPath, outname)
if err != nil {
log.Fatalf("Reduce任务 %d 重命名输出文件 %s 到 %s 失败: %v", task.TaskId, originalPath, outname, err)
}
log.Printf("Reduce任务 %d 完成,输出文件: %s", task.TaskId, outname)
}
至此,Worker 组件的功能就完成了。
3. Coordinator组件实现
Coordinator 是整个 MapReduce 系统的状态管理和任务调度中心,其设计必须重点考虑并发安全和容错。
任务队列: 因为目的是管理待分配的任务,所以会想到并发+FIFO,也就是java当中的阻塞队列。 考虑到 Go 语言中 channel
的特性(原生的、支持并发操作的 FIFO 队列),我们使用 channel
来实现这个任务队列。这提供了一种高效且线程安全的方式来存储和分发任务。将channel作为存放任务的“阻塞队列”来使用。
go
type TaskQueue struct {
// 任务队列,使用channel实现
taskChan chan *TaskStateInfo
}
func NewTaskQueue(capacity int) *TaskQueue {
return &TaskQueue{
taskChan: make(chan *TaskStateInfo, capacity),
}
}
// offer 尝试将任务添加到队列中,如果队列已满则返回false (非阻塞)
func (tq *TaskQueue) offer(task *TaskStateInfo) bool {
select {
case tq.taskChan <- task: // 如果队列未满,添加任务
return true
default: // 如果队列已满
log.Println("TaskQueue is full, cannot add task")
return false
}
}
// poll 尝试从队列中获取任务,如果没有任务则返回nil (非阻塞)
func (tq *TaskQueue) poll() *TaskStateInfo {
select {
case task := <-tq.taskChan: // 非阻塞获取任务
return task
default: // 如果队列为空
return nil
}
}
// blockingPoll 从队列中获取任务 (阻塞直到有任务可用)
func (tq *TaskQueue) blockingPoll() *TaskStateInfo {
return <-tq.taskChan
}
状态管理与并发控制: Coordinator 作为一个有状态的服务,其内部的共享数据(如 runningTasks
映射、任务计数器等)必须被保护,以防止多个 goroutine 并发访问时出现数据竞争。这里我们使用了 sync.RWMutex
(读写锁)来保证线程安全。读写锁允许多个 goroutine 同时进行读操作,但在有 goroutine 进行写操作时会阻塞其他所有读写操作,这有助于提高并发性能,同时避免了对任务完成状态进行阻塞式轮询。
go
type Coordinator struct {
taskQueue *TaskQueue // 任务队列,使用channel实现的高效队列
runningTasks map[int]*TaskStateInfo // TaskId -> TaskStateInfo (记录正在运行的任务及其状态)
NReduce int // Reduce任务的总数量
NMap int // Map任务的总数量
MapTaskFinished int // 已完成的Map任务数量
ReduceTaskFinished int // 已完成的Reduce任务数量
AllTasksDone bool // 标志所有Map和Reduce任务是否都已完成
mutex sync.RWMutex // 读写锁,用于保护Coordinator的共享状态
}
Coordinator初始化 (MakeCoordinator
): MakeCoordinator
函数负责初始化 Coordinator。
- 创建
Coordinator
结构体实例,包括初始化任务队列、runningTasks
映射、Map/Reduce 任务总数、已完成任务计数器和AllTasksDone
标志。 - 为每个输入文件创建一个 Map 任务,并将其封装在
TaskStateInfo
中,然后将这些 Map 任务添加到任务队列taskQueue
中。 - 启动一个后台 goroutine
taskTimeoutChecker
,用于监控任务执行是否超时。 - 调用
server()
方法启动 RPC 服务,使 Coordinator 能够接收来自 Worker 的请求。
go
func MakeCoordinator(files []string, nReduce int) *Coordinator {
c := &Coordinator{
taskQueue: NewTaskQueue(1000), // 初始化任务队列,容量为1000
runningTasks: make(map[int]*TaskStateInfo),
NReduce: nReduce,
NMap: len(files),
MapTaskFinished: 0,
ReduceTaskFinished: 0,
AllTasksDone: false,
mutex: sync.RWMutex{},
}
// 1. 创建Map任务并放入队列
for i, file := range files {
task := &Task{
TaskType: MapTask,
TaskId: i,
File: []string{file}, // 每个Map任务处理一个文件
NReduce: nReduce,
NMap: len(files),
}
taskState := &TaskStateInfo{
TaskType: MapTask,
Task: task,
}
c.taskQueue.offer(taskState)
log.Printf("创建Map任务 %d: %s", i, file)
}
log.Printf("Coordinator初始化完成: %d个Map任务, %d个Reduce任务", len(files), nReduce)
// 启动任务超时检查协程
go c.taskTimeoutChecker()
c.server() // 启动RPC服务
return c
}
容错机制(超时重试 taskTimeoutChecker
): 为了处理 Worker “假死”或失联的情况,Coordinator 启动了一个独立的 goroutine taskTimeoutChecker
作为后台监控。
- 该 goroutine 使用一个定时器(例如,每10秒触发一次)定期检查,类似java中使用ExecutorService但是更轻量级。
- 在每次检查时,它会遍历
runningTasks
中记录的所有正在执行的任务。 - 如果发现某个任务的执行时间(
time.Now().Sub(taskState.StartTime)
)超过了预设的阈值(例如,20秒),则认为该任务执行超时。 - 对于超时的任务,Coordinator 会将其从
runningTasks
中移除,并将其重新放回taskQueue
任务队列中,等待被其他健康的 Worker 领取并重新执行。在重新加入队列前,会重置任务的StartTime
和WorkerId
。 - 如果所有任务(
c.AllTasksDone
)已经完成,则该监控协程退出。
go
func (c *Coordinator) taskTimeoutChecker() {
// 使用定时器每10s检查一次任务状态
ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()
for {
c.mutex.RLock() // 使用读锁检查AllTasksDone
if c.AllTasksDone {
c.mutex.RUnlock()
return // 所有任务完成,退出检查
}
c.mutex.RUnlock()
<-ticker.C // 等待定时器触发
c.mutex.Lock() // 加写锁以修改runningTasks和taskQueue
now := time.Now()
var timeoutTasks []*TaskStateInfo
// 检查运行中的任务是否超时(例如,20秒)
for taskId, taskState := range c.runningTasks {
if now.Sub(taskState.StartTime) > 20*time.Second {
log.Printf("任务 %d (类型: %d, Worker: %d) 超时,重新加入队列", taskId, taskState.TaskType, taskState.WorkerId)
timeoutTasks = append(timeoutTasks, taskState)
delete(c.runningTasks, taskId) // 从正在运行的任务中移除
}
}
// 将超时的任务重新加入队列
for _, taskState := range timeoutTasks {
taskState.StartTime = time.Time{} // 重置开始时间
taskState.WorkerId = 0 // 重置Worker ID
c.taskQueue.offer(taskState) // 重新加入任务队列
}
c.mutex.Unlock()
}
}
检查作业完成状态 (Done
): Done
方法用于供外部(例如测试脚本)检查整个 MapReduce 作业是否已经完成。它通过读取 AllTasksDone
标志位来实现,并使用读锁保护。
go
func (c *Coordinator) Done() bool {
c.mutex.RLock()
defer c.mutex.RUnlock()
return c.AllTasksDone
}
RPC处理器 (RequestTask
和 FinishTask
): RequestTask
和 FinishTask
是 Coordinator 暴露给 Worker 的两个主要 RPC 端点。它们的实现逻辑都需要在互斥锁(这里是读写锁 c.mutex
)的保护之下,以确保对共享状态的修改是原子和一致的。
RequestTask
RPC 处理器: 当 Worker 调用 RequestTask
请求任务时:
- 首先(使用读锁)检查
c.AllTasksDone
标志。如果所有任务已完成,则回复ExitingTask
,通知 Worker 退出。 - 如果任务尚未全部完成,则尝试从
c.taskQueue
中(非阻塞地)取出一个任务(taskState := c.taskQueue.poll()
)。 - 如果队列为空(没有可分配的任务),则回复
WaitingTask
,让 Worker稍后重试。 - 如果成功获取到任务,则填充
RequestTaskReply
的各个字段(TaskType
,TaskId
,File
,NReduce
,NMap
)。 - 记录该任务的
WorkerId
(请求任务的 Worker)和StartTime
(当前时间)。 - 将该任务(
taskState
)添加到c.runningTasks
映射中,表示该任务正在被执行。此步骤需要写锁。
go
func (c *Coordinator) RequestTask(args *RequestTaskArgs, reply *RequestTaskReply) error {
c.mutex.RLock() // 先用读锁检查AllTasksDone
if c.AllTasksDone {
c.mutex.RUnlock()
reply.TaskType = ExitingTask
log.Printf("所有任务已完成,通知Worker %d 退出", args.WorkerId)
return nil
}
c.mutex.RUnlock()
// 1. 从队列中获取任务 (非阻塞)
taskState := c.taskQueue.poll()
// 2. 如果没有任务,返回等待状态
if taskState == nil {
reply.TaskType = WaitingTask
return nil
}
// 3. 填充回复信息
reply.TaskType = taskState.askType // 注意这里应该是 taskState.TaskType
reply.TaskId = taskState.Task.TaskId
reply.File = taskState.Task.File
reply.NReduce = taskState.Task.NReduce
reply.NMap = taskState.Task.NMap
// 4. 记录任务分配信息并更新状态 (需要写锁)
c.mutex.Lock()
defer c.mutex.Unlock()
// 再次检查任务是否已经被分配或者已经完成(防止并发问题或任务已被重新调度)
// 这一步是为了确保取出的任务仍然有效且未被其他goroutine处理
if ts, ok := c.runningTasks[taskState.Task.TaskId]; ok && ts.TaskType == taskState.Task.TaskType {
// 任务已在运行或已被重新加入队列但未被正确移除,让worker等待
// 或者简单地将任务重新放回队列,让worker下次再取
log.Printf("任务 %d (类型: %d) 已在运行或状态异常,让Worker %d 等待", taskState.Task.TaskId, taskState.Task.TaskType, args.WorkerId)
c.taskQueue.offer(taskState) // 将任务放回队列
reply.TaskType = WaitingTask
return nil
}
taskState.WorkerId = args.WorkerId
taskState.StartTime = time.Now()
c.runningTasks[taskState.Task.TaskId] = taskState // 将任务加入运行中的任务列表
log.Printf("分配任务给Worker %d: TaskType=%d, TaskId=%d, 文件:%v", args.WorkerId, reply.TaskType, reply.TaskId, reply.File)
return nil
}
FinishTask
RPC 处理器: 当 Worker 调用 FinishTask
通知任务完成时(需要写锁保护整个过程):
- 记录 Worker 完成任务的信息。
- 从
c.runningTasks
映射中移除已完成的任务。 - 根据完成的任务类型 (
args.TaskType
) 进行处理:- 如果是 MapTask:
- 增加
c.MapTaskFinished
计数。 - 记录日志,显示 Map 任务完成进度。
- 检查是否所有 Map 任务都已完成 (
c.MapTaskFinished == c.NMap
)。如果是,则调用c.createReduceTasks()
方法来创建并分发 Reduce 任务,标志着整个作业进入 Reduce 阶段。
- 增加
- 如果是 ReduceTask:
- 增加
c.ReduceTaskFinished
计数。 - 记录日志,显示 Reduce 任务完成进度。
- 检查是否所有 Reduce 任务都完成
- 增加
- 如果是 MapTask:
go
func (c *Coordinator) FinishTask(args *FinishTaskArgs, reply *FinishTaskReply) error {
c.mutex.Lock()
defer c.mutex.Unlock()
log.Printf("Worker %d完成了任务: TaskType=%d, TaskId=%d", args.WorkerId, args.TaskType, args.TaskId)
// 1. 从运行中的任务列表中移除
delete(c.runningTasks, args.TaskId)
// 2. 根据任务类型处理完成逻辑
switch args.TaskType {
case MapTask:
c.MapTaskFinished++
log.Printf("Map任务 %d 已完成,Worker %d (进度: %d/%d)", args.TaskId, args.WorkerId, c.MapTaskFinished, c.NMap)
// 检查是否所有Map任务都完成了
if c.MapTaskFinished == c.NMap {
log.Printf("所有Map任务已完成,开始创建Reduce任务")
c.createReduceTasks()
}
case ReduceTask:
c.ReduceTaskFinished++
log.Printf("Reduce任务 %d 已完成,Worker %d (进度: %d/%d)", args.TaskId, args.WorkerId, c.ReduceTaskFinished, c.NReduce)
// 检查是否所有Reduce任务都完成了
if c.ReduceTaskFinished == c.NReduce {
log.Printf("所有Reduce任务已完成,MapReduce作业完成!")
c.AllTasksDone = true
}
default:
log.Printf("未知任务类型完成: %d", args.TaskType)
}
return nil
}
// createReduceTasks 创建Reduce任务
func (c *Coordinator) createReduceTasks() {
for i := 0; i < c.NReduce; i++ {
task := &Task{
TaskType: ReduceTask,
TaskId: i,
File: []string{}, // Reduce任务不需要特定的输入文件,会自动查找中间文件
NReduce: c.NReduce,
NMap: c.NMap,
}
taskState := &TaskStateInfo{
TaskType: ReduceTask,
Task: task,
}
c.taskQueue.offer(taskState)
log.Printf("创建Reduce任务 %d", i)
}
}
至此lab1完成。