Module redvox.common.packet_to_pyarrow
Converts data from RedVox packets into pyarrow tables.
Expand source code
"""
Converts data from RedVox packets into pyarrow tables.
"""
from typing import Optional, Dict, Callable, List, Tuple
import os
from pathlib import Path
from itertools import repeat
from glob import glob
import numpy as np
import pyarrow as pa
import pyarrow.parquet as pq
import pyarrow.compute as pc
from dataclasses import dataclass, field
from dataclasses_json import dataclass_json
from redvox.api1000.proto.redvox_api_m_pb2 import RedvoxPacketM
from redvox.common import sensor_reader_utils as srupa
from redvox.common import date_time_utils as dtu
from redvox.common import gap_and_pad_utils as gpu
from redvox.common.sensor_data import SensorType
from redvox.common.errors import RedVoxExceptions
# Schema for per-packet timing metadata: machine/OS clock boundaries of the
# packet plus its timing quality score.
packet_schema = pa.schema(
    [
        ("packet_start_mach_timestamp", pa.float64()),
        ("packet_end_mach_timestamp", pa.float64()),
        ("packet_start_os_timestamp", pa.float64()),
        ("packet_end_os_timestamp", pa.float64()),
        ("timing_info_score", pa.int64()),
    ]
)

# Schema for minimal sensor metadata: description and first data timestamp.
sensor_schema = pa.schema([("description", pa.string()), ("first_timestamp", pa.float64())])

# Schema for station metadata: identity, hardware/software descriptors, and
# the expected packet duration in seconds.
station_schema = pa.schema(
    [
        ("id", pa.string()),
        ("uuid", pa.string()),
        ("start_time", pa.float64()),
        ("api", pa.float64()),
        ("sub_api", pa.float64()),
        ("make", pa.string()),
        ("model", pa.string()),
        ("os", pa.int64()),
        ("os_version", pa.string()),
        ("app", pa.string()),
        ("app_version", pa.string()),
        ("is_private", pa.bool_()),
        ("packet_duration_s", pa.float64()),
        ("station_description", pa.string()),
    ]
)
@dataclass_json
@dataclass
class PyarrowSummary:
    """
    Summary of a sensor using pyarrow Tables or parquet files to store the data

    Properties:
        name: str, name of sensor
        stype: SensorType, sensor type of summary
        start: float, start timestamp in microseconds since epoch utc of sensor
        srate_hz: float, sample rate in Hz
        fdir: str, directory where parquet files can be found
        scount: int, number of samples to read
        smint_s: float, mean interval of sample rate in seconds
        sstd_s: float, std dev of sample rate in seconds
        _data: optional data as a pyarrow Table; set to None once written to disk
    """

    name: str
    stype: srupa.SensorType
    start: float
    srate_hz: float
    fdir: str
    scount: int
    smint_s: float = np.nan
    sstd_s: float = np.nan
    _data: Optional[pa.Table] = None

    def file_name(self) -> str:
        """
        :return: full path and file name of where the file should exist
        """
        return os.path.join(self.fdir, f"{self.stype.name}_{int(self.start)}.parquet")

    def fdir_stem(self) -> str:
        """
        :return: the name of the parent directory of the file
        """
        return Path(self.fdir).stem

    def clean_fdir(self):
        """
        remove all parquet files in self.fdir
        """
        for f in glob(os.path.join(self.fdir, "*.parquet")):
            os.remove(f)

    def write_data(self, clean_dir: bool = False) -> str:
        """
        write the data being summarized to disk, then remove the data from the object

        :param clean_dir: if True, remove any files in the dir before writing the data, default False
        :return: the path to the file where the data exists or empty string if data wasn't written
        """
        if self.check_data():
            os.makedirs(self.fdir, exist_ok=True)
            if clean_dir:
                self.clean_fdir()
            pq.write_table(self._data, self.file_name())
            # drop the in-memory copy; data() will re-read from disk from now on
            self._data = None
            return self.file_name()
        return ""

    def check_data(self) -> bool:
        """
        :return: True if data exists in memory (also means not written to disk)
        """
        # Explicit None check instead of truthiness: a pyarrow Table with zero
        # rows is falsy, which would misreport an in-memory (but empty) table
        # as "no data" and silently skip writes/merges.
        return self._data is not None

    def data(self) -> Optional[pa.Table]:
        """
        :return: the data as a pyarrow Table, reading from disk if it is not in
                 memory, or an empty Table if no data can be found
        """
        if self.check_data():
            return self._data
        if os.path.exists(self.file_name()):
            return pq.read_table(self.file_name())
        return pa.Table.from_pydict({})
@dataclass_json
@dataclass
class AggregateSummary:
    """
    aggregate of summaries

    properties:
        summaries: the summaries of sensors
        gaps: gaps in audio data as a list of tuples of start and end time
        errors: RedVoxExceptions collected while merging summaries
    """

    summaries: List[PyarrowSummary] = field(default_factory=list)
    gaps: List[Tuple[float, float]] = field(default_factory=list)
    # default_factory so each instance gets its own error container; a plain
    # class-level default would be one RedVoxExceptions shared by every
    # AggregateSummary instance.
    errors: RedVoxExceptions = field(default_factory=lambda: RedVoxExceptions("AggregateSummary"))

    def add_aggregate_summary(self, agg_sum: "AggregateSummary"):
        """
        adds another aggregate summary to this one

        :param agg_sum: another aggregate summary to add
        """
        self.summaries.extend(agg_sum.summaries)

    def add_summary(self, pya_sum: PyarrowSummary):
        """
        adds a summary to the aggregate

        :param pya_sum: the summary to add
        """
        self.summaries.append(pya_sum)

    def merge_audio_summaries(self):
        """
        combines and replaces all Audio summaries into a single summary; also adds any gaps in the data
        """
        audio_lst = self.get_audio()
        if not audio_lst:
            # no audio summaries to merge
            return
        frst_audio = audio_lst[0]
        # if the first summary holds data in memory, keep the merged result in memory too
        use_mem = frst_audio.check_data()
        pckt_info = [(int(adl.start), adl.data()) for adl in audio_lst]
        audio_data = gpu.fill_audio_gaps(pckt_info, dtu.seconds_to_microseconds(1 / frst_audio.srate_hz))
        tbl = audio_data.create_timestamps()
        frst_audio = PyarrowSummary(
            frst_audio.name,
            frst_audio.stype,
            frst_audio.start,
            frst_audio.srate_hz,
            frst_audio.fdir,
            tbl.num_rows,
            frst_audio.smint_s,
            frst_audio.sstd_s,
            tbl,
        )
        if not use_mem:
            frst_audio.write_data(True)
        self.gaps = audio_data.gaps
        self.summaries = self.get_non_audio_list()
        self.add_summary(frst_audio)

    def merge_non_audio_summaries(self):
        """
        combines and replaces all summaries per type except for audio summaries
        """
        # group the non-audio summaries by sensor type
        smrs_dict: Dict[srupa.SensorType, List[PyarrowSummary]] = {}
        for smry in self.summaries:
            if smry.stype != SensorType.AUDIO:
                smrs_dict.setdefault(smry.stype, []).append(smry)
        self.summaries = self.get_audio()
        for styp, smrys in smrs_dict.items():
            if len(smrys) > 0:
                # per-packet statistics, used as a sanity check on the combined table
                combined_mint = np.mean([smrs.smint_s for smrs in smrys])
                combined_std = np.mean([smrs.sstd_s for smrs in smrys])
                first_summary = smrys.pop(0)
                tbl = first_summary.data()
                if not first_summary.check_data():
                    os.makedirs(first_summary.fdir, exist_ok=True)
                for smrs in smrys:
                    tbl = pa.concat_tables([tbl, smrs.data()])
                    if not first_summary.check_data():
                        # on-disk mode: the per-packet file is folded in, so remove it
                        os.remove(smrs.file_name())
                if first_summary.check_data():
                    first_summary._data = tbl
                else:
                    pq.write_table(tbl, first_summary.file_name())
                # sort by timestamps to compute the sample interval statistics.
                # NOTE(review): the sorted table is only used for the statistics
                # below; the data kept/written above stays in packet order - confirm intended
                tbl = pc.take(tbl, pc.sort_indices(tbl, sort_keys=[("timestamps", "ascending")]))
                timestamps = tbl["timestamps"].to_numpy()
                if len(timestamps) > 1:
                    mnint = dtu.microseconds_to_seconds(float(np.mean(np.diff(timestamps))))
                    stdint = dtu.microseconds_to_seconds(float(np.std(np.diff(timestamps))))
                else:
                    mnint = np.nan
                    stdint = np.nan
                # fall back to the per-packet compilation when the combined mean
                # interval is outside one std dev of the compiled mean
                if not combined_mint + combined_std > mnint > combined_mint - combined_std:
                    self.errors.append(
                        f"Mean interval s of combined {styp.name} sensor does not match the "
                        f"compilation of individual mean interval s per packet. Will use compilation "
                        f"of individual values."
                    )
                    mnint = combined_mint
                    stdint = combined_std
                single_smry = PyarrowSummary(
                    first_summary.name,
                    styp,
                    first_summary.start,
                    1 / mnint,
                    first_summary.fdir,
                    tbl.num_rows,
                    mnint,
                    stdint,
                    first_summary.data() if first_summary.check_data() else None,
                )
                self.summaries.append(single_smry)

    def merge_summaries_of_type(self, stype: SensorType):
        """
        combines and replaces multiple summaries of one SensorType into a single one

        *caution: using this on an audio sensor may cause data validation issues*

        :param stype: the type of sensor to combine
        """
        smrs = []
        other_smrs = []
        for smry in self.summaries:
            if smry.stype == stype:
                smrs.append(smry)
            else:
                other_smrs.append(smry)
        if not smrs:
            # no summaries of the requested type; nothing to merge
            return
        first_summary = smrs.pop(0)
        tbl = first_summary.data()
        if not first_summary.check_data():
            os.makedirs(first_summary.fdir, exist_ok=True)
        for smrys in smrs:
            # accumulate onto the running table; previously this rebuilt from
            # first_summary.data() each pass, dropping all but the last packet
            tbl = pa.concat_tables([tbl, smrys.data()])
            if not first_summary.check_data():
                os.remove(smrys.file_name())
        if first_summary.check_data():
            first_summary._data = tbl
        else:
            # write the combined table once, after all packets are folded in
            pq.write_table(tbl, first_summary.file_name())
        diffs = np.diff(tbl["timestamps"].to_numpy())
        mnint = dtu.microseconds_to_seconds(float(np.mean(diffs)))
        stdint = dtu.microseconds_to_seconds(float(np.std(diffs)))
        single_smry = PyarrowSummary(
            first_summary.name,
            first_summary.stype,
            first_summary.start,
            1 / mnint,
            first_summary.fdir,
            tbl.num_rows,
            mnint,
            stdint,
            first_summary.data() if first_summary.check_data() else None,
        )
        self.summaries = other_smrs
        self.summaries.append(single_smry)

    def merge_all_summaries(self):
        """
        merge all PyarrowSummary with the same sensor type into single PyarrowSummary per type
        """
        self.merge_audio_summaries()
        self.merge_non_audio_summaries()

    def get_audio(self) -> List[PyarrowSummary]:
        """
        :return: a list of PyarrowSummary of only Audio data
        """
        return [s for s in self.summaries if s.stype == srupa.SensorType.AUDIO]

    def get_non_audio(self) -> Dict[srupa.SensorType, List[PyarrowSummary]]:
        """
        :return: a dictionary of {non-Audio SensorType: PyarrowSummary}
        """
        result = {}
        for k in self.sensor_types():
            if k != srupa.SensorType.AUDIO:
                result[k] = [s for s in self.summaries if s.stype == k]
        return result

    def get_non_audio_list(self) -> List[PyarrowSummary]:
        """
        :return: a list of all non-Audio PyarrowSummary
        """
        return [s for s in self.summaries if s.stype != srupa.SensorType.AUDIO]

    def get_sensor(self, stype: srupa.SensorType) -> List[PyarrowSummary]:
        """
        :param stype: type of sensor to find
        :return: a list of all PyarrowSummary of the specified type
        """
        return [s for s in self.summaries if s.stype == stype]

    def sensor_types(self) -> List[srupa.SensorType]:
        """
        :return: a list of sensor types in self.summaries, in first-seen order
        """
        result = []
        for s in self.summaries:
            if s.stype not in result:
                result.append(s.stype)
        return result
def stream_to_pyarrow(packets: List[RedvoxPacketM], out_dir: Optional[str] = None) -> AggregateSummary:
    """
    stream the packets to parquet files for later processing.

    :param packets: redvox packets to convert
    :param out_dir: optional directory to write the pyarrow files to; if None, don't write files. default None
    :return: AggregateSummary of the sensors' metadata, data, and location of the data if written to disk
    """
    result = AggregateSummary()
    for packet in packets:
        for smry in packet_to_pyarrow(packet, out_dir).summaries:
            result.add_summary(smry)
    return result
def packet_to_pyarrow(packet: RedvoxPacketM, out_dir: Optional[str] = None) -> AggregateSummary:
    """
    gets non-audio sensor information by keeping it memory or writing it into folders named with the sensor names

    :param packet: packet to extract data from
    :param out_dir: optional directory to write the pyarrow files to; if None, don't write files. default None
    :return: AggregateSummary of the sensors' metadata, data, and location of the data if written to disk
    """
    result = AggregateSummary()
    packet_start = int(packet.timing_information.packet_start_mach_timestamp)
    # one loader per supported sensor; each returns None when the sensor is absent
    loaders = (
        load_apim_audio,
        load_apim_compressed_audio,
        load_apim_image,
        load_apim_health,
        load_apim_best_location,
        load_apim_location,
        load_apim_pressure,
        load_apim_light,
        load_apim_ambient_temp,
        load_apim_rel_humidity,
        load_apim_proximity,
        load_apim_accelerometer,
        load_apim_gyroscope,
        load_apim_magnetometer,
        load_apim_gravity,
        load_apim_linear_accel,
        load_apim_orientation,
        load_apim_rotation_vector,
        load_apim_velocity,
    )
    for loader in loaders:
        summary = loader(packet)
        if not summary:
            continue
        summary.start = packet_start
        if out_dir:
            summary.fdir = os.path.join(out_dir, f"{summary.stype.name}_SUMMARY")
            summary.write_data()
        result.add_summary(summary)
    return result
def load_apim_audio(packet: RedvoxPacketM) -> Optional[PyarrowSummary]:
    """
    load audio data from a single redvox packet

    :param packet: packet with data to load
    :return: audio sensor type, name, data and sample rate
    """
    if not srupa.__has_sensor(packet, srupa.__AUDIO_FIELD_NAME):
        return None
    audio: RedvoxPacketM.Sensors.Audio = packet.sensors.audio
    samples = pa.Table.from_pydict({"microphone": np.array(audio.samples.values)})
    return PyarrowSummary(
        audio.sensor_description,
        srupa.SensorType.AUDIO,
        np.nan,
        audio.sample_rate,
        "",
        int(audio.samples.value_statistics.count),
        1.0 / audio.sample_rate,
        0.0,
        samples,
    )
def load_apim_compressed_audio(packet: RedvoxPacketM) -> Optional[PyarrowSummary]:
    """
    load compressed audio data from a single redvox packet

    :param packet: packet with data to load
    :return: compressed audio sensor data if it exists, None otherwise
    """
    if not srupa.__has_sensor(packet, srupa.__COMPRESSED_AUDIO_FIELD_NAME):
        return None
    comp_audio: RedvoxPacketM.Sensors.CompressedAudio = packet.sensors.compressed_audio
    return PyarrowSummary(
        comp_audio.sensor_description,
        srupa.SensorType.COMPRESSED_AUDIO,
        np.nan,
        comp_audio.sample_rate,
        "",
        np.nan,
        1.0 / comp_audio.sample_rate,
        0.0,
        srupa.apim_compressed_audio_to_pyarrow(comp_audio),
    )
def load_apim_image(packet: RedvoxPacketM) -> Optional[PyarrowSummary]:
    """
    load image data from a single redvox packet

    :param packet: packet with data to load
    :return: image sensor data if it exists, None otherwise
    """
    if not srupa.__has_sensor(packet, srupa.__IMAGE_FIELD_NAME):
        return None
    image_sensor: RedvoxPacketM.Sensors.Image = packet.sensors.image
    timestamps = image_sensor.timestamps.timestamps
    extra_inputs = packet.station_information.app_settings.additional_input_sensors
    # one image per second when explicitly configured; otherwise assume one per packet
    per_second = RedvoxPacketM.StationInformation.AppSettings.InputSensor.IMAGE_PER_SECOND in extra_inputs
    sample_rate = 1.0 if per_second else 1 / srupa.__packet_duration_s(packet)
    return PyarrowSummary(
        image_sensor.sensor_description,
        srupa.SensorType.IMAGE,
        np.nan,
        sample_rate,
        "",
        len(timestamps),
        1.0 / sample_rate,
        0.0,
        srupa.apim_image_to_pyarrow(image_sensor),
    )
def load_apim_location(packet: RedvoxPacketM) -> Optional[PyarrowSummary]:
    """
    load location data from a single packet

    :param packet: packet with data to load
    :return: location sensor data if it exists, None otherwise
    """
    if not srupa.__has_sensor(packet, srupa.__LOCATION_FIELD_NAME):
        return None
    loc: RedvoxPacketM.Sensors.Location = packet.sensors.location
    stamps = loc.timestamps.timestamps
    if len(stamps) < 1:
        return None
    if len(stamps) > 1:
        deltas = np.diff(stamps)
        mean_interval = dtu.microseconds_to_seconds(float(np.mean(deltas)))
        interval_std = dtu.microseconds_to_seconds(float(np.std(deltas)))
    else:
        # single sample: assume one reading per packet
        mean_interval = srupa.__packet_duration_s(packet)
        interval_std = 0.0
    return PyarrowSummary(
        loc.sensor_description,
        srupa.SensorType.LOCATION,
        np.nan,
        np.nan,
        "",
        len(stamps),
        mean_interval,
        interval_std,
        srupa.apim_location_to_pyarrow(loc),
    )
def load_apim_best_location(packet: RedvoxPacketM) -> Optional[PyarrowSummary]:
    """
    load best location data from a single redvox packet

    :param packet: packet with data to load
    :return: best location sensor data if it exists, None otherwise
    """
    if not srupa.__has_sensor(packet, srupa.__LOCATION_FIELD_NAME):
        return None
    loc: RedvoxPacketM.Sensors.Location = packet.sensors.location
    if not (loc.HasField("last_best_location") or loc.HasField("overall_best_location")):
        return None
    # prefer the most recent best location when both are present
    best_loc: RedvoxPacketM.Sensors.Location.BestLocation = (
        loc.last_best_location if loc.HasField("last_best_location") else loc.overall_best_location
    )
    packet_len_s = srupa.__packet_duration_s(packet)
    return PyarrowSummary(
        loc.sensor_description,
        srupa.SensorType.BEST_LOCATION,
        np.nan,
        1.0 / packet_len_s,
        "",
        1,
        packet_len_s,
        0.0,
        srupa.apim_best_location_to_pyarrow(best_loc, packet.timing_information.packet_start_mach_timestamp),
    )
def load_apim_health(packet: RedvoxPacketM) -> Optional[PyarrowSummary]:
    """
    load station health data from a single redvox packet

    :param packet: packet with data to load
    :return: station health data if it exists, None otherwise
    """
    metrics: RedvoxPacketM.StationInformation.StationMetrics = packet.station_information.station_metrics
    stamps = metrics.timestamps.timestamps
    if len(stamps) < 1:
        return None
    # sample rate follows the app's configured metrics rate; unknown rates get nan
    rate = packet.station_information.app_settings.metrics_rate
    if rate == RedvoxPacketM.StationInformation.MetricsRate.ONCE_PER_SECOND:
        sample_rate = 1
    elif rate == RedvoxPacketM.StationInformation.MetricsRate.ONCE_PER_PACKET:
        sample_rate = 1 / srupa.__packet_duration_s(packet)
    else:
        sample_rate = np.nan
    return PyarrowSummary(
        "station health",
        srupa.SensorType.STATION_HEALTH,
        np.nan,
        sample_rate,
        "",
        len(stamps),
        1.0 / sample_rate,
        0.0,
        srupa.apim_health_to_pyarrow(metrics),
    )
def load_single(
    packet: RedvoxPacketM,
    sensor_type: srupa.SensorType,
) -> Optional[PyarrowSummary]:
    """
    load data for a single-channel sensor from a redvox packet

    :param packet: packet with data to load
    :param sensor_type: the single-channel sensor type to read
    :return: summary of the sensor's data if it exists and has timestamps, None otherwise
    """
    field_name: str = srupa.__SENSOR_TYPE_TO_FIELD_NAME[sensor_type]
    sensor_fn: Optional[Callable[[RedvoxPacketM], srupa.Sensor]] = srupa.__SENSOR_TYPE_TO_SENSOR_FN[sensor_type]
    if not (srupa.__has_sensor(packet, field_name) and sensor_fn is not None):
        return None
    sensor = sensor_fn(packet)
    stamps = sensor.timestamps.timestamps
    if len(stamps) < 1:
        return None
    if len(stamps) > 1:
        deltas = np.diff(stamps)
        mean_interval = dtu.microseconds_to_seconds(float(np.mean(deltas)))
        interval_std = dtu.microseconds_to_seconds(float(np.std(deltas)))
    else:
        # single sample: assume one reading per packet
        mean_interval = srupa.__packet_duration_s(packet)
        interval_std = 0.0
    return PyarrowSummary(
        sensor.sensor_description,
        sensor_type,
        np.nan,
        np.nan,
        "",
        len(stamps),
        mean_interval,
        interval_std,
        srupa.read_apim_single_sensor(sensor, field_name),
    )
def load_apim_pressure(packet: RedvoxPacketM) -> Optional[PyarrowSummary]:
    """
    load pressure data from a single redvox packet

    :param packet: packet with data to load
    :return: pressure sensor data if it exists, None otherwise
    """
    # thin wrapper over the generic single-channel loader
    return load_single(packet, srupa.SensorType.PRESSURE)
def load_apim_light(packet: RedvoxPacketM) -> Optional[PyarrowSummary]:
    """
    load light data from a single redvox packet

    :param packet: packet with data to load
    :return: light sensor data if it exists, None otherwise
    """
    # thin wrapper over the generic single-channel loader
    return load_single(packet, srupa.SensorType.LIGHT)
def load_apim_proximity(packet: RedvoxPacketM) -> Optional[PyarrowSummary]:
    """
    load proximity data from a single redvox packet

    :param packet: packet with data to load
    :return: proximity sensor data if it exists, None otherwise
    """
    # thin wrapper over the generic single-channel loader
    return load_single(packet, srupa.SensorType.PROXIMITY)
def load_apim_ambient_temp(packet: RedvoxPacketM) -> Optional[PyarrowSummary]:
    """
    load ambient temperature data from a single redvox packet

    :param packet: packet with data to load
    :return: ambient temperature sensor data if it exists, None otherwise
    """
    # thin wrapper over the generic single-channel loader
    return load_single(packet, srupa.SensorType.AMBIENT_TEMPERATURE)
def load_apim_rel_humidity(packet: RedvoxPacketM) -> Optional[PyarrowSummary]:
    """
    load relative humidity data from a single redvox packet

    :param packet: packet with data to load
    :return: relative humidity sensor data if it exists, None otherwise
    """
    # thin wrapper over the generic single-channel loader
    return load_single(packet, srupa.SensorType.RELATIVE_HUMIDITY)
def load_xyz(
    packet: RedvoxPacketM,
    sensor_type: srupa.SensorType,
) -> Optional[PyarrowSummary]:
    """
    load data for a three-channel (x, y, z) sensor from a redvox packet

    :param packet: packet with data to load
    :param sensor_type: the xyz sensor type to read
    :return: summary of the sensor's data if it exists and has timestamps, None otherwise
    """
    field_name: str = srupa.__SENSOR_TYPE_TO_FIELD_NAME[sensor_type]
    sensor_fn: Optional[Callable[[RedvoxPacketM], srupa.Sensor]] = srupa.__SENSOR_TYPE_TO_SENSOR_FN[sensor_type]
    if not (srupa.__has_sensor(packet, field_name) and sensor_fn is not None):
        return None
    sensor = sensor_fn(packet)
    stamps = sensor.timestamps.timestamps
    if len(stamps) < 1:
        return None
    if len(stamps) > 1:
        deltas = np.diff(stamps)
        mean_interval = dtu.microseconds_to_seconds(float(np.mean(deltas)))
        interval_std = dtu.microseconds_to_seconds(float(np.std(deltas)))
    else:
        # single sample: assume one reading per packet
        mean_interval = srupa.__packet_duration_s(packet)
        interval_std = 0.0
    # NOTE(review): original carried a reminder to check
    # packet.station_information.app_settings.additional_input_sensors for fast
    # sensors and rename if needed - still outstanding
    return PyarrowSummary(
        sensor.sensor_description,
        sensor_type,
        np.nan,
        np.nan,
        "",
        len(stamps),
        mean_interval,
        interval_std,
        srupa.read_apim_xyz_sensor(sensor, field_name),
    )
def load_apim_accelerometer(packet: RedvoxPacketM) -> Optional[PyarrowSummary]:
    """
    load accelerometer data from a single redvox packet

    :param packet: packet with data to load
    :return: accelerometer sensor data if it exists, None otherwise
    """
    # thin wrapper over the generic xyz loader
    return load_xyz(packet, srupa.SensorType.ACCELEROMETER)
def load_apim_magnetometer(packet: RedvoxPacketM) -> Optional[PyarrowSummary]:
    """
    load magnetometer data from a single redvox packet

    :param packet: packet with data to load
    :return: magnetometer sensor data if it exists, None otherwise
    """
    # thin wrapper over the generic xyz loader
    return load_xyz(packet, srupa.SensorType.MAGNETOMETER)
def load_apim_gyroscope(packet: RedvoxPacketM) -> Optional[PyarrowSummary]:
    """
    load gyroscope data from a single redvox packet

    :param packet: packet with data to load
    :return: gyroscope sensor data if it exists, None otherwise
    """
    # thin wrapper over the generic xyz loader
    return load_xyz(packet, srupa.SensorType.GYROSCOPE)
def load_apim_gravity(packet: RedvoxPacketM) -> Optional[PyarrowSummary]:
    """
    load gravity data from a single redvox packet

    :param packet: packet with data to load
    :return: gravity sensor data if it exists, None otherwise
    """
    # thin wrapper over the generic xyz loader
    return load_xyz(packet, srupa.SensorType.GRAVITY)
def load_apim_orientation(packet: RedvoxPacketM) -> Optional[PyarrowSummary]:
    """
    load orientation data from a single redvox packet

    :param packet: packet with data to load
    :return: orientation sensor data if it exists, None otherwise
    """
    # thin wrapper over the generic xyz loader
    return load_xyz(packet, srupa.SensorType.ORIENTATION)
def load_apim_linear_accel(packet: RedvoxPacketM) -> Optional[PyarrowSummary]:
    """
    load linear acceleration data from a single redvox packet

    :param packet: packet with data to load
    :return: linear acceleration sensor data if it exists, None otherwise
    """
    # thin wrapper over the generic xyz loader
    return load_xyz(packet, srupa.SensorType.LINEAR_ACCELERATION)
def load_apim_rotation_vector(packet: RedvoxPacketM) -> Optional[PyarrowSummary]:
    """
    load rotation vector data from a single redvox packet

    :param packet: packet with data to load
    :return: rotation vector sensor data if it exists, None otherwise
    """
    # thin wrapper over the generic xyz loader
    return load_xyz(packet, srupa.SensorType.ROTATION_VECTOR)
def load_apim_velocity(packet: RedvoxPacketM) -> Optional[PyarrowSummary]:
    """
    load velocity data from a single redvox packet

    :param packet: packet with data to load
    :return: velocity sensor data if it exists, None otherwise
    """
    # thin wrapper over the generic xyz loader
    return load_xyz(packet, srupa.SensorType.VELOCITY)
Functions
def load_apim_accelerometer(packet: src.redvox_api_m.redvox_api_m_pb2.RedvoxPacketM) ‑> Optional[PyarrowSummary]
-
load accelerometer data from a single redvox packet
:param packet: packet with data to load :return: accelerometer sensor data if it exists, None otherwise
Expand source code
def load_apim_accelerometer(packet: RedvoxPacketM) -> Optional[PyarrowSummary]: """ load accelerometer data from a single redvox packet :param packet: packet with data to load :return: accelerometer sensor data if it exists, None otherwise """ return load_xyz(packet, srupa.SensorType.ACCELEROMETER)
def load_apim_ambient_temp(packet: src.redvox_api_m.redvox_api_m_pb2.RedvoxPacketM) ‑> Optional[PyarrowSummary]
-
load ambient temperature data from a single redvox packet
:param packet: packet with data to load :return: ambient temperature sensor data if it exists, None otherwise
Expand source code
def load_apim_ambient_temp(packet: RedvoxPacketM) -> Optional[PyarrowSummary]: """ load ambient temperature data from a single redvox packet :param packet: packet with data to load :return: ambient temperature sensor data if it exists, None otherwise """ return load_single(packet, srupa.SensorType.AMBIENT_TEMPERATURE)
def load_apim_audio(packet: src.redvox_api_m.redvox_api_m_pb2.RedvoxPacketM) ‑> Optional[PyarrowSummary]
-
load audio data from a single redvox packet
:param packet: packet with data to load :return: audio sensor type, name, data and sample rate
Expand source code
def load_apim_audio(packet: RedvoxPacketM) -> Optional[PyarrowSummary]: """ load audio data from a single redvox packet :param packet: packet with data to load :return: audio sensor type, name, data and sample rate """ if srupa.__has_sensor(packet, srupa.__AUDIO_FIELD_NAME): audio_sensor: RedvoxPacketM.Sensors.Audio = packet.sensors.audio return PyarrowSummary( audio_sensor.sensor_description, srupa.SensorType.AUDIO, np.nan, audio_sensor.sample_rate, "", int(audio_sensor.samples.value_statistics.count), 1.0 / audio_sensor.sample_rate, 0.0, pa.Table.from_pydict({"microphone": np.array(audio_sensor.samples.values)}), ) return None
def load_apim_best_location(packet: src.redvox_api_m.redvox_api_m_pb2.RedvoxPacketM) ‑> Optional[PyarrowSummary]
-
load best location data from a single redvox packet
:param packet: packet with data to load :return: best location sensor data if it exists, None otherwise
Expand source code
def load_apim_best_location(packet: RedvoxPacketM) -> Optional[PyarrowSummary]: """ load best location data from a single redvox packet :param packet: packet with data to load :return: best location sensor data if it exists, None otherwise """ if srupa.__has_sensor(packet, srupa.__LOCATION_FIELD_NAME): loc: RedvoxPacketM.Sensors.Location = packet.sensors.location if loc.HasField("last_best_location") or loc.HasField("overall_best_location"): best_loc: RedvoxPacketM.Sensors.Location.BestLocation if loc.HasField("last_best_location"): best_loc = loc.last_best_location else: best_loc = loc.overall_best_location packet_len_s = srupa.__packet_duration_s(packet) return PyarrowSummary( loc.sensor_description, srupa.SensorType.BEST_LOCATION, np.nan, 1.0 / packet_len_s, "", 1, packet_len_s, 0.0, srupa.apim_best_location_to_pyarrow(best_loc, packet.timing_information.packet_start_mach_timestamp), ) return None
def load_apim_compressed_audio(packet: src.redvox_api_m.redvox_api_m_pb2.RedvoxPacketM) ‑> Optional[PyarrowSummary]
-
load compressed audio data from a single redvox packet
:param packet: packet with data to load :return: compressed audio sensor data if it exists, None otherwise
Expand source code
def load_apim_compressed_audio(packet: RedvoxPacketM) -> Optional[PyarrowSummary]: """ load compressed audio data from a single redvox packet :param packet: packet with data to load :return: compressed audio sensor data if it exists, None otherwise """ if srupa.__has_sensor(packet, srupa.__COMPRESSED_AUDIO_FIELD_NAME): comp_audio: RedvoxPacketM.Sensors.CompressedAudio = packet.sensors.compressed_audio return PyarrowSummary( comp_audio.sensor_description, srupa.SensorType.COMPRESSED_AUDIO, np.nan, comp_audio.sample_rate, "", np.nan, 1.0 / comp_audio.sample_rate, 0.0, srupa.apim_compressed_audio_to_pyarrow(comp_audio), ) return None
def load_apim_gravity(packet: src.redvox_api_m.redvox_api_m_pb2.RedvoxPacketM) ‑> Optional[PyarrowSummary]
-
load gravity data from a single redvox packet
:param packet: packet with data to load :return: gravity sensor data if it exists, None otherwise
Expand source code
def load_apim_gravity(packet: RedvoxPacketM) -> Optional[PyarrowSummary]: """ load gravity data from a single redvox packet :param packet: packet with data to load :return: gravity sensor data if it exists, None otherwise """ return load_xyz(packet, srupa.SensorType.GRAVITY)
def load_apim_gyroscope(packet: src.redvox_api_m.redvox_api_m_pb2.RedvoxPacketM) ‑> Optional[PyarrowSummary]
-
load gyroscope data from a single redvox packet
:param packet: packet with data to load :return: gyroscope sensor data if it exists, None otherwise
Expand source code
def load_apim_gyroscope(packet: RedvoxPacketM) -> Optional[PyarrowSummary]: """ load gyroscope data from a single redvox packet :param packet: packet with data to load :return: gyroscope sensor data if it exists, None otherwise """ return load_xyz(packet, srupa.SensorType.GYROSCOPE)
def load_apim_health(packet: src.redvox_api_m.redvox_api_m_pb2.RedvoxPacketM) ‑> Optional[PyarrowSummary]
-
load station health data from a single redvox packet
:param packet: packet with data to load :return: station health data if it exists, None otherwise
Expand source code
def load_apim_health(packet: RedvoxPacketM) -> Optional[PyarrowSummary]: """ load station health data from a single redvox packet :param packet: packet with data to load :return: station health data if it exists, None otherwise """ metrics: RedvoxPacketM.StationInformation.StationMetrics = packet.station_information.station_metrics timestamps = metrics.timestamps.timestamps rate = packet.station_information.app_settings.metrics_rate if rate == RedvoxPacketM.StationInformation.MetricsRate.ONCE_PER_SECOND: sample_rate = 1 elif rate == RedvoxPacketM.StationInformation.MetricsRate.ONCE_PER_PACKET: sample_rate = 1 / srupa.__packet_duration_s(packet) else: sample_rate = np.nan if len(timestamps) > 0: return PyarrowSummary( "station health", srupa.SensorType.STATION_HEALTH, np.nan, sample_rate, "", len(timestamps), 1.0 / sample_rate, 0.0, srupa.apim_health_to_pyarrow(metrics), ) return None
def load_apim_image(packet: src.redvox_api_m.redvox_api_m_pb2.RedvoxPacketM) ‑> Optional[PyarrowSummary]
-
load image data from a single redvox packet
:param packet: packet with data to load :return: image sensor data if it exists, None otherwise
Expand source code
def load_apim_image(packet: RedvoxPacketM) -> Optional[PyarrowSummary]: """ load image data from a single redvox packet :param packet: packet with data to load :return: image sensor data if it exists, None otherwise """ if srupa.__has_sensor(packet, srupa.__IMAGE_FIELD_NAME): image_sensor: RedvoxPacketM.Sensors.Image = packet.sensors.image timestamps = image_sensor.timestamps.timestamps additional_inputs = packet.station_information.app_settings.additional_input_sensors if RedvoxPacketM.StationInformation.AppSettings.InputSensor.IMAGE_PER_SECOND in additional_inputs: sample_rate = 1.0 # elif RedvoxPacketM.StationInformation.AppSettings.InputSensor.IMAGE_PER_PACKET in additional_inputs: else: sample_rate = 1 / srupa.__packet_duration_s(packet) # else: # sample_rate = np.nan return PyarrowSummary( image_sensor.sensor_description, srupa.SensorType.IMAGE, np.nan, sample_rate, "", len(timestamps), 1.0 / sample_rate, 0.0, srupa.apim_image_to_pyarrow(image_sensor), ) return None
def load_apim_light(packet: src.redvox_api_m.redvox_api_m_pb2.RedvoxPacketM) ‑> Optional[PyarrowSummary]
-
load light data from a single redvox packet
:param packet: packet with data to load :return: light sensor data if it exists, None otherwise
Expand source code
def load_apim_light(packet: RedvoxPacketM) -> Optional[PyarrowSummary]: """ load light data from a single redvox packet :param packet: packet with data to load :return: light sensor data if it exists, None otherwise """ return load_single(packet, srupa.SensorType.LIGHT)
def load_apim_linear_accel(packet: src.redvox_api_m.redvox_api_m_pb2.RedvoxPacketM) ‑> Optional[PyarrowSummary]
-
load linear acceleration data from a single redvox packet
:param packet: packet with data to load :return: linear acceleration sensor data if it exists, None otherwise
Expand source code
def load_apim_linear_accel(packet: RedvoxPacketM) -> Optional[PyarrowSummary]: """ load linear acceleration data from a single redvox packet :param packet: packet with data to load :return: linear acceleration sensor data if it exists, None otherwise """ return load_xyz(packet, srupa.SensorType.LINEAR_ACCELERATION)
def load_apim_location(packet: src.redvox_api_m.redvox_api_m_pb2.RedvoxPacketM) ‑> Optional[PyarrowSummary]
-
load location data from a single packet
:param packet: packet with data to load :return: location sensor data if it exists, None otherwise
Expand source code
def load_apim_location(packet: RedvoxPacketM) -> Optional[PyarrowSummary]: """ load location data from a single packet :param packet: packet with data to load :return: location sensor data if it exists, None otherwise """ if srupa.__has_sensor(packet, srupa.__LOCATION_FIELD_NAME): loc: RedvoxPacketM.Sensors.Location = packet.sensors.location timestamps = loc.timestamps.timestamps if len(timestamps) > 0: if len(timestamps) > 1: m_intv = dtu.microseconds_to_seconds(float(np.mean(np.diff(timestamps)))) intv_std = dtu.microseconds_to_seconds(float(np.std(np.diff(timestamps)))) else: m_intv = srupa.__packet_duration_s(packet) intv_std = 0.0 return PyarrowSummary( loc.sensor_description, srupa.SensorType.LOCATION, np.nan, np.nan, "", len(timestamps), m_intv, intv_std, srupa.apim_location_to_pyarrow(loc), ) return None
def load_apim_magnetometer(packet: src.redvox_api_m.redvox_api_m_pb2.RedvoxPacketM) ‑> Optional[PyarrowSummary]
-
load magnetometer data from a single redvox packet
:param packet: packet with data to load :return: magnetometer sensor data if it exists, None otherwise
Expand source code
def load_apim_magnetometer( packet: RedvoxPacketM, ) -> Optional[PyarrowSummary]: """ load magnetometer data from a single redvox packet :param packet: packet with data to load :return: magnetometer sensor data if it exists, None otherwise """ return load_xyz(packet, srupa.SensorType.MAGNETOMETER)
def load_apim_orientation(packet: RedvoxPacketM) ‑> Optional[PyarrowSummary]
-
load orientation data from a single redvox packet
:param packet: packet with data to load :return: orientation sensor data if it exists, None otherwise
Expand source code
def load_apim_orientation(packet: RedvoxPacketM) -> Optional[PyarrowSummary]:
    """
    load orientation data from a single redvox packet

    :param packet: packet with data to load
    :return: orientation sensor data if it exists, None otherwise
    """
    # delegate to the generic xyz-channel loader
    summary = load_xyz(packet, srupa.SensorType.ORIENTATION)
    return summary
def load_apim_pressure(packet: RedvoxPacketM) ‑> Optional[PyarrowSummary]
-
load pressure data from a single redvox packet
:param packet: packet with data to load :return: pressure sensor data if it exists, None otherwise
Expand source code
def load_apim_pressure(packet: RedvoxPacketM) -> Optional[PyarrowSummary]:
    """
    load pressure data from a single redvox packet

    :param packet: packet with data to load
    :return: pressure sensor data if it exists, None otherwise
    """
    # delegate to the generic single-channel loader
    summary = load_single(packet, srupa.SensorType.PRESSURE)
    return summary
def load_apim_proximity(packet: RedvoxPacketM) ‑> Optional[PyarrowSummary]
-
load proximity data from a single redvox packet
:param packet: packet with data to load :return: proximity sensor data if it exists, None otherwise
Expand source code
def load_apim_proximity(packet: RedvoxPacketM) -> Optional[PyarrowSummary]:
    """
    load proximity data from a single redvox packet

    :param packet: packet with data to load
    :return: proximity sensor data if it exists, None otherwise
    """
    # delegate to the generic single-channel loader
    summary = load_single(packet, srupa.SensorType.PROXIMITY)
    return summary
def load_apim_rel_humidity(packet: RedvoxPacketM) ‑> Optional[PyarrowSummary]
-
load relative humidity data from a single redvox packet
:param packet: packet with data to load :return: relative humidity sensor data if it exists, None otherwise
Expand source code
def load_apim_rel_humidity(packet: RedvoxPacketM) -> Optional[PyarrowSummary]:
    """
    load relative humidity data from a single redvox packet

    :param packet: packet with data to load
    :return: relative humidity sensor data if it exists, None otherwise
    """
    # delegate to the generic single-channel loader
    summary = load_single(packet, srupa.SensorType.RELATIVE_HUMIDITY)
    return summary
def load_apim_rotation_vector(packet: RedvoxPacketM) ‑> Optional[PyarrowSummary]
-
load rotation vector data from a single redvox packet
:param packet: packet with data to load :return: rotation vector sensor data if it exists, None otherwise
Expand source code
def load_apim_rotation_vector(packet: RedvoxPacketM) -> Optional[PyarrowSummary]:
    """
    load rotation vector data from a single redvox packet

    :param packet: packet with data to load
    :return: rotation vector sensor data if it exists, None otherwise
    """
    # delegate to the generic xyz-channel loader
    summary = load_xyz(packet, srupa.SensorType.ROTATION_VECTOR)
    return summary
def load_apim_velocity(packet: RedvoxPacketM) ‑> Optional[PyarrowSummary]
-
load velocity data from a single redvox packet
:param packet: packet with data to load :return: velocity sensor data if it exists, None otherwise
Expand source code
def load_apim_velocity(packet: RedvoxPacketM) -> Optional[PyarrowSummary]:
    """
    load velocity data from a single redvox packet

    :param packet: packet with data to load
    :return: velocity sensor data if it exists, None otherwise
    """
    # delegate to the generic xyz-channel loader
    summary = load_xyz(packet, srupa.SensorType.VELOCITY)
    return summary
def load_single(packet: RedvoxPacketM, sensor_type: SensorType) ‑> Optional[PyarrowSummary]
-
Expand source code
def load_single(
    packet: RedvoxPacketM,
    sensor_type: srupa.SensorType,
) -> Optional[PyarrowSummary]:
    """
    load a single-channel sensor's data from a single redvox packet

    :param packet: packet with data to load
    :param sensor_type: type of single-channel sensor to load
    :return: summary of the sensor's data if it exists, None otherwise
    """
    field_name: str = srupa.__SENSOR_TYPE_TO_FIELD_NAME[sensor_type]
    sensor_fn: Optional[Callable[[RedvoxPacketM], srupa.Sensor]] = srupa.__SENSOR_TYPE_TO_SENSOR_FN[sensor_type]
    if srupa.__has_sensor(packet, field_name) and sensor_fn is not None:
        sensor = sensor_fn(packet)
        t = sensor.timestamps.timestamps
        # only compute interval stats when there is data to summarize
        if len(t) > 0:
            if len(t) > 1:
                # compute the timestamp differences once; reused for mean and std
                diffs = np.diff(t)
                m_intv = dtu.microseconds_to_seconds(float(np.mean(diffs)))
                intv_std = dtu.microseconds_to_seconds(float(np.std(diffs)))
            else:
                # a single sample: use the packet duration as the interval
                m_intv = srupa.__packet_duration_s(packet)
                intv_std = 0.0
            return PyarrowSummary(
                sensor.sensor_description,
                sensor_type,
                np.nan,
                np.nan,
                "",
                len(t),
                m_intv,
                intv_std,
                srupa.read_apim_single_sensor(sensor, field_name),
            )
    return None
def load_xyz(packet: RedvoxPacketM, sensor_type: SensorType) ‑> Optional[PyarrowSummary]
-
Expand source code
def load_xyz(
    packet: RedvoxPacketM,
    sensor_type: srupa.SensorType,
) -> Optional[PyarrowSummary]:
    """
    load a three-channel (x, y, z) sensor's data from a single redvox packet

    :param packet: packet with data to load
    :param sensor_type: type of xyz-channel sensor to load
    :return: summary of the sensor's data if it exists, None otherwise
    """
    field_name: str = srupa.__SENSOR_TYPE_TO_FIELD_NAME[sensor_type]
    sensor_fn: Optional[Callable[[RedvoxPacketM], srupa.Sensor]] = srupa.__SENSOR_TYPE_TO_SENSOR_FN[sensor_type]
    if srupa.__has_sensor(packet, field_name) and sensor_fn is not None:
        sensor = sensor_fn(packet)
        t = sensor.timestamps.timestamps
        # only compute interval stats when there is data to summarize
        if len(t) > 0:
            if len(t) > 1:
                # compute the timestamp differences once; reused for mean and std
                diffs = np.diff(t)
                m_intv = dtu.microseconds_to_seconds(float(np.mean(diffs)))
                intv_std = dtu.microseconds_to_seconds(float(np.std(diffs)))
            else:
                # a single sample: use the packet duration as the interval
                m_intv = srupa.__packet_duration_s(packet)
                intv_std = 0.0
            # read packet.station_information.app_settings.additional_input_sensors for fast sensors
            # rename if needed
            return PyarrowSummary(
                sensor.sensor_description,
                sensor_type,
                np.nan,
                np.nan,
                "",
                len(t),
                m_intv,
                intv_std,
                srupa.read_apim_xyz_sensor(sensor, field_name),
            )
    return None
def packet_to_pyarrow(packet: RedvoxPacketM, out_dir: Optional[str] = None) ‑> AggregateSummary
-
gets non-audio sensor information by keeping it memory or writing it into folders named with the sensor names
:param packet: packet to extract data from :param out_dir: optional directory to write the pyarrow files to; if None, don't write files. default None :return: AggregateSummary of the sensors' metadata, data, and location of the data if written to disk
Expand source code
def packet_to_pyarrow(packet: RedvoxPacketM, out_dir: Optional[str] = None) -> AggregateSummary:
    """
    gets non-audio sensor information by keeping it memory or writing it into folders named with the sensor names

    :param packet: packet to extract data from
    :param out_dir: optional directory to write the pyarrow files to; if None, don't write files.  default None
    :return: AggregateSummary of the sensors' metadata, data, and location of the data if written to disk
    """
    result = AggregateSummary()
    packet_start = int(packet.timing_information.packet_start_mach_timestamp)
    # one loader per supported sensor; each returns None when the packet lacks that sensor
    loaders = (
        load_apim_audio,
        load_apim_compressed_audio,
        load_apim_image,
        load_apim_health,
        load_apim_best_location,
        load_apim_location,
        load_apim_pressure,
        load_apim_light,
        load_apim_ambient_temp,
        load_apim_rel_humidity,
        load_apim_proximity,
        load_apim_accelerometer,
        load_apim_gyroscope,
        load_apim_magnetometer,
        load_apim_gravity,
        load_apim_linear_accel,
        load_apim_orientation,
        load_apim_rotation_vector,
        load_apim_velocity,
    )
    for loader in loaders:
        summary = loader(packet)
        if summary:
            summary.start = packet_start
            if out_dir:
                # write to a per-sensor-type directory and drop in-memory data
                summary.fdir = os.path.join(out_dir, f"{summary.stype.name}_SUMMARY")
                summary.write_data()
            result.add_summary(summary)
    return result
def stream_to_pyarrow(packets: List[RedvoxPacketM], out_dir: Optional[str] = None) ‑> AggregateSummary
-
stream the packets to parquet files for later processing.
:param packets: redvox packets to convert :param out_dir: optional directory to write the pyarrow files to; if None, don't write files. default None :return: AggregateSummary of the sensors' metadata, data, and location of the data if written to disk
Expand source code
def stream_to_pyarrow(packets: List[RedvoxPacketM], out_dir: Optional[str] = None) -> AggregateSummary:
    """
    stream the packets to parquet files for later processing.

    :param packets: redvox packets to convert
    :param out_dir: optional directory to write the pyarrow files to; if None, don't write files.  default None
    :return: AggregateSummary of the sensors' metadata, data, and location of the data if written to disk
    """
    summary = AggregateSummary()
    # convert each packet independently, then flatten into one aggregate
    for packet in packets:
        per_packet = packet_to_pyarrow(packet, out_dir)
        for sensor_summary in per_packet.summaries:
            summary.add_summary(sensor_summary)
    return summary
Classes
class AggregateSummary (summaries: List[PyarrowSummary] = <factory>, gaps: List[Tuple[float, float]] = <factory>, errors: RedVoxExceptions = {'obj_class': 'AggregateSummary'})
-
aggregate of summaries
properties: summaries: the summaries of sensors gaps: gaps in audio data as a list of tuples of start and end time
Expand source code
@dataclass_json
@dataclass
class AggregateSummary:
    """
    aggregate of summaries

    properties:
        summaries: the summaries of sensors

        gaps: gaps in audio data as a list of tuples of start and end time

        errors: errors accumulated while aggregating the summaries
    """

    summaries: List[PyarrowSummary] = field(default_factory=list)
    gaps: List[Tuple[float, float]] = field(default_factory=list)
    # default_factory so each instance gets its own RedVoxExceptions; a plain
    # class-level default instance would be shared by every AggregateSummary
    errors: RedVoxExceptions = field(default_factory=lambda: RedVoxExceptions("AggregateSummary"))

    def add_aggregate_summary(self, agg_sum: "AggregateSummary"):
        """
        adds another aggregate summary to this one

        :param agg_sum: another aggregate summary to add
        """
        self.summaries.extend(agg_sum.summaries)

    def add_summary(self, pya_sum: PyarrowSummary):
        """
        adds a summary to the aggregate

        :param pya_sum: the summary to add
        """
        self.summaries.append(pya_sum)

    def merge_audio_summaries(self):
        """
        combines and replaces all Audio summaries into a single summary; also adds any gaps in the data
        """
        audio_lst = self.get_audio()
        if not audio_lst:
            # no audio summaries to merge; avoid an IndexError below
            return
        frst_audio = audio_lst[0]
        use_mem = frst_audio.check_data()
        pckt_info = [(int(adl.start), adl.data()) for adl in audio_lst]
        audio_data = gpu.fill_audio_gaps(pckt_info, dtu.seconds_to_microseconds(1 / frst_audio.srate_hz))
        tbl = audio_data.create_timestamps()
        frst_audio = PyarrowSummary(
            frst_audio.name,
            frst_audio.stype,
            frst_audio.start,
            frst_audio.srate_hz,
            frst_audio.fdir,
            tbl.num_rows,
            frst_audio.smint_s,
            frst_audio.sstd_s,
            tbl,
        )
        if not use_mem:
            # original summaries were on disk, so write the merged result to disk too
            frst_audio.write_data(True)
        self.gaps = audio_data.gaps
        self.summaries = self.get_non_audio_list()
        self.add_summary(frst_audio)

    def merge_non_audio_summaries(self):
        """
        combines and replaces all summaries per type except for audio summaries
        """
        # group non-audio summaries by sensor type
        smrs_dict = {}
        for smry in self.summaries:
            if smry.stype != SensorType.AUDIO:
                if smry.stype in smrs_dict.keys():
                    smrs_dict[smry.stype].append(smry)
                else:
                    smrs_dict[smry.stype] = [smry]
        self.summaries = self.get_audio()
        for styp, smrys in smrs_dict.items():
            if len(smrys) > 0:
                # per-packet interval stats; used as a fallback sanity check below
                combined_mint = np.mean([smrs.smint_s for smrs in smrys])
                combined_std = np.mean([smrs.sstd_s for smrs in smrys])
                first_summary = smrys.pop(0)
                tbl = first_summary.data()
                if not first_summary.check_data():
                    os.makedirs(first_summary.fdir, exist_ok=True)
                for smrs in smrys:
                    tbl = pa.concat_tables([tbl, smrs.data()])
                    if not first_summary.check_data():
                        os.remove(smrs.file_name())
                # sort data by timestamps BEFORE persisting, so the stored table
                # matches the table the stats below are computed from
                tbl = pc.take(tbl, pc.sort_indices(tbl, sort_keys=[("timestamps", "ascending")]))
                if first_summary.check_data():
                    first_summary._data = tbl
                else:
                    pq.write_table(tbl, first_summary.file_name())
                timestamps = tbl["timestamps"].to_numpy()
                if len(timestamps) > 1:
                    diffs = np.diff(timestamps)
                    mnint = dtu.microseconds_to_seconds(float(np.mean(diffs)))
                    stdint = dtu.microseconds_to_seconds(float(np.std(diffs)))
                else:
                    mnint = np.nan
                    stdint = np.nan
                if not combined_mint + combined_std > mnint > combined_mint - combined_std:
                    self.errors.append(
                        f"Mean interval s of combined {styp.name} sensor does not match the "
                        f"compilation of individual mean interval s per packet. Will use compilation "
                        f"of individual values."
                    )
                    mnint = combined_mint
                    stdint = combined_std
                single_smry = PyarrowSummary(
                    first_summary.name,
                    styp,
                    first_summary.start,
                    1 / mnint,
                    first_summary.fdir,
                    tbl.num_rows,
                    mnint,
                    stdint,
                    first_summary.data() if first_summary.check_data() else None,
                )
                self.summaries.append(single_smry)

    def merge_summaries_of_type(self, stype: SensorType):
        """
        combines and replaces multiple summaries of one SensorType into a single one

        *caution: using this on an audio sensor may cause data validation issues*

        :param stype: the type of sensor to combine
        """
        smrs = []
        other_smrs = []
        for smry in self.summaries:
            if smry.stype == stype:
                smrs.append(smry)
            else:
                other_smrs.append(smry)
        if not smrs:
            # no summaries of the requested type; avoid an IndexError below
            return
        first_summary = smrs.pop(0)
        tbl = first_summary.data()
        if not first_summary.check_data():
            os.makedirs(first_summary.fdir, exist_ok=True)
        # accumulate everything in memory, then persist once (mirrors
        # merge_non_audio_summaries; avoids rewriting and rereading the
        # parquet file on every iteration)
        for smrys in smrs:
            tbl = pa.concat_tables([tbl, smrys.data()])
            if not first_summary.check_data():
                os.remove(smrys.file_name())
        if first_summary.check_data():
            first_summary._data = tbl
        else:
            pq.write_table(tbl, first_summary.file_name())
        diffs = np.diff(tbl["timestamps"].to_numpy())
        mnint = dtu.microseconds_to_seconds(float(np.mean(diffs)))
        stdint = dtu.microseconds_to_seconds(float(np.std(diffs)))
        single_smry = PyarrowSummary(
            first_summary.name,
            first_summary.stype,
            first_summary.start,
            1 / mnint,
            first_summary.fdir,
            tbl.num_rows,
            mnint,
            stdint,
            first_summary.data() if first_summary.check_data() else None,
        )
        self.summaries = other_smrs
        self.summaries.append(single_smry)

    def merge_all_summaries(self):
        """
        merge all PyarrowSummary with the same sensor type into single PyarrowSummary per type
        """
        self.merge_audio_summaries()
        self.merge_non_audio_summaries()

    def get_audio(self) -> List[PyarrowSummary]:
        """
        :return: a list of PyarrowSummary of only Audio data
        """
        return [s for s in self.summaries if s.stype == srupa.SensorType.AUDIO]

    def get_non_audio(self) -> Dict[srupa.SensorType, List[PyarrowSummary]]:
        """
        :return: a dictionary of {non-Audio SensorType: PyarrowSummary}
        """
        result = {}
        for k in self.sensor_types():
            if k != srupa.SensorType.AUDIO:
                result[k] = [s for s in self.summaries if s.stype == k]
        return result

    def get_non_audio_list(self) -> List[PyarrowSummary]:
        """
        :return: a list of all non-Audio PyarrowSummary
        """
        return [s for s in self.summaries if s.stype != srupa.SensorType.AUDIO]

    def get_sensor(self, stype: srupa.SensorType) -> List[PyarrowSummary]:
        """
        :param stype: type of sensor to find
        :return: a list of all PyarrowSummary of the specified type
        """
        return [s for s in self.summaries if s.stype == stype]

    def sensor_types(self) -> List[srupa.SensorType]:
        """
        :return: a list of sensor types in self.summaries
        """
        result = []
        for s in self.summaries:
            if s.stype not in result:
                result.append(s.stype)
        return result
Class variables
var errors : RedVoxExceptions
var gaps : List[Tuple[float, float]]
var summaries : List[PyarrowSummary]
Static methods
def from_dict(kvs: Union[dict, list, str, int, float, bool, ForwardRef(None)], *, infer_missing=False) ‑> ~A
-
Expand source code
@classmethod def from_dict(cls: Type[A], kvs: Json, *, infer_missing=False) -> A: return _decode_dataclass(cls, kvs, infer_missing)
def from_json(s: Union[str, bytes, bytearray], *, parse_float=None, parse_int=None, parse_constant=None, infer_missing=False, **kw) ‑> ~A
-
Expand source code
@classmethod def from_json(cls: Type[A], s: JsonData, *, parse_float=None, parse_int=None, parse_constant=None, infer_missing=False, **kw) -> A: kvs = json.loads(s, parse_float=parse_float, parse_int=parse_int, parse_constant=parse_constant, **kw) return cls.from_dict(kvs, infer_missing=infer_missing)
def schema(*, infer_missing: bool = False, only=None, exclude=(), many: bool = False, context=None, load_only=(), dump_only=(), partial: bool = False, unknown=None) ‑> dataclasses_json.mm.SchemaF[~A]
-
Expand source code
@classmethod def schema(cls: Type[A], *, infer_missing: bool = False, only=None, exclude=(), many: bool = False, context=None, load_only=(), dump_only=(), partial: bool = False, unknown=None) -> SchemaType: Schema = build_schema(cls, DataClassJsonMixin, infer_missing, partial) if unknown is None: undefined_parameter_action = _undefined_parameter_action_safe(cls) if undefined_parameter_action is not None: # We can just make use of the same-named mm keywords unknown = undefined_parameter_action.name.lower() return Schema(only=only, exclude=exclude, many=many, context=context, load_only=load_only, dump_only=dump_only, partial=partial, unknown=unknown)
Methods
def add_aggregate_summary(self, agg_sum: AggregateSummary)
-
adds another aggregate summary to this one
:param agg_sum: another aggregate summary to add
Expand source code
def add_aggregate_summary(self, agg_sum: "AggregateSummary"): """ adds another aggregate summary to this one :param agg_sum: another aggregate summary to add """ self.summaries.extend(agg_sum.summaries)
def add_summary(self, pya_sum: PyarrowSummary)
-
adds a summary to the aggregate
:param pya_sum: the summary to add
Expand source code
def add_summary(self, pya_sum: PyarrowSummary): """ adds a summary to the aggregate :param pya_sum: the summary to add """ self.summaries.append(pya_sum)
def get_audio(self) ‑> List[PyarrowSummary]
-
:return: a list of PyarrowSummary of only Audio data
Expand source code
def get_audio(self) -> List[PyarrowSummary]: """ :return: a list of PyarrowSummary of only Audio data """ return [s for s in self.summaries if s.stype == srupa.SensorType.AUDIO]
def get_non_audio(self) ‑> Dict[SensorType, List[PyarrowSummary]]
-
:return: a dictionary of {non-Audio SensorType: PyarrowSummary}
Expand source code
def get_non_audio(self) -> Dict[srupa.SensorType, List[PyarrowSummary]]: """ :return: a dictionary of {non-Audio SensorType: PyarrowSummary} """ result = {} for k in self.sensor_types(): if k != srupa.SensorType.AUDIO: result[k] = [s for s in self.summaries if s.stype == k] return result
def get_non_audio_list(self) ‑> List[PyarrowSummary]
-
:return: a list of all non-Audio PyarrowSummary
Expand source code
def get_non_audio_list(self) -> List[PyarrowSummary]: """ :return: a list of all non-Audio PyarrowSummary """ return [s for s in self.summaries if s.stype != srupa.SensorType.AUDIO]
def get_sensor(self, stype: SensorType) ‑> List[PyarrowSummary]
-
:param stype: type of sensor to find :return: a list of all PyarrowSummary of the specified type
Expand source code
def get_sensor(self, stype: srupa.SensorType) -> List[PyarrowSummary]: """ :param stype: type of sensor to find :return: a list of all PyarrowSummary of the specified type """ return [s for s in self.summaries if s.stype == stype]
def merge_all_summaries(self)
-
merge all PyarrowSummary with the same sensor type into single PyarrowSummary per type
Expand source code
def merge_all_summaries(self): """ merge all PyarrowSummary with the same sensor type into single PyarrowSummary per type """ self.merge_audio_summaries() self.merge_non_audio_summaries()
def merge_audio_summaries(self)
-
combines and replaces all Audio summaries into a single summary; also adds any gaps in the data
Expand source code
def merge_audio_summaries(self): """ combines and replaces all Audio summaries into a single summary; also adds any gaps in the data """ pckt_info = [] audio_lst = self.get_audio() frst_audio = audio_lst[0] use_mem = frst_audio.check_data() for adl in audio_lst: pckt_info.append((int(adl.start), adl.data())) audio_data = gpu.fill_audio_gaps(pckt_info, dtu.seconds_to_microseconds(1 / frst_audio.srate_hz)) tbl = audio_data.create_timestamps() frst_audio = PyarrowSummary( frst_audio.name, frst_audio.stype, frst_audio.start, frst_audio.srate_hz, frst_audio.fdir, tbl.num_rows, frst_audio.smint_s, frst_audio.sstd_s, tbl, ) if not use_mem: frst_audio.write_data(True) self.gaps = audio_data.gaps self.summaries = self.get_non_audio_list() self.add_summary(frst_audio)
def merge_non_audio_summaries(self)
-
combines and replaces all summaries per type except for audio summaries
Expand source code
def merge_non_audio_summaries(self): """ combines and replaces all summaries per type except for audio summaries """ smrs_dict = {} for smry in self.summaries: if smry.stype != SensorType.AUDIO: if smry.stype in smrs_dict.keys(): smrs_dict[smry.stype].append(smry) else: smrs_dict[smry.stype] = [smry] self.summaries = self.get_audio() for styp, smrys in smrs_dict.items(): if len(smrys) > 0: combined_mint = np.mean([smrs.smint_s for smrs in smrys]) combined_std = np.mean([smrs.sstd_s for smrs in smrys]) first_summary = smrys.pop(0) tbl = first_summary.data() if not first_summary.check_data(): os.makedirs(first_summary.fdir, exist_ok=True) for smrs in smrys: tbl = pa.concat_tables([tbl, smrs.data()]) if not first_summary.check_data(): os.remove(smrs.file_name()) if first_summary.check_data(): first_summary._data = tbl else: pq.write_table(tbl, first_summary.file_name()) # sort data by timestamps tbl = pc.take(tbl, pc.sort_indices(tbl, sort_keys=[("timestamps", "ascending")])) timestamps = tbl["timestamps"].to_numpy() if len(timestamps) > 1: mnint = dtu.microseconds_to_seconds(float(np.mean(np.diff(timestamps)))) stdint = dtu.microseconds_to_seconds(float(np.std(np.diff(timestamps)))) else: mnint = np.nan stdint = np.nan if not combined_mint + combined_std > mnint > combined_mint - combined_std: self.errors.append( f"Mean interval s of combined {styp.name} sensor does not match the " f"compilation of individual mean interval s per packet. Will use compilation " f"of individual values." ) mnint = combined_mint stdint = combined_std single_smry = PyarrowSummary( first_summary.name, styp, first_summary.start, 1 / mnint, first_summary.fdir, tbl.num_rows, mnint, stdint, first_summary.data() if first_summary.check_data() else None, ) self.summaries.append(single_smry)
def merge_summaries_of_type(self, stype: SensorType)
-
combines and replaces multiple summaries of one SensorType into a single one
caution: using this on an audio sensor may cause data validation issues
:param stype: the type of sensor to combine
Expand source code
def merge_summaries_of_type(self, stype: SensorType): """ combines and replaces multiple summaries of one SensorType into a single one *caution: using this on an audio sensor may cause data validation issues* :param stype: the type of sensor to combine """ smrs = [] other_smrs = [] for smry in self.summaries: if smry.stype == stype: smrs.append(smry) else: other_smrs.append(smry) first_summary = smrs.pop(0) tbl = first_summary.data() if not first_summary.check_data(): os.makedirs(first_summary.fdir, exist_ok=True) for smrys in smrs: tbl = pa.concat_tables([first_summary.data(), smrys.data()]) if first_summary.check_data(): first_summary._data = tbl else: pq.write_table(tbl, first_summary.file_name()) os.remove(smrys.file_name()) mnint = dtu.microseconds_to_seconds(float(np.mean(np.diff(tbl["timestamps"].to_numpy())))) stdint = dtu.microseconds_to_seconds(float(np.std(np.diff(tbl["timestamps"].to_numpy())))) single_smry = PyarrowSummary( first_summary.name, first_summary.stype, first_summary.start, 1 / mnint, first_summary.fdir, tbl.num_rows, mnint, stdint, first_summary.data() if first_summary.check_data() else None, ) self.summaries = other_smrs self.summaries.append(single_smry)
def sensor_types(self) ‑> List[SensorType]
-
:return: a list of sensor types in self.summaries
Expand source code
def sensor_types(self) -> List[srupa.SensorType]: """ :return: a list of sensor types in self.summaries """ result = [] for s in self.summaries: if s.stype not in result: result.append(s.stype) return result
def to_dict(self, encode_json=False) ‑> Dict[str, Union[dict, list, str, int, float, bool, ForwardRef(None)]]
-
Expand source code
def to_dict(self, encode_json=False) -> Dict[str, Json]: return _asdict(self, encode_json=encode_json)
def to_json(self, *, skipkeys: bool = False, ensure_ascii: bool = True, check_circular: bool = True, allow_nan: bool = True, indent: Union[int, str, ForwardRef(None)] = None, separators: Tuple[str, str] = None, default: Callable = None, sort_keys: bool = False, **kw) ‑> str
-
Expand source code
def to_json(self, *, skipkeys: bool = False, ensure_ascii: bool = True, check_circular: bool = True, allow_nan: bool = True, indent: Optional[Union[int, str]] = None, separators: Tuple[str, str] = None, default: Callable = None, sort_keys: bool = False, **kw) -> str: return json.dumps(self.to_dict(encode_json=False), cls=_ExtendedEncoder, skipkeys=skipkeys, ensure_ascii=ensure_ascii, check_circular=check_circular, allow_nan=allow_nan, indent=indent, separators=separators, default=default, sort_keys=sort_keys, **kw)
class PyarrowSummary (name: str, stype: SensorType, start: float, srate_hz: float, fdir: str, scount: int, smint_s: float = nan, sstd_s: float = nan)
-
Summary of a sensor using Pyarrow Tables or parquet files to store the data
Properties
name: str, name of sensor
stype: SensorType, sensor type of summary
start: float, start timestamp in microseconds since epoch utc of sensor
srate: float, sample rate in Hz
fdir: str, directory where parquet files can be found
scount: int, number of samples to read
smint: float, mean interval of sample rate in seconds
sstd: float, std dev of sample rate in seconds
_data: optional data as a Pyarrow Table
Expand source code
@dataclass_json
@dataclass
class PyarrowSummary:
    """
    Summary of a sensor using Pyarrow Tables or parquet files to store the data

    Properties:
        name: str, name of sensor

        stype: SensorType, sensor type of summary

        start: float, start timestamp in microseconds since epoch utc of sensor

        srate: float, sample rate in Hz

        fdir: str, directory where parquet files can be found

        scount: int, number of samples to read

        smint: float, mean interval of sample rate in seconds

        sstd: float, std dev of sample rate in seconds

        _data: optional data as a Pyarrow Table
    """

    name: str
    stype: srupa.SensorType
    start: float
    srate_hz: float
    fdir: str
    scount: int
    smint_s: float = np.nan
    sstd_s: float = np.nan
    _data: Optional[pa.Table] = None

    def file_name(self) -> str:
        """
        :return: full path and file name of where the file should exist
        """
        return os.path.join(self.fdir, f"{self.stype.name}_{int(self.start)}.parquet")

    def fdir_stem(self) -> str:
        """
        :return: the name of the parent directory of the file
        """
        return Path(self.fdir).stem

    def clean_fdir(self):
        """
        remove all parquets in the self.fdir
        """
        for f in glob(os.path.join(self.fdir, "*.parquet")):
            os.remove(f)

    def write_data(self, clean_dir: bool = False) -> str:
        """
        write the data being summarized to disk, then remove the data from the object

        :param clean_dir: if True, remove any files in the dir before writing the data, default False
        :return: the path to the file where the data exists or empty string if data wasn't written
        """
        if self.check_data():
            os.makedirs(self.fdir, exist_ok=True)
            if clean_dir:
                self.clean_fdir()
            pq.write_table(self._data, self.file_name())
            # data now lives on disk; drop the in-memory copy
            self._data = None
            return self.file_name()
        return ""

    def check_data(self) -> bool:
        """
        :return: True if data exists as a property (also means not written to disk)
        """
        # identity check instead of truthiness: an in-memory but EMPTY pyarrow
        # Table is falsy (via __len__), which would misreport present data as absent
        return self._data is not None

    def data(self) -> Optional[pa.Table]:
        """
        :return: the data as a Pyarrow Table
        """
        if self.check_data():
            return self._data
        if os.path.exists(self.file_name()):
            return pq.read_table(self.file_name())
        # neither in memory nor on disk: return an empty table rather than None
        return pa.Table.from_pydict({})
Class variables
var fdir : str
var name : str
var scount : int
var smint_s : float
var srate_hz : float
var sstd_s : float
var start : float
var stype : SensorType
Static methods
def from_dict(kvs: Union[dict, list, str, int, float, bool, ForwardRef(None)], *, infer_missing=False) ‑> ~A
-
Expand source code
@classmethod def from_dict(cls: Type[A], kvs: Json, *, infer_missing=False) -> A: return _decode_dataclass(cls, kvs, infer_missing)
def from_json(s: Union[str, bytes, bytearray], *, parse_float=None, parse_int=None, parse_constant=None, infer_missing=False, **kw) ‑> ~A
-
Expand source code
@classmethod def from_json(cls: Type[A], s: JsonData, *, parse_float=None, parse_int=None, parse_constant=None, infer_missing=False, **kw) -> A: kvs = json.loads(s, parse_float=parse_float, parse_int=parse_int, parse_constant=parse_constant, **kw) return cls.from_dict(kvs, infer_missing=infer_missing)
def schema(*, infer_missing: bool = False, only=None, exclude=(), many: bool = False, context=None, load_only=(), dump_only=(), partial: bool = False, unknown=None) ‑> dataclasses_json.mm.SchemaF[~A]
-
Expand source code
@classmethod def schema(cls: Type[A], *, infer_missing: bool = False, only=None, exclude=(), many: bool = False, context=None, load_only=(), dump_only=(), partial: bool = False, unknown=None) -> SchemaType: Schema = build_schema(cls, DataClassJsonMixin, infer_missing, partial) if unknown is None: undefined_parameter_action = _undefined_parameter_action_safe(cls) if undefined_parameter_action is not None: # We can just make use of the same-named mm keywords unknown = undefined_parameter_action.name.lower() return Schema(only=only, exclude=exclude, many=many, context=context, load_only=load_only, dump_only=dump_only, partial=partial, unknown=unknown)
Methods
def check_data(self) ‑> bool
-
:return: True if data exists as a property (also means not written to disk)
Expand source code
def check_data(self) -> bool: """ :return: True if data exists as a property (also means not written to disk) """ return True if self._data else False
def clean_fdir(self)
-
remove all parquets in the self.fdir
Expand source code
def clean_fdir(self): """ remove all parquets in the self.fdir """ for f in glob(os.path.join(self.fdir, "*.parquet")): os.remove(f)
def data(self) ‑> Optional[pyarrow.lib.Table]
-
:return: the data as a Pyarrow Table
Expand source code
def data(self) -> Optional[pa.Table]: """ :return: the data as a Pyarrow Table """ if self.check_data(): return self._data if os.path.exists(self.file_name()): return pq.read_table(self.file_name()) return pa.Table.from_pydict({})
def fdir_stem(self) ‑> str
-
:return: the name of the parent directory of the file
Expand source code
def fdir_stem(self) -> str: """ :return: the name of the parent directory of the file """ return Path(self.fdir).stem
def file_name(self) ‑> str
-
:return: full path and file name of where the file should exist
Expand source code
def file_name(self) -> str: """ :return: full path and file name of where the file should exist """ return os.path.join(self.fdir, f"{self.stype.name}_{int(self.start)}.parquet")
def to_dict(self, encode_json=False) ‑> Dict[str, Union[dict, list, str, int, float, bool, ForwardRef(None)]]
-
Expand source code
def to_dict(self, encode_json=False) -> Dict[str, Json]: return _asdict(self, encode_json=encode_json)
def to_json(self, *, skipkeys: bool = False, ensure_ascii: bool = True, check_circular: bool = True, allow_nan: bool = True, indent: Union[int, str, ForwardRef(None)] = None, separators: Tuple[str, str] = None, default: Callable = None, sort_keys: bool = False, **kw) ‑> str
-
Expand source code
def to_json(self, *, skipkeys: bool = False, ensure_ascii: bool = True, check_circular: bool = True, allow_nan: bool = True, indent: Optional[Union[int, str]] = None, separators: Tuple[str, str] = None, default: Callable = None, sort_keys: bool = False, **kw) -> str: return json.dumps(self.to_dict(encode_json=False), cls=_ExtendedEncoder, skipkeys=skipkeys, ensure_ascii=ensure_ascii, check_circular=check_circular, allow_nan=allow_nan, indent=indent, separators=separators, default=default, sort_keys=sort_keys, **kw)
def write_data(self, clean_dir: bool = False) ‑> str
-
write the data being summarized to disk, then remove the data from the object
:param clean_dir: if True, remove any files in the dir before writing the data, default False :return: the path to the file where the data exists or empty string if data wasn't written
Expand source code
def write_data(self, clean_dir: bool = False) -> str: """ write the data being summarized to disk, then remove the data from the object :param clean_dir: if True, remove any files in the dir before writing the data, default False :return: the path to the file where the data exists or empty string if data wasn't written """ if self.check_data(): os.makedirs(self.fdir, exist_ok=True) if clean_dir: self.clean_fdir() pq.write_table(self._data, self.file_name()) self._data = None return self.file_name() return ""
class RedvoxPacketM (*args, **kwargs)
-
A ProtocolMessage
Ancestors
- google._upb._message.Message
- google.protobuf.message.Message
Class variables
var DESCRIPTOR
var DoubleSamplePayload
-
A ProtocolMessage
var EventStream
-
A ProtocolMessage
var MetadataEntry
-
A ProtocolMessage
var SamplePayload
-
A ProtocolMessage
var Sensors
-
A ProtocolMessage
var StationInformation
-
A ProtocolMessage
var SummaryStatistics
-
A ProtocolMessage
var TimingInformation
-
A ProtocolMessage
var TimingPayload
-
A ProtocolMessage
var Unit