Module redvox.common.event_stream
This module provides classes to organize events recorded on a station. It will ignore machine learning events.
Expand source code
"""
This module provides classes to organize events recorded on a station.
It will ignore machine learning events.
"""
from typing import List, Optional, Dict, Union
from dataclasses import dataclass, field
from pathlib import Path
import enum
import os
import re
import numpy as np
from dataclasses_json import dataclass_json
from redvox.api1000.common.mapping import Mapping
from redvox.api1000.proto.redvox_api_m_pb2 import RedvoxPacketM
from redvox.api1000.wrapped_redvox_packet import event_streams as es
from redvox.api1000.wrapped_redvox_packet import ml
from redvox.api1000.wrapped_redvox_packet.wrapped_packet import WrappedRedvoxPacketM
from redvox.common.errors import RedVoxExceptions
from redvox.common import offset_model as om
from redvox.common.io import FileSystemWriter as Fsw, FileSystemSaveMode, json_file_to_dict
import redvox.common.event_stream_io as io
class EventDataTypes(enum.Enum):
"""
Enumeration of data types for event data
"""
STRING = 0 # string data
NUMERIC = 1 # numeric data
BOOLEAN = 2 # boolean data
BYTE = 3 # bytes data
@staticmethod
def types_list() -> List["EventDataTypes"]:
"""
:return: the values of EventDataTypes as a list in order of: STRING, NUMERIC, BOOLEAN, BYTE
"""
return [EventDataTypes.STRING, EventDataTypes.NUMERIC, EventDataTypes.BOOLEAN, EventDataTypes.BYTE]
def get_empty_event_data_dict() -> dict:
"""
:return: an empty data dictionary for event data
"""
return {EventDataTypes.STRING: {}, EventDataTypes.NUMERIC: {}, EventDataTypes.BOOLEAN: {}, EventDataTypes.BYTE: {}}
class Event:
"""
stores event data from Redvox Api1000 packets
ALL timestamps in microseconds since epoch UTC unless otherwise stated
"""
def __init__(
self,
timestamp: float,
name: str = "event",
data: Optional[Dict[EventDataTypes, dict]] = None,
save_mode: FileSystemSaveMode = FileSystemSaveMode.MEM,
base_dir: str = ".",
):
"""
initialize Event
:param timestamp: timestamp when Event occurred in microseconds since epoch UTC
:param name: name of the Event. Default "event"
:param data: a structured dictionary of the data. Dictionary must look like:
{EventDataTypes.STRING: {s_values}, EventDataTypes.NUMERIC: {n_values},
EventDataTypes.BOOLEAN: {o_values}, EventDataTypes.BYTE: {b_values}}
where {*_values} is a dictionary of string: data and can be empty. Default None
:param save_mode: FileSystemSaveMode that determines how data is saved.
Default FileSystemSaveMode.MEM (use RAM). Other options are DISK (save to directory)
and TEMP (save to temporary directory)
:param base_dir: the location of the parquet file that holds the data. Not used if save_data is False.
Default current directory (".")
"""
self.name = name
self.metadata = {}
self._errors = RedVoxExceptions("Event")
self._fs_writer = Fsw(f"event_{name}", "json", base_dir, save_mode)
self._timestamp = timestamp
self._uncorrected_timestamp = timestamp
self._data = get_empty_event_data_dict() if data is None else data
def __repr__(self):
return (
f"name: {self.name}, "
f"timestamp: {self._timestamp}, "
f"uncorrected_timestamp: {self._uncorrected_timestamp}, "
f"schema: {self.get_schema()}, "
f"save_mode: {self._fs_writer.save_mode()}"
)
def __str__(self):
return (
f"name: {self.name}, "
f"timestamp: {self._timestamp}, "
f"uncorrected_timestamp: {self._uncorrected_timestamp}, "
f"schema: {self.__schema_as_str()}"
)
def as_dict(self) -> dict:
"""
:return: EventStream as a dictionary
"""
return {
"name": self.name,
"timestamp": self._timestamp,
"uncorrected_timestamp": self._uncorrected_timestamp,
"metadata": self.metadata,
"data": self.__data_as_dict(),
"errors": self._errors.as_dict(),
}
def __data_as_dict(self) -> dict:
return {
EventDataTypes.STRING.name: self.get_string_values(),
EventDataTypes.NUMERIC.name: self.get_numeric_values(),
EventDataTypes.BOOLEAN.name: self.get_boolean_values(),
EventDataTypes.BYTE.name: self.get_byte_values(),
}
def __schema_as_str(self) -> str:
result = ""
for f in self._data.keys():
result += f"{f.name}: {list(self._data[f].keys())}"
if f != EventDataTypes.BYTE:
result += ", "
return result
@staticmethod
def __get_items(payload: Mapping[str]):
return payload.get_metadata().items()
@staticmethod
def __get_items_raw(payload):
return payload.items()
@staticmethod
def __get_keys(ptype: str, payload: Mapping[str]):
return ptype, payload.get_metadata().keys()
@staticmethod
def __get_keys_raw(ptype: str, payload):
return ptype, payload.keys()
@staticmethod
def __get_data_from_event(event: es.Event):
"""
load data from an Event;
gets data in order of: string, numeric, boolean, byte
:param event: event to load data from
"""
return map(
Event.__get_items,
[
event.get_string_payload(),
event.get_numeric_payload(),
event.get_boolean_payload(),
event.get_byte_payload(),
],
)
@staticmethod
def __get_data_from_event_raw(event: RedvoxPacketM.EventStream.Event):
"""
load data from an Event;
gets data in order of: string, numeric, boolean, byte
:param event: event to load data from
"""
return map(
Event.__get_items_raw,
[event.string_payload, event.numeric_payload, event.boolean_payload, event.byte_payload],
)
def _set_data(self, data: iter):
"""
sets the data of the Event
:param data: an iterable of data to insert
"""
for g, h in map(lambda l, p: (l, p), data, EventDataTypes.types_list()):
for k, v in g:
self._data[h][k] = v
def read_event(self, event: es.Event) -> "Event":
"""
read the payloads of a Redvox Event, separate the data by payload type, then add it to the SDK Event
:param event: event to process
:return: updated self
"""
self.name = event.get_description()
self._fs_writer.file_name = f"event_{self.name}"
self.metadata = event.get_metadata()
self._set_data(self.__get_data_from_event(event))
return self
def read_raw(self, event: RedvoxPacketM.EventStream.Event) -> "Event":
"""
read the contents of a Redvox Api1000 protobuf stream
:param event: the protobuf stream to read
"""
self.name = event.description
self._fs_writer.file_name = f"event_{self.name}"
self.metadata = dict(event.metadata)
self._set_data(self.__get_data_from_event_raw(event))
return self
def get_string_schema(self) -> List[str]:
"""
:return: the column names of string typed data as a list of strings
"""
return self.get_schema()[EventDataTypes.STRING]
def get_numeric_schema(self) -> List[str]:
"""
:return: the column names of numeric typed data as a list of strings
"""
return self.get_schema()[EventDataTypes.NUMERIC]
def get_boolean_schema(self) -> List[str]:
"""
:return: the column names of boolean typed data as a list of strings
"""
return self.get_schema()[EventDataTypes.BOOLEAN]
def get_byte_schema(self) -> List[str]:
"""
:return: the column names of byte typed data as a list of strings
"""
return self.get_schema()[EventDataTypes.BYTE]
def get_schema(self) -> Dict[EventDataTypes, list]:
"""
:return: the dictionary that summarizes the data names and types
"""
result = {}
for f in self._data.keys():
result[f] = [k for k in self._data[f].keys()]
return result
def get_data_keys(self) -> List[str]:
"""
:return: the keys of the data in the event
"""
result = []
for f in self._data.keys():
result.extend([k for k in self._data[f].keys()])
return result
def get_string_values(self) -> dict:
"""
:return: the string data as a dictionary
"""
return self._data[EventDataTypes.STRING]
def get_numeric_values(self) -> dict:
"""
:return: the numeric data as a dictionary
"""
return self._data[EventDataTypes.NUMERIC]
def get_boolean_values(self) -> dict:
"""
:return: the boolean data as a dictionary
"""
return self._data[EventDataTypes.BOOLEAN]
def get_byte_values(self) -> dict:
"""
:return: the byte data as a dictionary
"""
return self._data[EventDataTypes.BYTE]
def get_string_item(self, data_key: str) -> Optional[str]:
"""
get a string data value with a key matching data_key
:param data_key: the name of the data value to look for
:return: string data if it exists, None otherwise
"""
strs = self.get_string_values()
for s in strs.keys():
if s == data_key:
return strs[s]
return None
def get_numeric_item(self, data_key: str) -> Optional[float]:
"""
get a numeric data value with a key matching data_key
:param data_key: the name of the data value to look for
:return: numeric data if it exists, None otherwise
"""
nums = self.get_numeric_values()
for s in nums.keys():
if s == data_key:
return nums[s]
return None
def get_boolean_item(self, data_key: str) -> Optional[bool]:
"""
get a boolean data value with a key matching data_key
:param data_key: the name of the data value to look for
:return: boolean data if it exists, None otherwise
"""
boos = self.get_boolean_values()
for s in boos.keys():
if s == data_key:
return boos[s]
return None
def get_byte_item(self, data_key: str) -> Optional[str]:
"""
get a byte data value with a key matching data_key
:param data_key: the name of the data value to look for
:return: byte data if it exists, None otherwise
"""
byts = self.get_byte_values()
for s in byts.keys():
if s == data_key:
return byts[s]
return None
def get_item(self, data_key: str) -> Union[List[str], str, bool, float]:
"""
:param data_key: key of data to get
:return: data with matching data_key or the list of all possible data keys
"""
for r in [
self.get_string_item(data_key),
self.get_numeric_item(data_key),
self.get_boolean_item(data_key),
self.get_byte_item(data_key),
]:
if r is not None:
return r
return self.get_data_keys()
def get_classification(self, index: int = 0) -> dict:
"""
get a classification from an event (anything that ends with "_X" where X is the index value)
:param index: index of classification, default 0
:return: dictionary of data
"""
result = {}
for s in self._data.keys():
for b, v in self._data[s].items():
match = re.search(f"_{index}$", b)
if match is not None:
result[b] = v
return result
def get_string_column(self, column_name: str) -> Dict[str, str]:
"""
note: data points in events are named [column_name]_[X], where [X] is an integer 0 or greater.
this function will return all string data points with keys that start with [column_name]
:param column_name: the name of the column of event data to get
:return: a dictionary of string data
"""
result = {}
strs = self.get_string_values()
for s, v in strs.items():
match = re.match(f"{column_name}_*", s)
if match is not None:
result[s] = v
return result
def get_numeric_column(self, column_name: str) -> Dict[str, float]:
"""
note: data points in events are named [column_name]_[X], where [X] is an integer 0 or greater.
this function will return all numeric data points with keys that start with [column_name]
:param column_name: the name of the column of event data to get
:return: a dictionary of numeric data
"""
result = {}
strs = self.get_numeric_values()
for s, v in strs.items():
match = re.match(f"{column_name}_*", s)
if match is not None:
result[s] = v
return result
def get_boolean_column(self, column_name: str) -> Dict[str, str]:
"""
note: data points in events are named [column_name]_[X], where [X] is an integer 0 or greater.
this function will return all boolean data points with keys that start with [column_name]
:param column_name: the name of the column of event data to get
:return: a dictionary of boolean data
"""
result = {}
strs = self.get_boolean_values()
for s, v in strs.items():
match = re.match(f"{column_name}_*", s)
if match is not None:
result[s] = v
return result
def get_byte_column(self, column_name: str) -> Dict[str, str]:
"""
note: data points in events are named [column_name]_[X], where [X] is an integer 0 or greater.
this function will return all byte data points with keys that start with [column_name]
:param column_name: the name of the column of event data to get
:return: a dictionary of byte data
"""
result = {}
strs = self.get_byte_values()
for s, v in strs.items():
match = re.match(f"{column_name}_*", s)
if match is not None:
result[s] = v
return result
def get_timestamp(self) -> float:
"""
:return: timestamp of the Event
"""
return self._timestamp
def get_uncorrected_timestamp(self) -> float:
"""
:return: uncorrected timestamp of the Event
"""
return self._uncorrected_timestamp
def is_timestamp_corrected(self) -> bool:
"""
:return: if timestamp of Event is updated
"""
return self._timestamp != self._uncorrected_timestamp
def update_timestamps(self, offset_model: om.OffsetModel, use_model_function: bool = False):
"""
updates the timestamp of the Event
:param offset_model: model used to update the timestamps
:param use_model_function: if True, use the model's slope function to update the timestamps.
otherwise uses the best offset (model's intercept value). Default False
"""
if self.is_timestamp_corrected():
self._errors.append("Timestamps already corrected!")
else:
self._timestamp = offset_model.update_time(self._timestamp, use_model_function)
def original_timestamps(self, offset_model: om.OffsetModel, use_model_function: bool = False):
"""
undo the update to the timestamp of the Event
:param offset_model: model used to update the timestamps
:param use_model_function: if True, use the model's slope function to update the timestamps.
otherwise uses the best offset (model's intercept value). Default False
"""
if not self.is_timestamp_corrected():
self._errors.append("Timestamps already not corrected!")
else:
self._timestamp = offset_model.get_original_time(self._timestamp, use_model_function)
def default_json_file_name(self) -> str:
"""
:return: default event json file name (event_[event.name]): note there is no extension
"""
return f"event_{self.name}"
def is_save_to_disk(self) -> bool:
"""
:return: True if sensor will be saved to disk
"""
return self._fs_writer.is_save_disk()
def set_save_to_disk(self, save: bool):
"""
:param save: If True, save to disk
"""
self._fs_writer.save_to_disk = save
def set_save_mode(self, save_mode: FileSystemSaveMode):
"""
set the save mode
:param save_mode: new save mode
"""
self._fs_writer.set_save_mode(save_mode)
def save_mode(self) -> FileSystemSaveMode:
"""
:return: the save mode
"""
return self._fs_writer.save_mode()
def set_file_name(self, new_file: Optional[str] = None):
"""
* set the pyarrow file name or use the default: event_{Event.name}
* Do not give an extension
:param new_file: optional file name to change to; default None (use default name)
"""
self._fs_writer.file_name = new_file if new_file else f"event_{self.name}"
def full_file_name(self) -> str:
"""
:return: full name of file containing the data
"""
return self._fs_writer.full_name()
def file_name(self) -> str:
"""
:return: file name without extension
"""
return self._fs_writer.file_name
def set_save_dir(self, new_dir: Optional[str] = None):
"""
set the pyarrow directory or use the default: "." (current directory)
:param new_dir: the directory to change to; default None (use current directory)
"""
self._fs_writer.base_dir = new_dir if new_dir else "."
def save_dir(self) -> str:
"""
:return: directory containing parquet files for the sensor
"""
return self._fs_writer.save_dir()
def full_path(self) -> str:
"""
:return: the full path to the data file
"""
return self._fs_writer.full_path()
def fs_writer(self) -> Fsw:
"""
:return: FileSystemWriter object
"""
return self._fs_writer
def has_data(self) -> bool:
"""
:return: True if Event contains at least one data point
"""
return sum([len(self._data[j].keys()) for j in EventDataTypes.types_list()]) > 0
def data(self) -> dict:
"""
:return: the data
"""
return self._data
@staticmethod
def from_json_dict(json_dict: dict) -> "Event":
"""
:param json_dict: json dictionary to parse
:return: Event from json dict
"""
if "timestamp" in json_dict.keys():
data = get_empty_event_data_dict()
data[EventDataTypes.STRING] = json_dict["data"]["STRING"]
data[EventDataTypes.NUMERIC] = json_dict["data"]["NUMERIC"]
data[EventDataTypes.BOOLEAN] = json_dict["data"]["BOOLEAN"]
data[EventDataTypes.BYTE] = json_dict["data"]["BYTE"]
result = Event(json_dict["timestamp"], json_dict["name"], data, FileSystemSaveMode.DISK)
result.metadata = json_dict["metadata"]
result._uncorrected_timestamp = json_dict["uncorrected_timestamp"]
result.set_errors(RedVoxExceptions.from_dict(json_dict["errors"]))
else:
result = Event(np.nan, "Empty")
result.append_error(f"Loading from json dict failed; missing Event timestamp.")
return result
@staticmethod
def from_json_file(file_dir: str, file_name: str) -> "Event":
"""
:param file_dir: full path to containing directory for the file
:param file_name: name of file to load data from
:return: Event from json file
"""
json_data = json_file_to_dict(os.path.join(file_dir, f"{file_name}"))
if "timestamp" in json_data.keys():
data = get_empty_event_data_dict()
data[EventDataTypes.STRING] = json_data["data"]["STRING"]
data[EventDataTypes.NUMERIC] = json_data["data"]["NUMERIC"]
data[EventDataTypes.BOOLEAN] = json_data["data"]["BOOLEAN"]
data[EventDataTypes.BYTE] = json_data["data"]["BYTE"]
result = Event(json_data["timestamp"], json_data["name"], data, FileSystemSaveMode.DISK, file_dir)
result.metadata = json_data["metadata"]
result._uncorrected_timestamp = json_data["uncorrected_timestamp"]
result.set_errors(RedVoxExceptions.from_dict(json_data["errors"]))
else:
result = Event(np.nan, "Empty")
result.append_error(f"Loading from {file_name} failed; missing Event timestamp.")
return result
def to_json_file(self, file_name: Optional[str] = None) -> Path:
"""
saves the EventStream as a json file
:param file_name: the optional base file name. Do not include a file extension.
If None, a default file name is created using this format:
event_[event.name].json
:return: path to json file
"""
return io.event_to_json_file(self, file_name)
def errors(self) -> RedVoxExceptions:
"""
:return: errors of the sensor
"""
return self._errors
def set_errors(self, errors: RedVoxExceptions):
"""
sets the errors of the Sensor
:param errors: errors to set
"""
self._errors = errors
def append_error(self, error: str):
"""
add an error to the Sensor
:param error: error to add
"""
self._errors.append(error)
def print_errors(self):
"""
print all errors to screen
"""
self._errors.print()
@dataclass_json
@dataclass
class EventStream:
"""
stores multiple events.
ALL timestamps in microseconds since epoch UTC unless otherwise stated
Properties:
name: string; name of the EventStream. Default "stream"
events: List[Event]; all events in the stream. Default empty list
input_sample_rate: int; audio sample rate. Default 0
samples_per_window: int; samples per window of the events. Default 0
samples_per_hop: int; samples per hop of the events. Default 0
model_version: string; version of the model. Default "0.0"
metadata: Dict[str, str]; metadata as dict of strings. Default empty dict
debug: boolean; if True, outputs additional information at runtime. Default False.
"""
name: str = "stream"
events: List[Event] = field(default_factory=lambda: [])
input_sample_rate: int = 0
samples_per_window: int = 0
samples_per_hop: int = 0
model_version: str = "n/a"
metadata: Dict[str, str] = field(default_factory=lambda: {})
debug: bool = False
def __repr__(self):
return (
f"name: {self.name}, "
f"events: {[s.__repr__() for s in self.events]}, "
f"input_sample_rate: {self.input_sample_rate}, "
f"samples_per_window: {self.samples_per_window}, "
f"samples_per_hop: {self.samples_per_hop}, "
f"model_version: {self.model_version}"
)
def __str__(self):
return (
f"name: {self.name}, "
f"events: {[s.__str__() for s in self.events]}, "
f"input_sample_rate: {self.input_sample_rate}, "
f"samples_per_window: {self.samples_per_window}, "
f"samples_per_hop: {self.samples_per_hop}, "
f"model_version: {self.model_version}"
)
def as_dict(self) -> dict:
"""
:return: EventStream as a dictionary
"""
return {
"name": self.name,
"events": [e.as_dict() for e in self.events],
"input_sample_rate": self.input_sample_rate,
"samples_per_window": self.samples_per_window,
"samples_per_hop": self.samples_per_hop,
"model_version": self.model_version,
"metadata": self.metadata,
}
def has_data(self):
"""
:return: if there is at least one event
"""
return len(self.events) > 0
def has_events(self) -> bool:
"""
:return: True if there are one or more events in the stream
"""
return len(self.events) > 0
def get_event(self, index: int = 0) -> Optional[Event]:
"""
:param index: index of event to get. Use negative values to select from the end of the list.
Default 0 (first event)
:return: Event at the index, or None if the event/index doesn't exist
"""
if 0 > index:
index += len(self.events)
if 0 <= index < len(self.events):
return self.events[index]
return None
def get_data_column(self, column_name: str) -> list:
"""
return a list of data with key column_name from each of the events
if column_name doesn't exist, gets a list of valid column_names
:param column_name: key of data to get
:return: list of data named column_name or the list of all possible column names
"""
result = []
column_list = set()
for r in self.events:
val = r.get_item(column_name)
if type(val) != list:
result.append(val)
else:
for v in val:
column_list.add(v)
if len(result) > 0:
return result
return list(column_list)
@staticmethod
def from_eventstream(
stream: RedvoxPacketM.EventStream, save_mode: FileSystemSaveMode = FileSystemSaveMode.MEM, base_dir: str = "."
) -> "EventStream":
"""
convert a Redvox Api1000 Packet EventStream into its sdk version
:param stream: Redvox Api1000 Packet EventStream to read data from
:param save_mode: FileSystemSaveMode that determines how Event data is saved.
Default FileSystemSaveMode.MEM (use RAM). Other options are DISK (save to directory)
and TEMP (save to temporary directory)
:param base_dir: the location of the parquet file that holds the Event data. Not used if save_data is False.
Default current directory (".")
:return: EventStream (sdk version)
"""
result = EventStream(stream.name, metadata=dict(stream.metadata))
if "input_sample_rate" in stream.metadata.keys():
result.input_sample_rate = int(stream.metadata.get("input_sample_rate"))
if "input_samples_per_window" in stream.metadata.keys():
result.samples_per_window = int(stream.metadata.get("input_samples_per_window"))
if "input_samples_per_hop" in stream.metadata.keys():
result.samples_per_hop = int(stream.metadata.get("input_samples_per_hop"))
if "model_version" in stream.metadata.keys():
result.model_version = stream.metadata.get("model_version")
result.add_events(stream, save_mode=save_mode, base_dir=base_dir)
return result
def add_events(
self,
stream: RedvoxPacketM.EventStream,
save_mode: FileSystemSaveMode = FileSystemSaveMode.MEM,
base_dir: str = ".",
):
"""
add events from a Redvox Api1000 Packet EventStream with the same name.
Does nothing if names do not match
:param stream: stream of events to add
:param save_mode: FileSystemSaveMode that determines how Event data is saved.
Default FileSystemSaveMode.MEM (use RAM). Other options are DISK (save to directory)
and TEMP (save to temporary directory)
:param base_dir: the location of the parquet file that holds the Event data. Not used if save_data is False.
Default current directory (".")
"""
if self.name == stream.name:
timestamps = stream.timestamps.timestamps
events = stream.events
for i in range(len(timestamps)):
self.events.append(Event(timestamps[i], save_mode=save_mode, base_dir=base_dir).read_raw(events[i]))
elif self.debug:
print(f"Stream name mismatch while adding to EventStream. Expected {self.name}, got {stream.name}.")
def sort_events(self, asc: bool = True):
"""
sort the events in the stream via ascending or descending timestamp order
:param asc: if True, data is sorted in ascending order
"""
self.events.sort(key=lambda e: e.get_timestamp(), reverse=not asc)
def num_events(self) -> int:
"""
:return: number of events in stream
"""
return len(self.events)
def sample_rate_hz(self):
"""
:return: sample rate of events in the stream in hz
"""
return np.mean(np.diff([e.get_timestamp() for e in self.events]))
def window_sample_rate_hz(self):
"""
:return: idealized event sample window rate in hz
"""
return self.input_sample_rate / self.samples_per_window
def hop_sample_rate_hz(self):
"""
:return: idealized event sample hop rate in hz
"""
return self.input_sample_rate / self.samples_per_hop
def create_event_window(self, start: float = -np.inf, end: float = np.inf):
"""
removes any event in the stream that doesn't match start <= event < end
adds empty events to beginning and end of data (as long as the corresponding input values are not infinity)
default start is negative infinity, default end is infinity
all times in microseconds since epoch UTC
:param start: inclusive start time of events to keep
:param end: exclusive end time of events to keep
"""
self.events = [s for s in self.events if start <= s.get_timestamp() < end]
if self.num_events() > 0:
if start < self.events[0].get_timestamp() and not np.isinf(start):
self.events.insert(0, Event(start, self.name))
if not np.isinf(end):
self.events.append(Event(end - 1, self.name))
def get_file_names(self) -> List[str]:
"""
:return: the names of the files which store the event data
"""
return [e.file_name() for e in self.events]
def save_streams(self):
"""
saves all streams to disk
note: use the function set_save_dir() to change where events are saved
"""
for e in self.events:
if e.is_save_to_disk():
e.to_json_file()
def set_save_dir(self, new_dir: str):
"""
change the directory where events are saved to
:param new_dir: new directory path
"""
for e in self.events:
e.set_save_dir(new_dir)
def set_save_mode(self, new_save_mode: FileSystemSaveMode):
"""
update the save mode for all EventStream
:param new_save_mode: save mode to set
"""
for e in self.events:
e.set_save_mode(new_save_mode)
def update_timestamps(self, offset_model: om.OffsetModel, use_model_function: bool = False):
"""
update the timestamps in the data
:param offset_model: model used to update the timestamps
:param use_model_function: if True, use the model's slope function to update the timestamps.
otherwise uses the best offset (model's intercept value). Default False
"""
for evnt in self.events:
evnt.update_timestamps(offset_model, use_model_function)
def original_timestamps(self, offset_model: om.OffsetModel, use_model_function: bool = False):
"""
undo the update to the timestamps in the data
:param offset_model: model used to update the timestamps
:param use_model_function: if True, use the model's slope function to update the timestamps.
otherwise uses the best offset (model's intercept value). Default False
"""
for evnt in self.events:
evnt.original_timestamps(offset_model, use_model_function)
@staticmethod
def from_json_dict(json_dict: dict) -> "EventStream":
"""
:param json_dict: json dict to parse
:return: EventStream from json dict
"""
if "name" in json_dict.keys():
result = EventStream(
json_dict["name"],
[Event.from_json_dict(e) for e in json_dict["events"]],
json_dict["input_sample_rate"],
json_dict["samples_per_window"],
json_dict["samples_per_hop"],
json_dict["model_version"],
json_dict["metadata"],
)
else:
result = EventStream("Empty Stream; no name for identification")
return result
@staticmethod
def from_json_file(file_dir: str, file_name: str) -> "EventStream":
"""
:param file_dir: full path to containing directory for the file
:param file_name: name of file to load data from
:return: EventStream from json file
"""
json_data = json_file_to_dict(os.path.join(file_dir, f"{file_name}"))
if "name" in json_data.keys():
result = EventStream(
json_data["name"],
json_data["events"],
json_data["input_sample_rate"],
json_data["samples_per_window"],
json_data["samples_per_hop"],
json_data["model_version"],
json_data["metadata"],
)
result.set_save_mode(FileSystemSaveMode.DISK)
result.set_save_dir(file_dir)
else:
result = EventStream("Empty Stream; no name for identification")
return result
def to_json_file(self, file_dir: str = ".", file_name: Optional[str] = None) -> Path:
"""
saves the EventStream as a json file
:param file_dir: the directory to save the file into. default current directory (".")
:param file_name: the optional base file name. Do not include a file extension.
If None, a default file name is created using this format:
eventstream_[eventstream.name].json
:return: path to json file
"""
return io.eventstream_to_json_file(self, file_dir, file_name)
def print_errors(self):
"""
print all errors to screen
"""
for e in self.events:
e.print_errors()
@dataclass_json
@dataclass
class EventStreams:
"""
stores multiple event streams per station.
ALL timestamps in microseconds since epoch UTC unless otherwise stated
Properties:
streams: List[EventStream]; list of all EventStream. Default empty list
ml_data: Optional ExtractedMl from the packets. Default None
debug: bool; if True, output additional information during runtime. Default False
"""
streams: List[EventStream] = field(default_factory=lambda: [])
ml_data: Optional[ml.ExtractedMl] = None
debug: bool = False
def __repr__(self):
return f"streams: {[s.__repr__() for s in self.streams]}, ml: {self.ml_data}, debug: {self.debug}"
def __str__(self):
return f"streams: {[s.__str__() for s in self.streams]}, ml: {self.ml_data}"
def as_dict(self) -> dict:
"""
:return: EventStreams as dict
"""
return {
"streams": [s.as_dict() for s in self.streams],
"ml_data": self.ml_data.to_dict() if self.ml_data else None,
}
def read_from_packet(self, packet: RedvoxPacketM):
"""
read the eventstream payload from a single Redvox Api1000 packet
:param packet: packet to read data from
"""
for st in packet.event_streams:
if st.name == ml.ML_EVENT_STREAM_NAME:
if self.ml_data:
self.ml_data.windows.extend(ml.extract_ml_windows(_find_ml_event_stream(packet)))
else:
self.ml_data = ml.extract_ml_from_packet(WrappedRedvoxPacketM(packet))
else:
if st.name in self.get_stream_names() and self.get_stream(st.name).has_data():
self.get_stream(st.name).add_events(st)
else:
self.remove_stream(st.name)
self.streams.append(EventStream.from_eventstream(st))
def read_from_packets_list(self, packets: List[RedvoxPacketM]):
"""
read the eventstream payload from multiple Redvox Api1000 packets
:param packets: packets to read data from
"""
for p in packets:
if type(p) == RedvoxPacketM:
self.read_from_packet(p)
def append(self, other_stream: EventStream):
"""
append another EventStream to an existing EventStream or add to the list of EventStream
:param other_stream: other EventStream to add
"""
if other_stream.name in self.get_stream_names():
self.get_stream(other_stream.name).add_events(other_stream)
else:
self.streams.append(other_stream)
def append_ml(self, other_ml: ml.ExtractedMl):
"""
append the windows from another extracted machine learning object or
set the existing ML object if its empty.
:param other_ml: other ExtractedMl to add
"""
if self.ml_data:
self.ml_data.windows.extend(other_ml.windows)
else:
self.ml_data = other_ml
def append_streams(self, other_streams: "EventStreams"):
"""
append another EventStreams object to an existing EventStreams object
:param other_streams: EventStreams to add
"""
for s in other_streams.streams:
self.append(s)
def remove_stream(self, stream_name: str):
"""
remove any stream with the same stream_name
:param stream_name: name of stream to remove
"""
self.streams = [s for s in self.streams if s.name != stream_name]
def get_stream(self, stream_name: str) -> Optional[EventStream]:
"""
:param stream_name: name of event stream to get
:return: the EventStream that has the name specified or None if it doesn't exist
"""
for s in self.streams:
if s.name == stream_name:
return s
if self.debug:
print(f"{stream_name} does not exist in streams. Use one of {[self.get_stream_names()]}")
return None
def get_stream_names(self) -> List[str]:
"""
:return: names of all streams
"""
return [s.name for s in self.streams]
def create_event_window(self, start: float = -np.inf, end: float = np.inf):
"""
removes any event in the streams and ML that doesn't match start <= event < end
default start is negative infinity, default end is infinity
all times in microseconds since epoch UTC
:param start: inclusive start time of events to keep
:param end: exclusive end time of events to keep
"""
for s in self.streams:
s.create_event_window(start, end)
if self.ml_data:
self.ml_data.windows = [s for s in self.ml_data.windows if start <= s.timestamp < end]
def set_save_dir(self, new_dir: str):
"""
change the directory where events are saved to
:param new_dir: new directory path
"""
for s in self.streams:
s.set_save_dir(new_dir)
def set_save_mode(self, new_save_mode: FileSystemSaveMode):
"""
update the save mode for all EventStream
:param new_save_mode: save mode to set
"""
for s in self.streams:
s.set_save_mode(new_save_mode)
def update_timestamps(self, offset_model: om.OffsetModel, use_model_function: bool = False):
"""
update the timestamps in the data
:param offset_model: model used to update the timestamps
:param use_model_function: if True, use the model's slope function to update the timestamps.
otherwise uses the best offset (model's intercept value). Default False
"""
for evnt in self.streams:
evnt.update_timestamps(offset_model, use_model_function)
if self.ml_data:
for w in self.ml_data.windows:
w.timestamp = offset_model.update_time(w.timestamp, use_model_function)
def original_timestamps(self, offset_model: om.OffsetModel, use_model_function: bool = False):
"""
undo the update to the timestamps in the data
:param offset_model: model used to update the timestamps
:param use_model_function: if True, use the model's slope function to update the timestamps.
otherwise uses the best offset (model's intercept value). Default False
"""
for evnt in self.streams:
evnt.original_timestamps(offset_model, use_model_function)
if self.ml_data:
for w in self.ml_data.windows:
w.timestamp = offset_model.get_original_time(w.timestamp, use_model_function)
@staticmethod
def from_dict(in_dict: dict) -> "EventStreams":
"""
:param in_dict: dictionary representing an EventStreams object
:return: the EventStreams object from the dictionary
"""
result = EventStreams()
if "streams" in in_dict.keys():
result.streams = [EventStream.from_json_dict(s) for s in in_dict["streams"]]
if "ml_data" in in_dict.keys():
result.ml_data = ml.ExtractedMl.from_dict(in_dict["ml_data"])
return result
@staticmethod
def from_json_file(file_dir: str, file_name: str) -> "EventStreams":
"""
:param file_dir: full path to containing directory for the file
:param file_name: name of file to load data from
:return: EventStreams from json file
"""
return EventStreams.from_dict(json_file_to_dict(os.path.join(file_dir, f"{file_name}")))
def to_json_file(self, file_dir: str = ".", file_name: Optional[str] = None) -> Path:
"""
saves the EventStream as a json file
:param file_dir: the directory to save the file into. default current directory (".")
:param file_name: the optional base file name. Do not include a file extension.
If None, a default file name is created using this format:
eventstreams.json
:return: path to json file
"""
return io.eventstreams_to_json_file(self, file_dir, file_name)
def _find_ml_event_stream(packet: RedvoxPacketM) -> Optional[es.EventStream]:
"""
Attempts to find an event stream with ML data.
:param packet: The packet to search in.
:return: An instance of the matching event stream or None.
"""
stream: RedvoxPacketM.EventStream
for stream in packet.event_streams:
if stream.name == ml.ML_EVENT_STREAM_NAME:
return es.EventStream(stream)
return None
def _get_ml_from_packet(packet: RedvoxPacketM) -> Optional[ml.ExtractedMl]:
"""
reads the machine learning payload from a single Redvox Api1000 packet
:param packet: packet to read machine learning data from
"""
stream: Optional[es.EventStream] = _find_ml_event_stream(packet)
return None if stream is None else ml.extract_ml_from_event_stream(stream)
Functions
def get_empty_event_data_dict() ‑> dict
-
:return: an empty data dictionary for event data
Expand source code
def get_empty_event_data_dict() -> dict: """ :return: an empty data dictionary for event data """ return {EventDataTypes.STRING: {}, EventDataTypes.NUMERIC: {}, EventDataTypes.BOOLEAN: {}, EventDataTypes.BYTE: {}}
Classes
class Event (timestamp: float, name: str = 'event', data: Optional[Dict[EventDataTypes, dict]] = None, save_mode: FileSystemSaveMode = FileSystemSaveMode.MEM, base_dir: str = '.')
-
stores event data from Redvox Api1000 packets
ALL timestamps in microseconds since epoch UTC unless otherwise stated
initialize Event
:param timestamp: timestamp when Event occurred in microseconds since epoch UTC :param name: name of the Event. Default "event" :param data: a structured dictionary of the data. Dictionary must look like: {EventDataTypes.STRING: {s_values}, EventDataTypes.NUMERIC: {n_values}, EventDataTypes.BOOLEAN: {o_values}, EventDataTypes.BYTE: {b_values}} where {*_values} is a dictionary of string: data and can be empty. Default None :param save_mode: FileSystemSaveMode that determines how data is saved. Default FileSystemSaveMode.MEM (use RAM). Other options are DISK (save to directory) and TEMP (save to temporary directory) :param base_dir: the location of the parquet file that holds the data. Not used if save_data is False. Default current directory (".")
Expand source code
class Event: """ stores event data from Redvox Api1000 packets ALL timestamps in microseconds since epoch UTC unless otherwise stated """ def __init__( self, timestamp: float, name: str = "event", data: Optional[Dict[EventDataTypes, dict]] = None, save_mode: FileSystemSaveMode = FileSystemSaveMode.MEM, base_dir: str = ".", ): """ initialize Event :param timestamp: timestamp when Event occurred in microseconds since epoch UTC :param name: name of the Event. Default "event" :param data: a structured dictionary of the data. Dictionary must look like: {EventDataTypes.STRING: {s_values}, EventDataTypes.NUMERIC: {n_values}, EventDataTypes.BOOLEAN: {o_values}, EventDataTypes.BYTE: {b_values}} where {*_values} is a dictionary of string: data and can be empty. Default None :param save_mode: FileSystemSaveMode that determines how data is saved. Default FileSystemSaveMode.MEM (use RAM). Other options are DISK (save to directory) and TEMP (save to temporary directory) :param base_dir: the location of the parquet file that holds the data. Not used if save_data is False. Default current directory (".") """ self.name = name self.metadata = {} self._errors = RedVoxExceptions("Event") self._fs_writer = Fsw(f"event_{name}", "json", base_dir, save_mode) self._timestamp = timestamp self._uncorrected_timestamp = timestamp self._data = get_empty_event_data_dict() if data is None else data def __repr__(self): return ( f"name: {self.name}, " f"timestamp: {self._timestamp}, " f"uncorrected_timestamp: {self._uncorrected_timestamp}, " f"schema: {self.get_schema()}, " f"save_mode: {self._fs_writer.save_mode()}" ) def __str__(self): return ( f"name: {self.name}, " f"timestamp: {self._timestamp}, " f"uncorrected_timestamp: {self._uncorrected_timestamp}, " f"schema: {self.__schema_as_str()}" ) def as_dict(self) -> dict: """ :return: EventStream as a dictionary """ return { "name": self.name, "timestamp": self._timestamp, "uncorrected_timestamp": self._uncorrected_timestamp, "metadata": self.metadata, "data": self.__data_as_dict(), "errors": self._errors.as_dict(), } def __data_as_dict(self) -> dict: return { EventDataTypes.STRING.name: self.get_string_values(), EventDataTypes.NUMERIC.name: self.get_numeric_values(), EventDataTypes.BOOLEAN.name: self.get_boolean_values(), EventDataTypes.BYTE.name: self.get_byte_values(), } def __schema_as_str(self) -> str: result = "" for f in self._data.keys(): result += f"{f.name}: {list(self._data[f].keys())}" if f != EventDataTypes.BYTE: result += ", " return result @staticmethod def __get_items(payload: Mapping[str]): return payload.get_metadata().items() @staticmethod def __get_items_raw(payload): return payload.items() @staticmethod def __get_keys(ptype: str, payload: Mapping[str]): return ptype, payload.get_metadata().keys() @staticmethod def __get_keys_raw(ptype: str, payload): return ptype, payload.keys() @staticmethod def __get_data_from_event(event: es.Event): """ load data from an Event; gets data in order of: string, numeric, boolean, byte :param event: event to load data from """ return map( Event.__get_items, [ event.get_string_payload(), event.get_numeric_payload(), event.get_boolean_payload(), event.get_byte_payload(), ], ) @staticmethod def __get_data_from_event_raw(event: RedvoxPacketM.EventStream.Event): """ load data from an Event; gets data in order of: string, numeric, boolean, byte :param event: event to load data from """ return map( Event.__get_items_raw, [event.string_payload, event.numeric_payload, event.boolean_payload, event.byte_payload], ) def _set_data(self, data: iter): """ sets the data of the Event :param data: an iterable of data to insert """ for g, h in map(lambda l, p: (l, p), data, EventDataTypes.types_list()): for k, v in g: self._data[h][k] = v def read_event(self, event: es.Event) -> "Event": """ read the payloads of a Redvox Event, separate the data by payload type, then add it to the SDK Event :param event: event to process :return: updated self """ self.name = event.get_description() self._fs_writer.file_name = f"event_{self.name}" self.metadata = event.get_metadata() self._set_data(self.__get_data_from_event(event)) return self def read_raw(self, event: RedvoxPacketM.EventStream.Event) -> "Event": """ read the contents of a Redvox Api1000 protobuf stream :param event: the protobuf stream to read """ self.name = event.description self._fs_writer.file_name = f"event_{self.name}" self.metadata = dict(event.metadata) self._set_data(self.__get_data_from_event_raw(event)) return self def get_string_schema(self) -> List[str]: """ :return: the column names of string typed data as a list of strings """ return self.get_schema()[EventDataTypes.STRING] def get_numeric_schema(self) -> List[str]: """ :return: the column names of numeric typed data as a list of strings """ return self.get_schema()[EventDataTypes.NUMERIC] def get_boolean_schema(self) -> List[str]: """ :return: the column names of boolean typed data as a list of strings """ return self.get_schema()[EventDataTypes.BOOLEAN] def get_byte_schema(self) -> List[str]: """ :return: the column names of byte typed data as a list of strings """ return self.get_schema()[EventDataTypes.BYTE] def get_schema(self) -> Dict[EventDataTypes, list]: """ :return: the dictionary that summarizes the data names and types """ result = {} for f in self._data.keys(): result[f] = [k for k in self._data[f].keys()] return result def get_data_keys(self) -> List[str]: """ :return: the keys of the data in the event """ result = [] for f in self._data.keys(): result.extend([k for k in self._data[f].keys()]) return result def get_string_values(self) -> dict: """ :return: the string data as a dictionary """ return self._data[EventDataTypes.STRING] def get_numeric_values(self) -> dict: """ :return: the numeric data as a dictionary """ return self._data[EventDataTypes.NUMERIC] def get_boolean_values(self) -> dict: """ :return: the boolean data as a dictionary """ return self._data[EventDataTypes.BOOLEAN] def get_byte_values(self) -> dict: """ :return: the byte data as a dictionary """ return self._data[EventDataTypes.BYTE] def get_string_item(self, data_key: str) -> Optional[str]: """ get a string data value with a key matching data_key :param data_key: the name of the data value to look for :return: string data if it exists, None otherwise """ strs = self.get_string_values() for s in strs.keys(): if s == data_key: return strs[s] return None def get_numeric_item(self, data_key: str) -> Optional[float]: """ get a numeric data value with a key matching data_key :param data_key: the name of the data value to look for :return: numeric data if it exists, None otherwise """ nums = self.get_numeric_values() for s in nums.keys(): if s == data_key: return nums[s] return None def get_boolean_item(self, data_key: str) -> Optional[bool]: """ get a boolean data value with a key matching data_key :param data_key: the name of the data value to look for :return: boolean data if it exists, None otherwise """ boos = self.get_boolean_values() for s in boos.keys(): if s == data_key: return boos[s] return None def get_byte_item(self, data_key: str) -> Optional[str]: """ get a byte data value with a key matching data_key :param data_key: the name of the data value to look for :return: byte data if it exists, None otherwise """ byts = self.get_byte_values() for s in byts.keys(): if s == data_key: return byts[s] return None def get_item(self, data_key: str) -> Union[List[str], str, bool, float]: """ :param data_key: key of data to get :return: data with matching data_key or the list of all possible data keys """ for r in [ self.get_string_item(data_key), self.get_numeric_item(data_key), self.get_boolean_item(data_key), self.get_byte_item(data_key), ]: if r is not None: return r return self.get_data_keys() def get_classification(self, index: int = 0) -> dict: """ get a classification from an event (anything that ends with "_X" where X is the index value) :param index: index of classification, default 0 :return: dictionary of data """ result = {} for s in self._data.keys(): for b, v in self._data[s].items(): match = re.search(f"_{index}$", b) if match is not None: result[b] = v return result def get_string_column(self, column_name: str) -> Dict[str, str]: """ note: data points in events are named [column_name]_[X], where [X] is an integer 0 or greater. this function will return all string data points with keys that start with [column_name] :param column_name: the name of the column of event data to get :return: a dictionary of string data """ result = {} strs = self.get_string_values() for s, v in strs.items(): match = re.match(f"{column_name}_*", s) if match is not None: result[s] = v return result def get_numeric_column(self, column_name: str) -> Dict[str, float]: """ note: data points in events are named [column_name]_[X], where [X] is an integer 0 or greater. this function will return all numeric data points with keys that start with [column_name] :param column_name: the name of the column of event data to get :return: a dictionary of numeric data """ result = {} strs = self.get_numeric_values() for s, v in strs.items(): match = re.match(f"{column_name}_*", s) if match is not None: result[s] = v return result def get_boolean_column(self, column_name: str) -> Dict[str, str]: """ note: data points in events are named [column_name]_[X], where [X] is an integer 0 or greater. this function will return all boolean data points with keys that start with [column_name] :param column_name: the name of the column of event data to get :return: a dictionary of boolean data """ result = {} strs = self.get_boolean_values() for s, v in strs.items(): match = re.match(f"{column_name}_*", s) if match is not None: result[s] = v return result def get_byte_column(self, column_name: str) -> Dict[str, str]: """ note: data points in events are named [column_name]_[X], where [X] is an integer 0 or greater. this function will return all byte data points with keys that start with [column_name] :param column_name: the name of the column of event data to get :return: a dictionary of byte data """ result = {} strs = self.get_byte_values() for s, v in strs.items(): match = re.match(f"{column_name}_*", s) if match is not None: result[s] = v return result def get_timestamp(self) -> float: """ :return: timestamp of the Event """ return self._timestamp def get_uncorrected_timestamp(self) -> float: """ :return: uncorrected timestamp of the Event """ return self._uncorrected_timestamp def is_timestamp_corrected(self) -> bool: """ :return: if timestamp of Event is updated """ return self._timestamp != self._uncorrected_timestamp def update_timestamps(self, offset_model: om.OffsetModel, use_model_function: bool = False): """ updates the timestamp of the Event :param offset_model: model used to update the timestamps :param use_model_function: if True, use the model's slope function to update the timestamps. otherwise uses the best offset (model's intercept value). Default False """ if self.is_timestamp_corrected(): self._errors.append("Timestamps already corrected!") else: self._timestamp = offset_model.update_time(self._timestamp, use_model_function) def original_timestamps(self, offset_model: om.OffsetModel, use_model_function: bool = False): """ undo the update to the timestamp of the Event :param offset_model: model used to update the timestamps :param use_model_function: if True, use the model's slope function to update the timestamps. otherwise uses the best offset (model's intercept value). Default False """ if not self.is_timestamp_corrected(): self._errors.append("Timestamps already not corrected!") else: self._timestamp = offset_model.get_original_time(self._timestamp, use_model_function) def default_json_file_name(self) -> str: """ :return: default event json file name (event_[event.name]): note there is no extension """ return f"event_{self.name}" def is_save_to_disk(self) -> bool: """ :return: True if sensor will be saved to disk """ return self._fs_writer.is_save_disk() def set_save_to_disk(self, save: bool): """ :param save: If True, save to disk """ self._fs_writer.save_to_disk = save def set_save_mode(self, save_mode: FileSystemSaveMode): """ set the save mode :param save_mode: new save mode """ self._fs_writer.set_save_mode(save_mode) def save_mode(self) -> FileSystemSaveMode: """ :return: the save mode """ return self._fs_writer.save_mode() def set_file_name(self, new_file: Optional[str] = None): """ * set the pyarrow file name or use the default: event_{Event.name} * Do not give an extension :param new_file: optional file name to change to; default None (use default name) """ self._fs_writer.file_name = new_file if new_file else f"event_{self.name}" def full_file_name(self) -> str: """ :return: full name of file containing the data """ return self._fs_writer.full_name() def file_name(self) -> str: """ :return: file name without extension """ return self._fs_writer.file_name def set_save_dir(self, new_dir: Optional[str] = None): """ set the pyarrow directory or use the default: "." (current directory) :param new_dir: the directory to change to; default None (use current directory) """ self._fs_writer.base_dir = new_dir if new_dir else "." def save_dir(self) -> str: """ :return: directory containing parquet files for the sensor """ return self._fs_writer.save_dir() def full_path(self) -> str: """ :return: the full path to the data file """ return self._fs_writer.full_path() def fs_writer(self) -> Fsw: """ :return: FileSystemWriter object """ return self._fs_writer def has_data(self) -> bool: """ :return: True if Event contains at least one data point """ return sum([len(self._data[j].keys()) for j in EventDataTypes.types_list()]) > 0 def data(self) -> dict: """ :return: the data """ return self._data @staticmethod def from_json_dict(json_dict: dict) -> "Event": """ :param json_dict: json dictionary to parse :return: Event from json dict """ if "timestamp" in json_dict.keys(): data = get_empty_event_data_dict() data[EventDataTypes.STRING] = json_dict["data"]["STRING"] data[EventDataTypes.NUMERIC] = json_dict["data"]["NUMERIC"] data[EventDataTypes.BOOLEAN] = json_dict["data"]["BOOLEAN"] data[EventDataTypes.BYTE] = json_dict["data"]["BYTE"] result = Event(json_dict["timestamp"], json_dict["name"], data, FileSystemSaveMode.DISK) result.metadata = json_dict["metadata"] result._uncorrected_timestamp = json_dict["uncorrected_timestamp"] result.set_errors(RedVoxExceptions.from_dict(json_dict["errors"])) else: result = Event(np.nan, "Empty") result.append_error(f"Loading from json dict failed; missing Event timestamp.") return result @staticmethod def from_json_file(file_dir: str, file_name: str) -> "Event": """ :param file_dir: full path to containing directory for the file :param file_name: name of file to load data from :return: Event from json file """ json_data = json_file_to_dict(os.path.join(file_dir, f"{file_name}")) if "timestamp" in json_data.keys(): data = get_empty_event_data_dict() data[EventDataTypes.STRING] = json_data["data"]["STRING"] data[EventDataTypes.NUMERIC] = json_data["data"]["NUMERIC"] data[EventDataTypes.BOOLEAN] = json_data["data"]["BOOLEAN"] data[EventDataTypes.BYTE] = json_data["data"]["BYTE"] result = Event(json_data["timestamp"], json_data["name"], data, FileSystemSaveMode.DISK, file_dir) result.metadata = json_data["metadata"] result._uncorrected_timestamp = json_data["uncorrected_timestamp"] result.set_errors(RedVoxExceptions.from_dict(json_data["errors"])) else: result = Event(np.nan, "Empty") result.append_error(f"Loading from {file_name} failed; missing Event timestamp.") return result def to_json_file(self, file_name: Optional[str] = None) -> Path: """ saves the EventStream as a json file :param file_name: the optional base file name. Do not include a file extension. If None, a default file name is created using this format: event_[event.name].json :return: path to json file """ return io.event_to_json_file(self, file_name) def errors(self) -> RedVoxExceptions: """ :return: errors of the sensor """ return self._errors def set_errors(self, errors: RedVoxExceptions): """ sets the errors of the Sensor :param errors: errors to set """ self._errors = errors def append_error(self, error: str): """ add an error to the Sensor :param error: error to add """ self._errors.append(error) def print_errors(self): """ print all errors to screen """ self._errors.print()
Static methods
def from_json_dict(json_dict: dict) ‑> Event
-
:param json_dict: json dictionary to parse :return: Event from json dict
Expand source code
@staticmethod def from_json_dict(json_dict: dict) -> "Event": """ :param json_dict: json dictionary to parse :return: Event from json dict """ if "timestamp" in json_dict.keys(): data = get_empty_event_data_dict() data[EventDataTypes.STRING] = json_dict["data"]["STRING"] data[EventDataTypes.NUMERIC] = json_dict["data"]["NUMERIC"] data[EventDataTypes.BOOLEAN] = json_dict["data"]["BOOLEAN"] data[EventDataTypes.BYTE] = json_dict["data"]["BYTE"] result = Event(json_dict["timestamp"], json_dict["name"], data, FileSystemSaveMode.DISK) result.metadata = json_dict["metadata"] result._uncorrected_timestamp = json_dict["uncorrected_timestamp"] result.set_errors(RedVoxExceptions.from_dict(json_dict["errors"])) else: result = Event(np.nan, "Empty") result.append_error(f"Loading from json dict failed; missing Event timestamp.") return result
def from_json_file(file_dir: str, file_name: str) ‑> Event
-
:param file_dir: full path to containing directory for the file :param file_name: name of file to load data from :return: Event from json file
Expand source code
@staticmethod def from_json_file(file_dir: str, file_name: str) -> "Event": """ :param file_dir: full path to containing directory for the file :param file_name: name of file to load data from :return: Event from json file """ json_data = json_file_to_dict(os.path.join(file_dir, f"{file_name}")) if "timestamp" in json_data.keys(): data = get_empty_event_data_dict() data[EventDataTypes.STRING] = json_data["data"]["STRING"] data[EventDataTypes.NUMERIC] = json_data["data"]["NUMERIC"] data[EventDataTypes.BOOLEAN] = json_data["data"]["BOOLEAN"] data[EventDataTypes.BYTE] = json_data["data"]["BYTE"] result = Event(json_data["timestamp"], json_data["name"], data, FileSystemSaveMode.DISK, file_dir) result.metadata = json_data["metadata"] result._uncorrected_timestamp = json_data["uncorrected_timestamp"] result.set_errors(RedVoxExceptions.from_dict(json_data["errors"])) else: result = Event(np.nan, "Empty") result.append_error(f"Loading from {file_name} failed; missing Event timestamp.") return result
Methods
def append_error(self, error: str)
-
add an error to the Sensor
:param error: error to add
Expand source code
def append_error(self, error: str): """ add an error to the Sensor :param error: error to add """ self._errors.append(error)
def as_dict(self) ‑> dict
-
:return: EventStream as a dictionary
Expand source code
def as_dict(self) -> dict: """ :return: EventStream as a dictionary """ return { "name": self.name, "timestamp": self._timestamp, "uncorrected_timestamp": self._uncorrected_timestamp, "metadata": self.metadata, "data": self.__data_as_dict(), "errors": self._errors.as_dict(), }
def data(self) ‑> dict
-
:return: the data
Expand source code
def data(self) -> dict: """ :return: the data """ return self._data
def default_json_file_name(self) ‑> str
-
:return: default event json file name (event_[event.name]): note there is no extension
Expand source code
def default_json_file_name(self) -> str: """ :return: default event json file name (event_[event.name]): note there is no extension """ return f"event_{self.name}"
def errors(self) ‑> RedVoxExceptions
-
:return: errors of the sensor
Expand source code
def errors(self) -> RedVoxExceptions: """ :return: errors of the sensor """ return self._errors
def file_name(self) ‑> str
-
:return: file name without extension
Expand source code
def file_name(self) -> str: """ :return: file name without extension """ return self._fs_writer.file_name
def fs_writer(self) ‑> FileSystemWriter
-
:return: FileSystemWriter object
Expand source code
def fs_writer(self) -> Fsw: """ :return: FileSystemWriter object """ return self._fs_writer
def full_file_name(self) ‑> str
-
:return: full name of file containing the data
Expand source code
def full_file_name(self) -> str: """ :return: full name of file containing the data """ return self._fs_writer.full_name()
def full_path(self) ‑> str
-
:return: the full path to the data file
Expand source code
def full_path(self) -> str: """ :return: the full path to the data file """ return self._fs_writer.full_path()
def get_boolean_column(self, column_name: str) ‑> Dict[str, str]
-
note: data points in events are named [column_name]_[X], where [X] is an integer 0 or greater. this function will return all boolean data points with keys that start with [column_name]
:param column_name: the name of the column of event data to get :return: a dictionary of boolean data
Expand source code
def get_boolean_column(self, column_name: str) -> Dict[str, str]: """ note: data points in events are named [column_name]_[X], where [X] is an integer 0 or greater. this function will return all boolean data points with keys that start with [column_name] :param column_name: the name of the column of event data to get :return: a dictionary of boolean data """ result = {} strs = self.get_boolean_values() for s, v in strs.items(): match = re.match(f"{column_name}_*", s) if match is not None: result[s] = v return result
def get_boolean_item(self, data_key: str) ‑> Optional[bool]
-
get a boolean data value with a key matching data_key
:param data_key: the name of the data value to look for :return: boolean data if it exists, None otherwise
Expand source code
def get_boolean_item(self, data_key: str) -> Optional[bool]: """ get a boolean data value with a key matching data_key :param data_key: the name of the data value to look for :return: boolean data if it exists, None otherwise """ boos = self.get_boolean_values() for s in boos.keys(): if s == data_key: return boos[s] return None
def get_boolean_schema(self) ‑> List[str]
-
:return: the column names of boolean typed data as a list of strings
Expand source code
def get_boolean_schema(self) -> List[str]: """ :return: the column names of boolean typed data as a list of strings """ return self.get_schema()[EventDataTypes.BOOLEAN]
def get_boolean_values(self) ‑> dict
-
:return: the boolean data as a dictionary
Expand source code
def get_boolean_values(self) -> dict: """ :return: the boolean data as a dictionary """ return self._data[EventDataTypes.BOOLEAN]
def get_byte_column(self, column_name: str) ‑> Dict[str, str]
-
note: data points in events are named [column_name]_[X], where [X] is an integer 0 or greater. this function will return all byte data points with keys that start with [column_name]
:param column_name: the name of the column of event data to get :return: a dictionary of byte data
Expand source code
def get_byte_column(self, column_name: str) -> Dict[str, str]: """ note: data points in events are named [column_name]_[X], where [X] is an integer 0 or greater. this function will return all byte data points with keys that start with [column_name] :param column_name: the name of the column of event data to get :return: a dictionary of byte data """ result = {} strs = self.get_byte_values() for s, v in strs.items(): match = re.match(f"{column_name}_*", s) if match is not None: result[s] = v return result
def get_byte_item(self, data_key: str) ‑> Optional[str]
-
get a byte data value with a key matching data_key
:param data_key: the name of the data value to look for :return: byte data if it exists, None otherwise
Expand source code
def get_byte_item(self, data_key: str) -> Optional[str]: """ get a byte data value with a key matching data_key :param data_key: the name of the data value to look for :return: byte data if it exists, None otherwise """ byts = self.get_byte_values() for s in byts.keys(): if s == data_key: return byts[s] return None
def get_byte_schema(self) ‑> List[str]
-
:return: the column names of byte typed data as a list of strings
Expand source code
def get_byte_schema(self) -> List[str]: """ :return: the column names of byte typed data as a list of strings """ return self.get_schema()[EventDataTypes.BYTE]
def get_byte_values(self) ‑> dict
-
:return: the byte data as a dictionary
Expand source code
def get_byte_values(self) -> dict: """ :return: the byte data as a dictionary """ return self._data[EventDataTypes.BYTE]
def get_classification(self, index: int = 0) ‑> dict
-
get a classification from an event (anything that ends with "_X" where X is the index value)
:param index: index of classification, default 0 :return: dictionary of data
Expand source code
def get_classification(self, index: int = 0) -> dict: """ get a classification from an event (anything that ends with "_X" where X is the index value) :param index: index of classification, default 0 :return: dictionary of data """ result = {} for s in self._data.keys(): for b, v in self._data[s].items(): match = re.search(f"_{index}$", b) if match is not None: result[b] = v return result
def get_data_keys(self) ‑> List[str]
-
:return: the keys of the data in the event
Expand source code
def get_data_keys(self) -> List[str]: """ :return: the keys of the data in the event """ result = [] for f in self._data.keys(): result.extend([k for k in self._data[f].keys()]) return result
def get_item(self, data_key: str) ‑> Union[List[str], str, bool, float]
-
:param data_key: key of data to get :return: data with matching data_key or the list of all possible data keys
Expand source code
def get_item(self, data_key: str) -> Union[List[str], str, bool, float]: """ :param data_key: key of data to get :return: data with matching data_key or the list of all possible data keys """ for r in [ self.get_string_item(data_key), self.get_numeric_item(data_key), self.get_boolean_item(data_key), self.get_byte_item(data_key), ]: if r is not None: return r return self.get_data_keys()
def get_numeric_column(self, column_name: str) ‑> Dict[str, float]
-
note: data points in events are named [column_name]_[X], where [X] is an integer 0 or greater. this function will return all numeric data points with keys that start with [column_name]
:param column_name: the name of the column of event data to get :return: a dictionary of numeric data
Expand source code
def get_numeric_column(self, column_name: str) -> Dict[str, float]: """ note: data points in events are named [column_name]_[X], where [X] is an integer 0 or greater. this function will return all numeric data points with keys that start with [column_name] :param column_name: the name of the column of event data to get :return: a dictionary of numeric data """ result = {} strs = self.get_numeric_values() for s, v in strs.items(): match = re.match(f"{column_name}_*", s) if match is not None: result[s] = v return result
def get_numeric_item(self, data_key: str) ‑> Optional[float]
-
get a numeric data value with a key matching data_key
:param data_key: the name of the data value to look for :return: numeric data if it exists, None otherwise
Expand source code
def get_numeric_item(self, data_key: str) -> Optional[float]: """ get a numeric data value with a key matching data_key :param data_key: the name of the data value to look for :return: numeric data if it exists, None otherwise """ nums = self.get_numeric_values() for s in nums.keys(): if s == data_key: return nums[s] return None
def get_numeric_schema(self) ‑> List[str]
-
:return: the column names of numeric typed data as a list of strings
Expand source code
def get_numeric_schema(self) -> List[str]: """ :return: the column names of numeric typed data as a list of strings """ return self.get_schema()[EventDataTypes.NUMERIC]
def get_numeric_values(self) ‑> dict
-
:return: the numeric data as a dictionary
Expand source code
def get_numeric_values(self) -> dict: """ :return: the numeric data as a dictionary """ return self._data[EventDataTypes.NUMERIC]
def get_schema(self) ‑> Dict[EventDataTypes, list]
-
:return: the dictionary that summarizes the data names and types
Expand source code
def get_schema(self) -> Dict[EventDataTypes, list]: """ :return: the dictionary that summarizes the data names and types """ result = {} for f in self._data.keys(): result[f] = [k for k in self._data[f].keys()] return result
def get_string_column(self, column_name: str) ‑> Dict[str, str]
-
note: data points in events are named [column_name]_[X], where [X] is an integer 0 or greater. this function will return all string data points with keys that start with [column_name]
:param column_name: the name of the column of event data to get :return: a dictionary of string data
Expand source code
def get_string_column(self, column_name: str) -> Dict[str, str]: """ note: data points in events are named [column_name]_[X], where [X] is an integer 0 or greater. this function will return all string data points with keys that start with [column_name] :param column_name: the name of the column of event data to get :return: a dictionary of string data """ result = {} strs = self.get_string_values() for s, v in strs.items(): match = re.match(f"{column_name}_*", s) if match is not None: result[s] = v return result
def get_string_item(self, data_key: str) ‑> Optional[str]
-
get a string data value with a key matching data_key
:param data_key: the name of the data value to look for :return: string data if it exists, None otherwise
Expand source code
def get_string_item(self, data_key: str) -> Optional[str]: """ get a string data value with a key matching data_key :param data_key: the name of the data value to look for :return: string data if it exists, None otherwise """ strs = self.get_string_values() for s in strs.keys(): if s == data_key: return strs[s] return None
def get_string_schema(self) ‑> List[str]
-
:return: the column names of string typed data as a list of strings
Expand source code
def get_string_schema(self) -> List[str]: """ :return: the column names of string typed data as a list of strings """ return self.get_schema()[EventDataTypes.STRING]
def get_string_values(self) ‑> dict
-
:return: the string data as a dictionary
Expand source code
def get_string_values(self) -> dict: """ :return: the string data as a dictionary """ return self._data[EventDataTypes.STRING]
def get_timestamp(self) ‑> float
-
:return: timestamp of the Event
Expand source code
def get_timestamp(self) -> float: """ :return: timestamp of the Event """ return self._timestamp
def get_uncorrected_timestamp(self) ‑> float
-
:return: uncorrected timestamp of the Event
Expand source code
def get_uncorrected_timestamp(self) -> float: """ :return: uncorrected timestamp of the Event """ return self._uncorrected_timestamp
def has_data(self) ‑> bool
-
:return: True if Event contains at least one data point
Expand source code
def has_data(self) -> bool: """ :return: True if Event contains at least one data point """ return sum([len(self._data[j].keys()) for j in EventDataTypes.types_list()]) > 0
def is_save_to_disk(self) ‑> bool
-
:return: True if sensor will be saved to disk
Expand source code
def is_save_to_disk(self) -> bool: """ :return: True if sensor will be saved to disk """ return self._fs_writer.is_save_disk()
def is_timestamp_corrected(self) ‑> bool
-
:return: if timestamp of Event is updated
Expand source code
def is_timestamp_corrected(self) -> bool: """ :return: if timestamp of Event is updated """ return self._timestamp != self._uncorrected_timestamp
def original_timestamps(self, offset_model: OffsetModel, use_model_function: bool = False)
-
undo the update to the timestamp of the Event
:param offset_model: model used to update the timestamps :param use_model_function: if True, use the model's slope function to update the timestamps. otherwise uses the best offset (model's intercept value). Default False
Expand source code
def original_timestamps(self, offset_model: om.OffsetModel, use_model_function: bool = False): """ undo the update to the timestamp of the Event :param offset_model: model used to update the timestamps :param use_model_function: if True, use the model's slope function to update the timestamps. otherwise uses the best offset (model's intercept value). Default False """ if not self.is_timestamp_corrected(): self._errors.append("Timestamps already not corrected!") else: self._timestamp = offset_model.get_original_time(self._timestamp, use_model_function)
def print_errors(self)
-
print all errors to screen
Expand source code
def print_errors(self): """ print all errors to screen """ self._errors.print()
def read_event(self, event: Event) ‑> Event
-
read the payloads of a Redvox Event, separate the data by payload type, then add it to the SDK Event
:param event: event to process :return: updated self
Expand source code
def read_event(self, event: es.Event) -> "Event": """ read the payloads of a Redvox Event, separate the data by payload type, then add it to the SDK Event :param event: event to process :return: updated self """ self.name = event.get_description() self._fs_writer.file_name = f"event_{self.name}" self.metadata = event.get_metadata() self._set_data(self.__get_data_from_event(event)) return self
def read_raw(self, event: src.redvox_api_m.redvox_api_m_pb2.Event) ‑> Event
-
read the contents of a Redvox Api1000 protobuf stream
:param event: the protobuf stream to read
Expand source code
def read_raw(self, event: RedvoxPacketM.EventStream.Event) -> "Event": """ read the contents of a Redvox Api1000 protobuf stream :param event: the protobuf stream to read """ self.name = event.description self._fs_writer.file_name = f"event_{self.name}" self.metadata = dict(event.metadata) self._set_data(self.__get_data_from_event_raw(event)) return self
def save_dir(self) ‑> str
-
:return: directory containing parquet files for the sensor
Expand source code
def save_dir(self) -> str: """ :return: directory containing parquet files for the sensor """ return self._fs_writer.save_dir()
def save_mode(self) ‑> FileSystemSaveMode
-
:return: the save mode
Expand source code
def save_mode(self) -> FileSystemSaveMode: """ :return: the save mode """ return self._fs_writer.save_mode()
def set_errors(self, errors: RedVoxExceptions)
-
sets the errors of the Sensor
:param errors: errors to set
Expand source code
def set_errors(self, errors: RedVoxExceptions): """ sets the errors of the Sensor :param errors: errors to set """ self._errors = errors
def set_file_name(self, new_file: Optional[str] = None)
-
- set the pyarrow file name or use the default: event_{Event.name}
- Do not give an extension
:param new_file: optional file name to change to; default None (use default name)
Expand source code
def set_file_name(self, new_file: Optional[str] = None): """ * set the pyarrow file name or use the default: event_{Event.name} * Do not give an extension :param new_file: optional file name to change to; default None (use default name) """ self._fs_writer.file_name = new_file if new_file else f"event_{self.name}"
def set_save_dir(self, new_dir: Optional[str] = None)
-
set the pyarrow directory or use the default: "." (current directory)
:param new_dir: the directory to change to; default None (use current directory)
Expand source code
def set_save_dir(self, new_dir: Optional[str] = None): """ set the pyarrow directory or use the default: "." (current directory) :param new_dir: the directory to change to; default None (use current directory) """ self._fs_writer.base_dir = new_dir if new_dir else "."
def set_save_mode(self, save_mode: FileSystemSaveMode)
-
set the save mode
:param save_mode: new save mode
Expand source code
def set_save_mode(self, save_mode: FileSystemSaveMode): """ set the save mode :param save_mode: new save mode """ self._fs_writer.set_save_mode(save_mode)
def set_save_to_disk(self, save: bool)
-
:param save: If True, save to disk
Expand source code
def set_save_to_disk(self, save: bool): """ :param save: If True, save to disk """ self._fs_writer.save_to_disk = save
def to_json_file(self, file_name: Optional[str] = None) ‑> pathlib.Path
-
saves the EventStream as a json file
:param file_name: the optional base file name. Do not include a file extension. If None, a default file name is created using this format: event_[event.name].json :return: path to json file
Expand source code
def to_json_file(self, file_name: Optional[str] = None) -> Path: """ saves the EventStream as a json file :param file_name: the optional base file name. Do not include a file extension. If None, a default file name is created using this format: event_[event.name].json :return: path to json file """ return io.event_to_json_file(self, file_name)
def update_timestamps(self, offset_model: OffsetModel, use_model_function: bool = False)
-
updates the timestamp of the Event
:param offset_model: model used to update the timestamps :param use_model_function: if True, use the model's slope function to update the timestamps. otherwise uses the best offset (model's intercept value). Default False
Expand source code
def update_timestamps(self, offset_model: om.OffsetModel, use_model_function: bool = False): """ updates the timestamp of the Event :param offset_model: model used to update the timestamps :param use_model_function: if True, use the model's slope function to update the timestamps. otherwise uses the best offset (model's intercept value). Default False """ if self.is_timestamp_corrected(): self._errors.append("Timestamps already corrected!") else: self._timestamp = offset_model.update_time(self._timestamp, use_model_function)
class EventDataTypes (value, names=None, *, module=None, qualname=None, type=None, start=1)
-
Enumeration of data types for event data
Expand source code
class EventDataTypes(enum.Enum): """ Enumeration of data types for event data """ STRING = 0 # string data NUMERIC = 1 # numeric data BOOLEAN = 2 # boolean data BYTE = 3 # bytes data @staticmethod def types_list() -> List["EventDataTypes"]: """ :return: the values of EventDataTypes as a list in order of: STRING, NUMERIC, BOOLEAN, BYTE """ return [EventDataTypes.STRING, EventDataTypes.NUMERIC, EventDataTypes.BOOLEAN, EventDataTypes.BYTE]
Ancestors
- enum.Enum
Class variables
var BOOLEAN
var BYTE
var NUMERIC
var STRING
Static methods
def types_list() ‑> List[EventDataTypes]
-
:return: the values of EventDataTypes as a list in order of: STRING, NUMERIC, BOOLEAN, BYTE
Expand source code
@staticmethod def types_list() -> List["EventDataTypes"]: """ :return: the values of EventDataTypes as a list in order of: STRING, NUMERIC, BOOLEAN, BYTE """ return [EventDataTypes.STRING, EventDataTypes.NUMERIC, EventDataTypes.BOOLEAN, EventDataTypes.BYTE]
class EventStream (name: str = 'stream', events: List[Event] = <factory>, input_sample_rate: int = 0, samples_per_window: int = 0, samples_per_hop: int = 0, model_version: str = 'n/a', metadata: Dict[str, str] = <factory>, debug: bool = False)
-
stores multiple events.
ALL timestamps in microseconds since epoch UTC unless otherwise stated
Properties
name: string; name of the EventStream. Default "stream"
events: List[Event]; all events in the stream. Default empty list
input_sample_rate: int; audio sample rate. Default 0
samples_per_window: int; samples per window of the events. Default 0
samples_per_hop: int; samples per hop of the events. Default 0
model_version: string; version of the model. Default "0.0"
metadata: Dict[str, str]; metadata as dict of strings. Default empty dict
debug: boolean; if True, outputs additional information at runtime. Default False.
Expand source code
@dataclass_json @dataclass class EventStream: """ stores multiple events. ALL timestamps in microseconds since epoch UTC unless otherwise stated Properties: name: string; name of the EventStream. Default "stream" events: List[Event]; all events in the stream. Default empty list input_sample_rate: int; audio sample rate. Default 0 samples_per_window: int; samples per window of the events. Default 0 samples_per_hop: int; samples per hop of the events. Default 0 model_version: string; version of the model. Default "0.0" metadata: Dict[str, str]; metadata as dict of strings. Default empty dict debug: boolean; if True, outputs additional information at runtime. Default False. """ name: str = "stream" events: List[Event] = field(default_factory=lambda: []) input_sample_rate: int = 0 samples_per_window: int = 0 samples_per_hop: int = 0 model_version: str = "n/a" metadata: Dict[str, str] = field(default_factory=lambda: {}) debug: bool = False def __repr__(self): return ( f"name: {self.name}, " f"events: {[s.__repr__() for s in self.events]}, " f"input_sample_rate: {self.input_sample_rate}, " f"samples_per_window: {self.samples_per_window}, " f"samples_per_hop: {self.samples_per_hop}, " f"model_version: {self.model_version}" ) def __str__(self): return ( f"name: {self.name}, " f"events: {[s.__str__() for s in self.events]}, " f"input_sample_rate: {self.input_sample_rate}, " f"samples_per_window: {self.samples_per_window}, " f"samples_per_hop: {self.samples_per_hop}, " f"model_version: {self.model_version}" ) def as_dict(self) -> dict: """ :return: EventStream as a dictionary """ return { "name": self.name, "events": [e.as_dict() for e in self.events], "input_sample_rate": self.input_sample_rate, "samples_per_window": self.samples_per_window, "samples_per_hop": self.samples_per_hop, "model_version": self.model_version, "metadata": self.metadata, } def has_data(self): """ :return: if there is at least one event """ return len(self.events) > 0 def has_events(self) -> bool: """ :return: True if there are one or more events in the stream """ return len(self.events) > 0 def get_event(self, index: int = 0) -> Optional[Event]: """ :param index: index of event to get. Use negative values to select from the end of the list. Default 0 (first event) :return: Event at the index, or None if the event/index doesn't exist """ if 0 > index: index += len(self.events) if 0 <= index < len(self.events): return self.events[index] return None def get_data_column(self, column_name: str) -> list: """ return a list of data with key column_name from each of the events if column_name doesn't exist, gets a list of valid column_names :param column_name: key of data to get :return: list of data named column_name or the list of all possible column names """ result = [] column_list = set() for r in self.events: val = r.get_item(column_name) if type(val) != list: result.append(val) else: for v in val: column_list.add(v) if len(result) > 0: return result return list(column_list) @staticmethod def from_eventstream( stream: RedvoxPacketM.EventStream, save_mode: FileSystemSaveMode = FileSystemSaveMode.MEM, base_dir: str = "." ) -> "EventStream": """ convert a Redvox Api1000 Packet EventStream into its sdk version :param stream: Redvox Api1000 Packet EventStream to read data from :param save_mode: FileSystemSaveMode that determines how Event data is saved. Default FileSystemSaveMode.MEM (use RAM). Other options are DISK (save to directory) and TEMP (save to temporary directory) :param base_dir: the location of the parquet file that holds the Event data. Not used if save_data is False. Default current directory (".") :return: EventStream (sdk version) """ result = EventStream(stream.name, metadata=dict(stream.metadata)) if "input_sample_rate" in stream.metadata.keys(): result.input_sample_rate = int(stream.metadata.get("input_sample_rate")) if "input_samples_per_window" in stream.metadata.keys(): result.samples_per_window = int(stream.metadata.get("input_samples_per_window")) if "input_samples_per_hop" in stream.metadata.keys(): result.samples_per_hop = int(stream.metadata.get("input_samples_per_hop")) if "model_version" in stream.metadata.keys(): result.model_version = stream.metadata.get("model_version") result.add_events(stream, save_mode=save_mode, base_dir=base_dir) return result def add_events( self, stream: RedvoxPacketM.EventStream, save_mode: FileSystemSaveMode = FileSystemSaveMode.MEM, base_dir: str = ".", ): """ add events from a Redvox Api1000 Packet EventStream with the same name. Does nothing if names do not match :param stream: stream of events to add :param save_mode: FileSystemSaveMode that determines how Event data is saved. Default FileSystemSaveMode.MEM (use RAM). Other options are DISK (save to directory) and TEMP (save to temporary directory) :param base_dir: the location of the parquet file that holds the Event data. Not used if save_data is False. Default current directory (".") """ if self.name == stream.name: timestamps = stream.timestamps.timestamps events = stream.events for i in range(len(timestamps)): self.events.append(Event(timestamps[i], save_mode=save_mode, base_dir=base_dir).read_raw(events[i])) elif self.debug: print(f"Stream name mismatch while adding to EventStream. Expected {self.name}, got {stream.name}.") def sort_events(self, asc: bool = True): """ sort the events in the stream via ascending or descending timestamp order :param asc: if True, data is sorted in ascending order """ self.events.sort(key=lambda e: e.get_timestamp(), reverse=not asc) def num_events(self) -> int: """ :return: number of events in stream """ return len(self.events) def sample_rate_hz(self): """ :return: sample rate of events in the stream in hz """ return np.mean(np.diff([e.get_timestamp() for e in self.events])) def window_sample_rate_hz(self): """ :return: idealized event sample window rate in hz """ return self.input_sample_rate / self.samples_per_window def hop_sample_rate_hz(self): """ :return: idealized event sample hop rate in hz """ return self.input_sample_rate / self.samples_per_hop def create_event_window(self, start: float = -np.inf, end: float = np.inf): """ removes any event in the stream that doesn't match start <= event < end adds empty events to beginning and end of data (as long as the corresponding input values are not infinity) default start is negative infinity, default end is infinity all times in microseconds since epoch UTC :param start: inclusive start time of events to keep :param end: exclusive end time of events to keep """ self.events = [s for s in self.events if start <= s.get_timestamp() < end] if self.num_events() > 0: if start < self.events[0].get_timestamp() and not np.isinf(start): self.events.insert(0, Event(start, self.name)) if not np.isinf(end): self.events.append(Event(end - 1, self.name)) def get_file_names(self) -> List[str]: """ :return: the names of the files which store the event data """ return [e.file_name() for e in self.events] def save_streams(self): """ saves all streams to disk note: use the function set_save_dir() to change where events are saved """ for e in self.events: if e.is_save_to_disk(): e.to_json_file() def set_save_dir(self, new_dir: str): """ change the directory where events are saved to :param new_dir: new directory path """ for e in self.events: e.set_save_dir(new_dir) def set_save_mode(self, new_save_mode: FileSystemSaveMode): """ update the save mode for all EventStream :param new_save_mode: save mode to set """ for e in self.events: e.set_save_mode(new_save_mode) def update_timestamps(self, offset_model: om.OffsetModel, use_model_function: bool = False): """ update the timestamps in the data :param offset_model: model used to update the timestamps :param use_model_function: if True, use the model's slope function to update the timestamps. otherwise uses the best offset (model's intercept value). Default False """ for evnt in self.events: evnt.update_timestamps(offset_model, use_model_function) def original_timestamps(self, offset_model: om.OffsetModel, use_model_function: bool = False): """ undo the update to the timestamps in the data :param offset_model: model used to update the timestamps :param use_model_function: if True, use the model's slope function to update the timestamps. otherwise uses the best offset (model's intercept value). Default False """ for evnt in self.events: evnt.original_timestamps(offset_model, use_model_function) @staticmethod def from_json_dict(json_dict: dict) -> "EventStream": """ :param json_dict: json dict to parse :return: EventStream from json dict """ if "name" in json_dict.keys(): result = EventStream( json_dict["name"], [Event.from_json_dict(e) for e in json_dict["events"]], json_dict["input_sample_rate"], json_dict["samples_per_window"], json_dict["samples_per_hop"], json_dict["model_version"], json_dict["metadata"], ) else: result = EventStream("Empty Stream; no name for identification") return result @staticmethod def from_json_file(file_dir: str, file_name: str) -> "EventStream": """ :param file_dir: full path to containing directory for the file :param file_name: name of file to load data from :return: EventStream from json file """ json_data = json_file_to_dict(os.path.join(file_dir, f"{file_name}")) if "name" in json_data.keys(): result = EventStream( json_data["name"], json_data["events"], json_data["input_sample_rate"], json_data["samples_per_window"], json_data["samples_per_hop"], json_data["model_version"], json_data["metadata"], ) result.set_save_mode(FileSystemSaveMode.DISK) result.set_save_dir(file_dir) else: result = EventStream("Empty Stream; no name for identification") return result def to_json_file(self, file_dir: str = ".", file_name: Optional[str] = None) -> Path: """ saves the EventStream as a json file :param file_dir: the directory to save the file into. default current directory (".") :param file_name: the optional base file name. Do not include a file extension. If None, a default file name is created using this format: eventstream_[eventstream.name].json :return: path to json file """ return io.eventstream_to_json_file(self, file_dir, file_name) def print_errors(self): """ print all errors to screen """ for e in self.events: e.print_errors()
Class variables
var debug : bool
var events : List[Event]
var input_sample_rate : int
var metadata : Dict[str, str]
var model_version : str
var name : str
var samples_per_hop : int
var samples_per_window : int
Static methods
def from_dict(kvs: Union[dict, list, str, int, float, bool, ForwardRef(None)], *, infer_missing=False) ‑> ~A
-
Expand source code
@classmethod def from_dict(cls: Type[A], kvs: Json, *, infer_missing=False) -> A: return _decode_dataclass(cls, kvs, infer_missing)
def from_eventstream(stream: src.redvox_api_m.redvox_api_m_pb2.EventStream, save_mode: FileSystemSaveMode = FileSystemSaveMode.MEM, base_dir: str = '.') ‑> EventStream
-
convert a Redvox Api1000 Packet EventStream into its sdk version
:param stream: Redvox Api1000 Packet EventStream to read data from :param save_mode: FileSystemSaveMode that determines how Event data is saved. Default FileSystemSaveMode.MEM (use RAM). Other options are DISK (save to directory) and TEMP (save to temporary directory) :param base_dir: the location of the parquet file that holds the Event data. Not used if save_data is False. Default current directory (".") :return: EventStream (sdk version)
Expand source code
@staticmethod def from_eventstream( stream: RedvoxPacketM.EventStream, save_mode: FileSystemSaveMode = FileSystemSaveMode.MEM, base_dir: str = "." ) -> "EventStream": """ convert a Redvox Api1000 Packet EventStream into its sdk version :param stream: Redvox Api1000 Packet EventStream to read data from :param save_mode: FileSystemSaveMode that determines how Event data is saved. Default FileSystemSaveMode.MEM (use RAM). Other options are DISK (save to directory) and TEMP (save to temporary directory) :param base_dir: the location of the parquet file that holds the Event data. Not used if save_data is False. Default current directory (".") :return: EventStream (sdk version) """ result = EventStream(stream.name, metadata=dict(stream.metadata)) if "input_sample_rate" in stream.metadata.keys(): result.input_sample_rate = int(stream.metadata.get("input_sample_rate")) if "input_samples_per_window" in stream.metadata.keys(): result.samples_per_window = int(stream.metadata.get("input_samples_per_window")) if "input_samples_per_hop" in stream.metadata.keys(): result.samples_per_hop = int(stream.metadata.get("input_samples_per_hop")) if "model_version" in stream.metadata.keys(): result.model_version = stream.metadata.get("model_version") result.add_events(stream, save_mode=save_mode, base_dir=base_dir) return result
def from_json(s: Union[str, bytes, bytearray], *, parse_float=None, parse_int=None, parse_constant=None, infer_missing=False, **kw) ‑> ~A
-
Expand source code
@classmethod def from_json(cls: Type[A], s: JsonData, *, parse_float=None, parse_int=None, parse_constant=None, infer_missing=False, **kw) -> A: kvs = json.loads(s, parse_float=parse_float, parse_int=parse_int, parse_constant=parse_constant, **kw) return cls.from_dict(kvs, infer_missing=infer_missing)
def from_json_dict(json_dict: dict) ‑> EventStream
-
:param json_dict: json dict to parse :return: EventStream from json dict
Expand source code
@staticmethod def from_json_dict(json_dict: dict) -> "EventStream": """ :param json_dict: json dict to parse :return: EventStream from json dict """ if "name" in json_dict.keys(): result = EventStream( json_dict["name"], [Event.from_json_dict(e) for e in json_dict["events"]], json_dict["input_sample_rate"], json_dict["samples_per_window"], json_dict["samples_per_hop"], json_dict["model_version"], json_dict["metadata"], ) else: result = EventStream("Empty Stream; no name for identification") return result
def from_json_file(file_dir: str, file_name: str) ‑> EventStream
-
:param file_dir: full path to containing directory for the file :param file_name: name of file to load data from :return: EventStream from json file
Expand source code
@staticmethod def from_json_file(file_dir: str, file_name: str) -> "EventStream": """ :param file_dir: full path to containing directory for the file :param file_name: name of file to load data from :return: EventStream from json file """ json_data = json_file_to_dict(os.path.join(file_dir, f"{file_name}")) if "name" in json_data.keys(): result = EventStream( json_data["name"], json_data["events"], json_data["input_sample_rate"], json_data["samples_per_window"], json_data["samples_per_hop"], json_data["model_version"], json_data["metadata"], ) result.set_save_mode(FileSystemSaveMode.DISK) result.set_save_dir(file_dir) else: result = EventStream("Empty Stream; no name for identification") return result
def schema(*, infer_missing: bool = False, only=None, exclude=(), many: bool = False, context=None, load_only=(), dump_only=(), partial: bool = False, unknown=None) ‑> dataclasses_json.mm.SchemaF[~A]
-
Expand source code
@classmethod def schema(cls: Type[A], *, infer_missing: bool = False, only=None, exclude=(), many: bool = False, context=None, load_only=(), dump_only=(), partial: bool = False, unknown=None) -> SchemaType: Schema = build_schema(cls, DataClassJsonMixin, infer_missing, partial) if unknown is None: undefined_parameter_action = _undefined_parameter_action_safe(cls) if undefined_parameter_action is not None: # We can just make use of the same-named mm keywords unknown = undefined_parameter_action.name.lower() return Schema(only=only, exclude=exclude, many=many, context=context, load_only=load_only, dump_only=dump_only, partial=partial, unknown=unknown)
Methods
def add_events(self, stream: src.redvox_api_m.redvox_api_m_pb2.EventStream, save_mode: FileSystemSaveMode = FileSystemSaveMode.MEM, base_dir: str = '.')
-
add events from a Redvox Api1000 Packet EventStream with the same name. Does nothing if names do not match
:param stream: stream of events to add :param save_mode: FileSystemSaveMode that determines how Event data is saved. Default FileSystemSaveMode.MEM (use RAM). Other options are DISK (save to directory) and TEMP (save to temporary directory) :param base_dir: the location of the parquet file that holds the Event data. Not used if save_data is False. Default current directory (".")
Expand source code
def add_events( self, stream: RedvoxPacketM.EventStream, save_mode: FileSystemSaveMode = FileSystemSaveMode.MEM, base_dir: str = ".", ): """ add events from a Redvox Api1000 Packet EventStream with the same name. Does nothing if names do not match :param stream: stream of events to add :param save_mode: FileSystemSaveMode that determines how Event data is saved. Default FileSystemSaveMode.MEM (use RAM). Other options are DISK (save to directory) and TEMP (save to temporary directory) :param base_dir: the location of the parquet file that holds the Event data. Not used if save_data is False. Default current directory (".") """ if self.name == stream.name: timestamps = stream.timestamps.timestamps events = stream.events for i in range(len(timestamps)): self.events.append(Event(timestamps[i], save_mode=save_mode, base_dir=base_dir).read_raw(events[i])) elif self.debug: print(f"Stream name mismatch while adding to EventStream. Expected {self.name}, got {stream.name}.")
def as_dict(self) ‑> dict
-
:return: EventStream as a dictionary
Expand source code
def as_dict(self) -> dict: """ :return: EventStream as a dictionary """ return { "name": self.name, "events": [e.as_dict() for e in self.events], "input_sample_rate": self.input_sample_rate, "samples_per_window": self.samples_per_window, "samples_per_hop": self.samples_per_hop, "model_version": self.model_version, "metadata": self.metadata, }
def create_event_window(self, start: float = -inf, end: float = inf)
-
removes any event in the stream that doesn't match start <= event < end adds empty events to beginning and end of data (as long as the corresponding input values are not infinity) default start is negative infinity, default end is infinity all times in microseconds since epoch UTC
:param start: inclusive start time of events to keep :param end: exclusive end time of events to keep
Expand source code
def create_event_window(self, start: float = -np.inf, end: float = np.inf): """ removes any event in the stream that doesn't match start <= event < end adds empty events to beginning and end of data (as long as the corresponding input values are not infinity) default start is negative infinity, default end is infinity all times in microseconds since epoch UTC :param start: inclusive start time of events to keep :param end: exclusive end time of events to keep """ self.events = [s for s in self.events if start <= s.get_timestamp() < end] if self.num_events() > 0: if start < self.events[0].get_timestamp() and not np.isinf(start): self.events.insert(0, Event(start, self.name)) if not np.isinf(end): self.events.append(Event(end - 1, self.name))
def get_data_column(self, column_name: str) ‑> list
-
return a list of data with key column_name from each of the events if column_name doesn't exist, gets a list of valid column_names
:param column_name: key of data to get :return: list of data named column_name or the list of all possible column names
Expand source code
def get_data_column(self, column_name: str) -> list: """ return a list of data with key column_name from each of the events if column_name doesn't exist, gets a list of valid column_names :param column_name: key of data to get :return: list of data named column_name or the list of all possible column names """ result = [] column_list = set() for r in self.events: val = r.get_item(column_name) if type(val) != list: result.append(val) else: for v in val: column_list.add(v) if len(result) > 0: return result return list(column_list)
def get_event(self, index: int = 0) ‑> Optional[Event]
-
:param index: index of event to get. Use negative values to select from the end of the list. Default 0 (first event) :return: Event at the index, or None if the event/index doesn't exist
Expand source code
def get_event(self, index: int = 0) -> Optional[Event]: """ :param index: index of event to get. Use negative values to select from the end of the list. Default 0 (first event) :return: Event at the index, or None if the event/index doesn't exist """ if 0 > index: index += len(self.events) if 0 <= index < len(self.events): return self.events[index] return None
def get_file_names(self) ‑> List[str]
-
:return: the names of the files which store the event data
Expand source code
def get_file_names(self) -> List[str]: """ :return: the names of the files which store the event data """ return [e.file_name() for e in self.events]
def has_data(self)
-
:return: if there is at least one event
Expand source code
def has_data(self): """ :return: if there is at least one event """ return len(self.events) > 0
def has_events(self) ‑> bool
-
:return: True if there are one or more events in the stream
Expand source code
def has_events(self) -> bool: """ :return: True if there are one or more events in the stream """ return len(self.events) > 0
def hop_sample_rate_hz(self)
-
:return: idealized event sample hop rate in hz
Expand source code
def hop_sample_rate_hz(self): """ :return: idealized event sample hop rate in hz """ return self.input_sample_rate / self.samples_per_hop
def num_events(self) ‑> int
-
:return: number of events in stream
Expand source code
def num_events(self) -> int: """ :return: number of events in stream """ return len(self.events)
def original_timestamps(self, offset_model: OffsetModel, use_model_function: bool = False)
-
undo the update to the timestamps in the data
:param offset_model: model used to update the timestamps :param use_model_function: if True, use the model's slope function to update the timestamps. otherwise uses the best offset (model's intercept value). Default False
Expand source code
def original_timestamps(self, offset_model: om.OffsetModel, use_model_function: bool = False): """ undo the update to the timestamps in the data :param offset_model: model used to update the timestamps :param use_model_function: if True, use the model's slope function to update the timestamps. otherwise uses the best offset (model's intercept value). Default False """ for evnt in self.events: evnt.original_timestamps(offset_model, use_model_function)
def print_errors(self)
-
print all errors to screen
Expand source code
def print_errors(self): """ print all errors to screen """ for e in self.events: e.print_errors()
def sample_rate_hz(self)
-
:return: sample rate of events in the stream in hz
Expand source code
def sample_rate_hz(self): """ :return: sample rate of events in the stream in hz """ return np.mean(np.diff([e.get_timestamp() for e in self.events]))
def save_streams(self)
-
saves all streams to disk
note: use the function set_save_dir() to change where events are saved
Expand source code
def save_streams(self): """ saves all streams to disk note: use the function set_save_dir() to change where events are saved """ for e in self.events: if e.is_save_to_disk(): e.to_json_file()
def set_save_dir(self, new_dir: str)
-
change the directory where events are saved to
:param new_dir: new directory path
Expand source code
def set_save_dir(self, new_dir: str): """ change the directory where events are saved to :param new_dir: new directory path """ for e in self.events: e.set_save_dir(new_dir)
def set_save_mode(self, new_save_mode: FileSystemSaveMode)
-
update the save mode for all EventStream
:param new_save_mode: save mode to set
Expand source code
def set_save_mode(self, new_save_mode: FileSystemSaveMode): """ update the save mode for all EventStream :param new_save_mode: save mode to set """ for e in self.events: e.set_save_mode(new_save_mode)
def sort_events(self, asc: bool = True)
-
sort the events in the stream via ascending or descending timestamp order
:param asc: if True, data is sorted in ascending order
Expand source code
def sort_events(self, asc: bool = True): """ sort the events in the stream via ascending or descending timestamp order :param asc: if True, data is sorted in ascending order """ self.events.sort(key=lambda e: e.get_timestamp(), reverse=not asc)
def to_dict(self, encode_json=False) ‑> Dict[str, Union[dict, list, str, int, float, bool, ForwardRef(None)]]
-
Expand source code
def to_dict(self, encode_json=False) -> Dict[str, Json]: return _asdict(self, encode_json=encode_json)
def to_json(self, *, skipkeys: bool = False, ensure_ascii: bool = True, check_circular: bool = True, allow_nan: bool = True, indent: Union[int, str, ForwardRef(None)] = None, separators: Tuple[str, str] = None, default: Callable = None, sort_keys: bool = False, **kw) ‑> str
-
Expand source code
def to_json(self, *, skipkeys: bool = False, ensure_ascii: bool = True, check_circular: bool = True, allow_nan: bool = True, indent: Optional[Union[int, str]] = None, separators: Tuple[str, str] = None, default: Callable = None, sort_keys: bool = False, **kw) -> str: return json.dumps(self.to_dict(encode_json=False), cls=_ExtendedEncoder, skipkeys=skipkeys, ensure_ascii=ensure_ascii, check_circular=check_circular, allow_nan=allow_nan, indent=indent, separators=separators, default=default, sort_keys=sort_keys, **kw)
def to_json_file(self, file_dir: str = '.', file_name: Optional[str] = None) ‑> pathlib.Path
-
saves the EventStream as a json file
:param file_dir: the directory to save the file into. default current directory (".") :param file_name: the optional base file name. Do not include a file extension. If None, a default file name is created using this format: eventstream_[eventstream.name].json :return: path to json file
Expand source code
def to_json_file(self, file_dir: str = ".", file_name: Optional[str] = None) -> Path: """ saves the EventStream as a json file :param file_dir: the directory to save the file into. default current directory (".") :param file_name: the optional base file name. Do not include a file extension. If None, a default file name is created using this format: eventstream_[eventstream.name].json :return: path to json file """ return io.eventstream_to_json_file(self, file_dir, file_name)
def update_timestamps(self, offset_model: OffsetModel, use_model_function: bool = False)
-
update the timestamps in the data
:param offset_model: model used to update the timestamps :param use_model_function: if True, use the model's slope function to update the timestamps. otherwise uses the best offset (model's intercept value). Default False
Expand source code
def update_timestamps(self, offset_model: om.OffsetModel, use_model_function: bool = False): """ update the timestamps in the data :param offset_model: model used to update the timestamps :param use_model_function: if True, use the model's slope function to update the timestamps. otherwise uses the best offset (model's intercept value). Default False """ for evnt in self.events: evnt.update_timestamps(offset_model, use_model_function)
def window_sample_rate_hz(self)
-
:return: idealized event sample window rate in hz
Expand source code
def window_sample_rate_hz(self): """ :return: idealized event sample window rate in hz """ return self.input_sample_rate / self.samples_per_window
class EventStreams (streams: List[EventStream] = <factory>, ml_data: Optional[ExtractedMl] = None, debug: bool = False)
-
stores multiple event streams per station.
ALL timestamps in microseconds since epoch UTC unless otherwise stated
Properties
streams: List[EventStream]; list of all EventStream. Default empty list
ml_data: Optional ExtractedMl from the packets. Default None
debug: bool; if True, output additional information during runtime. Default False
Expand source code
@dataclass_json @dataclass class EventStreams: """ stores multiple event streams per station. ALL timestamps in microseconds since epoch UTC unless otherwise stated Properties: streams: List[EventStream]; list of all EventStream. Default empty list ml_data: Optional ExtractedMl from the packets. Default None debug: bool; if True, output additional information during runtime. Default False """ streams: List[EventStream] = field(default_factory=lambda: []) ml_data: Optional[ml.ExtractedMl] = None debug: bool = False def __repr__(self): return f"streams: {[s.__repr__() for s in self.streams]}, ml: {self.ml_data}, debug: {self.debug}" def __str__(self): return f"streams: {[s.__str__() for s in self.streams]}, ml: {self.ml_data}" def as_dict(self) -> dict: """ :return: EventStreams as dict """ return { "streams": [s.as_dict() for s in self.streams], "ml_data": self.ml_data.to_dict() if self.ml_data else None, } def read_from_packet(self, packet: RedvoxPacketM): """ read the eventstream payload from a single Redvox Api1000 packet :param packet: packet to read data from """ for st in packet.event_streams: if st.name == ml.ML_EVENT_STREAM_NAME: if self.ml_data: self.ml_data.windows.extend(ml.extract_ml_windows(_find_ml_event_stream(packet))) else: self.ml_data = ml.extract_ml_from_packet(WrappedRedvoxPacketM(packet)) else: if st.name in self.get_stream_names() and self.get_stream(st.name).has_data(): self.get_stream(st.name).add_events(st) else: self.remove_stream(st.name) self.streams.append(EventStream.from_eventstream(st)) def read_from_packets_list(self, packets: List[RedvoxPacketM]): """ read the eventstream payload from multiple Redvox Api1000 packets :param packets: packets to read data from """ for p in packets: if type(p) == RedvoxPacketM: self.read_from_packet(p) def append(self, other_stream: EventStream): """ append another EventStream to an existing EventStream or add to the list of EventStream :param other_stream: other EventStream to add """ if other_stream.name in self.get_stream_names(): self.get_stream(other_stream.name).add_events(other_stream) else: self.streams.append(other_stream) def append_ml(self, other_ml: ml.ExtractedMl): """ append the windows from another extracted machine learning object or set the existing ML object if its empty. :param other_ml: other ExtractedMl to add """ if self.ml_data: self.ml_data.windows.extend(other_ml.windows) else: self.ml_data = other_ml def append_streams(self, other_streams: "EventStreams"): """ append another EventStreams object to an existing EventStreams object :param other_streams: EventStreams to add """ for s in other_streams.streams: self.append(s) def remove_stream(self, stream_name: str): """ remove any stream with the same stream_name :param stream_name: name of stream to remove """ self.streams = [s for s in self.streams if s.name != stream_name] def get_stream(self, stream_name: str) -> Optional[EventStream]: """ :param stream_name: name of event stream to get :return: the EventStream that has the name specified or None if it doesn't exist """ for s in self.streams: if s.name == stream_name: return s if self.debug: print(f"{stream_name} does not exist in streams. Use one of {[self.get_stream_names()]}") return None def get_stream_names(self) -> List[str]: """ :return: names of all streams """ return [s.name for s in self.streams] def create_event_window(self, start: float = -np.inf, end: float = np.inf): """ removes any event in the streams and ML that doesn't match start <= event < end default start is negative infinity, default end is infinity all times in microseconds since epoch UTC :param start: inclusive start time of events to keep :param end: exclusive end time of events to keep """ for s in self.streams: s.create_event_window(start, end) if self.ml_data: self.ml_data.windows = [s for s in self.ml_data.windows if start <= s.timestamp < end] def set_save_dir(self, new_dir: str): """ change the directory where events are saved to :param new_dir: new directory path """ for s in self.streams: s.set_save_dir(new_dir) def set_save_mode(self, new_save_mode: FileSystemSaveMode): """ update the save mode for all EventStream :param new_save_mode: save mode to set """ for s in self.streams: s.set_save_mode(new_save_mode) def update_timestamps(self, offset_model: om.OffsetModel, use_model_function: bool = False): """ update the timestamps in the data :param offset_model: model used to update the timestamps :param use_model_function: if True, use the model's slope function to update the timestamps. otherwise uses the best offset (model's intercept value). Default False """ for evnt in self.streams: evnt.update_timestamps(offset_model, use_model_function) if self.ml_data: for w in self.ml_data.windows: w.timestamp = offset_model.update_time(w.timestamp, use_model_function) def original_timestamps(self, offset_model: om.OffsetModel, use_model_function: bool = False): """ undo the update to the timestamps in the data :param offset_model: model used to update the timestamps :param use_model_function: if True, use the model's slope function to update the timestamps. otherwise uses the best offset (model's intercept value). Default False """ for evnt in self.streams: evnt.original_timestamps(offset_model, use_model_function) if self.ml_data: for w in self.ml_data.windows: w.timestamp = offset_model.get_original_time(w.timestamp, use_model_function) @staticmethod def from_dict(in_dict: dict) -> "EventStreams": """ :param in_dict: dictionary representing an EventStreams object :return: the EventStreams object from the dictionary """ result = EventStreams() if "streams" in in_dict.keys(): result.streams = [EventStream.from_json_dict(s) for s in in_dict["streams"]] if "ml_data" in in_dict.keys(): result.ml_data = ml.ExtractedMl.from_dict(in_dict["ml_data"]) return result @staticmethod def from_json_file(file_dir: str, file_name: str) -> "EventStreams": """ :param file_dir: full path to containing directory for the file :param file_name: name of file to load data from :return: EventStreams from json file """ return EventStreams.from_dict(json_file_to_dict(os.path.join(file_dir, f"{file_name}"))) def to_json_file(self, file_dir: str = ".", file_name: Optional[str] = None) -> Path: """ saves the EventStream as a json file :param file_dir: the directory to save the file into. default current directory (".") :param file_name: the optional base file name. Do not include a file extension. If None, a default file name is created using this format: eventstreams.json :return: path to json file """ return io.eventstreams_to_json_file(self, file_dir, file_name)
Class variables
var debug : bool
var ml_data : Optional[ExtractedMl]
var streams : List[EventStream]
Static methods
def from_dict(kvs: Union[dict, list, str, int, float, bool, ForwardRef(None)], *, infer_missing=False) ‑> ~A
-
Expand source code
@classmethod def from_dict(cls: Type[A], kvs: Json, *, infer_missing=False) -> A: return _decode_dataclass(cls, kvs, infer_missing)
def from_json(s: Union[str, bytes, bytearray], *, parse_float=None, parse_int=None, parse_constant=None, infer_missing=False, **kw) ‑> ~A
-
Expand source code
@classmethod def from_json(cls: Type[A], s: JsonData, *, parse_float=None, parse_int=None, parse_constant=None, infer_missing=False, **kw) -> A: kvs = json.loads(s, parse_float=parse_float, parse_int=parse_int, parse_constant=parse_constant, **kw) return cls.from_dict(kvs, infer_missing=infer_missing)
def from_json_file(file_dir: str, file_name: str) ‑> EventStreams
-
:param file_dir: full path to containing directory for the file :param file_name: name of file to load data from :return: EventStreams from json file
Expand source code
@staticmethod def from_json_file(file_dir: str, file_name: str) -> "EventStreams": """ :param file_dir: full path to containing directory for the file :param file_name: name of file to load data from :return: EventStreams from json file """ return EventStreams.from_dict(json_file_to_dict(os.path.join(file_dir, f"{file_name}")))
def schema(*, infer_missing: bool = False, only=None, exclude=(), many: bool = False, context=None, load_only=(), dump_only=(), partial: bool = False, unknown=None) ‑> dataclasses_json.mm.SchemaF[~A]
-
Expand source code
@classmethod def schema(cls: Type[A], *, infer_missing: bool = False, only=None, exclude=(), many: bool = False, context=None, load_only=(), dump_only=(), partial: bool = False, unknown=None) -> SchemaType: Schema = build_schema(cls, DataClassJsonMixin, infer_missing, partial) if unknown is None: undefined_parameter_action = _undefined_parameter_action_safe(cls) if undefined_parameter_action is not None: # We can just make use of the same-named mm keywords unknown = undefined_parameter_action.name.lower() return Schema(only=only, exclude=exclude, many=many, context=context, load_only=load_only, dump_only=dump_only, partial=partial, unknown=unknown)
Methods
def append(self, other_stream: EventStream)
-
append another EventStream to an existing EventStream or add to the list of EventStream
:param other_stream: other EventStream to add
Expand source code
def append(self, other_stream: EventStream): """ append another EventStream to an existing EventStream or add to the list of EventStream :param other_stream: other EventStream to add """ if other_stream.name in self.get_stream_names(): self.get_stream(other_stream.name).add_events(other_stream) else: self.streams.append(other_stream)
def append_ml(self, other_ml: ExtractedMl)
-
append the windows from another extracted machine learning object or set the existing ML object if its empty.
:param other_ml: other ExtractedMl to add
Expand source code
def append_ml(self, other_ml: ml.ExtractedMl): """ append the windows from another extracted machine learning object or set the existing ML object if its empty. :param other_ml: other ExtractedMl to add """ if self.ml_data: self.ml_data.windows.extend(other_ml.windows) else: self.ml_data = other_ml
def append_streams(self, other_streams: EventStreams)
-
append another EventStreams object to an existing EventStreams object
:param other_streams: EventStreams to add
Expand source code
def append_streams(self, other_streams: "EventStreams"): """ append another EventStreams object to an existing EventStreams object :param other_streams: EventStreams to add """ for s in other_streams.streams: self.append(s)
def as_dict(self) ‑> dict
-
:return: EventStreams as dict
Expand source code
def as_dict(self) -> dict: """ :return: EventStreams as dict """ return { "streams": [s.as_dict() for s in self.streams], "ml_data": self.ml_data.to_dict() if self.ml_data else None, }
def create_event_window(self, start: float = -inf, end: float = inf)
-
removes any event in the streams and ML that doesn't match start <= event < end default start is negative infinity, default end is infinity all times in microseconds since epoch UTC
:param start: inclusive start time of events to keep :param end: exclusive end time of events to keep
Expand source code
def create_event_window(self, start: float = -np.inf, end: float = np.inf): """ removes any event in the streams and ML that doesn't match start <= event < end default start is negative infinity, default end is infinity all times in microseconds since epoch UTC :param start: inclusive start time of events to keep :param end: exclusive end time of events to keep """ for s in self.streams: s.create_event_window(start, end) if self.ml_data: self.ml_data.windows = [s for s in self.ml_data.windows if start <= s.timestamp < end]
def get_stream(self, stream_name: str) ‑> Optional[EventStream]
-
:param stream_name: name of event stream to get :return: the EventStream that has the name specified or None if it doesn't exist
Expand source code
def get_stream(self, stream_name: str) -> Optional[EventStream]: """ :param stream_name: name of event stream to get :return: the EventStream that has the name specified or None if it doesn't exist """ for s in self.streams: if s.name == stream_name: return s if self.debug: print(f"{stream_name} does not exist in streams. Use one of {[self.get_stream_names()]}") return None
def get_stream_names(self) ‑> List[str]
-
:return: names of all streams
Expand source code
def get_stream_names(self) -> List[str]: """ :return: names of all streams """ return [s.name for s in self.streams]
def original_timestamps(self, offset_model: OffsetModel, use_model_function: bool = False)
-
undo the update to the timestamps in the data
:param offset_model: model used to update the timestamps :param use_model_function: if True, use the model's slope function to update the timestamps. otherwise uses the best offset (model's intercept value). Default False
Expand source code
def original_timestamps(self, offset_model: om.OffsetModel, use_model_function: bool = False): """ undo the update to the timestamps in the data :param offset_model: model used to update the timestamps :param use_model_function: if True, use the model's slope function to update the timestamps. otherwise uses the best offset (model's intercept value). Default False """ for evnt in self.streams: evnt.original_timestamps(offset_model, use_model_function) if self.ml_data: for w in self.ml_data.windows: w.timestamp = offset_model.get_original_time(w.timestamp, use_model_function)
def read_from_packet(self, packet: src.redvox_api_m.redvox_api_m_pb2.RedvoxPacketM)
-
read the eventstream payload from a single Redvox Api1000 packet
:param packet: packet to read data from
Expand source code
def read_from_packet(self, packet: RedvoxPacketM): """ read the eventstream payload from a single Redvox Api1000 packet :param packet: packet to read data from """ for st in packet.event_streams: if st.name == ml.ML_EVENT_STREAM_NAME: if self.ml_data: self.ml_data.windows.extend(ml.extract_ml_windows(_find_ml_event_stream(packet))) else: self.ml_data = ml.extract_ml_from_packet(WrappedRedvoxPacketM(packet)) else: if st.name in self.get_stream_names() and self.get_stream(st.name).has_data(): self.get_stream(st.name).add_events(st) else: self.remove_stream(st.name) self.streams.append(EventStream.from_eventstream(st))
def read_from_packets_list(self, packets: List[src.redvox_api_m.redvox_api_m_pb2.RedvoxPacketM])
-
read the eventstream payload from multiple Redvox Api1000 packets
:param packets: packets to read data from
Expand source code
def read_from_packets_list(self, packets: List[RedvoxPacketM]): """ read the eventstream payload from multiple Redvox Api1000 packets :param packets: packets to read data from """ for p in packets: if type(p) == RedvoxPacketM: self.read_from_packet(p)
def remove_stream(self, stream_name: str)
-
remove any stream with the same stream_name
:param stream_name: name of stream to remove
Expand source code
def remove_stream(self, stream_name: str): """ remove any stream with the same stream_name :param stream_name: name of stream to remove """ self.streams = [s for s in self.streams if s.name != stream_name]
def set_save_dir(self, new_dir: str)
-
change the directory where events are saved to
:param new_dir: new directory path
Expand source code
def set_save_dir(self, new_dir: str): """ change the directory where events are saved to :param new_dir: new directory path """ for s in self.streams: s.set_save_dir(new_dir)
def set_save_mode(self, new_save_mode: FileSystemSaveMode)
-
update the save mode for all EventStream
:param new_save_mode: save mode to set
Expand source code
def set_save_mode(self, new_save_mode: FileSystemSaveMode): """ update the save mode for all EventStream :param new_save_mode: save mode to set """ for s in self.streams: s.set_save_mode(new_save_mode)
def to_dict(self, encode_json=False) ‑> Dict[str, Union[dict, list, str, int, float, bool, ForwardRef(None)]]
-
Expand source code
def to_dict(self, encode_json=False) -> Dict[str, Json]: return _asdict(self, encode_json=encode_json)
def to_json(self, *, skipkeys: bool = False, ensure_ascii: bool = True, check_circular: bool = True, allow_nan: bool = True, indent: Union[int, str, ForwardRef(None)] = None, separators: Tuple[str, str] = None, default: Callable = None, sort_keys: bool = False, **kw) ‑> str
-
Expand source code
def to_json(self, *, skipkeys: bool = False, ensure_ascii: bool = True, check_circular: bool = True, allow_nan: bool = True, indent: Optional[Union[int, str]] = None, separators: Tuple[str, str] = None, default: Callable = None, sort_keys: bool = False, **kw) -> str: return json.dumps(self.to_dict(encode_json=False), cls=_ExtendedEncoder, skipkeys=skipkeys, ensure_ascii=ensure_ascii, check_circular=check_circular, allow_nan=allow_nan, indent=indent, separators=separators, default=default, sort_keys=sort_keys, **kw)
def to_json_file(self, file_dir: str = '.', file_name: Optional[str] = None) ‑> pathlib.Path
-
saves the EventStream as a json file
:param file_dir: the directory to save the file into. default current directory (".") :param file_name: the optional base file name. Do not include a file extension. If None, a default file name is created using this format: eventstreams.json :return: path to json file
Expand source code
def to_json_file(self, file_dir: str = ".", file_name: Optional[str] = None) -> Path: """ saves the EventStream as a json file :param file_dir: the directory to save the file into. default current directory (".") :param file_name: the optional base file name. Do not include a file extension. If None, a default file name is created using this format: eventstreams.json :return: path to json file """ return io.eventstreams_to_json_file(self, file_dir, file_name)
def update_timestamps(self, offset_model: OffsetModel, use_model_function: bool = False)
-
update the timestamps in the data
:param offset_model: model used to update the timestamps :param use_model_function: if True, use the model's slope function to update the timestamps. otherwise uses the best offset (model's intercept value). Default False
Expand source code
def update_timestamps(self, offset_model: om.OffsetModel, use_model_function: bool = False): """ update the timestamps in the data :param offset_model: model used to update the timestamps :param use_model_function: if True, use the model's slope function to update the timestamps. otherwise uses the best offset (model's intercept value). Default False """ for evnt in self.streams: evnt.update_timestamps(offset_model, use_model_function) if self.ml_data: for w in self.ml_data.windows: w.timestamp = offset_model.update_time(w.timestamp, use_model_function)
class RedvoxPacketM (*args, **kwargs)
-
A ProtocolMessage
Ancestors
- google._upb._message.Message
- google.protobuf.message.Message
Class variables
var DESCRIPTOR
var DoubleSamplePayload
-
A ProtocolMessage
var EventStream
-
A ProtocolMessage
var MetadataEntry
-
A ProtocolMessage
var SamplePayload
-
A ProtocolMessage
var Sensors
-
A ProtocolMessage
var StationInformation
-
A ProtocolMessage
var SummaryStatistics
-
A ProtocolMessage
var TimingInformation
-
A ProtocolMessage
var TimingPayload
-
A ProtocolMessage
var Unit