From b76ce60b73f1f8ff70d16d2d75a5facc734cef4d Mon Sep 17 00:00:00 2001 From: Gerrod Ubben Date: Thu, 14 Sep 2023 18:03:40 -0400 Subject: [PATCH] Move S3 auth & cloudfront_signer config validation to init --- storages/backends/s3.py | 55 ++++++++++-------------- tests/test_s3.py | 95 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 117 insertions(+), 33 deletions(-) diff --git a/storages/backends/s3.py b/storages/backends/s3.py index cd549273..24676e9b 100644 --- a/storages/backends/s3.py +++ b/storages/backends/s3.py @@ -275,18 +275,24 @@ class S3Storage(CompressStorageMixin, BaseStorage): """ default_content_type = "application/octet-stream" - # If config provided in init, signature_version and addressing_style settings/args - # are ignored. + # If config provided in subclass, signature_version and addressing_style + # settings/args are ignored. config = None def __init__(self, **settings): - cloudfront_key_id = settings.pop("cloudfront_key_id", None) - cloudfront_key = settings.pop("cloudfront_key", None) + self.cloudfront_signer = settings.pop("cloudfront_signer", None) super().__init__(**settings) check_location(self) + if (self.access_key or self.secret_key) and self.session_profile: + raise ImproperlyConfigured( + "AWS_S3_SESSION_PROFILE/session_profile should not be provided with " + "AWS_S3_ACCESS_KEY_ID/access_key and " + "AWS_S3_SECRET_ACCESS_KEY/secret_key" + ) + self._bucket = None self._connections = threading.local() @@ -308,39 +314,21 @@ def __init__(self, **settings): if self.transfer_config is None: self.transfer_config = TransferConfig(use_threads=self.use_threads) - if cloudfront_key_id and cloudfront_key: - self.cloudfront_signer = self.get_cloudfront_signer( - cloudfront_key_id, cloudfront_key - ) + if not self.cloudfront_signer: + if self.cloudfront_key_id and self.cloudfront_key: + self.cloudfront_signer = self.get_cloudfront_signer( + self.cloudfront_key_id, self.cloudfront_key + ) + elif bool(self.cloudfront_key_id) ^ bool(self.cloudfront_key): + raise ImproperlyConfigured( + "Both AWS_CLOUDFRONT_KEY_ID/cloudfront_key_id and " + "AWS_CLOUDFRONT_KEY/cloudfront_key must be provided together." + ) def get_cloudfront_signer(self, key_id, key): return _cloud_front_signer_from_pem(key_id, key) def get_default_settings(self): - cloudfront_key_id = setting("AWS_CLOUDFRONT_KEY_ID") - cloudfront_key = setting("AWS_CLOUDFRONT_KEY") - if bool(cloudfront_key_id) ^ bool(cloudfront_key): - raise ImproperlyConfigured( - "Both AWS_CLOUDFRONT_KEY_ID and AWS_CLOUDFRONT_KEY must be " - "provided together." - ) - - if cloudfront_key_id: - cloudfront_signer = self.get_cloudfront_signer( - cloudfront_key_id, cloudfront_key - ) - else: - cloudfront_signer = None - - s3_access_key_id = setting("AWS_S3_ACCESS_KEY_ID") - s3_secret_access_key = setting("AWS_S3_SECRET_ACCESS_KEY") - s3_session_profile = setting("AWS_S3_SESSION_PROFILE") - if (s3_access_key_id or s3_secret_access_key) and s3_session_profile: - raise ImproperlyConfigured( - "AWS_S3_SESSION_PROFILE should not be provided with " - "AWS_S3_ACCESS_KEY_ID and AWS_S3_SECRET_ACCESS_KEY" - ) - return { "access_key": setting("AWS_S3_ACCESS_KEY_ID", setting("AWS_ACCESS_KEY_ID")), "secret_key": setting( @@ -358,7 +346,8 @@ def get_default_settings(self): "signature_version": setting("AWS_S3_SIGNATURE_VERSION"), "location": setting("AWS_LOCATION", ""), "custom_domain": setting("AWS_S3_CUSTOM_DOMAIN"), - "cloudfront_signer": cloudfront_signer, + "cloudfront_key_id": setting("AWS_CLOUDFRONT_KEY_ID"), + "cloudfront_key": setting("AWS_CLOUDFRONT_KEY"), "addressing_style": setting("AWS_S3_ADDRESSING_STYLE"), "file_name_charset": setting("AWS_S3_FILE_NAME_CHARSET", "utf-8"), "gzip": setting("AWS_IS_GZIPPED", False), diff --git a/tests/test_s3.py b/tests/test_s3.py index 7d1e7f5f..6b3d9530 100644 --- a/tests/test_s3.py +++ b/tests/test_s3.py @@ -850,6 +850,101 @@ def test_transfer_config(self): storage = s3.S3Storage() self.assertFalse(storage.transfer_config.use_threads) + def test_cloudfront_config(self): + # Valid configs + storage = s3.S3Storage() + self.assertIsNone(storage.cloudfront_signer) + + key_id = "test-id" + pem = dedent( + """\ + -----BEGIN RSA PRIVATE KEY----- + MIICWwIBAAKBgQCXVuwcMk+JmVSKuQ1K4dZx4Z1dEcRQgTlqvhAyljIpttXlZh2/ + fD3GkJCiqfwEmo+cdNK/LFzRj/CX8Wz1z1lH2USONpG6sAkotkatCbejiItDu5y6 + janGJHfuWXu6B/o9gwZylU1gIsePY3lLNk+r9QhXUO4jXw6zLJftVwKPhQIDAQAB + AoGAbpkRV9HUmoQ5al+uPSkp5HOy4s8XHpYxdbaMc8ubwSxiyJCF8OhE5RXE/Xso + N90UUox1b0xmUKfWddPzgvgTD/Ub7D6Ukf+nVWDX60tWgNxICAUHptGL3tWweaAy + H+0+vZ0TzvTt9r00vW0FzO7F8X9/Rs1ntDRLtF3RCCxdq0kCQQDHFu+t811lCvy/ + 67rMEKGvNsNNSTrzOrNr3PqUrCnOrzKazjFVjsKv5VzI/U+rXGYKWJsMpuCFiHZ3 + DILUC09TAkEAwpm2S6MN6pzn9eY6pmhOxZ+GQGGRUkKZfC1GDxaRSRb8sKTjptYw + WSemJSxiDzdj3Po2hF0lbhkpJgUq6xnCxwJAZgHHfn5CLSJrDD7Q7/vZi/foK3JJ + BRTfl3Wa4pAvv5meuRjKyEakVBGV79lyd5+ZHNX3Y40hXunjoO3FHrZIxwJAdRzu + waxahrRxQOKSr20c4wAzWnGddIUSO9I/VHs/al5EKsbBHrnOlQkwizSfuwqZtfZ7 + csNf8FeCFRiNELoLJwJAZxWBE2+8J9VW9AQ0SE7j4FyM/B8FvRhF5PLAAsw/OxHO + SxiFP7Ptdac1tm5H5zOqaqSHWphI19HNNilXKmxuCA== + -----END RSA PRIVATE KEY-----""" + ).encode("ascii") + + with override_settings(AWS_CLOUDFRONT_KEY_ID=key_id, AWS_CLOUDFRONT_KEY=pem): + storage = s3.S3Storage() + self.assertIsNotNone(storage.cloudfront_signer) + + storage = s3.S3Storage(cloudfront_key_id=key_id, cloudfront_key=pem) + self.assertIsNotNone(storage.cloudfront_signer) + + cloudfront_signer = storage.get_cloudfront_signer(key_id, pem) + storage = s3.S3Storage(cloudfront_signer=cloudfront_signer) + self.assertIsNotNone(storage.cloudfront_signer) + + with override_settings(AWS_CLOUDFRONT_KEY_ID=key_id): + storage = s3.S3Storage(cloudfront_key=pem) + self.assertIsNotNone(storage.cloudfront_signer) + + # Invalid configs + msg = ( + "Both AWS_CLOUDFRONT_KEY_ID/cloudfront_key_id and " + "AWS_CLOUDFRONT_KEY/cloudfront_key must be provided together." + ) + with override_settings(AWS_CLOUDFRONT_KEY_ID=key_id): + with self.assertRaisesMessage(ImproperlyConfigured, msg): + storage = s3.S3Storage() + + with override_settings(AWS_CLOUDFRONT_KEY=pem): + with self.assertRaisesMessage(ImproperlyConfigured, msg): + storage = s3.S3Storage() + + with self.assertRaisesMessage(ImproperlyConfigured, msg): + storage = s3.S3Storage(cloudfront_key_id=key_id) + + with self.assertRaisesMessage(ImproperlyConfigured, msg): + storage = s3.S3Storage(cloudfront_key=pem) + + def test_auth_config(self): + # Valid configs + with override_settings( + AWS_S3_ACCESS_KEY_ID="foo", AWS_S3_SECRET_ACCESS_KEY="boo" + ): + storage = s3.S3Storage() + self.assertEqual(storage.access_key, "foo") + self.assertEqual(storage.secret_key, "boo") + + with override_settings(AWS_ACCESS_KEY_ID="foo", AWS_SECRET_ACCESS_KEY="boo"): + storage = s3.S3Storage() + self.assertEqual(storage.access_key, "foo") + self.assertEqual(storage.secret_key, "boo") + + storage = s3.S3Storage(access_key="foo", secret_key="boo") + self.assertEqual(storage.access_key, "foo") + self.assertEqual(storage.secret_key, "boo") + + # Invalid configs + msg = ( + "AWS_S3_SESSION_PROFILE/session_profile should not be provided with " + "AWS_S3_ACCESS_KEY_ID/access_key and AWS_S3_SECRET_ACCESS_KEY/secret_key" + ) + with override_settings( + AWS_ACCESS_KEY_ID="foo", + AWS_SECRET_ACCESS_KEY="boo", + AWS_S3_SESSION_PROFILE="moo", + ): + with self.assertRaisesMessage(ImproperlyConfigured, msg): + storage = s3.S3Storage() + + with self.assertRaisesMessage(ImproperlyConfigured, msg): + storage = s3.S3Storage( + access_key="foo", secret_key="boo", session_profile="moo" + ) + class S3StaticStorageTests(TestCase): def setUp(self):