My favorites | Sign in
Project Home Downloads Wiki Issues Source
Checkout   Browse   Changes    
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
# Copyright (C) 2009, Hyves (Startphone Ltd.)
#
# This module is part of the Concurrence Framework and is released under
# the New BSD License: http://www.opensource.org/licenses/bsd-license.php

"""This module implements the stackless API on top of py.magic greenlet API
This way it is possible to run concurrence applications on top of normal python
using the greenlet module.
Because the greenlet module uses only 'hard' switching as opposed to stackless 'soft' switching
it is a bit slower (about 35%), but very usefull because you don't need to install stackless.
Note that this does not aim to be a complete implementation of stackless on top of greenlets,
just enough of the stackless API to make concurrence run.
This code was inspired by:
http://aigamedev.com/programming-tips/round-robin-multi-tasking and
also by the pypy implementation of the same thing (buggy, not being maintained?) at
https://codespeak.net/viewvc/pypy/dist/pypy/lib/stackless.py?view=markup
"""

try:
from py.magic import greenlet #as of version 1.0 of py, it does not supply greenlets anymore
except ImportError:
from greenlet import greenlet #there is an older package containing just the greenlet lib

from collections import deque

class TaskletExit(SystemExit):pass

import __builtin__
__builtin__.TaskletExit = TaskletExit


class bomb(object):
"""used as a result value for sending exceptions trough a channel"""
def __init__(self, exc_type = None, exc_value = None, exc_traceback = None):
self.type = exc_type
self.value = exc_value
self.traceback = exc_traceback

def raise_(self):
raise self.type, self.value, self.traceback

class channel(object):
"""implementation of stackless's channel object"""
def __init__(self):
self.balance = 0
self.queue = deque()

def receive(self):
return _scheduler._receive(self)

def send(self, data):
return _scheduler._send(self, data)

def send_exception(self, exp_type, *args):
self.send(bomb(exp_type, exp_type(*args)))

def send_sequence(self, iterable):
for item in iterable:
self.send(item)



class tasklet(object):
"""implementation of stackless's tasklet object"""

def __init__(self, f = None, greenlet = None, alive = False):
self.greenlet = greenlet
self.func = f
self.alive = alive
self.blocked = False
self.data = None

def bind(self, func):
if not callable(func):
raise TypeError('tasklet function must be a callable')
self.func = func

def __call__(self, *args, **kwargs):
"""this is where the new task starts to run, e.g. it is where the greenlet is created
and the 'task' is first scheduled to run"""
if self.func is None:
raise TypeError('tasklet function must be a callable')

def _func(*_args, **_kwargs):
try:
self.func(*args, **kwargs)
except TaskletExit:
pass #let it pass silently
except:
import logging
logging.exception('unhandled exception in greenlet')
#don't propagate to parent
finally:
assert _scheduler.current == self
_scheduler.remove(self)
if _scheduler._runnable: #there are more tasklets scheduled to run next
#this make sure that flow will continue in the correct greenlet, e.g. the next in the schedule
self.greenlet.parent = _scheduler._runnable[0].greenlet
self.alive = False
del self.greenlet
del self.func
del self.data

self.greenlet = greenlet(_func)
self.alive = True
_scheduler.append(self)
return self

def kill(self):
_scheduler.throw(self, TaskletExit)

def raise_exception(self, *args):
_scheduler.throw(self, *args)

def __str__(self):
return repr(self)

def __repr__(self):
if hasattr(self, 'name'):
_id = self.name
else:
_id = str(self.func)
return '<tasklet %s at %0x>' % (_id, id(self))

class scheduler(object):
def __init__(self):
self._main_task = tasklet(greenlet = greenlet.getcurrent(), alive = True)
#all non blocked tast are in this queue
#all tasks are only onces in this queue
#the current task is the first item in the queue
self._runnable = deque([self._main_task])

def schedule(self):
"""schedules the next tasks and puts the current task back at the queue of runnables"""
self._runnable.rotate(-1)
next_task = self._runnable[0]
next_task.greenlet.switch()

def schedule_block(self):
"""blocks the current task and schedules next"""
self._runnable.popleft()
next_task = self._runnable[0]
next_task.greenlet.switch()

def throw(self, task, *args):
if not task.alive: return #this is what stackless does

assert task.blocked or task in self._runnable

task.greenlet.parent = self._runnable[0].greenlet
if task.blocked:
self._runnable.appendleft(task)
else:
self._runnable.remove(task)
self._runnable.appendleft(task)

task.greenlet.throw(*args)


def _receive(self, channel):
#Receiving 1):
#A tasklet wants to receive and there is
#a queued sending tasklet. The receiver takes
#its data from the sender, unblocks it,
#and inserts it at the end of the runnables.
#The receiver continues with no switch.
#Receiving 2):
#A tasklet wants to receive and there is
#no queued sending tasklet.
#The receiver will become blocked and inserted
#into the queue. The next sender will
#handle the rest through "Sending 1)".
if channel.queue: #some sender
channel.balance -= 1
sender = channel.queue.popleft()
sender.blocked = False
self._runnable.append(sender)
data, sender.data = sender.data, None
else: #no sender
current = self._runnable[0]
channel.queue.append(current)
channel.balance -= 1
current.blocked = True
try:
self.schedule_block()
except:
channel.queue.remove(current)
channel.balance += 1
current.blocked = False
raise

data, current.data = current.data, None

if isinstance(data, bomb):
data.raise_()
else:
return data

def _send(self, channel, data):
# Sending 1):
# A tasklet wants to send and there is
# a queued receiving tasklet. The sender puts
# its data into the receiver, unblocks it,
# and inserts it at the top of the runnables.
# The receiver is scheduled.
# Sending 2):
# A tasklet wants to send and there is
# no queued receiving tasklet.
# The sender will become blocked and inserted
# into the queue. The next receiver will
# handle the rest through "Receiving 1)".
#print 'send q', channel.queue
if channel.queue: #some receiver
channel.balance += 1
receiver = channel.queue.popleft()
receiver.data = data
receiver.blocked = False
self._runnable.rotate(-1)
self._runnable.appendleft(receiver)
self._runnable.rotate(1)
self.schedule()
else: #no receiver
current = self.current
channel.queue.append(current)
channel.balance += 1
current.data = data
current.blocked = True
try:
self.schedule_block()
except:
channel.queue.remove(current)
channel.balance -= 1
current.data = None
current.blocked = False
raise

def remove(self, task):
assert task.blocked or task in self._runnable
if task in self._runnable:
self._runnable.remove(task)

def append(self, task):
assert task not in self._runnable
self._runnable.append(task)

@property
def runcount(self):
return len(self._runnable)

@property
def current(self):
return self._runnable[0]

#there is only 1 scheduler, this is it:
_scheduler = scheduler()

def getruncount():
return _scheduler.runcount

def getcurrent():
return _scheduler.current

def schedule():
return _scheduler.schedule()


Change log

r113 by henkpunt on Mar 28, 2009   Diff
fixed issues and preparing 0.3.1
Go to: 
Project members, sign in to write a code review

Older revisions

r77 by henkpunt on Feb 11, 2009   Diff
some small optimizations after
profiling
r74 by henkpunt on Feb 6, 2009   Diff
some optimizations
r73 by henkpunt on Feb 6, 2009   Diff
profiling
All revisions of this file

File info

Size: 8949 bytes, 266 lines
Powered by Google Project Hosting