In this assignment (which will be completed individually), you'll build a MapReduce library as a way to learn the Go programming language and to learn about fault tolerance in distributed systems. In the first part you will write a simple MapReduce program. In the second part you will write the distributed MapReduce logic (map and reduce task execution), plus a Master that hands out jobs to workers and handles worker failures. The interface to the library and the approach to fault tolerance are similar to those described in the original MapReduce paper.
You must write all the code you hand in for CS134, except for code that we give you as part of the assignment. You may discuss clarifications about the assignments with other students, but you may not discuss solutions or look at or copy each others' code. Please do not publish your code or make it available to future CS134 students -- for example, please do not make your code visible on GitHub. Please see the main class homepage for information about the use of AI.
This assignment has been revamped following feedback from last year. As a result, please be patient with us as we work out any unexpected skeleton code and test case bugs, ambiguous guidelines, or unclear instructions. If you find anything that seems incorrect, unclear, or otherwise in need of clarification, please do not hesitate to contact us via Piazza, office hours, discussion, or lecture!
Set up a private Git repository: To start this assignment, you should duplicate the (public) assignment 1 git repository into a private repository of your own. The assignment 1 repository can be found here: ucla-progsoftsys/cs134-assignment1-skeleton . Your own private copy of the assignment 1 repository is called a mirror repository. Each individual student should have their own private mirror repository where they commit their changes.
$ git clone --mirror git@github.com:ucla-progsoftsys/cs134-assignment1-skeleton.git
$ cd cs134-assignment1-skeleton.git
$ git push --mirror git@github.com:${USER_NAME}/assignment1.git
$ git remote set-url --push origin git@github.com:${USER_NAME}/assignment1.git
$ cd ..
$ git clone git@github.com:${USER_NAME}/assignment1.git
$ rm -rf cs134-assignment1-skeleton.git
$ cd assignment1/
$ git remote -v
origin git@github.com:${USER_NAME}/assignment1.git (fetch)
origin git@github.com:${USER_NAME}/assignment1.git (push)
Now you have your own copy of the original ucla-progsoftsys/cs134-assignment1-skeleton repository.
Git allows you to keep track of the changes you make to the code. For example, if you want to checkpoint your progress, you can commit and push your changes by running:
# make some changes
$ git status
$ git commit -am 'Added partial solution to assignment 1'
$ git push
The repository is organized into two directories: sequential/ (used in Part I) and distributed/ (used in Parts II and III).
In this part, you will implement a simple word count program using the MapReduce pattern, running entirely sequentially, to understand the Map and Reduce paradigms before moving on to the distributed version. You will work in the sequential/ directory.
Before you start coding, read Section 2 of the MapReduce paper. Your Map() and Reduce() functions will differ a bit from those in the paper's Section 2.1. Your Map() will be passed the entire file content as a single string value; it should split it into words, and return a list.List of key/value pairs, of type KeyValue. Your Reduce() will be called once for each unique key, with a list of all the values generated by Map() for that key; it should return a single output string.
Open sequential/wc.go and implement the three functions marked with // Your code here:
You can test your implementation by running:
$ cd sequential
$ go test -v
=== RUN TestPartI_MapSmallInput
--- PASS: TestPartI_MapSmallInput (0.00s)
=== RUN TestPartI_ReduceSmallInput
--- PASS: TestPartI_ReduceSmallInput (0.00s)
=== RUN TestPartI_DoMapReduceSmallInput
--- PASS: TestPartI_DoMapReduceSmallInput (0.00s)
=== RUN TestPartI_DoMapReduceKJV12Input
--- PASS: TestPartI_DoMapReduceKJV12Input (0.17s)
=== RUN TestPartI_MapRandomInput
--- PASS: TestPartI_MapRandomInput (0.41s)
=== RUN TestPartI_ReduceRandomInput
--- PASS: TestPartI_ReduceRandomInput (0.00s)
=== RUN TestPartI_DoMapReduceRandomInput
--- PASS: TestPartI_DoMapReduceRandomInput (0.07s)
=== RUN TestPartI_DoMapReduceSingleWord
--- PASS: TestPartI_DoMapReduceSingleWord (0.00s)
=== RUN TestPartI_DoMapReduceCaseSensitive
--- PASS: TestPartI_DoMapReduceCaseSensitive (0.00s)
=== RUN TestPartI_MapWhitespaceOnly
--- PASS: TestPartI_MapWhitespaceOnly (0.00s)
=== RUN TestPartI_ReduceSingleValue
--- PASS: TestPartI_ReduceSingleValue (0.00s)
=== RUN TestPartI_MapNoLetters
--- PASS: TestPartI_MapNoLetters (0.00s)
PASS
ok cs134-assignment1-sequential 0.834s
All tests should pass. You can also run the program directly:
$ go run wc.go kjv12.txt
Hint: you can use strings.FieldsFunc to split a string into components.
Hint: for the purposes of this exercise, you can consider a word to be any contiguous sequence of letters, as determined by unicode.IsLetter. A good read on what strings are in Go is the Go Blog on strings.
Hint: the strconv package is handy to convert strings to integers etc.
Hint: use the test cases to understand what you are being tested on! They give helpful examples of how the functions are called and what the expected return values are.
In this part you will complete a version of MapReduce that splits the work up over a set of worker threads, in order to exploit multiple cores. You will work in the distributed/ directory. A master thread hands out work to the workers and waits for them to finish. The master communicates with the workers via RPC, but results are passed via the file system. We give you the worker code (mapreduce/worker.go), the code that starts the workers, and code to deal with RPC messages (mapreduce/common.go).
Tip for Part II + Part III: DPrintf is provided as a debug print function. Use it in your code to print out debugging information. Set Debug = 1 in mapreduce.go to see all output, or Debug = 0 to hide it.
There are two sub-tasks in this part:
First, implement DoMap() and DoReduce() in mapreduce/mapreduce.go. These functions are invoked with the appropriate arguments when the master calls call(workerAddress, "Worker.DoJob", ...); each one executes a single map or reduce task to completion.
DoMap() should read its assigned input split (use MapName() to get the filename), call the user-supplied Map function on the contents, and partition the resulting key-value pairs across nreduce intermediate files (use ReduceName()). Use the provided openPairsFile(), appendPairsFilePair(), and closePairsFile() helper functions to write key-value pairs to intermediate files.
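For the partitioning step, a deterministic hash of the key is the usual approach, so that every map task sends a given key to the same reduce task. A minimal sketch, assuming an FNV-1a hash (the skeleton may already provide its own hash helper; if so, use that instead):

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// partition picks which of the nreduce intermediate files a key belongs to.
// FNV-1a here is an illustrative choice, not necessarily the skeleton's.
func partition(key string, nreduce int) int {
	h := fnv.New32a()
	h.Write([]byte(key))
	return int(h.Sum32()) % nreduce
}

func main() {
	// The same key always maps to the same partition.
	fmt.Println(partition("hello", 5) == partition("hello", 5)) // true
}
```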
DoReduce() should read the intermediate files produced by all map tasks for its reduce partition, group values by key, call the user-supplied Reduce function for each key, and write the results to the merge output file (use MergeName()). The keys in the output should be sorted. You can just use sort.Strings to handle the sorting. Use readFilePairs() to read intermediate files and the pairs file helpers to write output.
We provide two tests in distributed/mapreduce/test_test.go that target only Part II-A logic: TestPartIIA_DoMapPartitioning and TestPartIIA_DoReduceAggregatesAndSorts.
$ cd distributed/mapreduce
$ go test -v -run PartIIA
=== RUN TestPartIIA_DoMapPartitioning
--- PASS: TestPartIIA_DoMapPartitioning (0.01s)
=== RUN TestPartIIA_DoReduceAggregatesAndSorts
--- PASS: TestPartIIA_DoReduceAggregatesAndSorts (0.00s)
PASS
ok cs134-assignment1/mapreduce 0.226s
Next, complete RunMaster() in mapreduce/master.go to hand out the map and reduce jobs to workers, and return only when all the jobs have finished.
Look at Run() in mapreduce.go. It calls Split() to split the input into per-map-job files, then calls your RunMaster() to run the map and reduce jobs, then calls Merge() to assemble the per-reduce-job outputs into a single output file. RunMaster() only needs to tell the workers the name of the original input file (mr.file) and the job number; each worker knows from which files to read its input and to which files to write its output.
Each worker sends a Register RPC to the master when it starts. mapreduce.go already implements the master's MapReduce.Register RPC handler for you, and passes the new worker's information to mr.registerChannel. Your RunMaster should process new worker registrations by reading from this channel.
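One common pattern is to forward registrations into a pool of idle workers. A sketch only; the channel and pool names here are illustrative stand-ins (with plain strings in place of real worker addresses), not the skeleton's API:

```go
package main

import "fmt"

func main() {
	registerChannel := make(chan string)      // stands in for mr.registerChannel
	availableWorkers := make(chan string, 10) // pool of idle workers

	// Forward every registration into the idle-worker pool so the
	// dispatch loop can treat new and returning workers uniformly.
	go func() {
		for w := range registerChannel {
			availableWorkers <- w
		}
	}()

	registerChannel <- "worker0"
	registerChannel <- "worker1"
	fmt.Println(<-availableWorkers, <-availableWorkers) // worker0 worker1
}
```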
How to use RPC/call(): RPC lets one process call a method running in another process (or on another machine) using request/reply semantics similar to a normal function call. In this assignment, the function you are calling over RPC is already defined for you (Worker.DoJob). For this and future assignments, the helper function signature for making an RPC call is: call(srv, rpcname, args, reply) bool.
The returned bool indicates whether the RPC call completed and returned a response. If the return value is false, do not read fields from reply.
// Arguments are specified in the struct as defined in common.go
args := &DoJobArgs{
File: "kjv12.txt",
Operation: Map,
JobNumber: 3,
NumOtherPhase: 50,
}
// Create an empty reply struct; the worker fills it in.
reply := DoJobReply{}
// This call blocks until the RPC returns or fails.
ok := call(workerAddr, "Worker.DoJob", args, &reply)
if ok {
// RPC reached the worker; now read reply fields such as reply.OK.
} else {
// Worker unreachable or RPC failed; reassign the job.
}
For a Reduce job, construct the args the same way except use Operation: Reduce and set NumOtherPhase: mr.nMap (because each reduce task reads one intermediate file from each map task). In this assignment's DoJobReply, the returned value to read is reply.OK. In general, the pattern is always: fill an args struct, pass a pointer to a reply struct, call RPC, check the returned bool, then read reply fields only if the call returned true.
Information about the MapReduce job is in the MapReduce struct, defined in mapreduce.go. Modify the MapReduce struct to keep track of any additional state (e.g. the set of available workers), and initialize this additional state in the InitMapReduce() function. The master does not need to know which Map or Reduce functions are being used for the job; the workers will take care of executing the right code for Map or Reduce.
The master should send RPCs to the workers in parallel so that the workers can work on jobs concurrently. You will find the go statement and the Go RPC documentation useful for this purpose.
The master may have to wait for a worker to finish before it can hand out more jobs. You may find channels useful for synchronizing the master with the threads that are waiting for an RPC reply. Channels are explained in the document on Concurrency in Go.
You should run your code using Go's unit test system. We supply you with a set of tests in test_test.go. You can run unit tests as follows:
$ cd distributed/mapreduce
$ go test -v -run PartIIB
=== RUN TestPartIIB_Basic
Test: Basic mapreduce ...
... Basic Passed
--- PASS: TestPartIIB_Basic (1.17s)
=== RUN TestPartIIB_Parallelism
Test: Parallelism ...
... Parallelism Passed
--- PASS: TestPartIIB_Parallelism (1.66s)
PASS
ok cs134-assignment1/mapreduce 3.054s
You are done with Part II when your implementation passes the TestPartIIB_Basic and TestPartIIB_Parallelism tests in test_test.go. TestPartIIB_Basic verifies that the full MapReduce pipeline produces correct output with two workers. TestPartIIB_Parallelism verifies that your master dispatches jobs to multiple workers concurrently (i.e., not one at a time). You don't yet have to worry about failures of workers.
The easiest way to track down bugs is to insert log.Printf() statements, collect the output in a file with go test > out, and then think about whether the output matches your understanding of how your code should behave. The last step (thinking) is the most important.
The code we give you runs the workers as threads within a single UNIX process, and can exploit multiple cores on a single machine. For simplicity, all projects use UNIX-domain sockets (think of using a file as a network connection), which limits all components of the system to running on the same computer; however, with some modifications the workers could run on multiple machines communicating over a network. The RPCs would have to use TCP rather than UNIX-domain sockets; there would need to be a way to start worker processes on all the machines; and all the machines would have to share storage through some kind of network file system.
In this part you will make the master handle failed workers. MapReduce makes this relatively easy because workers don't have persistent state. If a worker fails, any RPCs that the master issued to that worker will fail (e.g. due to a timeout). Thus, if the master's RPC to the worker fails, the master should re-assign the job given to the failed worker to another worker.
An RPC failure doesn't necessarily mean that the worker failed; the worker may just be unreachable but still computing. Thus, it may happen that two workers receive the same job and compute it. However, because jobs are idempotent, it doesn't matter if the same job is computed twice--both times it will generate the same output. So, you don't have to do anything special for this case. (Our tests never fail workers in the middle of a job without killing the worker, so you don't even have to worry about several workers writing to the same output file.)
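The re-assignment loop can be sketched minimally as below, with a fake call() standing in for the real RPC helper and hypothetical worker names; the real loop would pass DoJobArgs and a reply struct:

```go
package main

import "fmt"

// Fake stand-in for the RPC helper: pretend one worker has failed.
// The real call() contacts a worker over a socket and returns false on failure.
func call(worker string) bool {
	return worker != "badworker"
}

func main() {
	availableWorkers := make(chan string, 4)
	availableWorkers <- "badworker"
	availableWorkers <- "worker1"

	var done string
	for {
		w := <-availableWorkers
		if call(w) {
			availableWorkers <- w // return the idle worker to the pool
			done = w
			break
		}
		// RPC failed: discard this worker and hand the job to another.
	}
	fmt.Println(done) // worker1
}
```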
You don't have to handle failures of the master; we will assume it won't fail. Making the master fault-tolerant is more difficult because it keeps persistent state that would have to be recovered in order to resume operations after a master failure. Much of the later assignments are devoted to this challenge.
Your implementation must pass the remaining test cases in test_test.go: TestPartIII_OneFailure, TestPartIII_ManyFailures, and TestPartIII_Timeout. TestPartIII_OneFailure starts two workers, one of which fails after handling 10 RPCs. Your master must detect the failure and complete all remaining jobs using the other worker. TestPartIII_ManyFailures continuously starts pairs of workers that each fail after 10 jobs. Your master must keep making forward progress by re-assigning failed jobs to newly available workers.
You can run all tests at once with:
$ cd distributed/mapreduce
$ go test -v
=== RUN TestPartIIA_DoMapPartitioning
--- PASS: TestPartIIA_DoMapPartitioning (0.00s)
=== RUN TestPartIIA_DoReduceAggregatesAndSorts
--- PASS: TestPartIIA_DoReduceAggregatesAndSorts (0.00s)
=== RUN TestPartIIB_Basic
Test: Basic mapreduce ...
... Basic Passed
--- PASS: TestPartIIB_Basic (1.20s)
=== RUN TestPartIIB_Parallelism
Test: Parallelism ...
... Parallelism Passed
--- PASS: TestPartIIB_Parallelism (1.80s)
=== RUN TestPartIII_OneFailure
Test: One Failure mapreduce ...
DoWork: RPC /var/tmp/134-mapreduce-501/mr51565-worker0 shutdown error
... One Failure Passed
--- PASS: TestPartIII_OneFailure (1.42s)
=== RUN TestPartIII_ManyFailures
Test: ManyFailures mapreduce ...
DoWork: RPC /var/tmp/134-mapreduce-501/mr51565-worker3 shutdown error
DoWork: RPC /var/tmp/134-mapreduce-501/mr51565-worker4 shutdown error
DoWork: RPC /var/tmp/134-mapreduce-501/mr51565-worker7 shutdown error
DoWork: RPC /var/tmp/134-mapreduce-501/mr51565-worker11 shutdown error
DoWork: RPC /var/tmp/134-mapreduce-501/mr51565-worker6 shutdown error
DoWork: RPC /var/tmp/134-mapreduce-501/mr51565-worker10 shutdown error
DoWork: RPC /var/tmp/134-mapreduce-501/mr51565-worker1 shutdown error
DoWork: RPC /var/tmp/134-mapreduce-501/mr51565-worker0 shutdown error
DoWork: RPC /var/tmp/134-mapreduce-501/mr51565-worker2 shutdown error
DoWork: RPC /var/tmp/134-mapreduce-501/mr51565-worker8 shutdown error
DoWork: RPC /var/tmp/134-mapreduce-501/mr51565-worker5 shutdown error
DoWork: RPC /var/tmp/134-mapreduce-501/mr51565-worker9 shutdown error
DoWork: RPC /var/tmp/134-mapreduce-501/mr51565-worker12 shutdown error
DoWork: RPC /var/tmp/134-mapreduce-501/mr51565-worker13 shutdown error
... Many Failures Passed
--- PASS: TestPartIII_ManyFailures (8.35s)
=== RUN TestPartIII_Timeout
Test: Timeout ...
DoWork: RPC /var/tmp/134-mapreduce-501/mr51565-timeout-worker0 shutdown error
... Timeout Passed
--- PASS: TestPartIII_Timeout (4.38s)
PASS
ok cs134-assignment1/mapreduce 17.381s
It is highly recommended to also run your tests with the Go race detector enabled to catch potential concurrency issues:
$ go test -v -race
For all assignments, the autograder may run your code with the race detector.
To submit the assignment, submit the code as-is to Gradescope, following the same code structure as from the provided skeleton code. If you have your code pushed to a private GitHub repository, you can also submit by linking your GitHub account in Gradescope and selecting your repository. Your autograder score displayed immediately after submission is NOT your final score. After the late submission period closes, we will re-run the exact same test cases on your current active submission in order to determine your final grade. Barring any academic integrity issues or egregious bugs in the autograder, the autograder in Gradescope is the same autograder we will use to determine your final score.