From ec45d9b7a1e5a3cb70f997fdce113034f82728f8 Mon Sep 17 00:00:00 2001 From: peixubin <20983498@qq.com> Date: Sat, 26 Oct 2024 14:55:15 +0800 Subject: [PATCH] =?UTF-8?q?=E5=BD=93=E5=B7=A5=E5=8D=95=E4=B8=8D=E5=9C=A8?= =?UTF-8?q?=E6=89=A7=E8=A1=8C=E6=80=81=E6=97=B6,=E8=AE=B0=E5=BD=95?= =?UTF-8?q?=E6=97=A5=E5=BF=97,=E4=B8=8D=E6=8A=A5=E9=94=99,=E9=81=BF?= =?UTF-8?q?=E5=85=8Dq-task=E4=B8=8D=E6=96=AD=E9=87=8D=E8=AF=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- sql/utils/execute_sql.py | 31 +++++++++++++++++++++++++++---- sql/utils/tests.py | 31 +++++++++++++++++++++++++++++++ 2 files changed, 58 insertions(+), 4 deletions(-) diff --git a/sql/utils/execute_sql.py b/sql/utils/execute_sql.py index 8027a4c9aa..b6b44362dc 100644 --- a/sql/utils/execute_sql.py +++ b/sql/utils/execute_sql.py @@ -17,21 +17,44 @@ def execute(workflow_id, user=None): """为延时或异步任务准备的execute, 传入工单ID和执行人信息""" + audit_id = Audit.detail_by_workflow_id( + workflow_id=workflow_id, workflow_type=WorkflowType.SQL_REVIEW + ).audit_id # 使用当前读防止重复执行 with transaction.atomic(): workflow_detail = SqlWorkflow.objects.select_for_update().get(id=workflow_id) # 只有排队中和定时执行的数据才可以继续执行,否则直接抛错 if workflow_detail.status not in ["workflow_queuing", "workflow_timingtask"]: - raise Exception("工单状态不正确,禁止执行!") + logger.error(f"工单号[{workflow_id}] 可能被任务调度器重试") + Audit.add_log( + audit_id=audit_id, + operation_type=5, + operation_type_desc="执行工单发生异常", + operation_info="请检查工单执行情况", + operator=user.username if user else "", + operator_display=user.display if user else "系统", + ) + result = ReviewSet( + rows=[ + ReviewResult( + id=1, + errlevel=2, + stagestatus="执行发生错误", + errormessage=f"任务[{workflow_id}]被重试。可能是执行时发生超时,请检查数据库会话及执行状态,或联系管理员", + ) + ], + ) + result.error = ( + f"任务[{workflow_id}]被重试。可能是执行时发生超时,请检查数据库会话及执行状态,或联系管理员", + ) + return result + # 将工单状态修改为执行中 else: SqlWorkflow(id=workflow_id, status="workflow_executing").save( update_fields=["status"] ) # 增加执行日志 - audit_id = Audit.detail_by_workflow_id( - workflow_id=workflow_id, workflow_type=WorkflowType.SQL_REVIEW - ).audit_id Audit.add_log( audit_id=audit_id, operation_type=5, diff --git a/sql/utils/tests.py b/sql/utils/tests.py index 512e0f5edd..617bc1013e 100644 --- a/sql/utils/tests.py +++ b/sql/utils/tests.py @@ -364,6 +364,19 @@ def setUp(self): db_name="some_db", syntax_type=1, ) + self.wf_executing = SqlWorkflow.objects.create( + workflow_name="some_name", + group_id=1, + group_name="g1", + engineer_display="", + audit_auth_groups="some_group", + create_time=datetime.datetime.now(), + status="workflow_executing", + is_backup=True, + instance=self.ins, + db_name="some_db", + syntax_type=1, + ) SqlWorkflowContent.objects.create( workflow=self.wf, sql_content="some_sql", @@ -409,6 +422,24 @@ def test_execute(self, _get_engine, _execute_workflow, _audit): operator_display="系统", ) + @patch("sql.utils.execute_sql.Audit") + @patch("sql.engines.mysql.MysqlEngine.execute_workflow") + @patch("sql.engines.get_engine") + def test_execute_in_executing(self, _get_engine, _execute_workflow, _audit): + _audit.detail_by_workflow_id.return_value.audit_id = 1 + result = execute(self.wf_executing.id) + _audit.add_log.assert_called_with( + audit_id=1, + operation_type=5, + operation_type_desc="执行工单发生异常", + operation_info="请检查工单执行情况", + operator="", + operator_display="系统", + ) + assert result.error == ( + f"任务[{self.wf_executing.id}]被重试。可能是执行时发生超时,请检查数据库会话及执行状态,或联系管理员", + ) + @patch("sql.utils.execute_sql.notify_for_execute") @patch("sql.utils.execute_sql.Audit") def test_execute_callback_success(self, _audit, _notify):