diff --git a/sql/engines/pgsql.py b/sql/engines/pgsql.py index f686f33772..0307033caf 100644 --- a/sql/engines/pgsql.py +++ b/sql/engines/pgsql.py @@ -187,12 +187,14 @@ def query( result_set = ResultSet(full_sql=sql) try: conn = self.get_connection(db_name=db_name) + conn.autocommit = False max_execution_time = kwargs.get("max_execution_time", 0) cursor = conn.cursor() try: cursor.execute(f"SET statement_timeout TO {max_execution_time};") except: pass + cursor.execute("SET transaction ISOLATION LEVEL READ COMMITTED READ ONLY;") if schema_name: cursor.execute( f"SET search_path TO %(schema_name)s;", {"schema_name": schema_name} @@ -203,6 +205,7 @@ def query( rows = cursor.fetchmany(size=int(limit_num)) else: rows = cursor.fetchall() + conn.commit() fields = cursor.description column_type_codes = [i[1] for i in fields] if fields else [] # 定义 JSON 和 JSONB 的 type_code,# 114 是 json,3802 是 jsonb @@ -231,6 +234,7 @@ def query( result_set.rows = converted_rows result_set.affected_rows = len(converted_rows) except Exception as e: + conn.rollback() logger.warning( f"PgSQL命令执行报错,语句:{sql}, 错误信息:{traceback.format_exc()}" ) @@ -324,13 +328,14 @@ def execute_workflow(self, workflow, close_conn=True): db_name = workflow.db_name try: conn = self.get_connection(db_name=db_name) + conn.autocommit = False cursor = conn.cursor() + cursor.execute("SET transaction ISOLATION LEVEL READ COMMITTED READ WRITE;") # 逐条执行切分语句,追加到执行结果中 for statement in split_sql: statement = statement.rstrip(";") with FuncTimer() as t: cursor.execute(statement) - conn.commit() execute_result.rows.append( ReviewResult( id=line, @@ -343,7 +348,9 @@ def execute_workflow(self, workflow, close_conn=True): ) ) line += 1 + conn.commit() except Exception as e: + conn.rollback() logger.warning( f"PGSQL命令执行报错,语句:{statement or sql}, 错误信息:{traceback.format_exc()}" )