Module redvox.common.session_model_utils
This module contains classes and functions that support SessionModel.
Expand source code
"""
This module contains classes and functions that support SessionModel.
"""
from typing import List, Optional, Tuple, Dict, Union, Callable
from bisect import insort
import numpy as np
import redvox.api1000.proto.redvox_api_m_pb2 as api_m
from redvox.api1000.wrapped_redvox_packet.sensors.location import LocationProvider
from redvox.cloud import session_model_api as sm
from redvox.common.timesync import TimeSync
NUM_BUFFER_POINTS = 3 # number of data points to keep in a buffer
COLUMN_TO_ENUM_FN = {"location_provider": lambda l: LocationProvider(l).name}
# These are used for checking if a field is present or not
_ACCELEROMETER_FIELD_NAME: str = "accelerometer"
_AMBIENT_TEMPERATURE_FIELD_NAME: str = "ambient_temperature"
_AUDIO_FIELD_NAME: str = "audio"
_COMPRESSED_AUDIO_FIELD_NAME: str = "compressed_audio"
_GRAVITY_FIELD_NAME: str = "gravity"
_GYROSCOPE_FIELD_NAME: str = "gyroscope"
_IMAGE_FIELD_NAME: str = "image"
_LIGHT_FIELD_NAME: str = "light"
_LINEAR_ACCELERATION_FIELD_NAME: str = "linear_acceleration"
_LOCATION_FIELD_NAME: str = "location"
_MAGNETOMETER_FIELD_NAME: str = "magnetometer"
_ORIENTATION_FIELD_NAME: str = "orientation"
_PRESSURE_FIELD_NAME: str = "pressure"
_PROXIMITY_FIELD_NAME: str = "proximity"
_RELATIVE_HUMIDITY_FIELD_NAME: str = "relative_humidity"
_ROTATION_VECTOR_FIELD_NAME: str = "rotation_vector"
_VELOCITY_FIELD_NAME: str = "velocity"
_HEALTH_FIELD_NAME: str = "health"
Sensor = Union[
api_m.RedvoxPacketM.Sensors.Xyz,
api_m.RedvoxPacketM.Sensors.Single,
api_m.RedvoxPacketM.Sensors.Audio,
api_m.RedvoxPacketM.Sensors.Image,
api_m.RedvoxPacketM.Sensors.Location,
api_m.RedvoxPacketM.Sensors.CompressedAudio,
api_m.RedvoxPacketM.StationInformation.StationMetrics,
]
__SENSOR_NAME_TO_SENSOR_FN: Dict[
str,
Optional[
Callable[
[api_m.RedvoxPacketM],
Union[Sensor],
]
],
] = {
"unknown": None,
_HEALTH_FIELD_NAME: lambda packet: packet.station_information.station_metrics,
_ACCELEROMETER_FIELD_NAME: lambda packet: packet.sensors.accelerometer,
_AMBIENT_TEMPERATURE_FIELD_NAME: lambda packet: packet.sensors.ambient_temperature,
_AUDIO_FIELD_NAME: lambda packet: packet.sensors.audio,
_COMPRESSED_AUDIO_FIELD_NAME: lambda packet: packet.sensors.compressed_audio,
_GRAVITY_FIELD_NAME: lambda packet: packet.sensors.gravity,
_GYROSCOPE_FIELD_NAME: lambda packet: packet.sensors.gyroscope,
_IMAGE_FIELD_NAME: lambda packet: packet.sensors.image,
_LIGHT_FIELD_NAME: lambda packet: packet.sensors.light,
_LINEAR_ACCELERATION_FIELD_NAME: lambda packet: packet.sensors.linear_acceleration,
_LOCATION_FIELD_NAME: lambda packet: packet.sensors.location,
_MAGNETOMETER_FIELD_NAME: lambda packet: packet.sensors.magnetometer,
_ORIENTATION_FIELD_NAME: lambda packet: packet.sensors.orientation,
_PRESSURE_FIELD_NAME: lambda packet: packet.sensors.pressure,
_PROXIMITY_FIELD_NAME: lambda packet: packet.sensors.proximity,
_RELATIVE_HUMIDITY_FIELD_NAME: lambda packet: packet.sensors.relative_humidity,
_ROTATION_VECTOR_FIELD_NAME: lambda packet: packet.sensors.rotation_vector,
_VELOCITY_FIELD_NAME: lambda packet: packet.sensors.velocity,
}
def _get_sensor_for_data_extraction(sensor_name: str, packet: api_m.RedvoxPacketM) -> Optional[Sensor]:
"""
:param sensor_name: name of sensor to return
:param packet: the data packet to get the sensor from
:return: Sensor that matches the sensor_name or None if that Sensor doesn't exist
"""
sensor_fn: Optional[Callable[[api_m.RedvoxPacketM], Sensor]] = __SENSOR_NAME_TO_SENSOR_FN[sensor_name]
if (sensor_name == _HEALTH_FIELD_NAME or _has_sensor(packet, sensor_name)) and sensor_fn is not None:
return sensor_fn(packet)
def _get_mean_sample_rate_from_sensor(sensor: Sensor) -> float:
"""
:param sensor: Sensor to get data from
:return: mean sample rate of the sensor or np.nan if sample rate doesn't exist
"""
return sensor.timestamps.mean_sample_rate if int(sensor.timestamps.timestamp_statistics.count) > 1 else np.nan
def _has_sensor(data: Union[api_m.RedvoxPacketM, api_m.RedvoxPacketM.Sensors], field_name: str) -> bool:
"""
Returns true if the given packet or sensors instance contains the valid sensor.
:param data: Either a packet or a packet's sensors message.
:param field_name: The name of the sensor being checked.
:return: True if the sensor exists, False otherwise.
"""
if isinstance(data, api_m.RedvoxPacketM):
# noinspection Mypy,PyTypeChecker
return data.sensors.HasField(field_name)
if isinstance(data, api_m.RedvoxPacketM.Sensors):
# noinspection Mypy,PyTypeChecker
return data.HasField(field_name)
return False
def get_all_sensors_in_packet(packet: api_m.RedvoxPacketM) -> List[Tuple[str, str, float]]:
"""
:param packet: packet to check
:return: list of all sensors as tuple of name, description, and mean sample rate in the packet
"""
result: List[Tuple] = []
for s in [_AUDIO_FIELD_NAME, _COMPRESSED_AUDIO_FIELD_NAME]:
if _has_sensor(packet, s):
sensor = _get_sensor_for_data_extraction(s, packet)
result.append((s, sensor.sensor_description, sensor.sample_rate))
for s in [
_PRESSURE_FIELD_NAME,
_LOCATION_FIELD_NAME,
_ACCELEROMETER_FIELD_NAME,
_AMBIENT_TEMPERATURE_FIELD_NAME,
_GRAVITY_FIELD_NAME,
_GYROSCOPE_FIELD_NAME,
_IMAGE_FIELD_NAME,
_LIGHT_FIELD_NAME,
_LINEAR_ACCELERATION_FIELD_NAME,
_MAGNETOMETER_FIELD_NAME,
_ORIENTATION_FIELD_NAME,
_PROXIMITY_FIELD_NAME,
_RELATIVE_HUMIDITY_FIELD_NAME,
_ROTATION_VECTOR_FIELD_NAME,
_VELOCITY_FIELD_NAME,
]:
if _has_sensor(packet, s):
sensor = _get_sensor_for_data_extraction(s, packet)
result.append((s, sensor.sensor_description, sensor.timestamps.mean_sample_rate))
if packet.station_information.HasField("station_metrics"):
result.insert(
2,
(
_HEALTH_FIELD_NAME,
"station_metrics",
packet.station_information.station_metrics.timestamps.mean_sample_rate,
),
)
return result
def __ordered_insert(buffer: List, value: Tuple):
"""
inserts the value into the buffer using the timestamp as the key
:param value: value to add. Must include a timestamp and the same data type as the other buffer elements
"""
if len(buffer) < 1:
buffer.append(value)
else:
insort(buffer, value)
def add_to_fst_buffer(buffer: List, buf_max_size: int, timestamp: float, value):
"""
Add a value into the first buffer.
* If the buffer is not full, the value is added automatically
* If the buffer is full, the value is only added if it comes before the last element.
:param buffer: the buffer to add the value to
:param buf_max_size: the maximum size of the buffer
:param timestamp: timestamp in microseconds since epoch UTC to add.
:param value: value to add. Must be the same type of data as the other elements in the queue.
"""
if len(buffer) < buf_max_size or timestamp < buffer[-1][0]:
__ordered_insert(buffer, (timestamp, value))
while len(buffer) > buf_max_size:
buffer.pop()
def add_to_lst_buffer(buffer: List, buf_max_size: int, timestamp: float, value):
"""
Add a value into the last buffer.
* If the buffer is not full, the value is added automatically
* If the buffer is full, the value is only added if it comes after the first element.
:param buffer: the buffer to add the value to
:param buf_max_size: the maximum size of the buffer
:param timestamp: timestamp in microseconds since epoch UTC to add.
:param value: value to add. Must be the same type of data as the other elements in the queue.
"""
if len(buffer) < buf_max_size or timestamp > buffer[0][0]:
__ordered_insert(buffer, (timestamp, value))
while len(buffer) > buf_max_size:
buffer.pop(0)
def get_local_timesync(packet: api_m.RedvoxPacketM) -> Optional[Tuple]:
"""
if the data exists, the returning tuple looks like:
(start_timestamp, end_timestamp, num_exchanges, best_latency, best_offset, list of TimeSyncData)
:param packet: packet to get timesync data from
:return: Timing object using data from packet or None
"""
ts = TimeSync().from_raw_packets([packet])
if ts.num_tri_messages() > 0:
_ts_latencies = ts.latencies().flatten()
_ts_offsets = ts.offsets().flatten()
_ts_timestamps = ts.get_device_exchanges_timestamps()
# add data to the buffers
_ts_data = [
sm.TimeSyncData(_ts_timestamps[i], _ts_latencies[i], _ts_offsets[i]) for i in range(len(_ts_timestamps))
]
return (
ts.data_start_timestamp(),
ts.data_end_timestamp(),
ts.num_tri_messages(),
ts.best_latency(),
ts.best_offset(),
_ts_data,
)
return None
def add_to_welford(value: float, welford: Optional[sm.WelfordAggregator] = None) -> sm.WelfordAggregator:
"""
adds the value to the welford, then returns the updated object.
If welford is None, creates a new WelfordAggregator object and returns it.
:param value: the value to add
:param welford: optional WelfordAggregator object to update. if not given, will make a new one. Default None
:return: updated or new WelfordAggregator object
"""
if welford is None:
return sm.WelfordAggregator(0.0, value, 1)
welford.cnt += 1
delta = value - welford.mean
welford.mean += delta / float(welford.cnt)
delta2 = value - welford.mean
welford.m2 += delta * delta2
return welford
def add_to_stats(value: float, stats: Optional[sm.Stats] = None) -> sm.Stats:
"""
adds the value to the stats, then returns the updated object.
If stats is None, creates a new Stats object and returns it.
:param value: the value to add
:param stats: optional Stats object to update. if not given, will make a new one. Default None
:return: updated or new Stats object
"""
if stats is None:
return sm.Stats(value, value, add_to_welford(value))
if value < stats.min:
stats.min = value
if value > stats.max:
stats.max = value
add_to_welford(value, stats.welford)
return stats
def get_location_data(packet: api_m.RedvoxPacketM) -> List[Tuple[str, float, float, float, float]]:
"""
:param packet: packet to get location data from
:return: List of location data as a tuples from the packet
"""
locations = []
loc = packet.sensors.location
lat = lon = alt = ts = 0.0
source = "UNKNOWN"
num_pts = int(loc.timestamps.timestamp_statistics.count)
# check for actual location values
if len(loc.location_providers) < 1:
lat = loc.latitude_samples.value_statistics.mean
lon = loc.longitude_samples.value_statistics.mean
alt = loc.altitude_samples.value_statistics.mean
ts = loc.timestamps.timestamp_statistics.mean
elif (
num_pts > 0
and loc.latitude_samples.value_statistics.count > 0
and loc.longitude_samples.value_statistics.count > 0
and loc.altitude_samples.value_statistics.count > 0
and num_pts == loc.latitude_samples.value_statistics.count
and num_pts == loc.altitude_samples.value_statistics.count
and num_pts == loc.longitude_samples.value_statistics.count
):
lats = loc.latitude_samples.values
lons = loc.longitude_samples.values
alts = loc.altitude_samples.values
tstp = loc.timestamps.timestamps
for i in range(num_pts):
source = (
"UNKNOWN"
if len(loc.location_providers) != num_pts
else COLUMN_TO_ENUM_FN["location_provider"](loc.location_providers[i])
)
locations.append((source, lats[i], lons[i], alts[i], tstp[i]))
# set a special flag for later, so we don't add an extra location value
source = None
elif loc.last_best_location is not None:
ts = loc.last_best_location.latitude_longitude_timestamp.mach
source = loc.last_best_location.location_provider
lat = loc.last_best_location.latitude
lon = loc.last_best_location.longitude
alt = loc.last_best_location.altitude
elif loc.overall_best_location is not None:
ts = loc.overall_best_location.latitude_longitude_timestamp.mach
source = loc.overall_best_location.location_provider
lat = loc.overall_best_location.latitude
lon = loc.overall_best_location.longitude
alt = loc.overall_best_location.altitude
# source is not None if we got only one location through non-usual methods
if source is not None:
locations.append((source, lat, lon, alt, ts))
return locations
def get_dynamic_data(packet: api_m.RedvoxPacketM) -> Dict:
"""
:param packet: packet to get data from
:return: Dictionary of all dynamic session data from the packet
"""
location = get_location_data(packet)
battery = packet.station_information.station_metrics.battery.value_statistics.mean
temperature = packet.station_information.station_metrics.temperature.value_statistics.mean
return {"location": location, "battery": battery, "temperature": temperature}
def add_to_location(
lat: float, lon: float, alt: float, timestamp: float, loc_stat: Optional[sm.LocationStat] = None
) -> sm.LocationStat:
"""
update a LocationStat object with the location, or make a new one
:param lat: latitude in degrees
:param lon: longitude in degrees
:param alt: altitude in meters
:param timestamp: timestamp in microseconds from epoch UTC
:param loc_stat: optional LocationStat object to update. if not given, will make a new one. Default None
:return: updated or new LocationStat object
"""
if loc_stat is None:
fst_lst = sm.FirstLastBufLocation([], NUM_BUFFER_POINTS, [], NUM_BUFFER_POINTS)
add_to_fst_buffer(fst_lst.fst, fst_lst.fst_max_size, timestamp, sm.Location(lat, lon, alt))
add_to_lst_buffer(fst_lst.lst, fst_lst.lst_max_size, timestamp, sm.Location(lat, lon, alt))
return sm.LocationStat(fst_lst, add_to_stats(lat), add_to_stats(lon), add_to_stats(alt))
add_to_fst_buffer(loc_stat.fst_lst.fst, loc_stat.fst_lst.fst_max_size, timestamp, sm.Location(lat, lon, alt))
add_to_lst_buffer(loc_stat.fst_lst.lst, loc_stat.fst_lst.lst_max_size, timestamp, sm.Location(lat, lon, alt))
loc_stat.lat = add_to_stats(lat, loc_stat.lat)
loc_stat.lng = add_to_stats(lon, loc_stat.lng)
loc_stat.alt = add_to_stats(alt, loc_stat.alt)
return loc_stat
def add_location_data(
data: List[Tuple[str, float, float, float, float]], loc_dict: Optional[Dict[str, sm.LocationStat]] = None
) -> Dict[str, sm.LocationStat]:
"""
update a dictionary of LocationStat or make a new dictionary
:param data: the data to add
:param loc_dict: the location dictionary to update
:return: the updated or new location dictionary
"""
if loc_dict is None:
loc_dict = {}
for s in data:
loc_dict[s[0]] = add_to_location(s[1], s[2], s[3], s[4], loc_dict[s[0]] if s[0] in loc_dict.keys() else None)
return loc_dict
Functions
def add_location_data(data: List[Tuple[str, float, float, float, float]], loc_dict: Optional[Dict[str, LocationStat]] = None) ‑> Dict[str, LocationStat]
-
update a dictionary of LocationStat or make a new dictionary
:param data: the data to add :param loc_dict: the location dictionary to update :return: the updated or new location dictionary
Expand source code
def add_location_data( data: List[Tuple[str, float, float, float, float]], loc_dict: Optional[Dict[str, sm.LocationStat]] = None ) -> Dict[str, sm.LocationStat]: """ update a dictionary of LocationStat or make a new dictionary :param data: the data to add :param loc_dict: the location dictionary to update :return: the updated or new location dictionary """ if loc_dict is None: loc_dict = {} for s in data: loc_dict[s[0]] = add_to_location(s[1], s[2], s[3], s[4], loc_dict[s[0]] if s[0] in loc_dict.keys() else None) return loc_dict
def add_to_fst_buffer(buffer: List, buf_max_size: int, timestamp: float, value)
-
Add a value into the first buffer.
- If the buffer is not full, the value is added automatically
- If the buffer is full, the value is only added if it comes before the last element.
:param buffer: the buffer to add the value to :param buf_max_size: the maximum size of the buffer :param timestamp: timestamp in microseconds since epoch UTC to add. :param value: value to add. Must be the same type of data as the other elements in the queue.
Expand source code
def add_to_fst_buffer(buffer: List, buf_max_size: int, timestamp: float, value): """ Add a value into the first buffer. * If the buffer is not full, the value is added automatically * If the buffer is full, the value is only added if it comes before the last element. :param buffer: the buffer to add the value to :param buf_max_size: the maximum size of the buffer :param timestamp: timestamp in microseconds since epoch UTC to add. :param value: value to add. Must be the same type of data as the other elements in the queue. """ if len(buffer) < buf_max_size or timestamp < buffer[-1][0]: __ordered_insert(buffer, (timestamp, value)) while len(buffer) > buf_max_size: buffer.pop()
def add_to_location(lat: float, lon: float, alt: float, timestamp: float, loc_stat: Optional[LocationStat] = None) ‑> LocationStat
-
update a LocationStat object with the location, or make a new one
:param lat: latitude in degrees :param lon: longitude in degrees :param alt: altitude in meters :param timestamp: timestamp in microseconds from epoch UTC :param loc_stat: optional LocationStat object to update. if not given, will make a new one. Default None :return: updated or new LocationStat object
Expand source code
def add_to_location( lat: float, lon: float, alt: float, timestamp: float, loc_stat: Optional[sm.LocationStat] = None ) -> sm.LocationStat: """ update a LocationStat object with the location, or make a new one :param lat: latitude in degrees :param lon: longitude in degrees :param alt: altitude in meters :param timestamp: timestamp in microseconds from epoch UTC :param loc_stat: optional LocationStat object to update. if not given, will make a new one. Default None :return: updated or new LocationStat object """ if loc_stat is None: fst_lst = sm.FirstLastBufLocation([], NUM_BUFFER_POINTS, [], NUM_BUFFER_POINTS) add_to_fst_buffer(fst_lst.fst, fst_lst.fst_max_size, timestamp, sm.Location(lat, lon, alt)) add_to_lst_buffer(fst_lst.lst, fst_lst.lst_max_size, timestamp, sm.Location(lat, lon, alt)) return sm.LocationStat(fst_lst, add_to_stats(lat), add_to_stats(lon), add_to_stats(alt)) add_to_fst_buffer(loc_stat.fst_lst.fst, loc_stat.fst_lst.fst_max_size, timestamp, sm.Location(lat, lon, alt)) add_to_lst_buffer(loc_stat.fst_lst.lst, loc_stat.fst_lst.lst_max_size, timestamp, sm.Location(lat, lon, alt)) loc_stat.lat = add_to_stats(lat, loc_stat.lat) loc_stat.lng = add_to_stats(lon, loc_stat.lng) loc_stat.alt = add_to_stats(alt, loc_stat.alt) return loc_stat
def add_to_lst_buffer(buffer: List, buf_max_size: int, timestamp: float, value)
-
Add a value into the last buffer.
- If the buffer is not full, the value is added automatically
- If the buffer is full, the value is only added if it comes after the first element.
:param buffer: the buffer to add the value to :param buf_max_size: the maximum size of the buffer :param timestamp: timestamp in microseconds since epoch UTC to add. :param value: value to add. Must be the same type of data as the other elements in the queue.
Expand source code
def add_to_lst_buffer(buffer: List, buf_max_size: int, timestamp: float, value): """ Add a value into the last buffer. * If the buffer is not full, the value is added automatically * If the buffer is full, the value is only added if it comes after the first element. :param buffer: the buffer to add the value to :param buf_max_size: the maximum size of the buffer :param timestamp: timestamp in microseconds since epoch UTC to add. :param value: value to add. Must be the same type of data as the other elements in the queue. """ if len(buffer) < buf_max_size or timestamp > buffer[0][0]: __ordered_insert(buffer, (timestamp, value)) while len(buffer) > buf_max_size: buffer.pop(0)
def add_to_stats(value: float, stats: Optional[Stats] = None) ‑> Stats
-
adds the value to the stats, then returns the updated object.
If stats is None, creates a new Stats object and returns it.
:param value: the value to add :param stats: optional Stats object to update. if not given, will make a new one. Default None :return: updated or new Stats object
Expand source code
def add_to_stats(value: float, stats: Optional[sm.Stats] = None) -> sm.Stats: """ adds the value to the stats, then returns the updated object. If stats is None, creates a new Stats object and returns it. :param value: the value to add :param stats: optional Stats object to update. if not given, will make a new one. Default None :return: updated or new Stats object """ if stats is None: return sm.Stats(value, value, add_to_welford(value)) if value < stats.min: stats.min = value if value > stats.max: stats.max = value add_to_welford(value, stats.welford) return stats
def add_to_welford(value: float, welford: Optional[WelfordAggregator] = None) ‑> WelfordAggregator
-
adds the value to the welford, then returns the updated object.
If welford is None, creates a new WelfordAggregator object and returns it.
:param value: the value to add :param welford: optional WelfordAggregator object to update. if not given, will make a new one. Default None :return: updated or new WelfordAggregator object
Expand source code
def add_to_welford(value: float, welford: Optional[sm.WelfordAggregator] = None) -> sm.WelfordAggregator: """ adds the value to the welford, then returns the updated object. If welford is None, creates a new WelfordAggregator object and returns it. :param value: the value to add :param welford: optional WelfordAggregator object to update. if not given, will make a new one. Default None :return: updated or new WelfordAggregator object """ if welford is None: return sm.WelfordAggregator(0.0, value, 1) welford.cnt += 1 delta = value - welford.mean welford.mean += delta / float(welford.cnt) delta2 = value - welford.mean welford.m2 += delta * delta2 return welford
def get_all_sensors_in_packet(packet: src.redvox_api_m.redvox_api_m_pb2.RedvoxPacketM) ‑> List[Tuple[str, str, float]]
-
:param packet: packet to check :return: list of all sensors as tuple of name, description, and mean sample rate in the packet
Expand source code
def get_all_sensors_in_packet(packet: api_m.RedvoxPacketM) -> List[Tuple[str, str, float]]: """ :param packet: packet to check :return: list of all sensors as tuple of name, description, and mean sample rate in the packet """ result: List[Tuple] = [] for s in [_AUDIO_FIELD_NAME, _COMPRESSED_AUDIO_FIELD_NAME]: if _has_sensor(packet, s): sensor = _get_sensor_for_data_extraction(s, packet) result.append((s, sensor.sensor_description, sensor.sample_rate)) for s in [ _PRESSURE_FIELD_NAME, _LOCATION_FIELD_NAME, _ACCELEROMETER_FIELD_NAME, _AMBIENT_TEMPERATURE_FIELD_NAME, _GRAVITY_FIELD_NAME, _GYROSCOPE_FIELD_NAME, _IMAGE_FIELD_NAME, _LIGHT_FIELD_NAME, _LINEAR_ACCELERATION_FIELD_NAME, _MAGNETOMETER_FIELD_NAME, _ORIENTATION_FIELD_NAME, _PROXIMITY_FIELD_NAME, _RELATIVE_HUMIDITY_FIELD_NAME, _ROTATION_VECTOR_FIELD_NAME, _VELOCITY_FIELD_NAME, ]: if _has_sensor(packet, s): sensor = _get_sensor_for_data_extraction(s, packet) result.append((s, sensor.sensor_description, sensor.timestamps.mean_sample_rate)) if packet.station_information.HasField("station_metrics"): result.insert( 2, ( _HEALTH_FIELD_NAME, "station_metrics", packet.station_information.station_metrics.timestamps.mean_sample_rate, ), ) return result
def get_dynamic_data(packet: src.redvox_api_m.redvox_api_m_pb2.RedvoxPacketM) ‑> Dict
-
:param packet: packet to get data from :return: Dictionary of all dynamic session data from the packet
Expand source code
def get_dynamic_data(packet: api_m.RedvoxPacketM) -> Dict: """ :param packet: packet to get data from :return: Dictionary of all dynamic session data from the packet """ location = get_location_data(packet) battery = packet.station_information.station_metrics.battery.value_statistics.mean temperature = packet.station_information.station_metrics.temperature.value_statistics.mean return {"location": location, "battery": battery, "temperature": temperature}
def get_local_timesync(packet: src.redvox_api_m.redvox_api_m_pb2.RedvoxPacketM) ‑> Optional[Tuple]
-
if the data exists, the returning tuple looks like:
(start_timestamp, end_timestamp, num_exchanges, best_latency, best_offset, list of TimeSyncData)
:param packet: packet to get timesync data from :return: Timing object using data from packet or None
Expand source code
def get_local_timesync(packet: api_m.RedvoxPacketM) -> Optional[Tuple]: """ if the data exists, the returning tuple looks like: (start_timestamp, end_timestamp, num_exchanges, best_latency, best_offset, list of TimeSyncData) :param packet: packet to get timesync data from :return: Timing object using data from packet or None """ ts = TimeSync().from_raw_packets([packet]) if ts.num_tri_messages() > 0: _ts_latencies = ts.latencies().flatten() _ts_offsets = ts.offsets().flatten() _ts_timestamps = ts.get_device_exchanges_timestamps() # add data to the buffers _ts_data = [ sm.TimeSyncData(_ts_timestamps[i], _ts_latencies[i], _ts_offsets[i]) for i in range(len(_ts_timestamps)) ] return ( ts.data_start_timestamp(), ts.data_end_timestamp(), ts.num_tri_messages(), ts.best_latency(), ts.best_offset(), _ts_data, ) return None
def get_location_data(packet: src.redvox_api_m.redvox_api_m_pb2.RedvoxPacketM) ‑> List[Tuple[str, float, float, float, float]]
-
:param packet: packet to get location data from :return: List of location data as a tuples from the packet
Expand source code
def get_location_data(packet: api_m.RedvoxPacketM) -> List[Tuple[str, float, float, float, float]]: """ :param packet: packet to get location data from :return: List of location data as a tuples from the packet """ locations = [] loc = packet.sensors.location lat = lon = alt = ts = 0.0 source = "UNKNOWN" num_pts = int(loc.timestamps.timestamp_statistics.count) # check for actual location values if len(loc.location_providers) < 1: lat = loc.latitude_samples.value_statistics.mean lon = loc.longitude_samples.value_statistics.mean alt = loc.altitude_samples.value_statistics.mean ts = loc.timestamps.timestamp_statistics.mean elif ( num_pts > 0 and loc.latitude_samples.value_statistics.count > 0 and loc.longitude_samples.value_statistics.count > 0 and loc.altitude_samples.value_statistics.count > 0 and num_pts == loc.latitude_samples.value_statistics.count and num_pts == loc.altitude_samples.value_statistics.count and num_pts == loc.longitude_samples.value_statistics.count ): lats = loc.latitude_samples.values lons = loc.longitude_samples.values alts = loc.altitude_samples.values tstp = loc.timestamps.timestamps for i in range(num_pts): source = ( "UNKNOWN" if len(loc.location_providers) != num_pts else COLUMN_TO_ENUM_FN["location_provider"](loc.location_providers[i]) ) locations.append((source, lats[i], lons[i], alts[i], tstp[i])) # set a special flag for later, so we don't add an extra location value source = None elif loc.last_best_location is not None: ts = loc.last_best_location.latitude_longitude_timestamp.mach source = loc.last_best_location.location_provider lat = loc.last_best_location.latitude lon = loc.last_best_location.longitude alt = loc.last_best_location.altitude elif loc.overall_best_location is not None: ts = loc.overall_best_location.latitude_longitude_timestamp.mach source = loc.overall_best_location.location_provider lat = loc.overall_best_location.latitude lon = loc.overall_best_location.longitude alt = loc.overall_best_location.altitude # source is not None if we got only one location through non-usual methods if source is not None: locations.append((source, lat, lon, alt, ts)) return locations