Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move S3 auth & cloudfront_signer config validation to init #1302

Merged
merged 1 commit into from
Sep 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 22 additions & 33 deletions storages/backends/s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -275,18 +275,24 @@ class S3Storage(CompressStorageMixin, BaseStorage):
"""

default_content_type = "application/octet-stream"
# If config provided in init, signature_version and addressing_style settings/args
# are ignored.
# If config provided in subclass, signature_version and addressing_style
# settings/args are ignored.
config = None

def __init__(self, **settings):
cloudfront_key_id = settings.pop("cloudfront_key_id", None)
cloudfront_key = settings.pop("cloudfront_key", None)
self.cloudfront_signer = settings.pop("cloudfront_signer", None)

super().__init__(**settings)

check_location(self)

if (self.access_key or self.secret_key) and self.session_profile:
raise ImproperlyConfigured(
"AWS_S3_SESSION_PROFILE/session_profile should not be provided with "
"AWS_S3_ACCESS_KEY_ID/access_key and "
"AWS_S3_SECRET_ACCESS_KEY/secret_key"
)

self._bucket = None
self._connections = threading.local()

Expand All @@ -308,39 +314,21 @@ def __init__(self, **settings):
if self.transfer_config is None:
self.transfer_config = TransferConfig(use_threads=self.use_threads)

if cloudfront_key_id and cloudfront_key:
self.cloudfront_signer = self.get_cloudfront_signer(
cloudfront_key_id, cloudfront_key
)
if not self.cloudfront_signer:
if self.cloudfront_key_id and self.cloudfront_key:
self.cloudfront_signer = self.get_cloudfront_signer(
self.cloudfront_key_id, self.cloudfront_key
)
elif bool(self.cloudfront_key_id) ^ bool(self.cloudfront_key):
raise ImproperlyConfigured(
"Both AWS_CLOUDFRONT_KEY_ID/cloudfront_key_id and "
"AWS_CLOUDFRONT_KEY/cloudfront_key must be provided together."
)

def get_cloudfront_signer(self, key_id, key):
return _cloud_front_signer_from_pem(key_id, key)

def get_default_settings(self):
cloudfront_key_id = setting("AWS_CLOUDFRONT_KEY_ID")
cloudfront_key = setting("AWS_CLOUDFRONT_KEY")
if bool(cloudfront_key_id) ^ bool(cloudfront_key):
raise ImproperlyConfigured(
"Both AWS_CLOUDFRONT_KEY_ID and AWS_CLOUDFRONT_KEY must be "
"provided together."
)

if cloudfront_key_id:
cloudfront_signer = self.get_cloudfront_signer(
cloudfront_key_id, cloudfront_key
)
else:
cloudfront_signer = None

s3_access_key_id = setting("AWS_S3_ACCESS_KEY_ID")
s3_secret_access_key = setting("AWS_S3_SECRET_ACCESS_KEY")
s3_session_profile = setting("AWS_S3_SESSION_PROFILE")
if (s3_access_key_id or s3_secret_access_key) and s3_session_profile:
raise ImproperlyConfigured(
"AWS_S3_SESSION_PROFILE should not be provided with "
"AWS_S3_ACCESS_KEY_ID and AWS_S3_SECRET_ACCESS_KEY"
)

return {
"access_key": setting("AWS_S3_ACCESS_KEY_ID", setting("AWS_ACCESS_KEY_ID")),
"secret_key": setting(
Expand All @@ -358,7 +346,8 @@ def get_default_settings(self):
"signature_version": setting("AWS_S3_SIGNATURE_VERSION"),
"location": setting("AWS_LOCATION", ""),
"custom_domain": setting("AWS_S3_CUSTOM_DOMAIN"),
"cloudfront_signer": cloudfront_signer,
"cloudfront_key_id": setting("AWS_CLOUDFRONT_KEY_ID"),
"cloudfront_key": setting("AWS_CLOUDFRONT_KEY"),
"addressing_style": setting("AWS_S3_ADDRESSING_STYLE"),
"file_name_charset": setting("AWS_S3_FILE_NAME_CHARSET", "utf-8"),
"gzip": setting("AWS_IS_GZIPPED", False),
Expand Down
95 changes: 95 additions & 0 deletions tests/test_s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -850,6 +850,101 @@ def test_transfer_config(self):
storage = s3.S3Storage()
self.assertFalse(storage.transfer_config.use_threads)

def test_cloudfront_config(self):
# Valid configs
storage = s3.S3Storage()
self.assertIsNone(storage.cloudfront_signer)

key_id = "test-id"
pem = dedent(
"""\
-----BEGIN RSA PRIVATE KEY-----
MIICWwIBAAKBgQCXVuwcMk+JmVSKuQ1K4dZx4Z1dEcRQgTlqvhAyljIpttXlZh2/
fD3GkJCiqfwEmo+cdNK/LFzRj/CX8Wz1z1lH2USONpG6sAkotkatCbejiItDu5y6
janGJHfuWXu6B/o9gwZylU1gIsePY3lLNk+r9QhXUO4jXw6zLJftVwKPhQIDAQAB
AoGAbpkRV9HUmoQ5al+uPSkp5HOy4s8XHpYxdbaMc8ubwSxiyJCF8OhE5RXE/Xso
N90UUox1b0xmUKfWddPzgvgTD/Ub7D6Ukf+nVWDX60tWgNxICAUHptGL3tWweaAy
H+0+vZ0TzvTt9r00vW0FzO7F8X9/Rs1ntDRLtF3RCCxdq0kCQQDHFu+t811lCvy/
67rMEKGvNsNNSTrzOrNr3PqUrCnOrzKazjFVjsKv5VzI/U+rXGYKWJsMpuCFiHZ3
DILUC09TAkEAwpm2S6MN6pzn9eY6pmhOxZ+GQGGRUkKZfC1GDxaRSRb8sKTjptYw
WSemJSxiDzdj3Po2hF0lbhkpJgUq6xnCxwJAZgHHfn5CLSJrDD7Q7/vZi/foK3JJ
BRTfl3Wa4pAvv5meuRjKyEakVBGV79lyd5+ZHNX3Y40hXunjoO3FHrZIxwJAdRzu
waxahrRxQOKSr20c4wAzWnGddIUSO9I/VHs/al5EKsbBHrnOlQkwizSfuwqZtfZ7
csNf8FeCFRiNELoLJwJAZxWBE2+8J9VW9AQ0SE7j4FyM/B8FvRhF5PLAAsw/OxHO
SxiFP7Ptdac1tm5H5zOqaqSHWphI19HNNilXKmxuCA==
-----END RSA PRIVATE KEY-----"""
).encode("ascii")

with override_settings(AWS_CLOUDFRONT_KEY_ID=key_id, AWS_CLOUDFRONT_KEY=pem):
storage = s3.S3Storage()
self.assertIsNotNone(storage.cloudfront_signer)

storage = s3.S3Storage(cloudfront_key_id=key_id, cloudfront_key=pem)
self.assertIsNotNone(storage.cloudfront_signer)

cloudfront_signer = storage.get_cloudfront_signer(key_id, pem)
storage = s3.S3Storage(cloudfront_signer=cloudfront_signer)
self.assertIsNotNone(storage.cloudfront_signer)

with override_settings(AWS_CLOUDFRONT_KEY_ID=key_id):
storage = s3.S3Storage(cloudfront_key=pem)
self.assertIsNotNone(storage.cloudfront_signer)

# Invalid configs
msg = (
"Both AWS_CLOUDFRONT_KEY_ID/cloudfront_key_id and "
"AWS_CLOUDFRONT_KEY/cloudfront_key must be provided together."
)
with override_settings(AWS_CLOUDFRONT_KEY_ID=key_id):
with self.assertRaisesMessage(ImproperlyConfigured, msg):
storage = s3.S3Storage()

with override_settings(AWS_CLOUDFRONT_KEY=pem):
with self.assertRaisesMessage(ImproperlyConfigured, msg):
storage = s3.S3Storage()

with self.assertRaisesMessage(ImproperlyConfigured, msg):
storage = s3.S3Storage(cloudfront_key_id=key_id)

with self.assertRaisesMessage(ImproperlyConfigured, msg):
storage = s3.S3Storage(cloudfront_key=pem)

def test_auth_config(self):
# Valid configs
with override_settings(
AWS_S3_ACCESS_KEY_ID="foo", AWS_S3_SECRET_ACCESS_KEY="boo"
):
storage = s3.S3Storage()
self.assertEqual(storage.access_key, "foo")
self.assertEqual(storage.secret_key, "boo")

with override_settings(AWS_ACCESS_KEY_ID="foo", AWS_SECRET_ACCESS_KEY="boo"):
storage = s3.S3Storage()
self.assertEqual(storage.access_key, "foo")
self.assertEqual(storage.secret_key, "boo")

storage = s3.S3Storage(access_key="foo", secret_key="boo")
self.assertEqual(storage.access_key, "foo")
self.assertEqual(storage.secret_key, "boo")

# Invalid configs
msg = (
"AWS_S3_SESSION_PROFILE/session_profile should not be provided with "
"AWS_S3_ACCESS_KEY_ID/access_key and AWS_S3_SECRET_ACCESS_KEY/secret_key"
)
with override_settings(
AWS_ACCESS_KEY_ID="foo",
AWS_SECRET_ACCESS_KEY="boo",
AWS_S3_SESSION_PROFILE="moo",
):
with self.assertRaisesMessage(ImproperlyConfigured, msg):
storage = s3.S3Storage()

with self.assertRaisesMessage(ImproperlyConfigured, msg):
storage = s3.S3Storage(
access_key="foo", secret_key="boo", session_profile="moo"
)


class S3StaticStorageTests(TestCase):
def setUp(self):
Expand Down