MapReduceDemoApp  
How to run MapReduce jobs in Python over App Engine.
Phase-Deploy
Updated Feb 27, 2014 by tkaitch...@google.com

Overview

The MapReduce API aims to make it easy to write MapReduce jobs in your Google App Engine applications. In this article, we examine how to write three different MapReduce jobs using the Python runtime of App Engine. We follow the source code found in the demo application - see the Source tab and look under python/demo/main.py

This article also assumes you have some familiarity with MapReduce. If you do not, you may want to read up on it or watch Mike Aizatsky's video from Google I/O 2011, which covers MapReduce in general as well as the MapReduce API for Python and Java.

Follow along with us at home - the source for the MapReduce demo can be found under python/demo/main.py in the project's Source tab.

The Demo App

The sample application included with the MapReduce API allows users to upload a zip file containing one or more text files (hereafter referred to as the input corpus) and run several kinds of analysis over that data. Specifically, three analysis jobs (implemented as MapReduce jobs) can be run over the data:

  • Word Count: For each word in the input corpus, determine how often it appears.
  • Index: For each word in the input corpus, determine which files it appears in.
  • Phrases: Determine the "improbable phrases" in each input file - that is, phrases that appear in this input file but not in the others.

All of our examples specify Map and Reduce functions, and start a MapReduce job via the following API call:

MapreducePipeline.run(
          job_name,
          mapper_spec,
          reducer_spec,
          input_reader_spec,
          output_writer_spec=None,
          mapper_params=None,
          reducer_params=None,
          shards=None
)

This function call constructs a Pipeline via the Pipeline API with the following structure:

  1. Call the user-supplied Map function
  2. Perform a Shuffle based on the output of the Map function
  3. Call the user-supplied Reduce function
  4. Clean up temporary files emitted by Map, Shuffle, and Reduce

The Shuffle and Cleanup functions (Steps 2 and 4) are automatically provided by the MapReduce API. Therefore, we continue by discussing the Mappers and Reducers needed for each of the three jobs that the demo application utilizes.
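
To give a feel for how these pieces fit together, here is a minimal sketch (not taken verbatim from the demo - the pipeline class, job name, and function names are illustrative) of how a MapreducePipeline is typically wrapped in a user-defined pipeline and started:

from mapreduce import base_handler
from mapreduce import mapreduce_pipeline


class MyAnalysisPipeline(base_handler.PipelineBase):
  """Illustrative wrapper; the demo defines one such pipeline per job."""

  def run(self, blobkey):
    # Yielding a MapreducePipeline runs Map, Shuffle, and Reduce as child
    # jobs; its result resolves to the location(s) of the reducer's output.
    output = yield mapreduce_pipeline.MapreducePipeline(
        "my_job",
        "main.my_map",
        "main.my_reduce",
        "mapreduce.input_readers.BlobstoreZipInputReader",
        "mapreduce.output_writers.BlobstoreOutputWriter",
        mapper_params={"blob_key": blobkey},
        reducer_params={"mime_type": "text/plain"},
        shards=16)
    # (The demo passes this output on to a follow-up pipeline stage that
    #  records where the reducer's results were written.)

# Kicking the job off from a request handler (also illustrative):
#   MyAnalysisPipeline(blobkey).start()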

Word Count

Let's begin by looking at how we invoke the MapReduce job for our Word Count example:

yield mapreduce_pipeline.MapreducePipeline(
    "word_count",
    "main.word_count_map",
    "main.word_count_reduce",
    "mapreduce.input_readers.BlobstoreZipInputReader",
    "mapreduce.output_writers.BlobstoreOutputWriter",
    mapper_params={
        "blob_key": blobkey,
    },
    reducer_params={
        "mime_type": "text/plain",
    },
    shards=16)

Here, we indicate that the name of our job is "word_count" along with the names of the functions that will perform the Map and Reduce functions (word_count_map and word_count_reduce, respectively). As our input is a zip file stored in the Blobstore, we use the BlobstoreZipInputReader, and as we are writing plaintext back to the Blobstore, we use the BlobstoreOutputWriter. Finally, we tell our Map function the location in the Blobstore where it can find the input file (the blob key), and we tell our reducer what the format for the final output of the job will be (plaintext).

Map function: Our mapper consists of the following code:

def word_count_map(data):
  """Word count map function."""
  (entry, text_fn) = data
  text = text_fn()

  logging.debug("Got %s", entry.filename)
  for s in split_into_sentences(text):
    for w in split_into_words(s.lower()):
      yield (w, "")

As we can see, our Map function receives the contents of one file from the zip (unzipping is already handled for us by the input reader we specified), splits the text into sentences and then into words, and for each word it finds, it emits (word, ""). The value has no special meaning here - our Reduce function will not use it.
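
The helpers split_into_sentences and split_into_words are defined elsewhere in the demo's main.py. A minimal sketch of what they might look like, assuming simple regex-based splitting (the demo's exact rules may differ):

import re


def split_into_sentences(text):
  """Rough sketch: treat '.', '?' and '!' as sentence boundaries."""
  text = re.sub(r"\s+", " ", text)
  return re.split(r"[.?!]", text)


def split_into_words(sentence):
  """Rough sketch: strip punctuation, underscores and digits, then split."""
  sentence = re.sub(r"[\W_0-9]+", " ", sentence)
  return sentence.split()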

Reduce function: Our reducer consists of the following code:

def word_count_reduce(key, values):
  """Word count reduce function."""
  yield "%s: %d\n" % (key, len(values))

This code is even simpler - here, we get all the values for a specific key. We don't care about what the values are - we only care about how many of them there are, as this tells us how many times we saw a particular word. We perform this count and emit (word, count for word).

Our final output is a set of key-value pairs - keys are words from the input corpus, and values are the counts associated with their respective word.
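
To make that format concrete, here is a hypothetical direct call to the reducer (not part of the demo), as if the shuffle had handed it the key "mapreduce" with three empty-string values:

print(list(word_count_reduce("mapreduce", ["", "", ""])))
# -> ['mapreduce: 3\n']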

Index

The way we start the MapReduce job is pretty much the same as in the previous example, so let's look at what's changed:
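Only the job name and the mapper/reducer specs change; a sketch of the invocation, mirroring the Word Count call above (the job name "index" is an assumption, but the function names match the code below):

yield mapreduce_pipeline.MapreducePipeline(
    "index",
    "main.index_map",
    "main.index_reduce",
    "mapreduce.input_readers.BlobstoreZipInputReader",
    "mapreduce.output_writers.BlobstoreOutputWriter",
    mapper_params={
        "blob_key": blobkey,
    },
    reducer_params={
        "mime_type": "text/plain",
    },
    shards=16)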

Map function: Our mapper consists of the following code:

def index_map(data):
  """Index demo map function."""
  (entry, text_fn) = data
  text = text_fn()

  logging.debug("Got %s", entry.filename)
  for s in split_into_sentences(text):
    for w in split_into_words(s.lower()):
      yield (w, entry.filename)

Here our Map function breaks each input sentence into words as before; what's different is that we emit the word along with the filename we found it in (instead of just "").

Reduce function: Our reducer consists of the following code:

def index_reduce(key, values):
  """Index demo reduce function."""
  yield "%s: %s\n" % (key, list(set(values)))

This Reduce function is pretty straightforward - we are given a word and the list of files it was found in, which is exactly what we wanted to find out! We just deduplicate the values (a filename appears once for every occurrence of the word in that file), convert them to a list, and emit the result.

Our final output is a set of key-value pairs - keys are words from the input corpus, and values are a list of the files that the associated word was found in.
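
As a hypothetical illustration of the output format (note that because of the set conversion, the order of the filenames is not guaranteed):

print(list(index_reduce("pipeline", ["intro.txt", "design.txt", "intro.txt"])))
# -> ["pipeline: ['intro.txt', 'design.txt']\n"]  (filename order may vary)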

Phrases

Like before, the way we start this MapReduce job is not too interesting - let's see what's changed:

Map function: Our mapper consists of the following code:

PHRASE_LENGTH = 4


def phrases_map(data):
  """Phrases demo map function."""
  (entry, text_fn) = data
  text = text_fn()
  filename = entry.filename

  logging.debug("Got %s", filename)
  for s in split_into_sentences(text):
    words = split_into_words(s.lower())
    if len(words) < PHRASE_LENGTH:
      yield (":".join(words), filename)
      continue
    # Slide a window of PHRASE_LENGTH words across the sentence.
    for i in range(0, len(words) - PHRASE_LENGTH + 1):
      yield (":".join(words[i:i+PHRASE_LENGTH]), filename)

Here we've decided that a phrase is four consecutive words - in practice, if this value is set too high, all phrases are deemed improbable, and if it's too low, very few things are deemed improbable (4 is a nice sweet spot). Our Map function splits each sentence into chunks of words (phrases). If a sentence has fewer than four words, it emits the whole sentence as a single phrase; otherwise it emits every four-word window in the sentence (e.g., "A B C D E" emits two phrases, "A B C D" and "B C D E"). We thus emit each phrase as the key and the filename it was found in as the value.
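
As a quick, hypothetical walk-through (assuming the splitting helpers behave as in the earlier sketch; the real input reader supplies a zipfile.ZipInfo entry, for which a named tuple stands in here):

import collections

FakeEntry = collections.namedtuple("FakeEntry", ["filename"])   # stand-in for the zip entry
data = (FakeEntry("intro.txt"), lambda: "the quick brown fox jumps")
print(list(phrases_map(data)))
# -> [('the:quick:brown:fox', 'intro.txt'), ('quick:brown:fox:jumps', 'intro.txt')]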

Reduce function: Our reducer consists of the following code:

def phrases_reduce(key, values):
  """Phrases demo reduce function."""
  if len(values) < 10:
    return
  counts = {}
  for filename in values:
    counts[filename] = counts.get(filename, 0) + 1

  words = re.sub(r":", " ", key)
  threshold = len(values) / 2
  for filename, count in counts.items():
    if count > threshold:
      yield "%s:%s\n" % (words, filename)

The Reduce function receives a phrase along with a list of the files that we've seen the phrase in. It first ensures that enough input data has been received for the key - if the phrase appears fewer than 10 times across the whole corpus, we don't consider it a phrase the author habitually uses. We then count how many times the phrase appears in each file, and if a majority of its occurrences are in a single file, we say that the phrase is an improbable phrase for that file. By this, we mean that it shows up far more often in this file than in all the other files, and it has shown up a large enough number of times that this is unlikely to be accidental.

Our final output is a set of key-value pairs - keys are phrases, while the associated value is the filename that the phrase was found in.
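
A hypothetical example: suppose a phrase occurs 12 times in total, 9 of them in a single file. 12 meets the minimum of 10, and 9 exceeds the majority threshold (12 / 2 = 6), so that file is reported:

values = ["moby_dick.txt"] * 9 + ["other.txt"] * 3   # made-up filenames
print(list(phrases_reduce("call:me:ishmael:now", values)))
# -> ['call me ishmael now:moby_dick.txt\n']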

Conclusion

That concludes our examples of how to use the Python MapReduce API. As you've seen, the MapReduce API is designed to make MapReduce easy within App Engine - you write simple Python code and don't have to worry about making your code fault-tolerant or scheduling your jobs. Hopefully this whets your appetite - so get coding and make some cool applications!

Comment by vincent....@gmail.com, Sep 14, 2011

Can we have the same for Java ? There are Reducer Java classes but I do not find the related how to.

Thanks.

Comment by skygam...@gmail.com, Sep 28, 2011

I also wonder whether we can do this for Java. The reducer part is so new - is there anything to reference? Thank you so much!

Comment by rpere...@beneficiofacil.com.br, Oct 6, 2011

+1 for some java samples

Comment by ber...@gmail.com, Oct 31, 2011

Is there any documentation on the rest of the demo? I'm trying to insert my own map/reduce in the example, but I can't make the button for the new map/reducer activate.

Comment by florin.c...@gmail.com, Apr 26, 2012

Any update on the Java portion?

Comment by shekhar....@gmail.com, Jun 14, 2014

Any idea why the following warnings and INFO-level logs are printed? My word count map-reduce job is not running :(

WARNING  2014-06-14 22:03:00,039 taskqueue_stub.py:1974] Task task1 failed to execute. This task will retry in 12.800 seconds
INFO     2014-06-14 22:03:13,016 module.py:639] default: "POST /mapreduce/pipeline/run HTTP/1.1" 404 -
WARNING  2014-06-14 22:03:13,016 taskqueue_stub.py:1974] Task task1 failed to execute. This task will retry in 25.600 seconds
INFO     2014-06-14 22:03:38,862 module.py:639] default: "POST /mapreduce/pipeline/run HTTP/1.1" 404 -
WARNING  2014-06-14 22:03:38,862 taskqueue_stub.py:1974] Task task1 failed to execute. This task will retry in 51.200 seconds
INFO     2014-06-14 22:04:30,176 module.py:639] default: "POST /mapreduce/pipeline/run HTTP/1.1" 404 -
WARNING  2014-06-14 22:04:30,176 taskqueue_stub.py:1974] Task task1 failed to execute. This task will retry in 102.400 seconds

Comment by shekhar....@gmail.com, Jun 14, 2014

In some other articles I read that we have to create a mapreduce.yaml file, but information about that file is not given here. Is that file really required or not?

