Module redvox.common.data_window
This module creates specific time-bounded segments of data for users; it combines the base data files into a single composite object based on the user parameters
"""
This module creates specific time-bounded segments of data for users;
it combines the base data files into a single composite object based on the user parameters
"""
from pathlib import Path
from typing import Optional, Set, List, Dict, Iterable
from datetime import timedelta
from dataclasses import dataclass
from dataclasses_json import dataclass_json
import shutil
import os
import inspect
import pprint
import multiprocessing
import multiprocessing.pool
import numpy as np
import pyarrow as pa
import redvox
from redvox.common import run_me, io, data_window_io as dw_io, date_time_utils as dtu, gap_and_pad_utils as gpu
from redvox.common.data_window_configuration import DataWindowConfigFile
from redvox.common.parallel_utils import maybe_parallel_map
from redvox.common.station import Station, STATION_ID_LENGTH
from redvox.common.sensor_data import SensorType, SensorData
from redvox.common.api_reader_dw import ApiReaderDw
from redvox.common.errors import RedVoxExceptions
DEFAULT_START_BUFFER_TD: timedelta = timedelta(minutes=2.0) # default padding to start time of data
DEFAULT_END_BUFFER_TD: timedelta = timedelta(minutes=2.0) # default padding to end time of data
# minimum default duration in seconds between data files that indicates a gap in the data
DATA_DROP_DURATION_S: float = 0.2
@dataclass_json
@dataclass
class EventOrigin:
"""
The origin event's latitude, longitude, and altitude with their standard deviations, the device used to
measure the location data, and the radius of the event
Properties:
provider: str, source of the location data (i.e. "GPS" or "NETWORK"), default "UNKNOWN"
latitude: float, best estimate of latitude in degrees, default np.nan
latitude_std: float, standard deviation of best estimate of latitude, default np.nan
longitude: float, best estimate of longitude in degrees, default np.nan
longitude_std: float, standard deviation of best estimate of longitude, default np.nan
altitude: float, best estimate of altitude in meters, default np.nan
altitude_std: float, standard deviation of best estimate of altitude, default np.nan
event_radius_m: float, radius of event in meters, default 0.0
"""
provider: str = "UNKNOWN"
latitude: float = np.nan
latitude_std: float = np.nan
longitude: float = np.nan
longitude_std: float = np.nan
altitude: float = np.nan
altitude_std: float = np.nan
event_radius_m: float = 0.0
class DataWindowConfig:
"""
Configuration of DataWindow properties
Properties:
input_dir: str, the directory that contains all the data. REQUIRED
structured_layout: bool, if True, the input_dir contains specially named and organized
directories of data. Default True
start_datetime: optional datetime, start datetime of the window.
If None, uses the first timestamp of the filtered data. Default None
end_datetime: optional datetime, non-inclusive end datetime of the window.
If None, uses the last timestamp of the filtered data + 1. Default None
start_buffer_td: timedelta, the amount of time to include before the start_datetime when filtering data.
Negative values are converted to 0. Default DEFAULT_START_BUFFER_TD (2 minutes)
end_buffer_td: timedelta, the amount of time to include after the end_datetime when filtering data.
Negative values are converted to 0. Default DEFAULT_END_BUFFER_TD (2 minutes)
drop_time_s: float, the minimum amount of seconds between data files that would indicate a gap.
Negative values are converted to default value. Default DATA_DROP_DURATION_S (0.2 seconds)
station_ids: optional set of strings, representing the station ids to filter on.
If empty or None, gets all ids found in the input directory. You may pass in any iterable,
as long as it can be turned into a set. Default None
extensions: optional set of strings, representing file extensions to filter on.
If None, gets as much data as it can in the input directory. Default None
api_versions: optional set of ApiVersions, representing api versions to filter on.
If None, gets as much data as it can in the input directory. Default None
apply_correction: bool, if True, update the timestamps in the data based on best station offset. Default True
copy_edge_points: enumeration of DataPointCreationMode. Determines how new points are created.
Valid values are NAN, COPY, and INTERPOLATE. Default COPY
use_model_correction: bool, if True, use the offset model's correction functions, otherwise use the best
offset. Default True
"""
def __init__(
self,
input_dir: str,
structured_layout: bool = True,
start_datetime: Optional[dtu.datetime] = None,
end_datetime: Optional[dtu.datetime] = None,
start_buffer_td: timedelta = DEFAULT_START_BUFFER_TD,
end_buffer_td: timedelta = DEFAULT_END_BUFFER_TD,
drop_time_s: float = DATA_DROP_DURATION_S,
station_ids: Optional[Iterable[str]] = None,
extensions: Optional[Set[str]] = None,
api_versions: Optional[Set[io.ApiVersion]] = None,
apply_correction: bool = True,
use_model_correction: bool = True,
copy_edge_points: gpu.DataPointCreationMode = gpu.DataPointCreationMode.COPY,
):
self.input_dir: str = input_dir
self.structured_layout: bool = structured_layout
self.start_datetime: Optional[dtu.datetime] = start_datetime
self.end_datetime: Optional[dtu.datetime] = end_datetime
self.start_buffer_td: timedelta = (
start_buffer_td if start_buffer_td > timedelta(seconds=0) else timedelta(seconds=0)
)
self.end_buffer_td: timedelta = end_buffer_td if end_buffer_td > timedelta(seconds=0) else timedelta(seconds=0)
self.drop_time_s: float = drop_time_s if drop_time_s > 0 else DATA_DROP_DURATION_S
self.station_ids: Optional[Set[str]] = set(station_ids) if station_ids else None
self.extensions: Optional[Set[str]] = extensions
self.api_versions: Optional[Set[io.ApiVersion]] = api_versions
self.apply_correction: bool = apply_correction
self.use_model_correction = use_model_correction
self.copy_edge_points = copy_edge_points
def __repr__(self):
return (
f"input_dir: {self.input_dir}, "
f"structured_layout: {self.structured_layout}, "
f"start_datetime: {self.start_datetime.__repr__()}, "
f"end_datetime: {self.end_datetime.__repr__()}, "
f"start_buffer_td: {self.start_buffer_td.__repr__()}, "
f"end_buffer_td: {self.end_buffer_td.__repr__()}, "
f"drop_time_s: {self.drop_time_s}, "
f"station_ids: {list(self.station_ids) if self.station_ids else []}, "
f"extensions: {list(self.extensions) if self.extensions else []}, "
f"api_versions: {[a_v.value for a_v in self.api_versions] if self.api_versions else []}, "
f"apply_correction: {self.apply_correction}, "
f"use_model_correction: {self.use_model_correction}, "
f"copy_edge_points: {self.copy_edge_points.value}"
)
def __str__(self):
return (
f"input_dir: {self.input_dir}, "
f"structured_layout: {self.structured_layout}, "
f"start_datetime: "
f"{self.start_datetime.strftime('%Y-%m-%dT%H:%M:%S.%fZ') if self.start_datetime else None}, "
f"end_datetime: {self.end_datetime.strftime('%Y-%m-%dT%H:%M:%S.%fZ') if self.end_datetime else None}, "
f"start_buffer_td (in s): {self.start_buffer_td.total_seconds()}, "
f"end_buffer_td (in s): {self.end_buffer_td.total_seconds()}, "
f"drop_time_s: {self.drop_time_s}, "
f"station_ids: {list(self.station_ids) if self.station_ids else []}, "
f"extensions: {list(self.extensions) if self.extensions else []}, "
f"api_versions: {[a_v.value for a_v in self.api_versions] if self.api_versions else []}, "
f"apply_correction: {self.apply_correction}, "
f"use_model_correction: {self.use_model_correction}, "
f"copy_edge_points: {self.copy_edge_points.name}"
)
def to_dict(self) -> Dict:
return {
"input_dir": self.input_dir,
"structured_layout": self.structured_layout,
"start_datetime": dtu.datetime_to_epoch_microseconds_utc(self.start_datetime)
if self.start_datetime
else None,
"end_datetime": dtu.datetime_to_epoch_microseconds_utc(self.end_datetime) if self.end_datetime else None,
"start_buffer_td": self.start_buffer_td.total_seconds(),
"end_buffer_td": self.end_buffer_td.total_seconds(),
"drop_time_s": self.drop_time_s,
"station_ids": list(self.station_ids) if self.station_ids else [],
"extensions": list(self.extensions) if self.extensions else [],
"api_versions": [a_v.value for a_v in self.api_versions] if self.api_versions else [],
"apply_correction": self.apply_correction,
"use_model_correction": self.use_model_correction,
"copy_edge_points": self.copy_edge_points.value,
}
@staticmethod
def from_dict(data_dict: Dict) -> "DataWindowConfig":
return DataWindowConfig(
data_dict["input_dir"],
data_dict["structured_layout"],
dtu.datetime_from_epoch_microseconds_utc(data_dict["start_datetime"])
if data_dict["start_datetime"]
else None,
dtu.datetime_from_epoch_microseconds_utc(data_dict["end_datetime"]) if data_dict["end_datetime"] else None,
timedelta(seconds=data_dict["start_buffer_td"]),
timedelta(seconds=data_dict["end_buffer_td"]),
data_dict["drop_time_s"],
data_dict["station_ids"],
set(data_dict["extensions"]),
set([io.ApiVersion.from_str(v) for v in data_dict["api_versions"]]),
data_dict["apply_correction"],
data_dict["use_model_correction"],
gpu.DataPointCreationMode(data_dict["copy_edge_points"]),
)
class DataWindow:
"""
Holds the data for a given time window; adds interpolated timestamps to fill gaps and pad the start and end values
If a start time is given, data starting from that time will be included.
If an end time is given, data up to but not including that time will be included.
Refer to the DataWindowConfig class for more details on DataWindow parameters.
Properties:
event_name: str, name of the DataWindow. defaults to "dw"
event_origin: Optional EventOrigin which describes the physical location and radius of the
origin event. Default empty EventOrigin (no valid data)
config: optional DataWindowConfig with information on how to construct DataWindow from
Redvox (.rdvx*) files. Default None
sdk_version: str, the version of the Redvox SDK used to create the DataWindow
debug: bool, if True, outputs additional information during initialization. Default False
Protected:
_fs_writer: DataWindowFileSystemWriter; includes event_name, output directory (Default "."),
output type (options: "PARQUET", "LZ4", "JSON", "NONE". Default NONE), and option to make a
runme.py example file (Default False)
_stations: List of Stations that belong to the DataWindow
_errors: RedVoxExceptions; contains a list of all errors encountered by the DataWindow
"""
def __init__(
self,
event_name: str = "dw",
event_origin: Optional[EventOrigin] = None,
config: Optional[DataWindowConfig] = None,
output_dir: str = ".",
out_type: str = "NONE",
make_runme: bool = False,
debug: bool = False,
):
"""
Initialize the DataWindow.
If no config is passed, the DataWindow will not process any data.
:param event_name: name of the DataWindow. defaults to "dw"
:param event_origin: Optional EventOrigin which describes the physical location and radius of the
origin event. Default empty EventOrigin (no valid data)
:param config: Optional DataWindowConfig which describes how to extract data from Redvox files.
Default None
:param output_dir: output directory for saving files. Default "." (current directory)
:param out_type: type of file to save the DataWindow as. Options: "PARQUET", "LZ4", "JSON", "NONE".
Default "NONE" (no saving)
:param make_runme: if True, saves an example runme.py file with the data. Default False
:param debug: if True, outputs additional information during initialization. Default False
"""
self.event_name: str = event_name
self.event_origin: EventOrigin = event_origin if event_origin else EventOrigin()
self._fs_writer = dw_io.DataWindowFileSystemWriter(self.event_name, out_type, output_dir, make_runme)
self.debug: bool = debug
self._sdk_version: str = redvox.VERSION
self._errors = RedVoxExceptions("DataWindow")
self._stations: List[Station] = []
self._config = config
if config:
if config.start_datetime and config.end_datetime and (config.end_datetime <= config.start_datetime):
self._errors.append(
"DataWindow will not work when end datetime is before or equal to start datetime.\n"
f"Your times: {config.end_datetime} <= {config.start_datetime}"
)
else:
self.create_data_window()
if self.debug:
self.print_errors()
def __repr__(self):
return (
f"event_name: {self.event_name}, "
f"event_origin: {self.event_origin.__repr__()}, "
f"config: {self._config.__repr__()}, "
f"output_dir: {os.path.abspath(self.save_dir())}, "
f"out_type: {self._fs_writer.file_extension}, "
f"make_runme: {self._fs_writer.make_run_me}, "
f"sdk_version: {self._sdk_version}, "
f"debug: {self.debug}"
)
def __str__(self):
return (
f"event_name: {self.event_name}, "
f"event_origin: {self.event_origin.__str__()}, "
f"config: {self._config.__str__()}, "
f"output_dir: {os.path.abspath(self.save_dir())}, "
f"out_type: {self._fs_writer.file_extension}, "
f"make_runme: {self._fs_writer.make_run_me}, "
f"sdk_version: {self._sdk_version}, "
f"debug: {self.debug}"
)
def save_dir(self) -> str:
"""
:return: directory data is saved to (empty string means saving to memory)
"""
return self._fs_writer.save_dir()
def set_save_dir(self, new_save_dir: Optional[str] = "."):
"""
:param new_save_dir: directory to save data to; default current directory, or "."
"""
self._fs_writer.base_dir = new_save_dir
def is_make_runme(self) -> bool:
"""
:return: if DataWindow will be saved with a runme file
"""
return self._fs_writer.make_run_me
def set_make_runme(self, make_runme: bool = False):
"""
:param make_runme: if True, DataWindow will create a runme file when saved. Default False
"""
self._fs_writer.make_run_me = make_runme
def fs_writer(self) -> dw_io.DataWindowFileSystemWriter:
"""
:return: DataWindowFileSystemWriter for DataWindow
"""
return self._fs_writer
def out_type(self) -> str:
"""
:return: string of the output type of the DataWindow
"""
return self._fs_writer.file_extension
def set_out_type(self, new_out_type: str):
"""
set the output type of the DataWindow. options are "NONE", "PARQUET", "LZ4" and "JSON".
Invalid values become "NONE"
:param new_out_type: new output type of the DataWindow
"""
self._fs_writer.set_extension(new_out_type)
def as_dict(self) -> Dict:
"""
:return: DataWindow properties as dictionary
"""
return {
"event_name": self.event_name,
"event_origin": self.event_origin.to_dict(),
"start_time": self.start_date(),
"end_time": self.end_date(),
"base_dir": self.save_dir(),
"stations": [s.default_station_json_file_name() for s in self._stations],
"config": self._config.to_dict(),
"debug": self.debug,
"errors": self._errors.as_dict(),
"sdk_version": self._sdk_version,
"out_type": self._fs_writer.file_extension,
"make_runme": self._fs_writer.make_run_me,
}
def pretty(self) -> str:
"""
:return: DataWindow as dictionary, but easier to read
"""
# noinspection Mypy
return pprint.pformat(self.as_dict())
@staticmethod
def from_config(config: DataWindowConfigFile) -> "DataWindow":
"""
Use a config file to create a DataWindow
:param config: DataWindowConfigFile to load from
:return: DataWindow
"""
event_origin = EventOrigin(
config.origin_provider,
config.origin_latitude,
config.origin_latitude_std,
config.origin_longitude,
config.origin_longitude_std,
config.origin_altitude,
config.origin_altitude_std,
config.origin_event_radius_m,
)
dw_config = DataWindowConfig(
config.input_directory,
config.structured_layout,
config.start_dt(),
config.end_dt(),
config.start_buffer_td(),
config.end_buffer_td(),
config.drop_time_seconds,
config.station_ids,
config.extensions,
config.api_versions,
config.apply_correction,
config.use_model_correction,
config.copy_edge_points(),
)
return DataWindow(
config.event_name,
event_origin,
dw_config,
config.output_dir,
config.output_type,
config.make_runme,
config.debug,
)
@staticmethod
def from_config_file(file: str) -> "DataWindow":
"""
Loads a configuration file to create the DataWindow
:param file: full path to config file
:return: DataWindow
"""
return DataWindow.from_config(DataWindowConfigFile.from_path(file))
@staticmethod
def deserialize(path: str) -> "DataWindow":
"""
Decompresses and deserializes a DataWindow written to disk.
:param path: Path to the serialized and compressed DataWindow.
:return: An instance of a DataWindow.
"""
return dw_io.deserialize_data_window(path)
def serialize(self, compression_factor: int = 4) -> Path:
"""
Serializes and compresses this DataWindow to a file.
Uses the event_name and out_dir to name the file.
:param compression_factor: A value between 1 and 12. Higher values provide better compression, but take
longer. (default=4).
:return: The path to the written file.
"""
return dw_io.serialize_data_window(self, self.save_dir(), f"{self.event_name}.pkl.lz4", compression_factor)
def _to_json_file(self) -> Path:
"""
Converts the DataWindow metadata into a JSON file and compresses the DataWindow and writes it to disk.
:return: The path to the written file
"""
return dw_io.data_window_to_json(self, self.save_dir())
def to_json(self) -> str:
"""
:return: The DataWindow metadata as a JSON string.
"""
return dw_io.data_window_as_json(self)
@staticmethod
def from_json(json_str: str) -> "DataWindow":
"""
Read the DataWindow from a JSON string. If the string is improperly formatted, raises a ValueError.
:param json_str: the JSON to read
:return: The DataWindow as defined by the JSON
"""
return DataWindow.from_json_dict(dw_io.json_to_dict(json_str))
@staticmethod
def from_json_dict(json_dict: Dict) -> "DataWindow":
"""
Reads a JSON dictionary and loads the data into the DataWindow.
If dictionary is improperly formatted, raises a ValueError.
:param json_dict: the dictionary to read
:return: The DataWindow as defined by the JSON
"""
if (
"out_type" not in json_dict.keys()
or json_dict["out_type"].upper() not in dw_io.DataWindowOutputType.list_names()
):
raise ValueError(
"Dictionary loading type is invalid or unknown. "
'Check the value of "out_type"; it must be one of: '
f"{dw_io.DataWindowOutputType.list_non_none_names()}"
)
else:
out_type = dw_io.DataWindowOutputType.str_to_type(json_dict["out_type"])
if out_type == dw_io.DataWindowOutputType.PARQUET or out_type == dw_io.DataWindowOutputType.JSON:
dwin = DataWindow(
json_dict["event_name"],
EventOrigin.from_dict(json_dict["event_origin"]),
None,
json_dict["base_dir"],
json_dict["out_type"],
json_dict["make_runme"],
json_dict["debug"],
)
dwin._config = DataWindowConfig.from_dict(json_dict["config"])
dwin._errors = RedVoxExceptions.from_dict(json_dict["errors"])
dwin._sdk_version = json_dict["sdk_version"]
for st in json_dict["stations"]:
dwin.add_station(Station.from_json_file(os.path.join(json_dict["base_dir"], st), f"{st}.json"))
elif out_type == dw_io.DataWindowOutputType.LZ4:
dwin = DataWindow.deserialize(os.path.join(json_dict["base_dir"], f"{json_dict['event_name']}.pkl.lz4"))
else:
dwin = DataWindow()
return dwin
def save(self) -> Path:
"""
save the DataWindow to disk if saving is enabled
if saving is not enabled, adds an error to the DataWindow and returns an empty path.
:return: the path to where the files exist; an empty path means no files were saved
"""
if self._fs_writer.is_save_disk():
if self._fs_writer.is_use_disk() and self._fs_writer.make_run_me:
shutil.copyfile(
os.path.abspath(inspect.getfile(run_me)), os.path.join(self._fs_writer.save_dir(), "runme.py")
)
if self._fs_writer.file_extension in ["parquet", "json"]:
return self._to_json_file()
elif self._fs_writer.file_extension == "lz4":
return self.serialize()
else:
self._errors.append("Saving not enabled.")
print("WARNING: Cannot save data window without knowing extension.")
return Path()
@staticmethod
def load(file_path: str) -> "DataWindow":
"""
load from json metadata and lz4 compressed file or directory of files.
If you have a pkl.lz4 file, use the deserialize() method instead.
:param file_path: full path of json metadata file to load
:return: DataWindow from json metadata
"""
cur_path = os.getcwd()
path_dir = os.path.dirname(file_path)
if path_dir:
os.chdir(os.path.dirname(file_path))
result = DataWindow.from_json_dict(dw_io.json_file_to_data_window(file_path))
os.chdir(cur_path)
return result
def config(self) -> DataWindowConfig:
"""
:return: settings used to create the DataWindow
"""
return self._config
def sdk_version(self) -> str:
"""
:return: sdk version used to create the DataWindow
"""
return self._sdk_version
def set_sdk_version(self, version: str):
"""
:param version: the sdk version to set
"""
self._sdk_version = version
def start_date(self) -> float:
"""
:return: minimum start timestamp of the data or np.nan if no data
"""
if len(self._stations) > 0:
return np.min([s.first_data_timestamp() for s in self._stations])
return np.nan
def end_date(self) -> float:
"""
:return: maximum end timestamp of the data or np.nan if no data
"""
if len(self._stations) > 0:
return np.max([s.last_data_timestamp() for s in self._stations])
return np.nan
def stations(self) -> List[Station]:
"""
:return: list of stations in the DataWindow
"""
return self._stations
def station_ids(self) -> List[str]:
"""
:return: ids of stations in the DataWindow
"""
return [s.id() for s in self._stations]
def add_station(self, station: Station):
"""
add a station to the DataWindow
:param station: Station to add
"""
self._stations.append(station)
def remove_station(self, station_id: Optional[str] = None, start_date: Optional[float] = None):
"""
Remove the last station from the DataWindow, or a specific station if given the id and/or start date.
* If an id is given, the last station with that id will be removed.
* If a start date is given, the removed station will start at or after the start date.
* Start date is in microseconds since epoch UTC.
:param station_id: id of station to remove
:param start_date: start date that is at or before the station to remove
"""
id_removals = []
sd_removals = []
if station_id is None and start_date is None:
self._stations.pop()
else:
if station_id is not None:
for s in range(len(self._stations)):
if self._stations[s].id() == station_id:
id_removals.append(s)
if start_date is not None:
for s in range(len(self._stations)):
if self._stations[s].start_date() >= start_date:
sd_removals.append(s)
if len(id_removals) > 0 and start_date is None:
self._stations.pop(id_removals.pop())
elif len(sd_removals) > 0 and station_id is None:
self._stations.pop(sd_removals.pop())
elif len(id_removals) > 0 and len(sd_removals) > 0:
for a in id_removals:
for b in sd_removals:
if a == b:
self._stations.pop(a)
return
if a < b:
continue
def first_station(self, station_id: Optional[str] = None) -> Optional[Station]:
"""
:param station_id: optional station id to filter on
:return: first station matching params; if no params given, gets first station in list.
returns None if no station with given station_id exists.
"""
if len(self._stations) < 1:
self._errors.append(f"Attempted to get a station, but there are no stations in the data window!")
if self.debug:
print(f"Attempted to get a station, but there are no stations in the data window!")
return None
elif station_id:
result = [s for s in self._stations if s.get_key().check_key(station_id, None, None)]
if len(result) > 0:
return result[0]
self._errors.append(f"Attempted to get station {station_id}, but that station is not in this data window!")
if self.debug:
print(f"Attempted to get station {station_id}, but that station is not in this data window!")
return None
return self._stations[0]
def get_station(
self, station_id: str, station_uuid: Optional[str] = None, start_timestamp: Optional[float] = None
) -> Optional[List[Station]]:
"""
Get stations from the DataWindow. Must give at least the station's id. Other parameters may be None,
which means the value will be ignored when searching. Results will match all non-None parameters given.
:param station_id: station id to get data for
:param station_uuid: station uuid, default None
:param start_timestamp: station start timestamp in microseconds since UTC epoch, default None
:return: A list of valid stations or None if the station cannot be found
"""
result = [s for s in self._stations if s.get_key().check_key(station_id, station_uuid, start_timestamp)]
if len(result) > 0:
return result
self._errors.append(f"Attempted to get station {station_id}, but that station is not in this data window!")
if self.debug:
print(f"Attempted to get station {station_id}, but that station is not in this data window!")
return None
def create_data_window(self, pool: Optional[multiprocessing.pool.Pool] = None):
"""
updates the DataWindow to contain only the data within the window parameters
stations without audio data are removed, as is any data outside the window
:param pool: optional multiprocessing pool to use; if None, one is created and closed by this function
"""
# Let's create and manage a single pool of workers that we can utilize throughout
# the instantiation of the data window.
_pool: multiprocessing.pool.Pool = multiprocessing.Pool() if pool is None else pool
r_f = io.ReadFilter()
if self._config.start_datetime:
r_f.with_start_dt(self._config.start_datetime)
if self._config.end_datetime:
r_f.with_end_dt(self._config.end_datetime)
if self._config.station_ids:
r_f.with_station_ids(self._config.station_ids)
if self._config.extensions:
r_f.with_extensions(self._config.extensions)
else:
self._config.extensions = r_f.extensions
if self._config.api_versions:
r_f.with_api_versions(self._config.api_versions)
else:
self._config.api_versions = r_f.api_versions
r_f.with_start_dt_buf(self._config.start_buffer_td)
r_f.with_end_dt_buf(self._config.end_buffer_td)
if self.debug:
print("Reading files from disk. This may take a few minutes to complete.")
# get the data to convert into a window
a_r = ApiReaderDw(
self._config.input_dir,
self._config.structured_layout,
r_f,
correct_timestamps=self._config.apply_correction,
use_model_correction=self._config.use_model_correction,
dw_base_dir=self.save_dir(),
dw_save_mode=self._fs_writer.save_mode(),
debug=self.debug,
pool=_pool,
)
# self._errors.extend_error(a_r.errors)
if self._fs_writer.is_use_mem() and a_r.dw_save_mode != self._fs_writer.save_mode():
if self.debug:
print("Estimated size of files exceeds available memory.")
print("Automatically using temporary directory to store data.")
self._fs_writer.set_use_temp(True)
# Parallel update
# Apply timing correction in parallel by station
sts = a_r.get_stations()
if self.debug:
print("number of stations loaded: ", len(sts))
for st in maybe_parallel_map(_pool, lambda s: s, iter(sts), chunk_size=1):
self.create_window_in_sensors(st, self._config.start_datetime, self._config.end_datetime)
if self.debug:
print("station processed: ", st.id())
# check for stations without data
self._check_for_audio()
self._check_valid_ids()
# update the default data window name if we have data and the default name exists
if self.event_name == "dw" and len(self._stations) > 0:
self.event_name = f"dw_{int(self.start_date())}_{len(self._stations)}"
# must update the start and end in order for the data to be saved
# update remaining data window values if they're still default
if not self._config.start_datetime and len(self._stations) > 0:
self._config.start_datetime = dtu.datetime_from_epoch_microseconds_utc(
np.min([t.first_data_timestamp() for t in self._stations])
)
# end_datetime is non-inclusive, so it must be greater than our latest timestamp
if not self._config.end_datetime and len(self._stations) > 0:
self._config.end_datetime = dtu.datetime_from_epoch_microseconds_utc(
np.max([t.last_data_timestamp() for t in self._stations]) + 1
)
# If the pool was created by this function, then it needs to be managed by this function.
if pool is None:
_pool.close()
def _check_for_audio(self):
"""
removes any station without audio data from the DataWindow
"""
remove = []
for s in self._stations:
if not s.has_audio_sensor():
remove.append(s.get_key())
if len(remove) > 0:
self._stations = [s for s in self._stations if s.get_key() not in remove]
def _check_valid_ids(self):
"""
if there are stations, searches the config's station_ids for any ids not in the data collected
and creates an error message for each requested id that has no data
if there are no stations, creates a single error message declaring no data found
"""
if len(self._stations) < 1 and self._config.station_ids:
if len(self._config.station_ids) > 1:
add_ids = f"for all stations {self._config.station_ids} "
else:
add_ids = ""
self._errors.append(
f"No data matching criteria {add_ids}in {self._config.input_dir}"
f"\nPlease adjust parameters of DataWindow"
)
elif len(self._stations) > 0 and self._config.station_ids:
for ids in self._config.station_ids:
if ids.zfill(STATION_ID_LENGTH) not in [i.id() for i in self._stations]:
self._errors.append(f"Requested {ids} but there is no data to read for that station")
def create_window_in_sensors(
self,
station: Station,
start_datetime: Optional[dtu.datetime] = None,
end_datetime: Optional[dtu.datetime] = None,
):
"""
truncate the sensors in the station to only contain data from start_datetime to end_datetime.
if the start and/or end are not specified, uses the audio start and end to truncate the other sensors.
returns nothing, updates the station in place
:param station: station object to truncate sensors of
:param start_datetime: datetime of start of window, default None
:param end_datetime: datetime of end of window, default None
"""
start_datetime = dtu.datetime_to_epoch_microseconds_utc(start_datetime) if start_datetime else 0
end_datetime = dtu.datetime_to_epoch_microseconds_utc(end_datetime if end_datetime else dtu.datetime.max)
self.process_sensor(station.audio_sensor(), station.id(), start_datetime, end_datetime)
if station.has_audio_data():
for sensor in [s for s in station.data() if s.type() != SensorType.AUDIO]:
self.process_sensor(
sensor,
station.id(),
station.audio_sensor().first_data_timestamp(),
station.audio_sensor().last_data_timestamp(),
)
# recalculate metadata
station.update_first_and_last_data_timestamps()
station.set_packet_metadata(
[
meta
for meta in station.packet_metadata()
if meta.packet_start_mach_timestamp < station.last_data_timestamp()
and meta.packet_end_mach_timestamp >= station.first_data_timestamp()
]
)
station.event_data().create_event_window(station.first_data_timestamp(), station.last_data_timestamp())
if self._fs_writer.is_save_disk():
station.set_save_mode(io.FileSystemSaveMode.DISK)
station.set_save_dir(self.save_dir() if self._fs_writer.is_use_disk() else self._fs_writer.get_temp())
self._stations.append(station)
def process_sensor(
self, sensor: SensorData, station_id: str, start_date_timestamp: float, end_date_timestamp: float
):
"""
process a sensor to fit within the DataWindow. Updates sensor in place, returns nothing.
:param sensor: sensor to process
:param station_id: station id
:param start_date_timestamp: start of DataWindow
:param end_date_timestamp: end of DataWindow
"""
if sensor.num_samples() > 0:
# get only the timestamps between the start and end timestamps
before_start = np.where(sensor.data_timestamps() < start_date_timestamp)[0]
after_end = np.where(end_date_timestamp <= sensor.data_timestamps())[0]
# start_index is inclusive of window start
if len(before_start) > 0:
last_before_start = before_start[-1]
start_index = last_before_start + 1
else:
last_before_start = None
start_index = 0
# end_index is non-inclusive of window end
if len(after_end) > 0:
first_after_end = after_end[0]
end_index = first_after_end
else:
first_after_end = None
end_index = sensor.num_samples()
# check if all the samples have been cut off
is_audio = sensor.type() == SensorType.AUDIO
if end_index <= start_index:
self._errors.append(
f"Data window for {station_id} {'Audio' if is_audio else sensor.type().name} "
f"sensor has truncated all data points"
)
# adjust data window to match the conditions of the remaining data
if is_audio:
sensor.empty_data_table()
elif last_before_start is not None and first_after_end is None:
first_entry = sensor.pyarrow_table().slice(last_before_start, 1).to_pydict()
first_entry["timestamps"] = [start_date_timestamp]
sensor.write_pyarrow_table(pa.Table.from_pydict(first_entry))
elif last_before_start is None and first_after_end is not None:
last_entry = sensor.pyarrow_table().slice(first_after_end, 1).to_pydict()
last_entry["timestamps"] = [start_date_timestamp]
sensor.write_pyarrow_table(pa.Table.from_pydict(last_entry))
elif last_before_start is not None and first_after_end is not None:
sensor.write_pyarrow_table(
sensor.interpolate(
start_date_timestamp,
last_before_start,
1,
self._config.copy_edge_points == gpu.DataPointCreationMode.COPY,
)
)
else:
_arrow = sensor.pyarrow_table().slice(start_index, end_index - start_index)
# if sensor is audio or location, we want nan'd edge points
if sensor.type() in [SensorType.LOCATION, SensorType.AUDIO]:
new_point_mode = gpu.DataPointCreationMode.NAN
else:
new_point_mode = self._config.copy_edge_points
# add in the data points at the edges of the window if there are defined start and/or end times
slice_start = _arrow["timestamps"].to_numpy()[0]
slice_end = _arrow["timestamps"].to_numpy()[-1]
if not is_audio:
end_sample_interval = end_date_timestamp - slice_end
end_samples_to_add = 1
start_sample_interval = start_date_timestamp - slice_start
start_samples_to_add = 1
else:
end_sample_interval = dtu.seconds_to_microseconds(sensor.sample_interval_s())
start_sample_interval = -end_sample_interval
if self._config.end_datetime:
end_samples_to_add = int(
(dtu.datetime_to_epoch_microseconds_utc(self._config.end_datetime) - slice_end)
/ end_sample_interval
)
else:
end_samples_to_add = 0
if self._config.start_datetime:
start_samples_to_add = int(
(slice_start - dtu.datetime_to_epoch_microseconds_utc(self._config.start_datetime))
/ end_sample_interval
)
else:
start_samples_to_add = 0
# add to end
_arrow = gpu.add_data_points_to_df(
data_table=_arrow,
start_index=_arrow.num_rows - 1,
sample_interval_micros=end_sample_interval,
num_samples_to_add=end_samples_to_add,
point_creation_mode=new_point_mode,
)
# add to begin
_arrow = gpu.add_data_points_to_df(
data_table=_arrow,
start_index=0,
sample_interval_micros=start_sample_interval,
num_samples_to_add=start_samples_to_add,
point_creation_mode=new_point_mode,
)
sensor.sort_by_data_timestamps(_arrow)
else:
self._errors.append(f"Data window for {station_id} {sensor.type().name} " f"sensor has no data points!")
def print_errors(self):
"""
prints errors to screen
"""
self._errors.print()
for stn in self._stations:
stn.print_errors()
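A usage sketch, not taken from this module's source: the data directory, station id, and time bounds below are hypothetical, and out_type="PARQUET" makes save() write the window's JSON metadata and station files to output_dir.

from datetime import datetime

from redvox.common.data_window import DataWindow, DataWindowConfig

# describe which files to read; input_dir is the only required argument
config = DataWindowConfig(
    input_dir="/data/redvox",                      # hypothetical data directory
    structured_layout=True,
    start_datetime=datetime(2022, 1, 1, 0, 0, 0),  # illustrative window bounds
    end_datetime=datetime(2022, 1, 1, 0, 10, 0),
    station_ids={"1637610021"},                    # hypothetical station id
)

# passing a config makes __init__ call create_data_window() immediately
dw = DataWindow(
    event_name="example_event",
    config=config,
    output_dir="./dw_out",
    out_type="PARQUET",
)
dw.save()  # writes the window to output_dir
print(dw.station_ids())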
Classes
class DataWindow (event_name: str = 'dw', event_origin: Optional[EventOrigin] = None, config: Optional[DataWindowConfig] = None, output_dir: str = '.', out_type: str = 'NONE', make_runme: bool = False, debug: bool = False)
Holds the data for a given time window; adds interpolated timestamps to fill gaps and pad the start and end values
If a start time is given, data starting from that time will be included.
If an end time is given, data up to but not including that time will be included.
Refer to the DataWindowConfig class for more details on DataWindow parameters.
Properties
event_name: str, name of the DataWindow. defaults to "dw"
event_origin: Optional EventOrigin which describes the physical location and radius of the origin event. Default empty EventOrigin (no valid data)
config: optional DataWindowConfig with information on how to construct DataWindow from Redvox (.rdvx*) files. Default None
sdk_version: str, the version of the Redvox SDK used to create the DataWindow
debug: bool, if True, outputs additional information during initialization. Default False
Protected
_fs_writer: DataWindowFileSystemWriter; includes event_name, output directory (Default "."), output type (options: "PARQUET", "LZ4", "JSON", "NONE". Default NONE), and option to make a runme.py example file (Default False)
_stations: List of Stations that belong to the DataWindow
_errors: RedVoxExceptions; contains a list of all errors encountered by the DataWindow
Initialize the DataWindow.
If no config is passed, the DataWindow will not process any data.
:param event_name: name of the DataWindow. defaults to "dw"
:param event_origin: Optional EventOrigin which describes the physical location and radius of the origin event. Default empty EventOrigin (no valid data)
:param config: Optional DataWindowConfig which describes how to extract data from Redvox files. Default None
:param output_dir: output directory for saving files. Default "." (current directory)
:param out_type: type of file to save the DataWindow as. Options: "PARQUET", "LZ4", "JSON", "NONE". Default "NONE" (no saving)
:param make_runme: if True, saves an example runme.py file with the data. Default False
:param debug: if True, outputs additional information during initialization. Default False
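To read results back out of a populated window, a sketch along these lines should work; dw is assumed to be a DataWindow that found data, and the Station and SensorData calls are those used elsewhere in this module.

station = dw.first_station()  # first Station in the window, or None (with an error logged)
if station is not None and station.has_audio_sensor():
    audio = station.audio_sensor()
    print(station.id(), audio.num_samples())
    timestamps = audio.data_timestamps()  # microseconds since epoch UTC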
Static methods
def deserialize(path: str) ‑> DataWindow
-
Decompresses and deserializes a DataWindow written to disk.
:param path: Path to the serialized and compressed DataWindow.
:return: An instance of a DataWindow.
Expand source code
@staticmethod
def deserialize(path: str) -> "DataWindow":
    """
    Decompresses and deserializes a DataWindow written to disk.

    :param path: Path to the serialized and compressed DataWindow.
    :return: An instance of a DataWindow.
    """
    return dw_io.deserialize_data_window(path)
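A minimal usage sketch, assuming a window was previously written with serialize(); the path below is hypothetical:

from redvox.common.data_window import DataWindow

dw = DataWindow.deserialize("/data/windows/my_event.pkl.lz4")  # hypothetical path to a .pkl.lz4 file
print(dw.event_name)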
def from_config(config: DataWindowConfigFile) ‑> DataWindow
-
Use a config file to create a DataWindow
:param config: DataWindowConfigFile to load from
:return: DataWindow
Expand source code
@staticmethod
def from_config(config: DataWindowConfigFile) -> "DataWindow":
    """
    Use a config file to create a DataWindow

    :param config: DataWindowConfigFile to load from
    :return: DataWindow
    """
    event_origin = EventOrigin(
        config.origin_provider,
        config.origin_latitude,
        config.origin_latitude_std,
        config.origin_longitude,
        config.origin_longitude_std,
        config.origin_altitude,
        config.origin_altitude_std,
        config.origin_event_radius_m,
    )
    dw_config = DataWindowConfig(
        config.input_directory,
        config.structured_layout,
        config.start_dt(),
        config.end_dt(),
        config.start_buffer_td(),
        config.end_buffer_td(),
        config.drop_time_seconds,
        config.station_ids,
        config.extensions,
        config.api_versions,
        config.apply_correction,
        config.use_model_correction,
        config.copy_edge_points(),
    )
    return DataWindow(
        config.event_name,
        event_origin,
        dw_config,
        config.output_dir,
        config.output_type,
        config.make_runme,
        config.debug,
    )
def from_config_file(file: str) ‑> DataWindow
-
Loads a configuration file to create the DataWindow
:param file: full path to config file
:return: DataWindow
Expand source code
@staticmethod
def from_config_file(file: str) -> "DataWindow":
    """
    Loads a configuration file to create the DataWindow

    :param file: full path to config file
    :return: DataWindow
    """
    return DataWindow.from_config(DataWindowConfigFile.from_path(file))
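A minimal usage sketch; the path is hypothetical and must point to a file readable by DataWindowConfigFile.from_path():

from redvox.common.data_window import DataWindow

dw = DataWindow.from_config_file("/configs/dw_config.toml")  # hypothetical config file path
dw.print_errors()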
def from_json(json_str: str) ‑> DataWindow
-
Read the DataWindow from a JSON string. If the string is improperly formatted, raises a ValueError.
:param json_str: the JSON to read
:return: The DataWindow as defined by the JSON
Expand source code
@staticmethod
def from_json(json_str: str) -> "DataWindow":
    """
    Read the DataWindow from a JSON string.
    If the string is improperly formatted, raises a ValueError.

    :param json_str: the JSON to read
    :return: The DataWindow as defined by the JSON
    """
    return DataWindow.from_json_dict(dw_io.json_to_dict(json_str))
def from_json_dict(json_dict: Dict) ‑> DataWindow
-
Reads a JSON dictionary and loads the data into the DataWindow. If the dictionary is improperly formatted, raises a ValueError.
:param json_dict: the dictionary to read
:return: The DataWindow as defined by the JSON
Expand source code
@staticmethod
def from_json_dict(json_dict: Dict) -> "DataWindow":
    """
    Reads a JSON dictionary and loads the data into the DataWindow.
    If the dictionary is improperly formatted, raises a ValueError.

    :param json_dict: the dictionary to read
    :return: The DataWindow as defined by the JSON
    """
    if (
        "out_type" not in json_dict.keys()
        or json_dict["out_type"].upper() not in dw_io.DataWindowOutputType.list_names()
    ):
        raise ValueError(
            "Dictionary loading type is invalid or unknown. "
            'Check the value of "out_type"; it must be one of: '
            f"{dw_io.DataWindowOutputType.list_non_none_names()}"
        )
    else:
        out_type = dw_io.DataWindowOutputType.str_to_type(json_dict["out_type"])
        if out_type == dw_io.DataWindowOutputType.PARQUET or out_type == dw_io.DataWindowOutputType.JSON:
            dwin = DataWindow(
                json_dict["event_name"],
                EventOrigin.from_dict(json_dict["event_origin"]),
                None,
                json_dict["base_dir"],
                json_dict["out_type"],
                json_dict["make_runme"],
                json_dict["debug"],
            )
            dwin._config = DataWindowConfig.from_dict(json_dict["config"])
            dwin._errors = RedVoxExceptions.from_dict(json_dict["errors"])
            dwin._sdk_version = json_dict["sdk_version"]
            for st in json_dict["stations"]:
                dwin.add_station(Station.from_json_file(os.path.join(json_dict["base_dir"], st), f"{st}.json"))
        elif out_type == dw_io.DataWindowOutputType.LZ4:
            dwin = DataWindow.deserialize(os.path.join(json_dict["base_dir"], f"{json_dict['event_name']}.pkl.lz4"))
        else:
            dwin = DataWindow()
        return dwin
def load(file_path: str) ‑> DataWindow
-
load a DataWindow from its json metadata file and the accompanying lz4 compressed file or directory of files.
If you have a pkl.lz4 file, use the deserialize() method instead.
:param file_path: full path of json metadata file to load
:return: DataWindow from json metadata
Expand source code
@staticmethod
def load(file_path: str) -> "DataWindow":
    """
    load a DataWindow from its json metadata file and the accompanying lz4 compressed file
    or directory of files.
    If you have a pkl.lz4 file, use the deserialize() method instead.

    :param file_path: full path of json metadata file to load
    :return: DataWindow from json metadata
    """
    cur_path = os.getcwd()
    path_dir = os.path.dirname(file_path)
    if path_dir:
        os.chdir(os.path.dirname(file_path))
    result = DataWindow.from_json_dict(dw_io.json_file_to_data_window(file_path))
    os.chdir(cur_path)
    return result
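A minimal usage sketch for reloading a saved window; the path is hypothetical:

from redvox.common.data_window import DataWindow

dw = DataWindow.load("/data/windows/my_event.json")  # hypothetical path to the JSON metadata file
print(dw.event_name, dw.station_ids())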
Methods
def add_station(self, station: Station)
-
add a station to the DataWindow
:param station: Station to add
Expand source code
def add_station(self, station: Station):
    """
    add a station to the DataWindow

    :param station: Station to add
    """
    self._stations.append(station)
def as_dict(self) ‑> Dict
-
:return: DataWindow properties as dictionary
Expand source code
def as_dict(self) -> Dict:
    """
    :return: DataWindow properties as dictionary
    """
    return {
        "event_name": self.event_name,
        "event_origin": self.event_origin.to_dict(),
        "start_time": self.start_date(),
        "end_time": self.end_date(),
        "base_dir": self.save_dir(),
        "stations": [s.default_station_json_file_name() for s in self._stations],
        "config": self._config.to_dict(),
        "debug": self.debug,
        "errors": self._errors.as_dict(),
        "sdk_version": self._sdk_version,
        "out_type": self._fs_writer.file_extension,
        "make_runme": self._fs_writer.make_run_me,
    }
def config(self) ‑> DataWindowConfig
-
:return: settings used to create the DataWindow
Expand source code
def config(self) -> DataWindowConfig:
    """
    :return: settings used to create the DataWindow
    """
    return self._config
def create_data_window(self, pool: Optional[multiprocessing.pool.Pool] = None)
-
updates the DataWindow to contain only the data within the window parameters; stations without audio or any data outside the window are removed
Expand source code
def create_data_window(self, pool: Optional[multiprocessing.pool.Pool] = None):
    """
    updates the DataWindow to contain only the data within the window parameters;
    stations without audio or any data outside the window are removed
    """
    # Let's create and manage a single pool of workers that we can utilize throughout
    # the instantiation of the data window.
    _pool: multiprocessing.pool.Pool = multiprocessing.Pool() if pool is None else pool

    r_f = io.ReadFilter()
    if self._config.start_datetime:
        r_f.with_start_dt(self._config.start_datetime)
    if self._config.end_datetime:
        r_f.with_end_dt(self._config.end_datetime)
    if self._config.station_ids:
        r_f.with_station_ids(self._config.station_ids)
    if self._config.extensions:
        r_f.with_extensions(self._config.extensions)
    else:
        self._config.extensions = r_f.extensions
    if self._config.api_versions:
        r_f.with_api_versions(self._config.api_versions)
    else:
        self._config.api_versions = r_f.api_versions
    r_f.with_start_dt_buf(self._config.start_buffer_td)
    r_f.with_end_dt_buf(self._config.end_buffer_td)

    if self.debug:
        print("Reading files from disk. This may take a few minutes to complete.")

    # get the data to convert into a window
    a_r = ApiReaderDw(
        self._config.input_dir,
        self._config.structured_layout,
        r_f,
        correct_timestamps=self._config.apply_correction,
        use_model_correction=self._config.use_model_correction,
        dw_base_dir=self.save_dir(),
        dw_save_mode=self._fs_writer.save_mode(),
        debug=self.debug,
        pool=_pool,
    )
    # self._errors.extend_error(a_r.errors)
    if self._fs_writer.is_use_mem() and a_r.dw_save_mode != self._fs_writer.save_mode():
        if self.debug:
            print("Estimated size of files exceeds available memory.")
            print("Automatically using temporary directory to store data.")
        self._fs_writer.set_use_temp(True)

    # Parallel update
    # Apply timing correction in parallel by station
    sts = a_r.get_stations()
    if self.debug:
        print("number of stations loaded: ", len(sts))
    for st in maybe_parallel_map(_pool, lambda s: s, iter(sts), chunk_size=1):
        self.create_window_in_sensors(st, self._config.start_datetime, self._config.end_datetime)
        if self.debug:
            print("station processed: ", st.id())

    # check for stations without data
    self._check_for_audio()
    self._check_valid_ids()

    # update the default data window name if we have data and the default name exists
    if self.event_name == "dw" and len(self._stations) > 0:
        self.event_name = f"dw_{int(self.start_date())}_{len(self._stations)}"

    # must update the start and end in order for the data to be saved
    # update remaining data window values if they're still default
    if not self._config.start_datetime and len(self._stations) > 0:
        self._config.start_datetime = dtu.datetime_from_epoch_microseconds_utc(
            np.min([t.first_data_timestamp() for t in self._stations])
        )
    # end_datetime is non-inclusive, so it must be greater than our latest timestamp
    if not self._config.end_datetime and len(self._stations) > 0:
        self._config.end_datetime = dtu.datetime_from_epoch_microseconds_utc(
            np.max([t.last_data_timestamp() for t in self._stations]) + 1
        )

    # If the pool was created by this function, then it needs to be managed by this function.
    if pool is None:
        _pool.close()
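A short sketch of the pool-ownership behavior described above: a pool passed by the caller is not closed by this method, only pools it creates itself. Here, dw is assumed to be a DataWindow constructed beforehand:

import multiprocessing

with multiprocessing.Pool(4) as pool:
    dw.create_data_window(pool=pool)  # dw keeps using the caller's pool; the caller closes it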
def create_window_in_sensors(self, station: Station, start_datetime: Optional[datetime.datetime] = None, end_datetime: Optional[datetime.datetime] = None)
-
truncate the sensors in the station to only contain data from start_datetime to end_datetime.
if the start and/or end are not specified, uses the audio start and end to truncate the other sensors.
returns nothing, updates the station in place
:param station: station object to truncate sensors of
:param start_datetime: datetime of start of window, default None
:param end_datetime: datetime of end of window, default None
Expand source code
def create_window_in_sensors(
    self,
    station: Station,
    start_datetime: Optional[dtu.datetime] = None,
    end_datetime: Optional[dtu.datetime] = None,
):
    """
    truncate the sensors in the station to only contain data from start_datetime to end_datetime.
    if the start and/or end are not specified, uses the audio start and end to truncate the other sensors.
    returns nothing, updates the station in place

    :param station: station object to truncate sensors of
    :param start_datetime: datetime of start of window, default None
    :param end_datetime: datetime of end of window, default None
    """
    start_datetime = dtu.datetime_to_epoch_microseconds_utc(start_datetime) if start_datetime else 0
    end_datetime = dtu.datetime_to_epoch_microseconds_utc(end_datetime if end_datetime else dtu.datetime.max)
    self.process_sensor(station.audio_sensor(), station.id(), start_datetime, end_datetime)
    if station.has_audio_data():
        for sensor in [s for s in station.data() if s.type() != SensorType.AUDIO]:
            self.process_sensor(
                sensor,
                station.id(),
                station.audio_sensor().first_data_timestamp(),
                station.audio_sensor().last_data_timestamp(),
            )
        # recalculate metadata
        station.update_first_and_last_data_timestamps()
        station.set_packet_metadata(
            [
                meta
                for meta in station.packet_metadata()
                if meta.packet_start_mach_timestamp < station.last_data_timestamp()
                and meta.packet_end_mach_timestamp >= station.first_data_timestamp()
            ]
        )
        station.event_data().create_event_window(station.first_data_timestamp(), station.last_data_timestamp())
        if self._fs_writer.is_save_disk():
            station.set_save_mode(io.FileSystemSaveMode.DISK)
            station.set_save_dir(self.save_dir() if self._fs_writer.is_use_disk() else self._fs_writer.get_temp())
        self._stations.append(station)
def end_date(self) ‑> float
-
:return: maximum end timestamp of the data or np.nan if no data
Expand source code
def end_date(self) -> float:
    """
    :return: maximum end timestamp of the data or np.nan if no data
    """
    if len(self._stations) > 0:
        return np.max([s.last_data_timestamp() for s in self._stations])
    return np.nan
def first_station(self, station_id: Optional[str] = None) ‑> Optional[Station]
-
:param station_id: optional station id to filter on
:return: first station matching params; if no params given, gets first station in list. returns None if no station with given station_id exists.
Expand source code
def first_station(self, station_id: Optional[str] = None) -> Optional[Station]:
    """
    :param station_id: optional station id to filter on
    :return: first station matching params; if no params given, gets first station in list.
             returns None if no station with given station_id exists.
    """
    if len(self._stations) < 1:
        self._errors.append(f"Attempted to get a station, but there are no stations in the data window!")
        if self.debug:
            print(f"Attempted to get a station, but there are no stations in the data window!")
        return None
    elif station_id:
        result = [s for s in self._stations if s.get_key().check_key(station_id, None, None)]
        if len(result) > 0:
            return result[0]
        self._errors.append(f"Attempted to get station {station_id}, but that station is not in this data window!")
        if self.debug:
            print(f"Attempted to get station {station_id}, but that station is not in this data window!")
        return None
    return self._stations[0]
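A minimal usage sketch; dw is assumed to be a populated DataWindow:

station = dw.first_station()
if station is not None:
    print(station.id(), station.first_data_timestamp())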
def fs_writer(self) ‑> DataWindowFileSystemWriter
-
:return: DataWindowFileSystemWriter for DataWindow
Expand source code
def fs_writer(self) -> dw_io.DataWindowFileSystemWriter:
    """
    :return: DataWindowFileSystemWriter for DataWindow
    """
    return self._fs_writer
def get_station(self, station_id: str, station_uuid: Optional[str] = None, start_timestamp: Optional[float] = None) ‑> Optional[List[Station]]
-
Get stations from the DataWindow. Must give at least the station's id. Other parameters may be None, which means the value will be ignored when searching. Results will match all non-None parameters given.
:param station_id: station id to get data for
:param station_uuid: station uuid, default None
:param start_timestamp: station start timestamp in microseconds since UTC epoch, default None
:return: A list of valid stations or None if the station cannot be found
Expand source code
def get_station(
    self, station_id: str, station_uuid: Optional[str] = None, start_timestamp: Optional[float] = None
) -> Optional[List[Station]]:
    """
    Get stations from the DataWindow.  Must give at least the station's id.
    Other parameters may be None, which means the value will be ignored when searching.
    Results will match all non-None parameters given.

    :param station_id: station id to get data for
    :param station_uuid: station uuid, default None
    :param start_timestamp: station start timestamp in microseconds since UTC epoch, default None
    :return: A list of valid stations or None if the station cannot be found
    """
    result = [s for s in self._stations if s.get_key().check_key(station_id, station_uuid, start_timestamp)]
    if len(result) > 0:
        return result
    self._errors.append(f"Attempted to get station {station_id}, but that station is not in this data window!")
    if self.debug:
        print(f"Attempted to get station {station_id}, but that station is not in this data window!")
    return None
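A minimal usage sketch; the station id below is hypothetical:

stations = dw.get_station("1637610021")  # hypothetical station id
if stations is not None:
    print(f"found {len(stations)} matching station(s)")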
def is_make_runme(self) ‑> bool
-
:return: if DataWindow will be saved with a runme file
Expand source code
def is_make_runme(self) -> bool:
    """
    :return: if DataWindow will be saved with a runme file
    """
    return self._fs_writer.make_run_me
def out_type(self) ‑> str
-
:return: string of the output type of the DataWindow
Expand source code
def out_type(self) -> str:
    """
    :return: string of the output type of the DataWindow
    """
    return self._fs_writer.file_extension
def pretty(self) ‑> str
-
:return: DataWindow as dictionary, but easier to read
Expand source code
def pretty(self) -> str:
    """
    :return: DataWindow as dictionary, but easier to read
    """
    # noinspection Mypy
    return pprint.pformat(self.as_dict())
def print_errors(self)
-
prints errors to screen
Expand source code
def print_errors(self):
    """
    prints errors to screen
    """
    self._errors.print()
    for stn in self._stations:
        stn.print_errors()
def process_sensor(self, sensor: SensorData, station_id: str, start_date_timestamp: float, end_date_timestamp: float)
-
process a sensor to fit within the DataWindow. Updates sensor in place, returns nothing.
:param sensor: sensor to process
:param station_id: station id
:param start_date_timestamp: start of DataWindow
:param end_date_timestamp: end of DataWindow
Expand source code
def process_sensor(
    self, sensor: SensorData, station_id: str, start_date_timestamp: float, end_date_timestamp: float
):
    """
    process a sensor to fit within the DataWindow.  Updates sensor in place, returns nothing.

    :param sensor: sensor to process
    :param station_id: station id
    :param start_date_timestamp: start of DataWindow
    :param end_date_timestamp: end of DataWindow
    """
    if sensor.num_samples() > 0:
        # get only the timestamps between the start and end timestamps
        before_start = np.where(sensor.data_timestamps() < start_date_timestamp)[0]
        after_end = np.where(end_date_timestamp <= sensor.data_timestamps())[0]
        # start_index is inclusive of window start
        if len(before_start) > 0:
            last_before_start = before_start[-1]
            start_index = last_before_start + 1
        else:
            last_before_start = None
            start_index = 0
        # end_index is non-inclusive of window end
        if len(after_end) > 0:
            first_after_end = after_end[0]
            end_index = first_after_end
        else:
            first_after_end = None
            end_index = sensor.num_samples()
        # check if all the samples have been cut off
        is_audio = sensor.type() == SensorType.AUDIO
        if end_index <= start_index:
            self._errors.append(
                f"Data window for {station_id} {'Audio' if is_audio else sensor.type().name} "
                f"sensor has truncated all data points"
            )
            # adjust data window to match the conditions of the remaining data
            if is_audio:
                sensor.empty_data_table()
            elif last_before_start is not None and first_after_end is None:
                first_entry = sensor.pyarrow_table().slice(last_before_start, 1).to_pydict()
                first_entry["timestamps"] = [start_date_timestamp]
                sensor.write_pyarrow_table(pa.Table.from_pydict(first_entry))
            elif last_before_start is None and first_after_end is not None:
                last_entry = sensor.pyarrow_table().slice(first_after_end, 1).to_pydict()
                last_entry["timestamps"] = [start_date_timestamp]
                sensor.write_pyarrow_table(pa.Table.from_pydict(last_entry))
            elif last_before_start is not None and first_after_end is not None:
                sensor.write_pyarrow_table(
                    sensor.interpolate(
                        start_date_timestamp,
                        last_before_start,
                        1,
                        self._config.copy_edge_points == gpu.DataPointCreationMode.COPY,
                    )
                )
        else:
            _arrow = sensor.pyarrow_table().slice(start_index, end_index - start_index)
            # if sensor is audio or location, we want nan'd edge points
            if sensor.type() in [SensorType.LOCATION, SensorType.AUDIO]:
                new_point_mode = gpu.DataPointCreationMode.NAN
            else:
                new_point_mode = self._config.copy_edge_points
            # add in the data points at the edges of the window if there are defined start and/or end times
            slice_start = _arrow["timestamps"].to_numpy()[0]
            slice_end = _arrow["timestamps"].to_numpy()[-1]
            if not is_audio:
                end_sample_interval = end_date_timestamp - slice_end
                end_samples_to_add = 1
                start_sample_interval = start_date_timestamp - slice_start
                start_samples_to_add = 1
            else:
                end_sample_interval = dtu.seconds_to_microseconds(sensor.sample_interval_s())
                start_sample_interval = -end_sample_interval
                if self._config.end_datetime:
                    end_samples_to_add = int(
                        (dtu.datetime_to_epoch_microseconds_utc(self._config.end_datetime) - slice_end)
                        / end_sample_interval
                    )
                else:
                    end_samples_to_add = 0
                if self._config.start_datetime:
                    start_samples_to_add = int(
                        (slice_start - dtu.datetime_to_epoch_microseconds_utc(self._config.start_datetime))
                        / end_sample_interval
                    )
                else:
                    start_samples_to_add = 0
            # add to end
            _arrow = gpu.add_data_points_to_df(
                data_table=_arrow,
                start_index=_arrow.num_rows - 1,
                sample_interval_micros=end_sample_interval,
                num_samples_to_add=end_samples_to_add,
                point_creation_mode=new_point_mode,
            )
            # add to begin
            _arrow = gpu.add_data_points_to_df(
                data_table=_arrow,
                start_index=0,
                sample_interval_micros=start_sample_interval,
                num_samples_to_add=start_samples_to_add,
                point_creation_mode=new_point_mode,
            )
            sensor.sort_by_data_timestamps(_arrow)
    else:
        self._errors.append(f"Data window for {station_id} {sensor.type().name} " f"sensor has no data points!")
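A self-contained numeric sketch of the index selection above (hypothetical timestamps and bounds), showing why start_index is inclusive and end_index is non-inclusive:

import numpy as np

timestamps = np.array([0.0, 10.0, 20.0, 30.0, 40.0])  # hypothetical sensor timestamps in microseconds
start, end = 15.0, 35.0                               # hypothetical window bounds in microseconds

before_start = np.where(timestamps < start)[0]        # indices [0, 1]
after_end = np.where(end <= timestamps)[0]            # indices [4]
start_index = before_start[-1] + 1 if len(before_start) > 0 else 0    # 2: first sample inside the window
end_index = after_end[0] if len(after_end) > 0 else len(timestamps)   # 4: first sample past the window
print(timestamps[start_index:end_index])              # [20. 30.]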
def remove_station(self, station_id: Optional[str] = None, start_date: Optional[float] = None)
-
Remove the last station from the DataWindow, or a specific station if given the id and/or start date.
- If an id is given, the last station with that id will be removed.
- If a start date is given, the removed station will start at or after the start date.
- Start date is in microseconds since epoch UTC.
:param station_id: id of station to remove
:param start_date: start date that is at or before the station to remove
Expand source code
def remove_station(self, station_id: Optional[str] = None, start_date: Optional[float] = None):
    """
    Remove the last station from the DataWindow, or a specific station if given the id and/or start date.

    * If an id is given, the last station with that id will be removed.
    * If a start date is given, the removed station will start at or after the start date.
    * Start date is in microseconds since epoch UTC.

    :param station_id: id of station to remove
    :param start_date: start date that is at or before the station to remove
    """
    id_removals = []
    sd_removals = []
    if station_id is None and start_date is None:
        self._stations.pop()
    else:
        if station_id is not None:
            for s in range(len(self._stations)):
                if self._stations[s].id() == station_id:
                    id_removals.append(s)
        if start_date is not None:
            for s in range(len(self._stations)):
                if self._stations[s].start_date() >= start_date:
                    sd_removals.append(s)
        if len(id_removals) > 0 and start_date is None:
            self._stations.pop(id_removals.pop())
        elif len(sd_removals) > 0 and station_id is None:
            self._stations.pop(sd_removals.pop())
        elif len(id_removals) > 0 and len(sd_removals) > 0:
            for a in id_removals:
                for b in sd_removals:
                    if a == b:
                        self._stations.pop(a)
                        return
                    if a < b:
                        continue
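A minimal usage sketch; the station id and timestamp below are hypothetical:

dw.remove_station("1637610021")                   # hypothetical id: removes a station with that id
dw.remove_station(start_date=1659373200000000.0)  # hypothetical microseconds since epoch UTC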
def save(self) ‑> pathlib.Path
-
save the DataWindow to disk if saving is enabled; if saving is not enabled, adds an error to the DataWindow and returns an empty path.
:return: the path to where the files exist; an empty path means no files were saved
Expand source code
def save(self) -> Path:
    """
    save the DataWindow to disk if saving is enabled;
    if saving is not enabled, adds an error to the DataWindow and returns an empty path.

    :return: the path to where the files exist; an empty path means no files were saved
    """
    if self._fs_writer.is_save_disk():
        if self._fs_writer.is_use_disk() and self._fs_writer.make_run_me:
            shutil.copyfile(
                os.path.abspath(inspect.getfile(run_me)), os.path.join(self._fs_writer.save_dir(), "runme.py")
            )
        if self._fs_writer.file_extension in ["parquet", "json"]:
            return self._to_json_file()
        elif self._fs_writer.file_extension == "lz4":
            return self.serialize()
    else:
        self._errors.append("Saving not enabled.")
        print("WARNING: Cannot save data window without knowing extension.")
        return Path()
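A minimal usage sketch, assuming dw was created with disk output enabled:

dw.set_out_type("PARQUET")  # valid options: "NONE", "PARQUET", "LZ4", "JSON"
saved_to = dw.save()        # an empty Path() means nothing was saved
print(saved_to)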
def save_dir(self) ‑> str
-
:return: directory data is saved to (empty string means saving to memory)
Expand source code
def save_dir(self) -> str:
    """
    :return: directory data is saved to (empty string means saving to memory)
    """
    return self._fs_writer.save_dir()
def sdk_version(self) ‑> str
-
:return: sdk version used to create the DataWindow
Expand source code
def sdk_version(self) -> str:
    """
    :return: sdk version used to create the DataWindow
    """
    return self._sdk_version
def serialize(self, compression_factor: int = 4) ‑> pathlib.Path
-
Serializes and compresses this DataWindow to a file. Uses the event_name and out_dir to name the file.
:param compression_factor: A value between 1 and 12. Higher values provide better compression, but take longer. (default=4)
:return: The path to the written file.
Expand source code
def serialize(self, compression_factor: int = 4) -> Path:
    """
    Serializes and compresses this DataWindow to a file.
    Uses the event_name and out_dir to name the file.

    :param compression_factor: A value between 1 and 12.  Higher values provide better
                               compression, but take longer.  (default=4)
    :return: The path to the written file.
    """
    return dw_io.serialize_data_window(self, self.save_dir(), f"{self.event_name}.pkl.lz4", compression_factor)
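A minimal usage sketch trading write speed for file size:

path = dw.serialize(compression_factor=8)  # higher factor: smaller file, slower write
print(path)                                # <save_dir>/<event_name>.pkl.lz4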
def set_make_runme(self, make_runme: bool = False)
-
:param make_runme: if True, DataWindow will create a runme file when saved. Default False
Expand source code
def set_make_runme(self, make_runme: bool = False):
    """
    :param make_runme: if True, DataWindow will create a runme file when saved.  Default False
    """
    self._fs_writer.make_run_me = make_runme
def set_out_type(self, new_out_type: str)
-
set the output type of the DataWindow. Options are "NONE", "PARQUET", "LZ4", and "JSON"; invalid values become "NONE".
:param new_out_type: new output type of the DataWindow
Expand source code
def set_out_type(self, new_out_type: str):
    """
    set the output type of the DataWindow.  Options are "NONE", "PARQUET", "LZ4", and "JSON";
    invalid values become "NONE".

    :param new_out_type: new output type of the DataWindow
    """
    self._fs_writer.set_extension(new_out_type)
def set_save_dir(self, new_save_dir: Optional[str] = '.')
-
:param new_save_dir: directory to save data to; default current directory, or "."
Expand source code
def set_save_dir(self, new_save_dir: Optional[str] = "."):
    """
    :param new_save_dir: directory to save data to; default current directory, or "."
    """
    self._fs_writer.base_dir = new_save_dir
def set_sdk_version(self, version: str)
-
:param version: the sdk version to set
Expand source code
def set_sdk_version(self, version: str):
    """
    :param version: the sdk version to set
    """
    self._sdk_version = version
def start_date(self) ‑> float
-
:return: minimum start timestamp of the data or np.nan if no data
Expand source code
def start_date(self) -> float:
    """
    :return: minimum start timestamp of the data or np.nan if no data
    """
    if len(self._stations) > 0:
        return np.min([s.first_data_timestamp() for s in self._stations])
    return np.nan
def station_ids(self) ‑> List[str]
-
:return: ids of stations in the DataWindow
Expand source code
def station_ids(self) -> List[str]:
    """
    :return: ids of stations in the DataWindow
    """
    return [s.id() for s in self._stations]
def stations(self) ‑> List[Station]
-
:return: list of stations in the DataWindow
Expand source code
def stations(self) -> List[Station]:
    """
    :return: list of stations in the DataWindow
    """
    return self._stations
def to_json(self) ‑> str
-
:return: The DataWindow metadata as a JSON string.
Expand source code
def to_json(self) -> str:
    """
    :return: The DataWindow metadata as a JSON string.
    """
    return dw_io.data_window_as_json(self)
class DataWindowConfig (input_dir: str, structured_layout: bool = True, start_datetime: Optional[datetime.datetime] = None, end_datetime: Optional[datetime.datetime] = None, start_buffer_td: datetime.timedelta = datetime.timedelta(seconds=120), end_buffer_td: datetime.timedelta = datetime.timedelta(seconds=120), drop_time_s: float = 0.2, station_ids: Optional[Iterable[str]] = None, extensions: Optional[Set[str]] = None, api_versions: Optional[Set[ApiVersion]] = None, apply_correction: bool = True, use_model_correction: bool = True, copy_edge_points: DataPointCreationMode = DataPointCreationMode.COPY)
-
Configuration of DataWindow properties
Properties
input_dir: str, the directory that contains all the data. REQUIRED
structured_layout: bool, if True, the input_dir contains specially named and organized directories of data. Default True
start_datetime: optional datetime, start datetime of the window. If None, uses the first timestamp of the filtered data. Default None
end_datetime: optional datetime, non-inclusive end datetime of the window. If None, uses the last timestamp of the filtered data + 1. Default None
start_buffer_td: timedelta, the amount of time to include before the start_datetime when filtering data. Negative values are converted to 0. Default DEFAULT_START_BUFFER_TD (2 minutes)
end_buffer_td: timedelta, the amount of time to include after the end_datetime when filtering data. Negative values are converted to 0. Default DEFAULT_END_BUFFER_TD (2 minutes)
drop_time_s: float, the minimum amount of seconds between data files that would indicate a gap. Negative values are converted to default value. Default DATA_DROP_DURATION_S (0.2 seconds)
station_ids: optional set of strings, representing the station ids to filter on. If empty or None, get any ids found in the input directory. You may pass in any iterable, as long as it can be turned into a set. Default None
extensions: optional set of strings, representing file extensions to filter on. If None, gets as much data as it can in the input directory. Default None
api_versions: optional set of ApiVersions, representing api versions to filter on. If None, get as much data as it can in the input directory. Default None
apply_correction: bool, if True, update the timestamps in the data based on best station offset. Default True
copy_edge_points: enumeration of DataPointCreationMode. Determines how new points are created. Valid values are NAN, COPY, and INTERPOLATE. Default COPY
use_model_correction: bool, if True, use the offset model's correction functions, otherwise use the best offset. Default True
Expand source code
class DataWindowConfig:
    """
    Configuration of DataWindow properties

    Properties:
        input_dir: str, the directory that contains all the data.  REQUIRED

        structured_layout: bool, if True, the input_dir contains specially named and organized
        directories of data.  Default True

        start_datetime: optional datetime, start datetime of the window.
        If None, uses the first timestamp of the filtered data.  Default None

        end_datetime: optional datetime, non-inclusive end datetime of the window.
        If None, uses the last timestamp of the filtered data + 1.  Default None

        start_buffer_td: timedelta, the amount of time to include before the start_datetime when filtering data.
        Negative values are converted to 0.  Default DEFAULT_START_BUFFER_TD (2 minutes)

        end_buffer_td: timedelta, the amount of time to include after the end_datetime when filtering data.
        Negative values are converted to 0.  Default DEFAULT_END_BUFFER_TD (2 minutes)

        drop_time_s: float, the minimum amount of seconds between data files that would indicate a gap.
        Negative values are converted to default value.  Default DATA_DROP_DURATION_S (0.2 seconds)

        station_ids: optional set of strings, representing the station ids to filter on.
        If empty or None, get any ids found in the input directory.  You may pass in any iterable,
        as long as it can be turned into a set.  Default None

        extensions: optional set of strings, representing file extensions to filter on.
        If None, gets as much data as it can in the input directory.  Default None

        api_versions: optional set of ApiVersions, representing api versions to filter on.
        If None, get as much data as it can in the input directory.  Default None

        apply_correction: bool, if True, update the timestamps in the data based on best station offset.
        Default True

        copy_edge_points: enumeration of DataPointCreationMode.  Determines how new points are created.
        Valid values are NAN, COPY, and INTERPOLATE.  Default COPY

        use_model_correction: bool, if True, use the offset model's correction functions,
        otherwise use the best offset.  Default True
    """

    def __init__(
        self,
        input_dir: str,
        structured_layout: bool = True,
        start_datetime: Optional[dtu.datetime] = None,
        end_datetime: Optional[dtu.datetime] = None,
        start_buffer_td: timedelta = DEFAULT_START_BUFFER_TD,
        end_buffer_td: timedelta = DEFAULT_END_BUFFER_TD,
        drop_time_s: float = DATA_DROP_DURATION_S,
        station_ids: Optional[Iterable[str]] = None,
        extensions: Optional[Set[str]] = None,
        api_versions: Optional[Set[io.ApiVersion]] = None,
        apply_correction: bool = True,
        use_model_correction: bool = True,
        copy_edge_points: gpu.DataPointCreationMode = gpu.DataPointCreationMode.COPY,
    ):
        self.input_dir: str = input_dir
        self.structured_layout: bool = structured_layout
        self.start_datetime: Optional[dtu.datetime] = start_datetime
        self.end_datetime: Optional[dtu.datetime] = end_datetime
        self.start_buffer_td: timedelta = (
            start_buffer_td if start_buffer_td > timedelta(seconds=0) else timedelta(seconds=0)
        )
        self.end_buffer_td: timedelta = end_buffer_td if end_buffer_td > timedelta(seconds=0) else timedelta(seconds=0)
        self.drop_time_s: float = drop_time_s if drop_time_s > 0 else DATA_DROP_DURATION_S
        self.station_ids: Optional[Set[str]] = set(station_ids) if station_ids else None
        self.extensions: Optional[Set[str]] = extensions
        self.api_versions: Optional[Set[io.ApiVersion]] = api_versions
        self.apply_correction: bool = apply_correction
        self.use_model_correction = use_model_correction
        self.copy_edge_points = copy_edge_points

    def __repr__(self):
        return (
            f"input_dir: {self.input_dir}, "
            f"structured_layout: {self.structured_layout}, "
            f"start_datetime: {self.start_datetime.__repr__()}, "
            f"end_datetime: {self.end_datetime.__repr__()}, "
            f"start_buffer_td: {self.start_buffer_td.__repr__()}, "
            f"end_buffer_td: {self.end_buffer_td.__repr__()}, "
            f"drop_time_s: {self.drop_time_s}, "
            f"station_ids: {list(self.station_ids) if self.station_ids else []}, "
            f"extensions: {list(self.extensions) if self.extensions else []}, "
            f"api_versions: {[a_v.value for a_v in self.api_versions] if self.api_versions else []}, "
            f"apply_correction: {self.apply_correction}, "
            f"use_model_correction: {self.use_model_correction}, "
            f"copy_edge_points: {self.copy_edge_points.value}"
        )

    def __str__(self):
        return (
            f"input_dir: {self.input_dir}, "
            f"structured_layout: {self.structured_layout}, "
            f"start_datetime: "
            f"{self.start_datetime.strftime('%Y-%m-%dT%H:%M:%S.%fZ') if self.start_datetime else None}, "
            f"end_datetime: {self.end_datetime.strftime('%Y-%m-%dT%H:%M:%S.%fZ') if self.end_datetime else None}, "
            f"start_buffer_td (in s): {self.start_buffer_td.total_seconds()}, "
            f"end_buffer_td (in s): {self.end_buffer_td.total_seconds()}, "
            f"drop_time_s: {self.drop_time_s}, "
            f"station_ids: {list(self.station_ids) if self.station_ids else []}, "
            f"extensions: {list(self.extensions) if self.extensions else []}, "
            f"api_versions: {[a_v.value for a_v in self.api_versions] if self.api_versions else []}, "
            f"apply_correction: {self.apply_correction}, "
            f"use_model_correction: {self.use_model_correction}, "
            f"copy_edge_points: {self.copy_edge_points.name}"
        )

    def to_dict(self) -> Dict:
        return {
            "input_dir": self.input_dir,
            "structured_layout": self.structured_layout,
            "start_datetime": dtu.datetime_to_epoch_microseconds_utc(self.start_datetime)
            if self.start_datetime
            else None,
            "end_datetime": dtu.datetime_to_epoch_microseconds_utc(self.end_datetime) if self.end_datetime else None,
            "start_buffer_td": self.start_buffer_td.total_seconds(),
            "end_buffer_td": self.end_buffer_td.total_seconds(),
            "drop_time_s": self.drop_time_s,
            "station_ids": list(self.station_ids) if self.station_ids else [],
            "extensions": list(self.extensions) if self.extensions else [],
            "api_versions": [a_v.value for a_v in self.api_versions] if self.api_versions else [],
            "apply_correction": self.apply_correction,
            "use_model_correction": self.use_model_correction,
            "copy_edge_points": self.copy_edge_points.value,
        }

    @staticmethod
    def from_dict(data_dict: Dict) -> "DataWindowConfig":
        return DataWindowConfig(
            data_dict["input_dir"],
            data_dict["structured_layout"],
            dtu.datetime_from_epoch_microseconds_utc(data_dict["start_datetime"])
            if data_dict["start_datetime"]
            else None,
            dtu.datetime_from_epoch_microseconds_utc(data_dict["end_datetime"])
            if data_dict["end_datetime"]
            else None,
            timedelta(seconds=data_dict["start_buffer_td"]),
            timedelta(seconds=data_dict["end_buffer_td"]),
            data_dict["drop_time_s"],
            data_dict["station_ids"],
            set(data_dict["extensions"]),
            set([io.ApiVersion.from_str(v) for v in data_dict["api_versions"]]),
            data_dict["apply_correction"],
            data_dict["use_model_correction"],
            gpu.DataPointCreationMode(data_dict["copy_edge_points"]),
        )
Static methods
def from_dict(data_dict: Dict) ‑> DataWindowConfig
-
Expand source code
@staticmethod
def from_dict(data_dict: Dict) -> "DataWindowConfig":
    return DataWindowConfig(
        data_dict["input_dir"],
        data_dict["structured_layout"],
        dtu.datetime_from_epoch_microseconds_utc(data_dict["start_datetime"])
        if data_dict["start_datetime"]
        else None,
        dtu.datetime_from_epoch_microseconds_utc(data_dict["end_datetime"]) if data_dict["end_datetime"] else None,
        timedelta(seconds=data_dict["start_buffer_td"]),
        timedelta(seconds=data_dict["end_buffer_td"]),
        data_dict["drop_time_s"],
        data_dict["station_ids"],
        set(data_dict["extensions"]),
        set([io.ApiVersion.from_str(v) for v in data_dict["api_versions"]]),
        data_dict["apply_correction"],
        data_dict["use_model_correction"],
        gpu.DataPointCreationMode(data_dict["copy_edge_points"]),
    )
Methods
def to_dict(self) ‑> Dict
-
Expand source code
def to_dict(self) -> Dict:
    return {
        "input_dir": self.input_dir,
        "structured_layout": self.structured_layout,
        "start_datetime": dtu.datetime_to_epoch_microseconds_utc(self.start_datetime)
        if self.start_datetime
        else None,
        "end_datetime": dtu.datetime_to_epoch_microseconds_utc(self.end_datetime) if self.end_datetime else None,
        "start_buffer_td": self.start_buffer_td.total_seconds(),
        "end_buffer_td": self.end_buffer_td.total_seconds(),
        "drop_time_s": self.drop_time_s,
        "station_ids": list(self.station_ids) if self.station_ids else [],
        "extensions": list(self.extensions) if self.extensions else [],
        "api_versions": [a_v.value for a_v in self.api_versions] if self.api_versions else [],
        "apply_correction": self.apply_correction,
        "use_model_correction": self.use_model_correction,
        "copy_edge_points": self.copy_edge_points.value,
    }
class EventOrigin (provider: str = 'UNKNOWN', latitude: float = nan, latitude_std: float = nan, longitude: float = nan, longitude_std: float = nan, altitude: float = nan, altitude_std: float = nan, event_radius_m: float = 0.0)
-
The origin event's latitude, longitude, altitude and their standard deviations, the device used to measure the location data and the radius of the event
Properties
provider: str, source of the location data (i.e. "GPS" or "NETWORK"), default "UNKNOWN"
latitude: float, best estimate of latitude in degrees, default np.nan
latitude_std: float, standard deviation of best estimate of latitude, default np.nan
longitude: float, best estimate of longitude in degrees, default np.nan
longitude_std: float, standard deviation of best estimate of longitude, default np.nan
altitude: float, best estimate of altitude in meters, default np.nan
altitude_std: float, standard deviation of best estimate of altitude, default np.nan
event_radius_m: float, radius of event in meters, default 0.0
Expand source code
@dataclass_json
@dataclass
class EventOrigin:
    """
    The origin event's latitude, longitude, altitude and their standard deviations, the device used to measure
    the location data and the radius of the event

    Properties:
        provider: str, source of the location data (i.e. "GPS" or "NETWORK"), default "UNKNOWN"
        latitude: float, best estimate of latitude in degrees, default np.nan
        latitude_std: float, standard deviation of best estimate of latitude, default np.nan
        longitude: float, best estimate of longitude in degrees, default np.nan
        longitude_std: float, standard deviation of best estimate of longitude, default np.nan
        altitude: float, best estimate of altitude in meters, default np.nan
        altitude_std: float, standard deviation of best estimate of altitude, default np.nan
        event_radius_m: float, radius of event in meters, default 0.0
    """

    provider: str = "UNKNOWN"
    latitude: float = np.nan
    latitude_std: float = np.nan
    longitude: float = np.nan
    longitude_std: float = np.nan
    altitude: float = np.nan
    altitude_std: float = np.nan
    event_radius_m: float = 0.0
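A minimal construction sketch; the coordinates below are hypothetical:

from redvox.common.data_window import EventOrigin

origin = EventOrigin(
    provider="GPS",
    latitude=19.73, latitude_std=0.01,     # hypothetical event location in degrees
    longitude=-155.09, longitude_std=0.01,
    altitude=1200.0, altitude_std=5.0,     # meters
    event_radius_m=50.0,
)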
Class variables
var altitude : float
var altitude_std : float
var event_radius_m : float
var latitude : float
var latitude_std : float
var longitude : float
var longitude_std : float
var provider : str
Static methods
def from_dict(kvs: Union[dict, list, str, int, float, bool, ForwardRef(None)], *, infer_missing=False) ‑> ~A
-
Expand source code
@classmethod
def from_dict(cls: Type[A], kvs: Json, *, infer_missing=False) -> A:
    return _decode_dataclass(cls, kvs, infer_missing)
def from_json(s: Union[str, bytes, bytearray], *, parse_float=None, parse_int=None, parse_constant=None, infer_missing=False, **kw) ‑> ~A
-
Expand source code
@classmethod
def from_json(cls: Type[A], s: JsonData, *, parse_float=None, parse_int=None, parse_constant=None,
              infer_missing=False, **kw) -> A:
    kvs = json.loads(s, parse_float=parse_float, parse_int=parse_int, parse_constant=parse_constant, **kw)
    return cls.from_dict(kvs, infer_missing=infer_missing)
def schema(*, infer_missing: bool = False, only=None, exclude=(), many: bool = False, context=None, load_only=(), dump_only=(), partial: bool = False, unknown=None) ‑> dataclasses_json.mm.SchemaF[~A]
-
Expand source code
@classmethod
def schema(cls: Type[A], *, infer_missing: bool = False, only=None, exclude=(), many: bool = False,
           context=None, load_only=(), dump_only=(), partial: bool = False, unknown=None) -> SchemaType:
    Schema = build_schema(cls, DataClassJsonMixin, infer_missing, partial)
    if unknown is None:
        undefined_parameter_action = _undefined_parameter_action_safe(cls)
        if undefined_parameter_action is not None:
            # We can just make use of the same-named mm keywords
            unknown = undefined_parameter_action.name.lower()
    return Schema(only=only, exclude=exclude, many=many, context=context, load_only=load_only,
                  dump_only=dump_only, partial=partial, unknown=unknown)
Methods
def to_dict(self, encode_json=False) ‑> Dict[str, Union[dict, list, str, int, float, bool, ForwardRef(None)]]
-
Expand source code
def to_dict(self, encode_json=False) -> Dict[str, Json]:
    return _asdict(self, encode_json=encode_json)
def to_json(self, *, skipkeys: bool = False, ensure_ascii: bool = True, check_circular: bool = True, allow_nan: bool = True, indent: Union[int, str, ForwardRef(None)] = None, separators: Tuple[str, str] = None, default: Callable = None, sort_keys: bool = False, **kw) ‑> str
-
Expand source code
def to_json(self, *, skipkeys: bool = False, ensure_ascii: bool = True, check_circular: bool = True,
            allow_nan: bool = True, indent: Optional[Union[int, str]] = None, separators: Tuple[str, str] = None,
            default: Callable = None, sort_keys: bool = False, **kw) -> str:
    return json.dumps(self.to_dict(encode_json=False), cls=_ExtendedEncoder, skipkeys=skipkeys,
                      ensure_ascii=ensure_ascii, check_circular=check_circular, allow_nan=allow_nan,
                      indent=indent, separators=separators, default=default, sort_keys=sort_keys, **kw)
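A minimal round-trip sketch using the dataclasses_json methods above; the values are hypothetical (note that unset fields default to np.nan, which serializes as NaN):

from redvox.common.data_window import EventOrigin

origin = EventOrigin(provider="GPS", latitude=19.73, longitude=-155.09)  # hypothetical values
same = EventOrigin.from_json(origin.to_json())
assert same.latitude == origin.latitude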