From 345bd9b95acb48b93fb277cb67c9e35c031d9cc6 Mon Sep 17 00:00:00 2001 From: yanta Date: Wed, 30 Oct 2024 10:02:28 +0800 Subject: [PATCH 1/9] =?UTF-8?q?=E6=B7=BB=E5=8A=A0=E5=BD=92=E6=A1=A3?= =?UTF-8?q?=E4=BB=BB=E5=8A=A1=E9=80=9A=E7=9F=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- sql/archiver.py | 12 +++++++++--- sql/notify.py | 50 ++++++++++++++++++++++++++++++++++++++++++++++--- 2 files changed, 56 insertions(+), 6 deletions(-) diff --git a/sql/archiver.py b/sql/archiver.py index f191c1fd9a..6a49093351 100644 --- a/sql/archiver.py +++ b/sql/archiver.py @@ -26,7 +26,7 @@ from common.utils.extend_json_encoder import ExtendJSONEncoder from common.utils.timer import FuncTimer from sql.engines import get_engine -from sql.notify import notify_for_audit +from sql.notify import notify_for_audit,notify_for_archive from sql.plugins.pt_archiver import PtArchiver from sql.utils.resource_group import user_instances, user_groups from sql.models import ArchiveConfig, ArchiveLog, Instance, ResourceGroup @@ -312,6 +312,7 @@ def add_archive_task(archive_ids=None): async_task( "sql.archiver.archive", archive_id, + hook=notify_for_archive, group=f'archive-{time.strftime("%Y-%m-%d %H:%M:%S ")}', timeout=-1, task_name=f"archive-{archive_id}", @@ -471,8 +472,12 @@ def archive(archive_id): start_time=t.start, end_time=t.end, ) - if not success: - raise Exception(f"{error_info}\n{statistics}") + try: + if not success: + raise Exception(f"{error_info}\n{statistics}") + return src_db_name,src_table_name + except Exception as e: + return src_db_name, src_table_name, error_info @permission_required("sql.menu_archive", raise_exception=True) @@ -531,6 +536,7 @@ def archive_once(request): async_task( "sql.archiver.archive", archive_id, + hook=notify_for_archive, timeout=-1, task_name=f"archive-{archive_id}", ) diff --git a/sql/notify.py b/sql/notify.py index 4ab3ea98bc..54c9147855 100755 --- a/sql/notify.py +++ b/sql/notify.py @@ -42,6 +42,7 @@ class EventType(Enum): EXECUTE = "execute" AUDIT = "audit" M2SQL = "m2sql" + ARCHIVE = "archive" @dataclass @@ -51,10 +52,17 @@ class My2SqlResult: file_path: str = "" error: str = "" +@dataclass +class ArchiveResult: + archive_id: str + success: bool + src_db_name: str + src_table_name: str + error: str = "" @dataclass class Notifier: - workflow: Union[SqlWorkflow, ArchiveConfig, QueryPrivilegesApply, My2SqlResult] = ( + workflow: Union[SqlWorkflow, ArchiveConfig, QueryPrivilegesApply, My2SqlResult, ArchiveResult] = ( None ) sys_config: SysConfig = None @@ -70,7 +78,7 @@ def __post_init__(self): raise ValueError("需要提供 WorkflowAudit 或 workflow") if not self.workflow: self.workflow = self.audit.get_workflow() - if not self.audit and not isinstance(self.workflow, My2SqlResult): + if not self.audit and not isinstance(self.workflow, My2SqlResult) and not isinstance(self.workflow, ArchiveResult): self.audit = self.workflow.get_audit() # 防止 get_auditor 显式的传了个 None if not self.sys_config: @@ -352,6 +360,20 @@ def render_m2sql(self): ) ] + def render_archive(self): + if self.workflow.error: + title = f"[Archery 通知]archive归档任务%s.%s归档失败,报错信息为:%s" % (self.workflow.src_db_name, self.workflow.src_table_name,self.workflow.error) + content = f"请登录archery查看任务归档详细日志信息" + else: + title = f"[Archery 通知archive归档任务%s.%s归档成功" % (self.workflow.src_db_name, self.workflow.src_table_name) + content = f"请登录archery查看任务归档详细日志信息" + self.messages = [ + LegacyMessage( + msg_title=title, + msg_content=content, + ) + ] + def render(self): """渲染消息, 存储到 self.messages""" if self.event_type == EventType.EXECUTE: @@ -360,6 +382,8 @@ def render(self): self.render_audit() if self.event_type == EventType.M2SQL: self.render_m2sql() + if self.event_type == EventType.ARCHIVE: + self.render_archive() class DingdingWebhookNotifier(LegacyRender): @@ -477,7 +501,7 @@ def send(self): def auto_notify( sys_config: SysConfig, workflow: Union[ - SqlWorkflow, ArchiveConfig, QueryPrivilegesApply, My2SqlResult + SqlWorkflow, ArchiveConfig, QueryPrivilegesApply, My2SqlResult, ArchiveResult ] = None, audit: WorkflowAudit = None, audit_detail: WorkflowAuditDetail = None, @@ -551,3 +575,23 @@ def notify_for_my2sql(task): # 发送 sys_config = SysConfig() auto_notify(workflow=result, sys_config=sys_config, event_type=EventType.M2SQL) + +def notify_for_archive(task): + """ + archive执行结束的通知 + :param task: + :return: + """ + if task.success: + result = My2SqlResult( + success=True, src_db_name=task.result[0], src_table_name=task.result[1],error=task.result[2] + ) + else: + result = My2SqlResult( + success=False, error=task.result + ) + # 发送 + sys_config = SysConfig() + auto_notify(workflow=result, sys_config=sys_config, event_type=EventType.ARCHIVE) + + From d1853b2417a1675b2062a9df8a9e9389700e41c1 Mon Sep 17 00:00:00 2001 From: yanta Date: Wed, 30 Oct 2024 14:33:41 +0800 Subject: [PATCH 2/9] =?UTF-8?q?=E9=80=9A=E7=94=A8Webhook=E5=9C=B0=E5=9D=80?= =?UTF-8?q?=E6=B6=88=E6=81=AF=E5=A2=9E=E5=8A=A0=E5=BD=92=E6=A1=A3=E5=92=8C?= =?UTF-8?q?archive=E9=80=9A=E7=9F=A5=E5=86=85=E5=AE=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- sql/notify.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/sql/notify.py b/sql/notify.py index 54c9147855..4f515afd02 100755 --- a/sql/notify.py +++ b/sql/notify.py @@ -3,7 +3,7 @@ import importlib import logging import re -from dataclasses import dataclass, field +from dataclasses import dataclass, field, asdict from enum import Enum from itertools import chain from typing import Union, List @@ -126,10 +126,12 @@ def render(self): self.request_data["workflow_content"] = QueryPrivilegesApplySerializer( self.workflow ).data + elif isinstance(self.workflow, ArchiveResult) or isinstance(self.workflow, My2SqlResult): + self.request_data["workflow_content"] =asdict(self.workflow) else: raise ValueError(f"workflow type `{type(self.workflow)}` not supported yet") - - self.request_data["audit"] = WorkflowAuditListSerializer(self.audit).data + if self.audit: + self.request_data["audit"] = WorkflowAuditListSerializer(self.audit).data def send(self): url = self.sys_config.get(self.sys_config_key) From 9169291a6baa70238a4bce1b35686fe7d08cda3f Mon Sep 17 00:00:00 2001 From: yanta Date: Wed, 30 Oct 2024 14:43:07 +0800 Subject: [PATCH 3/9] =?UTF-8?q?black=20=E4=B8=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- sql/archiver.py | 4 ++-- sql/notify.py | 51 ++++++++++++++++++++++++++++++++----------------- 2 files changed, 35 insertions(+), 20 deletions(-) diff --git a/sql/archiver.py b/sql/archiver.py index 6a49093351..d63fc17c14 100644 --- a/sql/archiver.py +++ b/sql/archiver.py @@ -26,7 +26,7 @@ from common.utils.extend_json_encoder import ExtendJSONEncoder from common.utils.timer import FuncTimer from sql.engines import get_engine -from sql.notify import notify_for_audit,notify_for_archive +from sql.notify import notify_for_audit, notify_for_archive from sql.plugins.pt_archiver import PtArchiver from sql.utils.resource_group import user_instances, user_groups from sql.models import ArchiveConfig, ArchiveLog, Instance, ResourceGroup @@ -475,7 +475,7 @@ def archive(archive_id): try: if not success: raise Exception(f"{error_info}\n{statistics}") - return src_db_name,src_table_name + return src_db_name, src_table_name except Exception as e: return src_db_name, src_table_name, error_info diff --git a/sql/notify.py b/sql/notify.py index 4f515afd02..69f013d054 100755 --- a/sql/notify.py +++ b/sql/notify.py @@ -52,6 +52,7 @@ class My2SqlResult: file_path: str = "" error: str = "" + @dataclass class ArchiveResult: archive_id: str @@ -60,11 +61,12 @@ class ArchiveResult: src_table_name: str error: str = "" + @dataclass class Notifier: - workflow: Union[SqlWorkflow, ArchiveConfig, QueryPrivilegesApply, My2SqlResult, ArchiveResult] = ( - None - ) + workflow: Union[ + SqlWorkflow, ArchiveConfig, QueryPrivilegesApply, My2SqlResult, ArchiveResult + ] = None sys_config: SysConfig = None # init false, class property, 不是 instance property name: str = field(init=False, default="base") @@ -78,7 +80,11 @@ def __post_init__(self): raise ValueError("需要提供 WorkflowAudit 或 workflow") if not self.workflow: self.workflow = self.audit.get_workflow() - if not self.audit and not isinstance(self.workflow, My2SqlResult) and not isinstance(self.workflow, ArchiveResult): + if ( + not self.audit + and not isinstance(self.workflow, My2SqlResult) + and not isinstance(self.workflow, ArchiveResult) + ): self.audit = self.workflow.get_audit() # 防止 get_auditor 显式的传了个 None if not self.sys_config: @@ -126,8 +132,10 @@ def render(self): self.request_data["workflow_content"] = QueryPrivilegesApplySerializer( self.workflow ).data - elif isinstance(self.workflow, ArchiveResult) or isinstance(self.workflow, My2SqlResult): - self.request_data["workflow_content"] =asdict(self.workflow) + elif isinstance(self.workflow, ArchiveResult) or isinstance( + self.workflow, My2SqlResult + ): + self.request_data["workflow_content"] = asdict(self.workflow) else: raise ValueError(f"workflow type `{type(self.workflow)}` not supported yet") if self.audit: @@ -183,7 +191,9 @@ def render_audit(self): auth_ends_at = datetime.datetime.strftime( workflow_detail.valid_date, "%Y-%m-%d %H:%M:%S" ) - workflow_content += f"""授权截止时间:{auth_ends_at}\n结果集:{workflow_detail.limit_num}\n""" + workflow_content += ( + f"""授权截止时间:{auth_ends_at}\n结果集:{workflow_detail.limit_num}\n""" + ) elif workflow_type == WorkflowType.SQL_REVIEW: workflow_type_display = WorkflowType.SQL_REVIEW.label workflow_detail = SqlWorkflow.objects.get(pk=workflow_id) @@ -264,9 +274,7 @@ def render_audit(self): re.sub("[\r\n\f]{2,}", "\n", self.audit_detail.remark), ) elif status == WorkflowStatus.ABORTED: # 审核取消,通知所有审核人 - msg_title = "[{}]提交人主动终止工单#{}".format( - workflow_type_display, audit_id - ) + msg_title = "[{}]提交人主动终止工单#{}".format(workflow_type_display, audit_id) # 接收人,发送给该资源组内对应权限组所有的用户 auth_group_names = [ Group.objects.get(id=auth_group_id).name @@ -364,10 +372,17 @@ def render_m2sql(self): def render_archive(self): if self.workflow.error: - title = f"[Archery 通知]archive归档任务%s.%s归档失败,报错信息为:%s" % (self.workflow.src_db_name, self.workflow.src_table_name,self.workflow.error) + title = f"[Archery 通知]archive归档任务%s.%s归档失败,报错信息为:%s" % ( + self.workflow.src_db_name, + self.workflow.src_table_name, + self.workflow.error, + ) content = f"请登录archery查看任务归档详细日志信息" else: - title = f"[Archery 通知archive归档任务%s.%s归档成功" % (self.workflow.src_db_name, self.workflow.src_table_name) + title = f"[Archery 通知archive归档任务%s.%s归档成功" % ( + self.workflow.src_db_name, + self.workflow.src_table_name, + ) content = f"请登录archery查看任务归档详细日志信息" self.messages = [ LegacyMessage( @@ -578,6 +593,7 @@ def notify_for_my2sql(task): sys_config = SysConfig() auto_notify(workflow=result, sys_config=sys_config, event_type=EventType.M2SQL) + def notify_for_archive(task): """ archive执行结束的通知 @@ -586,14 +602,13 @@ def notify_for_archive(task): """ if task.success: result = My2SqlResult( - success=True, src_db_name=task.result[0], src_table_name=task.result[1],error=task.result[2] + success=True, + src_db_name=task.result[0], + src_table_name=task.result[1], + error=task.result[2], ) else: - result = My2SqlResult( - success=False, error=task.result - ) + result = My2SqlResult(success=False, error=task.result) # 发送 sys_config = SysConfig() auto_notify(workflow=result, sys_config=sys_config, event_type=EventType.ARCHIVE) - - From 86302bd4f96e859d1785511cb02623f5ad90dfaf Mon Sep 17 00:00:00 2001 From: yanta Date: Wed, 30 Oct 2024 16:56:36 +0800 Subject: [PATCH 4/9] =?UTF-8?q?=E4=BF=AE=E5=A4=8Dlint=E6=8A=A5=E9=94=99?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- sql/notify.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/sql/notify.py b/sql/notify.py index 69f013d054..c1fd7ae1c0 100755 --- a/sql/notify.py +++ b/sql/notify.py @@ -191,9 +191,7 @@ def render_audit(self): auth_ends_at = datetime.datetime.strftime( workflow_detail.valid_date, "%Y-%m-%d %H:%M:%S" ) - workflow_content += ( - f"""授权截止时间:{auth_ends_at}\n结果集:{workflow_detail.limit_num}\n""" - ) + workflow_content += f"""授权截止时间:{auth_ends_at}\n结果集:{workflow_detail.limit_num}\n""" elif workflow_type == WorkflowType.SQL_REVIEW: workflow_type_display = WorkflowType.SQL_REVIEW.label workflow_detail = SqlWorkflow.objects.get(pk=workflow_id) @@ -274,7 +272,9 @@ def render_audit(self): re.sub("[\r\n\f]{2,}", "\n", self.audit_detail.remark), ) elif status == WorkflowStatus.ABORTED: # 审核取消,通知所有审核人 - msg_title = "[{}]提交人主动终止工单#{}".format(workflow_type_display, audit_id) + msg_title = "[{}]提交人主动终止工单#{}".format( + workflow_type_display, audit_id + ) # 接收人,发送给该资源组内对应权限组所有的用户 auth_group_names = [ Group.objects.get(id=auth_group_id).name From dcb6a420ed94b71e4adf2e9104254941be1df6ad Mon Sep 17 00:00:00 2001 From: yanta Date: Thu, 31 Oct 2024 11:40:58 +0800 Subject: [PATCH 5/9] =?UTF-8?q?my2sql=E5=A4=B1=E8=B4=A5=E5=92=8Carchive?= =?UTF-8?q?=E5=A4=B1=E8=B4=A5=E9=83=BD=E6=8A=9B=E5=87=BA=E9=94=99=E8=AF=AF?= =?UTF-8?q?=E9=80=9A=E7=9F=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- sql/archiver.py | 147 ++++++++++++++++++++++++------------------------ sql/binlog.py | 8 ++- sql/notify.py | 15 ++--- 3 files changed, 86 insertions(+), 84 deletions(-) diff --git a/sql/archiver.py b/sql/archiver.py index d63fc17c14..2444d08327 100644 --- a/sql/archiver.py +++ b/sql/archiver.py @@ -401,83 +401,84 @@ def archive(archive_id): select_cnt = 0 insert_cnt = 0 delete_cnt = 0 - with FuncTimer() as t: - p = pt_archiver.execute_cmd(cmd_args) - stdout = "" - for line in iter(p.stdout.readline, ""): - if re.match(r"^SELECT\s(\d+)$", line, re.I): - select_cnt = re.findall(r"^SELECT\s(\d+)$", line) - elif re.match(r"^INSERT\s(\d+)$", line, re.I): - insert_cnt = re.findall(r"^INSERT\s(\d+)$", line) - elif re.match(r"^DELETE\s(\d+)$", line, re.I): - delete_cnt = re.findall(r"^DELETE\s(\d+)$", line) - stdout += f"{line}\n" - statistics = stdout - # 获取异常信息 - stderr = p.stderr.read() - if stderr: - statistics = stdout + stderr - - # 判断归档结果 - select_cnt = int(select_cnt[0]) if select_cnt else 0 - insert_cnt = int(insert_cnt[0]) if insert_cnt else 0 - delete_cnt = int(delete_cnt[0]) if delete_cnt else 0 - error_info = "" - success = True - if stderr: - error_info = f"命令执行报错:{stderr}" - success = False - if mode == "dest": - # 删除源数据,判断删除数量和写入数量 - if not no_delete and (insert_cnt != delete_cnt): - error_info = f"删除和写入数量不一致:{insert_cnt}!={delete_cnt}" - success = False - elif mode == "file": - # 删除源数据,判断查询数量和删除数量 - if not no_delete and (select_cnt != delete_cnt): - error_info = f"查询和删除数量不一致:{select_cnt}!={delete_cnt}" - success = False - elif mode == "purge": - # 直接删除。判断查询数量和删除数量 - if select_cnt != delete_cnt: - error_info = f"查询和删除数量不一致:{select_cnt}!={delete_cnt}" - success = False - - # 执行信息保存到数据库 - if connection.connection and not connection.is_usable(): - close_old_connections() - # 更新最后归档时间 - ArchiveConfig(id=archive_id, last_archive_time=t.end).save( - update_fields=["last_archive_time"] - ) - # 替换密码信息后保存 - shell_cmd = " ".join(cmd_args) - ArchiveLog.objects.create( - archive=archive_info, - cmd=( - shell_cmd.replace(s_ins.password, "***").replace(d_ins.password, "***") - if mode == "dest" - else shell_cmd.replace(s_ins.password, "***") - ), - condition=condition, - mode=mode, - no_delete=no_delete, - sleep=sleep, - select_cnt=select_cnt, - insert_cnt=insert_cnt, - delete_cnt=delete_cnt, - statistics=statistics, - success=success, - error_info=error_info, - start_time=t.start, - end_time=t.end, - ) try: + with FuncTimer() as t: + p = pt_archiver.execute_cmd(cmd_args) + stdout = "" + for line in iter(p.stdout.readline, ""): + if re.match(r"^SELECT\s(\d+)$", line, re.I): + select_cnt = re.findall(r"^SELECT\s(\d+)$", line) + elif re.match(r"^INSERT\s(\d+)$", line, re.I): + insert_cnt = re.findall(r"^INSERT\s(\d+)$", line) + elif re.match(r"^DELETE\s(\d+)$", line, re.I): + delete_cnt = re.findall(r"^DELETE\s(\d+)$", line) + stdout += f"{line}\n" + statistics = stdout + # 获取异常信息 + stderr = p.stderr.read() + if stderr: + statistics = stdout + stderr + + # 判断归档结果 + select_cnt = int(select_cnt[0]) if select_cnt else 0 + insert_cnt = int(insert_cnt[0]) if insert_cnt else 0 + delete_cnt = int(delete_cnt[0]) if delete_cnt else 0 + error_info = "" + success = True + if stderr: + error_info = f"命令执行报错:{stderr}" + success = False + if mode == "dest": + # 删除源数据,判断删除数量和写入数量 + if not no_delete and (insert_cnt != delete_cnt): + error_info = f"删除和写入数量不一致:{insert_cnt}!={delete_cnt}" + success = False + elif mode == "file": + # 删除源数据,判断查询数量和删除数量 + if not no_delete and (select_cnt != delete_cnt): + error_info = f"查询和删除数量不一致:{select_cnt}!={delete_cnt}" + success = False + elif mode == "purge": + # 直接删除。判断查询数量和删除数量 + if select_cnt != delete_cnt: + error_info = f"查询和删除数量不一致:{select_cnt}!={delete_cnt}" + success = False + + # 执行信息保存到数据库 + if connection.connection and not connection.is_usable(): + close_old_connections() + # 更新最后归档时间 + ArchiveConfig(id=archive_id, last_archive_time=t.end).save( + update_fields=["last_archive_time"] + ) + # 替换密码信息后保存 + shell_cmd = " ".join(cmd_args) + ArchiveLog.objects.create( + archive=archive_info, + cmd=( + shell_cmd.replace(s_ins.password, "***").replace(d_ins.password, "***") + if mode == "dest" + else shell_cmd.replace(s_ins.password, "***") + ), + condition=condition, + mode=mode, + no_delete=no_delete, + sleep=sleep, + select_cnt=select_cnt, + insert_cnt=insert_cnt, + delete_cnt=delete_cnt, + statistics=statistics, + success=success, + error_info=error_info, + start_time=t.start, + end_time=t.end, + ) if not success: raise Exception(f"{error_info}\n{statistics}") - return src_db_name, src_table_name + return src_db_name, src_table_name,error_info except Exception as e: - return src_db_name, src_table_name, error_info + return src_db_name, src_table_name, str(e) + @permission_required("sql.menu_archive", raise_exception=True) diff --git a/sql/binlog.py b/sql/binlog.py index 812f00b905..6d32b7ee7b 100644 --- a/sql/binlog.py +++ b/sql/binlog.py @@ -248,5 +248,9 @@ def my2sql_file(args, user): args["output-dir"] = path cmd_args = my2sql.generate_args2cmd(args) # 使用output-dir参数执行命令保存sql - my2sql.execute_cmd(cmd_args) - return user, path + try: + my2sql.execute_cmd(cmd_args) + return user, path + except Exception as e: + # 捕获所有异常并返回错误信息 + return user, str(e) \ No newline at end of file diff --git a/sql/notify.py b/sql/notify.py index c1fd7ae1c0..bdf6e7d310 100755 --- a/sql/notify.py +++ b/sql/notify.py @@ -600,15 +600,12 @@ def notify_for_archive(task): :param task: :return: """ - if task.success: - result = My2SqlResult( - success=True, - src_db_name=task.result[0], - src_table_name=task.result[1], - error=task.result[2], - ) - else: - result = My2SqlResult(success=False, error=task.result) + result = ArchiveResult( + success=task.success, + src_db_name=task.result[0], + src_table_name=task.result[1], + error=task.result[2], + ) # 发送 sys_config = SysConfig() auto_notify(workflow=result, sys_config=sys_config, event_type=EventType.ARCHIVE) From ff04426f57a4a6367fe3cdf6a71cd2cbe62ae149 Mon Sep 17 00:00:00 2001 From: yanta Date: Thu, 31 Oct 2024 14:46:20 +0800 Subject: [PATCH 6/9] lint --- sql/archiver.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sql/archiver.py b/sql/archiver.py index 2444d08327..ad4f8a0965 100644 --- a/sql/archiver.py +++ b/sql/archiver.py @@ -475,12 +475,11 @@ def archive(archive_id): ) if not success: raise Exception(f"{error_info}\n{statistics}") - return src_db_name, src_table_name,error_info + return src_db_name, src_table_name, error_info except Exception as e: return src_db_name, src_table_name, str(e) - @permission_required("sql.menu_archive", raise_exception=True) def archive_log(request): """获取归档日志列表""" From 5fbad71b5e3875502b60c171ce2fd28e1b54af4e Mon Sep 17 00:00:00 2001 From: yanta Date: Thu, 31 Oct 2024 14:47:55 +0800 Subject: [PATCH 7/9] lint --- sql/binlog.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/binlog.py b/sql/binlog.py index 6d32b7ee7b..2432f9a032 100644 --- a/sql/binlog.py +++ b/sql/binlog.py @@ -253,4 +253,4 @@ def my2sql_file(args, user): return user, path except Exception as e: # 捕获所有异常并返回错误信息 - return user, str(e) \ No newline at end of file + return user, str(e) From aedc934373be09ee4088acf7015f79c73c696495 Mon Sep 17 00:00:00 2001 From: yanta Date: Thu, 31 Oct 2024 15:59:51 +0800 Subject: [PATCH 8/9] =?UTF-8?q?My2sql=E6=8A=A5=E9=94=99=E6=97=B6=E6=8A=9B?= =?UTF-8?q?=E5=87=BA=E9=94=99=E8=AF=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- sql/binlog.py | 6 ++++-- sql/notify.py | 25 ++++++++++++------------- 2 files changed, 16 insertions(+), 15 deletions(-) diff --git a/sql/binlog.py b/sql/binlog.py index 2432f9a032..83abdf1c78 100644 --- a/sql/binlog.py +++ b/sql/binlog.py @@ -248,9 +248,11 @@ def my2sql_file(args, user): args["output-dir"] = path cmd_args = my2sql.generate_args2cmd(args) # 使用output-dir参数执行命令保存sql + error_info = '' try: + # 假设 my2sql.execute_cmd 返回一个包含 user 和 path 的元组 my2sql.execute_cmd(cmd_args) - return user, path + return user, path, error_info except Exception as e: # 捕获所有异常并返回错误信息 - return user, str(e) + return user, path, str(e) diff --git a/sql/notify.py b/sql/notify.py index bdf6e7d310..2d713f3045 100755 --- a/sql/notify.py +++ b/sql/notify.py @@ -356,12 +356,12 @@ def render_execute(self): def render_m2sql(self): submitter_in_db = Users.objects.get(username=self.workflow.submitter) - if self.workflow.success: + if self.workflow.error: + title = "[Archery 通知]My2SQL执行失败" + content = f"解析SQL文件报错,{self.workflow.error}" + else: title = "[Archery 通知]My2SQL执行结束" content = f"解析的SQL文件在{self.workflow.file_path}目录下,请前往查看" - else: - title = "[Archery 通知]My2SQL执行失败" - content = self.workflow.error self.messages = [ LegacyMessage( msg_to=[submitter_in_db], @@ -372,18 +372,17 @@ def render_m2sql(self): def render_archive(self): if self.workflow.error: - title = f"[Archery 通知]archive归档任务%s.%s归档失败,报错信息为:%s" % ( - self.workflow.src_db_name, - self.workflow.src_table_name, - self.workflow.error, + title = ( + f"[Archery 通知]archive归档任务{self.workflow.src_db_name}." + f"{self.workflow.src_table_name}归档失败,报错信息为:{self.workflow.error}" ) - content = f"请登录archery查看任务归档详细日志信息" + content = "请登录archery查看任务归档详细日志信息" else: - title = f"[Archery 通知archive归档任务%s.%s归档成功" % ( - self.workflow.src_db_name, - self.workflow.src_table_name, + title = ( + f"[Archery 通知]archive归档任务{self.workflow.src_db_name}." + f"{self.workflow.src_table_name}归档成功" ) - content = f"请登录archery查看任务归档详细日志信息" + content = "请登录archery查看任务归档详细日志信息" self.messages = [ LegacyMessage( msg_title=title, From e5e2c7138fe858643f77b9fcca2fb3cbbb906e6b Mon Sep 17 00:00:00 2001 From: yanta Date: Thu, 31 Oct 2024 16:01:34 +0800 Subject: [PATCH 9/9] =?UTF-8?q?lint=E6=8A=A5=E9=94=99=E4=BF=AE=E5=A4=8D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- sql/binlog.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/binlog.py b/sql/binlog.py index 83abdf1c78..9f2b364a33 100644 --- a/sql/binlog.py +++ b/sql/binlog.py @@ -248,7 +248,7 @@ def my2sql_file(args, user): args["output-dir"] = path cmd_args = my2sql.generate_args2cmd(args) # 使用output-dir参数执行命令保存sql - error_info = '' + error_info = "" try: # 假设 my2sql.execute_cmd 返回一个包含 user 和 path 的元组 my2sql.execute_cmd(cmd_args)