PaPy is a framework for constructing and executing pipelines (flow-charts) of arbitrary tasks in parallel, either locally, using multi-processing or multi-threading, or remotely, using RPC (Remote Procedure Calls) as provided by RPyC (Remote Python Call). The goal is to enable users to create highly generic and modular workflows for execution on ad hoc computing grids consisting of local workstations and remote desktops/servers.
This software is developed in the Mura Lab at the University of Virginia.
The pipeline is represented as an arbitrary directed acyclic graph. The user defines the functions of the nodes (called Pipers) and the edges (called pipes), which represent data-flow or dependency. Piper instances are assigned to virtual resources (called IMaps), which are technically pools of local and/or remote processes or threads. IMaps are similar to the imap method of multiprocessing.Pool, but support multiple tasks, i.e. (function, iterable) tuples, which are interwoven rather than evaluated one after another.
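The idea of interwoven tasks can be illustrated with the standard library alone. The sketch below is not PaPy's IMap implementation; it assumes a hypothetical helper, interwoven_imap, that takes several (function, iterable) tasks, submits their items to one shared thread pool in round-robin order and yields (task index, result) pairs.

```python
from itertools import zip_longest
from multiprocessing.pool import ThreadPool

_MISSING = object()  # fill value for iterables that run out early


def _call(job):
    """Evaluate one (task_index, function, item) job in a worker thread."""
    index, func, item = job
    return index, func(item)


def _interleave(tasks):
    """Yield jobs round-robin: the first item of every task, then the second, ..."""
    iterables = [iterable for _, iterable in tasks]
    for row in zip_longest(*iterables, fillvalue=_MISSING):
        for index, item in enumerate(row):
            if item is not _MISSING:
                yield index, tasks[index][0], item


def interwoven_imap(tasks, worker_num=4):
    """Hypothetical helper (not a PaPy function): run several tasks on one pool."""
    with ThreadPool(worker_num) as pool:
        # imap preserves submission order, so the results arrive interwoven
        for index, result in pool.imap(_call, _interleave(tasks)):
            yield index, result


tasks = [(abs, [-1, -2, -3]), (str.upper, ['a', 'b', 'c'])]
print(list(interwoven_imap(tasks, worker_num=2)))
```

With a plain imap the second task would start only after the first had finished; here both tasks make progress on the same pool of workers.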
An example of a valid Piper function (it identifies the host, process and itself):
@imports([['os',[]], ['socket',[]], ['sys', []]])
def hid(i):
    return "item %s is on host:%s using process:%s, and function:%s" % \
           (i[0], socket.gethostname(), os.getpid(), id(hid))
Creates an IMap instance that uses 4 processes:
local_pool = IMap(worker_num =4)
Creates an IMap instance that uses 4 threads:
local_thread_pool = IMap(worker_type ='thread', worker_num =4)
Creates an IMap instance that uses 8 remote processes on 2 hosts:
remote_pool = IMap(worker_num =0, worker_remote =[['host1', 4], ['host2:port', 4]])
Setting worker_num =0 overrides the default, which is to create as many local worker processes as there are available CPUs. The remote hosts ('host1' and 'host2') should each run an RPyC classic server in forking mode, i.e.:
python classic_server.py -m 'forking'
The input to the pipeline needs to be a collection. PaPy processes the data in batches of adjustable size, which allows a trade-off between parallelism (higher memory consumption) and laziness (immediate results).
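This trade-off can be sketched with the standard library. The following is an illustration under assumptions, not PaPy code: the helper batched_imap and its batch_size parameter are hypothetical names. Each batch is pulled from the input, evaluated in parallel and held in memory before its results are yielded, so a larger batch gives more parallelism at the cost of memory and latency, while a batch of one is fully lazy.

```python
from itertools import islice
from multiprocessing.pool import ThreadPool


def batched_imap(func, iterable, batch_size=1, worker_num=4):
    """Hypothetical helper (not a PaPy function): evaluate func batch by batch."""
    iterator = iter(iterable)
    with ThreadPool(worker_num) as pool:
        while True:
            # only batch_size input items are pulled and held in memory
            batch = list(islice(iterator, batch_size))
            if not batch:
                break
            # the whole batch is evaluated before its first result is yielded
            for result in pool.map(func, batch):
                yield result


def counting_source(n, log):
    """Input generator that records which items have been consumed."""
    for i in range(n):
        log.append(i)
        yield i


def square(x):
    return x * x


consumed = []
results = batched_imap(square, counting_source(6, consumed), batch_size=3)
first = next(results)
print(first, len(consumed))  # 0 3: one batch of three inputs was read
print(list(results))         # [1, 4, 9, 16, 25]
```

With batch_size=1 the first result would be available after a single input item had been read, but no two items would ever be evaluated at the same time.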
A generic pipeline template can be found here: http://papy.googlecode.com/svn/trunk/src/papy/utils/templates/pipeline.py
Here is an example that is typical for concurrency frameworks:
# This pipeline follows and greps a file for a specified search pattern.
from papy import *
from IMap import IMap

#1. function definitions
@imports([('re',[])])
def grep(inbox, pattern):
    """ Searches input string for pattern.
    returns None if pattern is not found.
    """
    if re.search(pattern, inbox[0]):
        return inbox[0]

#2. parallelism and topology of pipeline
def pipeline(Imap):
    # wrap the functions into pipers
    # follow =True argument like 'tail -f'
    grepper_w = Worker(grep, ('papy',))
    grepper = Piper(grepper_w, parallel =Imap)
    printer = Piper(workers.io.print_)
    # define the topology
    pipes = Plumber()
    pipes.add_pipe((grepper, printer))
    return pipes

#3. run-time
if __name__ == '__main__':
    #
    for imap_ in [IMap(worker_type ='process', worker_num =2)]:
        # make input
        handle = open('input_file', 'rb')
        handle_generator = workers.io.load_file(handle, follow =True)
        # get pipeline instance
        pipes = pipeline(imap_)
        # connect input data
        pipes.set_inputs([handle_generator])
        # start calculations/processing in background
        pipes.plunge()
        print """ In PaPy a thread pulls at the input (handle_generator).
        If the generator reaches the end it raises StopIteration
        which makes PaPy stop gracefully.
        The follow =True argument tells the generator to follow the
        file like 'tail -f'. No StopIteration will ever be raised.
        We can tell the pipeline to shut-down manually:

            pipes.chinkup()

        The pulling thread will notice this only after it has
        received a result, because pipelines are not allowed to miss
        any data-items (or events). We have to generate a sentinel:

            echo 'papy' >> input_file
        """

PaPy is flexible:
- graph topology is unrestricted
- user-function code is unrestricted
- the number of inputs is unrestricted
- the number of IMaps is unrestricted
- the number of used processes/threads is unrestricted
- pipers can be assigned to IMaps arbitrarily
- functions can be composed into workers flexibly
- data can be shared via queues, files or memory (posix only)
- IMaps can be shared (load-balancing)
- memory-parallelism-laziness trade-off is adjustable
- cross-platform hosts are supported
PaPy has features:
- construction of arbitrarily complex pipelines
- flexible local and remote parallelism
- shared local and remote resources
- robustness to exceptions
- support for time-outs
- real-time logging
- os-independent (really a feature of multiprocessing)
- cross-platform (really a feature of RPyC)
- in memory communication (really a feature of posix_ipc)
- tested & documented.
PaPy has (optional) dependencies:
- Python 2.5 or 2.6
- multiprocessing if on Python 2.5 (not recommended)
- RPyC for distributed computing
- posix_ipc for efficiency on shared memory systems
- simplejson for JSON functionality
PaPy has limitations:
- functions have strict call/return semantics
- all python modules used must be available on all hosts
- threads and processes cannot be pooled in a single IMap instance
- all inputs have to be of the same length
- all input items need to be picklable
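The last limitation can be checked before a pipeline is started. The sketch below uses only the standard pickle module; the helper name unpicklable_items is hypothetical and not part of PaPy.

```python
import pickle


def unpicklable_items(items):
    """Return (position, item) pairs for the items that cannot be pickled."""
    rejected = []
    for position, item in enumerate(items):
        try:
            pickle.dumps(item)
        except Exception:  # pickle raises PicklingError, TypeError or AttributeError
            rejected.append((position, item))
    return rejected


inputs = [1, 'text', {'key': [1, 2]}, lambda x: x]
bad = unpicklable_items(inputs)
print([position for position, _ in bad])  # [3]: the lambda cannot be pickled
```

Note that the check iterates over the whole input, so it suits lists and other re-iterable collections rather than one-shot generators.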