Optionally handle non-monotonic data
    * New load_xdf parameter: handle_non_monotonic (default: False)
      - None: disable monotonicity checking
      - False: check and warn only if non-monotonic data are detected
      - True: attempt to sort non-monotonic data

    * Additional tests for monotonicity
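
A minimal usage sketch of the new parameter (the file name is hypothetical; the return values follow pyxdf's (streams, header) convention, and the keyword is keyword-only, as in the signature below):

    import logging

    from pyxdf import load_xdf

    # Surface the INFO/WARNING messages emitted by the monotonicity check.
    logging.basicConfig(level=logging.INFO)

    # Default: check, and warn only if non-monotonic data are detected.
    streams, header = load_xdf("recording.xdf", handle_non_monotonic=False)

    # Attempt to stable-sort non-monotonic data by timestamp.
    streams, header = load_xdf("recording.xdf", handle_non_monotonic=True)

    # Disable the monotonicity check entirely.
    streams, header = load_xdf("recording.xdf", handle_non_monotonic=None)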
jamieforth committed Feb 1, 2025
1 parent d2cc01d commit d689858
Showing 6 changed files with 672 additions and 5 deletions.
190 changes: 189 additions & 1 deletion src/pyxdf/pyxdf.py
@@ -15,7 +15,7 @@
import itertools
import logging
import struct
from collections import OrderedDict, defaultdict
from collections import OrderedDict, defaultdict, namedtuple
from pathlib import Path
from xml.etree.ElementTree import ParseError, fromstring

@@ -79,6 +79,7 @@ def load_xdf(
select_streams=None,
*,
on_chunk=None,
handle_non_monotonic=False,
synchronize_clocks=True,
handle_clock_resets=True,
dejitter_timestamps=True,
@@ -122,6 +123,13 @@ def load_xdf(
verbose : Passing True will set logging level to DEBUG, False will set it to
WARNING, and None will use root logger level. (default: None)
    handle_non_monotonic : bool | None
        How to handle samples that are not in ascending time order.
        (default: False)
        - False : check only, and warn if non-monotonic data are detected
        - True : check and attempt to sort non-monotonic data
        - None : disable the non-monotonicity check
    synchronize_clocks : Whether to enable clock synchronization based on
        ClockOffset chunks. (default: True)
@@ -358,6 +366,33 @@ def load_xdf(
else:
stream.time_series = np.zeros((0, stream.nchns))

# Perform initial non-monotonicity checks if requested
if handle_non_monotonic is None:
logger.debug(" skipping non-monotonicity check...")
else:
logger.info(" performing non-monotonicity check...")
mono_status = _check_monotonicity(temp)
# Are all time-values monotonic across all streams?
time_stamps_mono = all(mono_status["time_stamps"].values())
clock_times_mono = all(mono_status["clock_times"].values())
if time_stamps_mono and clock_times_mono:
# All streams are monotonic.
logger.info("All streams are monotonic, continuing...")
elif not handle_non_monotonic:
# Some data are non-monotonic, but we will not attempt to handle it.
if not synchronize_clocks and not clock_times_mono:
msg = (
"Clock offsets are non-monotonic but clocks are not going to "
"be synchronized. Consider loading with 'synchronize_clocks=True'."
)
logger.warning(msg)
else:
msg = (
"Non-monotonic streams detected - "
"consider loading with 'handle_non_monotonic=True'."
)
logger.warning(msg)

# perform (fault-tolerant) clock synchronization if requested
if synchronize_clocks:
logger.info(" performing clock synchronization...")
@@ -371,6 +406,12 @@
winsor_threshold,
)

# perform non-monotonicity handling if requested
if handle_non_monotonic:
logger.info(" sorting non-monotonic data...")
for stream in temp.values():
_sort_stream_data(stream)

# perform jitter removal if requested
if dejitter_timestamps:
logger.info(" performing jitter removal...")
@@ -547,6 +588,153 @@ def _scan_forward(f):
return False


# Named tuple to represent monotonicity information.
Monotonicity = namedtuple("Monotonicity", ["result", "n", "dec_count", "eq_count"])
# - result: bool - True if all successive values are equal or increasing
# - n: total number of pairwise differences between successive values
# - dec_count: number of decreasing pairwise differences
# - eq_count: number of equal pairwise differences


def _monotonic_increasing(a):
"""Test for increasing (non-decreasing) monotonicity.
Parameters
----------
a : array_like
Returns
-------
Monotonicity : namedtuple
"""
# Intervals between successive values.
diffs = np.diff(a)

increasing = True
n = len(diffs)
dec_count = 0
eq_count = 0

# Count non-increasing intervals.
non_inc_count = np.sum(diffs <= 0)

if non_inc_count > 0:
# Count constant intervals.
eq_count = np.sum(diffs == 0).item()
dec_count = (non_inc_count - eq_count).item()

if dec_count > 0:
increasing = False
return Monotonicity(increasing, n, dec_count, eq_count)
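
# Worked example (illustrative, not part of the commit):
# np.diff([1.0, 2.0, 2.0, 1.5]) -> [1.0, 0.0, -0.5], i.e. n=3 intervals,
# one equal and one decreasing, so the sequence is not monotonic:
# _monotonic_increasing([1.0, 2.0, 2.0, 1.5])
# -> Monotonicity(result=False, n=3, dec_count=1, eq_count=1)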


def _check_monotonicity(streams):
"""Check monotonicity of all stream time values.
Parameters
----------
streams : dict
Dictionary of stream_id: StreamData.
Returns
-------
mono_status : dict[dict]
Dictionary of attr: [stream_id: bool]
- attr: StreamData attribute name
- stream_id: Stream ID
- bool: stream attr data are monotonic
"""
mono_status = {
"time_stamps": {},
"clock_times": {},
}

max_stream_id = max(streams.values(), key=lambda s: s.stream_id)
id_align = len(str(max_stream_id.stream_id))

for stream in streams.values():
for attr in mono_status.keys():
monotonic, n, dec_count, eq_count = _monotonic_increasing(
getattr(stream, attr)
)

mono_status[attr][stream.stream_id] = monotonic

msg = [
(
f"Stream {stream.stream_id:>{id_align}}: {attr} are"
f"{'' if monotonic else ' NOT'} monotonic"
)
]

if dec_count > 0:
dec_pc = round(dec_count / n * 100, 1)
msg.append(f"dec={dec_count} ({'<0.1' if dec_pc < 0.1 else dec_pc}%)")

if eq_count > 0:
eq_pc = round(eq_count / n * 100, 1)
msg.append(f"eq={eq_count} ({'<0.1' if eq_pc < 0.1 else eq_pc}%)")

msg = ", ".join(msg)

if monotonic:
logger.info(msg)
else:
logger.warning(msg)
return mono_status
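
# Illustrative return value (hypothetical stream IDs, not from the commit):
# _check_monotonicity(streams)
# -> {"time_stamps": {1: True, 2: False},
#     "clock_times": {1: True, 2: True}}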


def _sort_stream_data(stream):
"""Sort stream data by ground truth timestamps.
Parameters
----------
stream: StreamData (possibly non-monotonic)
Returns
-------
stream : StreamData (monotonic)
With sorted timestamps and timeseries data, if necessary.
Non-monotonic streams are stable sorted in ascending order of ground
truth timestamps. When clock resets have been detected sorting is
applied within each clock segment, but clock segments themselves are
not sorted.
Both timestamp and timeseries arrays are modified, keeping all
samples aligned with their original timestamp.
Monotonic streams/segments are not modified.
Stream may still contain identically timestamped samples, but these
can be handled by dejittering.
"""
if len(stream.time_stamps) <= 1:
return stream
clock_segments = stream.clock_segments
if len(clock_segments) == 0:
# Clocks have not been synchronized.
clock_segments = [(0, len(stream.time_stamps) - 1)] # inclusive
for start_i, end_i in clock_segments:
ts_slice = slice(start_i, end_i + 1)
if not _monotonic_increasing(stream.time_stamps[ts_slice]).result:
logger.info(
f"Sorting stream {stream.stream_id}: clock segment {start_i}-{end_i}."
)
# Determine monotonic timestamp ordering.
ind = np.argsort(stream.time_stamps[ts_slice], kind="stable")
# Reorder timestamps in place.
stream.time_stamps[ts_slice] = stream.time_stamps[ts_slice][ind]
# Reorder timeseries data.
if stream.fmt == "string":
stream.time_series[ts_slice] = np.array(stream.time_series[ts_slice])[
ind
].tolist()
else:
stream.time_series[ts_slice] = stream.time_series[ts_slice][ind]
return stream
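
# Sketch of the sorting step above (standalone, assumed values): a stable
# argsort keeps identically timestamped samples in their original relative
# order, so dejittering can still resolve them later.
# ts = np.array([0.1, 0.3, 0.2, 0.2])
# ind = np.argsort(ts, kind="stable")  # -> array([0, 2, 3, 1])
# ts[ind]                              # -> array([0.1, 0.2, 0.2, 0.3])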


def _find_segment_indices(b_breaks):
"""Convert boundary breaks array to segment indices.