157 changes: 54 additions & 103 deletions Lib/asyncio/proactor_events.py
@@ -161,45 +161,30 @@ def __init__(self, loop, sock, protocol, waiter=None,
extra=None, server=None):
self._loop_reading_cb = None
self._paused = True
self._buffered = False

self._read_backup_buffer = bytearray(32768)
self._read_buffer = None
self._read_buffer_belongs_to_proto = False

super().__init__(loop, sock, protocol, waiter, extra, server)

self._reschedule_on_resume = False
self._loop.call_soon(self._loop_reading)
self._paused = False

def set_protocol(self, protocol):
if isinstance(protocol, protocols.BufferedProtocol):
self._loop_reading_cb = self._loop_reading__get_buffer
else:
self._loop_reading_cb = self._loop_reading__data_received

self._buffered = isinstance(protocol, protocols.BufferedProtocol)
self._read_buffer_belongs_to_proto = False
super().set_protocol(protocol)

if self.is_reading():
# reset reading callback / buffers / self._read_fut
self.pause_reading()
self.resume_reading()

def is_reading(self):
return not self._paused and not self._closing

def pause_reading(self):
if self._closing or self._paused:
return
self._paused = True

if self._read_fut is not None and not self._read_fut.done():
# TODO: This is an ugly hack to cancel the current read future
# *and* avoid potential race conditions, as read cancellation
# goes through `future.cancel()` and `loop.call_soon()`.
# We then use this special attribute in the reader callback to
# exit *immediately* without doing any cleanup/rescheduling.
self._read_fut.__asyncio_cancelled_on_pause__ = True

self._read_fut.cancel()
self._read_fut = None
self._reschedule_on_resume = True

if self._loop.get_debug():
logger.debug("%r pauses reading", self)

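The `__asyncio_cancelled_on_pause__` hack above exists because cancelling an overlapped read is itself asynchronous: the cancellation travels through `future.cancel()` and `loop.call_soon()`, so the reader callback can still fire for a future that `pause_reading()` has already discarded. Tagging that future lets the callback recognize it and return immediately, without any cleanup or rescheduling. The snippet below is a minimal sketch of the pattern using a plain future; `ReadFuture`, `on_read_done`, and the attribute name are illustrative, not the transport's actual code.

```python
# Minimal sketch (hypothetical names) of the "tag the future you are
# cancelling" pattern used by pause_reading() above: the done callback
# must ignore a future that was cancelled on purpose, because the
# cancellation is only delivered later via loop.call_soon().
import asyncio


class ReadFuture(asyncio.Future):
    """Plain subclass so instances accept an extra marker attribute."""


def on_read_done(fut):
    if getattr(fut, '__cancelled_on_pause__', False):
        # Cancelled deliberately by "pause_reading": bail out without
        # rescheduling another read or touching any transport state.
        return
    if fut.cancelled():
        return
    print('read completed with', fut.result())


async def main():
    loop = asyncio.get_running_loop()

    fut = ReadFuture(loop=loop)
    fut.add_done_callback(on_read_done)

    # "pause_reading": tag the in-flight future first, then cancel it.
    fut.__cancelled_on_pause__ = True
    fut.cancel()

    await asyncio.sleep(0)  # let the done callback run; it exits early
    print('paused; the stale read future was ignored')


asyncio.run(main())
```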
@@ -228,67 +213,6 @@ def _loop_reading__on_eof(self):
self.close()

def _loop_reading(self, fut=None):
self._loop_reading_cb(fut)

def _loop_reading__data_received(self, fut):
if (fut is not None and
getattr(fut, '__asyncio_cancelled_on_pause__', False)):
return

if self._paused:
self._reschedule_on_resume = True
return

data = None
try:
if fut is not None:
assert self._read_fut is fut or (self._read_fut is None and
self._closing)
self._read_fut = None
if fut.done():
# deliver data later in "finally" clause
data = fut.result()
else:
# the future will be replaced by next proactor.recv call
fut.cancel()

if self._closing:
# since close() has been called we ignore any read data
data = None
return

if data == b'':
# we got end-of-file so no need to reschedule a new read
return

# reschedule a new read
self._read_fut = self._loop._proactor.recv(self._sock, 32768)
except ConnectionAbortedError as exc:
if not self._closing:
self._fatal_error(exc, 'Fatal read error on pipe transport')
elif self._loop.get_debug():
logger.debug("Read error on pipe transport while closing",
exc_info=True)
except ConnectionResetError as exc:
self._force_close(exc)
except OSError as exc:
self._fatal_error(exc, 'Fatal read error on pipe transport')
except futures.CancelledError:
if not self._closing:
raise
else:
self._read_fut.add_done_callback(self._loop_reading__data_received)
finally:
if data:
self._protocol.data_received(data)
elif data == b'':
self._loop_reading__on_eof()

def _loop_reading__get_buffer(self, fut):
if (fut is not None and
getattr(fut, '__asyncio_cancelled_on_pause__', False)):
return

if self._paused:
self._reschedule_on_resume = True
return
@@ -324,32 +248,59 @@ def _loop_reading__get_buffer(self, fut):
# we got end-of-file so no need to reschedule a new read
self._loop_reading__on_eof()
else:
try:
self._protocol.buffer_updated(nbytes)
except Exception as exc:
self._fatal_error(
exc,
'Fatal error: '
'protocol.buffer_updated() call failed.')
return
if self._buffered:
try:
if self._read_buffer_belongs_to_proto:
self._protocol.buffer_updated(nbytes)
else:
protocols._feed_data_to_bufferred_proto(
self._protocol, self._read_buffer[:nbytes])
except Exception as exc:
self._fatal_error(
exc,
'Fatal error: '
'protocol.buffer_updated() call failed.')
return
else:
try:
self._protocol.data_received(
self._read_buffer[:nbytes])
except Exception as exc:
self._fatal_error(
exc,
'Fatal error: '
'protocol.data_received() call failed.')
return

if self._closing or nbytes == 0:
# since close() has been called we ignore any read data
return

try:
buf = self._protocol.get_buffer(-1)
if not len(buf):
raise RuntimeError('get_buffer() returned an empty buffer')
except Exception as exc:
self._fatal_error(
exc, 'Fatal error: protocol.get_buffer() call failed.')
return
if self._buffered:
try:
self._read_buffer = self._protocol.get_buffer(-1)
if not len(self._read_buffer):
raise RuntimeError('get_buffer() returned an empty buffer')
except Exception as exc:
self._fatal_error(
exc, 'Fatal error: protocol.get_buffer() call failed.')
return
else:
self._read_buffer_belongs_to_proto = True
else:
self._read_buffer = self._read_backup_buffer
self._read_buffer_belongs_to_proto = False

try:
# schedule a new read
self._read_fut = self._loop._proactor.recv_into(self._sock, buf)
self._read_fut.add_done_callback(self._loop_reading__get_buffer)
# TODO 3.8: Use WSARecv instead of WSARecvInto like libuv
# (see win/tcp.c; uv_process_tcp_read_req function.)
# WSARecv accepts a buffer as its first argument, which
# means we can use it for BufferedProtocol just fine,
# but it's easier to work with (in particular we can
# handle cancellation better.)
self._read_fut = self._loop._proactor.recv_into(
self._sock, self._read_buffer)
self._read_fut.add_done_callback(self._loop_reading)
except ConnectionAbortedError as exc:
if not self._closing:
self._fatal_error(exc, 'Fatal read error on pipe transport')
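Taken together, the proactor changes above collapse the two read paths into a single `_loop_reading()`: every read is issued with `recv_into()` into some bytearray. For a `BufferedProtocol` that bytearray normally comes from `protocol.get_buffer()` and arriving bytes are reported zero-copy via `buffer_updated()`; a plain `Protocol` reads into the transport's 32 KiB backup buffer and gets a copy through `data_received()`. The sketch below illustrates only that dispatch and is not the PR's code: the protocol classes and the `pick_buffer()`/`deliver()` helpers are hypothetical, and the fallback path that feeds a backup buffer into a `BufferedProtocol` is omitted here (the `protocols.py` helper that covers it is shown further down).

```python
# Simplified illustration of the buffered vs. plain delivery paths chosen
# by the unified _loop_reading() above.  All names here are hypothetical.
from asyncio import protocols

BACKUP_BUFFER = bytearray(32768)  # stands in for self._read_backup_buffer


class Buffered(protocols.BufferedProtocol):
    """Owns the receive buffer; reads land in it directly."""

    def __init__(self):
        self._buf = bytearray(32768)
        self.received = bytearray()

    def get_buffer(self, sizehint):
        return self._buf

    def buffer_updated(self, nbytes):
        self.received += self._buf[:nbytes]


class Plain(protocols.Protocol):
    """Classic protocol: every read is delivered as a bytes copy."""

    def __init__(self):
        self.received = bytearray()

    def data_received(self, data):
        self.received += data


def pick_buffer(proto):
    """Choose where the next recv_into() should land."""
    if isinstance(proto, protocols.BufferedProtocol):
        return proto.get_buffer(-1), True   # buffer belongs to the protocol
    return BACKUP_BUFFER, False             # transport-owned backup buffer


def deliver(proto, buf, nbytes, belongs_to_proto):
    """Hand over nbytes that a read just wrote into buf."""
    if belongs_to_proto:
        proto.buffer_updated(nbytes)        # zero-copy path
    else:
        proto.data_received(bytes(buf[:nbytes]))


# Simulate one read for each kind of protocol.
for proto in (Buffered(), Plain()):
    buf, owned = pick_buffer(proto)
    payload = b'hello proactor'
    buf[:len(payload)] = payload            # pretend recv_into() wrote this
    deliver(proto, buf, len(payload), owned)
    print(type(proto).__name__, bytes(proto.received))
```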
19 changes: 19 additions & 0 deletions Lib/asyncio/protocols.py
@@ -189,3 +189,22 @@ def pipe_connection_lost(self, fd, exc):

def process_exited(self):
"""Called when subprocess has exited."""


def _feed_data_to_bufferred_proto(proto, data):
data_len = len(data)
while data_len:
buf = proto.get_buffer(data_len)
buf_len = len(buf)
if not buf_len:
raise RuntimeError('get_buffer() returned an empty buffer')

if buf_len >= data_len:
buf[:data_len] = data
proto.buffer_updated(data_len)
return
else:
buf[:buf_len] = data[:buf_len]
proto.buffer_updated(buf_len)
data = data[buf_len:]
data_len = len(data)
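`_feed_data_to_bufferred_proto()` (the spelling follows the diff) loops until the whole payload has been handed over: each pass asks the protocol for a buffer, copies as much as fits, and reports the chunk with `buffer_updated()`. Below is a small, hypothetical usage sketch that assumes this PR's helper is importable from `asyncio.protocols`; the deliberately tiny 4-byte buffer makes the chunking visible.

```python
# Hypothetical usage sketch of the helper added above: a BufferedProtocol
# whose get_buffer() hands out only 4 bytes at a time, so the payload is
# delivered through several buffer_updated() calls.
from asyncio import protocols


class TinyBufferProtocol(protocols.BufferedProtocol):
    def __init__(self):
        self._buf = bytearray(4)  # deliberately tiny receive buffer
        self.chunks = []

    def get_buffer(self, sizehint):
        return self._buf

    def buffer_updated(self, nbytes):
        self.chunks.append(bytes(self._buf[:nbytes]))


proto = TinyBufferProtocol()
protocols._feed_data_to_bufferred_proto(proto, b'abcdefghij')
print(proto.chunks)  # [b'abcd', b'efgh', b'ij']
```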
21 changes: 1 addition & 20 deletions Lib/asyncio/sslproto.py
@@ -535,7 +535,7 @@ def data_received(self, data):
if chunk:
try:
if self._app_protocol_is_buffer:
_feed_data_to_bufferred_proto(
protocols._feed_data_to_bufferred_proto(
self._app_protocol, chunk)
else:
self._app_protocol.data_received(chunk)
@@ -721,22 +721,3 @@ def _abort(self):
self._transport.abort()
finally:
self._finalize()


def _feed_data_to_bufferred_proto(proto, data):
data_len = len(data)
while data_len:
buf = proto.get_buffer(data_len)
buf_len = len(buf)
if not buf_len:
raise RuntimeError('get_buffer() returned an empty buffer')

if buf_len >= data_len:
buf[:data_len] = data
proto.buffer_updated(data_len)
return
else:
buf[:buf_len] = data[:buf_len]
proto.buffer_updated(buf_len)
data = data[buf_len:]
data_len = len(data)
10 changes: 6 additions & 4 deletions Lib/test/test_asyncio/test_events.py
@@ -2504,10 +2504,12 @@ def test_sendfile_close_peer_in_the_middle_of_receiving(self):
self.loop.sendfile(cli_proto.transport, self.file))
self.run_loop(srv_proto.done)

self.assertTrue(1024 <= srv_proto.nbytes < len(self.DATA),
srv_proto.nbytes)
self.assertTrue(1024 <= self.file.tell() < len(self.DATA),
self.file.tell())
self.assertLessEqual(1024, srv_proto.nbytes)
self.assertLessEqual(srv_proto.nbytes, len(self.DATA))

self.assertLessEqual(1024, self.file.tell())
self.assertLessEqual(self.file.tell(), len(self.DATA))

self.assertTrue(cli_proto.transport.is_closing())

def test_sendfile_fallback_close_peer_in_the_middle_of_receiving(self):