diff --git a/.coveragerc b/.coveragerc index 936d6bd996..c2c57943e3 100644 --- a/.coveragerc +++ b/.coveragerc @@ -1,9 +1,8 @@ [run] -include = - common* - sql* - sql_api* +source = + . omit = src* downloads* sql/migrations/* + venv* diff --git a/.env.list b/.env.list index 418af00f8d..31878197c9 100644 --- a/.env.list +++ b/.env.list @@ -13,6 +13,6 @@ AUTH_LDAP_USER_ATTR_MAP=username=cn,display=displayname,email=email CSRF_TRUSTED_ORIGINS=http://127.0.0.1:9123 # https://django-q.readthedocs.io/en/latest/configure.html# -Q_CLUISTER_WORKERS=4 -Q_CLUISTER_TIMEOUT=60 +Q_CLUSTER_WORKERS=4 +Q_CLUSTER_TIMEOUT=60 Q_CLUISTER_SYNC=false diff --git a/.github/workflows/django.yml b/.github/workflows/django.yml index 0722acc2d8..063a8b76f9 100644 --- a/.github/workflows/django.yml +++ b/.github/workflows/django.yml @@ -78,13 +78,12 @@ jobs: run: | mysql -h127.0.0.1 -uroot -e "CREATE DATABASE archery CHARSET UTF8MB4;" mysql -h127.0.0.1 -uroot -e "DROP DATABASE IF EXISTS test_archery;CREATE DATABASE test_archery CHARSET UTF8MB4;" - mysql -h127.0.0.1 -uroot test_archery工单通知 +
+ +
+ +
+
短信服务

diff --git a/common/tests.py b/common/tests.py index 8719b2e305..f92e0ae802 100644 --- a/common/tests.py +++ b/common/tests.py @@ -509,15 +509,6 @@ def testWorkflowByUser(self): expected_rows = ((self.u2.display, 3), (self.u1.display, 2)) self.assertEqual(result["rows"], expected_rows) - def testDashboard(self): - """Dashboard测试""" - # TODO 这部分测试并没有遵循单元测试, 而是某种集成测试, 直接从响应到结果, 并且只检查状态码 - # TODO 需要具体查看pyecharst有没有被调用, 以及调用的参数 - c = Client() - c.force_login(self.superuser1) - r = c.get("/dashboard/") - self.assertEqual(r.status_code, 200) - class AuthTest(TestCase): def setUp(self): diff --git a/sql/archiver.py b/sql/archiver.py index d58be592ba..c054921d42 100644 --- a/sql/archiver.py +++ b/sql/archiver.py @@ -202,7 +202,9 @@ def archive_apply(request): ) archive_id = archive_info.id # 调用工作流插入审核信息 - audit_result = Audit.add(WorkflowDict.workflow_type["archive"], archive_id) + audit_result, audit_detail = Audit.add( + WorkflowDict.workflow_type["archive"], archive_id + ) except Exception as msg: logger.error(traceback.format_exc()) result["status"] = 1 @@ -210,12 +212,13 @@ def archive_apply(request): else: result = audit_result # 消息通知 - audit_id = Audit.detail_by_workflow_id( + workflow_audit = Audit.detail_by_workflow_id( workflow_id=archive_id, workflow_type=WorkflowDict.workflow_type["archive"] - ).audit_id + ) async_task( notify_for_audit, - audit_id=audit_id, + workflow_audit=workflow_audit, + workflow_audit_detail=audit_detail, timeout=60, task_name=f"archive-apply-{archive_id}", ) @@ -245,15 +248,17 @@ def archive_audit(request): # 使用事务保持数据一致性 try: with transaction.atomic(): - audit_id = Audit.detail_by_workflow_id( + workflow_audit = Audit.detail_by_workflow_id( workflow_id=archive_id, workflow_type=WorkflowDict.workflow_type["archive"], - ).audit_id + ) + audit_id = workflow_audit.audit_id # 调用工作流插入审核信息,更新业务表审核状态 - audit_status = Audit.audit( + audit_status, workflow_audit_detail = Audit.audit( audit_id, audit_status, user.username, audit_remark - )["data"]["workflow_status"] + ) + audit_status = audit_status["data"]["workflow_status"] ArchiveConfig( id=archive_id, status=audit_status, @@ -267,10 +272,11 @@ def archive_audit(request): return render(request, "error.html", context) else: # 消息通知 + workflow_audit.refresh_from_db() async_task( notify_for_audit, - audit_id=audit_id, - audit_remark=audit_remark, + workflow_audit=workflow_audit, + workflow_audit_detail=workflow_audit_detail, timeout=60, task_name=f"archive-audit-{archive_id}", ) diff --git a/sql/notify.py b/sql/notify.py index 5728b4a431..f812084b4d 100755 --- a/sql/notify.py +++ b/sql/notify.py @@ -1,359 +1,531 @@ # -*- coding: UTF-8 -*- import datetime +import importlib +import logging import re +from dataclasses import dataclass, field +from enum import Enum from itertools import chain +from typing import Union, List +import requests +from django.conf import settings from django.contrib.auth.models import Group + from common.config import SysConfig +from common.utils.const import WorkflowDict +from common.utils.sendmsg import MsgSender from sql.models import ( QueryPrivilegesApply, Users, SqlWorkflow, ResourceGroup, ArchiveConfig, + WorkflowAudit, + WorkflowAuditDetail, + SqlWorkflowContent, ) from sql.utils.resource_group import auth_group_users -from common.utils.sendmsg import MsgSender -from common.utils.const import WorkflowDict from sql.utils.workflow_audit import Audit - -import logging +from sql_api.serializers import ( + WorkflowContentSerializer, + WorkflowAuditListSerializer, + QueryPrivilegesApplySerializer, + ArchiveConfigSerializer, + InstanceSerializer, +) logger = logging.getLogger("default") -def __notify_cnf_status(): - """返回消息通知开关""" - sys_config = SysConfig() - mail_status = sys_config.get("mail") - ding_status = sys_config.get("ding_to_person") - ding_webhook_status = sys_config.get("ding") - wx_status = sys_config.get("wx") - qywx_webhook_status = sys_config.get("qywx_webhook") - feishu_webhook_status = sys_config.get("feishu_webhook") - feishu_status = sys_config.get("feishu") - if not any( - [ - mail_status, - ding_status, - ding_webhook_status, - wx_status, - feishu_status, - feishu_webhook_status, - qywx_webhook_status, - ] +class EventType(Enum): + EXECUTE = "execute" + AUDIT = "audit" + M2SQL = "m2sql" + + +@dataclass +class My2SqlResult: + submitter: str + success: bool + file_path: str = "" + error: str = "" + + +class Notifier: + name = "base" + sys_config_key: str = "" + + def __init__( + self, + workflow: Union[SqlWorkflow, ArchiveConfig, QueryPrivilegesApply, My2SqlResult], + sys_config: SysConfig, + audit: WorkflowAudit = None, + audit_detail: WorkflowAuditDetail = None, + event_type: EventType = EventType.AUDIT, ): - logger.info("未开启任何消息通知,可在系统设置中开启") - return False - else: - return True + self.workflow = workflow + self.audit = audit + self.audit_detail = audit_detail + self.event_type = event_type + self.sys_config = sys_config + def render(self): + raise NotImplementedError -def __send(msg_title, msg_content, msg_to, msg_cc=None, **kwargs): - """ - 按照通知配置发送通知消息 - :param msg_title: 通知标题 - :param msg_content: 通知内容 - :param msg_to: 通知人user list - :return: - """ - sys_config = SysConfig() - msg_sender = MsgSender() - msg_cc = msg_cc if msg_cc else [] - dingding_webhook = kwargs.get("dingding_webhook") - feishu_webhook = kwargs.get("feishu_webhook") - qywx_webhook = kwargs.get("qywx_webhook") - msg_to_email = [user.email for user in msg_to if user.email] - msg_cc_email = [user.email for user in msg_cc if user.email] - msg_to_ding_user = [ - user.ding_user_id for user in chain(msg_to, msg_cc) if user.ding_user_id - ] - msg_to_wx_user = [ - user.wx_user_id if user.wx_user_id else user.username - for user in chain(msg_to, msg_cc) - ] - logger.info(f"{msg_to_email}{msg_cc_email}{msg_to_wx_user}{chain(msg_to, msg_cc)}") - if sys_config.get("mail"): - msg_sender.send_email( - msg_title, msg_content, msg_to_email, list_cc_addr=msg_cc_email - ) - if sys_config.get("ding") and dingding_webhook: - msg_sender.send_ding(dingding_webhook, msg_title + "\n" + msg_content) - if sys_config.get("ding_to_person"): - msg_sender.send_ding2user(msg_to_ding_user, msg_title + "\n" + msg_content) - if sys_config.get("wx"): - msg_sender.send_wx2user(msg_title + "\n" + msg_content, msg_to_wx_user) - if sys_config.get("feishu_webhook") and feishu_webhook: - msg_sender.send_feishu_webhook(feishu_webhook, msg_title, msg_content) - if sys_config.get("feishu"): - open_id = [ - user.feishu_open_id for user in chain(msg_to, msg_cc) if user.feishu_open_id - ] - user_mail = [ - user.email for user in chain(msg_to, msg_cc) if not user.feishu_open_id - ] - msg_sender.send_feishu_user(msg_title, msg_content, open_id, user_mail) - if sys_config.get("qywx_webhook") and qywx_webhook: - msg_sender.send_qywx_webhook(qywx_webhook, msg_title + "\n" + msg_content) + def send(self): + raise NotImplementedError + def should_run(self): + if not self.sys_config_key: + return True + config_status = self.sys_config.get(self.sys_config_key) + if config_status: + return True -def notify_for_audit(audit_id, **kwargs): - """ - 工作流消息通知,不包含工单执行结束的通知 - :param audit_id: - :param kwargs: - :return: - """ - # 判断是否开启消息通知,未开启直接返回 - if not __notify_cnf_status(): - return None - sys_config = SysConfig() + def run(self): + if not self.should_run(): + return + self.render() + self.send() - # 获取审核信息 - audit_detail = Audit.detail(audit_id=audit_id) - audit_id = audit_detail.audit_id - workflow_audit_remark = kwargs.get("audit_remark", "") - base_url = sys_config.get("archery_base_url", "http://127.0.0.1:8000").rstrip("/") - workflow_url = "{base_url}/workflow/{audit_id}".format( - base_url=base_url, audit_id=audit_detail.audit_id - ) - workflow_id = audit_detail.workflow_id - workflow_type = audit_detail.workflow_type - status = audit_detail.current_status - workflow_title = audit_detail.workflow_title - workflow_from = audit_detail.create_user_display - group_name = audit_detail.group_name - dingding_webhook = ResourceGroup.objects.get( - group_id=audit_detail.group_id - ).ding_webhook - feishu_webhook = ResourceGroup.objects.get( - group_id=audit_detail.group_id - ).feishu_webhook - qywx_webhook = ResourceGroup.objects.get( - group_id=audit_detail.group_id - ).qywx_webhook - # 获取当前审批和审批流程 - workflow_auditors, current_workflow_auditors = Audit.review_info( - audit_detail.workflow_id, audit_detail.workflow_type - ) - # 准备消息内容 - if workflow_type == WorkflowDict.workflow_type["query"]: - workflow_type_display = WorkflowDict.workflow_type["query_display"] - workflow_detail = QueryPrivilegesApply.objects.get(apply_id=workflow_id) - instance = workflow_detail.instance.instance_name - db_name = " " - if workflow_detail.priv_type == 1: - workflow_content = """数据库清单:{}\n授权截止时间:{}\n结果集:{}\n""".format( - workflow_detail.db_list, - datetime.datetime.strftime( - workflow_detail.valid_date, "%Y-%m-%d %H:%M:%S" - ), - workflow_detail.limit_num, - ) - elif workflow_detail.priv_type == 2: +class GenericWebhookNotifier(Notifier): + name = "generic_webhook" + sys_config_key: str = "generic_webhook_url" + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.request_data = None + + def render(self): + self.request_data = {} + if isinstance(self.workflow, SqlWorkflow): + workflow_content = SqlWorkflowContent.objects.get(workflow=self.workflow) + self.request_data["workflow_content"] = WorkflowContentSerializer( + workflow_content + ).data + instance = self.workflow.instance + self.request_data["instance"] = InstanceSerializer(instance).data + elif isinstance(self.workflow, ArchiveConfig): + self.request_data["workflow_content"] = ArchiveConfigSerializer( + self.workflow + ).data + elif isinstance(self.workflow, QueryPrivilegesApply): + self.request_data["workflow_content"] = QueryPrivilegesApplySerializer( + self.workflow + ).data + else: + raise ValueError(f"workflow type `{type(self.workflow)}` not supported yet") + + self.request_data["audit"] = WorkflowAuditListSerializer(self.audit).data + + def send(self): + url = self.sys_config.get(self.sys_config_key) + requests.post(url, json=self.request_data) + + +@dataclass +class LegacyMessage: + msg_title: str + msg_content: str + msg_to: List[Users] = field(default_factory=list) + msg_cc: List[Users] = field(default_factory=list) + + +class LegacyRender(Notifier): + messages: List[LegacyMessage] + sys_config_key: str = "" + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.messages = [] + + def render_audit(self): + # 获取审核信息 + audit_id = self.audit.audit_id + base_url = self.sys_config.get( + "archery_base_url", "http://127.0.0.1:8000" + ).rstrip("/") + workflow_url = "{base_url}/workflow/{audit_id}".format( + base_url=base_url, audit_id=self.audit.audit_id + ) + workflow_id = self.audit.workflow_id + workflow_type = self.audit.workflow_type + status = self.audit.current_status + workflow_title = self.audit.workflow_title + workflow_from = self.audit.create_user_display + group_name = self.audit.group_name + # 获取当前审批和审批流程 + workflow_auditors, current_workflow_auditors = Audit.review_info( + self.audit.workflow_id, self.audit.workflow_type + ) + # workflow content, 即申请通过后要执行什么东西 + # 执行的 SQL 语句, 授权的范围 + if workflow_type == WorkflowDict.workflow_type["query"]: + workflow_type_display = WorkflowDict.workflow_type["query_display"] + workflow_detail = QueryPrivilegesApply.objects.get(apply_id=workflow_id) + instance = workflow_detail.instance.instance_name db_name = workflow_detail.db_list - workflow_content = """数据库:{}\n表清单:{}\n授权截止时间:{}\n结果集:{}\n""".format( - workflow_detail.db_list, - workflow_detail.table_list, - datetime.datetime.strftime( - workflow_detail.valid_date, "%Y-%m-%d %H:%M:%S" - ), - workflow_detail.limit_num, + workflow_content = "" + if workflow_detail.priv_type == 1: + workflow_content = f"""数据库清单:{workflow_detail.db_list}\n""" + elif workflow_detail.priv_type == 2: + workflow_content = f"""数据库:{workflow_detail.db_list}\n表清单:{workflow_detail.table_list}\n""" + auth_ends_at = datetime.datetime.strftime( + workflow_detail.valid_date, "%Y-%m-%d %H:%M:%S" + ) + workflow_content += ( + f"""授权截止时间:{auth_ends_at}\n结果集:{workflow_detail.limit_num}\n""" + ) + elif workflow_type == WorkflowDict.workflow_type["sqlreview"]: + workflow_type_display = WorkflowDict.workflow_type["sqlreview_display"] + workflow_detail = SqlWorkflow.objects.get(pk=workflow_id) + instance = workflow_detail.instance.instance_name + db_name = workflow_detail.db_name + workflow_content = re.sub( + "[\r\n\f]{2,}", + "\n", + workflow_detail.sqlworkflowcontent.sql_content[0:500].replace("\r", ""), + ) + elif workflow_type == WorkflowDict.workflow_type["archive"]: + workflow_type_display = WorkflowDict.workflow_type["archive_display"] + workflow_detail = ArchiveConfig.objects.get(pk=workflow_id) + instance = workflow_detail.src_instance.instance_name + db_name = workflow_detail.src_db_name + workflow_content = """归档表:{}\n归档模式:{}\n归档条件:{}\n""".format( + workflow_detail.src_table_name, + workflow_detail.mode, + workflow_detail.condition, ) else: - workflow_content = "" - elif workflow_type == WorkflowDict.workflow_type["sqlreview"]: - workflow_type_display = WorkflowDict.workflow_type["sqlreview_display"] - workflow_detail = SqlWorkflow.objects.get(pk=workflow_id) - instance = workflow_detail.instance.instance_name - db_name = workflow_detail.db_name - workflow_content = re.sub( - "[\r\n\f]{2,}", - "\n", - workflow_detail.sqlworkflowcontent.sql_content[0:500].replace("\r", ""), + raise Exception("工单类型不正确") + # 渲染提醒内容, 包括工单的所有信息, 申请人, 审批流等 + if status == WorkflowDict.workflow_status["audit_wait"]: # 申请阶段 + msg_title = "[{}]新的工单申请#{}".format(workflow_type_display, audit_id) + # 接收人,发送给该资源组内对应权限组所有的用户 + auth_group_names = Group.objects.get(id=self.audit.current_audit).name + msg_to = auth_group_users([auth_group_names], self.audit.group_id) + # 消息内容 + msg_content = """发起时间:{} +发起人:{} +组:{} +目标实例:{} +数据库:{} +审批流程:{} +当前审批:{} +工单名称:{} +工单地址:{} +工单详情预览:{}""".format( + workflow_detail.create_time.strftime("%Y-%m-%d %H:%M:%S"), + workflow_from, + group_name, + instance, + db_name, + workflow_auditors, + current_workflow_auditors, + workflow_title, + workflow_url, + workflow_content, + ) + elif status == WorkflowDict.workflow_status["audit_success"]: # 审核通过 + msg_title = "[{}]工单审核通过#{}".format(workflow_type_display, audit_id) + # 接收人,仅发送给申请人 + msg_to = [Users.objects.get(username=self.audit.create_user)] + # 消息内容 + msg_content = """发起时间:{}\n发起人:{}\n组:{}\n目标实例:{}\n数据库:{}\n审批流程:{}\n工单名称:{}\n工单地址:{}\n工单详情预览:{}\n""".format( + workflow_detail.create_time.strftime("%Y-%m-%d %H:%M:%S"), + workflow_from, + group_name, + instance, + db_name, + workflow_auditors, + workflow_title, + workflow_url, + workflow_content, + ) + elif status == WorkflowDict.workflow_status["audit_reject"]: # 审核驳回 + msg_title = "[{}]工单被驳回#{}".format(workflow_type_display, audit_id) + # 接收人,仅发送给申请人 + msg_to = [Users.objects.get(username=self.audit.create_user)] + # 消息内容 + msg_content = """发起时间:{}\n目标实例:{}\n数据库:{}\n工单名称:{}\n工单地址:{}\n驳回原因:{}\n提醒:此工单被审核不通过,请按照驳回原因进行修改!""".format( + workflow_detail.create_time.strftime("%Y-%m-%d %H:%M:%S"), + instance, + db_name, + workflow_title, + workflow_url, + re.sub("[\r\n\f]{2,}", "\n", self.audit_detail.remark), + ) + elif status == WorkflowDict.workflow_status["audit_abort"]: # 审核取消,通知所有审核人 + msg_title = "[{}]提交人主动终止工单#{}".format(workflow_type_display, audit_id) + # 接收人,发送给该资源组内对应权限组所有的用户 + auth_group_names = [ + Group.objects.get(id=auth_group_id).name + for auth_group_id in self.audit.audit_auth_groups.split(",") + ] + msg_to = auth_group_users(auth_group_names, self.audit.group_id) + # 消息内容 + msg_content = """发起时间:{}\n发起人:{}\n组:{}\n目标实例:{}\n数据库:{}\n工单名称:{}\n工单地址:{}\n终止原因:{}""".format( + workflow_detail.create_time.strftime("%Y-%m-%d %H:%M:%S"), + workflow_from, + group_name, + instance, + db_name, + workflow_title, + workflow_url, + re.sub("[\r\n\f]{2,}", "\n", self.audit_detail.remark), + ) + else: + raise Exception("工单状态不正确") + logger.info(f"通知Debug{msg_to}") + self.messages.append(LegacyMessage(msg_title, msg_content, msg_to)) + + def render_execute(self): + base_url = self.sys_config.get( + "archery_base_url", "http://127.0.0.1:8000" + ).rstrip("/") + audit_auth_group, current_audit_auth_group = Audit.review_info( + self.workflow.id, 2 ) - elif workflow_type == WorkflowDict.workflow_type["archive"]: - workflow_type_display = WorkflowDict.workflow_type["archive_display"] - workflow_detail = ArchiveConfig.objects.get(pk=workflow_id) - instance = workflow_detail.src_instance.instance_name - db_name = workflow_detail.src_db_name - workflow_content = """归档表:{}\n归档模式:{}\n归档条件:{}\n""".format( - workflow_detail.src_table_name, - workflow_detail.mode, - workflow_detail.condition, + audit_id = Audit.detail_by_workflow_id(self.workflow.id, 2).audit_id + url = "{base_url}/workflow/{audit_id}".format( + base_url=base_url, audit_id=audit_id ) - else: - raise Exception("工单类型不正确") - - # 准备消息格式 - if status == WorkflowDict.workflow_status["audit_wait"]: # 申请阶段 - msg_title = "[{}]新的工单申请#{}".format(workflow_type_display, audit_id) - # 接收人,发送给该资源组内对应权限组所有的用户 - auth_group_names = Group.objects.get(id=audit_detail.current_audit).name - msg_to = auth_group_users([auth_group_names], audit_detail.group_id) - msg_cc = Users.objects.filter(username__in=kwargs.get("cc_users", [])) - # 消息内容 - msg_content = """发起时间:{}\n发起人:{}\n组:{}\n目标实例:{}\n数据库:{}\n审批流程:{}\n当前审批:{}\n工单名称:{}\n工单地址:{}\n工单详情预览:{}\n""".format( - workflow_detail.create_time.strftime("%Y-%m-%d %H:%M:%S"), - workflow_from, - group_name, - instance, - db_name, - workflow_auditors, - current_workflow_auditors, - workflow_title, - workflow_url, - workflow_content, + msg_title = ( + f"[{WorkflowDict.workflow_type['sqlreview_display']}]工单" + f"{self.workflow.get_status_display()}#{audit_id}" ) - elif status == WorkflowDict.workflow_status["audit_success"]: # 审核通过 - msg_title = "[{}]工单审核通过#{}".format(workflow_type_display, audit_id) - # 接收人,仅发送给申请人 - msg_to = [Users.objects.get(username=audit_detail.create_user)] - msg_cc = Users.objects.filter(username__in=kwargs.get("cc_users", [])) - # 消息内容 - msg_content = """发起时间:{}\n发起人:{}\n组:{}\n目标实例:{}\n数据库:{}\n审批流程:{}\n工单名称:{}\n工单地址:{}\n工单详情预览:{}\n""".format( - workflow_detail.create_time.strftime("%Y-%m-%d %H:%M:%S"), - workflow_from, - group_name, - instance, - db_name, - workflow_auditors, - workflow_title, - workflow_url, - workflow_content, + preview = re.sub( + "[\r\n\f]{2,}", + "\n", + self.workflow.sqlworkflowcontent.sql_content[0:500].replace("\r", ""), ) - elif status == WorkflowDict.workflow_status["audit_reject"]: # 审核驳回 - msg_title = "[{}]工单被驳回#{}".format(workflow_type_display, audit_id) - # 接收人,仅发送给申请人 - msg_to = [Users.objects.get(username=audit_detail.create_user)] - msg_cc = Users.objects.filter(username__in=kwargs.get("cc_users", [])) - # 消息内容 - msg_content = """发起时间:{}\n目标实例:{}\n数据库:{}\n工单名称:{}\n工单地址:{}\n驳回原因:{}\n提醒:此工单被审核不通过,请按照驳回原因进行修改!""".format( - workflow_detail.create_time.strftime("%Y-%m-%d %H:%M:%S"), - instance, - db_name, - workflow_title, - workflow_url, - re.sub("[\r\n\f]{2,}", "\n", workflow_audit_remark), + msg_content = f"""发起时间:{self.workflow.create_time.strftime("%Y-%m-%d %H:%M:%S")} +发起人:{self.workflow.engineer_display} +组:{self.workflow.group_name} +目标实例:{self.workflow.instance.instance_name} +数据库:{self.workflow.db_name} +审批流程:{audit_auth_group} +工单名称:{self.workflow.workflow_name} +工单地址:{url} +工单详情预览:{preview}""" + # 邮件通知申请人,抄送DBA + msg_to = Users.objects.filter(username=self.workflow.engineer) + msg_cc = auth_group_users( + auth_group_names=["DBA"], group_id=self.workflow.group_id ) - elif status == WorkflowDict.workflow_status["audit_abort"]: # 审核取消,通知所有审核人 - msg_title = "[{}]提交人主动终止工单#{}".format(workflow_type_display, audit_id) - # 接收人,发送给该资源组内对应权限组所有的用户 - auth_group_names = [ - Group.objects.get(id=auth_group_id).name - for auth_group_id in audit_detail.audit_auth_groups.split(",") + self.messages.append(LegacyMessage(msg_title, msg_content, msg_to, msg_cc)) + # DDL通知 + if ( + self.sys_config.get("ddl_notify_auth_group") + and self.workflow.status == "workflow_finish" + ): + # 判断上线语句是否存在DDL,存在则通知相关人员 + if self.workflow.syntax_type == 1: + # 消息内容通知 + msg_title = "[Archery]有新的DDL语句执行完成#{}".format(audit_id) + msg_content = f"""发起人:{Users.objects.get(username=self.workflow.engineer).display} +变更组:{self.workflow.group_name} +变更实例:{self.workflow.instance.instance_name} +变更数据库:{self.workflow.db_name} +工单名称:{self.workflow.workflow_name} +工单地址:{url} +工单预览:{preview}""" + # 获取通知成员ddl_notify_auth_group + ddl_notify_auth_group = self.sys_config.get( + "ddl_notify_auth_group", "" + ).split(",") + msg_to = Users.objects.filter(groups__name__in=ddl_notify_auth_group) + self.messages.append(LegacyMessage(msg_title, msg_content, msg_to)) + + def render_m2sql(self): + submitter_in_db = Users.objects.get(username=self.workflow.submitter) + if self.workflow.success: + title = "[Archery 通知]My2SQL执行结束" + content = f"解析的SQL文件在{self.workflow.file_path}目录下,请前往查看" + else: + title = "[Archery 通知]My2SQL执行失败" + content = self.workflow.error + self.messages = [ + LegacyMessage( + msg_to=[submitter_in_db], + msg_title=title, + msg_content=content, + ) ] - msg_to = auth_group_users(auth_group_names, audit_detail.group_id) - msg_cc = Users.objects.filter(username__in=kwargs.get("cc_users", [])) - # 消息内容 - msg_content = """发起时间:{}\n发起人:{}\n组:{}\n目标实例:{}\n数据库:{}\n工单名称:{}\n工单地址:{}\n终止原因:{}""".format( - workflow_detail.create_time.strftime("%Y-%m-%d %H:%M:%S"), - workflow_from, - group_name, - instance, - db_name, - workflow_title, - workflow_url, - re.sub("[\r\n\f]{2,}", "\n", workflow_audit_remark), - ) - else: - raise Exception("工单状态不正确") - logger.info(f"通知Debug{msg_to}{msg_cc}") - # 发送通知 - __send( - msg_title, - msg_content, - msg_to, - msg_cc, - feishu_webhook=feishu_webhook, - dingding_webhook=dingding_webhook, - qywx_webhook=qywx_webhook, - ) + + def render(self): + """渲染消息, 存储到 self.messages""" + if self.event_type == EventType.EXECUTE: + self.render_execute() + if self.event_type == EventType.AUDIT: + self.render_audit() + if self.event_type == EventType.M2SQL: + self.render_m2sql() + + +class DingdingWebhookNotifier(LegacyRender): + name = "dingding_webhook" + sys_config_key: str = "ding" + + def send(self): + dingding_webhook = ResourceGroup.objects.get( + group_id=self.audit.group_id + ).ding_webhook + if not dingding_webhook: + return + msg_sender = MsgSender() + for m in self.messages: + msg_sender.send_ding(dingding_webhook, f"{m.msg_title}\n{m.msg_content}") + + +class DingdingPersonNotifier(LegacyRender): + name = "ding_to_person" + sys_config_key: str = "ding_to_person" + + def send(self): + msg_sender = MsgSender() + for m in self.messages: + ding_user_id_list = [ + user.ding_user_id + for user in chain(m.msg_to, m.msg_cc) + if user.ding_user_id + ] + msg_sender.send_ding2user( + ding_user_id_list, f"{m.msg_title}\n{m.msg_content}" + ) + + +class FeishuWebhookNotifier(LegacyRender): + name = "feishu_webhook" + sys_config_key: str = "feishu_webhook" + + def send(self): + feishu_webhook = ResourceGroup.objects.get( + group_id=self.audit.group_id + ).feishu_webhook + if not feishu_webhook: + return + msg_sender = MsgSender() + for m in self.messages: + msg_sender.send_feishu_webhook(feishu_webhook, m.msg_title, m.msg_content) + + +class FeishuPersonNotifier(LegacyRender): + name = "feishu_to_person" + sys_config_key: str = "feishu" + + def send(self): + msg_sender = MsgSender() + for m in self.messages: + open_id = [ + user.feishu_open_id + for user in chain(m.msg_to, m.msg_cc) + if user.feishu_open_id + ] + user_mail = [ + user.email + for user in chain(m.msg_to, m.msg_cc) + if not user.feishu_open_id + ] + msg_sender.send_feishu_user(m.msg_title, m.msg_content, open_id, user_mail) -def notify_for_execute(workflow): +class QywxWebhookNotifier(LegacyRender): + name = "qywx_webhook" + sys_config_key: str = "qywx_webhook" + + def send(self): + qywx_webhook = ResourceGroup.objects.get( + group_id=self.audit.group_id + ).qywx_webhook + if not qywx_webhook: + return + msg_sender = MsgSender() + for m in self.messages: + msg_sender.send_qywx_webhook( + qywx_webhook, f"{m.msg_title}\n{m.msg_content}" + ) + + +class MailNotifier(LegacyRender): + name = "mail" + sys_config_key = "mail" + + def send(self): + msg_sender = MsgSender() + for m in self.messages: + msg_to_email = [user.email for user in m.msg_to if user.email] + msg_cc_email = [user.email for user in m.msg_cc if user.email] + msg_sender.send_email( + m.msg_title, m.msg_content, msg_to_email, list_cc_addr=msg_cc_email + ) + + +def auto_notify( + sys_config: SysConfig, + workflow: Union[ + SqlWorkflow, ArchiveConfig, QueryPrivilegesApply, My2SqlResult + ] = None, + audit: WorkflowAudit = None, + audit_detail: WorkflowAuditDetail = None, + event_type: EventType = EventType.AUDIT, +): + """ + 加载所有的 notifier, 调用 notifier 的 render 和 send 方法 + 内部方法, 有数据库查询, 为了方便测试, 请勿使用 async_task 调用, 防止 patch 后调用失败 + """ + if not workflow and event_type == EventType.AUDIT: + if audit.workflow_type == 1: + workflow = QueryPrivilegesApply.objects.get(apply_id=audit.workflow_id) + if audit.workflow_type == 2: + workflow = SqlWorkflow.objects.get(id=audit.workflow_id) + for notifier in settings.ENABLED_NOTIFIERS: + file, _class = notifier.split(":") + try: + notify_module = importlib.import_module(file) + notifier = getattr(notify_module, _class) + except (ImportError, AttributeError) as e: + logger.error(f"failed to import notifier {notifier}, {str(e)}") + continue + try: + notifier = notifier( + workflow=workflow, + audit=audit, + audit_detail=audit_detail, + event_type=event_type, + sys_config=sys_config, + ) + notifier.run() + except Exception as e: # NOQA 捕获一些错误, 让其他的 notifier 可以正常运行 + logger.error(f"failed to notify using `{notifier}`: {str(e)}") + + +def notify_for_execute(workflow: SqlWorkflow, sys_config: SysConfig = None): + if not sys_config: + sys_config = SysConfig() + auto_notify(workflow=workflow, sys_config=sys_config) + + +def notify_for_audit( + workflow_audit: WorkflowAudit, workflow_audit_detail: WorkflowAuditDetail = None +): """ - 工单执行结束的通知 - :param workflow: + 工作流消息通知适配器, 供 async_task 调用, 方便后续的 mock + 直接传 model 对象, 注意函数内部不要做数据库相关的查询, 以免测试不好mock + :param workflow_audit: + :param workflow_audit_detail: :return: """ - # 判断是否开启消息通知,未开启直接返回 - if not __notify_cnf_status(): - return None sys_config = SysConfig() - - # 获取当前审批和审批流程 - base_url = sys_config.get("archery_base_url", "http://127.0.0.1:8000").rstrip("/") - audit_auth_group, current_audit_auth_group = Audit.review_info(workflow.id, 2) - audit_id = Audit.detail_by_workflow_id(workflow.id, 2).audit_id - url = "{base_url}/workflow/{audit_id}".format(base_url=base_url, audit_id=audit_id) - msg_title = "[{}]工单{}#{}".format( - WorkflowDict.workflow_type["sqlreview_display"], - workflow.get_status_display(), - audit_id, - ) - msg_content = """发起时间:{}\n发起人:{}\n组:{}\n目标实例:{}\n数据库:{}\n审批流程:{}\n工单名称:{}\n工单地址:{}\n工单详情预览:{}\n""".format( - workflow.create_time.strftime("%Y-%m-%d %H:%M:%S"), - workflow.engineer_display, - workflow.group_name, - workflow.instance.instance_name, - workflow.db_name, - audit_auth_group, - workflow.workflow_name, - url, - re.sub( - "[\r\n\f]{2,}", - "\n", - workflow.sqlworkflowcontent.sql_content[0:500].replace("\r", ""), - ), - ) - # 邮件通知申请人,抄送DBA - msg_to = Users.objects.filter(username=workflow.engineer) - msg_cc = auth_group_users(auth_group_names=["DBA"], group_id=workflow.group_id) - - # 处理接收人 - dingding_webhook = ResourceGroup.objects.get( - group_id=workflow.group_id - ).ding_webhook - feishu_webhook = ResourceGroup.objects.get( - group_id=workflow.group_id - ).feishu_webhook - qywx_webhook = ResourceGroup.objects.get(group_id=workflow.group_id).qywx_webhook - # 发送通知 - __send( - msg_title, - msg_content, - msg_to, - msg_cc, - dingding_webhook=dingding_webhook, - feishu_webhook=feishu_webhook, - qywx_webhook=qywx_webhook, + auto_notify( + workflow=None, + audit=workflow_audit, + audit_detail=workflow_audit_detail, + sys_config=sys_config, ) - # DDL通知 - if sys_config.get("ddl_notify_auth_group") and workflow.status == "workflow_finish": - # 判断上线语句是否存在DDL,存在则通知相关人员 - if workflow.syntax_type == 1: - # 消息内容通知 - msg_title = "[Archery]有新的DDL语句执行完成#{}".format(audit_id) - msg_content = """发起人:{}\n变更组:{}\n变更实例:{}\n变更数据库:{}\n工单名称:{}\n工单地址:{}\n工单预览:{}\n""".format( - Users.objects.get(username=workflow.engineer).display, - workflow.group_name, - workflow.instance.instance_name, - workflow.db_name, - workflow.workflow_name, - url, - workflow.sqlworkflowcontent.sql_content[0:500], - ) - # 获取通知成员ddl_notify_auth_group - ddl_notify_auth_group = sys_config.get("ddl_notify_auth_group", "").split( - "," - ) - msg_to = Users.objects.filter(groups__name__in=ddl_notify_auth_group) - # 发送通知 - __send(msg_title, msg_content, msg_to, msg_cc) - def notify_for_my2sql(task): """ @@ -361,15 +533,14 @@ def notify_for_my2sql(task): :param task: :return: """ - # 判断是否开启消息通知,未开启直接返回 - if not __notify_cnf_status(): - return None if task.success: - msg_title = "[Archery 通知]My2SQL执行结束" - msg_content = f"解析的SQL文件在{task.result[1]}目录下,请前往查看" + result = My2SqlResult( + success=True, submitter=task.kwargs["user"], file_path=task.result[1] + ) else: - msg_title = "[Archery 通知]My2SQL执行失败" - msg_content = f"{task.result}" + result = My2SqlResult( + success=False, submitter=task.kwargs["user"], error=task.result + ) # 发送 - msg_to = [task.kwargs["user"]] - __send(msg_title, msg_content, msg_to) + sys_config = SysConfig() + auto_notify(workflow=result, sys_config=sys_config, event_type=EventType.M2SQL) diff --git a/sql/query_privileges.py b/sql/query_privileges.py index 4c88980574..90c6d73f84 100644 --- a/sql/query_privileges.py +++ b/sql/query_privileges.py @@ -301,12 +301,12 @@ def query_priv_apply(request): else: result = audit_result # 消息通知 - audit_id = Audit.detail_by_workflow_id( + workflow_audit = Audit.detail_by_workflow_id( workflow_id=apply_id, workflow_type=WorkflowDict.workflow_type["query"] - ).audit_id + ) async_task( notify_for_audit, - audit_id=audit_id, + workflow_audit=workflow_audit, timeout=60, task_name=f"query-priv-apply-{apply_id}", ) @@ -444,7 +444,7 @@ def query_priv_audit(request): ).audit_id # 调用工作流接口审核 - audit_result = Audit.audit( + audit_result, workflow_audit_detail = Audit.audit( audit_id, audit_status, user.username, audit_remark ) @@ -462,10 +462,11 @@ def query_priv_audit(request): return render(request, "error.html", context) else: # 消息通知 + audit_detail.refresh_from_db() async_task( notify_for_audit, - audit_id=audit_id, - audit_remark=audit_remark, + workflow_audit=audit_detail, + workflow_audit_detail=workflow_audit_detail, timeout=60, task_name=f"query-priv-audit-{apply_id}", ) diff --git a/sql/sql_workflow.py b/sql/sql_workflow.py index 6f8707ad0d..0ac8f7c899 100644 --- a/sql/sql_workflow.py +++ b/sql/sql_workflow.py @@ -11,16 +11,15 @@ from django.http import HttpResponse, HttpResponseRedirect, JsonResponse from django.shortcuts import render, get_object_or_404 from django.urls import reverse -from django.utils import timezone +from django_q.tasks import async_task from common.config import SysConfig -from common.utils.const import Const, WorkflowDict +from common.utils.const import WorkflowDict from common.utils.extend_json_encoder import ExtendJSONEncoder +from sql.engines import get_engine from sql.engines.models import ReviewResult, ReviewSet -from sql.notify import notify_for_audit, notify_for_execute -from sql.models import ResourceGroup -from sql.utils.resource_group import user_groups, user_instances -from sql.utils.tasks import add_sql_schedule, del_schedule +from sql.notify import notify_for_audit, EventType, notify_for_execute +from sql.utils.resource_group import user_groups from sql.utils.sql_review import ( can_timingtask, can_cancel, @@ -29,11 +28,9 @@ can_view, can_rollback, ) +from sql.utils.tasks import add_sql_schedule, del_schedule from sql.utils.workflow_audit import Audit -from .models import SqlWorkflow, SqlWorkflowContent, Instance -from django_q.tasks import async_task - -from sql.engines import get_engine +from .models import SqlWorkflow logger = logging.getLogger("default") @@ -251,11 +248,12 @@ def passed(request): try: with transaction.atomic(): # 调用工作流接口审核 - audit_id = Audit.detail_by_workflow_id( + workflow_audit = Audit.detail_by_workflow_id( workflow_id=workflow_id, workflow_type=WorkflowDict.workflow_type["sqlreview"], - ).audit_id - audit_result = Audit.audit( + ) + audit_id = workflow_audit.audit_id + audit_result, audit_detail = Audit.audit( audit_id, WorkflowDict.workflow_status["audit_success"], user.username, @@ -286,8 +284,8 @@ def passed(request): if is_notified: async_task( notify_for_audit, - audit_id=audit_id, - audit_remark=audit_remark, + workflow_audit=workflow_audit, + workflow_audit_detail=audit_detail, timeout=60, task_name=f"sqlreview-pass-{workflow_id}", ) @@ -378,7 +376,7 @@ def execute(request): else True ) if is_notified: - notify_for_execute(SqlWorkflow.objects.get(id=workflow_id)) + notify_for_execute(workflow=SqlWorkflow.objects.get(id=workflow_id)) return HttpResponseRedirect(reverse("sql:detail", args=(workflow_id,))) @@ -453,7 +451,7 @@ def cancel(request): if workflow_id == 0: context = {"errMsg": "workflow_id参数为空."} return render(request, "error.html", context) - workflow_detail = SqlWorkflow.objects.get(id=workflow_id) + sql_workflow = SqlWorkflow.objects.get(id=workflow_id) audit_remark = request.POST.get("cancel_remark") if audit_remark is None: context = {"errMsg": "终止原因不能为空"} @@ -468,14 +466,15 @@ def cancel(request): try: with transaction.atomic(): # 调用工作流接口取消或者驳回 - audit_id = Audit.detail_by_workflow_id( + workflow_audit = Audit.detail_by_workflow_id( workflow_id=workflow_id, workflow_type=WorkflowDict.workflow_type["sqlreview"], - ).audit_id + ) + audit_id = workflow_audit.audit_id # 仅待审核的需要调用工作流,审核通过的不需要 - if workflow_detail.status != "workflow_manreviewing": + if sql_workflow.status != "workflow_manreviewing": # 增加工单日志 - if user.username == workflow_detail.engineer: + if user.username == sql_workflow.engineer: Audit.add_log( audit_id=audit_id, operation_type=3, @@ -494,8 +493,8 @@ def cancel(request): operator_display=request.user.display, ) else: - if user.username == workflow_detail.engineer: - Audit.audit( + if user.username == sql_workflow.engineer: + _, workflow_audit_detail = Audit.audit( audit_id, WorkflowDict.workflow_status["audit_abort"], user.username, @@ -503,7 +502,7 @@ def cancel(request): ) # 非提交人需要校验审核权限 elif user.has_perm("sql.sql_review"): - Audit.audit( + _, workflow_audit_detail = Audit.audit( audit_id, WorkflowDict.workflow_status["audit_reject"], user.username, @@ -513,12 +512,12 @@ def cancel(request): raise PermissionDenied # 删除定时执行task - if workflow_detail.status == "workflow_timingtask": + if sql_workflow.status == "workflow_timingtask": schedule_name = f"sqlreview-timing-{workflow_id}" del_schedule(schedule_name) # 将流程状态修改为人工终止流程 - workflow_detail.status = "workflow_abort" - workflow_detail.save() + sql_workflow.status = "workflow_abort" + sql_workflow.save() except Exception as msg: logger.error(f"取消工单报错,错误信息:{traceback.format_exc()}") context = {"errMsg": msg} @@ -532,18 +531,15 @@ def cancel(request): else True ) if is_notified: - audit_detail = Audit.detail_by_workflow_id( - workflow_id=workflow_id, - workflow_type=WorkflowDict.workflow_type["sqlreview"], - ) - if audit_detail.current_status in ( + workflow_audit.refresh_from_db() + if workflow_audit.current_status in ( WorkflowDict.workflow_status["audit_abort"], WorkflowDict.workflow_status["audit_reject"], ): async_task( notify_for_audit, - audit_id=audit_detail.audit_id, - audit_remark=audit_remark, + workflow_audit=workflow_audit, + workflow_audit_detail=workflow_audit_detail, timeout=60, task_name=f"sqlreview-cancel-{workflow_id}", ) diff --git a/sql/test_notify.py b/sql/test_notify.py new file mode 100644 index 0000000000..2c175faccb --- /dev/null +++ b/sql/test_notify.py @@ -0,0 +1,563 @@ +import json +from datetime import datetime, timedelta +from unittest.mock import patch, Mock, ANY + +from django.contrib.auth.models import Group +from django.contrib.auth import get_user_model +from django.test import TestCase + +from common.config import SysConfig +from sql.models import ( + Instance, + SqlWorkflow, + SqlWorkflowContent, + QueryPrivilegesApply, + WorkflowAudit, + WorkflowAuditDetail, + ResourceGroup, + ArchiveConfig, +) +from sql.notify import ( + auto_notify, + EventType, + LegacyRender, + GenericWebhookNotifier, + My2SqlResult, + DingdingWebhookNotifier, + DingdingPersonNotifier, + FeishuPersonNotifier, + FeishuWebhookNotifier, + QywxWebhookNotifier, + LegacyMessage, + Notifier, + notify_for_execute, + notify_for_audit, + notify_for_my2sql, + MailNotifier, +) + +User = get_user_model() + + +class TestNotify(TestCase): + """ + 测试消息 + """ + + def setUp(self): + self.sys_config = SysConfig() + self.aug = Group.objects.create(id=1, name="auth_group") + self.user = User.objects.create( + username="test_user", display="中文显示", is_active=True + ) + self.su = User.objects.create( + username="s_user", + display="中文显示", + is_active=True, + is_superuser=True, + ) + self.su.groups.add(self.aug) + + tomorrow = datetime.today() + timedelta(days=1) + self.ins = Instance.objects.create( + instance_name="some_ins", + type="slave", + db_type="mysql", + host="some_host", + port=3306, + user="ins_user", + password="some_str", + ) + self.wf = SqlWorkflow.objects.create( + workflow_name="some_name", + group_id=1, + group_name="g1", + engineer=self.user.username, + engineer_display=self.user.display, + audit_auth_groups="some_audit_group", + create_time=datetime.now(), + status="workflow_timingtask", + is_backup=True, + instance=self.ins, + db_name="some_db", + syntax_type=1, + ) + SqlWorkflowContent.objects.create( + workflow=self.wf, sql_content="some_sql", execute_result="" + ) + self.query_apply_1 = QueryPrivilegesApply.objects.create( + group_id=1, + group_name="some_name", + title="some_title1", + user_name="some_user", + instance=self.ins, + db_list="some_db,some_db2", + limit_num=100, + valid_date=tomorrow, + priv_type=1, + status=0, + audit_auth_groups="some_audit_group", + ) + # 必须要有的几个 + # WorkflowAudit, 审核表, 每一个工作流关联一条记录 + # WorkflowAuditDetail, 审核详情, 每一个审核步骤一条记录, 并且都关联到一个 WorkflowAudit + self.audit_wf = WorkflowAudit.objects.create( + group_id=1, + group_name="some_group", + workflow_id=self.wf.id, + workflow_type=2, + workflow_title="申请标题", + workflow_remark="申请备注", + audit_auth_groups="1", + current_audit="1", + next_audit="2", + current_status=0, + create_user=self.user.username, + ) + self.audit_wf_detail = WorkflowAuditDetail.objects.create( + audit_id=self.audit_wf.audit_id, + audit_user=self.user.display, + audit_time=datetime.now(), + audit_status=1, + remark="测试备注", + ) + self.audit_query = WorkflowAudit.objects.create( + group_id=1, + group_name="some_group", + workflow_id=self.query_apply_1.apply_id, + workflow_type=1, + workflow_title="申请标题", + workflow_remark="申请备注", + audit_auth_groups="1,2,3", + current_audit="1", + next_audit="2", + current_status=0, + ) + self.audit_query_detail = WorkflowAuditDetail.objects.create( + audit_id=self.audit_query.audit_id, + audit_user=self.user.display, + audit_time=datetime.now(), + audit_status=1, + remark="测试query备注", + ) + + self.rs = ResourceGroup.objects.create(group_id=1, ding_webhook="url") + + self.archive_apply = ArchiveConfig.objects.create( + title="测试归档", + resource_group=self.rs, + src_instance=self.ins, + src_db_name="foo", + src_table_name="bar", + dest_db_name="foo-dest", + dest_table_name="bar-dest", + mode="purge", + no_delete=False, + status=0, + user_name=self.user.username, + user_display=self.user.display, + ) + self.archive_apply_audit = WorkflowAudit.objects.create( + group_id=1, + group_name="some_group", + workflow_id=self.archive_apply.id, + workflow_type=3, + workflow_title=self.archive_apply.title, + workflow_remark="申请备注", + audit_auth_groups="1,2,3", + current_audit="1", + next_audit="2", + current_status=0, + ) + + def tearDown(self): + self.sys_config.purge() + User.objects.all().delete() + SqlWorkflow.objects.all().delete() + SqlWorkflowContent.objects.all().delete() + WorkflowAudit.objects.all().delete() + WorkflowAuditDetail.objects.all().delete() + ArchiveConfig.objects.all().delete() + ResourceGroup.objects.all().delete() + + def test_empty_notifiers(self): + with self.settings(ENABLED_NOTIFIERS=()): + auto_notify( + workflow=self.wf, + event_type=EventType.EXECUTE, + sys_config=self.sys_config, + ) + + def test_base_notifier(self): + self.sys_config.set("foo", "bar") + n = Notifier(workflow=self.wf, sys_config=self.sys_config) + n.sys_config_key = "foo" + self.assertTrue(n.should_run()) + n.sys_config_key = "not-foo" + self.assertFalse(n.should_run()) + + @patch("sql.notify.FeishuWebhookNotifier.run") + def test_auto_notify(self, mock_run): + with self.settings(ENABLED_NOTIFIERS=("sql.notify:FeishuWebhookNotifier",)): + auto_notify(self.sys_config, event_type=EventType.EXECUTE) + mock_run.assert_called_once() + + @patch("sql.notify.auto_notify") + def test_notify_for_execute(self, mock_auto_notify: Mock): + """测试适配器""" + notify_for_execute(self.wf) + mock_auto_notify.assert_called_once_with(workflow=self.wf, sys_config=ANY) + + @patch("sql.notify.auto_notify") + def test_notify_for_audit(self, mock_auto_notify: Mock): + """测试适配器""" + notify_for_audit( + workflow_audit=self.audit_wf, workflow_audit_detail=self.audit_wf_detail + ) + mock_auto_notify.assert_called_once_with( + workflow=None, + sys_config=ANY, + audit=self.audit_wf, + audit_detail=self.audit_wf_detail, + ) + + @patch("sql.notify.auto_notify") + def test_notify_for_m2sql(self, mock_auto_notify: Mock): + """测试适配器""" + task = Mock() + task.success = True + task.kwargs = {"user": "foo"} + task.result = ["", "/foo"] + expect_workflow = My2SqlResult(success=True, submitter="foo", file_path="/foo") + notify_for_my2sql(task) + mock_auto_notify.assert_called_once_with( + workflow=expect_workflow, sys_config=ANY, event_type=EventType.M2SQL + ) + mock_auto_notify.reset_mock() + # 测试失败的情况 + task.success = False + task.result = "Traceback blahblah" + expect_workflow = My2SqlResult( + success=False, submitter="foo", error=task.result + ) + notify_for_my2sql(task) + mock_auto_notify.assert_called_once_with( + workflow=expect_workflow, sys_config=ANY, event_type=EventType.M2SQL + ) + + # 下面的测试均为 notifier 的测试, 测试 render 和 send + def test_legacy_render_execution(self): + notifier = LegacyRender( + workflow=self.wf, event_type=EventType.EXECUTE, sys_config=self.sys_config + ) + notifier.render() + self.assertEqual(len(notifier.messages), 1) + self.assertIn("工单", notifier.messages[0].msg_title) + with self.assertRaises(NotImplementedError): + notifier.send() + + def test_legacy_render_execution_ddl(self): + """DDL 比普通的工单多一个通知 dba""" + self.wf.syntax_type = 1 + self.wf.status = "workflow_finish" + self.wf.save() + self.sys_config.set("ddl_notify_auth_group", self.aug.name) + notifier = LegacyRender( + workflow=self.wf, event_type=EventType.EXECUTE, sys_config=self.sys_config + ) + notifier.render() + self.assertEqual(len(notifier.messages), 2) + self.assertIn("有新的DDL语句执行完成", notifier.messages[1].msg_title) + + def test_legacy_render_audit(self): + notifier = LegacyRender( + workflow=self.wf, + event_type=EventType.AUDIT, + audit=self.audit_wf, + audit_detail=self.audit_wf_detail, + sys_config=self.sys_config, + ) + notifier.render() + self.assertEqual(len(notifier.messages), 1) + self.assertIn("新的工单申请", notifier.messages[0].msg_title) + + def test_legacy_render_query_audit(self): + # 默认是库权限的 + notifier = LegacyRender( + workflow=self.query_apply_1, + event_type=EventType.AUDIT, + audit=self.audit_query, + audit_detail=self.audit_query_detail, + sys_config=self.sys_config, + ) + notifier.render() + self.assertEqual(len(notifier.messages), 1) + self.assertIn("数据库清单", notifier.messages[0].msg_content) + + # 表级别的权限申请 + self.query_apply_1.priv_type = 2 + self.query_apply_1.table_list = "foo,bar" + self.query_apply_1.save() + notifier = LegacyRender( + workflow=self.query_apply_1, + event_type=EventType.AUDIT, + audit=self.audit_query, + audit_detail=self.audit_query_detail, + sys_config=self.sys_config, + ) + notifier.render() + self.assertEqual(len(notifier.messages), 1) + self.assertIn("表清单", notifier.messages[0].msg_content) + self.assertIn("foo,bar", notifier.messages[0].msg_content) + + def test_legacy_render_archive_audit(self): + notifier = LegacyRender( + workflow=self.archive_apply, + event_type=EventType.AUDIT, + audit=self.archive_apply_audit, + sys_config=self.sys_config, + ) + notifier.render() + self.assertEqual(len(notifier.messages), 1) + self.assertIn("归档表", notifier.messages[0].msg_content) + + def test_legacy_render_audit_success(self): + """审核通过消息""" + # 只测试上线工单 + self.audit_wf.current_status = 1 + self.audit_wf.save() + notifier = LegacyRender( + workflow=self.wf, + event_type=EventType.AUDIT, + audit=self.audit_wf, + sys_config=self.sys_config, + ) + notifier.render() + self.assertEqual(len(notifier.messages), 1) + self.assertIn("工单审核通过", notifier.messages[0].msg_title) + + def test_legacy_render_audit_reject(self): + self.audit_wf.current_status = 2 + self.audit_wf.save() + self.audit_wf_detail.remark = "驳回foo-bar" + self.audit_wf_detail.save() + notifier = LegacyRender( + workflow=self.wf, + event_type=EventType.AUDIT, + audit=self.audit_wf, + audit_detail=self.audit_wf_detail, + sys_config=self.sys_config, + ) + notifier.render() + self.assertEqual(len(notifier.messages), 1) + self.assertIn("工单被驳回", notifier.messages[0].msg_title) + self.assertIn("驳回foo-bar", notifier.messages[0].msg_content) + + def test_legacy_render_audit_abort(self): + self.audit_wf.current_status = 3 + self.audit_wf.save() + self.audit_wf_detail.remark = "撤回foo-bar" + self.audit_wf_detail.save() + notifier = LegacyRender( + workflow=self.wf, + event_type=EventType.AUDIT, + audit=self.audit_wf, + audit_detail=self.audit_wf_detail, + sys_config=self.sys_config, + ) + notifier.render() + self.assertEqual(len(notifier.messages), 1) + self.assertIn("提交人主动终止工单", notifier.messages[0].msg_title) + self.assertIn("撤回foo-bar", notifier.messages[0].msg_content) + + def test_legacy_render_m2sql(self): + successful_workflow = My2SqlResult( + submitter=self.user.username, success=True, file_path="/foo/bar" + ) + notifier = LegacyRender( + workflow=successful_workflow, + sys_config=self.sys_config, + event_type=EventType.M2SQL, + ) + notifier.render() + self.assertEqual(len(notifier.messages), 1) + self.assertEqual(notifier.messages[0].msg_title, "[Archery 通知]My2SQL执行结束") + # 失败 + failed_workflow = My2SqlResult( + submitter=self.user.username, success=False, error="Traceback blahblah" + ) + notifier = LegacyRender( + workflow=failed_workflow, + sys_config=self.sys_config, + event_type=EventType.M2SQL, + ) + notifier.render() + self.assertEqual(len(notifier.messages), 1) + self.assertEqual(notifier.messages[0].msg_title, "[Archery 通知]My2SQL执行失败") + + def test_general_webhook(self): + notifier = GenericWebhookNotifier( + workflow=self.wf, + event_type=EventType.AUDIT, + audit=self.audit_wf, + audit_detail=self.audit_wf_detail, + sys_config=self.sys_config, + ) + notifier.render() + self.assertIsNotNone(notifier.request_data) + print(json.dumps(notifier.request_data)) + self.assertDictEqual( + notifier.request_data["audit"], + { + "audit_id": self.audit_wf.audit_id, + "group_name": "some_group", + "workflow_type": 2, + "create_user_display": "", + "workflow_title": "申请标题", + "audit_auth_groups": self.audit_wf.audit_auth_groups, + "current_audit": "1", + "current_status": 0, + "create_time": self.audit_wf.create_time.isoformat(), + }, + ) + self.assertDictEqual( + notifier.request_data["workflow_content"]["workflow"], + { + "id": self.wf.id, + "workflow_name": "some_name", + "demand_url": "", + "group_id": 1, + "group_name": "g1", + "db_name": "some_db", + "syntax_type": 1, + "is_backup": True, + "engineer": "test_user", + "engineer_display": "中文显示", + "status": "workflow_timingtask", + "audit_auth_groups": "some_audit_group", + "run_date_start": None, + "run_date_end": None, + "finish_time": None, + "is_manual": 0, + "instance": self.ins.id, + "create_time": self.wf.create_time.isoformat(), + }, + ) + self.assertEqual( + notifier.request_data["workflow_content"]["sql_content"], "some_sql" + ) + self.assertEqual( + notifier.request_data["instance"]["instance_name"], self.ins.instance_name + ) + + +class TestNotifySend(TestCase): + audit_wf: WorkflowAudit = None + rs: ResourceGroup = None + user: User = None + + @classmethod + def setUpClass(cls): + cls.user = User.objects.create( + username="test", + email="test@example.com", + ding_user_id="1234", + wx_user_id="1234", + feishu_open_id="1234", + ) + cls.rs = ResourceGroup.objects.create( + group_name="test", + ding_webhook="ding_url", + feishu_webhook="feishu_url", + qywx_webhook="qywx_url", + ) + cls.audit_wf = WorkflowAudit.objects.create( + group_id=cls.rs.group_id, + group_name="some_group", + workflow_id=1, + workflow_type=2, + workflow_title="申请标题", + workflow_remark="申请备注", + audit_auth_groups="1", + current_audit="1", + next_audit="2", + current_status=0, + create_user=cls.user.username, + ) + + @classmethod + def tearDownClass(cls): + cls.user.delete() + cls.rs.delete() + cls.audit_wf.delete() + + def setUp(self): + self.patcher = patch("sql.notify.MsgSender") + self.mock_msg_sender = self.patcher.start() + self.sys_config = SysConfig() + + def tearDown(self): + self.patcher.stop() + + def generate_notifier(self, module) -> Notifier: + return module(workflow=None, audit=self.audit_wf, sys_config=self.sys_config) + + def test_ding_webhook_send(self): + mocker = Mock() + setattr(self.mock_msg_sender.return_value, "send_ding", mocker) + notifier = self.generate_notifier(DingdingWebhookNotifier) + notifier.messages = [ + LegacyMessage(msg_to=[self.user], msg_title="test", msg_content="test") + ] + notifier.send() + mocker.assert_called_once() + + def test_ding_person_send(self): + mocker = Mock() + setattr(self.mock_msg_sender.return_value, "send_ding2user", mocker) + notifier = self.generate_notifier(DingdingPersonNotifier) + notifier.messages = [ + LegacyMessage(msg_to=[self.user], msg_title="test", msg_content="test") + ] + notifier.send() + mocker.assert_called_once() + + def test_feishu_webhook(self): + mocker = Mock() + setattr(self.mock_msg_sender.return_value, "send_feishu_webhook", mocker) + notifier = self.generate_notifier(FeishuWebhookNotifier) + notifier.messages = [ + LegacyMessage(msg_to=[self.user], msg_title="test", msg_content="test") + ] + notifier.send() + mocker.assert_called_once() + + def test_feishu_person(self): + mocker = Mock() + setattr(self.mock_msg_sender.return_value, "send_feishu_user", mocker) + notifier = self.generate_notifier(FeishuPersonNotifier) + notifier.messages = [ + LegacyMessage(msg_to=[self.user], msg_title="test", msg_content="test") + ] + notifier.send() + mocker.assert_called_once() + + def test_qywx_webhook(self): + mocker = Mock() + setattr(self.mock_msg_sender.return_value, "send_qywx_webhook", mocker) + notifier = self.generate_notifier(QywxWebhookNotifier) + notifier.messages = [ + LegacyMessage(msg_to=[self.user], msg_title="test", msg_content="test") + ] + notifier.send() + mocker.assert_called_once() + + def test_mail(self): + mocker = Mock() + setattr(self.mock_msg_sender.return_value, "send_email", mocker) + notifier = self.generate_notifier(MailNotifier) + notifier.messages = [ + LegacyMessage(msg_to=[self.user], msg_title="test", msg_content="test") + ] + notifier.send() + mocker.assert_called_once() diff --git a/sql/tests.py b/sql/tests.py index a1ba6238af..8f3fd12fe4 100644 --- a/sql/tests.py +++ b/sql/tests.py @@ -1,8 +1,8 @@ import json -import unittest from datetime import timedelta, datetime, date -from unittest.mock import MagicMock, patch, ANY +from unittest.mock import MagicMock, patch, ANY, Mock from django.conf import settings +from django.db import connection from django.contrib.auth.models import Group from django.contrib.auth.models import Permission from django.test import Client, TestCase, TransactionTestCase @@ -12,8 +12,7 @@ from common.utils.const import WorkflowDict from sql.archiver import add_archive_task, archive from sql.binlog import my2sql_file -from sql.engines.models import ResultSet, ReviewSet, ReviewResult -from sql.notify import notify_for_audit, notify_for_execute, notify_for_my2sql +from sql.engines.models import ResultSet from sql.utils.execute_sql import execute_callback from sql.query import kill_query_conn from sql.models import ( @@ -31,12 +30,16 @@ WorkflowAuditSetting, ArchiveConfig, ) -from common.dashboard import ChartDao User = Users -class TestView(TestCase): +class PickableMock(Mock): + def __reduce__(self): + return (Mock, ()) + + +class TestView(TransactionTestCase): """测试view视图""" def setUp(self): @@ -102,6 +105,11 @@ def setUp(self): self.wl = WorkflowLog.objects.create( audit_id=self.audit.audit_id, operation_type=1 ) + # 慢查询建表 + with connection.cursor() as cursor: + with open("src/init_sql/mysql_slow_query_review.sql") as fp: + content = fp.read() + cursor.execute(content) def tearDown(self): self.sys_config.purge() @@ -112,6 +120,10 @@ def tearDown(self): WorkflowLog.objects.all().delete() QueryPrivilegesApply.objects.all().delete() ResourceGroup.objects.all().delete() + with connection.cursor() as cursor: + cursor.execute( + "DROP table mysql_slow_query_review,mysql_slow_query_review_history" + ) def test_index(self): """测试index页面""" @@ -1570,10 +1582,11 @@ def testWorkflowListViewFilter(self): r_json = r.json() self.assertEqual(r_json["total"], 2) + @patch("sql.notify.auto_notify") @patch("sql.utils.workflow_audit.Audit.detail_by_workflow_id") @patch("sql.utils.workflow_audit.Audit.audit") @patch("sql.utils.workflow_audit.Audit.can_review") - def testWorkflowPassedView(self, _can_review, _audit, _detail_by_id): + def testWorkflowPassedView(self, _can_review, _audit, _detail_by_id, _): """测试审核工单""" c = Client() c.force_login(self.superuser1) @@ -1583,8 +1596,13 @@ def testWorkflowPassedView(self, _can_review, _audit, _detail_by_id): r = c.post("/passed/", {"workflow_id": self.wf1.id}) self.assertContains(r, "你无权操作当前工单!") _can_review.return_value = True - _detail_by_id.return_value.audit_id = 123 - _audit.return_value = {"data": {"workflow_status": 1}} # TODO 改为audit_success + mock_audit_detail = PickableMock() + mock_audit_detail.audit_id = 123 + _detail_by_id.return_value = mock_audit_detail + _audit.return_value = ( + {"data": {"workflow_status": 1}}, + {"foo": "bar"}, + ) # TODO 改为audit_success r = c.post( "/passed/", data={"workflow_id": self.wf1.id, "audit_remark": "some_audit"}, @@ -1596,10 +1614,11 @@ def testWorkflowPassedView(self, _can_review, _audit, _detail_by_id): self.wf1.refresh_from_db() self.assertEqual(self.wf1.status, "workflow_review_pass") + @patch("sql.sql_workflow.notify_for_execute") @patch("sql.sql_workflow.Audit.add_log") @patch("sql.sql_workflow.Audit.detail_by_workflow_id") @patch("sql.sql_workflow.can_execute") - def test_workflow_execute(self, mock_can_excute, mock_detail_by_id, mock_add_log): + def test_workflow_execute(self, mock_can_excute, _, _1, _2): """测试工单执行""" c = Client() c.force_login(self.executor1) @@ -1609,7 +1628,6 @@ def test_workflow_execute(self, mock_can_excute, mock_detail_by_id, mock_add_log r = c.post("/execute/", data={"workflow_id": self.wf2.id}) self.assertContains(r, "你无权操作当前工单!") mock_can_excute.return_value = True - mock_detail_by_id = 123 r = c.post("/execute/", data={"workflow_id": self.wf2.id, "mode": "manual"}) self.wf2.refresh_from_db() self.assertEqual("workflow_finish", self.wf2.status) @@ -1637,7 +1655,8 @@ def testWorkflowCancelView(self, _can_cancel, _audit, _detail_by_id, _add_log): self.assertContains(r, "你无权操作当前工单!") _can_cancel.return_value = True _detail_by_id = 123 - r = c.post( + _audit.return_value = (None, None) + c.post( "/cancel/", data={"workflow_id": self.wf2.id, "cancel_remark": "some_reason"}, ) @@ -2079,11 +2098,14 @@ def test_archive_audit(self, _async_task, _audit): :return: """ _audit.detail_by_workflow_id.return_value.audit_id = 1 - _audit.audit.return_value = { - "status": 0, - "msg": "ok", - "data": {"workflow_status": 1}, - } + _audit.audit.return_value = ( + { + "status": 0, + "msg": "ok", + "data": {"workflow_status": 1}, + }, + None, + ) data = { "archive_id": self.archive_apply.id, "audit_status": WorkflowDict.workflow_status["audit_success"], @@ -2658,319 +2680,6 @@ def test_param_edit_variable_error( ) -class TestNotify(TestCase): - """ - 测试消息 - """ - - def setUp(self): - self.sys_config = SysConfig() - self.user = User.objects.create( - username="test_user", display="中文显示", is_active=True - ) - self.su = User.objects.create( - username="s_user", display="中文显示", is_active=True, is_superuser=True - ) - tomorrow = datetime.today() + timedelta(days=1) - self.ins = Instance.objects.create( - instance_name="some_ins", - type="slave", - db_type="mysql", - host="some_host", - port=3306, - user="ins_user", - password="some_str", - ) - self.wf = SqlWorkflow.objects.create( - workflow_name="some_name", - group_id=1, - group_name="g1", - engineer=self.user.username, - engineer_display=self.user.display, - audit_auth_groups="some_audit_group", - create_time=datetime.now(), - status="workflow_timingtask", - is_backup=True, - instance=self.ins, - db_name="some_db", - syntax_type=1, - ) - SqlWorkflowContent.objects.create( - workflow=self.wf, sql_content="some_sql", execute_result="" - ) - self.query_apply_1 = QueryPrivilegesApply.objects.create( - group_id=1, - group_name="some_name", - title="some_title1", - user_name="some_user", - instance=self.ins, - db_list="some_db,some_db2", - limit_num=100, - valid_date=tomorrow, - priv_type=1, - status=0, - audit_auth_groups="some_audit_group", - ) - self.audit = WorkflowAudit.objects.create( - group_id=1, - group_name="some_group", - workflow_id=1, - workflow_type=1, - workflow_title="申请标题", - workflow_remark="申请备注", - audit_auth_groups="1,2,3", - current_audit="1", - next_audit="2", - current_status=0, - ) - self.aug = Group.objects.create(id=1, name="auth_group") - self.rs = ResourceGroup.objects.create(group_id=1, ding_webhook="url") - - def tearDown(self): - self.sys_config.purge() - User.objects.all().delete() - SqlWorkflow.objects.all().delete() - SqlWorkflowContent.objects.all().delete() - WorkflowAudit.objects.all().delete() - ResourceGroup.objects.all().delete() - - def test_notify_disable(self): - """ - 测试关闭通知 - :return: - """ - # 关闭消息通知 - self.sys_config.set("mail", "false") - self.sys_config.set("ding", "false") - r = notify_for_audit(audit_id=self.audit.audit_id) - self.assertIsNone(r) - - @patch("sql.notify.MsgSender") - @patch("sql.notify.auth_group_users") - def test_notify_for_sqlreview_audit_wait(self, _auth_group_users, _msg_sender): - """ - 测试SQL上线申请审核通知 - :return: - """ - # 通知人修改 - _auth_group_users.return_value = [self.user] - # 开启消息通知 - self.sys_config.set("mail", "true") - self.sys_config.set("ding", "true") - # 修改工单状态为待审核 - self.audit.workflow_type = WorkflowDict.workflow_type["sqlreview"] - self.audit.workflow_id = self.wf.id - self.audit.current_status = WorkflowDict.workflow_status["audit_wait"] - self.audit.save() - r = notify_for_audit(audit_id=self.audit.audit_id) - self.assertIsNone(r) - _msg_sender.assert_called_once() - - @patch("sql.notify.MsgSender") - @patch("sql.notify.auth_group_users") - def test_notify_for_sqlreview_audit_success(self, _auth_group_users, _msg_sender): - """ - 测试SQL上线申请审核通过通知 - :return: - """ - # 通知人修改 - _auth_group_users.return_value = [self.user] - # 开启消息通知 - self.sys_config.set("mail", "true") - self.sys_config.set("ding", "true") - # 修改工单状态审核通过 - self.audit.workflow_type = WorkflowDict.workflow_type["sqlreview"] - self.audit.workflow_id = self.wf.id - self.audit.current_status = WorkflowDict.workflow_status["audit_success"] - self.audit.create_user = self.user.username - self.audit.save() - r = notify_for_audit(audit_id=self.audit.audit_id) - self.assertIsNone(r) - _msg_sender.assert_called_once() - - @patch("sql.notify.MsgSender") - @patch("sql.notify.auth_group_users") - def test_notify_for_sqlreview_audit_reject(self, _auth_group_users, _msg_sender): - """ - 测试SQL上线申请审核驳回通知 - :return: - """ - # 通知人修改 - _auth_group_users.return_value = [self.user] - # 开启消息通知 - self.sys_config.set("mail", "true") - self.sys_config.set("ding", "true") - # 修改工单状态审核通过 - self.audit.workflow_type = WorkflowDict.workflow_type["sqlreview"] - self.audit.workflow_id = self.wf.id - self.audit.current_status = WorkflowDict.workflow_status["audit_reject"] - self.audit.create_user = self.user.username - self.audit.save() - r = notify_for_audit(audit_id=self.audit.audit_id) - self.assertIsNone(r) - _msg_sender.assert_called_once() - - @patch("sql.notify.MsgSender") - @patch("sql.notify.auth_group_users") - def test_notify_for_sqlreview_audit_abort(self, _auth_group_users, _msg_sender): - """ - 测试SQL上线申请审核取消通知 - :return: - """ - # 通知人修改 - _auth_group_users.return_value = [self.user] - # 开启消息通知 - self.sys_config.set("mail", "true") - self.sys_config.set("ding", "true") - # 修改工单状态审核取消 - self.audit.workflow_type = WorkflowDict.workflow_type["sqlreview"] - self.audit.workflow_id = self.wf.id - self.audit.current_status = WorkflowDict.workflow_status["audit_abort"] - self.audit.create_user = self.user.username - self.audit.audit_auth_groups = self.aug.id - self.audit.save() - r = notify_for_audit(audit_id=self.audit.audit_id) - self.assertIsNone(r) - _msg_sender.assert_called_once() - - @patch("sql.notify.MsgSender") - @patch("sql.notify.auth_group_users") - def test_notify_for_sqlreview_wrong_workflow_type( - self, _auth_group_users, _msg_sender - ): - """ - 测试不存在的工单类型 - :return: - """ - # 通知人修改 - _auth_group_users.return_value = [self.user] - # 开启消息通知 - self.sys_config.set("mail", "true") - self.sys_config.set("ding", "true") - # 修改工单状态审核取消 - self.audit.workflow_type = 10 - self.audit.save() - with self.assertRaisesMessage(Exception, "工单类型不正确"): - notify_for_audit(audit_id=self.audit.audit_id) - - @patch("sql.notify.MsgSender") - @patch("sql.notify.auth_group_users") - def test_notify_for_query_audit_wait_apply_db_perm( - self, _auth_group_users, _msg_sender - ): - """ - 测试查询申请库权限 - :return: - """ - # 通知人修改 - _auth_group_users.return_value = [self.user] - # 开启消息通知 - self.sys_config.set("mail", "true") - self.sys_config.set("ding", "true") - # 修改工单状态为待审核 - self.audit.workflow_type = WorkflowDict.workflow_type["query"] - self.audit.workflow_id = self.query_apply_1.apply_id - self.audit.current_status = WorkflowDict.workflow_status["audit_wait"] - self.audit.save() - # 修改工单为库权限申请 - self.query_apply_1.priv_type = 1 - self.query_apply_1.save() - r = notify_for_audit(audit_id=self.audit.audit_id) - self.assertIsNone(r) - _msg_sender.assert_called_once() - - @patch("sql.notify.MsgSender") - @patch("sql.notify.auth_group_users") - def test_notify_for_query_audit_wait_apply_tb_perm( - self, _auth_group_users, _msg_sender - ): - """ - 测试查询申请表权限 - :return: - """ - # 通知人修改 - _auth_group_users.return_value = [self.user] - # 开启消息通知 - self.sys_config.set("mail", "true") - self.sys_config.set("ding", "true") - # 修改工单状态为待审核 - self.audit.workflow_type = WorkflowDict.workflow_type["query"] - self.audit.workflow_id = self.query_apply_1.apply_id - self.audit.current_status = WorkflowDict.workflow_status["audit_wait"] - self.audit.save() - # 修改工单为表权限申请 - self.query_apply_1.priv_type = 2 - self.query_apply_1.save() - r = notify_for_audit(audit_id=self.audit.audit_id) - self.assertIsNone(r) - _msg_sender.assert_called_once() - - @patch("sql.notify.MsgSender") - def test_notify_for_execute_disable(self, _msg_sender): - """ - 测试执行消息关闭 - :return: - """ - # 开启消息通知 - self.sys_config.set("mail", "false") - self.sys_config.set("ding", "false") - r = notify_for_execute(self.wf) - self.assertIsNone(r) - - @patch("sql.notify.auth_group_users") - @patch("sql.notify.Audit") - @patch("sql.notify.MsgSender") - def test_notify_for_execute(self, _msg_sender, _audit, _auth_group_users): - """ - 测试执行消息 - :return: - """ - _auth_group_users.return_value = [self.user] - # 处理工单信息 - _audit.review_info.return_value = ( - self.audit.audit_auth_groups, - self.audit.current_audit, - ) - # 开启消息通知 - self.sys_config.set("mail", "true") - self.sys_config.set("ding", "true") - self.sys_config.set("ddl_notify_auth_group", self.aug.name) - # 修改工单状态为执行结束,修改为DDL工单 - self.wf.status = "workflow_finish" - self.wf.syntax_type = 1 - self.wf.save() - r = notify_for_execute(self.wf) - self.assertIsNone(r) - _msg_sender.assert_called() - - @patch("sql.notify.MsgSender") - def test_notify_for_my2sql_disable(self, _msg_sender): - """ - 测试执行消息关闭 - :return: - """ - # 开启消息通知 - self.sys_config.set("mail", "false") - self.sys_config.set("ding", "false") - r = notify_for_execute(self.wf) - self.assertIsNone(r) - - @patch("django_q.tasks.async_task") - @patch("sql.notify.MsgSender") - def test_notify_for_my2sql(self, _msg_sender, _async_task): - """ - 测试执行消息 - :return: - """ - # 开启消息通知 - self.sys_config.set("mail", "true") - # 设置为task成功 - _async_task.return_value.success.return_value = True - r = notify_for_my2sql(_async_task) - self.assertIsNone(r) - _msg_sender.assert_called_once() - - class TestDataDictionary(TestCase): """ 测试数据字典 diff --git a/sql/utils/execute_sql.py b/sql/utils/execute_sql.py index 4e2c084a53..cb53999739 100644 --- a/sql/utils/execute_sql.py +++ b/sql/utils/execute_sql.py @@ -8,7 +8,7 @@ from common.config import SysConfig from sql.engines.models import ReviewResult, ReviewSet from sql.models import SqlWorkflow -from sql.notify import notify_for_execute +from sql.notify import notify_for_execute, EventType from sql.utils.workflow_audit import Audit from sql.engines import get_engine diff --git a/sql/utils/tests.py b/sql/utils/tests.py index a79e4262c3..1c69b9d207 100644 --- a/sql/utils/tests.py +++ b/sql/utils/tests.py @@ -1179,7 +1179,7 @@ def tearDown(self): def test_audit_add_query(self): """测试添加查询审核工单""" - result = Audit.add(1, self.query_apply_1.apply_id) + result, _ = Audit.add(1, self.query_apply_1.apply_id) audit_id = result["data"]["audit_id"] workflow_status = result["data"]["workflow_status"] self.assertEqual(workflow_status, WorkflowDict.workflow_status["audit_wait"]) @@ -1196,7 +1196,7 @@ def test_audit_add_query(self): def test_audit_add_sqlreview(self): """测试添加上线审核工单""" - result = Audit.add(2, self.wf.id) + result, _ = Audit.add(2, self.wf.id) audit_id = result["data"]["audit_id"] workflow_status = result["data"]["workflow_status"] self.assertEqual(workflow_status, WorkflowDict.workflow_status["audit_wait"]) @@ -1213,7 +1213,7 @@ def test_audit_add_sqlreview(self): def test_audit_add_archive_review(self): """测试添加数据归档工单""" - result = Audit.add(3, self.archive_apply_1.id) + result, workflow_audit_detail = Audit.add(3, self.archive_apply_1.id) audit_id = result["data"]["audit_id"] workflow_status = result["data"]["workflow_status"] self.assertEqual(workflow_status, WorkflowDict.workflow_status["audit_wait"]) @@ -1250,7 +1250,7 @@ def test_audit_add_duplicate(self): def test_audit_add_auto_review(self, _is_auto_review): """测试提交自动审核通过""" self.sys_config.set("auto_review", "true") - result = Audit.add(2, self.wf.id) + result, workflow_audit_detail = Audit.add(2, self.wf.id) audit_id = result["data"]["audit_id"] workflow_status = result["data"]["workflow_status"] self.assertEqual(workflow_status, WorkflowDict.workflow_status["audit_success"]) @@ -1267,7 +1267,7 @@ def test_audit_add_multiple_audit(self): """测试提交多级审核""" self.wf.audit_auth_groups = "1,2,3" self.wf.save() - result = Audit.add(2, self.wf.id) + result, _ = Audit.add(2, self.wf.id) audit_id = result["data"]["audit_id"] workflow_status = result["data"]["workflow_status"] audit_detail = WorkflowAudit.objects.get(audit_id=audit_id) @@ -1286,7 +1286,7 @@ def test_audit_success_not_exists_next(self): self.audit.current_audit = "3" self.audit.next_audit = "-1" self.audit.save() - result = Audit.audit( + result, _ = Audit.audit( self.audit.audit_id, WorkflowDict.workflow_status["audit_success"], self.user.username, @@ -1311,7 +1311,7 @@ def test_audit_success_exists_next(self): self.audit.current_audit = "1" self.audit.next_audit = "2" self.audit.save() - result = Audit.audit( + result, _ = Audit.audit( self.audit.audit_id, WorkflowDict.workflow_status["audit_success"], self.user.username, @@ -1333,7 +1333,7 @@ def test_audit_success_exists_next(self): def test_audit_reject(self): """测试审核不通过""" - result = Audit.audit( + result, _ = Audit.audit( self.audit.audit_id, WorkflowDict.workflow_status["audit_reject"], self.user.username, @@ -1357,7 +1357,7 @@ def test_audit_abort(self): """测试取消审批""" self.audit.create_user = self.user.username self.audit.save() - result = Audit.audit( + result, _ = Audit.audit( self.audit.audit_id, WorkflowDict.workflow_status["audit_abort"], self.user.username, diff --git a/sql/utils/workflow_audit.py b/sql/utils/workflow_audit.py index c79efc401e..ae64382d09 100644 --- a/sql/utils/workflow_audit.py +++ b/sql/utils/workflow_audit.py @@ -155,11 +155,13 @@ def add(workflow_type, workflow_id): # 增加审核id result["data"]["audit_id"] = audit_detail.audit_id # 返回添加结果 - return result + return result, audit_detail # 工单审核 @staticmethod - def audit(audit_id, audit_status, audit_user, audit_remark): + def audit( + audit_id, audit_status, audit_user, audit_remark + ) -> (dict, WorkflowAuditDetail): result = {"status": 0, "msg": "ok", "data": 0} audit_detail = WorkflowAudit.objects.get(audit_id=audit_id) @@ -310,7 +312,7 @@ def audit(audit_id, audit_status, audit_user, audit_remark): # 返回审核结果 result["data"] = {"workflow_status": audit_result.current_status} - return result + return result, audit_detail_result # 获取用户待办工单数量 @staticmethod @@ -340,7 +342,7 @@ def detail(audit_id): # 通过业务id获取审核信息 @staticmethod - def detail_by_workflow_id(workflow_id, workflow_type): + def detail_by_workflow_id(workflow_id, workflow_type) -> WorkflowAudit: try: return WorkflowAudit.objects.get( workflow_id=workflow_id, workflow_type=workflow_type @@ -470,14 +472,16 @@ def add_log( operator, operator_display, ): - WorkflowLog( + log = WorkflowLog( audit_id=audit_id, operation_type=operation_type, operation_type_desc=operation_type_desc, operation_info=operation_info, operator=operator, operator_display=operator_display, - ).save() + ) + log.save() + return log # 获取工单日志 @staticmethod diff --git a/sql_api/api_workflow.py b/sql_api/api_workflow.py index 480a704e1a..c379750777 100644 --- a/sql_api/api_workflow.py +++ b/sql_api/api_workflow.py @@ -1,4 +1,3 @@ -import MySQLdb from django.contrib.auth.decorators import permission_required from django.utils.decorators import method_decorator from rest_framework import views, generics, status, serializers, permissions @@ -134,7 +133,25 @@ def get(self, request): def post(self, request): serializer = self.get_serializer(data=request.data) serializer.is_valid(raise_exception=True) - serializer.save() + workflow_content = serializer.save() + sys_config = SysConfig() + is_notified = ( + "Apply" in sys_config.get("notify_phase_control").split(",") + if sys_config.get("notify_phase_control") + else True + ) + if workflow_content.workflow.status == "workflow_manreviewing" and is_notified: + # 获取审核信息 + workflow_audit = Audit.detail_by_workflow_id( + workflow_id=workflow_content.workflow.id, + workflow_type=WorkflowDict.workflow_type["sqlreview"], + ) + async_task( + notify_for_audit, + workflow_audit=workflow_audit, + timeout=60, + task_name=f"sqlreview-submit-{workflow_content.workflow.id}", + ) return Response(serializer.data, status=status.HTTP_201_CREATED) @@ -223,25 +240,26 @@ def post(self, request): # 使用事务保持数据一致性 try: with transaction.atomic(): - audit_id = Audit.detail_by_workflow_id( + workflow_audit = Audit.detail_by_workflow_id( workflow_id=workflow_id, workflow_type=WorkflowDict.workflow_type["query"], - ).audit_id + ) + audit_id = workflow_audit.audit_id # 调用工作流接口审核 - audit_result = Audit.audit( + audit_result, audit_detail = Audit.audit( audit_id, audit_status, user.username, audit_remark ) # 按照审核结果更新业务表审核状态 - audit_detail = Audit.detail(audit_id) + workflow_audit = Audit.detail(audit_id) if ( - audit_detail.workflow_type + workflow_audit.workflow_type == WorkflowDict.workflow_type["query"] ): # 更新业务表审核状态,插入权限信息 _query_apply_audit_call_back( - audit_detail.workflow_id, + workflow_audit.workflow_id, audit_result["data"]["workflow_status"], ) @@ -252,8 +270,8 @@ def post(self, request): # 消息通知 async_task( notify_for_audit, - audit_id=audit_id, - audit_remark=audit_remark, + workflow_audit=workflow_audit, + workflow_audit_detail=audit_detail, timeout=60, task_name=f"query-priv-audit-{workflow_id}", ) @@ -274,11 +292,12 @@ def post(self, request): try: with transaction.atomic(): # 调用工作流接口审核 - audit_id = Audit.detail_by_workflow_id( + workflow_audit = Audit.detail_by_workflow_id( workflow_id=workflow_id, workflow_type=WorkflowDict.workflow_type["sqlreview"], - ).audit_id - audit_result = Audit.audit( + ) + audit_id = workflow_audit.audit_id + audit_result, audit_detail = Audit.audit( audit_id, WorkflowDict.workflow_status["audit_success"], user.username, @@ -308,8 +327,8 @@ def post(self, request): if is_notified: async_task( notify_for_audit, - audit_id=audit_id, - audit_remark=audit_remark, + workflow_audit=workflow_audit, + workflow_audit_detail=audit_detail, timeout=60, task_name=f"sqlreview-pass-{workflow_id}", ) @@ -336,7 +355,7 @@ def post(self, request): if workflow_detail.status != "workflow_manreviewing": # 增加工单日志 if user.username == workflow_detail.engineer: - Audit.add_log( + _, audit_detail = Audit.add_log( audit_id=audit_id, operation_type=3, operation_type_desc="取消执行", @@ -345,7 +364,7 @@ def post(self, request): operator_display=user.display, ) else: - Audit.add_log( + _, audit_detail = Audit.add_log( audit_id=audit_id, operation_type=2, operation_type_desc="审批不通过", @@ -355,7 +374,7 @@ def post(self, request): ) else: if user.username == workflow_detail.engineer: - Audit.audit( + _, audit_detail = Audit.audit( audit_id, WorkflowDict.workflow_status["audit_abort"], user.username, @@ -363,7 +382,7 @@ def post(self, request): ) # 非提交人需要校验审核权限 elif user.has_perm("sql.sql_review"): - Audit.audit( + _, audit_detail = Audit.audit( audit_id, WorkflowDict.workflow_status["audit_reject"], user.username, @@ -393,18 +412,18 @@ def post(self, request): else True ) if is_notified: - audit_detail = Audit.detail_by_workflow_id( + workflow_audit = Audit.detail_by_workflow_id( workflow_id=workflow_id, workflow_type=WorkflowDict.workflow_type["sqlreview"], ) - if audit_detail.current_status in ( + if workflow_audit.current_status in ( WorkflowDict.workflow_status["audit_abort"], WorkflowDict.workflow_status["audit_reject"], ): async_task( notify_for_audit, - audit_id=audit_detail.audit_id, - audit_remark=audit_remark, + workflow_audit=workflow_audit, + workflow_audit_detail=audit_detail, timeout=60, task_name=f"sqlreview-cancel-{workflow_id}", ) @@ -422,15 +441,17 @@ def post(self, request): # 使用事务保持数据一致性 try: with transaction.atomic(): - audit_id = Audit.detail_by_workflow_id( + workflow_audit = Audit.detail_by_workflow_id( workflow_id=workflow_id, workflow_type=WorkflowDict.workflow_type["archive"], - ).audit_id + ) + audit_id = workflow_audit.audit_id # 调用工作流插入审核信息,更新业务表审核状态 - audit_status = Audit.audit( + audit_status, audit_detail = Audit.audit( audit_id, audit_status, user.username, audit_remark - )["data"]["workflow_status"] + ) + audit_status = audit_status["data"]["workflow_status"] ArchiveConfig( id=workflow_id, status=audit_status, @@ -445,8 +466,8 @@ def post(self, request): # 消息通知 async_task( notify_for_audit, - audit_id=audit_id, - audit_remark=audit_remark, + workflow_audit=workflow_audit, + workflow_audit_detail=audit_detail, timeout=60, task_name=f"archive-audit-{workflow_id}", ) @@ -554,7 +575,9 @@ def post(self, request): else True ) if is_notified: - notify_for_execute(SqlWorkflow.objects.get(id=workflow_id)) + notify_for_execute( + workflow=SqlWorkflow.objects.get(id=workflow_id), + ) # 执行数据归档工单 elif workflow_type == 3: async_task( diff --git a/sql_api/serializers.py b/sql_api/serializers.py index 5e37a6ed73..7bfb5156c4 100644 --- a/sql_api/serializers.py +++ b/sql_api/serializers.py @@ -10,16 +10,16 @@ ResourceGroup, WorkflowAudit, WorkflowLog, + QueryPrivilegesApply, + ArchiveConfig, ) from django.contrib.auth.models import Group from django.contrib.auth.password_validation import validate_password from django.core.exceptions import ValidationError from django.db import transaction -from django_q.tasks import async_task from sql.engines import get_engine from sql.utils.workflow_audit import Audit from sql.utils.resource_group import user_instances -from sql.notify import notify_for_audit from common.utils.const import WorkflowDict from common.config import SysConfig import traceback @@ -248,6 +248,18 @@ class Meta: fields = ("id", "rds_dbinstanceid", "is_enable", "instance", "ak") +class QueryPrivilegesApplySerializer(serializers.ModelSerializer): + class Meta: + model = QueryPrivilegesApply + fields = "__all__" + + +class ArchiveConfigSerializer(serializers.ModelSerializer): + class Meta: + model = ArchiveConfig + fields = "__all__" + + class InstanceResourceSerializer(serializers.Serializer): instance_id = serializers.IntegerField(label="实例id") resource_type = serializers.ChoiceField( @@ -419,26 +431,7 @@ def create(self, validated_data): except Exception as e: logger.error(f"提交工单报错,错误信息:{traceback.format_exc()}") raise serializers.ValidationError({"errors": str(e)}) - else: - # 自动审核通过且开启了Apply阶段通知参数才发送消息通知 - is_notified = ( - "Apply" in sys_config.get("notify_phase_control").split(",") - if sys_config.get("notify_phase_control") - else True - ) - if workflow_status == "workflow_manreviewing" and is_notified: - # 获取审核信息 - audit_id = Audit.detail_by_workflow_id( - workflow_id=workflow.id, - workflow_type=WorkflowDict.workflow_type["sqlreview"], - ).audit_id - async_task( - notify_for_audit, - audit_id=audit_id, - timeout=60, - task_name=f"sqlreview-submit-{workflow.id}", - ) - return workflow_content + return workflow_content class Meta: model = SqlWorkflowContent diff --git a/sql_api/tests.py b/sql_api/tests.py index 9bbb6e3cf8..f096c7fcea 100644 --- a/sql_api/tests.py +++ b/sql_api/tests.py @@ -422,6 +422,8 @@ def setUp(self): self.token = r.data["access"] self.client.credentials(HTTP_AUTHORIZATION="Bearer " + self.token) SysConfig().set("api_user_whitelist", self.user.id) + self.notify_patcher = patch("sql.notify.auto_notify") + self.notify_patcher.start() def tearDown(self): self.user.delete() @@ -431,6 +433,7 @@ def tearDown(self): SqlWorkflow.objects.all().delete() WorkflowAudit.objects.all().delete() WorkflowLog.objects.all().delete() + self.notify_patcher.stop() def test_get_sql_workflow_list(self): """测试获取SQL上线工单列表""" @@ -478,7 +481,7 @@ def test_check_inception_Exception(self, _get_engine): print(json.loads(r.content)) self.assertDictEqual(json.loads(r.content), {"errors": "RuntimeError"}) - @patch("sql_api.serializers.get_engine") + @patch("sql_api.api_workflow.get_engine") def test_check(self, _get_engine): """测试工单检测,正常返回""" json_data = {