Skip to content

Commit

Permalink
Move S3 auth & cloudfront_signer config validation to init
Browse files Browse the repository at this point in the history
  • Loading branch information
gerrod3 committed Sep 14, 2023
1 parent d049794 commit ad7f517
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 32 deletions.
52 changes: 20 additions & 32 deletions storages/backends/s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -275,18 +275,23 @@ class S3Storage(CompressStorageMixin, BaseStorage):
"""

default_content_type = "application/octet-stream"
# If config provided in init, signature_version and addressing_style settings/args
# If config provided in subclass, signature_version and addressing_style settings/args
# are ignored.
config = None

def __init__(self, **settings):
cloudfront_key_id = settings.pop("cloudfront_key_id", None)
cloudfront_key = settings.pop("cloudfront_key", None)
self.cloudfront_signer = settings.pop("cloudfront_signer", None)

super().__init__(**settings)

check_location(self)

if (self.access_key or self.secret_key) and self.session_profile:
raise ImproperlyConfigured(
"AWS_S3_SESSION_PROFILE/session_profile should not be provided with "
"AWS_S3_ACCESS_KEY_ID/access_key and AWS_S3_SECRET_ACCESS_KEY/secret_key"
)

self._bucket = None
self._connections = threading.local()

Expand All @@ -308,39 +313,21 @@ def __init__(self, **settings):
if self.transfer_config is None:
self.transfer_config = TransferConfig(use_threads=self.use_threads)

if cloudfront_key_id and cloudfront_key:
self.cloudfront_signer = self.get_cloudfront_signer(
cloudfront_key_id, cloudfront_key
)
if not self.cloudfront_signer:
if self.cloudfront_key_id and self.cloudfront_key:
self.cloudfront_signer = self.get_cloudfront_signer(
self.cloudfront_key_id, self.cloudfront_key
)
elif bool(self.cloudfront_key_id) ^ bool(self.cloudfront_key):
raise ImproperlyConfigured(
"Both AWS_CLOUDFRONT_KEY_ID/cloudfront_key_id and "
"AWS_CLOUDFRONT_KEY/cloudfront_key must be provided together."
)

def get_cloudfront_signer(self, key_id, key):
return _cloud_front_signer_from_pem(key_id, key)

def get_default_settings(self):
cloudfront_key_id = setting("AWS_CLOUDFRONT_KEY_ID")
cloudfront_key = setting("AWS_CLOUDFRONT_KEY")
if bool(cloudfront_key_id) ^ bool(cloudfront_key):
raise ImproperlyConfigured(
"Both AWS_CLOUDFRONT_KEY_ID and AWS_CLOUDFRONT_KEY must be "
"provided together."
)

if cloudfront_key_id:
cloudfront_signer = self.get_cloudfront_signer(
cloudfront_key_id, cloudfront_key
)
else:
cloudfront_signer = None

s3_access_key_id = setting("AWS_S3_ACCESS_KEY_ID")
s3_secret_access_key = setting("AWS_S3_SECRET_ACCESS_KEY")
s3_session_profile = setting("AWS_S3_SESSION_PROFILE")
if (s3_access_key_id or s3_secret_access_key) and s3_session_profile:
raise ImproperlyConfigured(
"AWS_S3_SESSION_PROFILE should not be provided with "
"AWS_S3_ACCESS_KEY_ID and AWS_S3_SECRET_ACCESS_KEY"
)

return {
"access_key": setting("AWS_S3_ACCESS_KEY_ID", setting("AWS_ACCESS_KEY_ID")),
"secret_key": setting(
Expand All @@ -358,7 +345,8 @@ def get_default_settings(self):
"signature_version": setting("AWS_S3_SIGNATURE_VERSION"),
"location": setting("AWS_LOCATION", ""),
"custom_domain": setting("AWS_S3_CUSTOM_DOMAIN"),
"cloudfront_signer": cloudfront_signer,
"cloudfront_key_id": setting("AWS_CLOUDFRONT_KEY_ID"),
"cloudfront_key": setting("AWS_CLOUDFRONT_KEY"),
"addressing_style": setting("AWS_S3_ADDRESSING_STYLE"),
"file_name_charset": setting("AWS_S3_FILE_NAME_CHARSET", "utf-8"),
"gzip": setting("AWS_IS_GZIPPED", False),
Expand Down
69 changes: 69 additions & 0 deletions tests/test_s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -850,6 +850,75 @@ def test_transfer_config(self):
storage = s3.S3Storage()
self.assertFalse(storage.transfer_config.use_threads)

def test_cloudfront_config(self):
# Valid configs
storage = s3.S3Storage()
self.assertIsNone(storage.cloudfront_signer)

with override_settings(AWS_CLOUDFRONT_KEY_ID="foo", AWS_CLOUDFRONT_KEY="boo"):
storage = s3.S3Storage()
self.assertIsNotNone(storage.cloudfront_signer)

storage = s3.S3Storage(cloudfront_key_id="foo", cloudfront_key="boo")
self.asserIsNotNone(storage.cloudfront_signer)

cloudfront_signer = storage.get_cloudfront_signer("foo", "boo")
storage = s3.S3Storage(cloudfront_signer=cloudfront_signer)
self.assertIsNotNone(storage.cloudfront_signer)

with override_settings(AWS_CLOUDFRONT_KEY_ID="foo"):
storage = s3.S3Storage(cloudfront_key="boo")
self.assertIsNotNone(storage.cloudfront_signer)

# Invalid configs
msg = (
"Both AWS_CLOUDFRONT_KEY_ID/cloudfront_key_id and "
"AWS_CLOUDFRONT_KEY/cloudfront_key must be provided together."
)
with override_settings(AWS_CLOUDFRONT_KEY_ID="foo"):
with self.assertRaisesMessage(ImproperlyConfigured, msg):
storage = s3.S3Storage()

with override_settings(AWS_CLOUDFRONT_KEY="foo"):
with self.assertRaisesMessage(ImproperlyConfigured, msg):
storage = s3.S3Storage()

with self.assertRaisesMessage(ImproperlyConfigured, msg):
storage = s3.S3Storage(cloudfront_key_id="foo")

with self.assertRaisesMessage(ImproperlyConfigured, msg):
storage = s3.S3Storage(cloudfront_key="boo")

def test_auth_config(self):
# Valid configs
with override_settings(AWS_S3_ACCESS_KEY_ID="foo", AWS_S3_SECRET_ACCESS_KEY="boo"):
storage = s3.S3Storage()
self.assertEqual(storage.access_key, "foo")
self.assertEqual(storage.secret_key, "boo")

with override_settings(AWS_ACCESS_KEY_ID="foo", AWS_SECRET_ACCESS_KEY="boo"):
storage = s3.S3Storage()
self.assertEqual(storage.access_key, "foo")
self.assertEqual(storage.secret_key, "boo")

storage = s3.S3Storage(access_key="foo", secret_key="boo")
self.assertEqual(storage.access_key, "foo")
self.assertEqual(storage.secret_key, "boo")

# Invalid configs
msg = (
"AWS_S3_SESSION_PROFILE/session_profile should not be provided with "
"AWS_S3_ACCESS_KEY_ID/access_key and AWS_S3_SECRET_ACCESS_KEY/secret_key"
)
with override_settings(
AWS_ACCESS_KEY_ID="foo", AWS_SECRET_ACCESS_KEY="boo", AWS_S3_SESSION_PROFILE="moo"
):
with self.assertRaisesMessage(ImproperlyConfigured, msg):
storage = s3.S3Storage()

with self.assertRaisesMessage(ImproperlyConfigured, msg):
storage = s3.S3Storage(access_key="foo", secret_key="boo", session_profile="moo")


class S3StaticStorageTests(TestCase):
def setUp(self):
Expand Down

0 comments on commit ad7f517

Please sign in to comment.