Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added more tests and Pep8 fixes #279

Merged
merged 4 commits into from
Feb 3, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 4 additions & 20 deletions bin/clone
Original file line number Diff line number Diff line change
Expand Up @@ -27,27 +27,11 @@ from snare.utils.snare_helpers import check_privileges, print_color, str_to_bool
def main():
loop = asyncio.get_event_loop()
parser = argparse.ArgumentParser()
parser.add_argument(
"--target",
help="domain of the site to be cloned",
required=True)
parser.add_argument(
"--max-depth",
help="max depth of the cloning",
required=False,
default=sys.maxsize)
parser.add_argument("--target", help="domain of the site to be cloned", required=True)
parser.add_argument("--max-depth", help="max depth of the cloning", required=False, default=sys.maxsize)
parser.add_argument("--log_path", help="path to the log file")
parser.add_argument(
"--css-validate",
help="set whether css validation is required",
type=str_to_bool,
default=None)
parser.add_argument(
"--path",
help="path to save the page to be cloned",
required=False,
default="/opt/"
)
parser.add_argument("--css-validate", help="set whether css validation is required", type=str_to_bool, default=None)
parser.add_argument("--path", help="path to save the page to be cloned", required=False, default="/opt/")
args = parser.parse_args()
default_path = os.path.join(args.path, 'snare')

Expand Down
55 changes: 17 additions & 38 deletions bin/snare
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ from snare.utils.snare_helpers import check_privileges, check_meta_file, print_c
def create_initial_config(base_path):
cfg = configparser.ConfigParser()
cfg['WEB-TOOLS'] = dict(google='', bing='')
with open(os.path.join(base_path,'snare.cfg'), 'w') as configfile:
with open(os.path.join(base_path, 'snare.cfg'), 'w') as configfile:
cfg.write(configfile)


Expand All @@ -49,16 +49,16 @@ def snare_setup(base_path):
print_color(err, 'WARNING')
sys.exit(1)

if not os.path.exists(os.path.join(base_path,'pages')):
os.makedirs(os.path.join(base_path,'pages'))
if not os.path.exists(os.path.join(base_path, 'pages')):
os.makedirs(os.path.join(base_path, 'pages'))
# Write pid to pid file
with open(os.path.join(base_path,'snare.pid'), 'wb') as pid_fh:
with open(os.path.join(base_path, 'snare.pid'), 'wb') as pid_fh:
pid_fh.write(str(os.getpid()).encode('utf-8'))
# Config file
if not os.path.exists(os.path.join(base_path,'snare.cfg')):
if not os.path.exists(os.path.join(base_path, 'snare.cfg')):
create_initial_config(base_path)
# Read or create the sensor id
uuid_file_path = os.path.join(base_path,'snare.uuid')
uuid_file_path = os.path.join(base_path, 'snare.uuid')
if os.path.exists(uuid_file_path):
with open(uuid_file_path, 'rb') as uuid_fh:
snare_uuid = uuid_fh.read()
Expand All @@ -79,10 +79,7 @@ def drop_privileges():
os.setuid(wanted_user.pw_uid)
new_user = pwd.getpwuid(os.getuid())
new_group = grp.getgrgid(os.getgid())
print_color(
'privileges dropped, running as "{}:{}"'.format(
new_user.pw_name,
new_group.gr_name), 'INFO')
print_color('privileges dropped, running as "{}:{}"'.format(new_user.pw_name, new_group.gr_name), 'INFO')


def compare_version_info(timeout):
Expand All @@ -96,8 +93,7 @@ def compare_version_info(timeout):
print_color('timeout fetching the repository version', 'ERROR')
else:
if diff_list:
print_color(
'you are running an outdated version, SNARE will be updated and restarted', 'INFO')
print_color('you are running an outdated version, SNARE will be updated and restarted', 'INFO')
repo.git.reset('--hard')
repo.heads.master.checkout()
repo.git.clean('-xdf')
Expand All @@ -120,9 +116,7 @@ async def check_tanner():
version = result["version"]
vm.check_compatibility(version)
except aiohttp.ClientOSError:
print_color(
"Can't connect to tanner host {}".format(req_url),
'ERROR')
print_color("Can't connect to tanner host {}".format(req_url), 'ERROR')
exit(1)
else:
await resp.release()
Expand All @@ -137,16 +131,9 @@ if __name__ == '__main__':
""")
parser = argparse.ArgumentParser()
page_group = parser.add_mutually_exclusive_group(required=True)
page_group.add_argument("--page-dir",
help="name of the folder to be served")
page_group.add_argument(
"--list-pages",
help="list available pages",
action='store_true')
parser.add_argument(
"--index-page",
help="file name of the index page",
default='index.html')
page_group.add_argument("--page-dir", help="name of the folder to be served")
page_group.add_argument("--list-pages", help="list available pages", action='store_true')
parser.add_argument("--index-page", help="file name of the index page", default='index.html')
parser.add_argument("--port", help="port to listen on", default='8080')
parser.add_argument("--host-ip", help="host ip to bind to", default='127.0.0.1')
parser.add_argument("--debug", help="run web server in debug mode", default=False)
Expand Down Expand Up @@ -185,8 +172,7 @@ if __name__ == '__main__':
args_dict = vars(args)
args_dict['full_page_path'] = os.path.realpath(full_page_path)
if not os.path.exists(full_page_path):
print_color(
"--page-dir: {0} does not exist".format(args.page_dir), 'ERROR')
print_color("--page-dir: {0} does not exist".format(args.page_dir), 'ERROR')
exit()
args.index_page = os.path.join("/", args.index_page)

Expand All @@ -202,8 +188,7 @@ if __name__ == '__main__':
print_color("Error found in meta.json. Please clone the pages again.", "ERROR")
exit()

if not os.path.exists(os.path.join(full_page_path,
os.path.join(meta_info[args.index_page]['hash']))):
if not os.path.exists(os.path.join(full_page_path, os.path.join(meta_info[args.index_page]['hash']))):
print_color('can\'t create meta tag', 'WARNING')
else:
snare_helpers.add_meta_tag(args.page_dir, meta_info[args.index_page]['hash'], config, base_path)
Expand All @@ -214,15 +199,9 @@ if __name__ == '__main__':
compare_version_fut = None
if args.auto_update is True:
timeout = snare_helpers.parse_timeout(args.update_timeout)
compare_version_fut = loop.run_in_executor(
pool, compare_version_info, timeout)

app = HttpRequestHandler(
meta_info,
args,
snare_uuid,
debug=args.debug,
keep_alive=75)
compare_version_fut = loop.run_in_executor(pool, compare_version_info, timeout)

app = HttpRequestHandler(meta_info, args, snare_uuid, debug=args.debug, keep_alive=75)

print_color('serving with uuid {0}'.format(snare_uuid.decode('utf-8')), 'INFO')
print_color("Debug logs will be stored in {}".format(log_debug), 'INFO')
Expand Down
5 changes: 2 additions & 3 deletions snare/tanner_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,8 @@ async def submit_data(self, data):
event_result = await r.json()
except (json.decoder.JSONDecodeError, aiohttp.client_exceptions.ContentTypeError) as e:
self.logger.error('Error submitting data: {} {}'.format(e, data))
event_result = {'version': '0.6.0', 'response': {'message': {'detection':
{'name': 'index', 'order': 1, 'type': 1, 'version': '0.6.0'},
'sess_uuid': data['uuid']}}}
event_result = {'version': '0.6.0', 'response': {'message': {'detection': {
'name': 'index', 'order': 1, 'type': 1, 'version': '0.6.0'}, 'sess_uuid': data['uuid']}}}
finally:
await r.release()
except Exception as e:
Expand Down
17 changes: 5 additions & 12 deletions snare/tests/test_cloner_get_body.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@ def setUp(self):
self.loop = asyncio.new_event_loop()
self.css_validate = 'false'
self.handler = Cloner(self.root, self.max_depth, self.css_validate)
self.target_path = '/opt/snare/pages/{}'.format(
yarl.URL(self.root).host)
self.target_path = '/opt/snare/pages/{}'.format(yarl.URL(self.root).host)
self.return_content = None
self.expected_content = None
self.filename = None
Expand Down Expand Up @@ -51,8 +50,7 @@ def test_get_body(self):

aiohttp.ClientResponse._headers = {'Content-Type': 'text/html'}
aiohttp.ClientResponse.read = AsyncMock(return_value=self.content)
self.filename, self.hashname = self.handler._make_filename(
yarl.URL(self.root))
self.filename, self.hashname = self.handler._make_filename(yarl.URL(self.root))
self.expected_content = '<html><body><a href="/test"></a></body></html>'

self.meta = {
Expand All @@ -72,17 +70,13 @@ async def test():

with self.assertLogs(level='DEBUG') as log:
self.loop.run_until_complete(test())
self.assertIn(
'DEBUG:snare.cloner:Cloned file: /test',
''.join(
log.output))
self.assertIn('DEBUG:snare.cloner:Cloned file: /test', ''.join(log.output))

with open(os.path.join(self.target_path, self.hashname)) as f:
self.return_content = f.read()

self.assertEqual(self.return_content, self.expected_content)
self.assertEqual(
self.handler.visited_urls[-2:], ['http://example.com/', 'http://example.com/test'])
self.assertEqual(self.handler.visited_urls[-2:], ['http://example.com/', 'http://example.com/test'])
self.assertEqual(self.handler.meta, self.meta)

def test_get_body_css_validate(self):
Expand Down Expand Up @@ -142,8 +136,7 @@ async def test():
self.loop.run_until_complete(test())
self.assertEqual(self.return_size, self.q_size)
self.assertEqual(self.handler.meta, self.meta)
self.assertEqual(
self.handler.visited_urls[-1], self.expected_content)
self.assertEqual(self.handler.visited_urls[-1], self.expected_content)

def test_client_error(self):
self.session.get = AsyncMock(side_effect=aiohttp.ClientError)
Expand Down
43 changes: 43 additions & 0 deletions snare/tests/test_cloner_get_root_host.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import unittest
from unittest import mock
import sys
from snare.cloner import Cloner
import shutil
from yarl import URL
import asyncio
import aiohttp
from snare.utils.asyncmock import AsyncMock


class TestClonerGetRootHost(unittest.TestCase):
def setUp(self):
self.loop = asyncio.new_event_loop()

def test_moved_root(self):
self.root = 'http://example.com'
self.max_depth = sys.maxsize
self.css_validate = 'false'
self.handler = Cloner(self.root, self.max_depth, self.css_validate)
self.expected_moved_root = URL('http://www.example.com')

async def test():
await self.handler.get_root_host()

self.loop.run_until_complete(test())

self.assertEqual(self.handler.moved_root, self.expected_moved_root)

@mock.patch('aiohttp.ClientSession')
def test_clienterror(self, session):
self.root = 'http://example.com'
self.max_depth = sys.maxsize
self.css_validate = 'false'
self.handler = Cloner(self.root, self.max_depth, self.css_validate)

aiohttp.ClientSession = mock.Mock(side_effect=aiohttp.ClientError)

async def test():
await self.handler.get_root_host()

with self.assertRaises(SystemExit):
self.loop.run_until_complete(test())
18 changes: 18 additions & 0 deletions snare/tests/test_cloner_init.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import unittest
import sys
from snare.cloner import Cloner
import shutil


class TestClonerInitialization(unittest.TestCase):
def setUp(self):
self.root = 'http://example.com'
self.max_depth = sys.maxsize
self.css_validate = 'false'
self.handler = Cloner(self.root, self.max_depth, self.css_validate, default_path='/tmp')

def test_cloner_init(self):
self.assertIsInstance(self.handler, Cloner)

def tearDown(self):
shutil.rmtree(self.handler.target_path)
3 changes: 1 addition & 2 deletions snare/tests/test_cloner_make_filename.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,7 @@ def test_make_filename(self):
self.assertEqual(self.hashname, '167a0418dd8ce3bf0ef00dfb6195f038')

def test_make_filename_same_host(self):
self.filename, self.hashname = self.handler._make_filename(
yarl.URL(self.root))
self.filename, self.hashname = self.handler._make_filename(yarl.URL(self.root))
self.assertEqual(self.filename, '/index.html')
self.assertEqual(self.hashname, 'd1546d731a9f30cc80127d57142a482b')

Expand Down
13 changes: 9 additions & 4 deletions snare/tests/test_cloner_process_links.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,15 @@ async def test():

self.loop.run_until_complete(test())
self.assertEqual(self.return_content, '/foo/путь/')
self.assertEqual(
yarl.URL(
self.return_url).human_repr(),
self.expected_content)
self.assertEqual(yarl.URL(self.return_url).human_repr(), self.expected_content)
self.assertEqual(self.return_level, self.level + 1)

self.handler.moved_root = yarl.URL('http://example2.com')
self.expected_content = 'http://example2.com/foo/путь/'

self.loop.run_until_complete(test())
self.assertEqual(self.return_content, '/foo/путь/')
self.assertEqual(yarl.URL(self.return_url).human_repr(), self.expected_content)
self.assertEqual(self.return_level, self.level + 1)

def test_process_link_absolute(self):
Expand Down
17 changes: 15 additions & 2 deletions snare/tests/test_cloner_replace_links.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,7 @@ async def test():

self.loop.run_until_complete(test())
self.assertEqual(str(self.return_content), self.expected_content)
self.handler.process_link.assert_called_with(
self.root, self.level, check_host=True)
self.handler.process_link.assert_called_with(self.root, self.level, check_host=True)

def test_replace_image_links(self):
self.handler.process_link = AsyncMock(return_value="/smiley.png")
Expand Down Expand Up @@ -65,5 +64,19 @@ async def test():
self.assertEqual(str(self.return_content), self.expected_content)
self.handler.process_link.assert_called_with(self.root, self.level)

def test_replace_redirects(self):
self.root = "http://example.com"
self.content = ('\n<html>\n<body>\n<p name="redirect" value="http://example.com/home.html">Redirecting...</p>\n'
'</body>\n</html>\n')

self.expected_content = ('\n<html>\n<body>\n<p name="redirect" value="/home.html">Redirecting...</p>\n</body>\n'
'</html>\n')

async def test():
self.return_content = await self.handler.replace_links(self.content, self.level)

self.loop.run_until_complete(test())
self.assertEqual(str(self.return_content), self.expected_content)

def tearDown(self):
shutil.rmtree(self.main_page_path)
17 changes: 17 additions & 0 deletions snare/tests/test_cloner_run.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import unittest
import sys
from snare.cloner import Cloner
import shutil
import asyncio


class TestClonerRun(unittest.TestCase):
def setUp(self):
self.root = 'http://example.com'
self.max_depth = sys.maxsize
self.css_validate = 'false'
self.handler = Cloner(self.root, self.max_depth, self.css_validate, default_path='/tmp')
self.loop = asyncio.new_event_loop()

def test_run(self):
self.loop.run_until_complete(self.handler.run())
13 changes: 4 additions & 9 deletions snare/tests/test_html_handler_get_dorks.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,13 @@ def setUp(self):
self.data = None

def test_get_dorks(self):
aiohttp.ClientResponse.json = AsyncMock(
return_value=dict(response={'dorks': "test_dorks"}))
aiohttp.ClientResponse.json = AsyncMock(return_value=dict(response={'dorks': "test_dorks"}))

async def test():
self.data = await self.handler.get_dorks()

self.loop.run_until_complete(test())
aiohttp.ClientSession.get.assert_called_with(
'http://tanner.mushmush.org:8090/dorks', timeout=10.0)
aiohttp.ClientSession.get.assert_called_with('http://tanner.mushmush.org:8090/dorks', timeout=10.0)

def test_return_dorks(self):
aiohttp.ClientResponse.json = AsyncMock(return_value=self.dorks)
Expand All @@ -53,17 +51,14 @@ async def test():
self.assertEqual(self.data, self.dorks['response']['dorks'])

def test_logging_error(self):
aiohttp.ClientResponse.json = AsyncMock(
side_effect=JSONDecodeError('ERROR', '', 0))
aiohttp.ClientResponse.json = AsyncMock(side_effect=JSONDecodeError('ERROR', '', 0))

async def test():
self.data = await self.handler.get_dorks()

with self.assertLogs(level='ERROR') as log:
self.loop.run_until_complete(test())
self.assertIn(
'Error getting dorks: ERROR: line 1 column 1 (char 0)',
log.output[0])
self.assertIn('Error getting dorks: ERROR: line 1 column 1 (char 0)', log.output[0])

def test_logging_timeout(self):
aiohttp.ClientResponse.json = AsyncMock(
Expand Down
Loading