My favorites | Sign in
Project Logo
                
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
from __future__ import division
import win32file
import pywintypes
import socket
import ctypes
import struct
import sys
from time import sleep

from base import ProactorBase
from cogen.core.util import priority
from cogen.core.sockets import Socket, SocketError, ConnectionClosed
from cogen.core.coroutines import CoroutineException

def perform_recv(act, overlapped):
act.buff = win32file.AllocateReadBuffer(act.len)
return win32file.WSARecv(act.sock._fd, act.buff, overlapped, 0)

def complete_recv(act, rc, nbytes):
if nbytes:
act.buff = act.buff[:nbytes]
return act
else:
raise ConnectionClosed("Empty recv.")


def perform_send(act, overlapped):
return win32file.WSASend(act.sock._fd, act.buff, overlapped, 0)

def complete_send(act, rc, nbytes):
act.sent = nbytes
return act.sent and act


def perform_sendall(act, overlapped):
return win32file.WSASend(act.sock._fd, act.buff[act.sent:], overlapped, 0)

def complete_sendall(act, rc, nbytes):
act.sent += nbytes
return act.sent == len(act.buff) and act


def perform_accept(act, overlapped):
act.conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
act.cbuff = win32file.AllocateReadBuffer(64)
return win32file.AcceptEx(
act.sock._fd.fileno(), act.conn.fileno(), act.cbuff, overlapped
), 0

def complete_accept(act, rc, nbytes):
act.conn.setblocking(0)
act.conn.setsockopt(
socket.SOL_SOCKET,
win32file.SO_UPDATE_ACCEPT_CONTEXT,
struct.pack("I", act.sock.fileno())
)
family, localaddr, act.addr = win32file.GetAcceptExSockaddrs(
act.conn, act.cbuff
)
act.conn = act.sock.__class__(_sock=act.conn)
return act


def perform_connect(act, overlapped):
# ConnectEx requires that the socket be bound beforehand
try:
# just in case we get a already-bound socket
act.sock.bind(('0.0.0.0', 0))
except socket.error, exc:
if exc[0] not in (errno.EINVAL, errno.WSAEINVAL):
raise
return win32file.ConnectEx(act.sock, act.addr, overlapped)

def complete_connect(act, rc, nbytes):
act.sock.setsockopt(socket.SOL_SOCKET, win32file.SO_UPDATE_CONNECT_CONTEXT, "")
return act

def perform_sendfile(act, overlapped):
return win32file.TransmitFile(
act.sock,
win32file._get_osfhandle(act.file_handle.fileno()),
act.length or 0,
act.blocksize, overlapped, 0
), 0

def complete_sendfile(act, rc, nbytes):
act.sent = nbytes
return act

class IOCPProactor(ProactorBase):
supports_multiplex_first = False

def __init__(self, scheduler, res, **options):
super(self.__class__, self).__init__(scheduler, res, **options)
self.scheduler = scheduler
self.iocp = win32file.CreateIoCompletionPort(
win32file.INVALID_HANDLE_VALUE, None, 0, 0
)
if not hasattr(win32file, 'ConnectEx'):
#todo, move this in iocp class
import warnings
warnings.warn("IOCPProactor implementation requires a newer pywin32 module (win32file doesn't have ConnectEx).")
if not hasattr(win32file, 'TransmitFile'):
#todo, move this in iocp class
import warnings
warnings.warn("IOCPProactor implementation requires a newer pywin32 module (win32file doesn't have TransmitFile).")


def set_options(self, **bogus_options):
self._warn_bogus_options(**bogus_options) #iocp doesn't have any options

def request_recv(self, act, coro):
return self.request_generic(act, coro, perform_recv, complete_recv)

def request_send(self, act, coro):
return self.request_generic(act, coro, perform_send, complete_send)

def request_sendall(self, act, coro):
return self.request_generic(act, coro, perform_sendall, complete_sendall)

def request_accept(self, act, coro):
return self.request_generic(act, coro, perform_accept, complete_accept)

def request_connect(self, act, coro):
return self.request_generic(act, coro, perform_connect, complete_connect)

def request_sendfile(self, act, coro):
return self.request_generic(act, coro, perform_sendfile, complete_sendfile)

def request_generic(self, act, coro, perform, complete):
"""
Performs an overlapped request (via `perform` callable) and saves
the token and the (`overlapped`, `perform`, `complete`) trio.
"""
overlapped = pywintypes.OVERLAPPED()
overlapped.object = act
self.add_token(act, coro, (overlapped, perform, complete))

try:
rc, nbytes = perform(act, overlapped)

if rc == 0:
# ah geez, it didn't got in the iocp, we have a result!"
win32file.PostQueuedCompletionStatus(
self.iocp, nbytes, 0, overlapped
)
except pywintypes.error, exc:
raise SocketError(exc)

def register_fd(self, act, performer):
if not act.sock._proactor_added:
win32file.CreateIoCompletionPort(act.sock._fd.fileno(), self.iocp, 0, 0)
act.sock._proactor_added = True

def unregister_fd(self, act):
win32file.CancelIo(act.sock._fd.fileno())


def try_run_act(self, act, func, rc, nbytes):
try:
return func(act, rc, nbytes)
except:
return CoroutineException(*sys.exc_info())

def process_op(self, rc, nbytes, overlap):
"""
Handles the possible completion or re-queueing if conditions haven't
been met (the `complete` callable returns false) of a overlapped request.
"""
act = overlap.object
overlap.object = None
if act in self.tokens:
ol, perform, complete = self.tokens[act]
assert ol is overlap
if rc == 0:
ract = self.try_run_act(act, complete, rc, nbytes)
if ract:
del self.tokens[act]
win32file.CancelIo(act.sock._fd.fileno())
return ract, act.coro
else:
# operation hasn't completed yet (not enough data etc)
# read it in the iocp
self.request_generic(act, act.coro, perform, complete)


else:
#looks like we have a problem, forward it to the coroutine.

# this needs some research: ERROR_NETNAME_DELETED, need to reopen
#the accept sock ?! something like:
# warnings.warn("ERROR_NETNAME_DELETED: %r. Re-registering operation." % op)
# self.registered_ops[op] = self.run_iocp(op, coro)
del self.tokens[act]
win32file.CancelIo(act.sock._fd.fileno())
return CoroutineException((
SocketError, SocketError(
(rc, "%s on %r" % (ctypes.FormatError(rc), act))
)
)), act.coro
else:
import warnings
warnings.warn("Unknown token %s" % act)

def run(self, timeout = 0):
"""
Calls GetQueuedCompletionStatus and handles completion via
IOCPProactor.process_op.
"""
# same resolution as epoll
ptimeout = int(
timeout.days * 86400000 +
timeout.microseconds / 1000 +
timeout.seconds * 1000
if timeout else (self.m_resolution if timeout is None else 0)
)
if self.tokens:
scheduler = self.scheduler
urgent = None
# we use urgent as a optimisation: the last operation is returned
#directly to the scheduler (the sched might just run it till it
#goes to sleep) and not added in the sched.active queue
while 1:
try:
rc, nbytes, key, overlap = win32file.GetQueuedCompletionStatus(
self.iocp,
0 if urgent else ptimeout
)
except RuntimeError:
# we will get "This overlapped object has lost all its
# references so was destroyed" when we remove a operation,
# it is garbage collected and the overlapped completes
# afterwards
break

# well, this is a bit weird, if we get a aborted rc (via CancelIo
#i suppose) evaluating the overlap crashes the interpeter
#with a memory read error
if rc != win32file.WSA_OPERATION_ABORTED and overlap:

if urgent:
op, coro = urgent
urgent = None
if op.prio & priority.OP:
# imediately run the asociated coroutine step
op, coro = scheduler.process_op(
coro.run_op(op, scheduler),
coro
)
if coro:
#TODO, what "op and "
if op and (op.prio & priority.CORO):
scheduler.active.appendleft( (op, coro) )
else:
scheduler.active.append( (op, coro) )
if overlap.object:
urgent = self.process_op(rc, nbytes, overlap)
else:
break
return urgent
else:
sleep(timeout)
Show details Hide details

Change log

r588 by ionel.mc on Apr 26, 2009   Diff
Removed trailing spaces. Fixed
inconsistent end of line format.

Merged from amcnabb8's branch.
Go to: 
Project members, sign in to write a code review

Older revisions

r586 by ionel.mc on Apr 26, 2009   Diff
added an 'sched' param to Operation's
finalize method thourgh all the code
to support additional cleanup (mainly
for TimedOperation to remove itself
from the timeouts heapq on
...
r568 by ionel.mc on Jan 18, 2009   Diff
changed the constructor for the new
socket from accept to be the same as
the acceptor socket's class
r556 by ionel.mc on Jan 03, 2009   Diff
some cleanup like remove useless
imports and a couple of relics of old
past
All revisions of this file

File info

Size: 9912 bytes, 262 lines
Hosted by Google Code