Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve performance and reduce latency of Transport.write by attempting to send data immediately if all write buffers are empty #619

Open
wants to merge 12 commits into
base: master
Choose a base branch
from
5 changes: 3 additions & 2 deletions tests/test_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -512,9 +512,10 @@ async def write_over():
proto.transport.write(b'q' * 16384)
count += 1
else:
proto.transport.write(b'q' * 16384)
proto.transport.set_write_buffer_limits(high=256, low=128)
count += 1
while not proto.transport.get_write_buffer_size():
proto.transport.write(b'q' * 16384)
count += 1
return count

s = self.loop.run_in_executor(None, accept)
Expand Down
5 changes: 0 additions & 5 deletions tests/test_tcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -1224,21 +1224,16 @@ def resume_writing(self):
t, p = await self.loop.create_connection(Protocol, *addr)

t.write(b'q' * 512)
self.assertEqual(t.get_write_buffer_size(), 512)

t.set_write_buffer_limits(low=16385)
self.assertFalse(paused)
self.assertEqual(t.get_write_buffer_limits(), (16385, 65540))

with self.assertRaisesRegex(ValueError, 'high.*must be >= low'):
t.set_write_buffer_limits(high=0, low=1)

t.set_write_buffer_limits(high=1024, low=128)
self.assertFalse(paused)
self.assertEqual(t.get_write_buffer_limits(), (128, 1024))

t.set_write_buffer_limits(high=256, low=128)
self.assertTrue(paused)
self.assertEqual(t.get_write_buffer_limits(), (128, 256))

t.close()
Expand Down
2 changes: 1 addition & 1 deletion uvloop/handles/stream.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ cdef class UVStream(UVBaseTransport):
# and then call _initiate_write() to start writing either immediately or in
# the next iteration (loop._queue_write()).
cdef inline _buffer_write(self, object data)
cdef inline _initiate_write(self)
cdef inline _initiate_write(self, bint skip_fast_path)

# _exec_write() is the method that does the actual send, and _try_write()
# is a fast-path used in _exec_write() to send a single chunk.
Expand Down
53 changes: 47 additions & 6 deletions uvloop/handles/stream.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -424,11 +424,13 @@ cdef class UVStream(UVBaseTransport):
self._buffer_size += dlen
self._buffer.append(data)

cdef inline _initiate_write(self):
if (not self._protocol_paused and
(<uv.uv_stream_t*>self._handle).write_queue_size == 0 and
self._buffer_size > self._high_water):
cdef inline _initiate_write(self, bint skip_fast_path):
if (not skip_fast_path and
not self._protocol_paused and
(<uv.uv_stream_t*>self._handle).write_queue_size == 0 and
self._buffer_size > self._high_water):
# Fast-path. If:
# - the caller hasn't tried fast path itself
# - the protocol isn't yet paused,
# - there is no data in libuv buffers for this stream,
# - the protocol will be paused if we continue to buffer data
Expand Down Expand Up @@ -684,8 +686,47 @@ cdef class UVStream(UVBaseTransport):
if self._conn_lost:
self._conn_lost += 1
return

cdef ssize_t bytes_written

if self._get_write_buffer_size() == 0:
bytes_written_ = self._try_write(buf)

if bytes_written_ is None:
# A `self._fatal_error` was called.
# It might not raise an exception under some
# conditions.
if not self._closing:
raise RuntimeError('stream is open after '
'UVStream._try_write returned None')

return

bytes_written = bytes_written_

if bytes_written == 0:
# All data was successfully written.
# on_write will call "maybe_resume_protocol".
return

if bytes_written > 0:
if UVLOOP_DEBUG:
if bytes_written == len(buf):
raise RuntimeError('_try_write sent all data and '
'returned non-zero')

if PyBytes_CheckExact(buf):
# Cast bytes to memoryview to avoid copying
# data that wasn't sent.
buf = memoryview(buf)
buf = buf[bytes_written_:]

# At this point it's either data was sent partially,
# or an EAGAIN has happened.
# buffer remaining data and send it later

self._buffer_write(buf)
self._initiate_write()
self._initiate_write(True) # skip fast path in _initiate_write

def writelines(self, bufs):
self._ensure_alive()
Expand All @@ -697,7 +738,7 @@ cdef class UVStream(UVBaseTransport):
return
for buf in bufs:
self._buffer_write(buf)
self._initiate_write()
self._initiate_write(False) # try fast path in _initiate_write

def write_eof(self):
self._ensure_alive()
Expand Down
Loading