diff --git a/sql/sql_analyze.py b/sql/sql_analyze.py
index a1a46620c1..876e329052 100644
--- a/sql/sql_analyze.py
+++ b/sql/sql_analyze.py
@@ -5,8 +5,11 @@
 @file: sql_analyze.py
 @time: 2019/03/14
 """
+from pathlib import Path
+
 import simplejson as json
 from django.contrib.auth.decorators import permission_required
+from django.core.files.temp import NamedTemporaryFile
 
 from common.config import SysConfig
 from sql.plugins.soar import Soar
@@ -74,6 +77,13 @@ def analyze(request):
         }
         rows = generate_sql(text)
         for row in rows:
+            # 验证是不是传过来的文件, 如果是文件, 报错
+            try:
+                p = Path(row["sql"].strip())
+                if p.exists():
+                    return JsonResponse({"status": 1, "msg": "SQL 语句不合法", "data": []})
+            except OSError:
+                pass
             args["query"] = row["sql"]
             cmd_args = soar.generate_args2cmd(args=args)
             stdout, stderr = soar.execute_cmd(cmd_args).communicate()
diff --git a/sql/tests.py b/sql/tests.py
index 0e5226480c..5fa7bc25f7 100644
--- a/sql/tests.py
+++ b/sql/tests.py
@@ -2303,6 +2303,28 @@ def test_analyze_text_not_None(self, _subprocess):
             list(json.loads(r.content)["rows"][0].keys()), ["sql_id", "sql", "report"]
         )
 
+    @patch("sql.sql_analyze.Path")
+    @patch("sql.plugins.plugin.subprocess")
+    def test_analyze_text_evil(self, _subprocess, mock_path):
+        """
+        测试分析SQL,text不为空
+        :return:
+        """
+        _subprocess.Popen.return_value.communicate.return_value = (
+            "some_stdout",
+            "some_stderr",
+        )
+        mock_path.return_value.exists.return_value = True
+        self.sys_config.set("soar", "/opt/archery/src/plugins/soar")
+        text = "/etc/passwd"
+        instance_name = self.master.instance_name
+        db_name = settings.DATABASES["default"]["TEST"]["NAME"]
+        r = self.client.post(
+            path="/sql_analyze/analyze/",
+            data={"text": text, "instance_name": instance_name, "db_name": db_name},
+        )
+        self.assertEqual(r.json()["msg"], "SQL 语句不合法")
+
 
 class TestBinLog(TestCase):
     """
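Review bot: The rejection response in the new `if p.exists():` branch returns `"data": []`, while the success path of this view is keyed `rows` (the existing test_analyze_text_not_None reads `json.loads(r.content)["rows"]`), so a client that indexes `rows` on every response will break on the error case; `return JsonResponse({"status": 1, "msg": "SQL 语句不合法", "rows": []})` keeps the shape uniform. The guard itself is a filesystem denylist: it only fires when the text, as seen by the Archery process at that instant, names an existing path, so the fix depends on soar's `-query` treating an existing path as a file to read (presumably why `/etc/passwd` is the test input); if `NamedTemporaryFile`, imported in this commit but unused in the visible hunks, was meant to write each query to a temp file and pass that path to soar instead, that would remove the ambiguity rather than pattern-match it. The silent `except OSError: pass` is defensible (a stat that fails with e.g. ENAMETOOLONG cannot be an existing file, so the text is forwarded as SQL) but deserves saying so: `# stat failed (e.g. name too long): cannot be an existing file, treat as SQL`. The enclosing `for row in rows:` body and `analyze` itself continue past the end of the hunk.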
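Review bot: test_analyze_text_evil asserts only the error message, not the property the fix exists for — that soar is never executed with the user-supplied path; since `sql.plugins.plugin.subprocess` is already patched, add `_subprocess.Popen.assert_not_called()` after the `assertEqual`. Because `Path` is mocked with `exists` forced to True, the test would also pass for any input string, so it exercises the control flow but not the real check against `/etc/passwd`; a second case with `mock_path.return_value.exists.return_value = False` asserting a 200 `rows` response would pin that ordinary SQL still reaches soar. The docstring `测试分析SQL,text不为空` is copied from the previous test and describes the wrong case; `"""Text naming an existing file must be rejected before soar is invoked."""` says what this one checks.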