-
Notifications
You must be signed in to change notification settings - Fork 3
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[WIP]Add lock processing for queue purge #22
base: devel
Are you sure you want to change the base?
Conversation
At present, some operations require a locking mechanism when concurrent requests. This patch implements the basic logic of the lock.
When the queue is purging, sending the message to queue will fail, and the failure information will be recorded in the log. Redmine: http://192.168.15.2/issues/11196
33973bd
to
6efb219
Compare
LOG.error('PURGE LOCK: The queue: %s in project: %s' | ||
'is purging, so can not post.' % (queue_name, | ||
project)) | ||
return [] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
这里是不是直接raise
出错比较好?否则实际上消息没有成功发布,但HTTP返回了201?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
这里的逻辑就是清空队列时,发送的消息也要被清空,但是mongo无法保证,所以这里直接阻拦不让入库,变相实现了这个功能。所以就没必要触发异常了,发不进去是正常现象。
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
锁的机制不完整,并不能完整解决问题。
根据之前的了解,mongo本身有写时的独占锁。假设purge用的清空操作也是一种写入,那么不可能会发生清空和写入新信息同时发生的情况。
看bug描述,无法确认发消息的操作正好发生在清空队列的过程中。所以我倾向认为这里并没有bug存在。
@@ -658,6 +666,16 @@ def post(self, queue_name, messages, client_uuid, project=None): | |||
# The worst-case scenario is that we'll increase the counter | |||
# several times and we'd end up with some non-active messages. | |||
|
|||
# Get a lock for claim. | |||
is_locked = self._lock_ctrl.is_locked(queue_name, 'queues', | |||
project, 'purge') |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
这个锁的机制还是有问题,有可能这么检查的时候没有锁但下面的插入操作前被锁了。除非这里每条消息的发布也加锁防止purge
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
这种情况就相当于purge前发送的消息,理论上是可以发送成功的。
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
这样的话,其实也就是只在 purge 这个操作执行期间不让发送信息。不过看 purge 调用的 collection.remove 用的是 w:0 ,估计也是一种异步操作,所以 purge 操作本身应该也会很快完成。
LOG.debug(ex) | ||
return False | ||
|
||
return True |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
collection.update 应该不会轻易出现异常,这里的本意应该是没有更新成功就返回 False 。 collection.update 返回的是一个 WriteResult ,在 pymongo 里应该是个 dict ,需要根据返回结果判断是否加锁成功。
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
这里应该只有一种情况会失败,就是找不到条件里的数据,然后插入失败(key重复),也就是此时已经存在锁了。
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
即使找不到满足条件的数据,也不会出现异常, collection.update 仍然会返回一个 WriteResult ,告诉客户端没有更新任何数据。
是我对 upsert 的理解有误了,的确是可以用 unique index 来保证这里只有一种情况失败。
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
那这里的except好像不能太宽泛。否则的话让前面那个修饰utils.raises_conn_error
都没有意义了
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
恩 这个有道理 只需要捕获key
重复的错误。
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@zhaochao 这里的upsert=True,如果找不到对应的doc去更新,则会插入一条数据,这里报错是插入时候报key
重复,这个我测试过,确实是这样的。
@@ -658,6 +666,16 @@ def post(self, queue_name, messages, client_uuid, project=None): | |||
# The worst-case scenario is that we'll increase the counter | |||
# several times and we'd end up with some non-active messages. | |||
|
|||
# Get a lock for claim. | |||
is_locked = self._lock_ctrl.is_locked(queue_name, 'queues', | |||
project, 'purge') |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
这样的话,其实也就是只在 purge 这个操作执行期间不让发送信息。不过看 purge 调用的 collection.remove 用的是 w:0 ,估计也是一种异步操作,所以 purge 操作本身应该也会很快完成。
@huntxu 这里我也倾向不是bug。 |
确实,这个比较难界定什么消息是刚好在清理过程中发送进来的 |
When the queue is purging, sending the message to queue will fail,
and the failure information will be recorded in the log.
Redmine: http://192.168.15.2/issues/11196