diff --git a/CHANGES b/CHANGES index e1496fa15c..7c4b476ec2 100644 --- a/CHANGES +++ b/CHANGES @@ -1,3 +1,4 @@ + * Connection.register_connect_callback() is made public. * Fix async `read_response` to use `disable_decoding`. * Add 'aclose()' methods to async classes, deprecate async close(). * Fix #2831, add auto_close_connection_pool=True arg to asyncio.Redis.from_url() diff --git a/redis/asyncio/client.py b/redis/asyncio/client.py index 9724a3dd54..de27da1714 100644 --- a/redis/asyncio/client.py +++ b/redis/asyncio/client.py @@ -788,7 +788,7 @@ async def __aexit__(self, exc_type, exc_value, traceback): def __del__(self): if self.connection: - self.connection._deregister_connect_callback(self.on_connect) + self.connection.deregister_connect_callback(self.on_connect) async def aclose(self): # In case a connection property does not yet exist @@ -799,7 +799,7 @@ async def aclose(self): async with self._lock: if self.connection: await self.connection.disconnect() - self.connection._deregister_connect_callback(self.on_connect) + self.connection.deregister_connect_callback(self.on_connect) await self.connection_pool.release(self.connection) self.connection = None self.channels = {} @@ -862,7 +862,7 @@ async def connect(self): ) # register a callback that re-subscribes to any channels we # were listening to when we were disconnected - self.connection._register_connect_callback(self.on_connect) + self.connection.register_connect_callback(self.on_connect) else: await self.connection.connect() if self.push_handler_func is not None and not HIREDIS_AVAILABLE: diff --git a/redis/asyncio/connection.py b/redis/asyncio/connection.py index 739cd7f64e..bbd438fc0b 100644 --- a/redis/asyncio/connection.py +++ b/redis/asyncio/connection.py @@ -235,12 +235,24 @@ def repr_pieces(self): def is_connected(self): return self._reader is not None and self._writer is not None - def _register_connect_callback(self, callback): + def register_connect_callback(self, callback): + """ + Register a callback to be called when the connection is established either + initially or reconnected. This allows listeners to issue commands that + are ephemeral to the connection, for example pub/sub subscription or + key tracking. The callback must be a _method_ and will be kept as + a weak reference. + """ wm = weakref.WeakMethod(callback) if wm not in self._connect_callbacks: self._connect_callbacks.append(wm) - def _deregister_connect_callback(self, callback): + def deregister_connect_callback(self, callback): + """ + De-register a previously registered callback. It will no-longer receive + notifications on connection events. Calling this is not required when the + listener goes away, since the callbacks are kept as weak methods. + """ try: self._connect_callbacks.remove(weakref.WeakMethod(callback)) except ValueError: diff --git a/redis/client.py b/redis/client.py index 53748afbcc..0af7e050d6 100755 --- a/redis/client.py +++ b/redis/client.py @@ -759,7 +759,7 @@ def __del__(self) -> None: def reset(self) -> None: if self.connection: self.connection.disconnect() - self.connection._deregister_connect_callback(self.on_connect) + self.connection.deregister_connect_callback(self.on_connect) self.connection_pool.release(self.connection) self.connection = None self.health_check_response_counter = 0 @@ -817,7 +817,7 @@ def execute_command(self, *args): ) # register a callback that re-subscribes to any channels we # were listening to when we were disconnected - self.connection._register_connect_callback(self.on_connect) + self.connection.register_connect_callback(self.on_connect) if self.push_handler_func is not None and not HIREDIS_AVAILABLE: self.connection._parser.set_push_handler(self.push_handler_func) connection = self.connection diff --git a/redis/cluster.py b/redis/cluster.py index 595efcb683..0405b0547c 100644 --- a/redis/cluster.py +++ b/redis/cluster.py @@ -1776,7 +1776,7 @@ def execute_command(self, *args): ) # register a callback that re-subscribes to any channels we # were listening to when we were disconnected - self.connection._register_connect_callback(self.on_connect) + self.connection.register_connect_callback(self.on_connect) if self.push_handler_func is not None and not HIREDIS_AVAILABLE: self.connection._parser.set_push_handler(self.push_handler_func) connection = self.connection diff --git a/redis/connection.py b/redis/connection.py index 8129bf03a4..c201224e35 100644 --- a/redis/connection.py +++ b/redis/connection.py @@ -237,12 +237,24 @@ def _construct_command_packer(self, packer): else: return PythonRespSerializer(self._buffer_cutoff, self.encoder.encode) - def _register_connect_callback(self, callback): + def register_connect_callback(self, callback): + """ + Register a callback to be called when the connection is established either + initially or reconnected. This allows listeners to issue commands that + are ephemeral to the connection, for example pub/sub subscription or + key tracking. The callback must be a _method_ and will be kept as + a weak reference. + """ wm = weakref.WeakMethod(callback) if wm not in self._connect_callbacks: self._connect_callbacks.append(wm) - def _deregister_connect_callback(self, callback): + def deregister_connect_callback(self, callback): + """ + De-register a previously registered callback. It will no-longer receive + notifications on connection events. Calling this is not required when the + listener goes away, since the callbacks are kept as weak methods. + """ try: self._connect_callbacks.remove(weakref.WeakMethod(callback)) except ValueError: