Skip to content

Commit

Permalink
针对性修复 sql_analyze 读取文件的 bug (hhyo#2349)
Browse files Browse the repository at this point in the history
* 针对性修复 sql_analyze 读取文件的 bug

* 针对性修复 sql_analyze 读取文件的 bug

* add strip
  • Loading branch information
LeoQuote authored Oct 25, 2023
1 parent 2411ca0 commit 785d317
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 0 deletions.
10 changes: 10 additions & 0 deletions sql/sql_analyze.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@
@file: sql_analyze.py
@time: 2019/03/14
"""
from pathlib import Path

import simplejson as json
from django.contrib.auth.decorators import permission_required
from django.core.files.temp import NamedTemporaryFile

from common.config import SysConfig
from sql.plugins.soar import Soar
Expand Down Expand Up @@ -74,6 +77,13 @@ def analyze(request):
}
rows = generate_sql(text)
for row in rows:
# 验证是不是传过来的文件, 如果是文件, 报错
try:
p = Path(row["sql"].strip())
if p.exists():
return JsonResponse({"status": 1, "msg": "SQL 语句不合法", "data": []})
except OSError:
pass
args["query"] = row["sql"]
cmd_args = soar.generate_args2cmd(args=args)
stdout, stderr = soar.execute_cmd(cmd_args).communicate()
Expand Down
22 changes: 22 additions & 0 deletions sql/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -2303,6 +2303,28 @@ def test_analyze_text_not_None(self, _subprocess):
list(json.loads(r.content)["rows"][0].keys()), ["sql_id", "sql", "report"]
)

@patch("sql.sql_analyze.Path")
@patch("sql.plugins.plugin.subprocess")
def test_analyze_text_evil(self, _subprocess, mock_path):
"""
测试分析SQL,text不为空
:return:
"""
_subprocess.Popen.return_value.communicate.return_value = (
"some_stdout",
"some_stderr",
)
mock_path.return_value.exists.return_value = True
self.sys_config.set("soar", "/opt/archery/src/plugins/soar")
text = "/etc/passwd"
instance_name = self.master.instance_name
db_name = settings.DATABASES["default"]["TEST"]["NAME"]
r = self.client.post(
path="/sql_analyze/analyze/",
data={"text": text, "instance_name": instance_name, "db_name": db_name},
)
self.assertEqual(r.json()["msg"], "SQL 语句不合法")


class TestBinLog(TestCase):
"""
Expand Down

0 comments on commit 785d317

Please sign in to comment.