Module redvox.api1000.wrapped_redvox_packet.sensors.derived.movement

Contains classes and methods for examining movement events.

Expand source code
"""
Contains classes and methods for examining movement events.
"""

from collections import defaultdict
from dataclasses import dataclass
import datetime
from enum import Enum
from functools import total_ordering
from typing import Dict, List, Optional, TYPE_CHECKING, Tuple

import numpy as np

from redvox.common.constants import NAN
from redvox.common.date_time_utils import datetime_from_epoch_microseconds_utc

if TYPE_CHECKING:
    from redvox.api1000.common.mapping import Mapping
    from redvox.api1000.wrapped_redvox_packet.event_streams import Event, EventStream
    from redvox.api1000.wrapped_redvox_packet.sensors.sensors import Sensors
    from redvox.api1000.wrapped_redvox_packet.sensors.xyz import Xyz
    from redvox.api1000.wrapped_redvox_packet.wrapped_packet import WrappedRedvoxPacketM


class MovementChannel(Enum):
    """
    Enumeration of movement channels.

    Every member's value is the same string as its name, so a member can be resolved either by
    name (``MovementChannel["GYROSCOPE_X"]``) or by value (``MovementChannel("GYROSCOPE_X")``).
    """

    # Members are deliberately left unannotated: an annotation such as ``: str`` is inaccurate
    # (each member is a MovementChannel instance, not a str) and misleads static type checkers.
    GYROSCOPE_X = "GYROSCOPE_X"
    GYROSCOPE_Y = "GYROSCOPE_Y"
    GYROSCOPE_Z = "GYROSCOPE_Z"
    ACCELEROMETER_X = "ACCELEROMETER_X"
    ACCELEROMETER_Y = "ACCELEROMETER_Y"
    ACCELEROMETER_Z = "ACCELEROMETER_Z"


# noinspection DuplicatedCode
@total_ordering
@dataclass
class MovementEvent:
    """
    Represents the metadata associated with a derived movement event.

    Equality and ordering are defined solely by ``movement_start``; no other field takes part in
    comparisons. ``movement_start`` and ``movement_end`` are interpreted as microseconds since the
    epoch when converted to datetimes.
    """

    movement_channel: MovementChannel
    movement_start: float
    movement_end: float
    movement_duration: float
    magnitude_min: float
    magnitude_max: float
    magnitude_range: float
    magnitude_mean: float
    magnitude_std_dev: float

    def movement_start_dt(self) -> datetime.datetime:
        """
        :return: The movement start as a datetime.
        """
        return datetime_from_epoch_microseconds_utc(self.movement_start)

    def movement_end_dt(self) -> datetime.datetime:
        """
        :return: The movement end as a datetime.
        """
        return datetime_from_epoch_microseconds_utc(self.movement_end)

    def movement_duration_td(self) -> datetime.timedelta:
        """
        :return: The movement duration as a timedelta, computed from the start and end datetimes
                 (the stored ``movement_duration`` field is not consulted).
        """
        return self.movement_end_dt() - self.movement_start_dt()

    def time_diff(self, other: "MovementEvent") -> datetime.timedelta:
        """
        Returns the time difference between the start times of two events.
        :param other: The other event to compare against.
        :return: The non-negative difference between the two start times, regardless of which
                 event starts first.
        """
        # abs() yields the same "later start minus earlier start" result without sorting.
        return abs(other.movement_start_dt() - self.movement_start_dt())

    @staticmethod
    def from_event(event: "Event") -> "MovementEvent":
        """
        Converts a generic API M Event into a MovementEvent.
        :param event: The Event to convert. Its description must name a MovementChannel member and
                      its numeric payload must hold the movement and magnitude fields read below.
        :return: A MovementEvent.
        :raises KeyError: If the description is not a MovementChannel name (and, presumably, if an
                          expected payload key is missing).
        """
        numeric_payload: "Mapping[float]" = event.get_numeric_payload()
        numeric_payload_dict: Dict[str, float] = numeric_payload.get_metadata()
        return MovementEvent(
            MovementChannel[event.get_description()],
            numeric_payload_dict["movement_start"],
            numeric_payload_dict["movement_end"],
            numeric_payload_dict["movement_duration"],
            numeric_payload_dict["magnitude_min"],
            numeric_payload_dict["magnitude_max"],
            numeric_payload_dict["magnitude_range"],
            numeric_payload_dict["magnitude_mean"],
            numeric_payload_dict["magnitude_std_dev"],
        )

    def __eq__(self, other) -> bool:
        return (
            isinstance(other, MovementEvent)
            and self.movement_start == other.movement_start
        )

    def __lt__(self, other: "MovementEvent") -> bool:
        # Mirror the type check in __eq__: let Python try the reflected operation (and finally
        # raise TypeError) instead of failing with an AttributeError on a foreign object.
        if not isinstance(other, MovementEvent):
            return NotImplemented
        return self.movement_start < other.movement_start


# noinspection DuplicatedCode
@dataclass
class MovementEventStream:
    """
    A named collection of derived movement events.
    """

    name: str
    movement_events: List[MovementEvent]

    @staticmethod
    def from_event_stream(event_stream: "EventStream") -> "MovementEventStream":
        """
        Builds a MovementEventStream from an API M EventStream.
        :param event_stream: The API M stream whose events should be converted.
        :return: A MovementEventStream carrying the stream's name and its events ordered by
                 start datetime.
        """
        converted: List[MovementEvent] = [
            MovementEvent.from_event(raw_event)
            for raw_event in event_stream.get_events().get_values()
        ]

        # Stable sort on the start datetime, so events with equal starts keep their stream order.
        ordered: List[MovementEvent] = sorted(
            converted, key=MovementEvent.movement_start_dt
        )
        return MovementEventStream(event_stream.get_name(), ordered)

    def events_by_channel(
        self, movement_channel: MovementChannel
    ) -> List[MovementEvent]:
        """
        Selects the events that belong to one channel.
        :param movement_channel: The channel whose events are wanted.
        :return: A new list with the matching events, in their current stream order.
        """
        return [
            candidate
            for candidate in self.movement_events
            if candidate.movement_channel == movement_channel
        ]


@dataclass
class _Stats:
    """
    An encapsulation of summary stats used when updating stats for merged Events.

    Every field defaults to NAN, so a default-constructed instance stands for "no data available".
    """

    mag_min: float = NAN
    mag_max: float = NAN
    mag_range: float = NAN
    mag_mean: float = NAN
    mag_std: float = NAN

    @staticmethod
    def from_samples(samples: np.ndarray) -> "_Stats":
        """
        Converts a set of samples into _Stats.
        :param samples: Samples to calculate stats over.
        :return: An instance of _Stats holding the min, max, range (max - min), mean, and standard
                 deviation of the samples, or the all-NAN default when there are no samples.
        """
        # min()/max() raise ValueError on an empty array; report "no data" instead of crashing.
        if samples.size == 0:
            return _Stats()

        # noinspection PyArgumentList
        mag_min: float = samples.min()
        # noinspection PyArgumentList
        mag_max: float = samples.max()

        return _Stats(
            mag_min, mag_max, mag_max - mag_min, samples.mean(), samples.std()
        )


# noinspection DuplicatedCode
@dataclass
class MovementData:
    """
    Encapsulates movement data from multiple packets from a single station.

    Holds the movement event stream together with the concatenated accelerometer and gyroscope
    timestamps and per-axis samples.
    """

    movement_event_stream: MovementEventStream
    accelerometer_timestamps: Optional[np.ndarray]
    accelerometer_x: Optional[np.ndarray]
    accelerometer_y: Optional[np.ndarray]
    accelerometer_z: Optional[np.ndarray]
    gyroscope_timestamps: Optional[np.ndarray]
    gyroscope_x: Optional[np.ndarray]
    gyroscope_y: Optional[np.ndarray]
    gyroscope_z: Optional[np.ndarray]

    @staticmethod
    def from_packets(packets: List["WrappedRedvoxPacketM"]) -> "MovementData":
        """
        Extracts and concatenates movement data.
        :param packets: The packets to extract movement data from.
        :return: An instance of MovementData. Channels for which no packet carried data are empty
                 arrays.
        """
        movement_event_stream: MovementEventStream = MovementEventStream("Movement", [])

        # Collect per-packet chunks and concatenate once at the end; concatenating inside the loop
        # copies the accumulated data for every packet. Each list is seeded with an empty float
        # array so that the result is still an empty float array when no packet has the sensor.
        accel_ts_chunks: List[np.ndarray] = [np.array([])]
        accel_x_chunks: List[np.ndarray] = [np.array([])]
        accel_y_chunks: List[np.ndarray] = [np.array([])]
        accel_z_chunks: List[np.ndarray] = [np.array([])]
        gyro_ts_chunks: List[np.ndarray] = [np.array([])]
        gyro_x_chunks: List[np.ndarray] = [np.array([])]
        gyro_y_chunks: List[np.ndarray] = [np.array([])]
        gyro_z_chunks: List[np.ndarray] = [np.array([])]

        for packet in packets:
            # Events are only read from packets that carry exactly one event stream.
            if packet.get_event_streams().get_count() == 1:
                event_stream: "EventStream" = packet.get_event_streams().get_values()[0]
                movement_event_stream.movement_events.extend(
                    [
                        MovementEvent.from_event(event)
                        for event in event_stream.get_events().get_values()
                    ]
                )

            sensors: "Sensors" = packet.get_sensors()
            accel: Optional["Xyz"] = sensors.get_accelerometer()
            gyro: Optional["Xyz"] = sensors.get_gyroscope()

            if accel is not None:
                accel_ts_chunks.append(accel.get_timestamps().get_timestamps())
                accel_x_chunks.append(accel.get_x_samples().get_values())
                accel_y_chunks.append(accel.get_y_samples().get_values())
                accel_z_chunks.append(accel.get_z_samples().get_values())

            if gyro is not None:
                gyro_ts_chunks.append(gyro.get_timestamps().get_timestamps())
                gyro_x_chunks.append(gyro.get_x_samples().get_values())
                gyro_y_chunks.append(gyro.get_y_samples().get_values())
                gyro_z_chunks.append(gyro.get_z_samples().get_values())

        return MovementData(
            movement_event_stream,
            np.concatenate(accel_ts_chunks),
            np.concatenate(accel_x_chunks),
            np.concatenate(accel_y_chunks),
            np.concatenate(accel_z_chunks),
            np.concatenate(gyro_ts_chunks),
            np.concatenate(gyro_x_chunks),
            np.concatenate(gyro_y_chunks),
            np.concatenate(gyro_z_chunks),
        )

    def data_for_channel(
        self, channel: MovementChannel
    ) -> Tuple[np.ndarray, np.ndarray]:
        """
        Returns the timestamps and samples for a given channel.
        :param channel: Channel to return data for.
        :return: A tuple containing the timestamps and samples for the provided channel. Any value
                 that matches none of the other five channels yields the gyroscope Z data.
        """
        if channel == MovementChannel.ACCELEROMETER_X:
            return self.accelerometer_timestamps, self.accelerometer_x

        if channel == MovementChannel.ACCELEROMETER_Y:
            return self.accelerometer_timestamps, self.accelerometer_y

        if channel == MovementChannel.ACCELEROMETER_Z:
            return self.accelerometer_timestamps, self.accelerometer_z

        if channel == MovementChannel.GYROSCOPE_X:
            return self.gyroscope_timestamps, self.gyroscope_x

        if channel == MovementChannel.GYROSCOPE_Y:
            return self.gyroscope_timestamps, self.gyroscope_y

        return self.gyroscope_timestamps, self.gyroscope_z

    def __update_stats(
        self, movement_channel: MovementChannel, start_ts: float, end_ts: float
    ) -> _Stats:
        """
        Compute summary statistics for a particular channel within a particular window.

        The statistics are computed over the squared samples (sample * sample). The window runs
        from the first timestamp that is >= start_ts up to and including the first subsequent
        timestamp that is >= end_ts.
        :param movement_channel: The channel to compute statistics from.
        :param start_ts: The start time as microseconds since the epoch.
        :param end_ts: The end time as microseconds since the epoch.
        :return: An instance of _Stats; the all-NAN default when no timestamp reaches start_ts or
                 none at/after the start index reaches end_ts.
        """
        timestamps: np.ndarray
        samples: np.ndarray
        (timestamps, samples) = self.data_for_channel(movement_channel)
        timestamps = np.asarray(timestamps)

        # First index whose timestamp reaches the start of the window.
        start_hits: np.ndarray = np.flatnonzero(timestamps >= start_ts)
        if start_hits.size == 0:
            return _Stats()
        start_idx: int = int(start_hits[0])

        # First index at or after start_idx whose timestamp reaches the end of the window.
        # NOTE(review): an event that ends after the last available sample yields NAN stats here;
        # confirm that this is the intended outcome.
        end_hits: np.ndarray = np.flatnonzero(timestamps[start_idx:] >= end_ts)
        if end_hits.size == 0:
            return _Stats()
        end_idx: int = start_idx + int(end_hits[0]) + 1

        # Square only the samples inside the window rather than the entire channel.
        window: np.ndarray = samples[start_idx:end_idx]
        return _Stats.from_samples(window * window)

    def __merge_movement_events(self, max_merge_gap: datetime.timedelta):
        """
        Merges movement events that are "close together".

        Events are processed per channel in start-time order. Two consecutive events are placed in
        the same group when the difference between their start times is smaller than
        max_merge_gap. Every group is replaced by a single event spanning the group whose
        statistics are recomputed from the raw channel data. The resulting events are stored
        grouped by channel.
        :param max_merge_gap: Any consecutive events that are smaller than this timedelta will be merged.
        """
        res: MovementEventStream = MovementEventStream(
            self.movement_event_stream.name, []
        )

        # Group events by channel
        channel_to_events: Dict[MovementChannel, List[MovementEvent]] = defaultdict(
            list
        )
        for event in self.movement_event_stream.movement_events:
            channel_to_events[event.movement_channel].append(event)

        # For each channel, group events that are "close together"
        for channel, channel_events in channel_to_events.items():
            # The grouping below compares neighbours, so it needs time-ordered events; events
            # gathered from several packets are not guaranteed to arrive in order.
            ordered: List[MovementEvent] = sorted(
                channel_events, key=lambda evt: evt.movement_start
            )

            groups: List[List[MovementEvent]] = []
            group: List[MovementEvent] = [ordered[0]]

            for prev, cur in zip(ordered, ordered[1:]):
                if prev.time_diff(cur) < max_merge_gap:
                    group.append(cur)
                else:
                    groups.append(group)
                    group = [cur]

            # The group still being built always holds at least one event.
            groups.append(group)

            # For each group, compute a new event using the raw data to update statistics
            for group in groups:
                start_ts: float = group[0].movement_start
                # The latest end in the group, so a long early event is not truncated by a
                # shorter event that starts after it.
                end_ts: float = max(member.movement_end for member in group)
                stats: _Stats = self.__update_stats(channel, start_ts, end_ts)
                movement_event: MovementEvent = MovementEvent(
                    channel,
                    start_ts,
                    end_ts,
                    end_ts - start_ts,
                    stats.mag_min,
                    stats.mag_max,
                    stats.mag_range,
                    stats.mag_mean,
                    stats.mag_std,
                )
                res.movement_events.append(movement_event)

        # Replace the current MovementEventStream with the updated one
        self.movement_event_stream = res

    def post_process(
        self,
        max_merge_gap: Optional[datetime.timedelta] = None,
        min_detection: Optional[datetime.timedelta] = None,
    ):
        """
        Performs post-processing on the MovementEventStream to optionally merge close together events and filter out
        short-duration events. Merging, when requested, happens before filtering.
        :param max_merge_gap: When provided, consecutive events of a channel whose start times differ by less than
                              this value will be merged.
        :param min_detection: When provided, events with a duration less than this value will be filtered out.
        """

        if max_merge_gap is not None:
            self.__merge_movement_events(max_merge_gap)

        if min_detection is not None:
            self.movement_event_stream.movement_events = [
                event
                for event in self.movement_event_stream.movement_events
                if event.movement_duration_td() >= min_detection
            ]

Classes

class MovementChannel (value, names=None, *, module=None, qualname=None, type=None, start=1)

Enumeration of movement channels.

Expand source code
class MovementChannel(Enum):
    """
    Enumeration of movement channels.

    Every member's value is the same string as its name, so a member can be resolved either by
    name (``MovementChannel["GYROSCOPE_X"]``) or by value (``MovementChannel("GYROSCOPE_X")``).
    """

    # Members are deliberately left unannotated: an annotation such as ``: str`` is inaccurate
    # (each member is a MovementChannel instance, not a str) and misleads static type checkers.
    GYROSCOPE_X = "GYROSCOPE_X"
    GYROSCOPE_Y = "GYROSCOPE_Y"
    GYROSCOPE_Z = "GYROSCOPE_Z"
    ACCELEROMETER_X = "ACCELEROMETER_X"
    ACCELEROMETER_Y = "ACCELEROMETER_Y"
    ACCELEROMETER_Z = "ACCELEROMETER_Z"

Ancestors

  • enum.Enum

Class variables

var ACCELEROMETER_X : str
var ACCELEROMETER_Y : str
var ACCELEROMETER_Z : str
var GYROSCOPE_X : str
var GYROSCOPE_Y : str
var GYROSCOPE_Z : str
class MovementData (movement_event_stream: MovementEventStream, accelerometer_timestamps: Optional[numpy.ndarray], accelerometer_x: Optional[numpy.ndarray], accelerometer_y: Optional[numpy.ndarray], accelerometer_z: Optional[numpy.ndarray], gyroscope_timestamps: Optional[numpy.ndarray], gyroscope_x: Optional[numpy.ndarray], gyroscope_y: Optional[numpy.ndarray], gyroscope_z: Optional[numpy.ndarray])

Encapsulates movement data from multiple packets from a single station.

Expand source code
@dataclass
class MovementData:
    """
    Encapsulates movement data from multiple packets from a single station.

    Holds the movement event stream together with the concatenated accelerometer and gyroscope
    timestamps and per-axis samples.
    """

    movement_event_stream: MovementEventStream
    accelerometer_timestamps: Optional[np.ndarray]
    accelerometer_x: Optional[np.ndarray]
    accelerometer_y: Optional[np.ndarray]
    accelerometer_z: Optional[np.ndarray]
    gyroscope_timestamps: Optional[np.ndarray]
    gyroscope_x: Optional[np.ndarray]
    gyroscope_y: Optional[np.ndarray]
    gyroscope_z: Optional[np.ndarray]

    @staticmethod
    def from_packets(packets: List["WrappedRedvoxPacketM"]) -> "MovementData":
        """
        Extracts and concatenates movement data.
        :param packets: The packets to extract movement data from.
        :return: An instance of MovementData. Channels for which no packet carried data are empty
                 arrays.
        """
        movement_event_stream: MovementEventStream = MovementEventStream("Movement", [])

        # Collect per-packet chunks and concatenate once at the end; concatenating inside the loop
        # copies the accumulated data for every packet. Each list is seeded with an empty float
        # array so that the result is still an empty float array when no packet has the sensor.
        accel_ts_chunks: List[np.ndarray] = [np.array([])]
        accel_x_chunks: List[np.ndarray] = [np.array([])]
        accel_y_chunks: List[np.ndarray] = [np.array([])]
        accel_z_chunks: List[np.ndarray] = [np.array([])]
        gyro_ts_chunks: List[np.ndarray] = [np.array([])]
        gyro_x_chunks: List[np.ndarray] = [np.array([])]
        gyro_y_chunks: List[np.ndarray] = [np.array([])]
        gyro_z_chunks: List[np.ndarray] = [np.array([])]

        for packet in packets:
            # Events are only read from packets that carry exactly one event stream.
            if packet.get_event_streams().get_count() == 1:
                event_stream: "EventStream" = packet.get_event_streams().get_values()[0]
                movement_event_stream.movement_events.extend(
                    [
                        MovementEvent.from_event(event)
                        for event in event_stream.get_events().get_values()
                    ]
                )

            sensors: "Sensors" = packet.get_sensors()
            accel: Optional["Xyz"] = sensors.get_accelerometer()
            gyro: Optional["Xyz"] = sensors.get_gyroscope()

            if accel is not None:
                accel_ts_chunks.append(accel.get_timestamps().get_timestamps())
                accel_x_chunks.append(accel.get_x_samples().get_values())
                accel_y_chunks.append(accel.get_y_samples().get_values())
                accel_z_chunks.append(accel.get_z_samples().get_values())

            if gyro is not None:
                gyro_ts_chunks.append(gyro.get_timestamps().get_timestamps())
                gyro_x_chunks.append(gyro.get_x_samples().get_values())
                gyro_y_chunks.append(gyro.get_y_samples().get_values())
                gyro_z_chunks.append(gyro.get_z_samples().get_values())

        return MovementData(
            movement_event_stream,
            np.concatenate(accel_ts_chunks),
            np.concatenate(accel_x_chunks),
            np.concatenate(accel_y_chunks),
            np.concatenate(accel_z_chunks),
            np.concatenate(gyro_ts_chunks),
            np.concatenate(gyro_x_chunks),
            np.concatenate(gyro_y_chunks),
            np.concatenate(gyro_z_chunks),
        )

    def data_for_channel(
        self, channel: MovementChannel
    ) -> Tuple[np.ndarray, np.ndarray]:
        """
        Returns the timestamps and samples for a given channel.
        :param channel: Channel to return data for.
        :return: A tuple containing the timestamps and samples for the provided channel. Any value
                 that matches none of the other five channels yields the gyroscope Z data.
        """
        if channel == MovementChannel.ACCELEROMETER_X:
            return self.accelerometer_timestamps, self.accelerometer_x

        if channel == MovementChannel.ACCELEROMETER_Y:
            return self.accelerometer_timestamps, self.accelerometer_y

        if channel == MovementChannel.ACCELEROMETER_Z:
            return self.accelerometer_timestamps, self.accelerometer_z

        if channel == MovementChannel.GYROSCOPE_X:
            return self.gyroscope_timestamps, self.gyroscope_x

        if channel == MovementChannel.GYROSCOPE_Y:
            return self.gyroscope_timestamps, self.gyroscope_y

        return self.gyroscope_timestamps, self.gyroscope_z

    def __update_stats(
        self, movement_channel: MovementChannel, start_ts: float, end_ts: float
    ) -> _Stats:
        """
        Compute summary statistics for a particular channel within a particular window.

        The statistics are computed over the squared samples (sample * sample). The window runs
        from the first timestamp that is >= start_ts up to and including the first subsequent
        timestamp that is >= end_ts.
        :param movement_channel: The channel to compute statistics from.
        :param start_ts: The start time as microseconds since the epoch.
        :param end_ts: The end time as microseconds since the epoch.
        :return: An instance of _Stats; the all-NAN default when no timestamp reaches start_ts or
                 none at/after the start index reaches end_ts.
        """
        timestamps: np.ndarray
        samples: np.ndarray
        (timestamps, samples) = self.data_for_channel(movement_channel)
        timestamps = np.asarray(timestamps)

        # First index whose timestamp reaches the start of the window.
        start_hits: np.ndarray = np.flatnonzero(timestamps >= start_ts)
        if start_hits.size == 0:
            return _Stats()
        start_idx: int = int(start_hits[0])

        # First index at or after start_idx whose timestamp reaches the end of the window.
        # NOTE(review): an event that ends after the last available sample yields NAN stats here;
        # confirm that this is the intended outcome.
        end_hits: np.ndarray = np.flatnonzero(timestamps[start_idx:] >= end_ts)
        if end_hits.size == 0:
            return _Stats()
        end_idx: int = start_idx + int(end_hits[0]) + 1

        # Square only the samples inside the window rather than the entire channel.
        window: np.ndarray = samples[start_idx:end_idx]
        return _Stats.from_samples(window * window)

    def __merge_movement_events(self, max_merge_gap: datetime.timedelta):
        """
        Merges movement events that are "close together".

        Events are processed per channel in start-time order. Two consecutive events are placed in
        the same group when the difference between their start times is smaller than
        max_merge_gap. Every group is replaced by a single event spanning the group whose
        statistics are recomputed from the raw channel data. The resulting events are stored
        grouped by channel.
        :param max_merge_gap: Any consecutive events that are smaller than this timedelta will be merged.
        """
        res: MovementEventStream = MovementEventStream(
            self.movement_event_stream.name, []
        )

        # Group events by channel
        channel_to_events: Dict[MovementChannel, List[MovementEvent]] = defaultdict(
            list
        )
        for event in self.movement_event_stream.movement_events:
            channel_to_events[event.movement_channel].append(event)

        # For each channel, group events that are "close together"
        for channel, channel_events in channel_to_events.items():
            # The grouping below compares neighbours, so it needs time-ordered events; events
            # gathered from several packets are not guaranteed to arrive in order.
            ordered: List[MovementEvent] = sorted(
                channel_events, key=lambda evt: evt.movement_start
            )

            groups: List[List[MovementEvent]] = []
            group: List[MovementEvent] = [ordered[0]]

            for prev, cur in zip(ordered, ordered[1:]):
                if prev.time_diff(cur) < max_merge_gap:
                    group.append(cur)
                else:
                    groups.append(group)
                    group = [cur]

            # The group still being built always holds at least one event.
            groups.append(group)

            # For each group, compute a new event using the raw data to update statistics
            for group in groups:
                start_ts: float = group[0].movement_start
                # The latest end in the group, so a long early event is not truncated by a
                # shorter event that starts after it.
                end_ts: float = max(member.movement_end for member in group)
                stats: _Stats = self.__update_stats(channel, start_ts, end_ts)
                movement_event: MovementEvent = MovementEvent(
                    channel,
                    start_ts,
                    end_ts,
                    end_ts - start_ts,
                    stats.mag_min,
                    stats.mag_max,
                    stats.mag_range,
                    stats.mag_mean,
                    stats.mag_std,
                )
                res.movement_events.append(movement_event)

        # Replace the current MovementEventStream with the updated one
        self.movement_event_stream = res

    def post_process(
        self,
        max_merge_gap: Optional[datetime.timedelta] = None,
        min_detection: Optional[datetime.timedelta] = None,
    ):
        """
        Performs post-processing on the MovementEventStream to optionally merge close together events and filter out
        short-duration events. Merging, when requested, happens before filtering.
        :param max_merge_gap: When provided, consecutive events of a channel whose start times differ by less than
                              this value will be merged.
        :param min_detection: When provided, events with a duration less than this value will be filtered out.
        """

        if max_merge_gap is not None:
            self.__merge_movement_events(max_merge_gap)

        if min_detection is not None:
            self.movement_event_stream.movement_events = [
                event
                for event in self.movement_event_stream.movement_events
                if event.movement_duration_td() >= min_detection
            ]

Class variables

var accelerometer_timestamps : Optional[numpy.ndarray]
var accelerometer_x : Optional[numpy.ndarray]
var accelerometer_y : Optional[numpy.ndarray]
var accelerometer_z : Optional[numpy.ndarray]
var gyroscope_timestamps : Optional[numpy.ndarray]
var gyroscope_x : Optional[numpy.ndarray]
var gyroscope_y : Optional[numpy.ndarray]
var gyroscope_z : Optional[numpy.ndarray]
var movement_event_stream : MovementEventStream

Static methods

def from_packets(packets: List[ForwardRef('WrappedRedvoxPacketM')]) ‑> MovementData

Extracts and concatenates movement data. :param packets: The packets to extract movement data from. :return: An instance of MovementData.

Expand source code
@staticmethod
def from_packets(packets: List["WrappedRedvoxPacketM"]) -> "MovementData":
    """
    Extracts and concatenates movement data.
    :param packets: The packets to extract movement data from.
    :return: An instance of MovementData. Channels for which no packet carried data are empty
             arrays.
    """
    movement_event_stream: MovementEventStream = MovementEventStream("Movement", [])

    # Collect per-packet chunks and concatenate once at the end; concatenating inside the loop
    # copies the accumulated data for every packet. Each list is seeded with an empty float
    # array so that the result is still an empty float array when no packet has the sensor.
    accel_ts_chunks: List[np.ndarray] = [np.array([])]
    accel_x_chunks: List[np.ndarray] = [np.array([])]
    accel_y_chunks: List[np.ndarray] = [np.array([])]
    accel_z_chunks: List[np.ndarray] = [np.array([])]
    gyro_ts_chunks: List[np.ndarray] = [np.array([])]
    gyro_x_chunks: List[np.ndarray] = [np.array([])]
    gyro_y_chunks: List[np.ndarray] = [np.array([])]
    gyro_z_chunks: List[np.ndarray] = [np.array([])]

    for packet in packets:
        # Events are only read from packets that carry exactly one event stream.
        if packet.get_event_streams().get_count() == 1:
            event_stream: "EventStream" = packet.get_event_streams().get_values()[0]
            movement_event_stream.movement_events.extend(
                [
                    MovementEvent.from_event(event)
                    for event in event_stream.get_events().get_values()
                ]
            )

        sensors: "Sensors" = packet.get_sensors()
        accel: Optional["Xyz"] = sensors.get_accelerometer()
        gyro: Optional["Xyz"] = sensors.get_gyroscope()

        if accel is not None:
            accel_ts_chunks.append(accel.get_timestamps().get_timestamps())
            accel_x_chunks.append(accel.get_x_samples().get_values())
            accel_y_chunks.append(accel.get_y_samples().get_values())
            accel_z_chunks.append(accel.get_z_samples().get_values())

        if gyro is not None:
            gyro_ts_chunks.append(gyro.get_timestamps().get_timestamps())
            gyro_x_chunks.append(gyro.get_x_samples().get_values())
            gyro_y_chunks.append(gyro.get_y_samples().get_values())
            gyro_z_chunks.append(gyro.get_z_samples().get_values())

    return MovementData(
        movement_event_stream,
        np.concatenate(accel_ts_chunks),
        np.concatenate(accel_x_chunks),
        np.concatenate(accel_y_chunks),
        np.concatenate(accel_z_chunks),
        np.concatenate(gyro_ts_chunks),
        np.concatenate(gyro_x_chunks),
        np.concatenate(gyro_y_chunks),
        np.concatenate(gyro_z_chunks),
    )

Methods

def data_for_channel(self, channel: MovementChannel) ‑> Tuple[numpy.ndarray, numpy.ndarray]

Returns the timestamps and samples for a given channel. :param channel: Channel to return data for. :return: A tuple containing the timestamps and samples for the provided channel.

Expand source code
def data_for_channel(
    self, channel: MovementChannel
) -> Tuple[np.ndarray, np.ndarray]:
    """
    Looks up the timestamp and sample arrays that belong to a channel.
    :param channel: The channel whose data is requested.
    :return: A (timestamps, samples) tuple for the channel. A value matching none of the
             channels listed below resolves to the gyroscope Z data.
    """
    # Checked in this order; attribute names are resolved lazily, only for the matching channel.
    lookup_order = (
        (MovementChannel.ACCELEROMETER_X, "accelerometer_timestamps", "accelerometer_x"),
        (MovementChannel.ACCELEROMETER_Y, "accelerometer_timestamps", "accelerometer_y"),
        (MovementChannel.ACCELEROMETER_Z, "accelerometer_timestamps", "accelerometer_z"),
        (MovementChannel.GYROSCOPE_X, "gyroscope_timestamps", "gyroscope_x"),
        (MovementChannel.GYROSCOPE_Y, "gyroscope_timestamps", "gyroscope_y"),
    )

    for candidate, timestamps_attr, samples_attr in lookup_order:
        if channel == candidate:
            return getattr(self, timestamps_attr), getattr(self, samples_attr)

    # Fallback: GYROSCOPE_Z, as well as anything else that did not match above.
    return self.gyroscope_timestamps, self.gyroscope_z
def post_process(self, max_merge_gap: Optional[datetime.timedelta] = None, min_detection: Optional[datetime.timedelta] = None)

Performs post-processing on the MovementEventStream to optionally merge close together events and filter out short-duration events. :param max_merge_gap: When provided, any consecutive packets that have gaps less than this value will be merged. :param min_detection: When provided, events with a duration less than this value will be filtered out.

Expand source code
def post_process(
    self,
    max_merge_gap: Optional[datetime.timedelta] = None,
    min_detection: Optional[datetime.timedelta] = None,
):
    """
    Post-processes the MovementEventStream in place: optionally merges events that are close
    together and then optionally drops short-duration events.
    :param max_merge_gap: When given, passed to the merge step that combines close together events.
    :param min_detection: When given, only events whose duration is at least this value are kept.
    """
    # Merging runs first so that the duration filter sees the merged events.
    if max_merge_gap is not None:
        self.__merge_movement_events(max_merge_gap)

    if min_detection is None:
        return

    stream = self.movement_event_stream
    stream.movement_events = [
        candidate
        for candidate in stream.movement_events
        if candidate.movement_duration_td() >= min_detection
    ]
class MovementEvent (movement_channel: MovementChannel, movement_start: float, movement_end: float, movement_duration: float, magnitude_min: float, magnitude_max: float, magnitude_range: float, magnitude_mean: float, magnitude_std_dev: float)

Represents the metadata associated with a derived movement event.

Expand source code
@total_ordering
@dataclass
class MovementEvent:
    """
    Represents the metadata associated with a derived movement event.

    Equality and ordering look at ``movement_start`` only: two events with the same start compare equal even
    when their other fields differ. The remaining rich comparisons are derived by ``total_ordering``.
    """

    # Channel the event belongs to (looked up by name from the event description in from_event()).
    movement_channel: MovementChannel
    # Start and end timestamps; converted to datetimes with datetime_from_epoch_microseconds_utc.
    movement_start: float
    movement_end: float
    # Duration as stored in the payload; movement_duration_td() recomputes it from start/end instead.
    movement_duration: float
    # Magnitude statistics, populated from the numeric payload by from_event().
    magnitude_min: float
    magnitude_max: float
    magnitude_range: float
    magnitude_mean: float
    magnitude_std_dev: float

    def movement_start_dt(self) -> datetime.datetime:
        """
        :return: The movement start as a datetime.
        """
        return datetime_from_epoch_microseconds_utc(self.movement_start)

    def movement_end_dt(self) -> datetime.datetime:
        """
        :return: The movement end as a datetime.
        """
        return datetime_from_epoch_microseconds_utc(self.movement_end)

    def movement_duration_td(self) -> datetime.timedelta:
        """
        :return: The movement duration as a timedelta, computed as end datetime minus start datetime.
        """
        return self.movement_end_dt() - self.movement_start_dt()

    def time_diff(self, other: "MovementEvent") -> datetime.timedelta:
        """
        Returns the time difference between the starts of two events.
        :param other: The other event to compare against.
        :return: The start of the later-starting event minus the start of the earlier-starting event.
        """
        # Sorting relies on __lt__, i.e. on movement_start, so the argument order does not matter.
        events: List["MovementEvent"] = sorted([self, other])
        fst: "MovementEvent" = events[0]
        scd: "MovementEvent" = events[1]

        return scd.movement_start_dt() - fst.movement_start_dt()

    @staticmethod
    def from_event(event: "Event") -> "MovementEvent":
        """
        Converts a generic API M Event into a MovementEvent.
        :param event: The Event to convert. Its description must be the name of a MovementChannel member and
                      its numeric payload must contain every movement_* and magnitude_* key read below.
        :return: A MovementEvent.
        :raises KeyError: If the description is not a MovementChannel name or a payload key is missing.
        """
        numeric_payload: "Mapping[float]" = event.get_numeric_payload()
        numeric_payload_dict: Dict[str, float] = numeric_payload.get_metadata()
        return MovementEvent(
            MovementChannel[event.get_description()],
            numeric_payload_dict["movement_start"],
            numeric_payload_dict["movement_end"],
            numeric_payload_dict["movement_duration"],
            numeric_payload_dict["magnitude_min"],
            numeric_payload_dict["magnitude_max"],
            numeric_payload_dict["magnitude_range"],
            numeric_payload_dict["magnitude_mean"],
            numeric_payload_dict["magnitude_std_dev"],
        )

    def __eq__(self, other) -> bool:
        """
        :return: True when both events share the same movement_start; NotImplemented for foreign types so
                 Python can fall back to its default handling (which yields False for ``==``).
        """
        if not isinstance(other, MovementEvent):
            return NotImplemented
        return self.movement_start == other.movement_start

    def __lt__(self, other: "MovementEvent") -> bool:
        """
        :return: True when this event starts before the other; NotImplemented for foreign types so that
                 ordering against them raises TypeError instead of failing on a missing attribute.
        """
        if not isinstance(other, MovementEvent):
            return NotImplemented
        return self.movement_start < other.movement_start

Class variables

var magnitude_max : float
var magnitude_mean : float
var magnitude_min : float
var magnitude_range : float
var magnitude_std_dev : float
var movement_channel : MovementChannel
var movement_duration : float
var movement_end : float
var movement_start : float

Static methods

def from_event(event: Event) ‑> MovementEvent

Converts a generic API M Event into a MovementEvent. :param event: The Event to convert. :return: A MovementEvent.

Expand source code
@staticmethod
def from_event(event: "Event") -> "MovementEvent":
    """
    Builds a MovementEvent from a generic API M Event.
    :param event: The Event to convert. Its description is used as the name of the MovementChannel member and
                  its numeric payload supplies the remaining fields.
    :return: A MovementEvent.
    """
    payload: Dict[str, float] = event.get_numeric_payload().get_metadata()

    # Payload keys, listed in the positional order MovementEvent expects after the channel.
    payload_keys = (
        "movement_start",
        "movement_end",
        "movement_duration",
        "magnitude_min",
        "magnitude_max",
        "magnitude_range",
        "magnitude_mean",
        "magnitude_std_dev",
    )

    return MovementEvent(
        MovementChannel[event.get_description()],
        *(payload[key] for key in payload_keys),
    )

Methods

def movement_duration_td(self) ‑> datetime.timedelta

:return: The movement duration as a timedelta.

Expand source code
def movement_duration_td(self) -> datetime.timedelta:
    """
    :return: The movement duration as a timedelta, i.e. the end datetime minus the start datetime.
    """
    ended_at = self.movement_end_dt()
    started_at = self.movement_start_dt()
    return ended_at - started_at
def movement_end_dt(self) ‑> datetime.datetime

:return: The movement end as a datetime.

Expand source code
def movement_end_dt(self) -> datetime.datetime:
    """
    :return: The movement end as a datetime, obtained by converting ``movement_end`` with
             datetime_from_epoch_microseconds_utc.
    """
    end_timestamp = self.movement_end
    return datetime_from_epoch_microseconds_utc(end_timestamp)
def movement_start_dt(self) ‑> datetime.datetime

:return: The movement start as a datetime.

Expand source code
def movement_start_dt(self) -> datetime.datetime:
    """
    :return: The movement start as a datetime, obtained by converting ``movement_start`` with
             datetime_from_epoch_microseconds_utc.
    """
    start_timestamp = self.movement_start
    return datetime_from_epoch_microseconds_utc(start_timestamp)
def time_diff(self, other: MovementEvent) ‑> datetime.timedelta

Returns the time difference between two events. :param other: The other event to compare against. :return: The time difference between this and another event.

Expand source code
def time_diff(self, other: "MovementEvent") -> datetime.timedelta:
    """
    Computes the time difference between the starts of two events.
    :param other: The other event to compare against.
    :return: The start datetime of the event that sorts last minus the start datetime of the event that
             sorts first, so swapping the two events does not change the result.
    """
    # Put the pair in ascending order using the events' own comparison operators.
    earlier, later = sorted((self, other))
    return later.movement_start_dt() - earlier.movement_start_dt()
class MovementEventStream (name: str, movement_events: List[MovementEvent])

Represents a derived movement event stream.

Expand source code
@dataclass
class MovementEventStream:
    """
    A named collection of derived movement events.
    """

    # Name of the stream; from_event_stream() copies it from the source EventStream.
    name: str
    # Events held by the stream; from_event_stream() stores them sorted by start datetime.
    movement_events: List[MovementEvent]

    @staticmethod
    def from_event_stream(event_stream: "EventStream") -> "MovementEventStream":
        """
        Builds a MovementEventStream out of an API M EventStream.
        :param event_stream: Stream to convert.
        :return: A MovementEventStream whose events are ordered by their start datetime.
        """
        raw_events = event_stream.get_events().get_values()
        ordered_events: List[MovementEvent] = sorted(
            (MovementEvent.from_event(raw_event) for raw_event in raw_events),
            key=lambda movement_event: movement_event.movement_start_dt(),
        )
        return MovementEventStream(event_stream.get_name(), ordered_events)

    def events_by_channel(
        self, movement_channel: MovementChannel
    ) -> List[MovementEvent]:
        """
        Selects the events that belong to one channel.
        :param movement_channel: Channel to filter events for.
        :return: A new list with the matching movement events, in their stored order.
        """
        return [
            movement_event
            for movement_event in self.movement_events
            if movement_event.movement_channel == movement_channel
        ]

Class variables

var movement_events : List[MovementEvent]
var name : str

Static methods

def from_event_stream(event_stream: EventStream) ‑> MovementEventStream

Converts an API M EventStream into a MovementEventStream. :param event_stream: Stream to convert. :return: A MovementEventStream.

Expand source code
@staticmethod
def from_event_stream(event_stream: "EventStream") -> "MovementEventStream":
    """
    Builds a MovementEventStream out of an API M EventStream.
    :param event_stream: Stream to convert.
    :return: A MovementEventStream carrying the stream's name and its events ordered by start datetime.
    """
    raw_events = event_stream.get_events().get_values()
    ordered_events: List[MovementEvent] = sorted(
        (MovementEvent.from_event(raw_event) for raw_event in raw_events),
        key=lambda movement_event: movement_event.movement_start_dt(),
    )
    return MovementEventStream(event_stream.get_name(), ordered_events)

Methods

def events_by_channel(self, movement_channel: MovementChannel) ‑> List[MovementEvent]

Returns events for a given channel. :param movement_channel: Channel to filter events for. :return: A list of movement events.

Expand source code
def events_by_channel(
    self, movement_channel: MovementChannel
) -> List[MovementEvent]:
    """
    Selects the events that belong to one channel.
    :param movement_channel: Channel to filter events for.
    :return: A new list with the matching movement events, in their stored order.
    """
    return [
        movement_event
        for movement_event in self.movement_events
        if movement_event.movement_channel == movement_channel
    ]