My favorites | Sign in
Project Logo
                
Changes to /trunk/cogen/core/proactors/ctypes_iocp_impl/__init__.py
r509 vs. r514   Edit
  Compare: vs.   Format:
Revision r514
Go to: 
Project members, sign in to write a code review
/trunk/cogen/core/proactors/ctypes_iocp_impl/__init__.py   r509 /trunk/cogen/core/proactors/ctypes_iocp_impl/__init__.py   r514
1 from __future__ import division 1 from __future__ import division
2 # work in progress 2 # work in progress
3 3
4 from api_wrappers import _get_osfhandle, CreateIoCompletionPort, CloseHandle, \ 4 from api_wrappers import _get_osfhandle, CreateIoCompletionPort, CloseHandle, \
5 GetQueuedCompletionStatus, PostQueuedCompletionStatus, CancelIo,\ 5 GetQueuedCompletionStatus, PostQueuedCompletionStatus, CancelIo,\
6 getaddrinfo, getsockopt, WSARecv, WSASend, AcceptEx, WSAIoctl, \ 6 getaddrinfo, getsockopt, WSARecv, WSASend, AcceptEx, WSAIoctl, \
7 GetAcceptExSockaddrs, ConnectEx, TransmitFile, AllocateBuffer, \ 7 GetAcceptExSockaddrs, ConnectEx, TransmitFile, AllocateBuffer, \
8 LPOVERLAPPED, OVERLAPPED, LPDWORD, PULONG_PTR, cast, c_void_p, \ 8 LPOVERLAPPED, OVERLAPPED, LPDWORD, PULONG_PTR, cast, c_void_p, \
9 byref, c_char_p, create_string_buffer, c_ulong, DWORD, WSABUF, \ 9 byref, c_char_p, create_string_buffer, c_ulong, DWORD, WSABUF, \
10 c_long, addrinfo_p, getaddrinfo, addrinfo, WSAPROTOCOL_INFO, \ 10 c_long, addrinfo_p, getaddrinfo, addrinfo, WSAPROTOCOL_INFO, \
11 c_int, sizeof, string_at, get_osfhandle 11 c_int, sizeof, string_at, get_osfhandle, sockaddr_in
12 12
13 from api_consts import SO_UPDATE_ACCEPT_CONTEXT, SO_UPDATE_CONNECT_CONTEXT, \ 13 from api_consts import SO_UPDATE_ACCEPT_CONTEXT, SO_UPDATE_CONNECT_CONTEXT, \
14 INVALID_HANDLE_VALUE, WSA_OPERATION_ABORTED, WSA_IO_PENDING, \ 14 INVALID_HANDLE_VALUE, WSA_OPERATION_ABORTED, WSA_IO_PENDING, \
15 SOL_SOCKET, SO_PROTOCOL_INFOA 15 SOL_SOCKET, SO_PROTOCOL_INFOA
16 16
17 17
18 import sys 18 import sys
19 import socket 19 import socket
20 import ctypes 20 import ctypes
21 import struct 21 import struct
22 22
23 from time import sleep 23 from time import sleep
24 24
25 from cogen.core.proactors.base import ProactorBase 25 from cogen.core.proactors.base import ProactorBase
26 from cogen.core.util import priority, debug 26 from cogen.core.util import priority, debug
27 from cogen.core.sockets import Socket 27 from cogen.core.sockets import Socket
28 from cogen.core.events import ConnectionClosed, ConnectionError, CoroutineException 28 from cogen.core.events import ConnectionClosed, ConnectionError, CoroutineException
29 def perform_recv(act, overlapped): 29 def perform_recv(act, overlapped):
30 wsabuf = WSABUF() 30 wsabuf = WSABUF()
31 buf = create_string_buffer(act.len) 31 buf = create_string_buffer(act.len)
32 wsabuf.buf = cast(buf, c_char_p) 32 wsabuf.buf = cast(buf, c_char_p)
33 wsabuf.len = act.len 33 wsabuf.len = act.len
34 nbytes = c_ulong(0) 34 nbytes = c_ulong(0)
35 flags = c_ulong(0) 35 flags = c_ulong(0)
36 act.flags = buf 36 act.flags = buf
37 37
38 rc = WSARecv( 38 rc = WSARecv(
39 act.sock._fd.fileno(), # SOCKET s 39 act.sock._fd.fileno(), # SOCKET s
40 byref(wsabuf), # LPWSABUF lpBuffers 40 byref(wsabuf), # LPWSABUF lpBuffers
41 1, # DWORD dwBufferCount 41 1, # DWORD dwBufferCount
42 byref(nbytes), # LPDWORD lpNumberOfBytesRecvd 42 byref(nbytes), # LPDWORD lpNumberOfBytesRecvd
43 byref(flags), # LPDWORD lpFlags 43 byref(flags), # LPDWORD lpFlags
44 overlapped, # LPWSAOVERLAPPED lpOverlapped 44 overlapped, # LPWSAOVERLAPPED lpOverlapped
45 None # LPWSAOVERLAPPED_COMPLETION_ROUTINE lpCompletionRoutine 45 None # LPWSAOVERLAPPED_COMPLETION_ROUTINE lpCompletionRoutine
46 ) 46 )
47 return rc, nbytes.value 47 return rc, nbytes.value
48 48
49 #~ @debug(0) 49 #~ @debug(0)
50 def complete_recv(act, rc, nbytes): 50 def complete_recv(act, rc, nbytes):
51 if nbytes: 51 if nbytes:
52 act.buff = act.flags[:nbytes] 52 act.buff = act.flags[:nbytes]
53 return act 53 return act
54 else: 54 else:
55 raise ConnectionClosed("Empty recv.") 55 raise ConnectionClosed("Empty recv.")
56 56
57 57
58 def perform_send(act, overlapped): 58 def perform_send(act, overlapped):
59 wsabuf = WSABUF() 59 wsabuf = WSABUF()
60 wsabuf.buf = c_char_p(act.buff) 60 wsabuf.buf = c_char_p(act.buff)
61 wsabuf.len = len(act.buff) 61 wsabuf.len = len(act.buff)
62 nbytes = c_ulong() 62 nbytes = c_ulong()
63 act.flags = wsabuf, nbytes 63 act.flags = wsabuf, nbytes
64 64
65 return WSASend( 65 return WSASend(
66 act.sock._fd.fileno(), # SOCKET s 66 act.sock._fd.fileno(), # SOCKET s
67 byref(wsabuf), # LPWSABUF lpBuffers 67 byref(wsabuf), # LPWSABUF lpBuffers
68 1, # DWORD dwBufferCount 68 1, # DWORD dwBufferCount
69 byref(nbytes), # LPDWORD lpNumberOfBytesSent 69 byref(nbytes), # LPDWORD lpNumberOfBytesSent
70 0, # DWORD dwFlags 70 0, # DWORD dwFlags
71 overlapped, # LPWSAOVERLAPPED lpOverlapped 71 overlapped, # LPWSAOVERLAPPED lpOverlapped
72 None # LPWSAOVERLAPPED_COMPLETION_ROUTINE lpCompletionRoutine 72 None # LPWSAOVERLAPPED_COMPLETION_ROUTINE lpCompletionRoutine
73 ), nbytes.value 73 ), nbytes.value
74 74
75 def complete_send(act, rc, nbytes): 75 def complete_send(act, rc, nbytes):
76 act.flags = None 76 act.flags = None
77 act.sent = nbytes 77 act.sent = nbytes
78 return act.sent and act 78 return act.sent and act
79 79
80 80
81 def perform_sendall(act, overlapped): 81 def perform_sendall(act, overlapped):
82 wsabuf = WSABUF() 82 wsabuf = WSABUF()
83 wsabuf.buf = c_char_p(act.buff[act.sent:]) 83 wsabuf.buf = c_char_p(act.buff[act.sent:])
84 wsabuf.len = len(act.buff)-act.sent 84 wsabuf.len = len(act.buff)-act.sent
85 nbytes = c_ulong() 85 nbytes = c_ulong()
86 act.flags = wsabuf, nbytes 86 act.flags = wsabuf, nbytes
87 87
88 return WSASend( 88 return WSASend(
89 act.sock._fd.fileno(), # SOCKET s 89 act.sock._fd.fileno(), # SOCKET s
90 byref(wsabuf), # LPWSABUF lpBuffers 90 byref(wsabuf), # LPWSABUF lpBuffers
91 1, # DWORD dwBufferCount 91 1, # DWORD dwBufferCount
92 byref(nbytes), # LPDWORD lpNumberOfBytesSent 92 byref(nbytes), # LPDWORD lpNumberOfBytesSent
93 0, # DWORD dwFlags 93 0, # DWORD dwFlags
94 overlapped, # LPWSAOVERLAPPED lpOverlapped 94 overlapped, # LPWSAOVERLAPPED lpOverlapped
95 None # LPWSAOVERLAPPED_COMPLETION_ROUTINE lpCompletionRoutine 95 None # LPWSAOVERLAPPED_COMPLETION_ROUTINE lpCompletionRoutine
96 ), nbytes 96 ), nbytes
97 97
98 def complete_sendall(act, rc, nbytes): 98 def complete_sendall(act, rc, nbytes):
99 act.sent += nbytes 99 act.sent += nbytes
100 return act.sent == len(act.buff) and act 100 return act.sent == len(act.buff) and act
101 101
102 102
103 def perform_accept(act, overlapped): 103 def perform_accept(act, overlapped):
104 act.conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 104 act.conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
105 act.cbuff = create_string_buffer(64) 105 act.cbuff = create_string_buffer((sizeof(sockaddr_in) + 16) * 2)
106 nbytes = c_ulong()
107
108 prot_info = WSAPROTOCOL_INFO()
109 prot_info_len = c_int(sizeof(prot_info))
110 getsockopt(act.sock.fileno(), SOL_SOCKET, SO_PROTOCOL_INFOA, cast(byref(prot_info), c_char_p), byref(prot_info_len))
111
106 # BOOL 112 # BOOL
107 return AcceptEx( 113 return AcceptEx(
108 act.sock._fd.fileno(), # SOCKET sListenSocket 114 act.sock._fd.fileno(), # SOCKET sListenSocket
109 act.conn.fileno(), # SOCKET sAcceptSocket 115 act.conn.fileno(), # SOCKET sAcceptSocket
110 cast(act.cbuff, c_void_p), # PVOID lpOutputBuffer 116 cast(act.cbuff, c_void_p), # PVOID lpOutputBuffer
111 0, # DWORD dwReceiveDataLength 117 0, # DWORD dwReceiveDataLength
112 32, # DWORD dwLocalAddressLength 118 prot_info.iMaxSockAddr + 16, # DWORD dwLocalAddressLength
113 32, # DWORD dwRemoteAddressLength 119 prot_info.iMaxSockAddr + 16, # DWORD dwRemoteAddressLength
114 None, # LPDWORD lpdwBytesReceived 120 nbytes, # LPDWORD lpdwBytesReceived
115 overlapped # LPOVERLAPPED lpOverlapped 121 overlapped # LPOVERLAPPED lpOverlapped
116 ), 0 122 ), 0
117 123
118 def complete_accept(act, rc, nbytes): 124 def complete_accept(act, rc, nbytes):
119 act.conn.setblocking(0) 125 act.conn.setblocking(0)
120 act.conn.setsockopt( 126 act.conn.setsockopt(
121 socket.SOL_SOCKET, 127 socket.SOL_SOCKET,
122 SO_UPDATE_ACCEPT_CONTEXT, 128 SO_UPDATE_ACCEPT_CONTEXT,
123 struct.pack("I", act.sock.fileno()) 129 struct.pack("I", act.sock.fileno())
124 ) 130 )
125 act.addr = act.conn.getpeername() 131 act.addr = act.conn.getpeername()
126 132
127 # void = PVOID lpOutputBuffer, DWORD dwReceiveDataLength, DWORD dwLocalAddressLength, DWORD dwRemoteAddressLength, LPSOCKADDR *LocalSockaddr, LPINT LocalSockaddrLength, LPSOCKADDR *RemoteSockaddr, LPINT RemoteSockaddrLength 133 # void = PVOID lpOutputBuffer, DWORD dwReceiveDataLength, DWORD dwLocalAddressLength, DWORD dwRemoteAddressLength, LPSOCKADDR *LocalSockaddr, LPINT LocalSockaddrLength, LPSOCKADDR *RemoteSockaddr, LPINT RemoteSockaddrLength
128 # TODO ? 134 # TODO ?
129 #~ family, localaddr, act.addr = GetAcceptExSockaddrs( 135 #~ family, localaddr, act.addr = GetAcceptExSockaddrs(
130 #~ act.conn, act.cbuff 136 #~ act.conn, act.cbuff
131 #~ ) 137 #~ )
132 act.conn = Socket(_sock=act.conn) 138 act.conn = Socket(_sock=act.conn)
133 return act 139 return act
134 140
135 def perform_connect(act, overlapped): 141 def perform_connect(act, overlapped):
136 # ConnectEx requires that the socket be bound beforehand 142 # ConnectEx requires that the socket be bound beforehand
137 try: 143 try:
138 # just in case we get a already-bound socket 144 # just in case we get a already-bound socket
139 act.sock.bind(('0.0.0.0', 0)) 145 act.sock.bind(('0.0.0.0', 0))
140 except socket.error, exc: 146 except socket.error, exc:
141 if exc[0] not in (errno.EINVAL, errno.WSAEINVAL): 147 if exc[0] not in (errno.EINVAL, errno.WSAEINVAL):
142 raise 148 raise
143 fileno = act.sock._fd.fileno() 149 fileno = act.sock._fd.fileno()
144 150
145 prot_info = WSAPROTOCOL_INFO() 151 prot_info = WSAPROTOCOL_INFO()
146 prot_info_len = c_int(sizeof(prot_info)) 152 prot_info_len = c_int(sizeof(prot_info))
147 getsockopt(fileno, SOL_SOCKET, SO_PROTOCOL_INFOA, cast(byref(prot_info), c_char_p), byref(prot_info_len)) 153 getsockopt(fileno, SOL_SOCKET, SO_PROTOCOL_INFOA, cast(byref(prot_info), c_char_p), byref(prot_info_len))
148 154
149 hints = addrinfo() 155 hints = addrinfo()
150 hints.ai_family = prot_info.iAddressFamily 156 hints.ai_family = prot_info.iAddressFamily
151 hints.ai_socktype = prot_info.iSocketType 157 hints.ai_socktype = prot_info.iSocketType
152 hints.ai_protocol = prot_info.iProtocol 158 hints.ai_protocol = prot_info.iProtocol
153 159
154 result = addrinfo_p() 160 result = addrinfo_p()
155 getaddrinfo(act.addr[0], str(act.addr[1]), byref(hints), byref(result)); 161 getaddrinfo(act.addr[0], str(act.addr[1]), byref(hints), byref(result));
156 162
157 act.flags = result 163 act.flags = result
158 164
159 #~ act.sock.bind(('0.0.0.0', 0)) 165 #~ act.sock.bind(('0.0.0.0', 0))
160 return ConnectEx( 166 return ConnectEx(
161 fileno, # SOCKET s 167 fileno, # SOCKET s
162 result.contents.ai_addr, result.contents.ai_addrlen, 168 result.contents.ai_addr, result.contents.ai_addrlen,
163 None, 169 None,
164 0, 170 0,
165 None, 171 None,
166 overlapped 172 overlapped
167 ), 0 173 ), 0
168 174
169 def complete_connect(act, rc, nbytes): 175 def complete_connect(act, rc, nbytes):
170 act.sock.setsockopt(socket.SOL_SOCKET, SO_UPDATE_CONNECT_CONTEXT, "") 176 act.sock.setsockopt(socket.SOL_SOCKET, SO_UPDATE_CONNECT_CONTEXT, "")
171 return act 177 return act
172 178
173 def perform_sendfile(act, overlapped): 179 def perform_sendfile(act, overlapped):
174 # BOOL 180 # BOOL
175 return TransmitFile( 181 return TransmitFile(
176 act.sock._fd.fileno(), # SOCKET hSocket 182 act.sock._fd.fileno(), # SOCKET hSocket
177 get_osfhandle(act.file_handle.fileno()), # HANDLE hFile 183 get_osfhandle(act.file_handle.fileno()), # HANDLE hFile
178 act.length or 0, # DWORD nNumberOfBytesToWrite 184 act.length or 0, # DWORD nNumberOfBytesToWrite
179 act.blocksize, # DWORD nNumberOfBytesPerSend 185 act.blocksize, # DWORD nNumberOfBytesPerSend
180 overlapped, # LPOVERLAPPED lpOverlapped 186 overlapped, # LPOVERLAPPED lpOverlapped
181 None, # LPTRANSMIT_FILE_BUFFERS lpTransmitBuffers 187 None, # LPTRANSMIT_FILE_BUFFERS lpTransmitBuffers
182 0 # DWORD dwFlags 188 0 # DWORD dwFlags
183 ), 0 189 ), 0
184 190
185 def complete_sendfile(act, rc, nbytes): 191 def complete_sendfile(act, rc, nbytes):
186 act.sent = nbytes 192 act.sent = nbytes
187 return act 193 return act
188 194
189 class CTYPES_IOCPProactor(ProactorBase): 195 class CTYPES_IOCPProactor(ProactorBase):
190 supports_multiplex_first = False 196 supports_multiplex_first = False
191 197
192 def __init__(self, scheduler, res, **options): 198 def __init__(self, scheduler, res, **options):
193 super(self.__class__, self).__init__(scheduler, res, **options) 199 super(self.__class__, self).__init__(scheduler, res, **options)
194 self.scheduler = scheduler 200 self.scheduler = scheduler
195 self.iocp = CreateIoCompletionPort( 201 self.iocp = CreateIoCompletionPort(
196 INVALID_HANDLE_VALUE, 0, None, 0 202 INVALID_HANDLE_VALUE, 0, None, 0
197 ) 203 )
198 204
199 def __del__(self): 205 def __del__(self):
200 self.close() 206 self.close()
201 207
202 def close(self): 208 def close(self):
203 if self.iocp: 209 if self.iocp:
204 poverlapped = LPOVERLAPPED() 210 poverlapped = LPOVERLAPPED()
205 nbytes = DWORD() 211 nbytes = DWORD()
206 completion_key = c_ulong() 212 completion_key = c_ulong()
207 while 1: 213 while 1:
208 rc = GetQueuedCompletionStatus( 214 rc = GetQueuedCompletionStatus(
209 self.iocp, # HANDLE CompletionPort 215 self.iocp, # HANDLE CompletionPort
210 byref(nbytes), # LPDWORD lpNumberOfBytes 216 byref(nbytes), # LPDWORD lpNumberOfBytes
211 byref(completion_key), # PULONG_PTR lpCompletionKey 217 byref(completion_key), # PULONG_PTR lpCompletionKey
212 byref(poverlapped), 218 byref(poverlapped),
213 0 219 0
214 ) 220 )
215 if not poverlapped: 221 if not poverlapped:
216 break 222 break
217 else: 223 else:
218 act = poverlapped.contents.object 224 act = poverlapped.contents.object
219 if act in self.tokens: 225 if act in self.tokens:
220 del self.tokens[act] 226 del self.tokens[act]
227 CancelIo(act.sock._fd.fileno())
221 else: 228 else:
222 import warnings 229 import warnings
223 warnings.warn("act(%s) not in self.tokens" % act) 230 warnings.warn("act(%s) not in self.tokens" % act)
224 CloseHandle(self.iocp) 231 CloseHandle(self.iocp)
225 self.iocp = None 232 self.iocp = None
226 if self.tokens: 233 if self.tokens:
227 import warnings 234 import warnings
228 warnings.warn("self.tokens still pending: %s" % self.tokens) 235 warnings.warn("self.tokens still pending: %s" % self.tokens)
229 super(self.__class__, self).close() 236 super(self.__class__, self).close()
230 237
231 def set_options(self, **bogus_options): 238 def set_options(self, **bogus_options):
232 self._warn_bogus_options(**bogus_options) #iocp doesn't have any options 239 self._warn_bogus_options(**bogus_options) #iocp doesn't have any options
233 240
234 def request_recv(self, act, coro): 241 def request_recv(self, act, coro):
235 return self.request_generic(act, coro, perform_recv, complete_recv) 242 return self.request_generic(act, coro, perform_recv, complete_recv)
236 243
237 def request_send(self, act, coro): 244 def request_send(self, act, coro):
238 return self.request_generic(act, coro, perform_send, complete_send) 245 return self.request_generic(act, coro, perform_send, complete_send)
239 246
240 def request_sendall(self, act, coro): 247 def request_sendall(self, act, coro):
241 return self.request_generic(act, coro, perform_sendall, complete_sendall) 248 return self.request_generic(act, coro, perform_sendall, complete_sendall)
242 249
243 def request_accept(self, act, coro): 250 def request_accept(self, act, coro):
244 return self.request_generic(act, coro, perform_accept, complete_accept) 251 return self.request_generic(act, coro, perform_accept, complete_accept)
245 252
246 def request_connect(self, act, coro): 253 def request_connect(self, act, coro):
247 return self.request_generic(act, coro, perform_connect, complete_connect) 254 return self.request_generic(act, coro, perform_connect, complete_connect)
248 255
249 def request_sendfile(self, act, coro): 256 def request_sendfile(self, act, coro):
250 return self.request_generic(act, coro, perform_sendfile, complete_sendfile) 257 return self.request_generic(act, coro, perform_sendfile, complete_sendfile)
251 258
252 def request_generic(self, act, coro, perform, complete): 259 def request_generic(self, act, coro, perform, complete):
253 """ 260 """
254 Performs an overlapped request (via `perform` callable) and saves 261 Performs an overlapped request (via `perform` callable) and saves
255 the token and the (`overlapped`, `perform`, `complete`) trio. 262 the token and the (`overlapped`, `perform`, `complete`) trio.
256 """ 263 """
257 overlapped = OVERLAPPED() 264 overlapped = OVERLAPPED()
258 overlapped.object = act 265 overlapped.object = act
259 self.add_token(act, coro, (overlapped, perform, complete)) 266 self.add_token(act, coro, (overlapped, perform, complete))
260 267
261 rc, nbytes = perform(act, overlapped) 268 rc, nbytes = perform(act, overlapped)
262 completion_key = c_long(0) 269 completion_key = c_long(0)
263 if rc == 0: 270 if rc == 0:
264 # ah geez, it didn't got in the iocp, we have a result! 271 # ah geez, it didn't got in the iocp, we have a result!
265 pass 272 pass
266 273
267 274
268 # ok this is weird, apparently this doesn't need to be requeued 275 # ok this is weird, apparently this doesn't need to be requeued
269 # - need to investigate why (TODO) 276 # - need to investigate why (TODO)
270 #~ PostQueuedCompletionStatus( 277 #~ PostQueuedCompletionStatus(
271 #~ self.iocp, # HANDLE CompletionPort 278 #~ self.iocp, # HANDLE CompletionPort
272 #~ nbytes, # DWORD dwNumberOfBytesTransferred 279 #~ nbytes, # DWORD dwNumberOfBytesTransferred
273 #~ byref(completion_key), # ULONG_PTR dwCompletionKey 280 #~ byref(completion_key), # ULONG_PTR dwCompletionKey
274 #~ overlapped # LPOVERLAPPED lpOverlapped 281 #~ overlapped # LPOVERLAPPED lpOverlapped
275 #~ ) 282 #~ )
276 elif rc != WSA_IO_PENDING: 283 elif rc != WSA_IO_PENDING:
277 raise ConnectionError(rc, "%s on %r" % (ctypes.FormatError(rc), act)) 284 raise ConnectionError(rc, "%s on %r" % (ctypes.FormatError(rc), act))
278 285
279 286
280 def register_fd(self, act, performer): 287 def register_fd(self, act, performer):
281 if not act.sock._proactor_added: 288 if not act.sock._proactor_added:
282 CreateIoCompletionPort(act.sock._fd.fileno(), self.iocp, None, 0) 289 CreateIoCompletionPort(act.sock._fd.fileno(), self.iocp, None, 0)
283 act.sock._proactor_added = True 290 act.sock._proactor_added = True
284 291
285 def unregister_fd(self, act): 292 def unregister_fd(self, act):
293 overlapped, perform, complete = self.tokens[act]
294 overlapped.object = None
286 CancelIo(act.sock._fd.fileno()) 295 CancelIo(act.sock._fd.fileno())
287 296
288 297
289 def try_run_act(self, act, func, rc, nbytes): 298 def try_run_act(self, act, func, rc, nbytes):
290 try: 299 try:
291 return func(act, rc, nbytes) 300 return func(act, rc, nbytes)
292 except: 301 except:
293 return CoroutineException(*sys.exc_info()) 302 return CoroutineException(*sys.exc_info())
294 303
295 def process_op(self, rc, nbytes, overlap): 304 def process_op(self, rc, nbytes, overlap):
296 """ 305 """
297 Handles the possible completion or re-queueing if conditions haven't 306 Handles the possible completion or re-queueing if conditions haven't
298 been met (the `complete` callable returns false) of a overlapped request. 307 been met (the `complete` callable returns false) of a overlapped request.
299 """ 308 """
300 act = overlap.object 309 act = overlap.object
301 overlap.object = None 310 overlap.object = None
302 if act in self.tokens: 311 if act in self.tokens:
303 ol, perform, complete = self.tokens[act] 312 ol, perform, complete = self.tokens[act]
304 #~ assert ol is overlap, "%r is not %r" % (ol, overlap) 313 #~ assert ol is overlap, "%r is not %r" % (ol, overlap)
305 if rc == 0: 314 if rc == 0:
306 ract = self.try_run_act(act, complete, rc, nbytes) 315 ract = self.try_run_act(act, complete, rc, nbytes)
307 if ract: 316 if ract:
308 del self.tokens[act] 317 del self.tokens[act]
309 CancelIo(act.sock._fd.fileno()) 318 CancelIo(act.sock._fd.fileno())
310 return ract, act.coro 319 return ract, act.coro
311 else: 320 else:
312 # operation hasn't completed yet (not enough data etc) 321 # operation hasn't completed yet (not enough data etc)
313 # read it in the iocp 322 # read it in the iocp
314 self.request_generic(act, act.coro, perform, complete) 323 self.request_generic(act, act.coro, perform, complete)
315 324
316 325
317 else: 326 else:
318 #looks like we have a problem, forward it to the coroutine. 327 #looks like we have a problem, forward it to the coroutine.
319 328
320 # this needs some research: ERROR_NETNAME_DELETED, need to reopen 329 # this needs some research: ERROR_NETNAME_DELETED, need to reopen
321 #the accept sock ?! something like: 330 #the accept sock ?! something like:
322 # warnings.warn("ERROR_NETNAME_DELETED: %r. Re-registering operation." % op) 331 # warnings.warn("ERROR_NETNAME_DELETED: %r. Re-registering operation." % op)
323 # self.registered_ops[op] = self.run_iocp(op, coro) 332 # self.registered_ops[op] = self.run_iocp(op, coro)
324 del self.tokens[act] 333 del self.tokens[act]
325 CancelIo(act.sock._fd.fileno()) 334 CancelIo(act.sock._fd.fileno())
326 #~ import traceback 335 #~ import traceback
327 #~ traceback.print_stack() 336 #~ traceback.print_stack()
328 return CoroutineException( 337 return CoroutineException(
329 ConnectionError, ConnectionError( 338 ConnectionError, ConnectionError(
330 (rc, "%s on %r" % (ctypes.FormatError(rc), act)) 339 (rc, "%s on %r" % (ctypes.FormatError(rc), act))
331 ) 340 )
332 ), act.coro 341 ), act.coro
333 else: 342 else:
334 import warnings 343 import warnings
335 warnings.warn("Unknown token %s" % act) 344 warnings.warn("Unknown token %s" % act)
336 345
337 def run(self, timeout = 0): 346 def run(self, timeout = 0):
338 """ 347 """
339 Calls GetQueuedCompletionStatus and handles completion via 348 Calls GetQueuedCompletionStatus and handles completion via
340 process_op. 349 process_op.
341 """ 350 """
342 # same resolution as epoll 351 # same resolution as epoll
343 ptimeout = int( 352 ptimeout = int(
344 timeout.days * 86400000 + 353 timeout.days * 86400000 +
345 timeout.microseconds / 1000 + 354 timeout.microseconds / 1000 +
346 timeout.seconds * 1000 355 timeout.seconds * 1000
347 if timeout else (self.m_resolution if timeout is None else 0) 356 if timeout else (self.m_resolution if timeout is None else 0)
348 ) 357 )
349 if self.tokens: 358 if self.tokens:
350 urgent = None 359 urgent = None
351 # we use urgent as a optimisation: the last operation is returned 360 # we use urgent as a optimisation: the last operation is returned
352 #directly to the scheduler (the sched might just run it till it 361 #directly to the scheduler (the sched might just run it till it
353 #goes to sleep) and not added in the sched.active queue 362 #goes to sleep) and not added in the sched.active queue
354 while 1: 363 while 1:
355 try: 364 try:
356 poverlapped = LPOVERLAPPED() 365 poverlapped = LPOVERLAPPED()
357 nbytes = DWORD() 366 nbytes = DWORD()
358 completion_key = c_ulong() 367 completion_key = c_ulong()
359 #~ BOOL WINAPI GetQueuedCompletionStatus( 368 #~ BOOL WINAPI GetQueuedCompletionStatus(
360 #~ __in HANDLE CompletionPort, 369 #~ __in HANDLE CompletionPort,
361 #~ __out LPDWORD lpNumberOfBytes, 370 #~ __out LPDWORD lpNumberOfBytes,
362 #~ __out PULONG_PTR lpCompletionKey, 371 #~ __out PULONG_PTR lpCompletionKey,
363 #~ __out LPOVERLAPPED *lpOverlapped, 372 #~ __out LPOVERLAPPED *lpOverlapped,
364 #~ __in DWORD dwMilliseconds 373 #~ __in DWORD dwMilliseconds
365 #~ ); 374 #~ );
366 375
367 rc = GetQueuedCompletionStatus( 376 rc = GetQueuedCompletionStatus(
368 self.iocp, # HANDLE CompletionPort 377 self.iocp, # HANDLE CompletionPort
369 byref(nbytes), # LPDWORD lpNumberOfBytes 378 byref(nbytes), # LPDWORD lpNumberOfBytes
370 byref(completion_key), # PULONG_PTR lpCompletionKey 379 byref(completion_key), # PULONG_PTR lpCompletionKey
371 byref(poverlapped), 380 byref(poverlapped),
372 0 if urgent else ptimeout 381 0 if urgent else ptimeout
373 ) 382 )
374 overlap = poverlapped and poverlapped.contents 383 overlap = poverlapped and poverlapped.contents
375 nbytes = nbytes.value 384 nbytes = nbytes.value
376 except RuntimeError, e: 385 except RuntimeError, e:
377 import warnings 386 import warnings
378 warnings.warn("RuntimeError(%s) on GetQueuedCompletionStatus." % e) 387 warnings.warn("RuntimeError(%s) on GetQueuedCompletionStatus." % e)
379 # we will get "This overlapped object has lost all its 388 # we will get "This overlapped object has lost all its
380 # references so was destroyed" when we remove a operation, 389 # references so was destroyed" when we remove a operation,
381 # it is garbage collected and the overlapped completes 390 # it is garbage collected and the overlapped completes
382 # afterwards 391 # afterwards
383 break 392 break
384 393
385 # well, this is a bit weird, if we get a aborted rc (via CancelIo 394 # well, this is a bit weird, if we get a aborted rc (via CancelIo
386 #i suppose) evaluating the overlap crashes the interpeter 395 #i suppose) evaluating the overlap crashes the interpeter
387 #with a memory read error 396 #with a memory read error
388 # also, we might get a "wait operation timed out", and no overlap pointer 397 # also, we might get a "wait operation timed out", and no overlap pointer
389 if rc != WSA_OPERATION_ABORTED and overlap: 398 if rc != WSA_OPERATION_ABORTED and overlap:
390 399
391 if urgent: 400 if urgent:
392 op, coro = urgent 401 op, coro = urgent
393 urgent = None 402 urgent = None
394 if op.prio & priority.OP: 403 if op.prio & priority.OP:
395 # imediately run the asociated coroutine step 404 # imediately run the asociated coroutine step
396 op, coro = self.scheduler.process_op( 405 op, coro = self.scheduler.process_op(
397 coro.run_op(op), 406 coro.run_op(op),
398 coro 407 coro
399 ) 408 )
400 if coro: 409 if coro:
401 #TODO, what "op and " 410 #TODO, what "op and "
402 if op and (op.prio & priority.CORO): 411 if op and (op.prio & priority.CORO):
403 self.scheduler.active.appendleft( (op, coro) ) 412 self.scheduler.active.appendleft( (op, coro) )
404 else: 413 else:
405 self.scheduler.active.append( (op, coro) ) 414 self.scheduler.active.append( (op, coro) )
406 if overlap.object: 415 if overlap.object:
416 assert overlap.object in self.tokens
407 urgent = self.process_op(rc, nbytes, overlap) 417 urgent = self.process_op(rc, nbytes, overlap)
408 else: 418 else:
409 #~ import warnings 419 #~ import warnings
410 #~ warnings.warn("rc=(%s: %s) overlap=(%s)" % (rc, ctypes.FormatError(rc), overlap)) 420 #~ warnings.warn("rc=(%s: %s) overlap=(%s)" % (rc, ctypes.FormatError(rc), overlap))
411 break 421 break
412 return urgent 422 return urgent
413 else: 423 else:
414 sleep(timeout) 424 sleep(timeout)
Hosted by Google Code