Module redvox.common.session_model
Expand source code
import os.path
from pathlib import Path
from typing import Dict, List, Optional
import numpy as np
import redvox
import redvox.api1000.proto.redvox_api_m_pb2 as api_m
import redvox.common.date_time_utils as dtu
import redvox.common.session_io as s_io
import redvox.common.session_model_utils as smu
from redvox.cloud import session_model_api as cloud_sm
from redvox.common.errors import RedVoxError, RedVoxExceptions
# Version of the SessionModel
SESSION_VERSION = "2023-08-16"

# Name of the client used to create the SessionModel
CLIENT_NAME = "redvox-sdk/session_model"

# Version of the client used to create the SessionModel; always the same value as SESSION_VERSION
CLIENT_VERSION = SESSION_VERSION

# Default name of the app
APP_NAME = "RedVox"

# Identifier for day-long dynamic sessions
DAILY_SESSION_NAME = "Day"

# Identifier for hour-long dynamic sessions
HOURLY_SESSION_NAME = "Hour"
class SessionModel:
    """
    SDK version of Session from the cloud API.

    Holds the cloud Session, the hour-long and day-long dynamic sessions built from packets, the SDK version
    used to create the model and a record of the errors encountered while adding data.
    """

    def __init__(
        self, session: Optional[cloud_sm.Session] = None, dynamic: Optional[Dict[str, cloud_sm.DynamicSession]] = None
    ):
        """
        :param session: cloud Session to wrap.  Default None
        :param dynamic: dynamic sessions keyed by "[session_key]:[start_ts]:[end_ts]".
                        Default None, which starts with an empty dictionary
        """
        self.cloud_session: Optional[cloud_sm.Session] = session
        self.dynamic_sessions: Dict[str, cloud_sm.DynamicSession] = {} if dynamic is None else dynamic
        self._sdk_version: str = redvox.VERSION
        self._errors: RedVoxExceptions = RedVoxExceptions("SessionModel")

    def __repr__(self):
        return (
            f"cloud_session: {self.cloud_session}, "
            f"dynamic_sessions: {self.dynamic_sessions}, "
            f"sdk_version: {self._sdk_version}, "
            f"errors: {self._errors}"
        )

    def as_dict(self) -> dict:
        """
        :return: SessionModel as dictionary.  Only the cloud session and the dynamic sessions are included.
        """
        return {
            "cloud_session": self.cloud_session.to_dict(),
            "dynamic_sessions": {n: m.to_dict() for n, m in self.dynamic_sessions.items()},
        }

    @staticmethod
    def from_dict(dictionary: Dict) -> "SessionModel":
        """
        :param dictionary: dictionary to read from; must have the keys "cloud_session" and "dynamic_sessions"
        :return: SessionModel from the dict
        """
        return SessionModel(
            cloud_sm.Session.from_dict(dictionary["cloud_session"]),
            {n: cloud_sm.DynamicSession.from_dict(m) for n, m in dictionary["dynamic_sessions"].items()},
        )

    def compress(self, out_dir: str = ".") -> Path:
        """
        Compresses this SessionModel to a file at out_dir.
        Uses the id and start_ts to name the file.

        :param out_dir: Directory to save file to.  Default "." (current directory)
        :return: The path to the written file.
        """
        return s_io.compress_session_model(self, out_dir)

    def save(self, out_type: str = "json", out_dir: str = ".") -> Path:
        """
        Save the SessionModel to disk.  Options for out_type are "json" for JSON file and "pkl" for .pkl file.
        Defaults to "json".  File will be named after id and start_ts of the SessionModel

        :param out_type: "pkl" for .pkl file; any other value writes a JSON file
        :param out_dir: Directory to save file to.  Default "." (current directory)
        :return: path to saved file
        """
        if out_type == "pkl":
            return self.compress(out_dir)
        return s_io.session_model_to_json_file(self, out_dir)

    @staticmethod
    def load(file_path: str) -> "SessionModel":
        """
        Load only works on a JSON or .pkl file.  The extension is matched case-insensitively.

        :param file_path: full name and path to the SessionModel file
        :return: SessionModel from a JSON or .pkl file.
        :raises ValueError: if the file extension is neither .json nor .pkl
        """
        # lower-case the extension so files such as "MODEL.JSON" are accepted as well
        ext = os.path.splitext(file_path)[1].lower()
        if ext == ".json":
            return SessionModel.from_dict(s_io.session_model_dict_from_json_file(file_path))
        if ext == ".pkl":
            return s_io.decompress_session_model(file_path)
        raise ValueError(f"{file_path} has unknown file extension; this function only accepts json and pkl files.")

    def default_file_name(self) -> str:
        """
        :return: Default file name as [id]_[start_ts]_model, with start_ts as integer of microseconds
                    since epoch UTC.  A start_ts of NaN is written as 0.  File extension NOT included.
        """
        return (
            f"{self.cloud_session.id}_"
            f"{0 if np.isnan(self.cloud_session.start_ts) else self.cloud_session.start_ts}_model"
        )

    @staticmethod
    def _packet_session_key(packet: api_m.RedvoxPacketM) -> str:
        """
        :param packet: packet to build the key for
        :return: the packet's session key as [id]:[uuid]:[app_start_ts], with app_start_ts converted to an integer
        """
        return (
            f"{packet.station_information.id}:{packet.station_information.uuid}:"
            f"{int(packet.timing_information.app_start_mach_timestamp)}"
        )

    @staticmethod
    def create_from_packet(packet: api_m.RedvoxPacketM) -> "SessionModel":
        """
        :param packet: API M packet of data to read
        :return: Session using the data from the packet
        :raises RedVoxError: if the packet has no timing data
        """
        duration = (
            packet.timing_information.packet_end_mach_timestamp
            - packet.timing_information.packet_start_mach_timestamp
        )
        sensors = [
            cloud_sm.Sensor(s[0], s[1], smu.add_to_stats(s[2])) for s in smu.get_all_sensors_in_packet(packet)
        ]
        local_ts = smu.get_local_timesync(packet)
        if local_ts is None:
            raise RedVoxError(
                f"Unable to find timing data for station {packet.station_information.id}.\n"
                f"Timing is required to complete SessionModel.\nNow Quitting."
            )
        # local_ts[0:5] feed the Timing summary below; local_ts[5] holds the individual timesync entries
        fst_lst = cloud_sm.FirstLastBufTimeSync([], smu.NUM_BUFFER_POINTS, [], smu.NUM_BUFFER_POINTS)
        for f in local_ts[5]:
            smu.add_to_fst_buffer(fst_lst.fst, fst_lst.fst_max_size, f.ts, f)
            smu.add_to_lst_buffer(fst_lst.lst, fst_lst.lst_max_size, f.ts, f)
        timing = cloud_sm.Timing(local_ts[0], local_ts[1], local_ts[2], local_ts[3], local_ts[4], fst_lst)
        result = SessionModel(
            cloud_sm.Session(
                id=packet.station_information.id,
                uuid=packet.station_information.uuid,
                desc=packet.station_information.description,
                start_ts=int(packet.timing_information.app_start_mach_timestamp),
                client=CLIENT_NAME,
                client_ver=CLIENT_VERSION,
                session_ver=SESSION_VERSION,
                app=APP_NAME,
                api=int(packet.api),
                sub_api=int(packet.sub_api),
                make=packet.station_information.make,
                model=packet.station_information.model,
                app_ver=packet.station_information.app_version,
                owner=packet.station_information.auth_id,
                private=packet.station_information.is_private,
                packet_dur=duration,
                sensors=sensors,
                n_pkts=1,
                timing=timing,
                sub=[],
            )
        )
        # the daily dynamic session (and the hourly one inside it) can only be made once the model exists
        result.cloud_session.sub = [result.add_dynamic_day(packet)]
        return result

    @staticmethod
    def create_from_stream(data_stream: List[api_m.RedvoxPacketM]) -> "SessionModel":
        """
        :param data_stream: list of API M packets from a single station to read.  The list is not modified.
        :return: SessionModel using the data packets from the stream
        :raises IndexError: if data_stream is empty
        """
        # read by index instead of pop/insert so the caller's list stays intact even if a packet raises
        first_packet = data_stream[0]
        model = SessionModel.create_from_packet(first_packet)
        for p in data_stream[1:]:
            model.add_data_from_packet(p)
        return model

    def add_data_from_packet(self, packet: api_m.RedvoxPacketM):
        """
        Adds the data from the packet to the SessionModel.
        If the packet doesn't match the key of the SessionModel, writes an error and no data is added.
        If the packet has no timesync data, writes an error and adds the rest of the packet's data.

        :param packet: packet to add
        """
        packet_key = self._packet_session_key(packet)
        if self.cloud_session.session_key() != packet_key:
            self._errors.append(f"Attempted to add packet with invalid key: {packet_key}")
            return
        local_ts = smu.get_local_timesync(packet)
        if local_ts is None:
            self._errors.append(
                f"Timesync doesn't exist in packet starting at "
                f"{packet.timing_information.packet_start_mach_timestamp}."
            )
        else:
            timing = self.cloud_session.timing
            n_pkts = self.cloud_session.n_pkts
            for f in local_ts[5]:
                smu.add_to_fst_buffer(timing.fst_lst.fst, timing.fst_lst.fst_max_size, f.ts, f)
                smu.add_to_lst_buffer(timing.fst_lst.lst, timing.fst_lst.lst_max_size, f.ts, f)
            timing.n_ex += local_ts[2]
            # running means weighted by the number of packets already in the session
            timing.mean_lat = (timing.mean_lat * n_pkts + local_ts[3]) / (n_pkts + 1)
            timing.mean_off = (timing.mean_off * n_pkts + local_ts[4]) / (n_pkts + 1)
            if local_ts[0] < timing.first_data_ts:
                timing.first_data_ts = local_ts[0]
            if local_ts[1] > timing.last_data_ts:
                timing.last_data_ts = local_ts[1]
        for s in smu.get_all_sensors_in_packet(packet):
            sensor = self.get_sensor(s[0], s[1])
            if sensor is not None:
                sensor.sample_rate_stats = smu.add_to_stats(s[2], sensor.sample_rate_stats)
            else:
                self.cloud_session.sensors.append(cloud_sm.Sensor(s[0], s[1], smu.add_to_stats(s[2])))
        self.add_dynamic_day(packet)
        self.cloud_session.n_pkts += 1

    def add_dynamic_hour(self, data: dict, packet_start: float, session_key: str) -> str:
        """
        Add (or update an existing session if key exists) a dynamic session with length of 1 hour using a single packet

        :param data: dictionary of data to add; the keys "location", "battery" and "temperature" are read
        :param packet_start: starting timestamp of the packet in microseconds since epoch UTC
        :param session_key: the session key of the parent Session
        :return: the key to the new or updated dynamic session as [hour_start_ts]:[hour_end_ts]
        """
        start_dt = dtu.datetime_from_epoch_microseconds_utc(packet_start)
        hour_start_dt = dtu.datetime(start_dt.year, start_dt.month, start_dt.day, start_dt.hour)
        hour_end_ts = int(dtu.datetime_to_epoch_microseconds_utc(hour_start_dt + dtu.timedelta(hours=1)))
        hour_start_ts = int(dtu.datetime_to_epoch_microseconds_utc(hour_start_dt))
        dynamic_key = f"{hour_start_ts}:{hour_end_ts}"
        key = f"{session_key}:{dynamic_key}"
        # an hourly session links to its packets by their integer start timestamps
        packet_sub = [f"{int(packet_start)}"]
        if key in self.dynamic_sessions:
            self._update_dynamic_session(key, data, packet_sub)
        else:
            self.dynamic_sessions[key] = cloud_sm.DynamicSession(
                1,
                smu.add_location_data(data["location"]),
                smu.add_to_stats(data["battery"]),
                smu.add_to_stats(data["temperature"]),
                session_key,
                hour_start_ts,
                hour_end_ts,
                HOURLY_SESSION_NAME,
                packet_sub,
            )
        return dynamic_key

    def add_dynamic_day(self, packet: api_m.RedvoxPacketM) -> str:
        """
        Add (or update an existing session if key exists) a dynamic session with length of 1 day using a single packet.
        Also adds or updates the hour-long dynamic session the packet belongs to.

        :param packet: packet to read data from
        :return: the key to the new or updated dynamic session as [day_start_ts]:[day_end_ts]
        """
        data = smu.get_dynamic_data(packet)
        packet_start = packet.timing_information.packet_start_mach_timestamp
        start_dt = dtu.datetime_from_epoch_microseconds_utc(packet_start)
        day_start_dt = dtu.datetime(start_dt.year, start_dt.month, start_dt.day)
        day_end_ts = int(dtu.datetime_to_epoch_microseconds_utc(day_start_dt + dtu.timedelta(days=1)))
        day_start_ts = int(dtu.datetime_to_epoch_microseconds_utc(day_start_dt))
        dynamic_key = f"{day_start_ts}:{day_end_ts}"
        session_key = self._packet_session_key(packet)
        key = f"{session_key}:{dynamic_key}"
        # the hourly session is created/updated here, before the daily session that links to it
        hourly_key = self.add_dynamic_hour(data, packet_start, session_key)
        if key in self.dynamic_sessions:
            self._update_dynamic_session(key, data, [hourly_key])
        else:
            self.dynamic_sessions[key] = cloud_sm.DynamicSession(
                1,
                smu.add_location_data(data["location"]),
                smu.add_to_stats(data["battery"]),
                smu.add_to_stats(data["temperature"]),
                session_key,
                day_start_ts,
                day_end_ts,
                DAILY_SESSION_NAME,
                [hourly_key],
            )
        return dynamic_key

    def _update_dynamic_session(self, key: str, data: Dict, sub: List[str]):
        """
        update a dynamic session with a given key.  Writes an error if the key does not exist.

        Only the session stored under key is updated.  Linked child sessions are NOT updated from here:
        add_dynamic_day() updates the hourly session itself through add_dynamic_hour(), so updating children
        again would count the packet twice.

        :param key: key to the dynamic session
        :param data: dictionary of data to add; the keys "location", "battery" and "temperature" are read
        :param sub: the list of keys that the dynamic session is linked to
        """
        if key not in self.dynamic_sessions:
            self._errors.append(f"Attempted to update non-existent key: {key}.")
            return
        session = self.dynamic_sessions[key]
        session.n_pkts += 1
        session.location = smu.add_location_data(data["location"], session.location)
        session.battery = smu.add_to_stats(data["battery"], session.battery)
        session.temperature = smu.add_to_stats(data["temperature"], session.temperature)
        for s in sub:
            if s not in session.sub:
                session.sub.append(s)

    def sdk_version(self) -> str:
        """
        :return: sdk version used to create the SessionModel
        """
        return self._sdk_version

    def num_sensors(self) -> int:
        """
        :return: number of sensors in the Session
        """
        return len(self.cloud_session.sensors)

    def get_sensor_names(self) -> List[str]:
        """
        :return: names of the sensors in the Session, in stored order
        """
        return [n.name for n in self.cloud_session.sensors]

    def get_sensor(self, name: str, desc: Optional[str] = None) -> Optional[cloud_sm.Sensor]:
        """
        :param name: name of the sensor to get
        :param desc: Optional description of the sensor to get.  If None, will get the first sensor that
                        matches the name given.  Default None.
        :return: the first sensor that matches the name and description given or None if sensor was not found
        """
        for s in self.cloud_session.sensors:
            if s.name == name and (desc is None or s.description == desc):
                return s
        return None

    def audio_sample_rate_nominal_hz(self) -> Optional[float]:
        """
        :return: mean of the sample rate stats of the first sensor named "audio",
                    or None if the Session has no such sensor
        """
        for n in self.cloud_session.sensors:
            if n.name == "audio":
                return n.sample_rate_stats.welford.mean
        return None

    def get_daily_dynamic_sessions(self) -> List[cloud_sm.DynamicSession]:
        """
        :return: all day-long dynamic sessions in the Session
        """
        return [n for n in self.dynamic_sessions.values() if n.dur == DAILY_SESSION_NAME]

    def get_hourly_dynamic_sessions(self) -> List[cloud_sm.DynamicSession]:
        """
        :return: all hour-long dynamic sessions in the Session
        """
        return [n for n in self.dynamic_sessions.values() if n.dur == HOURLY_SESSION_NAME]

    def print_errors(self):
        """
        print all errors encountered by the SessionModel
        """
        self._errors.print()
class LocalSessionModels:
    """
    SDK version of SessionModelsResp from the cloud API.

    Keeps a list of SessionModel objects and routes packets to the model whose session key matches.
    """

    def __init__(self):
        self.sessions: List[SessionModel] = []

    def __repr__(self):
        return f"sessions: {[s.__repr__() for s in self.sessions]}"

    def as_dict(self) -> Dict:
        """
        :return: LocalSessionModels as a dictionary; each SessionModel is converted with its own as_dict()
        """
        return {"sessions": [s.as_dict() for s in self.sessions]}

    @staticmethod
    def from_dict(in_dict: Dict) -> "LocalSessionModels":
        """
        :param in_dict: dictionary to convert from.  If the key "sessions" is missing, the result has no sessions.
        :return: LocalSessionModels object from dictionary
        """
        result = LocalSessionModels()
        # as_dict() stores every model as a plain dictionary, so rebuild SessionModel objects from those;
        # entries that are not dictionaries are kept as given.
        result.sessions = [
            SessionModel.from_dict(s) if isinstance(s, dict) else s for s in in_dict.get("sessions", [])
        ]
        return result

    def add_packet(self, packet: api_m.RedvoxPacketM) -> str:
        """
        add a packet to one of the models, or make a new one

        :param packet: packet of data to add.
        :return: key of the new or updated SessionModel
        """
        key = (
            f"{packet.station_information.id}:{packet.station_information.uuid}:"
            f"{int(packet.timing_information.app_start_mach_timestamp)}"
        )
        for s in self.sessions:
            if key == s.cloud_session.session_key():
                # add_data_from_packet belongs to the SessionModel, not to the cloud session it wraps
                s.add_data_from_packet(packet)
                return s.cloud_session.session_key()
        # if here, key is not in the sessions.
        self.sessions.append(SessionModel.create_from_packet(packet))
        return key

    def get_session(self, key: str) -> Optional[SessionModel]:
        """
        :param key: key of SessionModel to get
        :return: SessionModel that matches the given key or None
        """
        for s in self.sessions:
            if key == s.cloud_session.session_key():
                return s
        return None
Classes
class LocalSessionModels
-
SDK version of SessionModelsResp from the cloud API
Expand source code
class LocalSessionModels:
    """
    SDK version of SessionModelsResp from the cloud API.

    Keeps a list of SessionModel objects and routes packets to the model whose session key matches.
    """

    def __init__(self):
        self.sessions: List[SessionModel] = []

    def __repr__(self):
        return f"sessions: {[s.__repr__() for s in self.sessions]}"

    def as_dict(self) -> Dict:
        """
        :return: LocalSessionModels as a dictionary; each SessionModel is converted with its own as_dict()
        """
        return {"sessions": [s.as_dict() for s in self.sessions]}

    @staticmethod
    def from_dict(in_dict: Dict) -> "LocalSessionModels":
        """
        :param in_dict: dictionary to convert from.  If the key "sessions" is missing, the result has no sessions.
        :return: LocalSessionModels object from dictionary
        """
        result = LocalSessionModels()
        # as_dict() stores every model as a plain dictionary, so rebuild SessionModel objects from those;
        # entries that are not dictionaries are kept as given.
        result.sessions = [
            SessionModel.from_dict(s) if isinstance(s, dict) else s for s in in_dict.get("sessions", [])
        ]
        return result

    def add_packet(self, packet: api_m.RedvoxPacketM) -> str:
        """
        add a packet to one of the models, or make a new one

        :param packet: packet of data to add.
        :return: key of the new or updated SessionModel
        """
        key = (
            f"{packet.station_information.id}:{packet.station_information.uuid}:"
            f"{int(packet.timing_information.app_start_mach_timestamp)}"
        )
        for s in self.sessions:
            if key == s.cloud_session.session_key():
                # add_data_from_packet belongs to the SessionModel, not to the cloud session it wraps
                s.add_data_from_packet(packet)
                return s.cloud_session.session_key()
        # if here, key is not in the sessions.
        self.sessions.append(SessionModel.create_from_packet(packet))
        return key

    def get_session(self, key: str) -> Optional[SessionModel]:
        """
        :param key: key of SessionModel to get
        :return: SessionModel that matches the given key or None
        """
        for s in self.sessions:
            if key == s.cloud_session.session_key():
                return s
        return None
Static methods
def from_dict(in_dict: Dict) ‑> LocalSessionModels
-
:param in_dict: dictionary to convert from :return: LocalSessionModels object from dictionary
Expand source code
@staticmethod
def from_dict(in_dict: Dict) -> "LocalSessionModels":
    """
    :param in_dict: dictionary to convert from.  If the key "sessions" is missing, the result has no sessions.
    :return: LocalSessionModels object from dictionary
    """
    result = LocalSessionModels()
    # as_dict() stores every model as a plain dictionary, so rebuild SessionModel objects from those;
    # entries that are not dictionaries are kept as given.
    result.sessions = [
        SessionModel.from_dict(s) if isinstance(s, dict) else s for s in in_dict.get("sessions", [])
    ]
    return result
Methods
def add_packet(self, packet: src.redvox_api_m.redvox_api_m_pb2.RedvoxPacketM) ‑> str
-
add a packet to one of the models, or make a new one
:param packet: packet of data to add. :return: key of new or updated packet
Expand source code
def add_packet(self, packet: api_m.RedvoxPacketM) -> str:
    """
    add a packet to one of the models, or make a new one

    :param packet: packet of data to add.
    :return: key of the new or updated SessionModel
    """
    key = (
        f"{packet.station_information.id}:{packet.station_information.uuid}:"
        f"{int(packet.timing_information.app_start_mach_timestamp)}"
    )
    for s in self.sessions:
        if key == s.cloud_session.session_key():
            # add_data_from_packet belongs to the SessionModel, not to the cloud session it wraps
            s.add_data_from_packet(packet)
            return s.cloud_session.session_key()
    # if here, key is not in the sessions.
    self.sessions.append(SessionModel.create_from_packet(packet))
    return key
def as_dict(self) ‑> Dict
-
:return: LocalSessionModels as a dictionary
Expand source code
def as_dict(self) -> Dict:
    """
    :return: LocalSessionModels as a dictionary; each session is converted with its own as_dict()
    """
    converted = []
    for model in self.sessions:
        converted.append(model.as_dict())
    return {"sessions": converted}
def get_session(self, key: str) ‑> Optional[SessionModel]
-
:param key: key of SessionModel to get :return: SessionModel that matches the given key or None
Expand source code
def get_session(self, key: str) -> Optional[SessionModel]:
    """
    :param key: key of SessionModel to get
    :return: the first SessionModel whose cloud session key equals key, or None if there is no match
    """
    matches = (model for model in self.sessions if key == model.cloud_session.session_key())
    return next(matches, None)
class SessionModel (session: Optional[Session] = None, dynamic: Optional[Dict[str, DynamicSession]] = None)
-
SDK version of Session from the cloud API
Expand source code
class SessionModel:
    """
    SDK version of Session from the cloud API.

    Holds the cloud Session, the hour-long and day-long dynamic sessions built from packets, the SDK version
    used to create the model and a record of the errors encountered while adding data.
    """

    def __init__(
        self, session: Optional[cloud_sm.Session] = None, dynamic: Optional[Dict[str, cloud_sm.DynamicSession]] = None
    ):
        """
        :param session: cloud Session to wrap.  Default None
        :param dynamic: dynamic sessions keyed by "[session_key]:[start_ts]:[end_ts]".
                        Default None, which starts with an empty dictionary
        """
        self.cloud_session: Optional[cloud_sm.Session] = session
        self.dynamic_sessions: Dict[str, cloud_sm.DynamicSession] = {} if dynamic is None else dynamic
        self._sdk_version: str = redvox.VERSION
        self._errors: RedVoxExceptions = RedVoxExceptions("SessionModel")

    def __repr__(self):
        return (
            f"cloud_session: {self.cloud_session}, "
            f"dynamic_sessions: {self.dynamic_sessions}, "
            f"sdk_version: {self._sdk_version}, "
            f"errors: {self._errors}"
        )

    def as_dict(self) -> dict:
        """
        :return: SessionModel as dictionary.  Only the cloud session and the dynamic sessions are included.
        """
        return {
            "cloud_session": self.cloud_session.to_dict(),
            "dynamic_sessions": {n: m.to_dict() for n, m in self.dynamic_sessions.items()},
        }

    @staticmethod
    def from_dict(dictionary: Dict) -> "SessionModel":
        """
        :param dictionary: dictionary to read from; must have the keys "cloud_session" and "dynamic_sessions"
        :return: SessionModel from the dict
        """
        return SessionModel(
            cloud_sm.Session.from_dict(dictionary["cloud_session"]),
            {n: cloud_sm.DynamicSession.from_dict(m) for n, m in dictionary["dynamic_sessions"].items()},
        )

    def compress(self, out_dir: str = ".") -> Path:
        """
        Compresses this SessionModel to a file at out_dir.
        Uses the id and start_ts to name the file.

        :param out_dir: Directory to save file to.  Default "." (current directory)
        :return: The path to the written file.
        """
        return s_io.compress_session_model(self, out_dir)

    def save(self, out_type: str = "json", out_dir: str = ".") -> Path:
        """
        Save the SessionModel to disk.  Options for out_type are "json" for JSON file and "pkl" for .pkl file.
        Defaults to "json".  File will be named after id and start_ts of the SessionModel

        :param out_type: "pkl" for .pkl file; any other value writes a JSON file
        :param out_dir: Directory to save file to.  Default "." (current directory)
        :return: path to saved file
        """
        if out_type == "pkl":
            return self.compress(out_dir)
        return s_io.session_model_to_json_file(self, out_dir)

    @staticmethod
    def load(file_path: str) -> "SessionModel":
        """
        Load only works on a JSON or .pkl file.  The extension is matched case-insensitively.

        :param file_path: full name and path to the SessionModel file
        :return: SessionModel from a JSON or .pkl file.
        :raises ValueError: if the file extension is neither .json nor .pkl
        """
        # lower-case the extension so files such as "MODEL.JSON" are accepted as well
        ext = os.path.splitext(file_path)[1].lower()
        if ext == ".json":
            return SessionModel.from_dict(s_io.session_model_dict_from_json_file(file_path))
        if ext == ".pkl":
            return s_io.decompress_session_model(file_path)
        raise ValueError(f"{file_path} has unknown file extension; this function only accepts json and pkl files.")

    def default_file_name(self) -> str:
        """
        :return: Default file name as [id]_[start_ts]_model, with start_ts as integer of microseconds
                    since epoch UTC.  A start_ts of NaN is written as 0.  File extension NOT included.
        """
        return (
            f"{self.cloud_session.id}_"
            f"{0 if np.isnan(self.cloud_session.start_ts) else self.cloud_session.start_ts}_model"
        )

    @staticmethod
    def _packet_session_key(packet: api_m.RedvoxPacketM) -> str:
        """
        :param packet: packet to build the key for
        :return: the packet's session key as [id]:[uuid]:[app_start_ts], with app_start_ts converted to an integer
        """
        return (
            f"{packet.station_information.id}:{packet.station_information.uuid}:"
            f"{int(packet.timing_information.app_start_mach_timestamp)}"
        )

    @staticmethod
    def create_from_packet(packet: api_m.RedvoxPacketM) -> "SessionModel":
        """
        :param packet: API M packet of data to read
        :return: Session using the data from the packet
        :raises RedVoxError: if the packet has no timing data
        """
        duration = (
            packet.timing_information.packet_end_mach_timestamp
            - packet.timing_information.packet_start_mach_timestamp
        )
        sensors = [
            cloud_sm.Sensor(s[0], s[1], smu.add_to_stats(s[2])) for s in smu.get_all_sensors_in_packet(packet)
        ]
        local_ts = smu.get_local_timesync(packet)
        if local_ts is None:
            raise RedVoxError(
                f"Unable to find timing data for station {packet.station_information.id}.\n"
                f"Timing is required to complete SessionModel.\nNow Quitting."
            )
        # local_ts[0:5] feed the Timing summary below; local_ts[5] holds the individual timesync entries
        fst_lst = cloud_sm.FirstLastBufTimeSync([], smu.NUM_BUFFER_POINTS, [], smu.NUM_BUFFER_POINTS)
        for f in local_ts[5]:
            smu.add_to_fst_buffer(fst_lst.fst, fst_lst.fst_max_size, f.ts, f)
            smu.add_to_lst_buffer(fst_lst.lst, fst_lst.lst_max_size, f.ts, f)
        timing = cloud_sm.Timing(local_ts[0], local_ts[1], local_ts[2], local_ts[3], local_ts[4], fst_lst)
        result = SessionModel(
            cloud_sm.Session(
                id=packet.station_information.id,
                uuid=packet.station_information.uuid,
                desc=packet.station_information.description,
                start_ts=int(packet.timing_information.app_start_mach_timestamp),
                client=CLIENT_NAME,
                client_ver=CLIENT_VERSION,
                session_ver=SESSION_VERSION,
                app=APP_NAME,
                api=int(packet.api),
                sub_api=int(packet.sub_api),
                make=packet.station_information.make,
                model=packet.station_information.model,
                app_ver=packet.station_information.app_version,
                owner=packet.station_information.auth_id,
                private=packet.station_information.is_private,
                packet_dur=duration,
                sensors=sensors,
                n_pkts=1,
                timing=timing,
                sub=[],
            )
        )
        # the daily dynamic session (and the hourly one inside it) can only be made once the model exists
        result.cloud_session.sub = [result.add_dynamic_day(packet)]
        return result

    @staticmethod
    def create_from_stream(data_stream: List[api_m.RedvoxPacketM]) -> "SessionModel":
        """
        :param data_stream: list of API M packets from a single station to read.  The list is not modified.
        :return: SessionModel using the data packets from the stream
        :raises IndexError: if data_stream is empty
        """
        # read by index instead of pop/insert so the caller's list stays intact even if a packet raises
        first_packet = data_stream[0]
        model = SessionModel.create_from_packet(first_packet)
        for p in data_stream[1:]:
            model.add_data_from_packet(p)
        return model

    def add_data_from_packet(self, packet: api_m.RedvoxPacketM):
        """
        Adds the data from the packet to the SessionModel.
        If the packet doesn't match the key of the SessionModel, writes an error and no data is added.
        If the packet has no timesync data, writes an error and adds the rest of the packet's data.

        :param packet: packet to add
        """
        packet_key = self._packet_session_key(packet)
        if self.cloud_session.session_key() != packet_key:
            self._errors.append(f"Attempted to add packet with invalid key: {packet_key}")
            return
        local_ts = smu.get_local_timesync(packet)
        if local_ts is None:
            self._errors.append(
                f"Timesync doesn't exist in packet starting at "
                f"{packet.timing_information.packet_start_mach_timestamp}."
            )
        else:
            timing = self.cloud_session.timing
            n_pkts = self.cloud_session.n_pkts
            for f in local_ts[5]:
                smu.add_to_fst_buffer(timing.fst_lst.fst, timing.fst_lst.fst_max_size, f.ts, f)
                smu.add_to_lst_buffer(timing.fst_lst.lst, timing.fst_lst.lst_max_size, f.ts, f)
            timing.n_ex += local_ts[2]
            # running means weighted by the number of packets already in the session
            timing.mean_lat = (timing.mean_lat * n_pkts + local_ts[3]) / (n_pkts + 1)
            timing.mean_off = (timing.mean_off * n_pkts + local_ts[4]) / (n_pkts + 1)
            if local_ts[0] < timing.first_data_ts:
                timing.first_data_ts = local_ts[0]
            if local_ts[1] > timing.last_data_ts:
                timing.last_data_ts = local_ts[1]
        for s in smu.get_all_sensors_in_packet(packet):
            sensor = self.get_sensor(s[0], s[1])
            if sensor is not None:
                sensor.sample_rate_stats = smu.add_to_stats(s[2], sensor.sample_rate_stats)
            else:
                self.cloud_session.sensors.append(cloud_sm.Sensor(s[0], s[1], smu.add_to_stats(s[2])))
        self.add_dynamic_day(packet)
        self.cloud_session.n_pkts += 1

    def add_dynamic_hour(self, data: dict, packet_start: float, session_key: str) -> str:
        """
        Add (or update an existing session if key exists) a dynamic session with length of 1 hour using a single packet

        :param data: dictionary of data to add; the keys "location", "battery" and "temperature" are read
        :param packet_start: starting timestamp of the packet in microseconds since epoch UTC
        :param session_key: the session key of the parent Session
        :return: the key to the new or updated dynamic session as [hour_start_ts]:[hour_end_ts]
        """
        start_dt = dtu.datetime_from_epoch_microseconds_utc(packet_start)
        hour_start_dt = dtu.datetime(start_dt.year, start_dt.month, start_dt.day, start_dt.hour)
        hour_end_ts = int(dtu.datetime_to_epoch_microseconds_utc(hour_start_dt + dtu.timedelta(hours=1)))
        hour_start_ts = int(dtu.datetime_to_epoch_microseconds_utc(hour_start_dt))
        dynamic_key = f"{hour_start_ts}:{hour_end_ts}"
        key = f"{session_key}:{dynamic_key}"
        # an hourly session links to its packets by their integer start timestamps
        packet_sub = [f"{int(packet_start)}"]
        if key in self.dynamic_sessions:
            self._update_dynamic_session(key, data, packet_sub)
        else:
            self.dynamic_sessions[key] = cloud_sm.DynamicSession(
                1,
                smu.add_location_data(data["location"]),
                smu.add_to_stats(data["battery"]),
                smu.add_to_stats(data["temperature"]),
                session_key,
                hour_start_ts,
                hour_end_ts,
                HOURLY_SESSION_NAME,
                packet_sub,
            )
        return dynamic_key

    def add_dynamic_day(self, packet: api_m.RedvoxPacketM) -> str:
        """
        Add (or update an existing session if key exists) a dynamic session with length of 1 day using a single packet.
        Also adds or updates the hour-long dynamic session the packet belongs to.

        :param packet: packet to read data from
        :return: the key to the new or updated dynamic session as [day_start_ts]:[day_end_ts]
        """
        data = smu.get_dynamic_data(packet)
        packet_start = packet.timing_information.packet_start_mach_timestamp
        start_dt = dtu.datetime_from_epoch_microseconds_utc(packet_start)
        day_start_dt = dtu.datetime(start_dt.year, start_dt.month, start_dt.day)
        day_end_ts = int(dtu.datetime_to_epoch_microseconds_utc(day_start_dt + dtu.timedelta(days=1)))
        day_start_ts = int(dtu.datetime_to_epoch_microseconds_utc(day_start_dt))
        dynamic_key = f"{day_start_ts}:{day_end_ts}"
        session_key = self._packet_session_key(packet)
        key = f"{session_key}:{dynamic_key}"
        # the hourly session is created/updated here, before the daily session that links to it
        hourly_key = self.add_dynamic_hour(data, packet_start, session_key)
        if key in self.dynamic_sessions:
            self._update_dynamic_session(key, data, [hourly_key])
        else:
            self.dynamic_sessions[key] = cloud_sm.DynamicSession(
                1,
                smu.add_location_data(data["location"]),
                smu.add_to_stats(data["battery"]),
                smu.add_to_stats(data["temperature"]),
                session_key,
                day_start_ts,
                day_end_ts,
                DAILY_SESSION_NAME,
                [hourly_key],
            )
        return dynamic_key

    def _update_dynamic_session(self, key: str, data: Dict, sub: List[str]):
        """
        update a dynamic session with a given key.  Writes an error if the key does not exist.

        Only the session stored under key is updated.  Linked child sessions are NOT updated from here:
        add_dynamic_day() updates the hourly session itself through add_dynamic_hour(), so updating children
        again would count the packet twice.

        :param key: key to the dynamic session
        :param data: dictionary of data to add; the keys "location", "battery" and "temperature" are read
        :param sub: the list of keys that the dynamic session is linked to
        """
        if key not in self.dynamic_sessions:
            self._errors.append(f"Attempted to update non-existent key: {key}.")
            return
        session = self.dynamic_sessions[key]
        session.n_pkts += 1
        session.location = smu.add_location_data(data["location"], session.location)
        session.battery = smu.add_to_stats(data["battery"], session.battery)
        session.temperature = smu.add_to_stats(data["temperature"], session.temperature)
        for s in sub:
            if s not in session.sub:
                session.sub.append(s)

    def sdk_version(self) -> str:
        """
        :return: sdk version used to create the SessionModel
        """
        return self._sdk_version

    def num_sensors(self) -> int:
        """
        :return: number of sensors in the Session
        """
        return len(self.cloud_session.sensors)

    def get_sensor_names(self) -> List[str]:
        """
        :return: names of the sensors in the Session, in stored order
        """
        return [n.name for n in self.cloud_session.sensors]

    def get_sensor(self, name: str, desc: Optional[str] = None) -> Optional[cloud_sm.Sensor]:
        """
        :param name: name of the sensor to get
        :param desc: Optional description of the sensor to get.  If None, will get the first sensor that
                        matches the name given.  Default None.
        :return: the first sensor that matches the name and description given or None if sensor was not found
        """
        for s in self.cloud_session.sensors:
            if s.name == name and (desc is None or s.description == desc):
                return s
        return None

    def audio_sample_rate_nominal_hz(self) -> Optional[float]:
        """
        :return: mean of the sample rate stats of the first sensor named "audio",
                    or None if the Session has no such sensor
        """
        for n in self.cloud_session.sensors:
            if n.name == "audio":
                return n.sample_rate_stats.welford.mean
        return None

    def get_daily_dynamic_sessions(self) -> List[cloud_sm.DynamicSession]:
        """
        :return: all day-long dynamic sessions in the Session
        """
        return [n for n in self.dynamic_sessions.values() if n.dur == DAILY_SESSION_NAME]

    def get_hourly_dynamic_sessions(self) -> List[cloud_sm.DynamicSession]:
        """
        :return: all hour-long dynamic sessions in the Session
        """
        return [n for n in self.dynamic_sessions.values() if n.dur == HOURLY_SESSION_NAME]

    def print_errors(self):
        """
        print all errors encountered by the SessionModel
        """
        self._errors.print()
Static methods
def create_from_packet(packet: src.redvox_api_m.redvox_api_m_pb2.RedvoxPacketM) ‑> SessionModel
-
:param packet: API M packet of data to read :return: Session using the data from the packet
Expand source code
@staticmethod
def create_from_packet(packet: api_m.RedvoxPacketM) -> "SessionModel":
    """
    :param packet: API M packet of data to read
    :return: Session using the data from the packet
    :raises RedVoxError: if the packet has no timing data
    """
    # no try/except here: the original handler only re-raised, so errors propagate unchanged
    duration = (
        packet.timing_information.packet_end_mach_timestamp
        - packet.timing_information.packet_start_mach_timestamp
    )
    sensors = [cloud_sm.Sensor(s[0], s[1], smu.add_to_stats(s[2])) for s in smu.get_all_sensors_in_packet(packet)]
    local_ts = smu.get_local_timesync(packet)
    if local_ts is None:
        raise RedVoxError(
            f"Unable to find timing data for station {packet.station_information.id}.\n"
            f"Timing is required to complete SessionModel.\nNow Quitting."
        )
    # local_ts[0:5] feed the Timing summary below; local_ts[5] holds the individual timesync entries
    fst_lst = cloud_sm.FirstLastBufTimeSync([], smu.NUM_BUFFER_POINTS, [], smu.NUM_BUFFER_POINTS)
    for f in local_ts[5]:
        smu.add_to_fst_buffer(fst_lst.fst, fst_lst.fst_max_size, f.ts, f)
        smu.add_to_lst_buffer(fst_lst.lst, fst_lst.lst_max_size, f.ts, f)
    timing = cloud_sm.Timing(local_ts[0], local_ts[1], local_ts[2], local_ts[3], local_ts[4], fst_lst)
    result = SessionModel(
        cloud_sm.Session(
            id=packet.station_information.id,
            uuid=packet.station_information.uuid,
            desc=packet.station_information.description,
            start_ts=int(packet.timing_information.app_start_mach_timestamp),
            client=CLIENT_NAME,
            client_ver=CLIENT_VERSION,
            session_ver=SESSION_VERSION,
            app=APP_NAME,
            api=int(packet.api),
            sub_api=int(packet.sub_api),
            make=packet.station_information.make,
            model=packet.station_information.model,
            app_ver=packet.station_information.app_version,
            owner=packet.station_information.auth_id,
            private=packet.station_information.is_private,
            packet_dur=duration,
            sensors=sensors,
            n_pkts=1,
            timing=timing,
            sub=[],
        )
    )
    # the daily dynamic session (and the hourly one inside it) can only be made once the model exists
    result.cloud_session.sub = [result.add_dynamic_day(packet)]
    return result
def create_from_stream(data_stream: List[src.redvox_api_m.redvox_api_m_pb2.RedvoxPacketM]) ‑> SessionModel
-
:param data_stream: list of API M packets from a single station to read :return: SessionModel using the data packets from the stream
Expand source code
@staticmethod
def create_from_stream(data_stream: List[api_m.RedvoxPacketM]) -> "SessionModel":
    """
    Build a SessionModel from the first packet of the stream, then add every remaining packet to it.

    The input list is only read, never modified.

    :param data_stream: list of API M packets from a single station to read
    :return: SessionModel using the data packets from the stream
    :raises IndexError: if data_stream is empty
    """
    # Index and slice instead of pop(0)/insert(0, ...): the old approach temporarily removed the first
    # packet from the caller's list and left it missing if any packet raised while being processed.
    model = SessionModel.create_from_packet(data_stream[0])
    for packet in data_stream[1:]:
        model.add_data_from_packet(packet)
    return model
def from_dict(dictionary: Dict) ‑> SessionModel
-
:param dictionary: dictionary to read from :return: SessionModel from the dict
Expand source code
@staticmethod
def from_dict(dictionary: Dict) -> "SessionModel":
    """
    Rebuild a SessionModel from its dictionary form.

    :param dictionary: dictionary with the keys "cloud_session" and "dynamic_sessions" to read from
    :return: SessionModel from the dict
    """
    session = cloud_sm.Session.from_dict(dictionary["cloud_session"])
    dynamic = {}
    for name, entry in dictionary["dynamic_sessions"].items():
        dynamic[name] = cloud_sm.DynamicSession.from_dict(entry)
    return SessionModel(session, dynamic)
def load(file_path: str) ‑> SessionModel
-
Load only works on a JSON or .pkl file.
:param file_path: full name and path to the SessionModel file :return: SessionModel from a JSON or .pkl file.
Expand source code
@staticmethod def load(file_path: str) -> "SessionModel": """ Load only works on a JSON or .pkl file. :param file_path: full name and path to the SessionModel file :return: SessionModel from a JSON or .pkl file. """ ext = os.path.splitext(file_path)[1] if ext == ".json": return SessionModel.from_dict(s_io.session_model_dict_from_json_file(file_path)) elif ext == ".pkl": return s_io.decompress_session_model(file_path) else: raise ValueError(f"{file_path} has unknown file extension; this function only accepts json and pkl files.")
Methods
def add_data_from_packet(self, packet: src.redvox_api_m.redvox_api_m_pb2.RedvoxPacketM)
-
Adds the data from the packet to the SessionModel. If the packet doesn't match the key of the SessionModel, writes an error and no data is added
:param packet: packet to add
Expand source code
def add_data_from_packet(self, packet: api_m.RedvoxPacketM): """ Adds the data from the packet to the SessionModel: timing values, sensor sample rate statistics, the day-long and hour-long dynamic sessions and the packet count. If the packet's "id:uuid:app_start_timestamp" key doesn't match the key of the SessionModel, writes an error and no data is added. If the packet has no timesync data, writes an error instead of updating the timing values. :param packet: packet to add """ if ( self.cloud_session.session_key() != f"{packet.station_information.id}:{packet.station_information.uuid}:" f"{int(packet.timing_information.app_start_mach_timestamp)}" ): self._errors.append( "Attempted to add packet with invalid key: " f"{packet.station_information.id}:{packet.station_information.uuid}:" f"{int(packet.timing_information.app_start_mach_timestamp)}" ) return local_ts = smu.get_local_timesync(packet) if local_ts is None: self._errors.append( f"Timesync doesn't exist in packet starting at " f"{packet.timing_information.packet_start_mach_timestamp}." ) else: timing = self.cloud_session.timing for f in local_ts[5]: smu.add_to_fst_buffer(timing.fst_lst.fst, timing.fst_lst.fst_max_size, f.ts, f) smu.add_to_lst_buffer(timing.fst_lst.lst, timing.fst_lst.lst_max_size, f.ts, f) timing.n_ex += local_ts[2] timing.mean_lat = (timing.mean_lat * self.cloud_session.n_pkts + local_ts[3]) / ( self.cloud_session.n_pkts + 1 ) timing.mean_off = (timing.mean_off * self.cloud_session.n_pkts + local_ts[4]) / ( self.cloud_session.n_pkts + 1 ) if local_ts[0] < timing.first_data_ts: timing.first_data_ts = local_ts[0] if local_ts[1] > timing.last_data_ts: timing.last_data_ts = local_ts[1] all_sensors = smu.get_all_sensors_in_packet(packet) for s in all_sensors: sensor = self.get_sensor(s[0], s[1]) if sensor is not None: sensor.sample_rate_stats = smu.add_to_stats(s[2], sensor.sample_rate_stats) else: self.cloud_session.sensors.append(cloud_sm.Sensor(s[0], s[1], smu.add_to_stats(s[2]))) self.add_dynamic_day(packet) self.cloud_session.n_pkts += 1
def add_dynamic_day(self, packet: src.redvox_api_m.redvox_api_m_pb2.RedvoxPacketM) ‑> str
-
Add (or update an existing session if key exists) a dynamic session with length of 1 day using a single packet
:param packet: packet to read data from :return: the "[day_start_ts]:[day_end_ts]" portion of the key of the new or updated dynamic session
Expand source code
def add_dynamic_day(self, packet: api_m.RedvoxPacketM) -> str:
    """
    Add a day-long dynamic session built from a single packet, or update the existing one if its key is
    already present.  The hour-long dynamic session covering the packet is added or updated as well.

    :param packet: packet to read data from
    :return: the "[day_start_ts]:[day_end_ts]" part of the key of the new or updated dynamic session
    """
    timing_info = packet.timing_information
    station = packet.station_information
    data = smu.get_dynamic_data(packet)
    # Truncate the packet start to the beginning of its day to get the session bounds.
    packet_dt = dtu.datetime_from_epoch_microseconds_utc(timing_info.packet_start_mach_timestamp)
    day_begin = dtu.datetime(packet_dt.year, packet_dt.month, packet_dt.day)
    day_end_ts = int(dtu.datetime_to_epoch_microseconds_utc(day_begin + dtu.timedelta(days=1)))
    day_start_ts = int(dtu.datetime_to_epoch_microseconds_utc(day_begin))
    day_span = f"{day_start_ts}:{day_end_ts}"
    session_key = f"{station.id}:{station.uuid}:{int(timing_info.app_start_mach_timestamp)}"
    full_key = f"{session_key}:{day_span}"
    hour_span = self.add_dynamic_hour(data, timing_info.packet_start_mach_timestamp, session_key)
    if full_key not in self.dynamic_sessions:
        self.dynamic_sessions[full_key] = cloud_sm.DynamicSession(
            1,
            smu.add_location_data(data["location"]),
            smu.add_to_stats(data["battery"]),
            smu.add_to_stats(data["temperature"]),
            session_key,
            day_start_ts,
            day_end_ts,
            DAILY_SESSION_NAME,
            [hour_span],
        )
    else:
        self._update_dynamic_session(full_key, data, [hour_span])
    return day_span
def add_dynamic_hour(self, data: dict, packet_start: float, session_key: str) ‑> str
-
Add (or update an existing session if key exists) a dynamic session with length of 1 hour using a single packet
:param data: dictionary of data to add :param packet_start: starting timestamp of the packet in microseconds since epoch UTC :param session_key: the session key of the parent Session :return: the "[hour_start_ts]:[hour_end_ts]" portion of the key of the new or updated dynamic session
Expand source code
def add_dynamic_hour(self, data: dict, packet_start: float, session_key: str) -> str:
    """
    Add an hour-long dynamic session built from a single packet's data, or update the existing one if its
    key is already present.

    :param data: dictionary of data to add; the "location", "battery" and "temperature" entries are read
    :param packet_start: starting timestamp of the packet in microseconds since epoch UTC
    :param session_key: the session key of the parent Session
    :return: the "[hour_start_ts]:[hour_end_ts]" part of the key of the new or updated dynamic session
    """
    # Truncate the packet start to the beginning of its hour to get the session bounds.
    packet_dt = dtu.datetime_from_epoch_microseconds_utc(packet_start)
    hour_begin = dtu.datetime(packet_dt.year, packet_dt.month, packet_dt.day, packet_dt.hour)
    hour_end_ts = int(dtu.datetime_to_epoch_microseconds_utc(hour_begin + dtu.timedelta(hours=1)))
    hour_start_ts = int(dtu.datetime_to_epoch_microseconds_utc(hour_begin))
    hour_span = f"{hour_start_ts}:{hour_end_ts}"
    full_key = f"{session_key}:{hour_span}"
    if full_key not in self.dynamic_sessions:
        self.dynamic_sessions[full_key] = cloud_sm.DynamicSession(
            1,
            smu.add_location_data(data["location"]),
            smu.add_to_stats(data["battery"]),
            smu.add_to_stats(data["temperature"]),
            session_key,
            hour_start_ts,
            hour_end_ts,
            HOURLY_SESSION_NAME,
            [f"{int(packet_start)}"],
        )
    else:
        self._update_dynamic_session(full_key, data, [f"{int(packet_start)}"])
    return hour_span
def as_dict(self) ‑> dict
-
:return: SessionModel as dictionary
Expand source code
def as_dict(self) -> dict:
    """
    Convert the cloud session and every dynamic session to dictionaries via their to_dict() methods.

    :return: SessionModel as dictionary with the keys "cloud_session" and "dynamic_sessions"
    """
    result = {"cloud_session": self.cloud_session.to_dict(), "dynamic_sessions": {}}
    for name, session in self.dynamic_sessions.items():
        result["dynamic_sessions"][name] = session.to_dict()
    return result
def audio_sample_rate_nominal_hz(self) ‑> float
-
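:return: mean of the sample rate statistics of the first sensor named "audio" (the nominal audio sample rate in Hz), or None if the Session has no audio sensor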
Expand source code
def audio_sample_rate_nominal_hz(self) -> Optional[float]:
    """
    :return: mean of the sample rate statistics of the first sensor named "audio" (the nominal audio
             sample rate in Hz), or None if the Session has no audio sensor
    """
    for sensor in self.cloud_session.sensors:
        if sensor.name == "audio":
            return sensor.sample_rate_stats.welford.mean
    # No audio sensor found: the original fell off the end of the function; the None is now explicit.
    return None
def compress(self, out_dir: str = '.') ‑> pathlib.Path
-
Compresses this SessionModel to a file at out_dir. Uses the id and start_ts to name the file.
:param out_dir: Directory to save file to. Default "." (current directory) :return: The path to the written file.
Expand source code
def compress(self, out_dir: str = ".") -> Path:
    """
    Write this SessionModel in compressed form to a file in out_dir by delegating to
    session_io.compress_session_model.  Uses the id and start_ts to name the file.

    :param out_dir: Directory to save file to.  Default "." (current directory)
    :return: The path to the written file.
    """
    written_path = s_io.compress_session_model(self, out_dir)
    return written_path
def default_file_name(self) ‑> str
-
:return: Default file name as [id]_[start_ts]_model, with start_ts as integer of microseconds since epoch UTC. File extension NOT included.
Expand source code
def default_file_name(self) -> str:
    """
    :return: Default file name as [id]_[start_ts]_model, with start_ts as integer of microseconds since
             epoch UTC (0 is used when start_ts is NaN).  File extension NOT included.
    """
    session = self.cloud_session
    start_ts = session.start_ts
    if np.isnan(start_ts):
        # A NaN start timestamp cannot be part of a file name; fall back to 0.
        start_ts = 0
    return f"{session.id}_{start_ts}_model"
def get_daily_dynamic_sessions(self) ‑> List[DynamicSession]
-
:return: all day-long dynamic sessions in the Session
Expand source code
def get_daily_dynamic_sessions(self) -> List[cloud_sm.DynamicSession]:
    """
    :return: all dynamic sessions in the Session whose dur equals DAILY_SESSION_NAME (day-long sessions)
    """
    daily = []
    for session in self.dynamic_sessions.values():
        if session.dur == DAILY_SESSION_NAME:
            daily.append(session)
    return daily
def get_hourly_dynamic_sessions(self) ‑> List[DynamicSession]
-
:return: all hour-long dynamic sessions in the Session
Expand source code
def get_hourly_dynamic_sessions(self) -> List[cloud_sm.DynamicSession]:
    """
    :return: all dynamic sessions in the Session whose dur equals HOURLY_SESSION_NAME (hour-long sessions)
    """
    hourly = []
    for session in self.dynamic_sessions.values():
        if session.dur == HOURLY_SESSION_NAME:
            hourly.append(session)
    return hourly
def get_sensor(self, name: str, desc: Optional[str] = None) ‑> Optional[Sensor]
-
:param name: name of the sensor to get :param desc: Optional description of the sensor to get. If None, will get the first sensor that matches the name given. Default None. :return: the first sensor that matches the name and description given or None if sensor was not found
Expand source code
def get_sensor(self, name: str, desc: Optional[str] = None) -> Optional[cloud_sm.Sensor]:
    """
    Look up a sensor of the Session by name and, optionally, by description.

    :param name: name of the sensor to get
    :param desc: Optional description of the sensor to get.  If None, the description is not checked
                 and the first sensor with a matching name is used.  Default None.
    :return: the first sensor that matches the name and description given or None if sensor was not found
    """
    matches = (
        sensor
        for sensor in self.cloud_session.sensors
        if sensor.name == name and (desc is None or sensor.description == desc)
    )
    return next(matches, None)
def get_sensor_names(self) ‑> List[str]
-
:return: list of the names of all sensors in the Session
Expand source code
def get_sensor_names(self) -> List[str]:
    """
    :return: list of the names of all sensors in the Session, in the order the sensors are stored
    """
    return [sensor.name for sensor in self.cloud_session.sensors]
def num_sensors(self) ‑> int
-
:return: number of sensors in the Session
Expand source code
def num_sensors(self) -> int:
    """
    :return: how many sensors the Session holds
    """
    sensors = self.cloud_session.sensors
    return len(sensors)
def print_errors(self)
-
print all errors encountered by the SessionModel
Expand source code
def print_errors(self):
    """
    Print every error the SessionModel has recorded, by delegating to the error collection's print().
    """
    recorded = self._errors
    recorded.print()
def save(self, out_type: str = 'json', out_dir: str = '.') ‑> pathlib.Path
-
Save the SessionModel to disk. Options for out_type are "json" for JSON file and "pkl" for .pkl file. Defaults to "json". File will be named after id and start_ts of the SessionModel
:param out_type: "json" for JSON file and "pkl" for .pkl file :param out_dir: Directory to save file to. Default "." (current directory) :return: path to saved file
Expand source code
def save(self, out_type: str = "json", out_dir: str = ".") -> Path:
    """
    Save the SessionModel to disk.  Options for out_type are "json" for JSON file and "pkl" for .pkl file.
    Defaults to "json"; any value other than "pkl" also produces a JSON file.
    File will be named after id and start_ts of the SessionModel

    :param out_type: "json" for JSON file and "pkl" for .pkl file
    :param out_dir: Directory to save file to.  Default "." (current directory)
    :return: path to saved file
    """
    if out_type != "pkl":
        return s_io.session_model_to_json_file(self, out_dir)
    return self.compress(out_dir)
def sdk_version(self) ‑> str
-
:return: sdk version used to create the SessionModel
Expand source code
def sdk_version(self) -> str:
    """
    :return: version of the SDK that was used to create the SessionModel
    """
    version = self._sdk_version
    return version