User guide for the Python mapreduce library

See the Getting Started guide for the basics of library usage.
You can define job-level parameters using the 'params' entry for the mapreduce to control job execution. For example, the 'done_callback' parameter names a URL in your application that is called when the job finishes:
mapreduce:
- name: Make messages lowercase
  params:
  - name: done_callback
    value: /done
  mapper:
    handler: main.lower_case_posts
    input_reader: mapreduce.input_readers.DatastoreInputReader
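If you use 'done_callback', your application needs a handler mounted at that URL. The following is a minimal sketch, assuming the notification arrives as a POST with the completed job's id in the 'Mapreduce-Id' request header; the handler class and logging here are illustrative, not part of the library:

import logging

from google.appengine.ext import webapp
from google.appengine.ext.webapp.util import run_wsgi_app


class DoneHandler(webapp.RequestHandler):
    """Hypothetical handler for the /done callback configured above."""

    def post(self):
        # Assumption: the framework POSTs to the callback URL when the job
        # finishes, passing the job id in the 'Mapreduce-Id' header.
        job_id = self.request.headers.get('Mapreduce-Id')
        logging.info('MapReduce job %s finished.', job_id)


def main():
    run_wsgi_app(webapp.WSGIApplication([('/done', DoneHandler)]))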
The entries in the 'params' section under the mapper in mapreduce.yaml define per-mapper parameters. They are used by the input readers - for example, the 'entity_kind' key is used by the default datastore reader - but they can also be used to pass arguments to your mapper.
User parameters for the mapper can be accessed via the 'context' module:
from mapreduce import context

def process(entity):
    ctx = context.get()
    params = ctx.mapreduce_spec.mapper.params
    my_mapper_arg = params['my_mapper_arg']
    # Process the entity
Note that all mapper parameters are strings, so if you need an integer or other datatype, you will need to convert it in your mapper.
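For instance, here is a short sketch assuming hypothetical 'max_results' and 'verbose' parameters were declared in the mapper's params section:

from mapreduce import context

def process(entity):
    params = context.get().mapreduce_spec.mapper.params
    # All parameter values arrive as strings, so convert them explicitly.
    max_results = int(params['max_results'])
    verbose = params['verbose'].lower() == 'true'
    # ... use max_results and verbose while processing the entity ...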
Additionally, the following parameters, specified directly in the mapper spec, are shared by all input readers:
The mapreduce library isn't restricted to mapping over datastore entities. It comes bundled with other input readers, defined in mapreduce/input_readers.py. Currently, these include DatastoreInputReader (the default), BlobstoreLineInputReader, which maps over lines from one or more blobs in the blobstore, and BlobstoreZipInputReader, which maps over the contents of zip files in the blobstore.
This example mapreduce.yaml demonstrates how to specify an alternate input reader:
mapreduce:
- name: Codesearch
  mapper:
    input_reader: mapreduce.input_readers.BlobstoreZipInputReader
    handler: <your handler function>
    params:
    - name: blob_key
DatastoreInputReader reads all model instances of a particular kind from the datastore. It requires the entity_kind class to be defined.

DatastoreKeyInputReader reads keys of entities of a particular kind from the datastore. It doesn't require the entity_kind class to be defined.

BlobstoreLineInputReader reads a \n-delimited text file one line at a time. It calls the mapper once with each line, passing it a tuple comprised of the byte offset in the file of the first character in the line and the line as a string, not including the trailing newline. In other words: (byte_offset, line_value).

BlobstoreZipInputReader iterates over all compressed files in a zipfile in Blobstore. It calls the mapper once for each file, passing it a tuple comprised of the zipfile.ZipInfo entry for the file and a callable that returns the complete body of the file as a string. In other words: (zipinfo, file_callable).
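The sketches below show what mapper functions for the two blobstore readers might look like, assuming the reader's output tuple is passed to the handler as a single argument; the function and counter names are illustrative, not part of the library:

from mapreduce import operation as op

def count_nonblank_lines(data):
    # Hypothetical mapper for BlobstoreLineInputReader.
    byte_offset, line_value = data
    if line_value.strip():
        yield op.counters.Increment('nonblank_lines')

def measure_zip_members(data):
    # Hypothetical mapper for BlobstoreZipInputReader.
    zipinfo, file_callable = data
    contents = file_callable()  # the complete body of this zip member
    yield op.counters.Increment('bytes_seen', len(contents))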
We don't have the reduce capability yet, but you can approximate it if you have an entity kind with a unique constraint on a particular property. For example, say we had these entity kinds:
from google.appengine.ext import db

class UserKind(db.Model):
    # key_name is also the ldap
    ldap = db.StringProperty(required=True)
    last_week_attendance = db.IntegerProperty(default=0)

class ClassAttendance(db.Model):
    ldap = db.StringProperty(required=True)
    when = db.DateTimeProperty(required=True)
If you know you have only a single UserKind entity per ldap, then you can map over UserKind and use a map function like this:
import datetime

from mapreduce import operation as op

def find_attendance(user_entity):
    last_week = datetime.datetime.now() - datetime.timedelta(days=7)
    count = (ClassAttendance.all()
             .filter('ldap =', user_entity.ldap)
             .filter('when >=', last_week)
             .count())
    user_entity.last_week_attendance = count
    yield op.db.Put(user_entity)
Currently we do not support transactions in the map function, so the Put operations yielded here will blindly overwrite any other data in the Datastore. For now it's best to be careful and stop live traffic from accessing these entities while a background job is running. We'll be adding transactions soon.
It is also possible to specify a validator function, which is run before the mapreduce starts. It is passed the user parameters specified for the mapreduce and given the opportunity to validate and modify them.
To use a validator function, specify it in your mapreduce spec with the 'params_validator' key:
mapreduce:
- name: Codesearch
  mapper:
    handler: <your handler>
    params:
    - name: file_id
    params_validator: <your validator function>
The validator function should accept a single argument, the dict of user params, which it may modify:
def my_validator(user_params):
    file_id = user_params['file_id']
    user_params['blob_key'] = File.get_by_id(int(file_id)).blob_key
You can start mapreduce jobs from your code using the control API; see control.py for more details. The control API can be used together with done callbacks to implement mapreduce chaining.
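For example, here is a minimal sketch of starting a job from code, assuming control.start_map takes the job name, handler spec, input reader spec, and a dict of mapper parameters (see control.py for the authoritative signature; the entity kind and shard count are illustrative):

from mapreduce import control

def start_lowercase_job():
    # Returns the id of the newly started mapreduce job.
    return control.start_map(
        name='Make messages lowercase',
        handler_spec='main.lower_case_posts',
        reader_spec='mapreduce.input_readers.DatastoreInputReader',
        mapper_parameters={'entity_kind': 'model.Post'},
        shard_count=8)

Calling a starter like this from inside a done-callback handler, such as the one sketched earlier, is one way to chain mapreduces together.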