Module redvox.common.offset_model
Offset model used to predict offset change over a period of time
Author: Samuel Kei Takazawa Maintained by: Tyler Yoshiyama
Expand source code
"""
Offset model used to predict offset change over a period of time
Author: Samuel Kei Takazawa
Maintained by: Tyler Yoshiyama
"""
from datetime import timedelta, datetime
from typing import Tuple, Optional, List, TYPE_CHECKING
import numpy as np
import pandas as pd
from dataclasses import dataclass
from sklearn.linear_model import LinearRegression
if TYPE_CHECKING:
from redvox.common.file_statistics import StationStat
import redvox.common.date_time_utils as dt_utils
# Default tuning constants for the offset model.
MIN_VALID_LATENCY_MICROS = 100  # minimum value of latency before it's unreliable
DEFAULT_SAMPLES = 3  # default number of samples per bin
MIN_SAMPLES = 3  # minimum number of samples per 5 minutes for reliable data
MIN_TIMESYNC_DURATION_MIN = 5  # minimum number of minutes of data required to produce reliable results
GPS_LATENCY_MICROS = 60000  # estimated latency for GPS communications

# Module-level overrides for the defaults above; None means "fall back to the default".
# Read and written only through the get_*/set_* accessor functions below.
__MIN_VALID_LATENCY_MICROS: Optional[float] = MIN_VALID_LATENCY_MICROS
__MIN_SAMPLES: Optional[float] = MIN_SAMPLES
__MIN_TIMESYNC_DURATION_MIN: Optional[float] = MIN_TIMESYNC_DURATION_MIN
def set_min_valid_latency_micros(new_min: float):
    """
    sets the minimum latency in microseconds for data to be considered valid
    can't be less than 0, any value below 0 is converted to 0

    :param new_min: new minimum value
    """
    global __MIN_VALID_LATENCY_MICROS
    # clamp negative inputs to zero before storing the override
    __MIN_VALID_LATENCY_MICROS = np.fmax(new_min, 0)
def get_min_valid_latency_micros() -> float:
    """
    :return: the minimum latency in microseconds for data to be considered valid
    """
    global __MIN_VALID_LATENCY_MICROS
    if __MIN_VALID_LATENCY_MICROS is None:
        # lazily restore the module default when the override has been cleared
        __MIN_VALID_LATENCY_MICROS = MIN_VALID_LATENCY_MICROS
    return __MIN_VALID_LATENCY_MICROS
def set_min_samples(new_min: int):
    """
    sets the minimum number of samples per bin for reliable results
    can't be less than 3, anything below 3 is converted to 3

    :param new_min: new minimum value
    """
    global __MIN_SAMPLES
    # builtin max keeps the stored value an int, matching the declared int
    # parameter and get_min_samples' int return type (np.fmax returned np.float64)
    __MIN_SAMPLES = max(new_min, 3)
def get_min_samples() -> int:
    """
    :return: the minimum number of samples per bin for reliable results
    """
    global __MIN_SAMPLES
    if __MIN_SAMPLES is None:
        # lazily restore the module default when the override has been cleared
        __MIN_SAMPLES = MIN_SAMPLES
    return __MIN_SAMPLES
def set_min_timesync_dur(new_min: int):
    """
    sets the minimum duration in minutes of a bin for reliable results
    can't be less than 5, anything below 5 is converted to 5

    :param new_min: new minimum value
    """
    global __MIN_TIMESYNC_DURATION_MIN
    # builtin max keeps the stored value an int, matching the declared int
    # parameter and get_min_timesync_dur's int return type (np.fmax returned np.float64)
    __MIN_TIMESYNC_DURATION_MIN = max(new_min, 5)
def get_min_timesync_dur() -> int:
    """
    :return: the minimum duration in minutes of a bin for reliable results
    """
    global __MIN_TIMESYNC_DURATION_MIN
    if __MIN_TIMESYNC_DURATION_MIN is None:
        # lazily restore the module default when the override has been cleared
        __MIN_TIMESYNC_DURATION_MIN = MIN_TIMESYNC_DURATION_MIN
    return __MIN_TIMESYNC_DURATION_MIN
class OffsetModel:
    """
    Offset model which represents the change in offset over a period of time

    * All timestamps are in microseconds since epoch UTC.
    * Computes and returns the slope and intercept for the offset function (offset = slope * time + intercept)
    * Invalidates latencies that are below our recognition threshold MIN_VALID_LATENCY_MICROS
    * The data is binned by k_bins in equally spaced times; in each bin the n_samples best latencies are taken to get
      the weighted linear regression.
    * If given zero latencies, but an equal number of offsets and timestamps, it will assume you are giving it GPS data
      and will put all values into a single bin with equal weights on all values.

    Properties:
        start_time: float, start timestamp of model in microseconds since epoch UTC
        end_time: float, end timestamp of model in microseconds since epoch UTC
        k_bins: int, the number of data bins used to create the model, default is 1 if model is empty
        n_samples: int, the number of samples per data bin; default is 3 (minimum to create a balanced line)
        slope: float, the slope of the change in offset
        intercept: float, the offset at start_time
        score: float, R2 value of the model; 1.0 is best, 0.0 is worst
        mean_latency: float, mean latency
        std_dev_latency: float, latency standard deviation
        debug: boolean, if True, output additional information when running the OffsetModel, default False
        min_valid_latency_us: float, the minimum latency in microseconds to be used in the model.  default 100
        min_samples_per_bin: int, the minimum number of samples per bin of data for the model to be reliable.
        default 3
        min_timesync_dur_min: int, the minimum number of minutes of data for the model to be reliable.  default 5
    """

    def __init__(
        self,
        latencies: np.ndarray,
        offsets: np.ndarray,
        times: np.ndarray,
        start_time: float,
        end_time: float,
        n_samples: int = DEFAULT_SAMPLES,
        debug: bool = False,
        min_valid_latency_us: Optional[float] = None,
        min_samples_per_bin: Optional[int] = None,
        min_timesync_dur_min: Optional[int] = None,
    ):
        """
        Create an OffsetModel

        :param latencies: latencies within the time specified
        :param offsets: offsets that correspond to the latencies
        :param times: timestamps that correspond to the latencies
        :param start_time: model's start timestamp in microseconds since epoch utc
        :param end_time: model's end timestamp in microseconds since epoch utc
        :param n_samples: number of samples per bin, default 3
        :param debug: boolean for additional output when running OffsetModel, default False
        :param min_valid_latency_us: the minimum latency in microseconds to be used in the model.  default 100
        :param min_samples_per_bin: the minimum number of samples per bin of data for the model to be reliable.
                                    default 3
        :param min_timesync_dur_min: the minimum number of minutes of data for the model to be reliable.  default 5
        """
        self.start_time = start_time
        self.end_time = end_time
        self.k_bins = get_bins_per_5min(start_time, end_time)
        self.n_samples = n_samples
        self.debug = debug
        # fall back to the module-level settings when thresholds aren't given explicitly
        self.min_valid_latency_micros = (
            get_min_valid_latency_micros() if min_valid_latency_us is None else min_valid_latency_us
        )
        self.min_samples_per_bin = get_min_samples() if min_samples_per_bin is None else min_samples_per_bin
        self.min_timesync_dur_min = get_min_timesync_dur() if min_timesync_dur_min is None else min_timesync_dur_min
        use_bins = True
        if len(latencies) > 0:
            # mark latencies below the validity threshold as NaN so downstream steps ignore them
            latencies = np.where(latencies < self.min_valid_latency_micros, np.nan, latencies)
            # only fit the regression if the timesync data passes the quality checks
            use_model = timesync_quality_check(
                latencies, start_time, end_time, self.debug, self.min_timesync_dur_min, self.min_samples_per_bin
            )
        elif len(offsets) > 0 and len(offsets) == len(times):
            # no latencies but matching offsets/times: treat as GPS data with a constant estimated latency,
            # so every point gets equal weight in the regression
            latencies = np.full(len(offsets), GPS_LATENCY_MICROS)
            use_model = True
            use_bins = False
        else:
            use_model = False
        if use_model:
            # Organize the data into a data frame
            full_df = pd.DataFrame(data=times, columns=["times"])
            full_df["latencies"] = latencies
            full_df["offsets"] = offsets
            if use_bins:
                # Get the index for the separations (add +1 to k_bins so that there would be k_bins bins)
                bin_times = np.linspace(start_time, end_time, self.k_bins + 1)
                # Make the dataframe with the data with n_samples per bins
                binned_df = get_binned_df(full_df=full_df, bin_times=bin_times, n_samples=n_samples)
            else:
                # everything is in one bin
                binned_df = full_df.sort_values(by=["times"])
            # Compute the weighted linear regression
            self.slope, zero_intercept, self.score = offset_weighted_linear_regression(
                latencies=binned_df["latencies"].values,
                offsets=binned_df["offsets"].values,
                times=binned_df["times"].values,
            )
            # Get offset relative to the first time (regression intercept is relative to epoch 0)
            self.intercept = get_offset_at_new_time(
                new_time=start_time,
                slope=self.slope,
                intercept=zero_intercept,
                model_time=0,
            )
            self.mean_latency = np.nanmean(binned_df["latencies"].values)
            self.std_dev_latency = np.nanstd(binned_df["latencies"].values)
            # slope == 0 means constant offset, so if slope is not 0, model is good.
            use_model = self.slope != 0.0
        # if data or model is not sufficient, use the offset corresponding to the lowest latency:
        if not use_model:
            self.score = 0.0
            self.slope = 0.0
            if all(np.nan_to_num(latencies) == 0.0):
                # no usable latencies at all: fall back to a zeroed model
                self.intercept = 0.0
                self.mean_latency = 0.0
                self.std_dev_latency = 0.0
            else:
                # use the offset at the single best (smallest non-zero, non-NaN) latency
                best_latency = np.nanmin(latencies[np.nonzero(latencies)])
                self.intercept = offsets[np.argwhere(latencies == best_latency)[0][0]]
                self.mean_latency = np.nanmean(latencies)
                self.std_dev_latency = np.nanstd(latencies)

    def __repr__(self):
        return (
            f"start_time: {self.start_time}, "
            f"end_time: {self.end_time}, "
            f"k_bins: {self.k_bins}, "
            f"n_samples: {self.n_samples}, "
            f"slope: {self.slope}, "
            f"intercept: {self.intercept}, "
            f"score: {self.score}, "
            f"mean_latency: {self.mean_latency}, "
            f"std_dev_latency: {self.std_dev_latency}, "
            f"min_valid_latency_micros: {self.min_valid_latency_micros}, "
            f"min_samples_per_bin: {self.min_samples_per_bin}, "
            f"min_timesync_dur_min: {self.min_timesync_dur_min}, "
            f"debug: {self.debug}"
        )

    def __str__(self):
        # same as __repr__ but without the debug flag
        return (
            f"start_time: {self.start_time}, "
            f"end_time: {self.end_time}, "
            f"k_bins: {self.k_bins}, "
            f"n_samples: {self.n_samples}, "
            f"slope: {self.slope}, "
            f"intercept: {self.intercept}, "
            f"score: {self.score}, "
            f"mean_latency: {self.mean_latency}, "
            f"std_dev_latency: {self.std_dev_latency}, "
            f"min_valid_latency_micros: {self.min_valid_latency_micros}, "
            f"min_samples_per_bin: {self.min_samples_per_bin}, "
            f"min_timesync_dur_min: {self.min_timesync_dur_min}"
        )

    def as_dict(self) -> dict:
        """
        :return: OffsetModel as a dictionary
        """
        return {
            "start_time": self.start_time,
            "end_time": self.end_time,
            "k_bins": self.k_bins,
            "n_samples": self.n_samples,
            "slope": self.slope,
            "intercept": self.intercept,
            "score": self.score,
            "mean_latency": self.mean_latency,
            "std_dev_latency": self.std_dev_latency,
            "min_valid_latency_micros": self.min_valid_latency_micros,
            "min_samples_per_bin": self.min_samples_per_bin,
            "min_timesync_dur_min": self.min_timesync_dur_min,
            "debug": self.debug,
        }

    @staticmethod
    def from_dict(data: dict) -> "OffsetModel":
        """
        create OffsetModel from a dictionary

        :param data: dictionary to read
        :return: OffsetModel
        """
        # start from an empty model so the regression in __init__ is not re-run,
        # then overwrite every field from the dictionary
        result = OffsetModel.empty_model()
        result.start_time = data["start_time"]
        result.end_time = data["end_time"]
        result.k_bins = data["k_bins"]
        result.n_samples = data["n_samples"]
        result.slope = data["slope"]
        result.intercept = data["intercept"]
        result.score = data["score"]
        result.mean_latency = data["mean_latency"]
        result.std_dev_latency = data["std_dev_latency"]
        result.min_valid_latency_micros = data["min_valid_latency_micros"]
        result.min_samples_per_bin = data["min_samples_per_bin"]
        result.min_timesync_dur_min = data["min_timesync_dur_min"]
        result.debug = data["debug"]
        return result

    @staticmethod
    def empty_model() -> "OffsetModel":
        """
        :return: an empty model with default values
        """
        return OffsetModel(np.array([]), np.array([]), np.array([]), 0, 0)

    def get_offset_at_time(self, time: float) -> float:
        """
        Gets offset at time based on the offset model.

        :param time: The time to get the new offset for
        :return: new offset corresponding to the time
        """
        return get_offset_at_new_time(time, self.slope, self.intercept, self.start_time)

    def update_time(self, time: float, use_model_function: bool = True) -> float:
        """
        update time based on the offset model.

        :param time: The time to update
        :param use_model_function: if True, use the slope of the model, otherwise use the intercept.  default True
        :return: updated time
        """
        return time + (self.get_offset_at_time(time) if use_model_function else self.intercept)

    def update_timestamps(self, timestamps: np.array, use_model_function: bool = True) -> np.array:
        """
        updates a list of timestamps

        :param timestamps: timestamps to update
        :param use_model_function: if True, use the slope of the model if it's not 0.  default True
        :return: updated list of timestamps
        """
        # NOTE(review): returns a Python list, not an np.array as annotated
        if use_model_function and self.slope != 0.0:
            return [self.update_time(t) for t in timestamps]
        return [t + self.intercept for t in timestamps]

    def get_original_time(self, time: float, use_model_function: bool = True) -> float:
        """
        reverse the updated time to the unaltered value

        :param time: time to update
        :param use_model_function: if True, use the slope of the model, otherwise use the intercept.  default True
        :return: unaltered, original time
        """
        if use_model_function:
            # inverts updated = t + slope * (t - start_time) + intercept, solved for t
            return (self.slope * self.start_time + time - self.intercept) / (1 + self.slope)
        return time - self.intercept
# Helper to determine how many bins the model should use
def get_bins_per_5min(start_time: float, end_time: float) -> int:
    """
    Calculates number of bins needed for roughly 5 minute bins.
    k_bins = int((end_time - start_time) / (300 * 1e6) + 1)

    :param start_time: the time used to compute the intercept (offset) and time bins; use start time of first packet
    :param end_time: the time used to compute the time bins; use start time of last packet + packet duration
    :return: number of bins to use for offset model
    """
    # 5 minutes expressed in microseconds
    five_min_micros = 300 * 1e6
    return int((end_time - start_time) / five_min_micros + 1)
# min max scaling for the weights
def minmax_scale(data: np.ndarray) -> np.ndarray:
    """
    :param data: the data to be scaled
    :return: data scaled by subtracting the min value and dividing by (max - min) value.
        If all non-nan values are equal, returns an array of zeros instead of dividing by zero.
    """
    # Use np.nanmin and np.nanmax to avoid issues with nan values
    low = np.nanmin(data)
    span = np.nanmax(data) - low
    if span == 0:
        # constant input: avoid 0/0 -> nan (and the RuntimeWarning it emits)
        return np.zeros_like(data, dtype=float)
    return (data - low) / span
# The score for Weighted Linear Regression Function
def get_wlr_score(model: LinearRegression, offsets: np.ndarray, times: np.ndarray, weights: np.ndarray) -> float:
    """
    Computes and returns a R2 score for the weighted linear regression using sklearn's score method.
    The best value is 1.0, and 0.0 corresponds to a function with no slope.
    Negative values are also adjusted to be 0.0.

    :param model: The linear regression model
    :param offsets: array of offsets corresponding to the best latencies per packet
    :param times: array of device times corresponding to the best latencies per packet
    :param weights: weights used to compute the weighted linear regression
    :return: score
    """
    # LinearRegression.score expects the input features (times): it calls predict(X)
    # internally and compares the predictions against y.  Passing the model's own
    # predicted offsets as X (as the previous code did) scored the model against a
    # transformation of its own output, producing a meaningless R2.
    score = model.score(X=times.reshape(-1, 1), y=offsets, sample_weight=weights)
    # Adjust the score so negative values are cast to 0.0
    return np.max([score, 0.0])
# The Weighted Linear Regression Function for offsets
def offset_weighted_linear_regression(
    latencies: np.ndarray, offsets: np.ndarray, times: np.ndarray
) -> Tuple[float, float, float]:
    """
    Computes and returns the slope and intercept for the offset function (offset = slope * time + intercept)
    The intercept is based on first UTC time 0, all units are in microseconds
    The function uses sklearn's LinearRegression with sample weights, and also returns the R2 score.

    :param latencies: array of the best latencies per packet
    :param offsets: array of offsets corresponding to the best latencies per packet
    :param times: array of device times corresponding to the best latencies per packet
    :return: slope, intercept, score
    """
    valid = ~np.isnan(latencies)
    # degenerate case: nothing valid to fit against
    if not valid.any():
        return 0.0, 0.0, 0.0
    # drop nan entries; sklearn cannot handle them
    times = times[valid]
    offsets = offsets[valid]
    latencies = latencies[valid]
    # weight each sample by the inverse square of its latency (in milliseconds)
    weights = (latencies / 1e3) ** -2
    # uniform weights carry no information, so skip weighting entirely in that case
    norm_weights = None if np.all(weights == weights[0]) else minmax_scale(weights)
    # fit the weighted linear regression of offsets against device times
    wls = LinearRegression()
    wls.fit(X=times.reshape(-1, 1), y=offsets.reshape(-1, 1), sample_weight=norm_weights)
    score = get_wlr_score(model=wls, offsets=offsets, times=times, weights=norm_weights)
    return wls.coef_[0][0], wls.intercept_[0], score
def simple_offset_weighted_linear_regression(offsets: np.ndarray, times: np.ndarray) -> Tuple[float, float]:
    """
    Computes and returns the slope and intercept for the offset function (offset = slope * time + intercept)
    for GPS timestamps vs device timestamps
    All units are in microseconds
    The function uses sklearn's LinearRegression with no weights.
    The returned intercept is shifted to the first device time in times.

    :param offsets: array of offsets
    :param times: array of device times used to get the offsets
    :return: slope of the model line, offset intercept at the first device time
    """
    # fit an unweighted linear regression of offsets against device times
    fit = LinearRegression()
    fit.fit(X=times.reshape(-1, 1), y=offsets.reshape(-1, 1))
    slope = fit.coef_[0][0]
    # move the intercept from UTC time 0 to the first device time
    shifted_intercept = get_offset_at_new_time(
        new_time=times[0],
        slope=slope,
        intercept=fit.intercept_[0],
        model_time=0,
    )
    return slope, shifted_intercept
# Helper to evaluate the offset model at a different reference time
def get_offset_at_new_time(new_time: float, slope: float, intercept: float, model_time: float) -> float:
    """
    Gets offset at new_time time based on the offset model.

    :param new_time: The time to get the new offset at
    :param slope: slope of the offset model
    :param intercept: the intercept of the offset model relative to the model_time
    :param model_time: the starting time corresponding to the intercept of the offset model
    :return: new offset at the new_time
    """
    # evaluate the linear model at the elapsed time since model_time
    return slope * (new_time - model_time) + intercept
# Function to get the subset data frame to do the weighted linear regression
def get_binned_df(full_df: pd.DataFrame, bin_times: np.ndarray, n_samples: float) -> pd.DataFrame:
    """
    Returns a subset of the full_df with n_samples per binned times.
    nan latencies values will be ignored.

    Each bin covers the half-open interval [bin_times[i], bin_times[i + 1]).
    Including the lower edge means samples exactly on a bin boundary (such as the
    very first timestamp, which equals the model start time) are no longer
    silently dropped, and every sample lands in at most one bin.

    :param full_df: pandas DataFrame containing latencies, offsets, and times.
    :param bin_times: array of edge times for each bin
    :param n_samples: number of samples to take per bin
    :return: binned_df
    """
    # Collect per-bin selections and concatenate once at the end;
    # pd.concat inside the loop is quadratic in the number of bins.
    pieces = []
    for i in range(len(bin_times) - 1):
        # select the half-open time range [lower, upper)
        in_bin = full_df[(full_df["times"] >= bin_times[i]) & (full_df["times"] < bin_times[i + 1])]
        # select n_samples smallest latency values (ignores nan values)
        pieces.append(in_bin.nsmallest(n_samples, "latencies"))
    if not pieces:
        # fewer than 2 bin edges: nothing to bin
        return pd.DataFrame()
    # Sort the combined result by time
    return pd.concat(pieces).sort_values(by=["times"])
def timesync_quality_check(
    latencies: np.ndarray,
    start_time: float,
    end_time: float,
    debug: bool = False,
    min_timesync_dur_mins: Optional[int] = None,
    min_samples: Optional[int] = None,
) -> bool:
    """
    Checks quality of timesync data to determine if offset model should be used.
    The following list is the quality check:

    If timesync duration is longer than min_timesync_dur_mins (default 5) min
    If there are min_samples (default 3) latency values (non-nan) per 5 minutes on average

    Returns False if the data quality is not up to "standards".

    :param latencies: array of the best latencies per packet
    :param start_time: the time used to compute the intercept (offset) and time bins; use start time of first packet
    :param end_time: the time used to compute the time bins; use start time of last packet + packet duration
    :param debug: if True, reason for failing quality check is printed, default False
    :param min_timesync_dur_mins: minimum number of minutes for result to be reliable
    :param min_samples: minimum number of samples per bin
    :return: True if timesync data passes all quality checks, False otherwise
    """
    # fill in module-level thresholds when none were supplied
    if min_timesync_dur_mins is None:
        min_timesync_dur_mins = get_min_timesync_dur()
    if min_samples is None:
        min_samples = get_min_samples()
    # total duration of the signal of interest, in minutes
    duration_min = (end_time - start_time) / (1e6 * 60)
    if duration_min < min_timesync_dur_mins:
        if debug:
            print(f"Timesync data duration less than {min_timesync_dur_mins} min")
        return False
    # average count of valid (non-nan) latency points per 5-minute span
    valid_points = np.count_nonzero(~np.isnan(latencies))
    if 5 * valid_points / duration_min < min_samples:
        if debug:
            print(f"Less than {min_samples} of timesync data per 5 min")
        return False
    # passed every check
    return True
@dataclass
class TimingOffsets:
    """
    Represents the start and end offsets of a timing corrected window.
    """

    # offset applied at the start of the window
    start_offset: timedelta
    # window start after applying start_offset
    adjusted_start: datetime
    # offset applied at the end of the window
    end_offset: timedelta
    # window end after applying end_offset
    adjusted_end: datetime
def mapf(val: Optional[float]) -> float:
    """
    Maps an optional float to floats by replacing Nones with NaNs.

    :param val: Float value to map.
    :return: The mapped float.
    """
    # None and NaN both collapse to NaN; everything else passes through untouched
    return np.nan if val is None or np.isnan(val) else val
def compute_offsets(station_stats: List["StationStat"]) -> Optional[TimingOffsets]:
    """
    Computes the offsets from the provided station statistics.

    Delegates the data extraction and model fitting to model_from_stats (defined in
    this module) instead of duplicating its extraction loop.

    :param station_stats: Statistics to compute offsets from.
    :return: Timing offset information or None if there are no offsets or there is an error.
    """
    # model_from_stats returns None for empty stats or any zero/missing packet duration
    model = model_from_stats(station_stats)
    if model is None:
        return None
    # Window boundaries in datetime form; model.start_time/end_time are the same
    # instants expressed in microseconds since epoch UTC.
    start_dt: datetime = station_stats[0].packet_start_dt
    end_dt: datetime = station_stats[-1].packet_start_dt + station_stats[-1].packet_duration
    # Compute new start and end offsets from the fitted model
    start_offset: timedelta = timedelta(microseconds=model.get_offset_at_time(model.start_time))
    end_offset: timedelta = timedelta(microseconds=model.get_offset_at_time(model.end_time))
    return TimingOffsets(start_offset, start_dt + start_offset, end_offset, end_dt + end_offset)
def model_from_stats(station_stats: List["StationStat"]) -> Optional[OffsetModel]:
    """
    Computes the offset model from the provided station statistics.

    :param station_stats: Statistics to compute model from.
    :return: OffsetModel or None if there are no offsets or there is an error.
    """
    count = len(station_stats)
    # Preallocate the data arrays
    latencies: np.ndarray = np.zeros(count, float)
    offsets: np.ndarray = np.zeros(count, float)
    times: np.ndarray = np.zeros(count, float)
    # Fill the arrays, bailing out early on any bad packet duration
    for idx, entry in enumerate(station_stats):
        if entry.packet_duration == 0.0 or not entry.packet_duration:
            return None
        latencies[idx] = mapf(entry.latency)
        offsets[idx] = mapf(entry.offset)
        times[idx] = entry.best_latency_timestamp
    if count == 0:
        return None
    # Convert the window boundaries to microseconds since epoch UTC and fit the model
    first_dt: datetime = station_stats[0].packet_start_dt
    last_dt: datetime = station_stats[-1].packet_start_dt + station_stats[-1].packet_duration
    return OffsetModel(
        latencies,
        offsets,
        times,
        dt_utils.datetime_to_epoch_microseconds_utc(first_dt),
        dt_utils.datetime_to_epoch_microseconds_utc(last_dt),
    )
Functions
def compute_offsets(station_stats: List[ForwardRef('StationStat')]) -> Optional[TimingOffsets]
-
Computes the offsets from the provided station statistics.
:param station_stats: Statistics to compute offsets from. :return: Timing offset information or None if there are no offsets or there is an error.
Expand source code
def compute_offsets(station_stats: List["StationStat"]) -> Optional[TimingOffsets]: """ Computes the offsets from the provided station statistics. :param station_stats: Statistics to compute offsets from. :return: Timing offset information or None if there are no offsets or there is an error. """ # Preallocate the data arrays. latencies: np.ndarray = np.zeros(len(station_stats), float) offsets: np.ndarray = np.zeros(len(station_stats), float) times: np.ndarray = np.zeros(len(station_stats), float) # Extract data or return early on data error i: int stat: "StationStat" for i, stat in enumerate(station_stats): if stat.packet_duration == 0.0 or not stat.packet_duration: return None latencies[i] = mapf(stat.latency) offsets[i] = mapf(stat.offset) times[i] = stat.best_latency_timestamp if len(latencies) == 0: return None # Prep clock model start_dt: datetime = station_stats[0].packet_start_dt end_dt: datetime = station_stats[-1].packet_start_dt + station_stats[-1].packet_duration start_time: float = dt_utils.datetime_to_epoch_microseconds_utc(start_dt) end_time: float = dt_utils.datetime_to_epoch_microseconds_utc(end_dt) model: OffsetModel = OffsetModel(latencies, offsets, times, start_time, end_time) # Compute new start and end offsets start_offset: timedelta = timedelta(microseconds=model.get_offset_at_time(start_time)) end_offset: timedelta = timedelta(microseconds=model.get_offset_at_time(end_time)) return TimingOffsets(start_offset, start_dt + start_offset, end_offset, end_dt + end_offset)
def get_binned_df(full_df: pandas.core.frame.DataFrame, bin_times: numpy.ndarray, n_samples: float) -> pandas.core.frame.DataFrame
-
Returns a subset of the full_df with n_samples per binned times. nan latencies values will be ignored.
:param full_df: pandas DataFrame containing latencies, offsets, and times. :param bin_times: array of edge times for each bin :param n_samples: number of samples to take per bin :return: binned_df
Expand source code
def get_binned_df(full_df: pd.DataFrame, bin_times: np.ndarray, n_samples: float) -> pd.DataFrame: """ Returns a subset of the full_df with n_samples per binned times. nan latencies values will be ignored. :param full_df: pandas DataFrame containing latencies, offsets, and times. :param bin_times: array of edge times for each bin :param n_samples: number of samples to take per bin :return: binned_df """ # Initialize the data frame binned_df = pd.DataFrame() # Loop through each bin and get the n smallest samples for i in range(len(bin_times) - 1): # select the time range select_df = full_df[full_df["times"] < bin_times[i + 1]] select_df = select_df[select_df["times"] > bin_times[i]] # select n_samples smallest values (ignores nan values) n_smallest = select_df.nsmallest(n_samples, "latencies") # append the n_smallest entries binned_df = pd.concat([binned_df, n_smallest]) # Sort the binned_df by time binned_df = binned_df.sort_values(by=["times"]) return binned_df
def get_bins_per_5min(start_time: float, end_time: float) -> int
-
Calculates number of bins needed for roughly 5 minute bins. k_bins = int((end_time - start_time) / (300 * 1e6) + 1) :param start_time: the time used to compute the intercept (offset) and time bins; use start time of first packet
:param end_time: the time used to compute the time bins; use start time of last packet + packet duration :return: number of bins to use for offset model
Expand source code
def get_bins_per_5min(start_time: float, end_time: float) -> int: """ Calculates number of bins needed for roughly 5 minute bins. k_bins = int((end_time - start_time) / (300 * 1e6) + 1) :param start_time: the time used to compute the intercept (offset) and time bins; use start time of first packet :param end_time: the time used to compute the time bins; use start time of last packet + packet duration :return: number of bins to use for offset model """ # Divide the duration by 5 minutes return int((end_time - start_time) / (1e6 * 300) + 1)
def get_min_samples() -> int
-
:return: the minimum number of samples per bin for reliable results
Expand source code
def get_min_samples() -> int: """ :return: the minimum number of samples per bin for reliable results """ global __MIN_SAMPLES if __MIN_SAMPLES is None: __MIN_SAMPLES = MIN_SAMPLES return __MIN_SAMPLES
def get_min_timesync_dur() -> int
-
:return: the minimum duration in minutes of a bin for reliable results
Expand source code
def get_min_timesync_dur() -> int: """ :return: the minimum duration in minutes of a bin for reliable results """ global __MIN_TIMESYNC_DURATION_MIN if __MIN_TIMESYNC_DURATION_MIN is None: __MIN_TIMESYNC_DURATION_MIN = MIN_TIMESYNC_DURATION_MIN return __MIN_TIMESYNC_DURATION_MIN
def get_min_valid_latency_micros() -> float
-
:return: the minimum latency in microseconds for data to be considered valid
Expand source code
def get_min_valid_latency_micros() -> float: """ :return: the minimum latency in microseconds for data to be considered valid """ global __MIN_VALID_LATENCY_MICROS if __MIN_VALID_LATENCY_MICROS is None: __MIN_VALID_LATENCY_MICROS = MIN_VALID_LATENCY_MICROS return __MIN_VALID_LATENCY_MICROS
def get_offset_at_new_time(new_time: float, slope: float, intercept: float, model_time: float) -> float
-
Gets offset at new_time time based on the offset model.
:param new_time: The time to get the new offset at :param slope: slope of the offset model :param intercept: the intercept of the offset model relative to the model_time :param model_time: the starting time corresponding to the intercept of the offset model :return: new offset at the new_time
Expand source code
def get_offset_at_new_time(new_time: float, slope: float, intercept: float, model_time: float) -> float: """ Gets offset at new_time time based on the offset model. :param new_time: The time to get the new offset at :param slope: slope of the offset model :param intercept: the intercept of the offset model relative to the model_time :param model_time: the starting time corresponding to the intercept of the offset model :return: new offset at the new_time """ # get the time difference time_diff = new_time - model_time # apply the offset model to get new intercept new_offset = slope * time_diff + intercept return new_offset
def get_wlr_score(model: sklearn.linear_model._base.LinearRegression, offsets: numpy.ndarray, times: numpy.ndarray, weights: numpy.ndarray) -> float
-
Computes and returns a R2 score for the weighted linear regression using sklearn's score method. The best value is 1.0, and 0.0 corresponds to a function with no slope. Negative values are also adjusted to be 0.0.
:param model: The linear regression model :param offsets: array of offsets corresponding to the best latencies per packet :param times: array of device times corresponding to the best latencies per packet :param weights: weights used to compute the weighted linear regression :return: score
Expand source code
def get_wlr_score(model: LinearRegression, offsets: np.ndarray, times: np.ndarray, weights: np.ndarray) -> float: """ Computes and returns a R2 score for the weighted linear regression using sklearn's score method. The best value is 1.0, and 0.0 corresponds to a function with no slope. Negative values are also adjusted to be 0.0. :param model: The linear regression model :param offsets: array of offsets corresponding to the best latencies per packet :param times: array of device times corresponding to the best latencies per packet :param weights: weights used to compute the weighted linear regression :return: score """ # Get predicted offsets of the model predicted_offsets = model.predict(X=times.reshape(-1, 1)) # Compute the score score = model.score(X=predicted_offsets, y=offsets, sample_weight=weights) # Adjust the score so negative values are cast to 0.0 return np.max([score, 0.0])
def mapf(val: Optional[float]) -> float
-
Maps an optional float to floats by replacing Nones with NaNs.
:param val: Float value to map. :return: The mapped float.
Expand source code
def mapf(val: Optional[float]) -> float: """ Maps an optional float to floats by replacing Nones with NaNs. :param val: Float value to map. :return: The mapped float. """ if val is None or np.isnan(val): return np.nan return val
def minmax_scale(data: numpy.ndarray) -> numpy.ndarray
-
:param data: the data to be scaled :return: data scaled by subtracting the min value and dividing by (max - min) value.
Expand source code
def minmax_scale(data: np.ndarray) -> np.ndarray: """ :param data: the data to be scaled :return: data scaled by subtracting the min value and dividing by (max - min) value. """ # Use np.nanmin and np.nanmax to avoid issues with nan values return (data - np.nanmin(data)) / (np.nanmax(data) - np.nanmin(data))
def model_from_stats(station_stats: List[StationStat]) -> Optional[OffsetModel]
-
Computes the offset model from the provided station statistics.
:param station_stats: Statistics to compute model from. :return: OffsetModel or None if there are no offsets or there is an error.
Expand source code
def model_from_stats(station_stats: List["StationStat"]) -> Optional[OffsetModel]:
    """
    Computes the offset model from the provided station statistics.

    :param station_stats: Statistics to compute model from.
    :return: OffsetModel or None if there are no offsets or there is an error.
    """
    num_stats = len(station_stats)
    latencies: np.ndarray = np.zeros(num_stats, float)
    offsets: np.ndarray = np.zeros(num_stats, float)
    times: np.ndarray = np.zeros(num_stats, float)

    # Fill the arrays, bailing out early on any packet with a missing or zero duration.
    for idx, stat in enumerate(station_stats):
        if not stat.packet_duration or stat.packet_duration == 0.0:
            return None
        latencies[idx] = mapf(stat.latency)
        offsets[idx] = mapf(stat.offset)
        times[idx] = stat.best_latency_timestamp

    if num_stats == 0:
        return None

    # Model window spans from the start of the first packet to the end
    # (start + duration) of the last packet.
    start_dt: datetime = station_stats[0].packet_start_dt
    end_dt: datetime = station_stats[-1].packet_start_dt + station_stats[-1].packet_duration
    return OffsetModel(
        latencies,
        offsets,
        times,
        dt_utils.datetime_to_epoch_microseconds_utc(start_dt),
        dt_utils.datetime_to_epoch_microseconds_utc(end_dt),
    )
def offset_weighted_linear_regression(latencies: numpy.ndarray, offsets: numpy.ndarray, times: numpy.ndarray) ‑> Tuple[float, float, float]
-
Computes and returns the slope and intercept for the offset function (offset = slope * time + intercept) The intercept is based on first UTC time 0, all units are in microseconds The function uses sklearn's LinearRegression with sample weights, and also returns the R2 score.
:param latencies: array of the best latencies per packet :param offsets: array of offsets corresponding to the best latencies per packet :param times: array of device times corresponding to the best latencies per packet :return: slope, intercept, score
Expand source code
def offset_weighted_linear_regression(
    latencies: np.ndarray, offsets: np.ndarray, times: np.ndarray
) -> Tuple[float, float, float]:
    """
    Computes and returns the slope and intercept for the offset function
    (offset = slope * time + intercept).  The intercept is based on first UTC time 0;
    all units are in microseconds.  Uses sklearn's LinearRegression with sample
    weights and also returns the R2 score.

    :param latencies: array of the best latencies per packet
    :param offsets: array of offsets corresponding to the best latencies per packet
    :param times: array of device times corresponding to the best latencies per packet
    :return: slope, intercept, score
    """
    valid = ~np.isnan(latencies)
    # No usable latencies (or empty input): flat, zero-offset model with zero score.
    if not valid.any():
        return 0.0, 0.0, 0.0

    # Drop NaN entries so sklearn only sees finite samples.
    times = times[valid]
    offsets = offsets[valid]
    latencies = latencies[valid]

    # Weight each sample by inverse-square latency in milliseconds:
    # lower latency -> more trustworthy -> larger weight.
    weights = (latencies / 1e3) ** -2
    if np.all(weights == weights[0]):
        # All weights equal: identical to an unweighted fit, and minmax scaling
        # would divide by zero, so skip it.
        norm_weights = None
    else:
        norm_weights = minmax_scale(weights)

    wls = LinearRegression()
    wls.fit(X=times.reshape(-1, 1), y=offsets.reshape(-1, 1), sample_weight=norm_weights)
    score = get_wlr_score(model=wls, offsets=offsets, times=times, weights=norm_weights)
    return wls.coef_[0][0], wls.intercept_[0], score
def set_min_samples(new_min: int)
-
sets the minimum number of samples per bin for reliable results can't be less than 3, anything below 3 is converted to 3
:param new_min: new minimum value
Expand source code
def set_min_samples(new_min: int):
    """
    Sets the minimum number of samples per bin needed for reliable results.
    Values below 3 are clamped up to 3.

    :param new_min: new minimum value
    """
    global __MIN_SAMPLES
    # np.fmax is symmetric; clamp to the floor of 3 samples per bin.
    __MIN_SAMPLES = np.fmax(3, new_min)
def set_min_timesync_dur(new_min: int)
-
sets the minimum duration in minutes of a bin for reliable results can't be less than 5, anything below 5 is converted to 5
:param new_min: new minimum value
Expand source code
def set_min_timesync_dur(new_min: int):
    """
    Sets the minimum duration in minutes of a bin needed for reliable results.
    Values below 5 are clamped up to 5.

    :param new_min: new minimum value
    """
    global __MIN_TIMESYNC_DURATION_MIN
    # np.fmax is symmetric; clamp to the floor of 5 minutes.
    __MIN_TIMESYNC_DURATION_MIN = np.fmax(5, new_min)
def set_min_valid_latency_micros(new_min: float)
-
sets the minimum latency in microseconds for data to be considered valid can't be less than 0, any value below 0 is converted to 0
:param new_min: new minimum value
Expand source code
def set_min_valid_latency_micros(new_min: float):
    """
    Sets the minimum latency in microseconds for data to be considered valid.
    Negative values are clamped up to 0.

    :param new_min: new minimum value
    """
    global __MIN_VALID_LATENCY_MICROS
    # np.fmax is symmetric; latency thresholds can never be negative.
    __MIN_VALID_LATENCY_MICROS = np.fmax(0, new_min)
def simple_offset_weighted_linear_regression(offsets: numpy.ndarray, times: numpy.ndarray) ‑> Tuple[float, float]
-
Computes and returns the slope and intercept for the offset function (offset = slope * time + intercept) for GPS timestamps vs device timestamps The intercept is based on first UTC time 0, all units are in microseconds The function uses sklearn's LinearRegression with no weights.
:param offsets: array of offsets :param times: array of device times used to get the offsets :return: slope of the model line, offset intercept at UTC 0
Expand source code
def simple_offset_weighted_linear_regression(offsets: np.ndarray, times: np.ndarray) -> Tuple[float, float]:
    """
    Computes the slope and intercept for the offset function (offset = slope * time + intercept)
    for GPS timestamps vs device timestamps.  All units are in microseconds.
    Uses sklearn's LinearRegression with no weights.

    :param offsets: array of offsets
    :param times: array of device times used to get the offsets
    :return: slope of the model line, and the model's offset evaluated at the first device time
    """
    model = LinearRegression()
    model.fit(X=times.reshape(-1, 1), y=offsets.reshape(-1, 1))
    slope = model.coef_[0][0]
    # Re-express the UTC-0 intercept as the offset at the first device timestamp.
    first_time_offset = get_offset_at_new_time(
        new_time=times[0],
        slope=slope,
        intercept=model.intercept_[0],
        model_time=0,
    )
    return slope, first_time_offset
def timesync_quality_check(latencies: numpy.ndarray, start_time: float, end_time: float, debug: bool = False, min_timesync_dur_mins: Optional[int] = None, min_samples: Optional[int] = None) ‑> bool
-
Checks quality of timesync data to determine if offset model should be used. The following list is the quality check: If timesync duration is longer than min_timesync_dur_mins (default 5) min If there are min_samples (default 3) latency values (non-nan) per 5 minutes on average Returns False if the data quality is not up to "standards".
:param latencies: array of the best latencies per packet :param start_time: the time used to compute the intercept (offset) and time bins; use start time of first packet :param end_time: the time used to compute the time bins; use start time of last packet + packet duration :param debug: if True, reason for failing quality check is printed, default False :param min_timesync_dur_mins: minimum number of minutes for result to be reliable :param min_samples: minimum number of samples per bin :return: True if timesync data passes all quality checks, False otherwise
Expand source code
def timesync_quality_check(
    latencies: np.ndarray,
    start_time: float,
    end_time: float,
    debug: bool = False,
    min_timesync_dur_mins: Optional[int] = None,
    min_samples: Optional[int] = None,
) -> bool:
    """
    Checks quality of timesync data to determine if the offset model should be used.

    The quality checks are:

    * timesync duration is at least min_timesync_dur_mins (default 5) minutes
    * there are at least min_samples (default 3) non-nan latency values per
      5 minutes on average

    Returns False if the data quality is not up to "standards".

    :param latencies: array of the best latencies per packet
    :param start_time: the time used to compute the intercept (offset) and time bins;
                       use start time of first packet
    :param end_time: the time used to compute the time bins; use start time of last
                     packet + packet duration
    :param debug: if True, reason for failing quality check is printed, default False
    :param min_timesync_dur_mins: minimum number of minutes for result to be reliable
    :param min_samples: minimum number of samples per bin
    :return: True if timesync data passes all quality checks, False otherwise
    """
    # Fall back to the module-level configurable minimums when not provided.
    min_timesync_dur_mins = get_min_timesync_dur() if min_timesync_dur_mins is None else min_timesync_dur_mins
    min_samples = get_min_samples() if min_samples is None else min_samples

    # Duration check: convert the window from microseconds to minutes.
    duration_min = (end_time - start_time) / (1e6 * 60)
    if duration_min < min_timesync_dur_mins:
        if debug:
            print(f"Timesync data duration less than {min_timesync_dur_mins} min")
        return False

    # Density check: average count of valid (non-nan) latencies per 5-minute span.
    points_per_5min = 5 * np.count_nonzero(~np.isnan(latencies)) / duration_min
    if points_per_5min < min_samples:
        if debug:
            print(f"Less than {min_samples} of timesync data per 5 min")
        return False

    return True
Classes
class OffsetModel (latencies: numpy.ndarray, offsets: numpy.ndarray, times: numpy.ndarray, start_time: float, end_time: float, n_samples: int = 3, debug: bool = False, min_valid_latency_us: Optional[float] = None, min_samples_per_bin: Optional[int] = None, min_timesync_dur_min: Optional[int] = None)
-
Offset model which represents the change in offset over a period of time
-
All timestamps are in microseconds since epoch UTC.
-
Computes and returns the slope and intercept for the offset function (offset = slope * time + intercept)
-
Invalidates latencies that are below our recognition threshold MIN_VALID_LATENCY_MICROS
-
The data is binned by k_bins in equally spaced times; in each bin the n_samples best latencies are taken to get the weighted linear regression.
-
If given zero latencies, but an equal number of offsets and timestamps, it will assume you are giving it GPS data and will put all values into a single bin with equal weights on all values.
Properties
start_time: float, start timestamp of model in microseconds since epoch UTC
end_time: float, end timestamp of model in microseconds since epoch UTC
k_bins: int, the number of data bins used to create the model, default is 1 if model is empty
n_samples: int, the number of samples per data bin; default is 3 (minimum to create a balanced line)
slope: float, the slope of the change in offset
intercept: float, the offset at start_time
score: float, R2 value of the model; 1.0 is best, 0.0 is worst
mean_latency: float, mean latency
std_dev_latency: float, latency standard deviation
debug: boolean, if True, output additional information when running the OffsetModel, default False
min_valid_latency_us: float, the minimum latency in microseconds to be used in the model. default 100
min_samples_per_bin: int, the minimum number of samples per bin of data for the model to be reliable. default 3
min_timesync_dur_min: int, the minimum number of minutes of data for the model to be reliable. default 5
Create an OffsetModel
:param latencies: latencies within the time specified :param offsets: offsets that correspond to the latencies :param times: timestamps that correspond to the latencies :param start_time: model's start timestamp in microseconds since epoch utc :param end_time: model's end timestamp in microseconds since epoch utc :param n_samples: number of samples per bin, default 3 :param debug: boolean for additional output when running OffsetModel, default False :param min_valid_latency_us: the minimum latency in microseconds to be used in the model. default 100 :param min_samples_per_bin: the minimum number of samples per bin of data for the model to be reliable. default 3 :param min_timesync_dur_min: the minimum number of minutes of data for the model to be reliable. default 5
Expand source code
class OffsetModel:
    """
    Offset model which represents the change in offset over a period of time.

    * All timestamps are in microseconds since epoch UTC.
    * Computes and returns the slope and intercept for the offset function
      (offset = slope * time + intercept)
    * Invalidates latencies that are below our recognition threshold MIN_VALID_LATENCY_MICROS
    * The data is binned by k_bins in equally spaced times; in each bin the n_samples best
      latencies are taken to get the weighted linear regression.
    * If given zero latencies, but an equal number of offsets and timestamps, it will assume
      you are giving it GPS data and will put all values into a single bin with equal weights
      on all values.

    Properties:
        start_time: float, start timestamp of model in microseconds since epoch UTC

        end_time: float, end timestamp of model in microseconds since epoch UTC

        k_bins: int, the number of data bins used to create the model, default is 1 if model is empty

        n_samples: int, the number of samples per data bin; default is 3 (minimum to create a balanced line)

        slope: float, the slope of the change in offset

        intercept: float, the offset at start_time

        score: float, R2 value of the model; 1.0 is best, 0.0 is worst

        mean_latency: float, mean latency

        std_dev_latency: float, latency standard deviation

        debug: boolean, if True, output additional information when running the OffsetModel, default False

        min_valid_latency_micros: float, the minimum latency in microseconds to be used in the model.
        default 100

        min_samples_per_bin: int, the minimum number of samples per bin of data for the model to be reliable.
        default 3

        min_timesync_dur_min: int, the minimum number of minutes of data for the model to be reliable.
        default 5
    """

    def __init__(
        self,
        latencies: np.ndarray,
        offsets: np.ndarray,
        times: np.ndarray,
        start_time: float,
        end_time: float,
        n_samples: int = DEFAULT_SAMPLES,
        debug: bool = False,
        min_valid_latency_us: Optional[float] = None,
        min_samples_per_bin: Optional[int] = None,
        min_timesync_dur_min: Optional[int] = None,
    ):
        """
        Create an OffsetModel

        :param latencies: latencies within the time specified
        :param offsets: offsets that correspond to the latencies
        :param times: timestamps that correspond to the latencies
        :param start_time: model's start timestamp in microseconds since epoch utc
        :param end_time: model's end timestamp in microseconds since epoch utc
        :param n_samples: number of samples per bin, default 3
        :param debug: boolean for additional output when running OffsetModel, default False
        :param min_valid_latency_us: the minimum latency in microseconds to be used in the model.
                                     default 100
        :param min_samples_per_bin: the minimum number of samples per bin of data for the model to
                                    be reliable.  default 3
        :param min_timesync_dur_min: the minimum number of minutes of data for the model to be
                                     reliable.  default 5
        """
        self.start_time = start_time
        self.end_time = end_time
        # number of time bins for the window; helper name suggests one bin per 5 minutes -- TODO confirm
        self.k_bins = get_bins_per_5min(start_time, end_time)
        self.n_samples = n_samples
        self.debug = debug
        # fall back to the module-level configurable minimums when not explicitly provided
        self.min_valid_latency_micros = (
            get_min_valid_latency_micros() if min_valid_latency_us is None else min_valid_latency_us
        )
        self.min_samples_per_bin = get_min_samples() if min_samples_per_bin is None else min_samples_per_bin
        self.min_timesync_dur_min = get_min_timesync_dur() if min_timesync_dur_min is None else min_timesync_dur_min
        use_bins = True
        if len(latencies) > 0:
            # timesync data: invalidate latencies below the threshold (arrays stay index-aligned)
            latencies = np.where(latencies < self.min_valid_latency_micros, np.nan, latencies)
            # only fit a regression model if the data passes the quality checks
            use_model = timesync_quality_check(
                latencies, start_time, end_time, self.debug, self.min_timesync_dur_min, self.min_samples_per_bin
            )
        elif len(offsets) > 0 and len(offsets) == len(times):
            # no latencies but matching offsets/times: assume GPS data with a constant estimated latency
            latencies = np.full(len(offsets), GPS_LATENCY_MICROS)
            use_model = True
            # GPS data goes into a single bin with equal weights on all values
            use_bins = False
        else:
            use_model = False
        if use_model:
            # Organize the data into a data frame
            full_df = pd.DataFrame(data=times, columns=["times"])
            full_df["latencies"] = latencies
            full_df["offsets"] = offsets
            if use_bins:
                # Get the index for the separations (add +1 to k_bins so that there would be k_bins bins)
                bin_times = np.linspace(start_time, end_time, self.k_bins + 1)
                # Make the dataframe with the data with n_samples per bins
                binned_df = get_binned_df(full_df=full_df, bin_times=bin_times, n_samples=n_samples)
            else:
                # everything is in one bin
                binned_df = full_df.sort_values(by=["times"])
            # Compute the weighted linear regression; zero_intercept is the offset at UTC time 0
            self.slope, zero_intercept, self.score = offset_weighted_linear_regression(
                latencies=binned_df["latencies"].values,
                offsets=binned_df["offsets"].values,
                times=binned_df["times"].values,
            )
            # Re-anchor the intercept from UTC 0 to the model's start time
            self.intercept = get_offset_at_new_time(
                new_time=start_time,
                slope=self.slope,
                intercept=zero_intercept,
                model_time=0,
            )
            self.mean_latency = np.nanmean(binned_df["latencies"].values)
            self.std_dev_latency = np.nanstd(binned_df["latencies"].values)
            # slope == 0 means constant offset, so if slope is not 0, model is good.
            use_model = self.slope != 0.0
        # if data or model is not sufficient, use the offset corresponding to the lowest latency:
        if not use_model:
            self.score = 0.0
            self.slope = 0.0
            if all(np.nan_to_num(latencies) == 0.0):
                # every latency is NaN or zero: no usable data, fall back to the empty/default model
                self.intercept = 0.0
                self.mean_latency = 0.0
                self.std_dev_latency = 0.0
            else:
                # use the offset of the first packet with the smallest non-zero latency
                # (np.nanmin ignores the NaN entries)
                best_latency = np.nanmin(latencies[np.nonzero(latencies)])
                self.intercept = offsets[np.argwhere(latencies == best_latency)[0][0]]
                self.mean_latency = np.nanmean(latencies)
                self.std_dev_latency = np.nanstd(latencies)

    def __repr__(self):
        # Full field dump, including the debug flag
        return (
            f"start_time: {self.start_time}, "
            f"end_time: {self.end_time}, "
            f"k_bins: {self.k_bins}, "
            f"n_samples: {self.n_samples}, "
            f"slope: {self.slope}, "
            f"intercept: {self.intercept}, "
            f"score: {self.score}, "
            f"mean_latency: {self.mean_latency}, "
            f"std_dev_latency: {self.std_dev_latency}, "
            f"min_valid_latency_micros: {self.min_valid_latency_micros}, "
            f"min_samples_per_bin: {self.min_samples_per_bin}, "
            f"min_timesync_dur_min: {self.min_timesync_dur_min}, "
            f"debug: {self.debug}"
        )

    def __str__(self):
        # Same as __repr__ but without the debug flag
        return (
            f"start_time: {self.start_time}, "
            f"end_time: {self.end_time}, "
            f"k_bins: {self.k_bins}, "
            f"n_samples: {self.n_samples}, "
            f"slope: {self.slope}, "
            f"intercept: {self.intercept}, "
            f"score: {self.score}, "
            f"mean_latency: {self.mean_latency}, "
            f"std_dev_latency: {self.std_dev_latency}, "
            f"min_valid_latency_micros: {self.min_valid_latency_micros}, "
            f"min_samples_per_bin: {self.min_samples_per_bin}, "
            f"min_timesync_dur_min: {self.min_timesync_dur_min}"
        )

    def as_dict(self) -> dict:
        """
        :return: OffsetModel as a dictionary
        """
        return {
            "start_time": self.start_time,
            "end_time": self.end_time,
            "k_bins": self.k_bins,
            "n_samples": self.n_samples,
            "slope": self.slope,
            "intercept": self.intercept,
            "score": self.score,
            "mean_latency": self.mean_latency,
            "std_dev_latency": self.std_dev_latency,
            "min_valid_latency_micros": self.min_valid_latency_micros,
            "min_samples_per_bin": self.min_samples_per_bin,
            "min_timesync_dur_min": self.min_timesync_dur_min,
            "debug": self.debug,
        }

    @staticmethod
    def from_dict(data: dict) -> "OffsetModel":
        """
        create OffsetModel from a dictionary

        :param data: dictionary to read
        :return: OffsetModel
        """
        # start from an empty model, then overwrite every field from the dictionary
        result = OffsetModel.empty_model()
        result.start_time = data["start_time"]
        result.end_time = data["end_time"]
        result.k_bins = data["k_bins"]
        result.n_samples = data["n_samples"]
        result.slope = data["slope"]
        result.intercept = data["intercept"]
        result.score = data["score"]
        result.mean_latency = data["mean_latency"]
        result.std_dev_latency = data["std_dev_latency"]
        result.min_valid_latency_micros = data["min_valid_latency_micros"]
        result.min_samples_per_bin = data["min_samples_per_bin"]
        result.min_timesync_dur_min = data["min_timesync_dur_min"]
        result.debug = data["debug"]
        return result

    @staticmethod
    def empty_model() -> "OffsetModel":
        """
        :return: an empty model with default values
        """
        return OffsetModel(np.array([]), np.array([]), np.array([]), 0, 0)

    def get_offset_at_time(self, time: float) -> float:
        """
        Gets offset at time based on the offset model.

        :param time: The time to get the new offset for
        :return: new offset corresponding to the time
        """
        return get_offset_at_new_time(time, self.slope, self.intercept, self.start_time)

    def update_time(self, time: float, use_model_function: bool = True) -> float:
        """
        update time based on the offset model.

        :param time: The time to update
        :param use_model_function: if True, use the slope of the model, otherwise use the
                                   intercept.  default True
        :return: updated time
        """
        return time + (self.get_offset_at_time(time) if use_model_function else self.intercept)

    def update_timestamps(self, timestamps: np.array, use_model_function: bool = True) -> np.array:
        """
        updates a list of timestamps

        :param timestamps: timestamps to update
        :param use_model_function: if True, use the slope of the model if it's not 0.
                                   default True
        :return: updated list of timestamps
        """
        # only use the slope when the model actually has one
        if use_model_function and self.slope != 0.0:
            return [self.update_time(t) for t in timestamps]
        return [t + self.intercept for t in timestamps]

    def get_original_time(self, time: float, use_model_function: bool = True) -> float:
        """
        reverse the updated time to the unaltered value

        :param time: time to update
        :param use_model_function: if True, use the slope of the model, otherwise use the
                                   intercept.  default True
        :return: unaltered, original time
        """
        if use_model_function:
            # presumably the algebraic inverse of update_time's linear mapping -- verify
            # against get_offset_at_new_time's definition
            return (self.slope * self.start_time + time - self.intercept) / (1 + self.slope)
        return time - self.intercept
Static methods
def empty_model() ‑> OffsetModel
-
:return: an empty model with default values
Expand source code
@staticmethod
def empty_model() -> "OffsetModel":
    """
    :return: an empty model with default values
    """
    # Zero-length arrays and a zero-width window produce the default (flat) model.
    no_data = np.array([])
    return OffsetModel(no_data, no_data, no_data, 0, 0)
def from_dict(data: dict) ‑> OffsetModel
-
create OffsetModel from a dictionary
:param data: dictionary to read :return: OffsetModel
Expand source code
@staticmethod
def from_dict(data: dict) -> "OffsetModel":
    """
    create OffsetModel from a dictionary

    :param data: dictionary to read
    :return: OffsetModel
    """
    result = OffsetModel.empty_model()
    # Overwrite every stored field with the serialized value; a missing key
    # raises KeyError, exactly as the explicit assignments would.
    for key in (
        "start_time",
        "end_time",
        "k_bins",
        "n_samples",
        "slope",
        "intercept",
        "score",
        "mean_latency",
        "std_dev_latency",
        "min_valid_latency_micros",
        "min_samples_per_bin",
        "min_timesync_dur_min",
        "debug",
    ):
        setattr(result, key, data[key])
    return result
Methods
def as_dict(self) ‑> dict
-
:return: OffsetModel as a dictionary
Expand source code
def as_dict(self) -> dict:
    """
    :return: OffsetModel as a dictionary
    """
    # dict(**kwargs) preserves insertion order, matching the serialized layout.
    return dict(
        start_time=self.start_time,
        end_time=self.end_time,
        k_bins=self.k_bins,
        n_samples=self.n_samples,
        slope=self.slope,
        intercept=self.intercept,
        score=self.score,
        mean_latency=self.mean_latency,
        std_dev_latency=self.std_dev_latency,
        min_valid_latency_micros=self.min_valid_latency_micros,
        min_samples_per_bin=self.min_samples_per_bin,
        min_timesync_dur_min=self.min_timesync_dur_min,
        debug=self.debug,
    )
def get_offset_at_time(self, time: float) ‑> float
-
Gets offset at time based on the offset model.
:param time: The time to get the new offset for :return: new offset corresponding to the time
Expand source code
def get_offset_at_time(self, time: float) -> float:
    """
    Gets offset at time based on the offset model.

    :param time: The time to get the new offset for
    :return: new offset corresponding to the time
    """
    # Delegate to the module-level helper, anchored at this model's start time.
    return get_offset_at_new_time(
        new_time=time,
        slope=self.slope,
        intercept=self.intercept,
        model_time=self.start_time,
    )
def get_original_time(self, time: float, use_model_function: bool = True) ‑> float
-
reverse the updated time to the unaltered value
:param time: time to update :param use_model_function: if True, use the slope of the model, otherwise use the intercept. default True :return: unaltered, original time
Expand source code
def get_original_time(self, time: float, use_model_function: bool = True) -> float:
    """
    reverse the updated time to the unaltered value

    :param time: time to update
    :param use_model_function: if True, use the slope of the model, otherwise use the
                               intercept.  default True
    :return: unaltered, original time
    """
    if not use_model_function:
        # intercept-only correction: undo the constant shift
        return time - self.intercept
    # invert the model's linear time correction for the full slope + intercept case
    return (self.slope * self.start_time + time - self.intercept) / (1 + self.slope)
def update_time(self, time: float, use_model_function: bool = True) ‑> float
-
update time based on the offset model.
:param time: The time to update :param use_model_function: if True, use the slope of the model, otherwise use the intercept. default True :return: updated time
Expand source code
def update_time(self, time: float, use_model_function: bool = True) -> float:
    """
    update time based on the offset model.

    :param time: The time to update
    :param use_model_function: if True, use the slope of the model, otherwise use the
                               intercept.  default True
    :return: updated time
    """
    if use_model_function:
        correction = self.get_offset_at_time(time)
    else:
        correction = self.intercept
    return time + correction
def update_timestamps(self, timestamps: numpy.array, use_model_function: bool = True) -> numpy.array
-
updates a list of timestamps
:param timestamps: timestamps to update :param use_model_function: if True, use the slope of the model if it's not 0. default True :return: updated list of timestamps
Expand source code
def update_timestamps(self, timestamps: np.array, use_model_function: bool = True) -> np.array:
    """
    updates a list of timestamps

    :param timestamps: timestamps to update
    :param use_model_function: if True, use the slope of the model if it's not 0.
                               default True
    :return: updated list of timestamps
    """
    # Fall back to the constant intercept when the caller opts out of the model
    # function or the model is flat (slope == 0).
    if not (use_model_function and self.slope != 0.0):
        return [stamp + self.intercept for stamp in timestamps]
    return [self.update_time(stamp) for stamp in timestamps]
-
class TimingOffsets (start_offset: datetime.timedelta, adjusted_start: datetime.datetime, end_offset: datetime.timedelta, adjusted_end: datetime.datetime)
-
Represents the start and end offsets of a timing corrected window.
Expand source code
@dataclass
class TimingOffsets:
    """
    Represents the start and end offsets of a timing corrected window.
    """

    # duration correction associated with the window's start
    start_offset: timedelta
    # window start timestamp after correction has been applied -- presumed from field name
    adjusted_start: datetime
    # duration correction associated with the window's end
    end_offset: timedelta
    # window end timestamp after correction has been applied -- presumed from field name
    adjusted_end: datetime
Class variables
var adjusted_end : datetime.datetime
var adjusted_start : datetime.datetime
var end_offset : datetime.timedelta
var start_offset : datetime.timedelta