Skip to content

Commit

Permalink
Compatibility fix for Python 2.6 and UriConnection
Browse files Browse the repository at this point in the history
  • Loading branch information
eandersson committed Sep 5, 2015
1 parent 3caf61a commit 29eca29
Show file tree
Hide file tree
Showing 7 changed files with 75 additions and 54 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Changelog

### Version 1.2.4
- Fixed compatibility issues with Python 2.6 and UriConnection.

### Version 1.2.3
- Added a Client-side Heartbeat monitor.
- Added Timeout to Connection.open.
Expand Down
2 changes: 1 addition & 1 deletion amqpstorm/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
"""AMQP-Storm."""
__version__ = '1.2.3'
__version__ = '1.2.4'
__author__ = 'eandersson'

import logging
Expand Down
2 changes: 1 addition & 1 deletion amqpstorm/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,8 @@ def open(self):
self.set_state(self.OPENING)
self.io.open(self.parameters['hostname'],
self.parameters['port'])
self._send_handshake()
self.heartbeat.start(self._exceptions)
self._send_handshake()
self._wait_for_connection_to_open()
LOGGER.debug('Connection Opened')

Expand Down
4 changes: 2 additions & 2 deletions amqpstorm/heartbeat.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ class Heartbeat(object):
def __init__(self, interval):
self._timer = None
self._exceptions = None
self._last_heartbeat = None
self._beats_since_check = None
self._last_heartbeat = 0.0
self._beats_since_check = 0
self._interval = int(interval) + 1
self._threshold = self._interval * 2

Expand Down
75 changes: 48 additions & 27 deletions amqpstorm/uri_connection.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
"""AMQP-Storm Uri wrapper for Connection."""
__author__ = 'eandersson'

import sys
import logging

try:
Expand All @@ -18,9 +17,6 @@

LOGGER = logging.getLogger(__name__)

if sys.version_info < (2, 7):
LOGGER.critical('UriConnection not supported in Python 2.6')

if ssl:
SSL_VERSIONS = {}
if hasattr(ssl, 'PROTOCOL_TLSv1_2'):
Expand Down Expand Up @@ -50,7 +46,7 @@
class UriConnection(Connection):
"""Wrapper of the Connection class that takes the AMQP uri schema."""

def __init__(self, uri):
def __init__(self, uri, lazy=False):
"""Create a new Connection instance using an AMQP Uri string.
e.g.
Expand All @@ -59,35 +55,60 @@ def __init__(self, uri):
:param str uri: AMQP Connection string
"""
parsed = urlparse.urlparse(uri)
use_ssl = parsed.scheme == 'amqps'
hostname = parsed.hostname or 'localhost'
port = parsed.port or 5672
username = parsed.username or 'guest'
password = parsed.password or 'guest'
virtual_host = urlparse.unquote(parsed.path[1:]) or '/'
kwargs = urlparse.parse_qs(parsed.query)
heartbeat = kwargs.get('heartbeat', [60])[0]
timeout = kwargs.get('timeout', [30])[0]
lazy = parsed.query.startswith('lazy')

ssl_options = {}
if ssl and use_ssl:
ssl_options = self._parse_ssl_options(kwargs)
uri = self._patch_uri(uri)
parsed_uri = urlparse.urlparse(uri)
use_ssl = parsed_uri.scheme == 'https'
hostname = parsed_uri.hostname or 'localhost'
port = parsed_uri.port or 5672
username = parsed_uri.username or 'guest'
password = parsed_uri.password or 'guest'
kwargs = self._parse_uri_options(parsed_uri, use_ssl, lazy)
super(UriConnection, self).__init__(hostname, username,
password, port,
virtual_host=virtual_host,
heartbeat=int(heartbeat),
timeout=int(timeout),
ssl=use_ssl,
ssl_options=ssl_options,
lazy=lazy)
**kwargs)

@staticmethod
def _patch_uri(uri):
"""If a custom uri schema is used with python 2.6 (e.g. amqps),
it will ignore some of the parsing logic.
As a work-around for this we change the amqp/amqps schema
internally to use http/https.
:param str uri: AMQP Connection string
:rtype: str
"""
index = uri.find(':')
if uri[:index] == 'amqps':
uri = uri.replace('amqps', 'https', 1)
elif uri[:index] == 'amqp':
uri = uri.replace('amqp', 'http', 1)
return uri

def _parse_uri_options(self, parsed_uri, use_ssl, lazy):
"""Parse the uri options.
:param parsed_uri:
:param bool use_ssl:
:return:
"""
kwargs = urlparse.parse_qs(parsed_uri.query)
options = {
'ssl': use_ssl,
'virtual_host': urlparse.unquote(parsed_uri.path[1:]) or '/',
'heartbeat': int(kwargs.get('heartbeat', [60])[0]),
'timeout': int(kwargs.get('timeout', [30])[0]),
'lazy': lazy
}
if ssl and use_ssl:
options['ssl_options'] = self._parse_ssl_options(kwargs)
return options

def _parse_ssl_options(self, ssl_kwargs):
"""Parse SSL Options.
:param ssl_kwargs:
:return:
:rtype: dict
"""
ssl_options = {}
for key in ssl_kwargs:
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
"""

setup(name='AMQP-Storm',
version='1.2.3',
version='1.2.4',
description='Thread-safe Python AMQP Client Library based on pamqp.',
long_description=long_description,
author='Erik Olof Gunnar Andersson',
Expand Down
41 changes: 19 additions & 22 deletions tests/uri_connection_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,10 @@
logging.basicConfig(level=logging.DEBUG)


@unittest.skipIf(sys.version_info < (2, 7),
'UriConnection not supported in Python 2.6')
class UriConnectionTests(unittest.TestCase):
def test_default_uri(self):
connection = \
UriConnection('amqp://guest:guest@localhost:5672/%2F?lazy')
UriConnection('amqp://guest:guest@localhost:5672/%2F', True)
self.assertEqual(connection.parameters['hostname'], 'localhost')
self.assertEqual(connection.parameters['username'], 'guest')
self.assertEqual(connection.parameters['password'], 'guest')
Expand All @@ -31,52 +29,51 @@ def test_default_uri(self):

def test_uri_set_hostname(self):
connection = \
UriConnection('amqp://guest:guest@my-server:5672/%2F?lazy&'
'heartbeat=1337')
UriConnection('amqp://guest:guest@my-server:5672/%2F?'
'heartbeat=1337', True)
self.assertEqual(connection.parameters['hostname'], 'my-server')

def test_uri_set_username(self):
connection = \
UriConnection('amqp://username:guest@localhost:5672/%2F?lazy&'
'heartbeat=1337')
UriConnection('amqp://username:guest@localhost:5672/%2F?'
'heartbeat=1337', True)
self.assertEqual(connection.parameters['username'], 'username')

def test_uri_set_password(self):
connection = \
UriConnection('amqp://guest:password@localhost:5672/%2F?lazy&'
'heartbeat=1337')
UriConnection('amqp://guest:password@localhost:5672/%2F?'
'heartbeat=1337', True)
self.assertEqual(connection.parameters['password'], 'password')

def test_uri_set_port(self):
connection = \
UriConnection('amqp://guest:guest@localhost:1337/%2F?lazy')
UriConnection('amqp://guest:guest@localhost:1337/%2F', True)
self.assertEqual(connection.parameters['port'], 1337)

def test_uri_set_heartbeat(self):
connection = \
UriConnection('amqp://guest:guest@localhost:5672/%2F?lazy&'
'heartbeat=1337')
UriConnection('amqp://guest:guest@localhost:5672/%2F?'
'heartbeat=1337', True)
self.assertEqual(connection.parameters['heartbeat'], 1337)

def test_uri_set_timeout(self):
connection = \
UriConnection('amqp://guest:guest@localhost:5672/%2F?lazy&'
'timeout=1337')
UriConnection('amqp://guest:guest@localhost:5672/%2F?'
'timeout=1337', True)
self.assertEqual(connection.parameters['timeout'], 1337)

def test_uri_set_virtual_host(self):
connection = \
UriConnection('amqp://guest:guest@localhost:5672/travis?lazy')
UriConnection('amqp://guest:guest@localhost:5672/travis', True)
self.assertEqual(connection.parameters['virtual_host'], 'travis')

def test_uri_set_ssl(self):
connection = UriConnection('amqps://guest:guest@localhost:5671/%2F?'
'lazy&'
'ssl_version=protocol_sslv3&'
'cert_reqs=cert_required&'
'keyfile=file.key&'
'certfile=file.crt&'
'ca_certs=test')
'ca_certs=test', True)
self.assertTrue(connection.parameters['ssl'])
self.assertEqual(connection.parameters['ssl_options']['ssl_version'],
ssl.PROTOCOL_SSLv3)
Expand All @@ -91,31 +88,31 @@ def test_uri_set_ssl(self):

def test_get_ssl_version(self):
connection = \
UriConnection('amqp://guest:guest@localhost:5672/%2F?lazy')
UriConnection('amqp://guest:guest@localhost:5672/%2F', True)
self.assertEqual(ssl.PROTOCOL_SSLv3,
connection._get_ssl_version('protocol_sslv3'))

def test_get_invalid_ssl_version(self):
connection = \
UriConnection('amqp://guest:guest@localhost:5672/%2F?lazy')
UriConnection('amqp://guest:guest@localhost:5672/%2F', True)
self.assertEqual(connection._get_ssl_version('protocol_test'),
ssl.PROTOCOL_TLSv1)

def test_get_ssl_validation(self):
connection = \
UriConnection('amqp://guest:guest@localhost:5672/%2F?lazy')
UriConnection('amqp://guest:guest@localhost:5672/%2F', True)
self.assertEqual(ssl.CERT_REQUIRED,
connection._get_ssl_validation('cert_required'))

def test_get_invalid_ssl_validation(self):
connection = \
UriConnection('amqp://guest:guest@localhost:5672/%2F?lazy')
UriConnection('amqp://guest:guest@localhost:5672/%2F', True)
self.assertEqual(ssl.CERT_NONE,
connection._get_ssl_validation('cert_test'))

def test_get_ssl_options(self):
connection = \
UriConnection('amqp://guest:guest@localhost:5672/%2F?lazy')
UriConnection('amqp://guest:guest@localhost:5672/%2F', True)
ssl_kwargs = {
'cert_reqs': ['cert_required'],
'ssl_version': ['protocol_sslv3'],
Expand Down

0 comments on commit 29eca29

Please sign in to comment.