Add a notification when an archive task finishes.

archive() and my2sql_file() now return their error text as the last tuple
element instead of raising, and notify_for_archive() is registered as the
async task hook to turn that result into an ArchiveResult notification.

diff --git a/sql/archiver.py b/sql/archiver.py
index f191c1fd9a..ad4f8a0965 100644
--- a/sql/archiver.py
+++ b/sql/archiver.py
@@ -26,7 +26,7 @@
 from common.utils.extend_json_encoder import ExtendJSONEncoder
 from common.utils.timer import FuncTimer
 from sql.engines import get_engine
-from sql.notify import notify_for_audit
+from sql.notify import notify_for_audit, notify_for_archive
 from sql.plugins.pt_archiver import PtArchiver
 from sql.utils.resource_group import user_instances, user_groups
 from sql.models import ArchiveConfig, ArchiveLog, Instance, ResourceGroup
@@ -312,6 +312,7 @@ def add_archive_task(archive_ids=None):
         async_task(
             "sql.archiver.archive",
             archive_id,
+            hook=notify_for_archive,
             group=f'archive-{time.strftime("%Y-%m-%d %H:%M:%S ")}',
             timeout=-1,
             task_name=f"archive-{archive_id}",
@@ -400,79 +401,83 @@ def archive(archive_id):
     select_cnt = 0
     insert_cnt = 0
     delete_cnt = 0
-    with FuncTimer() as t:
-        p = pt_archiver.execute_cmd(cmd_args)
-        stdout = ""
-        for line in iter(p.stdout.readline, ""):
-            if re.match(r"^SELECT\s(\d+)$", line, re.I):
-                select_cnt = re.findall(r"^SELECT\s(\d+)$", line)
-            elif re.match(r"^INSERT\s(\d+)$", line, re.I):
-                insert_cnt = re.findall(r"^INSERT\s(\d+)$", line)
-            elif re.match(r"^DELETE\s(\d+)$", line, re.I):
-                delete_cnt = re.findall(r"^DELETE\s(\d+)$", line)
-            stdout += f"{line}\n"
-        statistics = stdout
-        # 获取异常信息
-        stderr = p.stderr.read()
-        if stderr:
-            statistics = stdout + stderr
-
-    # 判断归档结果
-    select_cnt = int(select_cnt[0]) if select_cnt else 0
-    insert_cnt = int(insert_cnt[0]) if insert_cnt else 0
-    delete_cnt = int(delete_cnt[0]) if delete_cnt else 0
-    error_info = ""
-    success = True
-    if stderr:
-        error_info = f"命令执行报错:{stderr}"
-        success = False
-    if mode == "dest":
-        # 删除源数据,判断删除数量和写入数量
-        if not no_delete and (insert_cnt != delete_cnt):
-            error_info = f"删除和写入数量不一致:{insert_cnt}!={delete_cnt}"
-            success = False
-    elif mode == "file":
-        # 删除源数据,判断查询数量和删除数量
-        if not no_delete and (select_cnt != delete_cnt):
-            error_info = f"查询和删除数量不一致:{select_cnt}!={delete_cnt}"
-            success = False
-    elif mode == "purge":
-        # 直接删除。判断查询数量和删除数量
-        if select_cnt != delete_cnt:
-            error_info = f"查询和删除数量不一致:{select_cnt}!={delete_cnt}"
+    try:
+        with FuncTimer() as t:
+            p = pt_archiver.execute_cmd(cmd_args)
+            stdout = ""
+            for line in iter(p.stdout.readline, ""):
+                if re.match(r"^SELECT\s(\d+)$", line, re.I):
+                    select_cnt = re.findall(r"^SELECT\s(\d+)$", line)
+                elif re.match(r"^INSERT\s(\d+)$", line, re.I):
+                    insert_cnt = re.findall(r"^INSERT\s(\d+)$", line)
+                elif re.match(r"^DELETE\s(\d+)$", line, re.I):
+                    delete_cnt = re.findall(r"^DELETE\s(\d+)$", line)
+                stdout += f"{line}\n"
+            statistics = stdout
+            # Collect error output
+            stderr = p.stderr.read()
+            if stderr:
+                statistics = stdout + stderr
+
+        # Evaluate the archive result
+        select_cnt = int(select_cnt[0]) if select_cnt else 0
+        insert_cnt = int(insert_cnt[0]) if insert_cnt else 0
+        delete_cnt = int(delete_cnt[0]) if delete_cnt else 0
+        error_info = ""
+        success = True
+        if stderr:
+            error_info = f"命令执行报错:{stderr}"
             success = False
-
-    # 执行信息保存到数据库
-    if connection.connection and not connection.is_usable():
-        close_old_connections()
-    # 更新最后归档时间
-    ArchiveConfig(id=archive_id, last_archive_time=t.end).save(
-        update_fields=["last_archive_time"]
-    )
-    # 替换密码信息后保存
-    shell_cmd = " ".join(cmd_args)
-    ArchiveLog.objects.create(
-        archive=archive_info,
-        cmd=(
-            shell_cmd.replace(s_ins.password, "***").replace(d_ins.password, "***")
-            if mode == "dest"
-            else shell_cmd.replace(s_ins.password, "***")
-        ),
-        condition=condition,
-        mode=mode,
-        no_delete=no_delete,
-        sleep=sleep,
-        select_cnt=select_cnt,
-        insert_cnt=insert_cnt,
-        delete_cnt=delete_cnt,
-        statistics=statistics,
-        success=success,
-        error_info=error_info,
-        start_time=t.start,
-        end_time=t.end,
-    )
-    if not success:
-        raise Exception(f"{error_info}\n{statistics}")
+        if mode == "dest":
+            # Source rows are deleted: deleted count must equal inserted count
+            if not no_delete and (insert_cnt != delete_cnt):
+                error_info = f"删除和写入数量不一致:{insert_cnt}!={delete_cnt}"
+                success = False
+        elif mode == "file":
+            # Source rows are deleted: selected count must equal deleted count
+            if not no_delete and (select_cnt != delete_cnt):
+                error_info = f"查询和删除数量不一致:{select_cnt}!={delete_cnt}"
+                success = False
+        elif mode == "purge":
+            # Direct purge: selected count must equal deleted count
+            if select_cnt != delete_cnt:
+                error_info = f"查询和删除数量不一致:{select_cnt}!={delete_cnt}"
+                success = False
+
+        # Persist the execution details to the database
+        if connection.connection and not connection.is_usable():
+            close_old_connections()
+        # Update the last archive time
+        ArchiveConfig(id=archive_id, last_archive_time=t.end).save(
+            update_fields=["last_archive_time"]
+        )
+        # Mask passwords before saving the command line
+        shell_cmd = " ".join(cmd_args)
+        ArchiveLog.objects.create(
+            archive=archive_info,
+            cmd=(
+                shell_cmd.replace(s_ins.password, "***").replace(d_ins.password, "***")
+                if mode == "dest"
+                else shell_cmd.replace(s_ins.password, "***")
+            ),
+            condition=condition,
+            mode=mode,
+            no_delete=no_delete,
+            sleep=sleep,
+            select_cnt=select_cnt,
+            insert_cnt=insert_cnt,
+            delete_cnt=delete_cnt,
+            statistics=statistics,
+            success=success,
+            error_info=error_info,
+            start_time=t.start,
+            end_time=t.end,
+        )
+        if not success:
+            raise Exception(f"{error_info}\n{statistics}")
+        return src_db_name, src_table_name, error_info
+    except Exception as e:
+        return src_db_name, src_table_name, str(e)
 
 
 @permission_required("sql.menu_archive", raise_exception=True)
@@ -531,6 +536,7 @@ def archive_once(request):
     async_task(
         "sql.archiver.archive",
         archive_id,
+        hook=notify_for_archive,
         timeout=-1,
         task_name=f"archive-{archive_id}",
     )
diff --git a/sql/binlog.py b/sql/binlog.py
index 812f00b905..9f2b364a33 100644
--- a/sql/binlog.py
+++ b/sql/binlog.py
@@ -248,5 +248,11 @@ def my2sql_file(args, user):
     args["output-dir"] = path
     cmd_args = my2sql.generate_args2cmd(args)
     # 使用output-dir参数执行命令保存sql
-    my2sql.execute_cmd(cmd_args)
-    return user, path
+    error_info = ""
+    try:
+        # Run my2sql; reaching the return means execute_cmd raised nothing
+        my2sql.execute_cmd(cmd_args)
+        return user, path, error_info
+    except Exception as e:
+        # Report any exception as the third element instead of raising
+        return user, path, str(e)
diff --git a/sql/notify.py b/sql/notify.py
index 4ab3ea98bc..2d713f3045 100755
--- a/sql/notify.py
+++ b/sql/notify.py
@@ -3,7 +3,7 @@
 import importlib
 import logging
 import re
-from dataclasses import dataclass, field
+from dataclasses import dataclass, field, asdict
 from enum import Enum
 from itertools import chain
 from typing import Union, List
@@ -42,6 +42,7 @@ class EventType(Enum):
     EXECUTE = "execute"
     AUDIT = "audit"
     M2SQL = "m2sql"
+    ARCHIVE = "archive"
 
 
 @dataclass
@@ -52,11 +53,21 @@ class My2SqlResult:
     error: str = ""
 
 
+@dataclass
+class ArchiveResult:
+    """Result of one archive task, passed to the notifiers as the workflow."""
+    archive_id: str
+    success: bool
+    src_db_name: str
+    src_table_name: str
+    error: str = ""
+
+
 @dataclass
 class Notifier:
-    workflow: Union[SqlWorkflow, ArchiveConfig, QueryPrivilegesApply, My2SqlResult] = (
-        None
-    )
+    workflow: Union[
+        SqlWorkflow, ArchiveConfig, QueryPrivilegesApply, My2SqlResult, ArchiveResult
+    ] = None
     sys_config: SysConfig = None
     # init false, class property, 不是 instance property
     name: str = field(init=False, default="base")
@@ -70,7 +81,11 @@ def __post_init__(self):
             raise ValueError("需要提供 WorkflowAudit 或 workflow")
         if not self.workflow:
             self.workflow = self.audit.get_workflow()
-        if not self.audit and not isinstance(self.workflow, My2SqlResult):
+        if (
+            not self.audit
+            and not isinstance(self.workflow, My2SqlResult)
+            and not isinstance(self.workflow, ArchiveResult)
+        ):
             self.audit = self.workflow.get_audit()
         # 防止 get_auditor 显式的传了个 None
         if not self.sys_config:
@@ -118,10 +133,14 @@ def render(self):
             self.request_data["workflow_content"] = QueryPrivilegesApplySerializer(
                 self.workflow
             ).data
+        elif isinstance(self.workflow, ArchiveResult) or isinstance(
+            self.workflow, My2SqlResult
+        ):
+            self.request_data["workflow_content"] = asdict(self.workflow)
         else:
             raise ValueError(f"workflow type `{type(self.workflow)}` not supported yet")
-
-        self.request_data["audit"] = WorkflowAuditListSerializer(self.audit).data
+        if self.audit:
+            self.request_data["audit"] = WorkflowAuditListSerializer(self.audit).data
 
     def send(self):
         url = self.sys_config.get(self.sys_config_key)
@@ -338,12 +357,12 @@ def render_execute(self):
 
     def render_m2sql(self):
         submitter_in_db = Users.objects.get(username=self.workflow.submitter)
-        if self.workflow.success:
+        if self.workflow.error:
+            title = "[Archery 通知]My2SQL执行失败"
+            content = f"解析SQL文件报错,{self.workflow.error}"
+        else:
             title = "[Archery 通知]My2SQL执行结束"
             content = f"解析的SQL文件在{self.workflow.file_path}目录下,请前往查看"
-        else:
-            title = "[Archery 通知]My2SQL执行失败"
-            content = self.workflow.error
         self.messages = [
             LegacyMessage(
                 msg_to=[submitter_in_db],
@@ -352,6 +371,27 @@ def render_m2sql(self):
             )
         ]
 
+    def render_archive(self):
+        """Render the archive-finished message into self.messages."""
+        if self.workflow.error:
+            title = (
+                f"[Archery 通知]archive归档任务{self.workflow.src_db_name}."
+                f"{self.workflow.src_table_name}归档失败,报错信息为:{self.workflow.error}"
+            )
+            content = "请登录archery查看任务归档详细日志信息"
+        else:
+            title = (
+                f"[Archery 通知]archive归档任务{self.workflow.src_db_name}."
+                f"{self.workflow.src_table_name}归档成功"
+            )
+            content = "请登录archery查看任务归档详细日志信息"
+        self.messages = [
+            LegacyMessage(
+                msg_title=title,
+                msg_content=content,
+            )
+        ]
+
     def render(self):
         """渲染消息, 存储到 self.messages"""
         if self.event_type == EventType.EXECUTE:
@@ -360,6 +400,8 @@ def render(self):
             self.render_audit()
         if self.event_type == EventType.M2SQL:
             self.render_m2sql()
+        if self.event_type == EventType.ARCHIVE:
+            self.render_archive()
 
 
 class DingdingWebhookNotifier(LegacyRender):
@@ -477,7 +519,7 @@ def send(self):
 def auto_notify(
     sys_config: SysConfig,
     workflow: Union[
-        SqlWorkflow, ArchiveConfig, QueryPrivilegesApply, My2SqlResult
+        SqlWorkflow, ArchiveConfig, QueryPrivilegesApply, My2SqlResult, ArchiveResult
     ] = None,
     audit: WorkflowAudit = None,
     audit_detail: WorkflowAuditDetail = None,
@@ -551,3 +593,34 @@ def notify_for_my2sql(task):
     # 发送
     sys_config = SysConfig()
     auto_notify(workflow=result, sys_config=sys_config, event_type=EventType.M2SQL)
+
+
+def notify_for_archive(task):
+    """
+    Send a notification when an archive task has finished.
+
+    :param task: task object handed to the hook. ``task.args[0]`` is read as the
+        archive id and ``task.result`` as the
+        ``(src_db_name, src_table_name, error_info)`` tuple returned by ``archive``.
+    :return: None
+    """
+    outcome = task.result
+    if isinstance(outcome, (list, tuple)) and len(outcome) == 3:
+        src_db_name, src_table_name, error = outcome
+    else:
+        # The task did not return the expected tuple (for example it failed
+        # outside archive()'s own try block), so report the raw result as error.
+        src_db_name, src_table_name, error = "", "", str(outcome)
+    result = ArchiveResult(
+        # archive_id has no default on ArchiveResult, so it must be supplied.
+        archive_id=str(task.args[0]) if task.args else "",
+        # archive() returns its error text instead of raising, so task.success
+        # alone cannot signal a failed archive.
+        success=bool(task.success and not error),
+        src_db_name=src_db_name,
+        src_table_name=src_table_name,
+        error=error,
+    )
+    # Send the notification
+    sys_config = SysConfig()
+    auto_notify(workflow=result, sys_config=sys_config, event_type=EventType.ARCHIVE)