Module redvox.cloud.query_timing_correction
This module contains classes and routines for performing timing corrections on cloud-based range queries.
Expand source code
"""
This module contains classes and routines for performing timing corrections on cloud-based range queries.
"""
from collections import defaultdict
from dataclasses import dataclass
from datetime import datetime
from typing import Dict, List, Optional, TYPE_CHECKING
from redvox.common.offset_model import compute_offsets
if TYPE_CHECKING:
from redvox.cloud.station_stats import StationStatsResp
from redvox.common.file_statistics import StationStat
from redvox.common.offset_model import TimingOffsets
from redvox.cloud.client import CloudClient
@dataclass
class CorrectedQuery:
    """
    Pairs the original query window for a station with its timing-corrected window.
    """

    # Station the window applies to.
    station_id: str
    # Query window as originally requested.
    original_start_ts: float
    original_end_ts: float
    # Query window after the timing correction.
    corrected_start_ts: float
    corrected_end_ts: float

    def start_offset(self) -> float:
        """
        Reports how far the start of the window moved.
        :return: The corrected start minus the original start.
        """
        shift: float = self.corrected_start_ts - self.original_start_ts
        return shift

    def end_offset(self) -> float:
        """
        Reports how far the end of the window moved.
        :return: The corrected end minus the original end.
        """
        shift: float = self.corrected_end_ts - self.original_end_ts
        return shift
def correct_query_timing(
    client: "CloudClient", start_ts: int, end_ts: int, station_ids: List[str]
) -> Optional[List["CorrectedQuery"]]:
    """
    Corrects the timing for a given cloud range query.

    Station statistics for the window are requested from the client, grouped by station ID and
    then by app start time, and one corrected window is produced per group.

    :param client: An instance of a cloud client.
    :param start_ts: The start of the query window.
    :param end_ts: The end of the query window.
    :param station_ids: A list of station IDs in the query.
    :return: A list of query corrections per station and per app start time, or None when the
             client returns no response or the response holds no station statistics.
    """
    station_stats_resp: Optional["StationStatsResp"] = client.request_station_stats(
        start_ts, end_ts, station_ids
    )

    # The response must be checked before its stats are read; it may be None.
    if station_stats_resp is None:
        return None

    station_stats: List["StationStat"] = station_stats_resp.station_stats
    if not station_stats:
        return None

    # Ensure sorted
    station_stats.sort(key=lambda stat: stat.packet_start_dt)

    # Group by station ID and then app start time
    grouped: Dict[str, Dict[datetime, List["StationStat"]]] = defaultdict(
        lambda: defaultdict(list)
    )
    station_stat: "StationStat"
    for station_stat in station_stats:
        grouped[station_stat.station_id][station_stat.app_start_dt].append(station_stat)

    # Compute new queries
    corrected_queries: List["CorrectedQuery"] = []
    for station_id, stats_by_app_start in grouped.items():
        stats: List["StationStat"]
        for stats in stats_by_app_start.values():
            # Offsets are only computed for non-empty groups
            timing_offsets: Optional["TimingOffsets"] = (
                compute_offsets(stats) if stats else None
            )

            # No stats or no offsets, return the original query
            if timing_offsets is None:
                corrected_queries.append(
                    CorrectedQuery(station_id, start_ts, end_ts, start_ts, end_ts)
                )
                continue

            # Shift each edge of the window by its offset, expressed in seconds
            corrected_start_ts: float = (
                start_ts + timing_offsets.start_offset.total_seconds()
            )
            corrected_end_ts: float = end_ts + timing_offsets.end_offset.total_seconds()
            corrected_queries.append(
                CorrectedQuery(
                    station_id, start_ts, end_ts, corrected_start_ts, corrected_end_ts
                )
            )

    return corrected_queries
Functions
def correct_query_timing(client: CloudClient, start_ts: int, end_ts: int, station_ids: List[str]) ‑> Optional[List[CorrectedQuery]]
-
Corrects the timing for a given cloud range query. :param client: An instance of a cloud client. :param start_ts: The start of the query window. :param end_ts: The end of the query window. :param station_ids: A list of station IDs in the query. :return: A list of query corrections per station and per app start time, or None when no station statistics are returned for the query.
Expand source code
def correct_query_timing(
    client: "CloudClient", start_ts: int, end_ts: int, station_ids: List[str]
) -> Optional[List["CorrectedQuery"]]:
    """
    Corrects the timing for a given cloud range query.

    Station statistics for the window are requested from the client, grouped by station ID and
    then by app start time, and one corrected window is produced per group.

    :param client: An instance of a cloud client.
    :param start_ts: The start of the query window.
    :param end_ts: The end of the query window.
    :param station_ids: A list of station IDs in the query.
    :return: A list of query corrections per station and per app start time, or None when the
             client returns no response or the response holds no station statistics.
    """
    station_stats_resp: Optional["StationStatsResp"] = client.request_station_stats(
        start_ts, end_ts, station_ids
    )

    # The response must be checked before its stats are read; it may be None.
    if station_stats_resp is None:
        return None

    station_stats: List["StationStat"] = station_stats_resp.station_stats
    if not station_stats:
        return None

    # Ensure sorted
    station_stats.sort(key=lambda stat: stat.packet_start_dt)

    # Group by station ID and then app start time
    grouped: Dict[str, Dict[datetime, List["StationStat"]]] = defaultdict(
        lambda: defaultdict(list)
    )
    station_stat: "StationStat"
    for station_stat in station_stats:
        grouped[station_stat.station_id][station_stat.app_start_dt].append(station_stat)

    # Compute new queries
    corrected_queries: List["CorrectedQuery"] = []
    for station_id, stats_by_app_start in grouped.items():
        stats: List["StationStat"]
        for stats in stats_by_app_start.values():
            # Offsets are only computed for non-empty groups
            timing_offsets: Optional["TimingOffsets"] = (
                compute_offsets(stats) if stats else None
            )

            # No stats or no offsets, return the original query
            if timing_offsets is None:
                corrected_queries.append(
                    CorrectedQuery(station_id, start_ts, end_ts, start_ts, end_ts)
                )
                continue

            # Shift each edge of the window by its offset, expressed in seconds
            corrected_start_ts: float = (
                start_ts + timing_offsets.start_offset.total_seconds()
            )
            corrected_end_ts: float = end_ts + timing_offsets.end_offset.total_seconds()
            corrected_queries.append(
                CorrectedQuery(
                    station_id, start_ts, end_ts, corrected_start_ts, corrected_end_ts
                )
            )

    return corrected_queries
Classes
class CorrectedQuery (station_id: str, original_start_ts: float, original_end_ts: float, corrected_start_ts: float, corrected_end_ts: float)
-
A set of fields required for a corrected timing query.
Expand source code
@dataclass
class CorrectedQuery:
    """
    Pairs the original query window for a station with its timing-corrected window.
    """

    # Station the window applies to.
    station_id: str
    # Query window as originally requested.
    original_start_ts: float
    original_end_ts: float
    # Query window after the timing correction.
    corrected_start_ts: float
    corrected_end_ts: float

    def start_offset(self) -> float:
        """
        Reports how far the start of the window moved.
        :return: The corrected start minus the original start.
        """
        shift: float = self.corrected_start_ts - self.original_start_ts
        return shift

    def end_offset(self) -> float:
        """
        Reports how far the end of the window moved.
        :return: The corrected end minus the original end.
        """
        shift: float = self.corrected_end_ts - self.original_end_ts
        return shift
Class variables
var corrected_end_ts : float
var corrected_start_ts : float
var original_end_ts : float
var original_start_ts : float
var station_id : str
Methods
def end_offset(self) ‑> float
-
Computes the end offset. :return: The end offset.
Expand source code
def end_offset(self) -> float:
    """
    Reports how far the end of the window moved.
    :return: The corrected end minus the original end.
    """
    shift: float = self.corrected_end_ts - self.original_end_ts
    return shift
def start_offset(self) ‑> float
-
Computes the start offset. :return: The start offset.
Expand source code
def start_offset(self) -> float:
    """
    Reports how far the start of the window moved.
    :return: The corrected start minus the original start.
    """
    shift: float = self.corrected_start_ts - self.original_start_ts
    return shift