diff --git a/amqpstorm/basic.py b/amqpstorm/basic.py index 0622738..e560417 100644 --- a/amqpstorm/basic.py +++ b/amqpstorm/basic.py @@ -143,11 +143,12 @@ def consume(self, callback=None, queue='', consumer_tag='', raise AMQPInvalidArgument('no_local should be a boolean') elif arguments is not None and not isinstance(arguments, dict): raise AMQPInvalidArgument('arguments should be a dict or None') - consume_rpc_result = self._consume_rpc_request(arguments, consumer_tag, - exclusive, no_ack, - no_local, queue) - tag = self._consume_add_and_get_tag(consume_rpc_result) - self._channel._consumer_callbacks[tag] = callback + with self._channel.lock: + consume_rpc_result = self._consume_rpc_request(arguments, consumer_tag, + exclusive, no_ack, + no_local, queue) + tag = self._consume_add_and_get_tag(consume_rpc_result) + self._channel._consumer_callbacks[tag] = callback return tag def cancel(self, consumer_tag=''):