My favorites | Sign in
Project Home Downloads Wiki Issues Source
Repository:
Checkout   Browse   Changes   Clones    
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
#!/usr/bin/env python
#
# Library: pyflowctrl
# Module: core3 (prototype)
#
import sys
import collections

class EmptyStream(Exception):
    ''' raised by Stream.get() when the stream holds no data '''
class DataTypeInconsistent(Exception):
    ''' raised by ProcessFlow.link() when the data types of the linked streams differ '''

# Process statuses
#   NOT_STARTED - initial value of Process.status; also what the default
#                 Process.main() yields on every step
#   WAITING     - counted by ProcessFlow.run() to detect that every process
#                 is waiting for data
#   PROCESSING  - not referenced by the code in this module
NOT_STARTED, WAITING, PROCESSING = 0, 1, 2

class Stream(object):
    '''
    first-in/first-out buffer of values

    Values come out of get() in the order they were passed to put().
    '''

    def __init__(self, stream_type=None, data_type=None):
        '''
        define parameters for Stream
        * stream_type = ('input', 'output')
        * data_type describes type and structure of data in stream OR None

        Note: data_type with value 'None' can be used when incoming or
        outgoing data type is not important. Example: for class Printer
        incoming data can be any type

        Neither parameter is validated here; both are stored as given.
        '''
        super(Stream, self).__init__()
        self.__buffer = collections.deque()
        self.stream_type = stream_type
        self.data_type = data_type

    def get(self):
        '''
        get data from stream

        Removes and returns the oldest value in the stream.
        Raises EmptyStream when the stream holds no data.
        '''
        # Raise outside of an `except` block so that, on Python 3, the
        # EmptyStream traceback is not chained to an internal IndexError.
        if not self.__buffer:
            raise EmptyStream('stream is empty')
        return self.__buffer.popleft()

    def put(self, value):
        ''' put data in stream (appended after all values already stored) '''
        self.__buffer.append(value)

class Process(object):
    '''
    base class for a processing step driven by a generator

    Subclasses are expected to override main(); each call to run_once()
    advances that generator by one step.
    '''

    def __init__(self):
        self.pid = None # process id, assigned by ProcessFlow.new()
        self.io = {
            'input': Stream(),
            'output': Stream(),
        }
        # initial status; nothing in this class updates it afterwards
        self.status = NOT_STARTED
        # main() contains `yield`, so calling it only creates the generator;
        # none of its body runs until the first run_once()
        self.running = self.main()

    def main(self):
        '''
        main loop

        Generator: every value it yields is reported by run_once() as the
        status of the process. The default implementation does no work and
        yields NOT_STARTED forever.
        '''
        while True:
            yield NOT_STARTED

    def run_once(self, data=None):
        ''' wrapper for main loop

        if data is specified run_once return the result of processing
        if data is not specified run_once return the status of process

        "Specified" means anything other than None, so falsy values such
        as 0, '' or [] are processed like any other data.

        With data: the value is put on the 'input' stream, the main loop is
        advanced one step and the next value of the 'output' stream is
        returned (the get() of the 'output' stream raises if the step
        produced nothing).
        StopIteration from a finished main loop is not caught here.
        '''
        if data is None:
            return next(self.running)

        self.io['input'].put(data)
        # the yielded status is not needed here, only the produced output
        next(self.running)
        return self.io['output'].get()


class ProcessFlow(object):
    '''
    container that registers processes, links their streams and runs them

    Processes are stepped round-robin through their run_once() method.
    '''

    def __init__(self):
        self.__counter_pid = 0 # process id counter
        self.links = []
        self.pmap = {} # process map: pid -> process (None once it has stopped)
        self.streams = {} # merged process stream: (sid, son) -> Stream

    def getpid(self):
        ''' generate new process id (ids start at 1 and grow by 1) '''
        self.__counter_pid += 1
        return self.__counter_pid

    def new(self, new_process):
        '''
        add new process to the flow

        Assigns a fresh pid to new_process.pid, registers the process in
        the process map and returns the pid.
        '''
        new_process.pid = self.getpid()
        self.pmap[new_process.pid] = new_process
        return new_process.pid

    def link(self, details):
        '''
        make link between processes

        details = {'sid', 'son', 'tid', 'tin'}
        * tid - target id
        * tin - target input name
        * sid - source id
        * son - source output name

        After linking, the source output and the target input are the same
        Stream object. Several targets can be linked to one source output;
        they all share that one stream.

        Raises DataTypeInconsistent when the target declares a data type
        that differs from the one of the source. A target data type of
        None accepts anything. Unknown ids or stream names raise KeyError.
        '''
        source = self.pmap[details['sid']]
        target = self.pmap[details['tid']]
        stream_key = (details['sid'], details['son'])

        # check stream type before process linkage
        source_datatype = source.io[details['son']].data_type
        target_datatype = target.io[details['tin']].data_type
        if target_datatype is not None and source_datatype != target_datatype:
            raise DataTypeInconsistent('%s,%s' % (source, target))

        # check if link already exists
        if stream_key not in self.streams:
            # The merged stream replaces the source output, so it has to keep
            # the source data type; otherwise a later link from the same
            # output to a typed input would be rejected by the check above.
            self.streams[stream_key] = Stream(data_type=source_datatype)

        # process linkage
        source.io[details['son']] = self.streams[stream_key]
        target.io[details['tin']] = self.streams[stream_key]

    def run(self):
        '''
        run flow

        Repeatedly calls run_once() on every process that is still
        registered. A process whose run_once() raises StopIteration is
        considered finished and its slot in the process map is set to None.
        The flow stops when no process is left, or when every remaining
        process reported WAITING during the same cycle.
        '''
        # Count only live processes: slots already set to None (for example
        # by an earlier run) can never stop or wait again.
        active_procs = sum(1 for proc in self.pmap.values() if proc is not None)

        while active_procs:
            # every cycle of process flow the number of waiting processes is 0
            waiting_procs = 0

            # only values of existing keys are replaced while iterating,
            # the set of keys itself is not changed
            for pid in self.pmap:
                process = self.pmap[pid]
                if process is None:
                    continue
                try:
                    if process.run_once() == WAITING:
                        waiting_procs += 1
                except StopIteration:
                    self.pmap[pid] = None
                    active_procs -= 1

            # if all remaining processes are waiting data -> stop process flow
            # (finished processes must not be counted, otherwise the flow
            # never stops once a single process has finished)
            if waiting_procs == active_procs:
                break


Change log

d90b83b80d1d by ownport <ownport> on Aug 14, 2011   Diff
pyflowctrl: new core module (core3.py)
added
Go to: 
Project members, sign in to write a code review

Older revisions

All revisions of this file

File info

Size: 4544 bytes, 141 lines
Powered by Google Project Hosting