Module redvox.common.file_statistics

This module provides utility functions for determining statistics of well structured RedVox data.

Expand source code
"""
This module provides utility functions for determining statistics of well structured RedVox data.
"""

from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Any, Callable, Iterator, List, Optional, Tuple, TYPE_CHECKING, Union
import math
import multiprocessing
from multiprocessing.pool import Pool

import numpy as np

from redvox.common.timesync import TimeSync
from redvox.common.parallel_utils import maybe_parallel_map

# noinspection Mypy
if TYPE_CHECKING:
    from redvox.api1000.wrapped_redvox_packet.sensors.audio import Audio
    from redvox.api1000.wrapped_redvox_packet.sensors.location import Location
    from redvox.api1000.wrapped_redvox_packet.sensors.sensors import Sensors
    from redvox.api1000.wrapped_redvox_packet.station_information import (
        StationInformation,
    )
    from redvox.api1000.wrapped_redvox_packet.timing_information import (
        TimingInformation,
    )
    from redvox.api1000.wrapped_redvox_packet.wrapped_packet import WrappedRedvoxPacketM
    from redvox.api900.wrapped_redvox_packet import WrappedRedvoxPacket

# noinspection Mypy
from redvox.common.date_time_utils import datetime_from_epoch_microseconds_utc as us2dt

# noinspection Mypy
import redvox.common.io as io

SAMPLE_RATE_HZ: np.ndarray = np.array(
    [80, 800, 8000, 16000]
)  # list of accepted sample rates in Hz
BASE_NUMBER_POINTS: int = (
    4096  # the number of points to sample at the first sample rate
)
NUM_POINTS_FACTOR: int = 2 ** 3  # the multiplier of points per increased sample rate

# total multiplier of base number of points, 1 multiplier per sample rate
POINTS_FACTOR_ARRAY: np.ndarray = np.array(
    [1, NUM_POINTS_FACTOR, NUM_POINTS_FACTOR ** 2, 2 * NUM_POINTS_FACTOR ** 2]
)

# total number of points per sample rate
DURATION_TOTAL_POINTS: np.ndarray = np.array(POINTS_FACTOR_ARRAY * BASE_NUMBER_POINTS)

# expected duration of packets in seconds
DURATION_SECONDS: np.ndarray = np.divide(DURATION_TOTAL_POINTS, SAMPLE_RATE_HZ)


def get_file_stats(sample_rate: Union[float, int]) -> Tuple[int, float]:
    """
    Get the number of samples in a decoder file and its duration in seconds.

    :param sample_rate: int or float, sample rate
    :returns: number of samples in file as int and file time duration in seconds as float
    """
    try:
        position: int = np.where(SAMPLE_RATE_HZ == sample_rate)[0][0]
    except Exception as ex:
        raise ValueError(
            f"Sample rate {sample_rate} for mic data not recognized."
        ) from ex

    return DURATION_TOTAL_POINTS[position], DURATION_SECONDS[position]


def get_num_points_from_sample_rate(sample_rate: Union[float, int]) -> int:
    """
    Returns the number of data points in a packet given a sample rate

    :param sample_rate: A valid sample rate from the constants above
    :return: the number of data points in the packet in seconds
    """
    try:
        position: int = np.where(SAMPLE_RATE_HZ == sample_rate)[0][0]
        return DURATION_TOTAL_POINTS[position]
    except Exception as ex:
        raise ValueError(
            f"Unknown sample rate {sample_rate} given to compute number of data points!"
        ) from ex


def get_duration_seconds_from_sample_rate(sample_rate: Union[float, int]) -> float:
    """
    Returns the duration of a packet in seconds given a sample rate

    :param sample_rate: A valid sample rate from the constants above
    :return: the duration of the packet in seconds
    """
    try:
        position: int = np.where(SAMPLE_RATE_HZ == sample_rate)[0][0]
        return DURATION_SECONDS[position]
    except Exception as ex:
        raise ValueError(
            f"Unknown sample rate {sample_rate} given to compute duration!"
        ) from ex


def _map_opt(
    opt: Optional[Any], apply: Callable[[Optional[Any]], Optional[Any]]
) -> Optional[Any]:
    """
    Maps an optional with the given function. If the optional is None, None is returned.

    :param opt: The optional to map.
    :param apply: The function to apply to the optional if a value is present.
    :return: The mapped value.
    """
    if opt is None:
        return None

    return apply(opt)


def _map_opt_numeric(
    fn: Callable[[Optional[Any]], Optional[Any]],
    opt: Optional[Any],
) -> Optional[Any]:
    """
    Maps an optional with the given function. If the optional is None, None is returned.

    :param fn: The function to apply to the optional if a value is present.
    :param opt: The optional to map.
    :return: The mapped value.
    """
    if opt is None or math.isnan(opt):
        return None

    return fn(opt)


def _partition_list(lst: List[Any], chunks: int) -> List[Any]:
    """
    Partitions a list into k "chunks" of approximately equal length.
    Adapted from: Adopted from: https://stackoverflow.com/questions/2130016/splitting-a-list-into-n-parts-of-approximately-equal-length/37414115#37414115

    :param lst:
    :param chunks:
    :return:
    """
    n: int = len(lst)
    k: int = chunks
    return [
        lst[i * (n // k) + min(i, n % k) : (i + 1) * (n // k) + min(i + 1, n % k)]
        for i in range(k)
    ]


@dataclass
class GpsDateTime:
    """
    Represents timestamp pairings in API 1000 data.
    """

    mach_dt: datetime
    gps_dt: Optional[datetime]


def dur2td(us: float) -> timedelta:
    return timedelta(microseconds=us)


@dataclass
class StationStat:
    """
    A collection of station fields for a given API 900 or API 1000 packet.
    These are used for timing correction and gap detection.
    """

    station_id: str
    station_uuid: str
    app_start_dt: Optional[datetime]
    packet_start_dt: datetime
    server_recv_dt: Optional[datetime]
    gps_dts: Optional[List[GpsDateTime]]
    latency: Optional[float]
    best_latency_timestamp: Optional[float]
    offset: Optional[float]
    sample_rate_hz: Optional[float]
    packet_duration: Optional[timedelta]

    @staticmethod
    def from_native(native) -> "StationStat":
        return StationStat(
            native.station_id,
            native.station_uuid,
            _map_opt_numeric(us2dt, native.app_start_dt),
            _map_opt_numeric(us2dt, native.packet_start_dt),
            _map_opt_numeric(us2dt, native.server_recv_dt),
            None,
            native.latency,
            native.best_latency_timestamp,
            native.offset,
            native.sample_rate_hz,
            _map_opt_numeric(dur2td, native.packet_duration),
        )

    @staticmethod
    def from_api_900(packet: "WrappedRedvoxPacket") -> "StationStat":
        """
        Extracts the required fields from an API 900 packet.

        :param packet: API 900 packet to extract fields from.
        :return: An instance of StationStat.
        """
        mtz: Optional[float] = packet.mach_time_zero()

        best_offset = packet.best_offset()
        best_latency = packet.best_latency()
        if packet.has_time_synchronization_sensor():
            tsd = TimeSync(
                time_sync_exchanges_list=list(
                    packet.time_synchronization_sensor().payload_values()
                )
            )
            if not best_offset or not best_latency:
                best_offset = tsd.best_offset()
                best_latency = tsd.best_latency()
            best_latency_timestamp = tsd.get_best_latency_timestamp()
        else:
            best_latency_timestamp = np.nan

        # noinspection Mypy
        return StationStat(
            packet.redvox_id(),
            packet.uuid(),
            _map_opt(mtz, us2dt),
            us2dt(packet.app_file_start_timestamp_machine()),
            us2dt(packet.server_timestamp_epoch_microseconds_utc()),
            None,
            best_latency,
            best_latency_timestamp,
            best_offset,
            packet.microphone_sensor().sample_rate_hz()
            if packet.has_microphone_sensor()
            else np.nan,
            timedelta(seconds=packet.duration_s())
            if packet.has_microphone_sensor()
            else 0.0,
        )

    # noinspection Mypy
    @staticmethod
    def from_api_1000(packet: "WrappedRedvoxPacketM") -> "StationStat":
        """
        Extracts the required fields from an API 1000 packet.

        :param packet: API 1000 packet to extract fields from.
        :return: An instance of StationStat.
        """
        station_info: "StationInformation" = packet.get_station_information()
        timing_info: "TimingInformation" = packet.get_timing_information()
        sensors: "Sensors" = packet.get_sensors()
        location_sensor: Optional["Location"] = sensors.get_location()
        audio_sensor: Optional["Audio"] = sensors.get_audio()

        # Optionally extract the GPS timestamps if the location sensor is available
        gps_timestamps: Optional[List[GpsDateTime]] = None
        if location_sensor is not None:
            gps_timestamps = []
            _gps_timestamps = location_sensor.get_timestamps_gps().get_timestamps()
            _gps_timestamps_len = len(_gps_timestamps)
            for i, ts in enumerate(location_sensor.get_timestamps().get_timestamps()):
                # A GPS timestamp isn't always present in the location sensor. We can handle that here.
                gps_ts: Optional[datetime] = (
                    us2dt(_gps_timestamps[i])
                    if (i < _gps_timestamps_len and not np.isnan(_gps_timestamps[i]))
                    else None
                )
                gps_timestamps.append(GpsDateTime(us2dt(ts), gps_ts))

        best_offset = timing_info.get_best_offset()
        best_latency = timing_info.get_best_latency()
        if len(timing_info.get_synch_exchange_array()) > 0:
            tsd = TimeSync(
                time_sync_exchanges_list=timing_info.get_synch_exchange_array()
            )
            if not best_offset or not best_latency:
                best_offset = tsd.best_offset()
                best_latency = tsd.best_latency()
            best_latency_timestamp = tsd.get_best_latency_timestamp()
        else:
            best_latency_timestamp = np.nan

        return StationStat(
            station_info.get_id(),
            station_info.get_uuid(),
            us2dt(timing_info.get_app_start_mach_timestamp()),
            us2dt(timing_info.get_packet_start_mach_timestamp()),
            us2dt(timing_info.get_server_acquisition_arrival_timestamp()),
            gps_timestamps,
            best_latency,
            best_latency_timestamp,
            best_offset,
            _map_opt(audio_sensor, lambda sensor: sensor.get_sample_rate()),
            packet.get_packet_duration(),
        )


# noinspection PyTypeChecker,DuplicatedCode
def extract_stats_serial(index: io.Index) -> List[StationStat]:
    """
    Extracts StationStat information from packets stored in the provided index.

    :param index: Index of packets to extract information from.
    :return: A list of StationStat objects.
    """
    # noinspection Mypy
    stats_900: Iterator[StationStat] = map(
        StationStat.from_api_900,
        index.stream(io.ReadFilter(api_versions={io.ApiVersion.API_900})),
    )
    # noinspection Mypy
    stats_1000: Iterator[StationStat] = map(
        StationStat.from_api_1000,
        index.stream(io.ReadFilter(api_versions={io.ApiVersion.API_1000})),
    )
    return list(stats_900) + list(stats_1000)


def extract_stats_parallel(
    index: io.Index, pool: Optional[multiprocessing.pool.Pool] = None
) -> List[StationStat]:
    """
    Extracts StationStat information in parallel from packets stored in the provided index.

    :param index: Index of packets to extract information from.
    :param pool: optional multiprocessing pool.
    :return: A list of StationStat objects.
    """
    # Partition the index entries by number of cores
    num_cores: int = multiprocessing.cpu_count()
    partitioned: List[List[io.IndexEntry]] = _partition_list(index.entries, num_cores)
    indices: List[io.Index] = list(map(lambda entries: io.Index(entries), partitioned))

    # Run da buggahs in parallel
    # _pool: multiprocessing.pool.Pool = multiprocessing.Pool() if pool is None else pool
    # nested: List[List[StationStat]] = _pool.map(extract_stats_serial, indices)
    # if pool is None:
    #     _pool.close()
    nested: Iterator[List[StationStat]] = maybe_parallel_map(
        pool,
        extract_stats_serial,
        iter(indices),
        lambda: len(indices) > 128,
        chunk_size=64,
    )
    return [item for sublist in nested for item in sublist]


ExtractStatsFn: Callable[[io.Index, Optional[Pool]], List[StationStat]]

try:
    # noinspection PyUnresolvedReferences
    import redvox_native

    def extract_stats_native(
        index: io.Index, pool: Optional[Pool] = None
    ) -> List[StationStat]:
        # To native index
        native_index = index.to_native()
        # Get native stats
        # noinspection PyUnresolvedReferences
        native_stats = redvox_native.extract_stats(native_index)
        # To py result
        return list(map(StationStat.from_native, native_stats))

    ExtractStatsFn = extract_stats_native
except ImportError:
    ExtractStatsFn = extract_stats_parallel


def extract_stats(
    index: io.Index,
    pool: Optional[multiprocessing.pool.Pool] = None,
) -> List[StationStat]:
    """
    Extracts StationStat information from packets stored in the provided index.

    :param index: Index of packets to extract information from.
    :param pool: optional multiprocessing pool.
    :return: A list of StationStat objects.
    """
    return ExtractStatsFn(index, pool)

Functions

def ExtractStatsFn(index: Index, pool: Optional[multiprocessing.pool.Pool] = None) ‑> List[StationStat]
Expand source code
def extract_stats_native(
    index: io.Index, pool: Optional[Pool] = None
) -> List[StationStat]:
    # To native index
    native_index = index.to_native()
    # Get native stats
    # noinspection PyUnresolvedReferences
    native_stats = redvox_native.extract_stats(native_index)
    # To py result
    return list(map(StationStat.from_native, native_stats))
def dur2td(us: float) ‑> datetime.timedelta
Expand source code
def dur2td(us: float) -> timedelta:
    return timedelta(microseconds=us)
def extract_stats(index: Index, pool: Optional[multiprocessing.pool.Pool] = None) ‑> List[StationStat]

Extracts StationStat information from packets stored in the provided index.

:param index: Index of packets to extract information from. :param pool: optional multiprocessing pool. :return: A list of StationStat objects.

Expand source code
def extract_stats(
    index: io.Index,
    pool: Optional[multiprocessing.pool.Pool] = None,
) -> List[StationStat]:
    """
    Extracts StationStat information from packets stored in the provided index.

    :param index: Index of packets to extract information from.
    :param pool: optional multiprocessing pool.
    :return: A list of StationStat objects.
    """
    return ExtractStatsFn(index, pool)
def extract_stats_native(index: Index, pool: Optional[multiprocessing.pool.Pool] = None) ‑> List[StationStat]
Expand source code
def extract_stats_native(
    index: io.Index, pool: Optional[Pool] = None
) -> List[StationStat]:
    # To native index
    native_index = index.to_native()
    # Get native stats
    # noinspection PyUnresolvedReferences
    native_stats = redvox_native.extract_stats(native_index)
    # To py result
    return list(map(StationStat.from_native, native_stats))
def extract_stats_parallel(index: Index, pool: Optional[multiprocessing.pool.Pool] = None) ‑> List[StationStat]

Extracts StationStat information in parallel from packets stored in the provided index.

:param index: Index of packets to extract information from. :param pool: optional multiprocessing pool. :return: A list of StationStat objects.

Expand source code
def extract_stats_parallel(
    index: io.Index, pool: Optional[multiprocessing.pool.Pool] = None
) -> List[StationStat]:
    """
    Extracts StationStat information in parallel from packets stored in the provided index.

    :param index: Index of packets to extract information from.
    :param pool: optional multiprocessing pool.
    :return: A list of StationStat objects.
    """
    # Partition the index entries by number of cores
    num_cores: int = multiprocessing.cpu_count()
    partitioned: List[List[io.IndexEntry]] = _partition_list(index.entries, num_cores)
    indices: List[io.Index] = list(map(lambda entries: io.Index(entries), partitioned))

    # Run da buggahs in parallel
    # _pool: multiprocessing.pool.Pool = multiprocessing.Pool() if pool is None else pool
    # nested: List[List[StationStat]] = _pool.map(extract_stats_serial, indices)
    # if pool is None:
    #     _pool.close()
    nested: Iterator[List[StationStat]] = maybe_parallel_map(
        pool,
        extract_stats_serial,
        iter(indices),
        lambda: len(indices) > 128,
        chunk_size=64,
    )
    return [item for sublist in nested for item in sublist]
def extract_stats_serial(index: Index) ‑> List[StationStat]

Extracts StationStat information from packets stored in the provided index.

:param index: Index of packets to extract information from. :return: A list of StationStat objects.

Expand source code
def extract_stats_serial(index: io.Index) -> List[StationStat]:
    """
    Extracts StationStat information from packets stored in the provided index.

    :param index: Index of packets to extract information from.
    :return: A list of StationStat objects.
    """
    # noinspection Mypy
    stats_900: Iterator[StationStat] = map(
        StationStat.from_api_900,
        index.stream(io.ReadFilter(api_versions={io.ApiVersion.API_900})),
    )
    # noinspection Mypy
    stats_1000: Iterator[StationStat] = map(
        StationStat.from_api_1000,
        index.stream(io.ReadFilter(api_versions={io.ApiVersion.API_1000})),
    )
    return list(stats_900) + list(stats_1000)
def get_duration_seconds_from_sample_rate(sample_rate: Union[float, int]) ‑> float

Returns the duration of a packet in seconds given a sample rate

:param sample_rate: A valid sample rate from the constants above :return: the duration of the packet in seconds

Expand source code
def get_duration_seconds_from_sample_rate(sample_rate: Union[float, int]) -> float:
    """
    Returns the duration of a packet in seconds given a sample rate

    :param sample_rate: A valid sample rate from the constants above
    :return: the duration of the packet in seconds
    """
    try:
        position: int = np.where(SAMPLE_RATE_HZ == sample_rate)[0][0]
        return DURATION_SECONDS[position]
    except Exception as ex:
        raise ValueError(
            f"Unknown sample rate {sample_rate} given to compute duration!"
        ) from ex
def get_file_stats(sample_rate: Union[float, int]) ‑> Tuple[int, float]

Get the number of samples in a decoder file and its duration in seconds.

:param sample_rate: int or float, sample rate :returns: number of samples in file as int and file time duration in seconds as float

Expand source code
def get_file_stats(sample_rate: Union[float, int]) -> Tuple[int, float]:
    """
    Get the number of samples in a decoder file and its duration in seconds.

    :param sample_rate: int or float, sample rate
    :returns: number of samples in file as int and file time duration in seconds as float
    """
    try:
        position: int = np.where(SAMPLE_RATE_HZ == sample_rate)[0][0]
    except Exception as ex:
        raise ValueError(
            f"Sample rate {sample_rate} for mic data not recognized."
        ) from ex

    return DURATION_TOTAL_POINTS[position], DURATION_SECONDS[position]
def get_num_points_from_sample_rate(sample_rate: Union[float, int]) ‑> int

Returns the number of data points in a packet given a sample rate

:param sample_rate: A valid sample rate from the constants above :return: the number of data points in the packet in seconds

Expand source code
def get_num_points_from_sample_rate(sample_rate: Union[float, int]) -> int:
    """
    Returns the number of data points in a packet given a sample rate

    :param sample_rate: A valid sample rate from the constants above
    :return: the number of data points in the packet in seconds
    """
    try:
        position: int = np.where(SAMPLE_RATE_HZ == sample_rate)[0][0]
        return DURATION_TOTAL_POINTS[position]
    except Exception as ex:
        raise ValueError(
            f"Unknown sample rate {sample_rate} given to compute number of data points!"
        ) from ex

Classes

class GpsDateTime (mach_dt: datetime.datetime, gps_dt: Optional[datetime.datetime])

Represents timestamp pairings in API 1000 data.

Expand source code
@dataclass
class GpsDateTime:
    """
    Represents timestamp pairings in API 1000 data.
    """

    mach_dt: datetime
    gps_dt: Optional[datetime]

Class variables

var gps_dt : Optional[datetime.datetime]
var mach_dt : datetime.datetime
class StationStat (station_id: str, station_uuid: str, app_start_dt: Optional[datetime.datetime], packet_start_dt: datetime.datetime, server_recv_dt: Optional[datetime.datetime], gps_dts: Optional[List[GpsDateTime]], latency: Optional[float], best_latency_timestamp: Optional[float], offset: Optional[float], sample_rate_hz: Optional[float], packet_duration: Optional[datetime.timedelta])

A collection of station fields for a given API 900 or API 1000 packet. These are used for timing correction and gap detection.

Expand source code
@dataclass
class StationStat:
    """
    A collection of station fields for a given API 900 or API 1000 packet.
    These are used for timing correction and gap detection.
    """

    station_id: str
    station_uuid: str
    app_start_dt: Optional[datetime]
    packet_start_dt: datetime
    server_recv_dt: Optional[datetime]
    gps_dts: Optional[List[GpsDateTime]]
    latency: Optional[float]
    best_latency_timestamp: Optional[float]
    offset: Optional[float]
    sample_rate_hz: Optional[float]
    packet_duration: Optional[timedelta]

    @staticmethod
    def from_native(native) -> "StationStat":
        return StationStat(
            native.station_id,
            native.station_uuid,
            _map_opt_numeric(us2dt, native.app_start_dt),
            _map_opt_numeric(us2dt, native.packet_start_dt),
            _map_opt_numeric(us2dt, native.server_recv_dt),
            None,
            native.latency,
            native.best_latency_timestamp,
            native.offset,
            native.sample_rate_hz,
            _map_opt_numeric(dur2td, native.packet_duration),
        )

    @staticmethod
    def from_api_900(packet: "WrappedRedvoxPacket") -> "StationStat":
        """
        Extracts the required fields from an API 900 packet.

        :param packet: API 900 packet to extract fields from.
        :return: An instance of StationStat.
        """
        mtz: Optional[float] = packet.mach_time_zero()

        best_offset = packet.best_offset()
        best_latency = packet.best_latency()
        if packet.has_time_synchronization_sensor():
            tsd = TimeSync(
                time_sync_exchanges_list=list(
                    packet.time_synchronization_sensor().payload_values()
                )
            )
            if not best_offset or not best_latency:
                best_offset = tsd.best_offset()
                best_latency = tsd.best_latency()
            best_latency_timestamp = tsd.get_best_latency_timestamp()
        else:
            best_latency_timestamp = np.nan

        # noinspection Mypy
        return StationStat(
            packet.redvox_id(),
            packet.uuid(),
            _map_opt(mtz, us2dt),
            us2dt(packet.app_file_start_timestamp_machine()),
            us2dt(packet.server_timestamp_epoch_microseconds_utc()),
            None,
            best_latency,
            best_latency_timestamp,
            best_offset,
            packet.microphone_sensor().sample_rate_hz()
            if packet.has_microphone_sensor()
            else np.nan,
            timedelta(seconds=packet.duration_s())
            if packet.has_microphone_sensor()
            else 0.0,
        )

    # noinspection Mypy
    @staticmethod
    def from_api_1000(packet: "WrappedRedvoxPacketM") -> "StationStat":
        """
        Extracts the required fields from an API 1000 packet.

        :param packet: API 1000 packet to extract fields from.
        :return: An instance of StationStat.
        """
        station_info: "StationInformation" = packet.get_station_information()
        timing_info: "TimingInformation" = packet.get_timing_information()
        sensors: "Sensors" = packet.get_sensors()
        location_sensor: Optional["Location"] = sensors.get_location()
        audio_sensor: Optional["Audio"] = sensors.get_audio()

        # Optionally extract the GPS timestamps if the location sensor is available
        gps_timestamps: Optional[List[GpsDateTime]] = None
        if location_sensor is not None:
            gps_timestamps = []
            _gps_timestamps = location_sensor.get_timestamps_gps().get_timestamps()
            _gps_timestamps_len = len(_gps_timestamps)
            for i, ts in enumerate(location_sensor.get_timestamps().get_timestamps()):
                # A GPS timestamp isn't always present in the location sensor. We can handle that here.
                gps_ts: Optional[datetime] = (
                    us2dt(_gps_timestamps[i])
                    if (i < _gps_timestamps_len and not np.isnan(_gps_timestamps[i]))
                    else None
                )
                gps_timestamps.append(GpsDateTime(us2dt(ts), gps_ts))

        best_offset = timing_info.get_best_offset()
        best_latency = timing_info.get_best_latency()
        if len(timing_info.get_synch_exchange_array()) > 0:
            tsd = TimeSync(
                time_sync_exchanges_list=timing_info.get_synch_exchange_array()
            )
            if not best_offset or not best_latency:
                best_offset = tsd.best_offset()
                best_latency = tsd.best_latency()
            best_latency_timestamp = tsd.get_best_latency_timestamp()
        else:
            best_latency_timestamp = np.nan

        return StationStat(
            station_info.get_id(),
            station_info.get_uuid(),
            us2dt(timing_info.get_app_start_mach_timestamp()),
            us2dt(timing_info.get_packet_start_mach_timestamp()),
            us2dt(timing_info.get_server_acquisition_arrival_timestamp()),
            gps_timestamps,
            best_latency,
            best_latency_timestamp,
            best_offset,
            _map_opt(audio_sensor, lambda sensor: sensor.get_sample_rate()),
            packet.get_packet_duration(),
        )

Class variables

var app_start_dt : Optional[datetime.datetime]
var best_latency_timestamp : Optional[float]
var gps_dts : Optional[List[GpsDateTime]]
var latency : Optional[float]
var offset : Optional[float]
var packet_duration : Optional[datetime.timedelta]
var packet_start_dt : datetime.datetime
var sample_rate_hz : Optional[float]
var server_recv_dt : Optional[datetime.datetime]
var station_id : str
var station_uuid : str

Static methods

def from_api_1000(packet: WrappedRedvoxPacketM) ‑> StationStat

Extracts the required fields from an API 1000 packet.

:param packet: API 1000 packet to extract fields from. :return: An instance of StationStat.

Expand source code
@staticmethod
def from_api_1000(packet: "WrappedRedvoxPacketM") -> "StationStat":
    """
    Extracts the required fields from an API 1000 packet.

    :param packet: API 1000 packet to extract fields from.
    :return: An instance of StationStat.
    """
    station_info: "StationInformation" = packet.get_station_information()
    timing_info: "TimingInformation" = packet.get_timing_information()
    sensors: "Sensors" = packet.get_sensors()
    location_sensor: Optional["Location"] = sensors.get_location()
    audio_sensor: Optional["Audio"] = sensors.get_audio()

    # Optionally extract the GPS timestamps if the location sensor is available
    gps_timestamps: Optional[List[GpsDateTime]] = None
    if location_sensor is not None:
        gps_timestamps = []
        _gps_timestamps = location_sensor.get_timestamps_gps().get_timestamps()
        _gps_timestamps_len = len(_gps_timestamps)
        for i, ts in enumerate(location_sensor.get_timestamps().get_timestamps()):
            # A GPS timestamp isn't always present in the location sensor. We can handle that here.
            gps_ts: Optional[datetime] = (
                us2dt(_gps_timestamps[i])
                if (i < _gps_timestamps_len and not np.isnan(_gps_timestamps[i]))
                else None
            )
            gps_timestamps.append(GpsDateTime(us2dt(ts), gps_ts))

    best_offset = timing_info.get_best_offset()
    best_latency = timing_info.get_best_latency()
    if len(timing_info.get_synch_exchange_array()) > 0:
        tsd = TimeSync(
            time_sync_exchanges_list=timing_info.get_synch_exchange_array()
        )
        if not best_offset or not best_latency:
            best_offset = tsd.best_offset()
            best_latency = tsd.best_latency()
        best_latency_timestamp = tsd.get_best_latency_timestamp()
    else:
        best_latency_timestamp = np.nan

    return StationStat(
        station_info.get_id(),
        station_info.get_uuid(),
        us2dt(timing_info.get_app_start_mach_timestamp()),
        us2dt(timing_info.get_packet_start_mach_timestamp()),
        us2dt(timing_info.get_server_acquisition_arrival_timestamp()),
        gps_timestamps,
        best_latency,
        best_latency_timestamp,
        best_offset,
        _map_opt(audio_sensor, lambda sensor: sensor.get_sample_rate()),
        packet.get_packet_duration(),
    )
def from_api_900(packet: WrappedRedvoxPacket) ‑> StationStat

Extracts the required fields from an API 900 packet.

:param packet: API 900 packet to extract fields from. :return: An instance of StationStat.

Expand source code
@staticmethod
def from_api_900(packet: "WrappedRedvoxPacket") -> "StationStat":
    """
    Extracts the required fields from an API 900 packet.

    :param packet: API 900 packet to extract fields from.
    :return: An instance of StationStat.
    """
    mtz: Optional[float] = packet.mach_time_zero()

    best_offset = packet.best_offset()
    best_latency = packet.best_latency()
    if packet.has_time_synchronization_sensor():
        tsd = TimeSync(
            time_sync_exchanges_list=list(
                packet.time_synchronization_sensor().payload_values()
            )
        )
        if not best_offset or not best_latency:
            best_offset = tsd.best_offset()
            best_latency = tsd.best_latency()
        best_latency_timestamp = tsd.get_best_latency_timestamp()
    else:
        best_latency_timestamp = np.nan

    # noinspection Mypy
    return StationStat(
        packet.redvox_id(),
        packet.uuid(),
        _map_opt(mtz, us2dt),
        us2dt(packet.app_file_start_timestamp_machine()),
        us2dt(packet.server_timestamp_epoch_microseconds_utc()),
        None,
        best_latency,
        best_latency_timestamp,
        best_offset,
        packet.microphone_sensor().sample_rate_hz()
        if packet.has_microphone_sensor()
        else np.nan,
        timedelta(seconds=packet.duration_s())
        if packet.has_microphone_sensor()
        else 0.0,
    )
def from_native(native) ‑> StationStat
Expand source code
@staticmethod
def from_native(native) -> "StationStat":
    return StationStat(
        native.station_id,
        native.station_uuid,
        _map_opt_numeric(us2dt, native.app_start_dt),
        _map_opt_numeric(us2dt, native.packet_start_dt),
        _map_opt_numeric(us2dt, native.server_recv_dt),
        None,
        native.latency,
        native.best_latency_timestamp,
        native.offset,
        native.sample_rate_hz,
        _map_opt_numeric(dur2td, native.packet_duration),
    )