Module redvox.api900.wrapped_redvox_packet

This module contains classes and methods for working with WrappedRedvoxPackets

Expand source code
# pylint: disable=C0302
"""
This module contains classes and methods for working with WrappedRedvoxPackets
"""

import os
import typing

import redvox.api900.concat
import redvox.common.date_time_utils as date_time_utils
import redvox.api900.lib.api900_pb2 as api900_pb2
import redvox.api900.migrations as migrations
import redvox.api900.reader_utils as reader_utils
import redvox.api900.sensors.accelerometer_sensor as _accelerometer_sensor
import redvox.api900.sensors.barometer_sensor as _barometer_sensor
import redvox.api900.sensors.gyroscope_sensor as _gyroscope_sensor
import redvox.api900.sensors.image_sensor as _image_sensor
import redvox.api900.sensors.infrared_sensor as _infrared_sensor
import redvox.api900.sensors.light_sensor as _light_sensor
import redvox.api900.sensors.location_sensor as _location_sensor
import redvox.api900.sensors.magnetometer_sensor as _magnetometer_sensor
import redvox.api900.sensors.microphone_sensor as _microphone_sensor
import redvox.api900.sensors.time_synchronization_sensor as _time_synchronization_sensor
from redvox.api900.sensors.evenly_sampled_channel import EvenlySampledChannel
from redvox.api900.sensors.unevenly_sampled_channel import UnevenlySampledChannel


def read_buffer(buf: bytes, is_compressed: bool = True) -> api900_pb2.RedvoxPacket:
    """
    Deserializes a serialized protobuf RedvoxPacket buffer.
    :param buf: Buffer to deserialize.
    :param is_compressed: Whether or not the buffer is compressed or decompressed.
    :return: Deserialized protobuf redvox packet.
    """
    buffer = reader_utils.lz4_decompress(buf) if is_compressed else buf
    redvox_packet = api900_pb2.RedvoxPacket()
    redvox_packet.ParseFromString(buffer)
    return redvox_packet


# pylint: disable=R0904
class WrappedRedvoxPacket:
    """
    This class provides convenience methods for accessing API 900 protobuf redvox packets.

    This packet contains a reference to the original packet which should be used to access all "top-level" fields. For
    accessing channels, this class can search for and return our high-level channel wrappers or can extract the payload
    directly.
    """

    def __init__(self, redvox_packet: api900_pb2.RedvoxPacket = None):
        """
        Initializes this wrapped redvox packet.
        :param redvox_packet: A protobuf redvox packet.
        """
        if redvox_packet is None:
            self._redvox_packet = api900_pb2.RedvoxPacket()
            self._evenly_sampled_channels_field = list()
            self._unevenly_sampled_channels_field = list()
            self._metadata_list = list()
            self._channel_cache = {}

        else:
            self._redvox_packet: api900_pb2.RedvoxPacket = redvox_packet
            """Protobuf api 900 redvox packet"""

            self._evenly_sampled_channels_field: typing.List[EvenlySampledChannel] = list(
                map(EvenlySampledChannel, reader_utils.repeated_to_array(redvox_packet.evenly_sampled_channels)))
            """List of evenly sampled channels"""

            self._unevenly_sampled_channels_field: typing.List[UnevenlySampledChannel] = list(
                map(UnevenlySampledChannel,
                    reader_utils.repeated_to_array(redvox_packet.unevenly_sampled_channels)))
            """List of unevenly sampled channels"""

            self._metadata_list: typing.List[str] = reader_utils.repeated_to_list(redvox_packet.metadata)
            """List of metadata"""

            self._channel_cache: typing.Dict[int, typing.Union[EvenlySampledChannel, UnevenlySampledChannel]] = {}
            """Holds a mapping of channel type to channel for O(1) access."""

            # Initialize channel cache
            for evenly_sampled_channel in self._evenly_sampled_channels_field:
                for channel_type in evenly_sampled_channel.channel_types:
                    self._channel_cache[channel_type] = evenly_sampled_channel

            for unevenly_sampled_channel in self._unevenly_sampled_channels_field:
                for channel_type in unevenly_sampled_channel.channel_types:
                    self._channel_cache[channel_type] = unevenly_sampled_channel

    def redvox_packet(self) -> api900_pb2.RedvoxPacket:
        """
        returns the protobuf redvox packet
        :return: protobuf redvox packet
        """
        return self._redvox_packet

    def _evenly_sampled_channels(self) -> typing.List[EvenlySampledChannel]:
        """
        returns the evenly sampled channels as a copied list to avoid built in functions making untracked changes
        :return: list of evenly sampled channels
        """
        return self._evenly_sampled_channels_field.copy()

    def _unevenly_sampled_channels(self) -> typing.List[UnevenlySampledChannel]:
        """
        returns the unevenly sampled channels as a copied list to avoid built in functions making untracked changes
        :return: list of unevenly sampled channels
        """
        return self._unevenly_sampled_channels_field.copy()

    def _refresh_channels(self):
        """
        takes the redvox packet and rebuilds the channel cache from it
        """
        self._evenly_sampled_channels_field = list(map(EvenlySampledChannel,
                                                       reader_utils.repeated_to_array(
                                                           self._redvox_packet.evenly_sampled_channels)))
        self._unevenly_sampled_channels_field = list(map(UnevenlySampledChannel,
                                                         reader_utils.repeated_to_array(
                                                             self._redvox_packet.unevenly_sampled_channels)))
        self._channel_cache = {}
        for evenly_sampled_channel in self._evenly_sampled_channels_field:
            for channel_type in evenly_sampled_channel.channel_types:
                self._channel_cache[channel_type] = evenly_sampled_channel
        for unevenly_sampled_channel in self._unevenly_sampled_channels_field:
            for channel_type in unevenly_sampled_channel.channel_types:
                self._channel_cache[channel_type] = unevenly_sampled_channel

    def _add_channel(self, channel: typing.Union[EvenlySampledChannel, UnevenlySampledChannel]):
        """
        Add a channel
        :param channel: channel to add
        """
        index, sample = self._find_channel(channel.channel_types[0])
        if index is None and sample is None:
            # if type(channel) not in [EvenlySampledChannel, UnevenlySampledChannel]:
            if not isinstance(channel, (EvenlySampledChannel, UnevenlySampledChannel)):
                raise TypeError("Channel type to add must be even or uneven.")
            else:
                self._add_channel_redvox_packet(channel)
                self._refresh_channels()
        else:
            raise ValueError("Cannot add a channel with a type that already exists in this packet.")

    def _edit_channel(self, channel_type: int, channel: typing.Union[EvenlySampledChannel, UnevenlySampledChannel]):
        """
        removes the channel with the given type and adds the channel supplied
        :param channel_type: type of channel to remove
        :param channel: the channel to add
        """
        index, sampling = self._find_channel(channel_type)
        if index is not None and sampling is not None:
            if isinstance(channel, EvenlySampledChannel):
                del self._redvox_packet.evenly_sampled_channels[index]
                self._add_channel_redvox_packet(channel)
            elif isinstance(channel, UnevenlySampledChannel):
                del self._redvox_packet.unevenly_sampled_channels[index]
                self._add_channel_redvox_packet(channel)
            else:
                raise TypeError("Channel type to edit is unknown!")
            self._refresh_channels()
        else:
            raise TypeError("Unknown channel type specified for edit.")

    def _delete_channel(self, channel_type: int):
        """
        deletes the channel type specified
        :param channel_type: a channel to remove
        """
        index, sampling = self._find_channel(channel_type)
        if index is not None and sampling is not None:
            if sampling == EvenlySampledChannel:
                del self._redvox_packet.evenly_sampled_channels[index]
            else:
                del self._redvox_packet.unevenly_sampled_channels[index]
            self._refresh_channels()
        else:
            raise TypeError("Unknown channel type to remove from packet.")

    # pylint: disable=W0120
    def _find_channel(self, channel_type: int) -> (int, typing.Union[EvenlySampledChannel, UnevenlySampledChannel]):
        """
        returns the index of the channel and the kind of sampled array its in
        :return: the index in the even or uneven array and the name of the array
        """
        if self._has_channel(channel_type):
            for idx in range(len(self._evenly_sampled_channels_field)):
                if channel_type in self._evenly_sampled_channels_field[idx].channel_types:
                    return idx, EvenlySampledChannel
            for idx in range(len(self._unevenly_sampled_channels_field)):
                if channel_type in self._unevenly_sampled_channels_field[idx].channel_types:
                    return idx, UnevenlySampledChannel
            else:
                return None, None
        else:
            return None, None

    # pylint: disable=R0912
    def _add_channel_redvox_packet(self, channel: typing.Union[EvenlySampledChannel, UnevenlySampledChannel]):
        """
        adds the channel to the redvox_packet
        :param channel: channel to add
        """
        if isinstance(channel, EvenlySampledChannel):
            newchan = self._redvox_packet.evenly_sampled_channels.add()
            newchan.sample_rate_hz = channel.sample_rate_hz
            newchan.first_sample_timestamp_epoch_microseconds_utc = \
                channel.first_sample_timestamp_epoch_microseconds_utc
        elif isinstance(channel, UnevenlySampledChannel):
            newchan = self._redvox_packet.unevenly_sampled_channels.add()
            for time in channel.timestamps_microseconds_utc:
                newchan.timestamps_microseconds_utc.append(time)
            newchan.sample_interval_mean = channel.sample_interval_mean
            newchan.sample_interval_std = channel.sample_interval_std
            newchan.sample_interval_median = channel.sample_interval_median
        else:
            raise TypeError("Channel type to add to redvox packet is unknown!")

        pl_type = channel.get_payload_type()
        if pl_type == "byte_payload":
            newchan.byte_payload.payload.extend(channel.payload)
        elif pl_type == "uint32_payload":
            newchan.uint32_payload.payload.extend(channel.payload)
        elif pl_type == "uint64_payload":
            newchan.uint64_payload.payload.extend(channel.payload)
        elif pl_type == "int32_payload":
            newchan.int32_payload.payload.extend(channel.payload)
        elif pl_type == "int64_payload":
            newchan.int64_payload.payload.extend(channel.payload)
        elif pl_type == "float32_payload":
            newchan.float32_payload.payload.extend(channel.payload)
        elif pl_type == "float64_payload":
            newchan.float64_payload.payload.extend(channel.payload)
        elif pl_type is None:
            pass
        else:
            raise TypeError("Unknown payload type in channel to add.")

        for chan_type in channel.channel_types:
            newchan.channel_types.append(chan_type)
        newchan.sensor_name = channel.sensor_name
        for mean in channel.value_means:
            newchan.value_means.append(mean)
        for stds in channel.value_stds:
            newchan.value_stds.append(stds)
        for median in channel.value_medians:
            newchan.value_medians.append(median)
        for meta in channel.metadata:
            newchan.metadata.append(meta)

    def _get_channel_types(self) -> typing.List[typing.List[int]]:
        """
        Returns a list of channel type enumerations. This is a list of lists, and allows us to easily view
        interleaved channels.
        :return: A list of channel type enumerations.
        """
        channel_types = []
        for evenly_sampled_channel in self._evenly_sampled_channels_field:
            channel_types.append(evenly_sampled_channel.channel_types)

        for unevenly_sampled_channel in self._unevenly_sampled_channels_field:
            channel_types.append(unevenly_sampled_channel.channel_types)

        return channel_types

    def _get_channel_type_names(self) -> typing.List[typing.List[str]]:
        """
        Returns a list of channel type names. This is a list of lists, and allows us to easily view
        interleaved channels.
        :return: A list of channel type names. This is a list of lists, and allows us to easily view
        interleaved channels.
        """
        names = []
        for channel_types in self._get_channel_types():
            names.append(list(map(reader_utils.channel_type_name_from_enum, channel_types)))
        return names

    def _get_channel(self, channel_type: int) -> typing.Union[EvenlySampledChannel, UnevenlySampledChannel, None]:
        """
        Returns a channel from this packet according to the channel type.
        :param channel_type: The channel type to search for.
        :return: A high level channel wrapper or None.
        """
        if channel_type in self._channel_cache:
            return self._channel_cache[channel_type]

        return None

    def _has_channel(self, channel_type: int) -> bool:
        """
        Returns True if this packet contains a channel with this type otherwise False.
        :param channel_type: Channel type to search for.
        :return: True is this packet contains a channel with this type otherwise False.
        """
        return channel_type in self._channel_cache

    def _has_channels(self, channel_types: typing.List[int]) -> bool:
        """
        Checks that this packet contains all of the provided channels.
        :param channel_types: Channel types that this packet must contain.
        :return: True if this packet contains all provided channel types, False otherwise.
        """
        has_channel_results = map(self._has_channel, channel_types)
        for has_channel_result in has_channel_results:
            if not has_channel_result:
                return False
        return True

    def to_json(self) -> str:
        """
        Converts the protobuf packet stored in this wrapped packet to JSON.
        :return: The JSON representation of the protobuf encoded packet.
        """
        return reader_utils.to_json(self._redvox_packet)

    def compressed_buffer(self) -> bytes:
        """
        Returns the compressed buffer associated with this packet.
        :return: The compressed buffer associated with this packet.
        """
        return reader_utils.lz4_compress(self._redvox_packet.SerializeToString())

    def default_filename(self, extension: str = "rdvxz") -> str:
        """
        Constructs a default filename from the packet's metadata.
        :param extension: An optional extension to use.
        :return: A default filename from the packet's metadata.
        """
        return "%s_%d.%s" % (self.redvox_id(), int(round(self.app_file_start_timestamp_machine() / 1000.0)), extension)

    def write_rdvxz(self, directory: str, filename: typing.Optional[str] = None):
        """
        Writes a compressed .rdvxz file to the specified directory.
        :param directory: The directory to write the file to.
        :param filename: An optional filename (the default filename will be used if one is not provided).
        """
        filename = self.default_filename() if filename is None else filename
        path = os.path.join(directory, filename)
        with open(path, "wb") as rdvxz_out:
            rdvxz_out.write(self.compressed_buffer())

    def write_json(self, directory: str, filename: typing.Optional[str] = None):
        """
        Writes a RedVox compliant .json file to the specified directory.
        :param directory: The directory to write the file to.
        :param filename: An optional filename (the default filename will be used if one is not provided).
        """
        filename = self.default_filename(extension="json") if filename is None else filename
        path = os.path.join(directory, filename)
        with open(path, "w") as json_out:
            json_out.write(self.to_json())

    def clone(self) -> 'WrappedRedvoxPacket':
        """
        Returns a clone of this WrappedRedvoxPacket.
        :return: A clone of this WrappedRedvoxPacket.
        """
        return WrappedRedvoxPacket(read_buffer(self.compressed_buffer(), True))

    def concat(self, wrapped_redvox_packets: typing.List['WrappedRedvoxPacket']) -> typing.List['WrappedRedvoxPacket']:
        """
        Concatenates this packet with other packets.
        :param wrapped_redvox_packets: Other packets to concatenate with this packet.
        :return: A list of packets each containing a continuous set of data.
        """
        return redvox.api900.concat.concat_wrapped_redvox_packets([self] + wrapped_redvox_packets)

    # Start of packet level API getters and setters
    def api(self) -> int:
        """
        See https://bitbucket.org/redvoxhi/redvox-data-apis/src/master/src/api900/api900.proto?at=master for a
        description of this field.
        """
        return migrations.maybe_get_float(self._redvox_packet.api)

    def set_api(self, version: int) -> 'WrappedRedvoxPacket':
        """
        sets the api version number
        :param version: version number
        """
        self._redvox_packet.api = version
        return self

    def uuid(self) -> str:
        """
        See https://bitbucket.org/redvoxhi/redvox-data-apis/src/master/src/api900/api900.proto?at=master for a
        description of this field.
        """
        return self._redvox_packet.uuid

    def set_uuid(self, uid: str) -> 'WrappedRedvoxPacket':
        """
        sets the uuid
        :param uid: uuid string
        """
        self._redvox_packet.uuid = uid
        return self

    def redvox_id(self) -> str:
        """
        See https://bitbucket.org/redvoxhi/redvox-data-apis/src/master/src/api900/api900.proto?at=master for a
        description of this field.
        """
        return self._redvox_packet.redvox_id

    def set_redvox_id(self, rid: str) -> 'WrappedRedvoxPacket':
        """
        sets the redvox id
        :param rid: redvox id string
        """
        self._redvox_packet.redvox_id = rid
        return self

    def authenticated_email(self) -> str:
        """
        See https://bitbucket.org/redvoxhi/redvox-data-apis/src/master/src/api900/api900.proto?at=master for a
        description of this field.
        """
        return self._redvox_packet.authenticated_email

    def set_authenticated_email(self, email: str) -> 'WrappedRedvoxPacket':
        """
        sets the authenticated email
        :param email: authenticated email string
        """
        self._redvox_packet.authenticated_email = email
        return self

    def authentication_token(self) -> str:
        """
        See https://bitbucket.org/redvoxhi/redvox-data-apis/src/master/src/api900/api900.proto?at=master for a
        description of this field.
        """
        return self._redvox_packet.authentication_token

    def set_authentication_token(self, token: str) -> 'WrappedRedvoxPacket':
        """
        sets the authentication token
        :param token: authentication token string
        """
        self._redvox_packet.authentication_token = token
        return self

    def firebase_token(self) -> str:
        """
        See https://bitbucket.org/redvoxhi/redvox-data-apis/src/master/src/api900/api900.proto?at=master for a
        description of this field.
        """
        return self._redvox_packet.firebase_token

    def set_firebase_token(self, token: str) -> 'WrappedRedvoxPacket':
        """
        sets the firebase token
        :param token: firebase token string
        """
        self._redvox_packet.firebase_token = token
        return self

    def is_backfilled(self) -> bool:
        """
        See https://bitbucket.org/redvoxhi/redvox-data-apis/src/master/src/api900/api900.proto?at=master for a
        description of this field.
        """
        return self._redvox_packet.is_backfilled

    def set_is_backfilled(self, tof: bool) -> 'WrappedRedvoxPacket':
        """
        sets the is_backfilled flag
        :param tof: true or false
        """
        self._redvox_packet.is_backfilled = tof
        return self

    def is_private(self) -> bool:
        """
        See https://bitbucket.org/redvoxhi/redvox-data-apis/src/master/src/api900/api900.proto?at=master for a
        description of this field.
        """
        return self._redvox_packet.is_private

    def set_is_private(self, tof: bool) -> 'WrappedRedvoxPacket':
        """
        sets the is_private flag
        :param tof: true or false
        """
        self._redvox_packet.is_private = tof
        return self

    def is_scrambled(self) -> bool:
        """
        See https://bitbucket.org/redvoxhi/redvox-data-apis/src/master/src/api900/api900.proto?at=master for a
        description of this field.
        """
        return self._redvox_packet.is_scrambled

    def set_is_scrambled(self, tof: bool) -> 'WrappedRedvoxPacket':
        """
        sets the is_scrambled flag
        :param tof: true or false
        """
        self._redvox_packet.is_scrambled = tof
        return self

    def device_make(self) -> str:
        """
        See https://bitbucket.org/redvoxhi/redvox-data-apis/src/master/src/api900/api900.proto?at=master for a
        description of this field.
        """
        return self._redvox_packet.device_make

    def set_device_make(self, make: str) -> 'WrappedRedvoxPacket':
        """
        sets the make of the device
        :param make: make of the device string
        """
        self._redvox_packet.device_make = make
        return self

    def device_model(self) -> str:
        """
        See https://bitbucket.org/redvoxhi/redvox-data-apis/src/master/src/api900/api900.proto?at=master for a
        description of this field.
        """
        return self._redvox_packet.device_model

    def set_device_model(self, model: str) -> 'WrappedRedvoxPacket':
        """
        sets the model of the device
        :param model: model of the device string
        """
        self._redvox_packet.device_model = model
        return self

    def device_os(self) -> str:
        """
        See https://bitbucket.org/redvoxhi/redvox-data-apis/src/master/src/api900/api900.proto?at=master for a
        description of this field.
        """
        return self._redvox_packet.device_os

    def set_device_os(self, device_os: str) -> 'WrappedRedvoxPacket':
        """
        sets the device operating system
        :param device_os: operating system string
        """
        self._redvox_packet.device_os = device_os
        return self

    def device_os_version(self) -> str:
        """
        See https://bitbucket.org/redvoxhi/redvox-data-apis/src/master/src/api900/api900.proto?at=master for a
        description of this field.
        """
        return self._redvox_packet.device_os_version

    def set_device_os_version(self, version: str) -> 'WrappedRedvoxPacket':
        """
        sets the device OS version
        :param version: device OS version string
        """
        self._redvox_packet.device_os_version = version
        return self

    def app_version(self) -> str:
        """
        See https://bitbucket.org/redvoxhi/redvox-data-apis/src/master/src/api900/api900.proto?at=master for a
        description of this field.
        """
        return self._redvox_packet.app_version

    def set_app_version(self, version: str) -> 'WrappedRedvoxPacket':
        """
        sets the app version number
        :param version: app version string
        """
        self._redvox_packet.app_version = version
        return self

    def battery_level_percent(self) -> float:
        """
        See https://bitbucket.org/redvoxhi/redvox-data-apis/src/master/src/api900/api900.proto?at=master for a
        description of this field.
        """
        return self._redvox_packet.battery_level_percent

    def set_battery_level_percent(self, percent: float) -> 'WrappedRedvoxPacket':
        """
        sets the percentage of battery left
        :param percent: percentage of battery left
        """
        self._redvox_packet.battery_level_percent = percent
        return self

    def device_temperature_c(self) -> float:
        """
        See https://bitbucket.org/redvoxhi/redvox-data-apis/src/master/src/api900/api900.proto?at=master for a
        description of this field.
        """
        return self._redvox_packet.device_temperature_c

    def set_device_temperature_c(self, temp: float) -> 'WrappedRedvoxPacket':
        """
        sets the device temperature in degrees Celsius
        :param temp: temperature in degrees Celsius
        """
        self._redvox_packet.device_temperature_c = temp
        return self

    def acquisition_server(self) -> str:
        """
        See https://bitbucket.org/redvoxhi/redvox-data-apis/src/master/src/api900/api900.proto?at=master for a
        description of this field.
        """
        return self._redvox_packet.acquisition_server

    def set_acquisition_server(self, server: str) -> 'WrappedRedvoxPacket':
        """
        sets the acquisition server url
        :param server: url to acquisition server
        """
        self._redvox_packet.acquisition_server = server
        return self

    # pylint: disable=invalid-name
    def time_synchronization_server(self) -> str:
        """
        See https://bitbucket.org/redvoxhi/redvox-data-apis/src/master/src/api900/api900.proto?at=master for a
        description of this field.
        """
        return self._redvox_packet.time_synchronization_server

    def set_time_synchronization_server(self, server: str) -> 'WrappedRedvoxPacket':
        """
        sets the time synchronization server url
        :param server: url to time synchronization server
        """
        self._redvox_packet.time_synchronization_server = server
        return self

    def authentication_server(self) -> str:
        """
        See https://bitbucket.org/redvoxhi/redvox-data-apis/src/master/src/api900/api900.proto?at=master for a
        description of this field.
        """
        return self._redvox_packet.authentication_server

    def set_authentication_server(self, server: str) -> 'WrappedRedvoxPacket':
        """
        sets the authentication server url
        :param server: url to authentication server
        """
        self._redvox_packet.authentication_server = server
        return self

    # pylint: disable=invalid-name
    def app_file_start_timestamp_epoch_microseconds_utc(self) -> int:
        """
        See https://bitbucket.org/redvoxhi/redvox-data-apis/src/master/src/api900/api900.proto?at=master for a
        description of this field.
        """
        return migrations.maybe_get_float(self._redvox_packet.app_file_start_timestamp_epoch_microseconds_utc)

    def set_app_file_start_timestamp_epoch_microseconds_utc(self, time: int) -> 'WrappedRedvoxPacket':
        """
        sets the timestamp of packet creation
        :param time: time when packet was created in microseconds since utc epoch
        """
        self._redvox_packet.app_file_start_timestamp_epoch_microseconds_utc = migrations.maybe_set_int(time)
        return self

    # pylint: disable=invalid-name
    def app_file_start_timestamp_machine(self) -> int:
        """
        See https://bitbucket.org/redvoxhi/redvox-data-apis/src/master/src/api900/api900.proto?at=master for a
        description of this field.
        """
        return migrations.maybe_get_float(self._redvox_packet.app_file_start_timestamp_machine)

    def set_app_file_start_timestamp_machine(self, time: int) -> 'WrappedRedvoxPacket':
        """
        sets the internal machine timestamp of packet creation
        :param time: time when packet was created on local machine
        """
        self._redvox_packet.app_file_start_timestamp_machine = migrations.maybe_set_int(time)
        return self

    # pylint: disable=invalid-name
    def server_timestamp_epoch_microseconds_utc(self) -> int:
        """
        See https://bitbucket.org/redvoxhi/redvox-data-apis/src/master/src/api900/api900.proto?at=master for a
        description of this field.
        """
        return migrations.maybe_get_float(self._redvox_packet.server_timestamp_epoch_microseconds_utc)

    def set_server_timestamp_epoch_microseconds_utc(self, time: int) -> 'WrappedRedvoxPacket':
        """
        sets the server timestamp when the packet was received
        :param time: time when packet was received by server
        """
        self._redvox_packet.server_timestamp_epoch_microseconds_utc = time
        return self

    def metadata(self) -> typing.List[str]:
        """
        See https://bitbucket.org/redvoxhi/redvox-data-apis/src/master/src/api900/api900.proto?at=master for a
        description of this field.
        """
        return self._metadata_list

    def set_metadata(self, data: typing.List[str]) -> 'WrappedRedvoxPacket':
        """
        sets the metadata
        :param data: metadata as list of strings
        """
        self._metadata_list = data
        self._redvox_packet.metadata[:] = data
        return self

    def _clear_metadata(self):
        """
        removes all of the packet level metadata from packet
        """
        del self._redvox_packet.metadata[:]
        self._metadata_list.clear()

    def metadata_as_dict(self) -> typing.Dict[str, str]:
        """
        Return this packet's metadata as a key-value Python dictionary.
        :return: This packet's metadata as a key-value Python dictionary.
        """
        return reader_utils.get_metadata_as_dict(self._metadata_list)

    def set_metadata_as_dict(self, metadata_dict: typing.Dict[str, str]) -> 'WrappedRedvoxPacket':
        """
        Sets the metadata using a dictionary.
        :param metadata_dict: Dictionary of metadata.
        :return: This WrappedRedvoxPacket.
        """
        self.set_metadata(reader_utils.metadata_dict_to_list(metadata_dict))
        return self

    def add_metadata(self, key: str, value: str) -> 'WrappedRedvoxPacket':
        """
        Adds a key pair value to the metadata.
        :param key: The key.
        :param value: The value.
        :return: This instance of a WrappedRedvoxPacket.
        """
        metadata = self.metadata_as_dict()
        metadata[key] = value if isinstance(value, str) else str(value)
        self.set_metadata_as_dict(metadata)
        return self

    def start_timestamp_us_utc(self) -> int:
        """
        Returns the start timestamp of a WrappedRedvoxPacket.
        :return: The start timestamp of a WrappedRedvoxPacket.
        """
        return migrations.maybe_get_float(
            self.microphone_sensor().first_sample_timestamp_epoch_microseconds_utc())

    def duration_s(self) -> float:
        """
        The duration of this WrappedRedvoxPacket in seconds.
        :return: The duration of this WrappedRedvoxPacket in seconds.
        """
        microphone_sensor = self.microphone_sensor()
        return len(microphone_sensor.payload_values()) / microphone_sensor.sample_rate_hz()

    def end_timestamp_us_utc(self) -> float:
        """
        Returns the end timestamp of a WrappedRedvoxPacket.
        :return: The end timestamp of a WrappedRedvoxPacket.
        """
        return self.start_timestamp_us_utc() + date_time_utils.seconds_to_microseconds(self.duration_s())

    def update_uneven_sensor_timestamps(self, time_delta: int or float):
        """
        Given a time delta in microseconds, will adjust all unevenly sampled sensor timestamps by that amount.
        Use negative values to adjust backwards in time.
        :param time_delta: amount of time to adjust timestamps in microseconds
        """
        for channel in self._unevenly_sampled_channels_field:
            if not channel.has_channel(api900_pb2.TIME_SYNCHRONIZATION):
                channel.set_timestamps_microseconds_utc(channel.timestamps_microseconds_utc + time_delta)

    def mach_time_zero(self) -> typing.Optional[int]:
        """
        Returns the mach time zero from the metadata if it exists.
        :return: The mach time zero from the metadata if it exists.
        """
        try:
            # First, try to extract metadata from the correct location
            top_level_metadata: typing.Dict[str, str] = self.metadata_as_dict()
            if "machTimeZero" in top_level_metadata:
                return migrations.maybe_get_float(int(top_level_metadata["machTimeZero"]))

            # If that fails, some Androids stored it in the Location metadata
            location_sensor = self.location_sensor()
            if location_sensor is not None:
                location_metadata: typing.Dict[str, str] = location_sensor.metadata_as_dict()
                if "machTimeZero" in location_metadata:
                    return migrations.maybe_get_float(int(location_metadata["machTimeZero"]))

            # Finally, if it hasn't been found, it's probably a really old packet that doesn't contain this field
            return None
        except (KeyError, ValueError):
            return None

    def set_mach_time_zero(self, mach_time_zero: int) -> 'WrappedRedvoxPacket':
        """
        Sets the mach time zero in the packet's metadata.
        :param mach_time_zero: The mach time zero to use.
        :return: An instance of this wrapped packet.
        """
        return self.add_metadata("machTimeZero", str(mach_time_zero))

    def best_latency(self) -> typing.Optional[float]:
        """
        Returns the best latency from the metadata if it exists.
        :return: The best latency from the metadata if it exists.
        """
        try:
            return float(self.metadata_as_dict()["bestLatency"])
        except (KeyError, ValueError):
            return None

    def set_best_latency(self, best_latency: float) -> 'WrappedRedvoxPacket':
        """
        Sets the best latency in the packet's metadata.
        :param best_latency: The best latency to use.
        :return: An instance of this wrapped packet.
        """
        return self.add_metadata("bestLatency", str(best_latency))

    def best_offset(self) -> typing.Optional[float]:
        """
        Returns the best offset from the metadata if it exists.
        :return: The best offset from the metadata if it exists.
        """
        try:
            return float(self.metadata_as_dict()["bestOffset"])
        except (KeyError, ValueError):
            return None

    def set_best_offset(self, best_offset: float) -> 'WrappedRedvoxPacket':
        """
        Sets the best offset in the packet's metadata.
        :param best_offset: The best offset to use.
        :return: An instance of this wrapped packet.
        """
        return self.add_metadata("bestOffset", str(best_offset))

    def is_synch_corrected(self) -> bool:
        """
        Returns the isSynchCorrected value from the metadata if it exists.
        :return: The isSynchCorrected value from the metadata if it exists.
        """

        def _parse_bool(s: str) -> bool:
            return s == "true" or s == "True"

        try:
            return _parse_bool(self.metadata_as_dict()["isSynchCorrected"])
        except (KeyError, ValueError):
            return False

    def set_is_synch_corrected(self, is_synch_corrected: bool) -> 'WrappedRedvoxPacket':
        """
        Sets the is_synch_corrected field in the packet's metadata.
        :param is_synch_corrected: The is_synch_corrected field to use.
        :return: An instance of this wrapped packet.
        """
        return self.add_metadata("isSynchCorrected", "true" if is_synch_corrected else "false")

    # Sensor channels
    def has_microphone_sensor(self) -> bool:
        """
        Returns if this packet has a microphone channel.
        :return: If this packet has a microphone channel.
        """
        return self._has_channel(api900_pb2.MICROPHONE)

    def microphone_sensor(self) -> typing.Optional[_microphone_sensor.MicrophoneSensor]:
        """
        Returns the high-level microphone channel API or None if this packet doesn't contain a channel of this type.
        :return: the high-level microphone channel API or None if this packet doesn't contain a channel of this type.
        """
        if self.has_microphone_sensor():
            return _microphone_sensor.MicrophoneSensor(self._get_channel(api900_pb2.MICROPHONE))

        return None

    def set_microphone_sensor(self, microphone_sensor: typing.Optional[
            _microphone_sensor.MicrophoneSensor]) -> 'WrappedRedvoxPacket':
        """
        Sets this packets microphone sensor. A channel can be removed by passing in None.
        :param microphone_sensor: An optional instance of a microphone sensor.
        :return: This instance of a wrapped redvox packet.
        """
        if self.has_microphone_sensor():
            self._delete_channel(api900_pb2.MICROPHONE)

        if microphone_sensor is not None:
            # pylint: disable=W0212
            self._add_channel(microphone_sensor._evenly_sampled_channel)

        return self

    def has_barometer_sensor(self) -> bool:
        """
        Returns if this packet has a barometer channel.
        :return: If this packet has a barometer channel.
        """
        return self._has_channel(api900_pb2.BAROMETER)

    def barometer_sensor(self) -> typing.Optional[_barometer_sensor.BarometerSensor]:
        """
        Returns the high-level barometer channel API or None if this packet doesn't contain a channel of this type.
        :return: the high-level barometer channel API or None if this packet doesn't contain a channel of this type.
        """
        if self.has_barometer_sensor():
            return _barometer_sensor.BarometerSensor(self._get_channel(api900_pb2.BAROMETER))

        return None

    def set_barometer_sensor(self, barometer_sensor: typing.Optional[
            _barometer_sensor.BarometerSensor]) -> 'WrappedRedvoxPacket':
        """
        Sets this packets barometer sensor. A channel can be removed by passing in None.
        :param barometer_sensor: An optional instance of a barometer sensor.
        :return: This instance of a wrapped redvox packet.
        """
        if self.has_barometer_sensor():
            self._delete_channel(api900_pb2.BAROMETER)

        if barometer_sensor is not None:
            # pylint: disable=W0212
            self._add_channel(barometer_sensor._unevenly_sampled_channel)

        return self

    def has_location_sensor(self) -> bool:
        """
        Returns if this packet has a location channel.
        :return: If this packet has a location channel.
        """
        return (self._has_channels(
            [api900_pb2.LATITUDE, api900_pb2.LONGITUDE, api900_pb2.ALTITUDE, api900_pb2.SPEED,
             api900_pb2.ACCURACY]) or self._has_channels([api900_pb2.LATITUDE, api900_pb2.LONGITUDE]))

    def location_sensor(self) -> typing.Optional[_location_sensor.LocationSensor]:
        """
        Returns the high-level location channel API or None if this packet doesn't contain a channel of this type.
        :return: the high-level location channel API or None if this packet doesn't contain a channel of this type.
        """
        if self.has_location_sensor():
            return _location_sensor.LocationSensor(self._get_channel(api900_pb2.LATITUDE))

        return None

    def set_location_sensor(self,
                            location_sensor: typing.Optional[_location_sensor.LocationSensor]) -> 'WrappedRedvoxPacket':
        """
        Sets this packet's location sensor. A channel can be removed by passing in None.
        :param location_sensor: An optional instance of a location sensor.
        :return: This instance of a wrapped redvox packet.
        """
        if self.has_location_sensor():
            self._delete_channel(api900_pb2.LATITUDE)

        if location_sensor is not None:
            # pylint: disable=W0212
            self._add_channel(location_sensor._unevenly_sampled_channel)

        return self

    # pylint: disable=invalid-name,C1801
    def has_time_synchronization_sensor(self) -> bool:
        """
        Returns if this packet has a time synchronization channel.
        :return: If this packet has a time synchronization channel.
        """
        if self._has_channel(api900_pb2.TIME_SYNCHRONIZATION):
            ch = _time_synchronization_sensor.TimeSynchronizationSensor(
                self._get_channel(api900_pb2.TIME_SYNCHRONIZATION))
            return len(ch.payload_values()) > 0

        return False

    def time_synchronization_sensor(self) -> typing.Optional[_time_synchronization_sensor.TimeSynchronizationSensor]:
        """
        Returns the high-level time synchronization channel API or None if this packet doesn't contain a channel of
        this type.
        :return: the high-level time synchronization channel API or None if this packet doesn't contain a channel of
        this type.
        """
        if self.has_time_synchronization_sensor():
            return _time_synchronization_sensor.TimeSynchronizationSensor(
                self._get_channel(api900_pb2.TIME_SYNCHRONIZATION))

        return None

    def set_time_synchronization_sensor(self, time_synchronization_sensor: typing.Optional[
            _time_synchronization_sensor.TimeSynchronizationSensor]) -> 'WrappedRedvoxPacket':
        """
        Sets this packet's time sync sensor. A channel can be removed by passing in None.
        :param time_synchronization_sensor: An optional instance of a time sync sensor.
        :return: This instance of a wrapped redvox packet.
        """
        if self.has_time_synchronization_sensor():
            self._delete_channel(api900_pb2.TIME_SYNCHRONIZATION)

        if time_synchronization_sensor is not None:
            # pylint: disable=W0212
            self._add_channel(time_synchronization_sensor._unevenly_sampled_channel)

        return self

    def has_accelerometer_sensor(self) -> bool:
        """
        Returns if this packet has an accelerometer channel.
        :return: If this packet has an accelerometer channel.
        """
        return self._has_channels([api900_pb2.ACCELEROMETER_X, api900_pb2.ACCELEROMETER_Y, api900_pb2.ACCELEROMETER_Z])

    def accelerometer_sensor(self) -> typing.Optional[_accelerometer_sensor.AccelerometerSensor]:
        """
        Returns the high-level accelerometer channel API or None if this packet doesn't contain a channel of this type.
        :return: the high-level accelerometer channel API or None if this packet doesn't contain a channel of this type.
        """
        if self.has_accelerometer_sensor():
            return _accelerometer_sensor.AccelerometerSensor(self._get_channel(api900_pb2.ACCELEROMETER_X))

        return None

    def set_accelerometer_sensor(self,
                                 accelerometer_sensor: typing.Optional[
                                     _accelerometer_sensor.AccelerometerSensor]) -> 'WrappedRedvoxPacket':
        """
        Sets this packet's accelerometer sensor. A channel can be removed by passing in None.
        :param accelerometer_sensor: An optional instance of a accelerometer sensor.
        """
        if self.has_accelerometer_sensor():
            self._delete_channel(api900_pb2.ACCELEROMETER_X)

        if accelerometer_sensor is not None:
            # pylint: disable=W0212
            self._add_channel(accelerometer_sensor._unevenly_sampled_channel)

        return self

    def has_magnetometer_sensor(self) -> bool:
        """
        Returns if this packet has a magnetometer channel.
        :return: If this packet has a magnetometer channel.
        """
        return self._has_channels([api900_pb2.MAGNETOMETER_X, api900_pb2.MAGNETOMETER_Y, api900_pb2.MAGNETOMETER_Z])

    def magnetometer_sensor(self) -> typing.Optional[_magnetometer_sensor.MagnetometerSensor]:
        """
        Returns the high-level magnetometer channel API or None if this packet doesn't contain a channel of this type.
        :return: the high-level magnetometer channel API or None if this packet doesn't contain a channel of this type.
        """
        if self.has_magnetometer_sensor():
            return _magnetometer_sensor.MagnetometerSensor(self._get_channel(api900_pb2.MAGNETOMETER_X))

        return None

    def set_magnetometer_sensor(self,
                                magnetometer_sensor: typing.Optional[
                                    _magnetometer_sensor.MagnetometerSensor]) -> 'WrappedRedvoxPacket':
        """
        Sets this packet's magnetomer sensor. A channel can be removed by passing in None.
        :param magnetometer_sensor: An optional instance of a magnetometer sensor.
        :return: This instance of a wrapped redvox packet.
        """
        if self.has_magnetometer_sensor():
            self._delete_channel(api900_pb2.MAGNETOMETER_X)

        if magnetometer_sensor is not None:
            # pylint: disable=W0212
            self._add_channel(magnetometer_sensor._unevenly_sampled_channel)

        return self

    def has_gyroscope_sensor(self) -> bool:
        """
        Returns if this packet has a gyroscope channel.
        :return: If this packet has a gyroscope channel.
        """
        return self._has_channels([api900_pb2.GYROSCOPE_X, api900_pb2.GYROSCOPE_Y, api900_pb2.GYROSCOPE_Z])

    def gyroscope_sensor(self) -> typing.Optional[_gyroscope_sensor.GyroscopeSensor]:
        """
        Returns the high-level gyroscope channel API or None if this packet doesn't contain a channel of this type.
        :return: the high-level gyroscope channel API or None if this packet doesn't contain a channel of this type.
        """
        if self.has_gyroscope_sensor():
            return _gyroscope_sensor.GyroscopeSensor(self._get_channel(api900_pb2.GYROSCOPE_X))

        return None

    def set_gyroscope_sensor(self, gyroscope_sensor: typing.Optional[
            _gyroscope_sensor.GyroscopeSensor]) -> 'WrappedRedvoxPacket':
        """
        Sets this packet's gyroscope sensor. A channel can be removed by passing in None.
        :param gyroscope_sensor: An optional instance of a gyroscope sensor.
        :return: This instance of a wrapped redvox packet.
        """
        if self.has_gyroscope_sensor():
            self._delete_channel(api900_pb2.GYROSCOPE_X)

        if gyroscope_sensor is not None:
            # pylint: disable=W0212
            self._add_channel(gyroscope_sensor._unevenly_sampled_channel)

        return self

    def has_light_sensor(self) -> bool:
        """
        Returns if this packet has a light channel.
        :return: If this packet has a light channel.
        """
        return self._has_channel(api900_pb2.LIGHT)

    def light_sensor(self) -> typing.Optional[_light_sensor.LightSensor]:
        """
        Returns the high-level light channel API or None if this packet doesn't contain a channel of this type.
        :return: the high-level light channel API or None if this packet doesn't contain a channel of this type.
        """
        if self.has_light_sensor():
            return _light_sensor.LightSensor(self._get_channel(api900_pb2.LIGHT))

        return None

    def set_light_sensor(self, light_sensor: typing.Optional[_light_sensor.LightSensor]) -> 'WrappedRedvoxPacket':
        """
        Sets this packet's light sensor. A channel can be removed by passing in None.
        :param light_sensor: An optional instance of a light sensor.
        :return: This instance of a wrapped redvox packet.
        """
        if self.has_light_sensor():
            self._delete_channel(api900_pb2.LIGHT)

        if light_sensor is not None:
            # pylint: disable=W0212
            self._add_channel(light_sensor._unevenly_sampled_channel)

        return self

    def has_infrared_sensor(self) -> bool:
        """
        Returns if this packet has an infrared channel.
        :return: If this packlet has an infrared channel.
        """
        return self._has_channel(api900_pb2.INFRARED)

    def infrared_sensor(self) -> typing.Optional[_infrared_sensor.InfraredSensor]:
        """
        Returns the high-level infrared channel API or None if this packet doesn't contain a channel of this type.
        :return: the high-level infrared channel API or None if this packet doesn't contain a channel of this type.
        """
        if self.has_infrared_sensor():
            return _infrared_sensor.InfraredSensor(self._get_channel(api900_pb2.INFRARED))

        return None

    def set_infrared_sensor(self,
                            infrared_sensor: typing.Optional[
                                _infrared_sensor.InfraredSensor]) -> 'WrappedRedvoxPacket':
        """
        Sets this packet's infrared sensor. A channel can be removed by passing in None.
        :param infrared_sensor: An optional instance of a infrared sensor.
        """
        if self.has_infrared_sensor():
            self._delete_channel(api900_pb2.INFRARED)

        if infrared_sensor is not None:
            # pylint: disable=W0212
            self._add_channel(infrared_sensor._unevenly_sampled_channel)

        return self

    def has_image_sensor(self) -> bool:
        """
        Returns if this packet has an image channel.
        :return: If this packlet has an image channel.
        """
        return self._has_channel(api900_pb2.IMAGE)

    def image_sensor(self) -> typing.Optional[_image_sensor.ImageSensor]:
        """
        Returns the high-level image channel API or None if this packet doesn't contain a channel of this type.
        :return: the high-level image channel API or None if this packet doesn't contain a channel of this type.
        """
        if self.has_image_sensor():
            return _image_sensor.ImageSensor(self._get_channel(api900_pb2.IMAGE))

        return None

    def set_image_sensor(self, image_sensor: typing.Optional[_image_sensor.ImageSensor]) -> 'WrappedRedvoxPacket':
        """
        Set's the image channel.
        :param image_sensor: Image sensor.
        """
        if self.has_image_sensor():
            self._delete_channel(api900_pb2.IMAGE)

        if image_sensor is not None:
            # pylint: disable=W0212
            self._add_channel(image_sensor._unevenly_sampled_channel)

        return self

    def __str__(self):
        """
        Returns protobuf's String representation of this packet.
        :return: Protobuf's String representation of this packet.
        """
        return str(self._redvox_packet)

    def __eq__(self, other):
        return isinstance(other, WrappedRedvoxPacket) and len(self.diff(other)) == 0

    def diff(self, other: 'WrappedRedvoxPacket') -> typing.List[str]:
        """
        Finds the differences (if any) between two WrappedRedvoxPackets.
        :param other: The other wrapped redvox packet to compare to.
        :return: A list of differences or an empty list if there are none.
        """
        diffs = map(lambda tuple2: reader_utils.diff(tuple2[0], tuple2[1]), [
            (self.api(), other.api()),
            (self.redvox_id(), other.redvox_id()),
            (self.uuid(), other.uuid()),
            (self.authenticated_email(), other.authenticated_email()),
            (self.authentication_token(), other.authentication_token()),
            (self.firebase_token(), other.firebase_token()),
            (self.is_backfilled(), other.is_backfilled()),
            (self.is_private(), other.is_private()),
            (self.is_scrambled(), other.is_scrambled()),
            (self.device_make(), other.device_make()),
            (self.device_model(), other.device_model()),
            (self.device_os(), other.device_os()),
            (self.device_os_version(), other.device_os_version()),
            (self.app_version(), other.app_version()),
            (self.battery_level_percent(), other.battery_level_percent()),
            (self.device_temperature_c(), other.device_temperature_c()),
            (self.acquisition_server(), other.acquisition_server()),
            (self.time_synchronization_server(), other.time_synchronization_server()),
            (self.authentication_server(), other.authentication_server()),
            (self.app_file_start_timestamp_epoch_microseconds_utc(),
             other.app_file_start_timestamp_epoch_microseconds_utc()),
            (self.app_file_start_timestamp_machine(), other.app_file_start_timestamp_machine()),
            (self.server_timestamp_epoch_microseconds_utc(),
             other.server_timestamp_epoch_microseconds_utc()),
            (self.metadata(), other.metadata()),
            (self.microphone_sensor(), other.microphone_sensor()),
            (self.barometer_sensor(), other.barometer_sensor()),
            (self.location_sensor(), other.location_sensor()),
            (self.time_synchronization_sensor(), other.time_synchronization_sensor()),
            (self.accelerometer_sensor(), other.accelerometer_sensor()),
            (self.magnetometer_sensor(), other.magnetometer_sensor()),
            (self.gyroscope_sensor(), other.gyroscope_sensor()),
            (self.light_sensor(), other.light_sensor()),
            (self.infrared_sensor(), other.infrared_sensor())
        ])
        # Filter only out only the differences
        diffs = filter(lambda tuple2: tuple2[0], diffs)
        # Extract the difference string
        diffs = map(lambda tuple2: tuple2[1], diffs)
        return list(diffs)

Functions

def read_buffer(buf: bytes, is_compressed: bool = True) ‑> api900_pb2.RedvoxPacket

Deserializes a serialized protobuf RedvoxPacket buffer. :param buf: Buffer to deserialize. :param is_compressed: Whether or not the buffer is compressed or decompressed. :return: Deserialized protobuf redvox packet.

Expand source code
def read_buffer(buf: bytes, is_compressed: bool = True) -> api900_pb2.RedvoxPacket:
    """
    Deserializes a serialized protobuf RedvoxPacket buffer.
    :param buf: Buffer to deserialize.
    :param is_compressed: Whether or not the buffer is compressed or decompressed.
    :return: Deserialized protobuf redvox packet.
    """
    buffer = reader_utils.lz4_decompress(buf) if is_compressed else buf
    redvox_packet = api900_pb2.RedvoxPacket()
    redvox_packet.ParseFromString(buffer)
    return redvox_packet

Classes

class WrappedRedvoxPacket (redvox_packet: api900_pb2.RedvoxPacket = None)

This class provides convenience methods for accessing API 900 protobuf redvox packets.

This packet contains a reference to the original packet which should be used to access all "top-level" fields. For accessing channels, this class can search for and return our high-level channel wrappers or can extract the payload directly.

Initializes this wrapped redvox packet. :param redvox_packet: A protobuf redvox packet.

Expand source code
class WrappedRedvoxPacket:
    """
    This class provides convenience methods for accessing API 900 protobuf redvox packets.

    This packet contains a reference to the original packet which should be used to access all "top-level" fields. For
    accessing channels, this class can search for and return our high-level channel wrappers or can extract the payload
    directly.
    """

    def __init__(self, redvox_packet: api900_pb2.RedvoxPacket = None):
        """
        Initializes this wrapped redvox packet.
        :param redvox_packet: A protobuf redvox packet.
        """
        if redvox_packet is None:
            self._redvox_packet = api900_pb2.RedvoxPacket()
            self._evenly_sampled_channels_field = list()
            self._unevenly_sampled_channels_field = list()
            self._metadata_list = list()
            self._channel_cache = {}

        else:
            self._redvox_packet: api900_pb2.RedvoxPacket = redvox_packet
            """Protobuf api 900 redvox packet"""

            self._evenly_sampled_channels_field: typing.List[EvenlySampledChannel] = list(
                map(EvenlySampledChannel, reader_utils.repeated_to_array(redvox_packet.evenly_sampled_channels)))
            """List of evenly sampled channels"""

            self._unevenly_sampled_channels_field: typing.List[UnevenlySampledChannel] = list(
                map(UnevenlySampledChannel,
                    reader_utils.repeated_to_array(redvox_packet.unevenly_sampled_channels)))
            """List of unevenly sampled channels"""

            self._metadata_list: typing.List[str] = reader_utils.repeated_to_list(redvox_packet.metadata)
            """List of metadata"""

            self._channel_cache: typing.Dict[int, typing.Union[EvenlySampledChannel, UnevenlySampledChannel]] = {}
            """Holds a mapping of channel type to channel for O(1) access."""

            # Initialize channel cache
            for evenly_sampled_channel in self._evenly_sampled_channels_field:
                for channel_type in evenly_sampled_channel.channel_types:
                    self._channel_cache[channel_type] = evenly_sampled_channel

            for unevenly_sampled_channel in self._unevenly_sampled_channels_field:
                for channel_type in unevenly_sampled_channel.channel_types:
                    self._channel_cache[channel_type] = unevenly_sampled_channel

    def redvox_packet(self) -> api900_pb2.RedvoxPacket:
        """
        returns the protobuf redvox packet
        :return: protobuf redvox packet
        """
        return self._redvox_packet

    def _evenly_sampled_channels(self) -> typing.List[EvenlySampledChannel]:
        """
        returns the evenly sampled channels as a copied list to avoid built in functions making untracked changes
        :return: list of evenly sampled channels
        """
        return self._evenly_sampled_channels_field.copy()

    def _unevenly_sampled_channels(self) -> typing.List[UnevenlySampledChannel]:
        """
        returns the unevenly sampled channels as a copied list to avoid built in functions making untracked changes
        :return: list of unevenly sampled channels
        """
        return self._unevenly_sampled_channels_field.copy()

    def _refresh_channels(self):
        """
        takes the redvox packet and rebuilds the channel cache from it
        """
        self._evenly_sampled_channels_field = list(map(EvenlySampledChannel,
                                                       reader_utils.repeated_to_array(
                                                           self._redvox_packet.evenly_sampled_channels)))
        self._unevenly_sampled_channels_field = list(map(UnevenlySampledChannel,
                                                         reader_utils.repeated_to_array(
                                                             self._redvox_packet.unevenly_sampled_channels)))
        self._channel_cache = {}
        for evenly_sampled_channel in self._evenly_sampled_channels_field:
            for channel_type in evenly_sampled_channel.channel_types:
                self._channel_cache[channel_type] = evenly_sampled_channel
        for unevenly_sampled_channel in self._unevenly_sampled_channels_field:
            for channel_type in unevenly_sampled_channel.channel_types:
                self._channel_cache[channel_type] = unevenly_sampled_channel

    def _add_channel(self, channel: typing.Union[EvenlySampledChannel, UnevenlySampledChannel]):
        """
        Add a channel
        :param channel: channel to add
        """
        index, sample = self._find_channel(channel.channel_types[0])
        if index is None and sample is None:
            # if type(channel) not in [EvenlySampledChannel, UnevenlySampledChannel]:
            if not isinstance(channel, (EvenlySampledChannel, UnevenlySampledChannel)):
                raise TypeError("Channel type to add must be even or uneven.")
            else:
                self._add_channel_redvox_packet(channel)
                self._refresh_channels()
        else:
            raise ValueError("Cannot add a channel with a type that already exists in this packet.")

    def _edit_channel(self, channel_type: int, channel: typing.Union[EvenlySampledChannel, UnevenlySampledChannel]):
        """
        removes the channel with the given type and adds the channel supplied
        :param channel_type: type of channel to remove
        :param channel: the channel to add
        """
        index, sampling = self._find_channel(channel_type)
        if index is not None and sampling is not None:
            if isinstance(channel, EvenlySampledChannel):
                del self._redvox_packet.evenly_sampled_channels[index]
                self._add_channel_redvox_packet(channel)
            elif isinstance(channel, UnevenlySampledChannel):
                del self._redvox_packet.unevenly_sampled_channels[index]
                self._add_channel_redvox_packet(channel)
            else:
                raise TypeError("Channel type to edit is unknown!")
            self._refresh_channels()
        else:
            raise TypeError("Unknown channel type specified for edit.")

    def _delete_channel(self, channel_type: int):
        """
        deletes the channel type specified
        :param channel_type: a channel to remove
        """
        index, sampling = self._find_channel(channel_type)
        if index is not None and sampling is not None:
            if sampling == EvenlySampledChannel:
                del self._redvox_packet.evenly_sampled_channels[index]
            else:
                del self._redvox_packet.unevenly_sampled_channels[index]
            self._refresh_channels()
        else:
            raise TypeError("Unknown channel type to remove from packet.")

    # pylint: disable=W0120
    def _find_channel(self, channel_type: int) -> (int, typing.Union[EvenlySampledChannel, UnevenlySampledChannel]):
        """
        returns the index of the channel and the kind of sampled array its in
        :return: the index in the even or uneven array and the name of the array
        """
        if self._has_channel(channel_type):
            for idx in range(len(self._evenly_sampled_channels_field)):
                if channel_type in self._evenly_sampled_channels_field[idx].channel_types:
                    return idx, EvenlySampledChannel
            for idx in range(len(self._unevenly_sampled_channels_field)):
                if channel_type in self._unevenly_sampled_channels_field[idx].channel_types:
                    return idx, UnevenlySampledChannel
            else:
                return None, None
        else:
            return None, None

    # pylint: disable=R0912
    def _add_channel_redvox_packet(self, channel: typing.Union[EvenlySampledChannel, UnevenlySampledChannel]):
        """
        adds the channel to the redvox_packet
        :param channel: channel to add
        """
        if isinstance(channel, EvenlySampledChannel):
            newchan = self._redvox_packet.evenly_sampled_channels.add()
            newchan.sample_rate_hz = channel.sample_rate_hz
            newchan.first_sample_timestamp_epoch_microseconds_utc = \
                channel.first_sample_timestamp_epoch_microseconds_utc
        elif isinstance(channel, UnevenlySampledChannel):
            newchan = self._redvox_packet.unevenly_sampled_channels.add()
            for time in channel.timestamps_microseconds_utc:
                newchan.timestamps_microseconds_utc.append(time)
            newchan.sample_interval_mean = channel.sample_interval_mean
            newchan.sample_interval_std = channel.sample_interval_std
            newchan.sample_interval_median = channel.sample_interval_median
        else:
            raise TypeError("Channel type to add to redvox packet is unknown!")

        pl_type = channel.get_payload_type()
        if pl_type == "byte_payload":
            newchan.byte_payload.payload.extend(channel.payload)
        elif pl_type == "uint32_payload":
            newchan.uint32_payload.payload.extend(channel.payload)
        elif pl_type == "uint64_payload":
            newchan.uint64_payload.payload.extend(channel.payload)
        elif pl_type == "int32_payload":
            newchan.int32_payload.payload.extend(channel.payload)
        elif pl_type == "int64_payload":
            newchan.int64_payload.payload.extend(channel.payload)
        elif pl_type == "float32_payload":
            newchan.float32_payload.payload.extend(channel.payload)
        elif pl_type == "float64_payload":
            newchan.float64_payload.payload.extend(channel.payload)
        elif pl_type is None:
            pass
        else:
            raise TypeError("Unknown payload type in channel to add.")

        for chan_type in channel.channel_types:
            newchan.channel_types.append(chan_type)
        newchan.sensor_name = channel.sensor_name
        for mean in channel.value_means:
            newchan.value_means.append(mean)
        for stds in channel.value_stds:
            newchan.value_stds.append(stds)
        for median in channel.value_medians:
            newchan.value_medians.append(median)
        for meta in channel.metadata:
            newchan.metadata.append(meta)

    def _get_channel_types(self) -> typing.List[typing.List[int]]:
        """
        Returns a list of channel type enumerations. This is a list of lists, and allows us to easily view
        interleaved channels.
        :return: A list of channel type enumerations.
        """
        channel_types = []
        for evenly_sampled_channel in self._evenly_sampled_channels_field:
            channel_types.append(evenly_sampled_channel.channel_types)

        for unevenly_sampled_channel in self._unevenly_sampled_channels_field:
            channel_types.append(unevenly_sampled_channel.channel_types)

        return channel_types

    def _get_channel_type_names(self) -> typing.List[typing.List[str]]:
        """
        Returns a list of channel type names. This is a list of lists, and allows us to easily view
        interleaved channels.
        :return: A list of channel type names. This is a list of lists, and allows us to easily view
        interleaved channels.
        """
        names = []
        for channel_types in self._get_channel_types():
            names.append(list(map(reader_utils.channel_type_name_from_enum, channel_types)))
        return names

    def _get_channel(self, channel_type: int) -> typing.Union[EvenlySampledChannel, UnevenlySampledChannel, None]:
        """
        Returns a channel from this packet according to the channel type.
        :param channel_type: The channel type to search for.
        :return: A high level channel wrapper or None.
        """
        if channel_type in self._channel_cache:
            return self._channel_cache[channel_type]

        return None

    def _has_channel(self, channel_type: int) -> bool:
        """
        Returns True if this packet contains a channel with this type otherwise False.
        :param channel_type: Channel type to search for.
        :return: True is this packet contains a channel with this type otherwise False.
        """
        return channel_type in self._channel_cache

    def _has_channels(self, channel_types: typing.List[int]) -> bool:
        """
        Checks that this packet contains all of the provided channels.
        :param channel_types: Channel types that this packet must contain.
        :return: True if this packet contains all provided channel types, False otherwise.
        """
        has_channel_results = map(self._has_channel, channel_types)
        for has_channel_result in has_channel_results:
            if not has_channel_result:
                return False
        return True

    def to_json(self) -> str:
        """
        Converts the protobuf packet stored in this wrapped packet to JSON.
        :return: The JSON representation of the protobuf encoded packet.
        """
        return reader_utils.to_json(self._redvox_packet)

    def compressed_buffer(self) -> bytes:
        """
        Returns the compressed buffer associated with this packet.
        :return: The compressed buffer associated with this packet.
        """
        return reader_utils.lz4_compress(self._redvox_packet.SerializeToString())

    def default_filename(self, extension: str = "rdvxz") -> str:
        """
        Constructs a default filename from the packet's metadata.
        :param extension: An optional extension to use.
        :return: A default filename from the packet's metadata.
        """
        return "%s_%d.%s" % (self.redvox_id(), int(round(self.app_file_start_timestamp_machine() / 1000.0)), extension)

    def write_rdvxz(self, directory: str, filename: typing.Optional[str] = None):
        """
        Writes a compressed .rdvxz file to the specified directory.
        :param directory: The directory to write the file to.
        :param filename: An optional filename (the default filename will be used if one is not provided).
        """
        filename = self.default_filename() if filename is None else filename
        path = os.path.join(directory, filename)
        with open(path, "wb") as rdvxz_out:
            rdvxz_out.write(self.compressed_buffer())

    def write_json(self, directory: str, filename: typing.Optional[str] = None):
        """
        Writes a RedVox compliant .json file to the specified directory.
        :param directory: The directory to write the file to.
        :param filename: An optional filename (the default filename will be used if one is not provided).
        """
        filename = self.default_filename(extension="json") if filename is None else filename
        path = os.path.join(directory, filename)
        with open(path, "w") as json_out:
            json_out.write(self.to_json())

    def clone(self) -> 'WrappedRedvoxPacket':
        """
        Returns a clone of this WrappedRedvoxPacket.
        :return: A clone of this WrappedRedvoxPacket.
        """
        return WrappedRedvoxPacket(read_buffer(self.compressed_buffer(), True))

    def concat(self, wrapped_redvox_packets: typing.List['WrappedRedvoxPacket']) -> typing.List['WrappedRedvoxPacket']:
        """
        Concatenates this packet with other packets.
        :param wrapped_redvox_packets: Other packets to concatenate with this packet.
        :return: A list of packets each containing a continuous set of data.
        """
        return redvox.api900.concat.concat_wrapped_redvox_packets([self] + wrapped_redvox_packets)

    # Start of packet level API getters and setters
    def api(self) -> int:
        """
        See https://bitbucket.org/redvoxhi/redvox-data-apis/src/master/src/api900/api900.proto?at=master for a
        description of this field.
        """
        return migrations.maybe_get_float(self._redvox_packet.api)

    def set_api(self, version: int) -> 'WrappedRedvoxPacket':
        """
        sets the api version number
        :param version: version number
        """
        self._redvox_packet.api = version
        return self

    def uuid(self) -> str:
        """
        See https://bitbucket.org/redvoxhi/redvox-data-apis/src/master/src/api900/api900.proto?at=master for a
        description of this field.
        """
        return self._redvox_packet.uuid

    def set_uuid(self, uid: str) -> 'WrappedRedvoxPacket':
        """
        sets the uuid
        :param uid: uuid string
        """
        self._redvox_packet.uuid = uid
        return self

    def redvox_id(self) -> str:
        """
        See https://bitbucket.org/redvoxhi/redvox-data-apis/src/master/src/api900/api900.proto?at=master for a
        description of this field.
        """
        return self._redvox_packet.redvox_id

    def set_redvox_id(self, rid: str) -> 'WrappedRedvoxPacket':
        """
        sets the redvox id
        :param rid: redvox id string
        """
        self._redvox_packet.redvox_id = rid
        return self

    def authenticated_email(self) -> str:
        """
        See https://bitbucket.org/redvoxhi/redvox-data-apis/src/master/src/api900/api900.proto?at=master for a
        description of this field.
        """
        return self._redvox_packet.authenticated_email

    def set_authenticated_email(self, email: str) -> 'WrappedRedvoxPacket':
        """
        sets the authenticated email
        :param email: authenticated email string
        """
        self._redvox_packet.authenticated_email = email
        return self

    def authentication_token(self) -> str:
        """
        See https://bitbucket.org/redvoxhi/redvox-data-apis/src/master/src/api900/api900.proto?at=master for a
        description of this field.
        """
        return self._redvox_packet.authentication_token

    def set_authentication_token(self, token: str) -> 'WrappedRedvoxPacket':
        """
        sets the authentication token
        :param token: authentication token string
        """
        self._redvox_packet.authentication_token = token
        return self

    def firebase_token(self) -> str:
        """
        See https://bitbucket.org/redvoxhi/redvox-data-apis/src/master/src/api900/api900.proto?at=master for a
        description of this field.
        """
        return self._redvox_packet.firebase_token

    def set_firebase_token(self, token: str) -> 'WrappedRedvoxPacket':
        """
        sets the firebase token
        :param token: firebase token string
        """
        self._redvox_packet.firebase_token = token
        return self

    def is_backfilled(self) -> bool:
        """
        See https://bitbucket.org/redvoxhi/redvox-data-apis/src/master/src/api900/api900.proto?at=master for a
        description of this field.
        """
        return self._redvox_packet.is_backfilled

    def set_is_backfilled(self, tof: bool) -> 'WrappedRedvoxPacket':
        """
        sets the is_backfilled flag
        :param tof: true or false
        """
        self._redvox_packet.is_backfilled = tof
        return self

    def is_private(self) -> bool:
        """
        See https://bitbucket.org/redvoxhi/redvox-data-apis/src/master/src/api900/api900.proto?at=master for a
        description of this field.
        """
        return self._redvox_packet.is_private

    def set_is_private(self, tof: bool) -> 'WrappedRedvoxPacket':
        """
        sets the is_private flag
        :param tof: true or false
        """
        self._redvox_packet.is_private = tof
        return self

    def is_scrambled(self) -> bool:
        """
        See https://bitbucket.org/redvoxhi/redvox-data-apis/src/master/src/api900/api900.proto?at=master for a
        description of this field.
        """
        return self._redvox_packet.is_scrambled

    def set_is_scrambled(self, tof: bool) -> 'WrappedRedvoxPacket':
        """
        sets the is_scrambled flag
        :param tof: true or false
        """
        self._redvox_packet.is_scrambled = tof
        return self

    def device_make(self) -> str:
        """
        See https://bitbucket.org/redvoxhi/redvox-data-apis/src/master/src/api900/api900.proto?at=master for a
        description of this field.
        """
        return self._redvox_packet.device_make

    def set_device_make(self, make: str) -> 'WrappedRedvoxPacket':
        """
        sets the make of the device
        :param make: make of the device string
        """
        self._redvox_packet.device_make = make
        return self

    def device_model(self) -> str:
        """
        See https://bitbucket.org/redvoxhi/redvox-data-apis/src/master/src/api900/api900.proto?at=master for a
        description of this field.
        """
        return self._redvox_packet.device_model

    def set_device_model(self, model: str) -> 'WrappedRedvoxPacket':
        """
        sets the model of the device
        :param model: model of the device string
        """
        self._redvox_packet.device_model = model
        return self

    def device_os(self) -> str:
        """
        See https://bitbucket.org/redvoxhi/redvox-data-apis/src/master/src/api900/api900.proto?at=master for a
        description of this field.
        """
        return self._redvox_packet.device_os

    def set_device_os(self, device_os: str) -> 'WrappedRedvoxPacket':
        """
        sets the device operating system
        :param device_os: operating system string
        """
        self._redvox_packet.device_os = device_os
        return self

    def device_os_version(self) -> str:
        """
        See https://bitbucket.org/redvoxhi/redvox-data-apis/src/master/src/api900/api900.proto?at=master for a
        description of this field.
        """
        return self._redvox_packet.device_os_version

    def set_device_os_version(self, version: str) -> 'WrappedRedvoxPacket':
        """
        sets the device OS version
        :param version: device OS version string
        """
        self._redvox_packet.device_os_version = version
        return self

    def app_version(self) -> str:
        """
        See https://bitbucket.org/redvoxhi/redvox-data-apis/src/master/src/api900/api900.proto?at=master for a
        description of this field.
        """
        return self._redvox_packet.app_version

    def set_app_version(self, version: str) -> 'WrappedRedvoxPacket':
        """
        sets the app version number
        :param version: app version string
        """
        self._redvox_packet.app_version = version
        return self

    def battery_level_percent(self) -> float:
        """
        See https://bitbucket.org/redvoxhi/redvox-data-apis/src/master/src/api900/api900.proto?at=master for a
        description of this field.
        """
        return self._redvox_packet.battery_level_percent

    def set_battery_level_percent(self, percent: float) -> 'WrappedRedvoxPacket':
        """
        sets the percentage of battery left
        :param percent: percentage of battery left
        """
        self._redvox_packet.battery_level_percent = percent
        return self

    def device_temperature_c(self) -> float:
        """
        See https://bitbucket.org/redvoxhi/redvox-data-apis/src/master/src/api900/api900.proto?at=master for a
        description of this field.
        """
        return self._redvox_packet.device_temperature_c

    def set_device_temperature_c(self, temp: float) -> 'WrappedRedvoxPacket':
        """
        sets the device temperature in degrees Celsius
        :param temp: temperature in degrees Celsius
        """
        self._redvox_packet.device_temperature_c = temp
        return self

    def acquisition_server(self) -> str:
        """
        See https://bitbucket.org/redvoxhi/redvox-data-apis/src/master/src/api900/api900.proto?at=master for a
        description of this field.
        """
        return self._redvox_packet.acquisition_server

    def set_acquisition_server(self, server: str) -> 'WrappedRedvoxPacket':
        """
        sets the acquisition server url
        :param server: url to acquisition server
        """
        self._redvox_packet.acquisition_server = server
        return self

    # pylint: disable=invalid-name
    def time_synchronization_server(self) -> str:
        """
        See https://bitbucket.org/redvoxhi/redvox-data-apis/src/master/src/api900/api900.proto?at=master for a
        description of this field.
        """
        return self._redvox_packet.time_synchronization_server

    def set_time_synchronization_server(self, server: str) -> 'WrappedRedvoxPacket':
        """
        sets the time synchronization server url
        :param server: url to time synchronization server
        """
        self._redvox_packet.time_synchronization_server = server
        return self

    def authentication_server(self) -> str:
        """
        See https://bitbucket.org/redvoxhi/redvox-data-apis/src/master/src/api900/api900.proto?at=master for a
        description of this field.
        """
        return self._redvox_packet.authentication_server

    def set_authentication_server(self, server: str) -> 'WrappedRedvoxPacket':
        """
        sets the authentication server url
        :param server: url to authentication server
        """
        self._redvox_packet.authentication_server = server
        return self

    # pylint: disable=invalid-name
    def app_file_start_timestamp_epoch_microseconds_utc(self) -> int:
        """
        See https://bitbucket.org/redvoxhi/redvox-data-apis/src/master/src/api900/api900.proto?at=master for a
        description of this field.
        """
        return migrations.maybe_get_float(self._redvox_packet.app_file_start_timestamp_epoch_microseconds_utc)

    def set_app_file_start_timestamp_epoch_microseconds_utc(self, time: int) -> 'WrappedRedvoxPacket':
        """
        sets the timestamp of packet creation
        :param time: time when packet was created in microseconds since utc epoch
        """
        self._redvox_packet.app_file_start_timestamp_epoch_microseconds_utc = migrations.maybe_set_int(time)
        return self

    # pylint: disable=invalid-name
    def app_file_start_timestamp_machine(self) -> int:
        """
        See https://bitbucket.org/redvoxhi/redvox-data-apis/src/master/src/api900/api900.proto?at=master for a
        description of this field.
        """
        return migrations.maybe_get_float(self._redvox_packet.app_file_start_timestamp_machine)

    def set_app_file_start_timestamp_machine(self, time: int) -> 'WrappedRedvoxPacket':
        """
        sets the internal machine timestamp of packet creation
        :param time: time when packet was created on local machine
        """
        self._redvox_packet.app_file_start_timestamp_machine = migrations.maybe_set_int(time)
        return self

    # pylint: disable=invalid-name
    def server_timestamp_epoch_microseconds_utc(self) -> int:
        """
        See https://bitbucket.org/redvoxhi/redvox-data-apis/src/master/src/api900/api900.proto?at=master for a
        description of this field.
        """
        return migrations.maybe_get_float(self._redvox_packet.server_timestamp_epoch_microseconds_utc)

    def set_server_timestamp_epoch_microseconds_utc(self, time: int) -> 'WrappedRedvoxPacket':
        """
        sets the server timestamp when the packet was received
        :param time: time when packet was received by server
        """
        self._redvox_packet.server_timestamp_epoch_microseconds_utc = time
        return self

    def metadata(self) -> typing.List[str]:
        """
        See https://bitbucket.org/redvoxhi/redvox-data-apis/src/master/src/api900/api900.proto?at=master for a
        description of this field.
        """
        return self._metadata_list

    def set_metadata(self, data: typing.List[str]) -> 'WrappedRedvoxPacket':
        """
        sets the metadata
        :param data: metadata as list of strings
        """
        self._metadata_list = data
        self._redvox_packet.metadata[:] = data
        return self

    def _clear_metadata(self):
        """
        removes all of the packet level metadata from packet
        """
        del self._redvox_packet.metadata[:]
        self._metadata_list.clear()

    def metadata_as_dict(self) -> typing.Dict[str, str]:
        """
        Return this packet's metadata as a key-value Python dictionary.
        :return: This packet's metadata as a key-value Python dictionary.
        """
        return reader_utils.get_metadata_as_dict(self._metadata_list)

    def set_metadata_as_dict(self, metadata_dict: typing.Dict[str, str]) -> 'WrappedRedvoxPacket':
        """
        Sets the metadata using a dictionary.
        :param metadata_dict: Dictionary of metadata.
        :return: This WrappedRedvoxPacket.
        """
        self.set_metadata(reader_utils.metadata_dict_to_list(metadata_dict))
        return self

    def add_metadata(self, key: str, value: str) -> 'WrappedRedvoxPacket':
        """
        Adds a key pair value to the metadata.
        :param key: The key.
        :param value: The value.
        :return: This instance of a WrappedRedvoxPacket.
        """
        metadata = self.metadata_as_dict()
        metadata[key] = value if isinstance(value, str) else str(value)
        self.set_metadata_as_dict(metadata)
        return self

    def start_timestamp_us_utc(self) -> int:
        """
        Returns the start timestamp of a WrappedRedvoxPacket.
        :return: The start timestamp of a WrappedRedvoxPacket.
        """
        return migrations.maybe_get_float(
            self.microphone_sensor().first_sample_timestamp_epoch_microseconds_utc())

    def duration_s(self) -> float:
        """
        The duration of this WrappedRedvoxPacket in seconds.
        :return: The duration of this WrappedRedvoxPacket in seconds.
        """
        microphone_sensor = self.microphone_sensor()
        return len(microphone_sensor.payload_values()) / microphone_sensor.sample_rate_hz()

    def end_timestamp_us_utc(self) -> float:
        """
        Returns the end timestamp of a WrappedRedvoxPacket.
        :return: The end timestamp of a WrappedRedvoxPacket.
        """
        return self.start_timestamp_us_utc() + date_time_utils.seconds_to_microseconds(self.duration_s())

    def update_uneven_sensor_timestamps(self, time_delta: int or float):
        """
        Given a time delta in microseconds, will adjust all unevenly sampled sensor timestamps by that amount.
        Use negative values to adjust backwards in time.
        :param time_delta: amount of time to adjust timestamps in microseconds
        """
        for channel in self._unevenly_sampled_channels_field:
            if not channel.has_channel(api900_pb2.TIME_SYNCHRONIZATION):
                channel.set_timestamps_microseconds_utc(channel.timestamps_microseconds_utc + time_delta)

    def mach_time_zero(self) -> typing.Optional[int]:
        """
        Returns the mach time zero from the metadata if it exists.
        :return: The mach time zero from the metadata if it exists.
        """
        try:
            # First, try to extract metadata from the correct location
            top_level_metadata: typing.Dict[str, str] = self.metadata_as_dict()
            if "machTimeZero" in top_level_metadata:
                return migrations.maybe_get_float(int(top_level_metadata["machTimeZero"]))

            # If that fails, some Androids stored it in the Location metadata
            location_sensor = self.location_sensor()
            if location_sensor is not None:
                location_metadata: typing.Dict[str, str] = location_sensor.metadata_as_dict()
                if "machTimeZero" in location_metadata:
                    return migrations.maybe_get_float(int(location_metadata["machTimeZero"]))

            # Finally, if it hasn't been found, it's probably a really old packet that doesn't contain this field
            return None
        except (KeyError, ValueError):
            return None

    def set_mach_time_zero(self, mach_time_zero: int) -> 'WrappedRedvoxPacket':
        """
        Sets the mach time zero in the packet's metadata.
        :param mach_time_zero: The mach time zero to use.
        :return: An instance of this wrapped packet.
        """
        return self.add_metadata("machTimeZero", str(mach_time_zero))

    def best_latency(self) -> typing.Optional[float]:
        """
        Returns the best latency from the metadata if it exists.
        :return: The best latency from the metadata if it exists.
        """
        try:
            return float(self.metadata_as_dict()["bestLatency"])
        except (KeyError, ValueError):
            return None

    def set_best_latency(self, best_latency: float) -> 'WrappedRedvoxPacket':
        """
        Sets the best latency in the packet's metadata.
        :param best_latency: The best latency to use.
        :return: An instance of this wrapped packet.
        """
        return self.add_metadata("bestLatency", str(best_latency))

    def best_offset(self) -> typing.Optional[float]:
        """
        Returns the best offset from the metadata if it exists.
        :return: The best offset from the metadata if it exists.
        """
        try:
            return float(self.metadata_as_dict()["bestOffset"])
        except (KeyError, ValueError):
            return None

    def set_best_offset(self, best_offset: float) -> 'WrappedRedvoxPacket':
        """
        Sets the best offset in the packet's metadata.
        :param best_offset: The best offset to use.
        :return: An instance of this wrapped packet.
        """
        return self.add_metadata("bestOffset", str(best_offset))

    def is_synch_corrected(self) -> bool:
        """
        Returns the isSynchCorrected value from the metadata if it exists.
        :return: The isSynchCorrected value from the metadata if it exists.
        """

        def _parse_bool(s: str) -> bool:
            return s == "true" or s == "True"

        try:
            return _parse_bool(self.metadata_as_dict()["isSynchCorrected"])
        except (KeyError, ValueError):
            return False

    def set_is_synch_corrected(self, is_synch_corrected: bool) -> 'WrappedRedvoxPacket':
        """
        Sets the is_synch_corrected field in the packet's metadata.
        :param is_synch_corrected: The is_synch_corrected field to use.
        :return: An instance of this wrapped packet.
        """
        return self.add_metadata("isSynchCorrected", "true" if is_synch_corrected else "false")

    # Sensor channels
    def has_microphone_sensor(self) -> bool:
        """
        Returns if this packet has a microphone channel.
        :return: If this packet has a microphone channel.
        """
        return self._has_channel(api900_pb2.MICROPHONE)

    def microphone_sensor(self) -> typing.Optional[_microphone_sensor.MicrophoneSensor]:
        """
        Returns the high-level microphone channel API or None if this packet doesn't contain a channel of this type.
        :return: the high-level microphone channel API or None if this packet doesn't contain a channel of this type.
        """
        if self.has_microphone_sensor():
            return _microphone_sensor.MicrophoneSensor(self._get_channel(api900_pb2.MICROPHONE))

        return None

    def set_microphone_sensor(self, microphone_sensor: typing.Optional[
            _microphone_sensor.MicrophoneSensor]) -> 'WrappedRedvoxPacket':
        """
        Sets this packets microphone sensor. A channel can be removed by passing in None.
        :param microphone_sensor: An optional instance of a microphone sensor.
        :return: This instance of a wrapped redvox packet.
        """
        if self.has_microphone_sensor():
            self._delete_channel(api900_pb2.MICROPHONE)

        if microphone_sensor is not None:
            # pylint: disable=W0212
            self._add_channel(microphone_sensor._evenly_sampled_channel)

        return self

    def has_barometer_sensor(self) -> bool:
        """
        Returns if this packet has a barometer channel.
        :return: If this packet has a barometer channel.
        """
        return self._has_channel(api900_pb2.BAROMETER)

    def barometer_sensor(self) -> typing.Optional[_barometer_sensor.BarometerSensor]:
        """
        Returns the high-level barometer channel API or None if this packet doesn't contain a channel of this type.
        :return: the high-level barometer channel API or None if this packet doesn't contain a channel of this type.
        """
        if self.has_barometer_sensor():
            return _barometer_sensor.BarometerSensor(self._get_channel(api900_pb2.BAROMETER))

        return None

    def set_barometer_sensor(self, barometer_sensor: typing.Optional[
            _barometer_sensor.BarometerSensor]) -> 'WrappedRedvoxPacket':
        """
        Sets this packets barometer sensor. A channel can be removed by passing in None.
        :param barometer_sensor: An optional instance of a barometer sensor.
        :return: This instance of a wrapped redvox packet.
        """
        if self.has_barometer_sensor():
            self._delete_channel(api900_pb2.BAROMETER)

        if barometer_sensor is not None:
            # pylint: disable=W0212
            self._add_channel(barometer_sensor._unevenly_sampled_channel)

        return self

    def has_location_sensor(self) -> bool:
        """
        Returns if this packet has a location channel.
        :return: If this packet has a location channel.
        """
        return (self._has_channels(
            [api900_pb2.LATITUDE, api900_pb2.LONGITUDE, api900_pb2.ALTITUDE, api900_pb2.SPEED,
             api900_pb2.ACCURACY]) or self._has_channels([api900_pb2.LATITUDE, api900_pb2.LONGITUDE]))

    def location_sensor(self) -> typing.Optional[_location_sensor.LocationSensor]:
        """
        Returns the high-level location channel API or None if this packet doesn't contain a channel of this type.
        :return: the high-level location channel API or None if this packet doesn't contain a channel of this type.
        """
        if self.has_location_sensor():
            return _location_sensor.LocationSensor(self._get_channel(api900_pb2.LATITUDE))

        return None

    def set_location_sensor(self,
                            location_sensor: typing.Optional[_location_sensor.LocationSensor]) -> 'WrappedRedvoxPacket':
        """
        Sets this packet's location sensor. A channel can be removed by passing in None.
        :param location_sensor: An optional instance of a location sensor.
        :return: This instance of a wrapped redvox packet.
        """
        if self.has_location_sensor():
            self._delete_channel(api900_pb2.LATITUDE)

        if location_sensor is not None:
            # pylint: disable=W0212
            self._add_channel(location_sensor._unevenly_sampled_channel)

        return self

    # pylint: disable=invalid-name,C1801
    def has_time_synchronization_sensor(self) -> bool:
        """
        Returns if this packet has a time synchronization channel.
        :return: If this packet has a time synchronization channel.
        """
        if self._has_channel(api900_pb2.TIME_SYNCHRONIZATION):
            ch = _time_synchronization_sensor.TimeSynchronizationSensor(
                self._get_channel(api900_pb2.TIME_SYNCHRONIZATION))
            return len(ch.payload_values()) > 0

        return False

    def time_synchronization_sensor(self) -> typing.Optional[_time_synchronization_sensor.TimeSynchronizationSensor]:
        """
        Returns the high-level time synchronization channel API or None if this packet doesn't contain a channel of
        this type.
        :return: the high-level time synchronization channel API or None if this packet doesn't contain a channel of
        this type.
        """
        if self.has_time_synchronization_sensor():
            return _time_synchronization_sensor.TimeSynchronizationSensor(
                self._get_channel(api900_pb2.TIME_SYNCHRONIZATION))

        return None

    def set_time_synchronization_sensor(self, time_synchronization_sensor: typing.Optional[
            _time_synchronization_sensor.TimeSynchronizationSensor]) -> 'WrappedRedvoxPacket':
        """
        Sets this packet's time sync sensor. A channel can be removed by passing in None.
        :param time_synchronization_sensor: An optional instance of a time sync sensor.
        :return: This instance of a wrapped redvox packet.
        """
        if self.has_time_synchronization_sensor():
            self._delete_channel(api900_pb2.TIME_SYNCHRONIZATION)

        if time_synchronization_sensor is not None:
            # pylint: disable=W0212
            self._add_channel(time_synchronization_sensor._unevenly_sampled_channel)

        return self

    def has_accelerometer_sensor(self) -> bool:
        """
        Returns if this packet has an accelerometer channel.
        :return: If this packet has an accelerometer channel.
        """
        return self._has_channels([api900_pb2.ACCELEROMETER_X, api900_pb2.ACCELEROMETER_Y, api900_pb2.ACCELEROMETER_Z])

    def accelerometer_sensor(self) -> typing.Optional[_accelerometer_sensor.AccelerometerSensor]:
        """
        Returns the high-level accelerometer channel API or None if this packet doesn't contain a channel of this type.
        :return: the high-level accelerometer channel API or None if this packet doesn't contain a channel of this type.
        """
        if self.has_accelerometer_sensor():
            return _accelerometer_sensor.AccelerometerSensor(self._get_channel(api900_pb2.ACCELEROMETER_X))

        return None

    def set_accelerometer_sensor(self,
                                 accelerometer_sensor: typing.Optional[
                                     _accelerometer_sensor.AccelerometerSensor]) -> 'WrappedRedvoxPacket':
        """
        Sets this packet's accelerometer sensor. A channel can be removed by passing in None.
        :param accelerometer_sensor: An optional instance of a accelerometer sensor.
        """
        if self.has_accelerometer_sensor():
            self._delete_channel(api900_pb2.ACCELEROMETER_X)

        if accelerometer_sensor is not None:
            # pylint: disable=W0212
            self._add_channel(accelerometer_sensor._unevenly_sampled_channel)

        return self

    def has_magnetometer_sensor(self) -> bool:
        """
        Returns if this packet has a magnetometer channel.
        :return: If this packet has a magnetometer channel.
        """
        return self._has_channels([api900_pb2.MAGNETOMETER_X, api900_pb2.MAGNETOMETER_Y, api900_pb2.MAGNETOMETER_Z])

    def magnetometer_sensor(self) -> typing.Optional[_magnetometer_sensor.MagnetometerSensor]:
        """
        Returns the high-level magnetometer channel API or None if this packet doesn't contain a channel of this type.
        :return: the high-level magnetometer channel API or None if this packet doesn't contain a channel of this type.
        """
        if self.has_magnetometer_sensor():
            return _magnetometer_sensor.MagnetometerSensor(self._get_channel(api900_pb2.MAGNETOMETER_X))

        return None

    def set_magnetometer_sensor(self,
                                magnetometer_sensor: typing.Optional[
                                    _magnetometer_sensor.MagnetometerSensor]) -> 'WrappedRedvoxPacket':
        """
        Sets this packet's magnetomer sensor. A channel can be removed by passing in None.
        :param magnetometer_sensor: An optional instance of a magnetometer sensor.
        :return: This instance of a wrapped redvox packet.
        """
        if self.has_magnetometer_sensor():
            self._delete_channel(api900_pb2.MAGNETOMETER_X)

        if magnetometer_sensor is not None:
            # pylint: disable=W0212
            self._add_channel(magnetometer_sensor._unevenly_sampled_channel)

        return self

    def has_gyroscope_sensor(self) -> bool:
        """
        Returns if this packet has a gyroscope channel.
        :return: If this packet has a gyroscope channel.
        """
        return self._has_channels([api900_pb2.GYROSCOPE_X, api900_pb2.GYROSCOPE_Y, api900_pb2.GYROSCOPE_Z])

    def gyroscope_sensor(self) -> typing.Optional[_gyroscope_sensor.GyroscopeSensor]:
        """
        Returns the high-level gyroscope channel API or None if this packet doesn't contain a channel of this type.
        :return: the high-level gyroscope channel API or None if this packet doesn't contain a channel of this type.
        """
        if self.has_gyroscope_sensor():
            return _gyroscope_sensor.GyroscopeSensor(self._get_channel(api900_pb2.GYROSCOPE_X))

        return None

    def set_gyroscope_sensor(self, gyroscope_sensor: typing.Optional[
            _gyroscope_sensor.GyroscopeSensor]) -> 'WrappedRedvoxPacket':
        """
        Sets this packet's gyroscope sensor. A channel can be removed by passing in None.
        :param gyroscope_sensor: An optional instance of a gyroscope sensor.
        :return: This instance of a wrapped redvox packet.
        """
        if self.has_gyroscope_sensor():
            self._delete_channel(api900_pb2.GYROSCOPE_X)

        if gyroscope_sensor is not None:
            # pylint: disable=W0212
            self._add_channel(gyroscope_sensor._unevenly_sampled_channel)

        return self

    def has_light_sensor(self) -> bool:
        """
        Returns if this packet has a light channel.
        :return: If this packet has a light channel.
        """
        return self._has_channel(api900_pb2.LIGHT)

    def light_sensor(self) -> typing.Optional[_light_sensor.LightSensor]:
        """
        Returns the high-level light channel API or None if this packet doesn't contain a channel of this type.
        :return: the high-level light channel API or None if this packet doesn't contain a channel of this type.
        """
        if self.has_light_sensor():
            return _light_sensor.LightSensor(self._get_channel(api900_pb2.LIGHT))

        return None

    def set_light_sensor(self, light_sensor: typing.Optional[_light_sensor.LightSensor]) -> 'WrappedRedvoxPacket':
        """
        Sets this packet's light sensor. A channel can be removed by passing in None.
        :param light_sensor: An optional instance of a light sensor.
        :return: This instance of a wrapped redvox packet.
        """
        if self.has_light_sensor():
            self._delete_channel(api900_pb2.LIGHT)

        if light_sensor is not None:
            # pylint: disable=W0212
            self._add_channel(light_sensor._unevenly_sampled_channel)

        return self

    def has_infrared_sensor(self) -> bool:
        """
        Returns if this packet has an infrared channel.
        :return: If this packlet has an infrared channel.
        """
        return self._has_channel(api900_pb2.INFRARED)

    def infrared_sensor(self) -> typing.Optional[_infrared_sensor.InfraredSensor]:
        """
        Returns the high-level infrared channel API or None if this packet doesn't contain a channel of this type.
        :return: the high-level infrared channel API or None if this packet doesn't contain a channel of this type.
        """
        if self.has_infrared_sensor():
            return _infrared_sensor.InfraredSensor(self._get_channel(api900_pb2.INFRARED))

        return None

    def set_infrared_sensor(self,
                            infrared_sensor: typing.Optional[
                                _infrared_sensor.InfraredSensor]) -> 'WrappedRedvoxPacket':
        """
        Sets this packet's infrared sensor. A channel can be removed by passing in None.
        :param infrared_sensor: An optional instance of a infrared sensor.
        """
        if self.has_infrared_sensor():
            self._delete_channel(api900_pb2.INFRARED)

        if infrared_sensor is not None:
            # pylint: disable=W0212
            self._add_channel(infrared_sensor._unevenly_sampled_channel)

        return self

    def has_image_sensor(self) -> bool:
        """
        Returns if this packet has an image channel.
        :return: If this packlet has an image channel.
        """
        return self._has_channel(api900_pb2.IMAGE)

    def image_sensor(self) -> typing.Optional[_image_sensor.ImageSensor]:
        """
        Returns the high-level image channel API or None if this packet doesn't contain a channel of this type.
        :return: the high-level image channel API or None if this packet doesn't contain a channel of this type.
        """
        if self.has_image_sensor():
            return _image_sensor.ImageSensor(self._get_channel(api900_pb2.IMAGE))

        return None

    def set_image_sensor(self, image_sensor: typing.Optional[_image_sensor.ImageSensor]) -> 'WrappedRedvoxPacket':
        """
        Set's the image channel.
        :param image_sensor: Image sensor.
        """
        if self.has_image_sensor():
            self._delete_channel(api900_pb2.IMAGE)

        if image_sensor is not None:
            # pylint: disable=W0212
            self._add_channel(image_sensor._unevenly_sampled_channel)

        return self

    def __str__(self):
        """
        Returns protobuf's String representation of this packet.
        :return: Protobuf's String representation of this packet.
        """
        return str(self._redvox_packet)

    def __eq__(self, other):
        return isinstance(other, WrappedRedvoxPacket) and len(self.diff(other)) == 0

    def diff(self, other: 'WrappedRedvoxPacket') -> typing.List[str]:
        """
        Finds the differences (if any) between two WrappedRedvoxPackets.
        :param other: The other wrapped redvox packet to compare to.
        :return: A list of differences or an empty list if there are none.
        """
        diffs = map(lambda tuple2: reader_utils.diff(tuple2[0], tuple2[1]), [
            (self.api(), other.api()),
            (self.redvox_id(), other.redvox_id()),
            (self.uuid(), other.uuid()),
            (self.authenticated_email(), other.authenticated_email()),
            (self.authentication_token(), other.authentication_token()),
            (self.firebase_token(), other.firebase_token()),
            (self.is_backfilled(), other.is_backfilled()),
            (self.is_private(), other.is_private()),
            (self.is_scrambled(), other.is_scrambled()),
            (self.device_make(), other.device_make()),
            (self.device_model(), other.device_model()),
            (self.device_os(), other.device_os()),
            (self.device_os_version(), other.device_os_version()),
            (self.app_version(), other.app_version()),
            (self.battery_level_percent(), other.battery_level_percent()),
            (self.device_temperature_c(), other.device_temperature_c()),
            (self.acquisition_server(), other.acquisition_server()),
            (self.time_synchronization_server(), other.time_synchronization_server()),
            (self.authentication_server(), other.authentication_server()),
            (self.app_file_start_timestamp_epoch_microseconds_utc(),
             other.app_file_start_timestamp_epoch_microseconds_utc()),
            (self.app_file_start_timestamp_machine(), other.app_file_start_timestamp_machine()),
            (self.server_timestamp_epoch_microseconds_utc(),
             other.server_timestamp_epoch_microseconds_utc()),
            (self.metadata(), other.metadata()),
            (self.microphone_sensor(), other.microphone_sensor()),
            (self.barometer_sensor(), other.barometer_sensor()),
            (self.location_sensor(), other.location_sensor()),
            (self.time_synchronization_sensor(), other.time_synchronization_sensor()),
            (self.accelerometer_sensor(), other.accelerometer_sensor()),
            (self.magnetometer_sensor(), other.magnetometer_sensor()),
            (self.gyroscope_sensor(), other.gyroscope_sensor()),
            (self.light_sensor(), other.light_sensor()),
            (self.infrared_sensor(), other.infrared_sensor())
        ])
        # Filter only out only the differences
        diffs = filter(lambda tuple2: tuple2[0], diffs)
        # Extract the difference string
        diffs = map(lambda tuple2: tuple2[1], diffs)
        return list(diffs)

Methods

def accelerometer_sensor(self) ‑> Optional[AccelerometerSensor]

Returns the high-level accelerometer channel API or None if this packet doesn't contain a channel of this type. :return: the high-level accelerometer channel API or None if this packet doesn't contain a channel of this type.

Expand source code
def accelerometer_sensor(self) -> typing.Optional[_accelerometer_sensor.AccelerometerSensor]:
    """
    Returns the high-level accelerometer channel API or None if this packet doesn't contain a channel of this type.
    :return: the high-level accelerometer channel API or None if this packet doesn't contain a channel of this type.
    """
    if self.has_accelerometer_sensor():
        return _accelerometer_sensor.AccelerometerSensor(self._get_channel(api900_pb2.ACCELEROMETER_X))

    return None
def acquisition_server(self) ‑> str
Expand source code
def acquisition_server(self) -> str:
    """
    See https://bitbucket.org/redvoxhi/redvox-data-apis/src/master/src/api900/api900.proto?at=master for a
    description of this field.
    """
    return self._redvox_packet.acquisition_server
def add_metadata(self, key: str, value: str) ‑> WrappedRedvoxPacket

Adds a key pair value to the metadata. :param key: The key. :param value: The value. :return: This instance of a WrappedRedvoxPacket.

Expand source code
def add_metadata(self, key: str, value: str) -> 'WrappedRedvoxPacket':
    """
    Adds a key pair value to the metadata.
    :param key: The key.
    :param value: The value.
    :return: This instance of a WrappedRedvoxPacket.
    """
    metadata = self.metadata_as_dict()
    metadata[key] = value if isinstance(value, str) else str(value)
    self.set_metadata_as_dict(metadata)
    return self
def api(self) ‑> int
Expand source code
def api(self) -> int:
    """
    See https://bitbucket.org/redvoxhi/redvox-data-apis/src/master/src/api900/api900.proto?at=master for a
    description of this field.
    """
    return migrations.maybe_get_float(self._redvox_packet.api)
def app_file_start_timestamp_epoch_microseconds_utc(self) ‑> int
Expand source code
def app_file_start_timestamp_epoch_microseconds_utc(self) -> int:
    """
    See https://bitbucket.org/redvoxhi/redvox-data-apis/src/master/src/api900/api900.proto?at=master for a
    description of this field.
    """
    return migrations.maybe_get_float(self._redvox_packet.app_file_start_timestamp_epoch_microseconds_utc)
def app_file_start_timestamp_machine(self) ‑> int
Expand source code
def app_file_start_timestamp_machine(self) -> int:
    """
    See https://bitbucket.org/redvoxhi/redvox-data-apis/src/master/src/api900/api900.proto?at=master for a
    description of this field.
    """
    return migrations.maybe_get_float(self._redvox_packet.app_file_start_timestamp_machine)
def app_version(self) ‑> str
Expand source code
def app_version(self) -> str:
    """
    See https://bitbucket.org/redvoxhi/redvox-data-apis/src/master/src/api900/api900.proto?at=master for a
    description of this field.
    """
    return self._redvox_packet.app_version
def authenticated_email(self) ‑> str
Expand source code
def authenticated_email(self) -> str:
    """
    See https://bitbucket.org/redvoxhi/redvox-data-apis/src/master/src/api900/api900.proto?at=master for a
    description of this field.
    """
    return self._redvox_packet.authenticated_email
def authentication_server(self) ‑> str
Expand source code
def authentication_server(self) -> str:
    """
    See https://bitbucket.org/redvoxhi/redvox-data-apis/src/master/src/api900/api900.proto?at=master for a
    description of this field.
    """
    return self._redvox_packet.authentication_server
def authentication_token(self) ‑> str
Expand source code
def authentication_token(self) -> str:
    """
    See https://bitbucket.org/redvoxhi/redvox-data-apis/src/master/src/api900/api900.proto?at=master for a
    description of this field.
    """
    return self._redvox_packet.authentication_token
def barometer_sensor(self) ‑> Optional[BarometerSensor]

Returns the high-level barometer channel API or None if this packet doesn't contain a channel of this type. :return: the high-level barometer channel API or None if this packet doesn't contain a channel of this type.

Expand source code
def barometer_sensor(self) -> typing.Optional[_barometer_sensor.BarometerSensor]:
    """
    Returns the high-level barometer channel API or None if this packet doesn't contain a channel of this type.
    :return: the high-level barometer channel API or None if this packet doesn't contain a channel of this type.
    """
    if self.has_barometer_sensor():
        return _barometer_sensor.BarometerSensor(self._get_channel(api900_pb2.BAROMETER))

    return None
def battery_level_percent(self) ‑> float
Expand source code
def battery_level_percent(self) -> float:
    """
    See https://bitbucket.org/redvoxhi/redvox-data-apis/src/master/src/api900/api900.proto?at=master for a
    description of this field.
    """
    return self._redvox_packet.battery_level_percent
def best_latency(self) ‑> Optional[float]

Returns the best latency from the metadata if it exists. :return: The best latency from the metadata if it exists.

Expand source code
def best_latency(self) -> typing.Optional[float]:
    """
    Returns the best latency from the metadata if it exists.
    :return: The best latency from the metadata if it exists.
    """
    try:
        return float(self.metadata_as_dict()["bestLatency"])
    except (KeyError, ValueError):
        return None
def best_offset(self) ‑> Optional[float]

Returns the best offset from the metadata if it exists. :return: The best offset from the metadata if it exists.

Expand source code
def best_offset(self) -> typing.Optional[float]:
    """
    Returns the best offset from the metadata if it exists.
    :return: The best offset from the metadata if it exists.
    """
    try:
        return float(self.metadata_as_dict()["bestOffset"])
    except (KeyError, ValueError):
        return None
def clone(self) ‑> WrappedRedvoxPacket

Returns a clone of this WrappedRedvoxPacket. :return: A clone of this WrappedRedvoxPacket.

Expand source code
def clone(self) -> 'WrappedRedvoxPacket':
    """
    Returns a clone of this WrappedRedvoxPacket.
    :return: A clone of this WrappedRedvoxPacket.
    """
    return WrappedRedvoxPacket(read_buffer(self.compressed_buffer(), True))
def compressed_buffer(self) ‑> bytes

Returns the compressed buffer associated with this packet. :return: The compressed buffer associated with this packet.

Expand source code
def compressed_buffer(self) -> bytes:
    """
    Returns the compressed buffer associated with this packet.
    :return: The compressed buffer associated with this packet.
    """
    return reader_utils.lz4_compress(self._redvox_packet.SerializeToString())
def concat(self, wrapped_redvox_packets: List[ForwardRef('WrappedRedvoxPacket')]) ‑> List[WrappedRedvoxPacket]

Concatenates this packet with other packets. :param wrapped_redvox_packets: Other packets to concatenate with this packet. :return: A list of packets each containing a continuous set of data.

Expand source code
def concat(self, wrapped_redvox_packets: typing.List['WrappedRedvoxPacket']) -> typing.List['WrappedRedvoxPacket']:
    """
    Concatenates this packet with other packets.
    :param wrapped_redvox_packets: Other packets to concatenate with this packet.
    :return: A list of packets each containing a continuous set of data.
    """
    return redvox.api900.concat.concat_wrapped_redvox_packets([self] + wrapped_redvox_packets)
def default_filename(self, extension: str = 'rdvxz') ‑> str

Constructs a default filename from the packet's metadata. :param extension: An optional extension to use. :return: A default filename from the packet's metadata.

Expand source code
def default_filename(self, extension: str = "rdvxz") -> str:
    """
    Constructs a default filename from the packet's metadata.
    :param extension: An optional extension to use.
    :return: A default filename from the packet's metadata.
    """
    return "%s_%d.%s" % (self.redvox_id(), int(round(self.app_file_start_timestamp_machine() / 1000.0)), extension)
def device_make(self) ‑> str
Expand source code
def device_make(self) -> str:
    """
    See https://bitbucket.org/redvoxhi/redvox-data-apis/src/master/src/api900/api900.proto?at=master for a
    description of this field.
    """
    return self._redvox_packet.device_make
def device_model(self) ‑> str
Expand source code
def device_model(self) -> str:
    """
    See https://bitbucket.org/redvoxhi/redvox-data-apis/src/master/src/api900/api900.proto?at=master for a
    description of this field.
    """
    return self._redvox_packet.device_model
def device_os(self) ‑> str
Expand source code
def device_os(self) -> str:
    """
    See https://bitbucket.org/redvoxhi/redvox-data-apis/src/master/src/api900/api900.proto?at=master for a
    description of this field.
    """
    return self._redvox_packet.device_os
def device_os_version(self) ‑> str
Expand source code
def device_os_version(self) -> str:
    """
    See https://bitbucket.org/redvoxhi/redvox-data-apis/src/master/src/api900/api900.proto?at=master for a
    description of this field.
    """
    return self._redvox_packet.device_os_version
def device_temperature_c(self) ‑> float
Expand source code
def device_temperature_c(self) -> float:
    """
    See https://bitbucket.org/redvoxhi/redvox-data-apis/src/master/src/api900/api900.proto?at=master for a
    description of this field.
    """
    return self._redvox_packet.device_temperature_c
def diff(self, other: WrappedRedvoxPacket) ‑> List[str]

Finds the differences (if any) between two WrappedRedvoxPackets. :param other: The other wrapped redvox packet to compare to. :return: A list of differences or an empty list if there are none.

Expand source code
def diff(self, other: 'WrappedRedvoxPacket') -> typing.List[str]:
    """
    Finds the differences (if any) between two WrappedRedvoxPackets.
    :param other: The other wrapped redvox packet to compare to.
    :return: A list of differences or an empty list if there are none.
    """
    diffs = map(lambda tuple2: reader_utils.diff(tuple2[0], tuple2[1]), [
        (self.api(), other.api()),
        (self.redvox_id(), other.redvox_id()),
        (self.uuid(), other.uuid()),
        (self.authenticated_email(), other.authenticated_email()),
        (self.authentication_token(), other.authentication_token()),
        (self.firebase_token(), other.firebase_token()),
        (self.is_backfilled(), other.is_backfilled()),
        (self.is_private(), other.is_private()),
        (self.is_scrambled(), other.is_scrambled()),
        (self.device_make(), other.device_make()),
        (self.device_model(), other.device_model()),
        (self.device_os(), other.device_os()),
        (self.device_os_version(), other.device_os_version()),
        (self.app_version(), other.app_version()),
        (self.battery_level_percent(), other.battery_level_percent()),
        (self.device_temperature_c(), other.device_temperature_c()),
        (self.acquisition_server(), other.acquisition_server()),
        (self.time_synchronization_server(), other.time_synchronization_server()),
        (self.authentication_server(), other.authentication_server()),
        (self.app_file_start_timestamp_epoch_microseconds_utc(),
         other.app_file_start_timestamp_epoch_microseconds_utc()),
        (self.app_file_start_timestamp_machine(), other.app_file_start_timestamp_machine()),
        (self.server_timestamp_epoch_microseconds_utc(),
         other.server_timestamp_epoch_microseconds_utc()),
        (self.metadata(), other.metadata()),
        (self.microphone_sensor(), other.microphone_sensor()),
        (self.barometer_sensor(), other.barometer_sensor()),
        (self.location_sensor(), other.location_sensor()),
        (self.time_synchronization_sensor(), other.time_synchronization_sensor()),
        (self.accelerometer_sensor(), other.accelerometer_sensor()),
        (self.magnetometer_sensor(), other.magnetometer_sensor()),
        (self.gyroscope_sensor(), other.gyroscope_sensor()),
        (self.light_sensor(), other.light_sensor()),
        (self.infrared_sensor(), other.infrared_sensor())
    ])
    # Filter only out only the differences
    diffs = filter(lambda tuple2: tuple2[0], diffs)
    # Extract the difference string
    diffs = map(lambda tuple2: tuple2[1], diffs)
    return list(diffs)
def duration_s(self) ‑> float

The duration of this WrappedRedvoxPacket in seconds. :return: The duration of this WrappedRedvoxPacket in seconds.

Expand source code
def duration_s(self) -> float:
    """
    The duration of this WrappedRedvoxPacket in seconds.
    :return: The duration of this WrappedRedvoxPacket in seconds.
    """
    microphone_sensor = self.microphone_sensor()
    return len(microphone_sensor.payload_values()) / microphone_sensor.sample_rate_hz()
def end_timestamp_us_utc(self) ‑> float

Returns the end timestamp of a WrappedRedvoxPacket. :return: The end timestamp of a WrappedRedvoxPacket.

Expand source code
def end_timestamp_us_utc(self) -> float:
    """
    Returns the end timestamp of a WrappedRedvoxPacket.
    :return: The end timestamp of a WrappedRedvoxPacket.
    """
    return self.start_timestamp_us_utc() + date_time_utils.seconds_to_microseconds(self.duration_s())
def firebase_token(self) ‑> str
Expand source code
def firebase_token(self) -> str:
    """
    See https://bitbucket.org/redvoxhi/redvox-data-apis/src/master/src/api900/api900.proto?at=master for a
    description of this field.
    """
    return self._redvox_packet.firebase_token
def gyroscope_sensor(self) ‑> Optional[GyroscopeSensor]

Returns the high-level gyroscope channel API or None if this packet doesn't contain a channel of this type. :return: the high-level gyroscope channel API or None if this packet doesn't contain a channel of this type.

Expand source code
def gyroscope_sensor(self) -> typing.Optional[_gyroscope_sensor.GyroscopeSensor]:
    """
    Returns the high-level gyroscope channel API or None if this packet doesn't contain a channel of this type.
    :return: the high-level gyroscope channel API or None if this packet doesn't contain a channel of this type.
    """
    if self.has_gyroscope_sensor():
        return _gyroscope_sensor.GyroscopeSensor(self._get_channel(api900_pb2.GYROSCOPE_X))

    return None
def has_accelerometer_sensor(self) ‑> bool

Returns if this packet has an accelerometer channel. :return: If this packet has an accelerometer channel.

Expand source code
def has_accelerometer_sensor(self) -> bool:
    """
    Returns if this packet has an accelerometer channel.
    :return: If this packet has an accelerometer channel.
    """
    return self._has_channels([api900_pb2.ACCELEROMETER_X, api900_pb2.ACCELEROMETER_Y, api900_pb2.ACCELEROMETER_Z])
def has_barometer_sensor(self) ‑> bool

Returns if this packet has a barometer channel. :return: If this packet has a barometer channel.

Expand source code
def has_barometer_sensor(self) -> bool:
    """
    Returns if this packet has a barometer channel.
    :return: If this packet has a barometer channel.
    """
    return self._has_channel(api900_pb2.BAROMETER)
def has_gyroscope_sensor(self) ‑> bool

Returns if this packet has a gyroscope channel. :return: If this packet has a gyroscope channel.

Expand source code
def has_gyroscope_sensor(self) -> bool:
    """
    Returns if this packet has a gyroscope channel.
    :return: If this packet has a gyroscope channel.
    """
    return self._has_channels([api900_pb2.GYROSCOPE_X, api900_pb2.GYROSCOPE_Y, api900_pb2.GYROSCOPE_Z])
def has_image_sensor(self) ‑> bool

Returns if this packet has an image channel. :return: If this packlet has an image channel.

Expand source code
def has_image_sensor(self) -> bool:
    """
    Returns if this packet has an image channel.
    :return: If this packlet has an image channel.
    """
    return self._has_channel(api900_pb2.IMAGE)
def has_infrared_sensor(self) ‑> bool

Returns if this packet has an infrared channel. :return: If this packlet has an infrared channel.

Expand source code
def has_infrared_sensor(self) -> bool:
    """
    Returns if this packet has an infrared channel.
    :return: If this packlet has an infrared channel.
    """
    return self._has_channel(api900_pb2.INFRARED)
def has_light_sensor(self) ‑> bool

Returns if this packet has a light channel. :return: If this packet has a light channel.

Expand source code
def has_light_sensor(self) -> bool:
    """
    Returns if this packet has a light channel.
    :return: If this packet has a light channel.
    """
    return self._has_channel(api900_pb2.LIGHT)
def has_location_sensor(self) ‑> bool

Returns if this packet has a location channel. :return: If this packet has a location channel.

Expand source code
def has_location_sensor(self) -> bool:
    """
    Returns if this packet has a location channel.
    :return: If this packet has a location channel.
    """
    return (self._has_channels(
        [api900_pb2.LATITUDE, api900_pb2.LONGITUDE, api900_pb2.ALTITUDE, api900_pb2.SPEED,
         api900_pb2.ACCURACY]) or self._has_channels([api900_pb2.LATITUDE, api900_pb2.LONGITUDE]))
def has_magnetometer_sensor(self) ‑> bool

Returns if this packet has a magnetometer channel. :return: If this packet has a magnetometer channel.

Expand source code
def has_magnetometer_sensor(self) -> bool:
    """
    Returns if this packet has a magnetometer channel.
    :return: If this packet has a magnetometer channel.
    """
    return self._has_channels([api900_pb2.MAGNETOMETER_X, api900_pb2.MAGNETOMETER_Y, api900_pb2.MAGNETOMETER_Z])
def has_microphone_sensor(self) ‑> bool

Returns if this packet has a microphone channel. :return: If this packet has a microphone channel.

Expand source code
def has_microphone_sensor(self) -> bool:
    """
    Returns if this packet has a microphone channel.
    :return: If this packet has a microphone channel.
    """
    return self._has_channel(api900_pb2.MICROPHONE)
def has_time_synchronization_sensor(self) ‑> bool

Returns if this packet has a time synchronization channel. :return: If this packet has a time synchronization channel.

Expand source code
def has_time_synchronization_sensor(self) -> bool:
    """
    Returns if this packet has a time synchronization channel.
    :return: If this packet has a time synchronization channel.
    """
    if self._has_channel(api900_pb2.TIME_SYNCHRONIZATION):
        ch = _time_synchronization_sensor.TimeSynchronizationSensor(
            self._get_channel(api900_pb2.TIME_SYNCHRONIZATION))
        return len(ch.payload_values()) > 0

    return False
def image_sensor(self) ‑> Optional[ImageSensor]

Returns the high-level image channel API or None if this packet doesn't contain a channel of this type. :return: the high-level image channel API or None if this packet doesn't contain a channel of this type.

Expand source code
def image_sensor(self) -> typing.Optional[_image_sensor.ImageSensor]:
    """
    Returns the high-level image channel API or None if this packet doesn't contain a channel of this type.
    :return: the high-level image channel API or None if this packet doesn't contain a channel of this type.
    """
    if self.has_image_sensor():
        return _image_sensor.ImageSensor(self._get_channel(api900_pb2.IMAGE))

    return None
def infrared_sensor(self) ‑> Optional[InfraredSensor]

Returns the high-level infrared channel API or None if this packet doesn't contain a channel of this type. :return: the high-level infrared channel API or None if this packet doesn't contain a channel of this type.

Expand source code
def infrared_sensor(self) -> typing.Optional[_infrared_sensor.InfraredSensor]:
    """
    Returns the high-level infrared channel API or None if this packet doesn't contain a channel of this type.
    :return: the high-level infrared channel API or None if this packet doesn't contain a channel of this type.
    """
    if self.has_infrared_sensor():
        return _infrared_sensor.InfraredSensor(self._get_channel(api900_pb2.INFRARED))

    return None
def is_backfilled(self) ‑> bool
Expand source code
def is_backfilled(self) -> bool:
    """
    See https://bitbucket.org/redvoxhi/redvox-data-apis/src/master/src/api900/api900.proto?at=master for a
    description of this field.
    """
    return self._redvox_packet.is_backfilled
def is_private(self) ‑> bool
Expand source code
def is_private(self) -> bool:
    """
    See https://bitbucket.org/redvoxhi/redvox-data-apis/src/master/src/api900/api900.proto?at=master for a
    description of this field.
    """
    return self._redvox_packet.is_private
def is_scrambled(self) ‑> bool
Expand source code
def is_scrambled(self) -> bool:
    """
    See https://bitbucket.org/redvoxhi/redvox-data-apis/src/master/src/api900/api900.proto?at=master for a
    description of this field.
    """
    return self._redvox_packet.is_scrambled
def is_synch_corrected(self) ‑> bool

Returns the isSynchCorrected value from the metadata if it exists. :return: The isSynchCorrected value from the metadata if it exists.

Expand source code
def is_synch_corrected(self) -> bool:
    """
    Returns the isSynchCorrected value from the metadata if it exists.
    :return: The isSynchCorrected value from the metadata if it exists.
    """

    def _parse_bool(s: str) -> bool:
        return s == "true" or s == "True"

    try:
        return _parse_bool(self.metadata_as_dict()["isSynchCorrected"])
    except (KeyError, ValueError):
        return False
def light_sensor(self) ‑> Optional[LightSensor]

Returns the high-level light channel API or None if this packet doesn't contain a channel of this type. :return: the high-level light channel API or None if this packet doesn't contain a channel of this type.

Expand source code
def light_sensor(self) -> typing.Optional[_light_sensor.LightSensor]:
    """
    Returns the high-level light channel API or None if this packet doesn't contain a channel of this type.
    :return: the high-level light channel API or None if this packet doesn't contain a channel of this type.
    """
    if self.has_light_sensor():
        return _light_sensor.LightSensor(self._get_channel(api900_pb2.LIGHT))

    return None
def location_sensor(self) ‑> Optional[LocationSensor]

Returns the high-level location channel API or None if this packet doesn't contain a channel of this type. :return: the high-level location channel API or None if this packet doesn't contain a channel of this type.

Expand source code
def location_sensor(self) -> typing.Optional[_location_sensor.LocationSensor]:
    """
    Returns the high-level location channel API or None if this packet doesn't contain a channel of this type.
    :return: the high-level location channel API or None if this packet doesn't contain a channel of this type.
    """
    if self.has_location_sensor():
        return _location_sensor.LocationSensor(self._get_channel(api900_pb2.LATITUDE))

    return None
def mach_time_zero(self) ‑> Optional[int]

Returns the mach time zero from the metadata if it exists. :return: The mach time zero from the metadata if it exists.

Expand source code
def mach_time_zero(self) -> typing.Optional[int]:
    """
    Returns the mach time zero from the metadata if it exists.
    :return: The mach time zero from the metadata if it exists.
    """
    try:
        # First, try to extract metadata from the correct location
        top_level_metadata: typing.Dict[str, str] = self.metadata_as_dict()
        if "machTimeZero" in top_level_metadata:
            return migrations.maybe_get_float(int(top_level_metadata["machTimeZero"]))

        # If that fails, some Androids stored it in the Location metadata
        location_sensor = self.location_sensor()
        if location_sensor is not None:
            location_metadata: typing.Dict[str, str] = location_sensor.metadata_as_dict()
            if "machTimeZero" in location_metadata:
                return migrations.maybe_get_float(int(location_metadata["machTimeZero"]))

        # Finally, if it hasn't been found, it's probably a really old packet that doesn't contain this field
        return None
    except (KeyError, ValueError):
        return None
def magnetometer_sensor(self) ‑> Optional[MagnetometerSensor]

Returns the high-level magnetometer channel API or None if this packet doesn't contain a channel of this type. :return: the high-level magnetometer channel API or None if this packet doesn't contain a channel of this type.

Expand source code
def magnetometer_sensor(self) -> typing.Optional[_magnetometer_sensor.MagnetometerSensor]:
    """
    Returns the high-level magnetometer channel API or None if this packet doesn't contain a channel of this type.
    :return: the high-level magnetometer channel API or None if this packet doesn't contain a channel of this type.
    """
    if self.has_magnetometer_sensor():
        return _magnetometer_sensor.MagnetometerSensor(self._get_channel(api900_pb2.MAGNETOMETER_X))

    return None
def metadata(self) ‑> List[str]
Expand source code
def metadata(self) -> typing.List[str]:
    """
    See https://bitbucket.org/redvoxhi/redvox-data-apis/src/master/src/api900/api900.proto?at=master for a
    description of this field.
    """
    return self._metadata_list
def metadata_as_dict(self) ‑> Dict[str, str]

Return this packet's metadata as a key-value Python dictionary. :return: This packet's metadata as a key-value Python dictionary.

Expand source code
def metadata_as_dict(self) -> typing.Dict[str, str]:
    """
    Return this packet's metadata as a key-value Python dictionary.
    :return: This packet's metadata as a key-value Python dictionary.
    """
    return reader_utils.get_metadata_as_dict(self._metadata_list)
def microphone_sensor(self) ‑> Optional[MicrophoneSensor]

Returns the high-level microphone channel API or None if this packet doesn't contain a channel of this type. :return: the high-level microphone channel API or None if this packet doesn't contain a channel of this type.

Expand source code
def microphone_sensor(self) -> typing.Optional[_microphone_sensor.MicrophoneSensor]:
    """
    Returns the high-level microphone channel API or None if this packet doesn't contain a channel of this type.
    :return: the high-level microphone channel API or None if this packet doesn't contain a channel of this type.
    """
    if self.has_microphone_sensor():
        return _microphone_sensor.MicrophoneSensor(self._get_channel(api900_pb2.MICROPHONE))

    return None
def redvox_id(self) ‑> str
Expand source code
def redvox_id(self) -> str:
    """
    See https://bitbucket.org/redvoxhi/redvox-data-apis/src/master/src/api900/api900.proto?at=master for a
    description of this field.
    """
    return self._redvox_packet.redvox_id
def redvox_packet(self) ‑> api900_pb2.RedvoxPacket

returns the protobuf redvox packet :return: protobuf redvox packet

Expand source code
def redvox_packet(self) -> api900_pb2.RedvoxPacket:
    """
    returns the protobuf redvox packet
    :return: protobuf redvox packet
    """
    return self._redvox_packet
def server_timestamp_epoch_microseconds_utc(self) ‑> int
Expand source code
def server_timestamp_epoch_microseconds_utc(self) -> int:
    """
    See https://bitbucket.org/redvoxhi/redvox-data-apis/src/master/src/api900/api900.proto?at=master for a
    description of this field.
    """
    return migrations.maybe_get_float(self._redvox_packet.server_timestamp_epoch_microseconds_utc)
def set_accelerometer_sensor(self, accelerometer_sensor: Optional[AccelerometerSensor]) ‑> WrappedRedvoxPacket

Sets this packet's accelerometer sensor. A channel can be removed by passing in None. :param accelerometer_sensor: An optional instance of a accelerometer sensor.

Expand source code
def set_accelerometer_sensor(self,
                             accelerometer_sensor: typing.Optional[
                                 _accelerometer_sensor.AccelerometerSensor]) -> 'WrappedRedvoxPacket':
    """
    Sets this packet's accelerometer sensor. A channel can be removed by passing in None.
    :param accelerometer_sensor: An optional instance of a accelerometer sensor.
    """
    if self.has_accelerometer_sensor():
        self._delete_channel(api900_pb2.ACCELEROMETER_X)

    if accelerometer_sensor is not None:
        # pylint: disable=W0212
        self._add_channel(accelerometer_sensor._unevenly_sampled_channel)

    return self
def set_acquisition_server(self, server: str) ‑> WrappedRedvoxPacket

sets the acquisition server url :param server: url to acquisition server

Expand source code
def set_acquisition_server(self, server: str) -> 'WrappedRedvoxPacket':
    """
    sets the acquisition server url
    :param server: url to acquisition server
    """
    self._redvox_packet.acquisition_server = server
    return self
def set_api(self, version: int) ‑> WrappedRedvoxPacket

sets the api version number :param version: version number

Expand source code
def set_api(self, version: int) -> 'WrappedRedvoxPacket':
    """
    sets the api version number
    :param version: version number
    """
    self._redvox_packet.api = version
    return self
def set_app_file_start_timestamp_epoch_microseconds_utc(self, time: int) ‑> WrappedRedvoxPacket

sets the timestamp of packet creation :param time: time when packet was created in microseconds since utc epoch

Expand source code
def set_app_file_start_timestamp_epoch_microseconds_utc(self, time: int) -> 'WrappedRedvoxPacket':
    """
    sets the timestamp of packet creation
    :param time: time when packet was created in microseconds since utc epoch
    """
    self._redvox_packet.app_file_start_timestamp_epoch_microseconds_utc = migrations.maybe_set_int(time)
    return self
def set_app_file_start_timestamp_machine(self, time: int) ‑> WrappedRedvoxPacket

sets the internal machine timestamp of packet creation :param time: time when packet was created on local machine

Expand source code
def set_app_file_start_timestamp_machine(self, time: int) -> 'WrappedRedvoxPacket':
    """
    sets the internal machine timestamp of packet creation
    :param time: time when packet was created on local machine
    """
    self._redvox_packet.app_file_start_timestamp_machine = migrations.maybe_set_int(time)
    return self
def set_app_version(self, version: str) ‑> WrappedRedvoxPacket

sets the app version number :param version: app version string

Expand source code
def set_app_version(self, version: str) -> 'WrappedRedvoxPacket':
    """
    sets the app version number
    :param version: app version string
    """
    self._redvox_packet.app_version = version
    return self
def set_authenticated_email(self, email: str) ‑> WrappedRedvoxPacket

sets the authenticated email :param email: authenticated email string

Expand source code
def set_authenticated_email(self, email: str) -> 'WrappedRedvoxPacket':
    """
    sets the authenticated email
    :param email: authenticated email string
    """
    self._redvox_packet.authenticated_email = email
    return self
def set_authentication_server(self, server: str) ‑> WrappedRedvoxPacket

sets the authentication server url :param server: url to authentication server

Expand source code
def set_authentication_server(self, server: str) -> 'WrappedRedvoxPacket':
    """
    sets the authentication server url
    :param server: url to authentication server
    """
    self._redvox_packet.authentication_server = server
    return self
def set_authentication_token(self, token: str) ‑> WrappedRedvoxPacket

sets the authentication token :param token: authentication token string

Expand source code
def set_authentication_token(self, token: str) -> 'WrappedRedvoxPacket':
    """
    sets the authentication token
    :param token: authentication token string
    """
    self._redvox_packet.authentication_token = token
    return self
def set_barometer_sensor(self, barometer_sensor: Optional[BarometerSensor]) ‑> WrappedRedvoxPacket

Sets this packets barometer sensor. A channel can be removed by passing in None. :param barometer_sensor: An optional instance of a barometer sensor. :return: This instance of a wrapped redvox packet.

Expand source code
def set_barometer_sensor(self, barometer_sensor: typing.Optional[
        _barometer_sensor.BarometerSensor]) -> 'WrappedRedvoxPacket':
    """
    Sets this packets barometer sensor. A channel can be removed by passing in None.
    :param barometer_sensor: An optional instance of a barometer sensor.
    :return: This instance of a wrapped redvox packet.
    """
    if self.has_barometer_sensor():
        self._delete_channel(api900_pb2.BAROMETER)

    if barometer_sensor is not None:
        # pylint: disable=W0212
        self._add_channel(barometer_sensor._unevenly_sampled_channel)

    return self
def set_battery_level_percent(self, percent: float) ‑> WrappedRedvoxPacket

sets the percentage of battery left :param percent: percentage of battery left

Expand source code
def set_battery_level_percent(self, percent: float) -> 'WrappedRedvoxPacket':
    """
    sets the percentage of battery left
    :param percent: percentage of battery left
    """
    self._redvox_packet.battery_level_percent = percent
    return self
def set_best_latency(self, best_latency: float) ‑> WrappedRedvoxPacket

Sets the best latency in the packet's metadata. :param best_latency: The best latency to use. :return: An instance of this wrapped packet.

Expand source code
def set_best_latency(self, best_latency: float) -> 'WrappedRedvoxPacket':
    """
    Sets the best latency in the packet's metadata.
    :param best_latency: The best latency to use.
    :return: An instance of this wrapped packet.
    """
    return self.add_metadata("bestLatency", str(best_latency))
def set_best_offset(self, best_offset: float) ‑> WrappedRedvoxPacket

Sets the best offset in the packet's metadata. :param best_offset: The best offset to use. :return: An instance of this wrapped packet.

Expand source code
def set_best_offset(self, best_offset: float) -> 'WrappedRedvoxPacket':
    """
    Sets the best offset in the packet's metadata.
    :param best_offset: The best offset to use.
    :return: An instance of this wrapped packet.
    """
    return self.add_metadata("bestOffset", str(best_offset))
def set_device_make(self, make: str) ‑> WrappedRedvoxPacket

sets the make of the device :param make: make of the device string

Expand source code
def set_device_make(self, make: str) -> 'WrappedRedvoxPacket':
    """
    sets the make of the device
    :param make: make of the device string
    """
    self._redvox_packet.device_make = make
    return self
def set_device_model(self, model: str) ‑> WrappedRedvoxPacket

sets the model of the device :param model: model of the device string

Expand source code
def set_device_model(self, model: str) -> 'WrappedRedvoxPacket':
    """
    sets the model of the device
    :param model: model of the device string
    """
    self._redvox_packet.device_model = model
    return self
def set_device_os(self, device_os: str) ‑> WrappedRedvoxPacket

sets the device operating system :param device_os: operating system string

Expand source code
def set_device_os(self, device_os: str) -> 'WrappedRedvoxPacket':
    """
    sets the device operating system
    :param device_os: operating system string
    """
    self._redvox_packet.device_os = device_os
    return self
def set_device_os_version(self, version: str) ‑> WrappedRedvoxPacket

sets the device OS version :param version: device OS version string

Expand source code
def set_device_os_version(self, version: str) -> 'WrappedRedvoxPacket':
    """
    sets the device OS version
    :param version: device OS version string
    """
    self._redvox_packet.device_os_version = version
    return self
def set_device_temperature_c(self, temp: float) ‑> WrappedRedvoxPacket

sets the device temperature in degrees Celsius :param temp: temperature in degrees Celsius

Expand source code
def set_device_temperature_c(self, temp: float) -> 'WrappedRedvoxPacket':
    """
    sets the device temperature in degrees Celsius
    :param temp: temperature in degrees Celsius
    """
    self._redvox_packet.device_temperature_c = temp
    return self
def set_firebase_token(self, token: str) ‑> WrappedRedvoxPacket

sets the firebase token :param token: firebase token string

Expand source code
def set_firebase_token(self, token: str) -> 'WrappedRedvoxPacket':
    """
    sets the firebase token
    :param token: firebase token string
    """
    self._redvox_packet.firebase_token = token
    return self
def set_gyroscope_sensor(self, gyroscope_sensor: Optional[GyroscopeSensor]) ‑> WrappedRedvoxPacket

Sets this packet's gyroscope sensor. A channel can be removed by passing in None. :param gyroscope_sensor: An optional instance of a gyroscope sensor. :return: This instance of a wrapped redvox packet.

Expand source code
def set_gyroscope_sensor(self, gyroscope_sensor: typing.Optional[
        _gyroscope_sensor.GyroscopeSensor]) -> 'WrappedRedvoxPacket':
    """
    Sets this packet's gyroscope sensor. A channel can be removed by passing in None.
    :param gyroscope_sensor: An optional instance of a gyroscope sensor.
    :return: This instance of a wrapped redvox packet.
    """
    if self.has_gyroscope_sensor():
        self._delete_channel(api900_pb2.GYROSCOPE_X)

    if gyroscope_sensor is not None:
        # pylint: disable=W0212
        self._add_channel(gyroscope_sensor._unevenly_sampled_channel)

    return self
def set_image_sensor(self, image_sensor: Optional[ImageSensor]) ‑> WrappedRedvoxPacket

Set's the image channel. :param image_sensor: Image sensor.

Expand source code
def set_image_sensor(self, image_sensor: typing.Optional[_image_sensor.ImageSensor]) -> 'WrappedRedvoxPacket':
    """
    Set's the image channel.
    :param image_sensor: Image sensor.
    """
    if self.has_image_sensor():
        self._delete_channel(api900_pb2.IMAGE)

    if image_sensor is not None:
        # pylint: disable=W0212
        self._add_channel(image_sensor._unevenly_sampled_channel)

    return self
def set_infrared_sensor(self, infrared_sensor: Optional[InfraredSensor]) ‑> WrappedRedvoxPacket

Sets this packet's infrared sensor. A channel can be removed by passing in None. :param infrared_sensor: An optional instance of a infrared sensor.

Expand source code
def set_infrared_sensor(self,
                        infrared_sensor: typing.Optional[
                            _infrared_sensor.InfraredSensor]) -> 'WrappedRedvoxPacket':
    """
    Sets this packet's infrared sensor. A channel can be removed by passing in None.
    :param infrared_sensor: An optional instance of a infrared sensor.
    """
    if self.has_infrared_sensor():
        self._delete_channel(api900_pb2.INFRARED)

    if infrared_sensor is not None:
        # pylint: disable=W0212
        self._add_channel(infrared_sensor._unevenly_sampled_channel)

    return self
def set_is_backfilled(self, tof: bool) ‑> WrappedRedvoxPacket

sets the is_backfilled flag :param tof: true or false

Expand source code
def set_is_backfilled(self, tof: bool) -> 'WrappedRedvoxPacket':
    """
    sets the is_backfilled flag
    :param tof: true or false
    """
    self._redvox_packet.is_backfilled = tof
    return self
def set_is_private(self, tof: bool) ‑> WrappedRedvoxPacket

sets the is_private flag :param tof: true or false

Expand source code
def set_is_private(self, tof: bool) -> 'WrappedRedvoxPacket':
    """
    sets the is_private flag
    :param tof: true or false
    """
    self._redvox_packet.is_private = tof
    return self
def set_is_scrambled(self, tof: bool) ‑> WrappedRedvoxPacket

sets the is_scrambled flag :param tof: true or false

Expand source code
def set_is_scrambled(self, tof: bool) -> 'WrappedRedvoxPacket':
    """
    sets the is_scrambled flag
    :param tof: true or false
    """
    self._redvox_packet.is_scrambled = tof
    return self
def set_is_synch_corrected(self, is_synch_corrected: bool) ‑> WrappedRedvoxPacket

Sets the is_synch_corrected field in the packet's metadata. :param is_synch_corrected: The is_synch_corrected field to use. :return: An instance of this wrapped packet.

Expand source code
def set_is_synch_corrected(self, is_synch_corrected: bool) -> 'WrappedRedvoxPacket':
    """
    Sets the is_synch_corrected field in the packet's metadata.
    :param is_synch_corrected: The is_synch_corrected field to use.
    :return: An instance of this wrapped packet.
    """
    return self.add_metadata("isSynchCorrected", "true" if is_synch_corrected else "false")
def set_light_sensor(self, light_sensor: Optional[LightSensor]) ‑> WrappedRedvoxPacket

Sets this packet's light sensor. A channel can be removed by passing in None. :param light_sensor: An optional instance of a light sensor. :return: This instance of a wrapped redvox packet.

Expand source code
def set_light_sensor(self, light_sensor: typing.Optional[_light_sensor.LightSensor]) -> 'WrappedRedvoxPacket':
    """
    Sets this packet's light sensor. A channel can be removed by passing in None.
    :param light_sensor: An optional instance of a light sensor.
    :return: This instance of a wrapped redvox packet.
    """
    if self.has_light_sensor():
        self._delete_channel(api900_pb2.LIGHT)

    if light_sensor is not None:
        # pylint: disable=W0212
        self._add_channel(light_sensor._unevenly_sampled_channel)

    return self
def set_location_sensor(self, location_sensor: Optional[LocationSensor]) ‑> WrappedRedvoxPacket

Sets this packet's location sensor. A channel can be removed by passing in None. :param location_sensor: An optional instance of a location sensor. :return: This instance of a wrapped redvox packet.

Expand source code
def set_location_sensor(self,
                        location_sensor: typing.Optional[_location_sensor.LocationSensor]) -> 'WrappedRedvoxPacket':
    """
    Sets this packet's location sensor. A channel can be removed by passing in None.
    :param location_sensor: An optional instance of a location sensor.
    :return: This instance of a wrapped redvox packet.
    """
    if self.has_location_sensor():
        self._delete_channel(api900_pb2.LATITUDE)

    if location_sensor is not None:
        # pylint: disable=W0212
        self._add_channel(location_sensor._unevenly_sampled_channel)

    return self
def set_mach_time_zero(self, mach_time_zero: int) ‑> WrappedRedvoxPacket

Sets the mach time zero in the packet's metadata. :param mach_time_zero: The mach time zero to use. :return: An instance of this wrapped packet.

Expand source code
def set_mach_time_zero(self, mach_time_zero: int) -> 'WrappedRedvoxPacket':
    """
    Sets the mach time zero in the packet's metadata.
    :param mach_time_zero: The mach time zero to use.
    :return: An instance of this wrapped packet.
    """
    return self.add_metadata("machTimeZero", str(mach_time_zero))
def set_magnetometer_sensor(self, magnetometer_sensor: Optional[MagnetometerSensor]) ‑> WrappedRedvoxPacket

Sets this packet's magnetomer sensor. A channel can be removed by passing in None. :param magnetometer_sensor: An optional instance of a magnetometer sensor. :return: This instance of a wrapped redvox packet.

Expand source code
def set_magnetometer_sensor(self,
                            magnetometer_sensor: typing.Optional[
                                _magnetometer_sensor.MagnetometerSensor]) -> 'WrappedRedvoxPacket':
    """
    Sets this packet's magnetomer sensor. A channel can be removed by passing in None.
    :param magnetometer_sensor: An optional instance of a magnetometer sensor.
    :return: This instance of a wrapped redvox packet.
    """
    if self.has_magnetometer_sensor():
        self._delete_channel(api900_pb2.MAGNETOMETER_X)

    if magnetometer_sensor is not None:
        # pylint: disable=W0212
        self._add_channel(magnetometer_sensor._unevenly_sampled_channel)

    return self
def set_metadata(self, data: List[str]) ‑> WrappedRedvoxPacket

sets the metadata :param data: metadata as list of strings

Expand source code
def set_metadata(self, data: typing.List[str]) -> 'WrappedRedvoxPacket':
    """
    sets the metadata
    :param data: metadata as list of strings
    """
    self._metadata_list = data
    self._redvox_packet.metadata[:] = data
    return self
def set_metadata_as_dict(self, metadata_dict: Dict[str, str]) ‑> WrappedRedvoxPacket

Sets the metadata using a dictionary. :param metadata_dict: Dictionary of metadata. :return: This WrappedRedvoxPacket.

Expand source code
def set_metadata_as_dict(self, metadata_dict: typing.Dict[str, str]) -> 'WrappedRedvoxPacket':
    """
    Sets the metadata using a dictionary.
    :param metadata_dict: Dictionary of metadata.
    :return: This WrappedRedvoxPacket.
    """
    self.set_metadata(reader_utils.metadata_dict_to_list(metadata_dict))
    return self
def set_microphone_sensor(self, microphone_sensor: Optional[MicrophoneSensor]) ‑> WrappedRedvoxPacket

Sets this packets microphone sensor. A channel can be removed by passing in None. :param microphone_sensor: An optional instance of a microphone sensor. :return: This instance of a wrapped redvox packet.

Expand source code
def set_microphone_sensor(self, microphone_sensor: typing.Optional[
        _microphone_sensor.MicrophoneSensor]) -> 'WrappedRedvoxPacket':
    """
    Sets this packets microphone sensor. A channel can be removed by passing in None.
    :param microphone_sensor: An optional instance of a microphone sensor.
    :return: This instance of a wrapped redvox packet.
    """
    if self.has_microphone_sensor():
        self._delete_channel(api900_pb2.MICROPHONE)

    if microphone_sensor is not None:
        # pylint: disable=W0212
        self._add_channel(microphone_sensor._evenly_sampled_channel)

    return self
def set_redvox_id(self, rid: str) ‑> WrappedRedvoxPacket

sets the redvox id :param rid: redvox id string

Expand source code
def set_redvox_id(self, rid: str) -> 'WrappedRedvoxPacket':
    """
    sets the redvox id
    :param rid: redvox id string
    """
    self._redvox_packet.redvox_id = rid
    return self
def set_server_timestamp_epoch_microseconds_utc(self, time: int) ‑> WrappedRedvoxPacket

sets the server timestamp when the packet was received :param time: time when packet was received by server

Expand source code
def set_server_timestamp_epoch_microseconds_utc(self, time: int) -> 'WrappedRedvoxPacket':
    """
    sets the server timestamp when the packet was received
    :param time: time when packet was received by server
    """
    self._redvox_packet.server_timestamp_epoch_microseconds_utc = time
    return self
def set_time_synchronization_sensor(self, time_synchronization_sensor: Optional[TimeSynchronizationSensor]) ‑> WrappedRedvoxPacket

Sets this packet's time sync sensor. A channel can be removed by passing in None. :param time_synchronization_sensor: An optional instance of a time sync sensor. :return: This instance of a wrapped redvox packet.

Expand source code
def set_time_synchronization_sensor(self, time_synchronization_sensor: typing.Optional[
        _time_synchronization_sensor.TimeSynchronizationSensor]) -> 'WrappedRedvoxPacket':
    """
    Sets this packet's time sync sensor. A channel can be removed by passing in None.
    :param time_synchronization_sensor: An optional instance of a time sync sensor.
    :return: This instance of a wrapped redvox packet.
    """
    if self.has_time_synchronization_sensor():
        self._delete_channel(api900_pb2.TIME_SYNCHRONIZATION)

    if time_synchronization_sensor is not None:
        # pylint: disable=W0212
        self._add_channel(time_synchronization_sensor._unevenly_sampled_channel)

    return self
def set_time_synchronization_server(self, server: str) ‑> WrappedRedvoxPacket

sets the time synchronization server url :param server: url to time synchronization server

Expand source code
def set_time_synchronization_server(self, server: str) -> 'WrappedRedvoxPacket':
    """
    sets the time synchronization server url
    :param server: url to time synchronization server
    """
    self._redvox_packet.time_synchronization_server = server
    return self
def set_uuid(self, uid: str) ‑> WrappedRedvoxPacket

sets the uuid :param uid: uuid string

Expand source code
def set_uuid(self, uid: str) -> 'WrappedRedvoxPacket':
    """
    sets the uuid
    :param uid: uuid string
    """
    self._redvox_packet.uuid = uid
    return self
def start_timestamp_us_utc(self) ‑> int

Returns the start timestamp of a WrappedRedvoxPacket. :return: The start timestamp of a WrappedRedvoxPacket.

Expand source code
def start_timestamp_us_utc(self) -> int:
    """
    Returns the start timestamp of a WrappedRedvoxPacket.
    :return: The start timestamp of a WrappedRedvoxPacket.
    """
    return migrations.maybe_get_float(
        self.microphone_sensor().first_sample_timestamp_epoch_microseconds_utc())
def time_synchronization_sensor(self) ‑> Optional[TimeSynchronizationSensor]

Returns the high-level time synchronization channel API or None if this packet doesn't contain a channel of this type. :return: the high-level time synchronization channel API or None if this packet doesn't contain a channel of this type.

Expand source code
def time_synchronization_sensor(self) -> typing.Optional[_time_synchronization_sensor.TimeSynchronizationSensor]:
    """
    Returns the high-level time synchronization channel API or None if this packet doesn't contain a channel of
    this type.
    :return: the high-level time synchronization channel API or None if this packet doesn't contain a channel of
    this type.
    """
    if self.has_time_synchronization_sensor():
        return _time_synchronization_sensor.TimeSynchronizationSensor(
            self._get_channel(api900_pb2.TIME_SYNCHRONIZATION))

    return None
def time_synchronization_server(self) ‑> str
Expand source code
def time_synchronization_server(self) -> str:
    """
    See https://bitbucket.org/redvoxhi/redvox-data-apis/src/master/src/api900/api900.proto?at=master for a
    description of this field.
    """
    return self._redvox_packet.time_synchronization_server
def to_json(self) ‑> str

Converts the protobuf packet stored in this wrapped packet to JSON. :return: The JSON representation of the protobuf encoded packet.

Expand source code
def to_json(self) -> str:
    """
    Converts the protobuf packet stored in this wrapped packet to JSON.
    :return: The JSON representation of the protobuf encoded packet.
    """
    return reader_utils.to_json(self._redvox_packet)
def update_uneven_sensor_timestamps(self, time_delta: int)

Given a time delta in microseconds, will adjust all unevenly sampled sensor timestamps by that amount. Use negative values to adjust backwards in time. :param time_delta: amount of time to adjust timestamps in microseconds

Expand source code
def update_uneven_sensor_timestamps(self, time_delta: int or float):
    """
    Given a time delta in microseconds, will adjust all unevenly sampled sensor timestamps by that amount.
    Use negative values to adjust backwards in time.
    :param time_delta: amount of time to adjust timestamps in microseconds
    """
    for channel in self._unevenly_sampled_channels_field:
        if not channel.has_channel(api900_pb2.TIME_SYNCHRONIZATION):
            channel.set_timestamps_microseconds_utc(channel.timestamps_microseconds_utc + time_delta)
def uuid(self) ‑> str
Expand source code
def uuid(self) -> str:
    """
    See https://bitbucket.org/redvoxhi/redvox-data-apis/src/master/src/api900/api900.proto?at=master for a
    description of this field.
    """
    return self._redvox_packet.uuid
def write_json(self, directory: str, filename: Optional[str] = None)

Writes a RedVox compliant .json file to the specified directory. :param directory: The directory to write the file to. :param filename: An optional filename (the default filename will be used if one is not provided).

Expand source code
def write_json(self, directory: str, filename: typing.Optional[str] = None):
    """
    Writes a RedVox compliant .json file to the specified directory.
    :param directory: The directory to write the file to.
    :param filename: An optional filename (the default filename will be used if one is not provided).
    """
    filename = self.default_filename(extension="json") if filename is None else filename
    path = os.path.join(directory, filename)
    with open(path, "w") as json_out:
        json_out.write(self.to_json())
def write_rdvxz(self, directory: str, filename: Optional[str] = None)

Writes a compressed .rdvxz file to the specified directory. :param directory: The directory to write the file to. :param filename: An optional filename (the default filename will be used if one is not provided).

Expand source code
def write_rdvxz(self, directory: str, filename: typing.Optional[str] = None):
    """
    Writes a compressed .rdvxz file to the specified directory.
    :param directory: The directory to write the file to.
    :param filename: An optional filename (the default filename will be used if one is not provided).
    """
    filename = self.default_filename() if filename is None else filename
    path = os.path.join(directory, filename)
    with open(path, "wb") as rdvxz_out:
        rdvxz_out.write(self.compressed_buffer())