Module redvox.common.io

This module provides IO primitives for working with cross-API RedVox data.
"""
This module provides IO primitives for working with cross-API RedVox data.
"""
import enum
from collections import defaultdict
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from glob import glob
import numpy as np
import json
import os.path
import multiprocessing
import multiprocessing.pool
import tempfile
from pathlib import Path, PurePath
from shutil import copy2, move, rmtree
from typing import (
    Any,
    Dict,
    Iterator,
    List,
    Optional,
    Set,
    Union,
    TYPE_CHECKING,
    Callable,
)

import lz4.frame

from redvox.api900.reader import read_rdvxz_file, read_buffer
from redvox.api900.reader_utils import calculate_uncompressed_size
from redvox.common import api_conversions as ac
from redvox.api1000.common.common import check_type
from redvox.api1000.wrapped_redvox_packet.wrapped_packet import WrappedRedvoxPacketM
from redvox.api1000.proto.redvox_api_m_pb2 import RedvoxPacketM
from redvox.common.versioning import check_version, ApiVersion
from redvox.common.date_time_utils import (
    datetime_from_epoch_microseconds_utc as dt_us,
    datetime_from_epoch_milliseconds_utc as dt_ms,
    datetime_to_epoch_microseconds_utc as us_dt,
    truncate_dt_ymd,
    truncate_dt_ymdh,
)
from redvox.common.parallel_utils import maybe_parallel_map

if TYPE_CHECKING:
    from redvox.api900.wrapped_redvox_packet import WrappedRedvoxPacket
    from redvox.api900.lib.api900_pb2 import RedvoxPacket


def remove_dir_contents(dir_path: Path):
    """
    Delete everything inside dir_path, leaving the directory itself in place.
    Prints a message and does nothing if dir_path is not a directory.

    :param dir_path: path to the directory to empty
    """
    if not dir_path.is_dir():
        print(f"{dir_path} is not a directory; cannot remove contents!")
        return
    for child in dir_path.iterdir():
        if child.is_dir():
            # recursively delete sub-directories
            rmtree(child)
        else:
            child.unlink()


class FileSystemSaveMode(enum.Enum):
    """
    Enumeration of the available ways to persist data.
    """
    MEM = 0   # keep everything in memory
    TEMP = 1  # write into a temporary directory
    DISK = 2  # write to a user-specified path on disk

    @staticmethod
    def get_save_mode(use_temp: bool, use_disk: bool) -> "FileSystemSaveMode":
        """
        Pick the save mode from two flags; use_temp takes priority over use_disk.

        :param use_temp: use temporary directory
        :param use_disk: use path on disk
        :return: the selected save mode (MEM when both flags are false)
        """
        if use_temp:
            return FileSystemSaveMode.TEMP
        return FileSystemSaveMode.DISK if use_disk else FileSystemSaveMode.MEM


class FileSystemWriter:
    """
    This class holds basic information about writing and reading objects from a file system
    If user does not enable saving to disk, we use a temporary directory to store large files

    Properties:
        file_name: str, the name of the file (do not include extension)

        file_extension: str, the extension used by the file (do not include the .).
        Stored lower-cased.  Default "none"

        base_dir: str, the directory to save the file to.  Default "." (current dir)

    Protected:
        _save_mode: FileSystemSaveMode, determines how files get saved

        _temp_dir: TemporaryDirectory, temporary directory for large files when not saving to disk
    """

    def __init__(self, file_name: str, file_ext: str = "none", base_dir: str = ".",
                 save_mode: FileSystemSaveMode = FileSystemSaveMode.MEM):
        """
        initialize FileSystemWriter

        :param file_name: name of file
        :param file_ext: extension of file (without the .), default "none"; stored lower-cased
        :param base_dir: directory to save file to, default "." (current dir)
        :param save_mode: determines how to save files to system, default MEM (no save, use RAM)
        """
        self.file_name: str = file_name
        self.file_extension: str = file_ext.lower()
        self.base_dir: str = base_dir
        self._save_mode: FileSystemSaveMode = save_mode
        # always allocate a temp dir so TEMP mode can be switched to at any time
        self._temp_dir = tempfile.TemporaryDirectory()

    def __repr__(self):
        return f"file_name: {self.file_name}, " \
               f"extension: {self.file_extension}, " \
               f"base_dir: {self.base_dir}, " \
               f"save_mode: {self._save_mode.value if hasattr(self, '_save_mode') else FileSystemSaveMode.TEMP.value}"

    def __str__(self):
        return f"file_name: {self.file_name}, " \
               f"extension: {self.file_extension}, " \
               f"base_dir: {self.base_dir}, " \
               f"save_mode: {self._save_mode.name if hasattr(self, '_save_mode') else FileSystemSaveMode.TEMP.name}"

    def __del__(self):
        """
        remove temp dir and all files within it
        """
        # guard: __del__ may run on a partially-constructed instance if __init__
        # raised before _temp_dir was assigned
        if hasattr(self, "_temp_dir"):
            self._temp_dir.cleanup()

    def is_use_temp(self) -> bool:
        """
        :return: if writing to temp dir
        """
        # older serialized instances may lack _save_mode; treat them as not-TEMP here
        if hasattr(self, "_save_mode"):
            return self._save_mode == FileSystemSaveMode.TEMP
        return False

    def set_use_temp(self, use_temp: bool):
        """
        :param use_temp: if true, sets mode to use temp dir, otherwise no change
        """
        if use_temp:
            self._save_mode = FileSystemSaveMode.TEMP

    def get_temp(self) -> str:
        """
        :return: path of temp directory
        """
        return self._temp_dir.name

    def is_use_disk(self) -> bool:
        """
        :return: if writing to path on disk
        """
        if hasattr(self, "_save_mode"):
            return self._save_mode == FileSystemSaveMode.DISK
        return False

    def set_use_disk(self, use_disk: bool):
        """
        :param use_disk: if true, sets mode to use the disk, otherwise no change
        """
        if use_disk:
            self._save_mode = FileSystemSaveMode.DISK

    def is_use_mem(self) -> bool:
        """
        :return: if writing data to memory
        """
        if hasattr(self, "_save_mode"):
            return self._save_mode == FileSystemSaveMode.MEM
        return False

    def set_use_mem(self, use_mem: bool):
        """
        :param use_mem: if true, sets mode to use the system's RAM, otherwise no change
        """
        if use_mem:
            self._save_mode = FileSystemSaveMode.MEM

    def is_save_disk(self) -> bool:
        """
        :return: if writing data to disk (temp dir or user defined path) instead of using memory
        """
        if hasattr(self, '_save_mode'):
            return self._save_mode != FileSystemSaveMode.MEM
        return False

    def save_dir(self) -> str:
        """
        :return: directory where file would be saved based on save mode; returns empty string if saving to memory
        """
        if self.is_use_disk():
            return self.base_dir
        elif self.is_use_temp():
            return self._temp_dir.name
        return ""

    def set_save_mode(self, save_mode: FileSystemSaveMode):
        """
        set the save mode

        :param save_mode: updated save mode
        """
        self._save_mode = save_mode

    def save_mode(self) -> FileSystemSaveMode:
        """
        :return: the save mode
        """
        # fix: return the enum itself (not its .name string) to match the declared
        # return type and the TEMP fallback below, which already returns an enum
        return self._save_mode if hasattr(self, '_save_mode') else FileSystemSaveMode.TEMP

    def full_name(self) -> str:
        """
        :return: file name with extension
        """
        return f"{self.file_name}.{self.file_extension}"

    def full_path(self) -> str:
        """
        :return: the full path to where the file would be written
        """
        return os.path.join(self.save_dir(), self.full_name())

    def set_name_and_extension(self, name: str, ext: str):
        """
        set the name and extension of the output file.  Do not include the . for the extension

        :param name: file name
        :param ext: file extension
        """
        self.file_name = name
        self.file_extension = ext

    def set_name(self, name: str):
        """
        set the name of the output file.

        :param name: file name
        """
        self.file_name = name

    def set_extension(self, ext: str):
        """
        set the extension of the output file.  Do not include the . for the extension

        :param ext: file extension
        """
        self.file_extension = ext

    def json_file_name(self) -> str:
        """
        :return: file name with .json extension
        """
        return f"{self.file_name}.json"

    def json_path(self) -> Path:
        """
        :return: full path to json file
        """
        return Path(self.save_dir()).joinpath(self.json_file_name())

    def create_dir(self):
        """
        if saving to disk, remove the directory contents if it exists, then create an empty
        directory to save things into.
        if saving to temp dir, replace the temp dir with a fresh, empty one.
        """
        if self.is_use_disk():
            if os.path.exists(self.save_dir()):
                remove_dir_contents(Path(self.save_dir()))
            else:
                os.makedirs(self.save_dir())
        elif self.is_use_temp():
            # cheapest way to empty a TemporaryDirectory is to replace it
            self._temp_dir.cleanup()
            self._temp_dir = tempfile.TemporaryDirectory()

    def as_dict(self) -> dict:
        """
        :return: FileSystemWriter as dictionary
        """
        # save_mode is serialized by name so the dict is JSON-friendly
        return {
            "file_name": self.file_name,
            "file_extension": self.file_extension,
            "base_dir": self.base_dir,
            "save_mode": self._save_mode.name if hasattr(self, '_save_mode') else FileSystemSaveMode.TEMP.name
        }

    @staticmethod
    def from_dict(data_dict: Dict) -> "FileSystemWriter":
        """
        :param data_dict: dictionary to convert to FileSystemWriter
        :return: FileSystemWriter from dict
        """
        return FileSystemWriter(data_dict["file_name"], data_dict["file_extension"], data_dict["base_dir"],
                                FileSystemSaveMode[data_dict["save_mode"]])


def dict_to_json(dct: dict) -> str:
    """
    Serialize a dictionary into a JSON-formatted string.

    :param dct: the dictionary to serialize
    :return: JSON string representation of dct
    """
    return json.dumps(dct)


def json_to_dict(json_str: str) -> Dict:
    """
    Parse a JSON-formatted string into a python dictionary.

    :param json_str: JSON string to parse
    :return: the parsed dictionary
    """
    return json.loads(json_str)


def json_file_to_dict(file_path: str) -> Dict:
    """
    Load a json file from disk and parse its contents into a dictionary.

    :param file_path: full path of the json file to read
    :return: contents of the file as a python dictionary
    """
    with open(file_path, "r") as f_p:
        return json.loads(f_p.read())


def get_json_file(file_dir: str) -> Optional[str]:
    """
    Find the first json file in the given directory.

    :param file_dir: directory to search for json files
    :return: name (not full path) of the first json file found, or None if there are none
    """
    matches = glob(os.path.join(file_dir, "*.json"))
    return Path(matches[0]).name if matches else None


def _is_int(value: str) -> Optional[int]:
    """
    Attempt to interpret a string as a base-10 integer.

    :param value: the string to convert.
    :return: the parsed integer, or None when value is not a valid int.
    """
    try:
        parsed = int(value)
    except ValueError:
        return None
    return parsed


def _not_none(value: Optional[Any]) -> bool:
    """
    Predicate: does value hold something other than None?

    :param value: the value to check.
    :return: True when value is anything other than None.
    """
    return value is not None


@dataclass
class IndexEntry:
    """
    This class represents a single index entry. It extracts and encapsulates API agnostic fields that represent the
    information stored in standard RedVox file names.

    NOTE(review): because __eq__ is defined below, Python sets __hash__ to None,
    so instances are unhashable — confirm no caller relies on hashing entries.
    """

    # resolved (when possible) file system path to the RedVox file
    full_path: str
    # station id parsed from the file name (the part before the underscore)
    station_id: str
    # packet datetime parsed from the file name timestamp
    date_time: datetime
    # file extension, including the leading "."
    extension: str
    # RedVox API version detected for the file
    api_version: ApiVersion
    # on-disk (compressed) size in bytes; 0 when the file does not exist
    compressed_file_size_bytes: int = 0
    # decompressed size in bytes; 0 when the file does not exist or size is unknown
    decompressed_file_size_bytes: int = 0

    @staticmethod
    def from_path(path_str: str, strict: bool = True) -> Optional["IndexEntry"]:
        """
        Attempts to parse a file path into an IndexEntry. If a given path is not recognized as a valid RedVox file,
        None will be returned instead.

        Expects file names of the form "<station_id>_<timestamp>.<ext>" where both
        station_id and timestamp are integers.

        :param path_str: The file system path to attempt to parse.
        :param strict: When set, None is returned if the referenced file DNE.
        :return: An IndexEntry on successful parse, or None.
        """
        api_version: ApiVersion = check_version(path_str)
        path: Path = Path(path_str)
        name: str = path.stem
        ext: str = path.suffix

        # Attempt to parse file name parts
        split_name = name.split("_")
        if len(split_name) != 2:
            return None

        station_id: str = split_name[0]
        ts_str: str = split_name[1]

        # If you have a filename with a dot, but not an extension, i.e. "0000000001_0.", we need to remove the dot
        # from the end and make in the extension
        if len(ts_str) > 0 and ts_str[-1] == ".":
            ts_str = ts_str[:-1]
            ext = "."

        timestamp: Optional[int] = _is_int(ts_str)

        # Ensure that both the station ID and timestamp can be represented as ints
        if _is_int(station_id) is None or timestamp is None:
            return None

        # Parse the datetime per the specified API version
        # (API 1000 timestamps are microseconds; others are treated as milliseconds)
        date_time: datetime
        if api_version == ApiVersion.API_1000:
            date_time = dt_us(timestamp)
        else:
            date_time = dt_ms(timestamp)

        full_path: str
        try:
            full_path = str(path.resolve(strict=True))
        except FileNotFoundError:
            if strict:
                return None
            full_path = path_str

        return IndexEntry(full_path, station_id, date_time, ext, api_version)._set_compressed_decompressed_lz4_size()

    @staticmethod
    def from_native(entry) -> "IndexEntry":
        """
        Converts a native index entry into a python index entry.

        :param entry: A native index entry.
        :return: A python index entry.
        """
        return IndexEntry(
            entry.full_path,
            entry.station_id,
            dt_us(entry.date_time),
            entry.extension,
            ApiVersion.from_str(entry.api_version)
        )._set_compressed_decompressed_lz4_size()

    def to_native(self):
        """
        Converts this python index entry into a native index entry.

        :return: A native index entry (requires the optional redvox_native package).
        """
        import redvox_native

        entry = redvox_native.IndexEntry(
            self.full_path,
            self.station_id,
            us_dt(self.date_time),
            self.extension,
            self.api_version.value
        )
        return entry

    def _set_compressed_decompressed_lz4_size(self):
        """
        set the compressed and decompressed file size in bytes of an lz4 file being read by the IndexEntry.
        default is 0 for both file sizes (left unchanged when the file does not exist)

        :return: updated self
        """
        if os.path.exists(self.full_path):
            self.compressed_file_size_bytes = os.path.getsize(self.full_path)
            with open(self.full_path, "rb") as fp:
                if self.api_version == ApiVersion.API_1000:
                    # read the lz4 frame header for its recorded content size
                    # NOTE(review): this reads the whole file into memory to get the header
                    header = lz4.frame.get_frame_info(fp.read())
                    self.decompressed_file_size_bytes = header["content_size"]
                elif self.api_version == ApiVersion.API_900:
                    self.decompressed_file_size_bytes = calculate_uncompressed_size(fp.read())
        return self

    def read(self) -> Optional[Union[WrappedRedvoxPacketM, "WrappedRedvoxPacket"]]:
        """
        Reads, decompresses, deserializes, and wraps the RedVox file pointed to by this entry.

        :return: One of WrappedRedvoxPacket (API 900), WrappedRedvoxPacketM (API 1000), or None
                 for an unknown API version.
        """
        if self.api_version == ApiVersion.API_900:
            return read_rdvxz_file(self.full_path)
        elif self.api_version == ApiVersion.API_1000:
            return WrappedRedvoxPacketM.from_compressed_path(self.full_path)
        else:
            return None

    def read_raw(self) -> Optional[Union["RedvoxPacket", RedvoxPacketM]]:
        """
        Reads, decompresses, and deserializes the RedVox file pointed to by this entry.

        :return: One of RedvoxPacket, RedvoxPacketM, or None. Note that these are the raw protobuf types.
        """
        if self.api_version == ApiVersion.API_900:
            with open(self.full_path, "rb") as buf_in:
                return read_buffer(buf_in.read())
        elif self.api_version == ApiVersion.API_1000:
            # API 1000 files are lz4-framed serialized protobufs
            with lz4.frame.open(self.full_path, "rb") as serialized_in:
                proto: RedvoxPacketM = RedvoxPacketM()
                proto.ParseFromString(serialized_in.read())
                return proto
        else:
            return None

    def _into_native(self):
        # intentionally a no-op
        pass

    def __eq__(self, other: object) -> bool:
        """
        Tests if this entry is equal to another value by comparing this entry's full path
        against the other entry's full path.

        :param other: Other IndexEntry to compare against.
        :return: True if the full paths are equal (and other is an IndexEntry), False otherwise.
        """
        if isinstance(other, IndexEntry):
            return self.full_path == other.full_path

        return False


# noinspection DuplicatedCode
@dataclass
class ReadFilter:
    """
    A collection of predicates used to select RedVox files from the file system.
    """

    start_dt: Optional[datetime] = None
    end_dt: Optional[datetime] = None
    station_ids: Optional[Set[str]] = None
    extensions: Optional[Set[str]] = field(default_factory=lambda: {".rdvxm", ".rdvxz"})
    start_dt_buf: Optional[timedelta] = timedelta(minutes=2.0)
    end_dt_buf: Optional[timedelta] = timedelta(minutes=2.0)
    api_versions: Optional[Set[ApiVersion]] = field(
        default_factory=lambda: {ApiVersion.API_900, ApiVersion.API_1000}
    )

    @staticmethod
    def empty() -> "ReadFilter":
        """
        :return: A ReadFilter with ALL filters set to None, as opposed to the default
                 constructor which supplies sane defaults for extensions, APIs, and window buffers.
        """
        return ReadFilter(None, None, None, None, None, None, None)

    def clone(self) -> "ReadFilter":
        """
        :return: a new ReadFilter carrying the same settings as this one
        """
        duplicate = ReadFilter()
        duplicate.with_start_dt(self.start_dt)
        duplicate.with_end_dt(self.end_dt)
        duplicate.with_station_ids(self.station_ids)
        duplicate.with_extensions(self.extensions)
        duplicate.with_start_dt_buf(self.start_dt_buf)
        duplicate.with_end_dt_buf(self.end_dt_buf)
        duplicate.with_api_versions(self.api_versions)
        return duplicate

    def with_start_dt(self, start_dt: Optional[datetime]) -> "ReadFilter":
        """
        Sets the start datetime filter; files before this datetime are excluded.

        :param start_dt: earliest datetime to accept, or None to disable
        :return: this filter, modified
        """
        check_type(start_dt, [datetime, None])
        self.start_dt = start_dt
        return self

    def with_start_ts(self, start_ts: Optional[float]) -> "ReadFilter":
        """
        Sets the start time filter from an epoch timestamp.

        :param start_ts: start timestamp in microseconds, or None to disable
        :return: this filter, modified
        """
        check_type(start_ts, [int, float, None])
        return self.with_start_dt(None if start_ts is None else dt_us(start_ts))

    def with_end_dt(self, end_dt: Optional[datetime]) -> "ReadFilter":
        """
        Sets the end datetime filter; files after this datetime are excluded.

        :param end_dt: latest datetime to accept, or None to disable
        :return: this filter, modified
        """
        check_type(end_dt, [datetime, None])
        self.end_dt = end_dt
        return self

    def with_end_ts(self, end_ts: Optional[float]) -> "ReadFilter":
        """
        Like with_end_dt, but takes an epoch timestamp.

        :param end_ts: end timestamp in microseconds, or None to disable
        :return: this filter, modified
        """
        check_type(end_ts, [int, float, None])
        return self.with_end_dt(None if end_ts is None else dt_us(end_ts))

    def with_station_ids(self, station_ids: Optional[Set[str]]) -> "ReadFilter":
        """
        Sets the station id filter; only the given station ids are accepted.

        :param station_ids: station ids to accept, or None to disable
        :return: this filter, modified
        """
        check_type(station_ids, [set, None])
        self.station_ids = station_ids
        return self

    def with_extensions(self, extensions: Optional[Set[str]]) -> "ReadFilter":
        """
        Sets the file extension filter.

        :param extensions: extensions to accept, or None to disable
        :return: this filter, modified
        """
        check_type(extensions, [set, None])
        self.extensions = extensions
        return self

    def with_start_dt_buf(self, start_dt_buf: Optional[timedelta]) -> "ReadFilter":
        """
        Sets the buffer prepended to the start time.

        :param start_dt_buf: amount of time to buffer before the start time
        :return: this filter, modified
        """
        check_type(start_dt_buf, [timedelta, None])
        self.start_dt_buf = start_dt_buf
        return self

    def with_end_dt_buf(self, end_dt_buf: Optional[timedelta]) -> "ReadFilter":
        """
        Sets the buffer appended to the end time.

        :param end_dt_buf: amount of time to buffer after the end time
        :return: this filter, modified
        """
        check_type(end_dt_buf, [timedelta, None])
        self.end_dt_buf = end_dt_buf
        return self

    def with_api_versions(
        self, api_versions: Optional[Set[ApiVersion]]
    ) -> "ReadFilter":
        """
        Sets the API version filter.

        :param api_versions: set of ApiVersion enums to accept, or None to disable
        :return: this filter, modified
        """
        check_type(api_versions, [set, None])
        self.api_versions = api_versions
        return self

    def apply_dt(
        self, date_time: datetime, dt_fn: Callable[[datetime], datetime] = lambda dt: dt
    ) -> bool:
        """
        Tests whether a datetime falls within this filter's (buffered) time window.

        :param date_time: datetime to test
        :param dt_fn: optional transform applied to each window edge before comparison
        :return: True when date_time is inside the window, False otherwise
        """
        check_type(date_time, [datetime])
        # a missing buffer behaves like a zero-length buffer
        if self.start_dt is not None:
            lo_buf = self.start_dt_buf if self.start_dt_buf is not None else timedelta(seconds=0)
            if date_time < dt_fn(self.start_dt - lo_buf):
                return False

        if self.end_dt is not None:
            hi_buf = self.end_dt_buf if self.end_dt_buf is not None else timedelta(seconds=0)
            if date_time > dt_fn(self.end_dt + hi_buf):
                return False

        return True

    def apply(self, entry: IndexEntry) -> bool:
        """
        Tests whether an IndexEntry passes every active filter.

        :param entry: the entry to test
        :return: True when the entry is accepted, False otherwise
        """
        check_type(entry, [IndexEntry])

        return (
            self.apply_dt(entry.date_time)
            and (self.station_ids is None or entry.station_id in self.station_ids)
            and (self.extensions is None or entry.extension in self.extensions)
            and (self.api_versions is None or entry.api_version in self.api_versions)
        )


@dataclass
class IndexStationSummary:
    """
    Per-station summary of the packets contained in an index.
    """

    station_id: str
    api_version: ApiVersion
    total_packets: int
    first_packet: datetime
    last_packet: datetime
    single_packet_decompressed_size_bytes: int

    @staticmethod
    def from_entry(entry: IndexEntry) -> "IndexStationSummary":
        """
        Creates a summary covering exactly one entry.

        :param entry: Entry to copy information from.
        :return: An instance of IndexStationSummary.
        """
        return IndexStationSummary(
            entry.station_id,
            entry.api_version,
            1,
            first_packet=entry.date_time,
            last_packet=entry.date_time,
            single_packet_decompressed_size_bytes=entry.decompressed_file_size_bytes
        )

    def update(self, entry: IndexEntry) -> None:
        """
        Folds another entry into this summary, widening the first/last packet range as needed.

        :param entry: Entry to update this summary from.
        """
        self.total_packets += 1
        self.first_packet = min(self.first_packet, entry.date_time)
        self.last_packet = max(self.last_packet, entry.date_time)


@dataclass
class IndexSummary:
    """
    Aggregated, per-station view of an index's contents.
    """

    station_summaries: Dict[ApiVersion, Dict[str, IndexStationSummary]]

    def station_ids(self, api_version: Optional[ApiVersion] = None) -> List[str]:
        """
        Returns the station IDs referenced by this index.

        :param api_version: An (optional) filter to only return station IDs for a specified RedVox API version.
                            None will collect station IDs from all API versions.
        :return: The station IDs referenced by this index.
        """
        if api_version is not None:
            unique = {s.station_id for s in self.station_summaries[api_version].values()}
            return list(unique)
        # noinspection PyTypeChecker
        ids_900 = {s.station_id for s in self.station_summaries[ApiVersion.API_900].values()}
        ids_1000 = {s.station_id for s in self.station_summaries[ApiVersion.API_1000].values()}
        return list(ids_900) + list(ids_1000)

    def total_packets(self, api_version: Optional[ApiVersion] = None) -> int:
        """
        Returns the total number of packets referenced by this index.

        :param api_version: An (optional) filter to only count packets for a specified RedVox API version.
                            None will count packets from all API versions.
        :return: The total number of packets referenced by this index.
        """
        if api_version is not None:
            return sum(s.total_packets for s in self.station_summaries[api_version].values())
        # noinspection PyTypeChecker
        return sum(
            s.total_packets for s in self.station_summaries[ApiVersion.API_900].values()
        ) + sum(
            s.total_packets for s in self.station_summaries[ApiVersion.API_1000].values()
        )

    @staticmethod
    def from_index(index: "Index") -> "IndexSummary":
        """
        Builds an IndexSummary from a given index.

        :param index: Index to build summary from.
        :return: An instance of IndexSummary.
        """
        # group summaries first by API version, then by station id
        summaries: Dict[ApiVersion, Dict[str, IndexStationSummary]] = defaultdict(dict)

        for entry in index.entries:
            per_station = summaries[entry.api_version]
            existing = per_station.get(entry.station_id)
            if existing is None:
                # first packet seen for this station: start a new summary
                per_station[entry.station_id] = IndexStationSummary.from_entry(entry)
            else:
                existing.update(entry)

        return IndexSummary(summaries)


@dataclass
class Index:
    """
    An index of available RedVox files from the file system.
    """

    # The file entries referenced by this index.
    entries: List[IndexEntry] = field(default_factory=list)

    @staticmethod
    def from_native(index_native) -> "Index":
        """
        Converts a native index into a python index.

        :param index_native: A native index.
        :return: A Python index.
        """
        entries: List[IndexEntry] = list(
            map(IndexEntry.from_native, index_native.entries)
        )
        return Index(entries)._set_decompressed_file_size()

    def to_native(self):
        """
        Converts this python index into a native index.

        :return: A native index.
        """
        # Imported lazily; the native module is an optional dependency.
        import redvox_native

        native_index = redvox_native.Index()
        native_index.entries = list(map(IndexEntry.to_native, self.entries))
        return native_index

    def max_decompressed_file_size(self) -> float:
        """
        :return: the maximum decompressed file size in bytes among the entries,
                 or NaN if there are no entries
        """
        # NaN (a float) is returned for an empty index, hence the float return type
        # (the original annotation of int was inaccurate for that case).
        return max([fi.decompressed_file_size_bytes for fi in self.entries]) if len(self.entries) > 0 else np.nan

    def get_decompressed_file_size(self) -> float:
        """
        :return: the decompressed size of the first file in the list of entries,
                 or NaN if there are no entries
        """
        if len(self.entries) == 0:
            return np.nan
        if self.entries[0].decompressed_file_size_bytes == 0 and os.path.exists(self.entries[0].full_path):
            # Recorded size is unknown; decompress the file once to measure it.
            with lz4.frame.open(self.entries[0].full_path, 'rb') as fr:
                return len(fr.read())
        return self.entries[0].decompressed_file_size_bytes

    def _set_decompressed_file_size(self) -> "Index":
        """
        updates the decompressed size of all entries if the maximum decompressed size is 0, otherwise makes no changes

        :return: updated self
        """
        # NOTE(review): every entry receives the size of the FIRST file; this
        # assumes roughly uniform packet sizes within an index -- confirm upstream.
        if self.max_decompressed_file_size() == 0:
            new_size = self.get_decompressed_file_size()
            for ie in self.entries:
                ie.decompressed_file_size_bytes = new_size
        return self

    def sort(self) -> None:
        """
        Sorts the entries stored in this index by API version, station id, then date.
        """
        self.entries = sorted(
            self.entries,
            key=lambda entry: (entry.api_version, entry.station_id, entry.date_time),
        )

    def append(self, entries: Iterator[IndexEntry]) -> None:
        """
        Appends new entries to this index.

        :param entries: Entries to append.
        """
        self.entries.extend(entries)
        self._set_decompressed_file_size()

    def summarize(self) -> IndexSummary:
        """
        :return: A summary of the contents of this index.
        """
        return IndexSummary.from_index(self)

    def get_index_for_station_id(self, station_id: str) -> "Index":
        """
        :param station_id: id to get entries for
        :return: Index containing only the entries for the station requested
        """
        return Index([en for en in self.entries if en.station_id == station_id])

    def stream_raw(
        self, read_filter: ReadFilter = ReadFilter()
    ) -> Iterator[Union["RedvoxPacket", RedvoxPacketM]]:
        """
        Read, decompress, deserialize, and then stream RedVox data pointed to by this index.

        :param read_filter: Additional filtering to specify which data should be streamed.
        :return: An iterator over RedvoxPacket and RedvoxPacketM instances.
        """
        filtered: Iterator[IndexEntry] = filter(read_filter.apply, self.entries)
        # noinspection Mypy
        return map(IndexEntry.read_raw, filtered)

    def stream(
        self, read_filter: ReadFilter = ReadFilter()
    ) -> Iterator[Union["WrappedRedvoxPacket", WrappedRedvoxPacketM]]:
        """
        Read, decompress, deserialize, wrap, and then stream RedVox data pointed to by this index.

        :param read_filter: Additional filtering to specify which data should be streamed.
        :return: An iterator over WrappedRedvoxPacket and WrappedRedvoxPacketM instances.
        """
        filtered: Iterator[IndexEntry] = filter(read_filter.apply, self.entries)
        # noinspection Mypy
        return map(IndexEntry.read, filtered)

    def read_raw(
        self, read_filter: ReadFilter = ReadFilter()
    ) -> List[Union["RedvoxPacket", RedvoxPacketM]]:
        """
        Read, decompress, and deserialize RedVox data pointed to by this index.

        :param read_filter: Additional filtering to specify which data should be read.
        :return: A list of RedvoxPacket and RedvoxPacketM instances.
        """
        return list(self.stream_raw(read_filter))

    def read(
        self, read_filter: ReadFilter = ReadFilter()
    ) -> List[Union["WrappedRedvoxPacket", WrappedRedvoxPacketM]]:
        """
        Read, decompress, deserialize, and wrap RedVox data pointed to by this index.

        :param read_filter: Additional filtering to specify which data should be read.
        :return: A list of WrappedRedvoxPacket and WrappedRedvoxPacketM instances.
        """
        return list(self.stream(read_filter))

    def files_size(self) -> float:
        """
        :return: sum of the decompressed file sizes in bytes of all entries in this index
        """
        return float(np.sum([entry.decompressed_file_size_bytes for entry in self.entries]))

    def read_contents(self) -> List[RedvoxPacketM]:
        """
        read all the files in the index

        :return: list of RedvoxPacketM, converted from API 900 if necessary
        """
        result: List[RedvoxPacketM] = []

        # Iterate over the API 900 packets in a memory efficient way
        # and convert to API 1000
        # noinspection PyTypeChecker
        for packet_900 in self.stream_raw(
                ReadFilter.empty().with_api_versions({ApiVersion.API_900})
        ):
            # noinspection Mypy
            result.append(
                ac.convert_api_900_to_1000_raw(packet_900)
            )

        # Grab the API 1000 packets
        # noinspection PyTypeChecker
        for packet in self.stream_raw(
                ReadFilter.empty().with_api_versions({ApiVersion.API_1000})
        ):
            # noinspection Mypy
            result.append(packet)

        return result

    def read_first_packet(self) -> Optional[RedvoxPacketM]:
        """
        read the first packet of the index

        :return: single RedvoxPacketM, converted from API 900 if necessary or None if no packet to read
        """
        # Prefer an API 1000 packet when one exists.
        # noinspection PyTypeChecker
        for packet in self.stream_raw(
                ReadFilter.empty().with_api_versions({ApiVersion.API_1000})
        ):
            # noinspection Mypy
            return packet

        # Otherwise fall back to the first API 900 packet, converted to API 1000.
        # noinspection PyTypeChecker
        for packet_900 in self.stream_raw(
                ReadFilter.empty().with_api_versions({ApiVersion.API_900})
        ):
            # noinspection Mypy
            return ac.convert_api_900_to_1000_raw(packet_900)

        return None


# Directory-name whitelists used to recognize valid RedVox API 900 / API 1000
# structured directory layouts (zero-padded year/month/day/hour components).
__VALID_YEARS: Set[str] = {f"{year:04d}" for year in range(2015, 2031)}
__VALID_MONTHS: Set[str] = {f"{month:02d}" for month in range(1, 13)}
__VALID_DATES: Set[str] = {f"{day:02d}" for day in range(1, 32)}
__VALID_HOURS: Set[str] = {f"{hour:02d}" for hour in range(0, 24)}


def _list_subdirs(base_dir: str, valid_choices: Set[str]) -> Iterator[str]:
    """
    Lists sub-directors in a given base directory that match the provided choices.

    :param base_dir: Base dir to find sub dirs in.
    :param valid_choices: A list of valid directory names.
    :return: A list of valid subdirs.
    """
    subdirs: Iterator[str] = map(
        lambda p: PurePath(p).name, glob(os.path.join(base_dir, "*", ""))
    )
    return filter(valid_choices.__contains__, subdirs)


# These fields are set at runtime and provide the implementation (either native or pure python) for IO methods
# Signature: (base_dir, read_filter, pool) -> Index
__INDEX_STRUCTURED_FN: Callable[
    [str, ReadFilter, Optional[multiprocessing.pool.Pool]], Index
]
# Signature: (base_dir, read_filter, sort, pool) -> Index
__INDEX_STRUCTURED_900_FN: Callable[
    [str, ReadFilter, bool, Optional[multiprocessing.pool.Pool]], Index
]
# Signature: (base_dir, read_filter, sort, pool) -> Index
__INDEX_STRUCTURED_1000_FN: Callable[
    [str, ReadFilter, bool, Optional[multiprocessing.pool.Pool]], Index
]
# Signature: (base_dir, read_filter, sort, pool) -> Index
__INDEX_UNSTRUCTURED_FN: Callable[
    [str, ReadFilter, bool, Optional[multiprocessing.pool.Pool]], Index
]


def __map_opt(fn, v):
    """
    Applies fn to v when v is not None; otherwise passes the None through.

    :param fn: The mapping function.
    :param v: The optional value to map.
    :return: fn(v) when v is not None, else None.
    """
    return None if v is None else fn(v)


def __dur2us(dur: timedelta) -> float:
    """
    Converts a timedelta into microseconds.

    :param dur: timedelta to convert
    :return: Number of microseconds in the time delta.
    """
    seconds: float = dur.total_seconds()
    return seconds * 1_000_000.0


def __api_native(apis_py: Set[ApiVersion]) -> Set[str]:
    """
    Convert python ApiVersions into native ApiVersion names.

    :param apis_py: Python API versions.
    :return: Native API version names.
    """
    names: Set[str] = set()
    for version in apis_py:
        if version == ApiVersion.API_900:
            names.add("Api900")
        elif version == ApiVersion.API_1000:
            names.add("Api1000")
    # Unrecognized versions are silently dropped, matching the native side's vocabulary.
    return names


def index_unstructured_py(
    base_dir: str,
    read_filter: ReadFilter = ReadFilter(),
    sort: bool = True,
    pool: Optional[multiprocessing.pool.Pool] = None,
) -> Index:
    """
    Returns an Index of the file paths that match the given filter for unstructured data.

    :param base_dir: Directory containing unstructured data.
    :param read_filter: An (optional) ReadFilter for specifying station IDs and time windows.
    :param sort: When True, the resulting Index will be sorted before being returned (default=True).
    :param pool: Pool for multiprocessing
    :return: An Index of matching files.
    """
    check_type(base_dir, [str])
    check_type(read_filter, [ReadFilter])

    index: Index = Index()

    # An unset extension filter means "match any extension".
    extensions: Set[str] = (
        read_filter.extensions if read_filter.extensions is not None else {""}
    )

    all_paths: List[str] = []

    extension: str
    for extension in extensions:
        # The pattern already includes base_dir; joining base_dir onto it again
        # (as the original code did) doubles relative paths (e.g. "data/data/*.rdvxz")
        # and causes the glob to match nothing for relative base dirs.
        pattern: str = str(PurePath(base_dir).joinpath(f"*{extension}"))
        all_paths.extend(glob(pattern))

    # Parse entries in parallel only when there are enough paths to amortize pool overhead.
    all_entries: Iterator[Optional[IndexEntry]] = maybe_parallel_map(
        pool,
        IndexEntry.from_path,
        iter(all_paths),
        lambda: len(all_paths) > 128,
        chunk_size=64,
    )

    # Drop paths that failed to parse, then apply the read filter.
    entries: Iterator[IndexEntry] = filter(
        read_filter.apply, filter(_not_none, all_entries)
    )

    index.append(entries)

    if sort:
        index.sort()

    return index


def index_structured_api_900_py(
    base_dir: str,
    read_filter: ReadFilter = ReadFilter(),
    sort: bool = True,
    pool: Optional[multiprocessing.pool.Pool] = None,
) -> Index:
    """
    This parses a structured API 900 directory structure and identifies files that match the provided filter.

    :param base_dir: Base directory (should be named api900)
    :param read_filter: Filter to filter files with
    :param sort: When True, the resulting Index will be sorted before being returned (default=True).
    :param pool: Pool for multiprocessing
    :return: An Index of matching files (empty if none match the filter or none are found)
    """
    index: Index = Index()

    _pool: multiprocessing.pool.Pool = multiprocessing.Pool() if pool is None else pool

    try:
        for year in _list_subdirs(base_dir, __VALID_YEARS):
            for month in _list_subdirs(os.path.join(base_dir, year), __VALID_MONTHS):
                for day in _list_subdirs(
                    os.path.join(base_dir, year, month), __VALID_DATES
                ):
                    # Before scanning for *.rdvxz files, let's see if the current year, month, day, are in the
                    # filter's range. If not, we can short circuit and skip getting the *.rdvxz files.
                    if not read_filter.apply_dt(
                        datetime(int(year), int(month), int(day)), dt_fn=truncate_dt_ymd
                    ):
                        continue

                    data_dir: str = os.path.join(base_dir, year, month, day)
                    entries: Iterator[IndexEntry] = iter(
                        index_unstructured_py(
                            data_dir, read_filter, sort=False, pool=_pool
                        ).entries
                    )
                    index.append(entries)
    finally:
        # Only close pools we created ourselves; caller-supplied pools are caller-owned.
        # try/finally ensures the pool is released even if indexing raises.
        if pool is None:
            _pool.close()

    if sort:
        index.sort()
    return index


def index_structured_api_1000_py(
    base_dir: str,
    read_filter: ReadFilter = ReadFilter(),
    sort: bool = True,
    pool: Optional[multiprocessing.pool.Pool] = None,
) -> Index:
    """
    This parses a structured API M directory structure and identifies files that match the provided filter.

    :param base_dir: Base directory (should be named api1000)
    :param read_filter: Filter to filter files with
    :param sort: When True, the resulting Index will be sorted before being returned (default=True).
    :param pool: Pool for multiprocessing
    :return: An Index of matching files (empty if none match the filter or none are found)
    """
    index: Index = Index()

    _pool: multiprocessing.pool.Pool = multiprocessing.Pool() if pool is None else pool

    try:
        for year in _list_subdirs(base_dir, __VALID_YEARS):
            for month in _list_subdirs(os.path.join(base_dir, year), __VALID_MONTHS):
                for day in _list_subdirs(
                    os.path.join(base_dir, year, month), __VALID_DATES
                ):
                    for hour in _list_subdirs(
                        os.path.join(base_dir, year, month, day), __VALID_HOURS
                    ):
                        # Before scanning for *.rdvxm files, let's see if the current year, month, day, hour are in
                        # the filter's range. If not, we can short circuit and skip getting the *.rdvxm files.
                        if not read_filter.apply_dt(
                            datetime(int(year), int(month), int(day), int(hour)),
                            dt_fn=truncate_dt_ymdh,
                        ):
                            continue

                        data_dir: str = os.path.join(base_dir, year, month, day, hour)
                        entries: Iterator[IndexEntry] = iter(
                            index_unstructured_py(
                                data_dir, read_filter, sort=False, pool=_pool
                            ).entries
                        )
                        index.append(entries)
    finally:
        # Only close pools we created ourselves; caller-supplied pools are caller-owned.
        # try/finally ensures the pool is released even if indexing raises.
        if pool is None:
            _pool.close()

    if sort:
        index.sort()
    return index


def index_structured_py(
    base_dir: str,
    read_filter: ReadFilter = ReadFilter(),
    pool: Optional[multiprocessing.pool.Pool] = None,
) -> Index:
    """
    Indexes both API 900 and API 1000 structured directory layouts.

    :param base_dir: The base_dir may either end with api900, api1000, or be a parent directory to one or both of
                     API 900 and API 1000.
    :param read_filter: Filter to further filter results.
    :param pool: Pool for multiprocessing
    :return: An Index of RedVox files.
    """
    base_path: PurePath = PurePath(base_dir)

    _pool: multiprocessing.pool.Pool = multiprocessing.Pool() if pool is None else pool

    try:
        # API 900
        if base_path.name == "api900":
            return index_structured_api_900_py(base_dir, read_filter, pool=_pool)
        # API 1000
        if base_path.name == "api1000":
            return index_structured_api_1000_py(base_dir, read_filter, pool=_pool)

        # Maybe parent to one or both?
        index: Index = Index()
        subdirs: List[str] = list(_list_subdirs(base_dir, {"api900", "api1000"}))
        if "api900" in subdirs:
            index.append(
                iter(
                    index_structured_api_900_py(
                        str(base_path.joinpath("api900")),
                        read_filter,
                        sort=False,
                        pool=_pool,
                    ).entries
                )
            )

        if "api1000" in subdirs:
            index.append(
                iter(
                    index_structured_api_1000_py(
                        str(base_path.joinpath("api1000")),
                        read_filter,
                        sort=False,
                        pool=_pool,
                    ).entries
                )
            )

        index.sort()
        return index
    finally:
        # Close the pool we created; the original leaked it on the early-return
        # paths (base_dir ending in api900/api1000) and on exceptions.
        # Caller-supplied pools remain caller-owned.
        if pool is None:
            _pool.close()


# Here we try to import the redvox_native module which provides natively compiled io functions.
# This dynamically sets which functions are called at runtime. Either the native version (if found)
# or the pure Python version.
try:
    # noinspection PyUnresolvedReferences
    import redvox_native

    def __into_read_filter_native(read_filter: ReadFilter):
        """
        Converts a python read filter into a native read filter.

        :param read_filter: Python read filter to convert.
        :return: A native read filter.
        """
        read_filter_native = redvox_native.ReadFilter()
        # Datetimes cross the FFI boundary as epoch microseconds; timedelta
        # buffers cross as microsecond counts.
        read_filter_native.start_dt = __map_opt(us_dt, read_filter.start_dt)
        read_filter_native.end_dt = __map_opt(us_dt, read_filter.end_dt)
        read_filter_native.start_dt_buf = __map_opt(__dur2us, read_filter.start_dt_buf)
        read_filter_native.end_dt_buf = __map_opt(__dur2us, read_filter.end_dt_buf)
        read_filter_native.station_ids = read_filter.station_ids
        read_filter_native.extensions = read_filter.extensions
        read_filter_native.api_versions = __map_opt(
            __api_native, read_filter.api_versions
        )

        return read_filter_native

    def __index_structured_900_native(
        base_dir: str,
        read_filter: ReadFilter,
        sort: bool,
        pool: Optional[multiprocessing.pool.Pool],
    ) -> Index:
        """
        This parses a structured API 900 directory structure and identifies files that match the provided filter.

        :param base_dir: Base directory (should be named api900)
        :param read_filter: Filter to filter files with
        :param sort: When True, the resulting Index will be sorted before being returned (default=True).
        :param pool: Pool for multiprocessing (not used in native)
        :return: A list of wrapped packets on an empty list if none match the filter or none are found
        """
        read_filter = __into_read_filter_native(read_filter)
        return Index.from_native(
            redvox_native.index_structured_900(base_dir, read_filter, sort)
        )

    def __index_structured_1000_native(
        base_dir: str,
        read_filter: ReadFilter,
        sort: bool,
        pool: Optional[multiprocessing.pool.Pool],
    ) -> Index:
        """
        This parses a structured API M directory structure and identifies files that match the provided filter.

        :param base_dir: Base directory (should be named api1000)
        :param read_filter: Filter to filter files with
        :param sort: When True, the resulting Index will be sorted before being returned (default=True).
        :param pool: Pool for multiprocessing (not used in native)
        :return: A list of wrapped packets on an empty list if none match the filter or none are found
        """
        read_filter = __into_read_filter_native(read_filter)
        return Index.from_native(
            redvox_native.index_structured_1000(base_dir, read_filter, sort)
        )

    def __index_structured_native(
        base_dir: str,
        read_filter: ReadFilter,
        pool: Optional[multiprocessing.pool.Pool],
    ) -> Index:
        """
        Indexes both API 900 and API 1000 structured directory layouts.

        :param base_dir: The base_dir may either end with api900, api1000, or be a parent directory to one or both of
                         API 900 and API 1000.
        :param read_filter: Filter to further filter results.
        :param pool: Pool for multiprocessing (not used in native)
        :return: An Index of RedVox files.
        """
        read_filter = __into_read_filter_native(read_filter)
        return Index.from_native(redvox_native.index_structured(base_dir, read_filter))

    def __index_unstructured_native(
        base_dir: str,
        read_filter: ReadFilter,
        sort: bool,
        pool: Optional[multiprocessing.pool.Pool],
    ) -> Index:
        """
        Returns the list of file paths that match the given filter for unstructured data.

        :param base_dir: Directory containing unstructured data.
        :param read_filter: An (optional) ReadFilter for specifying station IDs and time windows.
        :param sort: When True, the resulting Index will be sorted before being returned (default=True).
        :param pool: Pool for multiprocessing (not used in native implementation)
        :return: An iterator of valid paths.
        """
        read_filter = __into_read_filter_native(read_filter)
        return Index.from_native(
            redvox_native.index_unstructured(base_dir, read_filter, sort)
        )

    # Native module is available: bind the native implementations.
    __INDEX_STRUCTURED_FN = __index_structured_native
    __INDEX_STRUCTURED_900_FN = __index_structured_900_native
    __INDEX_STRUCTURED_1000_FN = __index_structured_1000_native
    __INDEX_UNSTRUCTURED_FN = __index_unstructured_native
except ImportError:
    # Native module is not installed: fall back to the pure Python implementations.
    __INDEX_STRUCTURED_900_FN = index_structured_api_900_py
    __INDEX_STRUCTURED_1000_FN = index_structured_api_1000_py
    __INDEX_STRUCTURED_FN = index_structured_py
    __INDEX_UNSTRUCTURED_FN = index_unstructured_py


def index_unstructured(
    base_dir: str,
    read_filter: ReadFilter = ReadFilter(),
    sort: bool = True,
    pool: Optional[multiprocessing.pool.Pool] = None,
) -> Index:
    """
    Returns an Index of files that match the given filter for unstructured data.

    Dispatches to whichever implementation (native or pure Python) was bound at import time.

    :param base_dir: Directory containing unstructured data.
    :param read_filter: An (optional) ReadFilter for specifying station IDs and time windows.
    :param sort: When True, the resulting Index will be sorted before being returned (default=True).
    :param pool: Pool for multiprocessing
    :return: An Index of matching files.
    """
    result: Index = __INDEX_UNSTRUCTURED_FN(base_dir, read_filter, sort, pool)
    return result


def index_structured_api_900(
    base_dir: str,
    read_filter: ReadFilter = ReadFilter(),
    sort: bool = True,
    pool: Optional[multiprocessing.pool.Pool] = None,
) -> Index:
    """
    Parses a structured API 900 directory layout and indexes files matching the provided filter.

    Dispatches to whichever implementation (native or pure Python) was bound at import time.

    :param base_dir: Base directory (should be named api900)
    :param read_filter: Filter to filter files with
    :param sort: When True, the resulting Index will be sorted before being returned (default=True).
    :param pool: Pool for multiprocessing
    :return: An Index of matching files (empty if none match the filter or none are found)
    """
    result: Index = __INDEX_STRUCTURED_900_FN(base_dir, read_filter, sort, pool)
    return result


def index_structured_api_1000(
    base_dir: str,
    read_filter: ReadFilter = ReadFilter(),
    sort: bool = True,
    pool: Optional[multiprocessing.pool.Pool] = None,
) -> Index:
    """
    Parses a structured API M directory layout and indexes files matching the provided filter.

    Dispatches to whichever implementation (native or pure Python) was bound at import time.

    :param base_dir: Base directory (should be named api1000)
    :param read_filter: Filter to filter files with
    :param sort: When True, the resulting Index will be sorted before being returned (default=True).
    :param pool: Pool for multiprocessing
    :return: An Index of matching files (empty if none match the filter or none are found)
    """
    result: Index = __INDEX_STRUCTURED_1000_FN(base_dir, read_filter, sort, pool)
    return result


def index_structured(
    base_dir: str,
    read_filter: ReadFilter = ReadFilter(),
    pool: Optional[multiprocessing.pool.Pool] = None,
) -> Index:
    """
    Indexes both API 900 and API 1000 structured directory layouts.

    Dispatches to whichever implementation (native or pure Python) was bound at import time.

    :param base_dir: The base_dir may either end with api900, api1000, or be a parent directory to one or both of
                     API 900 and API 1000.
    :param read_filter: Filter to further filter results.
    :param pool: Pool for multiprocessing
    :return: An Index of RedVox files.
    """
    result: Index = __INDEX_STRUCTURED_FN(base_dir, read_filter, pool)
    return result


def sort_unstructured_redvox_data(
    input_dir: str,
    output_dir: Optional[str] = None,
    read_filter: ReadFilter = ReadFilter(),
    copy: bool = True,
) -> bool:
    """
    takes all redvox files in input_dir and sorts them into appropriate sub-directories

    :param input_dir: directory containing all the files to sort
    :param output_dir: optional directory to put the results in; if this is None, uses the input_dir, default None.
    :param read_filter: optional ReadFilter to limit which files to sort, default empty filter (sort everything)
    :param copy: optional value that when set ensures the file contents are copied into the new structure. When this
                 is set to False, the files will instead by moved.

    :return: True if success, False if failure
    """
    if output_dir is None:
        output_dir = input_dir
    check_type(input_dir, [str])
    check_type(output_dir, [str])
    check_type(read_filter, [ReadFilter])

    if not os.path.exists(input_dir):
        print(
            f"Directory with files to sort: {input_dir} does not exist.  Stopping program."
        )
        return False

    if not os.path.exists(output_dir):
        print(
            f"Base directory for creation: {output_dir} does not exist.  Please create it.  Stopping program."
        )
        return False

    # Treat an unset extension filter as "match any extension", consistent with
    # index_unstructured_py (the original crashed iterating None).
    extensions: Set[str] = (
        read_filter.extensions if read_filter.extensions is not None else {""}
    )

    index: Index = Index()
    extension: str
    for extension in extensions:
        # The pattern already includes input_dir; joining input_dir onto it again
        # (as the original code did) doubles relative paths and matches nothing.
        pattern: str = str(PurePath(input_dir).joinpath(f"*{extension}"))
        paths: List[str] = glob(pattern)
        entries: Iterator[IndexEntry] = filter(
            read_filter.apply, filter(_not_none, map(IndexEntry.from_path, paths))
        )
        index.append(entries)

    if len(index.entries) < 1:
        print(
            f"Directory with files to sort: {input_dir} does not contain Redvox data to read.  Stopping program."
        )
        return False

    for value in index.entries:
        api_version = value.api_version
        # API 1000 layout nests to the hour; API 900 only to the day.
        if api_version == ApiVersion.API_1000:
            file_out_dir = str(
                PurePath(output_dir).joinpath(
                    "api1000",
                    f"{value.date_time.year:04}",
                    f"{value.date_time.month:02}",
                    f"{value.date_time.day:02}",
                    f"{value.date_time.hour:02}",
                )
            )
        elif api_version == ApiVersion.API_900:
            file_out_dir = str(
                PurePath(output_dir).joinpath(
                    "api900",
                    f"{value.date_time.year:04}",
                    f"{value.date_time.month:02}",
                    f"{value.date_time.day:02}",
                )
            )
        else:
            print(
                f"Unknown API version {api_version} found in data.  Stopping program."
            )
            return False
        os.makedirs(file_out_dir, exist_ok=True)

        if copy:
            copy2(value.full_path, file_out_dir)
        else:
            move(value.full_path, file_out_dir)

    return True

Functions

def dict_to_json(dct: dict) -> str

:param dct: dictionary to convert to json :return: dictionary as json string

Expand source code
def dict_to_json(dct: dict) -> str:
    """
    Serializes a dictionary into its JSON string representation.

    :param dct: dictionary to convert to json
    :return: dictionary as json string
    """
    encoded: str = json.dumps(dct)
    return encoded
def get_json_file(file_dir: str) -> Optional[str]

Finds the first json file in the file_dir specified or None if there is no file

:param file_dir: directory to find json file in :return: full name of first json file in the directory or None if no files found

Expand source code
def get_json_file(file_dir: str) -> Optional[str]:
    """
    Finds the first json file in the file_dir specified or None if there is no file

    :param file_dir: directory to find json file in
    :return: full name of first json file in the directory or None if no files found
    """
    matches = glob(os.path.join(file_dir, "*.json"))
    return Path(matches[0]).name if matches else None
def index_structured(base_dir: str, read_filter: ReadFilter = ReadFilter(start_dt=None, end_dt=None, station_ids=None, extensions={'.rdvxz', '.rdvxm'}, start_dt_buf=datetime.timedelta(seconds=120), end_dt_buf=datetime.timedelta(seconds=120), api_versions={<ApiVersion.API_900: 'API_900'>, <ApiVersion.API_1000: 'API_1000'>}), pool: Optional[multiprocessing.pool.Pool] = None) ‑> Index

Indexes both API 900 and API 1000 structured directory layouts.

:param base_dir: The base_dir may either end with api900, api1000, or be a parent directory to one or both of API 900 and API 1000. :param read_filter: Filter to further filter results. :param pool: Pool for multiprocessing :return: An Index of RedVox files.

Expand source code
def index_structured(
    base_dir: str,
    read_filter: ReadFilter = ReadFilter(),
    pool: Optional[multiprocessing.pool.Pool] = None,
) -> Index:
    """
    Indexes both API 900 and API 1000 structured directory layouts.

    Dispatches to whichever implementation (native or pure Python) was bound at import time.

    :param base_dir: The base_dir may either end with api900, api1000, or be a parent directory to one or both of
                     API 900 and API 1000.
    :param read_filter: Filter to further filter results.
    :param pool: Pool for multiprocessing
    :return: An Index of RedVox files.
    """
    result: Index = __INDEX_STRUCTURED_FN(base_dir, read_filter, pool)
    return result
def index_structured_api_1000(base_dir: str, read_filter: ReadFilter = ReadFilter(start_dt=None, end_dt=None, station_ids=None, extensions={'.rdvxz', '.rdvxm'}, start_dt_buf=datetime.timedelta(seconds=120), end_dt_buf=datetime.timedelta(seconds=120), api_versions={<ApiVersion.API_900: 'API_900'>, <ApiVersion.API_1000: 'API_1000'>}), sort: bool = True, pool: Optional[multiprocessing.pool.Pool] = None) ‑> Index

This parses a structured API M directory structure and identifies files that match the provided filter.

:param base_dir: Base directory (should be named api1000) :param read_filter: Filter to filter files with :param sort: When True, the resulting Index will be sorted before being returned (default=True). :param pool: Pool for multiprocessing :return: A list of wrapped packets on an empty list if none match the filter or none are found

Expand source code
def index_structured_api_1000(
    base_dir: str,
    read_filter: ReadFilter = ReadFilter(),
    sort: bool = True,
    pool: Optional[multiprocessing.pool.Pool] = None,
) -> Index:
    """
    Parses a structured API M directory layout and indexes files matching the provided filter.

    Dispatches to whichever implementation (native or pure Python) was bound at import time.

    :param base_dir: Base directory (should be named api1000)
    :param read_filter: Filter to filter files with
    :param sort: When True, the resulting Index will be sorted before being returned (default=True).
    :param pool: Pool for multiprocessing
    :return: An Index of matching files (empty if none match the filter or none are found)
    """
    result: Index = __INDEX_STRUCTURED_1000_FN(base_dir, read_filter, sort, pool)
    return result
def index_structured_api_1000_py(base_dir: str, read_filter: ReadFilter = ReadFilter(start_dt=None, end_dt=None, station_ids=None, extensions={'.rdvxz', '.rdvxm'}, start_dt_buf=datetime.timedelta(seconds=120), end_dt_buf=datetime.timedelta(seconds=120), api_versions={<ApiVersion.API_900: 'API_900'>, <ApiVersion.API_1000: 'API_1000'>}), sort: bool = True, pool: Optional[multiprocessing.pool.Pool] = None) ‑> Index

This parses a structured API M directory structure and identifies files that match the provided filter.

:param base_dir: Base directory (should be named api1000) :param read_filter: Filter to filter files with :param sort: When True, the resulting Index will be sorted before being returned (default=True). :param pool: Pool for multiprocessing :return: A list of wrapped packets or an empty list if none match the filter or none are found

Expand source code
def index_structured_api_1000_py(
    base_dir: str,
    read_filter: ReadFilter = ReadFilter(),
    sort: bool = True,
    pool: Optional[multiprocessing.pool.Pool] = None,
) -> Index:
    """
    Parses a structured API M directory layout (api1000/YYYY/MM/DD/HH) and collects
    files that match the provided filter.

    :param base_dir: Base directory (should be named api1000)
    :param read_filter: Filter to filter files with
    :param sort: When True, the resulting Index will be sorted before being returned (default=True).
    :param pool: Pool for multiprocessing
    :return: An Index of matching files; empty if none match the filter or none are found
    """
    result: Index = Index()

    # Track whether this call created the pool so we only close what we own.
    pool_owned: bool = pool is None
    _pool: multiprocessing.pool.Pool = multiprocessing.Pool() if pool_owned else pool

    for year in _list_subdirs(base_dir, __VALID_YEARS):
        year_dir: str = os.path.join(base_dir, year)
        for month in _list_subdirs(year_dir, __VALID_MONTHS):
            month_dir: str = os.path.join(year_dir, month)
            for day in _list_subdirs(month_dir, __VALID_DATES):
                day_dir: str = os.path.join(month_dir, day)
                for hour in _list_subdirs(day_dir, __VALID_HOURS):
                    # Short-circuit: skip scanning this hour's *.rdvxm files
                    # entirely when the hour falls outside the filter's range.
                    if not read_filter.apply_dt(
                        datetime(int(year), int(month), int(day), int(hour)),
                        dt_fn=truncate_dt_ymdh,
                    ):
                        continue

                    hour_dir: str = os.path.join(day_dir, hour)
                    result.append(
                        iter(
                            index_unstructured_py(
                                hour_dir, read_filter, sort=False, pool=_pool
                            ).entries
                        )
                    )

    if pool_owned:
        _pool.close()

    if sort:
        result.sort()
    return result
def index_structured_api_900(base_dir: str, read_filter: ReadFilter = ReadFilter(start_dt=None, end_dt=None, station_ids=None, extensions={'.rdvxz', '.rdvxm'}, start_dt_buf=datetime.timedelta(seconds=120), end_dt_buf=datetime.timedelta(seconds=120), api_versions={<ApiVersion.API_900: 'API_900'>, <ApiVersion.API_1000: 'API_1000'>}), sort: bool = True, pool: Optional[multiprocessing.pool.Pool] = None) ‑> Index

This parses a structured API 900 directory structure and identifies files that match the provided filter.

:param base_dir: Base directory (should be named api900) :param read_filter: Filter to filter files with :param sort: When True, the resulting Index will be sorted before being returned (default=True). :param pool: Pool for multiprocessing :return: A list of wrapped packets or an empty list if none match the filter or none are found

Expand source code
def index_structured_api_900(
    base_dir: str,
    read_filter: ReadFilter = ReadFilter(),
    sort: bool = True,
    pool: Optional[multiprocessing.pool.Pool] = None,
) -> Index:
    """
    Parses a structured API 900 directory layout and collects files that match the provided filter.

    :param base_dir: Base directory (should be named api900)
    :param read_filter: Filter to filter files with
    :param sort: When True, the resulting Index will be sorted before being returned (default=True).
    :param pool: Pool for multiprocessing
    :return: An Index of matching files; empty if none match the filter or none are found
    """
    # Delegate to whichever implementation was selected at import time
    # (native-accelerated when available, pure Python otherwise).
    result: Index = __INDEX_STRUCTURED_900_FN(base_dir, read_filter, sort, pool)
    return result
def index_structured_api_900_py(base_dir: str, read_filter: ReadFilter = ReadFilter(start_dt=None, end_dt=None, station_ids=None, extensions={'.rdvxz', '.rdvxm'}, start_dt_buf=datetime.timedelta(seconds=120), end_dt_buf=datetime.timedelta(seconds=120), api_versions={<ApiVersion.API_900: 'API_900'>, <ApiVersion.API_1000: 'API_1000'>}), sort: bool = True, pool: Optional[multiprocessing.pool.Pool] = None) ‑> Index

This parses a structured API 900 directory structure and identifies files that match the provided filter.

:param base_dir: Base directory (should be named api900) :param read_filter: Filter to filter files with :param sort: When True, the resulting Index will be sorted before being returned (default=True). :param pool: Pool for multiprocessing :return: A list of wrapped packets or an empty list if none match the filter or none are found

Expand source code
def index_structured_api_900_py(
    base_dir: str,
    read_filter: ReadFilter = ReadFilter(),
    sort: bool = True,
    pool: Optional[multiprocessing.pool.Pool] = None,
) -> Index:
    """
    Parses a structured API 900 directory layout (api900/YYYY/MM/DD) and collects
    files that match the provided filter.

    :param base_dir: Base directory (should be named api900)
    :param read_filter: Filter to filter files with
    :param sort: When True, the resulting Index will be sorted before being returned (default=True).
    :param pool: Pool for multiprocessing
    :return: An Index of matching files; empty if none match the filter or none are found
    """
    result: Index = Index()

    # Track whether this call created the pool so we only close what we own.
    pool_owned: bool = pool is None
    _pool: multiprocessing.pool.Pool = multiprocessing.Pool() if pool_owned else pool

    for year in _list_subdirs(base_dir, __VALID_YEARS):
        year_dir: str = os.path.join(base_dir, year)
        for month in _list_subdirs(year_dir, __VALID_MONTHS):
            month_dir: str = os.path.join(year_dir, month)
            for day in _list_subdirs(month_dir, __VALID_DATES):
                # Short-circuit: skip scanning this day's *.rdvxz files
                # entirely when the day falls outside the filter's range.
                if not read_filter.apply_dt(
                    datetime(int(year), int(month), int(day)), dt_fn=truncate_dt_ymd
                ):
                    continue

                day_dir: str = os.path.join(month_dir, day)
                result.append(
                    iter(
                        index_unstructured_py(
                            day_dir, read_filter, sort=False, pool=_pool
                        ).entries
                    )
                )

    if pool_owned:
        _pool.close()

    if sort:
        result.sort()
    return result
def index_structured_py(base_dir: str, read_filter: ReadFilter = ReadFilter(start_dt=None, end_dt=None, station_ids=None, extensions={'.rdvxz', '.rdvxm'}, start_dt_buf=datetime.timedelta(seconds=120), end_dt_buf=datetime.timedelta(seconds=120), api_versions={<ApiVersion.API_900: 'API_900'>, <ApiVersion.API_1000: 'API_1000'>}), pool: Optional[multiprocessing.pool.Pool] = None) ‑> Index

Indexes both API 900 and API 1000 structured directory layouts.

:param base_dir: The base_dir may either end with api900, api1000, or be a parent directory to one or both of API 900 and API 1000. :param read_filter: Filter to further filter results. :param pool: Pool for multiprocessing :return: An Index of RedVox files.

Expand source code
def index_structured_py(
    base_dir: str,
    read_filter: ReadFilter = ReadFilter(),
    pool: Optional[multiprocessing.pool.Pool] = None,
) -> Index:
    """
    Indexes both API 900 and API 1000 structured directory layouts.

    :param base_dir: The base_dir may either end with api900, api1000, or be a parent directory to one or both of
                     API 900 and API 1000.
    :param read_filter: Filter to further filter results.
    :param pool: Pool for multiprocessing
    :return: An Index of RedVox files.
    """
    base_path: PurePath = PurePath(base_dir)

    _pool: multiprocessing.pool.Pool = multiprocessing.Pool() if pool is None else pool

    try:
        # API 900
        if base_path.name == "api900":
            return index_structured_api_900_py(base_dir, read_filter, pool=_pool)
        # API 1000
        elif base_path.name == "api1000":
            return index_structured_api_1000_py(base_dir, read_filter, pool=_pool)
        # Maybe parent to one or both?
        else:
            index: Index = Index()
            subdirs: List[str] = list(_list_subdirs(base_dir, {"api900", "api1000"}))
            if "api900" in subdirs:
                index.append(
                    iter(
                        index_structured_api_900_py(
                            str(base_path.joinpath("api900")),
                            read_filter,
                            sort=False,
                            pool=_pool,
                        ).entries
                    )
                )

            if "api1000" in subdirs:
                index.append(
                    iter(
                        index_structured_api_1000_py(
                            str(base_path.joinpath("api1000")),
                            read_filter,
                            sort=False,
                            pool=_pool,
                        ).entries
                    )
                )

            index.sort()
            return index
    finally:
        # BUG FIX: the original only closed a freshly-created pool on the
        # "parent directory" path; the api900/api1000 branches returned early
        # and leaked the pool. Closing in a finally guarantees that any pool
        # this function created is always released, on every path.
        if pool is None:
            _pool.close()
def index_unstructured(base_dir: str, read_filter: ReadFilter = ReadFilter(start_dt=None, end_dt=None, station_ids=None, extensions={'.rdvxz', '.rdvxm'}, start_dt_buf=datetime.timedelta(seconds=120), end_dt_buf=datetime.timedelta(seconds=120), api_versions={<ApiVersion.API_900: 'API_900'>, <ApiVersion.API_1000: 'API_1000'>}), sort: bool = True, pool: Optional[multiprocessing.pool.Pool] = None) ‑> Index

Returns the list of file paths that match the given filter for unstructured data.

:param base_dir: Directory containing unstructured data. :param read_filter: An (optional) ReadFilter for specifying station IDs and time windows. :param sort: When True, the resulting Index will be sorted before being returned (default=True). :param pool: Pool for multiprocessing :return: An iterator of valid paths.

Expand source code
def index_unstructured(
    base_dir: str,
    read_filter: ReadFilter = ReadFilter(),
    sort: bool = True,
    pool: Optional[multiprocessing.pool.Pool] = None,
) -> Index:
    """
    Builds an Index of file paths that match the given filter for unstructured data.

    :param base_dir: Directory containing unstructured data.
    :param read_filter: An (optional) ReadFilter for specifying station IDs and time windows.
    :param sort: When True, the resulting Index will be sorted before being returned (default=True).
    :param pool: Pool for multiprocessing
    :return: An Index of matching files.
    """
    # Delegate to whichever implementation was selected at import time
    # (native-accelerated when available, pure Python otherwise).
    result: Index = __INDEX_UNSTRUCTURED_FN(base_dir, read_filter, sort, pool)
    return result
def index_unstructured_py(base_dir: str, read_filter: ReadFilter = ReadFilter(start_dt=None, end_dt=None, station_ids=None, extensions={'.rdvxz', '.rdvxm'}, start_dt_buf=datetime.timedelta(seconds=120), end_dt_buf=datetime.timedelta(seconds=120), api_versions={<ApiVersion.API_900: 'API_900'>, <ApiVersion.API_1000: 'API_1000'>}), sort: bool = True, pool: Optional[multiprocessing.pool.Pool] = None) ‑> Index

Returns the list of file paths that match the given filter for unstructured data.

:param base_dir: Directory containing unstructured data. :param read_filter: An (optional) ReadFilter for specifying station IDs and time windows. :param sort: When True, the resulting Index will be sorted before being returned (default=True). :param pool: Pool for multiprocessing :return: An iterator of valid paths.

Expand source code
def index_unstructured_py(
    base_dir: str,
    read_filter: ReadFilter = ReadFilter(),
    sort: bool = True,
    pool: Optional[multiprocessing.pool.Pool] = None,
) -> Index:
    """
    Builds an Index of file paths that match the given filter for unstructured data.

    :param base_dir: Directory containing unstructured data.
    :param read_filter: An (optional) ReadFilter for specifying station IDs and time windows.
    :param sort: When True, the resulting Index will be sorted before being returned (default=True).
    :param pool: Pool for multiprocessing
    :return: An Index of matching files.
    """
    check_type(base_dir, [str])
    check_type(read_filter, [ReadFilter])

    index: Index = Index()

    # A None extension filter means "match any file name".
    extensions: Set[str] = (
        read_filter.extensions if read_filter.extensions is not None else {""}
    )

    all_paths: List[str] = []

    extension: str
    for extension in extensions:
        # BUG FIX: the pattern already contains base_dir; the original called
        # glob(os.path.join(base_dir, pattern)), which joined base_dir a second
        # time and silently matched nothing whenever base_dir was relative
        # (for absolute base_dir, os.path.join dropped the duplicate prefix).
        pattern: str = str(PurePath(base_dir).joinpath(f"*{extension}"))
        all_paths.extend(glob(pattern))

    # Parse entries in parallel only when there are enough paths (>128) to
    # offset the pool's per-task overhead; otherwise map sequentially.
    all_entries: Iterator[Optional[IndexEntry]] = maybe_parallel_map(
        pool,
        IndexEntry.from_path,
        iter(all_paths),
        lambda: len(all_paths) > 128,
        chunk_size=64,
    )

    # Drop paths that failed to parse (None), then apply the user's filter.
    entries: Iterator[IndexEntry] = filter(
        read_filter.apply, filter(_not_none, all_entries)
    )

    index.append(entries)

    if sort:
        index.sort()

    return index
def json_file_to_dict(file_path: str) ‑> Dict

:param file_path: full path of file to load data from. :return: json file as python dictionary

Expand source code
def json_file_to_dict(file_path: str) -> Dict:
    """
    Load a JSON file and return its contents as a dictionary.

    :param file_path: full path of file to load data from.
    :return: json file as python dictionary
    """
    # JSON interchange is defined as UTF-8 text (RFC 8259); pin the encoding
    # instead of relying on the platform default (e.g. cp1252 on Windows).
    with open(file_path, "r", encoding="utf-8") as f_p:
        return json_to_dict(f_p.read())
def json_to_dict(json_str: str) ‑> Dict

:param json_str: string of json to convert to dictionary :return: json string as a dictionary

Expand source code
def json_to_dict(json_str: str) -> Dict:
    """
    Deserialize a JSON string into a Python dictionary.

    :param json_str: string of json to convert to dictionary
    :return: json string as a dictionary
    """
    parsed: Dict = json.loads(json_str)
    return parsed
def remove_dir_contents(dir_path: pathlib.Path)

removes all contents of the directory specified by dir_path

:param dir_path: path to directory to remove files from

Expand source code
def remove_dir_contents(dir_path: Path):
    """
    removes all contents of the directory specified by dir_path

    :param dir_path: path to directory to remove files from
    """
    if not dir_path.is_dir():
        print(f"{dir_path} is not a directory; cannot remove contents!")
        return
    # Delete each immediate child: recurse for subdirectories, unlink files.
    for child in dir_path.iterdir():
        if child.is_dir():
            rmtree(child)
        else:
            os.remove(child)
def sort_unstructured_redvox_data(input_dir: str, output_dir: Optional[str] = None, read_filter: ReadFilter = ReadFilter(start_dt=None, end_dt=None, station_ids=None, extensions={'.rdvxz', '.rdvxm'}, start_dt_buf=datetime.timedelta(seconds=120), end_dt_buf=datetime.timedelta(seconds=120), api_versions={<ApiVersion.API_900: 'API_900'>, <ApiVersion.API_1000: 'API_1000'>}), copy: bool = True) ‑> bool

takes all redvox files in input_dir and sorts them into appropriate sub-directories

:param input_dir: directory containing all the files to sort :param output_dir: optional directory to put the results in; if this is None, uses the input_dir, default None. :param read_filter: optional ReadFilter to limit which files to sort, default empty filter (sort everything) :param copy: optional value that when set ensures the file contents are copied into the new structure. When this is set to False, the files will instead be moved.

:return: True if success, False if failure

Expand source code
def sort_unstructured_redvox_data(
    input_dir: str,
    output_dir: Optional[str] = None,
    read_filter: ReadFilter = ReadFilter(),
    copy: bool = True,
) -> bool:
    """
    takes all redvox files in input_dir and sorts them into appropriate sub-directories

    :param input_dir: directory containing all the files to sort
    :param output_dir: optional directory to put the results in; if this is None, uses the input_dir, default None.
    :param read_filter: optional ReadFilter to limit which files to sort, default empty filter (sort everything)
    :param copy: optional value that when set ensures the file contents are copied into the new structure. When this
                 is set to False, the files will instead be moved.

    :return: True if success, False if failure
    """
    if output_dir is None:
        output_dir = input_dir
    check_type(input_dir, [str])
    check_type(output_dir, [str])
    check_type(read_filter, [ReadFilter])

    if not os.path.exists(input_dir):
        print(
            f"Directory with files to sort: {input_dir} does not exist.  Stopping program."
        )
        return False

    if not os.path.exists(output_dir):
        print(
            f"Base directory for creation: {output_dir} does not exist.  Please create it.  Stopping program."
        )
        return False

    index: Index = Index()
    # Consistency with index_unstructured_py: a None extension filter means
    # "match any file name" (the original crashed iterating None).
    extensions: Set[str] = (
        read_filter.extensions if read_filter.extensions is not None else {""}
    )
    extension: str
    for extension in extensions:
        # BUG FIX: pattern already includes input_dir; the original called
        # glob(os.path.join(input_dir, pattern)), joining input_dir a second
        # time, which matched nothing for relative input paths.
        pattern: str = str(PurePath(input_dir).joinpath(f"*{extension}"))
        entries: Iterator[IndexEntry] = filter(
            read_filter.apply,
            filter(_not_none, map(IndexEntry.from_path, glob(pattern))),
        )
        index.append(entries)

    if len(index.entries) < 1:
        print(
            f"Directory with files to sort: {input_dir} does not contain Redvox data to read.  Stopping program."
        )
        return False

    for value in index.entries:
        api_version = value.api_version
        if api_version == ApiVersion.API_1000:
            # API M files are binned into hour directories.
            file_out_dir = str(
                PurePath(output_dir).joinpath(
                    "api1000",
                    f"{value.date_time.year:04}",
                    f"{value.date_time.month:02}",
                    f"{value.date_time.day:02}",
                    f"{value.date_time.hour:02}",
                )
            )
        elif api_version == ApiVersion.API_900:
            # API 900 files are binned into day directories.
            file_out_dir = str(
                PurePath(output_dir).joinpath(
                    "api900",
                    f"{value.date_time.year:04}",
                    f"{value.date_time.month:02}",
                    f"{value.date_time.day:02}",
                )
            )
        else:
            print(
                f"Unknown API version {api_version} found in data.  Stopping program."
            )
            return False
        os.makedirs(file_out_dir, exist_ok=True)

        if copy:
            copy2(value.full_path, file_out_dir)
        else:
            move(value.full_path, file_out_dir)

    return True

Classes

class FileSystemSaveMode (value, names=None, *, module=None, qualname=None, type=None, start=1)

Enumeration of saving methodology

Expand source code
class FileSystemSaveMode(enum.Enum):
    """
    Enumeration of saving methodology
    """
    MEM = 0   # save using memory
    TEMP = 1  # save using temporary directory
    DISK = 2  # save using path on disk

    @staticmethod
    def get_save_mode(use_temp: bool, use_disk: bool) -> "FileSystemSaveMode":
        """
        :param use_temp: use temporary directory
        :param use_disk: use path on disk
        :return: the mode used to save (use_temp is evaluated before use_disk)
        """
        # TEMP wins over DISK when both flags are set; MEM is the fallback.
        if use_temp:
            return FileSystemSaveMode.TEMP
        return FileSystemSaveMode.DISK if use_disk else FileSystemSaveMode.MEM

Ancestors

  • enum.Enum

Class variables

var DISK
var MEM
var TEMP

Static methods

def get_save_mode(use_temp: bool, use_disk: bool) ‑> FileSystemSaveMode

:param use_temp: use temporary directory :param use_disk: use path on disk :return: the mode used to save (use_temp is evaluated before use_disk)

Expand source code
@staticmethod
def get_save_mode(use_temp: bool, use_disk: bool) -> "FileSystemSaveMode":
    """
    :param use_temp: use temporary directory
    :param use_disk: use path on disk
    :return: the mode used to save (use_temp is evaluated before use_disk)
    """
    if use_temp:
        return FileSystemSaveMode.TEMP  # use_temp takes priority
    elif use_disk:
        return FileSystemSaveMode.DISK  # if here, use_temp is always false
    return FileSystemSaveMode.MEM
class FileSystemWriter (file_name: str, file_ext: str = 'none', base_dir: str = '.', save_mode: FileSystemSaveMode = FileSystemSaveMode.MEM)

This class holds basic information about writing and reading objects from a file system If user does not enable saving to disk, we use a temporary directory to store large files

Properties

file_name: str, the name of the file (do not include extension)

file_ext: str, the extension used by the file (do not include the .) Default "NONE"

base_dir: str, the directory to save the file to. Default "." (current dir)

Protected

_save_mode: FileSystemSaveMode, determines how files get saved

_temp_dir: TemporaryDirectory, temporary directory for large files when not saving to disk

initialize FileSystemWriter

:param file_name: name of file :param file_ext: extension of file, default "none" :param base_dir: directory to save file to, default "." (current dir) :param save_mode: determines how to save files to system, default MEM (no save, use RAM)

Expand source code
class FileSystemWriter:
    """
    This class holds basic information about writing and reading objects from a file system
    If user does not enable saving to disk, we use a temporary directory to store large files

    Properties:
        file_name: str, the name of the file (do not include extension)

        file_ext: str, the extension used by the file (do not include the .)  Default "NONE"

        base_dir: str, the directory to save the file to.  Default "." (current dir)

    Protected:
        _save_mode: FileSystemSaveMode, determines how files get saved

        _temp_dir: TemporaryDirectory, temporary directory for large files when not saving to disk
    """

    def __init__(self, file_name: str, file_ext: str = "none", base_dir: str = ".",
                 save_mode: FileSystemSaveMode = FileSystemSaveMode.MEM):
        """
        initialize FileSystemWriter

        :param file_name: name of file
        :param file_ext: extension of file, default "none"
        :param base_dir: directory to save file to, default "." (current dir)
        :param save_mode: determines how to save files to system, default MEM (no save, use RAM)
        """
        self.file_name: str = file_name
        # extensions are normalized to lowercase on the way in
        self.file_extension: str = file_ext.lower()
        self.base_dir: str = base_dir
        self._save_mode: FileSystemSaveMode = save_mode
        # always allocated, even in MEM/DISK mode, so TEMP can be switched on later
        self._temp_dir = tempfile.TemporaryDirectory()

    def __repr__(self):
        return f"file_name: {self.file_name}, " \
               f"extension: {self.file_extension}, " \
               f"base_dir: {self.base_dir}, " \
               f"save_mode: {self._save_mode.value if hasattr(self, '_save_mode') else FileSystemSaveMode.TEMP.value}"

    def __str__(self):
        return f"file_name: {self.file_name}, " \
               f"extension: {self.file_extension}, " \
               f"base_dir: {self.base_dir}, " \
               f"save_mode: {self._save_mode.name if hasattr(self, '_save_mode') else FileSystemSaveMode.TEMP.name}"

    def __del__(self):
        """
        remove temp dir
        """
        # ROBUSTNESS: guard against a partially-constructed instance (e.g.
        # __init__ raised before _temp_dir was assigned) so __del__ cannot
        # itself raise AttributeError during garbage collection.
        if hasattr(self, "_temp_dir"):
            self._temp_dir.cleanup()

    def is_use_temp(self) -> bool:
        """
        :return: if writing to temp dir
        """
        if hasattr(self, "_save_mode"):
            return self._save_mode == FileSystemSaveMode.TEMP
        return False

    def set_use_temp(self, use_temp: bool):
        """
        :param use_temp: if true, sets mode to use temp dir, otherwise no change
        """
        if use_temp:
            self._save_mode = FileSystemSaveMode.TEMP

    def get_temp(self) -> str:
        """
        :return: path of temp directory
        """
        return self._temp_dir.name

    def is_use_disk(self) -> bool:
        """
        :return: if writing to path on disk
        """
        if hasattr(self, "_save_mode"):
            return self._save_mode == FileSystemSaveMode.DISK
        return False

    def set_use_disk(self, use_disk: bool):
        """
        :param use_disk: if true, sets mode to use the disk, otherwise no change
        """
        if use_disk:
            self._save_mode = FileSystemSaveMode.DISK

    def is_use_mem(self) -> bool:
        """
        :return: if writing data to memory
        """
        if hasattr(self, "_save_mode"):
            return self._save_mode == FileSystemSaveMode.MEM
        return False

    def set_use_mem(self, use_mem: bool):
        """
        :param use_mem: if true, sets mode to use the system's RAM, otherwise no change
        """
        if use_mem:
            self._save_mode = FileSystemSaveMode.MEM

    def is_save_disk(self) -> bool:
        """
        :return: if writing data to disk (temp dir or user defined path) instead of using memory
        """
        if hasattr(self, '_save_mode'):
            return self._save_mode != FileSystemSaveMode.MEM
        return False

    def save_dir(self) -> str:
        """
        :return: directory where file would be saved based on save mode; returns empty string if saving to memory
        """
        if self.is_use_disk():
            return self.base_dir
        elif self.is_use_temp():
            return self._temp_dir.name
        return ""

    def set_save_mode(self, save_mode: FileSystemSaveMode):
        """
        set the save mode

        :param save_mode: updated save mode
        """
        self._save_mode = save_mode

    def save_mode(self) -> FileSystemSaveMode:
        """
        :return: the save mode
        """
        # BUG FIX: the original returned self._save_mode.name (a str) on this
        # branch while the fallback returned the FileSystemSaveMode enum,
        # contradicting both the annotation and the fallback's type. Callers
        # needing the string should use .name (as as_dict does).
        return self._save_mode if hasattr(self, '_save_mode') else FileSystemSaveMode.TEMP

    def full_name(self) -> str:
        """
        :return: file name with extension
        """
        return f"{self.file_name}.{self.file_extension}"

    def full_path(self) -> str:
        """
        :return: the full path to where the file would be written
        """
        return os.path.join(self.save_dir(), self.full_name())

    def set_name_and_extension(self, name: str, ext: str):
        """
        set the name and extension of the output file.  Do not include the . for the extension
        :param name: file name
        :param ext: file extension
        """
        self.file_name = name
        self.file_extension = ext

    def set_name(self, name: str):
        """
        set the name of the output file.
        :param name: file name
        """
        self.file_name = name

    def set_extension(self, ext: str):
        """
        set the extension of the output file.  Do not include the . for the extension
        :param ext: file extension
        """
        self.file_extension = ext

    def json_file_name(self) -> str:
        """
        :return: file name with .json extension
        """
        return f"{self.file_name}.json"

    def json_path(self) -> Path:
        """
        :return: full path to json file
        """
        return Path(self.save_dir()).joinpath(self.json_file_name())

    def create_dir(self):
        """
        if saving to disk, remove the directory if it exists, then create an empty directory to save things into
        if saving to temp dir, remove any files in the temp dir before saving to dir
        """
        if self.is_use_disk():
            if os.path.exists(self.save_dir()):
                remove_dir_contents(Path(self.save_dir()))
            else:
                os.makedirs(self.save_dir())
        elif self.is_use_temp():
            # cheapest way to empty the temp dir: discard and re-create it
            self._temp_dir.cleanup()
            self._temp_dir = tempfile.TemporaryDirectory()

    def as_dict(self) -> dict:
        """
        :return: FileSystemWriter as dictionary
        """
        return {
            "file_name": self.file_name,
            "file_extension": self.file_extension,
            "base_dir": self.base_dir,
            "save_mode": self._save_mode.name if hasattr(self, '_save_mode') else FileSystemSaveMode.TEMP.name
        }

    @staticmethod
    def from_dict(data_dict: Dict) -> "FileSystemWriter":
        """
        :param data_dict: dictionary to convert to FileSystemWriter
        :return: FileSystemWriter from dict
        """
        return FileSystemWriter(data_dict["file_name"], data_dict["file_extension"], data_dict["base_dir"],
                                FileSystemSaveMode[data_dict["save_mode"]])

Subclasses

Static methods

def from_dict(data_dict: Dict) ‑> FileSystemWriter

:param data_dict: dictionary to convert to FileSystemWriter :return: FileSystemWriter from dict

Expand source code
@staticmethod
def from_dict(data_dict: Dict) -> "FileSystemWriter":
    """
    :param data_dict: dictionary to convert to FileSystemWriter
    :return: FileSystemWriter from dict
    """
    return FileSystemWriter(data_dict["file_name"], data_dict["file_extension"], data_dict["base_dir"],
                            FileSystemSaveMode[data_dict["save_mode"]])

Methods

def as_dict(self) ‑> dict

:return: FileSystemWriter as dictionary

Expand source code
def as_dict(self) -> dict:
    """
    :return: FileSystemWriter as dictionary
    """
    return {
        "file_name": self.file_name,
        "file_extension": self.file_extension,
        "base_dir": self.base_dir,
        "save_mode": self._save_mode.name if hasattr(self, '_save_mode') else FileSystemSaveMode.TEMP.name
    }
def create_dir(self)

if saving to disk, remove the directory if it exists, then create an empty directory to save things into if saving to temp dir, remove any files in the temp dir before saving to dir

Expand source code
def create_dir(self):
    """
    if saving to disk, remove the directory if it exists, then create an empty directory to save things into
    if saving to temp dir, remove any files in the temp dir before saving to dir
    """
    if self.is_use_disk():
        if os.path.exists(self.save_dir()):
            remove_dir_contents(Path(self.save_dir()))
        else:
            os.makedirs(self.save_dir())
    elif self.is_use_temp():
        self._temp_dir.cleanup()
        self._temp_dir = tempfile.TemporaryDirectory()
def full_name(self) ‑> str

:return: file name with extension

Expand source code
def full_name(self) -> str:
    """
    :return: file name with extension
    """
    return f"{self.file_name}.{self.file_extension}"
def full_path(self) ‑> str

:return: the full path to where the file would be written

Expand source code
def full_path(self) -> str:
    """
    :return: the full path to where the file would be written
    """
    return os.path.join(self.save_dir(), self.full_name())
def get_temp(self) ‑> str

:return: path of temp directory

Expand source code
def get_temp(self) -> str:
    """
    :return: path of the temporary directory backing this writer
    """
    temp_dir = self._temp_dir
    return temp_dir.name
def is_save_disk(self) ‑> bool

:return: if writing data to disk (temp dir or user defined path) instead of using memory

Expand source code
def is_save_disk(self) -> bool:
    """
    :return: True if data is written to disk (a user path or a temp dir) rather than kept in memory
    """
    # guard: treat an unset save mode as "not saving to disk"
    if not hasattr(self, '_save_mode'):
        return False
    return self._save_mode != FileSystemSaveMode.MEM
def is_use_disk(self) ‑> bool

:return: if writing to path on disk

Expand source code
def is_use_disk(self) -> bool:
    """
    :return: True if writing to a user-defined path on disk
    """
    # guard: an unset save mode never targets the disk
    if not hasattr(self, "_save_mode"):
        return False
    return self._save_mode == FileSystemSaveMode.DISK
def is_use_mem(self) ‑> bool

:return: if writing data to memory

Expand source code
def is_use_mem(self) -> bool:
    """
    :return: True if data is kept in memory
    """
    # guard: an unset save mode does not count as the memory mode
    if not hasattr(self, "_save_mode"):
        return False
    return self._save_mode == FileSystemSaveMode.MEM
def is_use_temp(self) ‑> bool

:return: if writing to temp dir

Expand source code
def is_use_temp(self) -> bool:
    """
    :return: True if writing to a temporary directory
    """
    # guard: an unset save mode does not count as the temp-dir mode
    if not hasattr(self, "_save_mode"):
        return False
    return self._save_mode == FileSystemSaveMode.TEMP
def json_file_name(self) ‑> str

:return: file name with .json extension

Expand source code
def json_file_name(self) -> str:
    """
    :return: the file name with a .json extension
    """
    return self.file_name + ".json"
def json_path(self) ‑> pathlib.Path

:return: full path to json file

Expand source code
def json_path(self) -> Path:
    """
    :return: full path to the companion json file
    """
    return Path(self.save_dir()) / self.json_file_name()
def save_dir(self) ‑> str

:return: directory where file would be saved based on save mode; returns empty string if saving to memory

Expand source code
def save_dir(self) -> str:
    """
    :return: directory the file would be saved to per the save mode; empty string when saving to memory
    """
    if self.is_use_disk():
        return self.base_dir
    if self.is_use_temp():
        return self._temp_dir.name
    return ""
def save_mode(self) ‑> FileSystemSaveMode

:return: the save mode

Expand source code
def save_mode(self) -> FileSystemSaveMode:
    """
    :return: the save mode (defaults to FileSystemSaveMode.TEMP when unset)
    """
    # Bug fix: previously returned self._save_mode.name (a str) when the
    # attribute existed, contradicting both the annotated return type and the
    # enum returned by the fallback branch. Return the enum member in both
    # cases; callers needing the name can use .name (as as_dict does).
    return self._save_mode if hasattr(self, '_save_mode') else FileSystemSaveMode.TEMP
def set_extension(self, ext: str)

set the extension of the output file. Do not include the . for the extension :param ext: file extension

Expand source code
def set_extension(self, ext: str):
    """
    Set the output file's extension (do not include the leading dot).

    :param ext: file extension
    """
    self.file_extension = ext
def set_name(self, name: str)

set the name of the output file. :param name: file name

Expand source code
def set_name(self, name: str):
    """
    Set the output file's name.

    :param name: file name
    """
    self.file_name = name
def set_name_and_extension(self, name: str, ext: str)

set the name and extension of the output file. Do not include the . for the extension :param name: file name :param ext: file extension

Expand source code
def set_name_and_extension(self, name: str, ext: str):
    """
    Set both the output file's name and its extension (extension without the
    leading dot).

    :param name: file name
    :param ext: file extension
    """
    self.file_name = name
    self.file_extension = ext
def set_save_mode(self, save_mode: FileSystemSaveMode)

set the save mode

:param save_mode: updated save mode

Expand source code
def set_save_mode(self, save_mode: FileSystemSaveMode):
    """
    Replace the writer's save mode.

    :param save_mode: updated save mode
    """
    self._save_mode = save_mode
def set_use_disk(self, use_disk: bool)

:param use_disk: if true, sets mode to use the disk, otherwise no change

Expand source code
def set_use_disk(self, use_disk: bool):
    """
    Switch the save mode to DISK when requested; a False argument is a no-op.

    :param use_disk: if true, sets mode to use the disk, otherwise no change
    """
    if use_disk:
        self._save_mode = FileSystemSaveMode.DISK
def set_use_mem(self, use_mem: bool)

:param use_mem: if true, sets mode to use the system's RAM, otherwise no change

Expand source code
def set_use_mem(self, use_mem: bool):
    """
    Switch the save mode to MEM when requested; a False argument is a no-op.

    :param use_mem: if true, sets mode to use the system's RAM, otherwise no change
    """
    if use_mem:
        self._save_mode = FileSystemSaveMode.MEM
def set_use_temp(self, use_temp: bool)

:param use_temp: if true, sets mode to use temp dir, otherwise no change

Expand source code
def set_use_temp(self, use_temp: bool):
    """
    Switch the save mode to TEMP when requested; a False argument is a no-op.

    :param use_temp: if true, sets mode to use temp dir, otherwise no change
    """
    if use_temp:
        self._save_mode = FileSystemSaveMode.TEMP
class Index (entries: List[IndexEntry] = <factory>)

An index of available RedVox files from the file system.

Expand source code
@dataclass
class Index:
    """
    An index of available RedVox files from the file system.
    """

    # Indexed file entries; each instance starts with its own empty list.
    entries: List[IndexEntry] = field(default_factory=lambda: [])

    @staticmethod
    def from_native(index_native) -> "Index":
        """
        Converts a native index into a python index.

        :param index_native: A native index.
        :return: A Python index.
        """
        entries: List[IndexEntry] = list(
            map(IndexEntry.from_native, index_native.entries)
        )
        # Backfill decompressed sizes when the native entries did not carry them.
        return Index(entries)._set_decompressed_file_size()

    def to_native(self):
        """
        :return: this Index converted into a native (redvox_native) Index.
        """
        # Imported lazily so the pure-python package works when the optional
        # native extension is not installed.
        import redvox_native

        native_index = redvox_native.Index()
        native_index.entries = list(map(IndexEntry.to_native, self.entries))
        return native_index

    def max_decompressed_file_size(self) -> int:
        """
        :return: the maximum decompressed file size in the entries
        """
        # NOTE(review): returns np.nan (a float) when there are no entries,
        # despite the int annotation — callers must guard the empty case.
        return max([fi.decompressed_file_size_bytes for fi in self.entries]) if len(self.entries) > 0 else np.nan

    def get_decompressed_file_size(self) -> int:
        """
        :return: the decompressed size of the first file in the list of entries
        """
        if len(self.entries) == 0:
            return np.nan
        # A recorded size of 0 means the lz4 frame had no stored content size;
        # decompress the first file once to measure its actual size.
        if self.entries[0].decompressed_file_size_bytes == 0 and os.path.exists(self.entries[0].full_path):
            with lz4.frame.open(self.entries[0].full_path, 'rb') as fr:
                return len(fr.read())
        return self.entries[0].decompressed_file_size_bytes

    def _set_decompressed_file_size(self) -> "Index":
        """
        updates the decompressed size of all entries if the maximum decompressed size is 0, otherwise makes no changes

        :return: updated self
        """
        if self.max_decompressed_file_size() == 0:
            # The first entry's measured size is copied to every entry;
            # presumably packets in one index are of similar size — TODO confirm.
            new_size = self.get_decompressed_file_size()
            for ie in self.entries:
                ie.decompressed_file_size_bytes = new_size
        return self

    def sort(self) -> None:
        """
        Sorts the entries stored in this index.
        """
        # Order: API version, then station id, then packet date/time.
        self.entries = sorted(
            self.entries,
            key=lambda entry: (entry.api_version, entry.station_id, entry.date_time),
        )

    def append(self, entries: Iterator[IndexEntry]) -> None:
        """
        Appends new entries to this index.

        :param entries: Entries to append.
        """
        self.entries.extend(entries)
        self._set_decompressed_file_size()

    def summarize(self) -> IndexSummary:
        """
        :return: A summary of the contents of this index.
        """
        return IndexSummary.from_index(self)

    def get_index_for_station_id(self, station_id: str) -> "Index":
        """
        :param station_id: id to get entries for
        :return: Index containing only the entries for the station requested
        """
        return Index([en for en in self.entries if en.station_id == station_id])

    def stream_raw(
        self, read_filter: ReadFilter = ReadFilter()
    ) -> Iterator[Union["RedvoxPacket", RedvoxPacketM]]:
        """
        Read, decompress, deserialize, and then stream RedVox data pointed to by this index.

        :param read_filter: Additional filtering to specify which data should be streamed.
        :return: An iterator over RedvoxPacket and RedvoxPacketM instances.
        """
        # NOTE(review): the ReadFilter() default is created once at definition
        # time and shared across calls — safe only while apply() does not
        # mutate the filter; confirm against ReadFilter's implementation.
        filtered: Iterator[IndexEntry] = filter(read_filter.apply, self.entries)
        # noinspection Mypy
        return map(IndexEntry.read_raw, filtered)

    def stream(
        self, read_filter: ReadFilter = ReadFilter()
    ) -> Iterator[Union["WrappedRedvoxPacket", WrappedRedvoxPacketM]]:
        """
        Read, decompress, deserialize, wrap, and then stream RedVox data pointed to by this index.

        :param read_filter: Additional filtering to specify which data should be streamed.
        :return: An iterator over WrappedRedvoxPacket and WrappedRedvoxPacketM instances.
        """
        # NOTE(review): shared mutable default ReadFilter() — see stream_raw.
        filtered: Iterator[IndexEntry] = filter(read_filter.apply, self.entries)
        # noinspection Mypy
        return map(IndexEntry.read, filtered)

    def read_raw(
        self, read_filter: ReadFilter = ReadFilter()
    ) -> List[Union["RedvoxPacket", RedvoxPacketM]]:
        """
        Read, decompress, and deserialize RedVox data pointed to by this index.

        :param read_filter: Additional filtering to specify which data should be read.
        :return: A list of RedvoxPacket and RedvoxPacketM instances.
        """
        return list(self.stream_raw(read_filter))

    def read(
        self, read_filter: ReadFilter = ReadFilter()
    ) -> List[Union["WrappedRedvoxPacket", WrappedRedvoxPacketM]]:
        """
        Read, decompress, deserialize, and wrap RedVox data pointed to by this index.

        :param read_filter: Additional filtering to specify which data should be read.
        :return: A list of WrappedRedvoxPacket and WrappedRedvoxPacketM instances.
        """
        return list(self.stream(read_filter))

    def files_size(self) -> float:
        """
        :return: sum of decompressed file sizes in bytes over all entries (0.0 when empty)
        """
        return float(np.sum([entry.decompressed_file_size_bytes for entry in self.entries]))

    def read_contents(self) -> List[RedvoxPacketM]:
        """
        read all the files in the index

        :return: list of RedvoxPacketM, converted from API 900 if necessary
        """
        result: List[RedvoxPacketM] = []

        # Iterate over the API 900 packets in a memory efficient way
        # and convert to API 1000
        # noinspection PyTypeChecker
        for packet_900 in self.stream_raw(
                ReadFilter.empty().with_api_versions({ApiVersion.API_900})
        ):
            # noinspection Mypy
            result.append(
                ac.convert_api_900_to_1000_raw(packet_900)
            )

        # Grab the API 1000 packets
        # noinspection PyTypeChecker
        # NOTE: all API 900 packets precede all API 1000 packets in the result,
        # so the list is not globally time-ordered when both APIs are present.
        for packet in self.stream_raw(
                ReadFilter.empty().with_api_versions({ApiVersion.API_1000})
        ):
            # noinspection Mypy
            result.append(packet)

        return result

    def read_first_packet(self) -> Optional[RedvoxPacketM]:
        """
        read the first packet of the index

        :return: single RedvoxPacketM, converted from API 900 if necessary or None if no packet to read
        """
        # Grab the API 1000 packets; an API 1000 packet is preferred over an
        # API 900 packet regardless of timestamps.
        # noinspection PyTypeChecker
        for packet in self.stream_raw(
                ReadFilter.empty().with_api_versions({ApiVersion.API_1000})
        ):
            # noinspection Mypy
            return packet

        # Iterate over the API 900 packets in a memory efficient way
        # and convert to API 1000
        # noinspection PyTypeChecker
        for packet_900 in self.stream_raw(
                ReadFilter.empty().with_api_versions({ApiVersion.API_900})
        ):
            # noinspection Mypy
            return ac.convert_api_900_to_1000_raw(packet_900)

        return None

Class variables

var entries : List[IndexEntry]

Static methods

def from_native(index_native) ‑> Index

Converts a native index into a python index.

:param index_native: A native index. :return: A Python index.

Expand source code
@staticmethod
def from_native(index_native) -> "Index":
    """
    Converts a native index into a python index.

    :param index_native: A native index.
    :return: A Python index.
    """
    entries: List[IndexEntry] = list(
        map(IndexEntry.from_native, index_native.entries)
    )
    return Index(entries)._set_decompressed_file_size()

Methods

def append(self, entries: Iterator[IndexEntry]) ‑> None

Appends new entries to this index.

:param entries: Entries to append.

Expand source code
def append(self, entries: Iterator[IndexEntry]) -> None:
    """
    Appends new entries to this index.

    :param entries: Entries to append.
    """
    self.entries.extend(entries)
    self._set_decompressed_file_size()
def files_size(self) ‑> float

:return: sum of file size in bytes of index

Expand source code
def files_size(self) -> float:
    """
    :return: sum of file size in bytes of index
    """
    return float(np.sum([entry.decompressed_file_size_bytes for entry in self.entries]))
def get_decompressed_file_size(self) ‑> int

:return: the decompressed size of the first file in the list of entries

Expand source code
def get_decompressed_file_size(self) -> int:
    """
    :return: the decompressed size of the first file in the list of entries
    """
    if len(self.entries) == 0:
        return np.nan
    if self.entries[0].decompressed_file_size_bytes == 0 and os.path.exists(self.entries[0].full_path):
        with lz4.frame.open(self.entries[0].full_path, 'rb') as fr:
            return len(fr.read())
    return self.entries[0].decompressed_file_size_bytes
def get_index_for_station_id(self, station_id: str) ‑> Index

:param station_id: id to get entries for :return: Index containing only the entries for the station requested

Expand source code
def get_index_for_station_id(self, station_id: str) -> "Index":
    """
    :param station_id: id to get entries for
    :return: Index containing only the entries for the station requested
    """
    return Index([en for en in self.entries if en.station_id == station_id])
def max_decompressed_file_size(self) ‑> int

:return: the maximum decompressed file size in the entries

Expand source code
def max_decompressed_file_size(self) -> int:
    """
    :return: the maximum decompressed file size in the entries
    """
    return max([fi.decompressed_file_size_bytes for fi in self.entries]) if len(self.entries) > 0 else np.nan
def read(self, read_filter: ReadFilter = ReadFilter(start_dt=None, end_dt=None, station_ids=None, extensions={'.rdvxz', '.rdvxm'}, start_dt_buf=datetime.timedelta(seconds=120), end_dt_buf=datetime.timedelta(seconds=120), api_versions={<ApiVersion.API_900: 'API_900'>, <ApiVersion.API_1000: 'API_1000'>})) ‑> List[Union[WrappedRedvoxPacket, WrappedRedvoxPacketM]]

Read, decompress, deserialize, and wrap RedVox data pointed to by this index.

:param read_filter: Additional filtering to specify which data should be read. :return: A list of WrappedRedvoxPacket and WrappedRedvoxPacketM instances.

Expand source code
def read(
    self, read_filter: ReadFilter = ReadFilter()
) -> List[Union["WrappedRedvoxPacket", WrappedRedvoxPacketM]]:
    """
    Read, decompress, deserialize, and wrap RedVox data pointed to by this index.

    :param read_filter: Additional filtering to specify which data should be read.
    :return: An list of WrappedRedvoxPacket and WrappedRedvoxPacketM instances.
    """
    return list(self.stream(read_filter))
def read_contents(self) ‑> List[src.redvox_api_m.redvox_api_m_pb2.RedvoxPacketM]

read all the files in the index

:return: list of RedvoxPacketM, converted from API 900 if necessary

Expand source code
def read_contents(self) -> List[RedvoxPacketM]:
    """
    read all the files in the index

    :return: list of RedvoxPacketM, converted from API 900 if necessary
    """
    result: List[RedvoxPacketM] = []

    # Iterate over the API 900 packets in a memory efficient way
    # and convert to API 1000
    # noinspection PyTypeChecker
    for packet_900 in self.stream_raw(
            ReadFilter.empty().with_api_versions({ApiVersion.API_900})
    ):
        # noinspection Mypy
        result.append(
            ac.convert_api_900_to_1000_raw(packet_900)
        )

    # Grab the API 1000 packets
    # noinspection PyTypeChecker
    for packet in self.stream_raw(
            ReadFilter.empty().with_api_versions({ApiVersion.API_1000})
    ):
        # noinspection Mypy
        result.append(packet)

    return result
def read_first_packet(self) ‑> Optional[src.redvox_api_m.redvox_api_m_pb2.RedvoxPacketM]

read the first packet of the index

:return: single RedvoxPacketM, converted from API 900 if necessary or None if no packet to read

Expand source code
def read_first_packet(self) -> Optional[RedvoxPacketM]:
    """
    read the first packet of the index

    :return: single RedvoxPacketM, converted from API 900 if necessary or None if no packet to read
    """
    # Grab the API 1000 packets
    # noinspection PyTypeChecker
    for packet in self.stream_raw(
            ReadFilter.empty().with_api_versions({ApiVersion.API_1000})
    ):
        # noinspection Mypy
        return packet

    # Iterate over the API 900 packets in a memory efficient way
    # and convert to API 1000
    # noinspection PyTypeChecker
    for packet_900 in self.stream_raw(
            ReadFilter.empty().with_api_versions({ApiVersion.API_900})
    ):
        # noinspection Mypy
        return ac.convert_api_900_to_1000_raw(packet_900)

    return None
def read_raw(self, read_filter: ReadFilter = ReadFilter(start_dt=None, end_dt=None, station_ids=None, extensions={'.rdvxz', '.rdvxm'}, start_dt_buf=datetime.timedelta(seconds=120), end_dt_buf=datetime.timedelta(seconds=120), api_versions={<ApiVersion.API_900: 'API_900'>, <ApiVersion.API_1000: 'API_1000'>})) ‑> List[Union[RedvoxPacket, src.redvox_api_m.redvox_api_m_pb2.RedvoxPacketM]]

Read, decompress, and deserialize RedVox data pointed to by this index.

:param read_filter: Additional filtering to specify which data should be read. :return: A list of RedvoxPacket and RedvoxPacketM instances.

Expand source code
def read_raw(
    self, read_filter: ReadFilter = ReadFilter()
) -> List[Union["RedvoxPacket", RedvoxPacketM]]:
    """
    Read, decompress, and deserialize RedVox data pointed to by this index.

    :param read_filter: Additional filtering to specify which data should be read.
    :return: An list of RedvoxPacket and RedvoxPacketM instances.
    """
    return list(self.stream_raw(read_filter))
def sort(self) ‑> None

Sorts the entries stored in this index.

Expand source code
def sort(self) -> None:
    """
    Sorts the entries stored in this index.
    """
    self.entries = sorted(
        self.entries,
        key=lambda entry: (entry.api_version, entry.station_id, entry.date_time),
    )
def stream(self, read_filter: ReadFilter = ReadFilter(start_dt=None, end_dt=None, station_ids=None, extensions={'.rdvxz', '.rdvxm'}, start_dt_buf=datetime.timedelta(seconds=120), end_dt_buf=datetime.timedelta(seconds=120), api_versions={<ApiVersion.API_900: 'API_900'>, <ApiVersion.API_1000: 'API_1000'>})) ‑> Iterator[Union[WrappedRedvoxPacket, WrappedRedvoxPacketM]]

Read, decompress, deserialize, wrap, and then stream RedVox data pointed to by this index.

:param read_filter: Additional filtering to specify which data should be streamed. :return: An iterator over WrappedRedvoxPacket and WrappedRedvoxPacketM instances.

Expand source code
def stream(
    self, read_filter: ReadFilter = ReadFilter()
) -> Iterator[Union["WrappedRedvoxPacket", WrappedRedvoxPacketM]]:
    """
    Read, decompress, deserialize, wrap, and then stream RedVox data pointed to by this index.

    :param read_filter: Additional filtering to specify which data should be streamed.
    :return: An iterator over WrappedRedvoxPacket and WrappedRedvoxPacketM instances.
    """
    filtered: Iterator[IndexEntry] = filter(read_filter.apply, self.entries)
    # noinspection Mypy
    return map(IndexEntry.read, filtered)
def stream_raw(self, read_filter: ReadFilter = ReadFilter(start_dt=None, end_dt=None, station_ids=None, extensions={'.rdvxz', '.rdvxm'}, start_dt_buf=datetime.timedelta(seconds=120), end_dt_buf=datetime.timedelta(seconds=120), api_versions={<ApiVersion.API_900: 'API_900'>, <ApiVersion.API_1000: 'API_1000'>})) ‑> Iterator[Union[RedvoxPacket, src.redvox_api_m.redvox_api_m_pb2.RedvoxPacketM]]

Read, decompress, deserialize, and then stream RedVox data pointed to by this index.

:param read_filter: Additional filtering to specify which data should be streamed. :return: An iterator over RedvoxPacket and RedvoxPacketM instances.

Expand source code
def stream_raw(
    self, read_filter: ReadFilter = ReadFilter()
) -> Iterator[Union["RedvoxPacket", RedvoxPacketM]]:
    """
    Read, decompress, deserialize, and then stream RedVox data pointed to by this index.

    :param read_filter: Additional filtering to specify which data should be streamed.
    :return: An iterator over RedvoxPacket and RedvoxPacketM instances.
    """
    filtered: Iterator[IndexEntry] = filter(read_filter.apply, self.entries)
    # noinspection Mypy
    return map(IndexEntry.read_raw, filtered)
def summarize(self) ‑> IndexSummary

:return: A summary of the contents of this index.

Expand source code
def summarize(self) -> IndexSummary:
    """
    :return: A summary of the contents of this index.
    """
    return IndexSummary.from_index(self)
def to_native(self)
Expand source code
def to_native(self):
    import redvox_native

    native_index = redvox_native.Index()
    native_index.entries = list(map(IndexEntry.to_native, self.entries))
    return native_index
class IndexEntry (full_path: str, station_id: str, date_time: datetime.datetime, extension: str, api_version: ApiVersion, compressed_file_size_bytes: int = 0, decompressed_file_size_bytes: int = 0)

This class represents a single index entry. It extracts and encapsulates API-agnostic fields that represent the information stored in standard RedVox file names.

Expand source code
@dataclass
class IndexEntry:
    """
    This class represents a single index entry. It extracts and encapsulates API-agnostic fields that represent the
    information stored in standard RedVox file names.
    """

    full_path: str                          # path to the file on disk (resolved when the file exists)
    station_id: str                         # station id parsed from the file name
    date_time: datetime                     # packet timestamp parsed from the file name
    extension: str                          # file extension, including the leading dot
    api_version: ApiVersion                 # API version detected for the file
    compressed_file_size_bytes: int = 0     # on-disk size; 0 when the file does not exist
    decompressed_file_size_bytes: int = 0   # decompressed size; 0 when unknown

    @staticmethod
    def from_path(path_str: str, strict: bool = True) -> Optional["IndexEntry"]:
        """
        Attempts to parse a file path into an IndexEntry. If a given path is not recognized as a valid RedVox file,
        None will be returned instead.

        :param path_str: The file system path to attempt to parse.
        :param strict: When set, None is returned if the referenced file DNE.
        :return: Either an IndexEntry on successful parse or None.
        """
        api_version: ApiVersion = check_version(path_str)
        path: Path = Path(path_str)
        name: str = path.stem
        ext: str = path.suffix

        # Attempt to parse file name parts; expected form is "<station_id>_<timestamp>"
        split_name = name.split("_")
        if len(split_name) != 2:
            return None

        station_id: str = split_name[0]
        ts_str: str = split_name[1]

        # If you have a filename with a dot, but not an extension, i.e. "0000000001_0.", we need to remove the dot
        # from the end and make it the extension
        if len(ts_str) > 0 and ts_str[-1] == ".":
            ts_str = ts_str[:-1]
            ext = "."

        timestamp: Optional[int] = _is_int(ts_str)

        # Ensure that both the station ID and timestamp can be represented as ints
        if _is_int(station_id) is None or timestamp is None:
            return None

        # Parse the datetime per the specified API version:
        # API 1000 file names carry microseconds, API 900 carry milliseconds.
        date_time: datetime
        if api_version == ApiVersion.API_1000:
            date_time = dt_us(timestamp)
        else:
            date_time = dt_ms(timestamp)

        full_path: str
        try:
            full_path = str(path.resolve(strict=True))
        except FileNotFoundError:
            # non-strict mode keeps the unresolved path for files that DNE yet
            if strict:
                return None
            full_path = path_str

        return IndexEntry(full_path, station_id, date_time, ext, api_version)._set_compressed_decompressed_lz4_size()

    @staticmethod
    def from_native(entry) -> "IndexEntry":
        """
        Converts a native index entry into a python index entry.

        :param entry: A native index entry.
        :return: A python index entry.
        """
        return IndexEntry(
            entry.full_path,
            entry.station_id,
            dt_us(entry.date_time),
            entry.extension,
            ApiVersion.from_str(entry.api_version)
        )._set_compressed_decompressed_lz4_size()

    def to_native(self):
        """
        :return: this entry converted into a native (redvox_native) IndexEntry.
        """
        # Imported lazily so the pure-python package works when the optional
        # native extension is not installed.
        import redvox_native

        entry = redvox_native.IndexEntry(
            self.full_path,
            self.station_id,
            us_dt(self.date_time),
            self.extension,
            self.api_version.value
        )
        return entry

    def _set_compressed_decompressed_lz4_size(self):
        """
        set the compressed and decompressed file size in bytes of an lz4 file being read by the IndexEntry.
        default is 0 for both file sizes

        :return: updated self
        """
        if os.path.exists(self.full_path):
            self.compressed_file_size_bytes = os.path.getsize(self.full_path)
            with open(self.full_path, "rb") as fp:
                if self.api_version == ApiVersion.API_1000:
                    # NOTE(review): reads the entire file into memory just to
                    # inspect the lz4 frame header; content_size may be 0 when
                    # the frame did not record it (Index.get_decompressed_file_size
                    # compensates by decompressing).
                    header = lz4.frame.get_frame_info(fp.read())
                    self.decompressed_file_size_bytes = header["content_size"]
                elif self.api_version == ApiVersion.API_900:
                    self.decompressed_file_size_bytes = calculate_uncompressed_size(fp.read())
        return self

    def read(self) -> Optional[Union[WrappedRedvoxPacketM, "WrappedRedvoxPacket"]]:
        """
        Reads, decompresses, deserializes, and wraps the RedVox file pointed to by this entry.

        :return: One of WrappedRedvoxPacket, WrappedRedvoxPacketM, or None.
        """
        if self.api_version == ApiVersion.API_900:
            return read_rdvxz_file(self.full_path)
        elif self.api_version == ApiVersion.API_1000:
            return WrappedRedvoxPacketM.from_compressed_path(self.full_path)
        else:
            # unknown/unsupported API version
            return None

    def read_raw(self) -> Optional[Union["RedvoxPacket", RedvoxPacketM]]:
        """
        Reads, decompresses, and deserializes the RedVox file pointed to by this entry.

        :return: One of RedvoxPacket, RedvoxPacketM, or None. Note that these are the raw protobuf types.
        """
        if self.api_version == ApiVersion.API_900:
            with open(self.full_path, "rb") as buf_in:
                return read_buffer(buf_in.read())
        elif self.api_version == ApiVersion.API_1000:
            with lz4.frame.open(self.full_path, "rb") as serialized_in:
                proto: RedvoxPacketM = RedvoxPacketM()
                proto.ParseFromString(serialized_in.read())
                return proto
        else:
            # unknown/unsupported API version
            return None

    def _into_native(self):
        # placeholder; no native conversion performed here (see to_native)
        pass

    def __eq__(self, other: object) -> bool:
        """
        Tests if this value is equal to another value.
        This along with __lt__ are used to fulfill the total ordering contract. Compares this entry's full path to
        another entry's full path.

        :param other: Other IndexEntry to compare against.
        :return: True if other is an IndexEntry with the same full path.
        """
        # NOTE(review): defining __eq__ makes instances unhashable (__hash__
        # becomes None); also __lt__ is not defined in this class, so the
        # total-ordering claim above may be stale — confirm.
        if isinstance(other, IndexEntry):
            return self.full_path == other.full_path

        return False

Class variables

var api_versionApiVersion
var compressed_file_size_bytes : int
var date_time : datetime.datetime
var decompressed_file_size_bytes : int
var extension : str
var full_path : str
var station_id : str

Static methods

def from_native(entry) ‑> IndexEntry

Converts a native index entry into a python index entry.

:param entry: A native index entry. :return: A python index entry.

Expand source code
@staticmethod
def from_native(entry) -> "IndexEntry":
    """
    Converts a native index entry into a python index entry.

    :param entry: A native index entry.
    :return: A python index entry.
    """
    return IndexEntry(
        entry.full_path,
        entry.station_id,
        dt_us(entry.date_time),
        entry.extension,
        ApiVersion.from_str(entry.api_version)
    )._set_compressed_decompressed_lz4_size()
def from_path(path_str: str, strict: bool = True) ‑> Optional[IndexEntry]

Attempts to parse a file path into an IndexEntry. If a given path is not recognized as a valid RedVox file, None will be returned instead.

:param path_str: The file system path to attempt to parse. :param strict: When set, None is returned if the referenced file DNE. :return: Either an IndexEntry on successful parse or None.

Expand source code
@staticmethod
def from_path(path_str: str, strict: bool = True) -> Optional["IndexEntry"]:
    """
    Attempts to parse a file path into an IndexEntry. If a given path is not recognized as a valid RedVox file,
    None will be returned instead.

    :param path_str: The file system path to attempt to parse.
    :param strict: When set, None is returned if the referenced file DNE.
    :return: Either an IndexEntry or successful parse or None.
    """
    api_version: ApiVersion = check_version(path_str)
    path: Path = Path(path_str)
    name: str = path.stem
    ext: str = path.suffix

    # Attempt to parse file name parts
    split_name = name.split("_")
    if len(split_name) != 2:
        return None

    station_id: str = split_name[0]
    ts_str: str = split_name[1]

    # If you have a filename with a dot, but not an extension, i.e. "0000000001_0.", we need to remove the dot
    # from the end and make in the extension
    if len(ts_str) > 0 and ts_str[-1] == ".":
        ts_str = ts_str[:-1]
        ext = "."

    timestamp: Optional[int] = _is_int(ts_str)

    # Ensure that both the station ID and timestamp can be represented as ints
    if _is_int(station_id) is None or timestamp is None:
        return None

    # Parse the datetime per the specified API version
    date_time: datetime
    if api_version == ApiVersion.API_1000:
        date_time = dt_us(timestamp)
    else:
        date_time = dt_ms(timestamp)

    full_path: str
    try:
        full_path = str(path.resolve(strict=True))
    except FileNotFoundError:
        if strict:
            return None
        full_path = path_str

    return IndexEntry(full_path, station_id, date_time, ext, api_version)._set_compressed_decompressed_lz4_size()

Methods

def read(self) ‑> Union[WrappedRedvoxPacketM, WrappedRedvoxPacket, ForwardRef(None)]

Reads, decompresses, deserializes, and wraps the RedVox file pointed to by this entry.

:return: One of WrappedRedvoxPacket, WrappedRedvoxPacketM, or None.

Expand source code
def read(self) -> Optional[Union[WrappedRedvoxPacketM, "WrappedRedvoxPacket"]]:
    """
    Reads, decompresses, deserializes, and wraps the RedVox file pointed to by this entry.

    :return: One of WrappedRedvoxPacket, WrappedRedvoxPacketM, or None.
    """
    if self.api_version == ApiVersion.API_900:
        return read_rdvxz_file(self.full_path)
    elif self.api_version == ApiVersion.API_1000:
        return WrappedRedvoxPacketM.from_compressed_path(self.full_path)
    else:
        return None
def read_raw(self) ‑> Union[RedvoxPacket, RedvoxPacketM, ForwardRef(None)]

Reads, decompresses, and deserializes the RedVox file pointed to by this entry.

:return: One of RedvoxPacket, RedvoxPacketM, or None. Note that these are the raw protobuf types.

Expand source code
def read_raw(self) -> Optional[Union["RedvoxPacket", RedvoxPacketM]]:
    """
    Reads, decompresses, and deserializes the RedVox file pointed to by this entry.

    :return: One of RedvoxPacket, RedvoxPacketM, or None. Note that these are the raw protobuf types.
    """
    if self.api_version == ApiVersion.API_900:
        # API 900 files are read whole and parsed from the raw buffer.
        with open(self.full_path, "rb") as buf_in:
            return read_buffer(buf_in.read())
    if self.api_version == ApiVersion.API_1000:
        # API M files are LZ4-frame-compressed serialized protobuf payloads.
        with lz4.frame.open(self.full_path, "rb") as serialized_in:
            packet: RedvoxPacketM = RedvoxPacketM()
            packet.ParseFromString(serialized_in.read())
            return packet
    # Unknown or unsupported API version.
    return None
def to_native(self)
Expand source code
def to_native(self):
    """
    Converts this entry into its native-backed representation.

    :return: A redvox_native.IndexEntry mirroring this entry's fields.
    """
    # Imported lazily so the optional native extension is only required when used.
    import redvox_native

    return redvox_native.IndexEntry(
        self.full_path,
        self.station_id,
        us_dt(self.date_time),
        self.extension,
        self.api_version.value,
    )
class IndexStationSummary (station_id: str, api_version: ApiVersion, total_packets: int, first_packet: datetime.datetime, last_packet: datetime.datetime, single_packet_decompressed_size_bytes: int)

Summary of a single station in the index.

Expand source code
@dataclass
class IndexStationSummary:
    """
    Summary of a single station in the index.
    """

    # Station identifier this summary describes.
    station_id: str
    # RedVox API version of the summarized packets.
    api_version: ApiVersion
    # Running count of packets seen for this station.
    total_packets: int
    # Earliest packet datetime observed so far.
    first_packet: datetime
    # Latest packet datetime observed so far.
    last_packet: datetime
    # Decompressed size in bytes of a single packet from this station.
    single_packet_decompressed_size_bytes: int

    @staticmethod
    def from_entry(entry: IndexEntry) -> "IndexStationSummary":
        """
        Instantiates a new summary from a given IndexEntry.

        :param entry: Entry to copy information from.
        :return: An instance of IndexStationSummary.
        """
        # A fresh summary covers exactly one packet, so both ends of the
        # time window start at this entry's datetime.
        return IndexStationSummary(
            entry.station_id,
            entry.api_version,
            1,
            first_packet=entry.date_time,
            last_packet=entry.date_time,
            single_packet_decompressed_size_bytes=entry.decompressed_file_size_bytes,
        )

    def update(self, entry: IndexEntry) -> None:
        """
        Updates this summary given a new index entry.

        :param entry: Entry to update this summary from.
        """
        self.total_packets += 1
        # Widen the observed [first_packet, last_packet] window as needed.
        self.first_packet = min(self.first_packet, entry.date_time)
        self.last_packet = max(self.last_packet, entry.date_time)

Class variables

var api_version : ApiVersion
var first_packet : datetime.datetime
var last_packet : datetime.datetime
var single_packet_decompressed_size_bytes : int
var station_id : str
var total_packets : int

Static methods

def from_entry(entry: IndexEntry) ‑> IndexStationSummary

Instantiates a new summary from a given IndexEntry.

:param entry: Entry to copy information from. :return: An instance of IndexStationSummary.

Expand source code
@staticmethod
def from_entry(entry: IndexEntry) -> "IndexStationSummary":
    """
    Instantiates a new summary from a given IndexEntry.

    :param entry: Entry to copy information from.
    :return: An instance of IndexStationSummary.
    """
    # A brand-new summary describes exactly one packet, so the observed
    # time window collapses to this entry's datetime on both ends.
    return IndexStationSummary(
        station_id=entry.station_id,
        api_version=entry.api_version,
        total_packets=1,
        first_packet=entry.date_time,
        last_packet=entry.date_time,
        single_packet_decompressed_size_bytes=entry.decompressed_file_size_bytes,
    )

Methods

def update(self, entry: IndexEntry) ‑> None

Updates this summary given a new index entry.

:param entry: Entry to update this summary from.

Expand source code
def update(self, entry: IndexEntry) -> None:
    """
    Updates this summary given a new index entry.

    :param entry: Entry to update this summary from.
    """
    self.total_packets += 1
    # Extend the observed time window to include this entry's datetime.
    when: datetime = entry.date_time
    if when < self.first_packet:
        self.first_packet = when
    if when > self.last_packet:
        self.last_packet = when
class IndexSummary (station_summaries: Dict[ApiVersion, Dict[str, IndexStationSummary]])

Summarizes the contents of the index.

Expand source code
@dataclass
class IndexSummary:
    """
    Summarizes the contents of the index.
    """

    # Per API version: mapping of station ID -> that station's summary.
    station_summaries: Dict[ApiVersion, Dict[str, IndexStationSummary]]

    def _summaries_for(self, api_version: ApiVersion) -> Dict[str, IndexStationSummary]:
        """
        Returns the station summaries for a single API version.

        Uses .get() so a version with no entries yields an empty dict instead
        of raising KeyError (or, when station_summaries is a defaultdict,
        inserting an empty sub-dict as a read side effect).

        :param api_version: The API version to look up.
        :return: Mapping of station ID to summary (possibly empty).
        """
        return self.station_summaries.get(api_version, {})

    def station_ids(self, api_version: Optional[ApiVersion] = None) -> List[str]:
        """
        Returns the station IDs referenced by this index.

        :param api_version: An (optional) filter to only return packets for a specified RedVox API version.
                            None will collect station IDs from all API versions.
        :return: The station IDs referenced by this index.
        """
        if api_version is not None:
            return list(
                {summary.station_id for summary in self._summaries_for(api_version).values()}
            )
        # NOTE: preserving prior behavior, a station present under both API
        # versions appears once per version in the combined list.
        return self.station_ids(ApiVersion.API_900) + self.station_ids(ApiVersion.API_1000)

    def total_packets(self, api_version: Optional[ApiVersion] = None) -> int:
        """
        Returns the total number of packets referenced by this index.

        :param api_version: An (optional) filter to only return packets for a specified RedVox API version.
                            None will count packets from all API versions.
        :return: The total number of packets referenced by this index.
        """
        if api_version is not None:
            return sum(
                summary.total_packets
                for summary in self._summaries_for(api_version).values()
            )
        return self.total_packets(ApiVersion.API_900) + self.total_packets(ApiVersion.API_1000)

    @staticmethod
    def from_index(index: "Index") -> "IndexSummary":
        """
        Builds an IndexSummary from a given index.

        :param index: Index to build summary from.
        :return: An instance of IndexSummary.
        """
        station_summaries: Dict[
            ApiVersion, Dict[str, IndexStationSummary]
        ] = defaultdict(dict)

        entry: IndexEntry
        for entry in index.entries:
            per_station: Dict[str, IndexStationSummary] = station_summaries[
                entry.api_version
            ]
            if entry.station_id in per_station:
                # Fold this entry into the existing station summary.
                per_station[entry.station_id].update(entry)
            else:
                # First packet seen for this station: start a fresh summary.
                per_station[entry.station_id] = IndexStationSummary.from_entry(entry)

        return IndexSummary(station_summaries)

Class variables

var station_summaries : Dict[ApiVersion, Dict[str, IndexStationSummary]]

Static methods

def from_index(index: Index) ‑> IndexSummary

Builds an IndexSummary from a given index.

:param index: Index to build summary from. :return: An instance of IndexSummary.

Expand source code
@staticmethod
def from_index(index: "Index") -> "IndexSummary":
    """
    Builds an IndexSummary from a given index.

    :param index: Index to build summary from.
    :return: An instance of IndexSummary.
    """
    # Group summaries by API version, then by station ID.
    station_summaries: Dict[ApiVersion, Dict[str, IndexStationSummary]] = defaultdict(dict)

    for entry in index.entries:
        per_station: Dict[str, IndexStationSummary] = station_summaries[entry.api_version]
        existing: Optional[IndexStationSummary] = per_station.get(entry.station_id)
        if existing is None:
            # First packet seen for this station: start a fresh summary.
            per_station[entry.station_id] = IndexStationSummary.from_entry(entry)
        else:
            # Fold this entry into the existing summary.
            existing.update(entry)

    return IndexSummary(station_summaries)

Methods

def station_ids(self, api_version: ApiVersion = None) ‑> List[str]

Returns the station IDs referenced by this index.

:param api_version: An (optional) filter to only return packets for a specified RedVox API version. None will collect station IDs from all API versions. :return: The station IDs referenced by this index.

Expand source code
def station_ids(self, api_version: Optional[ApiVersion] = None) -> List[str]:
    """
    Returns the station IDs referenced by this index.

    :param api_version: An (optional) filter to only return packets for a specified RedVox API version.
                        None will collect station IDs from all API versions.
    :return: The station IDs referenced by this index.
    """
    # .get() tolerates a missing API version (empty result) instead of raising
    # KeyError or growing a defaultdict as a read side effect.
    def _ids(version: ApiVersion) -> List[str]:
        return list(
            {
                summary.station_id
                for summary in self.station_summaries.get(version, {}).values()
            }
        )

    if api_version is not None:
        return _ids(api_version)
    # NOTE: preserving prior behavior, a station present under both API
    # versions appears once per version in the combined list.
    return _ids(ApiVersion.API_900) + _ids(ApiVersion.API_1000)
def total_packets(self, api_version: ApiVersion = None) ‑> int

Returns the total number of packets referenced by this index.

:param api_version: An (optional) filter to only return packets for a specified RedVox API version. None will count packets from all API versions. :return: The total number of packets referenced by this index.

Expand source code
def total_packets(self, api_version: Optional[ApiVersion] = None) -> int:
    """
    Returns the total number of packets referenced by this index.

    :param api_version: An (optional) filter to only return packets for a specified RedVox API version.
                        None will count packets from all API versions.
    :return: The total number of packets referenced by this index.
    """
    # .get() tolerates a missing API version (count of 0) instead of raising
    # KeyError or growing a defaultdict as a read side effect.
    def _count(version: ApiVersion) -> int:
        return sum(
            summary.total_packets
            for summary in self.station_summaries.get(version, {}).values()
        )

    if api_version is not None:
        return _count(api_version)
    return _count(ApiVersion.API_900) + _count(ApiVersion.API_1000)
class ReadFilter (start_dt: Optional[datetime.datetime] = None, end_dt: Optional[datetime.datetime] = None, station_ids: Optional[Set[str]] = None, extensions: Optional[Set[str]] = <factory>, start_dt_buf: Optional[datetime.timedelta] = datetime.timedelta(seconds=120), end_dt_buf: Optional[datetime.timedelta] = datetime.timedelta(seconds=120), api_versions: Optional[Set[ApiVersion]] = <factory>)

Filter RedVox files from the file system.

Expand source code
@dataclass
class ReadFilter:
    """
    Filter RedVox files from the file system.

    A default instance accepts the standard RedVox extensions and both API
    versions, with a two-minute buffer around the start and end datetimes.
    Setting a field to None disables that particular criterion.
    """

    # Inclusive lower datetime bound (None = no lower bound).
    start_dt: Optional[datetime] = None
    # Inclusive upper datetime bound (None = no upper bound).
    end_dt: Optional[datetime] = None
    # Accepted station IDs (None = accept all stations).
    station_ids: Optional[Set[str]] = None
    # Accepted file extensions (None = accept all extensions).
    extensions: Optional[Set[str]] = field(default_factory=lambda: {".rdvxm", ".rdvxz"})
    # Time buffer subtracted from start_dt before comparison.
    start_dt_buf: Optional[timedelta] = timedelta(minutes=2.0)
    # Time buffer added to end_dt before comparison.
    end_dt_buf: Optional[timedelta] = timedelta(minutes=2.0)
    # Accepted API versions (None = accept all versions).
    api_versions: Optional[Set[ApiVersion]] = field(
        default_factory=lambda: {ApiVersion.API_900, ApiVersion.API_1000}
    )

    @staticmethod
    def empty() -> "ReadFilter":
        """
        :return: A ReadFilter with ALL filters set to None. This is opposed to the default
                 which sets sane defaults for extensions, APIs, and window buffers.
        """
        return ReadFilter(None, None, None, None, None, None, None)

    def clone(self) -> "ReadFilter":
        """
        :return: a copy of the calling ReadFilter
        """
        # Copy the mutable set fields so that in-place mutation of one
        # instance's sets can never leak into the other. (Previously the
        # clone shared the very same set objects with the original.)
        return_filter = ReadFilter()
        return (
            return_filter.with_start_dt(self.start_dt)
            .with_end_dt(self.end_dt)
            .with_station_ids(None if self.station_ids is None else set(self.station_ids))
            .with_extensions(None if self.extensions is None else set(self.extensions))
            .with_start_dt_buf(self.start_dt_buf)
            .with_end_dt_buf(self.end_dt_buf)
            .with_api_versions(None if self.api_versions is None else set(self.api_versions))
        )

    def with_start_dt(self, start_dt: Optional[datetime]) -> "ReadFilter":
        """
        Adds a start datetime filter.

        :param start_dt: Start datetime that files should come after.
        :return: A modified instance of this filter
        """
        check_type(start_dt, [datetime, None])
        self.start_dt = start_dt
        return self

    def with_start_ts(self, start_ts: Optional[float]) -> "ReadFilter":
        """
        Adds a start time filter.

        :param start_ts: Start timestamp (microseconds)
        :return: A modified instance of this filter
        """
        check_type(start_ts, [int, float, None])
        if start_ts is None:
            return self.with_start_dt(None)

        return self.with_start_dt(dt_us(start_ts))

    def with_end_dt(self, end_dt: Optional[datetime]) -> "ReadFilter":
        """
        Adds an end datetime filter.

        :param end_dt: Filter for which packets should come before.
        :return: A modified instance of this filter
        """
        check_type(end_dt, [datetime, None])
        self.end_dt = end_dt
        return self

    def with_end_ts(self, end_ts: Optional[float]) -> "ReadFilter":
        """
        Like with_end_dt, but uses a microsecond timestamp.

        :param end_ts: Timestamp microseconds.
        :return: A modified instance of this filter
        """
        check_type(end_ts, [int, float, None])
        if end_ts is None:
            return self.with_end_dt(None)

        return self.with_end_dt(dt_us(end_ts))

    def with_station_ids(self, station_ids: Optional[Set[str]]) -> "ReadFilter":
        """
        Add a station id filter. Filters against provided station ids.

        :param station_ids: Station ids to filter against.
        :return: A modified instance of this filter
        """
        check_type(station_ids, [set, None])
        self.station_ids = station_ids
        return self

    def with_extensions(self, extensions: Optional[Set[str]]) -> "ReadFilter":
        """
        Filters against known file extensions.

        :param extensions: One or more extensions to filter against
        :return: A modified instance of this filter
        """
        check_type(extensions, [set, None])
        self.extensions = extensions
        return self

    def with_start_dt_buf(self, start_dt_buf: Optional[timedelta]) -> "ReadFilter":
        """
        Modifies the time buffer prepended to the start time.

        :param start_dt_buf: Amount of time to buffer before start time.
        :return: A modified instance of self.
        """
        check_type(start_dt_buf, [timedelta, None])
        self.start_dt_buf = start_dt_buf
        return self

    def with_end_dt_buf(self, end_dt_buf: Optional[timedelta]) -> "ReadFilter":
        """
        Modifies the time buffer appended to the end time.

        :param end_dt_buf: Amount of time to buffer after end time.
        :return: A modified instance of self.
        """
        check_type(end_dt_buf, [timedelta, None])
        self.end_dt_buf = end_dt_buf
        return self

    def with_api_versions(
        self, api_versions: Optional[Set[ApiVersion]]
    ) -> "ReadFilter":
        """
        Filters for specified API versions.

        :param api_versions: A set containing valid ApiVersion enums that should be included.
        :return: A modified instance of self.
        """
        check_type(api_versions, [set, None])
        self.api_versions = api_versions
        return self

    def apply_dt(
        self, date_time: datetime, dt_fn: Callable[[datetime], datetime] = lambda dt: dt
    ) -> bool:
        """
        Tests if a given datetime passes this filter.

        :param date_time: Datetime to test
        :param dt_fn: An (optional) function that will transform one datetime into another.
        :return: True if the datetime is included, False otherwise
        """
        check_type(date_time, [datetime])
        # A missing buffer is treated as a zero-length buffer.
        start_buf: timedelta = (
            timedelta(seconds=0) if self.start_dt_buf is None else self.start_dt_buf
        )
        if self.start_dt is not None and date_time < (dt_fn(self.start_dt - start_buf)):
            return False

        end_buf: timedelta = (
            timedelta(seconds=0) if self.end_dt_buf is None else self.end_dt_buf
        )
        if self.end_dt is not None and date_time > (dt_fn(self.end_dt + end_buf)):
            return False

        return True

    def apply(self, entry: IndexEntry) -> bool:
        """
        Applies this filter to the given IndexEntry.

        :param entry: The entry to test.
        :return: True if the entry is accepted by the filter, False otherwise.
        """
        check_type(entry, [IndexEntry])

        # Every configured (non-None) criterion must accept the entry.
        if not self.apply_dt(entry.date_time):
            return False

        if self.station_ids is not None and entry.station_id not in self.station_ids:
            return False

        if self.extensions is not None and entry.extension not in self.extensions:
            return False

        if self.api_versions is not None and entry.api_version not in self.api_versions:
            return False

        return True

Class variables

var api_versions : Optional[Set[ApiVersion]]
var end_dt : Optional[datetime.datetime]
var end_dt_buf : Optional[datetime.timedelta]
var extensions : Optional[Set[str]]
var start_dt : Optional[datetime.datetime]
var start_dt_buf : Optional[datetime.timedelta]
var station_ids : Optional[Set[str]]

Static methods

def empty() ‑> ReadFilter

:return: A ReadFilter with ALL filters set to None. This is opposed to the default which sets sane defaults for extensions, APIs, and window buffers.

Expand source code
@staticmethod
def empty() -> "ReadFilter":
    """
    :return: A ReadFilter with ALL filters set to None. This is opposed to the default
             which sets sane defaults for extensions, APIs, and window buffers.
    """
    # Positionally None-out all seven filter fields.
    return ReadFilter(*([None] * 7))

Methods

def apply(self, entry: IndexEntry) ‑> bool

Applies this filter to the given IndexEntry.

:param entry: The entry to test. :return: True if the entry is accepted by the filter, False otherwise.

Expand source code
def apply(self, entry: IndexEntry) -> bool:
    """
    Applies this filter to the given IndexEntry.

    :param entry: The entry to test.
    :return: True if the entry is accepted by the filter, False otherwise.
    """
    check_type(entry, [IndexEntry])

    # Every configured (non-None) criterion must accept the entry; the
    # short-circuit order matches the original checks.
    return (
        self.apply_dt(entry.date_time)
        and (self.station_ids is None or entry.station_id in self.station_ids)
        and (self.extensions is None or entry.extension in self.extensions)
        and (self.api_versions is None or entry.api_version in self.api_versions)
    )
def apply_dt(self, date_time: datetime.datetime, dt_fn: Callable[[datetime.datetime], datetime.datetime] = <function ReadFilter.<lambda>>) ‑> bool

Tests if a given datetime passes this filter.

:param date_time: Datetime to test :param dt_fn: An (optional) function that will transform one datetime into another. :return: True if the datetime is included, False otherwise

Expand source code
def apply_dt(
    self, date_time: datetime, dt_fn: Callable[[datetime], datetime] = lambda dt: dt
) -> bool:
    """
    Tests if a given datetime passes this filter.

    :param date_time: Datetime to test
    :param dt_fn: An (optional) function that will transform one datetime into another.
    :return: True if the datetime is included, False otherwise
    """
    check_type(date_time, [datetime])
    # A missing buffer is treated as a zero-length buffer.
    zero: timedelta = timedelta(seconds=0)

    # Reject datetimes earlier than the (buffered, transformed) start bound.
    if self.start_dt is not None:
        start_buf: timedelta = self.start_dt_buf if self.start_dt_buf is not None else zero
        if date_time < dt_fn(self.start_dt - start_buf):
            return False

    # Reject datetimes later than the (buffered, transformed) end bound.
    if self.end_dt is not None:
        end_buf: timedelta = self.end_dt_buf if self.end_dt_buf is not None else zero
        if date_time > dt_fn(self.end_dt + end_buf):
            return False

    return True
def clone(self) ‑> ReadFilter

:return: a copy of the calling ReadFilter

Expand source code
def clone(self) -> "ReadFilter":
    """
    :return: a copy of the calling ReadFilter
    """
    # Copy the mutable set fields so that in-place mutation of one instance's
    # sets can never leak into the other. (Previously the clone shared the
    # very same set objects with the original.)
    return_filter = ReadFilter()
    return (
        return_filter.with_start_dt(self.start_dt)
        .with_end_dt(self.end_dt)
        .with_station_ids(None if self.station_ids is None else set(self.station_ids))
        .with_extensions(None if self.extensions is None else set(self.extensions))
        .with_start_dt_buf(self.start_dt_buf)
        .with_end_dt_buf(self.end_dt_buf)
        .with_api_versions(None if self.api_versions is None else set(self.api_versions))
    )
def with_api_versions(self, api_versions: Optional[Set[ApiVersion]]) ‑> ReadFilter

Filters for specified API versions.

:param api_versions: A set containing valid ApiVersion enums that should be included. :return: A modified instance of self.

Expand source code
def with_api_versions(
    self, api_versions: Optional[Set[ApiVersion]]
) -> "ReadFilter":
    """
    Restricts matching to the given API versions (None disables this criterion).

    :param api_versions: A set containing valid ApiVersion enums that should be included.
    :return: A modified instance of self.
    """
    # Validate before storing: only a set or None is acceptable.
    check_type(api_versions, [set, None])
    self.api_versions = api_versions
    return self
def with_end_dt(self, end_dt: Optional[datetime.datetime]) ‑> ReadFilter

Adds an end datetime filter.

:param end_dt: Filter for which packets should come before. :return: A modified instance of this filter

Expand source code
def with_end_dt(self, end_dt: Optional[datetime]) -> "ReadFilter":
    """
    Sets the upper datetime bound packets must fall before (None disables it).

    :param end_dt: Filter for which packets should come before.
    :return: A modified instance of this filter
    """
    # Validate before storing: only a datetime or None is acceptable.
    check_type(end_dt, [datetime, None])
    self.end_dt = end_dt
    return self
def with_end_dt_buf(self, end_dt_buf: Optional[datetime.timedelta]) ‑> ReadFilter

Modifies the time buffer appended to the end time.

:param end_dt_buf: Amount of time to buffer after end time. :return: A modified instance of self.

Expand source code
def with_end_dt_buf(self, end_dt_buf: Optional[timedelta]) -> "ReadFilter":
    """
    Sets the time buffer appended to the end time (None means no buffer).

    :param end_dt_buf: Amount of time to buffer after end time.
    :return: A modified instance of self.
    """
    # Validate before storing: only a timedelta or None is acceptable.
    check_type(end_dt_buf, [timedelta, None])
    self.end_dt_buf = end_dt_buf
    return self
def with_end_ts(self, end_ts: Optional[float]) ‑> ReadFilter

Like with_end_dt, but uses a microsecond timestamp.

:param end_ts: Timestamp microseconds. :return: A modified instance of this filter

Expand source code
def with_end_ts(self, end_ts: Optional[float]) -> "ReadFilter":
    """
    Like with_end_dt, but takes a microsecond timestamp instead of a datetime.

    :param end_ts: Timestamp microseconds.
    :return: A modified instance of this filter
    """
    check_type(end_ts, [int, float, None])
    # Delegate to the datetime-based setter; None passes through unconverted.
    return self.with_end_dt(None if end_ts is None else dt_us(end_ts))
def with_extensions(self, extensions: Optional[Set[str]]) ‑> ReadFilter

Filters against known file extensions.

:param extensions: One or more extensions to filter against :return: A modified instance of this filter

Expand source code
def with_extensions(self, extensions: Optional[Set[str]]) -> "ReadFilter":
    """
    Restricts matching to the given file extensions (None disables this criterion).

    :param extensions: One or more extensions to filter against
    :return: A modified instance of this filter
    """
    # Validate before storing: only a set or None is acceptable.
    check_type(extensions, [set, None])
    self.extensions = extensions
    return self
def with_start_dt(self, start_dt: Optional[datetime.datetime]) ‑> ReadFilter

Adds a start datetime filter.

:param start_dt: Start datetime that files should come after. :return: A modified instance of this filter

Expand source code
def with_start_dt(self, start_dt: Optional[datetime]) -> "ReadFilter":
    """
    Sets the lower datetime bound packets must fall after (None disables it).

    :param start_dt: Start datetime that files should come after.
    :return: A modified instance of this filter
    """
    # Validate before storing: only a datetime or None is acceptable.
    check_type(start_dt, [datetime, None])
    self.start_dt = start_dt
    return self
def with_start_dt_buf(self, start_dt_buf: Optional[datetime.timedelta]) ‑> ReadFilter

Modifies the time buffer prepended to the start time.

:param start_dt_buf: Amount of time to buffer before start time. :return: A modified instance of self.

Expand source code
def with_start_dt_buf(self, start_dt_buf: Optional[timedelta]) -> "ReadFilter":
    """
    Sets the time buffer prepended to the start time (None means no buffer).

    :param start_dt_buf: Amount of time to buffer before start time.
    :return: A modified instance of self.
    """
    # Validate before storing: only a timedelta or None is acceptable.
    check_type(start_dt_buf, [timedelta, None])
    self.start_dt_buf = start_dt_buf
    return self
def with_start_ts(self, start_ts: Optional[float]) ‑> ReadFilter

Adds a start time filter.

:param start_ts: Start timestamp (microseconds) :return: A modified instance of this filter

Expand source code
def with_start_ts(self, start_ts: Optional[float]) -> "ReadFilter":
    """
    Like with_start_dt, but takes a microsecond timestamp instead of a datetime.

    :param start_ts: Start timestamp (microseconds)
    :return: A modified instance of this filter
    """
    check_type(start_ts, [int, float, None])
    # Delegate to the datetime-based setter; None passes through unconverted.
    return self.with_start_dt(None if start_ts is None else dt_us(start_ts))
def with_station_ids(self, station_ids: Optional[Set[str]]) ‑> ReadFilter

Add a station id filter. Filters against provided station ids.

:param station_ids: Station ids to filter against. :return: A modified instance of this filter

Expand source code
def with_station_ids(self, station_ids: Optional[Set[str]]) -> "ReadFilter":
    """
    Restricts matching to the given station IDs (None disables this criterion).

    :param station_ids: Station ids to filter against.
    :return: A modified instance of this filter
    """
    # Validate before storing: only a set or None is acceptable.
    check_type(station_ids, [set, None])
    self.station_ids = station_ids
    return self
class RedvoxPacketM (*args, **kwargs)

A ProtocolMessage

Ancestors

  • google._upb._message.Message
  • google.protobuf.message.Message

Class variables

var DESCRIPTOR
var DoubleSamplePayload

A ProtocolMessage

var EventStream

A ProtocolMessage

var MetadataEntry

A ProtocolMessage

var SamplePayload

A ProtocolMessage

var Sensors

A ProtocolMessage

var StationInformation

A ProtocolMessage

var SummaryStatistics

A ProtocolMessage

var TimingInformation

A ProtocolMessage

var TimingPayload

A ProtocolMessage

var Unit