Module redvox.common.api_conversions

Provides functionality for converting between API versions.

Expand source code
"""
Provides functionality for converting between API versions.
"""

# TODO: Document
# TODO: Test
# TODO: Implement (de)normalization now that dynamic range in known in API M (how does this fork API 900 to API M?)
# TODO: Update any method that sets an enum to accent Union[Enum, int] to fix mypy type errors involving that
# TODO: Breakup conversion into smaller, more testable functions
# TODO: Rework location sensor conversions
# TODO: Add functions for converting compressed audio... in fact, this might make the most sent from converting API 900
# TODO:   into API M data since FLAC requires integers
from typing import List, Optional, Dict, Union

import numpy as np

import redvox.api1000.common.common as common_m
import redvox.api1000.proto.redvox_api_m_pb2 as api_m
import redvox.common.date_time_utils as dt_utls
import redvox.api900.lib.api900_pb2 as api_900
import redvox.api900.reader as reader_900
from redvox.api1000.wrapped_redvox_packet.sensors.sensors import Sensors
from redvox.api1000.wrapped_redvox_packet.sensors.location import LocationProvider
from redvox.api1000.wrapped_redvox_packet.station_information import (
    OsType,
    StationInformation,
    StationMetrics,
    AudioSamplingRate,
)
from redvox.api1000.wrapped_redvox_packet.timing_information import SynchExchange
from redvox.api1000.wrapped_redvox_packet.wrapped_packet import WrappedRedvoxPacketM
import redvox
import redvox.api900.reader_utils as reader_utils

_NORMALIZATION_CONSTANT: int = 0x7FFFFF
NAN: float = float("nan")


def _normalize_audio_count(count: int, normalize_by: Optional[float] = None) -> float:
    norm: float = normalize_by if normalize_by is not None else _NORMALIZATION_CONSTANT
    return float(count) / float(norm)


def _denormalize_audio_count(norm: float) -> int:
    return int(round(norm * float(_NORMALIZATION_CONSTANT)))


def _migrate_synch_exchanges_900_to_1000_raw(
    synch_exchanges: np.ndarray,
) -> List[api_m.RedvoxPacketM.TimingInformation.SynchExchange]:
    exchanges: List[api_m.RedvoxPacketM.TimingInformation.SynchExchange] = []

    for i in range(0, len(synch_exchanges), 6):
        exchange: api_m.RedvoxPacketM.TimingInformation.SynchExchange = (
            api_m.RedvoxPacketM.TimingInformation.SynchExchange()
        )
        exchange.a1 = float(synch_exchanges[i])
        exchange.a2 = float(synch_exchanges[i + 1])
        exchange.a3 = float(synch_exchanges[i + 2])
        exchange.b1 = float(synch_exchanges[i + 3])
        exchange.b2 = float(synch_exchanges[i + 4])
        exchange.b3 = float(synch_exchanges[i + 5])
        exchange.unit = api_m.RedvoxPacketM.Unit.MICROSECONDS_SINCE_UNIX_EPOCH

        exchanges.append(exchange)

    return exchanges


def _migrate_synch_exchanges_900_to_1000(
    synch_exchanges: np.ndarray,
) -> List[SynchExchange]:
    exchanges: List[SynchExchange] = []

    for i in range(0, len(synch_exchanges), 6):
        exchange: SynchExchange = SynchExchange.new()
        exchange.set_a1(float(synch_exchanges[i]))
        exchange.set_a2(float(synch_exchanges[i + 1]))
        exchange.set_a3(float(synch_exchanges[i + 2]))
        exchange.set_b1(float(synch_exchanges[i + 3]))
        exchange.set_b2(float(synch_exchanges[i + 4]))
        exchange.set_b3(float(synch_exchanges[i + 5]))
        exchanges.append(exchange)

    return exchanges


def _find_mach_time_zero_raw(packet: api_900.RedvoxPacket) -> int:
    """
    find the mach time zero in api 900 packets

    :param packet: api 900 redvox packet to read
    :return: mach time zero of the api 900 packet or -1 if it doesn't exist
    """
    key: str = "machTimeZero"
    mtz: Optional[int] = reader_utils.get_metadata_raw(list(packet.metadata), key, int)
    if mtz is not None:
        return mtz

    location_sensor: Optional[
        api_900.UnevenlySampledChannel
    ] = reader_utils.find_uneven_channel_raw(
        packet,
        {
            api_900.ChannelType.LATITUDE,
            api_900.ChannelType.LONGITUDE,
            api_900.ChannelType.ALTITUDE,
            api_900.ChannelType.SPEED,
        },
    )
    if location_sensor is not None:
        mtz = reader_utils.get_metadata_raw(
            list(location_sensor.metadata), key, int
        )
        if mtz is not None:
            return mtz

    return -1


def _find_mach_time_zero(packet: reader_900.WrappedRedvoxPacket) -> int:
    if "machTimeZero" in packet.metadata_as_dict():
        return int(packet.metadata_as_dict()["machTimeZero"])

    location_sensor: Optional[reader_900.LocationSensor] = packet.location_sensor()
    if location_sensor is not None:
        if "machTimeZero" in location_sensor.metadata_as_dict():
            return int(location_sensor.metadata_as_dict()["machTimeZero"])

    return -1


def _packet_length_microseconds_900(packet: reader_900.WrappedRedvoxPacket) -> int:
    microphone_sensor: Optional[
        reader_900.MicrophoneSensor
    ] = packet.microphone_sensor()

    if microphone_sensor is not None:
        sample_rate_hz: float = microphone_sensor.sample_rate_hz()
        total_samples: int = len(microphone_sensor.payload_values())
        length_seconds: float = float(total_samples) / sample_rate_hz
        return round(dt_utls.seconds_to_microseconds(length_seconds))

    return 0


def _packet_length_microseconds_900_raw(packet: api_900.RedvoxPacket) -> int:
    if len(packet.evenly_sampled_channels) == 0:
        return 0

    microphone_sensor: api_900.EvenlySampledChannel = packet.evenly_sampled_channels[0]

    sample_rate_hz: float = microphone_sensor.sample_rate_hz
    total_samples: int = reader_utils.payload_len(microphone_sensor)
    length_seconds: float = float(total_samples) / sample_rate_hz
    return round(dt_utls.seconds_to_microseconds(length_seconds))


# noinspection PyTypeChecker
# pylint: disable=C0103
def _migrate_os_type_900_to_1000(os: str) -> OsType:
    os_lower: str = os.lower()
    if os_lower == "android":
        return OsType.ANDROID

    if os_lower == "ios":
        return OsType.IOS

    return OsType.UNKNOWN_OS


def _migrate_os_type_900_to_1000_raw(
    os: str,
):
    os_lower: str = os.lower()
    if os_lower == "android":
        return api_m.RedvoxPacketM.StationInformation.OsType.ANDROID

    if os_lower == "ios":
        return api_m.RedvoxPacketM.StationInformation.OsType.IOS

    return api_m.RedvoxPacketM.StationInformation.OsType.UNKNOWN_OS


# pylint: disable=C0103
def _migrate_os_type_1000_to_900(os: OsType) -> str:
    if os == OsType.ANDROID:
        return "Android"
    elif os == OsType.IOS:
        return "iOS"
    else:
        print(f"API 900 unsupported OsType: {os.name}")
        return os.name


def compute_stats_raw(
    has_stats: Union[
        api_m.RedvoxPacketM.TimingPayload,
        api_m.RedvoxPacketM.SamplePayload,
        api_m.RedvoxPacketM.DoubleSamplePayload,
    ]
):
    values: np.ndarray
    stats_container: api_m.RedvoxPacketM.SummaryStatistics
    if isinstance(has_stats, api_m.RedvoxPacketM.TimingPayload):
        values = np.array(has_stats.timestamps)
        stats_container = has_stats.timestamp_statistics
        mean_sr: float
        std_sr: float
        (mean_sr, std_sr) = common_m.sampling_rate_statistics(values)
        has_stats.mean_sample_rate = mean_sr
        has_stats.stdev_sample_rate = std_sr
    else:
        values = np.array(has_stats.values)
        stats_container = has_stats.value_statistics

    stats_container.count = len(values)
    # noinspection Mypy
    stats_container.min = values.min()
    # noinspection Mypy
    stats_container.max = values.max()
    # noinspection Mypy
    stats_container.mean = values.mean()
    # noinspection Mypy
    stats_container.standard_deviation = values.std()


# noinspection DuplicatedCode
def convert_api_900_to_1000_raw(packet: api_900.RedvoxPacket) -> api_m.RedvoxPacketM:
    """
    Converts a wrapped API 900 packet into a wrapped API M packet.

    :param packet: API 900 packet to convert.
    :return: A wrapped API M packet.
    """
    packet_m: api_m.RedvoxPacketM = api_m.RedvoxPacketM()

    # Top-level metadata
    packet_m.api = 1000.0
    # noinspection PyUnresolvedReferences,Mypy
    packet_m.sub_api = 900.0

    # Station information
    packet_m.station_information.id = packet.redvox_id
    packet_m.station_information.uuid = packet.uuid
    packet_m.station_information.make = packet.device_make
    packet_m.station_information.model = packet.device_model
    packet_m.station_information.os = _migrate_os_type_900_to_1000_raw(packet.device_os)
    packet_m.station_information.os_version = packet.device_os_version
    packet_m.station_information.app_version = packet.app_version
    packet_m.station_information.is_private = packet.is_private

    packet_m.station_information.app_settings.samples_per_window = \
        len(packet.evenly_sampled_channels[0].int32_payload.payload)

    packet_m.station_information.service_urls.acquisition_server = (
        packet.acquisition_server
    )
    packet_m.station_information.service_urls.synch_server = (
        packet.time_synchronization_server
    )
    packet_m.station_information.service_urls.auth_server = packet.authentication_server

    # API 900 does not maintain a copy of its settings. So we will not set anything in AppSettings

    # StationMetrics - We know a couple.
    packet_m.station_information.station_metrics.timestamps.unit = (
        api_m.RedvoxPacketM.Unit.MICROSECONDS_SINCE_UNIX_EPOCH
    )
    packet_m.station_information.station_metrics.timestamps.timestamps[:] = [
        packet.app_file_start_timestamp_machine
    ]
    packet_m.station_information.station_metrics.temperature.unit = (
        api_m.RedvoxPacketM.Unit.DEGREES_CELSIUS
    )
    packet_m.station_information.station_metrics.temperature.values[:] = [
        packet.device_temperature_c
    ]
    packet_m.station_information.station_metrics.battery.unit = (
        api_m.RedvoxPacketM.Unit.PERCENTAGE
    )
    packet_m.station_information.station_metrics.battery.values[:] = [
        packet.battery_level_percent
    ]

    # And we can fill in defaults for those we don't know
    packet_m.station_information.station_metrics.available_disk.unit = (
        api_m.RedvoxPacketM.Unit.BYTE
    )
    packet_m.station_information.station_metrics.available_disk.values[:] = [
        float("nan")
    ]
    packet_m.station_information.station_metrics.available_ram.unit = (
        api_m.RedvoxPacketM.Unit.BYTE
    )
    packet_m.station_information.station_metrics.available_ram.values[:] = [
        float("nan")
    ]
    packet_m.station_information.station_metrics.cpu_utilization.unit = (
        api_m.RedvoxPacketM.Unit.PERCENTAGE
    )
    packet_m.station_information.station_metrics.cpu_utilization.values[:] = [
        float("nan")
    ]
    packet_m.station_information.station_metrics.network_strength.unit = (
        api_m.RedvoxPacketM.Unit.DECIBEL
    )
    packet_m.station_information.station_metrics.network_strength.values[:] = [
        float("nan")
    ]
    packet_m.station_information.station_metrics.battery_current.unit = (
        api_m.RedvoxPacketM.Unit.MICROAMPERES
    )
    packet_m.station_information.station_metrics.battery_current.values[:] = [
        float("nan")
    ]
    packet_m.station_information.station_metrics.screen_brightness.unit = (
        api_m.RedvoxPacketM.Unit.PERCENTAGE
    )
    packet_m.station_information.station_metrics.screen_brightness.values[:] = [
        float("nan")
    ]

    packet_m.station_information.station_metrics.network_type[:] = [
        api_m.RedvoxPacketM.StationInformation.StationMetrics.NetworkType.UNKNOWN_NETWORK
    ]
    packet_m.station_information.station_metrics.cell_service_state[:] = [
        api_m.RedvoxPacketM.StationInformation.StationMetrics.CellServiceState.UNKNOWN
    ]
    packet_m.station_information.station_metrics.power_state[:] = [
        api_m.RedvoxPacketM.StationInformation.StationMetrics.PowerState.UNKNOWN_POWER_STATE
    ]
    packet_m.station_information.station_metrics.wifi_wake_lock[:] = [
        api_m.RedvoxPacketM.StationInformation.StationMetrics.WifiWakeLock.OTHER
    ]
    packet_m.station_information.station_metrics.screen_state[:] = [
        api_m.RedvoxPacketM.StationInformation.StationMetrics.ScreenState.UNKNOWN_SCREEN_STATE
    ]

    compute_stats_raw(packet_m.station_information.station_metrics.timestamps)
    compute_stats_raw(packet_m.station_information.station_metrics.temperature)
    compute_stats_raw(packet_m.station_information.station_metrics.battery)
    compute_stats_raw(packet_m.station_information.station_metrics.available_disk)
    compute_stats_raw(packet_m.station_information.station_metrics.available_ram)
    compute_stats_raw(packet_m.station_information.station_metrics.cpu_utilization)
    compute_stats_raw(packet_m.station_information.station_metrics.network_strength)
    compute_stats_raw(packet_m.station_information.station_metrics.battery_current)
    compute_stats_raw(packet_m.station_information.station_metrics.screen_brightness)

    # Timing information
    mach_time_900: int = packet.app_file_start_timestamp_machine
    os_time_900: int = packet.app_file_start_timestamp_epoch_microseconds_utc
    len_micros: int = _packet_length_microseconds_900_raw(packet)
    best_latency: float = reader_utils.get_metadata_or_default(
        list(packet.metadata), "bestLatency", float, NAN
    )
    best_offset: float = reader_utils.get_metadata_or_default(
        list(packet.metadata), "bestOffset", float, NAN
    )

    packet_m.timing_information.unit = (
        api_m.RedvoxPacketM.Unit.MICROSECONDS_SINCE_UNIX_EPOCH
    )
    packet_m.timing_information.packet_start_mach_timestamp = mach_time_900
    packet_m.timing_information.packet_start_os_timestamp = os_time_900
    packet_m.timing_information.packet_end_mach_timestamp = mach_time_900 + len_micros
    packet_m.timing_information.server_acquisition_arrival_timestamp = (
        packet.server_timestamp_epoch_microseconds_utc
    )
    packet_m.timing_information.packet_end_os_timestamp = os_time_900 + len_micros
    packet_m.timing_information.app_start_mach_timestamp = _find_mach_time_zero_raw(
        packet
    )
    packet_m.timing_information.best_latency = best_latency
    packet_m.timing_information.best_offset = best_offset

    synch_sensor: Optional[
        api_900.UnevenlySampledChannel
    ] = reader_utils.find_uneven_channel_raw(
        packet, {api_900.ChannelType.TIME_SYNCHRONIZATION}
    )
    if synch_sensor is not None:
        synch_payload: np.ndarray = reader_utils.extract_payload(synch_sensor)
        packet_m.timing_information.synch_exchanges.extend(
            _migrate_synch_exchanges_900_to_1000_raw(synch_payload)
        )

    # Sensors
    # Microphone / Audio
    if len(packet.evenly_sampled_channels) < 1:
        raise ValueError("Cannot convert API900 to API1000; Audio sensor missing.")
    audio_900: api_900.EvenlySampledChannel = packet.evenly_sampled_channels[0]
    packet_m.sensors.audio.sensor_description = audio_900.sensor_name
    packet_m.sensors.audio.sample_rate = audio_900.sample_rate_hz
    packet_m.sensors.audio.first_sample_timestamp = (
        audio_900.first_sample_timestamp_epoch_microseconds_utc
    )
    packet_m.sensors.audio.bits_of_precision = 16.0
    packet_m.sensors.audio.encoding = "counts"
    normalized_audio: np.ndarray = (
        reader_utils.extract_payload(audio_900) / _NORMALIZATION_CONSTANT
    )
    packet_m.sensors.audio.samples.values[:] = list(normalized_audio)
    packet_m.sensors.audio.samples.unit = api_m.RedvoxPacketM.Unit.NORMALIZED_COUNTS
    for i in range(0, len(audio_900.metadata), 2):
        v: str = audio_900.metadata[i + 1] if (i + 1) < len(audio_900.metadata) else ""
        packet_m.sensors.audio.metadata[audio_900.metadata[i]] = v
    compute_stats_raw(packet_m.sensors.audio.samples)

    # Pressure
    barometer_900: Optional[
        api_900.UnevenlySampledChannel
    ] = reader_utils.find_uneven_channel_raw(packet, {api_900.ChannelType.BAROMETER})
    if barometer_900 is not None:
        packet_m.sensors.pressure.sensor_description = barometer_900.sensor_name
        packet_m.sensors.pressure.timestamps.unit = (
            api_m.RedvoxPacketM.Unit.MICROSECONDS_SINCE_UNIX_EPOCH
        )
        packet_m.sensors.pressure.timestamps.timestamps[
            :
        ] = barometer_900.timestamps_microseconds_utc
        packet_m.sensors.pressure.samples.values[:] = list(
            reader_utils.extract_payload(barometer_900)
        )
        packet_m.sensors.pressure.samples.unit = api_m.RedvoxPacketM.Unit.KILOPASCAL
        for i in range(0, len(barometer_900.metadata), 2):
            v = (
                barometer_900.metadata[i + 1]
                if (i + 1) < len(barometer_900.metadata)
                else ""
            )
            packet_m.sensors.pressure.metadata[barometer_900.metadata[i]] = v
        compute_stats_raw(packet_m.sensors.pressure.timestamps)
        compute_stats_raw(packet_m.sensors.pressure.samples)

    # Location
    loc_900: Optional[
        api_900.UnevenlySampledChannel
    ] = reader_utils.find_uneven_channel_raw(
        packet,
        {
            api_900.ChannelType.LATITUDE,
            api_900.ChannelType.LONGITUDE,
            api_900.ChannelType.ALTITUDE,
            api_900.ChannelType.SPEED,
            api_900.ChannelType.ACCURACY,
        },
    )
    if loc_900 is not None:
        total_samples: int = len(loc_900.timestamps_microseconds_utc)
        loc_payload: List[float] = list(reader_utils.extract_payload(loc_900))
        packet_m.sensors.location.sensor_description = loc_900.sensor_name
        packet_m.sensors.location.timestamps.unit = (
            api_m.RedvoxPacketM.Unit.MICROSECONDS_SINCE_UNIX_EPOCH
        )
        packet_m.sensors.location.timestamps.timestamps[
            :
        ] = loc_900.timestamps_microseconds_utc
        packet_m.sensors.location.timestamps_gps.unit = (
            api_m.RedvoxPacketM.Unit.MICROSECONDS_SINCE_UNIX_EPOCH
        )
        packet_m.sensors.location.timestamps_gps.timestamps[:] = [
            float("nan")
        ] * total_samples

        total_channels: int = len(loc_900.channel_types)

        lat_idx: Optional[int] = reader_utils.extract_uneven_payload_idx_raw(
            packet, api_900.ChannelType.LATITUDE
        )
        packet_m.sensors.location.latitude_samples.unit = (
            api_m.RedvoxPacketM.Unit.DECIMAL_DEGREES
        )
        if lat_idx is not None:
            packet_m.sensors.location.latitude_samples.values[:] = loc_payload[
                lat_idx::total_channels
            ]
        else:
            packet_m.sensors.location.latitude_samples.values[:] = [
                float("nan") * total_samples
            ]

        lng_idx: Optional[int] = reader_utils.extract_uneven_payload_idx_raw(
            packet, api_900.ChannelType.LONGITUDE
        )
        packet_m.sensors.location.longitude_samples.unit = (
            api_m.RedvoxPacketM.Unit.DECIMAL_DEGREES
        )
        if lng_idx is not None:
            packet_m.sensors.location.longitude_samples.values[:] = loc_payload[
                lng_idx::total_channels
            ]
        else:
            packet_m.sensors.location.longitude_samples.values[:] = [
                float("nan") * total_samples
            ]

        alt_idx: Optional[int] = reader_utils.extract_uneven_payload_idx_raw(
            packet, api_900.ChannelType.ALTITUDE
        )
        packet_m.sensors.location.altitude_samples.unit = (
            api_m.RedvoxPacketM.Unit.METERS
        )
        if alt_idx is not None:
            packet_m.sensors.location.altitude_samples.values[:] = loc_payload[
                alt_idx::total_channels
            ]
        else:
            packet_m.sensors.location.altitude_samples.values[:] = [
                float("nan") * total_samples
            ]

        speed_idx: Optional[int] = reader_utils.extract_uneven_payload_idx_raw(
            packet, api_900.ChannelType.SPEED
        )
        packet_m.sensors.location.speed_samples.unit = (
            api_m.RedvoxPacketM.Unit.METERS_PER_SECOND
        )
        if speed_idx is not None:
            packet_m.sensors.location.speed_samples.values[:] = loc_payload[
                speed_idx::total_channels
            ]
        else:
            packet_m.sensors.location.speed_samples.values[:] = [
                float("nan") * total_samples
            ]

        acc_idx: Optional[int] = reader_utils.extract_uneven_payload_idx_raw(
            packet, api_900.ChannelType.ACCURACY
        )
        packet_m.sensors.location.horizontal_accuracy_samples.unit = (
            api_m.RedvoxPacketM.Unit.METERS
        )
        if acc_idx is not None:
            packet_m.sensors.location.horizontal_accuracy_samples.values[
                :
            ] = loc_payload[acc_idx::total_channels]
        else:
            packet_m.sensors.location.horizontal_accuracy_samples.values[:] = [
                float("nan") * total_samples
            ]

        packet_m.sensors.location.bearing_samples.unit = (
            api_m.RedvoxPacketM.Unit.DECIMAL_DEGREES
        )
        packet_m.sensors.location.bearing_samples.values[:] = [
            float("nan") * total_samples
        ]

        packet_m.sensors.location.vertical_accuracy_samples.unit = (
            api_m.RedvoxPacketM.Unit.METERS
        )
        packet_m.sensors.location.vertical_accuracy_samples.values[:] = [
            float("nan") * total_samples
        ]
        packet_m.sensors.location.speed_accuracy_samples.unit = (
            api_m.RedvoxPacketM.Unit.METERS_PER_SECOND
        )
        packet_m.sensors.location.speed_accuracy_samples.values[:] = [
            float("nan") * total_samples
        ]
        packet_m.sensors.location.bearing_accuracy_samples.unit = (
            api_m.RedvoxPacketM.Unit.DECIMAL_DEGREES
        )
        packet_m.sensors.location.bearing_accuracy_samples.values[:] = [
            float("nan") * total_samples
        ]

        # Compute stats
        compute_stats_raw(packet_m.sensors.location.timestamps)
        compute_stats_raw(packet_m.sensors.location.timestamps_gps)
        compute_stats_raw(packet_m.sensors.location.latitude_samples)
        compute_stats_raw(packet_m.sensors.location.longitude_samples)
        compute_stats_raw(packet_m.sensors.location.altitude_samples)
        compute_stats_raw(packet_m.sensors.location.speed_samples)
        compute_stats_raw(packet_m.sensors.location.bearing_samples)
        compute_stats_raw(packet_m.sensors.location.horizontal_accuracy_samples)
        compute_stats_raw(packet_m.sensors.location.vertical_accuracy_samples)
        compute_stats_raw(packet_m.sensors.location.speed_accuracy_samples)
        compute_stats_raw(packet_m.sensors.location.bearing_accuracy_samples)

        # Bookkeeping
        use_location: bool = reader_utils.get_metadata_or_default(
            list(loc_900.metadata), "useLocation", lambda val: v == "T", False
        )
        desired_location: bool = reader_utils.get_metadata_or_default(
            list(loc_900.metadata), "desiredLocation", lambda val: v == "T", False
        )
        permission_location: bool = reader_utils.get_metadata_or_default(
            list(loc_900.metadata), "permissionLocation", lambda val: v == "T", False
        )
        enabled_location: bool = reader_utils.get_metadata_or_default(
            list(loc_900.metadata), "enabledLocation", lambda val: v == "T", False
        )

        if desired_location:
            packet_m.sensors.location.location_providers[:] = [
                api_m.RedvoxPacketM.Sensors.Location.LocationProvider.USER
            ] * total_samples
        elif enabled_location:
            packet_m.sensors.location.location_providers[:] = [
                api_m.RedvoxPacketM.Sensors.Location.LocationProvider.GPS
            ] * total_samples
        elif use_location and desired_location and permission_location:
            packet_m.sensors.location.location_providers[:] = [
                api_m.RedvoxPacketM.Sensors.Location.LocationProvider.NETWORK
            ] * total_samples
        else:
            packet_m.sensors.location.location_providers[:] = [
                api_m.RedvoxPacketM.Sensors.Location.LocationProvider.NONE
            ] * total_samples

        packet_m.sensors.location.location_permissions_granted = permission_location
        packet_m.sensors.location.location_services_enabled = use_location
        packet_m.sensors.location.location_services_requested = desired_location

        for (i, k) in enumerate(loc_900.metadata):
            if i + 1 < len(loc_900.metadata):
                packet_m.sensors.location.metadata[k] = loc_900.metadata[i + 1]
            else:
                packet_m.sensors.location.metadata[k] = ""

    # # Time Synchronization
    # # This was already added to the timing information

    # Accelerometer
    accel_900: Optional[
        api_900.UnevenlySampledChannel
    ] = reader_utils.find_uneven_channel_raw(
        packet,
        {
            api_900.ChannelType.ACCELEROMETER_X,
            api_900.ChannelType.ACCELEROMETER_Y,
            api_900.ChannelType.ACCELEROMETER_Z,
        },
    )
    if accel_900 is not None:
        packet_m.sensors.accelerometer.sensor_description = accel_900.sensor_name
        packet_m.sensors.accelerometer.timestamps.unit = (
            api_m.RedvoxPacketM.Unit.MICROSECONDS_SINCE_UNIX_EPOCH
        )
        packet_m.sensors.accelerometer.timestamps.timestamps[
            :
        ] = accel_900.timestamps_microseconds_utc
        accel_payload: List[float] = list(reader_utils.extract_payload(accel_900))
        packet_m.sensors.accelerometer.x_samples.unit = (
            api_m.RedvoxPacketM.Unit.METERS_PER_SECOND_SQUARED
        )
        packet_m.sensors.accelerometer.x_samples.values[:] = accel_payload[0::3]
        packet_m.sensors.accelerometer.y_samples.unit = (
            api_m.RedvoxPacketM.Unit.METERS_PER_SECOND_SQUARED
        )
        packet_m.sensors.accelerometer.y_samples.values[:] = accel_payload[1::3]
        packet_m.sensors.accelerometer.z_samples.unit = (
            api_m.RedvoxPacketM.Unit.METERS_PER_SECOND_SQUARED
        )
        packet_m.sensors.accelerometer.z_samples.values[:] = accel_payload[2::3]
        compute_stats_raw(packet_m.sensors.accelerometer.timestamps)
        compute_stats_raw(packet_m.sensors.accelerometer.x_samples)
        compute_stats_raw(packet_m.sensors.accelerometer.y_samples)
        compute_stats_raw(packet_m.sensors.accelerometer.z_samples)

    # Magnetometer
    sensor: Optional[
        api_900.UnevenlySampledChannel
    ] = reader_utils.find_uneven_channel_raw(
        packet,
        {
            api_900.ChannelType.MAGNETOMETER_X,
            api_900.ChannelType.MAGNETOMETER_Y,
            api_900.ChannelType.MAGNETOMETER_Z,
        },
    )
    if sensor is not None:
        packet_m.sensors.magnetometer.sensor_description = sensor.sensor_name
        packet_m.sensors.magnetometer.timestamps.unit = (
            api_m.RedvoxPacketM.Unit.MICROSECONDS_SINCE_UNIX_EPOCH
        )
        packet_m.sensors.magnetometer.timestamps.timestamps[
            :
        ] = sensor.timestamps_microseconds_utc
        sensor_payload: List[float] = list(reader_utils.extract_payload(sensor))
        packet_m.sensors.magnetometer.x_samples.unit = (
            api_m.RedvoxPacketM.Unit.MICROTESLA
        )
        packet_m.sensors.magnetometer.x_samples.values[:] = sensor_payload[0::3]
        packet_m.sensors.magnetometer.y_samples.unit = (
            api_m.RedvoxPacketM.Unit.MICROTESLA
        )
        packet_m.sensors.magnetometer.y_samples.values[:] = sensor_payload[1::3]
        packet_m.sensors.magnetometer.z_samples.unit = (
            api_m.RedvoxPacketM.Unit.MICROTESLA
        )
        packet_m.sensors.magnetometer.z_samples.values[:] = sensor_payload[2::3]
        compute_stats_raw(packet_m.sensors.magnetometer.timestamps)
        compute_stats_raw(packet_m.sensors.magnetometer.x_samples)
        compute_stats_raw(packet_m.sensors.magnetometer.y_samples)
        compute_stats_raw(packet_m.sensors.magnetometer.z_samples)
    #
    # Gyroscope
    sensor = reader_utils.find_uneven_channel_raw(
        packet,
        {
            api_900.ChannelType.GYROSCOPE_X,
            api_900.ChannelType.GYROSCOPE_Y,
            api_900.ChannelType.GYROSCOPE_Z,
        },
    )
    if sensor is not None:
        packet_m.sensors.gyroscope.sensor_description = sensor.sensor_name
        packet_m.sensors.gyroscope.timestamps.unit = (
            api_m.RedvoxPacketM.Unit.MICROSECONDS_SINCE_UNIX_EPOCH
        )
        packet_m.sensors.gyroscope.timestamps.timestamps[
            :
        ] = sensor.timestamps_microseconds_utc
        sensor_payload = list(reader_utils.extract_payload(sensor))
        packet_m.sensors.gyroscope.x_samples.unit = (
            api_m.RedvoxPacketM.Unit.RADIANS_PER_SECOND
        )
        packet_m.sensors.gyroscope.x_samples.values[:] = sensor_payload[0::3]
        packet_m.sensors.gyroscope.y_samples.unit = (
            api_m.RedvoxPacketM.Unit.RADIANS_PER_SECOND
        )
        packet_m.sensors.gyroscope.y_samples.values[:] = sensor_payload[1::3]
        packet_m.sensors.gyroscope.z_samples.unit = (
            api_m.RedvoxPacketM.Unit.RADIANS_PER_SECOND
        )
        packet_m.sensors.gyroscope.z_samples.values[:] = sensor_payload[2::3]
        compute_stats_raw(packet_m.sensors.gyroscope.timestamps)
        compute_stats_raw(packet_m.sensors.gyroscope.x_samples)
        compute_stats_raw(packet_m.sensors.gyroscope.y_samples)
        compute_stats_raw(packet_m.sensors.gyroscope.z_samples)

    #
    # # Light
    light_900: Optional[
        api_900.UnevenlySampledChannel
    ] = reader_utils.find_uneven_channel_raw(packet, {api_900.ChannelType.LIGHT})
    if light_900 is not None:
        packet_m.sensors.light.sensor_description = light_900.sensor_name
        packet_m.sensors.light.timestamps.unit = (
            api_m.RedvoxPacketM.Unit.MICROSECONDS_SINCE_UNIX_EPOCH
        )
        packet_m.sensors.light.timestamps.timestamps[
            :
        ] = light_900.timestamps_microseconds_utc
        packet_m.sensors.light.samples.values[:] = list(
            reader_utils.extract_payload(light_900)
        )
        packet_m.sensors.light.samples.unit = api_m.RedvoxPacketM.Unit.LUX
        for i in range(0, len(light_900.metadata), 2):
            v = light_900.metadata[i + 1] if (i + 1) < len(light_900.metadata) else ""
            packet_m.sensors.light.metadata[light_900.metadata[i]] = v
        compute_stats_raw(packet_m.sensors.light.timestamps)
        compute_stats_raw(packet_m.sensors.light.samples)

    # # Image
    # Not implemented for conversion. Only a very small fraction of API 900 was ever image capable, and not the public
    # app.

    # # Proximity
    proximity_900: Optional[
        api_900.UnevenlySampledChannel
    ] = reader_utils.find_uneven_channel_raw(packet, {api_900.ChannelType.INFRARED})
    if proximity_900 is not None:
        packet_m.sensors.proximity.sensor_description = proximity_900.sensor_name
        packet_m.sensors.proximity.timestamps.unit = (
            api_m.RedvoxPacketM.Unit.MICROSECONDS_SINCE_UNIX_EPOCH
        )
        packet_m.sensors.proximity.timestamps.timestamps[
            :
        ] = proximity_900.timestamps_microseconds_utc
        packet_m.sensors.proximity.samples.values[:] = list(
            reader_utils.extract_payload(proximity_900)
        )
        packet_m.sensors.proximity.samples.unit = api_m.RedvoxPacketM.Unit.CENTIMETERS
        for i in range(0, len(proximity_900.metadata), 2):
            v = (
                proximity_900.metadata[i + 1]
                if (i + 1) < len(proximity_900.metadata)
                else ""
            )
            packet_m.sensors.proximity.metadata[proximity_900.metadata[i]] = v
        compute_stats_raw(packet_m.sensors.proximity.timestamps)
        compute_stats_raw(packet_m.sensors.proximity.samples)

    return packet_m


# noinspection DuplicatedCode
def convert_api_900_to_1000(
    wrapped_packet_900: reader_900.WrappedRedvoxPacket,
) -> WrappedRedvoxPacketM:
    """
    Converts a wrapped API 900 packet into a wrapped API M packet.

    :param wrapped_packet_900: API 900 packet to convert.
    :return: A wrapped API M packet.
    """
    wrapped_packet_m: WrappedRedvoxPacketM = WrappedRedvoxPacketM.new()

    # Top-level metadata
    wrapped_packet_m.set_api(1000.0)
    # noinspection PyUnresolvedReferences,Mypy
    wrapped_packet_m.set_sub_api(900.0)

    # Station information
    station_information: StationInformation = wrapped_packet_m.get_station_information()
    station_information.set_id(wrapped_packet_900.redvox_id()).set_uuid(
        wrapped_packet_900.uuid()
    ).set_make(wrapped_packet_900.device_make()).set_model(
        wrapped_packet_900.device_model()
    ).set_os(
        _migrate_os_type_900_to_1000(wrapped_packet_900.device_os())
    ).set_os_version(
        wrapped_packet_900.device_os_version()
    ).set_app_version(
        wrapped_packet_900.app_version()
    ).set_is_private(
        wrapped_packet_900.is_private()
    )

    station_information.get_service_urls().set_acquisition_server(
        wrapped_packet_900.acquisition_server()
    ).set_synch_server(
        wrapped_packet_900.time_synchronization_server()
    ).set_auth_server(
        wrapped_packet_900.authentication_server()
    )

    # API 900 does not maintain a copy of its settings. So we will not set anything in AppSettings

    # StationMetrics - We know a couple. We take a slightly more cumbersome approach using the raw protobuf
    # to avoid some conversions between lists and np arrays.
    station_metrics: StationMetrics = station_information.get_station_metrics()
    station_metrics.get_timestamps().append_timestamp(
        wrapped_packet_900.app_file_start_timestamp_machine()
    )
    station_metrics.get_temperature().set_values(
        np.array([wrapped_packet_900.device_temperature_c()]), True
    )
    station_metrics.get_battery().set_values(
        np.array([wrapped_packet_900.battery_level_percent()]), True
    )

    # Timing information
    mach_time_900: int = wrapped_packet_900.app_file_start_timestamp_machine()
    os_time_900: int = (
        wrapped_packet_900.app_file_start_timestamp_epoch_microseconds_utc()
    )
    len_micros: int = _packet_length_microseconds_900(wrapped_packet_900)
    best_latency: Optional[float] = wrapped_packet_900.best_latency()
    best_latency = best_latency if best_latency is not None else NAN
    best_offset: Optional[float] = wrapped_packet_900.best_offset()
    best_offset = best_offset if best_offset is not None else NAN

    wrapped_packet_m.get_timing_information().set_unit(
        common_m.Unit.MICROSECONDS_SINCE_UNIX_EPOCH
    ).set_packet_start_mach_timestamp(mach_time_900).set_packet_start_os_timestamp(
        os_time_900
    ).set_packet_end_mach_timestamp(
        mach_time_900 + len_micros
    ).set_packet_end_os_timestamp(
        os_time_900 + len_micros
    ).set_server_acquisition_arrival_timestamp(
        wrapped_packet_900.server_timestamp_epoch_microseconds_utc()
    ).set_app_start_mach_timestamp(
        _find_mach_time_zero(wrapped_packet_900)
    ).set_best_latency(
        best_latency
    ).set_best_offset(
        best_offset
    )

    time_sensor = wrapped_packet_900.time_synchronization_sensor()
    if time_sensor is not None:
        wrapped_packet_m.get_timing_information().get_synch_exchanges().set_values(
            _migrate_synch_exchanges_900_to_1000(time_sensor.payload_values())
        )

    # Sensors
    sensors_m: Sensors = wrapped_packet_m.get_sensors()
    # Microphone / Audio
    mic_sensor_900: Optional[
        reader_900.MicrophoneSensor
    ] = wrapped_packet_900.microphone_sensor()
    if mic_sensor_900 is not None:
        normalized_audio: np.ndarray = (
            mic_sensor_900.payload_values() / _NORMALIZATION_CONSTANT
        )
        audio_sensor_m = sensors_m.new_audio()
        audio_sensor_m.set_first_sample_timestamp(
            mic_sensor_900.first_sample_timestamp_epoch_microseconds_utc()
        ).set_is_scrambled(wrapped_packet_900.is_scrambled()).set_sample_rate(
            mic_sensor_900.sample_rate_hz()
        ).set_sensor_description(
            mic_sensor_900.sensor_name()
        ).get_samples().set_values(
            normalized_audio, update_value_statistics=True
        )
        audio_sensor_m.get_metadata().set_metadata(mic_sensor_900.metadata_as_dict())

    # Barometer
    barometer_sensor_900: Optional[
        reader_900.BarometerSensor
    ] = wrapped_packet_900.barometer_sensor()
    if barometer_sensor_900 is not None:
        pressure_sensor_m = sensors_m.new_pressure()
        pressure_sensor_m.set_sensor_description(barometer_sensor_900.sensor_name())
        pressure_sensor_m.get_timestamps().set_timestamps(
            barometer_sensor_900.timestamps_microseconds_utc(), True
        )
        pressure_sensor_m.get_samples().set_values(
            barometer_sensor_900.payload_values(), True
        )
        pressure_sensor_m.get_metadata().set_metadata(
            barometer_sensor_900.metadata_as_dict()
        )

    # Location
    # TODO: rework
    location_sensor_900: Optional[
        reader_900.LocationSensor
    ] = wrapped_packet_900.location_sensor()
    if location_sensor_900 is not None:
        location_m = sensors_m.new_location()
        location_m.set_sensor_description(location_sensor_900.sensor_name())
        location_m.get_timestamps().set_timestamps(
            location_sensor_900.timestamps_microseconds_utc(), True
        )
        if location_sensor_900.check_for_preset_lat_lon():
            lat_lon: np.ndarray = location_sensor_900.get_payload_lat_lon()
            location_m.get_latitude_samples().set_values(lat_lon[:1], True)
            location_m.get_longitude_samples().set_values(lat_lon[1:], True)
        else:
            location_m.get_latitude_samples().set_values(
                location_sensor_900.payload_values_latitude(), True
            )
            location_m.get_longitude_samples().set_values(
                location_sensor_900.payload_values_longitude(), True
            )
            location_m.get_altitude_samples().set_values(
                location_sensor_900.payload_values_altitude(), True
            )
            location_m.get_speed_samples().set_values(
                location_sensor_900.payload_values_speed(), True
            )
            location_m.get_horizontal_accuracy_samples().set_values(
                location_sensor_900.payload_values_accuracy(), True
            )

        def _extract_meta_bool(metad: Dict[str, str], key: str) -> bool:
            if key not in metad:
                return False

            return metad[key] == "T"

        loc_meta_900 = location_sensor_900.metadata_as_dict()
        use_location = _extract_meta_bool(loc_meta_900, "useLocation")
        desired_location = _extract_meta_bool(loc_meta_900, "desiredLocation")
        permission_location = _extract_meta_bool(loc_meta_900, "permissionLocation")
        enabled_location = _extract_meta_bool(loc_meta_900, "enabledLocation")

        n_p = location_m.get_timestamps().get_timestamps_count()

        if desired_location:
            location_m.get_location_providers().set_values(
                [LocationProvider.USER for i in range(n_p)]
            )
        elif enabled_location:
            location_m.get_location_providers().set_values(
                [LocationProvider.GPS for i in range(n_p)]
            )
        elif use_location and desired_location and permission_location:
            location_m.get_location_providers().set_values(
                [LocationProvider.NETWORK for i in range(n_p)]
            )
        else:
            location_m.get_location_providers().set_values(
                [LocationProvider.NONE for i in range(n_p)]
            )

        location_m.set_location_permissions_granted(permission_location)
        location_m.set_location_services_enabled(use_location)
        location_m.set_location_services_requested(desired_location)

        # Once we're done here, we should remove the original metadata
        if "useLocation" in loc_meta_900:
            del loc_meta_900["useLocation"]
        if "desiredLocation" in loc_meta_900:
            del loc_meta_900["desiredLocation"]
        if "permissionLocation" in loc_meta_900:
            del loc_meta_900["permissionLocation"]
        if "enabledLocation" in loc_meta_900:
            del loc_meta_900["enabledLocation"]
        if "machTimeZero" in loc_meta_900:
            del loc_meta_900["machTimeZero"]
        location_m.get_metadata().set_metadata(loc_meta_900)

    # Time Synchronization
    # This was already added to the timing information

    # Accelerometer
    accelerometer_900 = wrapped_packet_900.accelerometer_sensor()
    if accelerometer_900 is not None:
        accelerometer_m = sensors_m.new_accelerometer()
        accelerometer_m.set_sensor_description(accelerometer_900.sensor_name())
        accelerometer_m.get_timestamps().set_timestamps(
            accelerometer_900.timestamps_microseconds_utc(), True
        )
        accelerometer_m.get_x_samples().set_values(
            accelerometer_900.payload_values_x(), True
        )
        accelerometer_m.get_y_samples().set_values(
            accelerometer_900.payload_values_y(), True
        )
        accelerometer_m.get_z_samples().set_values(
            accelerometer_900.payload_values_z(), True
        )
        accelerometer_m.get_metadata().set_metadata(
            accelerometer_900.metadata_as_dict()
        )

    # Magnetometer
    magnetometer_900 = wrapped_packet_900.magnetometer_sensor()
    if magnetometer_900 is not None:
        magnetometer_m = sensors_m.new_magnetometer()
        magnetometer_m.set_sensor_description(magnetometer_900.sensor_name())
        magnetometer_m.get_timestamps().set_timestamps(
            magnetometer_900.timestamps_microseconds_utc(), True
        )
        magnetometer_m.get_x_samples().set_values(
            magnetometer_900.payload_values_x(), True
        )
        magnetometer_m.get_y_samples().set_values(
            magnetometer_900.payload_values_y(), True
        )
        magnetometer_m.get_z_samples().set_values(
            magnetometer_900.payload_values_z(), True
        )
        magnetometer_m.get_metadata().set_metadata(magnetometer_900.metadata_as_dict())

    # Gyroscope
    gyroscope_900 = wrapped_packet_900.gyroscope_sensor()
    if gyroscope_900 is not None:
        gyroscope_m = sensors_m.new_gyroscope()
        gyroscope_m.set_sensor_description(gyroscope_900.sensor_name())
        gyroscope_m.get_timestamps().set_timestamps(
            gyroscope_900.timestamps_microseconds_utc(), True
        )
        gyroscope_m.get_x_samples().set_values(gyroscope_900.payload_values_x(), True)
        gyroscope_m.get_y_samples().set_values(gyroscope_900.payload_values_y(), True)
        gyroscope_m.get_z_samples().set_values(gyroscope_900.payload_values_z(), True)
        gyroscope_m.get_metadata().set_metadata(gyroscope_900.metadata_as_dict())

    # Light
    light_900 = wrapped_packet_900.light_sensor()
    if light_900 is not None:
        light_m = sensors_m.new_light()
        light_m.set_sensor_description(light_900.sensor_name())
        light_m.get_timestamps().set_timestamps(
            light_900.timestamps_microseconds_utc(), True
        )
        light_m.get_samples().set_values(light_900.payload_values(), True)
        light_m.get_metadata().set_metadata(light_900.metadata_as_dict())

    # Image
    # TODO: Implement

    # Proximity
    proximity_900 = wrapped_packet_900.infrared_sensor()
    if proximity_900 is not None:
        proximity_m = sensors_m.new_proximity()
        proximity_m.set_sensor_description(proximity_900.sensor_name())
        proximity_m.get_timestamps().set_timestamps(
            proximity_900.timestamps_microseconds_utc(), True
        )
        proximity_m.get_samples().set_values(proximity_900.payload_values(), True)
        proximity_m.get_metadata().set_metadata(proximity_900.metadata_as_dict())

    # Removed any other API 900 top-level metadata now that its been used
    meta = wrapped_packet_900.metadata_as_dict()
    if "machTimeZero" in meta:
        del meta["machTimeZero"]
    if "bestOffset" in meta:
        del meta["bestOffset"]
    if "bestLatency" in meta:
        del meta["bestLatency"]
    wrapped_packet_m.get_metadata().append_metadata(
        "migrated_from_api_900", f"v{redvox.VERSION}"
    )

    return wrapped_packet_m


def convert_api_1000_to_900(
    wrapped_packet_m: WrappedRedvoxPacketM,
) -> reader_900.WrappedRedvoxPacket:
    """
    Converts an API M wrapped packet into an API 900 wrapped packet.

    :param wrapped_packet_m: Packet to convert.
    :return: An API 900 wrapped packet.
    """
    # TODO detect and warn about all the fields that are being dropped due to conversion!
    wrapped_packet_900: reader_900.WrappedRedvoxPacket = (
        reader_900.WrappedRedvoxPacket()
    )

    station_information_m = wrapped_packet_m.get_station_information()
    sensors_m = wrapped_packet_m.get_sensors()
    audio_m = sensors_m.get_audio()

    wrapped_packet_900.set_api(900)
    wrapped_packet_900.set_uuid(station_information_m.get_uuid())
    wrapped_packet_900.set_redvox_id(station_information_m.get_id())
    wrapped_packet_900.set_authenticated_email(station_information_m.get_auth_id())
    wrapped_packet_900.set_authentication_token(
        "n/a"
    )  # Different auth protocols are used, can't convert between
    wrapped_packet_900.set_firebase_token("n/a")  # No longer in API M packet
    wrapped_packet_900.set_is_backfilled(False)  # No longer useful metric in API M
    wrapped_packet_900.set_is_private(station_information_m.get_is_private())
    wrapped_packet_900.set_is_scrambled(False)
    wrapped_packet_900.set_device_make(station_information_m.get_make())
    wrapped_packet_900.set_device_model(station_information_m.get_model())
    wrapped_packet_900.set_device_os(
        _migrate_os_type_1000_to_900(station_information_m.get_os())
    )
    wrapped_packet_900.set_device_os_version(station_information_m.get_os_version())
    wrapped_packet_900.set_app_version(station_information_m.get_app_version())

    battery_metrics = station_information_m.get_station_metrics().get_battery()
    battery_percent: float = (
        battery_metrics.get_values()[-1]
        if battery_metrics.get_values_count() > 0
        else 0.0
    )
    wrapped_packet_900.set_battery_level_percent(battery_percent)
    temp_metrics = station_information_m.get_station_metrics().get_temperature()
    device_temp: float = (
        temp_metrics.get_values()[-1] if temp_metrics.get_values_count() > 0 else 0.0
    )
    wrapped_packet_900.set_device_temperature_c(device_temp)

    server_info_m = wrapped_packet_m.get_station_information().get_service_urls()
    wrapped_packet_900.set_acquisition_server(server_info_m.get_acquisition_server())
    wrapped_packet_900.set_time_synchronization_server(server_info_m.get_synch_server())
    wrapped_packet_900.set_authentication_server(server_info_m.get_auth_server())

    timing_info_m = wrapped_packet_m.get_timing_information()
    wrapped_packet_900.set_app_file_start_timestamp_epoch_microseconds_utc(
        round(timing_info_m.get_packet_start_os_timestamp())
    )
    wrapped_packet_900.set_app_file_start_timestamp_machine(
        round(timing_info_m.get_packet_start_mach_timestamp())
    )
    wrapped_packet_900.set_server_timestamp_epoch_microseconds_utc(
        round(timing_info_m.get_server_acquisition_arrival_timestamp())
    )

    # Top-level metadata
    wrapped_packet_900.add_metadata(
        "machTimeZero", str(timing_info_m.get_app_start_mach_timestamp())
    )
    wrapped_packet_900.add_metadata(
        "bestLatency", str(timing_info_m.get_best_latency())
    )
    wrapped_packet_900.add_metadata("bestOffset", str(timing_info_m.get_best_offset()))
    wrapped_packet_900.add_metadata("migrated_from_api_1000", f"v{redvox.VERSION}")

    # Sensors
    if audio_m is not None:
        denorm_audio = list(
            map(_denormalize_audio_count, audio_m.get_samples().get_values())
        )
        mic_900 = reader_900.MicrophoneSensor()
        mic_900.set_sample_rate_hz(audio_m.get_sample_rate())
        mic_900.set_first_sample_timestamp_epoch_microseconds_utc(
            round(audio_m.get_first_sample_timestamp())
        )
        mic_900.set_sensor_name(audio_m.get_sensor_description())
        mic_900.set_metadata_as_dict(audio_m.get_metadata().get_metadata())
        mic_900.set_payload_values(denorm_audio)
        wrapped_packet_900.set_microphone_sensor(mic_900)

    pressure_m = sensors_m.get_pressure()
    if pressure_m is not None:
        barometer_900 = reader_900.BarometerSensor()
        barometer_900.set_sensor_name(pressure_m.get_sensor_description())
        barometer_900.set_metadata_as_dict(pressure_m.get_metadata().get_metadata())
        barometer_900.set_timestamps_microseconds_utc(
            pressure_m.get_timestamps().get_timestamps().astype(np.int64)
        )
        barometer_900.set_payload_values(pressure_m.get_samples().get_values())
        wrapped_packet_900.set_barometer_sensor(barometer_900)

    location_m = sensors_m.get_location()
    if location_m is not None:
        location_900 = reader_900.LocationSensor()
        location_900.set_sensor_name(location_m.get_sensor_description())
        location_900.set_timestamps_microseconds_utc(
            location_m.get_timestamps().get_timestamps().astype(np.int64)
        )
        location_900.set_payload_values(
            location_m.get_latitude_samples().get_values(),
            location_m.get_longitude_samples().get_values(),
            location_m.get_altitude_samples().get_values(),
            location_m.get_speed_samples().get_values(),
            location_m.get_horizontal_accuracy_samples().get_values(),
        )
        wrapped_packet_900.set_location_sensor(location_900)
        metadata = location_m.get_metadata().get_metadata()
        metadata["useLocation"] = (
            "T" if location_m.get_location_services_enabled() else "F"
        )
        metadata["desiredLocation"] = (
            "T" if location_m.get_location_services_requested() else "F"
        )
        metadata["permissionLocation"] = (
            "T" if location_m.get_location_permissions_granted() else "F"
        )
        metadata["enabledLocation"] = (
            "T"
            if LocationProvider.GPS in location_m.get_location_providers().get_values()
            else "FD"
        )
        location_900.set_metadata_as_dict(metadata)

    # Synch exchanges
    synch_exchanges_m = timing_info_m.get_synch_exchanges()
    if synch_exchanges_m.get_count() > 0:
        synch_900 = reader_900.TimeSynchronizationSensor()
        values: List[int] = []

        for exchange in synch_exchanges_m.get_values():
            values.extend(
                [
                    round(exchange.get_a1()),
                    round(exchange.get_a2()),
                    round(exchange.get_a3()),
                    round(exchange.get_b1()),
                    round(exchange.get_b2()),
                    round(exchange.get_b3()),
                ]
            )

        synch_900.set_payload_values(values)
        wrapped_packet_900.set_time_synchronization_sensor(synch_900)

    accel_m = sensors_m.get_accelerometer()
    if accel_m is not None:
        accel_900 = reader_900.AccelerometerSensor()
        accel_900.set_sensor_name(accel_m.get_sensor_description())
        accel_900.set_timestamps_microseconds_utc(
            accel_m.get_timestamps().get_timestamps().astype(np.int64)
        )
        accel_900.set_metadata_as_dict(accel_m.get_metadata().get_metadata())
        accel_900.set_payload_values(
            accel_m.get_x_samples().get_values(),
            accel_m.get_y_samples().get_values(),
            accel_m.get_z_samples().get_values(),
        )
        wrapped_packet_900.set_accelerometer_sensor(accel_900)

    magnetometer_m = sensors_m.get_magnetometer()
    if magnetometer_m is not None:
        magnetometer_900 = reader_900.MagnetometerSensor()
        magnetometer_900.set_sensor_name(magnetometer_m.get_sensor_description())
        magnetometer_900.set_timestamps_microseconds_utc(
            magnetometer_m.get_timestamps().get_timestamps().astype(np.int64)
        )
        magnetometer_900.set_metadata_as_dict(
            magnetometer_m.get_metadata().get_metadata()
        )
        magnetometer_900.set_payload_values(
            magnetometer_m.get_x_samples().get_values(),
            magnetometer_m.get_y_samples().get_values(),
            magnetometer_m.get_z_samples().get_values(),
        )
        wrapped_packet_900.set_magnetometer_sensor(magnetometer_900)

    gyroscope_m = sensors_m.get_gyroscope()
    if gyroscope_m is not None:
        gyroscope_900 = reader_900.GyroscopeSensor()
        gyroscope_900.set_sensor_name(gyroscope_m.get_sensor_description())
        gyroscope_900.set_timestamps_microseconds_utc(
            gyroscope_m.get_timestamps().get_timestamps().astype(np.int64)
        )
        gyroscope_900.set_metadata_as_dict(gyroscope_m.get_metadata().get_metadata())
        gyroscope_900.set_payload_values(
            gyroscope_m.get_x_samples().get_values(),
            gyroscope_m.get_y_samples().get_values(),
            gyroscope_m.get_z_samples().get_values(),
        )
        wrapped_packet_900.set_gyroscope_sensor(gyroscope_900)

    # Light
    light_m = sensors_m.get_light()
    if light_m is not None:
        light_900 = reader_900.LightSensor()
        light_900.set_sensor_name(light_m.get_sensor_description())
        light_900.set_metadata_as_dict(light_m.get_metadata().get_metadata())
        light_900.set_timestamps_microseconds_utc(
            light_m.get_timestamps().get_timestamps().astype(np.int64)
        )
        light_900.set_payload_values(light_m.get_samples().get_values())
        wrapped_packet_900.set_light_sensor(light_900)

    # Image, skip for now

    # Infrared / proximity
    proximity_m = sensors_m.get_proximity()
    if proximity_m is not None:
        proximity_900 = reader_900.InfraredSensor()
        proximity_900.set_sensor_name(proximity_m.get_sensor_description())
        proximity_900.set_metadata_as_dict(proximity_m.get_metadata().get_metadata())
        proximity_900.set_timestamps_microseconds_utc(
            proximity_m.get_timestamps().get_timestamps().astype(np.int64)
        )
        proximity_900.set_payload_values(proximity_m.get_samples().get_values())
        wrapped_packet_900.set_infrared_sensor(proximity_900)

    return wrapped_packet_900

Functions

def compute_stats_raw(has_stats: Union[src.redvox_api_m.redvox_api_m_pb2.TimingPayload, src.redvox_api_m.redvox_api_m_pb2.SamplePayload, src.redvox_api_m.redvox_api_m_pb2.DoubleSamplePayload])
Expand source code
def compute_stats_raw(
    has_stats: Union[
        api_m.RedvoxPacketM.TimingPayload,
        api_m.RedvoxPacketM.SamplePayload,
        api_m.RedvoxPacketM.DoubleSamplePayload,
    ]
):
    values: np.ndarray
    stats_container: api_m.RedvoxPacketM.SummaryStatistics
    if isinstance(has_stats, api_m.RedvoxPacketM.TimingPayload):
        values = np.array(has_stats.timestamps)
        stats_container = has_stats.timestamp_statistics
        mean_sr: float
        std_sr: float
        (mean_sr, std_sr) = common_m.sampling_rate_statistics(values)
        has_stats.mean_sample_rate = mean_sr
        has_stats.stdev_sample_rate = std_sr
    else:
        values = np.array(has_stats.values)
        stats_container = has_stats.value_statistics

    stats_container.count = len(values)
    # noinspection Mypy
    stats_container.min = values.min()
    # noinspection Mypy
    stats_container.max = values.max()
    # noinspection Mypy
    stats_container.mean = values.mean()
    # noinspection Mypy
    stats_container.standard_deviation = values.std()
def convert_api_1000_to_900(wrapped_packet_m: WrappedRedvoxPacketM) ‑> WrappedRedvoxPacket

Converts an API M wrapped packet into an API 900 wrapped packet.

:param wrapped_packet_m: Packet to convert. :return: An API 900 wrapped packet.

Expand source code
def convert_api_1000_to_900(
    wrapped_packet_m: WrappedRedvoxPacketM,
) -> reader_900.WrappedRedvoxPacket:
    """
    Converts an API M wrapped packet into an API 900 wrapped packet.

    :param wrapped_packet_m: Packet to convert.
    :return: An API 900 wrapped packet.
    """
    # TODO detect and warn about all the fields that are being dropped due to conversion!
    wrapped_packet_900: reader_900.WrappedRedvoxPacket = (
        reader_900.WrappedRedvoxPacket()
    )

    station_information_m = wrapped_packet_m.get_station_information()
    sensors_m = wrapped_packet_m.get_sensors()
    audio_m = sensors_m.get_audio()

    wrapped_packet_900.set_api(900)
    wrapped_packet_900.set_uuid(station_information_m.get_uuid())
    wrapped_packet_900.set_redvox_id(station_information_m.get_id())
    wrapped_packet_900.set_authenticated_email(station_information_m.get_auth_id())
    wrapped_packet_900.set_authentication_token(
        "n/a"
    )  # Different auth protocols are used, can't convert between
    wrapped_packet_900.set_firebase_token("n/a")  # No longer in API M packet
    wrapped_packet_900.set_is_backfilled(False)  # No longer useful metric in API M
    wrapped_packet_900.set_is_private(station_information_m.get_is_private())
    wrapped_packet_900.set_is_scrambled(False)
    wrapped_packet_900.set_device_make(station_information_m.get_make())
    wrapped_packet_900.set_device_model(station_information_m.get_model())
    wrapped_packet_900.set_device_os(
        _migrate_os_type_1000_to_900(station_information_m.get_os())
    )
    wrapped_packet_900.set_device_os_version(station_information_m.get_os_version())
    wrapped_packet_900.set_app_version(station_information_m.get_app_version())

    battery_metrics = station_information_m.get_station_metrics().get_battery()
    battery_percent: float = (
        battery_metrics.get_values()[-1]
        if battery_metrics.get_values_count() > 0
        else 0.0
    )
    wrapped_packet_900.set_battery_level_percent(battery_percent)
    temp_metrics = station_information_m.get_station_metrics().get_temperature()
    device_temp: float = (
        temp_metrics.get_values()[-1] if temp_metrics.get_values_count() > 0 else 0.0
    )
    wrapped_packet_900.set_device_temperature_c(device_temp)

    server_info_m = wrapped_packet_m.get_station_information().get_service_urls()
    wrapped_packet_900.set_acquisition_server(server_info_m.get_acquisition_server())
    wrapped_packet_900.set_time_synchronization_server(server_info_m.get_synch_server())
    wrapped_packet_900.set_authentication_server(server_info_m.get_auth_server())

    timing_info_m = wrapped_packet_m.get_timing_information()
    wrapped_packet_900.set_app_file_start_timestamp_epoch_microseconds_utc(
        round(timing_info_m.get_packet_start_os_timestamp())
    )
    wrapped_packet_900.set_app_file_start_timestamp_machine(
        round(timing_info_m.get_packet_start_mach_timestamp())
    )
    wrapped_packet_900.set_server_timestamp_epoch_microseconds_utc(
        round(timing_info_m.get_server_acquisition_arrival_timestamp())
    )

    # Top-level metadata
    wrapped_packet_900.add_metadata(
        "machTimeZero", str(timing_info_m.get_app_start_mach_timestamp())
    )
    wrapped_packet_900.add_metadata(
        "bestLatency", str(timing_info_m.get_best_latency())
    )
    wrapped_packet_900.add_metadata("bestOffset", str(timing_info_m.get_best_offset()))
    wrapped_packet_900.add_metadata("migrated_from_api_1000", f"v{redvox.VERSION}")

    # Sensors
    if audio_m is not None:
        denorm_audio = list(
            map(_denormalize_audio_count, audio_m.get_samples().get_values())
        )
        mic_900 = reader_900.MicrophoneSensor()
        mic_900.set_sample_rate_hz(audio_m.get_sample_rate())
        mic_900.set_first_sample_timestamp_epoch_microseconds_utc(
            round(audio_m.get_first_sample_timestamp())
        )
        mic_900.set_sensor_name(audio_m.get_sensor_description())
        mic_900.set_metadata_as_dict(audio_m.get_metadata().get_metadata())
        mic_900.set_payload_values(denorm_audio)
        wrapped_packet_900.set_microphone_sensor(mic_900)

    pressure_m = sensors_m.get_pressure()
    if pressure_m is not None:
        barometer_900 = reader_900.BarometerSensor()
        barometer_900.set_sensor_name(pressure_m.get_sensor_description())
        barometer_900.set_metadata_as_dict(pressure_m.get_metadata().get_metadata())
        barometer_900.set_timestamps_microseconds_utc(
            pressure_m.get_timestamps().get_timestamps().astype(np.int64)
        )
        barometer_900.set_payload_values(pressure_m.get_samples().get_values())
        wrapped_packet_900.set_barometer_sensor(barometer_900)

    location_m = sensors_m.get_location()
    if location_m is not None:
        location_900 = reader_900.LocationSensor()
        location_900.set_sensor_name(location_m.get_sensor_description())
        location_900.set_timestamps_microseconds_utc(
            location_m.get_timestamps().get_timestamps().astype(np.int64)
        )
        location_900.set_payload_values(
            location_m.get_latitude_samples().get_values(),
            location_m.get_longitude_samples().get_values(),
            location_m.get_altitude_samples().get_values(),
            location_m.get_speed_samples().get_values(),
            location_m.get_horizontal_accuracy_samples().get_values(),
        )
        wrapped_packet_900.set_location_sensor(location_900)
        metadata = location_m.get_metadata().get_metadata()
        metadata["useLocation"] = (
            "T" if location_m.get_location_services_enabled() else "F"
        )
        metadata["desiredLocation"] = (
            "T" if location_m.get_location_services_requested() else "F"
        )
        metadata["permissionLocation"] = (
            "T" if location_m.get_location_permissions_granted() else "F"
        )
        metadata["enabledLocation"] = (
            "T"
            if LocationProvider.GPS in location_m.get_location_providers().get_values()
            else "FD"
        )
        location_900.set_metadata_as_dict(metadata)

    # Synch exchanges
    synch_exchanges_m = timing_info_m.get_synch_exchanges()
    if synch_exchanges_m.get_count() > 0:
        synch_900 = reader_900.TimeSynchronizationSensor()
        values: List[int] = []

        for exchange in synch_exchanges_m.get_values():
            values.extend(
                [
                    round(exchange.get_a1()),
                    round(exchange.get_a2()),
                    round(exchange.get_a3()),
                    round(exchange.get_b1()),
                    round(exchange.get_b2()),
                    round(exchange.get_b3()),
                ]
            )

        synch_900.set_payload_values(values)
        wrapped_packet_900.set_time_synchronization_sensor(synch_900)

    accel_m = sensors_m.get_accelerometer()
    if accel_m is not None:
        accel_900 = reader_900.AccelerometerSensor()
        accel_900.set_sensor_name(accel_m.get_sensor_description())
        accel_900.set_timestamps_microseconds_utc(
            accel_m.get_timestamps().get_timestamps().astype(np.int64)
        )
        accel_900.set_metadata_as_dict(accel_m.get_metadata().get_metadata())
        accel_900.set_payload_values(
            accel_m.get_x_samples().get_values(),
            accel_m.get_y_samples().get_values(),
            accel_m.get_z_samples().get_values(),
        )
        wrapped_packet_900.set_accelerometer_sensor(accel_900)

    magnetometer_m = sensors_m.get_magnetometer()
    if magnetometer_m is not None:
        magnetometer_900 = reader_900.MagnetometerSensor()
        magnetometer_900.set_sensor_name(magnetometer_m.get_sensor_description())
        magnetometer_900.set_timestamps_microseconds_utc(
            magnetometer_m.get_timestamps().get_timestamps().astype(np.int64)
        )
        magnetometer_900.set_metadata_as_dict(
            magnetometer_m.get_metadata().get_metadata()
        )
        magnetometer_900.set_payload_values(
            magnetometer_m.get_x_samples().get_values(),
            magnetometer_m.get_y_samples().get_values(),
            magnetometer_m.get_z_samples().get_values(),
        )
        wrapped_packet_900.set_magnetometer_sensor(magnetometer_900)

    gyroscope_m = sensors_m.get_gyroscope()
    if gyroscope_m is not None:
        gyroscope_900 = reader_900.GyroscopeSensor()
        gyroscope_900.set_sensor_name(gyroscope_m.get_sensor_description())
        gyroscope_900.set_timestamps_microseconds_utc(
            gyroscope_m.get_timestamps().get_timestamps().astype(np.int64)
        )
        gyroscope_900.set_metadata_as_dict(gyroscope_m.get_metadata().get_metadata())
        gyroscope_900.set_payload_values(
            gyroscope_m.get_x_samples().get_values(),
            gyroscope_m.get_y_samples().get_values(),
            gyroscope_m.get_z_samples().get_values(),
        )
        wrapped_packet_900.set_gyroscope_sensor(gyroscope_900)

    # Light
    light_m = sensors_m.get_light()
    if light_m is not None:
        light_900 = reader_900.LightSensor()
        light_900.set_sensor_name(light_m.get_sensor_description())
        light_900.set_metadata_as_dict(light_m.get_metadata().get_metadata())
        light_900.set_timestamps_microseconds_utc(
            light_m.get_timestamps().get_timestamps().astype(np.int64)
        )
        light_900.set_payload_values(light_m.get_samples().get_values())
        wrapped_packet_900.set_light_sensor(light_900)

    # Image, skip for now

    # Infrared / proximity
    proximity_m = sensors_m.get_proximity()
    if proximity_m is not None:
        proximity_900 = reader_900.InfraredSensor()
        proximity_900.set_sensor_name(proximity_m.get_sensor_description())
        proximity_900.set_metadata_as_dict(proximity_m.get_metadata().get_metadata())
        proximity_900.set_timestamps_microseconds_utc(
            proximity_m.get_timestamps().get_timestamps().astype(np.int64)
        )
        proximity_900.set_payload_values(proximity_m.get_samples().get_values())
        wrapped_packet_900.set_infrared_sensor(proximity_900)

    return wrapped_packet_900
def convert_api_900_to_1000(wrapped_packet_900: WrappedRedvoxPacket) ‑> WrappedRedvoxPacketM

Converts a wrapped API 900 packet into a wrapped API M packet.

:param wrapped_packet_900: API 900 packet to convert. :return: A wrapped API M packet.

Expand source code
def convert_api_900_to_1000(
    wrapped_packet_900: reader_900.WrappedRedvoxPacket,
) -> WrappedRedvoxPacketM:
    """
    Converts a wrapped API 900 packet into a wrapped API M packet.

    :param wrapped_packet_900: API 900 packet to convert.
    :return: A wrapped API M packet.
    """
    wrapped_packet_m: WrappedRedvoxPacketM = WrappedRedvoxPacketM.new()

    # Top-level metadata
    wrapped_packet_m.set_api(1000.0)
    # noinspection PyUnresolvedReferences,Mypy
    wrapped_packet_m.set_sub_api(900.0)

    # Station information
    station_information: StationInformation = wrapped_packet_m.get_station_information()
    station_information.set_id(wrapped_packet_900.redvox_id()).set_uuid(
        wrapped_packet_900.uuid()
    ).set_make(wrapped_packet_900.device_make()).set_model(
        wrapped_packet_900.device_model()
    ).set_os(
        _migrate_os_type_900_to_1000(wrapped_packet_900.device_os())
    ).set_os_version(
        wrapped_packet_900.device_os_version()
    ).set_app_version(
        wrapped_packet_900.app_version()
    ).set_is_private(
        wrapped_packet_900.is_private()
    )

    station_information.get_service_urls().set_acquisition_server(
        wrapped_packet_900.acquisition_server()
    ).set_synch_server(
        wrapped_packet_900.time_synchronization_server()
    ).set_auth_server(
        wrapped_packet_900.authentication_server()
    )

    # API 900 does not maintain a copy of its settings. So we will not set anything in AppSettings

    # StationMetrics - We know a couple. We take a slightly more cumbersome approach using the raw protobuf
    # to avoid some conversions between lists and np arrays.
    station_metrics: StationMetrics = station_information.get_station_metrics()
    station_metrics.get_timestamps().append_timestamp(
        wrapped_packet_900.app_file_start_timestamp_machine()
    )
    station_metrics.get_temperature().set_values(
        np.array([wrapped_packet_900.device_temperature_c()]), True
    )
    station_metrics.get_battery().set_values(
        np.array([wrapped_packet_900.battery_level_percent()]), True
    )

    # Timing information
    mach_time_900: int = wrapped_packet_900.app_file_start_timestamp_machine()
    os_time_900: int = (
        wrapped_packet_900.app_file_start_timestamp_epoch_microseconds_utc()
    )
    len_micros: int = _packet_length_microseconds_900(wrapped_packet_900)
    best_latency: Optional[float] = wrapped_packet_900.best_latency()
    best_latency = best_latency if best_latency is not None else NAN
    best_offset: Optional[float] = wrapped_packet_900.best_offset()
    best_offset = best_offset if best_offset is not None else NAN

    wrapped_packet_m.get_timing_information().set_unit(
        common_m.Unit.MICROSECONDS_SINCE_UNIX_EPOCH
    ).set_packet_start_mach_timestamp(mach_time_900).set_packet_start_os_timestamp(
        os_time_900
    ).set_packet_end_mach_timestamp(
        mach_time_900 + len_micros
    ).set_packet_end_os_timestamp(
        os_time_900 + len_micros
    ).set_server_acquisition_arrival_timestamp(
        wrapped_packet_900.server_timestamp_epoch_microseconds_utc()
    ).set_app_start_mach_timestamp(
        _find_mach_time_zero(wrapped_packet_900)
    ).set_best_latency(
        best_latency
    ).set_best_offset(
        best_offset
    )

    time_sensor = wrapped_packet_900.time_synchronization_sensor()
    if time_sensor is not None:
        wrapped_packet_m.get_timing_information().get_synch_exchanges().set_values(
            _migrate_synch_exchanges_900_to_1000(time_sensor.payload_values())
        )

    # Sensors
    sensors_m: Sensors = wrapped_packet_m.get_sensors()
    # Microphone / Audio
    mic_sensor_900: Optional[
        reader_900.MicrophoneSensor
    ] = wrapped_packet_900.microphone_sensor()
    if mic_sensor_900 is not None:
        normalized_audio: np.ndarray = (
            mic_sensor_900.payload_values() / _NORMALIZATION_CONSTANT
        )
        audio_sensor_m = sensors_m.new_audio()
        audio_sensor_m.set_first_sample_timestamp(
            mic_sensor_900.first_sample_timestamp_epoch_microseconds_utc()
        ).set_is_scrambled(wrapped_packet_900.is_scrambled()).set_sample_rate(
            mic_sensor_900.sample_rate_hz()
        ).set_sensor_description(
            mic_sensor_900.sensor_name()
        ).get_samples().set_values(
            normalized_audio, update_value_statistics=True
        )
        audio_sensor_m.get_metadata().set_metadata(mic_sensor_900.metadata_as_dict())

    # Barometer
    barometer_sensor_900: Optional[
        reader_900.BarometerSensor
    ] = wrapped_packet_900.barometer_sensor()
    if barometer_sensor_900 is not None:
        pressure_sensor_m = sensors_m.new_pressure()
        pressure_sensor_m.set_sensor_description(barometer_sensor_900.sensor_name())
        pressure_sensor_m.get_timestamps().set_timestamps(
            barometer_sensor_900.timestamps_microseconds_utc(), True
        )
        pressure_sensor_m.get_samples().set_values(
            barometer_sensor_900.payload_values(), True
        )
        pressure_sensor_m.get_metadata().set_metadata(
            barometer_sensor_900.metadata_as_dict()
        )

    # Location
    # TODO: rework
    location_sensor_900: Optional[
        reader_900.LocationSensor
    ] = wrapped_packet_900.location_sensor()
    if location_sensor_900 is not None:
        location_m = sensors_m.new_location()
        location_m.set_sensor_description(location_sensor_900.sensor_name())
        location_m.get_timestamps().set_timestamps(
            location_sensor_900.timestamps_microseconds_utc(), True
        )
        if location_sensor_900.check_for_preset_lat_lon():
            lat_lon: np.ndarray = location_sensor_900.get_payload_lat_lon()
            location_m.get_latitude_samples().set_values(lat_lon[:1], True)
            location_m.get_longitude_samples().set_values(lat_lon[1:], True)
        else:
            location_m.get_latitude_samples().set_values(
                location_sensor_900.payload_values_latitude(), True
            )
            location_m.get_longitude_samples().set_values(
                location_sensor_900.payload_values_longitude(), True
            )
            location_m.get_altitude_samples().set_values(
                location_sensor_900.payload_values_altitude(), True
            )
            location_m.get_speed_samples().set_values(
                location_sensor_900.payload_values_speed(), True
            )
            location_m.get_horizontal_accuracy_samples().set_values(
                location_sensor_900.payload_values_accuracy(), True
            )

        def _extract_meta_bool(metad: Dict[str, str], key: str) -> bool:
            if key not in metad:
                return False

            return metad[key] == "T"

        loc_meta_900 = location_sensor_900.metadata_as_dict()
        use_location = _extract_meta_bool(loc_meta_900, "useLocation")
        desired_location = _extract_meta_bool(loc_meta_900, "desiredLocation")
        permission_location = _extract_meta_bool(loc_meta_900, "permissionLocation")
        enabled_location = _extract_meta_bool(loc_meta_900, "enabledLocation")

        n_p = location_m.get_timestamps().get_timestamps_count()

        if desired_location:
            location_m.get_location_providers().set_values(
                [LocationProvider.USER for i in range(n_p)]
            )
        elif enabled_location:
            location_m.get_location_providers().set_values(
                [LocationProvider.GPS for i in range(n_p)]
            )
        elif use_location and desired_location and permission_location:
            location_m.get_location_providers().set_values(
                [LocationProvider.NETWORK for i in range(n_p)]
            )
        else:
            location_m.get_location_providers().set_values(
                [LocationProvider.NONE for i in range(n_p)]
            )

        location_m.set_location_permissions_granted(permission_location)
        location_m.set_location_services_enabled(use_location)
        location_m.set_location_services_requested(desired_location)

        # Once we're done here, we should remove the original metadata
        if "useLocation" in loc_meta_900:
            del loc_meta_900["useLocation"]
        if "desiredLocation" in loc_meta_900:
            del loc_meta_900["desiredLocation"]
        if "permissionLocation" in loc_meta_900:
            del loc_meta_900["permissionLocation"]
        if "enabledLocation" in loc_meta_900:
            del loc_meta_900["enabledLocation"]
        if "machTimeZero" in loc_meta_900:
            del loc_meta_900["machTimeZero"]
        location_m.get_metadata().set_metadata(loc_meta_900)

    # Time Synchronization
    # This was already added to the timing information

    # Accelerometer
    accelerometer_900 = wrapped_packet_900.accelerometer_sensor()
    if accelerometer_900 is not None:
        accelerometer_m = sensors_m.new_accelerometer()
        accelerometer_m.set_sensor_description(accelerometer_900.sensor_name())
        accelerometer_m.get_timestamps().set_timestamps(
            accelerometer_900.timestamps_microseconds_utc(), True
        )
        accelerometer_m.get_x_samples().set_values(
            accelerometer_900.payload_values_x(), True
        )
        accelerometer_m.get_y_samples().set_values(
            accelerometer_900.payload_values_y(), True
        )
        accelerometer_m.get_z_samples().set_values(
            accelerometer_900.payload_values_z(), True
        )
        accelerometer_m.get_metadata().set_metadata(
            accelerometer_900.metadata_as_dict()
        )

    # Magnetometer
    magnetometer_900 = wrapped_packet_900.magnetometer_sensor()
    if magnetometer_900 is not None:
        magnetometer_m = sensors_m.new_magnetometer()
        magnetometer_m.set_sensor_description(magnetometer_900.sensor_name())
        magnetometer_m.get_timestamps().set_timestamps(
            magnetometer_900.timestamps_microseconds_utc(), True
        )
        magnetometer_m.get_x_samples().set_values(
            magnetometer_900.payload_values_x(), True
        )
        magnetometer_m.get_y_samples().set_values(
            magnetometer_900.payload_values_y(), True
        )
        magnetometer_m.get_z_samples().set_values(
            magnetometer_900.payload_values_z(), True
        )
        magnetometer_m.get_metadata().set_metadata(magnetometer_900.metadata_as_dict())

    # Gyroscope
    gyroscope_900 = wrapped_packet_900.gyroscope_sensor()
    if gyroscope_900 is not None:
        gyroscope_m = sensors_m.new_gyroscope()
        gyroscope_m.set_sensor_description(gyroscope_900.sensor_name())
        gyroscope_m.get_timestamps().set_timestamps(
            gyroscope_900.timestamps_microseconds_utc(), True
        )
        gyroscope_m.get_x_samples().set_values(gyroscope_900.payload_values_x(), True)
        gyroscope_m.get_y_samples().set_values(gyroscope_900.payload_values_y(), True)
        gyroscope_m.get_z_samples().set_values(gyroscope_900.payload_values_z(), True)
        gyroscope_m.get_metadata().set_metadata(gyroscope_900.metadata_as_dict())

    # Light
    light_900 = wrapped_packet_900.light_sensor()
    if light_900 is not None:
        light_m = sensors_m.new_light()
        light_m.set_sensor_description(light_900.sensor_name())
        light_m.get_timestamps().set_timestamps(
            light_900.timestamps_microseconds_utc(), True
        )
        light_m.get_samples().set_values(light_900.payload_values(), True)
        light_m.get_metadata().set_metadata(light_900.metadata_as_dict())

    # Image
    # TODO: Implement

    # Proximity
    proximity_900 = wrapped_packet_900.infrared_sensor()
    if proximity_900 is not None:
        proximity_m = sensors_m.new_proximity()
        proximity_m.set_sensor_description(proximity_900.sensor_name())
        proximity_m.get_timestamps().set_timestamps(
            proximity_900.timestamps_microseconds_utc(), True
        )
        proximity_m.get_samples().set_values(proximity_900.payload_values(), True)
        proximity_m.get_metadata().set_metadata(proximity_900.metadata_as_dict())

    # Removed any other API 900 top-level metadata now that its been used
    meta = wrapped_packet_900.metadata_as_dict()
    if "machTimeZero" in meta:
        del meta["machTimeZero"]
    if "bestOffset" in meta:
        del meta["bestOffset"]
    if "bestLatency" in meta:
        del meta["bestLatency"]
    wrapped_packet_m.get_metadata().append_metadata(
        "migrated_from_api_900", f"v{redvox.VERSION}"
    )

    return wrapped_packet_m
def convert_api_900_to_1000_raw(packet: api900_pb2.RedvoxPacket) ‑> src.redvox_api_m.redvox_api_m_pb2.RedvoxPacketM

Converts a wrapped API 900 packet into a wrapped API M packet.

:param packet: API 900 packet to convert. :return: A wrapped API M packet.

Expand source code
def convert_api_900_to_1000_raw(packet: api_900.RedvoxPacket) -> api_m.RedvoxPacketM:
    """
    Converts a wrapped API 900 packet into a wrapped API M packet.

    :param packet: API 900 packet to convert.
    :return: A wrapped API M packet.
    """
    packet_m: api_m.RedvoxPacketM = api_m.RedvoxPacketM()

    # Top-level metadata
    packet_m.api = 1000.0
    # noinspection PyUnresolvedReferences,Mypy
    packet_m.sub_api = 900.0

    # Station information
    packet_m.station_information.id = packet.redvox_id
    packet_m.station_information.uuid = packet.uuid
    packet_m.station_information.make = packet.device_make
    packet_m.station_information.model = packet.device_model
    packet_m.station_information.os = _migrate_os_type_900_to_1000_raw(packet.device_os)
    packet_m.station_information.os_version = packet.device_os_version
    packet_m.station_information.app_version = packet.app_version
    packet_m.station_information.is_private = packet.is_private

    packet_m.station_information.app_settings.samples_per_window = \
        len(packet.evenly_sampled_channels[0].int32_payload.payload)

    packet_m.station_information.service_urls.acquisition_server = (
        packet.acquisition_server
    )
    packet_m.station_information.service_urls.synch_server = (
        packet.time_synchronization_server
    )
    packet_m.station_information.service_urls.auth_server = packet.authentication_server

    # API 900 does not maintain a copy of its settings. So we will not set anything in AppSettings

    # StationMetrics - We know a couple.
    packet_m.station_information.station_metrics.timestamps.unit = (
        api_m.RedvoxPacketM.Unit.MICROSECONDS_SINCE_UNIX_EPOCH
    )
    packet_m.station_information.station_metrics.timestamps.timestamps[:] = [
        packet.app_file_start_timestamp_machine
    ]
    packet_m.station_information.station_metrics.temperature.unit = (
        api_m.RedvoxPacketM.Unit.DEGREES_CELSIUS
    )
    packet_m.station_information.station_metrics.temperature.values[:] = [
        packet.device_temperature_c
    ]
    packet_m.station_information.station_metrics.battery.unit = (
        api_m.RedvoxPacketM.Unit.PERCENTAGE
    )
    packet_m.station_information.station_metrics.battery.values[:] = [
        packet.battery_level_percent
    ]

    # And we can fill in defaults for those we don't know
    packet_m.station_information.station_metrics.available_disk.unit = (
        api_m.RedvoxPacketM.Unit.BYTE
    )
    packet_m.station_information.station_metrics.available_disk.values[:] = [
        float("nan")
    ]
    packet_m.station_information.station_metrics.available_ram.unit = (
        api_m.RedvoxPacketM.Unit.BYTE
    )
    packet_m.station_information.station_metrics.available_ram.values[:] = [
        float("nan")
    ]
    packet_m.station_information.station_metrics.cpu_utilization.unit = (
        api_m.RedvoxPacketM.Unit.PERCENTAGE
    )
    packet_m.station_information.station_metrics.cpu_utilization.values[:] = [
        float("nan")
    ]
    packet_m.station_information.station_metrics.network_strength.unit = (
        api_m.RedvoxPacketM.Unit.DECIBEL
    )
    packet_m.station_information.station_metrics.network_strength.values[:] = [
        float("nan")
    ]
    packet_m.station_information.station_metrics.battery_current.unit = (
        api_m.RedvoxPacketM.Unit.MICROAMPERES
    )
    packet_m.station_information.station_metrics.battery_current.values[:] = [
        float("nan")
    ]
    packet_m.station_information.station_metrics.screen_brightness.unit = (
        api_m.RedvoxPacketM.Unit.PERCENTAGE
    )
    packet_m.station_information.station_metrics.screen_brightness.values[:] = [
        float("nan")
    ]

    packet_m.station_information.station_metrics.network_type[:] = [
        api_m.RedvoxPacketM.StationInformation.StationMetrics.NetworkType.UNKNOWN_NETWORK
    ]
    packet_m.station_information.station_metrics.cell_service_state[:] = [
        api_m.RedvoxPacketM.StationInformation.StationMetrics.CellServiceState.UNKNOWN
    ]
    packet_m.station_information.station_metrics.power_state[:] = [
        api_m.RedvoxPacketM.StationInformation.StationMetrics.PowerState.UNKNOWN_POWER_STATE
    ]
    packet_m.station_information.station_metrics.wifi_wake_lock[:] = [
        api_m.RedvoxPacketM.StationInformation.StationMetrics.WifiWakeLock.OTHER
    ]
    packet_m.station_information.station_metrics.screen_state[:] = [
        api_m.RedvoxPacketM.StationInformation.StationMetrics.ScreenState.UNKNOWN_SCREEN_STATE
    ]

    compute_stats_raw(packet_m.station_information.station_metrics.timestamps)
    compute_stats_raw(packet_m.station_information.station_metrics.temperature)
    compute_stats_raw(packet_m.station_information.station_metrics.battery)
    compute_stats_raw(packet_m.station_information.station_metrics.available_disk)
    compute_stats_raw(packet_m.station_information.station_metrics.available_ram)
    compute_stats_raw(packet_m.station_information.station_metrics.cpu_utilization)
    compute_stats_raw(packet_m.station_information.station_metrics.network_strength)
    compute_stats_raw(packet_m.station_information.station_metrics.battery_current)
    compute_stats_raw(packet_m.station_information.station_metrics.screen_brightness)

    # Timing information
    mach_time_900: int = packet.app_file_start_timestamp_machine
    os_time_900: int = packet.app_file_start_timestamp_epoch_microseconds_utc
    len_micros: int = _packet_length_microseconds_900_raw(packet)
    best_latency: float = reader_utils.get_metadata_or_default(
        list(packet.metadata), "bestLatency", float, NAN
    )
    best_offset: float = reader_utils.get_metadata_or_default(
        list(packet.metadata), "bestOffset", float, NAN
    )

    packet_m.timing_information.unit = (
        api_m.RedvoxPacketM.Unit.MICROSECONDS_SINCE_UNIX_EPOCH
    )
    packet_m.timing_information.packet_start_mach_timestamp = mach_time_900
    packet_m.timing_information.packet_start_os_timestamp = os_time_900
    packet_m.timing_information.packet_end_mach_timestamp = mach_time_900 + len_micros
    packet_m.timing_information.server_acquisition_arrival_timestamp = (
        packet.server_timestamp_epoch_microseconds_utc
    )
    packet_m.timing_information.packet_end_os_timestamp = os_time_900 + len_micros
    packet_m.timing_information.app_start_mach_timestamp = _find_mach_time_zero_raw(
        packet
    )
    packet_m.timing_information.best_latency = best_latency
    packet_m.timing_information.best_offset = best_offset

    synch_sensor: Optional[
        api_900.UnevenlySampledChannel
    ] = reader_utils.find_uneven_channel_raw(
        packet, {api_900.ChannelType.TIME_SYNCHRONIZATION}
    )
    if synch_sensor is not None:
        synch_payload: np.ndarray = reader_utils.extract_payload(synch_sensor)
        packet_m.timing_information.synch_exchanges.extend(
            _migrate_synch_exchanges_900_to_1000_raw(synch_payload)
        )

    # Sensors
    # Microphone / Audio
    if len(packet.evenly_sampled_channels) < 1:
        raise ValueError("Cannot convert API900 to API1000; Audio sensor missing.")
    audio_900: api_900.EvenlySampledChannel = packet.evenly_sampled_channels[0]
    packet_m.sensors.audio.sensor_description = audio_900.sensor_name
    packet_m.sensors.audio.sample_rate = audio_900.sample_rate_hz
    packet_m.sensors.audio.first_sample_timestamp = (
        audio_900.first_sample_timestamp_epoch_microseconds_utc
    )
    packet_m.sensors.audio.bits_of_precision = 16.0
    packet_m.sensors.audio.encoding = "counts"
    normalized_audio: np.ndarray = (
        reader_utils.extract_payload(audio_900) / _NORMALIZATION_CONSTANT
    )
    packet_m.sensors.audio.samples.values[:] = list(normalized_audio)
    packet_m.sensors.audio.samples.unit = api_m.RedvoxPacketM.Unit.NORMALIZED_COUNTS
    for i in range(0, len(audio_900.metadata), 2):
        v: str = audio_900.metadata[i + 1] if (i + 1) < len(audio_900.metadata) else ""
        packet_m.sensors.audio.metadata[audio_900.metadata[i]] = v
    compute_stats_raw(packet_m.sensors.audio.samples)

    # Pressure
    barometer_900: Optional[
        api_900.UnevenlySampledChannel
    ] = reader_utils.find_uneven_channel_raw(packet, {api_900.ChannelType.BAROMETER})
    if barometer_900 is not None:
        packet_m.sensors.pressure.sensor_description = barometer_900.sensor_name
        packet_m.sensors.pressure.timestamps.unit = (
            api_m.RedvoxPacketM.Unit.MICROSECONDS_SINCE_UNIX_EPOCH
        )
        packet_m.sensors.pressure.timestamps.timestamps[
            :
        ] = barometer_900.timestamps_microseconds_utc
        packet_m.sensors.pressure.samples.values[:] = list(
            reader_utils.extract_payload(barometer_900)
        )
        packet_m.sensors.pressure.samples.unit = api_m.RedvoxPacketM.Unit.KILOPASCAL
        for i in range(0, len(barometer_900.metadata), 2):
            v = (
                barometer_900.metadata[i + 1]
                if (i + 1) < len(barometer_900.metadata)
                else ""
            )
            packet_m.sensors.pressure.metadata[barometer_900.metadata[i]] = v
        compute_stats_raw(packet_m.sensors.pressure.timestamps)
        compute_stats_raw(packet_m.sensors.pressure.samples)

    # Location
    loc_900: Optional[
        api_900.UnevenlySampledChannel
    ] = reader_utils.find_uneven_channel_raw(
        packet,
        {
            api_900.ChannelType.LATITUDE,
            api_900.ChannelType.LONGITUDE,
            api_900.ChannelType.ALTITUDE,
            api_900.ChannelType.SPEED,
            api_900.ChannelType.ACCURACY,
        },
    )
    if loc_900 is not None:
        total_samples: int = len(loc_900.timestamps_microseconds_utc)
        loc_payload: List[float] = list(reader_utils.extract_payload(loc_900))
        packet_m.sensors.location.sensor_description = loc_900.sensor_name
        packet_m.sensors.location.timestamps.unit = (
            api_m.RedvoxPacketM.Unit.MICROSECONDS_SINCE_UNIX_EPOCH
        )
        packet_m.sensors.location.timestamps.timestamps[
            :
        ] = loc_900.timestamps_microseconds_utc
        packet_m.sensors.location.timestamps_gps.unit = (
            api_m.RedvoxPacketM.Unit.MICROSECONDS_SINCE_UNIX_EPOCH
        )
        packet_m.sensors.location.timestamps_gps.timestamps[:] = [
            float("nan")
        ] * total_samples

        total_channels: int = len(loc_900.channel_types)

        lat_idx: Optional[int] = reader_utils.extract_uneven_payload_idx_raw(
            packet, api_900.ChannelType.LATITUDE
        )
        packet_m.sensors.location.latitude_samples.unit = (
            api_m.RedvoxPacketM.Unit.DECIMAL_DEGREES
        )
        if lat_idx is not None:
            packet_m.sensors.location.latitude_samples.values[:] = loc_payload[
                lat_idx::total_channels
            ]
        else:
            packet_m.sensors.location.latitude_samples.values[:] = [
                float("nan") * total_samples
            ]

        lng_idx: Optional[int] = reader_utils.extract_uneven_payload_idx_raw(
            packet, api_900.ChannelType.LONGITUDE
        )
        packet_m.sensors.location.longitude_samples.unit = (
            api_m.RedvoxPacketM.Unit.DECIMAL_DEGREES
        )
        if lng_idx is not None:
            packet_m.sensors.location.longitude_samples.values[:] = loc_payload[
                lng_idx::total_channels
            ]
        else:
            packet_m.sensors.location.longitude_samples.values[:] = [
                float("nan") * total_samples
            ]

        alt_idx: Optional[int] = reader_utils.extract_uneven_payload_idx_raw(
            packet, api_900.ChannelType.ALTITUDE
        )
        packet_m.sensors.location.altitude_samples.unit = (
            api_m.RedvoxPacketM.Unit.METERS
        )
        if alt_idx is not None:
            packet_m.sensors.location.altitude_samples.values[:] = loc_payload[
                alt_idx::total_channels
            ]
        else:
            packet_m.sensors.location.altitude_samples.values[:] = [
                float("nan") * total_samples
            ]

        speed_idx: Optional[int] = reader_utils.extract_uneven_payload_idx_raw(
            packet, api_900.ChannelType.SPEED
        )
        packet_m.sensors.location.speed_samples.unit = (
            api_m.RedvoxPacketM.Unit.METERS_PER_SECOND
        )
        if speed_idx is not None:
            packet_m.sensors.location.speed_samples.values[:] = loc_payload[
                speed_idx::total_channels
            ]
        else:
            packet_m.sensors.location.speed_samples.values[:] = [
                float("nan") * total_samples
            ]

        acc_idx: Optional[int] = reader_utils.extract_uneven_payload_idx_raw(
            packet, api_900.ChannelType.ACCURACY
        )
        packet_m.sensors.location.horizontal_accuracy_samples.unit = (
            api_m.RedvoxPacketM.Unit.METERS
        )
        if acc_idx is not None:
            packet_m.sensors.location.horizontal_accuracy_samples.values[
                :
            ] = loc_payload[acc_idx::total_channels]
        else:
            packet_m.sensors.location.horizontal_accuracy_samples.values[:] = [
                float("nan") * total_samples
            ]

        packet_m.sensors.location.bearing_samples.unit = (
            api_m.RedvoxPacketM.Unit.DECIMAL_DEGREES
        )
        packet_m.sensors.location.bearing_samples.values[:] = [
            float("nan") * total_samples
        ]

        packet_m.sensors.location.vertical_accuracy_samples.unit = (
            api_m.RedvoxPacketM.Unit.METERS
        )
        packet_m.sensors.location.vertical_accuracy_samples.values[:] = [
            float("nan") * total_samples
        ]
        packet_m.sensors.location.speed_accuracy_samples.unit = (
            api_m.RedvoxPacketM.Unit.METERS_PER_SECOND
        )
        packet_m.sensors.location.speed_accuracy_samples.values[:] = [
            float("nan") * total_samples
        ]
        packet_m.sensors.location.bearing_accuracy_samples.unit = (
            api_m.RedvoxPacketM.Unit.DECIMAL_DEGREES
        )
        packet_m.sensors.location.bearing_accuracy_samples.values[:] = [
            float("nan") * total_samples
        ]

        # Compute stats
        compute_stats_raw(packet_m.sensors.location.timestamps)
        compute_stats_raw(packet_m.sensors.location.timestamps_gps)
        compute_stats_raw(packet_m.sensors.location.latitude_samples)
        compute_stats_raw(packet_m.sensors.location.longitude_samples)
        compute_stats_raw(packet_m.sensors.location.altitude_samples)
        compute_stats_raw(packet_m.sensors.location.speed_samples)
        compute_stats_raw(packet_m.sensors.location.bearing_samples)
        compute_stats_raw(packet_m.sensors.location.horizontal_accuracy_samples)
        compute_stats_raw(packet_m.sensors.location.vertical_accuracy_samples)
        compute_stats_raw(packet_m.sensors.location.speed_accuracy_samples)
        compute_stats_raw(packet_m.sensors.location.bearing_accuracy_samples)

        # Bookkeeping
        use_location: bool = reader_utils.get_metadata_or_default(
            list(loc_900.metadata), "useLocation", lambda val: v == "T", False
        )
        desired_location: bool = reader_utils.get_metadata_or_default(
            list(loc_900.metadata), "desiredLocation", lambda val: v == "T", False
        )
        permission_location: bool = reader_utils.get_metadata_or_default(
            list(loc_900.metadata), "permissionLocation", lambda val: v == "T", False
        )
        enabled_location: bool = reader_utils.get_metadata_or_default(
            list(loc_900.metadata), "enabledLocation", lambda val: v == "T", False
        )

        if desired_location:
            packet_m.sensors.location.location_providers[:] = [
                api_m.RedvoxPacketM.Sensors.Location.LocationProvider.USER
            ] * total_samples
        elif enabled_location:
            packet_m.sensors.location.location_providers[:] = [
                api_m.RedvoxPacketM.Sensors.Location.LocationProvider.GPS
            ] * total_samples
        elif use_location and desired_location and permission_location:
            packet_m.sensors.location.location_providers[:] = [
                api_m.RedvoxPacketM.Sensors.Location.LocationProvider.NETWORK
            ] * total_samples
        else:
            packet_m.sensors.location.location_providers[:] = [
                api_m.RedvoxPacketM.Sensors.Location.LocationProvider.NONE
            ] * total_samples

        packet_m.sensors.location.location_permissions_granted = permission_location
        packet_m.sensors.location.location_services_enabled = use_location
        packet_m.sensors.location.location_services_requested = desired_location

        for (i, k) in enumerate(loc_900.metadata):
            if i + 1 < len(loc_900.metadata):
                packet_m.sensors.location.metadata[k] = loc_900.metadata[i + 1]
            else:
                packet_m.sensors.location.metadata[k] = ""

    # # Time Synchronization
    # # This was already added to the timing information

    # Accelerometer
    accel_900: Optional[
        api_900.UnevenlySampledChannel
    ] = reader_utils.find_uneven_channel_raw(
        packet,
        {
            api_900.ChannelType.ACCELEROMETER_X,
            api_900.ChannelType.ACCELEROMETER_Y,
            api_900.ChannelType.ACCELEROMETER_Z,
        },
    )
    if accel_900 is not None:
        packet_m.sensors.accelerometer.sensor_description = accel_900.sensor_name
        packet_m.sensors.accelerometer.timestamps.unit = (
            api_m.RedvoxPacketM.Unit.MICROSECONDS_SINCE_UNIX_EPOCH
        )
        packet_m.sensors.accelerometer.timestamps.timestamps[
            :
        ] = accel_900.timestamps_microseconds_utc
        accel_payload: List[float] = list(reader_utils.extract_payload(accel_900))
        packet_m.sensors.accelerometer.x_samples.unit = (
            api_m.RedvoxPacketM.Unit.METERS_PER_SECOND_SQUARED
        )
        packet_m.sensors.accelerometer.x_samples.values[:] = accel_payload[0::3]
        packet_m.sensors.accelerometer.y_samples.unit = (
            api_m.RedvoxPacketM.Unit.METERS_PER_SECOND_SQUARED
        )
        packet_m.sensors.accelerometer.y_samples.values[:] = accel_payload[1::3]
        packet_m.sensors.accelerometer.z_samples.unit = (
            api_m.RedvoxPacketM.Unit.METERS_PER_SECOND_SQUARED
        )
        packet_m.sensors.accelerometer.z_samples.values[:] = accel_payload[2::3]
        compute_stats_raw(packet_m.sensors.accelerometer.timestamps)
        compute_stats_raw(packet_m.sensors.accelerometer.x_samples)
        compute_stats_raw(packet_m.sensors.accelerometer.y_samples)
        compute_stats_raw(packet_m.sensors.accelerometer.z_samples)

    # Magnetometer
    sensor: Optional[
        api_900.UnevenlySampledChannel
    ] = reader_utils.find_uneven_channel_raw(
        packet,
        {
            api_900.ChannelType.MAGNETOMETER_X,
            api_900.ChannelType.MAGNETOMETER_Y,
            api_900.ChannelType.MAGNETOMETER_Z,
        },
    )
    if sensor is not None:
        packet_m.sensors.magnetometer.sensor_description = sensor.sensor_name
        packet_m.sensors.magnetometer.timestamps.unit = (
            api_m.RedvoxPacketM.Unit.MICROSECONDS_SINCE_UNIX_EPOCH
        )
        packet_m.sensors.magnetometer.timestamps.timestamps[
            :
        ] = sensor.timestamps_microseconds_utc
        sensor_payload: List[float] = list(reader_utils.extract_payload(sensor))
        packet_m.sensors.magnetometer.x_samples.unit = (
            api_m.RedvoxPacketM.Unit.MICROTESLA
        )
        packet_m.sensors.magnetometer.x_samples.values[:] = sensor_payload[0::3]
        packet_m.sensors.magnetometer.y_samples.unit = (
            api_m.RedvoxPacketM.Unit.MICROTESLA
        )
        packet_m.sensors.magnetometer.y_samples.values[:] = sensor_payload[1::3]
        packet_m.sensors.magnetometer.z_samples.unit = (
            api_m.RedvoxPacketM.Unit.MICROTESLA
        )
        packet_m.sensors.magnetometer.z_samples.values[:] = sensor_payload[2::3]
        compute_stats_raw(packet_m.sensors.magnetometer.timestamps)
        compute_stats_raw(packet_m.sensors.magnetometer.x_samples)
        compute_stats_raw(packet_m.sensors.magnetometer.y_samples)
        compute_stats_raw(packet_m.sensors.magnetometer.z_samples)
    #
    # Gyroscope
    sensor = reader_utils.find_uneven_channel_raw(
        packet,
        {
            api_900.ChannelType.GYROSCOPE_X,
            api_900.ChannelType.GYROSCOPE_Y,
            api_900.ChannelType.GYROSCOPE_Z,
        },
    )
    if sensor is not None:
        packet_m.sensors.gyroscope.sensor_description = sensor.sensor_name
        packet_m.sensors.gyroscope.timestamps.unit = (
            api_m.RedvoxPacketM.Unit.MICROSECONDS_SINCE_UNIX_EPOCH
        )
        packet_m.sensors.gyroscope.timestamps.timestamps[
            :
        ] = sensor.timestamps_microseconds_utc
        sensor_payload = list(reader_utils.extract_payload(sensor))
        packet_m.sensors.gyroscope.x_samples.unit = (
            api_m.RedvoxPacketM.Unit.RADIANS_PER_SECOND
        )
        packet_m.sensors.gyroscope.x_samples.values[:] = sensor_payload[0::3]
        packet_m.sensors.gyroscope.y_samples.unit = (
            api_m.RedvoxPacketM.Unit.RADIANS_PER_SECOND
        )
        packet_m.sensors.gyroscope.y_samples.values[:] = sensor_payload[1::3]
        packet_m.sensors.gyroscope.z_samples.unit = (
            api_m.RedvoxPacketM.Unit.RADIANS_PER_SECOND
        )
        packet_m.sensors.gyroscope.z_samples.values[:] = sensor_payload[2::3]
        compute_stats_raw(packet_m.sensors.gyroscope.timestamps)
        compute_stats_raw(packet_m.sensors.gyroscope.x_samples)
        compute_stats_raw(packet_m.sensors.gyroscope.y_samples)
        compute_stats_raw(packet_m.sensors.gyroscope.z_samples)

    #
    # # Light
    light_900: Optional[
        api_900.UnevenlySampledChannel
    ] = reader_utils.find_uneven_channel_raw(packet, {api_900.ChannelType.LIGHT})
    if light_900 is not None:
        packet_m.sensors.light.sensor_description = light_900.sensor_name
        packet_m.sensors.light.timestamps.unit = (
            api_m.RedvoxPacketM.Unit.MICROSECONDS_SINCE_UNIX_EPOCH
        )
        packet_m.sensors.light.timestamps.timestamps[
            :
        ] = light_900.timestamps_microseconds_utc
        packet_m.sensors.light.samples.values[:] = list(
            reader_utils.extract_payload(light_900)
        )
        packet_m.sensors.light.samples.unit = api_m.RedvoxPacketM.Unit.LUX
        for i in range(0, len(light_900.metadata), 2):
            v = light_900.metadata[i + 1] if (i + 1) < len(light_900.metadata) else ""
            packet_m.sensors.light.metadata[light_900.metadata[i]] = v
        compute_stats_raw(packet_m.sensors.light.timestamps)
        compute_stats_raw(packet_m.sensors.light.samples)

    # # Image
    # Not implemented for conversion. Only a very small fraction of API 900 was ever image capable, and not the public
    # app.

    # # Proximity
    proximity_900: Optional[
        api_900.UnevenlySampledChannel
    ] = reader_utils.find_uneven_channel_raw(packet, {api_900.ChannelType.INFRARED})
    if proximity_900 is not None:
        packet_m.sensors.proximity.sensor_description = proximity_900.sensor_name
        packet_m.sensors.proximity.timestamps.unit = (
            api_m.RedvoxPacketM.Unit.MICROSECONDS_SINCE_UNIX_EPOCH
        )
        packet_m.sensors.proximity.timestamps.timestamps[
            :
        ] = proximity_900.timestamps_microseconds_utc
        packet_m.sensors.proximity.samples.values[:] = list(
            reader_utils.extract_payload(proximity_900)
        )
        packet_m.sensors.proximity.samples.unit = api_m.RedvoxPacketM.Unit.CENTIMETERS
        for i in range(0, len(proximity_900.metadata), 2):
            v = (
                proximity_900.metadata[i + 1]
                if (i + 1) < len(proximity_900.metadata)
                else ""
            )
            packet_m.sensors.proximity.metadata[proximity_900.metadata[i]] = v
        compute_stats_raw(packet_m.sensors.proximity.timestamps)
        compute_stats_raw(packet_m.sensors.proximity.samples)

    return packet_m