Skip to content

Commit

Permalink
Merge pull request #135 from kobotoolbox/125-validate-aws-credentials
Browse files Browse the repository at this point in the history
Adding validation for aws credentials without any dependency on boto3
  • Loading branch information
noliveleger authored Nov 16, 2020
2 parents f61163b + b455868 commit ec3e516
Show file tree
Hide file tree
Showing 5 changed files with 309 additions and 2 deletions.
113 changes: 113 additions & 0 deletions helpers/aws_validation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
# -*- coding: utf-8 -*-
import datetime
import hashlib
import hmac
from urllib.error import HTTPError
from urllib.request import Request, urlopen


class AWSValidation:
"""
A class to validate AWS credentials without using boto3 as a dependency.
The structure and methods have been adapted from the AWS documentation:
http://docs.aws.amazon.com/general/latest/gr/signature-v4-examples.html#signature-v4-examples-python
"""

METHOD = 'POST'
SERVICE = 'sts'
REGION = 'us-east-1'
HOST = 'sts.amazonaws.com'
ENDPOINT = 'https://sts.amazonaws.com'
REQUEST_PARAMETERS = 'Action=GetCallerIdentity&Version=2011-06-15'
CANONICAL_URI = '/'
SIGNED_HEADERS = 'host;x-amz-date'
PAYLOAD_HASH = hashlib.sha256(''.encode()).hexdigest()
ALGORITHM = 'AWS4-HMAC-SHA256'

def __init__(self, aws_access_key_id, aws_secret_access_key):
self.access_key = aws_access_key_id
self.secret_key = aws_secret_access_key

@staticmethod
def _sign(key, msg):
return hmac.new(key, msg.encode(), hashlib.sha256).digest()

@classmethod
def _get_signature_key(cls, key, date_stamp, region_name, service_name):
k_date = cls._sign(('AWS4' + key).encode(), date_stamp)
k_region = cls._sign(k_date, region_name)
k_service = cls._sign(k_region, service_name)
return cls._sign(k_service, 'aws4_request')

def _get_request_url_and_headers(self):
t = datetime.datetime.utcnow()
amzdate = t.strftime('%Y%m%dT%H%M%SZ')
datestamp = t.strftime('%Y%m%d')

canonical_querystring = self.REQUEST_PARAMETERS

canonical_headers = '\n'.join(
[f'host:{self.HOST}', f'x-amz-date:{amzdate}', '']
)

canonical_request = '\n'.join(
[
self.METHOD,
self.CANONICAL_URI,
canonical_querystring,
canonical_headers,
self.SIGNED_HEADERS,
self.PAYLOAD_HASH,
]
)

credential_scope = '/'.join(
[datestamp, self.REGION, self.SERVICE, 'aws4_request']
)

string_to_sign = '\n'.join(
[
self.ALGORITHM,
amzdate,
credential_scope,
hashlib.sha256(canonical_request.encode()).hexdigest(),
]
)

signing_key = self._get_signature_key(
self.secret_key, datestamp, self.REGION, self.SERVICE
)

signature = hmac.new(
signing_key, string_to_sign.encode(), hashlib.sha256
).hexdigest()

authorization_header = (
'{} Credential={}/{}, SignedHeaders={}, Signature={}'.format(
self.ALGORITHM,
self.access_key,
credential_scope,
self.SIGNED_HEADERS,
signature,
)
)

headers = {'x-amz-date': amzdate, 'Authorization': authorization_header}
request_url = '?'.join([self.ENDPOINT, canonical_querystring])

return request_url, headers

def validate_credentials(self):
request_url, headers = self._get_request_url_and_headers()
req = Request(request_url, headers=headers, method=self.METHOD)

try:
with urlopen(req) as res:
if res.status == 200:
return True
else:
return False
except HTTPError as e:
return False

72 changes: 71 additions & 1 deletion helpers/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from datetime import datetime
from random import choice

from helpers.aws_validation import AWSValidation
from helpers.cli import CLI
from helpers.network import Network
from helpers.singleton import Singleton
Expand All @@ -30,7 +31,8 @@ class Config(metaclass=Singleton):
DEFAULT_NGINX_PORT = '80'
DEFAULT_NGINX_HTTPS_PORT = '443'
KOBO_DOCKER_BRANCH = '2.020.45'
KOBO_INSTALL_VERSION = '4.1.0'
KOBO_INSTALL_VERSION = '4.2.0'
MAXIMUM_AWS_CREDENTIAL_ATTEMPTS = 3

def __init__(self):
self.__first_time = None
Expand Down Expand Up @@ -302,10 +304,12 @@ def get_template(cls):
'aws_backup_weekly_retention': '4',
'aws_backup_yearly_retention': '2',
'aws_bucket_name': '',
'aws_credentials_valid': False,
'aws_mongo_backup_minimum_size': '50',
'aws_postgres_backup_minimum_size': '50',
'aws_redis_backup_minimum_size': '5',
'aws_secret_key': '',
'aws_validate_credentials': True,
'backend_server_role': 'primary',
'backup_from_primary': True,
'block_common_http_ports': True,
Expand Down Expand Up @@ -576,6 +580,13 @@ def use_letsencrypt(self):
def use_private_dns(self):
return self.__dict['use_private_dns']

def validate_aws_credentials(self):
validation = AWSValidation(
aws_access_key_id=self.__dict['aws_access_key'],
aws_secret_access_key=self.__dict['aws_secret_key'],
)
self.__dict['aws_credentials_valid'] = validation.validate_credentials()

def write_config(self):
"""
Writes config to file `Config.CONFIG_FILE`.
Expand Down Expand Up @@ -756,6 +767,11 @@ def __questions_aws(self):
'Do you want to use AWS S3 storage?',
default=self.__dict['use_aws']
)
self.__questions_aws_configuration()
self.__questions_aws_validate_credentials()

def __questions_aws_configuration(self):

if self.__dict['use_aws']:
self.__dict['aws_access_key'] = CLI.colored_input(
'AWS Access Key', CLI.COLOR_QUESTION,
Expand All @@ -771,6 +787,60 @@ def __questions_aws(self):
self.__dict['aws_secret_key'] = ''
self.__dict['aws_bucket_name'] = ''

def __questions_aws_validate_credentials(self):
"""
Prompting user whether they would like to validate their entered AWS
credentials or continue without validation.
"""
# Resetting validation when setup is rerun
self.__dict['aws_credentials_valid'] = False
aws_credential_attempts = 0

if self.__dict['use_aws']:
self.__dict['aws_validate_credentials'] = CLI.yes_no_question(
'Would you like to validate your AWS credentials?',
default=self.__dict['aws_validate_credentials'],
)

if self.__dict['use_aws'] and self.__dict['aws_validate_credentials']:
while (
not self.__dict['aws_credentials_valid']
and aws_credential_attempts
<= self.MAXIMUM_AWS_CREDENTIAL_ATTEMPTS
):
aws_credential_attempts += 1
self.validate_aws_credentials()
attempts_remaining = (
self.MAXIMUM_AWS_CREDENTIAL_ATTEMPTS
- aws_credential_attempts
)
if (
not self.__dict['aws_credentials_valid']
and attempts_remaining > 0
):
CLI.colored_print(
'Invalid credentials, please try again.',
CLI.COLOR_WARNING,
)
CLI.colored_print(
'Attempts remaining for AWS validation: {}'.format(
attempts_remaining
),
CLI.COLOR_INFO,
)
self.__questions_aws_configuration()
else:
if not self.__dict['aws_credentials_valid']:
CLI.colored_print(
'Please restart configuration', CLI.COLOR_ERROR
)
sys.exit(1)
else:
CLI.colored_print(
'AWS credentials successfully validated',
CLI.COLOR_SUCCESS
)

def __questions_aws_backup_settings(self):

self.__dict['aws_backup_bucket_name'] = CLI.colored_input(
Expand Down
1 change: 0 additions & 1 deletion requirements_tests.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,4 @@ pathlib2==2.3.2
pluggy==0.8.0
py==1.7.0
pytest==3.9.2
six==1.11.0
netifaces==0.10.7
113 changes: 113 additions & 0 deletions tests/test_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from .utils import (
read_config,
write_trigger_upsert_db_users,
MockAWSValidation
)

CHOICE_YES = '1'
Expand Down Expand Up @@ -149,6 +150,118 @@ def test_use_https():
assert config.local_install
assert not config.is_secure

def _aws_validation_setup():
config = read_config()

assert not config._Config__dict['use_aws']
assert not config._Config__dict['aws_credentials_valid']

return config

def test_aws_credentials_invalid_with_no_configuration():
config = _aws_validation_setup()

with patch('helpers.cli.CLI.colored_input') as mock_colored_input:
mock_colored_input.side_effect = CHOICE_NO
assert not config._Config__dict['use_aws']
assert not config._Config__dict['aws_credentials_valid']

def test_aws_validation_fails_with_system_exit():
config = _aws_validation_setup()

with patch('helpers.cli.CLI.colored_input') as mock_colored_input:
mock_colored_input.side_effect = iter(
[CHOICE_YES, '', '', '', CHOICE_YES, '', '', '', '', '', '']
)
try:
config._Config__questions_aws()
except SystemExit:
pass
assert not config._Config__dict['aws_credentials_valid']

def test_aws_invalid_credentials_continue_without_validation():
config = _aws_validation_setup()

with patch('helpers.cli.CLI.colored_input') as mock_colored_input:
mock_colored_input.side_effect = iter([CHOICE_YES,'', '', '', CHOICE_NO])
config._Config__questions_aws()
assert not config._Config__dict['aws_credentials_valid']

@patch('helpers.aws_validation.AWSValidation.validate_credentials',
new=MockAWSValidation.validate_credentials)
def test_aws_validation_passes_with_valid_credentials():
config = _aws_validation_setup()

# correct keys, no validation, should continue without issue
with patch('helpers.cli.CLI.colored_input') as mock_colored_input:
mock_colored_input.side_effect = iter(
[
CHOICE_YES,
'test_access_key',
'test_secret_key',
'test_bucket_name',
CHOICE_NO,
]
)
config._Config__questions_aws()
assert not config._Config__dict['aws_credentials_valid']

# correct keys in first attempt, choose to validate, continue
# without issue
with patch('helpers.cli.CLI.colored_input') as mock_colored_input:
config._Config__dict['aws_credentials_valid'] = False
mock_colored_input.side_effect = iter(
[
CHOICE_YES,
'test_access_key',
'test_secret_key',
'test_bucket_name',
CHOICE_YES,
]
)
config._Config__questions_aws()
assert config._Config__dict['aws_credentials_valid']

# correct keys in second attempt, choose to validate, continue
# without issue
with patch('helpers.cli.CLI.colored_input') as mock_colored_input:
config._Config__dict['aws_credentials_valid'] = False
mock_colored_input.side_effect = iter(
[
CHOICE_YES,
'',
'',
'',
CHOICE_YES,
'test_access_key',
'test_secret_key',
'test_bucket_name',
]
)
config._Config__questions_aws()
assert config._Config__dict['aws_credentials_valid']

# correct keys in third attempt, choose to validate, continue
# without issue
with patch('helpers.cli.CLI.colored_input') as mock_colored_input:
config._Config__dict['aws_credentials_valid'] = False
mock_colored_input.side_effect = iter(
[
CHOICE_YES,
'',
'',
'',
CHOICE_YES,
'',
'',
'',
'test_access_key',
'test_secret_key',
'test_bucket_name',
]
)
config._Config__questions_aws()
assert config._Config__dict['aws_credentials_valid']

@patch('helpers.config.Config._Config__clone_repo',
MagicMock(return_value=True))
Expand Down
12 changes: 12 additions & 0 deletions tests/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,3 +135,15 @@ class MockUpgrading:
@staticmethod
def migrate_single_to_two_databases(config):
pass


class MockAWSValidation:

def validate_credentials(self):
if (
self.access_key == 'test_access_key'
and self.secret_key == 'test_secret_key'
):
return True
else:
return False

0 comments on commit ec3e516

Please sign in to comment.