Skip to content

Commit

Permalink
black
Browse files Browse the repository at this point in the history
  • Loading branch information
LeoQuote committed Oct 18, 2023
1 parent 2fa6b05 commit 92cace4
Show file tree
Hide file tree
Showing 8 changed files with 178 additions and 104 deletions.
4 changes: 1 addition & 3 deletions archery/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,7 @@
"phoenix": {"path": "sql.engines.phoenix:PhoenixEngine"},
"odps": {"path": "sql.engines.odps:ODPSEngine"},
}
ENABLED_NOTIFIERS = (
"sql.notify:FeishuWebhookNotifier",
)
ENABLED_NOTIFIERS = ("sql.notify:FeishuWebhookNotifier",)
ENABLED_ENGINES = env("ENABLED_ENGINES")

# Application definition
Expand Down
4 changes: 3 additions & 1 deletion sql/archiver.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,9 @@ def archive_apply(request):
)
archive_id = archive_info.id
# 调用工作流插入审核信息
audit_result, audit_detail = Audit.add(WorkflowDict.workflow_type["archive"], archive_id)
audit_result, audit_detail = Audit.add(
WorkflowDict.workflow_type["archive"], archive_id
)
except Exception as msg:
logger.error(traceback.format_exc())
result["status"] = 1
Expand Down
162 changes: 96 additions & 66 deletions sql/notify.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@
Users,
SqlWorkflow,
ResourceGroup,
ArchiveConfig, WorkflowAudit, WorkflowAuditDetail,
ArchiveConfig,
WorkflowAudit,
WorkflowAuditDetail,
)
from sql_api.serializers import (
WorkflowSerializer,
WorkflowAuditListSerializer,
QueryPrivilegesApplySerializer,
ArchiveConfigSerializer
ArchiveConfigSerializer,
)
from sql.utils.resource_group import auth_group_users
from common.utils.sendmsg import MsgSender
Expand Down Expand Up @@ -53,16 +55,16 @@ def __notify_cnf_status():
feishu_status = sys_config.get("feishu")
generic_webhook = sys_config.get("generic_webhook")
if not any(
[
mail_status,
ding_status,
ding_webhook_status,
wx_status,
feishu_status,
feishu_webhook_status,
qywx_webhook_status,
generic_webhook,
]
[
mail_status,
ding_status,
ding_webhook_status,
wx_status,
feishu_status,
feishu_webhook_status,
qywx_webhook_status,
generic_webhook,
]
):
logger.info("未开启任何消息通知,可在系统设置中开启")
return False
Expand All @@ -86,15 +88,11 @@ def notify_for_my2sql(task):
"""
if task.success:
result = My2SqlResult(
success=True,
submitter=task.kwargs["user"],
file_path=task.result[1]
success=True, submitter=task.kwargs["user"], file_path=task.result[1]
)
else:
result = My2SqlResult(
success=False,
submitter=task.kwargs["user"],
error=task.result
success=False, submitter=task.kwargs["user"], error=task.result
)
# 发送
sys_config = SysConfig()
Expand All @@ -104,12 +102,14 @@ def notify_for_my2sql(task):
class Notifier:
name = "base"

def __init__(self,
workflow: Union[SqlWorkflow, ArchiveConfig, QueryPrivilegesApply, My2SqlResult],
sys_config: SysConfig,
audit: WorkflowAudit = None,
audit_detail: WorkflowAuditDetail = None,
event_type: EventType = EventType.AUDIT):
def __init__(
self,
workflow: Union[SqlWorkflow, ArchiveConfig, QueryPrivilegesApply, My2SqlResult],
sys_config: SysConfig,
audit: WorkflowAudit = None,
audit_detail: WorkflowAuditDetail = None,
event_type: EventType = EventType.AUDIT,
):
self.workflow = workflow
self.audit = audit
self.audit_detail = audit_detail
Expand Down Expand Up @@ -169,7 +169,9 @@ def __init__(self, *args, **kwargs):
def render_audit(self):
# 获取审核信息
audit_id = self.audit.audit_id
base_url = self.sys_config.get("archery_base_url", "http://127.0.0.1:8000").rstrip("/")
base_url = self.sys_config.get(
"archery_base_url", "http://127.0.0.1:8000"
).rstrip("/")
workflow_url = "{base_url}/workflow/{audit_id}".format(
base_url=base_url, audit_id=self.audit.audit_id
)
Expand Down Expand Up @@ -310,21 +312,28 @@ def render_audit(self):
else:
raise Exception("工单状态不正确")
logger.info(f"通知Debug{msg_to}")
self.messages.append(LegacyMessage(
msg_title, msg_content, msg_to
))
self.messages.append(LegacyMessage(msg_title, msg_content, msg_to))

def render_execute(self):
base_url = self.sys_config.get("archery_base_url", "http://127.0.0.1:8000").rstrip("/")
audit_auth_group, current_audit_auth_group = Audit.review_info(self.workflow.id, 2)
base_url = self.sys_config.get(
"archery_base_url", "http://127.0.0.1:8000"
).rstrip("/")
audit_auth_group, current_audit_auth_group = Audit.review_info(
self.workflow.id, 2
)
audit_id = Audit.detail_by_workflow_id(self.workflow.id, 2).audit_id
url = "{base_url}/workflow/{audit_id}".format(base_url=base_url, audit_id=audit_id)
msg_title = (f"[{WorkflowDict.workflow_type['sqlreview_display']}]工单"
f"{self.workflow.get_status_display()}#{audit_id}")
url = "{base_url}/workflow/{audit_id}".format(
base_url=base_url, audit_id=audit_id
)
msg_title = (
f"[{WorkflowDict.workflow_type['sqlreview_display']}]工单"
f"{self.workflow.get_status_display()}#{audit_id}"
)
preview = re.sub(
'[\r\n\f]{2,}',
'\n',
self.workflow.sqlworkflowcontent.sql_content[0:500].replace("\r", ""), )
"[\r\n\f]{2,}",
"\n",
self.workflow.sqlworkflowcontent.sql_content[0:500].replace("\r", ""),
)
msg_content = f"""发起时间:{self.workflow.create_time.strftime("%Y-%m-%d %H:%M:%S")}
发起人:{self.workflow.engineer_display}
组:{self.workflow.group_name}
Expand All @@ -336,12 +345,15 @@ def render_execute(self):
工单详情预览:{preview}"""
# 邮件通知申请人,抄送DBA
msg_to = Users.objects.filter(username=self.workflow.engineer)
msg_cc = auth_group_users(auth_group_names=["DBA"], group_id=self.workflow.group_id)
self.messages.append(LegacyMessage(
msg_title, msg_content, msg_to, msg_cc
))
msg_cc = auth_group_users(
auth_group_names=["DBA"], group_id=self.workflow.group_id
)
self.messages.append(LegacyMessage(msg_title, msg_content, msg_to, msg_cc))
# DDL通知
if self.sys_config.get("ddl_notify_auth_group") and self.workflow.status == "workflow_finish":
if (
self.sys_config.get("ddl_notify_auth_group")
and self.workflow.status == "workflow_finish"
):
# 判断上线语句是否存在DDL,存在则通知相关人员
if self.workflow.syntax_type == 1:
# 消息内容通知
Expand All @@ -354,13 +366,11 @@ def render_execute(self):
工单地址:{url}
工单预览:{preview}"""
# 获取通知成员ddl_notify_auth_group
ddl_notify_auth_group = self.sys_config.get("ddl_notify_auth_group", "").split(
","
)
ddl_notify_auth_group = self.sys_config.get(
"ddl_notify_auth_group", ""
).split(",")
msg_to = Users.objects.filter(groups__name__in=ddl_notify_auth_group)
self.messages.append(LegacyMessage(
msg_title, msg_content, msg_to
))
self.messages.append(LegacyMessage(msg_title, msg_content, msg_to))

def render_m2sql(self):
submitter_in_db = Users.objects.get(username=self.workflow.submitter)
Expand Down Expand Up @@ -409,16 +419,22 @@ def send(self):
msg_sender = MsgSender()
for m in self.messages:
ding_user_id_list = [
user.ding_user_id for user in chain(m.msg_to, m.msg_cc) if user.ding_user_id
user.ding_user_id
for user in chain(m.msg_to, m.msg_cc)
if user.ding_user_id
]
msg_sender.send_ding2user(ding_user_id_list, f"{m.msg_title}\n{m.msg_content}")
msg_sender.send_ding2user(
ding_user_id_list, f"{m.msg_title}\n{m.msg_content}"
)


class FeishuWebhookNotifier(LegacyRender):
name = "feishu_webhook"

def send(self):
feishu_webhook = ResourceGroup.objects.get(group_id=self.audit.group_id).feishu_webhook
feishu_webhook = ResourceGroup.objects.get(
group_id=self.audit.group_id
).feishu_webhook
if not feishu_webhook:
return
msg_sender = MsgSender()
Expand All @@ -433,10 +449,14 @@ def send(self):
msg_sender = MsgSender()
for m in self.messages:
open_id = [
user.feishu_open_id for user in chain(m.msg_to, m.msg_cc) if user.feishu_open_id
user.feishu_open_id
for user in chain(m.msg_to, m.msg_cc)
if user.feishu_open_id
]
user_mail = [
user.email for user in chain(m.msg_to, m.msg_cc) if not user.feishu_open_id
user.email
for user in chain(m.msg_to, m.msg_cc)
if not user.feishu_open_id
]
msg_sender.send_feishu_user(m.msg_title, m.msg_content, open_id, user_mail)

Expand All @@ -452,14 +472,20 @@ def send(self):
return
msg_sender = MsgSender()
for m in self.messages:
msg_sender.send_qywx_webhook(qywx_webhook, f"{m.msg_title}\n{m.msg_content}")
msg_sender.send_qywx_webhook(
qywx_webhook, f"{m.msg_title}\n{m.msg_content}"
)


def auto_notify(sys_config: SysConfig,
workflow: Union[SqlWorkflow, ArchiveConfig, QueryPrivilegesApply, My2SqlResult] = None,
audit: WorkflowAudit = None,
audit_detail: WorkflowAuditDetail = None,
event_type: EventType = EventType.AUDIT):
def auto_notify(
sys_config: SysConfig,
workflow: Union[
SqlWorkflow, ArchiveConfig, QueryPrivilegesApply, My2SqlResult
] = None,
audit: WorkflowAudit = None,
audit_detail: WorkflowAuditDetail = None,
event_type: EventType = EventType.AUDIT,
):
"""
加载所有的 notifier, 调用 notifier 的 render 和 send 方法
内部方法, 有数据库查询, 为了方便测试, 请勿使用 async_task 调用, 防止 patch 后调用失败
Expand All @@ -473,22 +499,26 @@ def auto_notify(sys_config: SysConfig,
file, _class = notifier.split(":")
notify_module = importlib.import_module(file)
notifier = getattr(notify_module, _class)
notifier = notifier(workflow=workflow, audit=audit, audit_detail=audit_detail,
event_type=event_type, sys_config=sys_config)
notifier = notifier(
workflow=workflow,
audit=audit,
audit_detail=audit_detail,
event_type=event_type,
sys_config=sys_config,
)
notifier.render()
notifier.send()


def notify_for_execute(workflow: SqlWorkflow, sys_config: SysConfig = None):
if not sys_config:
sys_config = SysConfig()
auto_notify(
workflow=workflow,
sys_config=sys_config
)
auto_notify(workflow=workflow, sys_config=sys_config)


def notify_for_audit(workflow_audit: WorkflowAudit, workflow_audit_detail: WorkflowAuditDetail = None):
def notify_for_audit(
workflow_audit: WorkflowAudit, workflow_audit_detail: WorkflowAuditDetail = None
):
"""
工作流消息通知适配器, 供 async_task 调用, 方便后续的 mock
直接传 model 对象, 注意函数内部不要做数据库相关的查询, 以免测试不好mock
Expand All @@ -501,5 +531,5 @@ def notify_for_audit(workflow_audit: WorkflowAudit, workflow_audit_detail: Workf
workflow=None,
audit=workflow_audit,
audit_detail=workflow_audit_detail,
sys_config=sys_config
sys_config=sys_config,
)
Loading

0 comments on commit 92cace4

Please sign in to comment.