

MIT DCC, Lab 1

Creating a simple distributed MapReduce program


MapReduce is a simple method of processing tons of data on large clusters of machines.

It’s constructed as follows:

  1. Write a map function that takes in a key-value pair and produces multiple intermediate key-value pairs
  2. Write a reduce function that takes a key and the list of all intermediate values for that key, and emits a final value

Map: (k1, v1) → list(k2, v2)

Reduce: (k2, list(v2)) → list(v2)

An example program could be a word counter. It would take in lots of text files and output how many times each word appears across all the documents.

Map: Take file contents and emit “XXX 1” pairs for each word in the text

Reduce: Count every instance of “XXX 1”. That’s the number of times XXX appears in the text. Boom, there’s your word count.
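In Go, those two functions might look something like the sketch below (modeled on the word-count app shape from the lab; KeyValue is the framework's intermediate pair type, and the standard strings, unicode, and strconv packages are assumed):

type KeyValue struct {
	Key   string
	Value string
}

// Map splits a file's contents into words and emits a ("word", "1") pair for each one.
func Map(filename string, contents string) []KeyValue {
	words := strings.FieldsFunc(contents, func(r rune) bool {
		return !unicode.IsLetter(r)
	})
	kva := make([]KeyValue, 0, len(words))
	for _, w := range words {
		kva = append(kva, KeyValue{Key: w, Value: "1"})
	}
	return kva
}

// Reduce receives one word plus every "1" that was emitted for it across all
// map tasks, so the final count is just the length of that list.
func Reduce(key string, values []string) string {
	return strconv.Itoa(len(values))
}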

This may seem inefficient at first. That’s because it is. However, the goal of MapReduce isn’t efficiency, it’s parallelization.

(Simplified solution) Let’s say you’re doing word counting on 1 TB of text, and you only have 4 GB of RAM per compute instance. During the map phase, split each map task’s output into 250 buckets and write bucket Y of task X to the file “Result-MapTaskX-BucketY”. Use a hash function so each word always lands in the same bucket (e.g. “Hello” always goes to bucket 148). Then, to reduce bucket Y, find every “Result-*-Y” file, read their intermediate keys and combine them into one big bucket (which will fit in memory), then run the reduce function to produce “Result-Y”.

This construction would let you run up to 250 reduce tasks simultaneously (and as many map tasks as you have input files) without fear of collisions.
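The “same word always lands in the same bucket” property comes from hashing the key modulo the number of buckets. A minimal sketch (the FNV-based ihash below follows the helper suggested in the lab skeleton; the hash/fnv package is assumed):

// ihash maps a key to a non-negative integer.
func ihash(key string) int {
	h := fnv.New32a()
	h.Write([]byte(key))
	return int(h.Sum32() & 0x7fffffff)
}

// bucketFor picks which of the nReduce buckets an intermediate key belongs to.
// Every map task uses the same function, so "Hello" ends up in the same
// Result-*-Y bucket no matter which task emitted it.
func bucketFor(key string, nReduce int) int {
	return ihash(key) % nReduce
}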

(In the real world this would be much more complex. I would do local word-count aggregation instead of emitting an entry per word, have the reducer stream data and keep an accumulator instead of aggregating, sorting, and then reducing, compress the output of maps before writing to disk, keep bucket sizes much smaller than 4 GB, etc.)
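As an example of that first optimization, a local “combiner” pass is just a map[string]int over one map task’s output before anything hits disk. A purely illustrative sketch, not part of my lab code:

// combine collapses repeated ("word", "1") pairs from a single map task into
// ("word", "<count>") pairs, shrinking what gets written to the bucket files.
func combine(kva []KeyValue) []KeyValue {
	counts := make(map[string]int)
	for _, kv := range kva {
		n, _ := strconv.Atoi(kv.Value)
		counts[kv.Key] += n
	}
	out := make([]KeyValue, 0, len(counts))
	for k, n := range counts {
		out = append(out, KeyValue{Key: k, Value: strconv.Itoa(n)})
	}
	return out
}

With a combiner in place, the reduce function has to sum the values rather than just count them.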

My implementation follows this theoretical design. Below, I’ve removed the task-specific code for brevity and left the MapReduce logic intact.

First, you have tasks.

type Task struct {
	InProgress bool
	Completed  bool
	ID         int

	timer *time.Timer
	lock  sync.Mutex
}

// createWatcher returns a timer that, after 10 seconds, un-marks the task as
// in progress so the coordinator is free to hand it to another worker.
func (t *Task) createWatcher() *time.Timer {
	return time.AfterFunc(10*time.Second, func() {
		t.lock.Lock()
		t.InProgress = false
		t.lock.Unlock()
	})
}

func (t *Task) Start() {
	t.lock.Lock()
	defer t.lock.Unlock()

	if t.timer != nil {
		t.timer.Stop()
	}

	t.InProgress = true
	t.timer = t.createWatcher()
}

func (t *Task) Complete() {
	t.lock.Lock()
	defer t.lock.Unlock()

	t.Completed = true
	if t.timer != nil {
		t.timer.Stop()
	}
	t.InProgress = false
}

When you start a task, it marks itself as in progress. If it hasn’t been completed after 10 seconds, it unmarks itself so the coordinator knows it can be reassigned.

Then you have the coordinator:

type Coordinator struct {
	// Map tasks are filename -> task pairs
	// Reduce tasks are index -> task pairs. Reduce task at index 0 looks for all mr-*-0 files and outputs to mr-out-0	
	MapTasks    map[string]*Task
	ReduceTasks []*Task
	// Number of buckets.
	nReduce     int
	lock        sync.Mutex
}

func (c *Coordinator) GetNextTask(request *EmptyRequest, reply *NextTask) error {
	c.lock.Lock()
	defer c.lock.Unlock()

	if !c.mapTasksAreDone() {
		nextMapTask, filePath := c.getNextMapTask()

		if nextMapTask == nil {
			// Return "wait"
		}

		// Return new map task
	}

	if !c.reduceTasksAreDone() {
		nextReduceTask, bucket := c.getNextReduceTask()

		if nextReduceTask == nil {
			// Return "wait"
		}

		// Return next reduce task
	}

	// Both map and reduce tasks are done, so tell worker to exit
	reply.Type = TaskTypeDone

	return nil
}

// In the real code, this func has variations for map vs reduce tasks
func (c *Coordinator) CompleteTask(request *CompleteTaskRequest, reply *EmptyRequest) error {
	// Mark task as complete
	return nil
}

The coordinator acts as the source of truth, but it doesn’t do anything unless asked. Workers call GetNextTask and CompleteTask via RPC; the coordinator just maintains its internal state.
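The helpers I elided are simple scans over the task tables. Roughly, getNextMapTask looks like the sketch below (a reconstruction, not my exact lab code; getNextReduceTask is the same idea over the ReduceTasks slice):

// getNextMapTask finds a map task that is neither completed nor currently
// assigned, marks it in progress (which also arms its 10-second watchdog),
// and returns it along with the input file it should process. It returns
// (nil, "") when every remaining task is already assigned, which is what
// makes GetNextTask tell the worker to wait.
func (c *Coordinator) getNextMapTask() (*Task, string) {
	for filePath, task := range c.MapTasks {
		task.lock.Lock()
		available := !task.Completed && !task.InProgress
		task.lock.Unlock()
		if available {
			task.Start()
			return task, filePath
		}
	}
	return nil, ""
}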

Finally, you have the workers:


func runMap(mapf func(string, string) []KeyValue, filePath string, ID int, nReduce int) error {
	// Run map logic
}

func runReduce(reducef func(string, []string) string, NMap int, bucket int) error {
	// Run reduction logic
}

// main/mrworker.go calls this function.
func Worker(mapf func(string, string) []KeyValue,
	reducef func(string, []string) string) {

	for {
		reply := NextTask{}
		RequestNextTask(&reply)

		if reply.Type == TaskTypeMap {
			// call runMap
		}

		if reply.Type == TaskTypeReduce {
			// call runReduce
		}

		if reply.Type == TaskTypeDone {
			return
		}

		time.Sleep(50 * time.Millisecond)
	}
}

Workers simply ask for work at regular intervals, then do what they’re told.
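The plumbing behind RequestNextTask is Go’s net/rpc package over a unix-domain socket, the same transport the lab’s starter code sets up. A rough sketch: the NextTask fields beyond Type are my guesses at what the reply needs to carry, and coordinatorSock() stands in for the lab’s helper that returns the socket path.

type TaskType int

const (
	TaskTypeWait TaskType = iota
	TaskTypeMap
	TaskTypeReduce
	TaskTypeDone
)

// NextTask is the reply the coordinator fills in for GetNextTask.
type NextTask struct {
	Type     TaskType
	FilePath string // input file, for map tasks
	ID       int    // map task ID, used to name the Result-MapTaskX-BucketY files
	Bucket   int    // bucket index, for reduce tasks
	NReduce  int    // how many buckets each map task splits its output into
	NMap     int    // how many map tasks exist, so reducers know which files to read
}

// RequestNextTask asks the coordinator for work.
func RequestNextTask(reply *NextTask) error {
	client, err := rpc.DialHTTP("unix", coordinatorSock())
	if err != nil {
		return err
	}
	defer client.Close()
	return client.Call("Coordinator.GetNextTask", &EmptyRequest{}, reply)
}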

The beauty of this system is how many problems can be expressed with this simple construction. The lab said it would take 6-8 hours, but I managed to finish it in 4. Hooray!


Alright, so what did I learn from this lab?

Specifics

  • How to use remote procedure calls in Go
  • Using files as temporary storage in distributed systems when processing lots of data
  • The worker / coordinator pattern
  • Using remote procedure calls to centralize state
  • The bucketing strategy: when tons of data from disparate sources needs to be grouped but processing happens in parallel, create buckets and use a hash function to decide which bucket each piece of data goes into. Buckets can be recombined later.
  • Using timeouts in Go. At first I used context.WithTimeout, then I switched to time.AfterFunc. Both are useful to know (quick comparison after this list).
  • Localizing lock state. Locks should be held as close as possible to the data they protect (ex. tasks should hold their own locks and expose methods, rather than the coordinator locking everything).
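For what it’s worth, here is the difference between the two timeout approaches, using the Task type from above (illustrative only; the context version is roughly what I abandoned):

// Option 1: time.AfterFunc schedules a callback after the duration; calling
// Stop() on the returned timer cancels it. This is what Task uses above.
func watchWithTimer(t *Task) *time.Timer {
	return time.AfterFunc(10*time.Second, func() {
		t.lock.Lock()
		t.InProgress = false
		t.lock.Unlock()
	})
}

// Option 2: context.WithTimeout hands you a Done() channel, so you have to
// park a goroutine on it, and the caller must call cancel() instead of Stop().
func watchWithContext(t *Task) context.CancelFunc {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	go func() {
		<-ctx.Done()
		if ctx.Err() == context.DeadlineExceeded {
			t.lock.Lock()
			t.InProgress = false
			t.lock.Unlock()
		}
	}()
	return cancel
}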

General lessons

  • Thinking about problems as independent steps. For distributed problems, you have to think in terms of pure inputs and outputs. Stages of processing should be independent of one another, and only care about the output the previous stage produced. A functional-programming mindset is useful here.
  • Failure-based thinking. Assume that every step of the process will fail at some point, and have a backup plan. This is a good way to approach anything.