Skip to content

Commit

Permalink
Remove _force_mode: Wrap non gzip files in r mode with TextIOWrapper …
Browse files Browse the repository at this point in the history
…to support universal newlines
  • Loading branch information
skim618 committed Feb 14, 2024
1 parent 795c82f commit 9b1b76f
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 49 deletions.
8 changes: 5 additions & 3 deletions storages/backends/s3.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import mimetypes
import os
import io
import posixpath
import tempfile
import threading
Expand Down Expand Up @@ -123,7 +124,6 @@ def __init__(self, name, mode, storage, buffer_size=None):
self._storage = storage
self.name = name[len(self._storage.location) :].lstrip("/")
self._mode = mode
self._force_mode = (lambda b: b) if "b" in mode else (lambda b: b.decode())
self.obj = storage.bucket.Object(name)
if "w" not in mode:
# Force early RAII-style exception if object does not exist
Expand Down Expand Up @@ -184,6 +184,8 @@ def _get_file(self):
self._file.seek(0)
if self._storage.gzip and self.obj.content_encoding == "gzip":
self._file = self._decompress_file(mode=self._mode, file=self._file)
elif "b" not in self._mode:
self._file = io.TextIOWrapper(self._file._file, encoding="utf-8")
self._closed = False
return self._file

Expand All @@ -195,12 +197,12 @@ def _set_file(self, value):
def read(self, *args, **kwargs):
if "r" not in self._mode:
raise AttributeError("File was not opened in read mode.")
return self._force_mode(super().read(*args, **kwargs))
return super().read(*args, **kwargs)

def readline(self, *args, **kwargs):
if "r" not in self._mode:
raise AttributeError("File was not opened in read mode.")
return self._force_mode(super().readline(*args, **kwargs))
return super().readline(*args, **kwargs)

def readlines(self):
return list(self)
Expand Down
88 changes: 42 additions & 46 deletions tests/test_s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -305,52 +305,6 @@ def test_storage_open_read_string(self):
self.assertEqual(content_str, "")
file.close()

def test_storage_open_read_with_newlines(self):
"""
Test opening a file in "r" mode with various newline characters
"""
name = "test_storage_open_read_with_newlines.txt"
with io.BytesIO() as temp_file:
temp_file.write(b"line1\nline2\r\nmore\rtext\n")
temp_file.seek(0)
file = self.storage.open(name, "r")
file._file = temp_file
content_str = file.read()
file.close()
self.assertEqual(content_str, "line1\nline2\nmore\ntext\n")

def test_storage_open_readlines(self):
"""
Test readlines with file opened in "r" and "rb" modes
"""
name = "test_storage_open_readlines.txt"
with io.BytesIO() as temp_file:
temp_file.write(b"line1\nline2")
file = self.storage.open(name, "r")
file._file = temp_file

content_lines = file.readlines()
self.assertEqual(content_lines, ["line1\n", "line2"])

temp_file.seek(0)
file = self.storage.open(name, "rb")
file._file = temp_file
content_lines = file.readlines()
self.assertEqual(content_lines, [b"line1\n", b"line2"])

def test_storage_open_readlines_with_newlines(self):
"""
Test readlines with file opened in "r" mode with various newline characters
"""
name = "test_storage_open_readlines_with_newlines.txt"
with io.BytesIO() as temp_file:
temp_file.write(b"line1\nline2\r\nmore\rtext")
file = self.storage.open(name, "r")
file._file = temp_file

content_lines = file.readlines()
self.assertEqual(content_lines, ['line1\n', 'line2\n', 'more\n', 'text'])

def test_storage_open_write(self):
"""
Test opening a file in write mode
Expand Down Expand Up @@ -1172,6 +1126,48 @@ def test_content_type_not_detectable(self):
s3.S3Storage.default_content_type,
)

def test_storage_open_read_with_newlines(self):
"""
Test opening a file in "r" and "rb" mode with various newline characters
"""
name = "test_storage_open_read_with_newlines.txt"
with io.BytesIO() as temp_file:
temp_file.write(b"line1\nline2\r\nmore\rtext\n")
self.storage.save(name, temp_file)
file = self.storage.open(name, "r")
content_str = file.read()
file.close()
self.assertEqual(content_str, "line1\nline2\nmore\ntext\n")

with io.BytesIO() as temp_file:
temp_file.write(b"line1\nline2\r\nmore\rtext\n")
self.storage.save(name, temp_file)
file = self.storage.open(name, "rb")
content_str = file.read()
file.close()
self.assertEqual(content_str, b"line1\nline2\r\nmore\rtext\n")

def test_storage_open_readlines_with_newlines(self):
"""
Test readlines with file opened in "r" and "rb" mode with various newline characters
"""
name = "test_storage_open_readlines_with_newlines.txt"
with io.BytesIO() as temp_file:
temp_file.write(b"line1\nline2\r\nmore\rtext")
self.storage.save(name, temp_file)
file = self.storage.open(name, "r")
content_lines = file.readlines()
file.close()
self.assertEqual(content_lines, ['line1\n', 'line2\n', 'more\n', 'text'])

with io.BytesIO() as temp_file:
temp_file.write(b"line1\nline2\r\nmore\rtext")
self.storage.save(name, temp_file)
file = self.storage.open(name, "rb")
content_lines = file.readlines()
file.close()
self.assertEqual(content_lines, [b'line1\n', b'line2\r\n', b'more\r', b'text'])


class TestBackwardsNames(TestCase):
def test_importing(self):
Expand Down

0 comments on commit 9b1b76f

Please sign in to comment.