diff --git a/CHANGELOG b/CHANGELOG index a3c15192..7c682783 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -2,6 +2,8 @@ ### Version 1.2.3-dev - Added Client-side Heartbeat monitor. +- Added a Connection Timeout to Connection.open. +- Fixed potential bug in the Socket Poller Error handling. ### Version 1.2.2 - Added shortcuts for common properties in the Message class, e.g. message.app_id. diff --git a/amqpstorm/connection.py b/amqpstorm/connection.py index 9a1b2460..222d310a 100644 --- a/amqpstorm/connection.py +++ b/amqpstorm/connection.py @@ -222,7 +222,7 @@ def _wait_for_connection_to_open(self): :return: """ start_time = time.time() - timeout = self.parameters['timeout'] + timeout = self.parameters['timeout'] or 5 while not self.is_open: if time.time() - start_time > timeout: raise AMQPConnectionError('Connection timed out') diff --git a/amqpstorm/io.py b/amqpstorm/io.py index 3171bb2f..f0f3c102 100644 --- a/amqpstorm/io.py +++ b/amqpstorm/io.py @@ -60,9 +60,9 @@ def is_ready(self): self.timeout) return bool(ready), bool(write) except select.error as why: - if why.args[0] == EINTR: - return False, False - self.on_error(why) + if why.args[0] != EINTR: + self.on_error(why) + return False, False class IO(Stateful): diff --git a/tests/connection_tests.py b/tests/connection_tests.py index 1b10ea19..dcede023 100644 --- a/tests/connection_tests.py +++ b/tests/connection_tests.py @@ -1,13 +1,18 @@ __author__ = 'eandersson' import ssl +import socket import logging +import threading + +from mock import MagicMock try: import unittest2 as unittest except ImportError: import unittest +from amqpstorm.io import IO from amqpstorm import Connection from amqpstorm.exception import * @@ -78,3 +83,27 @@ def test_close_state(self): def test_open_channel_on_closed_connection(self): connection = Connection('127.0.0.1', 'guest', 'guest', lazy=True) self.assertRaises(AMQPConnectionError, connection.channel) + + def test_wait_for_connection(self): + connection = Connection('127.0.0.1', 'guest', 'guest', timeout=5, + lazy=True) + connection.set_state(connection.OPENING) + io = IO(connection.parameters) + io.socket = MagicMock(name='socket', spec=socket.socket) + connection.io = io + + def func(conn): + conn.set_state(conn.OPEN) + + threading.Timer(function=func, interval=1, args=(connection, )).start() + connection._wait_for_connection_to_open() + + def test_wait_for_connection_raises_on_timeout(self): + connection = Connection('127.0.0.1', 'guest', 'guest', timeout=1, + lazy=True) + connection.set_state(connection.OPENING) + io = IO(connection.parameters) + io.socket = MagicMock(name='socket', spec=socket.socket) + connection.io = io + self.assertRaises(AMQPConnectionError, + connection._wait_for_connection_to_open) diff --git a/tests/io_tests.py b/tests/io_tests.py index 1d46794a..a932254d 100644 --- a/tests/io_tests.py +++ b/tests/io_tests.py @@ -1,7 +1,6 @@ __author__ = 'eandersson' import ssl -import mock import socket import logging