Add archive notify #2852

Closed
wants to merge 9 commits into from
152 changes: 79 additions & 73 deletions sql/archiver.py
@@ -26,7 +26,7 @@
from common.utils.extend_json_encoder import ExtendJSONEncoder
from common.utils.timer import FuncTimer
from sql.engines import get_engine
from sql.notify import notify_for_audit
from sql.notify import notify_for_audit, notify_for_archive
from sql.plugins.pt_archiver import PtArchiver
from sql.utils.resource_group import user_instances, user_groups
from sql.models import ArchiveConfig, ArchiveLog, Instance, ResourceGroup
@@ -312,6 +312,7 @@ def add_archive_task(archive_ids=None):
async_task(
"sql.archiver.archive",
archive_id,
hook=notify_for_archive,
group=f'archive-{time.strftime("%Y-%m-%d %H:%M:%S ")}',
timeout=-1,
task_name=f"archive-{archive_id}",
@@ -400,79 +401,83 @@ def archive(archive_id):
select_cnt = 0
insert_cnt = 0
delete_cnt = 0
with FuncTimer() as t:
p = pt_archiver.execute_cmd(cmd_args)
stdout = ""
for line in iter(p.stdout.readline, ""):
if re.match(r"^SELECT\s(\d+)$", line, re.I):
select_cnt = re.findall(r"^SELECT\s(\d+)$", line)
elif re.match(r"^INSERT\s(\d+)$", line, re.I):
insert_cnt = re.findall(r"^INSERT\s(\d+)$", line)
elif re.match(r"^DELETE\s(\d+)$", line, re.I):
delete_cnt = re.findall(r"^DELETE\s(\d+)$", line)
stdout += f"{line}\n"
statistics = stdout
# 获取异常信息
stderr = p.stderr.read()
if stderr:
statistics = stdout + stderr

# 判断归档结果
select_cnt = int(select_cnt[0]) if select_cnt else 0
insert_cnt = int(insert_cnt[0]) if insert_cnt else 0
delete_cnt = int(delete_cnt[0]) if delete_cnt else 0
error_info = ""
success = True
if stderr:
error_info = f"命令执行报错:{stderr}"
success = False
if mode == "dest":
# 删除源数据,判断删除数量和写入数量
if not no_delete and (insert_cnt != delete_cnt):
error_info = f"删除和写入数量不一致:{insert_cnt}!={delete_cnt}"
success = False
elif mode == "file":
# 删除源数据,判断查询数量和删除数量
if not no_delete and (select_cnt != delete_cnt):
error_info = f"查询和删除数量不一致:{select_cnt}!={delete_cnt}"
success = False
elif mode == "purge":
# 直接删除。判断查询数量和删除数量
if select_cnt != delete_cnt:
error_info = f"查询和删除数量不一致:{select_cnt}!={delete_cnt}"
try:
with FuncTimer() as t:
p = pt_archiver.execute_cmd(cmd_args)
stdout = ""
for line in iter(p.stdout.readline, ""):
if re.match(r"^SELECT\s(\d+)$", line, re.I):
select_cnt = re.findall(r"^SELECT\s(\d+)$", line)
elif re.match(r"^INSERT\s(\d+)$", line, re.I):
insert_cnt = re.findall(r"^INSERT\s(\d+)$", line)
elif re.match(r"^DELETE\s(\d+)$", line, re.I):
delete_cnt = re.findall(r"^DELETE\s(\d+)$", line)
stdout += f"{line}\n"
statistics = stdout
# 获取异常信息
stderr = p.stderr.read()
if stderr:
statistics = stdout + stderr

# 判断归档结果
select_cnt = int(select_cnt[0]) if select_cnt else 0
insert_cnt = int(insert_cnt[0]) if insert_cnt else 0
delete_cnt = int(delete_cnt[0]) if delete_cnt else 0
error_info = ""
success = True
if stderr:
error_info = f"命令执行报错:{stderr}"
success = False

# 执行信息保存到数据库
if connection.connection and not connection.is_usable():
close_old_connections()
# 更新最后归档时间
ArchiveConfig(id=archive_id, last_archive_time=t.end).save(
update_fields=["last_archive_time"]
)
# 替换密码信息后保存
shell_cmd = " ".join(cmd_args)
ArchiveLog.objects.create(
archive=archive_info,
cmd=(
shell_cmd.replace(s_ins.password, "***").replace(d_ins.password, "***")
if mode == "dest"
else shell_cmd.replace(s_ins.password, "***")
),
condition=condition,
mode=mode,
no_delete=no_delete,
sleep=sleep,
select_cnt=select_cnt,
insert_cnt=insert_cnt,
delete_cnt=delete_cnt,
statistics=statistics,
success=success,
error_info=error_info,
start_time=t.start,
end_time=t.end,
)
if not success:
raise Exception(f"{error_info}\n{statistics}")
if mode == "dest":
# 删除源数据,判断删除数量和写入数量
if not no_delete and (insert_cnt != delete_cnt):
error_info = f"删除和写入数量不一致:{insert_cnt}!={delete_cnt}"
success = False
elif mode == "file":
# 删除源数据,判断查询数量和删除数量
if not no_delete and (select_cnt != delete_cnt):
error_info = f"查询和删除数量不一致:{select_cnt}!={delete_cnt}"
success = False
elif mode == "purge":
# 直接删除。判断查询数量和删除数量
if select_cnt != delete_cnt:
error_info = f"查询和删除数量不一致:{select_cnt}!={delete_cnt}"
success = False

# 执行信息保存到数据库
if connection.connection and not connection.is_usable():
close_old_connections()
# 更新最后归档时间
ArchiveConfig(id=archive_id, last_archive_time=t.end).save(
update_fields=["last_archive_time"]
)
# 替换密码信息后保存
shell_cmd = " ".join(cmd_args)
ArchiveLog.objects.create(
archive=archive_info,
cmd=(
shell_cmd.replace(s_ins.password, "***").replace(d_ins.password, "***")
if mode == "dest"
else shell_cmd.replace(s_ins.password, "***")
),
condition=condition,
mode=mode,
no_delete=no_delete,
sleep=sleep,
select_cnt=select_cnt,
insert_cnt=insert_cnt,
delete_cnt=delete_cnt,
statistics=statistics,
success=success,
error_info=error_info,
start_time=t.start,
end_time=t.end,
)
if not success:
raise Exception(f"{error_info}\n{statistics}")
return src_db_name, src_table_name, error_info
except Exception as e:
return src_db_name, src_table_name, str(e)
Comment on lines +479 to +480
Collaborator

Same here: catching the exception like this is not recommended, since it makes the recorded task status inconsistent with what actually happened.

Contributor Author

In the final notification stage, success or failure is determined by whether there is an error message. That way, whether pt-archiver itself failed, or pt-archiver succeeded but the deleted row count does not match the inserted row count, an error is reported to flag a problem with the archive. With this try, however, any error raised anywhere inside def archive(archive_id), for example a failure to write the archive log at the end, is also reported, even though the archive itself may have succeeded and only the logging failed. Is that reasonable?

Collaborator

The same goal can be achieved by raising an exception.

Contributor Author

Would the final error be thrown up to the frontend page, or would the notification still be sent?

Collaborator

If you raise an exception, I recall django-q records it automatically.
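
For reference, a minimal sketch of the approach the reviewer describes, assuming django-q's documented Task attributes (task.success is False when the task function raised, and task.result then carries the recorded error); the hook name and the None placeholders are illustrative, not part of this PR:

def archive_hook_sketch(task):
    # task.success reflects whether archive() returned normally or raised
    if task.success:
        src_db_name, src_table_name, error_info = task.result
    else:
        # assumption: on failure django-q stores the error text in task.result
        src_db_name, src_table_name, error_info = None, None, str(task.result)
    # an ArchiveResult can then be built from these fields, as notify_for_archive does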



@permission_required("sql.menu_archive", raise_exception=True)
@@ -531,6 +536,7 @@ def archive_once(request):
async_task(
"sql.archiver.archive",
archive_id,
hook=notify_for_archive,
timeout=-1,
task_name=f"archive-{archive_id}",
)
10 changes: 8 additions & 2 deletions sql/binlog.py
@@ -248,5 +248,11 @@ def my2sql_file(args, user):
args["output-dir"] = path
cmd_args = my2sql.generate_args2cmd(args)
# 使用output-dir参数执行命令保存sql
my2sql.execute_cmd(cmd_args)
return user, path
error_info = ""
try:
# 假设 my2sql.execute_cmd 返回一个包含 user 和 path 的元组
my2sql.execute_cmd(cmd_args)
return user, path, error_info
except Exception as e:
Collaborator

Catching all exceptions this broadly is not recommended. This function is called by the async worker; once the exception is swallowed, the worker considers the task to have executed normally and the downstream logic based on that status will be wrong.

Contributor Author

My idea was that the final my2sql notification would still judge whether my2sql succeeded from the error content, for example when an incorrect my2sql directory is configured.

# 捕获所有异常并返回错误信息
return user, path, str(e)
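
For context on the approach described in the comment above, a minimal sketch of how the notification side could branch on the error element of the returned (user, path, error) tuple; the function name is illustrative, not part of this PR:

def m2sql_notify_sketch(task):
    # task.result is the (user, path, error) tuple returned by my2sql_file
    user, path, error = task.result
    if error:
        return f"My2SQL execution failed: {error}"
    return f"My2SQL finished, parsed SQL files are under {path}"

Whether this is preferable to raising and letting django-q mark the task as failed is exactly the trade-off discussed in this thread.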
81 changes: 69 additions & 12 deletions sql/notify.py
@@ -3,7 +3,7 @@
import importlib
import logging
import re
from dataclasses import dataclass, field
from dataclasses import dataclass, field, asdict
from enum import Enum
from itertools import chain
from typing import Union, List
@@ -42,6 +42,7 @@ class EventType(Enum):
EXECUTE = "execute"
AUDIT = "audit"
M2SQL = "m2sql"
ARCHIVE = "archive"


@dataclass
@@ -52,11 +53,20 @@ class My2SqlResult:
error: str = ""


@dataclass
class ArchiveResult:
archive_id: str
success: bool
src_db_name: str
src_table_name: str
error: str = ""


@dataclass
class Notifier:
workflow: Union[SqlWorkflow, ArchiveConfig, QueryPrivilegesApply, My2SqlResult] = (
None
)
workflow: Union[
SqlWorkflow, ArchiveConfig, QueryPrivilegesApply, My2SqlResult, ArchiveResult
] = None
sys_config: SysConfig = None
# init false, class property, 不是 instance property
name: str = field(init=False, default="base")
@@ -70,7 +80,11 @@ def __post_init__(self):
raise ValueError("需要提供 WorkflowAudit 或 workflow")
if not self.workflow:
self.workflow = self.audit.get_workflow()
if not self.audit and not isinstance(self.workflow, My2SqlResult):
if (
not self.audit
and not isinstance(self.workflow, My2SqlResult)
and not isinstance(self.workflow, ArchiveResult)
):
self.audit = self.workflow.get_audit()
# 防止 get_auditor 显式的传了个 None
if not self.sys_config:
@@ -118,10 +132,14 @@ def render(self):
self.request_data["workflow_content"] = QueryPrivilegesApplySerializer(
self.workflow
).data
elif isinstance(self.workflow, ArchiveResult) or isinstance(
self.workflow, My2SqlResult
):
self.request_data["workflow_content"] = asdict(self.workflow)
else:
raise ValueError(f"workflow type `{type(self.workflow)}` not supported yet")

self.request_data["audit"] = WorkflowAuditListSerializer(self.audit).data
if self.audit:
self.request_data["audit"] = WorkflowAuditListSerializer(self.audit).data

def send(self):
url = self.sys_config.get(self.sys_config_key)
Expand Down Expand Up @@ -338,12 +356,12 @@ def render_execute(self):

def render_m2sql(self):
submitter_in_db = Users.objects.get(username=self.workflow.submitter)
if self.workflow.success:
if self.workflow.error:
title = "[Archery 通知]My2SQL执行失败"
content = f"解析SQL文件报错,{self.workflow.error}"
else:
title = "[Archery 通知]My2SQL执行结束"
content = f"解析的SQL文件在{self.workflow.file_path}目录下,请前往查看"
else:
title = "[Archery 通知]My2SQL执行失败"
content = self.workflow.error
self.messages = [
LegacyMessage(
msg_to=[submitter_in_db],
@@ -352,6 +370,26 @@
)
]

def render_archive(self):
if self.workflow.error:
title = (
f"[Archery 通知]archive归档任务{self.workflow.src_db_name}."
f"{self.workflow.src_table_name}归档失败,报错信息为:{self.workflow.error}"
)
content = "请登录archery查看任务归档详细日志信息"
else:
title = (
f"[Archery 通知]archive归档任务{self.workflow.src_db_name}."
f"{self.workflow.src_table_name}归档成功"
)
content = "请登录archery查看任务归档详细日志信息"
self.messages = [
LegacyMessage(
msg_title=title,
msg_content=content,
)
]

def render(self):
"""渲染消息, 存储到 self.messages"""
if self.event_type == EventType.EXECUTE:
@@ -360,6 +398,8 @@ def render(self):
self.render_audit()
if self.event_type == EventType.M2SQL:
self.render_m2sql()
if self.event_type == EventType.ARCHIVE:
self.render_archive()


class DingdingWebhookNotifier(LegacyRender):
@@ -477,7 +517,7 @@ def send(self):
def auto_notify(
sys_config: SysConfig,
workflow: Union[
SqlWorkflow, ArchiveConfig, QueryPrivilegesApply, My2SqlResult
SqlWorkflow, ArchiveConfig, QueryPrivilegesApply, My2SqlResult, ArchiveResult
] = None,
audit: WorkflowAudit = None,
audit_detail: WorkflowAuditDetail = None,
@@ -551,3 +591,20 @@ def notify_for_my2sql(task):
# 发送
sys_config = SysConfig()
auto_notify(workflow=result, sys_config=sys_config, event_type=EventType.M2SQL)


def notify_for_archive(task):
"""
archive执行结束的通知
:param task:
:return:
"""
result = ArchiveResult(
success=task.success,
src_db_name=task.result[0],
src_table_name=task.result[1],
error=task.result[2],
)
# 发送
sys_config = SysConfig()
auto_notify(workflow=result, sys_config=sys_config, event_type=EventType.ARCHIVE)