Skip to content

Commit

Permalink
Added attributes support for fsapfs back-end log2timeline#504
Browse files Browse the repository at this point in the history
  • Loading branch information
joachimmetz committed Jul 23, 2021
1 parent 5dc1686 commit ae40824
Show file tree
Hide file tree
Showing 4 changed files with 297 additions and 32 deletions.
107 changes: 107 additions & 0 deletions dfvfs/vfs/apfs_attribute.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
# -*- coding: utf-8 -*-
"""The APFS attribute implementation."""

import os

from dfvfs.lib import errors
from dfvfs.vfs import attribute


class APFSExtendedAttribute(attribute.Attribute):
"""APFS extended attribute that uses pyfsapfs."""

def __init__(self, fsapfs_extended_attribute):
"""Initializes an attribute.
Args:
fsapfs_extended_attribute (pyfsapfs.extended_attribute): APFS extended
attribute.
Raises:
BackEndError: if the pyfsapfs extended attribute is missing.
"""
if not fsapfs_extended_attribute:
raise errors.BackEndError('Missing pyfsapfs extended attribute.')

super(APFSExtendedAttribute, self).__init__()
self._fsapfs_extended_attribute = fsapfs_extended_attribute

@property
def name(self):
"""str: name."""
return self._fsapfs_extended_attribute.name

# Note: that the following functions do not follow the style guide
# because they are part of the file-like object interface.
# pylint: disable=invalid-name

def read(self, size=None):
"""Reads a byte string from the file input/output (IO) object.
The function will read a byte string of the specified size or
all of the remaining data if no size was specified.
Args:
size (Optional[int]): number of bytes to read, where None is all
remaining data.
Returns:
bytes: data read.
Raises:
IOError: if the read failed.
OSError: if the read failed.
"""
return self._fsapfs_extended_attribute.read_buffer(size)

def seek(self, offset, whence=os.SEEK_SET):
"""Seeks to an offset within the file input/output (IO) object.
Args:
offset (int): offset to seek.
whence (Optional[int]): value that indicates whether offset is an
absolute or relative position within the file.
Raises:
IOError: if the seek failed.
OSError: if the seek failed.
"""
self._fsapfs_extended_attribute.seek_offset(offset, whence)

# get_offset() is preferred above tell() by the libbfio layer used in libyal.
def get_offset(self):
"""Retrieves the current offset into the file input/output (IO) object.
Returns:
int: current offset into the file input/output (IO) object.
Raises:
IOError: if the file input/output (IO)-like object has not been opened.
OSError: if the file input/output (IO)-like object has not been opened.
"""
return self._fsapfs_extended_attribute.get_offset()

# Pythonesque alias for get_offset().
def tell(self):
"""Retrieves the current offset into the file input/output (IO) object."""
return self.get_offset()

def get_size(self):
"""Retrieves the size of the file input/output (IO) object.
Returns:
int: size of the file input/output (IO) object.
Raises:
IOError: if the file input/output (IO) object has not been opened.
OSError: if the file input/output (IO) object has not been opened.
"""
return self._fsapfs_extended_attribute.get_size()

def seekable(self):
"""Determines if a file input/output (IO) object is seekable.
Returns:
bool: True since a file IO object provides a seek method.
"""
return True
38 changes: 38 additions & 0 deletions dfvfs/vfs/apfs_file_entry.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
from dfvfs.lib import definitions
from dfvfs.lib import errors
from dfvfs.path import apfs_path_spec
from dfvfs.vfs import attribute
from dfvfs.vfs import apfs_attribute
from dfvfs.vfs import file_entry


Expand Down Expand Up @@ -90,6 +92,24 @@ def __init__(
self.entry_type = self._ENTRY_TYPES.get(
fsapfs_file_entry.file_mode & 0xf000, None)

def _GetAttributes(self):
"""Retrieves the attributes.
Returns:
list[Attribute]: attributes.
"""
if self._attributes is None:
stat_attribute = self._GetStatAttribute()
self._attributes = [stat_attribute]

for fsapfs_extended_attribute in (
self._fsapfs_file_entry.extended_attributes):
extended_attribute = apfs_attribute.APFSExtendedAttribute(
fsapfs_extended_attribute)
self._attributes.append(extended_attribute)

return self._attributes

def _GetDirectory(self):
"""Retrieves a directory.
Expand Down Expand Up @@ -134,6 +154,24 @@ def _GetStat(self):

return stat_object

def _GetStatAttribute(self):
"""Retrieves a stat attribute.
Returns:
StatAttribute: a stat attribute.
"""
stat_attribute = attribute.StatAttribute()
stat_attribute.group_identifier = self._fsapfs_file_entry.group_identifier
stat_attribute.inode_number = self._fsapfs_file_entry.identifier
stat_attribute.mode = self._fsapfs_file_entry.file_mode & 0x0fff
# TODO: implement number of hard links support in pyfsapfs
# stat_attribute.number_of_links = self._fsapfs_file_entry.number_of_links
stat_attribute.owner_identifier = self._fsapfs_file_entry.owner_identifier
stat_attribute.size = self._fsapfs_file_entry.size
stat_attribute.type = self.entry_type

return stat_attribute

def _GetSubFileEntries(self):
"""Retrieves a sub file entries generator.
Expand Down
69 changes: 69 additions & 0 deletions tests/vfs/apfs_attribute.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Tests for the APFS attribute."""

import unittest

from dfvfs.lib import definitions
from dfvfs.lib import errors
from dfvfs.path import factory as path_spec_factory
from dfvfs.resolver import context
from dfvfs.vfs import apfs_attribute
from dfvfs.vfs import apfs_file_system

from tests import test_lib as shared_test_lib


class APFSAttributeTest(shared_test_lib.BaseTestCase):
"""Tests the APFS attribute."""

# pylint: disable=protected-access

_IDENTIFIER_A_FILE = 17

def setUp(self):
"""Sets up the needed objects used throughout the test."""
self._resolver_context = context.Context()
test_path = self._GetTestFilePath(['apfs.raw'])
self._SkipIfPathNotExists(test_path)

test_os_path_spec = path_spec_factory.Factory.NewPathSpec(
definitions.TYPE_INDICATOR_OS, location=test_path)
test_raw_path_spec = path_spec_factory.Factory.NewPathSpec(
definitions.TYPE_INDICATOR_RAW, parent=test_os_path_spec)
self._apfs_container_path_spec = path_spec_factory.Factory.NewPathSpec(
definitions.TYPE_INDICATOR_APFS_CONTAINER, location='/apfs1',
parent=test_raw_path_spec)
self._apfs_path_spec = path_spec_factory.Factory.NewPathSpec(
definitions.TYPE_INDICATOR_APFS, location='/',
parent=self._apfs_container_path_spec)

self._file_system = apfs_file_system.APFSFileSystem(
self._resolver_context, self._apfs_path_spec)
self._file_system.Open()

def tearDown(self):
"""Cleans up the needed objects used throughout the test."""
self._resolver_context.Empty()

def testIntialize(self):
"""Tests the __init__ function."""
test_location = '/a_directory/a_file'
path_spec = path_spec_factory.Factory.NewPathSpec(
definitions.TYPE_INDICATOR_APFS, identifier=self._IDENTIFIER_A_FILE,
location=test_location, parent=self._apfs_container_path_spec)
file_entry = self._file_system.GetFileEntryByPathSpec(path_spec)

fsapfs_attribute = file_entry._fsapfs_file_entry.get_extended_attribute(0)
self.assertIsNotNone(fsapfs_attribute)
self.assertEqual(fsapfs_attribute.name, 'myxattr')

test_attribute = apfs_attribute.APFSExtendedAttribute(fsapfs_attribute)
self.assertIsNotNone(test_attribute)

with self.assertRaises(errors.BackEndError):
apfs_attribute.APFSExtendedAttribute(None)


if __name__ == '__main__':
unittest.main()
115 changes: 83 additions & 32 deletions tests/vfs/apfs_file_entry.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
from dfvfs.path import factory as path_spec_factory
from dfvfs.resolver import context
from dfvfs.resolver import resolver
from dfvfs.vfs import attribute
from dfvfs.vfs import apfs_attribute
from dfvfs.vfs import apfs_file_entry
from dfvfs.vfs import apfs_file_system

Expand Down Expand Up @@ -63,6 +65,8 @@ def testEntriesGenerator(self):
class APFSFileEntryTest(shared_test_lib.BaseTestCase):
"""Tests the APFS file entry."""

# pylint: disable=protected-access

_IDENTIFIER_A_DIRECTORY = 16
_IDENTIFIER_A_FILE = 17
_IDENTIFIER_A_LINK = 22
Expand Down Expand Up @@ -189,6 +193,85 @@ def testSize(self):
self.assertIsNotNone(file_entry)
self.assertEqual(file_entry.size, 22)

def testGetAttributes(self):
"""Tests the _GetAttributes function."""
test_location = '/a_directory/a_file'
path_spec = path_spec_factory.Factory.NewPathSpec(
definitions.TYPE_INDICATOR_APFS, identifier=self._IDENTIFIER_A_FILE,
location=test_location, parent=self._apfs_container_path_spec)
file_entry = self._file_system.GetFileEntryByPathSpec(path_spec)
self.assertIsNotNone(file_entry)

self.assertIsNone(file_entry._attributes)

file_entry._GetAttributes()
self.assertIsNotNone(file_entry._attributes)
self.assertEqual(len(file_entry._attributes), 2)

test_attribute = file_entry._attributes[0]
self.assertIsInstance(test_attribute, attribute.StatAttribute)

test_attribute = file_entry._attributes[1]
self.assertIsInstance(test_attribute, apfs_attribute.APFSExtendedAttribute)
self.assertEqual(test_attribute.name, 'myxattr')

test_attribute_value_data = test_attribute.read()
self.assertEqual(test_attribute_value_data, b'My extended attribute')

def testGetStat(self):
"""Tests the _GetStat function."""
test_location = '/a_directory/another_file'
path_spec = path_spec_factory.Factory.NewPathSpec(
definitions.TYPE_INDICATOR_APFS,
identifier=self._IDENTIFIER_ANOTHER_FILE, location=test_location,
parent=self._apfs_container_path_spec)
file_entry = self._file_system.GetFileEntryByPathSpec(path_spec)
self.assertIsNotNone(file_entry)

stat_object = file_entry._GetStat()

self.assertIsNotNone(stat_object)
self.assertEqual(stat_object.type, stat_object.TYPE_FILE)
self.assertEqual(stat_object.size, 22)

self.assertEqual(stat_object.mode, 420)
self.assertEqual(stat_object.uid, 99)
self.assertEqual(stat_object.gid, 99)

self.assertEqual(stat_object.atime, 1627013324)
self.assertEqual(stat_object.atime_nano, 9960526)

self.assertEqual(stat_object.ctime, 1627013324)
self.assertEqual(stat_object.ctime_nano, 9982411)

self.assertEqual(stat_object.crtime, 1627013324)
self.assertEqual(stat_object.crtime_nano, 9982411)

self.assertEqual(stat_object.mtime, 1627013324)
self.assertEqual(stat_object.mtime_nano, 9960526)

def testGetStatAttribute(self):
"""Tests the _GetStatAttribute function."""
test_location = '/a_directory/another_file'
path_spec = path_spec_factory.Factory.NewPathSpec(
definitions.TYPE_INDICATOR_HFS,
identifier=self._IDENTIFIER_ANOTHER_FILE, location=test_location,
parent=self._apfs_container_path_spec)
file_entry = self._file_system.GetFileEntryByPathSpec(path_spec)
self.assertIsNotNone(file_entry)

stat_attribute = file_entry._GetStatAttribute()

self.assertIsNotNone(stat_attribute)
self.assertEqual(stat_attribute.group_identifier, 99)
self.assertEqual(stat_attribute.inode_number, 21)
self.assertEqual(stat_attribute.mode, 0o644)
# TODO: implement number of hard links support in pyfshfs
# self.assertEqual(stat_attribute.number_of_links, 1)
self.assertEqual(stat_attribute.owner_identifier, 99)
self.assertEqual(stat_attribute.size, 22)
self.assertEqual(stat_attribute.type, stat_attribute.TYPE_FILE)

def testGetAPFSFileEntry(self):
"""Tests the GetAPFSFileEntry function."""
test_location = '/a_directory/another_file'
Expand Down Expand Up @@ -244,38 +327,6 @@ def testGetParentFileEntry(self):

self.assertEqual(parent_file_entry.name, 'a_directory')

def testGetStat(self):
"""Tests the GetStat function."""
test_location = '/a_directory/another_file'
path_spec = path_spec_factory.Factory.NewPathSpec(
definitions.TYPE_INDICATOR_APFS,
identifier=self._IDENTIFIER_ANOTHER_FILE, location=test_location,
parent=self._apfs_container_path_spec)
file_entry = self._file_system.GetFileEntryByPathSpec(path_spec)
self.assertIsNotNone(file_entry)

stat_object = file_entry.GetStat()

self.assertIsNotNone(stat_object)
self.assertEqual(stat_object.type, stat_object.TYPE_FILE)
self.assertEqual(stat_object.size, 22)

self.assertEqual(stat_object.mode, 420)
self.assertEqual(stat_object.uid, 99)
self.assertEqual(stat_object.gid, 99)

self.assertEqual(stat_object.atime, 1627013324)
self.assertEqual(stat_object.atime_nano, 9960526)

self.assertEqual(stat_object.ctime, 1627013324)
self.assertEqual(stat_object.ctime_nano, 9982411)

self.assertEqual(stat_object.crtime, 1627013324)
self.assertEqual(stat_object.crtime_nano, 9982411)

self.assertEqual(stat_object.mtime, 1627013324)
self.assertEqual(stat_object.mtime_nano, 9960526)

def testIsFunctions(self):
"""Tests the Is? functions."""
test_location = '/a_directory/another_file'
Expand Down

0 comments on commit ae40824

Please sign in to comment.