My favorites | Sign in
Project Logo
                
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
from __future__ import division
from select import poll, POLLERR, POLLHUP, POLLNVAL, POLLIN, POLLPRI, POLLOUT
from time import sleep

from base import ProactorBase, perform_recv, perform_accept, perform_send, \
perform_sendall, perform_sendfile, \
perform_connect

from cogen.core.sockets import ConnectionClosed

class PollProactor(ProactorBase):
POLL_ERR = POLLERR | POLLHUP | POLLNVAL
POLL_IN = POLLIN | POLLPRI | POLL_ERR
POLL_OUT = POLLOUT | POLL_ERR

def __init__(self, scheduler, res, **options):
super(self.__class__, self).__init__(scheduler, res, **options)
self.scheduler = scheduler
self.poller = poll()
self.shadow = {}

def unregister_fd(self, act, fd=None):
fileno = fd or act.sock.fileno()
try:
del self.shadow[fileno]
except KeyError, e:
import warnings
warnings.warn("fd remove error: %r" % e)
self.poller.unregister(fileno)

def register_fd(self, act, performer):
fileno = act.sock.fileno()
self.shadow[fileno] = act
flag = self.POLL_IN if performer == perform_recv \
or performer == perform_accept else self.POLL_OUT
self.poller.register(fileno, flag | self.POLL_ERR)

def run(self, timeout = 0):
"""
Run a proactor loop and return new socket events. Timeout is a timedelta
object, 0 if active coros or None.
"""
# poll timeout param is a integer number of miliseconds (seconds/1000).
ptimeout = int(
timeout.days * 86400000 +
timeout.microseconds / 1000 +
timeout.seconds * 1000
if timeout else (self.m_resolution if timeout is None else 0)
)
if self.tokens:
events = self.poller.poll(ptimeout)
len_events = len(events)-1
for nr, (fd, ev) in enumerate(events):
act = self.shadow.pop(fd)
if ev & POLLHUP:
self.poller.unregister(fd)
self.handle_error_event(act, 'Hang up.', ConnectionClosed)
elif ev & POLLNVAL:
self.poller.unregister(fd)
self.handle_error_event(act, 'Invalid descriptor.')
elif ev & POLLERR:
self.poller.unregister(fd)
self.handle_error_event(act, 'Unknown error.')
else:
if nr == len_events:
ret = self.yield_event(act)
if ret:
self.poller.unregister(fd)
else:
self.shadow[fd] = act
return ret
else:
if self.handle_event(act):
self.poller.unregister(fd)
else:
self.shadow[fd] = act

else:
sleep(timeout)
Show details Hide details

Change log

r588 by ionel.mc on Apr 26, 2009   Diff
Removed trailing spaces. Fixed
inconsistent end of line format.

Merged from amcnabb8's branch.
Go to: 
Project members, sign in to write a code review

Older revisions

r556 by ionel.mc on Jan 03, 2009   Diff
some cleanup like remove useless
imports and a couple of relics of old
past
r551 by ionel.mc on Dec 30, 2008   Diff
Remake of the previous error handling
fix (didn't worked that well with
ops_greedy set to True due to side-
effects while the token is still in
the shadow).
r550 by ionel.mc on Dec 30, 2008   Diff
moved CoroutineException from
events.py in coroutines.py;
renamed ConnectionError to
SocketError;
moved SocketError, ConnectionError in
...
All revisions of this file

File info

Size: 3126 bytes, 79 lines
Hosted by Google Code