Module redvox.tests.me_test_big

Data set available at: https://drive.google.com/file/d/14YjOjS3Tvm8I3M84KIPxgCzsVDbUMwoM/view?usp=sharing

This replicates some of the crucial steps in building a report. Namely:

  1. Building a DataWindow
  2. Serializing a DataWindow
  3. Summarizing a DataWindow
  4. Converting a DataWindow into a DataFrame
  5. Building audio wiggles

Watch timing and memory usage. Feel free to comment out certain stations or entire steps to test individual parts of the report building process. With the latest non-beta version of the SDK, I generally notice that the building of the DF takes significantly more memory than the building of the DW.

"main" is probably the only function you need to edit. Everything above it is supporting code for building the summary.

Expand source code
"""
Data set available at: https://drive.google.com/file/d/14YjOjS3Tvm8I3M84KIPxgCzsVDbUMwoM/view?usp=sharing

This replicates some of the crucial steps in building a report. Namely:

1. Building a DataWindow
2. Serializing a DataWindow
3. Summarizing a DataWindow
4. Converting a DataWindow into a DataFrame
5. Building audio wiggles

Watch timing and memory usage. Feel free to comment out certain stations or entire steps to test individual parts of the
report building process. With the latest non-beta version of the SDK, I generally notice that the building of the DF
takes significantly more memory than the building of the DW.

"main" is probably the only function you need to edit. Everything above it is supporting code for building the summary.
"""

import argparse
import datetime
import enum
import math
import os.path
from dataclasses import dataclass
from pathlib import Path
from typing import List, Optional, Tuple

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from redpandas.redpd_df import redpd_dataframe
from redpandas.redpd_plot.wiggles import plot_wiggles_pandas
from redvox.api1000.wrapped_redvox_packet.sensors.location import LocationProvider
from redvox.api1000.wrapped_redvox_packet.station_information import (
    CellServiceState,
    NetworkType,
    OsType,
    PowerState,
)
from redvox.common.data_window import DataWindow, DataWindowConfig
from redvox.common.sensor_data import SensorData
from redvox.common.sensor_reader_utils import Sensor
from redvox.common.station import Station


@dataclass
class TimeRange:
    """
    A span of time bounded by a start and an end timestamp, both in microseconds.
    """

    start_ts_us: float
    end_ts_us: float

    def timedelta(self) -> datetime.timedelta:
        """
        :return: the length of this range (end minus start) as a timedelta
        """
        span_us = self.end_ts_us - self.start_ts_us
        return datetime.timedelta(microseconds=span_us)

    def diff(self, other: "TimeRange") -> datetime.timedelta:
        """
        :param other: the range to compare against
        :return: the duration of this range minus the duration of the other range
        """
        own_duration = self.timedelta()
        other_duration = other.timedelta()
        return own_duration - other_duration


@dataclass
class SensorSummary:
    """
    Basic statistics for one sensor of a station, as filled in by summarize_sensors.
    """

    # Key from POTENTIAL_SENSORS under which the sensor was found (e.g. "audio")
    sensor_type: str
    # Value of the sensor's name attribute
    sensor_name: str
    # Result of the sensor's num_samples()
    n_samples: int
    # Count of NaN entries in the sensor's samples
    n_nan_samples: int
    # Result of the sensor's sample_rate_hz()
    sample_rate: float


@dataclass
class Location:
    """
    A latitude / longitude / altitude triple taken from location sensor samples.
    """

    latitude: float
    longitude: float
    altitude: float


@dataclass
class LocationSummary:
    """
    The location chosen for a station plus the spread of its location samples.
    """

    # Name attribute of the sensor the location was read from
    name: str
    # Name of the LocationProvider enum member for the chosen sample
    provider: str
    # The chosen position
    location: Location
    # Standard deviations over the sensor's latitude / longitude / altitude samples;
    # extract_location_summary reports 0.0 for all three when there is a single sample
    latitude_standard_deviation: float
    longitude_standard_deviation: float
    altitude_standard_deviation: float


@dataclass
class HealthSummary:
    """
    Station health readings, stored as one list per row of the health sensor's samples.
    """

    # Values returned by the health sensor's data_timestamps()
    timestamps: List[float]
    # Rows 0-2 of the health samples
    battery_charge_remaining: List[float]
    battery_current_strength: List[float]
    internal_temp_c: List[float]
    # Row 3, converted to NetworkType member names
    network_type: List[str]
    # Row 4
    network_strength: List[float]
    # Row 5, converted to PowerState member names
    power_state: List[str]
    # Rows 6-7
    avail_ram: List[float]
    avail_disk: List[float]
    # Row 8, converted to CellServiceState member names
    cell_service: List[str]


@dataclass
class StationSummary:
    """
    Everything the report records about a single station; populated by from_data_window.
    """

    # First and last data timestamps of the station
    time_range: TimeRange
    # Result of the station's is_timestamps_updated()
    time_corrected: bool
    # Best offset and best latency from the station's timesync data
    best_offset: float
    best_latency: float
    # Gaps reported by the station, one TimeRange per gap
    gaps: List[TimeRange]
    station_id: str
    station_uuid: str
    # None when no location could be extracted
    location: Optional[LocationSummary]
    # One entry per sensor the station reports having
    sensors: List[SensorSummary]
    # Percentage of audio samples that are not NaN
    percent_data_available: float
    # None when the station has no health data
    health: Optional[HealthSummary]
    # Name of the OsType enum member from the station metadata
    os: str
    os_version: str
    # Taken from the station metadata's app_version
    client_version: str


@dataclass
class DataWindowSummary:
    """
    Summary of a whole DataWindow: its configured time range plus one summary per station.
    """

    # Built from the start and end datetimes of the data window's config
    time_range: TimeRange
    # One entry per Station object returned by the data window
    station_summaries: List[StationSummary]

    @staticmethod
    def from_data_window(data_window: DataWindow) -> "DataWindowSummary":
        """
        Build a summary by delegating to the module-level from_data_window function.

        :param data_window: the data window to summarize
        :return: the populated summary
        """
        return from_data_window(data_window)


def ts_us(dt: datetime.datetime) -> float:
    """
    Convert a datetime into a timestamp expressed in microseconds.

    NOTE(review): datetime.timestamp() treats naive datetimes as local time; confirm that
    callers pass timezone-aware values or intend local time.

    :param dt: the datetime to convert
    :return: dt.timestamp() scaled from seconds to microseconds
    """
    seconds: float = dt.timestamp()
    return seconds * 1_000_000.0


def n_nans(sensor: SensorData) -> int:
    """
    Count the NaN values in a sensor's samples.

    :param sensor: the sensor whose samples() array is inspected
    :return: total number of NaN entries across the whole samples array, as a plain int
    """
    return int(np.count_nonzero(np.isnan(sensor.samples())))


# Sensor keys probed on each station by summarize_sensors. Each key is expanded into the
# method names has_<key>_sensor and <key>_sensor, so entries must match those method names.
POTENTIAL_SENSORS: List[str] = [
    "accelerometer",
    "ambient_temperature",
    "audio",
    "barometer",
    "best_location",
    "compressed_audio",
    "gravity",
    "gyroscope",
    # Left disabled; presumably because health is reported via extract_health_summary -- TODO confirm
    # "health",
    "image",
    "infrared",
    "light",
    "linear_acceleration",
    "location",
    "magnetometer",
    "orientation",
    "pressure",
    "proximity",
    "relative_humidity",
    "rotation_vector",
]


def summarize_sensors(station: Station) -> List[SensorSummary]:
    """
    Build a SensorSummary for every sensor in POTENTIAL_SENSORS that the station reports having.

    :param station: the station whose sensors are inspected
    :return: one summary per present sensor, in POTENTIAL_SENSORS order
    """
    summaries: List[SensorSummary] = []

    for sensor_key in POTENTIAL_SENSORS:
        # The has_<key>_sensor / <key>_sensor accessors are resolved by name on the station
        has_sensor = getattr(station, f"has_{sensor_key}_sensor")
        if not has_sensor():
            continue

        sensor: SensorData = getattr(station, f"{sensor_key}_sensor")()
        summary = SensorSummary(
            sensor_type=sensor_key,
            sensor_name=sensor.name,
            n_samples=sensor.num_samples(),
            n_nan_samples=n_nans(sensor),
            sample_rate=sensor.sample_rate_hz(),
        )
        summaries.append(summary)

    return summaries


def from_data_window(data_window: DataWindow) -> DataWindowSummary:
    """
    Summarize a DataWindow and every station it contains.

    NOTE(review): assumes the data window config carries non-None start and end datetimes -- confirm.

    :param data_window: the data window to summarize
    :return: the window's time range together with one StationSummary per station object
    """
    window_start_us: float = ts_us(data_window.config().start_datetime)
    window_end_us: float = ts_us(data_window.config().end_datetime)
    window_range: TimeRange = TimeRange(window_start_us, window_end_us)

    summaries: List[StationSummary] = []
    for station_id in data_window.station_ids():
        matching: Optional[List[Station]] = data_window.get_station(station_id)
        if matching is None:
            continue

        for station in matching:
            summaries.append(
                StationSummary(
                    time_range=TimeRange(station.first_data_timestamp(), station.last_data_timestamp()),
                    time_corrected=station.is_timestamps_updated(),
                    best_offset=station.timesync_data().best_offset(),
                    best_latency=station.timesync_data().best_latency(),
                    gaps=extract_gaps(station),
                    station_id=station.id(),
                    station_uuid=station.uuid(),
                    location=extract_location_summary(station),
                    sensors=summarize_sensors(station),
                    percent_data_available=percent_data_available(station),
                    health=extract_health_summary(station),
                    os=OsType(station.metadata().os).name,
                    os_version=station.metadata().os_version,
                    client_version=station.metadata().app_version,
                )
            )

    return DataWindowSummary(window_range, summaries)


def extract_gaps(station: Station) -> List[TimeRange]:
    """
    Convert the gaps stored on a station into TimeRange objects.

    :param station: the station whose private _gaps attribute is read
    :return: one TimeRange per gap, built from the first two elements of each gap entry
    """
    # noinspection PyProtectedMember
    raw_gaps: List[Tuple[float, float]] = station._gaps
    return [TimeRange(gap[0], gap[1]) for gap in raw_gaps]


# Row indices into the array returned by samples() on a location sensor,
# as used by extract_location_summary.
LOCATION_PROVIDER_IDX: int = 10
LATITUDE_IDX: int = 1
LONGITUDE_IDX: int = 2
ALTITUDE_IDX: int = 3
HORIZONTAL_ACCURACY_IDX: int = 6
VERTICAL_ACCURACY_IDX: int = 7


def extract_location_summary(station: Station) -> Optional[LocationSummary]:
    """
    Summarize a station's location from its best-location sensor, or its location sensor as a fallback.

    The reported position is the sample with the lowest combined horizontal + vertical accuracy
    value. When no sample carries usable accuracy values, the first sample with a non-NaN latitude
    and longitude is used instead.

    :param station: the station to summarize
    :return: the location summary, or None when the station has no location samples or no usable position
    """
    sensor_data: Optional[SensorData]
    if station.has_best_location_data():
        sensor_data = station.best_location_sensor()
    elif station.has_location_data():
        sensor_data = station.location_sensor()
    else:
        sensor_data = None

    if sensor_data is None or sensor_data.num_samples() == 0:
        return None

    samples: np.ndarray = sensor_data.samples()

    if sensor_data.num_samples() == 1:
        # A single sample has no spread, so the deviations are reported as zero.
        return LocationSummary(
            sensor_data.name,
            LocationProvider(int(samples[LOCATION_PROVIDER_IDX][0])).name,
            Location(
                samples[LATITUDE_IDX][0],
                samples[LONGITUDE_IDX][0],
                samples[ALTITUDE_IDX][0],
            ),
            0.0,
            0.0,
            0.0,
        )

    def orz(f: float) -> float:
        """Return f, or 0.0 when f is NaN."""
        return 0.0 if math.isnan(f) else f

    def nan_std(values: np.ndarray) -> float:
        """Standard deviation of the non-NaN entries; NaN when there are none."""
        valid: np.ndarray = values[~np.isnan(values)]
        if valid.size == 0:
            return float("nan")
        return valid.std()

    def lowest_error_idx() -> Optional[int]:
        """Index of the sample with the lowest combined accuracy value, if any sample has one."""
        best_acc: float = float("+inf")
        best_acc_idx: Optional[int] = None

        # zip stops at the shorter of the two rows
        accuracy_pairs = zip(samples[HORIZONTAL_ACCURACY_IDX], samples[VERTICAL_ACCURACY_IDX])
        for j, (hori, vert) in enumerate(accuracy_pairs):
            if math.isnan(hori) and math.isnan(vert):
                continue

            # Samples whose accuracies are both exactly zero are skipped rather than treated as perfect
            if hori == 0.0 and vert == 0.0:
                continue

            acc: float = orz(hori) + orz(vert)
            if acc < best_acc:
                best_acc = acc
                best_acc_idx = j

        return best_acc_idx

    def first_valid_lat_lng_idx() -> Optional[int]:
        """Index of the first sample whose latitude and longitude are both non-NaN."""
        for j, (lat, lng) in enumerate(zip(samples[LATITUDE_IDX], samples[LONGITUDE_IDX])):
            if not (math.isnan(lat) or math.isnan(lng)):
                return j
        return None

    def summary_at(idx: int) -> LocationSummary:
        """Build the summary for the sample at idx, with deviations taken over all samples."""
        return LocationSummary(
            sensor_data.name,
            LocationProvider(int(samples[LOCATION_PROVIDER_IDX][idx])).name,
            Location(
                samples[LATITUDE_IDX][idx],
                samples[LONGITUDE_IDX][idx],
                # Report the sample's altitude, keeping 0.0 when it is missing (NaN)
                orz(samples[ALTITUDE_IDX][idx]),
            ),
            # NaN samples are excluded so a single missing reading does not turn the deviation into NaN
            nan_std(samples[LATITUDE_IDX]),
            nan_std(samples[LONGITUDE_IDX]),
            nan_std(samples[ALTITUDE_IDX]),
        )

    chosen_idx: Optional[int] = lowest_error_idx()
    if chosen_idx is None:
        # Try to get any available location
        chosen_idx = first_valid_lat_lng_idx()

    if chosen_idx is None:
        return None
    return summary_at(chosen_idx)


def percent_data_available(station: Station) -> float:
    """
    Compute the percentage of a station's audio samples that are not NaN.

    :param station: the station whose audio sensor is inspected
    :return: the percentage of non-NaN audio samples; 0.0 when there is no audio sensor or it has no samples
    """
    audio: Optional[SensorData] = station.audio_sensor()
    if audio is None:
        # Guard in case audio_sensor() yields None for a station without audio
        return 0.0

    total_samples: int = audio.num_samples()
    if total_samples == 0:
        # Nothing was recorded; avoid dividing by zero
        return 0.0

    total_nans: int = n_nans(audio)
    return 100.0 - (total_nans / total_samples * 100.0)


def extract_enum_name(enum_type, value) -> str:
    """
    Resolve the name of an enum member from either the member itself or its numeric value.

    :param enum_type: the enum class used to look up numeric values
    :param value: an enum member, or a number (Python or numpy scalar) equal to a member's value
    :return: the member's name, or "n/a" when value is NaN, infinite, or neither an enum nor a number
    :raises ValueError: if a finite number does not correspond to any member of enum_type
    """
    if isinstance(value, enum.Enum):
        return value.name

    # numpy integer and float32 scalars are not instances of int/float, so they are listed explicitly
    if isinstance(value, (int, float, np.integer, np.floating)):
        if not math.isfinite(value):
            # int() cannot convert NaN or infinity; a missing reading has no enum name
            return "n/a"
        return enum_type(int(value)).name

    return "n/a"


def extract_health_summary(station: Station) -> Optional[HealthSummary]:
    """
    Collect a station's health readings into a HealthSummary.

    :param station: the station whose health sensor is read
    :return: the health summary, or None when the station reports no health data
    """
    if not station.has_health_data():
        return None

    health_sensor: SensorData = station.health_sensor()
    rows: np.ndarray = health_sensor.samples()

    def enum_names(enum_type, row) -> List[str]:
        """Translate every raw value of a row into the matching enum member name."""
        return [extract_enum_name(enum_type, raw) for raw in list(row)]

    return HealthSummary(
        timestamps=list(health_sensor.data_timestamps()),
        battery_charge_remaining=list(rows[0]),
        battery_current_strength=list(rows[1]),
        internal_temp_c=list(rows[2]),
        network_type=enum_names(NetworkType, rows[3]),
        network_strength=list(rows[4]),
        power_state=enum_names(PowerState, rows[5]),
        avail_ram=list(rows[6]),
        avail_disk=list(rows[7]),
        cell_service=enum_names(CellServiceState, rows[8]),
    )


def main(input_dir: str, output_dir: str) -> None:
    """
    Run the report-building steps in sequence, printing the elapsed time of each one.

    Steps: build a DataWindow, serialize it, summarize it, convert it to a DataFrame, and
    render the audio wiggles to "audio_wiggles.png" inside output_dir.

    :param input_dir: directory handed to DataWindowConfig as the data source
    :param output_dir: directory handed to the DataWindow as its output location; the plot is saved here too
    """
    # Comment stations in or out to change which ones are processed
    available_stations: List[str] = [
        # "1637620004",
        "1637620005",
        # "1637621008",
        # "1637621009"
    ]

    # Build DW
    # flush=True so the step name is visible while the step is still running
    print("Building data window...", end="", flush=True)
    start: datetime.datetime = datetime.datetime.now()
    data_window: DataWindow = DataWindow(
        config=DataWindowConfig(input_dir=input_dir, station_ids=available_stations),
        output_dir=output_dir,
        out_type="LZ4",
    )
    print(f"Done. ({(datetime.datetime.now() - start).total_seconds()}s)")

    # Serialize DW
    print("Serializing data window...", end="", flush=True)
    start = datetime.datetime.now()
    serialized_data_window_path: Path = data_window.serialize()
    print(f"Done. {serialized_data_window_path} ({(datetime.datetime.now() - start).total_seconds()}s)")

    # Summarize DW
    print("Summarizing data window...", end="", flush=True)
    start = datetime.datetime.now()
    # The summary is kept alive (though unused) so its memory cost stays part of the run
    data_window_summary: DataWindowSummary = DataWindowSummary.from_data_window(data_window)
    print(f"Done. ({(datetime.datetime.now() - start).total_seconds()}s)")

    # Build DF
    print("Building data frame...", end="", flush=True)
    start = datetime.datetime.now()
    data_frame: pd.DataFrame = redpd_dataframe(data_window, ["audio"])
    print(f"Done. ({(datetime.datetime.now() - start).total_seconds()}s)")

    # Build wiggles
    print("Building audio wiggles...", end="", flush=True)
    # Restart the clock so this step's time does not include the data frame build
    start = datetime.datetime.now()
    fig: plt.Figure = plot_wiggles_pandas(data_frame, show_figure=False)
    fig.savefig(os.path.join(output_dir, "audio_wiggles.png"))
    fig.clf()
    # Clearing alone leaves the figure registered with pyplot; close it to release it
    plt.close(fig)
    print(f"Done. ({(datetime.datetime.now() - start).total_seconds()}s)")


if __name__ == "__main__":
    # Command line: a required input directory plus an optional --output_dir,
    # which falls back to the input directory when omitted.
    arg_parser = argparse.ArgumentParser()
    arg_parser.add_argument("input_dir")
    arg_parser.add_argument("--output_dir", required=False)
    cli_args = arg_parser.parse_args()

    chosen_output_dir = cli_args.input_dir if cli_args.output_dir is None else cli_args.output_dir
    main(cli_args.input_dir, chosen_output_dir)

Functions

def extract_enum_name(enum_type, value) ‑> str
Expand source code
def extract_enum_name(enum_type, value) -> str:
    """
    Resolve the name of an enum member from either the member itself or its numeric value.

    :param enum_type: the enum class used to look up numeric values
    :param value: an enum member, or a number (Python or numpy scalar) equal to a member's value
    :return: the member's name, or "n/a" when value is NaN, infinite, or neither an enum nor a number
    :raises ValueError: if a finite number does not correspond to any member of enum_type
    """
    if isinstance(value, enum.Enum):
        return value.name

    # numpy integer and float32 scalars are not instances of int/float, so they are listed explicitly
    if isinstance(value, (int, float, np.integer, np.floating)):
        if not math.isfinite(value):
            # int() cannot convert NaN or infinity; a missing reading has no enum name
            return "n/a"
        return enum_type(int(value)).name

    return "n/a"
def extract_gaps(station: Station) ‑> List[TimeRange]
Expand source code
def extract_gaps(station: Station) -> List[TimeRange]:
    """
    Convert the gaps stored on a station into TimeRange objects.

    :param station: the station whose private _gaps attribute is read
    :return: one TimeRange per gap, built from the first two elements of each gap entry
    """
    # noinspection PyProtectedMember
    raw_gaps: List[Tuple[float, float]] = station._gaps
    return [TimeRange(gap[0], gap[1]) for gap in raw_gaps]
def extract_health_summary(station: Station) ‑> Optional[HealthSummary]
Expand source code
def extract_health_summary(station: Station) -> Optional[HealthSummary]:
    """
    Collect a station's health readings into a HealthSummary.

    :param station: the station whose health sensor is read
    :return: the health summary, or None when the station reports no health data
    """
    if not station.has_health_data():
        return None

    health_sensor: SensorData = station.health_sensor()
    rows: np.ndarray = health_sensor.samples()

    def enum_names(enum_type, row) -> List[str]:
        """Translate every raw value of a row into the matching enum member name."""
        return [extract_enum_name(enum_type, raw) for raw in list(row)]

    return HealthSummary(
        timestamps=list(health_sensor.data_timestamps()),
        battery_charge_remaining=list(rows[0]),
        battery_current_strength=list(rows[1]),
        internal_temp_c=list(rows[2]),
        network_type=enum_names(NetworkType, rows[3]),
        network_strength=list(rows[4]),
        power_state=enum_names(PowerState, rows[5]),
        avail_ram=list(rows[6]),
        avail_disk=list(rows[7]),
        cell_service=enum_names(CellServiceState, rows[8]),
    )
def extract_location_summary(station: Station) ‑> Optional[LocationSummary]
Expand source code
def extract_location_summary(station: Station) -> Optional[LocationSummary]:
    """
    Summarize a station's location from its best-location sensor, or its location sensor as a fallback.

    The reported position is the sample with the lowest combined horizontal + vertical accuracy
    value. When no sample carries usable accuracy values, the first sample with a non-NaN latitude
    and longitude is used instead.

    :param station: the station to summarize
    :return: the location summary, or None when the station has no location samples or no usable position
    """
    sensor_data: Optional[SensorData]
    if station.has_best_location_data():
        sensor_data = station.best_location_sensor()
    elif station.has_location_data():
        sensor_data = station.location_sensor()
    else:
        sensor_data = None

    if sensor_data is None or sensor_data.num_samples() == 0:
        return None

    samples: np.ndarray = sensor_data.samples()

    if sensor_data.num_samples() == 1:
        # A single sample has no spread, so the deviations are reported as zero.
        return LocationSummary(
            sensor_data.name,
            LocationProvider(int(samples[LOCATION_PROVIDER_IDX][0])).name,
            Location(
                samples[LATITUDE_IDX][0],
                samples[LONGITUDE_IDX][0],
                samples[ALTITUDE_IDX][0],
            ),
            0.0,
            0.0,
            0.0,
        )

    def orz(f: float) -> float:
        """Return f, or 0.0 when f is NaN."""
        return 0.0 if math.isnan(f) else f

    def nan_std(values: np.ndarray) -> float:
        """Standard deviation of the non-NaN entries; NaN when there are none."""
        valid: np.ndarray = values[~np.isnan(values)]
        if valid.size == 0:
            return float("nan")
        return valid.std()

    def lowest_error_idx() -> Optional[int]:
        """Index of the sample with the lowest combined accuracy value, if any sample has one."""
        best_acc: float = float("+inf")
        best_acc_idx: Optional[int] = None

        # zip stops at the shorter of the two rows
        accuracy_pairs = zip(samples[HORIZONTAL_ACCURACY_IDX], samples[VERTICAL_ACCURACY_IDX])
        for j, (hori, vert) in enumerate(accuracy_pairs):
            if math.isnan(hori) and math.isnan(vert):
                continue

            # Samples whose accuracies are both exactly zero are skipped rather than treated as perfect
            if hori == 0.0 and vert == 0.0:
                continue

            acc: float = orz(hori) + orz(vert)
            if acc < best_acc:
                best_acc = acc
                best_acc_idx = j

        return best_acc_idx

    def first_valid_lat_lng_idx() -> Optional[int]:
        """Index of the first sample whose latitude and longitude are both non-NaN."""
        for j, (lat, lng) in enumerate(zip(samples[LATITUDE_IDX], samples[LONGITUDE_IDX])):
            if not (math.isnan(lat) or math.isnan(lng)):
                return j
        return None

    def summary_at(idx: int) -> LocationSummary:
        """Build the summary for the sample at idx, with deviations taken over all samples."""
        return LocationSummary(
            sensor_data.name,
            LocationProvider(int(samples[LOCATION_PROVIDER_IDX][idx])).name,
            Location(
                samples[LATITUDE_IDX][idx],
                samples[LONGITUDE_IDX][idx],
                # Report the sample's altitude, keeping 0.0 when it is missing (NaN)
                orz(samples[ALTITUDE_IDX][idx]),
            ),
            # NaN samples are excluded so a single missing reading does not turn the deviation into NaN
            nan_std(samples[LATITUDE_IDX]),
            nan_std(samples[LONGITUDE_IDX]),
            nan_std(samples[ALTITUDE_IDX]),
        )

    chosen_idx: Optional[int] = lowest_error_idx()
    if chosen_idx is None:
        # Try to get any available location
        chosen_idx = first_valid_lat_lng_idx()

    if chosen_idx is None:
        return None
    return summary_at(chosen_idx)
def from_data_window(data_window: DataWindow) ‑> DataWindowSummary
Expand source code
def from_data_window(data_window: DataWindow) -> DataWindowSummary:
    """
    Summarize a DataWindow and every station it contains.

    NOTE(review): assumes the data window config carries non-None start and end datetimes -- confirm.

    :param data_window: the data window to summarize
    :return: the window's time range together with one StationSummary per station object
    """
    window_start_us: float = ts_us(data_window.config().start_datetime)
    window_end_us: float = ts_us(data_window.config().end_datetime)
    window_range: TimeRange = TimeRange(window_start_us, window_end_us)

    summaries: List[StationSummary] = []
    for station_id in data_window.station_ids():
        matching: Optional[List[Station]] = data_window.get_station(station_id)
        if matching is None:
            continue

        for station in matching:
            summaries.append(
                StationSummary(
                    time_range=TimeRange(station.first_data_timestamp(), station.last_data_timestamp()),
                    time_corrected=station.is_timestamps_updated(),
                    best_offset=station.timesync_data().best_offset(),
                    best_latency=station.timesync_data().best_latency(),
                    gaps=extract_gaps(station),
                    station_id=station.id(),
                    station_uuid=station.uuid(),
                    location=extract_location_summary(station),
                    sensors=summarize_sensors(station),
                    percent_data_available=percent_data_available(station),
                    health=extract_health_summary(station),
                    os=OsType(station.metadata().os).name,
                    os_version=station.metadata().os_version,
                    client_version=station.metadata().app_version,
                )
            )

    return DataWindowSummary(window_range, summaries)
def main(input_dir: str, output_dir: str) ‑> None
Expand source code
def main(input_dir: str, output_dir: str) -> None:
    """
    Run the report-building steps in sequence, printing the elapsed time of each one.

    Steps: build a DataWindow, serialize it, summarize it, convert it to a DataFrame, and
    render the audio wiggles to "audio_wiggles.png" inside output_dir.

    :param input_dir: directory handed to DataWindowConfig as the data source
    :param output_dir: directory handed to the DataWindow as its output location; the plot is saved here too
    """
    # Comment stations in or out to change which ones are processed
    available_stations: List[str] = [
        # "1637620004",
        "1637620005",
        # "1637621008",
        # "1637621009"
    ]

    # Build DW
    # flush=True so the step name is visible while the step is still running
    print("Building data window...", end="", flush=True)
    start: datetime.datetime = datetime.datetime.now()
    data_window: DataWindow = DataWindow(
        config=DataWindowConfig(input_dir=input_dir, station_ids=available_stations),
        output_dir=output_dir,
        out_type="LZ4",
    )
    print(f"Done. ({(datetime.datetime.now() - start).total_seconds()}s)")

    # Serialize DW
    print("Serializing data window...", end="", flush=True)
    start = datetime.datetime.now()
    serialized_data_window_path: Path = data_window.serialize()
    print(f"Done. {serialized_data_window_path} ({(datetime.datetime.now() - start).total_seconds()}s)")

    # Summarize DW
    print("Summarizing data window...", end="", flush=True)
    start = datetime.datetime.now()
    # The summary is kept alive (though unused) so its memory cost stays part of the run
    data_window_summary: DataWindowSummary = DataWindowSummary.from_data_window(data_window)
    print(f"Done. ({(datetime.datetime.now() - start).total_seconds()}s)")

    # Build DF
    print("Building data frame...", end="", flush=True)
    start = datetime.datetime.now()
    data_frame: pd.DataFrame = redpd_dataframe(data_window, ["audio"])
    print(f"Done. ({(datetime.datetime.now() - start).total_seconds()}s)")

    # Build wiggles
    print("Building audio wiggles...", end="", flush=True)
    # Restart the clock so this step's time does not include the data frame build
    start = datetime.datetime.now()
    fig: plt.Figure = plot_wiggles_pandas(data_frame, show_figure=False)
    fig.savefig(os.path.join(output_dir, "audio_wiggles.png"))
    fig.clf()
    # Clearing alone leaves the figure registered with pyplot; close it to release it
    plt.close(fig)
    print(f"Done. ({(datetime.datetime.now() - start).total_seconds()}s)")
def n_nans(sensor: Union[src.redvox_api_m.redvox_api_m_pb2.Xyz, src.redvox_api_m.redvox_api_m_pb2.Single, src.redvox_api_m.redvox_api_m_pb2.Audio, src.redvox_api_m.redvox_api_m_pb2.Image, src.redvox_api_m.redvox_api_m_pb2.Location, src.redvox_api_m.redvox_api_m_pb2.CompressedAudio]) ‑> int
Expand source code
def n_nans(sensor: SensorData) -> int:
    """
    Count the NaN values in a sensor's samples.

    :param sensor: the sensor whose samples() array is inspected
    :return: total number of NaN entries across the whole samples array, as a plain int
    """
    return int(np.count_nonzero(np.isnan(sensor.samples())))
def percent_data_available(station: Station) ‑> float
Expand source code
def percent_data_available(station: Station) -> float:
    """
    Compute the percentage of a station's audio samples that are not NaN.

    :param station: the station whose audio sensor is inspected
    :return: the percentage of non-NaN audio samples; 0.0 when there is no audio sensor or it has no samples
    """
    audio: Optional[SensorData] = station.audio_sensor()
    if audio is None:
        # Guard in case audio_sensor() yields None for a station without audio
        return 0.0

    total_samples: int = audio.num_samples()
    if total_samples == 0:
        # Nothing was recorded; avoid dividing by zero
        return 0.0

    total_nans: int = n_nans(audio)
    return 100.0 - (total_nans / total_samples * 100.0)
def summarize_sensors(station: Station) ‑> List[SensorSummary]
Expand source code
def summarize_sensors(station: Station) -> List[SensorSummary]:
    """
    Build a SensorSummary for every sensor in POTENTIAL_SENSORS that the station reports having.

    :param station: the station whose sensors are inspected
    :return: one summary per present sensor, in POTENTIAL_SENSORS order
    """
    summaries: List[SensorSummary] = []

    for sensor_key in POTENTIAL_SENSORS:
        # The has_<key>_sensor / <key>_sensor accessors are resolved by name on the station
        has_sensor = getattr(station, f"has_{sensor_key}_sensor")
        if not has_sensor():
            continue

        sensor: SensorData = getattr(station, f"{sensor_key}_sensor")()
        summary = SensorSummary(
            sensor_type=sensor_key,
            sensor_name=sensor.name,
            n_samples=sensor.num_samples(),
            n_nan_samples=n_nans(sensor),
            sample_rate=sensor.sample_rate_hz(),
        )
        summaries.append(summary)

    return summaries
def ts_us(dt: datetime.datetime) ‑> float
Expand source code
def ts_us(dt: datetime.datetime) -> float:
    """
    Convert a datetime into a timestamp expressed in microseconds.

    NOTE(review): datetime.timestamp() treats naive datetimes as local time; confirm that
    callers pass timezone-aware values or intend local time.

    :param dt: the datetime to convert
    :return: dt.timestamp() scaled from seconds to microseconds
    """
    seconds: float = dt.timestamp()
    return seconds * 1_000_000.0

Classes

class DataWindowSummary (time_range: TimeRange, station_summaries: List[StationSummary])

DataWindowSummary(time_range: redvox.tests.me_test_big.TimeRange, station_summaries: List[redvox.tests.me_test_big.StationSummary])

Expand source code
@dataclass
class DataWindowSummary:
    """
    Summary of a whole DataWindow: its configured time range plus one summary per station.
    """

    # Built from the start and end datetimes of the data window's config
    time_range: TimeRange
    # One entry per Station object returned by the data window
    station_summaries: List[StationSummary]

    @staticmethod
    def from_data_window(data_window: DataWindow) -> "DataWindowSummary":
        """
        Build a summary by delegating to the module-level from_data_window function.

        :param data_window: the data window to summarize
        :return: the populated summary
        """
        return from_data_window(data_window)

Class variables

var station_summaries : List[StationSummary]
var time_range : TimeRange

Static methods

def from_data_window(data_window: DataWindow) ‑> DataWindowSummary
Expand source code
@staticmethod
def from_data_window(data_window: DataWindow) -> "DataWindowSummary":
    """
    Build a summary by delegating to the module-level from_data_window function.

    :param data_window: the data window to summarize
    :return: the populated summary
    """
    return from_data_window(data_window)
class HealthSummary (timestamps: List[float], battery_charge_remaining: List[float], battery_current_strength: List[float], internal_temp_c: List[float], network_type: List[str], network_strength: List[float], power_state: List[str], avail_ram: List[float], avail_disk: List[float], cell_service: List[str])

HealthSummary(timestamps: List[float], battery_charge_remaining: List[float], battery_current_strength: List[float], internal_temp_c: List[float], network_type: List[str], network_strength: List[float], power_state: List[str], avail_ram: List[float], avail_disk: List[float], cell_service: List[str])

Expand source code
@dataclass
class HealthSummary:
    """
    Station health readings, stored as one list per row of the health sensor's samples.
    """

    # Values returned by the health sensor's data_timestamps()
    timestamps: List[float]
    # Rows 0-2 of the health samples
    battery_charge_remaining: List[float]
    battery_current_strength: List[float]
    internal_temp_c: List[float]
    # Row 3, converted to NetworkType member names
    network_type: List[str]
    # Row 4
    network_strength: List[float]
    # Row 5, converted to PowerState member names
    power_state: List[str]
    # Rows 6-7
    avail_ram: List[float]
    avail_disk: List[float]
    # Row 8, converted to CellServiceState member names
    cell_service: List[str]

Class variables

var avail_disk : List[float]
var avail_ram : List[float]
var battery_charge_remaining : List[float]
var battery_current_strength : List[float]
var cell_service : List[str]
var internal_temp_c : List[float]
var network_strength : List[float]
var network_type : List[str]
var power_state : List[str]
var timestamps : List[float]
class Location (latitude: float, longitude: float, altitude: float)

Location(latitude: float, longitude: float, altitude: float)

Expand source code
@dataclass
class Location:
    """
    A latitude / longitude / altitude triple taken from location sensor samples.
    """

    latitude: float
    longitude: float
    altitude: float

Class variables

var altitude : float
var latitude : float
var longitude : float
class LocationSummary (name: str, provider: str, location: Location, latitude_standard_deviation: float, longitude_standard_deviation: float, altitude_standard_deviation: float)

LocationSummary(name: str, provider: str, location: redvox.tests.me_test_big.Location, latitude_standard_deviation: float, longitude_standard_deviation: float, altitude_standard_deviation: float)

Expand source code
@dataclass
class LocationSummary:
    """
    The location chosen for a station plus the spread of its location samples.
    """

    # Name attribute of the sensor the location was read from
    name: str
    # Name of the LocationProvider enum member for the chosen sample
    provider: str
    # The chosen position
    location: Location
    # Standard deviations over the sensor's latitude / longitude / altitude samples;
    # extract_location_summary reports 0.0 for all three when there is a single sample
    latitude_standard_deviation: float
    longitude_standard_deviation: float
    altitude_standard_deviation: float

Class variables

var altitude_standard_deviation : float
var latitude_standard_deviation : float
var location : Location
var longitude_standard_deviation : float
var name : str
var provider : str
class SensorSummary (sensor_type: str, sensor_name: str, n_samples: int, n_nan_samples: int, sample_rate: float)

SensorSummary(sensor_type: str, sensor_name: str, n_samples: int, n_nan_samples: int, sample_rate: float)

Expand source code
@dataclass
class SensorSummary:
    """
    Basic statistics for one sensor of a station, as filled in by summarize_sensors.
    """

    # Key from POTENTIAL_SENSORS under which the sensor was found (e.g. "audio")
    sensor_type: str
    # Value of the sensor's name attribute
    sensor_name: str
    # Result of the sensor's num_samples()
    n_samples: int
    # Count of NaN entries in the sensor's samples
    n_nan_samples: int
    # Result of the sensor's sample_rate_hz()
    sample_rate: float

Class variables

var n_nan_samples : int
var n_samples : int
var sample_rate : float
var sensor_name : str
var sensor_type : str
class StationSummary (time_range: TimeRange, time_corrected: bool, best_offset: float, best_latency: float, gaps: List[TimeRange], station_id: str, station_uuid: str, location: Optional[LocationSummary], sensors: List[SensorSummary], percent_data_available: float, health: Optional[HealthSummary], os: str, os_version: str, client_version: str)

StationSummary(time_range: redvox.tests.me_test_big.TimeRange, time_corrected: bool, best_offset: float, best_latency: float, gaps: List[redvox.tests.me_test_big.TimeRange], station_id: str, station_uuid: str, location: Optional[redvox.tests.me_test_big.LocationSummary], sensors: List[redvox.tests.me_test_big.SensorSummary], percent_data_available: float, health: Optional[redvox.tests.me_test_big.HealthSummary], os: str, os_version: str, client_version: str)

Expand source code
@dataclass
class StationSummary:
    """
    Everything the report records about a single station; populated by from_data_window.
    """

    # First and last data timestamps of the station
    time_range: TimeRange
    # Result of the station's is_timestamps_updated()
    time_corrected: bool
    # Best offset and best latency from the station's timesync data
    best_offset: float
    best_latency: float
    # Gaps reported by the station, one TimeRange per gap
    gaps: List[TimeRange]
    station_id: str
    station_uuid: str
    # None when no location could be extracted
    location: Optional[LocationSummary]
    # One entry per sensor the station reports having
    sensors: List[SensorSummary]
    # Percentage of audio samples that are not NaN
    percent_data_available: float
    # None when the station has no health data
    health: Optional[HealthSummary]
    # Name of the OsType enum member from the station metadata
    os: str
    os_version: str
    # Taken from the station metadata's app_version
    client_version: str

Class variables

var best_latency : float
var best_offset : float
var client_version : str
var gaps : List[TimeRange]
var health : Optional[HealthSummary]
var location : Optional[LocationSummary]
var os : str
var os_version : str
var percent_data_available : float
var sensors : List[SensorSummary]
var station_id : str
var station_uuid : str
var time_corrected : bool
var time_range : TimeRange
class TimeRange (start_ts_us: float, end_ts_us: float)

TimeRange(start_ts_us: float, end_ts_us: float)

Expand source code
@dataclass
class TimeRange:
    """
    A span of time bounded by a start and an end timestamp, both in microseconds.
    """

    start_ts_us: float
    end_ts_us: float

    def timedelta(self) -> datetime.timedelta:
        """
        :return: the length of this range (end minus start) as a timedelta
        """
        span_us = self.end_ts_us - self.start_ts_us
        return datetime.timedelta(microseconds=span_us)

    def diff(self, other: "TimeRange") -> datetime.timedelta:
        """
        :param other: the range to compare against
        :return: the duration of this range minus the duration of the other range
        """
        own_duration = self.timedelta()
        other_duration = other.timedelta()
        return own_duration - other_duration

Class variables

var end_ts_us : float
var start_ts_us : float

Methods

def diff(self, other: TimeRange) ‑> datetime.timedelta
Expand source code
def diff(self, other: "TimeRange") -> datetime.timedelta:
    """
    :param other: the range to compare against
    :return: the duration of this range minus the duration of the other range
    """
    own_duration = self.timedelta()
    other_duration = other.timedelta()
    return own_duration - other_duration
def timedelta(self) ‑> datetime.timedelta
Expand source code
def timedelta(self) -> datetime.timedelta:
    """
    :return: the length of this range (end minus start) as a timedelta
    """
    span_us = self.end_ts_us - self.start_ts_us
    return datetime.timedelta(microseconds=span_us)