PySphere  
Sphere's Map Reduce UDF via Python
Featured, Phase-Implementation
Updated Oct 16, 2009 by collinbe...@gmail.com

Embedded Python for Sphere

Introduction

We introduce a Python embedding into Sphere called PySphere that exposes the Sphere MapReduce framework to pure Python code. This allows arbitrary Python MapReduce scripts to take advantage of Sphere's MapReduce capabilities. To demonstrate how PySphere can be used, we present an illustrative implementation of the algorithm used in the MalStone A-10 benchmark.

Normally a Sphere application is implemented in pure C++. With PySphere, the work of reading data from and writing data to the Sector file system is handled by generic C++ code built on the Sphere framework, while the data processing is performed by custom functions written in Python. PySphere is composed of two primary components:

  • The PySphere framework - a standard implementation of the Sphere MapReduce framework, modified to call Python functions for the map() and reduce() steps. It is a generic component intended to be used as-is for most MapReduce processing, with no custom coding required of the user. The Sphere framework code:
    1. reads the data from the Sector file system,
    2. passes it to the Python functions for processing, and then
    3. writes the resulting data back to Sector.

  • The Python script - this contains the custom map() and reduce() functions supplied by the user of the PySphere framework.
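The division of labor between the two components can be illustrated with a small local sketch. It assumes nothing about the real PySphere C++ code: run_map_block is a hypothetical stand-in that reads records from an in-memory stream instead of Sector, passes each record to a user-supplied map function, and writes the returned lines to another stream. The made-up sample records follow the pipe-delimited layout used by the map functions later on this page.

```python
import io


def run_map_block(map_fn, source, sink):
    """Hypothetical stand-in for the C++ side of PySphere.

    1. reads text records from `source` (Sector, in the real framework),
    2. passes each record to the user's Python map function, and
    3. writes each returned key/value line to `sink`.
    Returns the number of records processed.
    """
    count = 0
    for line in source:
        record = line.rstrip('\n')
        if not record:
            continue  # ignore empty lines
        sink.write(map_fn(record) + '\n')
        count += 1
    return count


def user_map(line, sep='\t'):
    # Same logic as the PySphere map function: "site<TAB>indicator"
    data = line.split('|')
    return data[2] + sep + data[3]


if __name__ == '__main__':
    # Made-up sample records, for illustration only
    src = io.StringIO('r1|t1|siteA|0|e1\nr2|t2|siteB|1|e2\n')
    out = io.StringIO()
    print(run_map_block(user_map, src, out))
    print(out.getvalue(), end='')
```

In this sketch the user code never touches storage; swapping the two streams for Sector reads and writes is exactly the part PySphere keeps in C++.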

Interoperability

Language

PySphere (and the related project PySector, which exposes native Sector file system commands to Python) makes Sector / Sphere coding accessible to clients written in Python. Previously, Sector / Sphere could only be accessed from C++ code. It is our intention to make a Sector cloud accessible from multiple programming languages. Note: the Sector JNI project, which makes Sector (but not Sphere) available to Java clients, has also been contributed to Sector.

Cloud

We do not know of any large data clouds that use Python as the primary computation language. However, thanks to efforts made toward interoperability, many are accessible from Python. For example, Hadoop, which is a Java application, has Pipes and Streaming interfaces. Pipes allows C++ code to run against Hadoop. Streaming opens Hadoop's MapReduce to any language that can read and write Standard I/O, including Python.

Below is pseudo-code illustrating a way the map step for MalStone A can be realized. Versions for PySphere and Hadoop's Streaming API are then given and compared. Aside from differences due to the latter using Standard I/O instead of function calls, the code is very similar.

Map

for record in read( data )
    ( site, compromised_indicator ) = parse( record , '|')
    group by site
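In both Sphere and Hadoop, the "group by site" line is carried out by the framework's shuffle rather than by user code. For local testing it can be emulated by sorting the map output on the key and grouping adjacent pairs. This is a sketch only; map_record and group_by_site are hypothetical names, and the field positions (site in the third field, compromised indicator in the fourth) are taken from the map functions below.

```python
from itertools import groupby
from operator import itemgetter


def map_record(record, sep='|'):
    # Pseudo-code step: (site, compromised_indicator) = parse(record, '|')
    fields = record.rstrip('\n').split(sep)
    return fields[2], fields[3]


def group_by_site(records):
    """Emulate the framework's shuffle: sort map output by key, then group."""
    pairs = sorted((map_record(r) for r in records), key=itemgetter(0))
    return {site: [flag for _, flag in group]
            for site, group in groupby(pairs, key=itemgetter(0))}
```

Because sorted() is stable, the indicators for each site keep their input order.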

PySphere map function:

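  #!/usr/bin/env python

  import sys

  def parse(line):
    # Records are pipe-delimited; drop any trailing newline first
    return line.rstrip('\n').split('|')

  def map(line, sep='\t'):
    # Called by PySphere for each record; returns "site<TAB>indicator"
    data = parse(line)
    return data[2] + sep + data[3]

  if __name__ == '__main__':
    # Local test only: feed lines from stdin to map()
    for line in sys.stdin:
      print(map(line))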

A slightly modified version runs against Hadoop's Streaming API:

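  #!/usr/bin/env python

  import sys

  def read_input(file):
    # Yield the pipe-delimited fields of each input line
    for line in file:
      yield line.rstrip('\n').split('|')

  def map(sep='\t'):
    # Emit "site<TAB>indicator" on stdout for every record read from stdin
    data = read_input(sys.stdin)
    for record in data:
      print('%s%s%s' % (record[2], sep, record[3]))

  if __name__ == '__main__':
    map()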

All of the functions, including the native versions, are compared on the page MalStoneAFunctions.
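Only the map step is shown above. For comparison, a reduce step for Hadoop Streaming could read the sorted "site<TAB>indicator" lines and aggregate them per site. The sketch below is a simplification and is not taken from the MalStoneAFunctions page: it reports the fraction of records per site whose indicator is 1, assuming the indicator is written as 0 or 1. parse_pairs and reduce_sites are hypothetical names.

```python
#!/usr/bin/env python
import sys
from itertools import groupby


def parse_pairs(lines, sep='\t'):
    # Each input line is "site<TAB>indicator", already sorted by site
    for line in lines:
        line = line.rstrip('\n')
        if line:
            site, indicator = line.split(sep, 1)
            yield site, indicator


def reduce_sites(lines, sep='\t'):
    """Yield (site, fraction of records whose indicator is '1')."""
    for site, group in groupby(parse_pairs(lines, sep), key=lambda kv: kv[0]):
        flags = [indicator for _, indicator in group]
        yield site, flags.count('1') / len(flags)


if __name__ == '__main__':
    for site, ratio in reduce_sites(sys.stdin):
        print('%s\t%.4f' % (site, ratio))
```

Locally, a Streaming job can be approximated with a shell pipeline such as `cat records.txt | python map.py | sort | python reduce.py`, where the file names are placeholders.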

Tests and Test Results

The MalStone A-10 benchmark was used to run tests evaluating the performance of PySphere. Tests were performed using a pure C++ Sphere MapReduce implementation of MalStone A, followed by tests using a PySphere version. The resulting times were then compared.

The Open Cloud Consortium Testbed was used for all tests.

  • Sector file replication was set to 1.
  • Twenty 1 GB files were used as the input, evenly distributed across the 20 slave nodes (one file per node).
  • Each file contained 10 million records.

The hardware configuration was:

  • Master Node: 2 x dual-core Xeon processors with 16 GB RAM
  • Slave Node: 2 x dual-core Opteron processors with 12 GB RAM

The Sector master was run on the hardware master node and 20 Sector slaves were used, one per slave node.

Total number of records | Sphere MapReduce, C++ only | Sphere MapReduce with Python | Overhead (Python time / C++ time)
200 million | 1 min. 45 sec. | 3 min. 14 sec. | 1.85
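The Overhead column is consistent with the ratio of the PySphere run time to the pure C++ run time (194 s / 105 s). The check below converts the times to seconds; the per-second throughput figures are derived from the table here and were not reported in the original tests.

```python
def to_seconds(minutes, seconds):
    return 60 * minutes + seconds


def overhead(python_secs, cpp_secs):
    """Overhead as it appears in the table: PySphere time / pure C++ time."""
    return python_secs / cpp_secs


cpp = to_seconds(1, 45)   # 105 s, Sphere MapReduce without Python
py = to_seconds(3, 14)    # 194 s, Sphere MapReduce with Python
records = 200_000_000

print(round(overhead(py, cpp), 2))  # 1.85
print(round(records / cpp))         # 1904762 records per second, C++ only
print(round(records / py))          # 1030928 records per second, with Python
```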

Limitations and Next Steps


The following are some limitations of the current version of PySphere:

  • Each call to Python causes the Python environment to be brought up and then shut down, which adds overhead. Map and reduce steps are performed on blocks of records, so their overhead is acceptable. However, the partition() and compare() functions must be called for every record in the input data, and with large data sets this leads to unacceptable overhead. Instead, PySphere provides generalized functions implemented in C++. These functions let the user specify the fields to use for partitioning and comparison and then apply default processing. If these generic functions are unsuitable for a particular MapReduce job, the user must implement custom C++ functions. A good partition function is necessary for performance; compare the test results above with the following results:

Total number of records | Sphere MapReduce, C++ only | Sphere MapReduce with Python | Overhead (Python time / C++ time)
200 million | 42 min. 10 sec. | 120 min. 44 sec. | 2.86

These results were obtained with a partition function that created only a small number of map output files, which appears to have led to poor performance in the reduce stage. Data from the map stage was written to fewer files than there were physical nodes, and this imbalance caused some nodes to run hot while others were under-utilized.

  • The map() and reduce() functions are called with a single line of input, even though processing is actually done on blocks of records, and they return a single key/value pair per record. This does not suit some categories of MapReduce jobs (e.g., inverted index) but works fine for others (e.g., TeraSort).
  • The embedded code is specific to the MapReduce model. Exposing another processing paradigm to Python requires additional work.
  • Text only data. PySphere currently only supports processing of text data. Binary data is not supported.
  • Python scripts must adhere to the defined PySphere interface. This supports many but not all processing scenarios - for example this interface is not well suited to inverted indexes.
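The generic C++ partition and compare functions described in the first limitation let the user choose fields rather than write code. Their actual interface is not documented on this page, so the following Python is only an illustration of the idea: make_partitioner and make_compare_key are hypothetical names, and hashing the key field with zlib.crc32 is an assumption, chosen because Python's built-in hash() of strings is randomized per process, while a partitioner must send the same key to the same bucket on every node.

```python
import zlib


def make_partitioner(field, num_buckets, sep='\t'):
    """Return a function that maps a record to a bucket by hashing one field.

    Hypothetical stand-in for PySphere's generic C++ partition function.
    """
    def partition(record):
        key = record.split(sep)[field]
        # crc32 is stable across processes, unlike Python's randomized hash()
        return zlib.crc32(key.encode('utf-8')) % num_buckets
    return partition


def make_compare_key(fields, sep='\t'):
    """Return a sort key built from the chosen fields (default comparison)."""
    def key(record):
        parts = record.split(sep)
        return tuple(parts[i] for i in fields)
    return key
```

Hashing the site field into as many buckets as there are slave nodes (20 in the tests above) spreads keys across all nodes; a partitioner that maps keys onto fewer buckets than nodes would reproduce the imbalance described in the first limitation.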

Work to open Sphere up to Standard I/O streams is underway, but it is not expected to completely replace PySphere: some applications will not be well suited to Standard I/O, and the overhead of that approach is still unknown. The next steps for PySphere are to:

  1. Run against larger data sets to determine whether the overhead grows as the data grows or is a fixed cost of the Python embedding.
  2. Provide support for the Sphere user defined function model.
  3. Work with the Sector development group to modify how shuffling is handled so that the map step can return data structures (i.e. solve the Inverted Index issue).
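The inverted index case shows why the one-pair-per-record interface is limiting: each document produces one pair per distinct word. Below is a minimal sketch of a map that returns a list of pairs, as the third next step envisions. It assumes, hypothetically, that each record is "doc_id|text" and that the framework would accept a list of (key, value) tuples; inverted_index_map and build_index are illustrative names, and build_index stands in locally for the shuffle and reduce stages.

```python
def inverted_index_map(line, sep='|'):
    """Hypothetical map returning a list of (word, doc_id) pairs per record."""
    doc_id, text = line.rstrip('\n').split(sep, 1)
    words = sorted(set(text.lower().split()))  # one pair per distinct word
    return [(word, doc_id) for word in words]


def build_index(lines):
    """Local stand-in for shuffle + reduce: word -> sorted list of doc ids."""
    index = {}
    for line in lines:
        for word, doc_id in inverted_index_map(line):
            index.setdefault(word, []).append(doc_id)
    return {word: sorted(ids) for word, ids in index.items()}
```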
