My favorites | Sign in
Project Logo
                
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
__all__ = ['PublishSubscribeQueue']
import events
from util import priority

class PSPut(events.Operation):
    """Operation that publishes a single message on a pub/sub queue.

    Processing appends the message to the queue and hands it to every fetch
    operation currently parked in the queue's `active_subscribers`.
    """
    __slots__ = ('queue', 'message', 'key')

    def __init__(self, queue, message, key, **kws):
        """Remember the target `queue`, the `message` to publish and `key`.

        Remaining keyword arguments are forwarded to the base operation.
        """
        super(PSPut, self).__init__(**kws)
        self.key = key
        self.message = message
        self.queue = queue

    def process(self, sched, coro):
        """Store the message and reschedule every waiting fetch operation.

        Returns `(self, coro)` when this operation's priority has the
        `priority.CORO` bit set; otherwise the publisher is put back on
        `sched.active` and `(None, None)` is returned.
        """
        super(PSPut, self).process(sched, coro)
        queue = self.queue
        queue.messages.append(self.message)

        # Every woken fetch operation receives this same one-element list.
        delivered = [self.message]
        waiting = queue.active_subscribers
        for waiter_key in waiting:
            # The waiter is about to be handed the new message, so move its
            # read index past it.
            queue.subscribers[waiter_key] += 1
            waiter_op, waiter_coro = waiting[waiter_key]
            waiter_op.result = delivered
            # Waiters with a truthy priority go to the front of the run queue.
            enqueue = (sched.active.appendleft if waiter_op.prio
                       else sched.active.append)
            enqueue((waiter_op, waiter_coro))
        waiting.clear()

        if self.prio & priority.CORO:
            return self, coro

        # Otherwise the publisher itself is requeued; front of the run queue
        # when the OP priority bit is set, back of it otherwise.
        requeue = (sched.active.appendleft if self.prio & priority.OP
                   else sched.active.append)
        requeue((self, coro))
        return None, None

class PSGet(events.TimedOperation):
    """Operation that fetches the messages a subscriber has not yet seen.

    When unseen messages exist the operation completes right away; otherwise
    it is parked in the queue's `active_subscribers` mapping, where a later
    publish (see `PSPut.process`) fills in `result` and reschedules it.
    """
    __slots__ = ('queue', 'result', 'key')

    def __init__(self, queue, key, **kws):
        """Remember the source `queue` and the subscriber `key`.

        A falsy `key` means "use the calling coroutine as the key" (resolved
        in `process`). Remaining keyword arguments are forwarded to the base
        operation.
        """
        super(PSGet, self).__init__(**kws)
        self.queue = queue
        self.key = key

    def process(self, sched, coro):
        """Collect unseen messages, or register as a waiting subscriber.

        Returns `(self, coro)` when messages were available. When nothing is
        pending, the operation is stored in `queue.active_subscribers` and
        the method falls through, returning None.
        """
        super(PSGet, self).process(sched, coro)
        key = self.key or coro
        assert key in self.queue.subscribers
        level = self.queue.subscribers[key]
        queue_level = len(self.queue.messages)
        if level < queue_level:
            # Always hand out a copy. Returning the queue's own list (as was
            # previously done for level == 0) let later publishes and
            # compact() silently change a result the caller already holds,
            # and let the caller corrupt the queue by mutating its result.
            self.result = self.queue.messages[level:]
            self.queue.subscribers[key] = queue_level
            return self, coro
        else:
            # Nothing new yet: park until a publish delivers a result.
            self.queue.active_subscribers[key] = self, coro

    def finalize(self, sched):
        """Return the list of fetched messages as the operation's value."""
        super(PSGet, self).finalize(sched)
        return self.result

    def cleanup(self, sched, coro):
        """Remove `coro`'s pending fetch from the queue's waiting set.

        Returns True when an entry for `coro` was found and removed, and
        None (implicitly) when there was none.
        """
        for key in self.queue.active_subscribers:
            getop, getcoro = self.queue.active_subscribers[key]
            if coro is getcoro:
                assert getop is self

                # Deleting while iterating is safe here only because we
                # return immediately and never advance the iterator again.
                del self.queue.active_subscribers[key]
                return True

class PSSubscribe(events.Operation):
    """Operation that registers a subscriber on a pub/sub queue."""
    __slots__ = ('queue', 'key')

    def __init__(self, queue, key, **kws):
        """Remember the target `queue` and the subscriber `key`.

        Remaining keyword arguments are forwarded to the base operation.
        """
        super(PSSubscribe, self).__init__(**kws)
        self.key = key
        self.queue = queue

    def process(self, sched, coro):
        """Register the subscriber with a read index of 0.

        The subscriber is identified by `key`, or by the calling coroutine
        when `key` is falsy. Returns `(self, coro)`.
        """
        super(PSSubscribe, self).process(sched, coro)
        subscriber = self.key or coro
        # Index 0 means the first fetch sees everything still in the queue.
        self.queue.subscribers[subscriber] = 0
        return self, coro

class PSUnsubscribe(events.Operation):
    """Operation that removes a subscriber from a pub/sub queue."""
    __slots__ = ('queue', 'key')

    def __init__(self, queue, key, **kws):
        """Remember the target `queue` and the subscriber `key`.

        Remaining keyword arguments are forwarded to the base operation.
        """
        super(PSUnsubscribe, self).__init__(**kws)
        self.key = key
        self.queue = queue

    def process(self, sched, coro):
        """Drop the subscriber's entry from the queue.

        The subscriber is identified by `key`, or by the calling coroutine
        when `key` is falsy. Raises KeyError if it is not subscribed.
        Returns `(self, coro)`.
        """
        super(PSUnsubscribe, self).process(sched, coro)
        subscriber = self.key or coro
        del self.queue.subscribers[subscriber]
        return self, coro


class PublishSubscribeQueue:
    """A more robust replacement for the signal operations.

    A coroutine subscribes itself to a PublishSubscribeQueue and gets newly
    published messages with the _fetch_ method.

    Attributes:
      messages: list of published messages that have not been compacted away.
      subscribers: maps a subscriber key to the index in `messages` of the
        first message that subscriber has not fetched yet.
      active_subscribers: maps a subscriber key to the `(fetch op, coroutine)`
        pair currently waiting for a message.
    """
    def __init__(self):
        self.messages = []
        self.subscribers = {}
        self.active_subscribers = {}  # holds waiting fetch ops

    def publish(self, message, key=None, **kws):
        """Put a message in the queue and update any coroutine waiting with
        fetch. *works as a coroutine operation*"""
        return PSPut(self, message, key, **kws)

    def subscribe(self, key=None, **kws):
        """Register the calling coroutine to the queue. Sets the update index
        to 0 - on fetch, that coroutine will get all the messages from the
        queue. *works as a coroutine operation*"""
        return PSSubscribe(self, key, **kws)

    def unsubscribe(self, key=None, **kws):
        """Unregister the calling coroutine from the queue.
        *works as a coroutine operation*"""
        # TODO: unittest
        return PSUnsubscribe(self, key, **kws)

    def fetch(self, key=None, **kws):
        """Get all the new messages since the last fetch. Returns a list
        of messages. *works as a coroutine operation*"""
        return PSGet(self, key, **kws)

    def compact(self):
        """Compact the queue: remove all the messages that have already been
        fetched by every subscriber.

        With no subscribers at all, every message is removed.
        Returns the number of messages that have been removed.
        """
        if not self.subscribers:
            removed = len(self.messages)
            del self.messages[:]
            return removed

        # values() (rather than the Python-2-only itervalues()) works on
        # both Python 2 and Python 3.
        level = min(self.subscribers.values())
        if level:
            del self.messages[:level]
            # The subscriber indices are positions in `messages`; shift them
            # down by the number of removed messages, otherwise later fetches
            # would skip messages or wait although unread messages exist.
            for key in self.subscribers:
                self.subscribers[key] -= level
        return level
Show details Hide details

Change log

r588 by ionel.mc on Apr 26, 2009   Diff
Removed trailing spaces. Fixed
inconsistent end of line format.

Merged from amcnabb8's branch.
Go to: 
Project members, sign in to write a code review

Older revisions

r586 by ionel.mc on Apr 26, 2009   Diff
added a 'sched' param to Operation's
finalize method through all the code
to support additional cleanup (mainly
for TimedOperation to remove itself
from the timeouts heapq on
...
r556 by ionel.mc on Jan 03, 2009   Diff
some cleanup like remove useless
imports and a couple of relics of old
past
r496 by ionel.mc on Nov 11, 2008   Diff
fixed a bug (timeout will not be
raised) when timeout is about to
happen and the pubsub key is not
the coroutine instance
All revisions of this file

File info

Size: 5046 bytes, 141 lines
Hosted by Google Code