Module redvox.api1000.wrapped_redvox_packet.wrapped_packet
This module provides a high level API for creating, reading, and editing RedVox compliant API 1000 files.
Expand source code
"""
This module provides a high level API for creating, reading, and editing RedVox compliant API 1000 files.
"""
from datetime import datetime, timedelta
import os.path
from functools import total_ordering
from typing import Optional, List
# noinspection PyPackageRequirements
from google.protobuf import json_format
import lz4.frame
from redvox.api1000.common.common import check_type
import redvox.api1000.common.typing
import redvox.api1000.errors as errors
import redvox.api1000.wrapped_redvox_packet.sensors.sensors as _sensors
import redvox.api1000.wrapped_redvox_packet.station_information as _station_information
import redvox.api1000.wrapped_redvox_packet.timing_information as _timing_information
import redvox.common.date_time_utils as dt_utils
from redvox.api1000.common.generic import ProtoBase, ProtoRepeatedMessage
from redvox.api1000.proto.redvox_api_m_pb2 import RedvoxPacketM
from redvox.api1000.wrapped_redvox_packet.event_streams import EventStream
@total_ordering
class WrappedRedvoxPacketM(ProtoBase[RedvoxPacketM]):
"""
Wraps a RedVox API M protobuf buffer.
"""
def __init__(self, redvox_proto: RedvoxPacketM):
super().__init__(redvox_proto)
self._station_information: _station_information.StationInformation = (
_station_information.StationInformation(redvox_proto.station_information)
)
self._timing_information: _timing_information.TimingInformation = (
_timing_information.TimingInformation(redvox_proto.timing_information)
)
self._sensors: _sensors.Sensors = _sensors.Sensors(redvox_proto.sensors)
self._event_streams: ProtoRepeatedMessage = ProtoRepeatedMessage(
redvox_proto,
redvox_proto.event_streams,
"event_streams",
EventStream,
lambda event_stream: event_stream.get_proto(),
)
# Implement methods required for total_ordering
def __eq__(self, other) -> bool:
"""
Tests if this packet is equal in time to another packet by comparing start mach timestamps.
:param other: Other packet to compare against
:return: True if the packets mach timestamps match, False otherwise
"""
self_ts: float = self.get_timing_information().get_packet_start_mach_timestamp()
other_ts: float = (
other.get_timing_information().get_packet_start_mach_timestamp()
)
return self_ts == other_ts
def __lt__(self, other: "WrappedRedvoxPacketM") -> bool:
"""
Checks if this packet is less than another packet by comparing start mach times.
:param other: Other packet to compare against.
:return: True if this packet is less than the other packet
"""
self_ts: float = self.get_timing_information().get_packet_start_mach_timestamp()
other_ts: float = (
other.get_timing_information().get_packet_start_mach_timestamp()
)
return self_ts < other_ts
@staticmethod
def new() -> "WrappedRedvoxPacketM":
"""
Returns a new default instance of a WrappedRedvoxPacketApi1000.
:return: A new default instance of a WrappedRedvoxPacketApi1000.
"""
return WrappedRedvoxPacketM(RedvoxPacketM())
@staticmethod
def from_compressed_bytes(data: bytes) -> "WrappedRedvoxPacketM":
"""
Deserializes the byte content of an API M encoded .rdvxz file.
:param data: The compressed bytes to deserialize.
:return: An instance of a WrappedRedvoxPacketAPi1000.
"""
redvox.api1000.common.typing.check_type(data, [bytes])
proto: RedvoxPacketM = RedvoxPacketM()
proto.ParseFromString(lz4.frame.decompress(data, False))
return WrappedRedvoxPacketM(proto)
@staticmethod
def from_compressed_path(rdvxm_path: str) -> "WrappedRedvoxPacketM":
"""
Deserialize an API M encoded .rdvxm file from the specified file system path.
:param rdvxm_path: Path to the API M encoded file.
:return: An instance of a WrappedRedvoxPacketApiM.
"""
redvox.api1000.common.typing.check_type(rdvxm_path, [str])
if not os.path.isfile(rdvxm_path):
raise errors.WrappedRedvoxPacketMError(
f"Path to file={rdvxm_path} does not exist."
)
with lz4.frame.open(rdvxm_path, "rb") as serialized_in:
proto: RedvoxPacketM = RedvoxPacketM()
proto.ParseFromString(serialized_in.read())
return WrappedRedvoxPacketM(proto)
@staticmethod
def from_json(json_str: str) -> "WrappedRedvoxPacketM":
"""
read json packet representing an API 1000 packet
:param json_str: contains the json representing the packet
:return: An instance of a WrappedRedvoxPacketM
"""
return WrappedRedvoxPacketM(json_format.Parse(json_str, RedvoxPacketM()))
@staticmethod
def from_json_path(json_path: str) -> "WrappedRedvoxPacketM":
"""
read json from a file representing an api 1000 packet
:param json_path: the path to the file to read
:return: wrapped redvox packet api 1000
"""
with open(json_path, "r") as json_in:
return WrappedRedvoxPacketM.from_json(json_in.read())
def default_filename(self, extension: Optional[str] = "rdvxm") -> str:
"""
Returns the default filename for a given packet.
:param extension: An (optional) file extension to add to the default file name.
:return: The default filename for this packet.
"""
# Format to be exactly 10 characters
station_id: str = f"{self.get_station_information().get_id():0>10}"
timestamp: int = round(
self.get_timing_information().get_packet_start_mach_timestamp()
)
filename: str = f"{station_id}_{timestamp}"
if extension is not None:
filename = f"{filename}.{extension}"
return filename
def default_file_dir(self) -> str:
"""
Computes the default file directory structure for a structured layout for this particular packet.
:return: Directory structure for this packet when using structured layout
"""
timestamp: float = (
self.get_timing_information().get_packet_start_mach_timestamp()
)
date_time: datetime = dt_utils.datetime_from_epoch_microseconds_utc(timestamp)
year: str = f"{date_time.year:0>4}"
month: str = f"{date_time.month:0>2}"
day: str = f"{date_time.day:0>2}"
hour: str = f"{date_time.hour:0>2}"
return os.path.join(year, month, day, hour)
def default_file_path(self) -> str:
"""
Computes the default directory structure and file name for this packet for structured layouts.
:return: The default directory structure and file name for this packet for structured layouts.
"""
return os.path.join(self.default_file_dir(), self.default_filename())
def write_compressed_to_file(
self, base_dir: str, filename: Optional[str] = None
) -> str:
"""
Writes this packet to a .rdvxm file.
:param base_dir: Directory to write .rdvxm to.
:param filename: The (optional) file name to use. Will use default file name otherwise.
:return: Path of written file.
"""
if filename is None:
filename = self.default_filename("rdvxm")
if not os.path.isdir(base_dir):
raise errors.WrappedRedvoxPacketMError(
f"Base directory={base_dir} does not exist."
)
out_path: str = os.path.join(base_dir, filename)
with open(out_path, "wb") as compressed_out:
compressed_out.write(self.as_compressed_bytes())
return out_path
def write_json_to_file(self, base_dir: str, filename: Optional[str] = None) -> str:
"""
Writes this packet to a .json file.
:param base_dir: Directory to write .json to.
:param filename: The (optional) file name to use. Will use default file name otherwise.
:return: Path of written file.
"""
if filename is None:
filename = self.default_filename("json")
if not os.path.isdir(base_dir):
raise errors.WrappedRedvoxPacketMError(
f"Base directory={base_dir} does not exist."
)
out_path: str = os.path.join(base_dir, filename)
with open(out_path, "w") as json_out:
json_out.write(self.as_json())
return out_path
def validate(self) -> List[str]:
"""
Validates this packet.
:return: A list of validation errors.
"""
return validate_wrapped_packet(self)
# Top-level packet fields
def get_api(self) -> float:
"""
Returns the API version of this packet.
:return: The API version of this packet.
"""
return self._proto.api
def set_api(self, api: float) -> "WrappedRedvoxPacketM":
"""
Sets the api version of this packet.
:param api: The API version (should be 1000.0)
:return: The modified instance of this packet
"""
redvox.api1000.common.typing.check_type(api, [int, float])
self._proto.api = api
return self
def get_sub_api(self) -> float:
"""
Returns the sub_api version. This version tracks small changes within API 1000.
:return: The sub_api version. This version tracks small changes within API 1000.
"""
return self._proto.sub_api
def set_sub_api(self, sub_api: float) -> "WrappedRedvoxPacketM":
"""
Sets the sub_api.
:param sub_api: sub_api to set.
:return: A modified instance of self.
"""
redvox.api1000.common.typing.check_type(sub_api, [int, float])
self._proto.sub_api = sub_api
return self
def get_station_information(self) -> _station_information.StationInformation:
"""
:return: An instance of StationInformation
"""
return self._station_information
def set_station_information(
self, station_information: _station_information.StationInformation
) -> "WrappedRedvoxPacketM":
"""
Sets the StationInformation.
:param station_information: StationInformation to set.
:return: A modified instance of self.
"""
check_type(station_information, [_station_information.StationInformation])
self.get_proto().station_information.CopyFrom(station_information.get_proto())
self._station_information = _station_information.StationInformation(
self.get_proto().station_information
)
return self
def get_timing_information(self) -> _timing_information.TimingInformation:
"""
:return: An instance of TimingInformation
"""
return self._timing_information
def set_timing_information(
self, timing_information: _timing_information.TimingInformation
) -> "WrappedRedvoxPacketM":
"""
Sets the timing information.
:param timing_information: TimingInformation to set.
:return: A modified instance of self.
"""
check_type(timing_information, [_timing_information.TimingInformation])
self.get_proto().timing_information.CopyFrom(timing_information.get_proto())
self._timing_information = _timing_information.TimingInformation(
self.get_proto().timing_information
)
return self
def get_sensors(self) -> _sensors.Sensors:
"""
:return: An instance of Sensors
"""
return self._sensors
def set_sensors(self, sensors: _sensors.Sensors) -> "WrappedRedvoxPacketM":
"""
Sets the sensors.
:param sensors: Sensors to set.
:return: A modified instance of self.
"""
check_type(sensors, [_sensors.Sensors])
self.get_proto().sensors.CopyFrom(sensors.get_proto())
self._sensors = _sensors.Sensors(self.get_proto().sensors)
return self
def get_event_streams(self) -> ProtoRepeatedMessage:
"""
:return: A collection of event streams.
"""
return self._event_streams
def set_event_streams(
self, event_streams: ProtoRepeatedMessage
) -> "WrappedRedvoxPacketM":
"""
Set the event streams from the provided ProtoRepeatedMessage.
:param event_streams: EventStreams embedded in a ProtoRepeatedMessage.
:return: A modified instance of self.
"""
check_type(event_streams, [ProtoRepeatedMessage])
self._event_streams.clear_values()
self._event_streams.append_values(event_streams.get_values())
return self
# todo: add packet_duration calculations that don't rely on sensors existing if possible
def get_packet_duration(self) -> timedelta:
"""
:return: Packet duration as a timedelta
"""
audio = self.get_sensors().get_audio()
if audio is not None:
return audio.get_duration()
else:
return timedelta(seconds=0)
def get_packet_duration_s(self) -> float:
"""
get the packet duration in seconds from the audio data
:return: packet duration in seconds
"""
if self.get_sensors().has_audio():
return self.get_sensors().get_audio().get_duration_s()
else:
return 0.0
def update_timestamps(self, delta_offset: float = None) -> "WrappedRedvoxPacketM":
"""
update all timestamps in the packet by adding the delta offset
:param delta_offset: amount of microseconds to add to existing timestamps
:return: WrappedRedvoxPacketM with updated timestamps
"""
if delta_offset is None:
delta_offset = self.get_timing_information().get_best_offset()
redvox.api1000.common.typing.check_type(delta_offset, [float])
# create new wrapped packet to hold changed data
updated = WrappedRedvoxPacketM(self.get_proto())
if self.get_proto().HasField("timing_information"):
# update timing information timestamps
updated.get_timing_information().set_packet_start_mach_timestamp(
self.get_timing_information().get_packet_start_mach_timestamp()
+ delta_offset
)
updated.get_timing_information().set_packet_end_mach_timestamp(
self.get_timing_information().get_packet_end_mach_timestamp()
+ delta_offset
)
updated.get_timing_information().set_packet_start_os_timestamp(
self.get_timing_information().get_packet_start_os_timestamp()
+ delta_offset
)
updated.get_timing_information().set_packet_end_os_timestamp(
self.get_timing_information().get_packet_end_os_timestamp()
+ delta_offset
)
updated.get_timing_information().set_app_start_mach_timestamp(
self.get_timing_information().get_app_start_mach_timestamp()
+ delta_offset
)
if self.get_sensors().get_proto().HasField("audio"):
# update audio first sample timestamp
updated.get_sensors().get_audio().set_first_sample_timestamp(
self.get_sensors().get_audio().get_first_sample_timestamp()
+ delta_offset
)
# todo if self.get_sensors().get_proto().HasField("compressed_audio"):
# update compressed audio first sample timestamp
# update timestamp payloads
if (
self.get_station_information()
.get_station_metrics()
.get_proto()
.HasField("timestamps")
):
updated.get_station_information().get_station_metrics().get_timestamps().set_timestamps(
self.get_station_information()
.get_station_metrics()
.get_timestamps()
.get_timestamps()
+ delta_offset,
True,
)
if self.get_sensors().get_accelerometer().get_proto().HasField("timestamps"):
updated.get_sensors().get_accelerometer().get_timestamps().set_timestamps(
self.get_sensors().get_accelerometer().get_timestamps().get_timestamps()
+ delta_offset,
True,
)
if (
self.get_sensors()
.get_ambient_temperature()
.get_proto()
.HasField("timestamps")
):
updated.get_sensors().get_ambient_temperature().get_timestamps().set_timestamps(
self.get_sensors()
.get_ambient_temperature()
.get_timestamps()
.get_timestamps()
+ delta_offset,
True,
)
if self.get_sensors().get_gravity().get_proto().HasField("timestamps"):
updated.get_sensors().get_gravity().get_timestamps().set_timestamps(
self.get_sensors().get_gravity().get_timestamps().get_timestamps()
+ delta_offset,
True,
)
if self.get_sensors().get_gyroscope().get_proto().HasField("timestamps"):
updated.get_sensors().get_gyroscope().get_timestamps().set_timestamps(
self.get_sensors().get_gyroscope().get_timestamps().get_timestamps()
+ delta_offset,
True,
)
if self.get_sensors().get_image().get_proto().HasField("timestamps"):
updated.get_sensors().get_image().get_timestamps().set_timestamps(
self.get_sensors().get_image().get_timestamps().get_timestamps()
+ delta_offset,
True,
)
if self.get_sensors().get_light().get_proto().HasField("timestamps"):
updated.get_sensors().get_light().get_timestamps().set_timestamps(
self.get_sensors().get_light().get_timestamps().get_timestamps()
+ delta_offset,
True,
)
if (
self.get_sensors()
.get_linear_acceleration()
.get_proto()
.HasField("timestamps")
):
updated.get_sensors().get_linear_acceleration().get_timestamps().set_timestamps(
self.get_sensors()
.get_linear_acceleration()
.get_timestamps()
.get_timestamps()
+ delta_offset,
True,
)
if self.get_sensors().get_location().get_proto().HasField("timestamps"):
updated.get_sensors().get_location().get_timestamps().set_timestamps(
self.get_sensors().get_location().get_timestamps().get_timestamps()
+ delta_offset,
True,
)
if self.get_sensors().get_magnetometer().get_proto().HasField("timestamps"):
updated.get_sensors().get_magnetometer().get_timestamps().set_timestamps(
self.get_sensors().get_magnetometer().get_timestamps().get_timestamps()
+ delta_offset,
True,
)
if self.get_sensors().get_orientation().get_proto().HasField("timestamps"):
updated.get_sensors().get_orientation().get_timestamps().set_timestamps(
self.get_sensors().get_orientation().get_timestamps().get_timestamps()
+ delta_offset,
True,
)
if self.get_sensors().get_pressure().get_proto().HasField("timestamps"):
updated.get_sensors().get_pressure().get_timestamps().set_timestamps(
self.get_sensors().get_pressure().get_timestamps().get_timestamps()
+ delta_offset,
True,
)
if self.get_sensors().get_proximity().get_proto().HasField("timestamps"):
updated.get_sensors().get_proximity().get_timestamps().set_timestamps(
self.get_sensors().get_proximity().get_timestamps().get_timestamps()
+ delta_offset,
True,
)
if (
self.get_sensors()
.get_relative_humidity()
.get_proto()
.HasField("timestamps")
):
updated.get_sensors().get_relative_humidity().get_timestamps().set_timestamps(
self.get_sensors()
.get_relative_humidity()
.get_timestamps()
.get_timestamps()
+ delta_offset,
True,
)
if self.get_sensors().get_rotation_vector().get_proto().HasField("timestamps"):
updated.get_sensors().get_rotation_vector().get_timestamps().set_timestamps(
self.get_sensors()
.get_rotation_vector()
.get_timestamps()
.get_timestamps()
+ delta_offset,
True,
)
return updated
def validate_wrapped_packet(wrapped_packet: WrappedRedvoxPacketM) -> List[str]:
"""
Validates a wrapped packet.
:param wrapped_packet: Packet to validate.
:return: A list of validation errors.
"""
errors_list = _station_information.validate_station_information(
wrapped_packet.get_station_information()
)
errors_list.extend(
_timing_information.validate_timing_information(
wrapped_packet.get_timing_information()
)
)
errors_list.extend(_sensors.validate_sensors(wrapped_packet.get_sensors()))
if wrapped_packet.get_api() != 1000:
errors_list.append("Wrapped packet api is not 1000")
return errors_list
Functions
def validate_wrapped_packet(wrapped_packet: WrappedRedvoxPacketM) ‑> List[str]
-
Validates a wrapped packet. :param wrapped_packet: Packet to validate. :return: A list of validation errors.
Expand source code
def validate_wrapped_packet(wrapped_packet: WrappedRedvoxPacketM) -> List[str]: """ Validates a wrapped packet. :param wrapped_packet: Packet to validate. :return: A list of validation errors. """ errors_list = _station_information.validate_station_information( wrapped_packet.get_station_information() ) errors_list.extend( _timing_information.validate_timing_information( wrapped_packet.get_timing_information() ) ) errors_list.extend(_sensors.validate_sensors(wrapped_packet.get_sensors())) if wrapped_packet.get_api() != 1000: errors_list.append("Wrapped packet api is not 1000") return errors_list
Classes
class RedvoxPacketM (*args, **kwargs)
-
A ProtocolMessage
Ancestors
- google._upb._message.Message
- google.protobuf.message.Message
Class variables
var DESCRIPTOR
var DoubleSamplePayload
-
A ProtocolMessage
var EventStream
-
A ProtocolMessage
var MetadataEntry
-
A ProtocolMessage
var SamplePayload
-
A ProtocolMessage
var Sensors
-
A ProtocolMessage
var StationInformation
-
A ProtocolMessage
var SummaryStatistics
-
A ProtocolMessage
var TimingInformation
-
A ProtocolMessage
var TimingPayload
-
A ProtocolMessage
var Unit
class WrappedRedvoxPacketM (redvox_proto: src.redvox_api_m.redvox_api_m_pb2.RedvoxPacketM)
-
Wraps a RedVox API M protobuf buffer.
Expand source code
@total_ordering class WrappedRedvoxPacketM(ProtoBase[RedvoxPacketM]): """ Wraps a RedVox API M protobuf buffer. """ def __init__(self, redvox_proto: RedvoxPacketM): super().__init__(redvox_proto) self._station_information: _station_information.StationInformation = ( _station_information.StationInformation(redvox_proto.station_information) ) self._timing_information: _timing_information.TimingInformation = ( _timing_information.TimingInformation(redvox_proto.timing_information) ) self._sensors: _sensors.Sensors = _sensors.Sensors(redvox_proto.sensors) self._event_streams: ProtoRepeatedMessage = ProtoRepeatedMessage( redvox_proto, redvox_proto.event_streams, "event_streams", EventStream, lambda event_stream: event_stream.get_proto(), ) # Implement methods required for total_ordering def __eq__(self, other) -> bool: """ Tests if this packet is equal in time to another packet by comparing start mach timestamps. :param other: Other packet to compare against :return: True if the packets mach timestamps match, False otherwise """ self_ts: float = self.get_timing_information().get_packet_start_mach_timestamp() other_ts: float = ( other.get_timing_information().get_packet_start_mach_timestamp() ) return self_ts == other_ts def __lt__(self, other: "WrappedRedvoxPacketM") -> bool: """ Checks if this packet is less than another packet by comparing start mach times. :param other: Other packet to compare against. :return: True if this packet is less than the other packet """ self_ts: float = self.get_timing_information().get_packet_start_mach_timestamp() other_ts: float = ( other.get_timing_information().get_packet_start_mach_timestamp() ) return self_ts < other_ts @staticmethod def new() -> "WrappedRedvoxPacketM": """ Returns a new default instance of a WrappedRedvoxPacketApi1000. :return: A new default instance of a WrappedRedvoxPacketApi1000. """ return WrappedRedvoxPacketM(RedvoxPacketM()) @staticmethod def from_compressed_bytes(data: bytes) -> "WrappedRedvoxPacketM": """ Deserializes the byte content of an API M encoded .rdvxz file. :param data: The compressed bytes to deserialize. :return: An instance of a WrappedRedvoxPacketAPi1000. """ redvox.api1000.common.typing.check_type(data, [bytes]) proto: RedvoxPacketM = RedvoxPacketM() proto.ParseFromString(lz4.frame.decompress(data, False)) return WrappedRedvoxPacketM(proto) @staticmethod def from_compressed_path(rdvxm_path: str) -> "WrappedRedvoxPacketM": """ Deserialize an API M encoded .rdvxm file from the specified file system path. :param rdvxm_path: Path to the API M encoded file. :return: An instance of a WrappedRedvoxPacketApiM. """ redvox.api1000.common.typing.check_type(rdvxm_path, [str]) if not os.path.isfile(rdvxm_path): raise errors.WrappedRedvoxPacketMError( f"Path to file={rdvxm_path} does not exist." ) with lz4.frame.open(rdvxm_path, "rb") as serialized_in: proto: RedvoxPacketM = RedvoxPacketM() proto.ParseFromString(serialized_in.read()) return WrappedRedvoxPacketM(proto) @staticmethod def from_json(json_str: str) -> "WrappedRedvoxPacketM": """ read json packet representing an API 1000 packet :param json_str: contains the json representing the packet :return: An instance of a WrappedRedvoxPacketM """ return WrappedRedvoxPacketM(json_format.Parse(json_str, RedvoxPacketM())) @staticmethod def from_json_path(json_path: str) -> "WrappedRedvoxPacketM": """ read json from a file representing an api 1000 packet :param json_path: the path to the file to read :return: wrapped redvox packet api 1000 """ with open(json_path, "r") as json_in: return WrappedRedvoxPacketM.from_json(json_in.read()) def default_filename(self, extension: Optional[str] = "rdvxm") -> str: """ Returns the default filename for a given packet. :param extension: An (optional) file extension to add to the default file name. :return: The default filename for this packet. """ # Format to be exactly 10 characters station_id: str = f"{self.get_station_information().get_id():0>10}" timestamp: int = round( self.get_timing_information().get_packet_start_mach_timestamp() ) filename: str = f"{station_id}_{timestamp}" if extension is not None: filename = f"{filename}.{extension}" return filename def default_file_dir(self) -> str: """ Computes the default file directory structure for a structured layout for this particular packet. :return: Directory structure for this packet when using structured layout """ timestamp: float = ( self.get_timing_information().get_packet_start_mach_timestamp() ) date_time: datetime = dt_utils.datetime_from_epoch_microseconds_utc(timestamp) year: str = f"{date_time.year:0>4}" month: str = f"{date_time.month:0>2}" day: str = f"{date_time.day:0>2}" hour: str = f"{date_time.hour:0>2}" return os.path.join(year, month, day, hour) def default_file_path(self) -> str: """ Computes the default directory structure and file name for this packet for structured layouts. :return: The default directory structure and file name for this packet for structured layouts. """ return os.path.join(self.default_file_dir(), self.default_filename()) def write_compressed_to_file( self, base_dir: str, filename: Optional[str] = None ) -> str: """ Writes this packet to a .rdvxm file. :param base_dir: Directory to write .rdvxm to. :param filename: The (optional) file name to use. Will use default file name otherwise. :return: Path of written file. """ if filename is None: filename = self.default_filename("rdvxm") if not os.path.isdir(base_dir): raise errors.WrappedRedvoxPacketMError( f"Base directory={base_dir} does not exist." ) out_path: str = os.path.join(base_dir, filename) with open(out_path, "wb") as compressed_out: compressed_out.write(self.as_compressed_bytes()) return out_path def write_json_to_file(self, base_dir: str, filename: Optional[str] = None) -> str: """ Writes this packet to a .json file. :param base_dir: Directory to write .json to. :param filename: The (optional) file name to use. Will use default file name otherwise. :return: Path of written file. """ if filename is None: filename = self.default_filename("json") if not os.path.isdir(base_dir): raise errors.WrappedRedvoxPacketMError( f"Base directory={base_dir} does not exist." ) out_path: str = os.path.join(base_dir, filename) with open(out_path, "w") as json_out: json_out.write(self.as_json()) return out_path def validate(self) -> List[str]: """ Validates this packet. :return: A list of validation errors. """ return validate_wrapped_packet(self) # Top-level packet fields def get_api(self) -> float: """ Returns the API version of this packet. :return: The API version of this packet. """ return self._proto.api def set_api(self, api: float) -> "WrappedRedvoxPacketM": """ Sets the api version of this packet. :param api: The API version (should be 1000.0) :return: The modified instance of this packet """ redvox.api1000.common.typing.check_type(api, [int, float]) self._proto.api = api return self def get_sub_api(self) -> float: """ Returns the sub_api version. This version tracks small changes within API 1000. :return: The sub_api version. This version tracks small changes within API 1000. """ return self._proto.sub_api def set_sub_api(self, sub_api: float) -> "WrappedRedvoxPacketM": """ Sets the sub_api. :param sub_api: sub_api to set. :return: A modified instance of self. """ redvox.api1000.common.typing.check_type(sub_api, [int, float]) self._proto.sub_api = sub_api return self def get_station_information(self) -> _station_information.StationInformation: """ :return: An instance of StationInformation """ return self._station_information def set_station_information( self, station_information: _station_information.StationInformation ) -> "WrappedRedvoxPacketM": """ Sets the StationInformation. :param station_information: StationInformation to set. :return: A modified instance of self. """ check_type(station_information, [_station_information.StationInformation]) self.get_proto().station_information.CopyFrom(station_information.get_proto()) self._station_information = _station_information.StationInformation( self.get_proto().station_information ) return self def get_timing_information(self) -> _timing_information.TimingInformation: """ :return: An instance of TimingInformation """ return self._timing_information def set_timing_information( self, timing_information: _timing_information.TimingInformation ) -> "WrappedRedvoxPacketM": """ Sets the timing information. :param timing_information: TimingInformation to set. :return: A modified instance of self. """ check_type(timing_information, [_timing_information.TimingInformation]) self.get_proto().timing_information.CopyFrom(timing_information.get_proto()) self._timing_information = _timing_information.TimingInformation( self.get_proto().timing_information ) return self def get_sensors(self) -> _sensors.Sensors: """ :return: An instance of Sensors """ return self._sensors def set_sensors(self, sensors: _sensors.Sensors) -> "WrappedRedvoxPacketM": """ Sets the sensors. :param sensors: Sensors to set. :return: A modified instance of self. """ check_type(sensors, [_sensors.Sensors]) self.get_proto().sensors.CopyFrom(sensors.get_proto()) self._sensors = _sensors.Sensors(self.get_proto().sensors) return self def get_event_streams(self) -> ProtoRepeatedMessage: """ :return: A collection of event streams. """ return self._event_streams def set_event_streams( self, event_streams: ProtoRepeatedMessage ) -> "WrappedRedvoxPacketM": """ Set the event streams from the provided ProtoRepeatedMessage. :param event_streams: EventStreams embedded in a ProtoRepeatedMessage. :return: A modified instance of self. """ check_type(event_streams, [ProtoRepeatedMessage]) self._event_streams.clear_values() self._event_streams.append_values(event_streams.get_values()) return self # todo: add packet_duration calculations that don't rely on sensors existing if possible def get_packet_duration(self) -> timedelta: """ :return: Packet duration as a timedelta """ audio = self.get_sensors().get_audio() if audio is not None: return audio.get_duration() else: return timedelta(seconds=0) def get_packet_duration_s(self) -> float: """ get the packet duration in seconds from the audio data :return: packet duration in seconds """ if self.get_sensors().has_audio(): return self.get_sensors().get_audio().get_duration_s() else: return 0.0 def update_timestamps(self, delta_offset: float = None) -> "WrappedRedvoxPacketM": """ update all timestamps in the packet by adding the delta offset :param delta_offset: amount of microseconds to add to existing timestamps :return: WrappedRedvoxPacketM with updated timestamps """ if delta_offset is None: delta_offset = self.get_timing_information().get_best_offset() redvox.api1000.common.typing.check_type(delta_offset, [float]) # create new wrapped packet to hold changed data updated = WrappedRedvoxPacketM(self.get_proto()) if self.get_proto().HasField("timing_information"): # update timing information timestamps updated.get_timing_information().set_packet_start_mach_timestamp( self.get_timing_information().get_packet_start_mach_timestamp() + delta_offset ) updated.get_timing_information().set_packet_end_mach_timestamp( self.get_timing_information().get_packet_end_mach_timestamp() + delta_offset ) updated.get_timing_information().set_packet_start_os_timestamp( self.get_timing_information().get_packet_start_os_timestamp() + delta_offset ) updated.get_timing_information().set_packet_end_os_timestamp( self.get_timing_information().get_packet_end_os_timestamp() + delta_offset ) updated.get_timing_information().set_app_start_mach_timestamp( self.get_timing_information().get_app_start_mach_timestamp() + delta_offset ) if self.get_sensors().get_proto().HasField("audio"): # update audio first sample timestamp updated.get_sensors().get_audio().set_first_sample_timestamp( self.get_sensors().get_audio().get_first_sample_timestamp() + delta_offset ) # todo if self.get_sensors().get_proto().HasField("compressed_audio"): # update compressed audio first sample timestamp # update timestamp payloads if ( self.get_station_information() .get_station_metrics() .get_proto() .HasField("timestamps") ): updated.get_station_information().get_station_metrics().get_timestamps().set_timestamps( self.get_station_information() .get_station_metrics() .get_timestamps() .get_timestamps() + delta_offset, True, ) if self.get_sensors().get_accelerometer().get_proto().HasField("timestamps"): updated.get_sensors().get_accelerometer().get_timestamps().set_timestamps( self.get_sensors().get_accelerometer().get_timestamps().get_timestamps() + delta_offset, True, ) if ( self.get_sensors() .get_ambient_temperature() .get_proto() .HasField("timestamps") ): updated.get_sensors().get_ambient_temperature().get_timestamps().set_timestamps( self.get_sensors() .get_ambient_temperature() .get_timestamps() .get_timestamps() + delta_offset, True, ) if self.get_sensors().get_gravity().get_proto().HasField("timestamps"): updated.get_sensors().get_gravity().get_timestamps().set_timestamps( self.get_sensors().get_gravity().get_timestamps().get_timestamps() + delta_offset, True, ) if self.get_sensors().get_gyroscope().get_proto().HasField("timestamps"): updated.get_sensors().get_gyroscope().get_timestamps().set_timestamps( self.get_sensors().get_gyroscope().get_timestamps().get_timestamps() + delta_offset, True, ) if self.get_sensors().get_image().get_proto().HasField("timestamps"): updated.get_sensors().get_image().get_timestamps().set_timestamps( self.get_sensors().get_image().get_timestamps().get_timestamps() + delta_offset, True, ) if self.get_sensors().get_light().get_proto().HasField("timestamps"): updated.get_sensors().get_light().get_timestamps().set_timestamps( self.get_sensors().get_light().get_timestamps().get_timestamps() + delta_offset, True, ) if ( self.get_sensors() .get_linear_acceleration() .get_proto() .HasField("timestamps") ): updated.get_sensors().get_linear_acceleration().get_timestamps().set_timestamps( self.get_sensors() .get_linear_acceleration() .get_timestamps() .get_timestamps() + delta_offset, True, ) if self.get_sensors().get_location().get_proto().HasField("timestamps"): updated.get_sensors().get_location().get_timestamps().set_timestamps( self.get_sensors().get_location().get_timestamps().get_timestamps() + delta_offset, True, ) if self.get_sensors().get_magnetometer().get_proto().HasField("timestamps"): updated.get_sensors().get_magnetometer().get_timestamps().set_timestamps( self.get_sensors().get_magnetometer().get_timestamps().get_timestamps() + delta_offset, True, ) if self.get_sensors().get_orientation().get_proto().HasField("timestamps"): updated.get_sensors().get_orientation().get_timestamps().set_timestamps( self.get_sensors().get_orientation().get_timestamps().get_timestamps() + delta_offset, True, ) if self.get_sensors().get_pressure().get_proto().HasField("timestamps"): updated.get_sensors().get_pressure().get_timestamps().set_timestamps( self.get_sensors().get_pressure().get_timestamps().get_timestamps() + delta_offset, True, ) if self.get_sensors().get_proximity().get_proto().HasField("timestamps"): updated.get_sensors().get_proximity().get_timestamps().set_timestamps( self.get_sensors().get_proximity().get_timestamps().get_timestamps() + delta_offset, True, ) if ( self.get_sensors() .get_relative_humidity() .get_proto() .HasField("timestamps") ): updated.get_sensors().get_relative_humidity().get_timestamps().set_timestamps( self.get_sensors() .get_relative_humidity() .get_timestamps() .get_timestamps() + delta_offset, True, ) if self.get_sensors().get_rotation_vector().get_proto().HasField("timestamps"): updated.get_sensors().get_rotation_vector().get_timestamps().set_timestamps( self.get_sensors() .get_rotation_vector() .get_timestamps() .get_timestamps() + delta_offset, True, ) return updated
Ancestors
- ProtoBase
- typing.Generic
Static methods
def from_compressed_bytes(data: bytes) ‑> WrappedRedvoxPacketM
-
Deserializes the byte content of an API M encoded .rdvxz file. :param data: The compressed bytes to deserialize. :return: An instance of a WrappedRedvoxPacketAPi1000.
Expand source code
@staticmethod def from_compressed_bytes(data: bytes) -> "WrappedRedvoxPacketM": """ Deserializes the byte content of an API M encoded .rdvxz file. :param data: The compressed bytes to deserialize. :return: An instance of a WrappedRedvoxPacketAPi1000. """ redvox.api1000.common.typing.check_type(data, [bytes]) proto: RedvoxPacketM = RedvoxPacketM() proto.ParseFromString(lz4.frame.decompress(data, False)) return WrappedRedvoxPacketM(proto)
def from_compressed_path(rdvxm_path: str) ‑> WrappedRedvoxPacketM
-
Deserialize an API M encoded .rdvxm file from the specified file system path. :param rdvxm_path: Path to the API M encoded file. :return: An instance of a WrappedRedvoxPacketApiM.
Expand source code
@staticmethod def from_compressed_path(rdvxm_path: str) -> "WrappedRedvoxPacketM": """ Deserialize an API M encoded .rdvxm file from the specified file system path. :param rdvxm_path: Path to the API M encoded file. :return: An instance of a WrappedRedvoxPacketApiM. """ redvox.api1000.common.typing.check_type(rdvxm_path, [str]) if not os.path.isfile(rdvxm_path): raise errors.WrappedRedvoxPacketMError( f"Path to file={rdvxm_path} does not exist." ) with lz4.frame.open(rdvxm_path, "rb") as serialized_in: proto: RedvoxPacketM = RedvoxPacketM() proto.ParseFromString(serialized_in.read()) return WrappedRedvoxPacketM(proto)
def from_json(json_str: str) ‑> WrappedRedvoxPacketM
-
read json packet representing an API 1000 packet :param json_str: contains the json representing the packet :return: An instance of a WrappedRedvoxPacketM
Expand source code
@staticmethod def from_json(json_str: str) -> "WrappedRedvoxPacketM": """ read json packet representing an API 1000 packet :param json_str: contains the json representing the packet :return: An instance of a WrappedRedvoxPacketM """ return WrappedRedvoxPacketM(json_format.Parse(json_str, RedvoxPacketM()))
def from_json_path(json_path: str) ‑> WrappedRedvoxPacketM
-
read json from a file representing an api 1000 packet :param json_path: the path to the file to read :return: wrapped redvox packet api 1000
Expand source code
@staticmethod def from_json_path(json_path: str) -> "WrappedRedvoxPacketM": """ read json from a file representing an api 1000 packet :param json_path: the path to the file to read :return: wrapped redvox packet api 1000 """ with open(json_path, "r") as json_in: return WrappedRedvoxPacketM.from_json(json_in.read())
def new() ‑> WrappedRedvoxPacketM
-
Returns a new default instance of a WrappedRedvoxPacketApi1000. :return: A new default instance of a WrappedRedvoxPacketApi1000.
Expand source code
@staticmethod def new() -> "WrappedRedvoxPacketM": """ Returns a new default instance of a WrappedRedvoxPacketApi1000. :return: A new default instance of a WrappedRedvoxPacketApi1000. """ return WrappedRedvoxPacketM(RedvoxPacketM())
Methods
def default_file_dir(self) ‑> str
-
Computes the default file directory structure for a structured layout for this particular packet. :return: Directory structure for this packet when using structured layout
Expand source code
def default_file_dir(self) -> str: """ Computes the default file directory structure for a structured layout for this particular packet. :return: Directory structure for this packet when using structured layout """ timestamp: float = ( self.get_timing_information().get_packet_start_mach_timestamp() ) date_time: datetime = dt_utils.datetime_from_epoch_microseconds_utc(timestamp) year: str = f"{date_time.year:0>4}" month: str = f"{date_time.month:0>2}" day: str = f"{date_time.day:0>2}" hour: str = f"{date_time.hour:0>2}" return os.path.join(year, month, day, hour)
def default_file_path(self) ‑> str
-
Computes the default directory structure and file name for this packet for structured layouts. :return: The default directory structure and file name for this packet for structured layouts.
Expand source code
def default_file_path(self) -> str: """ Computes the default directory structure and file name for this packet for structured layouts. :return: The default directory structure and file name for this packet for structured layouts. """ return os.path.join(self.default_file_dir(), self.default_filename())
def default_filename(self, extension: Optional[str] = 'rdvxm') ‑> str
-
Returns the default filename for a given packet. :param extension: An (optional) file extension to add to the default file name. :return: The default filename for this packet.
Expand source code
def default_filename(self, extension: Optional[str] = "rdvxm") -> str: """ Returns the default filename for a given packet. :param extension: An (optional) file extension to add to the default file name. :return: The default filename for this packet. """ # Format to be exactly 10 characters station_id: str = f"{self.get_station_information().get_id():0>10}" timestamp: int = round( self.get_timing_information().get_packet_start_mach_timestamp() ) filename: str = f"{station_id}_{timestamp}" if extension is not None: filename = f"{filename}.{extension}" return filename
def get_api(self) ‑> float
-
Returns the API version of this packet. :return: The API version of this packet.
Expand source code
def get_api(self) -> float: """ Returns the API version of this packet. :return: The API version of this packet. """ return self._proto.api
def get_event_streams(self) ‑> ProtoRepeatedMessage
-
:return: A collection of event streams.
Expand source code
def get_event_streams(self) -> ProtoRepeatedMessage: """ :return: A collection of event streams. """ return self._event_streams
def get_packet_duration(self) ‑> datetime.timedelta
-
:return: Packet duration as a timedelta
Expand source code
def get_packet_duration(self) -> timedelta: """ :return: Packet duration as a timedelta """ audio = self.get_sensors().get_audio() if audio is not None: return audio.get_duration() else: return timedelta(seconds=0)
def get_packet_duration_s(self) ‑> float
-
get the packet duration in seconds from the audio data :return: packet duration in seconds
Expand source code
def get_packet_duration_s(self) -> float: """ get the packet duration in seconds from the audio data :return: packet duration in seconds """ if self.get_sensors().has_audio(): return self.get_sensors().get_audio().get_duration_s() else: return 0.0
def get_sensors(self) ‑> Sensors
-
:return: An instance of Sensors
Expand source code
def get_sensors(self) -> _sensors.Sensors: """ :return: An instance of Sensors """ return self._sensors
def get_station_information(self) ‑> StationInformation
-
:return: An instance of StationInformation
Expand source code
def get_station_information(self) -> _station_information.StationInformation: """ :return: An instance of StationInformation """ return self._station_information
def get_sub_api(self) ‑> float
-
Returns the sub_api version. This version tracks small changes within API 1000. :return: The sub_api version. This version tracks small changes within API 1000.
Expand source code
def get_sub_api(self) -> float: """ Returns the sub_api version. This version tracks small changes within API 1000. :return: The sub_api version. This version tracks small changes within API 1000. """ return self._proto.sub_api
def get_timing_information(self) ‑> TimingInformation
-
:return: An instance of TimingInformation
Expand source code
def get_timing_information(self) -> _timing_information.TimingInformation: """ :return: An instance of TimingInformation """ return self._timing_information
def set_api(self, api: float) ‑> WrappedRedvoxPacketM
-
Sets the api version of this packet. :param api: The API version (should be 1000.0) :return: The modified instance of this packet
Expand source code
def set_api(self, api: float) -> "WrappedRedvoxPacketM": """ Sets the api version of this packet. :param api: The API version (should be 1000.0) :return: The modified instance of this packet """ redvox.api1000.common.typing.check_type(api, [int, float]) self._proto.api = api return self
def set_event_streams(self, event_streams: ProtoRepeatedMessage) ‑> WrappedRedvoxPacketM
-
Set the event streams from the provided ProtoRepeatedMessage. :param event_streams: EventStreams embedded in a ProtoRepeatedMessage. :return: A modified instance of self.
Expand source code
def set_event_streams( self, event_streams: ProtoRepeatedMessage ) -> "WrappedRedvoxPacketM": """ Set the event streams from the provided ProtoRepeatedMessage. :param event_streams: EventStreams embedded in a ProtoRepeatedMessage. :return: A modified instance of self. """ check_type(event_streams, [ProtoRepeatedMessage]) self._event_streams.clear_values() self._event_streams.append_values(event_streams.get_values()) return self
def set_sensors(self, sensors: Sensors) ‑> WrappedRedvoxPacketM
-
Sets the sensors. :param sensors: Sensors to set. :return: A modified instance of self.
Expand source code
def set_sensors(self, sensors: _sensors.Sensors) -> "WrappedRedvoxPacketM": """ Sets the sensors. :param sensors: Sensors to set. :return: A modified instance of self. """ check_type(sensors, [_sensors.Sensors]) self.get_proto().sensors.CopyFrom(sensors.get_proto()) self._sensors = _sensors.Sensors(self.get_proto().sensors) return self
def set_station_information(self, station_information: StationInformation) ‑> WrappedRedvoxPacketM
-
Sets the StationInformation. :param station_information: StationInformation to set. :return: A modified instance of self.
Expand source code
def set_station_information( self, station_information: _station_information.StationInformation ) -> "WrappedRedvoxPacketM": """ Sets the StationInformation. :param station_information: StationInformation to set. :return: A modified instance of self. """ check_type(station_information, [_station_information.StationInformation]) self.get_proto().station_information.CopyFrom(station_information.get_proto()) self._station_information = _station_information.StationInformation( self.get_proto().station_information ) return self
def set_sub_api(self, sub_api: float) ‑> WrappedRedvoxPacketM
-
Sets the sub_api. :param sub_api: sub_api to set. :return: A modified instance of self.
Expand source code
def set_sub_api(self, sub_api: float) -> "WrappedRedvoxPacketM": """ Sets the sub_api. :param sub_api: sub_api to set. :return: A modified instance of self. """ redvox.api1000.common.typing.check_type(sub_api, [int, float]) self._proto.sub_api = sub_api return self
def set_timing_information(self, timing_information: TimingInformation) ‑> WrappedRedvoxPacketM
-
Sets the timing information. :param timing_information: TimingInformation to set. :return: A modified instance of self.
Expand source code
def set_timing_information( self, timing_information: _timing_information.TimingInformation ) -> "WrappedRedvoxPacketM": """ Sets the timing information. :param timing_information: TimingInformation to set. :return: A modified instance of self. """ check_type(timing_information, [_timing_information.TimingInformation]) self.get_proto().timing_information.CopyFrom(timing_information.get_proto()) self._timing_information = _timing_information.TimingInformation( self.get_proto().timing_information ) return self
def update_timestamps(self, delta_offset: float = None) ‑> WrappedRedvoxPacketM
-
update all timestamps in the packet by adding the delta offset :param delta_offset: amount of microseconds to add to existing timestamps :return: WrappedRedvoxPacketM with updated timestamps
Expand source code
def update_timestamps(self, delta_offset: float = None) -> "WrappedRedvoxPacketM": """ update all timestamps in the packet by adding the delta offset :param delta_offset: amount of microseconds to add to existing timestamps :return: WrappedRedvoxPacketM with updated timestamps """ if delta_offset is None: delta_offset = self.get_timing_information().get_best_offset() redvox.api1000.common.typing.check_type(delta_offset, [float]) # create new wrapped packet to hold changed data updated = WrappedRedvoxPacketM(self.get_proto()) if self.get_proto().HasField("timing_information"): # update timing information timestamps updated.get_timing_information().set_packet_start_mach_timestamp( self.get_timing_information().get_packet_start_mach_timestamp() + delta_offset ) updated.get_timing_information().set_packet_end_mach_timestamp( self.get_timing_information().get_packet_end_mach_timestamp() + delta_offset ) updated.get_timing_information().set_packet_start_os_timestamp( self.get_timing_information().get_packet_start_os_timestamp() + delta_offset ) updated.get_timing_information().set_packet_end_os_timestamp( self.get_timing_information().get_packet_end_os_timestamp() + delta_offset ) updated.get_timing_information().set_app_start_mach_timestamp( self.get_timing_information().get_app_start_mach_timestamp() + delta_offset ) if self.get_sensors().get_proto().HasField("audio"): # update audio first sample timestamp updated.get_sensors().get_audio().set_first_sample_timestamp( self.get_sensors().get_audio().get_first_sample_timestamp() + delta_offset ) # todo if self.get_sensors().get_proto().HasField("compressed_audio"): # update compressed audio first sample timestamp # update timestamp payloads if ( self.get_station_information() .get_station_metrics() .get_proto() .HasField("timestamps") ): updated.get_station_information().get_station_metrics().get_timestamps().set_timestamps( self.get_station_information() .get_station_metrics() .get_timestamps() .get_timestamps() + delta_offset, True, ) if self.get_sensors().get_accelerometer().get_proto().HasField("timestamps"): updated.get_sensors().get_accelerometer().get_timestamps().set_timestamps( self.get_sensors().get_accelerometer().get_timestamps().get_timestamps() + delta_offset, True, ) if ( self.get_sensors() .get_ambient_temperature() .get_proto() .HasField("timestamps") ): updated.get_sensors().get_ambient_temperature().get_timestamps().set_timestamps( self.get_sensors() .get_ambient_temperature() .get_timestamps() .get_timestamps() + delta_offset, True, ) if self.get_sensors().get_gravity().get_proto().HasField("timestamps"): updated.get_sensors().get_gravity().get_timestamps().set_timestamps( self.get_sensors().get_gravity().get_timestamps().get_timestamps() + delta_offset, True, ) if self.get_sensors().get_gyroscope().get_proto().HasField("timestamps"): updated.get_sensors().get_gyroscope().get_timestamps().set_timestamps( self.get_sensors().get_gyroscope().get_timestamps().get_timestamps() + delta_offset, True, ) if self.get_sensors().get_image().get_proto().HasField("timestamps"): updated.get_sensors().get_image().get_timestamps().set_timestamps( self.get_sensors().get_image().get_timestamps().get_timestamps() + delta_offset, True, ) if self.get_sensors().get_light().get_proto().HasField("timestamps"): updated.get_sensors().get_light().get_timestamps().set_timestamps( self.get_sensors().get_light().get_timestamps().get_timestamps() + delta_offset, True, ) if ( self.get_sensors() .get_linear_acceleration() .get_proto() .HasField("timestamps") ): updated.get_sensors().get_linear_acceleration().get_timestamps().set_timestamps( self.get_sensors() .get_linear_acceleration() .get_timestamps() .get_timestamps() + delta_offset, True, ) if self.get_sensors().get_location().get_proto().HasField("timestamps"): updated.get_sensors().get_location().get_timestamps().set_timestamps( self.get_sensors().get_location().get_timestamps().get_timestamps() + delta_offset, True, ) if self.get_sensors().get_magnetometer().get_proto().HasField("timestamps"): updated.get_sensors().get_magnetometer().get_timestamps().set_timestamps( self.get_sensors().get_magnetometer().get_timestamps().get_timestamps() + delta_offset, True, ) if self.get_sensors().get_orientation().get_proto().HasField("timestamps"): updated.get_sensors().get_orientation().get_timestamps().set_timestamps( self.get_sensors().get_orientation().get_timestamps().get_timestamps() + delta_offset, True, ) if self.get_sensors().get_pressure().get_proto().HasField("timestamps"): updated.get_sensors().get_pressure().get_timestamps().set_timestamps( self.get_sensors().get_pressure().get_timestamps().get_timestamps() + delta_offset, True, ) if self.get_sensors().get_proximity().get_proto().HasField("timestamps"): updated.get_sensors().get_proximity().get_timestamps().set_timestamps( self.get_sensors().get_proximity().get_timestamps().get_timestamps() + delta_offset, True, ) if ( self.get_sensors() .get_relative_humidity() .get_proto() .HasField("timestamps") ): updated.get_sensors().get_relative_humidity().get_timestamps().set_timestamps( self.get_sensors() .get_relative_humidity() .get_timestamps() .get_timestamps() + delta_offset, True, ) if self.get_sensors().get_rotation_vector().get_proto().HasField("timestamps"): updated.get_sensors().get_rotation_vector().get_timestamps().set_timestamps( self.get_sensors() .get_rotation_vector() .get_timestamps() .get_timestamps() + delta_offset, True, ) return updated
def validate(self) ‑> List[str]
-
Validates this packet. :return: A list of validation errors.
Expand source code
def validate(self) -> List[str]: """ Validates this packet. :return: A list of validation errors. """ return validate_wrapped_packet(self)
def write_compressed_to_file(self, base_dir: str, filename: Optional[str] = None) ‑> str
-
Writes this packet to a .rdvxm file. :param base_dir: Directory to write .rdvxm to. :param filename: The (optional) file name to use. Will use default file name otherwise. :return: Path of written file.
Expand source code
def write_compressed_to_file( self, base_dir: str, filename: Optional[str] = None ) -> str: """ Writes this packet to a .rdvxm file. :param base_dir: Directory to write .rdvxm to. :param filename: The (optional) file name to use. Will use default file name otherwise. :return: Path of written file. """ if filename is None: filename = self.default_filename("rdvxm") if not os.path.isdir(base_dir): raise errors.WrappedRedvoxPacketMError( f"Base directory={base_dir} does not exist." ) out_path: str = os.path.join(base_dir, filename) with open(out_path, "wb") as compressed_out: compressed_out.write(self.as_compressed_bytes()) return out_path
def write_json_to_file(self, base_dir: str, filename: Optional[str] = None) ‑> str
-
Writes this packet to a .json file. :param base_dir: Directory to write .json to. :param filename: The (optional) file name to use. Will use default file name otherwise. :return: Path of written file.
Expand source code
def write_json_to_file(self, base_dir: str, filename: Optional[str] = None) -> str: """ Writes this packet to a .json file. :param base_dir: Directory to write .json to. :param filename: The (optional) file name to use. Will use default file name otherwise. :return: Path of written file. """ if filename is None: filename = self.default_filename("json") if not os.path.isdir(base_dir): raise errors.WrappedRedvoxPacketMError( f"Base directory={base_dir} does not exist." ) out_path: str = os.path.join(base_dir, filename) with open(out_path, "w") as json_out: json_out.write(self.as_json()) return out_path
Inherited members