MapReduce_API_Documentation  
A description of the MapReduce API in AppScale
Deprecated, APIs-Documentation, Advanced
Updated Jul 29, 2011 by ckri...@gmail.com

Introduction

While AppScale aims to provide full API compatibility with Google App Engine, there are features we wish to offer users that are not provided by Google App Engine directly. One of these features is the ability to write MapReduce programs that run over your own hardware in an AppScale deployment.

Requirements

The use of this API is recommended for users who have already set up AppScale and deployed it with either HBase or Hypertable. This is because we use the Hadoop MapReduce implementation, which only runs by default when using HBase or Hypertable. Specifically, the API uses Hadoop Streaming to run MapReduce jobs, and the programming languages currently supported are Python, Perl, and Ruby. Support for other languages can be requested via the AppScale Community Group and may appear in a future release.
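
For readers new to Hadoop Streaming: a mapper is an ordinary script that reads input lines on standard input and writes tab-separated key / value records to standard output. The following is a minimal sketch of such a mapper in Python. It counts words, which is not what the bundled sample application does, and the function names map_lines and main are illustrative only.

```python
import sys


def map_lines(lines):
    """Yield one "word<TAB>1" record for every word in the input lines."""
    for line in lines:
        # split() with no arguments drops surrounding whitespace,
        # so blank lines simply produce no records.
        for word in line.split():
            yield "%s\t%d" % (word, 1)


def main(stdin=sys.stdin, stdout=sys.stdout):
    """Streaming entry point: read records from stdin, write to stdout."""
    for record in map_lines(stdin):
        stdout.write(record + "\n")


# When saved as a mapper script, finish the file with:
#   if __name__ == "__main__":
#       main()
```

A reducer follows the same pattern: it reads the sorted key / value records on standard input and writes its aggregated results to standard output.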

The API

The AppScale MapReduce API is intentionally minimal, giving the programmer just enough functionality to get the most out of MapReduce. A sample application, mapreduce, is also included to demonstrate the functions in the API.

  • putMRInput(data, inputLoc): Given a string data and a Hadoop file system location inputLoc, creates a file named inputLoc on the Hadoop file system containing data.
  • runMRJob(mapper, reducer, inputLoc, outputLoc, config={}): Given the paths to a mapper and a reducer file (relative to the main Python file being run), runs a Hadoop MapReduce Streaming job. Input is read from the HDFS file named inputLoc, and output is written to the HDFS file named outputLoc. If a hash is passed as config, its key / value pairs are passed as configuration options to Hadoop Streaming.
  • getMROutput(outputLoc): Given a Hadoop file system location outputLoc, returns a string with the contents of the named file.

  • writeTempFile(suffix, data): Writes a file to /tmp on the local machine with the contents data. This is useful for passing a file with nodes to exclude from MapReduce jobs.
  • getAllIPs(): Returns an array of all the IPs in the AppScale cloud. This is useful for excluding or including nodes based on some application logic.
  • getNumOfNodes(): Returns an integer with the number of nodes in the AppScale cloud. This is useful for determining, at MapReduce run time, how many Map tasks and Reduce tasks should be run for optimal performance (the recommended value is 1 Map task per node).
  • getMRLogs(outputLoc): Returns a string with the MapReduce job log for the job whose output is located at outputLoc. Data is returned as a combination of XML and key / value pairs, in the standard Hadoop format.
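
As an illustration of the config argument to runMRJob, here is a minimal sketch that builds a hash following the one-Map-task-per-node recommendation. The helper build_job_config is hypothetical. mapred.map.tasks and mapred.reduce.tasks are job properties from Hadoop releases of that era, but whether AppScale forwards these names unchanged, and whether it expects integer or string values, are assumptions. The default of one Reduce task is an arbitrary choice.

```python
def build_job_config(num_nodes, reduce_tasks=1):
    """Return a config hash requesting one Map task per node.

    The property names are assumed to be forwarded to Hadoop Streaming
    as-is; this page does not confirm that.
    """
    if num_nodes < 1:
        raise ValueError("num_nodes must be at least 1")
    return {
        "mapred.map.tasks": num_nodes,
        "mapred.reduce.tasks": reduce_tasks,
    }


# Inside an AppScale application this might be used as:
#   config = build_job_config(taskqueue.getNumOfNodes())
#   taskqueue.runMRJob(mapper, reducer, inputLoc, outputLoc, config)
```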

How to Use the API

Since these functions have all been added to the Python TaskQueue API, users of AppScale 1.4 (which uses Google App Engine 1.3.2) can gain access to these functions with one simple import statement:

from google.appengine.api.labs import taskqueue

Note that since we added the MapReduce API to the Task Queue API, importing the Task Queue API pulls in support for both. Let's look at some examples from our mapreduce sample application. The application bundles a mapper (map.rb) and a reducer (reduce.rb), written in Ruby, that generate random numbers, but as we can see from our code here, the specifics of that algorithm don't matter too much. We begin by constructing an input file for the MapReduce job:

  def genInput(self, power, inputLoc):
    n = 2 ** power
    bucket_size = n / taskqueue.getNumOfNodes()
    
    vals = range(1, n / bucket_size + 1)  
    vals = [i * bucket_size for i in vals]
    
    buckets = ""
    index = 0
    for i in vals:
      if index == 0:
        start = 0
      else:
        start = vals[index - 1]
    
      this_range = str(start+1) + "\t" + str(vals[index]) + "\n"
      buckets += this_range
      index += 1

    taskqueue.putMRInput(buckets, inputLoc)
    return
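
To make the shape of that input concrete, here is a standalone sketch of the same bucketing arithmetic that can be run outside AppScale. The name make_buckets is hypothetical, and the node count is passed in as a parameter instead of being read from taskqueue.getNumOfNodes(). Unlike genInput, it raises a ValueError when there are more nodes than numbers, where the original would fail with a division by zero.

```python
def make_buckets(power, num_nodes):
    """Return tab-separated "start<TAB>end" lines covering 1..2**power."""
    n = 2 ** power
    bucket_size = n // num_nodes
    if bucket_size == 0:
        raise ValueError("more nodes than numbers to generate")

    lines = []
    start = 0
    # Each bucket ends at a multiple of bucket_size, as in genInput.
    for end in range(bucket_size, n + 1, bucket_size):
        lines.append("%d\t%d\n" % (start + 1, end))
        start = end
    return "".join(lines)
```

With power 3 and four nodes, the result has four lines holding the ranges 1 to 2, 3 to 4, 5 to 6, and 7 to 8, one range per mapper.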

Don't get too hung up on the details of genInput - the function builds one big string that tells our mappers how many random numbers to generate in a given range. It uses taskqueue.getNumOfNodes() to find out how many machines are available for computation, and then splits the work evenly among them. For example, if we ran in a standard four node deployment, this function would generate an input file telling each mapper to generate 1/4 of the total numbers needed. We then use taskqueue.putMRInput(buckets, inputLoc) to write our input file to the local filesystem (for security reasons, this writes to /tmp), which is needed for Hadoop. We then use the Task Queue API to asynchronously request that the MapReduce job be run:

  def runMRJob(self,powerStr):
    mapFile = "map.rb"
    redFile = "reduce.rb"
    power = int(powerStr)
        
    outputFile = "output-" + powerStr
     
    resultStr = memcache.get("result-" + powerStr)
    if resultStr is None:
      inputFile = "input-" + powerStr
      self.genInput(power,inputFile)
      
      # Add the task to the default queue.
      taskqueue.add(url='/worker', 
        params=
          {'map': mapFile, 
          'reduce': redFile, 
          'input': inputFile,
          'output': outputFile, 
          'power': power})

Here we did a little bit of optimization as well. We use the Memcache API to store the results of MapReduce jobs, so that if a user asks for a job that has already been run, we don't run it again and simply return the stored output. Of course, this doesn't work for all MapReduce jobs, but as our input is always the same, and because our programs are deterministic, this works nicely. Let's see where this gets asynchronously picked up:

class MapReduceWorker(webapp.RequestHandler):
  def post(self): # should run at most 1/s
    mapper = self.request.get('map')
    reducer = self.request.get('reduce')
    inputSource = self.request.get('input')
    outputSource = self.request.get('output')
    powerStr = self.request.get('power')
    
    taskqueue.runMRJob(mapper, reducer, inputSource, outputSource)
    resultStr = taskqueue.getMROutput(outputSource)
    resultStr = resultStr.replace("\n", "<br />")
    resultStr = resultStr.replace("sum", "&#931;")
    memcache.set("currentVal", powerStr)
    memcache.set("currentTask", "runMRJob")
    memcache.set("power-" + powerStr, powerStr)
    memcache.set("result-" + powerStr, resultStr)

The critical lines to focus on here are:

    taskqueue.runMRJob(mapper, reducer, inputSource, outputSource)
    resultStr = taskqueue.getMROutput(outputSource)

The first line takes the paths to our mapper and reducer files, the path to the input file (the one written by putMRInput), and the location in the Hadoop Distributed File System where the output should be written. This call starts the MapReduce job across the available machines and returns immediately. The second call, to getMROutput, is blocking: it returns only once the specified HDFS location has been written to, and it returns a string containing the result of the MapReduce job. We then format the output (as we are presenting it in HTML) and place the result in our cache to speed up future requests for this data. If the job happens to fail for some reason, or we wish to examine the logs produced by Hadoop (which tell us all the runtime options used for this job), we can also do that easily:

  def getLogs(self, powerStr):
    resultStr = memcache.get("timing-" + powerStr)

    if resultStr is None:
      outputFile = "output-" + powerStr
      resultStr = taskqueue.getMRLogs(outputFile)
      resultStr = resultStr.replace("\n", "<br />")
      resultStr = resultStr.replace("<value>", " = ")
      memcache.set("power-" + powerStr, powerStr)
      memcache.set("timing-" + powerStr, resultStr)    

Again, as reading from our cache is much faster than reading from HDFS, we check the cache first, and whenever we do read from HDFS, we store the result in the cache.
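
This check-the-cache-first pattern can be sketched on its own. In the sketch below, DictCache is a hypothetical in-memory stand-in for the Memcache API (only get and set, with get returning None on a miss), and cached_fetch and its loader argument are illustrative names, not part of AppScale.

```python
class DictCache(object):
    """Hypothetical in-memory stand-in for the Memcache API."""

    def __init__(self):
        self._store = {}

    def get(self, key):
        # Like memcache.get, return None when the key is missing.
        return self._store.get(key)

    def set(self, key, value):
        self._store[key] = value


def cached_fetch(cache, key, loader):
    """Return the cached value for key, calling loader() only on a miss."""
    value = cache.get(key)
    if value is None:
        value = loader()        # the slow path, e.g. a read from HDFS
        cache.set(key, value)   # remember it for the next request
    return value
```

In getLogs above, the loader would be the call to taskqueue.getMRLogs(outputFile), and the cache would be the real memcache module.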

This introduction covers the basics of the AppScale MapReduce API - feel free to examine and use the mapreduce application bundled with the AppScale Tools for a more complete explanation.

Of course, make sure you're running over HBase or Hypertable when using this API - these calls will fail otherwise.

