Module redvox.common.api_conversions
Provides functionality for converting between API versions.
Expand source code
"""
Provides functionality for converting between API versions.
"""
# TODO: Document
# TODO: Test
# TODO: Implement (de)normalization now that dynamic range is known in API M (how does this work from API 900 to API M?)
# TODO: Update any method that sets an enum to accept Union[Enum, int] to fix mypy type errors involving that
# TODO: Breakup conversion into smaller, more testable functions
# TODO: Rework location sensor conversions
# TODO: Add functions for converting compressed audio... in fact, this might make the most sense when converting API 900
# TODO:   into API M data since FLAC requires integers
from typing import List, Optional, Dict, Union
import numpy as np
import redvox.api1000.common.common as common_m
import redvox.api1000.proto.redvox_api_m_pb2 as api_m
import redvox.common.date_time_utils as dt_utls
import redvox.api900.lib.api900_pb2 as api_900
import redvox.api900.reader as reader_900
from redvox.api1000.wrapped_redvox_packet.sensors.sensors import Sensors
from redvox.api1000.wrapped_redvox_packet.sensors.location import LocationProvider
from redvox.api1000.wrapped_redvox_packet.station_information import (
    OsType,
    StationInformation,
    StationMetrics,
    AudioSamplingRate,
)
from redvox.api1000.wrapped_redvox_packet.timing_information import SynchExchange
from redvox.api1000.wrapped_redvox_packet.wrapped_packet import WrappedRedvoxPacketM
import redvox
import redvox.api900.reader_utils as reader_utils
# Scale factor between integer audio counts and normalized audio values in this module
# (0x7FFFFF == 2**23 - 1 == 8388607): counts are divided by it to normalize and multiplied by it to denormalize.
_NORMALIZATION_CONSTANT: int = 0x7FFFFF
# Not-a-number placeholder, used in this module as the default when optional metadata is missing.
NAN: float = float("nan")
def _normalize_audio_count(count: int, normalize_by: Optional[float] = None) -> float:
    norm: float = normalize_by if normalize_by is not None else _NORMALIZATION_CONSTANT
    return float(count) / float(norm)
def _denormalize_audio_count(norm: float) -> int:
    """
    Scales a normalized audio value back up by the module-level _NORMALIZATION_CONSTANT.
    :param norm: The normalized audio value.
    :return: The scaled value rounded to the nearest integer.
    """
    scaled: float = norm * float(_NORMALIZATION_CONSTANT)
    return int(round(scaled))
def _migrate_synch_exchanges_900_to_1000_raw(
    synch_exchanges: np.ndarray,
) -> List[api_m.RedvoxPacketM.TimingInformation.SynchExchange]:
    """
    Builds raw API M synch exchange messages from a flat array of API 900 synch values.
    Every 6 consecutive values are read as (a1, a2, a3, b1, b2, b3) of one exchange.
    :param synch_exchanges: Flat array of synch exchange values.
    :return: One raw API M SynchExchange per group of 6 values.
    """
    field_order = ("a1", "a2", "a3", "b1", "b2", "b3")
    migrated: List[api_m.RedvoxPacketM.TimingInformation.SynchExchange] = []
    for base in range(0, len(synch_exchanges), len(field_order)):
        message: api_m.RedvoxPacketM.TimingInformation.SynchExchange = (
            api_m.RedvoxPacketM.TimingInformation.SynchExchange()
        )
        for shift, field_name in enumerate(field_order):
            setattr(message, field_name, float(synch_exchanges[base + shift]))
        message.unit = api_m.RedvoxPacketM.Unit.MICROSECONDS_SINCE_UNIX_EPOCH
        migrated.append(message)
    return migrated
def _migrate_synch_exchanges_900_to_1000(
    synch_exchanges: np.ndarray,
) -> List[SynchExchange]:
    """
    Builds wrapped API M synch exchanges from a flat array of API 900 synch values.
    Every 6 consecutive values are read as (a1, a2, a3, b1, b2, b3) of one exchange.
    :param synch_exchanges: Flat array of synch exchange values.
    :return: One wrapped SynchExchange per group of 6 values.
    """
    migrated: List[SynchExchange] = []
    base: int = 0
    total: int = len(synch_exchanges)
    while base < total:
        wrapped: SynchExchange = SynchExchange.new()
        wrapped.set_a1(float(synch_exchanges[base]))
        wrapped.set_a2(float(synch_exchanges[base + 1]))
        wrapped.set_a3(float(synch_exchanges[base + 2]))
        wrapped.set_b1(float(synch_exchanges[base + 3]))
        wrapped.set_b2(float(synch_exchanges[base + 4]))
        wrapped.set_b3(float(synch_exchanges[base + 5]))
        migrated.append(wrapped)
        base += 6
    return migrated
def _find_mach_time_zero_raw(packet: api_900.RedvoxPacket) -> int:
    """
    Looks up the "machTimeZero" metadata value of a raw API 900 packet.
    The packet-level metadata is consulted first, then the metadata of the location channel (if any).
    :param packet: Raw API 900 packet to read.
    :return: The mach time zero, or -1 if neither metadata source provides it.
    """
    metadata_key: str = "machTimeZero"
    from_packet: Optional[int] = reader_utils.get_metadata_raw(
        list(packet.metadata), metadata_key, int
    )
    if from_packet is not None:
        return from_packet
    location_channel_types = {
        api_900.ChannelType.LATITUDE,
        api_900.ChannelType.LONGITUDE,
        api_900.ChannelType.ALTITUDE,
        api_900.ChannelType.SPEED,
    }
    location_channel: Optional[
        api_900.UnevenlySampledChannel
    ] = reader_utils.find_uneven_channel_raw(packet, location_channel_types)
    if location_channel is None:
        return -1
    from_location: Optional[int] = reader_utils.get_metadata_raw(
        list(location_channel.metadata), metadata_key, int
    )
    return -1 if from_location is None else from_location
def _find_mach_time_zero(packet: reader_900.WrappedRedvoxPacket) -> int:
    """
    Looks up the "machTimeZero" metadata value of a wrapped API 900 packet.
    The packet-level metadata is consulted first, then the metadata of the location sensor (if any).
    :param packet: Wrapped API 900 packet to read.
    :return: The mach time zero as an int, or -1 if neither metadata source provides it.
    """
    metadata_key: str = "machTimeZero"
    packet_metadata: Dict[str, str] = packet.metadata_as_dict()
    if metadata_key in packet_metadata:
        return int(packet_metadata[metadata_key])
    location_sensor: Optional[reader_900.LocationSensor] = packet.location_sensor()
    if location_sensor is None:
        return -1
    sensor_metadata: Dict[str, str] = location_sensor.metadata_as_dict()
    if metadata_key in sensor_metadata:
        return int(sensor_metadata[metadata_key])
    return -1
def _packet_length_microseconds_900(packet: reader_900.WrappedRedvoxPacket) -> int:
    """
    Computes the duration of a wrapped API 900 packet from its microphone sensor.
    The duration is the number of microphone samples divided by the microphone sample rate.
    :param packet: Wrapped API 900 packet to measure.
    :return: The packet duration in microseconds (rounded), or 0 if there is no microphone sensor.
    """
    mic: Optional[reader_900.MicrophoneSensor] = packet.microphone_sensor()
    if mic is None:
        return 0
    rate_hz: float = mic.sample_rate_hz()
    sample_count: int = len(mic.payload_values())
    duration_s: float = float(sample_count) / rate_hz
    return round(dt_utls.seconds_to_microseconds(duration_s))
def _packet_length_microseconds_900_raw(packet: api_900.RedvoxPacket) -> int:
    """
    Computes the duration of a raw API 900 packet from its first evenly sampled channel.
    The duration is the channel's payload length divided by its sample rate.
    :param packet: Raw API 900 packet to measure.
    :return: The packet duration in microseconds (rounded), or 0 if there are no evenly sampled channels.
    """
    channels = packet.evenly_sampled_channels
    if len(channels) < 1:
        return 0
    mic: api_900.EvenlySampledChannel = channels[0]
    rate_hz: float = mic.sample_rate_hz
    sample_count: int = reader_utils.payload_len(mic)
    duration_s: float = float(sample_count) / rate_hz
    return round(dt_utls.seconds_to_microseconds(duration_s))
# noinspection PyTypeChecker
# pylint: disable=C0103
def _migrate_os_type_900_to_1000(os: str) -> OsType:
    """
    Maps an API 900 OS name onto the wrapped API M OsType enum (case-insensitive).
    :param os: API 900 OS name.
    :return: OsType.ANDROID for "android", OsType.IOS for "ios", otherwise OsType.UNKNOWN_OS.
    """
    known_os_types: Dict[str, OsType] = {
        "android": OsType.ANDROID,
        "ios": OsType.IOS,
    }
    return known_os_types.get(os.lower(), OsType.UNKNOWN_OS)
def _migrate_os_type_900_to_1000_raw(
    os: str,
):
    """
    Maps an API 900 OS name onto the raw (protobuf) API M OsType enum (case-insensitive).
    :param os: API 900 OS name.
    :return: The protobuf OsType value ANDROID for "android", IOS for "ios", otherwise UNKNOWN_OS.
    """
    os_types = api_m.RedvoxPacketM.StationInformation.OsType
    normalized: str = os.lower()
    if normalized == "android":
        migrated = os_types.ANDROID
    elif normalized == "ios":
        migrated = os_types.IOS
    else:
        migrated = os_types.UNKNOWN_OS
    return migrated
# pylint: disable=C0103
def _migrate_os_type_1000_to_900(os: OsType) -> str:
    """
    Maps a wrapped API M OsType onto the OS name used by API 900.
    :param os: API M OsType to convert.
    :return: "Android" or "iOS" for those OS types; for any other type a message is printed and the
             enum member's name is returned.
    """
    supported = ((OsType.ANDROID, "Android"), (OsType.IOS, "iOS"))
    for os_type, os_name_900 in supported:
        if os == os_type:
            return os_name_900
    print(f"API 900 unsupported OsType: {os.name}")
    return os.name
def compute_stats_raw(
    has_stats: Union[
        api_m.RedvoxPacketM.TimingPayload,
        api_m.RedvoxPacketM.SamplePayload,
        api_m.RedvoxPacketM.DoubleSamplePayload,
    ]
):
    """
    Computes summary statistics for a raw API M payload and stores them on the payload in place.
    For a TimingPayload the statistics are computed over its timestamps and the mean / standard deviation
    of the sampling rate are stored as well; for any other payload they are computed over its values.
    :param has_stats: The raw API M payload to update.
    """
    if isinstance(has_stats, api_m.RedvoxPacketM.TimingPayload):
        samples: np.ndarray = np.array(has_stats.timestamps)
        summary: api_m.RedvoxPacketM.SummaryStatistics = has_stats.timestamp_statistics
        # Timing payloads additionally carry sample rate statistics
        (
            has_stats.mean_sample_rate,
            has_stats.stdev_sample_rate,
        ) = common_m.sampling_rate_statistics(samples)
    else:
        samples = np.array(has_stats.values)
        summary = has_stats.value_statistics
    # noinspection Mypy
    summary.count = len(samples)
    summary.min = np.min(samples)
    summary.max = np.max(samples)
    summary.mean = np.mean(samples)
    summary.standard_deviation = np.std(samples)
# noinspection DuplicatedCode
def convert_api_900_to_1000_raw(packet: api_900.RedvoxPacket) -> api_m.RedvoxPacketM:
    """
    Converts a raw (protobuf) API 900 packet into a raw (protobuf) API M packet.

    The first evenly sampled channel of the API 900 packet is treated as the audio sensor. Barometer,
    location, accelerometer, magnetometer, gyroscope, light and infrared (proximity) channels are converted
    when present. Image data is not converted.
    :param packet: API 900 packet to convert.
    :return: The converted API M packet.
    :raises ValueError: If the API 900 packet does not contain an evenly sampled (audio) channel.
    """
    # Audio is mandatory. Validate before anything dereferences the first evenly sampled channel so that a
    # packet without audio fails with the intended ValueError rather than an IndexError.
    if len(packet.evenly_sampled_channels) < 1:
        raise ValueError("Cannot convert API900 to API1000; Audio sensor missing.")
    audio_900: api_900.EvenlySampledChannel = packet.evenly_sampled_channels[0]
    unit = api_m.RedvoxPacketM.Unit
    channel_type = api_900.ChannelType

    packet_m: api_m.RedvoxPacketM = api_m.RedvoxPacketM()
    # Top-level metadata
    packet_m.api = 1000.0
    # noinspection PyUnresolvedReferences,Mypy
    packet_m.sub_api = 900.0

    # Station information
    station_m = packet_m.station_information
    station_m.id = packet.redvox_id
    station_m.uuid = packet.uuid
    station_m.make = packet.device_make
    station_m.model = packet.device_model
    station_m.os = _migrate_os_type_900_to_1000_raw(packet.device_os)
    station_m.os_version = packet.device_os_version
    station_m.app_version = packet.app_version
    station_m.is_private = packet.is_private
    # API 900 does not maintain a copy of its settings; the only app setting filled in is the window size,
    # taken from the length of the audio channel's int32 payload.
    station_m.app_settings.samples_per_window = len(audio_900.int32_payload.payload)
    station_m.service_urls.acquisition_server = packet.acquisition_server
    station_m.service_urls.synch_server = packet.time_synchronization_server
    station_m.service_urls.auth_server = packet.authentication_server

    _migrate_station_metrics_900_to_1000_raw(packet, station_m.station_metrics)
    _migrate_timing_900_to_1000_raw(packet, packet_m.timing_information)

    # Sensors
    sensors_m = packet_m.sensors
    _migrate_audio_900_to_1000_raw(audio_900, sensors_m.audio)

    # Pressure
    barometer_900: Optional[
        api_900.UnevenlySampledChannel
    ] = reader_utils.find_uneven_channel_raw(packet, {channel_type.BAROMETER})
    if barometer_900 is not None:
        _migrate_single_channel_900_to_1000_raw(
            barometer_900, sensors_m.pressure, unit.KILOPASCAL
        )

    # Location
    loc_900: Optional[
        api_900.UnevenlySampledChannel
    ] = reader_utils.find_uneven_channel_raw(
        packet,
        {
            channel_type.LATITUDE,
            channel_type.LONGITUDE,
            channel_type.ALTITUDE,
            channel_type.SPEED,
            channel_type.ACCURACY,
        },
    )
    if loc_900 is not None:
        _migrate_location_900_to_1000_raw(packet, loc_900, sensors_m.location)

    # Time synchronization was already added to the timing information.

    # Accelerometer
    accel_900: Optional[
        api_900.UnevenlySampledChannel
    ] = reader_utils.find_uneven_channel_raw(
        packet,
        {
            channel_type.ACCELEROMETER_X,
            channel_type.ACCELEROMETER_Y,
            channel_type.ACCELEROMETER_Z,
        },
    )
    if accel_900 is not None:
        _migrate_xyz_channel_900_to_1000_raw(
            accel_900, sensors_m.accelerometer, unit.METERS_PER_SECOND_SQUARED
        )

    # Magnetometer
    mag_900: Optional[
        api_900.UnevenlySampledChannel
    ] = reader_utils.find_uneven_channel_raw(
        packet,
        {
            channel_type.MAGNETOMETER_X,
            channel_type.MAGNETOMETER_Y,
            channel_type.MAGNETOMETER_Z,
        },
    )
    if mag_900 is not None:
        _migrate_xyz_channel_900_to_1000_raw(
            mag_900, sensors_m.magnetometer, unit.MICROTESLA
        )

    # Gyroscope
    gyro_900: Optional[
        api_900.UnevenlySampledChannel
    ] = reader_utils.find_uneven_channel_raw(
        packet,
        {
            channel_type.GYROSCOPE_X,
            channel_type.GYROSCOPE_Y,
            channel_type.GYROSCOPE_Z,
        },
    )
    if gyro_900 is not None:
        _migrate_xyz_channel_900_to_1000_raw(
            gyro_900, sensors_m.gyroscope, unit.RADIANS_PER_SECOND
        )

    # Light
    light_900: Optional[
        api_900.UnevenlySampledChannel
    ] = reader_utils.find_uneven_channel_raw(packet, {channel_type.LIGHT})
    if light_900 is not None:
        _migrate_single_channel_900_to_1000_raw(light_900, sensors_m.light, unit.LUX)

    # Image
    # Not implemented for conversion. Only a very small fraction of API 900 was ever image capable, and not the
    # public app.

    # Proximity
    proximity_900: Optional[
        api_900.UnevenlySampledChannel
    ] = reader_utils.find_uneven_channel_raw(packet, {channel_type.INFRARED})
    if proximity_900 is not None:
        _migrate_single_channel_900_to_1000_raw(
            proximity_900, sensors_m.proximity, unit.CENTIMETERS
        )

    return packet_m


def _is_metadata_true_900(val: str) -> bool:
    """Returns True when an API 900 metadata value is the string "T"."""
    return val == "T"


def _copy_metadata_900_to_1000_raw(metadata_900, metadata_m) -> None:
    """
    Copies a flat API 900 metadata list ([key, value, key, value, ...]) into an API M metadata map.
    A trailing key without a value is stored with an empty string value.
    """
    total: int = len(metadata_900)
    for i in range(0, total, 2):
        metadata_m[metadata_900[i]] = metadata_900[i + 1] if (i + 1) < total else ""


def _set_timestamps_900_to_1000_raw(timing_payload_m, timestamps) -> None:
    """Stores timestamps (as microseconds since the epoch) in an API M timing payload and computes its stats."""
    timing_payload_m.unit = api_m.RedvoxPacketM.Unit.MICROSECONDS_SINCE_UNIX_EPOCH
    timing_payload_m.timestamps[:] = timestamps
    compute_stats_raw(timing_payload_m)


def _set_samples_900_to_1000_raw(sample_payload_m, sample_unit, values) -> None:
    """Stores values with the given unit in an API M sample payload and computes its stats."""
    sample_payload_m.unit = sample_unit
    sample_payload_m.values[:] = values
    compute_stats_raw(sample_payload_m)


def _migrate_station_metrics_900_to_1000_raw(
    packet: api_900.RedvoxPacket, metrics_m
) -> None:
    """Fills API M station metrics with the single sample known to API 900 and defaults for everything else."""
    unit = api_m.RedvoxPacketM.Unit
    metrics_type = api_m.RedvoxPacketM.StationInformation.StationMetrics
    # Metrics API 900 knows about
    _set_timestamps_900_to_1000_raw(
        metrics_m.timestamps, [packet.app_file_start_timestamp_machine]
    )
    _set_samples_900_to_1000_raw(
        metrics_m.temperature, unit.DEGREES_CELSIUS, [packet.device_temperature_c]
    )
    _set_samples_900_to_1000_raw(
        metrics_m.battery, unit.PERCENTAGE, [packet.battery_level_percent]
    )
    # Metrics API 900 does not provide: a single NaN sample each
    unknown_metrics = (
        (metrics_m.available_disk, unit.BYTE),
        (metrics_m.available_ram, unit.BYTE),
        (metrics_m.cpu_utilization, unit.PERCENTAGE),
        (metrics_m.network_strength, unit.DECIBEL),
        (metrics_m.battery_current, unit.MICROAMPERES),
        (metrics_m.screen_brightness, unit.PERCENTAGE),
    )
    for sample_payload_m, sample_unit in unknown_metrics:
        _set_samples_900_to_1000_raw(sample_payload_m, sample_unit, [NAN])
    metrics_m.network_type[:] = [metrics_type.NetworkType.UNKNOWN_NETWORK]
    metrics_m.cell_service_state[:] = [metrics_type.CellServiceState.UNKNOWN]
    metrics_m.power_state[:] = [metrics_type.PowerState.UNKNOWN_POWER_STATE]
    metrics_m.wifi_wake_lock[:] = [metrics_type.WifiWakeLock.OTHER]
    metrics_m.screen_state[:] = [metrics_type.ScreenState.UNKNOWN_SCREEN_STATE]


def _migrate_timing_900_to_1000_raw(packet: api_900.RedvoxPacket, timing_m) -> None:
    """Fills API M timing information from the API 900 packet timestamps, metadata and synch channel."""
    mach_time_900: int = packet.app_file_start_timestamp_machine
    os_time_900: int = packet.app_file_start_timestamp_epoch_microseconds_utc
    len_micros: int = _packet_length_microseconds_900_raw(packet)
    packet_metadata: List[str] = list(packet.metadata)
    timing_m.unit = api_m.RedvoxPacketM.Unit.MICROSECONDS_SINCE_UNIX_EPOCH
    timing_m.packet_start_mach_timestamp = mach_time_900
    timing_m.packet_start_os_timestamp = os_time_900
    # End timestamps are the start timestamps plus the packet duration derived from the audio channel
    timing_m.packet_end_mach_timestamp = mach_time_900 + len_micros
    timing_m.packet_end_os_timestamp = os_time_900 + len_micros
    timing_m.server_acquisition_arrival_timestamp = (
        packet.server_timestamp_epoch_microseconds_utc
    )
    timing_m.app_start_mach_timestamp = _find_mach_time_zero_raw(packet)
    timing_m.best_latency = reader_utils.get_metadata_or_default(
        packet_metadata, "bestLatency", float, NAN
    )
    timing_m.best_offset = reader_utils.get_metadata_or_default(
        packet_metadata, "bestOffset", float, NAN
    )
    synch_sensor: Optional[
        api_900.UnevenlySampledChannel
    ] = reader_utils.find_uneven_channel_raw(
        packet, {api_900.ChannelType.TIME_SYNCHRONIZATION}
    )
    if synch_sensor is not None:
        synch_payload: np.ndarray = reader_utils.extract_payload(synch_sensor)
        timing_m.synch_exchanges.extend(
            _migrate_synch_exchanges_900_to_1000_raw(synch_payload)
        )


def _migrate_audio_900_to_1000_raw(
    audio_900: api_900.EvenlySampledChannel, audio_m
) -> None:
    """Fills the API M audio sensor from the API 900 audio channel, normalizing the samples."""
    audio_m.sensor_description = audio_900.sensor_name
    audio_m.sample_rate = audio_900.sample_rate_hz
    audio_m.first_sample_timestamp = (
        audio_900.first_sample_timestamp_epoch_microseconds_utc
    )
    audio_m.bits_of_precision = 16.0
    audio_m.encoding = "counts"
    normalized_audio: np.ndarray = (
        reader_utils.extract_payload(audio_900) / _NORMALIZATION_CONSTANT
    )
    _set_samples_900_to_1000_raw(
        audio_m.samples,
        api_m.RedvoxPacketM.Unit.NORMALIZED_COUNTS,
        list(normalized_audio),
    )
    _copy_metadata_900_to_1000_raw(audio_900.metadata, audio_m.metadata)


def _migrate_single_channel_900_to_1000_raw(
    sensor_900: api_900.UnevenlySampledChannel, sensor_m, sample_unit
) -> None:
    """Fills a single-channel API M sensor (timestamps + samples + metadata) from an API 900 channel."""
    sensor_m.sensor_description = sensor_900.sensor_name
    _set_timestamps_900_to_1000_raw(
        sensor_m.timestamps, sensor_900.timestamps_microseconds_utc
    )
    _set_samples_900_to_1000_raw(
        sensor_m.samples, sample_unit, list(reader_utils.extract_payload(sensor_900))
    )
    _copy_metadata_900_to_1000_raw(sensor_900.metadata, sensor_m.metadata)


def _migrate_xyz_channel_900_to_1000_raw(
    sensor_900: api_900.UnevenlySampledChannel, sensor_m, sample_unit
) -> None:
    """
    Fills a three-axis API M sensor from an API 900 channel.
    The payload is de-interleaved with a stride of 3 into the x, y and z samples (in that order).
    """
    sensor_m.sensor_description = sensor_900.sensor_name
    _set_timestamps_900_to_1000_raw(
        sensor_m.timestamps, sensor_900.timestamps_microseconds_utc
    )
    payload: List[float] = list(reader_utils.extract_payload(sensor_900))
    axes_m = (sensor_m.x_samples, sensor_m.y_samples, sensor_m.z_samples)
    for offset, axis_samples_m in enumerate(axes_m):
        _set_samples_900_to_1000_raw(axis_samples_m, sample_unit, payload[offset::3])


def _migrate_location_900_to_1000_raw(
    packet: api_900.RedvoxPacket, loc_900: api_900.UnevenlySampledChannel, loc_m
) -> None:
    """
    Fills the API M location sensor from the API 900 location channel.
    Channels that are not available are filled with one NaN per location timestamp.
    """
    unit = api_m.RedvoxPacketM.Unit
    channel_type = api_900.ChannelType
    provider = api_m.RedvoxPacketM.Sensors.Location.LocationProvider
    total_samples: int = len(loc_900.timestamps_microseconds_utc)
    total_channels: int = len(loc_900.channel_types)
    loc_payload: List[float] = list(reader_utils.extract_payload(loc_900))
    nan_samples: List[float] = [NAN] * total_samples

    loc_m.sensor_description = loc_900.sensor_name
    _set_timestamps_900_to_1000_raw(
        loc_m.timestamps, loc_900.timestamps_microseconds_utc
    )
    # No GPS timestamps are read from the API 900 packet
    _set_timestamps_900_to_1000_raw(loc_m.timestamps_gps, nan_samples)

    # Channels that may be interleaved in the API 900 location payload
    convertible = (
        (channel_type.LATITUDE, loc_m.latitude_samples, unit.DECIMAL_DEGREES),
        (channel_type.LONGITUDE, loc_m.longitude_samples, unit.DECIMAL_DEGREES),
        (channel_type.ALTITUDE, loc_m.altitude_samples, unit.METERS),
        (channel_type.SPEED, loc_m.speed_samples, unit.METERS_PER_SECOND),
        (channel_type.ACCURACY, loc_m.horizontal_accuracy_samples, unit.METERS),
    )
    for wanted_type, samples_m, sample_unit in convertible:
        idx: Optional[int] = reader_utils.extract_uneven_payload_idx_raw(
            packet, wanted_type
        )
        if idx is not None:
            values = loc_payload[idx::total_channels]
        else:
            values = nan_samples
        _set_samples_900_to_1000_raw(samples_m, sample_unit, values)

    # Channels that are never read from the API 900 packet
    unavailable = (
        (loc_m.bearing_samples, unit.DECIMAL_DEGREES),
        (loc_m.vertical_accuracy_samples, unit.METERS),
        (loc_m.speed_accuracy_samples, unit.METERS_PER_SECOND),
        (loc_m.bearing_accuracy_samples, unit.DECIMAL_DEGREES),
    )
    for samples_m, sample_unit in unavailable:
        _set_samples_900_to_1000_raw(samples_m, sample_unit, nan_samples)

    # Bookkeeping: boolean flags are stored in the channel metadata as "T" when set
    loc_metadata: List[str] = list(loc_900.metadata)
    use_location: bool = reader_utils.get_metadata_or_default(
        loc_metadata, "useLocation", _is_metadata_true_900, False
    )
    desired_location: bool = reader_utils.get_metadata_or_default(
        loc_metadata, "desiredLocation", _is_metadata_true_900, False
    )
    permission_location: bool = reader_utils.get_metadata_or_default(
        loc_metadata, "permissionLocation", _is_metadata_true_900, False
    )
    enabled_location: bool = reader_utils.get_metadata_or_default(
        loc_metadata, "enabledLocation", _is_metadata_true_900, False
    )
    if desired_location:
        location_provider = provider.USER
    elif enabled_location:
        location_provider = provider.GPS
    # NOTE(review): this branch can never be taken, since desired_location is already handled by the first
    # branch. The condition is kept as-is because the intended rule for NETWORK is not clear - confirm.
    elif use_location and desired_location and permission_location:
        location_provider = provider.NETWORK
    else:
        location_provider = provider.NONE
    loc_m.location_providers[:] = [location_provider] * total_samples
    loc_m.location_permissions_granted = permission_location
    loc_m.location_services_enabled = use_location
    loc_m.location_services_requested = desired_location
    _copy_metadata_900_to_1000_raw(loc_900.metadata, loc_m.metadata)
# noinspection DuplicatedCode
def convert_api_900_to_1000(
    wrapped_packet_900: reader_900.WrappedRedvoxPacket,
) -> WrappedRedvoxPacketM:
    """
    Converts a wrapped API 900 packet into a wrapped API M packet.

    Station information, station metrics, timing information (including synch exchanges), every supported sensor
    and the remaining top-level metadata are copied into a newly created API M packet. Image data is not converted.

    :param wrapped_packet_900: API 900 packet to convert.
    :return: A wrapped API M packet.
    """
    wrapped_packet_m: WrappedRedvoxPacketM = WrappedRedvoxPacketM.new()

    # Top-level metadata
    wrapped_packet_m.set_api(1000.0)
    # noinspection PyUnresolvedReferences,Mypy
    wrapped_packet_m.set_sub_api(900.0)

    # Station information
    station_information: StationInformation = wrapped_packet_m.get_station_information()
    station_information.set_id(wrapped_packet_900.redvox_id()).set_uuid(
        wrapped_packet_900.uuid()
    ).set_make(wrapped_packet_900.device_make()).set_model(
        wrapped_packet_900.device_model()
    ).set_os(
        _migrate_os_type_900_to_1000(wrapped_packet_900.device_os())
    ).set_os_version(
        wrapped_packet_900.device_os_version()
    ).set_app_version(
        wrapped_packet_900.app_version()
    ).set_is_private(
        wrapped_packet_900.is_private()
    )
    station_information.get_service_urls().set_acquisition_server(
        wrapped_packet_900.acquisition_server()
    ).set_synch_server(
        wrapped_packet_900.time_synchronization_server()
    ).set_auth_server(
        wrapped_packet_900.authentication_server()
    )

    # API 900 does not maintain a copy of its settings. So we will not set anything in AppSettings

    # StationMetrics - a single temperature and a single battery reading are taken from the API 900 packet and
    # stamped with the packet's machine start timestamp.
    station_metrics: StationMetrics = station_information.get_station_metrics()
    station_metrics.get_timestamps().append_timestamp(
        wrapped_packet_900.app_file_start_timestamp_machine()
    )
    station_metrics.get_temperature().set_values(
        np.array([wrapped_packet_900.device_temperature_c()]), True
    )
    station_metrics.get_battery().set_values(
        np.array([wrapped_packet_900.battery_level_percent()]), True
    )

    # Timing information
    mach_time_900: int = wrapped_packet_900.app_file_start_timestamp_machine()
    os_time_900: int = (
        wrapped_packet_900.app_file_start_timestamp_epoch_microseconds_utc()
    )
    len_micros: int = _packet_length_microseconds_900(wrapped_packet_900)
    # Missing latency/offset values are represented as NaN in the API M packet
    best_latency: Optional[float] = wrapped_packet_900.best_latency()
    best_latency = best_latency if best_latency is not None else NAN
    best_offset: Optional[float] = wrapped_packet_900.best_offset()
    best_offset = best_offset if best_offset is not None else NAN
    wrapped_packet_m.get_timing_information().set_unit(
        common_m.Unit.MICROSECONDS_SINCE_UNIX_EPOCH
    ).set_packet_start_mach_timestamp(mach_time_900).set_packet_start_os_timestamp(
        os_time_900
    ).set_packet_end_mach_timestamp(
        mach_time_900 + len_micros
    ).set_packet_end_os_timestamp(
        os_time_900 + len_micros
    ).set_server_acquisition_arrival_timestamp(
        wrapped_packet_900.server_timestamp_epoch_microseconds_utc()
    ).set_app_start_mach_timestamp(
        _find_mach_time_zero(wrapped_packet_900)
    ).set_best_latency(
        best_latency
    ).set_best_offset(
        best_offset
    )
    time_sensor = wrapped_packet_900.time_synchronization_sensor()
    if time_sensor is not None:
        wrapped_packet_m.get_timing_information().get_synch_exchanges().set_values(
            _migrate_synch_exchanges_900_to_1000(time_sensor.payload_values())
        )

    # Sensors
    sensors_m: Sensors = wrapped_packet_m.get_sensors()

    # Microphone / Audio
    mic_sensor_900: Optional[
        reader_900.MicrophoneSensor
    ] = wrapped_packet_900.microphone_sensor()
    if mic_sensor_900 is not None:
        # API M audio is stored normalized; scale the API 900 counts by the normalization constant
        normalized_audio: np.ndarray = (
            mic_sensor_900.payload_values() / _NORMALIZATION_CONSTANT
        )
        audio_sensor_m = sensors_m.new_audio()
        audio_sensor_m.set_first_sample_timestamp(
            mic_sensor_900.first_sample_timestamp_epoch_microseconds_utc()
        ).set_is_scrambled(wrapped_packet_900.is_scrambled()).set_sample_rate(
            mic_sensor_900.sample_rate_hz()
        ).set_sensor_description(
            mic_sensor_900.sensor_name()
        ).get_samples().set_values(
            normalized_audio, update_value_statistics=True
        )
        audio_sensor_m.get_metadata().set_metadata(mic_sensor_900.metadata_as_dict())

    # Barometer
    barometer_sensor_900: Optional[
        reader_900.BarometerSensor
    ] = wrapped_packet_900.barometer_sensor()
    if barometer_sensor_900 is not None:
        pressure_sensor_m = sensors_m.new_pressure()
        pressure_sensor_m.set_sensor_description(barometer_sensor_900.sensor_name())
        pressure_sensor_m.get_timestamps().set_timestamps(
            barometer_sensor_900.timestamps_microseconds_utc(), True
        )
        pressure_sensor_m.get_samples().set_values(
            barometer_sensor_900.payload_values(), True
        )
        pressure_sensor_m.get_metadata().set_metadata(
            barometer_sensor_900.metadata_as_dict()
        )

    # Location
    # TODO: rework
    location_sensor_900: Optional[
        reader_900.LocationSensor
    ] = wrapped_packet_900.location_sensor()
    if location_sensor_900 is not None:
        location_m = sensors_m.new_location()
        location_m.set_sensor_description(location_sensor_900.sensor_name())
        location_m.get_timestamps().set_timestamps(
            location_sensor_900.timestamps_microseconds_utc(), True
        )
        if location_sensor_900.check_for_preset_lat_lon():
            # A preset location only provides a latitude/longitude pair
            lat_lon: np.ndarray = location_sensor_900.get_payload_lat_lon()
            location_m.get_latitude_samples().set_values(lat_lon[:1], True)
            location_m.get_longitude_samples().set_values(lat_lon[1:], True)
        else:
            location_m.get_latitude_samples().set_values(
                location_sensor_900.payload_values_latitude(), True
            )
            location_m.get_longitude_samples().set_values(
                location_sensor_900.payload_values_longitude(), True
            )
            location_m.get_altitude_samples().set_values(
                location_sensor_900.payload_values_altitude(), True
            )
            location_m.get_speed_samples().set_values(
                location_sensor_900.payload_values_speed(), True
            )
            location_m.get_horizontal_accuracy_samples().set_values(
                location_sensor_900.payload_values_accuracy(), True
            )

        def _extract_meta_bool(metad: Dict[str, str], key: str) -> bool:
            # Only the literal "T" counts as True; a missing key is False
            return metad.get(key) == "T"

        loc_meta_900 = location_sensor_900.metadata_as_dict()
        use_location = _extract_meta_bool(loc_meta_900, "useLocation")
        desired_location = _extract_meta_bool(loc_meta_900, "desiredLocation")
        permission_location = _extract_meta_bool(loc_meta_900, "permissionLocation")
        enabled_location = _extract_meta_bool(loc_meta_900, "enabledLocation")

        # A single provider is derived from the metadata flags and repeated once per location timestamp
        if desired_location:
            provider = LocationProvider.USER
        elif enabled_location:
            provider = LocationProvider.GPS
        elif use_location and desired_location and permission_location:
            # NOTE(review): this branch can never run - desired_location is always False once the first
            # branch has been skipped. Kept as-is until the intended NETWORK condition is confirmed.
            provider = LocationProvider.NETWORK
        else:
            provider = LocationProvider.NONE
        n_p = location_m.get_timestamps().get_timestamps_count()
        location_m.get_location_providers().set_values([provider] * n_p)

        location_m.set_location_permissions_granted(permission_location)
        location_m.set_location_services_enabled(use_location)
        location_m.set_location_services_requested(desired_location)

        # Once we're done here, we should remove the original metadata
        for consumed_key in (
            "useLocation",
            "desiredLocation",
            "permissionLocation",
            "enabledLocation",
            "machTimeZero",
        ):
            loc_meta_900.pop(consumed_key, None)
        location_m.get_metadata().set_metadata(loc_meta_900)

    # Time Synchronization
    # This was already added to the timing information

    # Accelerometer
    accelerometer_900 = wrapped_packet_900.accelerometer_sensor()
    if accelerometer_900 is not None:
        accelerometer_m = sensors_m.new_accelerometer()
        accelerometer_m.set_sensor_description(accelerometer_900.sensor_name())
        accelerometer_m.get_timestamps().set_timestamps(
            accelerometer_900.timestamps_microseconds_utc(), True
        )
        accelerometer_m.get_x_samples().set_values(
            accelerometer_900.payload_values_x(), True
        )
        accelerometer_m.get_y_samples().set_values(
            accelerometer_900.payload_values_y(), True
        )
        accelerometer_m.get_z_samples().set_values(
            accelerometer_900.payload_values_z(), True
        )
        accelerometer_m.get_metadata().set_metadata(
            accelerometer_900.metadata_as_dict()
        )

    # Magnetometer
    magnetometer_900 = wrapped_packet_900.magnetometer_sensor()
    if magnetometer_900 is not None:
        magnetometer_m = sensors_m.new_magnetometer()
        magnetometer_m.set_sensor_description(magnetometer_900.sensor_name())
        magnetometer_m.get_timestamps().set_timestamps(
            magnetometer_900.timestamps_microseconds_utc(), True
        )
        magnetometer_m.get_x_samples().set_values(
            magnetometer_900.payload_values_x(), True
        )
        magnetometer_m.get_y_samples().set_values(
            magnetometer_900.payload_values_y(), True
        )
        magnetometer_m.get_z_samples().set_values(
            magnetometer_900.payload_values_z(), True
        )
        magnetometer_m.get_metadata().set_metadata(magnetometer_900.metadata_as_dict())

    # Gyroscope
    gyroscope_900 = wrapped_packet_900.gyroscope_sensor()
    if gyroscope_900 is not None:
        gyroscope_m = sensors_m.new_gyroscope()
        gyroscope_m.set_sensor_description(gyroscope_900.sensor_name())
        gyroscope_m.get_timestamps().set_timestamps(
            gyroscope_900.timestamps_microseconds_utc(), True
        )
        gyroscope_m.get_x_samples().set_values(gyroscope_900.payload_values_x(), True)
        gyroscope_m.get_y_samples().set_values(gyroscope_900.payload_values_y(), True)
        gyroscope_m.get_z_samples().set_values(gyroscope_900.payload_values_z(), True)
        gyroscope_m.get_metadata().set_metadata(gyroscope_900.metadata_as_dict())

    # Light
    light_900 = wrapped_packet_900.light_sensor()
    if light_900 is not None:
        light_m = sensors_m.new_light()
        light_m.set_sensor_description(light_900.sensor_name())
        light_m.get_timestamps().set_timestamps(
            light_900.timestamps_microseconds_utc(), True
        )
        light_m.get_samples().set_values(light_900.payload_values(), True)
        light_m.get_metadata().set_metadata(light_900.metadata_as_dict())

    # Image
    # TODO: Implement

    # Proximity
    proximity_900 = wrapped_packet_900.infrared_sensor()
    if proximity_900 is not None:
        proximity_m = sensors_m.new_proximity()
        proximity_m.set_sensor_description(proximity_900.sensor_name())
        proximity_m.get_timestamps().set_timestamps(
            proximity_900.timestamps_microseconds_utc(), True
        )
        proximity_m.get_samples().set_values(proximity_900.payload_values(), True)
        proximity_m.get_metadata().set_metadata(proximity_900.metadata_as_dict())

    # Carry over the remaining API 900 top-level metadata, minus the keys that were already consumed by the
    # timing information above. Previously the filtered dictionary was built but never stored, which silently
    # dropped all remaining top-level metadata.
    meta = wrapped_packet_900.metadata_as_dict()
    for consumed_key in ("machTimeZero", "bestOffset", "bestLatency"):
        meta.pop(consumed_key, None)
    packet_metadata_m = wrapped_packet_m.get_metadata()
    packet_metadata_m.set_metadata(meta)
    # Appended after set_metadata so the migration marker is kept
    packet_metadata_m.append_metadata("migrated_from_api_900", f"v{redvox.VERSION}")
    return wrapped_packet_m
def convert_api_1000_to_900(
    wrapped_packet_m: WrappedRedvoxPacketM,
) -> reader_900.WrappedRedvoxPacket:
    """
    Converts an API M wrapped packet into an API 900 wrapped packet.

    Station information, the latest battery/temperature metric, timing information, synch exchanges and the
    audio, pressure, location, accelerometer, magnetometer, gyroscope, light and proximity sensors are copied.
    Fields without an API 900 counterpart are dropped. Image data is not converted.

    :param wrapped_packet_m: Packet to convert.
    :return: An API 900 wrapped packet.
    """
    # TODO detect and warn about all the fields that are being dropped due to conversion!
    wrapped_packet_900: reader_900.WrappedRedvoxPacket = (
        reader_900.WrappedRedvoxPacket()
    )
    station_information_m = wrapped_packet_m.get_station_information()
    sensors_m = wrapped_packet_m.get_sensors()
    audio_m = sensors_m.get_audio()

    # Station information
    wrapped_packet_900.set_api(900)
    wrapped_packet_900.set_uuid(station_information_m.get_uuid())
    wrapped_packet_900.set_redvox_id(station_information_m.get_id())
    wrapped_packet_900.set_authenticated_email(station_information_m.get_auth_id())
    wrapped_packet_900.set_authentication_token(
        "n/a"
    )  # Different auth protocols are used, can't convert between
    wrapped_packet_900.set_firebase_token("n/a")  # No longer in API M packet
    wrapped_packet_900.set_is_backfilled(False)  # No longer useful metric in API M
    wrapped_packet_900.set_is_private(station_information_m.get_is_private())
    # NOTE(review): API M keeps the scrambled flag on the audio sensor; it is not carried back here - confirm
    wrapped_packet_900.set_is_scrambled(False)
    wrapped_packet_900.set_device_make(station_information_m.get_make())
    wrapped_packet_900.set_device_model(station_information_m.get_model())
    wrapped_packet_900.set_device_os(
        _migrate_os_type_1000_to_900(station_information_m.get_os())
    )
    wrapped_packet_900.set_device_os_version(station_information_m.get_os_version())
    wrapped_packet_900.set_app_version(station_information_m.get_app_version())

    # API 900 stores a single battery/temperature value: use the last metric, or 0.0 when there is none
    battery_metrics = station_information_m.get_station_metrics().get_battery()
    battery_percent: float = (
        battery_metrics.get_values()[-1]
        if battery_metrics.get_values_count() > 0
        else 0.0
    )
    wrapped_packet_900.set_battery_level_percent(battery_percent)
    temp_metrics = station_information_m.get_station_metrics().get_temperature()
    device_temp: float = (
        temp_metrics.get_values()[-1] if temp_metrics.get_values_count() > 0 else 0.0
    )
    wrapped_packet_900.set_device_temperature_c(device_temp)

    # Service URLs
    server_info_m = wrapped_packet_m.get_station_information().get_service_urls()
    wrapped_packet_900.set_acquisition_server(server_info_m.get_acquisition_server())
    wrapped_packet_900.set_time_synchronization_server(server_info_m.get_synch_server())
    wrapped_packet_900.set_authentication_server(server_info_m.get_auth_server())

    # Timing information - timestamps are rounded before being handed to the API 900 setters
    timing_info_m = wrapped_packet_m.get_timing_information()
    wrapped_packet_900.set_app_file_start_timestamp_epoch_microseconds_utc(
        round(timing_info_m.get_packet_start_os_timestamp())
    )
    wrapped_packet_900.set_app_file_start_timestamp_machine(
        round(timing_info_m.get_packet_start_mach_timestamp())
    )
    wrapped_packet_900.set_server_timestamp_epoch_microseconds_utc(
        round(timing_info_m.get_server_acquisition_arrival_timestamp())
    )

    # Top-level metadata
    wrapped_packet_900.add_metadata(
        "machTimeZero", str(timing_info_m.get_app_start_mach_timestamp())
    )
    wrapped_packet_900.add_metadata(
        "bestLatency", str(timing_info_m.get_best_latency())
    )
    wrapped_packet_900.add_metadata("bestOffset", str(timing_info_m.get_best_offset()))
    wrapped_packet_900.add_metadata("migrated_from_api_1000", f"v{redvox.VERSION}")

    # Sensors
    # Microphone / Audio
    if audio_m is not None:
        # Convert the normalized API M samples back into integer counts
        denorm_audio = list(
            map(_denormalize_audio_count, audio_m.get_samples().get_values())
        )
        mic_900 = reader_900.MicrophoneSensor()
        mic_900.set_sample_rate_hz(audio_m.get_sample_rate())
        mic_900.set_first_sample_timestamp_epoch_microseconds_utc(
            round(audio_m.get_first_sample_timestamp())
        )
        mic_900.set_sensor_name(audio_m.get_sensor_description())
        mic_900.set_metadata_as_dict(audio_m.get_metadata().get_metadata())
        mic_900.set_payload_values(denorm_audio)
        wrapped_packet_900.set_microphone_sensor(mic_900)

    # Barometer
    pressure_m = sensors_m.get_pressure()
    if pressure_m is not None:
        barometer_900 = reader_900.BarometerSensor()
        barometer_900.set_sensor_name(pressure_m.get_sensor_description())
        barometer_900.set_metadata_as_dict(pressure_m.get_metadata().get_metadata())
        barometer_900.set_timestamps_microseconds_utc(
            pressure_m.get_timestamps().get_timestamps().astype(np.int64)
        )
        barometer_900.set_payload_values(pressure_m.get_samples().get_values())
        wrapped_packet_900.set_barometer_sensor(barometer_900)

    # Location
    location_m = sensors_m.get_location()
    if location_m is not None:
        location_900 = reader_900.LocationSensor()
        location_900.set_sensor_name(location_m.get_sensor_description())
        location_900.set_timestamps_microseconds_utc(
            location_m.get_timestamps().get_timestamps().astype(np.int64)
        )
        location_900.set_payload_values(
            location_m.get_latitude_samples().get_values(),
            location_m.get_longitude_samples().get_values(),
            location_m.get_altitude_samples().get_values(),
            location_m.get_speed_samples().get_values(),
            location_m.get_horizontal_accuracy_samples().get_values(),
        )
        # Work on a copy so the API 900 flags are never written back into the API M packet, in case
        # get_metadata() hands back a live view of the packet's metadata.
        metadata = dict(location_m.get_metadata().get_metadata())
        metadata["useLocation"] = (
            "T" if location_m.get_location_services_enabled() else "F"
        )
        metadata["desiredLocation"] = (
            "T" if location_m.get_location_services_requested() else "F"
        )
        metadata["permissionLocation"] = (
            "T" if location_m.get_location_permissions_granted() else "F"
        )
        # Flags are encoded as "T"/"F" like the three above (this one used to emit "FD" for false)
        metadata["enabledLocation"] = (
            "T"
            if LocationProvider.GPS in location_m.get_location_providers().get_values()
            else "F"
        )
        location_900.set_metadata_as_dict(metadata)
        # Attach the sensor only once it is fully populated, as is done for every other sensor, so the
        # metadata is present whether or not the packet copies the sensor when it is set.
        wrapped_packet_900.set_location_sensor(location_900)

    # Synch exchanges - flattened to a1, a2, a3, b1, b2, b3 per exchange
    synch_exchanges_m = timing_info_m.get_synch_exchanges()
    if synch_exchanges_m.get_count() > 0:
        synch_900 = reader_900.TimeSynchronizationSensor()
        values: List[int] = []
        for exchange in synch_exchanges_m.get_values():
            values.extend(
                [
                    round(exchange.get_a1()),
                    round(exchange.get_a2()),
                    round(exchange.get_a3()),
                    round(exchange.get_b1()),
                    round(exchange.get_b2()),
                    round(exchange.get_b3()),
                ]
            )
        synch_900.set_payload_values(values)
        wrapped_packet_900.set_time_synchronization_sensor(synch_900)

    # Accelerometer
    accel_m = sensors_m.get_accelerometer()
    if accel_m is not None:
        accel_900 = reader_900.AccelerometerSensor()
        accel_900.set_sensor_name(accel_m.get_sensor_description())
        accel_900.set_timestamps_microseconds_utc(
            accel_m.get_timestamps().get_timestamps().astype(np.int64)
        )
        accel_900.set_metadata_as_dict(accel_m.get_metadata().get_metadata())
        accel_900.set_payload_values(
            accel_m.get_x_samples().get_values(),
            accel_m.get_y_samples().get_values(),
            accel_m.get_z_samples().get_values(),
        )
        wrapped_packet_900.set_accelerometer_sensor(accel_900)

    # Magnetometer
    magnetometer_m = sensors_m.get_magnetometer()
    if magnetometer_m is not None:
        magnetometer_900 = reader_900.MagnetometerSensor()
        magnetometer_900.set_sensor_name(magnetometer_m.get_sensor_description())
        magnetometer_900.set_timestamps_microseconds_utc(
            magnetometer_m.get_timestamps().get_timestamps().astype(np.int64)
        )
        magnetometer_900.set_metadata_as_dict(
            magnetometer_m.get_metadata().get_metadata()
        )
        magnetometer_900.set_payload_values(
            magnetometer_m.get_x_samples().get_values(),
            magnetometer_m.get_y_samples().get_values(),
            magnetometer_m.get_z_samples().get_values(),
        )
        wrapped_packet_900.set_magnetometer_sensor(magnetometer_900)

    # Gyroscope
    gyroscope_m = sensors_m.get_gyroscope()
    if gyroscope_m is not None:
        gyroscope_900 = reader_900.GyroscopeSensor()
        gyroscope_900.set_sensor_name(gyroscope_m.get_sensor_description())
        gyroscope_900.set_timestamps_microseconds_utc(
            gyroscope_m.get_timestamps().get_timestamps().astype(np.int64)
        )
        gyroscope_900.set_metadata_as_dict(gyroscope_m.get_metadata().get_metadata())
        gyroscope_900.set_payload_values(
            gyroscope_m.get_x_samples().get_values(),
            gyroscope_m.get_y_samples().get_values(),
            gyroscope_m.get_z_samples().get_values(),
        )
        wrapped_packet_900.set_gyroscope_sensor(gyroscope_900)

    # Light
    light_m = sensors_m.get_light()
    if light_m is not None:
        light_900 = reader_900.LightSensor()
        light_900.set_sensor_name(light_m.get_sensor_description())
        light_900.set_metadata_as_dict(light_m.get_metadata().get_metadata())
        light_900.set_timestamps_microseconds_utc(
            light_m.get_timestamps().get_timestamps().astype(np.int64)
        )
        light_900.set_payload_values(light_m.get_samples().get_values())
        wrapped_packet_900.set_light_sensor(light_900)

    # Image, skip for now

    # Infrared / proximity
    proximity_m = sensors_m.get_proximity()
    if proximity_m is not None:
        proximity_900 = reader_900.InfraredSensor()
        proximity_900.set_sensor_name(proximity_m.get_sensor_description())
        proximity_900.set_metadata_as_dict(proximity_m.get_metadata().get_metadata())
        proximity_900.set_timestamps_microseconds_utc(
            proximity_m.get_timestamps().get_timestamps().astype(np.int64)
        )
        proximity_900.set_payload_values(proximity_m.get_samples().get_values())
        wrapped_packet_900.set_infrared_sensor(proximity_900)

    return wrapped_packet_900
Functions
def compute_stats_raw(has_stats: Union[src.redvox_api_m.redvox_api_m_pb2.TimingPayload, src.redvox_api_m.redvox_api_m_pb2.SamplePayload, src.redvox_api_m.redvox_api_m_pb2.DoubleSamplePayload])- 
Expand source code
def compute_stats_raw( has_stats: Union[ api_m.RedvoxPacketM.TimingPayload, api_m.RedvoxPacketM.SamplePayload, api_m.RedvoxPacketM.DoubleSamplePayload, ] ): values: np.ndarray stats_container: api_m.RedvoxPacketM.SummaryStatistics if isinstance(has_stats, api_m.RedvoxPacketM.TimingPayload): values = np.array(has_stats.timestamps) stats_container = has_stats.timestamp_statistics mean_sr: float std_sr: float (mean_sr, std_sr) = common_m.sampling_rate_statistics(values) has_stats.mean_sample_rate = mean_sr has_stats.stdev_sample_rate = std_sr else: values = np.array(has_stats.values) stats_container = has_stats.value_statistics stats_container.count = len(values) # noinspection Mypy stats_container.min = values.min() # noinspection Mypy stats_container.max = values.max() # noinspection Mypy stats_container.mean = values.mean() # noinspection Mypy stats_container.standard_deviation = values.std() def convert_api_1000_to_900(wrapped_packet_m: WrappedRedvoxPacketM) ‑> WrappedRedvoxPacket- 
Converts an API M wrapped packet into an API 900 wrapped packet.
:param wrapped_packet_m: Packet to convert.
:return: An API 900 wrapped packet.
Expand source code
def convert_api_1000_to_900(
    wrapped_packet_m: WrappedRedvoxPacketM,
) -> reader_900.WrappedRedvoxPacket:
    """
    Converts an API M wrapped packet into an API 900 wrapped packet.

    Station information, timing information and the audio, pressure, location, accelerometer,
    magnetometer, gyroscope, light and proximity sensors are copied over. Image data is skipped.
    Timing values that have no dedicated API 900 field are stored as top-level metadata.

    :param wrapped_packet_m: Packet to convert.
    :return: An API 900 wrapped packet.
    """
    # TODO detect and warn about all the fields that are being dropped due to conversion!
    wrapped_packet_900: reader_900.WrappedRedvoxPacket = (
        reader_900.WrappedRedvoxPacket()
    )

    station_information_m = wrapped_packet_m.get_station_information()
    sensors_m = wrapped_packet_m.get_sensors()
    audio_m = sensors_m.get_audio()

    # Station information
    wrapped_packet_900.set_api(900)
    wrapped_packet_900.set_uuid(station_information_m.get_uuid())
    wrapped_packet_900.set_redvox_id(station_information_m.get_id())
    wrapped_packet_900.set_authenticated_email(station_information_m.get_auth_id())
    # Different auth protocols are used, can't convert between
    wrapped_packet_900.set_authentication_token("n/a")
    wrapped_packet_900.set_firebase_token("n/a")  # No longer in API M packet
    wrapped_packet_900.set_is_backfilled(False)  # No longer useful metric in API M
    wrapped_packet_900.set_is_private(station_information_m.get_is_private())
    wrapped_packet_900.set_is_scrambled(False)
    wrapped_packet_900.set_device_make(station_information_m.get_make())
    wrapped_packet_900.set_device_model(station_information_m.get_model())
    wrapped_packet_900.set_device_os(
        _migrate_os_type_1000_to_900(station_information_m.get_os())
    )
    wrapped_packet_900.set_device_os_version(station_information_m.get_os_version())
    wrapped_packet_900.set_app_version(station_information_m.get_app_version())

    # API 900 stores a single battery/temperature value: use the last recorded metric, or 0.0 if none.
    battery_metrics = station_information_m.get_station_metrics().get_battery()
    battery_percent: float = (
        battery_metrics.get_values()[-1]
        if battery_metrics.get_values_count() > 0
        else 0.0
    )
    wrapped_packet_900.set_battery_level_percent(battery_percent)

    temp_metrics = station_information_m.get_station_metrics().get_temperature()
    device_temp: float = (
        temp_metrics.get_values()[-1] if temp_metrics.get_values_count() > 0 else 0.0
    )
    wrapped_packet_900.set_device_temperature_c(device_temp)

    server_info_m = wrapped_packet_m.get_station_information().get_service_urls()
    wrapped_packet_900.set_acquisition_server(server_info_m.get_acquisition_server())
    wrapped_packet_900.set_time_synchronization_server(server_info_m.get_synch_server())
    wrapped_packet_900.set_authentication_server(server_info_m.get_auth_server())

    # Timing information (values are rounded to integers before being stored)
    timing_info_m = wrapped_packet_m.get_timing_information()
    wrapped_packet_900.set_app_file_start_timestamp_epoch_microseconds_utc(
        round(timing_info_m.get_packet_start_os_timestamp())
    )
    wrapped_packet_900.set_app_file_start_timestamp_machine(
        round(timing_info_m.get_packet_start_mach_timestamp())
    )
    wrapped_packet_900.set_server_timestamp_epoch_microseconds_utc(
        round(timing_info_m.get_server_acquisition_arrival_timestamp())
    )

    # Top-level metadata
    wrapped_packet_900.add_metadata(
        "machTimeZero", str(timing_info_m.get_app_start_mach_timestamp())
    )
    wrapped_packet_900.add_metadata(
        "bestLatency", str(timing_info_m.get_best_latency())
    )
    wrapped_packet_900.add_metadata("bestOffset", str(timing_info_m.get_best_offset()))
    wrapped_packet_900.add_metadata("migrated_from_api_1000", f"v{redvox.VERSION}")

    # Sensors
    # Audio / microphone
    if audio_m is not None:
        denorm_audio = list(
            map(_denormalize_audio_count, audio_m.get_samples().get_values())
        )
        mic_900 = reader_900.MicrophoneSensor()
        mic_900.set_sample_rate_hz(audio_m.get_sample_rate())
        mic_900.set_first_sample_timestamp_epoch_microseconds_utc(
            round(audio_m.get_first_sample_timestamp())
        )
        mic_900.set_sensor_name(audio_m.get_sensor_description())
        mic_900.set_metadata_as_dict(audio_m.get_metadata().get_metadata())
        mic_900.set_payload_values(denorm_audio)
        wrapped_packet_900.set_microphone_sensor(mic_900)

    # Pressure / barometer
    pressure_m = sensors_m.get_pressure()
    if pressure_m is not None:
        barometer_900 = reader_900.BarometerSensor()
        barometer_900.set_sensor_name(pressure_m.get_sensor_description())
        barometer_900.set_metadata_as_dict(pressure_m.get_metadata().get_metadata())
        barometer_900.set_timestamps_microseconds_utc(
            pressure_m.get_timestamps().get_timestamps().astype(np.int64)
        )
        barometer_900.set_payload_values(pressure_m.get_samples().get_values())
        wrapped_packet_900.set_barometer_sensor(barometer_900)

    # Location
    location_m = sensors_m.get_location()
    if location_m is not None:
        location_900 = reader_900.LocationSensor()
        location_900.set_sensor_name(location_m.get_sensor_description())
        location_900.set_timestamps_microseconds_utc(
            location_m.get_timestamps().get_timestamps().astype(np.int64)
        )
        location_900.set_payload_values(
            location_m.get_latitude_samples().get_values(),
            location_m.get_longitude_samples().get_values(),
            location_m.get_altitude_samples().get_values(),
            location_m.get_speed_samples().get_values(),
            location_m.get_horizontal_accuracy_samples().get_values(),
        )

        # The location flags are carried as "T"/"F" strings in the sensor metadata.
        metadata = location_m.get_metadata().get_metadata()
        metadata["useLocation"] = (
            "T" if location_m.get_location_services_enabled() else "F"
        )
        metadata["desiredLocation"] = (
            "T" if location_m.get_location_services_requested() else "F"
        )
        metadata["permissionLocation"] = (
            "T" if location_m.get_location_permissions_granted() else "F"
        )
        metadata["enabledLocation"] = (
            "T"
            if LocationProvider.GPS in location_m.get_location_providers().get_values()
            else "F"
        )
        location_900.set_metadata_as_dict(metadata)

        # Attach only once the sensor is fully populated, like every other sensor in this function,
        # so the metadata is part of the packet even if the packet keeps its own copy of the sensor.
        wrapped_packet_900.set_location_sensor(location_900)

    # Synch exchanges
    synch_exchanges_m = timing_info_m.get_synch_exchanges()
    if synch_exchanges_m.get_count() > 0:
        synch_900 = reader_900.TimeSynchronizationSensor()
        # Each exchange is flattened into six consecutive values: a1, a2, a3, b1, b2, b3.
        exchange_values: List[int] = []
        for exchange in synch_exchanges_m.get_values():
            exchange_values.extend(
                [
                    round(exchange.get_a1()),
                    round(exchange.get_a2()),
                    round(exchange.get_a3()),
                    round(exchange.get_b1()),
                    round(exchange.get_b2()),
                    round(exchange.get_b3()),
                ]
            )
        synch_900.set_payload_values(exchange_values)
        wrapped_packet_900.set_time_synchronization_sensor(synch_900)

    # Accelerometer
    accel_m = sensors_m.get_accelerometer()
    if accel_m is not None:
        accel_900 = reader_900.AccelerometerSensor()
        accel_900.set_sensor_name(accel_m.get_sensor_description())
        accel_900.set_timestamps_microseconds_utc(
            accel_m.get_timestamps().get_timestamps().astype(np.int64)
        )
        accel_900.set_metadata_as_dict(accel_m.get_metadata().get_metadata())
        accel_900.set_payload_values(
            accel_m.get_x_samples().get_values(),
            accel_m.get_y_samples().get_values(),
            accel_m.get_z_samples().get_values(),
        )
        wrapped_packet_900.set_accelerometer_sensor(accel_900)

    # Magnetometer
    magnetometer_m = sensors_m.get_magnetometer()
    if magnetometer_m is not None:
        magnetometer_900 = reader_900.MagnetometerSensor()
        magnetometer_900.set_sensor_name(magnetometer_m.get_sensor_description())
        magnetometer_900.set_timestamps_microseconds_utc(
            magnetometer_m.get_timestamps().get_timestamps().astype(np.int64)
        )
        magnetometer_900.set_metadata_as_dict(
            magnetometer_m.get_metadata().get_metadata()
        )
        magnetometer_900.set_payload_values(
            magnetometer_m.get_x_samples().get_values(),
            magnetometer_m.get_y_samples().get_values(),
            magnetometer_m.get_z_samples().get_values(),
        )
        wrapped_packet_900.set_magnetometer_sensor(magnetometer_900)

    # Gyroscope
    gyroscope_m = sensors_m.get_gyroscope()
    if gyroscope_m is not None:
        gyroscope_900 = reader_900.GyroscopeSensor()
        gyroscope_900.set_sensor_name(gyroscope_m.get_sensor_description())
        gyroscope_900.set_timestamps_microseconds_utc(
            gyroscope_m.get_timestamps().get_timestamps().astype(np.int64)
        )
        gyroscope_900.set_metadata_as_dict(gyroscope_m.get_metadata().get_metadata())
        gyroscope_900.set_payload_values(
            gyroscope_m.get_x_samples().get_values(),
            gyroscope_m.get_y_samples().get_values(),
            gyroscope_m.get_z_samples().get_values(),
        )
        wrapped_packet_900.set_gyroscope_sensor(gyroscope_900)

    # Light
    light_m = sensors_m.get_light()
    if light_m is not None:
        light_900 = reader_900.LightSensor()
        light_900.set_sensor_name(light_m.get_sensor_description())
        light_900.set_metadata_as_dict(light_m.get_metadata().get_metadata())
        light_900.set_timestamps_microseconds_utc(
            light_m.get_timestamps().get_timestamps().astype(np.int64)
        )
        light_900.set_payload_values(light_m.get_samples().get_values())
        wrapped_packet_900.set_light_sensor(light_900)

    # Image, skip for now

    # Infrared / proximity
    proximity_m = sensors_m.get_proximity()
    if proximity_m is not None:
        proximity_900 = reader_900.InfraredSensor()
        proximity_900.set_sensor_name(proximity_m.get_sensor_description())
        proximity_900.set_metadata_as_dict(proximity_m.get_metadata().get_metadata())
        proximity_900.set_timestamps_microseconds_utc(
            proximity_m.get_timestamps().get_timestamps().astype(np.int64)
        )
        proximity_900.set_payload_values(proximity_m.get_samples().get_values())
        wrapped_packet_900.set_infrared_sensor(proximity_900)

    return wrapped_packet_900
Converts a wrapped API 900 packet into a wrapped API M packet.
:param wrapped_packet_900: API 900 packet to convert. :return: A wrapped API M packet.
Expand source code
def convert_api_900_to_1000(
    wrapped_packet_900: reader_900.WrappedRedvoxPacket,
) -> WrappedRedvoxPacketM:
    """
    Converts a wrapped API 900 packet into a wrapped API M packet.

    Station information, timing information and the microphone, barometer, location, accelerometer,
    magnetometer, gyroscope, light and infrared sensors are copied over. Image data is not converted.
    Top-level API 900 metadata that is not consumed by the conversion is carried over to the API M
    packet's metadata.

    :param wrapped_packet_900: API 900 packet to convert.
    :return: A wrapped API M packet.
    """
    wrapped_packet_m: WrappedRedvoxPacketM = WrappedRedvoxPacketM.new()

    # Top-level metadata
    wrapped_packet_m.set_api(1000.0)
    # noinspection PyUnresolvedReferences,Mypy
    wrapped_packet_m.set_sub_api(900.0)

    # Station information
    station_information: StationInformation = wrapped_packet_m.get_station_information()
    station_information.set_id(wrapped_packet_900.redvox_id()).set_uuid(
        wrapped_packet_900.uuid()
    ).set_make(wrapped_packet_900.device_make()).set_model(
        wrapped_packet_900.device_model()
    ).set_os(
        _migrate_os_type_900_to_1000(wrapped_packet_900.device_os())
    ).set_os_version(
        wrapped_packet_900.device_os_version()
    ).set_app_version(
        wrapped_packet_900.app_version()
    ).set_is_private(
        wrapped_packet_900.is_private()
    )

    station_information.get_service_urls().set_acquisition_server(
        wrapped_packet_900.acquisition_server()
    ).set_synch_server(
        wrapped_packet_900.time_synchronization_server()
    ).set_auth_server(
        wrapped_packet_900.authentication_server()
    )

    # API 900 does not maintain a copy of its settings. So we will not set anything in AppSettings

    # StationMetrics - We know a couple: one timestamp, one temperature and one battery value.
    station_metrics: StationMetrics = station_information.get_station_metrics()
    station_metrics.get_timestamps().append_timestamp(
        wrapped_packet_900.app_file_start_timestamp_machine()
    )
    station_metrics.get_temperature().set_values(
        np.array([wrapped_packet_900.device_temperature_c()]), True
    )
    station_metrics.get_battery().set_values(
        np.array([wrapped_packet_900.battery_level_percent()]), True
    )

    # Timing information
    mach_time_900: int = wrapped_packet_900.app_file_start_timestamp_machine()
    os_time_900: int = (
        wrapped_packet_900.app_file_start_timestamp_epoch_microseconds_utc()
    )
    len_micros: int = _packet_length_microseconds_900(wrapped_packet_900)

    # A missing latency/offset is represented as NaN in the API M packet.
    best_latency: Optional[float] = wrapped_packet_900.best_latency()
    best_latency = best_latency if best_latency is not None else NAN
    best_offset: Optional[float] = wrapped_packet_900.best_offset()
    best_offset = best_offset if best_offset is not None else NAN

    # Packet end timestamps are derived as start + packet length.
    wrapped_packet_m.get_timing_information().set_unit(
        common_m.Unit.MICROSECONDS_SINCE_UNIX_EPOCH
    ).set_packet_start_mach_timestamp(mach_time_900).set_packet_start_os_timestamp(
        os_time_900
    ).set_packet_end_mach_timestamp(
        mach_time_900 + len_micros
    ).set_packet_end_os_timestamp(
        os_time_900 + len_micros
    ).set_server_acquisition_arrival_timestamp(
        wrapped_packet_900.server_timestamp_epoch_microseconds_utc()
    ).set_app_start_mach_timestamp(
        _find_mach_time_zero(wrapped_packet_900)
    ).set_best_latency(
        best_latency
    ).set_best_offset(
        best_offset
    )

    time_sensor = wrapped_packet_900.time_synchronization_sensor()
    if time_sensor is not None:
        wrapped_packet_m.get_timing_information().get_synch_exchanges().set_values(
            _migrate_synch_exchanges_900_to_1000(time_sensor.payload_values())
        )

    # Sensors
    sensors_m: Sensors = wrapped_packet_m.get_sensors()

    # Microphone / Audio
    mic_sensor_900: Optional[
        reader_900.MicrophoneSensor
    ] = wrapped_packet_900.microphone_sensor()
    if mic_sensor_900 is not None:
        normalized_audio: np.ndarray = (
            mic_sensor_900.payload_values() / _NORMALIZATION_CONSTANT
        )
        audio_sensor_m = sensors_m.new_audio()
        audio_sensor_m.set_first_sample_timestamp(
            mic_sensor_900.first_sample_timestamp_epoch_microseconds_utc()
        ).set_is_scrambled(wrapped_packet_900.is_scrambled()).set_sample_rate(
            mic_sensor_900.sample_rate_hz()
        ).set_sensor_description(
            mic_sensor_900.sensor_name()
        ).get_samples().set_values(
            normalized_audio, update_value_statistics=True
        )
        audio_sensor_m.get_metadata().set_metadata(mic_sensor_900.metadata_as_dict())

    # Barometer
    barometer_sensor_900: Optional[
        reader_900.BarometerSensor
    ] = wrapped_packet_900.barometer_sensor()
    if barometer_sensor_900 is not None:
        pressure_sensor_m = sensors_m.new_pressure()
        pressure_sensor_m.set_sensor_description(barometer_sensor_900.sensor_name())
        pressure_sensor_m.get_timestamps().set_timestamps(
            barometer_sensor_900.timestamps_microseconds_utc(), True
        )
        pressure_sensor_m.get_samples().set_values(
            barometer_sensor_900.payload_values(), True
        )
        pressure_sensor_m.get_metadata().set_metadata(
            barometer_sensor_900.metadata_as_dict()
        )

    # Location
    # TODO: rework
    location_sensor_900: Optional[
        reader_900.LocationSensor
    ] = wrapped_packet_900.location_sensor()
    if location_sensor_900 is not None:
        location_m = sensors_m.new_location()
        location_m.set_sensor_description(location_sensor_900.sensor_name())
        location_m.get_timestamps().set_timestamps(
            location_sensor_900.timestamps_microseconds_utc(), True
        )

        if location_sensor_900.check_for_preset_lat_lon():
            # Preset location: first element is used as latitude, the rest as longitude.
            lat_lon: np.ndarray = location_sensor_900.get_payload_lat_lon()
            location_m.get_latitude_samples().set_values(lat_lon[:1], True)
            location_m.get_longitude_samples().set_values(lat_lon[1:], True)
        else:
            # NOTE(review): altitude, speed and accuracy are copied only when no preset lat/lon
            # exists -- confirm this matches the intended branch layout.
            location_m.get_latitude_samples().set_values(
                location_sensor_900.payload_values_latitude(), True
            )
            location_m.get_longitude_samples().set_values(
                location_sensor_900.payload_values_longitude(), True
            )
            location_m.get_altitude_samples().set_values(
                location_sensor_900.payload_values_altitude(), True
            )
            location_m.get_speed_samples().set_values(
                location_sensor_900.payload_values_speed(), True
            )
            location_m.get_horizontal_accuracy_samples().set_values(
                location_sensor_900.payload_values_accuracy(), True
            )

        def _extract_meta_bool(metad: Dict[str, str], key: str) -> bool:
            # A flag is set only when the key is present with the exact value "T".
            return metad.get(key) == "T"

        loc_meta_900 = location_sensor_900.metadata_as_dict()
        use_location = _extract_meta_bool(loc_meta_900, "useLocation")
        desired_location = _extract_meta_bool(loc_meta_900, "desiredLocation")
        permission_location = _extract_meta_bool(loc_meta_900, "permissionLocation")
        enabled_location = _extract_meta_bool(loc_meta_900, "enabledLocation")

        # One provider entry is written per location timestamp.
        n_p = location_m.get_timestamps().get_timestamps_count()
        if desired_location:
            provider = LocationProvider.USER
        elif enabled_location:
            provider = LocationProvider.GPS
        elif use_location and desired_location and permission_location:
            # NOTE(review): unreachable as written -- desired_location is always False here
            # because the first branch already handled the True case. Left unchanged because
            # the intended condition for NETWORK is not evident; confirm and fix.
            provider = LocationProvider.NETWORK
        else:
            provider = LocationProvider.NONE
        location_m.get_location_providers().set_values([provider] * n_p)

        location_m.set_location_permissions_granted(permission_location)
        location_m.set_location_services_enabled(use_location)
        location_m.set_location_services_requested(desired_location)

        # Once we're done here, we should remove the original metadata
        for used_key in (
            "useLocation",
            "desiredLocation",
            "permissionLocation",
            "enabledLocation",
            "machTimeZero",
        ):
            loc_meta_900.pop(used_key, None)
        location_m.get_metadata().set_metadata(loc_meta_900)

    # Time Synchronization
    # This was already added to the timing information

    # Accelerometer
    accelerometer_900 = wrapped_packet_900.accelerometer_sensor()
    if accelerometer_900 is not None:
        accelerometer_m = sensors_m.new_accelerometer()
        accelerometer_m.set_sensor_description(accelerometer_900.sensor_name())
        accelerometer_m.get_timestamps().set_timestamps(
            accelerometer_900.timestamps_microseconds_utc(), True
        )
        accelerometer_m.get_x_samples().set_values(
            accelerometer_900.payload_values_x(), True
        )
        accelerometer_m.get_y_samples().set_values(
            accelerometer_900.payload_values_y(), True
        )
        accelerometer_m.get_z_samples().set_values(
            accelerometer_900.payload_values_z(), True
        )
        accelerometer_m.get_metadata().set_metadata(
            accelerometer_900.metadata_as_dict()
        )

    # Magnetometer
    magnetometer_900 = wrapped_packet_900.magnetometer_sensor()
    if magnetometer_900 is not None:
        magnetometer_m = sensors_m.new_magnetometer()
        magnetometer_m.set_sensor_description(magnetometer_900.sensor_name())
        magnetometer_m.get_timestamps().set_timestamps(
            magnetometer_900.timestamps_microseconds_utc(), True
        )
        magnetometer_m.get_x_samples().set_values(
            magnetometer_900.payload_values_x(), True
        )
        magnetometer_m.get_y_samples().set_values(
            magnetometer_900.payload_values_y(), True
        )
        magnetometer_m.get_z_samples().set_values(
            magnetometer_900.payload_values_z(), True
        )
        magnetometer_m.get_metadata().set_metadata(magnetometer_900.metadata_as_dict())

    # Gyroscope
    gyroscope_900 = wrapped_packet_900.gyroscope_sensor()
    if gyroscope_900 is not None:
        gyroscope_m = sensors_m.new_gyroscope()
        gyroscope_m.set_sensor_description(gyroscope_900.sensor_name())
        gyroscope_m.get_timestamps().set_timestamps(
            gyroscope_900.timestamps_microseconds_utc(), True
        )
        gyroscope_m.get_x_samples().set_values(gyroscope_900.payload_values_x(), True)
        gyroscope_m.get_y_samples().set_values(gyroscope_900.payload_values_y(), True)
        gyroscope_m.get_z_samples().set_values(gyroscope_900.payload_values_z(), True)
        gyroscope_m.get_metadata().set_metadata(gyroscope_900.metadata_as_dict())

    # Light
    light_900 = wrapped_packet_900.light_sensor()
    if light_900 is not None:
        light_m = sensors_m.new_light()
        light_m.set_sensor_description(light_900.sensor_name())
        light_m.get_timestamps().set_timestamps(
            light_900.timestamps_microseconds_utc(), True
        )
        light_m.get_samples().set_values(light_900.payload_values(), True)
        light_m.get_metadata().set_metadata(light_900.metadata_as_dict())

    # Image
    # TODO: Implement

    # Proximity
    proximity_900 = wrapped_packet_900.infrared_sensor()
    if proximity_900 is not None:
        proximity_m = sensors_m.new_proximity()
        proximity_m.set_sensor_description(proximity_900.sensor_name())
        proximity_m.get_timestamps().set_timestamps(
            proximity_900.timestamps_microseconds_utc(), True
        )
        proximity_m.get_samples().set_values(proximity_900.payload_values(), True)
        proximity_m.get_metadata().set_metadata(proximity_900.metadata_as_dict())

    # Remove the API 900 top-level metadata that has already been used for the timing information,
    # then carry the remaining entries over so they are not silently dropped by the conversion.
    meta = wrapped_packet_900.metadata_as_dict()
    for used_key in ("machTimeZero", "bestOffset", "bestLatency"):
        meta.pop(used_key, None)
    for meta_key, meta_value in meta.items():
        wrapped_packet_m.get_metadata().append_metadata(meta_key, meta_value)

    wrapped_packet_m.get_metadata().append_metadata(
        "migrated_from_api_900", f"v{redvox.VERSION}"
    )

    return wrapped_packet_m
Converts a raw API 900 protobuf packet into a raw API M protobuf packet.
:param packet: API 900 protobuf packet to convert. :return: An API M protobuf packet.
Expand source code
def _copy_flat_metadata_900_raw(metadata_900, metadata_m) -> None:
    """
    Copies API 900 metadata into an API M metadata map.

    API 900 metadata is a flat sequence of alternating keys and values
    (``[k0, v0, k1, v1, ...]``). A trailing key without a value is mapped to the empty string.

    :param metadata_900: Flat API 900 metadata sequence of strings.
    :param metadata_m: Mutable string-to-string mapping that receives the key/value pairs.
    """
    total: int = len(metadata_900)
    # Step by two so that only keys (even indices) are used as keys.
    for i in range(0, total, 2):
        metadata_m[metadata_900[i]] = metadata_900[i + 1] if (i + 1) < total else ""


def _fill_single_channel_sensor_900_raw(sensor_900, sensor_m, samples_unit) -> None:
    """
    Fills an API M single-channel sensor (timestamps + samples) from an API 900 unevenly sampled channel.

    :param sensor_900: API 900 unevenly sampled channel to read from.
    :param sensor_m: API M sensor message (with ``timestamps``, ``samples`` and ``metadata``) to fill.
    :param samples_unit: API M unit assigned to the samples payload.
    """
    sensor_m.sensor_description = sensor_900.sensor_name
    sensor_m.timestamps.unit = api_m.RedvoxPacketM.Unit.MICROSECONDS_SINCE_UNIX_EPOCH
    sensor_m.timestamps.timestamps[:] = sensor_900.timestamps_microseconds_utc
    sensor_m.samples.values[:] = list(reader_utils.extract_payload(sensor_900))
    sensor_m.samples.unit = samples_unit
    _copy_flat_metadata_900_raw(sensor_900.metadata, sensor_m.metadata)
    compute_stats_raw(sensor_m.timestamps)
    compute_stats_raw(sensor_m.samples)


def _fill_xyz_sensor_900_raw(sensor_900, sensor_m, samples_unit) -> None:
    """
    Fills an API M three-axis sensor from an API 900 unevenly sampled channel.

    The API 900 payload is de-interleaved with a stride of three (x at offset 0, y at offset 1,
    z at offset 2).

    :param sensor_900: API 900 unevenly sampled channel holding the interleaved payload.
    :param sensor_m: API M sensor message (with ``timestamps``, ``x_samples``, ``y_samples``,
                     ``z_samples`` and ``metadata``) to fill.
    :param samples_unit: API M unit assigned to each of the three sample payloads.
    """
    sensor_m.sensor_description = sensor_900.sensor_name
    sensor_m.timestamps.unit = api_m.RedvoxPacketM.Unit.MICROSECONDS_SINCE_UNIX_EPOCH
    sensor_m.timestamps.timestamps[:] = sensor_900.timestamps_microseconds_utc

    payload: List[float] = list(reader_utils.extract_payload(sensor_900))
    axes = (sensor_m.x_samples, sensor_m.y_samples, sensor_m.z_samples)
    for offset, samples_m in enumerate(axes):
        samples_m.unit = samples_unit
        samples_m.values[:] = payload[offset::3]

    # Keep the sensor metadata, as is done for every other sensor and by the wrapped conversion.
    _copy_flat_metadata_900_raw(sensor_900.metadata, sensor_m.metadata)

    compute_stats_raw(sensor_m.timestamps)
    for samples_m in axes:
        compute_stats_raw(samples_m)


def convert_api_900_to_1000_raw(packet: api_900.RedvoxPacket) -> api_m.RedvoxPacketM:
    """
    Converts a raw (protobuf) API 900 packet into a raw (protobuf) API M packet.

    Station information, station metrics, timing information and the audio, pressure, location,
    accelerometer, magnetometer, gyroscope, light and proximity sensors are migrated. Values that
    API 900 does not provide are filled with NaN or the corresponding "unknown" enum value.
    Image data is not converted.

    :param packet: API 900 packet to convert.
    :return: A raw API M packet.
    :raises ValueError: If the API 900 packet does not contain an audio (evenly sampled) channel.
    """
    # Validate up front: several fields below index the first evenly sampled channel, which would
    # otherwise fail with an IndexError before the intended error could be raised.
    if len(packet.evenly_sampled_channels) < 1:
        raise ValueError("Cannot convert API900 to API1000; Audio sensor missing.")

    unit = api_m.RedvoxPacketM.Unit
    packet_m: api_m.RedvoxPacketM = api_m.RedvoxPacketM()

    # Top-level metadata
    packet_m.api = 1000.0
    # noinspection PyUnresolvedReferences,Mypy
    packet_m.sub_api = 900.0

    # Station information
    station = packet_m.station_information
    station.id = packet.redvox_id
    station.uuid = packet.uuid
    station.make = packet.device_make
    station.model = packet.device_model
    station.os = _migrate_os_type_900_to_1000_raw(packet.device_os)
    station.os_version = packet.device_os_version
    station.app_version = packet.app_version
    station.is_private = packet.is_private
    # API 900 does not maintain a copy of its settings, so apart from the window size derived from
    # the audio payload nothing is set in AppSettings.
    station.app_settings.samples_per_window = len(
        packet.evenly_sampled_channels[0].int32_payload.payload
    )
    station.service_urls.acquisition_server = packet.acquisition_server
    station.service_urls.synch_server = packet.time_synchronization_server
    station.service_urls.auth_server = packet.authentication_server

    # StationMetrics - We know a couple.
    metrics = station.station_metrics
    metrics.timestamps.unit = unit.MICROSECONDS_SINCE_UNIX_EPOCH
    metrics.timestamps.timestamps[:] = [packet.app_file_start_timestamp_machine]
    metrics.temperature.unit = unit.DEGREES_CELSIUS
    metrics.temperature.values[:] = [packet.device_temperature_c]
    metrics.battery.unit = unit.PERCENTAGE
    metrics.battery.values[:] = [packet.battery_level_percent]

    # And we can fill in defaults for those we don't know
    for unknown_metric, metric_unit in (
        (metrics.available_disk, unit.BYTE),
        (metrics.available_ram, unit.BYTE),
        (metrics.cpu_utilization, unit.PERCENTAGE),
        (metrics.network_strength, unit.DECIBEL),
        (metrics.battery_current, unit.MICROAMPERES),
        (metrics.screen_brightness, unit.PERCENTAGE),
    ):
        unknown_metric.unit = metric_unit
        unknown_metric.values[:] = [NAN]

    metrics_enums = api_m.RedvoxPacketM.StationInformation.StationMetrics
    metrics.network_type[:] = [metrics_enums.NetworkType.UNKNOWN_NETWORK]
    metrics.cell_service_state[:] = [metrics_enums.CellServiceState.UNKNOWN]
    metrics.power_state[:] = [metrics_enums.PowerState.UNKNOWN_POWER_STATE]
    metrics.wifi_wake_lock[:] = [metrics_enums.WifiWakeLock.OTHER]
    metrics.screen_state[:] = [metrics_enums.ScreenState.UNKNOWN_SCREEN_STATE]

    for metric_payload in (
        metrics.timestamps,
        metrics.temperature,
        metrics.battery,
        metrics.available_disk,
        metrics.available_ram,
        metrics.cpu_utilization,
        metrics.network_strength,
        metrics.battery_current,
        metrics.screen_brightness,
    ):
        compute_stats_raw(metric_payload)

    # Timing information
    mach_time_900: int = packet.app_file_start_timestamp_machine
    os_time_900: int = packet.app_file_start_timestamp_epoch_microseconds_utc
    len_micros: int = _packet_length_microseconds_900_raw(packet)
    packet_metadata: List[str] = list(packet.metadata)
    best_latency: float = reader_utils.get_metadata_or_default(
        packet_metadata, "bestLatency", float, NAN
    )
    best_offset: float = reader_utils.get_metadata_or_default(
        packet_metadata, "bestOffset", float, NAN
    )

    timing = packet_m.timing_information
    timing.unit = unit.MICROSECONDS_SINCE_UNIX_EPOCH
    timing.packet_start_mach_timestamp = mach_time_900
    timing.packet_start_os_timestamp = os_time_900
    timing.packet_end_mach_timestamp = mach_time_900 + len_micros
    timing.packet_end_os_timestamp = os_time_900 + len_micros
    timing.server_acquisition_arrival_timestamp = (
        packet.server_timestamp_epoch_microseconds_utc
    )
    timing.app_start_mach_timestamp = _find_mach_time_zero_raw(packet)
    timing.best_latency = best_latency
    timing.best_offset = best_offset

    synch_sensor: Optional[
        api_900.UnevenlySampledChannel
    ] = reader_utils.find_uneven_channel_raw(
        packet, {api_900.ChannelType.TIME_SYNCHRONIZATION}
    )
    if synch_sensor is not None:
        synch_payload: np.ndarray = reader_utils.extract_payload(synch_sensor)
        timing.synch_exchanges.extend(
            _migrate_synch_exchanges_900_to_1000_raw(synch_payload)
        )

    # Sensors
    # Microphone / Audio
    audio_900: api_900.EvenlySampledChannel = packet.evenly_sampled_channels[0]
    audio_m = packet_m.sensors.audio
    audio_m.sensor_description = audio_900.sensor_name
    audio_m.sample_rate = audio_900.sample_rate_hz
    audio_m.first_sample_timestamp = (
        audio_900.first_sample_timestamp_epoch_microseconds_utc
    )
    audio_m.bits_of_precision = 16.0
    audio_m.encoding = "counts"
    normalized_audio: np.ndarray = (
        reader_utils.extract_payload(audio_900) / _NORMALIZATION_CONSTANT
    )
    audio_m.samples.values[:] = list(normalized_audio)
    audio_m.samples.unit = unit.NORMALIZED_COUNTS
    _copy_flat_metadata_900_raw(audio_900.metadata, audio_m.metadata)
    compute_stats_raw(audio_m.samples)

    # Pressure
    barometer_900: Optional[
        api_900.UnevenlySampledChannel
    ] = reader_utils.find_uneven_channel_raw(packet, {api_900.ChannelType.BAROMETER})
    if barometer_900 is not None:
        _fill_single_channel_sensor_900_raw(
            barometer_900, packet_m.sensors.pressure, unit.KILOPASCAL
        )

    # Location
    loc_900: Optional[
        api_900.UnevenlySampledChannel
    ] = reader_utils.find_uneven_channel_raw(
        packet,
        {
            api_900.ChannelType.LATITUDE,
            api_900.ChannelType.LONGITUDE,
            api_900.ChannelType.ALTITUDE,
            api_900.ChannelType.SPEED,
            api_900.ChannelType.ACCURACY,
        },
    )
    if loc_900 is not None:
        location_m = packet_m.sensors.location
        total_samples: int = len(loc_900.timestamps_microseconds_utc)
        total_channels: int = len(loc_900.channel_types)
        loc_payload: List[float] = list(reader_utils.extract_payload(loc_900))

        location_m.sensor_description = loc_900.sensor_name
        location_m.timestamps.unit = unit.MICROSECONDS_SINCE_UNIX_EPOCH
        location_m.timestamps.timestamps[:] = loc_900.timestamps_microseconds_utc
        location_m.timestamps_gps.unit = unit.MICROSECONDS_SINCE_UNIX_EPOCH
        location_m.timestamps_gps.timestamps[:] = [NAN] * total_samples

        # Channels API 900 may provide. The payload is interleaved, so a channel found at index
        # ``idx`` is read with a stride of ``total_channels``. Missing channels are filled with
        # one NaN per timestamp so that every payload has ``total_samples`` entries.
        for samples_m, samples_unit, channel_type in (
            (
                location_m.latitude_samples,
                unit.DECIMAL_DEGREES,
                api_900.ChannelType.LATITUDE,
            ),
            (
                location_m.longitude_samples,
                unit.DECIMAL_DEGREES,
                api_900.ChannelType.LONGITUDE,
            ),
            (location_m.altitude_samples, unit.METERS, api_900.ChannelType.ALTITUDE),
            (
                location_m.speed_samples,
                unit.METERS_PER_SECOND,
                api_900.ChannelType.SPEED,
            ),
            (
                location_m.horizontal_accuracy_samples,
                unit.METERS,
                api_900.ChannelType.ACCURACY,
            ),
        ):
            samples_m.unit = samples_unit
            channel_idx: Optional[int] = reader_utils.extract_uneven_payload_idx_raw(
                packet, channel_type
            )
            if channel_idx is not None:
                samples_m.values[:] = loc_payload[channel_idx::total_channels]
            else:
                samples_m.values[:] = [NAN] * total_samples

        # Channels that have no API 900 counterpart are always NaN filled.
        for samples_m, samples_unit in (
            (location_m.bearing_samples, unit.DECIMAL_DEGREES),
            (location_m.vertical_accuracy_samples, unit.METERS),
            (location_m.speed_accuracy_samples, unit.METERS_PER_SECOND),
            (location_m.bearing_accuracy_samples, unit.DECIMAL_DEGREES),
        ):
            samples_m.unit = samples_unit
            samples_m.values[:] = [NAN] * total_samples

        # Compute stats
        for location_payload in (
            location_m.timestamps,
            location_m.timestamps_gps,
            location_m.latitude_samples,
            location_m.longitude_samples,
            location_m.altitude_samples,
            location_m.speed_samples,
            location_m.bearing_samples,
            location_m.horizontal_accuracy_samples,
            location_m.vertical_accuracy_samples,
            location_m.speed_accuracy_samples,
            location_m.bearing_accuracy_samples,
        ):
            compute_stats_raw(location_payload)

        # Bookkeeping
        loc_metadata: List[str] = list(loc_900.metadata)

        def _is_true(val: str) -> bool:
            # API 900 encodes these boolean flags as the string "T".
            return val == "T"

        use_location: bool = reader_utils.get_metadata_or_default(
            loc_metadata, "useLocation", _is_true, False
        )
        desired_location: bool = reader_utils.get_metadata_or_default(
            loc_metadata, "desiredLocation", _is_true, False
        )
        permission_location: bool = reader_utils.get_metadata_or_default(
            loc_metadata, "permissionLocation", _is_true, False
        )
        enabled_location: bool = reader_utils.get_metadata_or_default(
            loc_metadata, "enabledLocation", _is_true, False
        )

        providers = api_m.RedvoxPacketM.Sensors.Location.LocationProvider
        if desired_location:
            provider = providers.USER
        elif enabled_location:
            provider = providers.GPS
        # NOTE(review): this branch can never be taken because desired_location is already known
        # to be False here. Kept as-is; confirm the intended condition for NETWORK.
        elif use_location and desired_location and permission_location:
            provider = providers.NETWORK
        else:
            provider = providers.NONE
        location_m.location_providers[:] = [provider] * total_samples

        location_m.location_permissions_granted = permission_location
        location_m.location_services_enabled = use_location
        location_m.location_services_requested = desired_location

        _copy_flat_metadata_900_raw(loc_900.metadata, location_m.metadata)

    # Time Synchronization
    # This was already added to the timing information

    # Accelerometer
    accel_900: Optional[
        api_900.UnevenlySampledChannel
    ] = reader_utils.find_uneven_channel_raw(
        packet,
        {
            api_900.ChannelType.ACCELEROMETER_X,
            api_900.ChannelType.ACCELEROMETER_Y,
            api_900.ChannelType.ACCELEROMETER_Z,
        },
    )
    if accel_900 is not None:
        _fill_xyz_sensor_900_raw(
            accel_900, packet_m.sensors.accelerometer, unit.METERS_PER_SECOND_SQUARED
        )

    # Magnetometer
    magnetometer_900: Optional[
        api_900.UnevenlySampledChannel
    ] = reader_utils.find_uneven_channel_raw(
        packet,
        {
            api_900.ChannelType.MAGNETOMETER_X,
            api_900.ChannelType.MAGNETOMETER_Y,
            api_900.ChannelType.MAGNETOMETER_Z,
        },
    )
    if magnetometer_900 is not None:
        _fill_xyz_sensor_900_raw(
            magnetometer_900, packet_m.sensors.magnetometer, unit.MICROTESLA
        )

    # Gyroscope
    gyroscope_900: Optional[
        api_900.UnevenlySampledChannel
    ] = reader_utils.find_uneven_channel_raw(
        packet,
        {
            api_900.ChannelType.GYROSCOPE_X,
            api_900.ChannelType.GYROSCOPE_Y,
            api_900.ChannelType.GYROSCOPE_Z,
        },
    )
    if gyroscope_900 is not None:
        _fill_xyz_sensor_900_raw(
            gyroscope_900, packet_m.sensors.gyroscope, unit.RADIANS_PER_SECOND
        )

    # Light
    light_900: Optional[
        api_900.UnevenlySampledChannel
    ] = reader_utils.find_uneven_channel_raw(packet, {api_900.ChannelType.LIGHT})
    if light_900 is not None:
        _fill_single_channel_sensor_900_raw(light_900, packet_m.sensors.light, unit.LUX)

    # Image
    # Not implemented for conversion. Only a very small fraction of API 900 was ever image
    # capable, and not the public app.

    # Proximity
    proximity_900: Optional[
        api_900.UnevenlySampledChannel
    ] = reader_utils.find_uneven_channel_raw(packet, {api_900.ChannelType.INFRARED})
    if proximity_900 is not None:
        _fill_single_channel_sensor_900_raw(
            proximity_900, packet_m.sensors.proximity, unit.CENTIMETERS
        )

    return packet_m