From ae40824810238b428338e4167013cf5f90cb6cd9 Mon Sep 17 00:00:00 2001 From: Joachim Metz Date: Fri, 23 Jul 2021 09:19:53 +0200 Subject: [PATCH] Added attributes support for fsapfs back-end #504 --- dfvfs/vfs/apfs_attribute.py | 107 ++++++++++++++++++++++++++++++++ dfvfs/vfs/apfs_file_entry.py | 38 ++++++++++++ tests/vfs/apfs_attribute.py | 69 +++++++++++++++++++++ tests/vfs/apfs_file_entry.py | 115 +++++++++++++++++++++++++---------- 4 files changed, 297 insertions(+), 32 deletions(-) create mode 100644 dfvfs/vfs/apfs_attribute.py create mode 100644 tests/vfs/apfs_attribute.py diff --git a/dfvfs/vfs/apfs_attribute.py b/dfvfs/vfs/apfs_attribute.py new file mode 100644 index 00000000..a71e1c00 --- /dev/null +++ b/dfvfs/vfs/apfs_attribute.py @@ -0,0 +1,107 @@ +# -*- coding: utf-8 -*- +"""The APFS attribute implementation.""" + +import os + +from dfvfs.lib import errors +from dfvfs.vfs import attribute + + +class APFSExtendedAttribute(attribute.Attribute): + """APFS extended attribute that uses pyfsapfs.""" + + def __init__(self, fsapfs_extended_attribute): + """Initializes an attribute. + + Args: + fsapfs_extended_attribute (pyfsapfs.extended_attribute): APFS extended + attribute. + + Raises: + BackEndError: if the pyfsapfs extended attribute is missing. + """ + if not fsapfs_extended_attribute: + raise errors.BackEndError('Missing pyfsapfs extended attribute.') + + super(APFSExtendedAttribute, self).__init__() + self._fsapfs_extended_attribute = fsapfs_extended_attribute + + @property + def name(self): + """str: name.""" + return self._fsapfs_extended_attribute.name + + # Note: that the following functions do not follow the style guide + # because they are part of the file-like object interface. + # pylint: disable=invalid-name + + def read(self, size=None): + """Reads a byte string from the file input/output (IO) object. + + The function will read a byte string of the specified size or + all of the remaining data if no size was specified. + + Args: + size (Optional[int]): number of bytes to read, where None is all + remaining data. + + Returns: + bytes: data read. + + Raises: + IOError: if the read failed. + OSError: if the read failed. + """ + return self._fsapfs_extended_attribute.read_buffer(size) + + def seek(self, offset, whence=os.SEEK_SET): + """Seeks to an offset within the file input/output (IO) object. + + Args: + offset (int): offset to seek. + whence (Optional[int]): value that indicates whether offset is an + absolute or relative position within the file. + + Raises: + IOError: if the seek failed. + OSError: if the seek failed. + """ + self._fsapfs_extended_attribute.seek_offset(offset, whence) + + # get_offset() is preferred above tell() by the libbfio layer used in libyal. + def get_offset(self): + """Retrieves the current offset into the file input/output (IO) object. + + Returns: + int: current offset into the file input/output (IO) object. + + Raises: + IOError: if the file input/output (IO)-like object has not been opened. + OSError: if the file input/output (IO)-like object has not been opened. + """ + return self._fsapfs_extended_attribute.get_offset() + + # Pythonesque alias for get_offset(). + def tell(self): + """Retrieves the current offset into the file input/output (IO) object.""" + return self.get_offset() + + def get_size(self): + """Retrieves the size of the file input/output (IO) object. + + Returns: + int: size of the file input/output (IO) object. + + Raises: + IOError: if the file input/output (IO) object has not been opened. + OSError: if the file input/output (IO) object has not been opened. + """ + return self._fsapfs_extended_attribute.get_size() + + def seekable(self): + """Determines if a file input/output (IO) object is seekable. + + Returns: + bool: True since a file IO object provides a seek method. + """ + return True diff --git a/dfvfs/vfs/apfs_file_entry.py b/dfvfs/vfs/apfs_file_entry.py index 4df1231f..030dfd04 100644 --- a/dfvfs/vfs/apfs_file_entry.py +++ b/dfvfs/vfs/apfs_file_entry.py @@ -6,6 +6,8 @@ from dfvfs.lib import definitions from dfvfs.lib import errors from dfvfs.path import apfs_path_spec +from dfvfs.vfs import attribute +from dfvfs.vfs import apfs_attribute from dfvfs.vfs import file_entry @@ -90,6 +92,24 @@ def __init__( self.entry_type = self._ENTRY_TYPES.get( fsapfs_file_entry.file_mode & 0xf000, None) + def _GetAttributes(self): + """Retrieves the attributes. + + Returns: + list[Attribute]: attributes. + """ + if self._attributes is None: + stat_attribute = self._GetStatAttribute() + self._attributes = [stat_attribute] + + for fsapfs_extended_attribute in ( + self._fsapfs_file_entry.extended_attributes): + extended_attribute = apfs_attribute.APFSExtendedAttribute( + fsapfs_extended_attribute) + self._attributes.append(extended_attribute) + + return self._attributes + def _GetDirectory(self): """Retrieves a directory. @@ -134,6 +154,24 @@ def _GetStat(self): return stat_object + def _GetStatAttribute(self): + """Retrieves a stat attribute. + + Returns: + StatAttribute: a stat attribute. + """ + stat_attribute = attribute.StatAttribute() + stat_attribute.group_identifier = self._fsapfs_file_entry.group_identifier + stat_attribute.inode_number = self._fsapfs_file_entry.identifier + stat_attribute.mode = self._fsapfs_file_entry.file_mode & 0x0fff + # TODO: implement number of hard links support in pyfsapfs + # stat_attribute.number_of_links = self._fsapfs_file_entry.number_of_links + stat_attribute.owner_identifier = self._fsapfs_file_entry.owner_identifier + stat_attribute.size = self._fsapfs_file_entry.size + stat_attribute.type = self.entry_type + + return stat_attribute + def _GetSubFileEntries(self): """Retrieves a sub file entries generator. diff --git a/tests/vfs/apfs_attribute.py b/tests/vfs/apfs_attribute.py new file mode 100644 index 00000000..c93201e1 --- /dev/null +++ b/tests/vfs/apfs_attribute.py @@ -0,0 +1,69 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +"""Tests for the APFS attribute.""" + +import unittest + +from dfvfs.lib import definitions +from dfvfs.lib import errors +from dfvfs.path import factory as path_spec_factory +from dfvfs.resolver import context +from dfvfs.vfs import apfs_attribute +from dfvfs.vfs import apfs_file_system + +from tests import test_lib as shared_test_lib + + +class APFSAttributeTest(shared_test_lib.BaseTestCase): + """Tests the APFS attribute.""" + + # pylint: disable=protected-access + + _IDENTIFIER_A_FILE = 17 + + def setUp(self): + """Sets up the needed objects used throughout the test.""" + self._resolver_context = context.Context() + test_path = self._GetTestFilePath(['apfs.raw']) + self._SkipIfPathNotExists(test_path) + + test_os_path_spec = path_spec_factory.Factory.NewPathSpec( + definitions.TYPE_INDICATOR_OS, location=test_path) + test_raw_path_spec = path_spec_factory.Factory.NewPathSpec( + definitions.TYPE_INDICATOR_RAW, parent=test_os_path_spec) + self._apfs_container_path_spec = path_spec_factory.Factory.NewPathSpec( + definitions.TYPE_INDICATOR_APFS_CONTAINER, location='/apfs1', + parent=test_raw_path_spec) + self._apfs_path_spec = path_spec_factory.Factory.NewPathSpec( + definitions.TYPE_INDICATOR_APFS, location='/', + parent=self._apfs_container_path_spec) + + self._file_system = apfs_file_system.APFSFileSystem( + self._resolver_context, self._apfs_path_spec) + self._file_system.Open() + + def tearDown(self): + """Cleans up the needed objects used throughout the test.""" + self._resolver_context.Empty() + + def testIntialize(self): + """Tests the __init__ function.""" + test_location = '/a_directory/a_file' + path_spec = path_spec_factory.Factory.NewPathSpec( + definitions.TYPE_INDICATOR_APFS, identifier=self._IDENTIFIER_A_FILE, + location=test_location, parent=self._apfs_container_path_spec) + file_entry = self._file_system.GetFileEntryByPathSpec(path_spec) + + fsapfs_attribute = file_entry._fsapfs_file_entry.get_extended_attribute(0) + self.assertIsNotNone(fsapfs_attribute) + self.assertEqual(fsapfs_attribute.name, 'myxattr') + + test_attribute = apfs_attribute.APFSExtendedAttribute(fsapfs_attribute) + self.assertIsNotNone(test_attribute) + + with self.assertRaises(errors.BackEndError): + apfs_attribute.APFSExtendedAttribute(None) + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/vfs/apfs_file_entry.py b/tests/vfs/apfs_file_entry.py index 54652e0c..fa41fcff 100644 --- a/tests/vfs/apfs_file_entry.py +++ b/tests/vfs/apfs_file_entry.py @@ -8,6 +8,8 @@ from dfvfs.path import factory as path_spec_factory from dfvfs.resolver import context from dfvfs.resolver import resolver +from dfvfs.vfs import attribute +from dfvfs.vfs import apfs_attribute from dfvfs.vfs import apfs_file_entry from dfvfs.vfs import apfs_file_system @@ -63,6 +65,8 @@ def testEntriesGenerator(self): class APFSFileEntryTest(shared_test_lib.BaseTestCase): """Tests the APFS file entry.""" + # pylint: disable=protected-access + _IDENTIFIER_A_DIRECTORY = 16 _IDENTIFIER_A_FILE = 17 _IDENTIFIER_A_LINK = 22 @@ -189,6 +193,85 @@ def testSize(self): self.assertIsNotNone(file_entry) self.assertEqual(file_entry.size, 22) + def testGetAttributes(self): + """Tests the _GetAttributes function.""" + test_location = '/a_directory/a_file' + path_spec = path_spec_factory.Factory.NewPathSpec( + definitions.TYPE_INDICATOR_APFS, identifier=self._IDENTIFIER_A_FILE, + location=test_location, parent=self._apfs_container_path_spec) + file_entry = self._file_system.GetFileEntryByPathSpec(path_spec) + self.assertIsNotNone(file_entry) + + self.assertIsNone(file_entry._attributes) + + file_entry._GetAttributes() + self.assertIsNotNone(file_entry._attributes) + self.assertEqual(len(file_entry._attributes), 2) + + test_attribute = file_entry._attributes[0] + self.assertIsInstance(test_attribute, attribute.StatAttribute) + + test_attribute = file_entry._attributes[1] + self.assertIsInstance(test_attribute, apfs_attribute.APFSExtendedAttribute) + self.assertEqual(test_attribute.name, 'myxattr') + + test_attribute_value_data = test_attribute.read() + self.assertEqual(test_attribute_value_data, b'My extended attribute') + + def testGetStat(self): + """Tests the _GetStat function.""" + test_location = '/a_directory/another_file' + path_spec = path_spec_factory.Factory.NewPathSpec( + definitions.TYPE_INDICATOR_APFS, + identifier=self._IDENTIFIER_ANOTHER_FILE, location=test_location, + parent=self._apfs_container_path_spec) + file_entry = self._file_system.GetFileEntryByPathSpec(path_spec) + self.assertIsNotNone(file_entry) + + stat_object = file_entry._GetStat() + + self.assertIsNotNone(stat_object) + self.assertEqual(stat_object.type, stat_object.TYPE_FILE) + self.assertEqual(stat_object.size, 22) + + self.assertEqual(stat_object.mode, 420) + self.assertEqual(stat_object.uid, 99) + self.assertEqual(stat_object.gid, 99) + + self.assertEqual(stat_object.atime, 1627013324) + self.assertEqual(stat_object.atime_nano, 9960526) + + self.assertEqual(stat_object.ctime, 1627013324) + self.assertEqual(stat_object.ctime_nano, 9982411) + + self.assertEqual(stat_object.crtime, 1627013324) + self.assertEqual(stat_object.crtime_nano, 9982411) + + self.assertEqual(stat_object.mtime, 1627013324) + self.assertEqual(stat_object.mtime_nano, 9960526) + + def testGetStatAttribute(self): + """Tests the _GetStatAttribute function.""" + test_location = '/a_directory/another_file' + path_spec = path_spec_factory.Factory.NewPathSpec( + definitions.TYPE_INDICATOR_HFS, + identifier=self._IDENTIFIER_ANOTHER_FILE, location=test_location, + parent=self._apfs_container_path_spec) + file_entry = self._file_system.GetFileEntryByPathSpec(path_spec) + self.assertIsNotNone(file_entry) + + stat_attribute = file_entry._GetStatAttribute() + + self.assertIsNotNone(stat_attribute) + self.assertEqual(stat_attribute.group_identifier, 99) + self.assertEqual(stat_attribute.inode_number, 21) + self.assertEqual(stat_attribute.mode, 0o644) + # TODO: implement number of hard links support in pyfshfs + # self.assertEqual(stat_attribute.number_of_links, 1) + self.assertEqual(stat_attribute.owner_identifier, 99) + self.assertEqual(stat_attribute.size, 22) + self.assertEqual(stat_attribute.type, stat_attribute.TYPE_FILE) + def testGetAPFSFileEntry(self): """Tests the GetAPFSFileEntry function.""" test_location = '/a_directory/another_file' @@ -244,38 +327,6 @@ def testGetParentFileEntry(self): self.assertEqual(parent_file_entry.name, 'a_directory') - def testGetStat(self): - """Tests the GetStat function.""" - test_location = '/a_directory/another_file' - path_spec = path_spec_factory.Factory.NewPathSpec( - definitions.TYPE_INDICATOR_APFS, - identifier=self._IDENTIFIER_ANOTHER_FILE, location=test_location, - parent=self._apfs_container_path_spec) - file_entry = self._file_system.GetFileEntryByPathSpec(path_spec) - self.assertIsNotNone(file_entry) - - stat_object = file_entry.GetStat() - - self.assertIsNotNone(stat_object) - self.assertEqual(stat_object.type, stat_object.TYPE_FILE) - self.assertEqual(stat_object.size, 22) - - self.assertEqual(stat_object.mode, 420) - self.assertEqual(stat_object.uid, 99) - self.assertEqual(stat_object.gid, 99) - - self.assertEqual(stat_object.atime, 1627013324) - self.assertEqual(stat_object.atime_nano, 9960526) - - self.assertEqual(stat_object.ctime, 1627013324) - self.assertEqual(stat_object.ctime_nano, 9982411) - - self.assertEqual(stat_object.crtime, 1627013324) - self.assertEqual(stat_object.crtime_nano, 9982411) - - self.assertEqual(stat_object.mtime, 1627013324) - self.assertEqual(stat_object.mtime_nano, 9960526) - def testIsFunctions(self): """Tests the Is? functions.""" test_location = '/a_directory/another_file'