My favorites | Sign in
Project Logo
                
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
from __future__ import division
from time import sleep
from select import epoll, EPOLLIN, EPOLLOUT, EPOLLPRI, EPOLLERR, EPOLLHUP, \
EPOLLET, EPOLLONESHOT, EPOLLMSG



from base import ProactorBase, perform_recv, perform_accept, perform_send, \
perform_sendall, perform_sendfile, \
perform_connect

from cogen.core.sockets import ConnectionClosed

class StdlibEpollProactor(ProactorBase):
"epoll based proactor implementation using python 2.6 select module."
def __init__(self, scheduler, res, default_size=1024, **options):
super(self.__class__, self).__init__(scheduler, res, **options)
self.scheduler = scheduler
self.epoll_obj = epoll(default_size)
self.shadow = {}

def unregister_fd(self, act, fd=None):
act.sock._proactor_added = False
fileno = fd or act.sock.fileno()
try:
del self.shadow[fileno]
except KeyError, e:
import warnings
warnings.warn("fd remove error: %r" % e)

try:
self.epoll_obj.unregister(fileno)
except OSError, e:
import warnings
warnings.warn("fd remove error: %r" % e)

def register_fd(self, act, performer):
fileno = act.sock.fileno()
self.shadow[fileno] = act
flag = EPOLLIN if performer == perform_recv \
or performer == perform_accept else EPOLLOUT

if act.sock._proactor_added:
self.epoll_obj.modify(fileno, flag | EPOLLONESHOT)
else:
self.epoll_obj.register(fileno, flag | EPOLLONESHOT)
act.sock._proactor_added = True

def run(self, timeout = 0):
"""
Run a proactor loop and return new socket events. Timeout is a timedelta
object, 0 if active coros or None.

epoll timeout param is a integer number of seconds.
"""
ptimeout = float(
timeout.microseconds/1000000+timeout.seconds if timeout
else (self.resolution if timeout is None else 0)
)
if self.tokens:
events = self.epoll_obj.poll(ptimeout, 1024)
len_events = len(events)-1
for nr, (fd, ev) in enumerate(events):
act = self.shadow.pop(fd)
if ev & EPOLLHUP:
self.epoll_obj.unregister(fd)
self.handle_error_event(act, 'Hang up.', ConnectionClosed)
elif ev & EPOLLERR:
self.epoll_obj.unregister(fd)
self.handle_error_event(act, 'Unknown error.')
else:
if nr == len_events:
ret = self.yield_event(act)
if not ret:
self.epoll_obj.modify(fd, ev | EPOLLONESHOT)
self.shadow[fd] = act
return ret
else:
if not self.handle_event(act):
self.epoll_obj.modify(fd, ev | EPOLLONESHOT)
self.shadow[fd] = act


else:
sleep(timeout)
# todo; fix this to timeout value
Show details Hide details

Change log

r592 by ionel.mc on Sep 15, 2009   Diff
Added a fix for a case when a timeout is
raised (and the associated socket
subsequently removed from the proactor)
and the socket _proactor_added flag
remains to true
Go to: 
Project members, sign in to write a code review

Older revisions

r588 by ionel.mc on Apr 26, 2009   Diff
Removed trailing spaces. Fixed
inconsistent end of line format.

Merged from amcnabb8's branch.
r556 by ionel.mc on Jan 03, 2009   Diff
some cleanup like remove useless
imports and a couple of relics of old
past
r551 by ionel.mc on Dec 30, 2008   Diff
Remake of the previous error handling
fix (didn't worked that well with
ops_greedy set to True due to side-
effects while the token is still in
the shadow).
All revisions of this file

File info

Size: 3326 bytes, 86 lines
Hosted by Google Code