Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature: Added support for FTP_TLS (Weak Cryptography Issue) #1320

Merged
merged 4 commits into from
Oct 19, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions storages/backends/ftp.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@
# Usage:
#
# Add below to settings.py:
# FTP_STORAGE_LOCATION = '[a]ftp://<user>:<pass>@<host>:<port>/[path]'
# FTP_STORAGE_LOCATION = '[a]ftp[s]://<user>:<pass>@<host>:<port>/[path]'
#
# In models.py you can write:
# from FTPStorage import FTPStorage
# fs = FTPStorage()
# For a TLS configuration, you must use it below:
# fs = FTPStorage(secure=True)
# class FTPTest(models.Model):
# file = models.FileField(upload_to='a/b/c/', storage=fs)

Expand All @@ -37,7 +39,7 @@ class FTPStorageException(Exception):
class FTPStorage(Storage):
"""FTP Storage class for Django pluggable storage system."""

def __init__(self, location=None, base_url=None, encoding=None):
def __init__(self, location=None, base_url=None, encoding=None, secure=False):
fazledyn-or marked this conversation as resolved.
Show resolved Hide resolved
location = location or setting("FTP_STORAGE_LOCATION")
if location is None:
raise ImproperlyConfigured(
Expand All @@ -51,13 +53,14 @@ def __init__(self, location=None, base_url=None, encoding=None):
self._config = self._decode_location(location)
self._base_url = base_url
self._connection = None
self._secure = secure
fazledyn-or marked this conversation as resolved.
Show resolved Hide resolved

def _decode_location(self, location):
"""Return splitted configuration data from location."""
splitted_url = urlparse(location)
config = {}

if splitted_url.scheme not in ("ftp", "aftp"):
if splitted_url.scheme not in ("ftp", "aftp", "ftps"):
fazledyn-or marked this conversation as resolved.
Show resolved Hide resolved
raise ImproperlyConfigured("FTPStorage works only with FTP protocol!")
if splitted_url.hostname == "":
raise ImproperlyConfigured("You must at least provide hostname!")
Expand All @@ -84,11 +87,13 @@ def _start_connection(self):

# Real reconnect
if self._connection is None:
ftp = ftplib.FTP()
ftp = ftplib.FTP_TLS() if self._secure else ftplib.FTP()
fazledyn-or marked this conversation as resolved.
Show resolved Hide resolved
ftp.encoding = self.encoding
try:
ftp.connect(self._config["host"], self._config["port"])
ftp.login(self._config["user"], self._config["passwd"])
if self._secure:
ftp.prot_p()
if self._config["active"]:
ftp.set_pasv(False)
if self._config["path"] != "":
Expand Down
208 changes: 208 additions & 0 deletions tests/test_ftp.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@
URL = "ftp://{user}:{passwd}@{host}:{port}/".format(
user=USER, passwd=PASSWORD, host=HOST, port=PORT
)
URL_TLS = "ftps://{user}:{passwd}@{host}:{port}/".format(
user=USER, passwd=PASSWORD, host=HOST, port=PORT
)

LIST_FIXTURE = """drwxr-xr-x 2 ftp nogroup 4096 Jul 27 09:46 dir
-rw-r--r-- 1 ftp nogroup 1024 Jul 27 09:45 fi
Expand Down Expand Up @@ -239,3 +242,208 @@ def test_close(self, mock_ftp, mock_storage):
file_.is_dirty = True
file_.read()
file_.close()


class FTPTLSTest(TestCase):
def setUp(self):
self.storage = ftp.FTPStorage(location=URL_TLS, secure=True)

def test_init_no_location(self):
jschneier marked this conversation as resolved.
Show resolved Hide resolved
with self.assertRaises(ImproperlyConfigured):
ftp.FTPStorage(secure=True)

@patch("storages.backends.ftp.setting", return_value=URL_TLS)
jschneier marked this conversation as resolved.
Show resolved Hide resolved
def test_init_location_from_setting(self, mock_setting):
storage = ftp.FTPStorage(secure=True)
self.assertTrue(mock_setting.called)
self.assertEqual(storage.location, URL_TLS)

def test_decode_location(self):
config = self.storage._decode_location(URL_TLS)
jschneier marked this conversation as resolved.
Show resolved Hide resolved
wanted_config = {
"passwd": "b@r",
"host": "localhost",
"user": "foo",
"active": False,
"path": "/",
"port": 2121,
}
self.assertEqual(config, wanted_config)

def test_decode_location_error(self):
jschneier marked this conversation as resolved.
Show resolved Hide resolved
with self.assertRaises(ImproperlyConfigured):
self.storage._decode_location("foo")
with self.assertRaises(ImproperlyConfigured):
self.storage._decode_location("http://foo.pt")
# TODO: Cannot not provide a port
# with self.assertRaises(ImproperlyConfigured):
# self.storage._decode_location('ftp://')

@patch("ftplib.FTP_TLS")
def test_start_connection(self, mock_ftp):
jschneier marked this conversation as resolved.
Show resolved Hide resolved
self.storage._start_connection()
self.assertIsNotNone(self.storage._connection)
# Start active
storage = ftp.FTPStorage(location=URL_TLS, secure=True)
storage._start_connection()

@patch("ftplib.FTP_TLS", **{"return_value.pwd.side_effect": IOError()})
def test_start_connection_timeout(self, mock_ftp):
self.storage._start_connection()
self.assertIsNotNone(self.storage._connection)

@patch("ftplib.FTP_TLS", **{"return_value.connect.side_effect": IOError()})
def test_start_connection_error(self, mock_ftp):
with self.assertRaises(ftp.FTPStorageException):
self.storage._start_connection()

@patch("ftplib.FTP_TLS", **{"return_value.quit.return_value": None})
def test_disconnect(self, mock_ftp_quit):
self.storage._start_connection()
self.storage.disconnect()
self.assertIsNone(self.storage._connection)

@patch("ftplib.FTP_TLS", **{"return_value.pwd.return_value": "foo"})
def test_mkremdirs(self, mock_ftp):
self.storage._start_connection()
self.storage._mkremdirs("foo/bar")

@patch("ftplib.FTP_TLS", **{"return_value.pwd.return_value": "foo"})
def test_mkremdirs_n_subdirectories(self, mock_ftp):
self.storage._start_connection()
self.storage._mkremdirs("foo/bar/null")

@patch(
"ftplib.FTP_TLS",
**{
"return_value.pwd.return_value": "foo",
"return_value.storbinary.return_value": None,
},
)
def test_put_file(self, mock_ftp):
self.storage._start_connection()
self.storage._put_file("foo", File(io.BytesIO(b"foo"), "foo"))

@patch(
"ftplib.FTP_TLS",
**{
"return_value.pwd.return_value": "foo",
"return_value.storbinary.side_effect": IOError(),
},
)
def test_put_file_error(self, mock_ftp):
self.storage._start_connection()
with self.assertRaises(ftp.FTPStorageException):
self.storage._put_file("foo", File(io.BytesIO(b"foo"), "foo"))

def test_open(self):
remote_file = self.storage._open("foo")
self.assertIsInstance(remote_file, ftp.FTPStorageFile)

@patch("ftplib.FTP_TLS", **{"return_value.pwd.return_value": "foo"})
def test_read(self, mock_ftp):
self.storage._start_connection()
self.storage._read("foo")

@patch("ftplib.FTP_TLS", **{"return_value.pwd.side_effect": IOError()})
def test_read2(self, mock_ftp):
self.storage._start_connection()
with self.assertRaises(ftp.FTPStorageException):
self.storage._read("foo")

@patch(
"ftplib.FTP_TLS",
**{
"return_value.pwd.return_value": "foo",
"return_value.storbinary.return_value": None,
},
)
def test_save(self, mock_ftp):
self.storage._save("foo", File(io.BytesIO(b"foo"), "foo"))

@patch("ftplib.FTP_TLS", **{"return_value.retrlines": list_retrlines})
def test_listdir(self, mock_retrlines):
dirs, files = self.storage.listdir("/")
self.assertEqual(len(dirs), 1)
self.assertEqual(dirs, ["dir"])
self.assertEqual(len(files), 2)
self.assertEqual(sorted(files), sorted(["fi", "fi2"]))

@patch("ftplib.FTP_TLS", **{"return_value.retrlines.side_effect": IOError()})
def test_listdir_error(self, mock_ftp):
with self.assertRaises(ftp.FTPStorageException):
self.storage.listdir("/")

@patch("ftplib.FTP_TLS", **{"return_value.nlst.return_value": ["foo", "foo2"]})
def test_exists(self, mock_ftp):
self.assertTrue(self.storage.exists("foo"))
self.assertFalse(self.storage.exists("bar"))

@patch("ftplib.FTP_TLS", **{"return_value.nlst.side_effect": IOError()})
def test_exists_error(self, mock_ftp):
with self.assertRaises(ftp.FTPStorageException):
self.storage.exists("foo")

@patch(
"ftplib.FTP_TLS",
**{
"return_value.delete.return_value": None,
"return_value.nlst.return_value": ["foo", "foo2"],
},
)
def test_delete(self, mock_ftp):
self.storage.delete("foo")
self.assertTrue(mock_ftp.return_value.delete.called)

@patch("ftplib.FTP_TLS", **{"return_value.retrlines": list_retrlines})
def test_size(self, mock_ftp):
self.assertEqual(1024, self.storage.size("fi"))
self.assertEqual(2048, self.storage.size("fi2"))
self.assertEqual(0, self.storage.size("bar"))

@patch("ftplib.FTP_TLS", **{"return_value.retrlines.side_effect": IOError()})
def test_size_error(self, mock_ftp):
self.assertEqual(0, self.storage.size("foo"))

def test_url(self):
with self.assertRaises(ValueError):
self.storage._base_url = None
self.storage.url("foo")
self.storage = ftp.FTPStorage(location=URL_TLS, base_url="http://foo.bar/", secure=True)
self.assertEqual("http://foo.bar/foo", self.storage.url("foo"))


class FTPTLSStorageFileTest(TestCase):
def setUp(self):
self.storage = ftp.FTPStorage(location=URL_TLS, secure=True)

@patch("ftplib.FTP_TLS", **{"return_value.retrlines": list_retrlines})
def test_size(self, mock_ftp):
file_ = ftp.FTPStorageFile("fi", self.storage, "wb")
self.assertEqual(file_.size, 1024)

@patch("ftplib.FTP_TLS", **{"return_value.pwd.return_value": "foo"})
@patch("storages.backends.ftp.FTPStorage._read", return_value=io.BytesIO(b"foo"))
def test_readlines(self, mock_ftp, mock_storage):
file_ = ftp.FTPStorageFile("fi", self.storage, "wb")
self.assertEqual([b"foo"], file_.readlines())

@patch("ftplib.FTP_TLS", **{"return_value.pwd.return_value": "foo"})
@patch("storages.backends.ftp.FTPStorage._read", return_value=io.BytesIO(b"foo"))
def test_read(self, mock_ftp, mock_storage):
file_ = ftp.FTPStorageFile("fi", self.storage, "wb")
self.assertEqual(b"foo", file_.read())

def test_write(self):
file_ = ftp.FTPStorageFile("fi", self.storage, "wb")
file_.write(b"foo")
file_.seek(0)
self.assertEqual(file_.file.read(), b"foo")

@patch("ftplib.FTP_TLS", **{"return_value.pwd.return_value": "foo"})
@patch("storages.backends.ftp.FTPStorage._read", return_value=io.BytesIO(b"foo"))
def test_close(self, mock_ftp, mock_storage):
file_ = ftp.FTPStorageFile("fi", self.storage, "wb")
file_.is_dirty = True
file_.read()
file_.close()