People details
Project owners:
  marcin.cieslik, seemura

PaPy is a framework to construct and execute pipelines (flow-charts) of arbitrary tasks in parallel, either locally, using multi-processing or multi-threading, or remotely, using RPC (Remote Procedure Calls) as provided by RPyC (Remote Python Call). The goal is to enable users to create highly generic and modular workflows for execution on ad hoc computing grids consisting of local workstations and remote desktops/servers.

This software is developed in the Mura Lab at the University of Virginia.

The pipeline is represented as an arbitrary directed acyclic graph. The user has to define the functions of the nodes (called Pipers) and the edges (called pipes), which represent data-flow or dependency. Piper instances are assigned to virtual resources (called IMaps), which are technically pools of local and/or remote processes or threads. IMaps are similar to the imap method of multiprocessing.Pool, but support multiple tasks, i.e. (function, iterable) tuples, which are interwoven rather than evaluated one after another.
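To illustrate what "interwoven" means for several (function, iterable) tasks, here is a minimal standard-library sketch. It is not PaPy's implementation and does not use the IMap API: the helper name interleave_tasks is hypothetical, and it runs sequentially in one process. It only shows the evaluation order, taking one item from each task in turn instead of exhausting one task before starting the next.

```python
def interleave_tasks(tasks):
    """Yield (task_index, result) pairs, taking one item from each task in turn.

    `tasks` is a list of (function, iterable) tuples. Hypothetical helper for
    illustration only; it is sequential and not part of PaPy.
    """
    # Pair each function with an iterator over its own input.
    active = [(index, func, iter(iterable))
              for index, (func, iterable) in enumerate(tasks)]
    while active:
        still_active = []
        for index, func, iterator in active:
            try:
                item = next(iterator)
            except StopIteration:
                continue  # this task is exhausted, drop it from the rotation
            yield index, func(item)
            still_active.append((index, func, iterator))
        active = still_active


# Two tasks with inputs of different length.
tasks = [(abs, [-1, -2, -3]), (str, [10, 20])]
results = list(interleave_tasks(tasks))
# results == [(0, 1), (1, '10'), (0, 2), (1, '20'), (0, 3)]
```

Results of the two tasks alternate until the shorter input runs out, after which the remaining task continues alone.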

An example of a valid Piper function (it identifies the host, process and itself):

@imports([['os',[]], ['socket',[]], ['sys', []]])
def hid(i):
    return "item %s is on host:%s using process:%s, and function:%s" %\
    (i[0], socket.gethostname(), os.getpid(), id(hid))

Create an IMap instance that uses 4 processes:

local_pool = IMap(worker_num =4)

Create an IMap instance that uses 4 threads:

local_thread_pool = IMap(worker_type ='thread', worker_num =4)

Create an IMap instance that uses 8 remote processes on 2 hosts:

remote_pool = IMap(worker_num =0, worker_remote =[['host1', 4], ['host2:port', 4]])

Setting worker_num =0 overrides the default, which is to create as many local worker processes as there are available CPUs. The remote hosts ('host1' and 'host2') should each run an RPyC classic server in forking mode, i.e.:

python classic_server.py -m 'forking'

The input to the pipeline needs to be a collection. PaPy processes data in the pipeline in batches of adjustable size, which allows for a trade-off between parallel evaluation (higher memory consumption) and lazy evaluation (immediate results).
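The batch-size trade-off can be pictured with a small standard-library sketch. This is an assumption-laden illustration, not PaPy code: the function name process_in_batches and its batch_size and worker_num parameters are hypothetical, and a multiprocessing.pool.ThreadPool stands in for an IMap. A larger batch gives the workers more items to evaluate in parallel but holds more inputs and results in memory; a smaller batch delivers the first results sooner.

```python
from itertools import islice
from multiprocessing.pool import ThreadPool


def process_in_batches(func, items, batch_size, worker_num=2):
    """Lazily yield func(item) for each item, evaluating batch_size items at a time.

    Hypothetical helper for illustration only, not part of PaPy.
    """
    iterator = iter(items)
    pool = ThreadPool(worker_num)
    try:
        while True:
            # Pull at most batch_size items from the (possibly lazy) input.
            batch = list(islice(iterator, batch_size))
            if not batch:
                break
            # The whole batch is evaluated before its first result is yielded.
            for result in pool.map(func, batch):
                yield result
    finally:
        pool.close()
        pool.join()


consumed = []  # records which inputs have been pulled so far


def numbers():
    for n in range(6):
        consumed.append(n)
        yield n


def square(n):
    return n * n


results = process_in_batches(square, numbers(), batch_size=2)
first = next(results)
consumed_after_first = list(consumed)  # only the first batch has been pulled
rest = list(results)
```

After the first result is requested only two inputs have been consumed, which is the lazy side of the trade-off; raising batch_size would pull more input up front.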

A generic pipeline template can be found here: http://papy.googlecode.com/svn/trunk/src/papy/utils/templates/pipeline.py

And here is a typical example for concurrency frameworks:

# This pipeline follows and greps a file for a specified search pattern.
from papy import *
from IMap import IMap

#1. function definitions
@imports([('re',[])])
def grep(inbox, pattern):
    """ Searches input string for pattern.
        returns None if pattern is not found.
    """
    if re.search(pattern, inbox[0]):
        return inbox[0]

#2. parallelism and topology of pipeline
def pipeline(Imap):
    # wrap the functions into pipers
    # (the input is read with the follow =True argument, like 'tail -f')
    grepper_w = Worker(grep, ('papy',))
    grepper = Piper(grepper_w, parallel =Imap)
    printer = Piper(workers.io.print_)
    # define the topology
    pipes = Plumber()
    pipes.add_pipe((grepper, printer))
    return pipes

#3. run-time
if __name__ == '__main__':
    #
    for imap_ in [IMap(worker_type ='process', worker_num =2)]:
        # make the input
        handle = open('input_file', 'rb')
        handle_generator = workers.io.load_file(handle, follow =True)
        # get pipeline instance
        pipes = pipeline(imap_)
        # connect input data
        pipes.set_inputs([handle_generator])
        # start calculations/processing in background
        pipes.plunge()
        print(""" In PaPy a thread pulls at the input (handle_generator).
                  If the generator reaches the end it raises StopIteration,
                  which makes PaPy stop gracefully.

                  The follow =True argument tells the generator to follow the
                  file like 'tail -f'. No StopIteration will ever be raised.
                  We can tell the pipeline to shut down manually:

                    pipes.chinkup()

                  The pulling thread will notice this only after it has
                  received a result, because pipelines are not allowed to miss
                  any data-items (or events). We have to generate a sentinel:

                    echo 'papy' >> input_file
              """)
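
The 'tail -f' behaviour mentioned in the example can be sketched with the standard library alone. This is not the code behind workers.io.load_file: the name follow_lines and its stop and poll_interval parameters are hypothetical, and unlike the pipeline described above, this sketch checks for shutdown whenever no new line is available rather than only after a result has been received.

```python
import os
import tempfile
import time


def follow_lines(handle, stop, poll_interval=0.1):
    """Yield lines from an open file; at end-of-file wait for more data.

    Hypothetical helper, not PaPy code. `stop` is a zero-argument callable
    that is checked only when no new line is available.
    """
    while True:
        line = handle.readline()
        if line:
            yield line
        elif stop():
            return  # ends the generator; the consumer sees StopIteration
        else:
            time.sleep(poll_interval)  # nothing new yet, poll again later


# Demonstration on a temporary file.
fd, path = tempfile.mkstemp()
with os.fdopen(fd, 'w') as out:
    out.write("first line\nmentions papy\n")

with open(path) as handle:
    # stop() is always true here, so the generator ends at end-of-file
    # instead of following the file forever.
    lines = list(follow_lines(handle, stop=lambda: True))

os.remove(path)
matches = [line for line in lines if 'papy' in line]
```

With a stop callable that stays false, the generator never finishes, which corresponds to the case above where no StopIteration is ever raised.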

PaPy is flexible:

PaPy has features:

PaPy has (optional) dependencies:

PaPy has limitations:








