Skip to content

Commit

Permalink
Cleaned up thread locking in basic
Browse files Browse the repository at this point in the history
  • Loading branch information
eandersson committed Jul 14, 2024
1 parent 89ebaaa commit c2c7b59
Showing 1 changed file with 22 additions and 20 deletions.
42 changes: 22 additions & 20 deletions amqpstorm/basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ def get(self, queue='', no_ack=False, to_dict=False, auto_decode=True,
message_impl = Message
get_frame = specification.Basic.Get(queue=queue,
no_ack=no_ack)
with self._channel.lock and self._channel.rpc.lock:
with self._channel.lock:
message = self._get_message(get_frame, auto_decode=auto_decode,
message_impl=message_impl)
if message and to_dict:
Expand Down Expand Up @@ -166,8 +166,9 @@ def cancel(self, consumer_tag=''):
if not compatibility.is_string(consumer_tag):
raise AMQPInvalidArgument('consumer_tag should be a string')
cancel_frame = specification.Basic.Cancel(consumer_tag=consumer_tag)
result = self._channel.rpc_request(cancel_frame)
self._channel.remove_consumer_tag(consumer_tag)
with self._channel.lock:
result = self._channel.rpc_request(cancel_frame)
self._channel.remove_consumer_tag(consumer_tag)
return result

def publish(self, body, routing_key, exchange='', properties=None,
Expand Down Expand Up @@ -364,23 +365,24 @@ def _get_message(self, get_frame, auto_decode, message_impl):
:rtype: Message
"""
message_uuid = self._channel.rpc.register_request(
get_frame.valid_responses + ['ContentHeader', 'ContentBody']
)
try:
self._channel.write_frame(get_frame)
get_ok_frame = self._channel.rpc.get_request(message_uuid,
raw=True,
multiple=True)
if isinstance(get_ok_frame, specification.Basic.GetEmpty):
return None
content_header = self._channel.rpc.get_request(message_uuid,
raw=True,
multiple=True)
body = self._get_content_body(message_uuid,
content_header.body_size)
finally:
self._channel.rpc.remove(message_uuid)
with self._channel.rpc.lock:
message_uuid = self._channel.rpc.register_request(
get_frame.valid_responses + ['ContentHeader', 'ContentBody']
)
try:
self._channel.write_frame(get_frame)
get_ok_frame = self._channel.rpc.get_request(message_uuid,
raw=True,
multiple=True)
if isinstance(get_ok_frame, specification.Basic.GetEmpty):
return None
content_header = self._channel.rpc.get_request(message_uuid,
raw=True,
multiple=True)
body = self._get_content_body(message_uuid,
content_header.body_size)
finally:
self._channel.rpc.remove(message_uuid)
return message_impl(channel=self._channel,
body=body,
method=dict(get_ok_frame),
Expand Down

0 comments on commit c2c7b59

Please sign in to comment.