Skip to content

Commit

Permalink
Expose information about incomplete segments (#327)
Browse files Browse the repository at this point in the history
  • Loading branch information
adamreeve authored Mar 13, 2024
1 parent 1a118e0 commit 095a2c1
Show file tree
Hide file tree
Showing 6 changed files with 133 additions and 13 deletions.
6 changes: 6 additions & 0 deletions docs/apireference.rst
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,12 @@ Reading TDMS Files
.. autoclass:: ChannelDataChunk()
:members:

.. autoclass:: nptdms.tdms.FileStatus()
:members:

.. autoclass:: nptdms.tdms.ChannelSegmentStatus()
:members:

Writing TDMS Files
------------------

Expand Down
4 changes: 2 additions & 2 deletions nptdms/reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -275,10 +275,10 @@ def _read_segment_metadata(
file, segment_position, is_index_file)

segment = TdmsSegment(
position, toc_mask, next_segment_pos, data_position)
position, toc_mask, next_segment_pos, data_position, segment_incomplete)

properties = segment.read_segment_objects(
file, self._prev_segment_objects, index_cache, previous_segment, segment_incomplete)
file, self._prev_segment_objects, index_cache, previous_segment)
return segment, properties

def _read_lead_in(self, file, segment_position, is_index_file=False):
Expand Down
48 changes: 48 additions & 0 deletions nptdms/tdms.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,33 @@ def properties(self):

return self._properties

@property
def file_status(self):
""" Return information about the file status
:rtype: FileStatus
"""

incomplete_final_segment = False
channel_statuses = None
if self._reader._segments:
last_segment = self._reader._segments[-1]
incomplete_final_segment = last_segment.segment_incomplete
last_chunk_overrides = last_segment.final_chunk_lengths_override
if last_chunk_overrides is not None:
channel_statuses = dict(
(obj.path, ChannelSegmentStatus(obj.number_values, last_chunk_overrides.get(obj.path, 0)))
for obj in last_segment.ordered_objects
if obj.has_data)
elif incomplete_final_segment:
# Data lengths match expected lengths
channel_statuses = dict(
(obj.path, ChannelSegmentStatus(obj.number_values, obj.number_values))
for obj in last_segment.ordered_objects
if obj.has_data)

return FileStatus(incomplete_final_segment, channel_statuses)

def as_dataframe(self, time_index=False, absolute_time=False, scaled_data=True, arrow_dtypes=False):
"""
Converts the TDMS file to a DataFrame. DataFrame columns are named using the TDMS object paths.
Expand Down Expand Up @@ -955,6 +982,27 @@ def _data(self):
return self._raw_data.data


class FileStatus:
"""
Contains status information about a read TDMS file
"""
def __init__(self, incomplete_final_segment, channel_statuses):
#: Boolean indicating whether the last data segment was not written completely,
#: meaning it may contain less data than expected
self.incomplete_final_segment = incomplete_final_segment
#: Dictionary mapping from channel paths to ChannelSegmentStatus objects
#: when the last segment is incomplete or had an unexpected length
self.channel_statuses = channel_statuses


class ChannelSegmentStatus:
def __init__(self, expected_length, read_length):
#: Number of values expected in the segment
self.expected_length = expected_length
#: Number of values read from the segment
self.read_length = read_length


def _convert_data_chunk(chunk, raw_timestamps):
for channel_chunk in chunk.channel_data.values():
_convert_channel_data_chunk(channel_chunk, raw_timestamps)
Expand Down
23 changes: 12 additions & 11 deletions nptdms/tdms_segment.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,10 @@ class TdmsSegment(object):
'data_position',
'final_chunk_lengths_override',
'object_index',
'segment_incomplete',
]

def __init__(self, position, toc_mask, next_segment_pos, data_position):
def __init__(self, position, toc_mask, next_segment_pos, data_position, segment_incomplete):
self.position = position
self.toc_mask = toc_mask
self.next_segment_pos = next_segment_pos
Expand All @@ -55,23 +56,23 @@ def __init__(self, position, toc_mask, next_segment_pos, data_position):
self.final_chunk_lengths_override = None
self.ordered_objects = None
self.object_index = None
self.segment_incomplete = segment_incomplete

def __repr__(self):
return "<TdmsSegment at position %d>" % self.position

def read_segment_objects(self, file, previous_segment_objects, index_cache, previous_segment, segment_incomplete):
def read_segment_objects(self, file, previous_segment_objects, index_cache, previous_segment):
"""Read segment metadata section and update object information
:param file: Open TDMS file
:param previous_segment_objects: Dictionary of path to the most
recently read segment object for a TDMS object.
:param index_cache: A SegmentIndexCache instance, or None if segment indexes are not required.
:param previous_segment: Previous segment in the file.
:param segment_incomplete: Whether the next segment offset was not set.
"""

if not self.toc_mask & toc_properties['kTocMetaData']:
self._reuse_previous_segment_metadata(previous_segment, segment_incomplete)
self._reuse_previous_segment_metadata(previous_segment)
return

endianness = '>' if (self.toc_mask & toc_properties['kTocBigEndian']) else '<'
Expand Down Expand Up @@ -134,7 +135,7 @@ def read_segment_objects(self, file, previous_segment_objects, index_cache, prev

if index_cache is not None:
self.object_index = index_cache.get_index(self.ordered_objects)
self._calculate_chunks(segment_incomplete)
self._calculate_chunks()
return properties

def get_segment_object(self, object_path):
Expand Down Expand Up @@ -194,11 +195,11 @@ def _reuse_previous_object(
segment_obj.read_raw_data_index(file, raw_data_index_header, endianness)
self.ordered_objects.append(segment_obj)

def _reuse_previous_segment_metadata(self, previous_segment, segment_incomplete):
def _reuse_previous_segment_metadata(self, previous_segment):
try:
self.ordered_objects = previous_segment.ordered_objects
self.object_index = previous_segment.object_index
self._calculate_chunks(segment_incomplete)
self._calculate_chunks()
except AttributeError:
raise ValueError(
"kTocMetaData is not set for segment but "
Expand Down Expand Up @@ -269,7 +270,7 @@ def read_raw_data_for_channel(self, f, channel_path, chunk_offset=0, num_chunks=
for chunk in self._read_channel_data_chunks(f, data_objects, channel_path, chunk_offset, stop_chunk):
yield chunk

def _calculate_chunks(self, segment_incomplete):
def _calculate_chunks(self):
"""
Work out the number of chunks the data is in, for cases
where the meta data doesn't change at all so there is no
Expand Down Expand Up @@ -299,9 +300,9 @@ def _calculate_chunks(self, segment_incomplete):
total_data_size, data_size)
self.num_chunks = 1 + int(total_data_size // data_size)
self.final_chunk_lengths_override = self._compute_final_chunk_lengths(
data_size, chunk_remainder, segment_incomplete)
data_size, chunk_remainder)

def _compute_final_chunk_lengths(self, chunk_size, chunk_remainder, segment_incomplete):
def _compute_final_chunk_lengths(self, chunk_size, chunk_remainder):
"""Compute object data lengths for a final chunk that has less data than expected
"""
if self._have_daqmx_objects():
Expand All @@ -314,7 +315,7 @@ def _compute_final_chunk_lengths(self, chunk_size, chunk_remainder, segment_inco
return obj_chunk_sizes

interleaved_data = self.toc_mask & toc_properties['kTocInterleavedData']
if interleaved_data or not segment_incomplete:
if interleaved_data or not self.segment_incomplete:
for obj in self.ordered_objects:
if not obj.has_data:
continue
Expand Down
16 changes: 16 additions & 0 deletions nptdms/test/test_daqmx.py
Original file line number Diff line number Diff line change
Expand Up @@ -625,6 +625,22 @@ def test_incomplete_segment_with_different_length_buffers():
np.testing.assert_array_equal(group["Channel5"][:], [5] * 2)
np.testing.assert_array_equal(group["Channel6"][:], [6] * 2)

file_status = tdms_data.file_status
assert file_status.incomplete_final_segment
assert file_status.channel_statuses is not None
for channel in ["Channel1", "Channel2"]:
channel_status = file_status.channel_statuses[f"/'Group'/'{channel}'"]
assert channel_status.expected_length == 4
assert channel_status.read_length == 4
for channel in ["Channel3", "Channel4"]:
channel_status = file_status.channel_statuses[f"/'Group'/'{channel}'"]
assert channel_status.expected_length == 2
assert channel_status.read_length == 1
for channel in ["Channel5", "Channel6"]:
channel_status = file_status.channel_statuses[f"/'Group'/'{channel}'"]
assert channel_status.expected_length == 1
assert channel_status.read_length == 0


def test_multiple_raw_data_buffers_with_scalers_split_across_buffers():
""" DAQmx with scalers split across different raw data buffers
Expand Down
49 changes: 49 additions & 0 deletions nptdms/test/test_tdms_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -777,6 +777,12 @@ def test_incomplete_segment_with_string_data():
channel = tdms_data["Group"]["StringChannel"]
assert len(channel) == 0

file_status = tdms_data.file_status
assert file_status.incomplete_final_segment
channel_status = file_status.channel_statuses["/'Group'/'StringChannel'"]
assert channel_status.expected_length == 2
assert channel_status.read_length == 0


def test_truncated_interleaved_data():
"""
Expand Down Expand Up @@ -807,6 +813,49 @@ def test_truncated_interleaved_data():
assert len(chan) == 3
assert len(chan_data) == 3

file_status = tdms_file.file_status
assert file_status.incomplete_final_segment
chan1_status = file_status.channel_statuses["/'group'/'channel1'"]
assert chan1_status.expected_length == 4
assert chan1_status.read_length == 3
chan2_status = file_status.channel_statuses["/'group'/'channel2'"]
assert chan2_status.expected_length == 4
assert chan2_status.read_length == 3


def test_incomplete_last_segment_with_all_data_present():
""" Last segment doesn't have length set, but all data can be read
"""
test_file = GeneratedFile()
test_file.add_segment(
("kTocMetaData", "kTocRawData", "kTocNewObjList"),
segment_objects_metadata(
channel_metadata("/'group'/'channel1'", 3, 2),
channel_metadata("/'group'/'channel2'", 3, 2),
),
"01 00 00 00" "02 00 00 00"
"05 00 00 00" "06 00 00 00"
)
test_file.add_segment(
("kTocRawData", ),
"",
"03 00 00 00" "04 00 00 00"
"07 00 00 00" "08 00 00 00",
incomplete=True
)

tdms_data = test_file.load()

compare_arrays(tdms_data['group']['channel1'][:], np.array([1, 2, 3, 4], dtype=np.int32))
compare_arrays(tdms_data['group']['channel2'][:], np.array([5, 6, 7, 8], dtype=np.int32))

file_status = tdms_data.file_status
assert file_status.incomplete_final_segment
for channel in ['channel1', 'channel2']:
chan_status = file_status.channel_statuses[f"/'group'/'{channel}'"]
assert chan_status.expected_length == 2
assert chan_status.read_length == 2


def test_truncated_metadata_in_last_segment():
""" Test the scenario where writing the file was aborted with part of the metadata written
Expand Down

0 comments on commit 095a2c1

Please sign in to comment.