From ec5380e6645b13d5459466301e3fcaf345fdcb88 Mon Sep 17 00:00:00 2001 From: hanshuaikang <1758504262@qq.com> Date: Tue, 2 Jan 2024 16:48:04 +0800 Subject: [PATCH] =?UTF-8?q?feature:=20=E4=BB=BB=E6=84=8F=E6=A8=A1=E5=BC=8F?= =?UTF-8?q?=E5=9B=9E=E6=BB=9A=E4=B8=8B=E5=85=81=E8=AE=B8=E8=B7=B3=E8=BF=87?= =?UTF-8?q?token=E5=89=8D=E7=BD=AE=E6=A3=80=E6=9F=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../pipeline/contrib/rollback/api.py | 52 ++++++- .../pipeline/contrib/rollback/handler.py | 63 ++++---- .../migrations/0003_rollbackplan_options.py | 19 +++ .../pipeline/contrib/rollback/models.py | 1 + .../pipeline/eri/imp/rollback.py | 4 +- .../pipeline/tests/contrib/test_rollback.py | 134 ++++++++++++++++++ 6 files changed, 240 insertions(+), 33 deletions(-) create mode 100644 runtime/bamboo-pipeline/pipeline/contrib/rollback/migrations/0003_rollbackplan_options.py diff --git a/runtime/bamboo-pipeline/pipeline/contrib/rollback/api.py b/runtime/bamboo-pipeline/pipeline/contrib/rollback/api.py index f8deac76..92269cd3 100644 --- a/runtime/bamboo-pipeline/pipeline/contrib/rollback/api.py +++ b/runtime/bamboo-pipeline/pipeline/contrib/rollback/api.py @@ -17,33 +17,73 @@ @ensure_return_pipeline_contrib_api_result def rollback( - root_pipeline_id: str, start_node_id: str, target_node_id: str, skip_rollback_nodes: list = None, mode: str = TOKEN + root_pipeline_id: str, + start_node_id: str, + target_node_id: str, + skip_rollback_nodes: list = None, + mode: str = TOKEN, + **options ): """ :param root_pipeline_id: pipeline id :param start_node_id: 开始的 id :param target_node_id: 开始的 id + :param skip_rollback_nodes: 需要跳过回滚的节点信息, 仅在TOKEN模式下有效 + :param skip_check_token: 是否跳过检查,仅在ANY模式下有效 :param mode 回滚模式 :return: True or False """ - RollbackDispatcher(root_pipeline_id, mode).rollback(start_node_id, target_node_id) + RollbackDispatcher(root_pipeline_id, mode).rollback( + start_node_id, target_node_id, skip_rollback_nodes=skip_rollback_nodes, **options + ) @ensure_return_pipeline_contrib_api_result -def reserve_rollback(root_pipeline_id: str, start_node_id: str, target_node_id: str, mode: str = TOKEN): - RollbackDispatcher(root_pipeline_id, mode).reserve_rollback(start_node_id, target_node_id) +def reserve_rollback(root_pipeline_id: str, start_node_id: str, target_node_id: str, mode: str = TOKEN, **options): + """ + 预约一次回滚任务 + :param root_pipeline_id: pipeline id + :param start_node_id: 回滚的起始id + :param target_node_id: 回滚的目标节点id + :param mode: 模式 + :param skip_check_token: 是否跳过检查,仅在ANY模式下有效 + :return: True or False + """ + RollbackDispatcher(root_pipeline_id, mode).reserve_rollback(start_node_id, target_node_id, **options) @ensure_return_pipeline_contrib_api_result def cancel_reserved_rollback(root_pipeline_id: str, start_node_id: str, target_node_id: str, mode: str = TOKEN): + """ + 取消预约的回滚任务 + :param root_pipeline_id: pipeline id + :param start_node_id: 回滚的起始id + :param target_node_id: 回滚的目标节点id + :param mode: 模式 + :return: True or False + """ RollbackDispatcher(root_pipeline_id, mode).cancel_reserved_rollback(start_node_id, target_node_id) @ensure_return_pipeline_contrib_api_result def retry_rollback_failed_node(root_pipeline_id: str, node_id: str, retry_data: dict = None, mode: str = TOKEN): + """ + 重试回滚, 仅支持token模式下的回滚重试 + :param root_pipeline_id: pipeline id + :param node_id: 要重试的节点id + :param retry_data: 重试的数据 + :param mode: 回滚模式 + """ RollbackDispatcher(root_pipeline_id, mode).retry_rollback_failed_node(node_id, retry_data) @ensure_return_pipeline_contrib_api_result -def get_allowed_rollback_node_id_list(root_pipeline_id: str, start_node_id: str, mode: str = TOKEN): - return RollbackDispatcher(root_pipeline_id, mode).get_allowed_rollback_node_id_list(start_node_id) +def get_allowed_rollback_node_id_list(root_pipeline_id: str, start_node_id: str, mode: str = TOKEN, **options): + """ + 获取允许回滚的节点范围 + :param root_pipeline_id: pipeline id + :param start_node_id: 回滚的开始位置 + :param mode: 回滚的模式 + :param skip_check_token: 是否跳过检查,仅在ANY模式下有效 + """ + return RollbackDispatcher(root_pipeline_id, mode).get_allowed_rollback_node_id_list(start_node_id, **options) diff --git a/runtime/bamboo-pipeline/pipeline/contrib/rollback/handler.py b/runtime/bamboo-pipeline/pipeline/contrib/rollback/handler.py index de5240e4..6e74ce1b 100644 --- a/runtime/bamboo-pipeline/pipeline/contrib/rollback/handler.py +++ b/runtime/bamboo-pipeline/pipeline/contrib/rollback/handler.py @@ -140,7 +140,7 @@ def __init__(self, root_pipeline_id): # 检查pipeline 回滚的合法性 RollbackValidator.validate_pipeline(root_pipeline_id) - def get_allowed_rollback_node_id_list(self, start_node_id): + def get_allowed_rollback_node_id_list(self, start_node_id, **options): """ 获取允许回滚的节点范围 规则:token 一致的节点允许回滚 @@ -151,7 +151,7 @@ def get_allowed_rollback_node_id_list(self, start_node_id): raise RollBackException( "rollback failed: pipeline token not exist, pipeline_id={}".format(self.root_pipeline_id) ) - node_map = self._get_allowed_rollback_node_map() + node_map = self.get_allowed_rollback_node_map() service_activity_node_list = [ node_id for node_id, node_detail in node_map.items() if node_detail["type"] == PE.ServiceActivity ] @@ -168,7 +168,7 @@ def get_allowed_rollback_node_id_list(self, start_node_id): return nodes - def _get_allowed_rollback_node_map(self): + def get_allowed_rollback_node_map(self): # 不需要遍历整颗树,获取到现在已经执行成功和失败节点的所有列表 finished_node_id_list = ( State.objects.filter(root_id=self.root_pipeline_id, name__in=[states.FINISHED, states.FAILED]) @@ -179,7 +179,7 @@ def _get_allowed_rollback_node_map(self): # 获取node_id 到 node_detail的映射 return {n.node_id: json.loads(n.detail) for n in node_detail_list} - def _reserve(self, start_node_id, target_node_id, reserve_rollback=True): + def _reserve(self, start_node_id, target_node_id, reserve_rollback=True, **options): # 节点预约 需要在 Node 里面 插入 reserve_rollback = True, 为 True的节点执行完将暂停 RollbackValidator.validate_start_node_id(self.root_pipeline_id, start_node_id) RollbackValidator.validate_node(target_node_id) @@ -205,16 +205,13 @@ def _reserve(self, start_node_id, target_node_id, reserve_rollback=True): if reserve_rollback: # 一个流程只能同时拥有一个预约任务 if RollbackPlan.objects.filter(root_pipeline_id=self.root_pipeline_id, is_expired=False).exists(): - raise RollBackException( - "reserve rollback failed, the rollbackPlan, current state={}, node_id={}".format( - state.name, start_node_id - ) - ) + raise RollBackException("reserve rollback failed, there exists another unfinished rollback plan") RollbackPlan.objects.create( root_pipeline_id=self.root_pipeline_id, start_node_id=start_node_id, target_node_id=target_node_id, mode=self.mode, + options=options, ) else: # 取消回滚,删除所有的任务 @@ -227,37 +224,53 @@ def _reserve(self, start_node_id, target_node_id, reserve_rollback=True): node.detail = json.dumps(node_detail) node.save() - def reserve_rollback(self, start_node_id, target_node_id): + def reserve_rollback(self, start_node_id, target_node_id, **options): """ 预约回滚 """ RollbackValidator.validate_token(self.root_pipeline_id, start_node_id, target_node_id) - self._reserve(start_node_id, target_node_id) + self._reserve(start_node_id, target_node_id, **options) def cancel_reserved_rollback(self, start_node_id, target_node_id): """ 取消预约回滚 """ - RollbackValidator.validate_token(self.root_pipeline_id, start_node_id, target_node_id) self._reserve(start_node_id, target_node_id, reserve_rollback=False) class AnyRollbackHandler(BaseRollbackHandler): mode = ANY + def get_allowed_rollback_node_id_list(self, start_node_id, **options): + # 如果开启了token跳过检查这个选项,那么将返回所有运行过的节点作为回滚范围 + if options.get("skip_check_token", False): + node_map = self.get_allowed_rollback_node_map() + node_map.pop(start_node_id, Node) + return list(node_map.keys()) + return super(AnyRollbackHandler, self).get_allowed_rollback_node_id_list(start_node_id, **options) + def retry_rollback_failed_node(self, node_id, retry_data): """ """ raise RollBackException("rollback failed: when mode is any, not support retry") - def rollback(self, start_node_id, target_node_id, skip_rollback_nodes=None): + def reserve_rollback(self, start_node_id, target_node_id, **options): + """ + 预约回滚 + """ + if not options.get("skip_check_token", False): + RollbackValidator.validate_token(self.root_pipeline_id, start_node_id, target_node_id) + self._reserve(start_node_id, target_node_id, **options) + + def rollback(self, start_node_id, target_node_id, skip_rollback_nodes=None, **options): RollbackValidator.validate_start_node_id(self.root_pipeline_id, start_node_id) RollbackValidator.validate_node(start_node_id, allow_failed=True) RollbackValidator.validate_node(target_node_id) - RollbackValidator.validate_token(self.root_pipeline_id, start_node_id, target_node_id) - # 相同token回滚时,不允许同一token路径上有正在运行的节点 - RollbackValidator.validate_node_state(self.root_pipeline_id, start_node_id) + if not options.get("skip_check_token", False): + RollbackValidator.validate_token(self.root_pipeline_id, start_node_id, target_node_id) + # 相同token回滚时,不允许同一token路径上有正在运行的节点 + RollbackValidator.validate_node_state(self.root_pipeline_id, start_node_id) - node_map = self._get_allowed_rollback_node_map() + node_map = self.get_allowed_rollback_node_map() rollback_graph = RollbackGraphHandler(node_map=node_map, start_id=start_node_id, target_id=target_node_id) graph, other_nodes = rollback_graph.build_rollback_graph() @@ -337,7 +350,7 @@ def _get_failed_skip_node_id_list(self, node_id_list): ).values_list("node_id", flat=True) return failed_skip_node_id_list - def rollback(self, start_node_id, target_node_id, skip_rollback_nodes=None): + def rollback(self, start_node_id, target_node_id, skip_rollback_nodes=None, **options): if skip_rollback_nodes is None: skip_rollback_nodes = [] @@ -353,7 +366,7 @@ def rollback(self, start_node_id, target_node_id, skip_rollback_nodes=None): if self._node_state_is_failed(start_node_id): skip_rollback_nodes.append(start_node_id) - node_map = self._get_allowed_rollback_node_map() + node_map = self.get_allowed_rollback_node_map() rollback_graph = RollbackGraphHandler(node_map=node_map, start_id=start_node_id, target_id=target_node_id) runtime = BambooDjangoRuntime() @@ -400,11 +413,11 @@ def __init__(self, root_pipeline_id, mode): else: raise RollBackException("rollback failed: not support this mode, please check") - def rollback(self, start_node_id: str, target_node_id: str, skip_rollback_nodes: list = None): - self.handler.rollback(start_node_id, target_node_id, skip_rollback_nodes) + def rollback(self, start_node_id: str, target_node_id: str, skip_rollback_nodes: list = None, **options): + self.handler.rollback(start_node_id, target_node_id, skip_rollback_nodes, **options) - def reserve_rollback(self, start_node_id: str, target_node_id: str): - self.handler.reserve_rollback(start_node_id, target_node_id) + def reserve_rollback(self, start_node_id: str, target_node_id: str, **options): + self.handler.reserve_rollback(start_node_id, target_node_id, **options) def retry_rollback_failed_node(self, node_id: str, retry_data: dict = None): self.handler.retry_rollback_failed_node(node_id, retry_data) @@ -412,5 +425,5 @@ def retry_rollback_failed_node(self, node_id: str, retry_data: dict = None): def cancel_reserved_rollback(self, start_node_id: str, target_node_id: str): self.handler.cancel_reserved_rollback(start_node_id, target_node_id) - def get_allowed_rollback_node_id_list(self, start_node_id: str): - return self.handler.get_allowed_rollback_node_id_list(start_node_id) + def get_allowed_rollback_node_id_list(self, start_node_id: str, **options): + return self.handler.get_allowed_rollback_node_id_list(start_node_id, **options) diff --git a/runtime/bamboo-pipeline/pipeline/contrib/rollback/migrations/0003_rollbackplan_options.py b/runtime/bamboo-pipeline/pipeline/contrib/rollback/migrations/0003_rollbackplan_options.py new file mode 100644 index 00000000..cb411bce --- /dev/null +++ b/runtime/bamboo-pipeline/pipeline/contrib/rollback/migrations/0003_rollbackplan_options.py @@ -0,0 +1,19 @@ +# Generated by Django 3.2.23 on 2023-12-29 09:11 + +import pipeline.contrib.fields +from django.db import migrations + + +class Migration(migrations.Migration): + + dependencies = [ + ("rollback", "0002_auto_20231020_1234"), + ] + + operations = [ + migrations.AddField( + model_name="rollbackplan", + name="options", + field=pipeline.contrib.fields.SerializerField(default={}, verbose_name="rollback options"), + ), + ] diff --git a/runtime/bamboo-pipeline/pipeline/contrib/rollback/models.py b/runtime/bamboo-pipeline/pipeline/contrib/rollback/models.py index cefac0b9..5f310940 100644 --- a/runtime/bamboo-pipeline/pipeline/contrib/rollback/models.py +++ b/runtime/bamboo-pipeline/pipeline/contrib/rollback/models.py @@ -50,4 +50,5 @@ class RollbackPlan(models.Model): start_node_id = models.CharField(verbose_name="start node id", max_length=64, db_index=True) target_node_id = models.CharField(verbose_name="target_node_id", max_length=64, db_index=True) mode = models.CharField(verbose_name="rollback mode", max_length=32, default=TOKEN) + options = SerializerField(verbose_name="rollback options", default={}) is_expired = models.BooleanField(verbose_name="is expired", default=False) diff --git a/runtime/bamboo-pipeline/pipeline/eri/imp/rollback.py b/runtime/bamboo-pipeline/pipeline/eri/imp/rollback.py index ea1a15a9..ce6b273a 100644 --- a/runtime/bamboo-pipeline/pipeline/eri/imp/rollback.py +++ b/runtime/bamboo-pipeline/pipeline/eri/imp/rollback.py @@ -31,7 +31,7 @@ def set_pipeline_token(self, pipeline_tree: dict): def set_node_snapshot(self, root_pipeline_id, node_id, code, version, context_values, inputs, outputs): """ - 创建一分节点快照 + 创建一份节点快照 """ try: RollbackNodeSnapshot = apps.get_model("rollback", "RollbackNodeSnapshot") @@ -74,7 +74,7 @@ def start_rollback(self, root_pipeline_id, node_id): root_pipeline_id=root_pipeline_id, start_node_id=node_id, is_expired=False ) handler = RollbackDispatcher(root_pipeline_id=root_pipeline_id, mode=rollback_plan.mode) - handler.rollback(rollback_plan.start_node_id, rollback_plan.target_node_id) + handler.rollback(rollback_plan.start_node_id, rollback_plan.target_node_id, **rollback_plan.options) rollback_plan.is_expired = True rollback_plan.save(update_fields=["is_expired"]) except Exception as e: diff --git a/runtime/bamboo-pipeline/pipeline/tests/contrib/test_rollback.py b/runtime/bamboo-pipeline/pipeline/tests/contrib/test_rollback.py index 9fc47f6b..e151a060 100644 --- a/runtime/bamboo-pipeline/pipeline/tests/contrib/test_rollback.py +++ b/runtime/bamboo-pipeline/pipeline/tests/contrib/test_rollback.py @@ -30,6 +30,9 @@ token_rollback = MagicMock() token_rollback.apply_async = MagicMock(return_value=True) +any_rollback = MagicMock() +any_rollback.apply_async = MagicMock(return_value=True) + class TestRollBackBase(TestCase): @mock.patch("pipeline.contrib.rollback.handler.token_rollback", MagicMock(return_value=token_rollback)) @@ -338,3 +341,134 @@ def test_retry_rollback_failed_node(self): result = api.retry_rollback_failed_node(root_pipeline_id, node_id) self.assertTrue(result.result) + + @mock.patch("pipeline.contrib.rollback.handler.any_rollback", MagicMock(return_value=any_rollback)) + def test_rollback_with_any_mode_and_skip_check_token(self): + pipeline_id = unique_id("n") + State.objects.create( + node_id=pipeline_id, root_id=pipeline_id, parent_id=pipeline_id, name=states.RUNNING, version=unique_id("v") + ) + + start_node_id = unique_id("n") + State.objects.create( + node_id=start_node_id, + root_id=pipeline_id, + parent_id=pipeline_id, + name=states.FINISHED, + version=unique_id("v"), + ) + + target_node_id = unique_id("n") + State.objects.create( + node_id=target_node_id, + root_id=pipeline_id, + parent_id=pipeline_id, + name=states.FINISHED, + version=unique_id("v"), + ) + + target_node_detail = { + "id": target_node_id, + "type": PE.ServiceActivity, + "targets": {target_node_id: start_node_id}, + "root_pipeline_id": pipeline_id, + "parent_pipeline_id": pipeline_id, + "can_skip": True, + "code": "bk_display", + "version": "v1.0", + "error_ignorable": True, + "can_retry": True, + } + + start_node_detail = { + "id": start_node_id, + "type": PE.ServiceActivity, + "targets": {}, + "root_pipeline_id": pipeline_id, + "parent_pipeline_id": pipeline_id, + "can_skip": True, + "code": "bk_display", + "version": "v1.0", + "error_ignorable": True, + "can_retry": True, + } + + Node.objects.create(node_id=target_node_id, detail=json.dumps(target_node_detail)) + Node.objects.create(node_id=start_node_id, detail=json.dumps(start_node_detail)) + Process.objects.create(root_pipeline_id=pipeline_id, current_node_id=start_node_id, priority=1) + result = api.reserve_rollback(pipeline_id, start_node_id, target_node_id) + self.assertFalse(result.result) + message = "rollback failed: pipeline token not exist, pipeline_id={}".format(pipeline_id) + self.assertEqual(str(result.exc), message) + + RollbackToken.objects.create( + root_pipeline_id=pipeline_id, token=json.dumps({target_node_id: "xxx", start_node_id: "xxx"}) + ) + + result = api.rollback(pipeline_id, start_node_id, target_node_id, mode="ANY", skip_check_token=True) + self.assertTrue(result.result) + + def test_get_allowed_rollback_node_id_list_with_skip_check_token(self): + pipeline_id = unique_id("n") + State.objects.create( + node_id=pipeline_id, root_id=pipeline_id, parent_id=pipeline_id, name=states.RUNNING, version=unique_id("v") + ) + + start_node_id = unique_id("n") + State.objects.create( + node_id=start_node_id, + root_id=pipeline_id, + parent_id=pipeline_id, + name=states.FINISHED, + version=unique_id("v"), + ) + + target_node_id = unique_id("n") + State.objects.create( + node_id=target_node_id, + root_id=pipeline_id, + parent_id=pipeline_id, + name=states.FINISHED, + version=unique_id("v"), + ) + + target_node_detail = { + "id": target_node_id, + "type": PE.ServiceActivity, + "targets": {target_node_id: start_node_id}, + "root_pipeline_id": pipeline_id, + "parent_pipeline_id": pipeline_id, + "can_skip": True, + "code": "bk_display", + "version": "v1.0", + "error_ignorable": True, + "can_retry": True, + } + + start_node_detail = { + "id": start_node_id, + "type": PE.ServiceActivity, + "targets": {}, + "root_pipeline_id": pipeline_id, + "parent_pipeline_id": pipeline_id, + "can_skip": True, + "code": "bk_display", + "version": "v1.0", + "error_ignorable": True, + "can_retry": True, + } + + Node.objects.create(node_id=target_node_id, detail=json.dumps(target_node_detail)) + Node.objects.create(node_id=start_node_id, detail=json.dumps(start_node_detail)) + + result = api.get_allowed_rollback_node_id_list(pipeline_id, start_node_id, "ANY", skip_check_token=True) + self.assertTrue(result.result) + self.assertListEqual(list({target_node_id}), list(set(result.data))) + + RollbackToken.objects.create( + root_pipeline_id=pipeline_id, token=json.dumps({target_node_id: "xsx", start_node_id: "xxx"}) + ) + + result = api.get_allowed_rollback_node_id_list(pipeline_id, start_node_id, "ANY") + self.assertTrue(result.result) + self.assertListEqual([], result.data)