Skip to content

Commit

Permalink
feature: 任意模式回滚下允许跳过token前置检查
Browse files Browse the repository at this point in the history
  • Loading branch information
hanshuaikang committed Jan 2, 2024
1 parent b40a935 commit ec5380e
Show file tree
Hide file tree
Showing 6 changed files with 240 additions and 33 deletions.
52 changes: 46 additions & 6 deletions runtime/bamboo-pipeline/pipeline/contrib/rollback/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,33 +17,73 @@

@ensure_return_pipeline_contrib_api_result
def rollback(
root_pipeline_id: str, start_node_id: str, target_node_id: str, skip_rollback_nodes: list = None, mode: str = TOKEN
root_pipeline_id: str,
start_node_id: str,
target_node_id: str,
skip_rollback_nodes: list = None,
mode: str = TOKEN,
**options
):
"""
:param root_pipeline_id: pipeline id
:param start_node_id: 开始的 id
:param target_node_id: 开始的 id
:param skip_rollback_nodes: 需要跳过回滚的节点信息, 仅在TOKEN模式下有效
:param skip_check_token: 是否跳过检查,仅在ANY模式下有效
:param mode 回滚模式
:return: True or False
"""
RollbackDispatcher(root_pipeline_id, mode).rollback(start_node_id, target_node_id)
RollbackDispatcher(root_pipeline_id, mode).rollback(
start_node_id, target_node_id, skip_rollback_nodes=skip_rollback_nodes, **options
)


@ensure_return_pipeline_contrib_api_result
def reserve_rollback(root_pipeline_id: str, start_node_id: str, target_node_id: str, mode: str = TOKEN):
RollbackDispatcher(root_pipeline_id, mode).reserve_rollback(start_node_id, target_node_id)
def reserve_rollback(root_pipeline_id: str, start_node_id: str, target_node_id: str, mode: str = TOKEN, **options):
"""
预约一次回滚任务
:param root_pipeline_id: pipeline id
:param start_node_id: 回滚的起始id
:param target_node_id: 回滚的目标节点id
:param mode: 模式
:param skip_check_token: 是否跳过检查,仅在ANY模式下有效
:return: True or False
"""
RollbackDispatcher(root_pipeline_id, mode).reserve_rollback(start_node_id, target_node_id, **options)


@ensure_return_pipeline_contrib_api_result
def cancel_reserved_rollback(root_pipeline_id: str, start_node_id: str, target_node_id: str, mode: str = TOKEN):
"""
取消预约的回滚任务
:param root_pipeline_id: pipeline id
:param start_node_id: 回滚的起始id
:param target_node_id: 回滚的目标节点id
:param mode: 模式
:return: True or False
"""
RollbackDispatcher(root_pipeline_id, mode).cancel_reserved_rollback(start_node_id, target_node_id)


@ensure_return_pipeline_contrib_api_result
def retry_rollback_failed_node(root_pipeline_id: str, node_id: str, retry_data: dict = None, mode: str = TOKEN):
"""
重试回滚, 仅支持token模式下的回滚重试
:param root_pipeline_id: pipeline id
:param node_id: 要重试的节点id
:param retry_data: 重试的数据
:param mode: 回滚模式
"""
RollbackDispatcher(root_pipeline_id, mode).retry_rollback_failed_node(node_id, retry_data)


@ensure_return_pipeline_contrib_api_result
def get_allowed_rollback_node_id_list(root_pipeline_id: str, start_node_id: str, mode: str = TOKEN):
return RollbackDispatcher(root_pipeline_id, mode).get_allowed_rollback_node_id_list(start_node_id)
def get_allowed_rollback_node_id_list(root_pipeline_id: str, start_node_id: str, mode: str = TOKEN, **options):
"""
获取允许回滚的节点范围
:param root_pipeline_id: pipeline id
:param start_node_id: 回滚的开始位置
:param mode: 回滚的模式
:param skip_check_token: 是否跳过检查,仅在ANY模式下有效
"""
return RollbackDispatcher(root_pipeline_id, mode).get_allowed_rollback_node_id_list(start_node_id, **options)
63 changes: 38 additions & 25 deletions runtime/bamboo-pipeline/pipeline/contrib/rollback/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ def __init__(self, root_pipeline_id):
# 检查pipeline 回滚的合法性
RollbackValidator.validate_pipeline(root_pipeline_id)

def get_allowed_rollback_node_id_list(self, start_node_id):
def get_allowed_rollback_node_id_list(self, start_node_id, **options):
"""
获取允许回滚的节点范围
规则:token 一致的节点允许回滚
Expand All @@ -151,7 +151,7 @@ def get_allowed_rollback_node_id_list(self, start_node_id):
raise RollBackException(
"rollback failed: pipeline token not exist, pipeline_id={}".format(self.root_pipeline_id)
)
node_map = self._get_allowed_rollback_node_map()
node_map = self.get_allowed_rollback_node_map()
service_activity_node_list = [
node_id for node_id, node_detail in node_map.items() if node_detail["type"] == PE.ServiceActivity
]
Expand All @@ -168,7 +168,7 @@ def get_allowed_rollback_node_id_list(self, start_node_id):

return nodes

def _get_allowed_rollback_node_map(self):
def get_allowed_rollback_node_map(self):
# 不需要遍历整颗树,获取到现在已经执行成功和失败节点的所有列表
finished_node_id_list = (
State.objects.filter(root_id=self.root_pipeline_id, name__in=[states.FINISHED, states.FAILED])
Expand All @@ -179,7 +179,7 @@ def _get_allowed_rollback_node_map(self):
# 获取node_id 到 node_detail的映射
return {n.node_id: json.loads(n.detail) for n in node_detail_list}

def _reserve(self, start_node_id, target_node_id, reserve_rollback=True):
def _reserve(self, start_node_id, target_node_id, reserve_rollback=True, **options):
# 节点预约 需要在 Node 里面 插入 reserve_rollback = True, 为 True的节点执行完将暂停
RollbackValidator.validate_start_node_id(self.root_pipeline_id, start_node_id)
RollbackValidator.validate_node(target_node_id)
Expand All @@ -205,16 +205,13 @@ def _reserve(self, start_node_id, target_node_id, reserve_rollback=True):
if reserve_rollback:
# 一个流程只能同时拥有一个预约任务
if RollbackPlan.objects.filter(root_pipeline_id=self.root_pipeline_id, is_expired=False).exists():
raise RollBackException(
"reserve rollback failed, the rollbackPlan, current state={}, node_id={}".format(
state.name, start_node_id
)
)
raise RollBackException("reserve rollback failed, there exists another unfinished rollback plan")
RollbackPlan.objects.create(
root_pipeline_id=self.root_pipeline_id,
start_node_id=start_node_id,
target_node_id=target_node_id,
mode=self.mode,
options=options,
)
else:
# 取消回滚,删除所有的任务
Expand All @@ -227,37 +224,53 @@ def _reserve(self, start_node_id, target_node_id, reserve_rollback=True):
node.detail = json.dumps(node_detail)
node.save()

def reserve_rollback(self, start_node_id, target_node_id):
def reserve_rollback(self, start_node_id, target_node_id, **options):
"""
预约回滚
"""
RollbackValidator.validate_token(self.root_pipeline_id, start_node_id, target_node_id)
self._reserve(start_node_id, target_node_id)
self._reserve(start_node_id, target_node_id, **options)

def cancel_reserved_rollback(self, start_node_id, target_node_id):
"""
取消预约回滚
"""
RollbackValidator.validate_token(self.root_pipeline_id, start_node_id, target_node_id)
self._reserve(start_node_id, target_node_id, reserve_rollback=False)


class AnyRollbackHandler(BaseRollbackHandler):
mode = ANY

def get_allowed_rollback_node_id_list(self, start_node_id, **options):
# 如果开启了token跳过检查这个选项,那么将返回所有运行过的节点作为回滚范围
if options.get("skip_check_token", False):
node_map = self.get_allowed_rollback_node_map()
node_map.pop(start_node_id, Node)
return list(node_map.keys())
return super(AnyRollbackHandler, self).get_allowed_rollback_node_id_list(start_node_id, **options)

def retry_rollback_failed_node(self, node_id, retry_data):
""" """
raise RollBackException("rollback failed: when mode is any, not support retry")

def rollback(self, start_node_id, target_node_id, skip_rollback_nodes=None):
def reserve_rollback(self, start_node_id, target_node_id, **options):
"""
预约回滚
"""
if not options.get("skip_check_token", False):
RollbackValidator.validate_token(self.root_pipeline_id, start_node_id, target_node_id)
self._reserve(start_node_id, target_node_id, **options)

def rollback(self, start_node_id, target_node_id, skip_rollback_nodes=None, **options):
RollbackValidator.validate_start_node_id(self.root_pipeline_id, start_node_id)
RollbackValidator.validate_node(start_node_id, allow_failed=True)
RollbackValidator.validate_node(target_node_id)
RollbackValidator.validate_token(self.root_pipeline_id, start_node_id, target_node_id)
# 相同token回滚时,不允许同一token路径上有正在运行的节点
RollbackValidator.validate_node_state(self.root_pipeline_id, start_node_id)
if not options.get("skip_check_token", False):
RollbackValidator.validate_token(self.root_pipeline_id, start_node_id, target_node_id)
# 相同token回滚时,不允许同一token路径上有正在运行的节点
RollbackValidator.validate_node_state(self.root_pipeline_id, start_node_id)

node_map = self._get_allowed_rollback_node_map()
node_map = self.get_allowed_rollback_node_map()
rollback_graph = RollbackGraphHandler(node_map=node_map, start_id=start_node_id, target_id=target_node_id)

graph, other_nodes = rollback_graph.build_rollback_graph()
Expand Down Expand Up @@ -337,7 +350,7 @@ def _get_failed_skip_node_id_list(self, node_id_list):
).values_list("node_id", flat=True)
return failed_skip_node_id_list

def rollback(self, start_node_id, target_node_id, skip_rollback_nodes=None):
def rollback(self, start_node_id, target_node_id, skip_rollback_nodes=None, **options):

if skip_rollback_nodes is None:
skip_rollback_nodes = []
Expand All @@ -353,7 +366,7 @@ def rollback(self, start_node_id, target_node_id, skip_rollback_nodes=None):
if self._node_state_is_failed(start_node_id):
skip_rollback_nodes.append(start_node_id)

node_map = self._get_allowed_rollback_node_map()
node_map = self.get_allowed_rollback_node_map()
rollback_graph = RollbackGraphHandler(node_map=node_map, start_id=start_node_id, target_id=target_node_id)

runtime = BambooDjangoRuntime()
Expand Down Expand Up @@ -400,17 +413,17 @@ def __init__(self, root_pipeline_id, mode):
else:
raise RollBackException("rollback failed: not support this mode, please check")

def rollback(self, start_node_id: str, target_node_id: str, skip_rollback_nodes: list = None):
self.handler.rollback(start_node_id, target_node_id, skip_rollback_nodes)
def rollback(self, start_node_id: str, target_node_id: str, skip_rollback_nodes: list = None, **options):
self.handler.rollback(start_node_id, target_node_id, skip_rollback_nodes, **options)

def reserve_rollback(self, start_node_id: str, target_node_id: str):
self.handler.reserve_rollback(start_node_id, target_node_id)
def reserve_rollback(self, start_node_id: str, target_node_id: str, **options):
self.handler.reserve_rollback(start_node_id, target_node_id, **options)

def retry_rollback_failed_node(self, node_id: str, retry_data: dict = None):
self.handler.retry_rollback_failed_node(node_id, retry_data)

def cancel_reserved_rollback(self, start_node_id: str, target_node_id: str):
self.handler.cancel_reserved_rollback(start_node_id, target_node_id)

def get_allowed_rollback_node_id_list(self, start_node_id: str):
return self.handler.get_allowed_rollback_node_id_list(start_node_id)
def get_allowed_rollback_node_id_list(self, start_node_id: str, **options):
return self.handler.get_allowed_rollback_node_id_list(start_node_id, **options)
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# Generated by Django 3.2.23 on 2023-12-29 09:11

import pipeline.contrib.fields
from django.db import migrations


class Migration(migrations.Migration):

dependencies = [
("rollback", "0002_auto_20231020_1234"),
]

operations = [
migrations.AddField(
model_name="rollbackplan",
name="options",
field=pipeline.contrib.fields.SerializerField(default={}, verbose_name="rollback options"),
),
]
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,5 @@ class RollbackPlan(models.Model):
start_node_id = models.CharField(verbose_name="start node id", max_length=64, db_index=True)
target_node_id = models.CharField(verbose_name="target_node_id", max_length=64, db_index=True)
mode = models.CharField(verbose_name="rollback mode", max_length=32, default=TOKEN)
options = SerializerField(verbose_name="rollback options", default={})
is_expired = models.BooleanField(verbose_name="is expired", default=False)
4 changes: 2 additions & 2 deletions runtime/bamboo-pipeline/pipeline/eri/imp/rollback.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ def set_pipeline_token(self, pipeline_tree: dict):

def set_node_snapshot(self, root_pipeline_id, node_id, code, version, context_values, inputs, outputs):
"""
创建一分节点快照
创建一份节点快照
"""
try:
RollbackNodeSnapshot = apps.get_model("rollback", "RollbackNodeSnapshot")
Expand Down Expand Up @@ -74,7 +74,7 @@ def start_rollback(self, root_pipeline_id, node_id):
root_pipeline_id=root_pipeline_id, start_node_id=node_id, is_expired=False
)
handler = RollbackDispatcher(root_pipeline_id=root_pipeline_id, mode=rollback_plan.mode)
handler.rollback(rollback_plan.start_node_id, rollback_plan.target_node_id)
handler.rollback(rollback_plan.start_node_id, rollback_plan.target_node_id, **rollback_plan.options)
rollback_plan.is_expired = True
rollback_plan.save(update_fields=["is_expired"])
except Exception as e:
Expand Down
Loading

0 comments on commit ec5380e

Please sign in to comment.