From f4d505cdd9b98aa95a074a371b99c3b54a284528 Mon Sep 17 00:00:00 2001 From: taras Date: Sat, 17 Aug 2024 01:38:30 +0200 Subject: [PATCH 1/9] Attempt to send data immediately if all write buffers are empty --- uvloop/handles/stream.pyx | 42 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 42 insertions(+) diff --git a/uvloop/handles/stream.pyx b/uvloop/handles/stream.pyx index d4e02e3e..7e71d1cd 100644 --- a/uvloop/handles/stream.pyx +++ b/uvloop/handles/stream.pyx @@ -680,6 +680,48 @@ cdef class UVStream(UVBaseTransport): if self._conn_lost: self._conn_lost += 1 return + + cdef int sent + + if (self._buffer_size == 0 and + (self._handle).write_queue_size == 0): + + sent_ = self._try_write(buf) + + if sent_ is None: + # A `self._fatal_error` was called. + # It might not raise an exception under some + # conditions. + if not self._closing: + raise RuntimeError('stream is open after ' + 'UVStream._try_write returned None') + + return + + sent = sent_ + + if sent == 0: + # All data was successfully written. + # on_write will call "maybe_resume_protocol". + self._on_write() + return + + if sent > 0: + if UVLOOP_DEBUG: + if sent == len(buf): + raise RuntimeError('_try_write sent all data and ' + 'returned non-zero') + + if PyBytes_CheckExact(buf): + # Cast bytes to memoryview to avoid copying + # data that wasn't sent. + buf = memoryview(buf) + buf = buf[sent:] + + # At this point it's either data was sent partially, + # or an EAGAIN has happened. + # buffer remaining data and send it later + self._buffer_write(buf) self._initiate_write() From 20aa1bb93fa6ffe0ff2ad26af39d468d1d021ffa Mon Sep 17 00:00:00 2001 From: taras Date: Sat, 17 Aug 2024 16:10:43 +0200 Subject: [PATCH 2/9] Fix stale tests --- tests/test_context.py | 5 +++-- uvloop/handles/stream.pyx | 23 +++++++++++------------ 2 files changed, 14 insertions(+), 14 deletions(-) diff --git a/tests/test_context.py b/tests/test_context.py index 03733756..2b2329f9 100644 --- a/tests/test_context.py +++ b/tests/test_context.py @@ -512,9 +512,10 @@ async def write_over(): proto.transport.write(b'q' * 16384) count += 1 else: - proto.transport.write(b'q' * 16384) proto.transport.set_write_buffer_limits(high=256, low=128) - count += 1 + while not proto.transport.get_write_buffer_size(): + proto.transport.write(b'q' * 16384) + count += 1 return count s = self.loop.run_in_executor(None, accept) diff --git a/uvloop/handles/stream.pyx b/uvloop/handles/stream.pyx index 7e71d1cd..e1ce7334 100644 --- a/uvloop/handles/stream.pyx +++ b/uvloop/handles/stream.pyx @@ -375,6 +375,7 @@ cdef class UVStream(UVBaseTransport): # uv_try_write -- less layers of code. The error # checking logic is copied from libuv. written = system.write(fd, buf, blen) + while written == -1 and ( errno.errno == errno.EINTR or (system.PLATFORM_IS_APPLE and @@ -464,7 +465,7 @@ cdef class UVStream(UVBaseTransport): if not buf_len: return - if (self._handle).write_queue_size == 0: + if not self._protocol_paused and (self._handle).write_queue_size == 0: # libuv internal write buffers for this stream are empty. if buf_len == 1: # If we only have one piece of data to send, let's @@ -681,14 +682,14 @@ cdef class UVStream(UVBaseTransport): self._conn_lost += 1 return - cdef int sent + cdef ssize_t written - if (self._buffer_size == 0 and + if (not self._protocol_paused and self._buffer_size == 0 and (self._handle).write_queue_size == 0): - sent_ = self._try_write(buf) + written_ = self._try_write(buf) - if sent_ is None: + if written_ is None: # A `self._fatal_error` was called. # It might not raise an exception under some # conditions. @@ -698,17 +699,16 @@ cdef class UVStream(UVBaseTransport): return - sent = sent_ + written = written_ - if sent == 0: + if written == 0: # All data was successfully written. # on_write will call "maybe_resume_protocol". - self._on_write() return - if sent > 0: + if written > 0: if UVLOOP_DEBUG: - if sent == len(buf): + if written == len(buf): raise RuntimeError('_try_write sent all data and ' 'returned non-zero') @@ -716,7 +716,7 @@ cdef class UVStream(UVBaseTransport): # Cast bytes to memoryview to avoid copying # data that wasn't sent. buf = memoryview(buf) - buf = buf[sent:] + buf = buf[written_:] # At this point it's either data was sent partially, # or an EAGAIN has happened. @@ -940,7 +940,6 @@ cdef void __uv_stream_on_write( uv.uv_write_t* req, int status, ) noexcept with gil: - if UVLOOP_DEBUG: if req.data is NULL: aio_logger.error( From fde18efb162b95a1821d33479532c42dd6cf5bef Mon Sep 17 00:00:00 2001 From: taras Date: Sat, 17 Aug 2024 16:19:32 +0200 Subject: [PATCH 3/9] Don't test get_write_buffer_size because it is difficult to predict and it depends on SO_SNDBUF value --- tests/test_tcp.py | 5 ----- uvloop/handles/stream.pyx | 1 - 2 files changed, 6 deletions(-) diff --git a/tests/test_tcp.py b/tests/test_tcp.py index d7a73fbf..3a8e71c2 100644 --- a/tests/test_tcp.py +++ b/tests/test_tcp.py @@ -1224,21 +1224,16 @@ def resume_writing(self): t, p = await self.loop.create_connection(Protocol, *addr) t.write(b'q' * 512) - self.assertEqual(t.get_write_buffer_size(), 512) - t.set_write_buffer_limits(low=16385) - self.assertFalse(paused) self.assertEqual(t.get_write_buffer_limits(), (16385, 65540)) with self.assertRaisesRegex(ValueError, 'high.*must be >= low'): t.set_write_buffer_limits(high=0, low=1) t.set_write_buffer_limits(high=1024, low=128) - self.assertFalse(paused) self.assertEqual(t.get_write_buffer_limits(), (128, 1024)) t.set_write_buffer_limits(high=256, low=128) - self.assertTrue(paused) self.assertEqual(t.get_write_buffer_limits(), (128, 256)) t.close() diff --git a/uvloop/handles/stream.pyx b/uvloop/handles/stream.pyx index e1ce7334..6aad0546 100644 --- a/uvloop/handles/stream.pyx +++ b/uvloop/handles/stream.pyx @@ -375,7 +375,6 @@ cdef class UVStream(UVBaseTransport): # uv_try_write -- less layers of code. The error # checking logic is copied from libuv. written = system.write(fd, buf, blen) - while written == -1 and ( errno.errno == errno.EINTR or (system.PLATFORM_IS_APPLE and From e360c3a0c55713c78cae2feb58704eee43349067 Mon Sep 17 00:00:00 2001 From: taras Date: Sat, 17 Aug 2024 16:39:41 +0200 Subject: [PATCH 4/9] Rename written to bytes_written --- uvloop/handles/stream.pyx | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/uvloop/handles/stream.pyx b/uvloop/handles/stream.pyx index 6aad0546..3057e19d 100644 --- a/uvloop/handles/stream.pyx +++ b/uvloop/handles/stream.pyx @@ -681,14 +681,14 @@ cdef class UVStream(UVBaseTransport): self._conn_lost += 1 return - cdef ssize_t written + cdef ssize_t bytes_written if (not self._protocol_paused and self._buffer_size == 0 and (self._handle).write_queue_size == 0): - written_ = self._try_write(buf) + bytes_written_ = self._try_write(buf) - if written_ is None: + if bytes_written_ is None: # A `self._fatal_error` was called. # It might not raise an exception under some # conditions. @@ -698,16 +698,16 @@ cdef class UVStream(UVBaseTransport): return - written = written_ + bytes_written = bytes_written_ - if written == 0: - # All data was successfully written. + if bytes_written == 0: + # All data was successfully bytes_written. # on_write will call "maybe_resume_protocol". return - if written > 0: + if bytes_written > 0: if UVLOOP_DEBUG: - if written == len(buf): + if bytes_written == len(buf): raise RuntimeError('_try_write sent all data and ' 'returned non-zero') @@ -715,7 +715,7 @@ cdef class UVStream(UVBaseTransport): # Cast bytes to memoryview to avoid copying # data that wasn't sent. buf = memoryview(buf) - buf = buf[written_:] + buf = buf[bytes_written_:] # At this point it's either data was sent partially, # or an EAGAIN has happened. From cfa33abe461880d0525efaf532588f93bf653d71 Mon Sep 17 00:00:00 2001 From: taras Date: Tue, 27 Aug 2024 19:50:53 +0200 Subject: [PATCH 5/9] Fix comment --- uvloop/handles/stream.pyx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/uvloop/handles/stream.pyx b/uvloop/handles/stream.pyx index 3057e19d..e56ff2d9 100644 --- a/uvloop/handles/stream.pyx +++ b/uvloop/handles/stream.pyx @@ -701,7 +701,7 @@ cdef class UVStream(UVBaseTransport): bytes_written = bytes_written_ if bytes_written == 0: - # All data was successfully bytes_written. + # All data was successfully written. # on_write will call "maybe_resume_protocol". return From 4f66353c33252489ce2d4549bd218a543f37cfe4 Mon Sep 17 00:00:00 2001 From: taras Date: Tue, 10 Sep 2024 13:20:23 +0200 Subject: [PATCH 6/9] Skip fast path in _initiate_write() if we attempted it in write(). This removes unnecessary syscall --- uvloop/handles/stream.pxd | 2 +- uvloop/handles/stream.pyx | 18 +++++++++--------- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/uvloop/handles/stream.pxd b/uvloop/handles/stream.pxd index 8ca87437..c06631b6 100644 --- a/uvloop/handles/stream.pxd +++ b/uvloop/handles/stream.pxd @@ -35,7 +35,7 @@ cdef class UVStream(UVBaseTransport): # and then call _initiate_write() to start writing either immediately or in # the next iteration (loop._queue_write()). cdef inline _buffer_write(self, object data) - cdef inline _initiate_write(self) + cdef inline _initiate_write(self, bint skip_fast_path) # _exec_write() is the method that does the actual send, and _try_write() # is a fast-path used in _exec_write() to send a single chunk. diff --git a/uvloop/handles/stream.pyx b/uvloop/handles/stream.pyx index cd87b557..2b6a62de 100644 --- a/uvloop/handles/stream.pyx +++ b/uvloop/handles/stream.pyx @@ -424,11 +424,13 @@ cdef class UVStream(UVBaseTransport): self._buffer_size += dlen self._buffer.append(data) - cdef inline _initiate_write(self): - if (not self._protocol_paused and - (self._handle).write_queue_size == 0 and - self._buffer_size > self._high_water): + cdef inline _initiate_write(self, bint skip_fast_path): + if (not skip_fast_path and + not self._protocol_paused and + (self._handle).write_queue_size == 0 and + self._buffer_size > self._high_water): # Fast-path. If: + # - the caller hasn't tried fast path itself # - the protocol isn't yet paused, # - there is no data in libuv buffers for this stream, # - the protocol will be paused if we continue to buffer data @@ -687,9 +689,7 @@ cdef class UVStream(UVBaseTransport): cdef ssize_t bytes_written - if (not self._protocol_paused and self._buffer_size == 0 and - (self._handle).write_queue_size == 0): - + if self._get_write_buffer_size() == 0: bytes_written_ = self._try_write(buf) if bytes_written_ is None: @@ -726,7 +726,7 @@ cdef class UVStream(UVBaseTransport): # buffer remaining data and send it later self._buffer_write(buf) - self._initiate_write() + self._initiate_write(True) # skip fast path in _initiate_write def writelines(self, bufs): self._ensure_alive() @@ -738,7 +738,7 @@ cdef class UVStream(UVBaseTransport): return for buf in bufs: self._buffer_write(buf) - self._initiate_write() + self._initiate_write(False) # try fast path in _initiate_write def write_eof(self): self._ensure_alive() From 4e96818059150d9ee12285633f902a9a18ef1650 Mon Sep 17 00:00:00 2001 From: taras Date: Tue, 10 Sep 2024 13:30:37 +0200 Subject: [PATCH 7/9] Revert unnecessary check --- uvloop/handles/stream.pyx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/uvloop/handles/stream.pyx b/uvloop/handles/stream.pyx index 2b6a62de..9b63297d 100644 --- a/uvloop/handles/stream.pyx +++ b/uvloop/handles/stream.pyx @@ -470,7 +470,7 @@ cdef class UVStream(UVBaseTransport): if not buf_len: return - if not self._protocol_paused and (self._handle).write_queue_size == 0: + if (self._handle).write_queue_size == 0: # libuv internal write buffers for this stream are empty. if buf_len == 1: # If we only have one piece of data to send, let's From f6f509440d5aa319b2ba0774b46e4802ebe64015 Mon Sep 17 00:00:00 2001 From: taras Date: Tue, 10 Sep 2024 13:32:22 +0200 Subject: [PATCH 8/9] Revert unnecessary change --- uvloop/handles/stream.pyx | 1 + 1 file changed, 1 insertion(+) diff --git a/uvloop/handles/stream.pyx b/uvloop/handles/stream.pyx index 9b63297d..41bb632b 100644 --- a/uvloop/handles/stream.pyx +++ b/uvloop/handles/stream.pyx @@ -943,6 +943,7 @@ cdef void __uv_stream_on_write( uv.uv_write_t* req, int status, ) noexcept with gil: + if UVLOOP_DEBUG: if req.data is NULL: aio_logger.error( From ede3f381bd809af0acd826fec9db08ad8636b232 Mon Sep 17 00:00:00 2001 From: taras Date: Tue, 10 Sep 2024 13:32:48 +0200 Subject: [PATCH 9/9] Revert unnecessary change --- uvloop/handles/stream.pyx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/uvloop/handles/stream.pyx b/uvloop/handles/stream.pyx index 41bb632b..2f88b9b1 100644 --- a/uvloop/handles/stream.pyx +++ b/uvloop/handles/stream.pyx @@ -943,7 +943,7 @@ cdef void __uv_stream_on_write( uv.uv_write_t* req, int status, ) noexcept with gil: - + if UVLOOP_DEBUG: if req.data is NULL: aio_logger.error(