|
MapReduce_API_Documentation
Introduction
While AppScale aims to provide full API compatibility with Google App Engine, there are features we wish to offer users that Google App Engine does not provide directly. One of these is the ability to write MapReduce programs that run on your own hardware in an AppScale deployment.

Requirements
This API is intended for users who have already set up AppScale and deployed it with either HBase or Hypertable. This is because we use the Hadoop MapReduce implementation, which is only run by default when using HBase or Hypertable. Specifically, AppScale uses Hadoop Streaming to run MapReduce jobs, and the programming languages currently supported are Python, Perl, and Ruby. Support for other languages can be requested via the AppScale Community Group and may appear in a future release.

The API
The AppScale MapReduce API is intentionally minimal, providing the programmer with just enough functionality to get the most out of MapReduce. A sample application, mapreduce, is included as a demonstration of the functions the API exposes.
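Because jobs run through Hadoop Streaming, a mapper or reducer is just a program that reads lines from standard input and writes tab-separated key/value lines to standard output. As a rough illustration of that contract (this is not code from the sample application), the sketch below expresses a mapper and a reducer as Python functions over lines. The names mapper, reducer, and run_locally, and the odd/even keying, are assumptions made for the example; run_locally stands in for Hadoop by sorting the mapper output before reducing.

```python
# Sketch of the Hadoop Streaming contract; all names here are illustrative.
from itertools import groupby


def mapper(lines):
    """Emit one "key<TAB>value" line per number found on the input lines."""
    for line in lines:
        for token in line.split():
            number = int(token)
            # Key every number by its parity so the reducer sees two groups.
            key = "even" if number % 2 == 0 else "odd"
            yield "%s\t%d" % (key, number)


def reducer(sorted_lines):
    """Sum the values for each key; the input must already be sorted by key."""
    pairs = (line.split("\t") for line in sorted_lines)
    for key, group in groupby(pairs, key=lambda pair: pair[0]):
        yield "%s\t%d" % (key, sum(int(value) for _, value in group))


def run_locally(lines):
    """Stand-in for Hadoop: map, sort by key (the shuffle), then reduce."""
    return list(reducer(sorted(mapper(lines))))


if __name__ == "__main__":
    for out in run_locally(["1 2 3", "4 5"]):
        print(out)
```

In an actual streaming job, each function would be fed sys.stdin and its results printed, with one script for the mapper and one for the reducer.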
How to Use the API
Since these functions have all been added to the Python Task Queue API, users of AppScale 1.4 (which uses Google App Engine 1.3.2) can gain access to them with one import statement:

from google.appengine.api.labs import taskqueue

Note that because the MapReduce API lives inside the Task Queue API, this single import pulls in support for both. Let's look at some examples from our mapreduce sample application. The application bundles a mapper (map.rb) and a reducer (reduce.rb), written in Ruby, that generate random numbers, but as the code here shows, the specifics of that algorithm don't matter much. We begin by constructing an input file for the MapReduce job:

def genInput(self, power, inputLoc):
    n = 2 ** power
    # Split the range evenly across the available machines.
    bucket_size = n // taskqueue.getNumOfNodes()
    vals = range(1, n // bucket_size + 1)
    vals = [i * bucket_size for i in vals]
    buckets = ""
    index = 0
    for i in vals:
        if index == 0:
            start = 0
        else:
            start = vals[index - 1]
        # One line per bucket: first and last number, tab-separated.
        this_range = str(start + 1) + "\t" + str(vals[index]) + "\n"
        buckets += this_range
        index += 1
    taskqueue.putMRInput(buckets, inputLoc)
    return

Don't get too hung up on the details: the function builds one big string that tells our mappers how many random numbers to generate in a given range. It uses taskqueue.getNumOfNodes() to find out how many machines are available for computation and splits the work evenly among them. For example, in a standard four-node deployment, this function would generate an input file telling each mapper to generate 1/4 of the total numbers needed. We then use taskqueue.putMRInput(buckets, inputLoc) to write the input file to the local filesystem, where Hadoop needs it (for security reasons, this writes to /tmp). Next, we use the Task Queue API to asynchronously request that the MapReduce job be run:

def runMRJob(self, powerStr):
    mapFile = "map.rb"
    redFile = "reduce.rb"
    power = int(powerStr)
    outputFile = "output-" + powerStr
    # Only run the job if its result is not already cached.
    resultStr = memcache.get("result-" + powerStr)
    if resultStr is None:
        inputFile = "input-" + powerStr
        self.genInput(power, inputFile)
        # Add the task to the default queue.
        taskqueue.add(url='/worker',
                      params={'map': mapFile,
                              'reduce': redFile,
                              'input': inputFile,
                              'output': outputFile,
                              'power': power})

Here we did a little optimization as well. We use the Memcache API to store the results of MapReduce jobs, so that if a user requests a job that has already been run, we don't run it again and simply return the stored output. Of course, this doesn't work for all MapReduce jobs, but because our input is always the same and our programs are deterministic, it works nicely here. Let's see where the task gets picked up asynchronously:

from google.appengine.api import memcache
from google.appengine.ext import webapp

class MapReduceWorker(webapp.RequestHandler):
    def post(self):  # should run at most 1/s
        mapper = self.request.get('map')
        reducer = self.request.get('reduce')
        inputSource = self.request.get('input')
        outputSource = self.request.get('output')
        powerStr = self.request.get('power')
        # Start the job, then block until its output is available.
        taskqueue.runMRJob(mapper, reducer, inputSource, outputSource)
        resultStr = taskqueue.getMROutput(outputSource)
        # Format the output for display as HTML.
        resultStr = resultStr.replace("\n", "<br />")
        resultStr = resultStr.replace("sum", "&Sigma;")
        memcache.set("currentVal", powerStr)
        memcache.set("currentTask", "runMRJob")
        memcache.set("power-" + powerStr, powerStr)
        memcache.set("result-" + powerStr, resultStr)

The critical lines to focus on here are:

taskqueue.runMRJob(mapper, reducer, inputSource, outputSource)
resultStr = taskqueue.getMROutput(outputSource)

The first call takes the paths to our mapper and reducer files, the path to the input file (returned by putMRInput), and the location in the Hadoop Distributed File System (HDFS) where the output should be written. It starts the MapReduce job across the available machines and returns immediately. The second call, to getMROutput, is blocking: it returns only once the specified HDFS location has been written to, and its return value is a string containing the result of the MapReduce job. We then format the output (as we are presenting it in HTML) and place the result in our cache to speed up future requests for this data. If the job fails for some reason, or we wish to examine the logs produced by Hadoop (which list all the runtime options used for this job), we can do that easily as well:

def getLogs(self, powerStr):
    resultStr = memcache.get("timing-" + powerStr)
    if resultStr is None:
        outputFile = "output-" + powerStr
        resultStr = taskqueue.getMRLogs(outputFile)
        resultStr = resultStr.replace("\n", "<br />")
        resultStr = resultStr.replace("<value>", " = ")
        memcache.set("power-" + powerStr, powerStr)
        memcache.set("timing-" + powerStr, resultStr)

Again, because reading from our cache is much faster than reading from HDFS, we check the cache first, and whenever we do read from HDFS, we store the result in the cache. This introduction covers the basics of the AppScale MapReduce API. Feel free to examine and use the mapreduce application bundled with the AppScale Tools for a more complete example. Of course, make sure you're running over HBase or Hypertable when using this API, as these calls will fail otherwise. |
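To see what genInput writes without a running deployment, the bucket computation can be reproduced as a plain function. This is only a sketch: make_buckets is a hypothetical name, its num_nodes parameter stands in for the value returned by taskqueue.getNumOfNodes(), and the explicit error for an empty bucket is an addition not present in the sample application. Nothing here calls the AppScale API.

```python
import sys


def make_buckets(power, num_nodes):
    """Return the "start<TAB>end" lines genInput builds for 2 ** power numbers.

    num_nodes is a hypothetical stand-in for taskqueue.getNumOfNodes().
    """
    n = 2 ** power
    bucket_size = n // num_nodes
    if bucket_size == 0:
        # Added guard: the original would fail with a division by zero here.
        raise ValueError("need at least as many numbers as nodes")
    upper_bounds = [i * bucket_size for i in range(1, n // bucket_size + 1)]
    lines = []
    start = 0
    for end in upper_bounds:
        # Each bucket covers start + 1 through end, inclusive.
        lines.append("%d\t%d\n" % (start + 1, end))
        start = end
    return "".join(lines)


if __name__ == "__main__":
    # Four nodes and 2 ** 4 = 16 numbers: each mapper gets a quarter of them.
    sys.stdout.write(make_buckets(4, 4))
```

With four nodes and a power of 4, the result is four lines covering 1 to 4, 5 to 8, 9 to 12, and 13 to 16. When the node count does not divide 2 ** power evenly, the integer division produces more buckets than nodes, just as the original loop does.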