Module redvox.common.api_reader_dw
For use with DataWindow specifically. Do NOT use with non-DataWindow objects. Reads Redvox data from a single directory. Data files can be in either the API 900 or API 1000 data format.
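A minimal usage sketch of how a DataWindow-style caller might drive this reader; the ./data path is a placeholder and not part of this module:
from redvox.common.api_reader_dw import ApiReaderDw

# hypothetical flat (unstructured) directory containing API 900 and/or API 1000 files
reader = ApiReaderDw(base_dir="./data", structured_dir=False)

# stations are built during construction; get_stations() simply returns them
for station in reader.get_stations():
    print(station.id())
print(reader.all_files_size)  # combined size of the indexed data files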
Expand source code
"""
For use with DataWindow specifically. Do NOT use with non-DataWindow objects
Read Redvox data from a single directory
Data files can be either API 900 or API 1000 data formats
"""
from typing import List, Optional
import multiprocessing.pool
import numpy as np
import redvox.settings as settings
from redvox.common import io
from redvox.common.api_reader import ApiReader
from redvox.common.parallel_utils import maybe_parallel_map
from redvox.common.station import Station
class ApiReaderDw(ApiReader):
"""
For use with DataWindow specifically. Do NOT use with non-DataWindow objects
"""
def __init__(
self,
base_dir: str,
structured_dir: bool = False,
read_filter: io.ReadFilter = None,
correct_timestamps: bool = False,
use_model_correction: bool = True,
dw_base_dir: str = ".",
dw_save_mode: io.FileSystemSaveMode = io.FileSystemSaveMode.TEMP,
debug: bool = False,
pool: Optional[multiprocessing.pool.Pool] = None,
):
"""
initialize API reader for data window
:param base_dir: directory containing the files to read
:param structured_dir: if True, base_dir contains a specific directory structure used by the respective
api formats. If False, base_dir only has the data files. Default False.
:param read_filter: ReadFilter for the data files, if None, get everything. Default None
:param correct_timestamps: if True, correct the timestamps of the data. Default False
:param use_model_correction: if True, use the offset model of the station to correct the timestamps.
if correct_timestamps is False, this value doesn't matter. Default True
:param dw_base_dir: the directory to save DataWindow files to. if dw_save_mode is "FileSystemSaveMode.MEM",
this value doesn't matter. default "." (current directory)
:param dw_save_mode: save method for the data window. Default "FileSystemSaveMode.TEMP"; save to temp_dir
:param debug: if True, output program warnings/errors during function execution. Default False.
:param pool: optional multiprocessing pool used when reading the files. Default None
"""
super().__init__(base_dir, structured_dir, read_filter, debug, pool)
self.correct_timestamps = correct_timestamps
self.use_model_correction = use_model_correction
self.dw_base_dir = dw_base_dir
self.dw_save_mode = dw_save_mode
self.all_files_size = np.sum([idx.files_size() for idx in self.files_index])
self._stations = self._read_stations()
def _station_by_index(self, findex: io.Index) -> Station:
"""
builds station using the index of files to read
splits the index into smaller chunks if entire record cannot be held in memory
:param findex: index with files to build a station with
:return: Station built from files in findex, without building the data from parquet
"""
split_list = self._split_workload(findex)
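# more than one chunk means the full record cannot be held in memory at once, so the pieces are staged in a temporary directory on disk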
use_temp_dir = len(split_list) > 1
if len(split_list) > 0:
if self.debug and use_temp_dir:
print("Writing data to temporary disk; this may take a few minutes to complete.")
station_from_index = Station.create_from_indexes(
split_list,
correct_timestamps=self.correct_timestamps,
use_model_correction=self.use_model_correction,
base_out_dir=self.dw_base_dir,
use_temp_dir=use_temp_dir,
)
if self.debug:
print(f"station {station_from_index.id()} files read: {len(findex.entries)}")
if len(split_list) > 1:
print(f"required making {len(split_list)} smaller segments due to memory restraints")
if self.dw_save_mode == io.FileSystemSaveMode.MEM and use_temp_dir:
self.dw_save_mode = io.FileSystemSaveMode.TEMP
return station_from_index
self.errors.append("No files found to create station.")
return Station()
def get_stations(self, pool: Optional[multiprocessing.pool.Pool] = None) -> List[Station]:
"""
:param pool: optional multiprocessing pool; not used, since stations are already built during construction
:return: a list of stations read by the ApiReader
"""
return self._stations
def _read_stations(self, pool: Optional[multiprocessing.pool.Pool] = None) -> List[Station]:
"""
:param pool: optional multiprocessing pool
:return: List of all stations in the ApiReader, without building the data from parquet
"""
if settings.is_parallelism_enabled() and len(self.files_index) > 1:
return list(maybe_parallel_map(pool, self._station_by_index, self.files_index, chunk_size=1))
return list(map(self._station_by_index, self.files_index))
def get_station_by_id(self, get_id: str) -> Optional[List[Station]]:
"""
:param get_id: the id to filter on
:return: list of all stations with the requested id or None if id can't be found
"""
result = [s for s in self._stations if s.id() == get_id]
if len(result) < 1:
return None
return result
Classes
class ApiReaderDw (base_dir: str, structured_dir: bool = False, read_filter: ReadFilter = None, correct_timestamps: bool = False, use_model_correction: bool = True, dw_base_dir: str = '.', dw_save_mode: FileSystemSaveMode = FileSystemSaveMode.TEMP, debug: bool = False, pool: Optional[multiprocessing.pool.Pool] = None)
-
For use with DataWindow specifically. Do NOT use with non-DataWindow objects
initialize API reader for data window
:param base_dir: directory containing the files to read
:param structured_dir: if True, base_dir contains a specific directory structure used by the respective api formats. If False, base_dir only has the data files. Default False.
:param read_filter: ReadFilter for the data files, if None, get everything. Default None
:param correct_timestamps: if True, correct the timestamps of the data. Default False
:param use_model_correction: if True, use the offset model of the station to correct the timestamps. if correct_timestamps is False, this value doesn't matter. Default True
:param dw_base_dir: the directory to save DataWindow files to. if dw_save_mode is "FileSystemSaveMode.MEM", this value doesn't matter. default "." (current directory)
:param dw_save_mode: save method for the data window. Default "FileSystemSaveMode.TEMP"; save to temp_dir
:param debug: if True, output program warnings/errors during function execution. Default False.
:param pool: optional multiprocessing pool used when reading the files. Default None
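A hedged sketch of the save-mode behaviour; the path is a placeholder, and only the save modes that appear in this module (MEM and TEMP) are shown:
from redvox.common import io
from redvox.common.api_reader_dw import ApiReaderDw

reader = ApiReaderDw(
    base_dir="/path/to/redvox/data",        # placeholder path
    structured_dir=True,                     # base_dir follows the structured layout for the API formats
    correct_timestamps=True,                 # apply timestamp correction
    use_model_correction=True,               # use the station offset model for the correction
    dw_save_mode=io.FileSystemSaveMode.MEM,  # request in-memory results
)
# if any station's data could not be held in memory, the reader falls back to
# FileSystemSaveMode.TEMP and stages the data in a temporary directory
print(reader.dw_save_mode)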
Ancestors
redvox.common.api_reader.ApiReader
Methods
def get_stations(self, pool: Optional[multiprocessing.pool.Pool] = None) ‑> List[Station]
-
:param pool: optional multiprocessing pool; not used, since stations are already built during construction
:return: a list of stations read by the ApiReader
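A short retrieval sketch; the directory and station id are placeholders:
from redvox.common.api_reader_dw import ApiReaderDw

reader = ApiReaderDw(base_dir="./data")           # placeholder directory of Redvox files
stations = reader.get_stations()                  # stations were already built during construction
matches = reader.get_station_by_id("1637610021")  # placeholder id; returns None if nothing matches
print(len(stations), matches is not None)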
Inherited members