| /trunk/cogen/core/proactors/ctypes_iocp_impl/__init__.py r509 | /trunk/cogen/core/proactors/ctypes_iocp_impl/__init__.py r514 | ||
| 1 | from __future__ import division | 1 | from __future__ import division |
|---|---|---|---|
| 2 | # work in progress | 2 | # work in progress |
| 3 | 3 | ||
| 4 | from api_wrappers import _get_osfhandle, CreateIoCompletionPort, CloseHandle, \ | 4 | from api_wrappers import _get_osfhandle, CreateIoCompletionPort, CloseHandle, \ |
| 5 | GetQueuedCompletionStatus, PostQueuedCompletionStatus, CancelIo,\ | 5 | GetQueuedCompletionStatus, PostQueuedCompletionStatus, CancelIo,\ |
| 6 | getaddrinfo, getsockopt, WSARecv, WSASend, AcceptEx, WSAIoctl, \ | 6 | getaddrinfo, getsockopt, WSARecv, WSASend, AcceptEx, WSAIoctl, \ |
| 7 | GetAcceptExSockaddrs, ConnectEx, TransmitFile, AllocateBuffer, \ | 7 | GetAcceptExSockaddrs, ConnectEx, TransmitFile, AllocateBuffer, \ |
| 8 | LPOVERLAPPED, OVERLAPPED, LPDWORD, PULONG_PTR, cast, c_void_p, \ | 8 | LPOVERLAPPED, OVERLAPPED, LPDWORD, PULONG_PTR, cast, c_void_p, \ |
| 9 | byref, c_char_p, create_string_buffer, c_ulong, DWORD, WSABUF, \ | 9 | byref, c_char_p, create_string_buffer, c_ulong, DWORD, WSABUF, \ |
| 10 | c_long, addrinfo_p, getaddrinfo, addrinfo, WSAPROTOCOL_INFO, \ | 10 | c_long, addrinfo_p, getaddrinfo, addrinfo, WSAPROTOCOL_INFO, \ |
| 11 | c_int, sizeof, string_at, get_osfhandle | 11 | c_int, sizeof, string_at, get_osfhandle, sockaddr_in |
| 12 | 12 | ||
| 13 | from api_consts import SO_UPDATE_ACCEPT_CONTEXT, SO_UPDATE_CONNECT_CONTEXT, \ | 13 | from api_consts import SO_UPDATE_ACCEPT_CONTEXT, SO_UPDATE_CONNECT_CONTEXT, \ |
| 14 | INVALID_HANDLE_VALUE, WSA_OPERATION_ABORTED, WSA_IO_PENDING, \ | 14 | INVALID_HANDLE_VALUE, WSA_OPERATION_ABORTED, WSA_IO_PENDING, \ |
| 15 | SOL_SOCKET, SO_PROTOCOL_INFOA | 15 | SOL_SOCKET, SO_PROTOCOL_INFOA |
| 16 | 16 | ||
| 17 | 17 | ||
| 18 | import sys | 18 | import sys |
| 19 | import socket | 19 | import socket |
| 20 | import ctypes | 20 | import ctypes |
| 21 | import struct | 21 | import struct |
| 22 | 22 | ||
| 23 | from time import sleep | 23 | from time import sleep |
| 24 | 24 | ||
| 25 | from cogen.core.proactors.base import ProactorBase | 25 | from cogen.core.proactors.base import ProactorBase |
| 26 | from cogen.core.util import priority, debug | 26 | from cogen.core.util import priority, debug |
| 27 | from cogen.core.sockets import Socket | 27 | from cogen.core.sockets import Socket |
| 28 | from cogen.core.events import ConnectionClosed, ConnectionError, CoroutineException | 28 | from cogen.core.events import ConnectionClosed, ConnectionError, CoroutineException |
| 29 | def perform_recv(act, overlapped): | 29 | def perform_recv(act, overlapped): |
| 30 | wsabuf = WSABUF() | 30 | wsabuf = WSABUF() |
| 31 | buf = create_string_buffer(act.len) | 31 | buf = create_string_buffer(act.len) |
| 32 | wsabuf.buf = cast(buf, c_char_p) | 32 | wsabuf.buf = cast(buf, c_char_p) |
| 33 | wsabuf.len = act.len | 33 | wsabuf.len = act.len |
| 34 | nbytes = c_ulong(0) | 34 | nbytes = c_ulong(0) |
| 35 | flags = c_ulong(0) | 35 | flags = c_ulong(0) |
| 36 | act.flags = buf | 36 | act.flags = buf |
| 37 | 37 | ||
| 38 | rc = WSARecv( | 38 | rc = WSARecv( |
| 39 | act.sock._fd.fileno(), # SOCKET s | 39 | act.sock._fd.fileno(), # SOCKET s |
| 40 | byref(wsabuf), # LPWSABUF lpBuffers | 40 | byref(wsabuf), # LPWSABUF lpBuffers |
| 41 | 1, # DWORD dwBufferCount | 41 | 1, # DWORD dwBufferCount |
| 42 | byref(nbytes), # LPDWORD lpNumberOfBytesRecvd | 42 | byref(nbytes), # LPDWORD lpNumberOfBytesRecvd |
| 43 | byref(flags), # LPDWORD lpFlags | 43 | byref(flags), # LPDWORD lpFlags |
| 44 | overlapped, # LPWSAOVERLAPPED lpOverlapped | 44 | overlapped, # LPWSAOVERLAPPED lpOverlapped |
| 45 | None # LPWSAOVERLAPPED_COMPLETION_ROUTINE lpCompletionRoutine | 45 | None # LPWSAOVERLAPPED_COMPLETION_ROUTINE lpCompletionRoutine |
| 46 | ) | 46 | ) |
| 47 | return rc, nbytes.value | 47 | return rc, nbytes.value |
| 48 | 48 | ||
| 49 | #~ @debug(0) | 49 | #~ @debug(0) |
| 50 | def complete_recv(act, rc, nbytes): | 50 | def complete_recv(act, rc, nbytes): |
| 51 | if nbytes: | 51 | if nbytes: |
| 52 | act.buff = act.flags[:nbytes] | 52 | act.buff = act.flags[:nbytes] |
| 53 | return act | 53 | return act |
| 54 | else: | 54 | else: |
| 55 | raise ConnectionClosed("Empty recv.") | 55 | raise ConnectionClosed("Empty recv.") |
| 56 | 56 | ||
| 57 | 57 | ||
| 58 | def perform_send(act, overlapped): | 58 | def perform_send(act, overlapped): |
| 59 | wsabuf = WSABUF() | 59 | wsabuf = WSABUF() |
| 60 | wsabuf.buf = c_char_p(act.buff) | 60 | wsabuf.buf = c_char_p(act.buff) |
| 61 | wsabuf.len = len(act.buff) | 61 | wsabuf.len = len(act.buff) |
| 62 | nbytes = c_ulong() | 62 | nbytes = c_ulong() |
| 63 | act.flags = wsabuf, nbytes | 63 | act.flags = wsabuf, nbytes |
| 64 | 64 | ||
| 65 | return WSASend( | 65 | return WSASend( |
| 66 | act.sock._fd.fileno(), # SOCKET s | 66 | act.sock._fd.fileno(), # SOCKET s |
| 67 | byref(wsabuf), # LPWSABUF lpBuffers | 67 | byref(wsabuf), # LPWSABUF lpBuffers |
| 68 | 1, # DWORD dwBufferCount | 68 | 1, # DWORD dwBufferCount |
| 69 | byref(nbytes), # LPDWORD lpNumberOfBytesSent | 69 | byref(nbytes), # LPDWORD lpNumberOfBytesSent |
| 70 | 0, # DWORD dwFlags | 70 | 0, # DWORD dwFlags |
| 71 | overlapped, # LPWSAOVERLAPPED lpOverlapped | 71 | overlapped, # LPWSAOVERLAPPED lpOverlapped |
| 72 | None # LPWSAOVERLAPPED_COMPLETION_ROUTINE lpCompletionRoutine | 72 | None # LPWSAOVERLAPPED_COMPLETION_ROUTINE lpCompletionRoutine |
| 73 | ), nbytes.value | 73 | ), nbytes.value |
| 74 | 74 | ||
| 75 | def complete_send(act, rc, nbytes): | 75 | def complete_send(act, rc, nbytes): |
| 76 | act.flags = None | 76 | act.flags = None |
| 77 | act.sent = nbytes | 77 | act.sent = nbytes |
| 78 | return act.sent and act | 78 | return act.sent and act |
| 79 | 79 | ||
| 80 | 80 | ||
| 81 | def perform_sendall(act, overlapped): | 81 | def perform_sendall(act, overlapped): |
| 82 | wsabuf = WSABUF() | 82 | wsabuf = WSABUF() |
| 83 | wsabuf.buf = c_char_p(act.buff[act.sent:]) | 83 | wsabuf.buf = c_char_p(act.buff[act.sent:]) |
| 84 | wsabuf.len = len(act.buff)-act.sent | 84 | wsabuf.len = len(act.buff)-act.sent |
| 85 | nbytes = c_ulong() | 85 | nbytes = c_ulong() |
| 86 | act.flags = wsabuf, nbytes | 86 | act.flags = wsabuf, nbytes |
| 87 | 87 | ||
| 88 | return WSASend( | 88 | return WSASend( |
| 89 | act.sock._fd.fileno(), # SOCKET s | 89 | act.sock._fd.fileno(), # SOCKET s |
| 90 | byref(wsabuf), # LPWSABUF lpBuffers | 90 | byref(wsabuf), # LPWSABUF lpBuffers |
| 91 | 1, # DWORD dwBufferCount | 91 | 1, # DWORD dwBufferCount |
| 92 | byref(nbytes), # LPDWORD lpNumberOfBytesSent | 92 | byref(nbytes), # LPDWORD lpNumberOfBytesSent |
| 93 | 0, # DWORD dwFlags | 93 | 0, # DWORD dwFlags |
| 94 | overlapped, # LPWSAOVERLAPPED lpOverlapped | 94 | overlapped, # LPWSAOVERLAPPED lpOverlapped |
| 95 | None # LPWSAOVERLAPPED_COMPLETION_ROUTINE lpCompletionRoutine | 95 | None # LPWSAOVERLAPPED_COMPLETION_ROUTINE lpCompletionRoutine |
| 96 | ), nbytes | 96 | ), nbytes |
| 97 | 97 | ||
| 98 | def complete_sendall(act, rc, nbytes): | 98 | def complete_sendall(act, rc, nbytes): |
| 99 | act.sent += nbytes | 99 | act.sent += nbytes |
| 100 | return act.sent == len(act.buff) and act | 100 | return act.sent == len(act.buff) and act |
| 101 | 101 | ||
| 102 | 102 | ||
| 103 | def perform_accept(act, overlapped): | 103 | def perform_accept(act, overlapped): |
| 104 | act.conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | 104 | act.conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM) |
| 105 | act.cbuff = create_string_buffer(64) | 105 | act.cbuff = create_string_buffer((sizeof(sockaddr_in) + 16) * 2) |
| 106 | nbytes = c_ulong() | ||
| 107 | |||
| 108 | prot_info = WSAPROTOCOL_INFO() | ||
| 109 | prot_info_len = c_int(sizeof(prot_info)) | ||
| 110 | getsockopt(act.sock.fileno(), SOL_SOCKET, SO_PROTOCOL_INFOA, cast(byref(prot_info), c_char_p), byref(prot_info_len)) | ||
| 111 | |||
| 106 | # BOOL | 112 | # BOOL |
| 107 | return AcceptEx( | 113 | return AcceptEx( |
| 108 | act.sock._fd.fileno(), # SOCKET sListenSocket | 114 | act.sock._fd.fileno(), # SOCKET sListenSocket |
| 109 | act.conn.fileno(), # SOCKET sAcceptSocket | 115 | act.conn.fileno(), # SOCKET sAcceptSocket |
| 110 | cast(act.cbuff, c_void_p), # PVOID lpOutputBuffer | 116 | cast(act.cbuff, c_void_p), # PVOID lpOutputBuffer |
| 111 | 0, # DWORD dwReceiveDataLength | 117 | 0, # DWORD dwReceiveDataLength |
| 112 | 32, # DWORD dwLocalAddressLength | 118 | prot_info.iMaxSockAddr + 16, # DWORD dwLocalAddressLength |
| 113 | 32, # DWORD dwRemoteAddressLength | 119 | prot_info.iMaxSockAddr + 16, # DWORD dwRemoteAddressLength |
| 114 | None, # LPDWORD lpdwBytesReceived | 120 | nbytes, # LPDWORD lpdwBytesReceived |
| 115 | overlapped # LPOVERLAPPED lpOverlapped | 121 | overlapped # LPOVERLAPPED lpOverlapped |
| 116 | ), 0 | 122 | ), 0 |
| 117 | 123 | ||
| 118 | def complete_accept(act, rc, nbytes): | 124 | def complete_accept(act, rc, nbytes): |
| 119 | act.conn.setblocking(0) | 125 | act.conn.setblocking(0) |
| 120 | act.conn.setsockopt( | 126 | act.conn.setsockopt( |
| 121 | socket.SOL_SOCKET, | 127 | socket.SOL_SOCKET, |
| 122 | SO_UPDATE_ACCEPT_CONTEXT, | 128 | SO_UPDATE_ACCEPT_CONTEXT, |
| 123 | struct.pack("I", act.sock.fileno()) | 129 | struct.pack("I", act.sock.fileno()) |
| 124 | ) | 130 | ) |
| 125 | act.addr = act.conn.getpeername() | 131 | act.addr = act.conn.getpeername() |
| 126 | 132 | ||
| 127 | # void = PVOID lpOutputBuffer, DWORD dwReceiveDataLength, DWORD dwLocalAddressLength, DWORD dwRemoteAddressLength, LPSOCKADDR *LocalSockaddr, LPINT LocalSockaddrLength, LPSOCKADDR *RemoteSockaddr, LPINT RemoteSockaddrLength | 133 | # void = PVOID lpOutputBuffer, DWORD dwReceiveDataLength, DWORD dwLocalAddressLength, DWORD dwRemoteAddressLength, LPSOCKADDR *LocalSockaddr, LPINT LocalSockaddrLength, LPSOCKADDR *RemoteSockaddr, LPINT RemoteSockaddrLength |
| 128 | # TODO ? | 134 | # TODO ? |
| 129 | #~ family, localaddr, act.addr = GetAcceptExSockaddrs( | 135 | #~ family, localaddr, act.addr = GetAcceptExSockaddrs( |
| 130 | #~ act.conn, act.cbuff | 136 | #~ act.conn, act.cbuff |
| 131 | #~ ) | 137 | #~ ) |
| 132 | act.conn = Socket(_sock=act.conn) | 138 | act.conn = Socket(_sock=act.conn) |
| 133 | return act | 139 | return act |
| 134 | 140 | ||
| 135 | def perform_connect(act, overlapped): | 141 | def perform_connect(act, overlapped): |
| 136 | # ConnectEx requires that the socket be bound beforehand | 142 | # ConnectEx requires that the socket be bound beforehand |
| 137 | try: | 143 | try: |
| 138 | # just in case we get a already-bound socket | 144 | # just in case we get a already-bound socket |
| 139 | act.sock.bind(('0.0.0.0', 0)) | 145 | act.sock.bind(('0.0.0.0', 0)) |
| 140 | except socket.error, exc: | 146 | except socket.error, exc: |
| 141 | if exc[0] not in (errno.EINVAL, errno.WSAEINVAL): | 147 | if exc[0] not in (errno.EINVAL, errno.WSAEINVAL): |
| 142 | raise | 148 | raise |
| 143 | fileno = act.sock._fd.fileno() | 149 | fileno = act.sock._fd.fileno() |
| 144 | 150 | ||
| 145 | prot_info = WSAPROTOCOL_INFO() | 151 | prot_info = WSAPROTOCOL_INFO() |
| 146 | prot_info_len = c_int(sizeof(prot_info)) | 152 | prot_info_len = c_int(sizeof(prot_info)) |
| 147 | getsockopt(fileno, SOL_SOCKET, SO_PROTOCOL_INFOA, cast(byref(prot_info), c_char_p), byref(prot_info_len)) | 153 | getsockopt(fileno, SOL_SOCKET, SO_PROTOCOL_INFOA, cast(byref(prot_info), c_char_p), byref(prot_info_len)) |
| 148 | 154 | ||
| 149 | hints = addrinfo() | 155 | hints = addrinfo() |
| 150 | hints.ai_family = prot_info.iAddressFamily | 156 | hints.ai_family = prot_info.iAddressFamily |
| 151 | hints.ai_socktype = prot_info.iSocketType | 157 | hints.ai_socktype = prot_info.iSocketType |
| 152 | hints.ai_protocol = prot_info.iProtocol | 158 | hints.ai_protocol = prot_info.iProtocol |
| 153 | 159 | ||
| 154 | result = addrinfo_p() | 160 | result = addrinfo_p() |
| 155 | getaddrinfo(act.addr[0], str(act.addr[1]), byref(hints), byref(result)); | 161 | getaddrinfo(act.addr[0], str(act.addr[1]), byref(hints), byref(result)); |
| 156 | 162 | ||
| 157 | act.flags = result | 163 | act.flags = result |
| 158 | 164 | ||
| 159 | #~ act.sock.bind(('0.0.0.0', 0)) | 165 | #~ act.sock.bind(('0.0.0.0', 0)) |
| 160 | return ConnectEx( | 166 | return ConnectEx( |
| 161 | fileno, # SOCKET s | 167 | fileno, # SOCKET s |
| 162 | result.contents.ai_addr, result.contents.ai_addrlen, | 168 | result.contents.ai_addr, result.contents.ai_addrlen, |
| 163 | None, | 169 | None, |
| 164 | 0, | 170 | 0, |
| 165 | None, | 171 | None, |
| 166 | overlapped | 172 | overlapped |
| 167 | ), 0 | 173 | ), 0 |
| 168 | 174 | ||
| 169 | def complete_connect(act, rc, nbytes): | 175 | def complete_connect(act, rc, nbytes): |
| 170 | act.sock.setsockopt(socket.SOL_SOCKET, SO_UPDATE_CONNECT_CONTEXT, "") | 176 | act.sock.setsockopt(socket.SOL_SOCKET, SO_UPDATE_CONNECT_CONTEXT, "") |
| 171 | return act | 177 | return act |
| 172 | 178 | ||
| 173 | def perform_sendfile(act, overlapped): | 179 | def perform_sendfile(act, overlapped): |
| 174 | # BOOL | 180 | # BOOL |
| 175 | return TransmitFile( | 181 | return TransmitFile( |
| 176 | act.sock._fd.fileno(), # SOCKET hSocket | 182 | act.sock._fd.fileno(), # SOCKET hSocket |
| 177 | get_osfhandle(act.file_handle.fileno()), # HANDLE hFile | 183 | get_osfhandle(act.file_handle.fileno()), # HANDLE hFile |
| 178 | act.length or 0, # DWORD nNumberOfBytesToWrite | 184 | act.length or 0, # DWORD nNumberOfBytesToWrite |
| 179 | act.blocksize, # DWORD nNumberOfBytesPerSend | 185 | act.blocksize, # DWORD nNumberOfBytesPerSend |
| 180 | overlapped, # LPOVERLAPPED lpOverlapped | 186 | overlapped, # LPOVERLAPPED lpOverlapped |
| 181 | None, # LPTRANSMIT_FILE_BUFFERS lpTransmitBuffers | 187 | None, # LPTRANSMIT_FILE_BUFFERS lpTransmitBuffers |
| 182 | 0 # DWORD dwFlags | 188 | 0 # DWORD dwFlags |
| 183 | ), 0 | 189 | ), 0 |
| 184 | 190 | ||
| 185 | def complete_sendfile(act, rc, nbytes): | 191 | def complete_sendfile(act, rc, nbytes): |
| 186 | act.sent = nbytes | 192 | act.sent = nbytes |
| 187 | return act | 193 | return act |
| 188 | 194 | ||
| 189 | class CTYPES_IOCPProactor(ProactorBase): | 195 | class CTYPES_IOCPProactor(ProactorBase): |
| 190 | supports_multiplex_first = False | 196 | supports_multiplex_first = False |
| 191 | 197 | ||
| 192 | def __init__(self, scheduler, res, **options): | 198 | def __init__(self, scheduler, res, **options): |
| 193 | super(self.__class__, self).__init__(scheduler, res, **options) | 199 | super(self.__class__, self).__init__(scheduler, res, **options) |
| 194 | self.scheduler = scheduler | 200 | self.scheduler = scheduler |
| 195 | self.iocp = CreateIoCompletionPort( | 201 | self.iocp = CreateIoCompletionPort( |
| 196 | INVALID_HANDLE_VALUE, 0, None, 0 | 202 | INVALID_HANDLE_VALUE, 0, None, 0 |
| 197 | ) | 203 | ) |
| 198 | 204 | ||
| 199 | def __del__(self): | 205 | def __del__(self): |
| 200 | self.close() | 206 | self.close() |
| 201 | 207 | ||
| 202 | def close(self): | 208 | def close(self): |
| 203 | if self.iocp: | 209 | if self.iocp: |
| 204 | poverlapped = LPOVERLAPPED() | 210 | poverlapped = LPOVERLAPPED() |
| 205 | nbytes = DWORD() | 211 | nbytes = DWORD() |
| 206 | completion_key = c_ulong() | 212 | completion_key = c_ulong() |
| 207 | while 1: | 213 | while 1: |
| 208 | rc = GetQueuedCompletionStatus( | 214 | rc = GetQueuedCompletionStatus( |
| 209 | self.iocp, # HANDLE CompletionPort | 215 | self.iocp, # HANDLE CompletionPort |
| 210 | byref(nbytes), # LPDWORD lpNumberOfBytes | 216 | byref(nbytes), # LPDWORD lpNumberOfBytes |
| 211 | byref(completion_key), # PULONG_PTR lpCompletionKey | 217 | byref(completion_key), # PULONG_PTR lpCompletionKey |
| 212 | byref(poverlapped), | 218 | byref(poverlapped), |
| 213 | 0 | 219 | 0 |
| 214 | ) | 220 | ) |
| 215 | if not poverlapped: | 221 | if not poverlapped: |
| 216 | break | 222 | break |
| 217 | else: | 223 | else: |
| 218 | act = poverlapped.contents.object | 224 | act = poverlapped.contents.object |
| 219 | if act in self.tokens: | 225 | if act in self.tokens: |
| 220 | del self.tokens[act] | 226 | del self.tokens[act] |
| 227 | CancelIo(act.sock._fd.fileno()) | ||
| 221 | else: | 228 | else: |
| 222 | import warnings | 229 | import warnings |
| 223 | warnings.warn("act(%s) not in self.tokens" % act) | 230 | warnings.warn("act(%s) not in self.tokens" % act) |
| 224 | CloseHandle(self.iocp) | 231 | CloseHandle(self.iocp) |
| 225 | self.iocp = None | 232 | self.iocp = None |
| 226 | if self.tokens: | 233 | if self.tokens: |
| 227 | import warnings | 234 | import warnings |
| 228 | warnings.warn("self.tokens still pending: %s" % self.tokens) | 235 | warnings.warn("self.tokens still pending: %s" % self.tokens) |
| 229 | super(self.__class__, self).close() | 236 | super(self.__class__, self).close() |
| 230 | 237 | ||
| 231 | def set_options(self, **bogus_options): | 238 | def set_options(self, **bogus_options): |
| 232 | self._warn_bogus_options(**bogus_options) #iocp doesn't have any options | 239 | self._warn_bogus_options(**bogus_options) #iocp doesn't have any options |
| 233 | 240 | ||
| 234 | def request_recv(self, act, coro): | 241 | def request_recv(self, act, coro): |
| 235 | return self.request_generic(act, coro, perform_recv, complete_recv) | 242 | return self.request_generic(act, coro, perform_recv, complete_recv) |
| 236 | 243 | ||
| 237 | def request_send(self, act, coro): | 244 | def request_send(self, act, coro): |
| 238 | return self.request_generic(act, coro, perform_send, complete_send) | 245 | return self.request_generic(act, coro, perform_send, complete_send) |
| 239 | 246 | ||
| 240 | def request_sendall(self, act, coro): | 247 | def request_sendall(self, act, coro): |
| 241 | return self.request_generic(act, coro, perform_sendall, complete_sendall) | 248 | return self.request_generic(act, coro, perform_sendall, complete_sendall) |
| 242 | 249 | ||
| 243 | def request_accept(self, act, coro): | 250 | def request_accept(self, act, coro): |
| 244 | return self.request_generic(act, coro, perform_accept, complete_accept) | 251 | return self.request_generic(act, coro, perform_accept, complete_accept) |
| 245 | 252 | ||
| 246 | def request_connect(self, act, coro): | 253 | def request_connect(self, act, coro): |
| 247 | return self.request_generic(act, coro, perform_connect, complete_connect) | 254 | return self.request_generic(act, coro, perform_connect, complete_connect) |
| 248 | 255 | ||
| 249 | def request_sendfile(self, act, coro): | 256 | def request_sendfile(self, act, coro): |
| 250 | return self.request_generic(act, coro, perform_sendfile, complete_sendfile) | 257 | return self.request_generic(act, coro, perform_sendfile, complete_sendfile) |
| 251 | 258 | ||
| 252 | def request_generic(self, act, coro, perform, complete): | 259 | def request_generic(self, act, coro, perform, complete): |
| 253 | """ | 260 | """ |
| 254 | Performs an overlapped request (via `perform` callable) and saves | 261 | Performs an overlapped request (via `perform` callable) and saves |
| 255 | the token and the (`overlapped`, `perform`, `complete`) trio. | 262 | the token and the (`overlapped`, `perform`, `complete`) trio. |
| 256 | """ | 263 | """ |
| 257 | overlapped = OVERLAPPED() | 264 | overlapped = OVERLAPPED() |
| 258 | overlapped.object = act | 265 | overlapped.object = act |
| 259 | self.add_token(act, coro, (overlapped, perform, complete)) | 266 | self.add_token(act, coro, (overlapped, perform, complete)) |
| 260 | 267 | ||
| 261 | rc, nbytes = perform(act, overlapped) | 268 | rc, nbytes = perform(act, overlapped) |
| 262 | completion_key = c_long(0) | 269 | completion_key = c_long(0) |
| 263 | if rc == 0: | 270 | if rc == 0: |
| 264 | # ah geez, it didn't got in the iocp, we have a result! | 271 | # ah geez, it didn't got in the iocp, we have a result! |
| 265 | pass | 272 | pass |
| 266 | 273 | ||
| 267 | 274 | ||
| 268 | # ok this is weird, apparently this doesn't need to be requeued | 275 | # ok this is weird, apparently this doesn't need to be requeued |
| 269 | # - need to investigate why (TODO) | 276 | # - need to investigate why (TODO) |
| 270 | #~ PostQueuedCompletionStatus( | 277 | #~ PostQueuedCompletionStatus( |
| 271 | #~ self.iocp, # HANDLE CompletionPort | 278 | #~ self.iocp, # HANDLE CompletionPort |
| 272 | #~ nbytes, # DWORD dwNumberOfBytesTransferred | 279 | #~ nbytes, # DWORD dwNumberOfBytesTransferred |
| 273 | #~ byref(completion_key), # ULONG_PTR dwCompletionKey | 280 | #~ byref(completion_key), # ULONG_PTR dwCompletionKey |
| 274 | #~ overlapped # LPOVERLAPPED lpOverlapped | 281 | #~ overlapped # LPOVERLAPPED lpOverlapped |
| 275 | #~ ) | 282 | #~ ) |
| 276 | elif rc != WSA_IO_PENDING: | 283 | elif rc != WSA_IO_PENDING: |
| 277 | raise ConnectionError(rc, "%s on %r" % (ctypes.FormatError(rc), act)) | 284 | raise ConnectionError(rc, "%s on %r" % (ctypes.FormatError(rc), act)) |
| 278 | 285 | ||
| 279 | 286 | ||
| 280 | def register_fd(self, act, performer): | 287 | def register_fd(self, act, performer): |
| 281 | if not act.sock._proactor_added: | 288 | if not act.sock._proactor_added: |
| 282 | CreateIoCompletionPort(act.sock._fd.fileno(), self.iocp, None, 0) | 289 | CreateIoCompletionPort(act.sock._fd.fileno(), self.iocp, None, 0) |
| 283 | act.sock._proactor_added = True | 290 | act.sock._proactor_added = True |
| 284 | 291 | ||
| 285 | def unregister_fd(self, act): | 292 | def unregister_fd(self, act): |
| 293 | overlapped, perform, complete = self.tokens[act] | ||
| 294 | overlapped.object = None | ||
| 286 | CancelIo(act.sock._fd.fileno()) | 295 | CancelIo(act.sock._fd.fileno()) |
| 287 | 296 | ||
| 288 | 297 | ||
| 289 | def try_run_act(self, act, func, rc, nbytes): | 298 | def try_run_act(self, act, func, rc, nbytes): |
| 290 | try: | 299 | try: |
| 291 | return func(act, rc, nbytes) | 300 | return func(act, rc, nbytes) |
| 292 | except: | 301 | except: |
| 293 | return CoroutineException(*sys.exc_info()) | 302 | return CoroutineException(*sys.exc_info()) |
| 294 | 303 | ||
| 295 | def process_op(self, rc, nbytes, overlap): | 304 | def process_op(self, rc, nbytes, overlap): |
| 296 | """ | 305 | """ |
| 297 | Handles the possible completion or re-queueing if conditions haven't | 306 | Handles the possible completion or re-queueing if conditions haven't |
| 298 | been met (the `complete` callable returns false) of a overlapped request. | 307 | been met (the `complete` callable returns false) of a overlapped request. |
| 299 | """ | 308 | """ |
| 300 | act = overlap.object | 309 | act = overlap.object |
| 301 | overlap.object = None | 310 | overlap.object = None |
| 302 | if act in self.tokens: | 311 | if act in self.tokens: |
| 303 | ol, perform, complete = self.tokens[act] | 312 | ol, perform, complete = self.tokens[act] |
| 304 | #~ assert ol is overlap, "%r is not %r" % (ol, overlap) | 313 | #~ assert ol is overlap, "%r is not %r" % (ol, overlap) |
| 305 | if rc == 0: | 314 | if rc == 0: |
| 306 | ract = self.try_run_act(act, complete, rc, nbytes) | 315 | ract = self.try_run_act(act, complete, rc, nbytes) |
| 307 | if ract: | 316 | if ract: |
| 308 | del self.tokens[act] | 317 | del self.tokens[act] |
| 309 | CancelIo(act.sock._fd.fileno()) | 318 | CancelIo(act.sock._fd.fileno()) |
| 310 | return ract, act.coro | 319 | return ract, act.coro |
| 311 | else: | 320 | else: |
| 312 | # operation hasn't completed yet (not enough data etc) | 321 | # operation hasn't completed yet (not enough data etc) |
| 313 | # read it in the iocp | 322 | # read it in the iocp |
| 314 | self.request_generic(act, act.coro, perform, complete) | 323 | self.request_generic(act, act.coro, perform, complete) |
| 315 | 324 | ||
| 316 | 325 | ||
| 317 | else: | 326 | else: |
| 318 | #looks like we have a problem, forward it to the coroutine. | 327 | #looks like we have a problem, forward it to the coroutine. |
| 319 | 328 | ||
| 320 | # this needs some research: ERROR_NETNAME_DELETED, need to reopen | 329 | # this needs some research: ERROR_NETNAME_DELETED, need to reopen |
| 321 | #the accept sock ?! something like: | 330 | #the accept sock ?! something like: |
| 322 | # warnings.warn("ERROR_NETNAME_DELETED: %r. Re-registering operation." % op) | 331 | # warnings.warn("ERROR_NETNAME_DELETED: %r. Re-registering operation." % op) |
| 323 | # self.registered_ops[op] = self.run_iocp(op, coro) | 332 | # self.registered_ops[op] = self.run_iocp(op, coro) |
| 324 | del self.tokens[act] | 333 | del self.tokens[act] |
| 325 | CancelIo(act.sock._fd.fileno()) | 334 | CancelIo(act.sock._fd.fileno()) |
| 326 | #~ import traceback | 335 | #~ import traceback |
| 327 | #~ traceback.print_stack() | 336 | #~ traceback.print_stack() |
| 328 | return CoroutineException( | 337 | return CoroutineException( |
| 329 | ConnectionError, ConnectionError( | 338 | ConnectionError, ConnectionError( |
| 330 | (rc, "%s on %r" % (ctypes.FormatError(rc), act)) | 339 | (rc, "%s on %r" % (ctypes.FormatError(rc), act)) |
| 331 | ) | 340 | ) |
| 332 | ), act.coro | 341 | ), act.coro |
| 333 | else: | 342 | else: |
| 334 | import warnings | 343 | import warnings |
| 335 | warnings.warn("Unknown token %s" % act) | 344 | warnings.warn("Unknown token %s" % act) |
| 336 | 345 | ||
| 337 | def run(self, timeout = 0): | 346 | def run(self, timeout = 0): |
| 338 | """ | 347 | """ |
| 339 | Calls GetQueuedCompletionStatus and handles completion via | 348 | Calls GetQueuedCompletionStatus and handles completion via |
| 340 | process_op. | 349 | process_op. |
| 341 | """ | 350 | """ |
| 342 | # same resolution as epoll | 351 | # same resolution as epoll |
| 343 | ptimeout = int( | 352 | ptimeout = int( |
| 344 | timeout.days * 86400000 + | 353 | timeout.days * 86400000 + |
| 345 | timeout.microseconds / 1000 + | 354 | timeout.microseconds / 1000 + |
| 346 | timeout.seconds * 1000 | 355 | timeout.seconds * 1000 |
| 347 | if timeout else (self.m_resolution if timeout is None else 0) | 356 | if timeout else (self.m_resolution if timeout is None else 0) |
| 348 | ) | 357 | ) |
| 349 | if self.tokens: | 358 | if self.tokens: |
| 350 | urgent = None | 359 | urgent = None |
| 351 | # we use urgent as a optimisation: the last operation is returned | 360 | # we use urgent as a optimisation: the last operation is returned |
| 352 | #directly to the scheduler (the sched might just run it till it | 361 | #directly to the scheduler (the sched might just run it till it |
| 353 | #goes to sleep) and not added in the sched.active queue | 362 | #goes to sleep) and not added in the sched.active queue |
| 354 | while 1: | 363 | while 1: |
| 355 | try: | 364 | try: |
| 356 | poverlapped = LPOVERLAPPED() | 365 | poverlapped = LPOVERLAPPED() |
| 357 | nbytes = DWORD() | 366 | nbytes = DWORD() |
| 358 | completion_key = c_ulong() | 367 | completion_key = c_ulong() |
| 359 | #~ BOOL WINAPI GetQueuedCompletionStatus( | 368 | #~ BOOL WINAPI GetQueuedCompletionStatus( |
| 360 | #~ __in HANDLE CompletionPort, | 369 | #~ __in HANDLE CompletionPort, |
| 361 | #~ __out LPDWORD lpNumberOfBytes, | 370 | #~ __out LPDWORD lpNumberOfBytes, |
| 362 | #~ __out PULONG_PTR lpCompletionKey, | 371 | #~ __out PULONG_PTR lpCompletionKey, |
| 363 | #~ __out LPOVERLAPPED *lpOverlapped, | 372 | #~ __out LPOVERLAPPED *lpOverlapped, |
| 364 | #~ __in DWORD dwMilliseconds | 373 | #~ __in DWORD dwMilliseconds |
| 365 | #~ ); | 374 | #~ ); |
| 366 | 375 | ||
| 367 | rc = GetQueuedCompletionStatus( | 376 | rc = GetQueuedCompletionStatus( |
| 368 | self.iocp, # HANDLE CompletionPort | 377 | self.iocp, # HANDLE CompletionPort |
| 369 | byref(nbytes), # LPDWORD lpNumberOfBytes | 378 | byref(nbytes), # LPDWORD lpNumberOfBytes |
| 370 | byref(completion_key), # PULONG_PTR lpCompletionKey | 379 | byref(completion_key), # PULONG_PTR lpCompletionKey |
| 371 | byref(poverlapped), | 380 | byref(poverlapped), |
| 372 | 0 if urgent else ptimeout | 381 | 0 if urgent else ptimeout |
| 373 | ) | 382 | ) |
| 374 | overlap = poverlapped and poverlapped.contents | 383 | overlap = poverlapped and poverlapped.contents |
| 375 | nbytes = nbytes.value | 384 | nbytes = nbytes.value |
| 376 | except RuntimeError, e: | 385 | except RuntimeError, e: |
| 377 | import warnings | 386 | import warnings |
| 378 | warnings.warn("RuntimeError(%s) on GetQueuedCompletionStatus." % e) | 387 | warnings.warn("RuntimeError(%s) on GetQueuedCompletionStatus." % e) |
| 379 | # we will get "This overlapped object has lost all its | 388 | # we will get "This overlapped object has lost all its |
| 380 | # references so was destroyed" when we remove a operation, | 389 | # references so was destroyed" when we remove a operation, |
| 381 | # it is garbage collected and the overlapped completes | 390 | # it is garbage collected and the overlapped completes |
| 382 | # afterwards | 391 | # afterwards |
| 383 | break | 392 | break |
| 384 | 393 | ||
| 385 | # well, this is a bit weird, if we get a aborted rc (via CancelIo | 394 | # well, this is a bit weird, if we get a aborted rc (via CancelIo |
| 386 | #i suppose) evaluating the overlap crashes the interpeter | 395 | #i suppose) evaluating the overlap crashes the interpeter |
| 387 | #with a memory read error | 396 | #with a memory read error |
| 388 | # also, we might get a "wait operation timed out", and no overlap pointer | 397 | # also, we might get a "wait operation timed out", and no overlap pointer |
| 389 | if rc != WSA_OPERATION_ABORTED and overlap: | 398 | if rc != WSA_OPERATION_ABORTED and overlap: |
| 390 | 399 | ||
| 391 | if urgent: | 400 | if urgent: |
| 392 | op, coro = urgent | 401 | op, coro = urgent |
| 393 | urgent = None | 402 | urgent = None |
| 394 | if op.prio & priority.OP: | 403 | if op.prio & priority.OP: |
| 395 | # imediately run the asociated coroutine step | 404 | # imediately run the asociated coroutine step |
| 396 | op, coro = self.scheduler.process_op( | 405 | op, coro = self.scheduler.process_op( |
| 397 | coro.run_op(op), | 406 | coro.run_op(op), |
| 398 | coro | 407 | coro |
| 399 | ) | 408 | ) |
| 400 | if coro: | 409 | if coro: |
| 401 | #TODO, what "op and " | 410 | #TODO, what "op and " |
| 402 | if op and (op.prio & priority.CORO): | 411 | if op and (op.prio & priority.CORO): |
| 403 | self.scheduler.active.appendleft( (op, coro) ) | 412 | self.scheduler.active.appendleft( (op, coro) ) |
| 404 | else: | 413 | else: |
| 405 | self.scheduler.active.append( (op, coro) ) | 414 | self.scheduler.active.append( (op, coro) ) |
| 406 | if overlap.object: | 415 | if overlap.object: |
| 416 | assert overlap.object in self.tokens | ||
| 407 | urgent = self.process_op(rc, nbytes, overlap) | 417 | urgent = self.process_op(rc, nbytes, overlap) |
| 408 | else: | 418 | else: |
| 409 | #~ import warnings | 419 | #~ import warnings |
| 410 | #~ warnings.warn("rc=(%s: %s) overlap=(%s)" % (rc, ctypes.FormatError(rc), overlap)) | 420 | #~ warnings.warn("rc=(%s: %s) overlap=(%s)" % (rc, ctypes.FormatError(rc), overlap)) |
| 411 | break | 421 | break |
| 412 | return urgent | 422 | return urgent |
| 413 | else: | 423 | else: |
| 414 | sleep(timeout) | 424 | sleep(timeout) |