Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update heartbeat interval implementation [#127] #128

Merged
merged 2 commits into from
Nov 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
Changelog
=========

Version 2.10.7
--------------
- Fixed bug with heartbeat interval on the client not sent frequently enough [#127] - Thanks Ivan Héda.
- Added support for Python 3.12.

Version 2.10.6
--------------
- Fixed deprecated warning when using Python 3.11.
Expand Down
5 changes: 5 additions & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@ Additional documentation is available on `amqpstorm.io <https://www.amqpstorm.io
Changelog
=========

Version 2.10.7
--------------
- Fixed bug with heartbeat interval on the client not sent frequently enough [#127] - Thanks Ivan Héda.
- Added support for Python 3.12.

Version 2.10.6
--------------
- Fixed deprecated warning when using Python 3.11.
Expand Down
2 changes: 1 addition & 1 deletion amqpstorm/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
"""AMQPStorm."""
__version__ = '2.10.6' # noqa
__version__ = '2.10.7' # noqa
__author__ = 'eandersson' # noqa

import logging
Expand Down
6 changes: 3 additions & 3 deletions amqpstorm/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@

LOGGER = logging.getLogger(__name__)

DEFAULT_HEARTBEAT_INTERVAL = 60
DEFAULT_HEARTBEAT_TIMEOUT = 60
DEFAULT_SOCKET_TIMEOUT = 10
DEFAULT_VIRTUAL_HOST = '/'

Expand Down Expand Up @@ -58,7 +58,7 @@ class Connection(Stateful):
:param str password: Password
:param int port: Server port
:param str virtual_host: Virtual host
:param int heartbeat: RabbitMQ Heartbeat interval
:param int heartbeat: RabbitMQ Heartbeat timeout
:param int,float timeout: Socket timeout
:param bool ssl: Enable SSL
:param dict ssl_options: SSL kwargs
Expand All @@ -81,7 +81,7 @@ def __init__(self, hostname, username, password, port=5672, **kwargs):
'password': password,
'port': port,
'virtual_host': kwargs.get('virtual_host', DEFAULT_VIRTUAL_HOST),
'heartbeat': kwargs.get('heartbeat', DEFAULT_HEARTBEAT_INTERVAL),
'heartbeat': kwargs.get('heartbeat', DEFAULT_HEARTBEAT_TIMEOUT),
'timeout': kwargs.get('timeout', DEFAULT_SOCKET_TIMEOUT),
'ssl': kwargs.get('ssl', False),
'ssl_options': kwargs.get('ssl_options', {}),
Expand Down
4 changes: 2 additions & 2 deletions amqpstorm/heartbeat.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
class Heartbeat(object):
"""Internal Heartbeat handler."""

def __init__(self, interval, send_heartbeat_impl, timer=threading.Timer):
def __init__(self, timeout, send_heartbeat_impl, timer=threading.Timer):
self.send_heartbeat_impl = send_heartbeat_impl
self.timer_impl = timer
self._lock = threading.Lock()
Expand All @@ -20,7 +20,7 @@ def __init__(self, interval, send_heartbeat_impl, timer=threading.Timer):
self._exceptions = None
self._reads_since_check = 0
self._writes_since_check = 0
self._interval = interval
self._interval = None if timeout is None else max(timeout / 2, 0)
self._threshold = 0

def register_read(self):
Expand Down
16 changes: 8 additions & 8 deletions amqpstorm/tests/unit/test_heartbeat.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,15 +43,15 @@ def test_heartbeat_stop(self):
self.assertFalse(heartbeat._running.is_set())
self.assertIsNone(heartbeat._timer)

def test_heartbeat_interval(self):
def test_heartbeat_timeout(self):
heartbeat = Heartbeat(60, fake_function)

self.assertEqual(heartbeat._interval, 60)
self.assertEqual(heartbeat._interval, 30)
self.assertEqual(heartbeat._threshold, 0)

heartbeat = Heartbeat(360, fake_function)

self.assertEqual(heartbeat._interval, 360)
self.assertEqual(heartbeat._interval, 180)
self.assertEqual(heartbeat._threshold, 0)

def test_heartbeat_no_interval(self):
Expand Down Expand Up @@ -229,15 +229,15 @@ def test_heartbeat_raise_exception(self):

self.assertRaisesRegex(
AMQPConnectionError,
'Connection dead, no heartbeat or data received in >= 120s',
'Connection dead, no heartbeat or data received in >= 60s',
heartbeat._raise_or_append_exception
)

heartbeat = Heartbeat(120, None)

self.assertRaisesRegex(
AMQPConnectionError,
'Connection dead, no heartbeat or data received in >= 240',
'Connection dead, no heartbeat or data received in >= 120s',
heartbeat._raise_or_append_exception
)

Expand All @@ -252,14 +252,14 @@ def check(exception):

self.assertRaisesRegex(
AMQPConnectionError,
'Connection dead, no heartbeat or data received in >= 120s',
'Connection dead, no heartbeat or data received in >= 60s',
check, heartbeat._exceptions
)

heartbeat._interval = 120
heartbeat._interval = 120 // 2

self.assertRaisesRegex(
AMQPConnectionError,
'Connection dead, no heartbeat or data received in >= 240',
'Connection dead, no heartbeat or data received in >= 120s',
check, heartbeat._exceptions
)
4 changes: 2 additions & 2 deletions amqpstorm/tests/unit/uri_connection/test_uri_connection.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import ssl

from amqpstorm import UriConnection
from amqpstorm.connection import DEFAULT_HEARTBEAT_INTERVAL
from amqpstorm.connection import DEFAULT_HEARTBEAT_TIMEOUT
from amqpstorm.connection import DEFAULT_SOCKET_TIMEOUT
from amqpstorm.connection import DEFAULT_VIRTUAL_HOST
from amqpstorm.tests.utility import TestFramework
Expand All @@ -20,7 +20,7 @@ def test_uri_default(self):
DEFAULT_VIRTUAL_HOST)
self.assertEqual(connection.parameters['port'], 5672)
self.assertEqual(connection.parameters['heartbeat'],
DEFAULT_HEARTBEAT_INTERVAL)
DEFAULT_HEARTBEAT_TIMEOUT)
self.assertEqual(connection.parameters['timeout'],
DEFAULT_SOCKET_TIMEOUT)
self.assertFalse(connection.parameters['ssl'])
Expand Down
4 changes: 2 additions & 2 deletions amqpstorm/uri_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from amqpstorm.compatibility import ssl
from amqpstorm.compatibility import urlparse
from amqpstorm.connection import Connection
from amqpstorm.connection import DEFAULT_HEARTBEAT_INTERVAL
from amqpstorm.connection import DEFAULT_HEARTBEAT_TIMEOUT
from amqpstorm.connection import DEFAULT_SOCKET_TIMEOUT
from amqpstorm.connection import DEFAULT_VIRTUAL_HOST
from amqpstorm.exception import AMQPConnectionError
Expand Down Expand Up @@ -83,7 +83,7 @@ def _parse_uri_options(self, parsed_uri, use_ssl=False, ssl_options=None):
'ssl': use_ssl,
'virtual_host': vhost,
'heartbeat': int(kwargs.pop('heartbeat',
[DEFAULT_HEARTBEAT_INTERVAL])[0]),
[DEFAULT_HEARTBEAT_TIMEOUT])[0]),
'timeout': int(kwargs.pop('timeout',
[DEFAULT_SOCKET_TIMEOUT])[0])
}
Expand Down