UserGuidePython  
User guide for the Python mapreduce library
Updated Feb 12, 2013 by y...@google.com

Basics

See the Getting Started guide for the basics of library usage.

Mapreduce parameters

You can define job-level parameters using the 'params' entry of the mapreduce spec to control job execution. The following parameters are currently supported:

done_callback (default: None): The URL to call when the mapreduce finishes its execution.

Example:

mapreduce:
- name: Make messages lowercase
  params:
  - name: done_callback
    value: /done
  mapper:
    handler: main.lower_case_posts
    input_reader: mapreduce.input_readers.DatastoreInputReader
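
When the job completes, the library makes an HTTP POST request to the done_callback URL. Below is a minimal sketch of a handler for the /done URL used above, assuming a webapp2 application and that the job id arrives in a 'Mapreduce-Id' request header; check the library's handlers for the exact contract:

import logging

import webapp2

class DoneHandler(webapp2.RequestHandler):
  """Handles the callback POST sent when the mapreduce finishes."""

  def post(self):
    # Assumption: the job id is passed in the 'Mapreduce-Id' header.
    job_id = self.request.headers.get('Mapreduce-Id')
    logging.info('Mapreduce %s finished.', job_id)

app = webapp2.WSGIApplication([('/done', DoneHandler)])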

Mapper parameters

The entries in the 'params' section of the mapper spec in mapreduce.yaml define per-mapper parameters. They are used by the input readers (for example, the default datastore reader uses the 'entity_kind' key), and they can also be used to pass arguments to your mapper.

User parameters for the mapper can be accessed via the 'context' module:

from mapreduce import context

def process(entity):
  ctx = context.get()
  params = ctx.mapreduce_spec.mapper.params
  my_mapper_arg = params['my_mapper_arg']
  # Process the entity

Note that all mapper parameters are strings, so if you need an integer or other datatype, you will need to convert it in your mapper.
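
For example, a mapper that expects an integer threshold might convert the parameter inline; the 'score_threshold' parameter and the entity's score attribute here are purely illustrative:

from mapreduce import context

def process(entity):
  params = context.get().mapreduce_spec.mapper.params
  # Parameters from mapreduce.yaml arrive as strings; convert explicitly.
  score_threshold = int(params['score_threshold'])
  if entity.score >= score_threshold:
    # Process the entity
    pass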

Additionally, the following parameters, specified directly in the mapper spec, are shared by all input readers:

shard_count (default: 8): The number of mapper workers to run concurrently.
processing_rate (default: 100): The aggregate maximum number of inputs processed per second by all mappers. Can be used to avoid exhausting quota or interfering with online users.

Specifying readers

The mapreduce library isn't restricted to mapping over datastore entities. It comes bundled with other input readers, defined in mapreduce/input_readers.py. Currently, this includes DatastoreInputReader (the default), BlobstoreLineInputReader, which maps over lines from one or more blobs in the blobstore, and BlobstoreZipInputReader, which maps over the contents of zip files in the blobstore.

This example mapreduce.yaml demonstrates how to specify an alternate input reader:

mapreduce:
- name: Codesearch
  mapper:
    input_reader: mapreduce.input_readers.BlobstoreZipInputReader
    handler: <your handler function>
    params:
    - name: blob_key

Provided Input Readers

DatastoreInputReader

DatastoreInputReader reads all model instances of a particular kind from the datastore. It requires the entity_kind class to be defined.

entity_kind (default: None): The datastore kind to map over.
namespaces (default: the current namespace): The list of namespaces that will be searched for entities of the given kind.
batch_size (default: 50): The number of entities to read from the datastore with each batch get.

DatastoreKeyInputReader

DatastoreKeyInputReader reads keys of entities of a particular kind from the datastore. It doesn't require the entity_kind class to be defined; the mapper is called with datastore keys rather than full entities.

entity_kind (default: None): The datastore kind to map over.
namespaces (default: the current namespace): The list of namespaces that will be searched for keys of the given kind.
batch_size (default: 50): The number of keys to read from the datastore with each batch get.
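
A common use of this reader is bulk deletion, since the mapper only needs keys. A minimal sketch, assuming the op.db.Delete operation offered alongside op.db.Put:

from mapreduce import operation as op

def delete_entity(entity_key):
  # DatastoreKeyInputReader passes datastore keys, not full entities.
  yield op.db.Delete(entity_key)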

BlobstoreLineInputReader

BlobstoreLineInputReader reads a newline-delimited text file one line at a time. It calls the mapper once with each line, passing it a tuple consisting of the byte offset in the file of the first character in the line and the line as a string, not including the trailing newline. In other words: (byte_offset, line_value).

blob_keys (default: None): Either a single blob key string or a list of blob key strings.
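
A mapper for this reader therefore receives the (byte_offset, line_value) tuple as its single argument. A minimal sketch that counts blank lines with a counter operation (the counter name is purely illustrative):

from mapreduce import operation as op

def process_line(line_info):
  # The reader yields one tuple per line: (byte_offset, line_value).
  byte_offset, line = line_info
  if not line.strip():
    yield op.counters.Increment('blank_lines')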

BlobstoreZipInputReader

BlobstoreZipInputReader iterates over all compressed files in a zipfile in the blobstore. It calls the mapper once for each file, passing it a tuple consisting of the zipfile.ZipInfo entry for the file and a callable that returns the complete body of the file as a string. In other words: (zipinfo, file_callable).

blob_key (default: None): A string containing a blob key.
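
A mapper for this reader receives the (zipinfo, file_callable) tuple as its single argument. A minimal sketch, assuming op.counters.Increment accepts a delta argument; the counter name and the .txt filter are purely illustrative:

from mapreduce import operation as op

def process_zip_member(file_info):
  # The reader yields (zipfile.ZipInfo, callable returning the file body).
  zip_info, get_contents = file_info
  if zip_info.filename.endswith('.txt'):
    body = get_contents()
    yield op.counters.Increment('txt_bytes', len(body))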

Doing per-row “reduces”

We don’t have the reduce capability yet, but you can approximate it if you have an entity kind that is unique with respect to a particular property. For example, say we had these entity kinds:

from google.appengine.ext import db

class UserKind(db.Model):
  # key_name is also the ldap
  ldap = db.StringProperty(required=True)
  last_week_attendance = db.IntegerProperty(default=0)

class ClassAttendance(db.Model):
  ldap = db.StringProperty(required=True)
  when = db.DateTimeProperty(required=True)

If you know you only have a single UserKind per LDAP, then you can map over UserKind with a map function like this:

import datetime

from mapreduce import operation as op

def find_attendance(user_entity):
  last_week = (
    datetime.datetime.now() -
    datetime.timedelta(days=7))
  count = (ClassAttendance.all()
      .filter('ldap =', user_entity.ldap)
      .filter('when >=', last_week)
      .count())
  user_entity.last_week_attendance = count
  yield op.db.Put(user_entity)

Currently we do not support transactions in the map function, so the Put operations yielded here will blindly overwrite any other data in the Datastore. For now it's best to be careful and stop live traffic from accessing these entities while a background job is running. We'll be adding transactions soon.

Validator functions

It is also possible to specify a validator function, which will be run before the mapreduce starts. The validator is passed the user parameters specified for this mapreduce and is given the opportunity to validate and modify them.

To use a validator function, specify it in your mapreduce spec with the 'params_validator' key:

mapreduce:
- name: Codesearch
  mapper:
    handler: <your handler>
    params:
    - name: file_id
    params_validator: <your validator function>

The validator function should accept a single argument, the dict of user params, which it may modify:

def my_validator(user_params):
  file_id = user_params['file_id']
  user_params['blob_key'] = File.get_by_id(int(file_id)).blob_key

Programmatically Starting Jobs

You can start mapreduce jobs from your code by using the control API; see the control.py file for more details. The control API can be used together with done callbacks to implement mapreduce chaining.
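
For example, a minimal sketch that starts the job from the first example on this page, assuming the legacy control.start_map signature (job name, handler spec, reader spec, mapper parameters, optional shard count) and a hypothetical main.Post model; check control.py for the exact arguments in your version of the library:

from mapreduce import control

def start_lowercase_job():
  # Returns the id of the newly started mapreduce job.
  return control.start_map(
      name='Make messages lowercase',
      handler_spec='main.lower_case_posts',
      reader_spec='mapreduce.input_readers.DatastoreInputReader',
      mapper_parameters={'entity_kind': 'main.Post'},
      shard_count=8)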

Not Yet Documented

  • Error handling
  • BlobstoreZipLineInputReader

Current Limitations

  • Only full range scan is supported, i.e. it's impossible to scan a subset of a particular entity kind.
Comment by ble.jur...@gmail.com, Sep 7, 2010

Here is an example using the GAE Django helper:

http://www.carlosble.com/?p=697

Comment by mehdi.ai...@gmail.com, Sep 14, 2010

Is there a way to define the maximum number of items per shard, and have the number of shards be flexible?

Comment by willrei...@gmail.com, Oct 29, 2010

Is it possible to yield two different op.db.Put operations per call? I have to update two records, and of course I could issue an entity.put() from within the call, but the write would not be optimized by mapreduce.

i.e.:

yield op.db.Put(some_entity)
yield op.db.Put(some_other_entity)

Comment by stephen....@gmail.com, Nov 4, 2010

Do yielded db operations get retried in case of failure (like timeout on put) or is it only the map processes themselves that get retried on error?

Comment by mike.aiz...@gmail.com, Nov 5, 2010

Stephen,

No, they are not retried. In case of error the map task will be executed again by taskqueue.

Comment by stephen....@gmail.com, Nov 5, 2010

Sorry, still need one clarification. If the map task errors it will be tried again. If a yielded put operation errors, will the map task see that and get tried again? Because they are batched, I think that would mean a single operation failure would cause a re-process of all the entities? Or is the success/failure of the yielded operations completely invisible, so that as long as the map task successfully yields them, it is considered a success?

Comment by mike.aiz...@gmail.com, Nov 5, 2010

Stephen,

A Put error will result in a retry of the last "chunk" of input data. This is typically 10-100 entities. Batch size will also be automatically decreased in case of errors and retries.

And yes, as long as your map task successfully yields operations and is idempotent, these errors are completely invisible to you and shouldn't bother you much.

Comment by stephen....@gmail.com, Nov 5, 2010

Great, thanks! It wasn't clear to me if errors in batched operations were accounted for or lost. I can use this and sleep at night knowing that at worst it will redo the work for a batch, but it won't fail to update an entity indefinitely.

Comment by mike.aiz...@gmail.com, Nov 5, 2010

You don't have to have one. Without the index it will use binary search to approximate the last key. Is it working unsatisfactorily for you? We'll also provide another solution in the upcoming month or two which, we think, will perform better sharding without looking for the last key.

Comment by stephen....@gmail.com, Nov 5, 2010

I set the default shards to 1 and the dev server still auto-created the index. I then assumed there was no way to use it without the index. If I could disable that, it would be great.

Comment by mike.aiz...@gmail.com, Nov 5, 2010

This is the unfortunate effect of dev_appserver: every time you try to use an index, it creates one. There's no way to run a query and ask dev_appserver not to create an index for that specific query only. The mapper's implementation handles a missing index gracefully: see input_readers.py:226. You can also disable index generation functionality in dev_appserver completely.

Comment by stephen....@gmail.com, Nov 5, 2010

Thanks for pointing that out! I'll cut out the descending query from my local copy to make it not even try. I look forward to seeing the next gen sharding when you're ready. Great job you guys have done: taskqueues -> deferred -> Mapper -> bulkupdater -> mapreduce. Really sweet progress.

Comment by stephen....@gmail.com, Nov 10, 2010

When I ran mapreduce I got the following warning in the log: Full proto too large to save, cleared variables.

Does this mean the MAX_POOL_SIZE is defaulted too high?

Comment by stephen....@gmail.com, Nov 13, 2010

I did several mapreduce migrations this weekend. After having dropped the max pool size, I no longer get those scary-sounding warnings. In my context.py, I now have: MAX_POOL_SIZE = 800 * 1000

Comment by ttr...@gmail.com, Mar 8, 2011

The mapreduce api is great and we're already using it extensively, but the one glaring missing feature for us is the ability to only perform DatastoreInputReader mapreduce callbacks on a subset of the entities. We don't need much validation, just a single attribute equality check really (i.e. mapreduce all Books with publisher=100). As it stands we're wasting lots of CPU on DatastoreInputReader operations for millions of entities that don't need to be updated, just to reach a few hundred thousand that do. If we could specify the parameter like this:

mapreduce:
- name: Make messages lowercase
  params:
  - name: filter
    value: category='somecategory'
  mapper:
    handler: main.lower_case_posts
    input_reader: mapreduce.input_readers.DatastoreInputReader

You could use the auto-generated index for "category" to do the equality filter (or inequality filter for that matter).

Comment by stephen....@gmail.com, Mar 8, 2011

There's a note that the bulkupdate library has been deprecated in light of the mapreduce library. That said, I've kept a copy of bulkupdate in my App with some tweaks I borrowed from mapreduce for just that reason. It is inferior in several ways, but you have control over the input query.

Perhaps keeping an up to date fork of bulkupdate with as much functionality back-ported from mapreduce as possible would be a worthwhile endeavor.

https://github.com/arachnid/bulkupdate

Comment by saidimu, Mar 28, 2011

"When I ran mapreduce I got the following warning in the log: Full proto too large to save, cleared variables. Does this mean the MAX_POOL_SIZE is defaulted too high?"

I too got this error repeatedly, and the batch-size was not automatically decreased (as project-member mike.aiz..@gmail.com said below). I'm going to kill the task (after more than 24 hours of auto-retries) and try setting the MAX_POOL_SIZE lower. This sucks as one shouldn't have to manually fiddle with internal specs (or at the very least they should be clearly documented).

Put error will result in retry of last "chunk" of input data. This is typically 10-100 entities. Batch size will be also automatically decreased in case of errors and retries.
Comment by saidimu, Mar 28, 2011

Still getting the dreaded RequestTooLargeError: The request to API call datastore_v3.Put() was too large. Full proto too large to save, cleared variables.

Nothing works, not even reducing MAX_POOL_SIZE to (10 * 100) and MAX_ENTITY_COUNT (to 10).

Comment by mike.aiz...@gmail.com, Mar 28, 2011

saidimu,

Please create an issue and please provide full stack trace there.

Comment by saidimu, Mar 28, 2011

Entered as issue #94: http://code.google.com/p/appengine-mapreduce/issues/detail?id=94

The stack trace is attached to the issue.

Comment by SRabbelier, Apr 2, 2011

Counters (and their use in the UI) are not documented?

Comment by nlak...@gmail.com, Oct 23, 2011

Please update the doc to make namespaces -> namespace, since multiple namespaces are obsolete.

Comment by ed...@potatolondon.com, Nov 9, 2011

I am trying to use the DatastoreInputReader on Google App Engine. When I use the mapreduce code on a local dev_appserver it uses the shards correctly, where it does a range for each shard and processes each entity. When I put it on App Engine it creates all the shards but only 1 actively does the entity processing. Can anyone help?

Comment by duarte.f...@gmail.com, Nov 23, 2011

Hi, does anyone have a complete example of reading a txt file from the blobstore? Appreciated.

Comment by reesefra...@gmail.com, Nov 28, 2011

This should probably be updated, now that there's the reduce capability with the 1.6 SDK.

Comment by jcoll...@vendasta.com, Dec 20, 2012

I'm really hoping someone on this project is feeling generous enough to answer this legacy question:

We are using the "legacy" version (non-PipelineAPI) of this library. Even when we have kinds with >150,000 entities, we only ever get one shard processing. Doesn't seem to matter if we set the shard_count to 4, 16, 128 - always only one shard processes anything (i.e., one shard processes the entire dataset).

Any ideas? In the long distant past, I recall that you needed to create a desc index on the key, but I think that was fixed up with the auto-magic scatter property. I feel like I've just missed a step.

Any help would be appreciated because one shard is very, very slow.

Thanks, Jason Collins

Comment by jcoll...@vendasta.com, Jan 3, 2013

I found the bug in the mapreduce framework. Patch to fix is attached to this issue: http://code.google.com/p/appengine-mapreduce/issues/detail?id=154

Comment by mlamb...@gmail.com, Feb 1, 2014

"Only full range scan is supported, i.e. it's impossible to scan a subset of a particular entity kind."

This is no longer true, as the mapreduce library supports 'filters' being passed to the mapper_params.

