From 9169291a6baa70238a4bce1b35686fe7d08cda3f Mon Sep 17 00:00:00 2001 From: yanta Date: Wed, 30 Oct 2024 14:43:07 +0800 Subject: [PATCH] =?UTF-8?q?black=20=E4=B8=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- sql/archiver.py | 4 ++-- sql/notify.py | 51 ++++++++++++++++++++++++++++++++----------------- 2 files changed, 35 insertions(+), 20 deletions(-) diff --git a/sql/archiver.py b/sql/archiver.py index 6a49093351..d63fc17c14 100644 --- a/sql/archiver.py +++ b/sql/archiver.py @@ -26,7 +26,7 @@ from common.utils.extend_json_encoder import ExtendJSONEncoder from common.utils.timer import FuncTimer from sql.engines import get_engine -from sql.notify import notify_for_audit,notify_for_archive +from sql.notify import notify_for_audit, notify_for_archive from sql.plugins.pt_archiver import PtArchiver from sql.utils.resource_group import user_instances, user_groups from sql.models import ArchiveConfig, ArchiveLog, Instance, ResourceGroup @@ -475,7 +475,7 @@ def archive(archive_id): try: if not success: raise Exception(f"{error_info}\n{statistics}") - return src_db_name,src_table_name + return src_db_name, src_table_name except Exception as e: return src_db_name, src_table_name, error_info diff --git a/sql/notify.py b/sql/notify.py index 4f515afd02..69f013d054 100755 --- a/sql/notify.py +++ b/sql/notify.py @@ -52,6 +52,7 @@ class My2SqlResult: file_path: str = "" error: str = "" + @dataclass class ArchiveResult: archive_id: str @@ -60,11 +61,12 @@ class ArchiveResult: src_table_name: str error: str = "" + @dataclass class Notifier: - workflow: Union[SqlWorkflow, ArchiveConfig, QueryPrivilegesApply, My2SqlResult, ArchiveResult] = ( - None - ) + workflow: Union[ + SqlWorkflow, ArchiveConfig, QueryPrivilegesApply, My2SqlResult, ArchiveResult + ] = None sys_config: SysConfig = None # init false, class property, 不是 instance property name: str = field(init=False, default="base") @@ -78,7 +80,11 @@ def __post_init__(self): raise ValueError("需要提供 WorkflowAudit 或 workflow") if not self.workflow: self.workflow = self.audit.get_workflow() - if not self.audit and not isinstance(self.workflow, My2SqlResult) and not isinstance(self.workflow, ArchiveResult): + if ( + not self.audit + and not isinstance(self.workflow, My2SqlResult) + and not isinstance(self.workflow, ArchiveResult) + ): self.audit = self.workflow.get_audit() # 防止 get_auditor 显式的传了个 None if not self.sys_config: @@ -126,8 +132,10 @@ def render(self): self.request_data["workflow_content"] = QueryPrivilegesApplySerializer( self.workflow ).data - elif isinstance(self.workflow, ArchiveResult) or isinstance(self.workflow, My2SqlResult): - self.request_data["workflow_content"] =asdict(self.workflow) + elif isinstance(self.workflow, ArchiveResult) or isinstance( + self.workflow, My2SqlResult + ): + self.request_data["workflow_content"] = asdict(self.workflow) else: raise ValueError(f"workflow type `{type(self.workflow)}` not supported yet") if self.audit: @@ -183,7 +191,9 @@ def render_audit(self): auth_ends_at = datetime.datetime.strftime( workflow_detail.valid_date, "%Y-%m-%d %H:%M:%S" ) - workflow_content += f"""授权截止时间:{auth_ends_at}\n结果集:{workflow_detail.limit_num}\n""" + workflow_content += ( + f"""授权截止时间:{auth_ends_at}\n结果集:{workflow_detail.limit_num}\n""" + ) elif workflow_type == WorkflowType.SQL_REVIEW: workflow_type_display = WorkflowType.SQL_REVIEW.label workflow_detail = SqlWorkflow.objects.get(pk=workflow_id) @@ -264,9 +274,7 @@ def render_audit(self): re.sub("[\r\n\f]{2,}", "\n", self.audit_detail.remark), ) elif status == WorkflowStatus.ABORTED: # 审核取消,通知所有审核人 - msg_title = "[{}]提交人主动终止工单#{}".format( - workflow_type_display, audit_id - ) + msg_title = "[{}]提交人主动终止工单#{}".format(workflow_type_display, audit_id) # 接收人,发送给该资源组内对应权限组所有的用户 auth_group_names = [ Group.objects.get(id=auth_group_id).name @@ -364,10 +372,17 @@ def render_m2sql(self): def render_archive(self): if self.workflow.error: - title = f"[Archery 通知]archive归档任务%s.%s归档失败,报错信息为:%s" % (self.workflow.src_db_name, self.workflow.src_table_name,self.workflow.error) + title = f"[Archery 通知]archive归档任务%s.%s归档失败,报错信息为:%s" % ( + self.workflow.src_db_name, + self.workflow.src_table_name, + self.workflow.error, + ) content = f"请登录archery查看任务归档详细日志信息" else: - title = f"[Archery 通知archive归档任务%s.%s归档成功" % (self.workflow.src_db_name, self.workflow.src_table_name) + title = f"[Archery 通知archive归档任务%s.%s归档成功" % ( + self.workflow.src_db_name, + self.workflow.src_table_name, + ) content = f"请登录archery查看任务归档详细日志信息" self.messages = [ LegacyMessage( @@ -578,6 +593,7 @@ def notify_for_my2sql(task): sys_config = SysConfig() auto_notify(workflow=result, sys_config=sys_config, event_type=EventType.M2SQL) + def notify_for_archive(task): """ archive执行结束的通知 @@ -586,14 +602,13 @@ def notify_for_archive(task): """ if task.success: result = My2SqlResult( - success=True, src_db_name=task.result[0], src_table_name=task.result[1],error=task.result[2] + success=True, + src_db_name=task.result[0], + src_table_name=task.result[1], + error=task.result[2], ) else: - result = My2SqlResult( - success=False, error=task.result - ) + result = My2SqlResult(success=False, error=task.result) # 发送 sys_config = SysConfig() auto_notify(workflow=result, sys_config=sys_config, event_type=EventType.ARCHIVE) - -