Module redvox.common.station_utils
Defines generic station metadata for API-independent analysis all timestamps are floats in microseconds unless otherwise stated
Expand source code
"""
Defines generic station metadata for API-independent analysis
all timestamps are floats in microseconds unless otherwise stated
"""
from dataclasses import dataclass
from dataclasses_json import dataclass_json
from typing import Tuple, Optional, List, Dict
import numpy as np
from redvox.common.offset_model import OffsetModel
from redvox.api1000.wrapped_redvox_packet.station_information import OsType
from redvox.api1000.wrapped_redvox_packet.timing_information import TimingScoreMethod
from redvox.common.errors import RedVoxExceptions
import redvox.api1000.proto.redvox_api_m_pb2 as api_m
def validate_station_key_list(data_packets: List[api_m.RedvoxPacketM], errors: RedVoxExceptions) -> bool:
"""
Checks for consistency in the data packets. Returns False if discrepancies are found.
If debug is True, will output the discrepancies.
:param data_packets: list of RedvoxPacketM to look at
:param errors: RedVoxExceptions detailing errors found while validating
:return: True if no discrepancies found. False otherwise
"""
my_errors = RedVoxExceptions("StationKeyValidation")
if len(data_packets) < 2:
return True
j: np.ndarray = np.transpose(
[
[
t.station_information.id,
t.station_information.uuid,
t.timing_information.app_start_mach_timestamp,
t.api,
t.sub_api,
t.station_information.make,
t.station_information.model,
t.station_information.os,
t.station_information.os_version,
t.station_information.app_version,
t.station_information.is_private,
len(t.sensors.audio.samples.values) / t.sensors.audio.sample_rate,
]
for t in data_packets
]
)
k: Dict[str, np.ndarray] = {
"ids": j[0],
"uuids": j[1],
"station_start_times": j[2],
"apis": j[3],
"sub_apis": j[4],
"makes": j[5],
"models": j[6],
"os": j[7],
"os_versions": j[8],
"app_versions": j[9],
"privates": j[10],
"durations": j[11],
}
for key, value in k.items():
result = np.unique(value)
if len(result) > 1:
my_errors.append(
f"WARNING: {data_packets[0].station_information.id} "
f"{key} contains multiple unique values: {result}.\n"
"Please update your query to focus on one of these values."
)
if my_errors.get_num_errors() > 0:
errors.extend_error(my_errors)
return False
return True # if here, everything is consistent
@dataclass_json
@dataclass
class StationKey:
"""
A set of values that uniquely define a station
Properties:
id: str, id of the station
uuid: str, uuid of the station
start_timestamp_micros: float, starting time of the station in microseconds since epoch UTC
"""
id: str
uuid: str
start_timestamp_micros: float
def __repr__(self):
return f"id:{self.id}, uuid:{self.uuid}, start_date:{self.start_timestamp_micros}"
def get_key(self) -> Tuple[str, str, float]:
"""
:return: the key as a tuple
"""
return self.id, self.uuid, self.start_timestamp_micros
def check_key(
self,
station_id: Optional[str] = None,
station_uuid: Optional[str] = None,
start_timestamp: Optional[float] = None,
) -> bool:
"""
Check if the key has the values specified. If the parameter is None, any value will match.
Note that NAN is a valid value for start_timestamps, but any station with start_timestamp = NAN
will not match any value, including another NAN.
:param station_id: station id, default None
:param station_uuid: station uuid, default None
:param start_timestamp: station start timestamp in microseconds since UTC epoch, default None
:return: True if all parameters match key values
"""
if (
(station_id is not None and station_id != self.id)
or (station_uuid is not None and station_uuid != self.uuid)
or (
start_timestamp is not None
and (
start_timestamp != self.start_timestamp_micros
or np.isnan(start_timestamp)
or np.isnan(self.start_timestamp_micros)
)
)
):
return False
return True
def compare_key(self, other_key: "StationKey") -> bool:
"""
compare key to another station's key
:param other_key: another station's key
:return: True if the keys match
"""
return self.check_key(other_key.id, other_key.uuid, other_key.start_timestamp_micros)
def as_cloud_key(self) -> str:
"""
:return: key in format used by cloud session models
"""
return f"{self.id}:{self.uuid}:{int(self.start_timestamp_micros)}"
class StationMetadata:
"""
A container for all the packet metadata consistent across all packets
Properties:
api: float, api version, default np.nan
sub_api: float, sub api version, default np.nan
make: str, station make, default empty string
model: str, station model, default empty string
os: OsType enum, station OS, default OsType.UNKNOWN_OS
os_version: str, station OS version, default empty string
app: str, station app, default empty string
app_version: str, station app version, default empty string
is_private: bool, is station data private, default False
packet_duration_s: float, duration of the packet in seconds, default np.nan
station_description: str, description of the station, default empty string
other_metadata: dict, str: str of other metadata from the packet, default empty list
"""
def __init__(self, app: str = "", packet: Optional[api_m.RedvoxPacketM] = None):
"""
initialize the metadata
:param app: app name
:param packet: Optional RedvoxPacketM to read data from
"""
self.app = app
self.other_metadata = {}
if packet:
self.api = packet.api
self.sub_api = packet.sub_api
self.make = packet.station_information.make
self.model = packet.station_information.model
self.os: OsType = OsType(packet.station_information.os)
self.os_version = packet.station_information.os_version
self.app_version = packet.station_information.app_version
self.is_private = packet.station_information.is_private
self.packet_duration_s = len(packet.sensors.audio.samples.values) / packet.sensors.audio.sample_rate
self.station_description = packet.station_information.description
else:
self.api = np.nan
self.sub_api = np.nan
self.make = ""
self.model = ""
self.os: OsType = OsType["UNKNOWN_OS"]
self.os_version = ""
self.app_version = ""
self.is_private = False
self.packet_duration_s = np.nan
self.station_description = ""
def __repr__(self):
return (
f"app: {self.app}, "
f"api: {self.api}, "
f"sub_api: {self.sub_api}, "
f"make: {self.make}, "
f"model: {self.model}, "
f"os: {self.os.value}, "
f"os_version: {self.os_version}, "
f"app_version: {self.app_version}, "
f"is_private: {self.is_private}, "
f"packet_duration_s: {self.packet_duration_s}, "
f"station_description: {self.station_description}"
)
def __str__(self):
return (
f"app: {self.app}, "
f"api: {self.api}, "
f"sub_api: {self.sub_api}, "
f"make: {self.make}, "
f"model: {self.model}, "
f"os: {self.os.name}, "
f"os_version: {self.os_version}, "
f"app_version: {self.app_version}, "
f"is_private: {self.is_private}, "
f"packet_duration_s: {self.packet_duration_s}, "
f"station_description: {self.station_description}"
)
def validate_metadata(self, other_metadata: "StationMetadata") -> bool:
"""
:param other_metadata: another StationMetadata object to compare
:return: True if other_metadata is equal to the calling metadata
"""
return (
self.app == other_metadata.app
and self.api == other_metadata.api
and self.sub_api == other_metadata.sub_api
and self.make == other_metadata.make
and self.model == other_metadata.model
and self.os == other_metadata.os
and self.os_version == other_metadata.os_version
and self.app_version == other_metadata.app_version
and self.is_private == other_metadata.is_private
and self.packet_duration_s == other_metadata.packet_duration_s
and self.station_description == other_metadata.station_description
)
def as_dict(self) -> dict:
"""
:return: dictionary representation of the metadata
"""
return {
"app": self.app,
"api": self.api,
"sub_api": self.sub_api,
"make": self.make,
"model": self.model,
"os": self.os.name,
"os_version": self.os_version,
"app_version": self.app_version,
"is_private": self.is_private,
"packet_duration_s": self.packet_duration_s,
"station_description": self.station_description,
"other_metadata": self.other_metadata,
}
@staticmethod
def from_dict(md_dict: dict) -> "StationMetadata":
"""
:param md_dict: metadata dictionary
:return: StationMetadata from dictionary
"""
result = StationMetadata(md_dict["app"])
result.api = md_dict["api"]
result.sub_api = md_dict["sub_api"]
result.make = md_dict["make"]
result.model = md_dict["model"]
result.os = OsType[md_dict["os"]]
result.os_version = md_dict["os_version"]
result.app_version = md_dict["app_version"]
result.is_private = md_dict["is_private"]
result.packet_duration_s = md_dict["packet_duration_s"]
result.station_description = md_dict["station_description"]
result.other_metadata = md_dict["other_metadata"]
return result
class StationPacketMetadata:
"""
A container for all the packet metadata that isn't consistent across all packets
Properties:
packet_start_mach_timestamp: float, machine timestamp of packet start in microseconds since epoch UTC,
default np.nan
packet_end_mach_timestamp: float, machine timestamp of packet end in microseconds since epoch UTC,
default np.nan
packet_start_os_timestamp: float, os timestamp of packet start in microseconds since epoch UTC, default np.nan
packet_end_os_timestamp: float, os timestamp of packet end in microseconds since epoch UTC, default np.nan
server_packet_received_timestamp: float, timestamp from server when packet was received in
microseconds since epoch UTC
timing_info_score: float, quality of timing information, default np.nan
timing_score_method: TimingScoreMethod, method used to determine timing score, default "UNKNOWN"
other_metadata: dict, str: str of other metadata from the packet, default empty list
"""
def __init__(self, packet: Optional[api_m.RedvoxPacketM] = None):
"""
initialize the metadata
:param packet: Optional RedvoxPacketM to read data from
"""
self.other_metadata = {}
if packet:
self.packet_start_mach_timestamp = packet.timing_information.packet_start_mach_timestamp
self.packet_end_mach_timestamp = packet.timing_information.packet_end_mach_timestamp
self.packet_start_os_timestamp = packet.timing_information.packet_start_os_timestamp
self.packet_end_os_timestamp = packet.timing_information.packet_end_os_timestamp
self.server_packet_receive_timestamp = packet.timing_information.server_acquisition_arrival_timestamp
self.timing_info_score = packet.timing_information.score
self.timing_score_method = TimingScoreMethod(packet.timing_information.score_method)
else:
self.packet_start_mach_timestamp = np.nan
self.packet_end_mach_timestamp = np.nan
self.packet_start_os_timestamp = np.nan
self.packet_end_os_timestamp = np.nan
self.server_packet_receive_timestamp = np.nan
self.timing_info_score = np.nan
self.timing_score_method = TimingScoreMethod["UNKNOWN"]
def __repr__(self):
return (
f"packet_start_mach_timestamp: {self.packet_start_mach_timestamp}, "
f"packet_end_mach_timestamp: {self.packet_end_mach_timestamp}, "
f"packet_start_os_timestamp: {self.packet_start_os_timestamp}, "
f"packet_end_os_timestamp: {self.packet_end_os_timestamp}, "
f"server_packet_receive_timestamp: {self.server_packet_receive_timestamp}, "
f"timing_info_score: {self.timing_info_score}, "
f"timing_score_method: {self.timing_score_method.value}"
)
def __str__(self):
return (
f"packet_start_mach_timestamp: {self.packet_start_mach_timestamp}, "
f"packet_end_mach_timestamp: {self.packet_end_mach_timestamp}, "
f"packet_start_os_timestamp: {self.packet_start_os_timestamp}, "
f"packet_end_os_timestamp: {self.packet_end_os_timestamp}, "
f"server_packet_receive_timestamp: {self.server_packet_receive_timestamp}, "
f"timing_info_score: {self.timing_info_score}, "
f"timing_score_method: {self.timing_score_method.name}"
)
def update_timestamps(self, om: OffsetModel, use_model_function: bool = True):
"""
updates the timestamps in the metadata using the offset model
:param om: OffsetModel to apply to data
:param use_model_function: if True, use the offset model's correction function to correct time,
otherwise use best offset (model's intercept value). default True
"""
self.packet_start_mach_timestamp = om.update_time(self.packet_start_mach_timestamp, use_model_function)
self.packet_end_mach_timestamp = om.update_time(self.packet_end_mach_timestamp, use_model_function)
self.packet_start_os_timestamp = om.update_time(self.packet_start_os_timestamp, use_model_function)
self.packet_end_os_timestamp = om.update_time(self.packet_end_os_timestamp, use_model_function)
def original_timestamps(self, om: OffsetModel, use_model_function: bool = True):
"""
undo the updates to the timestamps in the metadata using the offset model
:param om: OffsetModel to apply to data
:param use_model_function: if True, use the offset model's correction function to correct time,
otherwise use best offset (model's intercept value). default True
"""
self.packet_start_mach_timestamp = om.get_original_time(self.packet_start_mach_timestamp, use_model_function)
self.packet_end_mach_timestamp = om.get_original_time(self.packet_end_mach_timestamp, use_model_function)
self.packet_start_os_timestamp = om.get_original_time(self.packet_start_os_timestamp, use_model_function)
self.packet_end_os_timestamp = om.get_original_time(self.packet_end_os_timestamp, use_model_function)
def as_dict(self) -> dict:
"""
:return: dictionary representation of the packet metadata
"""
return {
"packet_start_mach_timestamp": self.packet_start_mach_timestamp,
"packet_end_mach_timestamp": self.packet_end_mach_timestamp,
"packet_start_os_timestamp": self.packet_start_os_timestamp,
"packet_end_os_timestamp": self.packet_end_os_timestamp,
"server_packet_receive_timestamp": self.server_packet_receive_timestamp,
"timing_info_score": self.timing_info_score,
"timing_score_method": self.timing_score_method.name,
"other_metadata": self.other_metadata,
}
@staticmethod
def from_dict(pmd_dict: dict) -> "StationPacketMetadata":
"""
:param pmd_dict: dictionary with StationPacketMetadata
:return: StationPacketMetadata from dictionary
"""
result = StationPacketMetadata()
result.other_metadata = pmd_dict["other_metadata"]
result.packet_start_mach_timestamp = pmd_dict["packet_start_mach_timestamp"]
result.packet_end_mach_timestamp = pmd_dict["packet_end_mach_timestamp"]
result.packet_start_os_timestamp = pmd_dict["packet_start_os_timestamp"]
result.packet_end_os_timestamp = pmd_dict["packet_end_os_timestamp"]
result.server_packet_receive_timestamp = pmd_dict["server_packet_receive_timestamp"]
result.timing_info_score = pmd_dict["timing_info_score"]
result.timing_score_method = TimingScoreMethod[
pmd_dict["timing_score_method"] if "timing_score_method" in pmd_dict.keys() else "UNKNOWN"
]
return result
Functions
def validate_station_key_list(data_packets: List[src.redvox_api_m.redvox_api_m_pb2.RedvoxPacketM], errors: RedVoxExceptions) ‑> bool
-
Checks for consistency in the data packets. Returns False if discrepancies are found. If debug is True, will output the discrepancies.
:param data_packets: list of RedvoxPacketM to look at :param errors: RedVoxExceptions detailing errors found while validating :return: True if no discrepancies found. False otherwise
Expand source code
def validate_station_key_list(data_packets: List[api_m.RedvoxPacketM], errors: RedVoxExceptions) -> bool: """ Checks for consistency in the data packets. Returns False if discrepancies are found. If debug is True, will output the discrepancies. :param data_packets: list of RedvoxPacketM to look at :param errors: RedVoxExceptions detailing errors found while validating :return: True if no discrepancies found. False otherwise """ my_errors = RedVoxExceptions("StationKeyValidation") if len(data_packets) < 2: return True j: np.ndarray = np.transpose( [ [ t.station_information.id, t.station_information.uuid, t.timing_information.app_start_mach_timestamp, t.api, t.sub_api, t.station_information.make, t.station_information.model, t.station_information.os, t.station_information.os_version, t.station_information.app_version, t.station_information.is_private, len(t.sensors.audio.samples.values) / t.sensors.audio.sample_rate, ] for t in data_packets ] ) k: Dict[str, np.ndarray] = { "ids": j[0], "uuids": j[1], "station_start_times": j[2], "apis": j[3], "sub_apis": j[4], "makes": j[5], "models": j[6], "os": j[7], "os_versions": j[8], "app_versions": j[9], "privates": j[10], "durations": j[11], } for key, value in k.items(): result = np.unique(value) if len(result) > 1: my_errors.append( f"WARNING: {data_packets[0].station_information.id} " f"{key} contains multiple unique values: {result}.\n" "Please update your query to focus on one of these values." ) if my_errors.get_num_errors() > 0: errors.extend_error(my_errors) return False return True # if here, everything is consistent
Classes
class StationKey (id: str, uuid: str, start_timestamp_micros: float)
-
A set of values that uniquely define a station
Properties
id: str, id of the station
uuid: str, uuid of the station
start_timestamp_micros: float, starting time of the station in microseconds since epoch UTC
Expand source code
@dataclass_json @dataclass class StationKey: """ A set of values that uniquely define a station Properties: id: str, id of the station uuid: str, uuid of the station start_timestamp_micros: float, starting time of the station in microseconds since epoch UTC """ id: str uuid: str start_timestamp_micros: float def __repr__(self): return f"id:{self.id}, uuid:{self.uuid}, start_date:{self.start_timestamp_micros}" def get_key(self) -> Tuple[str, str, float]: """ :return: the key as a tuple """ return self.id, self.uuid, self.start_timestamp_micros def check_key( self, station_id: Optional[str] = None, station_uuid: Optional[str] = None, start_timestamp: Optional[float] = None, ) -> bool: """ Check if the key has the values specified. If the parameter is None, any value will match. Note that NAN is a valid value for start_timestamps, but any station with start_timestamp = NAN will not match any value, including another NAN. :param station_id: station id, default None :param station_uuid: station uuid, default None :param start_timestamp: station start timestamp in microseconds since UTC epoch, default None :return: True if all parameters match key values """ if ( (station_id is not None and station_id != self.id) or (station_uuid is not None and station_uuid != self.uuid) or ( start_timestamp is not None and ( start_timestamp != self.start_timestamp_micros or np.isnan(start_timestamp) or np.isnan(self.start_timestamp_micros) ) ) ): return False return True def compare_key(self, other_key: "StationKey") -> bool: """ compare key to another station's key :param other_key: another station's key :return: True if the keys match """ return self.check_key(other_key.id, other_key.uuid, other_key.start_timestamp_micros) def as_cloud_key(self) -> str: """ :return: key in format used by cloud session models """ return f"{self.id}:{self.uuid}:{int(self.start_timestamp_micros)}"
Class variables
var id : str
var start_timestamp_micros : float
var uuid : str
Static methods
def from_dict(kvs: Union[dict, list, str, int, float, bool, ForwardRef(None)], *, infer_missing=False) ‑> ~A
-
Expand source code
@classmethod def from_dict(cls: Type[A], kvs: Json, *, infer_missing=False) -> A: return _decode_dataclass(cls, kvs, infer_missing)
def from_json(s: Union[str, bytes, bytearray], *, parse_float=None, parse_int=None, parse_constant=None, infer_missing=False, **kw) ‑> ~A
-
Expand source code
@classmethod def from_json(cls: Type[A], s: JsonData, *, parse_float=None, parse_int=None, parse_constant=None, infer_missing=False, **kw) -> A: kvs = json.loads(s, parse_float=parse_float, parse_int=parse_int, parse_constant=parse_constant, **kw) return cls.from_dict(kvs, infer_missing=infer_missing)
def schema(*, infer_missing: bool = False, only=None, exclude=(), many: bool = False, context=None, load_only=(), dump_only=(), partial: bool = False, unknown=None) ‑> dataclasses_json.mm.SchemaF[~A]
-
Expand source code
@classmethod def schema(cls: Type[A], *, infer_missing: bool = False, only=None, exclude=(), many: bool = False, context=None, load_only=(), dump_only=(), partial: bool = False, unknown=None) -> SchemaType: Schema = build_schema(cls, DataClassJsonMixin, infer_missing, partial) if unknown is None: undefined_parameter_action = _undefined_parameter_action_safe(cls) if undefined_parameter_action is not None: # We can just make use of the same-named mm keywords unknown = undefined_parameter_action.name.lower() return Schema(only=only, exclude=exclude, many=many, context=context, load_only=load_only, dump_only=dump_only, partial=partial, unknown=unknown)
Methods
def as_cloud_key(self) ‑> str
-
:return: key in format used by cloud session models
Expand source code
def as_cloud_key(self) -> str: """ :return: key in format used by cloud session models """ return f"{self.id}:{self.uuid}:{int(self.start_timestamp_micros)}"
def check_key(self, station_id: Optional[str] = None, station_uuid: Optional[str] = None, start_timestamp: Optional[float] = None) ‑> bool
-
Check if the key has the values specified. If the parameter is None, any value will match. Note that NAN is a valid value for start_timestamps, but any station with start_timestamp = NAN will not match any value, including another NAN.
:param station_id: station id, default None :param station_uuid: station uuid, default None :param start_timestamp: station start timestamp in microseconds since UTC epoch, default None :return: True if all parameters match key values
Expand source code
def check_key( self, station_id: Optional[str] = None, station_uuid: Optional[str] = None, start_timestamp: Optional[float] = None, ) -> bool: """ Check if the key has the values specified. If the parameter is None, any value will match. Note that NAN is a valid value for start_timestamps, but any station with start_timestamp = NAN will not match any value, including another NAN. :param station_id: station id, default None :param station_uuid: station uuid, default None :param start_timestamp: station start timestamp in microseconds since UTC epoch, default None :return: True if all parameters match key values """ if ( (station_id is not None and station_id != self.id) or (station_uuid is not None and station_uuid != self.uuid) or ( start_timestamp is not None and ( start_timestamp != self.start_timestamp_micros or np.isnan(start_timestamp) or np.isnan(self.start_timestamp_micros) ) ) ): return False return True
def compare_key(self, other_key: StationKey) ‑> bool
-
compare key to another station's key
:param other_key: another station's key :return: True if the keys match
Expand source code
def compare_key(self, other_key: "StationKey") -> bool: """ compare key to another station's key :param other_key: another station's key :return: True if the keys match """ return self.check_key(other_key.id, other_key.uuid, other_key.start_timestamp_micros)
def get_key(self) ‑> Tuple[str, str, float]
-
:return: the key as a tuple
Expand source code
def get_key(self) -> Tuple[str, str, float]: """ :return: the key as a tuple """ return self.id, self.uuid, self.start_timestamp_micros
def to_dict(self, encode_json=False) ‑> Dict[str, Union[dict, list, str, int, float, bool, ForwardRef(None)]]
-
Expand source code
def to_dict(self, encode_json=False) -> Dict[str, Json]: return _asdict(self, encode_json=encode_json)
def to_json(self, *, skipkeys: bool = False, ensure_ascii: bool = True, check_circular: bool = True, allow_nan: bool = True, indent: Union[int, str, ForwardRef(None)] = None, separators: Tuple[str, str] = None, default: Callable = None, sort_keys: bool = False, **kw) ‑> str
-
Expand source code
def to_json(self, *, skipkeys: bool = False, ensure_ascii: bool = True, check_circular: bool = True, allow_nan: bool = True, indent: Optional[Union[int, str]] = None, separators: Tuple[str, str] = None, default: Callable = None, sort_keys: bool = False, **kw) -> str: return json.dumps(self.to_dict(encode_json=False), cls=_ExtendedEncoder, skipkeys=skipkeys, ensure_ascii=ensure_ascii, check_circular=check_circular, allow_nan=allow_nan, indent=indent, separators=separators, default=default, sort_keys=sort_keys, **kw)
class StationMetadata (app: str = '', packet: Optional[src.redvox_api_m.redvox_api_m_pb2.RedvoxPacketM] = None)
-
A container for all the packet metadata consistent across all packets
Properties
api: float, api version, default np.nan
sub_api: float, sub api version, default np.nan
make: str, station make, default empty string
model: str, station model, default empty string
os: OsType enum, station OS, default OsType.UNKNOWN_OS
os_version: str, station OS version, default empty string
app: str, station app, default empty string
app_version: str, station app version, default empty string
is_private: bool, is station data private, default False
packet_duration_s: float, duration of the packet in seconds, default np.nan
station_description: str, description of the station, default empty string
other_metadata: dict, str: str of other metadata from the packet, default empty list
initialize the metadata
:param app: app name :param packet: Optional RedvoxPacketM to read data from
Expand source code
class StationMetadata: """ A container for all the packet metadata consistent across all packets Properties: api: float, api version, default np.nan sub_api: float, sub api version, default np.nan make: str, station make, default empty string model: str, station model, default empty string os: OsType enum, station OS, default OsType.UNKNOWN_OS os_version: str, station OS version, default empty string app: str, station app, default empty string app_version: str, station app version, default empty string is_private: bool, is station data private, default False packet_duration_s: float, duration of the packet in seconds, default np.nan station_description: str, description of the station, default empty string other_metadata: dict, str: str of other metadata from the packet, default empty list """ def __init__(self, app: str = "", packet: Optional[api_m.RedvoxPacketM] = None): """ initialize the metadata :param app: app name :param packet: Optional RedvoxPacketM to read data from """ self.app = app self.other_metadata = {} if packet: self.api = packet.api self.sub_api = packet.sub_api self.make = packet.station_information.make self.model = packet.station_information.model self.os: OsType = OsType(packet.station_information.os) self.os_version = packet.station_information.os_version self.app_version = packet.station_information.app_version self.is_private = packet.station_information.is_private self.packet_duration_s = len(packet.sensors.audio.samples.values) / packet.sensors.audio.sample_rate self.station_description = packet.station_information.description else: self.api = np.nan self.sub_api = np.nan self.make = "" self.model = "" self.os: OsType = OsType["UNKNOWN_OS"] self.os_version = "" self.app_version = "" self.is_private = False self.packet_duration_s = np.nan self.station_description = "" def __repr__(self): return ( f"app: {self.app}, " f"api: {self.api}, " f"sub_api: {self.sub_api}, " f"make: {self.make}, " f"model: {self.model}, " f"os: {self.os.value}, " f"os_version: {self.os_version}, " f"app_version: {self.app_version}, " f"is_private: {self.is_private}, " f"packet_duration_s: {self.packet_duration_s}, " f"station_description: {self.station_description}" ) def __str__(self): return ( f"app: {self.app}, " f"api: {self.api}, " f"sub_api: {self.sub_api}, " f"make: {self.make}, " f"model: {self.model}, " f"os: {self.os.name}, " f"os_version: {self.os_version}, " f"app_version: {self.app_version}, " f"is_private: {self.is_private}, " f"packet_duration_s: {self.packet_duration_s}, " f"station_description: {self.station_description}" ) def validate_metadata(self, other_metadata: "StationMetadata") -> bool: """ :param other_metadata: another StationMetadata object to compare :return: True if other_metadata is equal to the calling metadata """ return ( self.app == other_metadata.app and self.api == other_metadata.api and self.sub_api == other_metadata.sub_api and self.make == other_metadata.make and self.model == other_metadata.model and self.os == other_metadata.os and self.os_version == other_metadata.os_version and self.app_version == other_metadata.app_version and self.is_private == other_metadata.is_private and self.packet_duration_s == other_metadata.packet_duration_s and self.station_description == other_metadata.station_description ) def as_dict(self) -> dict: """ :return: dictionary representation of the metadata """ return { "app": self.app, "api": self.api, "sub_api": self.sub_api, "make": self.make, "model": self.model, "os": self.os.name, "os_version": self.os_version, "app_version": self.app_version, "is_private": self.is_private, "packet_duration_s": self.packet_duration_s, "station_description": self.station_description, "other_metadata": self.other_metadata, } @staticmethod def from_dict(md_dict: dict) -> "StationMetadata": """ :param md_dict: metadata dictionary :return: StationMetadata from dictionary """ result = StationMetadata(md_dict["app"]) result.api = md_dict["api"] result.sub_api = md_dict["sub_api"] result.make = md_dict["make"] result.model = md_dict["model"] result.os = OsType[md_dict["os"]] result.os_version = md_dict["os_version"] result.app_version = md_dict["app_version"] result.is_private = md_dict["is_private"] result.packet_duration_s = md_dict["packet_duration_s"] result.station_description = md_dict["station_description"] result.other_metadata = md_dict["other_metadata"] return result
Static methods
def from_dict(md_dict: dict) ‑> StationMetadata
-
:param md_dict: metadata dictionary :return: StationMetadata from dictionary
Expand source code
@staticmethod def from_dict(md_dict: dict) -> "StationMetadata": """ :param md_dict: metadata dictionary :return: StationMetadata from dictionary """ result = StationMetadata(md_dict["app"]) result.api = md_dict["api"] result.sub_api = md_dict["sub_api"] result.make = md_dict["make"] result.model = md_dict["model"] result.os = OsType[md_dict["os"]] result.os_version = md_dict["os_version"] result.app_version = md_dict["app_version"] result.is_private = md_dict["is_private"] result.packet_duration_s = md_dict["packet_duration_s"] result.station_description = md_dict["station_description"] result.other_metadata = md_dict["other_metadata"] return result
Methods
def as_dict(self) ‑> dict
-
:return: dictionary representation of the metadata
Expand source code
def as_dict(self) -> dict: """ :return: dictionary representation of the metadata """ return { "app": self.app, "api": self.api, "sub_api": self.sub_api, "make": self.make, "model": self.model, "os": self.os.name, "os_version": self.os_version, "app_version": self.app_version, "is_private": self.is_private, "packet_duration_s": self.packet_duration_s, "station_description": self.station_description, "other_metadata": self.other_metadata, }
def validate_metadata(self, other_metadata: StationMetadata) ‑> bool
-
:param other_metadata: another StationMetadata object to compare :return: True if other_metadata is equal to the calling metadata
Expand source code
def validate_metadata(self, other_metadata: "StationMetadata") -> bool: """ :param other_metadata: another StationMetadata object to compare :return: True if other_metadata is equal to the calling metadata """ return ( self.app == other_metadata.app and self.api == other_metadata.api and self.sub_api == other_metadata.sub_api and self.make == other_metadata.make and self.model == other_metadata.model and self.os == other_metadata.os and self.os_version == other_metadata.os_version and self.app_version == other_metadata.app_version and self.is_private == other_metadata.is_private and self.packet_duration_s == other_metadata.packet_duration_s and self.station_description == other_metadata.station_description )
class StationPacketMetadata (packet: Optional[src.redvox_api_m.redvox_api_m_pb2.RedvoxPacketM] = None)
-
A container for all the packet metadata that isn't consistent across all packets
Properties
packet_start_mach_timestamp: float, machine timestamp of packet start in microseconds since epoch UTC, default np.nan
packet_end_mach_timestamp: float, machine timestamp of packet end in microseconds since epoch UTC, default np.nan
packet_start_os_timestamp: float, os timestamp of packet start in microseconds since epoch UTC, default np.nan
packet_end_os_timestamp: float, os timestamp of packet end in microseconds since epoch UTC, default np.nan
server_packet_received_timestamp: float, timestamp from server when packet was received in microseconds since epoch UTC
timing_info_score: float, quality of timing information, default np.nan
timing_score_method: TimingScoreMethod, method used to determine timing score, default "UNKNOWN"
other_metadata: dict, str: str of other metadata from the packet, default empty list
initialize the metadata
:param packet: Optional RedvoxPacketM to read data from
Expand source code
class StationPacketMetadata: """ A container for all the packet metadata that isn't consistent across all packets Properties: packet_start_mach_timestamp: float, machine timestamp of packet start in microseconds since epoch UTC, default np.nan packet_end_mach_timestamp: float, machine timestamp of packet end in microseconds since epoch UTC, default np.nan packet_start_os_timestamp: float, os timestamp of packet start in microseconds since epoch UTC, default np.nan packet_end_os_timestamp: float, os timestamp of packet end in microseconds since epoch UTC, default np.nan server_packet_received_timestamp: float, timestamp from server when packet was received in microseconds since epoch UTC timing_info_score: float, quality of timing information, default np.nan timing_score_method: TimingScoreMethod, method used to determine timing score, default "UNKNOWN" other_metadata: dict, str: str of other metadata from the packet, default empty list """ def __init__(self, packet: Optional[api_m.RedvoxPacketM] = None): """ initialize the metadata :param packet: Optional RedvoxPacketM to read data from """ self.other_metadata = {} if packet: self.packet_start_mach_timestamp = packet.timing_information.packet_start_mach_timestamp self.packet_end_mach_timestamp = packet.timing_information.packet_end_mach_timestamp self.packet_start_os_timestamp = packet.timing_information.packet_start_os_timestamp self.packet_end_os_timestamp = packet.timing_information.packet_end_os_timestamp self.server_packet_receive_timestamp = packet.timing_information.server_acquisition_arrival_timestamp self.timing_info_score = packet.timing_information.score self.timing_score_method = TimingScoreMethod(packet.timing_information.score_method) else: self.packet_start_mach_timestamp = np.nan self.packet_end_mach_timestamp = np.nan self.packet_start_os_timestamp = np.nan self.packet_end_os_timestamp = np.nan self.server_packet_receive_timestamp = np.nan self.timing_info_score = np.nan self.timing_score_method = TimingScoreMethod["UNKNOWN"] def __repr__(self): return ( f"packet_start_mach_timestamp: {self.packet_start_mach_timestamp}, " f"packet_end_mach_timestamp: {self.packet_end_mach_timestamp}, " f"packet_start_os_timestamp: {self.packet_start_os_timestamp}, " f"packet_end_os_timestamp: {self.packet_end_os_timestamp}, " f"server_packet_receive_timestamp: {self.server_packet_receive_timestamp}, " f"timing_info_score: {self.timing_info_score}, " f"timing_score_method: {self.timing_score_method.value}" ) def __str__(self): return ( f"packet_start_mach_timestamp: {self.packet_start_mach_timestamp}, " f"packet_end_mach_timestamp: {self.packet_end_mach_timestamp}, " f"packet_start_os_timestamp: {self.packet_start_os_timestamp}, " f"packet_end_os_timestamp: {self.packet_end_os_timestamp}, " f"server_packet_receive_timestamp: {self.server_packet_receive_timestamp}, " f"timing_info_score: {self.timing_info_score}, " f"timing_score_method: {self.timing_score_method.name}" ) def update_timestamps(self, om: OffsetModel, use_model_function: bool = True): """ updates the timestamps in the metadata using the offset model :param om: OffsetModel to apply to data :param use_model_function: if True, use the offset model's correction function to correct time, otherwise use best offset (model's intercept value). default True """ self.packet_start_mach_timestamp = om.update_time(self.packet_start_mach_timestamp, use_model_function) self.packet_end_mach_timestamp = om.update_time(self.packet_end_mach_timestamp, use_model_function) self.packet_start_os_timestamp = om.update_time(self.packet_start_os_timestamp, use_model_function) self.packet_end_os_timestamp = om.update_time(self.packet_end_os_timestamp, use_model_function) def original_timestamps(self, om: OffsetModel, use_model_function: bool = True): """ undo the updates to the timestamps in the metadata using the offset model :param om: OffsetModel to apply to data :param use_model_function: if True, use the offset model's correction function to correct time, otherwise use best offset (model's intercept value). default True """ self.packet_start_mach_timestamp = om.get_original_time(self.packet_start_mach_timestamp, use_model_function) self.packet_end_mach_timestamp = om.get_original_time(self.packet_end_mach_timestamp, use_model_function) self.packet_start_os_timestamp = om.get_original_time(self.packet_start_os_timestamp, use_model_function) self.packet_end_os_timestamp = om.get_original_time(self.packet_end_os_timestamp, use_model_function) def as_dict(self) -> dict: """ :return: dictionary representation of the packet metadata """ return { "packet_start_mach_timestamp": self.packet_start_mach_timestamp, "packet_end_mach_timestamp": self.packet_end_mach_timestamp, "packet_start_os_timestamp": self.packet_start_os_timestamp, "packet_end_os_timestamp": self.packet_end_os_timestamp, "server_packet_receive_timestamp": self.server_packet_receive_timestamp, "timing_info_score": self.timing_info_score, "timing_score_method": self.timing_score_method.name, "other_metadata": self.other_metadata, } @staticmethod def from_dict(pmd_dict: dict) -> "StationPacketMetadata": """ :param pmd_dict: dictionary with StationPacketMetadata :return: StationPacketMetadata from dictionary """ result = StationPacketMetadata() result.other_metadata = pmd_dict["other_metadata"] result.packet_start_mach_timestamp = pmd_dict["packet_start_mach_timestamp"] result.packet_end_mach_timestamp = pmd_dict["packet_end_mach_timestamp"] result.packet_start_os_timestamp = pmd_dict["packet_start_os_timestamp"] result.packet_end_os_timestamp = pmd_dict["packet_end_os_timestamp"] result.server_packet_receive_timestamp = pmd_dict["server_packet_receive_timestamp"] result.timing_info_score = pmd_dict["timing_info_score"] result.timing_score_method = TimingScoreMethod[ pmd_dict["timing_score_method"] if "timing_score_method" in pmd_dict.keys() else "UNKNOWN" ] return result
Static methods
def from_dict(pmd_dict: dict) ‑> StationPacketMetadata
-
:param pmd_dict: dictionary with StationPacketMetadata :return: StationPacketMetadata from dictionary
Expand source code
@staticmethod def from_dict(pmd_dict: dict) -> "StationPacketMetadata": """ :param pmd_dict: dictionary with StationPacketMetadata :return: StationPacketMetadata from dictionary """ result = StationPacketMetadata() result.other_metadata = pmd_dict["other_metadata"] result.packet_start_mach_timestamp = pmd_dict["packet_start_mach_timestamp"] result.packet_end_mach_timestamp = pmd_dict["packet_end_mach_timestamp"] result.packet_start_os_timestamp = pmd_dict["packet_start_os_timestamp"] result.packet_end_os_timestamp = pmd_dict["packet_end_os_timestamp"] result.server_packet_receive_timestamp = pmd_dict["server_packet_receive_timestamp"] result.timing_info_score = pmd_dict["timing_info_score"] result.timing_score_method = TimingScoreMethod[ pmd_dict["timing_score_method"] if "timing_score_method" in pmd_dict.keys() else "UNKNOWN" ] return result
Methods
def as_dict(self) ‑> dict
-
:return: dictionary representation of the packet metadata
Expand source code
def as_dict(self) -> dict: """ :return: dictionary representation of the packet metadata """ return { "packet_start_mach_timestamp": self.packet_start_mach_timestamp, "packet_end_mach_timestamp": self.packet_end_mach_timestamp, "packet_start_os_timestamp": self.packet_start_os_timestamp, "packet_end_os_timestamp": self.packet_end_os_timestamp, "server_packet_receive_timestamp": self.server_packet_receive_timestamp, "timing_info_score": self.timing_info_score, "timing_score_method": self.timing_score_method.name, "other_metadata": self.other_metadata, }
def original_timestamps(self, om: OffsetModel, use_model_function: bool = True)
-
undo the updates to the timestamps in the metadata using the offset model
:param om: OffsetModel to apply to data :param use_model_function: if True, use the offset model's correction function to correct time, otherwise use best offset (model's intercept value). default True
Expand source code
def original_timestamps(self, om: OffsetModel, use_model_function: bool = True): """ undo the updates to the timestamps in the metadata using the offset model :param om: OffsetModel to apply to data :param use_model_function: if True, use the offset model's correction function to correct time, otherwise use best offset (model's intercept value). default True """ self.packet_start_mach_timestamp = om.get_original_time(self.packet_start_mach_timestamp, use_model_function) self.packet_end_mach_timestamp = om.get_original_time(self.packet_end_mach_timestamp, use_model_function) self.packet_start_os_timestamp = om.get_original_time(self.packet_start_os_timestamp, use_model_function) self.packet_end_os_timestamp = om.get_original_time(self.packet_end_os_timestamp, use_model_function)
def update_timestamps(self, om: OffsetModel, use_model_function: bool = True)
-
updates the timestamps in the metadata using the offset model
:param om: OffsetModel to apply to data :param use_model_function: if True, use the offset model's correction function to correct time, otherwise use best offset (model's intercept value). default True
Expand source code
def update_timestamps(self, om: OffsetModel, use_model_function: bool = True): """ updates the timestamps in the metadata using the offset model :param om: OffsetModel to apply to data :param use_model_function: if True, use the offset model's correction function to correct time, otherwise use best offset (model's intercept value). default True """ self.packet_start_mach_timestamp = om.update_time(self.packet_start_mach_timestamp, use_model_function) self.packet_end_mach_timestamp = om.update_time(self.packet_end_mach_timestamp, use_model_function) self.packet_start_os_timestamp = om.update_time(self.packet_start_os_timestamp, use_model_function) self.packet_end_os_timestamp = om.update_time(self.packet_end_os_timestamp, use_model_function)