Module redvox.common.sensor_reader_utils

This module loads sensor data from Redvox packets

Expand source code
"""
This module loads sensor data from Redvox packets
"""

from typing import Callable, Dict, List, Optional, Tuple, Union

import numpy as np

# noinspection Mypy
import pyarrow as pa

import redvox.api1000.proto.redvox_api_m_pb2 as api_m
from redvox.common import date_time_utils as dtu
from redvox.common.sensor_data import SensorData, SensorType


# Dataframe column definitions
COMPRESSED_AUDIO_COLUMNS: List[str] = [
    "timestamps",
    "unaltered_timestamps",
    "compressed_audio",
    "audio_codec",
]

IMAGE_COLUMNS: List[str] = [
    "timestamps",
    "unaltered_timestamps",
    "image",
    "image_codec",
]

LOCATION_COLUMNS: List[str] = [
    "timestamps",
    "unaltered_timestamps",
    "gps_timestamps",
    "latitude",
    "longitude",
    "altitude",
    "speed",
    "bearing",
    "horizontal_accuracy",
    "vertical_accuracy",
    "speed_accuracy",
    "bearing_accuracy",
    "location_provider",
]

STATION_HEALTH_COLUMNS: List[str] = [
    "timestamps",
    "unaltered_timestamps",
    "battery_charge_remaining",
    "battery_current_strength",
    "internal_temp_c",
    "network_type",
    "network_strength",
    "power_state",
    "avail_ram",
    "avail_disk",
    "cell_service",
    "cpu_utilization",
    "wifi_wake_lock",
    "screen_state",
    "screen_brightness",
]

# These are used for checking if a field is present or not
__ACCELEROMETER_FIELD_NAME: str = "accelerometer"
__AMBIENT_TEMPERATURE_FIELD_NAME: str = "ambient_temperature"
__AUDIO_FIELD_NAME: str = "audio"
__COMPRESSED_AUDIO_FIELD_NAME: str = "compressed_audio"
__GRAVITY_FIELD_NAME: str = "gravity"
__GYROSCOPE_FIELD_NAME: str = "gyroscope"
__IMAGE_FIELD_NAME: str = "image"
__LIGHT_FIELD_NAME: str = "light"
__LINEAR_ACCELERATION_FIELD_NAME: str = "linear_acceleration"
__LOCATION_FIELD_NAME: str = "location"
__MAGNETOMETER_FIELD_NAME: str = "magnetometer"
__ORIENTATION_FIELD_NAME: str = "orientation"
__PRESSURE_FIELD_NAME: str = "pressure"
__PROXIMITY_FIELD_NAME: str = "proximity"
__RELATIVE_HUMIDITY_FIELD_NAME: str = "relative_humidity"
__ROTATION_VECTOR_FIELD_NAME: str = "rotation_vector"
__VELOCITY_FIELD_NAME: str = "velocity"

__SENSOR_TYPE_TO_FIELD_NAME: Dict[SensorType, str] = {
    SensorType.UNKNOWN_SENSOR: "unknown",
    SensorType.STATION_HEALTH: "unknown",
    SensorType.ACCELEROMETER: __ACCELEROMETER_FIELD_NAME,
    SensorType.AMBIENT_TEMPERATURE: __AMBIENT_TEMPERATURE_FIELD_NAME,
    SensorType.AUDIO: __AUDIO_FIELD_NAME,
    SensorType.COMPRESSED_AUDIO: __COMPRESSED_AUDIO_FIELD_NAME,
    SensorType.GRAVITY: __GRAVITY_FIELD_NAME,
    SensorType.GYROSCOPE: __GYROSCOPE_FIELD_NAME,
    SensorType.IMAGE: __IMAGE_FIELD_NAME,
    SensorType.LIGHT: __LIGHT_FIELD_NAME,
    SensorType.LINEAR_ACCELERATION: __LINEAR_ACCELERATION_FIELD_NAME,
    SensorType.LOCATION: __LOCATION_FIELD_NAME,
    SensorType.BEST_LOCATION: __LOCATION_FIELD_NAME,
    SensorType.MAGNETOMETER: __MAGNETOMETER_FIELD_NAME,
    SensorType.ORIENTATION: __ORIENTATION_FIELD_NAME,
    SensorType.PRESSURE: __PRESSURE_FIELD_NAME,
    SensorType.PROXIMITY: __PROXIMITY_FIELD_NAME,
    SensorType.RELATIVE_HUMIDITY: __RELATIVE_HUMIDITY_FIELD_NAME,
    SensorType.ROTATION_VECTOR: __ROTATION_VECTOR_FIELD_NAME,
    SensorType.INFRARED: __PROXIMITY_FIELD_NAME,
    SensorType.VELOCITY: __VELOCITY_FIELD_NAME,
}

Sensor = Union[
    api_m.RedvoxPacketM.Sensors.Xyz,
    api_m.RedvoxPacketM.Sensors.Single,
    api_m.RedvoxPacketM.Sensors.Audio,
    api_m.RedvoxPacketM.Sensors.Image,
    api_m.RedvoxPacketM.Sensors.Location,
    api_m.RedvoxPacketM.Sensors.CompressedAudio,
]

# Maps a sensor type to a function that can extract that sensor for a particular packet.
__SENSOR_TYPE_TO_SENSOR_FN: Dict[
    SensorType,
    Optional[
        Callable[
            [api_m.RedvoxPacketM],
            Sensor,
        ]
    ],
] = {
    SensorType.UNKNOWN_SENSOR: None,
    SensorType.STATION_HEALTH: None,
    SensorType.ACCELEROMETER: lambda packet: packet.sensors.accelerometer,
    SensorType.AMBIENT_TEMPERATURE: lambda packet: packet.sensors.ambient_temperature,
    SensorType.AUDIO: lambda packet: packet.sensors.audio,
    SensorType.COMPRESSED_AUDIO: lambda packet: packet.sensors.compressed_audio,
    SensorType.GRAVITY: lambda packet: packet.sensors.gravity,
    SensorType.GYROSCOPE: lambda packet: packet.sensors.gyroscope,
    SensorType.IMAGE: lambda packet: packet.sensors.image,
    SensorType.LIGHT: lambda packet: packet.sensors.light,
    SensorType.LINEAR_ACCELERATION: lambda packet: packet.sensors.linear_acceleration,
    SensorType.LOCATION: lambda packet: packet.sensors.location,
    SensorType.BEST_LOCATION: lambda packet: packet.sensors.location,
    SensorType.MAGNETOMETER: lambda packet: packet.sensors.magnetometer,
    SensorType.ORIENTATION: lambda packet: packet.sensors.orientation,
    SensorType.PRESSURE: lambda packet: packet.sensors.pressure,
    SensorType.PROXIMITY: lambda packet: packet.sensors.proximity,
    SensorType.RELATIVE_HUMIDITY: lambda packet: packet.sensors.relative_humidity,
    SensorType.ROTATION_VECTOR: lambda packet: packet.sensors.rotation_vector,
    SensorType.INFRARED: lambda packet: packet.sensors.proximity,
    SensorType.VELOCITY: lambda packet: packet.sensors.velocity,
}


def __has_sensor(data: Union[api_m.RedvoxPacketM, api_m.RedvoxPacketM.Sensors], field_name: str) -> bool:
    """
    :param data: Either a packet or a packet's sensors message.
    :param field_name: The name of the sensor being checked.
    :return: True if the sensor exists, False otherwise.
    """
    if isinstance(data, api_m.RedvoxPacketM):
        # noinspection Mypy,PyTypeChecker
        return data.sensors.HasField(field_name)

    if isinstance(data, api_m.RedvoxPacketM.Sensors):
        # noinspection Mypy,PyTypeChecker
        return data.HasField(field_name)

    return False


def __packet_duration_s(packet: api_m.RedvoxPacketM) -> float:
    """
    :param packet: The packet to calculate the duration for.
    :return: The packet duration in seconds.
    """
    return len(packet.sensors.audio.samples.values) / packet.sensors.audio.sample_rate


def __packet_duration_us(packet: api_m.RedvoxPacketM) -> float:
    """
    :param packet: The packet to calculate the duration for.
    :return: The packet duration in microseconds.
    """
    return __packet_duration_s(packet) * 1_000_000.0


def __stats_for_sensor_per_packet_per_second(
    num_packets: int, packet_dur_s: float, timestamps: np.array
) -> Tuple[float, float, float]:
    """
    Sensor being evaluated must either have 1/packet or 1/second sample rate

    :param num_packets: number of packets to calculate stats for
    :param packet_dur_s: duration of packet in seconds
    :param timestamps: timestamps of the data
    :return: sample rate in hz, sample interval in seconds, and sample interval std deviation
    """
    if len(timestamps) != num_packets:
        sample_rate = 1.0
    else:
        sample_rate = 1 / packet_dur_s
    sample_interval = 1 / sample_rate
    sample_interval_std = (
        dtu.microseconds_to_seconds(float(np.std(np.diff(timestamps)))) if len(timestamps) > 1 else np.nan
    )
    return sample_rate, sample_interval, sample_interval_std


def get_empty_sensor(name: str, sensor_type: SensorType = SensorType.UNKNOWN_SENSOR) -> SensorData:
    """
    :param name: name of the sensor
    :param sensor_type: type of the sensor to create, default SensorType.UNKNOWN_SENSOR
    :return: empty sensor with no data
    """
    return SensorData(name, pa.Table.from_pydict({"timestamps": []}), sensor_type)


def get_sensor_description(sensor: Sensor) -> str:
    """
    :param sensor: the sensor to read the description from
    :return: the sensor's description
    """
    return sensor.sensor_description


def get_sample_statistics(data_df: pa.Table) -> Tuple[float, float, float]:
    """
    calculate the sample rate, interval and interval std dev using the timestamps in the dataframe

    :param data_df: the dataframe containing timestamps to calculate statistics from
    :return: a Tuple containing the sample rate, interval and interval std dev
    """
    sample_interval: float
    sample_interval_std: float
    timestamps: np.array = data_df["timestamps"].to_numpy()
    if timestamps.size > 1:
        sample_interval = dtu.microseconds_to_seconds(float(np.mean(np.diff(timestamps))))
        sample_interval_std = dtu.microseconds_to_seconds(float(np.std(np.diff(timestamps))))
    else:
        sample_interval = np.nan
        sample_interval_std = np.nan
    return 1.0 / sample_interval, sample_interval, sample_interval_std


def read_apim_xyz_sensor(sensor: api_m.RedvoxPacketM.Sensors.Xyz, column_id: str) -> pa.Table:
    """
    read a sensor that has xyz data channels from an api M data packet.
    raises Attribute Error if sensor does not contain xyz channels

    :param sensor: the xyz api M sensor to read
    :param column_id: string, used to name the columns
    :return: dictionary representing the data in the sensor
    """
    timestamps: np.ndarray = np.array(sensor.timestamps.timestamps)
    try:
        columns: List[str] = [
            "timestamps",
            "unaltered_timestamps",
            f"{column_id}_x",
            f"{column_id}_y",
            f"{column_id}_z",
        ]
        return pa.Table.from_pydict(
            dict(
                zip(
                    columns,
                    [
                        timestamps,
                        timestamps,
                        np.array(sensor.x_samples.values),
                        np.array(sensor.y_samples.values),
                        np.array(sensor.z_samples.values),
                    ],
                )
            )
        )
    except AttributeError:
        raise


def read_apim_single_sensor(sensor: api_m.RedvoxPacketM.Sensors.Single, column_id: str) -> pa.Table:
    """
    read a sensor that has a single data channel from an api M data packet.
    raises Attribute Error if sensor does not contain exactly one data channel

    :param sensor: the single channel api M sensor to read
    :param column_id: string, used to name the columns
    :return: pyarrow table representing the data in the sensor
    """
    timestamps: np.ndarray = np.array(sensor.timestamps.timestamps)
    try:
        columns: List[str] = ["timestamps", "unaltered_timestamps", column_id]
        return pa.Table.from_pydict(dict(zip(columns, [timestamps, timestamps, np.array(sensor.samples.values)])))
    except AttributeError:
        raise


def apim_compressed_audio_to_pyarrow(comp_audio: api_m.RedvoxPacketM.Sensors.CompressedAudio) -> pa.Table:
    """
    :param comp_audio: compressed audio sensor to convert
    :return: pyarrow table representation of compressed audio data
    """
    return pa.Table.from_pydict(
        dict(
            zip(
                COMPRESSED_AUDIO_COLUMNS,
                [
                    comp_audio.first_sample_timestamp,
                    comp_audio.first_sample_timestamp,
                    np.array(list(comp_audio.audio_bytes)),
                    comp_audio.audio_codec,
                ],
            )
        )
    )


def apim_image_to_pyarrow(image_sensor: api_m.RedvoxPacketM.Sensors.Image) -> pa.Table:
    """
    :param image_sensor: image sensor to convert
    :return: pyarrow table representation of image data
    """
    timestamps = image_sensor.timestamps.timestamps
    codecs = np.full(len(timestamps), image_sensor.image_codec)
    return pa.Table.from_pydict(dict(zip(IMAGE_COLUMNS, [timestamps, timestamps, image_sensor.samples, codecs])))


def apim_best_location_to_pyarrow(
    best_loc: api_m.RedvoxPacketM.Sensors.Location.BestLocation, packet_start_timestamp: float
) -> pa.Table:
    """
    :param best_loc: best location to convert to pyarrow table
    :param packet_start_timestamp: timestamp of packet's first sample
    :return: pyarrow table representation of the best location data
    """
    return pa.Table.from_pydict(
        dict(
            zip(
                LOCATION_COLUMNS,
                [
                    [packet_start_timestamp],
                    [best_loc.latitude_longitude_timestamp.mach],
                    [best_loc.latitude_longitude_timestamp.gps],
                    [best_loc.latitude],
                    [best_loc.longitude],
                    [best_loc.altitude],
                    [best_loc.speed],
                    [best_loc.bearing],
                    [best_loc.horizontal_accuracy],
                    [best_loc.vertical_accuracy],
                    [best_loc.speed_accuracy],
                    [best_loc.bearing_accuracy],
                    [best_loc.location_provider],
                ],
            )
        )
    )


def apim_location_to_pyarrow(loc: api_m.RedvoxPacketM.Sensors.Location) -> pa.Table:
    """
    :param loc: location sensor to convert
    :return: pyarrow table representation of location data
    """
    timestamps = loc.timestamps.timestamps
    gps_timestamps = loc.timestamps_gps.timestamps
    lat_samples = loc.latitude_samples.values
    lon_samples = loc.longitude_samples.values
    alt_samples = loc.altitude_samples.values
    spd_samples = loc.speed_samples.values
    bear_samples = loc.bearing_samples.values
    hor_acc_samples = loc.horizontal_accuracy_samples.values
    vert_acc_samples = loc.vertical_accuracy_samples.values
    spd_acc_samples = loc.speed_accuracy_samples.values
    bear_acc_samples = loc.bearing_accuracy_samples.values
    loc_prov_samples = loc.location_providers
    data_for_df = [[], [], [], [], [], [], [], [], [], [], [], [], []]
    for i in range(len(timestamps)):
        data_for_df[0].append(timestamps[i])
        data_for_df[1].append(timestamps[i])
        data_for_df[2].append(np.nan if len(gps_timestamps) <= i else gps_timestamps[i])
        data_for_df[3].append(lat_samples[i])
        data_for_df[4].append(lon_samples[i])
        data_for_df[5].append(np.nan if len(alt_samples) <= i else alt_samples[i])
        data_for_df[6].append(np.nan if len(spd_samples) <= i else spd_samples[i])
        data_for_df[7].append(np.nan if len(bear_samples) <= i else bear_samples[i])
        data_for_df[8].append(np.nan if len(hor_acc_samples) <= i else hor_acc_samples[i])
        data_for_df[9].append(np.nan if len(vert_acc_samples) <= i else vert_acc_samples[i])
        data_for_df[10].append(np.nan if len(spd_acc_samples) <= i else spd_acc_samples[i])
        data_for_df[11].append(np.nan if len(bear_acc_samples) <= i else bear_acc_samples[i])
        data_for_df[12].append(
            api_m.RedvoxPacketM.Sensors.Location.LocationProvider.UNKNOWN
            if len(loc_prov_samples) <= i
            else loc_prov_samples[i]
        )
    return pa.Table.from_pydict(dict(zip(LOCATION_COLUMNS, data_for_df)))


def apim_health_to_pyarrow(metrics: api_m.RedvoxPacketM.StationInformation.StationMetrics) -> pa.Table:
    """
    :param metrics: station metrics to convert
    :return: pyarrow table representation of station metrics data
    """
    timestamps = metrics.timestamps.timestamps
    bat_samples = metrics.battery.values
    bat_cur_samples = metrics.battery_current.values
    temp_samples = metrics.temperature.values
    net_samples = metrics.network_type
    net_str_samples = metrics.network_strength.values
    pow_samples = metrics.power_state
    avail_ram_samples = metrics.available_ram.values
    avail_disk_samples = metrics.available_disk.values
    cell_samples = metrics.cell_service_state
    cpu_util_samples = metrics.cpu_utilization.values
    wake_lock_samples = metrics.wifi_wake_lock
    screen_state_samples = metrics.screen_state
    screen_bright_samples = metrics.screen_brightness.values
    data_for_df = [], [], [], [], [], [], [], [], [], [], [], [], [], [], []
    for i in range(len(timestamps)):
        data_for_df[0].append(timestamps[i])
        data_for_df[1].append(timestamps[i])
        data_for_df[2].append(np.nan if len(bat_samples) < i + 1 else bat_samples[i])
        data_for_df[3].append(np.nan if len(bat_cur_samples) < i + 1 else bat_cur_samples[i])
        data_for_df[4].append(np.nan if len(temp_samples) < i + 1 else temp_samples[i])
        data_for_df[5].append(
            api_m.RedvoxPacketM.StationInformation.StationMetrics.NetworkType.UNKNOWN_NETWORK
            if len(net_samples) < i + 1
            else net_samples[i]
        )
        data_for_df[6].append(np.nan if len(net_str_samples) < i + 1 else net_str_samples[i])
        data_for_df[7].append(
            api_m.RedvoxPacketM.StationInformation.StationMetrics.PowerState.UNKNOWN_POWER_STATE
            if len(pow_samples) < i + 1
            else pow_samples[i]
        )
        data_for_df[8].append(np.nan if len(avail_ram_samples) < i + 1 else avail_ram_samples[i])
        data_for_df[9].append(np.nan if len(avail_disk_samples) < i + 1 else avail_disk_samples[i])
        data_for_df[10].append(
            api_m.RedvoxPacketM.StationInformation.StationMetrics.CellServiceState.UNKNOWN
            if len(cell_samples) < i + 1
            else cell_samples[i]
        )
        data_for_df[11].append(np.nan if len(cpu_util_samples) < i + 1 else cpu_util_samples[i])
        data_for_df[12].append(
            api_m.RedvoxPacketM.StationInformation.StationMetrics.WifiWakeLock.NONE
            if len(wake_lock_samples) < i + 1
            else wake_lock_samples[i]
        )
        data_for_df[13].append(
            api_m.RedvoxPacketM.StationInformation.StationMetrics.ScreenState.UNKNOWN_SCREEN_STATE
            if len(screen_state_samples) < i + 1
            else screen_state_samples[i]
        )
        data_for_df[14].append(np.nan if len(screen_bright_samples) < i + 1 else screen_bright_samples[i])
    return pa.Table.from_pydict(dict(zip(STATION_HEALTH_COLUMNS, data_for_df)))

Functions

def apim_best_location_to_pyarrow(best_loc: src.redvox_api_m.redvox_api_m_pb2.BestLocation, packet_start_timestamp: float) ‑> pyarrow.lib.Table

:param best_loc: best location to convert to pyarrow table :param packet_start_timestamp: timestamp of packet's first sample :return: pyarrow table representation of the best location data

Expand source code
def apim_best_location_to_pyarrow(
    best_loc: api_m.RedvoxPacketM.Sensors.Location.BestLocation, packet_start_timestamp: float
) -> pa.Table:
    """
    :param best_loc: best location to convert to pyarrow table
    :param packet_start_timestamp: timestamp of packet's first sample
    :return: pyarrow table representation of the best location data
    """
    return pa.Table.from_pydict(
        dict(
            zip(
                LOCATION_COLUMNS,
                [
                    [packet_start_timestamp],
                    [best_loc.latitude_longitude_timestamp.mach],
                    [best_loc.latitude_longitude_timestamp.gps],
                    [best_loc.latitude],
                    [best_loc.longitude],
                    [best_loc.altitude],
                    [best_loc.speed],
                    [best_loc.bearing],
                    [best_loc.horizontal_accuracy],
                    [best_loc.vertical_accuracy],
                    [best_loc.speed_accuracy],
                    [best_loc.bearing_accuracy],
                    [best_loc.location_provider],
                ],
            )
        )
    )
def apim_compressed_audio_to_pyarrow(comp_audio: src.redvox_api_m.redvox_api_m_pb2.CompressedAudio) ‑> pyarrow.lib.Table

:param comp_audio: compressed audio sensor to convert :return: pyarrow table representation of compressed audio data

Expand source code
def apim_compressed_audio_to_pyarrow(comp_audio: api_m.RedvoxPacketM.Sensors.CompressedAudio) -> pa.Table:
    """
    :param comp_audio: compressed audio sensor to convert
    :return: pyarrow table representation of compressed audio data
    """
    return pa.Table.from_pydict(
        dict(
            zip(
                COMPRESSED_AUDIO_COLUMNS,
                [
                    comp_audio.first_sample_timestamp,
                    comp_audio.first_sample_timestamp,
                    np.array(list(comp_audio.audio_bytes)),
                    comp_audio.audio_codec,
                ],
            )
        )
    )
def apim_health_to_pyarrow(metrics: src.redvox_api_m.redvox_api_m_pb2.StationMetrics) ‑> pyarrow.lib.Table

:param metrics: station metrics to convert :return: pyarrow table representation of station metrics data

Expand source code
def apim_health_to_pyarrow(metrics: api_m.RedvoxPacketM.StationInformation.StationMetrics) -> pa.Table:
    """
    :param metrics: station metrics to convert
    :return: pyarrow table representation of station metrics data
    """
    timestamps = metrics.timestamps.timestamps
    bat_samples = metrics.battery.values
    bat_cur_samples = metrics.battery_current.values
    temp_samples = metrics.temperature.values
    net_samples = metrics.network_type
    net_str_samples = metrics.network_strength.values
    pow_samples = metrics.power_state
    avail_ram_samples = metrics.available_ram.values
    avail_disk_samples = metrics.available_disk.values
    cell_samples = metrics.cell_service_state
    cpu_util_samples = metrics.cpu_utilization.values
    wake_lock_samples = metrics.wifi_wake_lock
    screen_state_samples = metrics.screen_state
    screen_bright_samples = metrics.screen_brightness.values
    data_for_df = [], [], [], [], [], [], [], [], [], [], [], [], [], [], []
    for i in range(len(timestamps)):
        data_for_df[0].append(timestamps[i])
        data_for_df[1].append(timestamps[i])
        data_for_df[2].append(np.nan if len(bat_samples) < i + 1 else bat_samples[i])
        data_for_df[3].append(np.nan if len(bat_cur_samples) < i + 1 else bat_cur_samples[i])
        data_for_df[4].append(np.nan if len(temp_samples) < i + 1 else temp_samples[i])
        data_for_df[5].append(
            api_m.RedvoxPacketM.StationInformation.StationMetrics.NetworkType.UNKNOWN_NETWORK
            if len(net_samples) < i + 1
            else net_samples[i]
        )
        data_for_df[6].append(np.nan if len(net_str_samples) < i + 1 else net_str_samples[i])
        data_for_df[7].append(
            api_m.RedvoxPacketM.StationInformation.StationMetrics.PowerState.UNKNOWN_POWER_STATE
            if len(pow_samples) < i + 1
            else pow_samples[i]
        )
        data_for_df[8].append(np.nan if len(avail_ram_samples) < i + 1 else avail_ram_samples[i])
        data_for_df[9].append(np.nan if len(avail_disk_samples) < i + 1 else avail_disk_samples[i])
        data_for_df[10].append(
            api_m.RedvoxPacketM.StationInformation.StationMetrics.CellServiceState.UNKNOWN
            if len(cell_samples) < i + 1
            else cell_samples[i]
        )
        data_for_df[11].append(np.nan if len(cpu_util_samples) < i + 1 else cpu_util_samples[i])
        data_for_df[12].append(
            api_m.RedvoxPacketM.StationInformation.StationMetrics.WifiWakeLock.NONE
            if len(wake_lock_samples) < i + 1
            else wake_lock_samples[i]
        )
        data_for_df[13].append(
            api_m.RedvoxPacketM.StationInformation.StationMetrics.ScreenState.UNKNOWN_SCREEN_STATE
            if len(screen_state_samples) < i + 1
            else screen_state_samples[i]
        )
        data_for_df[14].append(np.nan if len(screen_bright_samples) < i + 1 else screen_bright_samples[i])
    return pa.Table.from_pydict(dict(zip(STATION_HEALTH_COLUMNS, data_for_df)))
def apim_image_to_pyarrow(image_sensor: src.redvox_api_m.redvox_api_m_pb2.Image) ‑> pyarrow.lib.Table

:param image_sensor: image sensor to convert :return: pyarrow table representation of image data

Expand source code
def apim_image_to_pyarrow(image_sensor: api_m.RedvoxPacketM.Sensors.Image) -> pa.Table:
    """
    :param image_sensor: image sensor to convert
    :return: pyarrow table representation of image data
    """
    timestamps = image_sensor.timestamps.timestamps
    codecs = np.full(len(timestamps), image_sensor.image_codec)
    return pa.Table.from_pydict(dict(zip(IMAGE_COLUMNS, [timestamps, timestamps, image_sensor.samples, codecs])))
def apim_location_to_pyarrow(loc: src.redvox_api_m.redvox_api_m_pb2.Location) ‑> pyarrow.lib.Table

:param loc: location sensor to convert :return: pyarrow table representation of location data

Expand source code
def apim_location_to_pyarrow(loc: api_m.RedvoxPacketM.Sensors.Location) -> pa.Table:
    """
    :param loc: location sensor to convert
    :return: pyarrow table representation of location data
    """
    timestamps = loc.timestamps.timestamps
    gps_timestamps = loc.timestamps_gps.timestamps
    lat_samples = loc.latitude_samples.values
    lon_samples = loc.longitude_samples.values
    alt_samples = loc.altitude_samples.values
    spd_samples = loc.speed_samples.values
    bear_samples = loc.bearing_samples.values
    hor_acc_samples = loc.horizontal_accuracy_samples.values
    vert_acc_samples = loc.vertical_accuracy_samples.values
    spd_acc_samples = loc.speed_accuracy_samples.values
    bear_acc_samples = loc.bearing_accuracy_samples.values
    loc_prov_samples = loc.location_providers
    data_for_df = [[], [], [], [], [], [], [], [], [], [], [], [], []]
    for i in range(len(timestamps)):
        data_for_df[0].append(timestamps[i])
        data_for_df[1].append(timestamps[i])
        data_for_df[2].append(np.nan if len(gps_timestamps) <= i else gps_timestamps[i])
        data_for_df[3].append(lat_samples[i])
        data_for_df[4].append(lon_samples[i])
        data_for_df[5].append(np.nan if len(alt_samples) <= i else alt_samples[i])
        data_for_df[6].append(np.nan if len(spd_samples) <= i else spd_samples[i])
        data_for_df[7].append(np.nan if len(bear_samples) <= i else bear_samples[i])
        data_for_df[8].append(np.nan if len(hor_acc_samples) <= i else hor_acc_samples[i])
        data_for_df[9].append(np.nan if len(vert_acc_samples) <= i else vert_acc_samples[i])
        data_for_df[10].append(np.nan if len(spd_acc_samples) <= i else spd_acc_samples[i])
        data_for_df[11].append(np.nan if len(bear_acc_samples) <= i else bear_acc_samples[i])
        data_for_df[12].append(
            api_m.RedvoxPacketM.Sensors.Location.LocationProvider.UNKNOWN
            if len(loc_prov_samples) <= i
            else loc_prov_samples[i]
        )
    return pa.Table.from_pydict(dict(zip(LOCATION_COLUMNS, data_for_df)))
def get_empty_sensor(name: str, sensor_type: SensorType = SensorType.UNKNOWN_SENSOR) ‑> SensorData

:param name: name of the sensor :param sensor_type: type of the sensor to create, default SensorType.UNKNOWN_SENSOR :return: empty sensor with no data

Expand source code
def get_empty_sensor(name: str, sensor_type: SensorType = SensorType.UNKNOWN_SENSOR) -> SensorData:
    """
    :param name: name of the sensor
    :param sensor_type: type of the sensor to create, default SensorType.UNKNOWN_SENSOR
    :return: empty sensor with no data
    """
    return SensorData(name, pa.Table.from_pydict({"timestamps": []}), sensor_type)
def get_sample_statistics(data_df: pyarrow.lib.Table) ‑> Tuple[float, float, float]

calculate the sample rate, interval and interval std dev using the timestamps in the dataframe

:param data_df: the dataframe containing timestamps to calculate statistics from :return: a Tuple containing the sample rate, interval and interval std dev

Expand source code
def get_sample_statistics(data_df: pa.Table) -> Tuple[float, float, float]:
    """
    calculate the sample rate, interval and interval std dev using the timestamps in the dataframe

    :param data_df: the dataframe containing timestamps to calculate statistics from
    :return: a Tuple containing the sample rate, interval and interval std dev
    """
    sample_interval: float
    sample_interval_std: float
    timestamps: np.array = data_df["timestamps"].to_numpy()
    if timestamps.size > 1:
        sample_interval = dtu.microseconds_to_seconds(float(np.mean(np.diff(timestamps))))
        sample_interval_std = dtu.microseconds_to_seconds(float(np.std(np.diff(timestamps))))
    else:
        sample_interval = np.nan
        sample_interval_std = np.nan
    return 1.0 / sample_interval, sample_interval, sample_interval_std
def get_sensor_description(sensor: Union[src.redvox_api_m.redvox_api_m_pb2.Xyz, src.redvox_api_m.redvox_api_m_pb2.Single, src.redvox_api_m.redvox_api_m_pb2.Audio, src.redvox_api_m.redvox_api_m_pb2.Image, src.redvox_api_m.redvox_api_m_pb2.Location, src.redvox_api_m.redvox_api_m_pb2.CompressedAudio]) ‑> str

:param sensor: the sensor to read the description from :return: the sensor's description

Expand source code
def get_sensor_description(sensor: Sensor) -> str:
    """
    :param sensor: the sensor to read the description from
    :return: the sensor's description
    """
    return sensor.sensor_description
def read_apim_single_sensor(sensor: src.redvox_api_m.redvox_api_m_pb2.Single, column_id: str) ‑> pyarrow.lib.Table

read a sensor that has a single data channel from an api M data packet. raises Attribute Error if sensor does not contain exactly one data channel

:param sensor: the single channel api M sensor to read :param column_id: string, used to name the columns :return: pyarrow table representing the data in the sensor

Expand source code
def read_apim_single_sensor(sensor: api_m.RedvoxPacketM.Sensors.Single, column_id: str) -> pa.Table:
    """
    read a sensor that has a single data channel from an api M data packet.
    raises Attribute Error if sensor does not contain exactly one data channel

    :param sensor: the single channel api M sensor to read
    :param column_id: string, used to name the columns
    :return: pyarrow table representing the data in the sensor
    """
    timestamps: np.ndarray = np.array(sensor.timestamps.timestamps)
    try:
        columns: List[str] = ["timestamps", "unaltered_timestamps", column_id]
        return pa.Table.from_pydict(dict(zip(columns, [timestamps, timestamps, np.array(sensor.samples.values)])))
    except AttributeError:
        raise
def read_apim_xyz_sensor(sensor: src.redvox_api_m.redvox_api_m_pb2.Xyz, column_id: str) ‑> pyarrow.lib.Table

read a sensor that has xyz data channels from an api M data packet. raises Attribute Error if sensor does not contain xyz channels

:param sensor: the xyz api M sensor to read :param column_id: string, used to name the columns :return: dictionary representing the data in the sensor

Expand source code
def read_apim_xyz_sensor(sensor: api_m.RedvoxPacketM.Sensors.Xyz, column_id: str) -> pa.Table:
    """
    read a sensor that has xyz data channels from an api M data packet.
    raises Attribute Error if sensor does not contain xyz channels

    :param sensor: the xyz api M sensor to read
    :param column_id: string, used to name the columns
    :return: dictionary representing the data in the sensor
    """
    timestamps: np.ndarray = np.array(sensor.timestamps.timestamps)
    try:
        columns: List[str] = [
            "timestamps",
            "unaltered_timestamps",
            f"{column_id}_x",
            f"{column_id}_y",
            f"{column_id}_z",
        ]
        return pa.Table.from_pydict(
            dict(
                zip(
                    columns,
                    [
                        timestamps,
                        timestamps,
                        np.array(sensor.x_samples.values),
                        np.array(sensor.y_samples.values),
                        np.array(sensor.z_samples.values),
                    ],
                )
            )
        )
    except AttributeError:
        raise