Module redvox.common.tri_message_stats
Modules for extracting time synchronization statistics according to Tri-Message protocol. All functions assume payload for ONE data packet/decoder ONLY. These modules will be called separately by API800 and API900 loaders, they are in themselves helper functions. They do not depend on API formats, they take the time sync payloads as parameters and use Tri-Message protocol to compute latencies, check criteria, and correct the "machine" start time B0 based on the minimum latencies.
Expand source code
"""
Modules for extracting time synchronization statistics according to Tri-Message protocol. All functions assume
payload for ONE data packet/decoder ONLY. These modules will be called separately by API800 and API900 loaders, they
are in themselves helper functions. They do not depend on API formats, they take the time sync payloads as parameters
and use Tri-Message protocol to compute latencies, check criteria, and correct the "machine" start time B0 based on the
minimum latencies.
"""
from typing import Dict, List, Optional, Tuple, Union
# noinspection Mypy
import numpy as np
class TriMessageStats:
"""
Stores statistics about the tri-message exchanges
ALL timestamps in microseconds
Properties:
packet_id: an identifier for the packet that contains the data. Used for reporting purposes
latency1: latencies measured by timestamps 1 and 2
latency3: latencies measured by timestamps 2 and 3
offset1: offsets measured by timestamps 1 and 2
offset3: offsets measured by timestamps 2 and 3
best_latency: minimum latency that meets all criteria, default np.nan
best_offset: best offset that meets all criteria, default 0.0
best_latency_array_index: index of which latency array has the best latency, valid values are either 1 or 3,
all other values are invalid, default 0
best_latency_index: index in latency array with the best latency, default None
best_latency_per_exchange_index_array: the index of which latency array has the best latency, per each exchange
num_messages: number of tri-message exchanges
"""
def __init__(
self,
packet_id: Union[str, int],
a1: np.ndarray,
a2: np.ndarray,
a3: np.ndarray,
b1: np.ndarray,
b2: np.ndarray,
b3: np.ndarray,
):
"""
Calculate latency, offset, and their qualities.
:param packet_id: an identifier for reporting purposes
:param a1: array of server timestamp 1
:param a2: array of server timestamp 2
:param a3: array of server timestamp 3
:param b1: array of device timestamp 1
:param b2: array of device timestamp 2
:param b3: array of device timestamp 3
"""
self.packet_id: Union[str, int] = packet_id
self.num_messages: int = len(a1)
# compute latencies and offsets
latencies_tuple: Tuple[np.ndarray, np.ndarray] = latencies(a1, a2, a3, b1, b2, b3)
self.latency1: np.ndarray = latencies_tuple[0]
self.latency3: np.ndarray = latencies_tuple[1]
offsets_tuple: Tuple[np.ndarray, np.ndarray] = offsets(a1, a2, a3, b1, b2, b3)
self.offset1: np.ndarray = offsets_tuple[0]
self.offset3: np.ndarray = offsets_tuple[1]
self.best_latency: float = np.nan
self.best_latency_array_index: int = 0
self.best_latency_index: Optional[int] = None
self.best_offset: float = 0.0
self.find_best_latency()
self.find_best_offset()
self.best_latency_per_exchange_index_array: List[int] = self.find_best_exchange_latencies_index()
def find_best_latency(self) -> None:
"""
Finds and sets the best latency among the latencies. Update is done in place, nothing is returned.
"""
if all(np.nan_to_num(self.latency1) == 0.0) or all(np.nan_to_num(self.latency3) == 0.0):
# all latencies for one of the arrays is zero, the data is untrustworthy. set the defaults
self.best_latency = np.nan
self.best_latency_array_index = None
self.best_latency_index = None
else:
# find value and index of minimum latency of nonzero, non-nan latencies
d1_min: float = np.min(self.latency1[np.nonzero(np.nan_to_num(self.latency1))])
d3_min: float = np.min(self.latency3[np.nonzero(np.nan_to_num(self.latency3))])
if d3_min > d1_min:
self.best_latency = d1_min # server round trip is shorter
self.best_latency_array_index = 1
self.best_latency_index = int(np.where(self.latency1 == d1_min)[0][0])
else:
self.best_latency = d3_min
self.best_latency_array_index = 3
self.best_latency_index = int(np.where(self.latency3 == d3_min)[0][0])
def find_best_offset(self) -> None:
"""
Finds and sets the best offset among the offsets. Update is done in place, nothing is returned.
"""
if self.best_latency is None:
self.find_best_latency()
if self.best_latency_array_index == 1:
self.best_offset = self.offset1[self.best_latency_index]
elif self.best_latency_array_index == 3:
self.best_offset = self.offset3[self.best_latency_index]
else:
self.best_offset = 0.0
def find_best_exchange_latencies_index(self) -> List[int]:
"""
:return: A list of the best latency index per exchange
"""
return [0 if self.latency1[n] < self.latency3[n] else 1 for n in range(self.num_messages)]
def set_latency(
self,
a1_coeffs: np.ndarray,
a2_coeffs: np.ndarray,
a3_coeffs: np.ndarray,
b1_coeffs: np.ndarray,
b2_coeffs: np.ndarray,
b3_coeffs: np.ndarray,
) -> None:
"""
set the latency, then find the best latency. Update is done in place, nothing is returned.
:param a1_coeffs: server timestamps 1
:param a2_coeffs: server timestamps 2
:param a3_coeffs: server timestamps 3
:param b1_coeffs: device timestamps 1
:param b2_coeffs: device timestamps 2
:param b3_coeffs: device timestamps 3
"""
# compute latencies
self.latency1, self.latency3 = latencies(a1_coeffs, a2_coeffs, a3_coeffs, b1_coeffs, b2_coeffs, b3_coeffs)
self.find_best_latency()
def set_offset(
self,
a1_coeffs: np.ndarray,
a2_coeffs: np.ndarray,
a3_coeffs: np.ndarray,
b1_coeffs: np.ndarray,
b2_coeffs: np.ndarray,
b3_coeffs: np.ndarray,
) -> None:
"""
set the offset then find the best offset. Update is done in place, nothing is returned.
:param a1_coeffs: server timestamps 1
:param a2_coeffs: server timestamps 2
:param a3_coeffs: server timestamps 3
:param b1_coeffs: device timestamps 1
:param b2_coeffs: device timestamps 2
:param b3_coeffs: device timestamps 3
"""
self.offset1, self.offset3 = offsets(a1_coeffs, a2_coeffs, a3_coeffs, b1_coeffs, b2_coeffs, b3_coeffs)
self.find_best_offset()
def latencies(
a1_coeffs: np.ndarray,
a2_coeffs: np.ndarray,
a3_coeffs: np.ndarray,
b1_coeffs: np.ndarray,
b2_coeffs: np.ndarray,
b3_coeffs: np.ndarray,
) -> Tuple[np.ndarray, np.ndarray]:
"""
Compute latencies in microseconds based on message exchange timestamps.
:param a1_coeffs: server timestamps 1
:param a2_coeffs: server timestamps 2
:param a3_coeffs: server timestamps 3
:param b1_coeffs: device timestamps 1
:param b2_coeffs: device timestamps 2
:param b3_coeffs: device timestamps 3
:return: tuple of nd.arrays; the array of server round trip latencies in microseconds and
the array of device round trip latencies in microseconds
"""
# Compute latencies in microseconds
d1_coeffs: np.ndarray = 0.5 * ((a2_coeffs - a1_coeffs) - (b2_coeffs - b1_coeffs))
d3_coeffs: np.ndarray = 0.5 * ((b3_coeffs - b2_coeffs) - (a3_coeffs - a2_coeffs))
# convert negative latencies to nan. negative latencies should not exist naturally
d1_coeffs[d1_coeffs < 0] = np.nan
d3_coeffs[d3_coeffs < 0] = np.nan
return d1_coeffs, d3_coeffs
def offsets(
a1_coeffs: np.ndarray,
a2_coeffs: np.ndarray,
a3_coeffs: np.ndarray,
b1_coeffs: np.ndarray,
b2_coeffs: np.ndarray,
b3_coeffs: np.ndarray,
) -> Tuple[np.ndarray, np.ndarray]:
"""
Compute offsets in microseconds based on message exchange timestamps.
:param a1_coeffs: server timestamps 1
:param a2_coeffs: server timestamps 2
:param a3_coeffs: server timestamps 3
:param b1_coeffs: device timestamps 1
:param b2_coeffs: device timestamps 2
:param b3_coeffs: device timestamps 3
:return: tuple of nd.arrays; the array of server round trip offsets in microseconds and
the array of device round trip offsets in microseconds
"""
# assume the generic equation f = a - b + d
# where d is latency, f is offset, b is machine time and a is time sync server time
# refer to latency equations above for definitions of d1_coeffs and d3_coeffs
# with latency d1_coeffs, the equation is f1 = a1_coeffs - b1_coeffs + d1_coeffs
# with latency d3_coeffs, the equation is f3 = a3_coeffs - b3_coeffs + d3_coeffs
# In the absence of latency, offset can be calculated this way:
o1_coeffs: np.ndarray = (a1_coeffs - b1_coeffs + a2_coeffs - b2_coeffs) / 2.0
o3_coeffs: np.ndarray = (a3_coeffs - b3_coeffs + a2_coeffs - b2_coeffs) / 2.0
return o1_coeffs, o3_coeffs
def validate_timestamps(
a1_coeffs: np.ndarray,
a2_coeffs: np.ndarray,
a3_coeffs: np.ndarray,
b1_coeffs: np.ndarray,
b2_coeffs: np.ndarray,
b3_coeffs: np.ndarray,
) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
"""
it's possible some of the tri-message values are duplicated; the duplicates and other invalid times
must be removed.
:param a1_coeffs: server timestamps 1
:param a2_coeffs: server timestamps 2
:param a3_coeffs: server timestamps 3
:param b1_coeffs: device timestamps 1
:param b2_coeffs: device timestamps 2
:param b3_coeffs: device timestamps 3
:return: tuple of 6 ndarrays; each array will be the same length and contain only valid entries in the exchanges.
"""
num_timestamps = len(a1_coeffs)
# if length is 1 or less, no need to validate, just return all the values
if num_timestamps <= 1:
return a1_coeffs, a2_coeffs, a3_coeffs, b1_coeffs, b2_coeffs, b3_coeffs
# if here, there's more than 1 exchange to check
valid_times: List[Dict] = [{}, {}, {}, {}, {}, {}]
valid_indices = []
all_timestamps = [a1_coeffs, a2_coeffs, a3_coeffs, b1_coeffs, b2_coeffs, b3_coeffs]
# for each set of timestamps a1x, a2x, etc. in all exchanges
for data_index in range(6):
for time_index in range(num_timestamps):
# compare the time to existing information
time = all_timestamps[data_index][time_index]
if time not in valid_times:
# it's not in valid times, it's a new time
valid_times[data_index][time] = time_index
for index in valid_times[0].values():
# if it's not in the first one, it's not valid. if it doesn't show up in all others, it's not valid
if (
index in valid_times[1].values()
and index in valid_times[2].values()
and index in valid_times[3].values()
and index in valid_times[4].values()
and index in valid_times[5].values()
):
valid_indices.append(index)
return (
a1_coeffs[valid_indices],
a2_coeffs[valid_indices],
a3_coeffs[valid_indices],
b1_coeffs[valid_indices],
b2_coeffs[valid_indices],
b3_coeffs[valid_indices],
)
def transmit_receive_timestamps_microsec(
coeffs: np.ndarray,
) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
"""
Recover Tri-Message timestamp coefficients. Uses concept of
Tri-Message to synchronize a device with a reference server:
First, server A transmits a message to device B with timestamp a1_coeffs. B receives the message at timestamp
b1_coeffs. B then transmits a message back to A with timestamp b2_coeffs. A receives this message at
timestamp a2_coeffs. A transmits a second message to B at timestamp a3_coeffs. B receives the message at
timestamp b3_coeffs.
Parameters
----------
coeffs: array of tri-message coefficients (a1_coeffs, a2_coeffs, a3_coeffs, b1_coeffs, b2_coeffs, b3_coeffs)
Returns
-------
a1_coeffs, a2_coeffs, a3_coeffs, b1_coeffs, b2_coeffs, b3_coeffs: arrays of message exchange timestamps
"""
if len(coeffs) % 6 != 0:
raise Exception("Tri-Message contains partial exchange, unsafe to use it for computations.")
# Timing coefficients
step: int = 6 # each tri-message exchange contains 6 timestamps, 3 from server and 3 from device
stop: int = int(len(coeffs) / 6) * step
a1_coeffs: np.ndarray = coeffs[0:stop:step] # server first transmit timestamps in epoch microseconds
a2_coeffs: np.ndarray = coeffs[1:stop:step] # server first receive timestamps in epoch microseconds
a3_coeffs: np.ndarray = coeffs[2:stop:step] # server second transmit timestamps in epoch microseconds
b1_coeffs: np.ndarray = coeffs[3:stop:step] # device first receive timestamps in mach microseconds
b2_coeffs: np.ndarray = coeffs[4:stop:step] # device first transmit timestamps in mach microseconds
b3_coeffs: np.ndarray = coeffs[5:stop:step] # device second receive timestamps in mach microseconds
# make sure each tri-message exchange contains 6 timestamps (done with modulo check above)
# assert len(a1_coeffs) == len(a2_coeffs) == len(a3_coeffs) == len(b1_coeffs) == len(b2_coeffs) == len(b3_coeffs)
return a1_coeffs, a2_coeffs, a3_coeffs, b1_coeffs, b2_coeffs, b3_coeffs
Functions
def latencies(a1_coeffs: numpy.ndarray, a2_coeffs: numpy.ndarray, a3_coeffs: numpy.ndarray, b1_coeffs: numpy.ndarray, b2_coeffs: numpy.ndarray, b3_coeffs: numpy.ndarray) ‑> Tuple[numpy.ndarray, numpy.ndarray]
-
Compute latencies in microseconds based on message exchange timestamps.
:param a1_coeffs: server timestamps 1 :param a2_coeffs: server timestamps 2 :param a3_coeffs: server timestamps 3 :param b1_coeffs: device timestamps 1 :param b2_coeffs: device timestamps 2 :param b3_coeffs: device timestamps 3 :return: tuple of nd.arrays; the array of server round trip latencies in microseconds and the array of device round trip latencies in microseconds
Expand source code
def latencies( a1_coeffs: np.ndarray, a2_coeffs: np.ndarray, a3_coeffs: np.ndarray, b1_coeffs: np.ndarray, b2_coeffs: np.ndarray, b3_coeffs: np.ndarray, ) -> Tuple[np.ndarray, np.ndarray]: """ Compute latencies in microseconds based on message exchange timestamps. :param a1_coeffs: server timestamps 1 :param a2_coeffs: server timestamps 2 :param a3_coeffs: server timestamps 3 :param b1_coeffs: device timestamps 1 :param b2_coeffs: device timestamps 2 :param b3_coeffs: device timestamps 3 :return: tuple of nd.arrays; the array of server round trip latencies in microseconds and the array of device round trip latencies in microseconds """ # Compute latencies in microseconds d1_coeffs: np.ndarray = 0.5 * ((a2_coeffs - a1_coeffs) - (b2_coeffs - b1_coeffs)) d3_coeffs: np.ndarray = 0.5 * ((b3_coeffs - b2_coeffs) - (a3_coeffs - a2_coeffs)) # convert negative latencies to nan. negative latencies should not exist naturally d1_coeffs[d1_coeffs < 0] = np.nan d3_coeffs[d3_coeffs < 0] = np.nan return d1_coeffs, d3_coeffs
def offsets(a1_coeffs: numpy.ndarray, a2_coeffs: numpy.ndarray, a3_coeffs: numpy.ndarray, b1_coeffs: numpy.ndarray, b2_coeffs: numpy.ndarray, b3_coeffs: numpy.ndarray) ‑> Tuple[numpy.ndarray, numpy.ndarray]
-
Compute offsets in microseconds based on message exchange timestamps.
:param a1_coeffs: server timestamps 1 :param a2_coeffs: server timestamps 2 :param a3_coeffs: server timestamps 3 :param b1_coeffs: device timestamps 1 :param b2_coeffs: device timestamps 2 :param b3_coeffs: device timestamps 3 :return: tuple of nd.arrays; the array of server round trip offsets in microseconds and the array of device round trip offsets in microseconds
Expand source code
def offsets( a1_coeffs: np.ndarray, a2_coeffs: np.ndarray, a3_coeffs: np.ndarray, b1_coeffs: np.ndarray, b2_coeffs: np.ndarray, b3_coeffs: np.ndarray, ) -> Tuple[np.ndarray, np.ndarray]: """ Compute offsets in microseconds based on message exchange timestamps. :param a1_coeffs: server timestamps 1 :param a2_coeffs: server timestamps 2 :param a3_coeffs: server timestamps 3 :param b1_coeffs: device timestamps 1 :param b2_coeffs: device timestamps 2 :param b3_coeffs: device timestamps 3 :return: tuple of nd.arrays; the array of server round trip offsets in microseconds and the array of device round trip offsets in microseconds """ # assume the generic equation f = a - b + d # where d is latency, f is offset, b is machine time and a is time sync server time # refer to latency equations above for definitions of d1_coeffs and d3_coeffs # with latency d1_coeffs, the equation is f1 = a1_coeffs - b1_coeffs + d1_coeffs # with latency d3_coeffs, the equation is f3 = a3_coeffs - b3_coeffs + d3_coeffs # In the absence of latency, offset can be calculated this way: o1_coeffs: np.ndarray = (a1_coeffs - b1_coeffs + a2_coeffs - b2_coeffs) / 2.0 o3_coeffs: np.ndarray = (a3_coeffs - b3_coeffs + a2_coeffs - b2_coeffs) / 2.0 return o1_coeffs, o3_coeffs
def transmit_receive_timestamps_microsec(coeffs: numpy.ndarray) ‑> Tuple[numpy.ndarray, numpy.ndarray, numpy.ndarray, numpy.ndarray, numpy.ndarray, numpy.ndarray]
-
Recover Tri-Message timestamp coefficients. Uses concept of Tri-Message to synchronize a device with a reference server:
First, server A transmits a message to device B with timestamp a1_coeffs. B receives the message at timestamp b1_coeffs. B then transmits a message back to A with timestamp b2_coeffs. A receives this message at timestamp a2_coeffs. A transmits a second message to B at timestamp a3_coeffs. B receives the message at timestamp b3_coeffs.
Parameters
coeffs
:array
oftri-message coefficients (a1_coeffs, a2_coeffs, a3_coeffs, b1_coeffs, b2_coeffs, b3_coeffs)
Returns
a1_coeffs
,a2_coeffs
,a3_coeffs
,b1_coeffs
,b2_coeffs
,b3_coeffs
:arrays
ofmessage exchange timestamps
Expand source code
def transmit_receive_timestamps_microsec( coeffs: np.ndarray, ) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray]: """ Recover Tri-Message timestamp coefficients. Uses concept of Tri-Message to synchronize a device with a reference server: First, server A transmits a message to device B with timestamp a1_coeffs. B receives the message at timestamp b1_coeffs. B then transmits a message back to A with timestamp b2_coeffs. A receives this message at timestamp a2_coeffs. A transmits a second message to B at timestamp a3_coeffs. B receives the message at timestamp b3_coeffs. Parameters ---------- coeffs: array of tri-message coefficients (a1_coeffs, a2_coeffs, a3_coeffs, b1_coeffs, b2_coeffs, b3_coeffs) Returns ------- a1_coeffs, a2_coeffs, a3_coeffs, b1_coeffs, b2_coeffs, b3_coeffs: arrays of message exchange timestamps """ if len(coeffs) % 6 != 0: raise Exception("Tri-Message contains partial exchange, unsafe to use it for computations.") # Timing coefficients step: int = 6 # each tri-message exchange contains 6 timestamps, 3 from server and 3 from device stop: int = int(len(coeffs) / 6) * step a1_coeffs: np.ndarray = coeffs[0:stop:step] # server first transmit timestamps in epoch microseconds a2_coeffs: np.ndarray = coeffs[1:stop:step] # server first receive timestamps in epoch microseconds a3_coeffs: np.ndarray = coeffs[2:stop:step] # server second transmit timestamps in epoch microseconds b1_coeffs: np.ndarray = coeffs[3:stop:step] # device first receive timestamps in mach microseconds b2_coeffs: np.ndarray = coeffs[4:stop:step] # device first transmit timestamps in mach microseconds b3_coeffs: np.ndarray = coeffs[5:stop:step] # device second receive timestamps in mach microseconds # make sure each tri-message exchange contains 6 timestamps (done with modulo check above) # assert len(a1_coeffs) == len(a2_coeffs) == len(a3_coeffs) == len(b1_coeffs) == len(b2_coeffs) == len(b3_coeffs) return a1_coeffs, a2_coeffs, a3_coeffs, b1_coeffs, b2_coeffs, b3_coeffs
def validate_timestamps(a1_coeffs: numpy.ndarray, a2_coeffs: numpy.ndarray, a3_coeffs: numpy.ndarray, b1_coeffs: numpy.ndarray, b2_coeffs: numpy.ndarray, b3_coeffs: numpy.ndarray) ‑> Tuple[numpy.ndarray, numpy.ndarray, numpy.ndarray, numpy.ndarray, numpy.ndarray, numpy.ndarray]
-
it's possible some of the tri-message values are duplicated; the duplicates and other invalid times must be removed.
:param a1_coeffs: server timestamps 1 :param a2_coeffs: server timestamps 2 :param a3_coeffs: server timestamps 3 :param b1_coeffs: device timestamps 1 :param b2_coeffs: device timestamps 2 :param b3_coeffs: device timestamps 3 :return: tuple of 6 ndarrays; each array will be the same length and contain only valid entries in the exchanges.
Expand source code
def validate_timestamps( a1_coeffs: np.ndarray, a2_coeffs: np.ndarray, a3_coeffs: np.ndarray, b1_coeffs: np.ndarray, b2_coeffs: np.ndarray, b3_coeffs: np.ndarray, ) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray]: """ it's possible some of the tri-message values are duplicated; the duplicates and other invalid times must be removed. :param a1_coeffs: server timestamps 1 :param a2_coeffs: server timestamps 2 :param a3_coeffs: server timestamps 3 :param b1_coeffs: device timestamps 1 :param b2_coeffs: device timestamps 2 :param b3_coeffs: device timestamps 3 :return: tuple of 6 ndarrays; each array will be the same length and contain only valid entries in the exchanges. """ num_timestamps = len(a1_coeffs) # if length is 1 or less, no need to validate, just return all the values if num_timestamps <= 1: return a1_coeffs, a2_coeffs, a3_coeffs, b1_coeffs, b2_coeffs, b3_coeffs # if here, there's more than 1 exchange to check valid_times: List[Dict] = [{}, {}, {}, {}, {}, {}] valid_indices = [] all_timestamps = [a1_coeffs, a2_coeffs, a3_coeffs, b1_coeffs, b2_coeffs, b3_coeffs] # for each set of timestamps a1x, a2x, etc. in all exchanges for data_index in range(6): for time_index in range(num_timestamps): # compare the time to existing information time = all_timestamps[data_index][time_index] if time not in valid_times: # it's not in valid times, it's a new time valid_times[data_index][time] = time_index for index in valid_times[0].values(): # if it's not in the first one, it's not valid. if it doesn't show up in all others, it's not valid if ( index in valid_times[1].values() and index in valid_times[2].values() and index in valid_times[3].values() and index in valid_times[4].values() and index in valid_times[5].values() ): valid_indices.append(index) return ( a1_coeffs[valid_indices], a2_coeffs[valid_indices], a3_coeffs[valid_indices], b1_coeffs[valid_indices], b2_coeffs[valid_indices], b3_coeffs[valid_indices], )
Classes
class TriMessageStats (packet_id: Union[str, int], a1: numpy.ndarray, a2: numpy.ndarray, a3: numpy.ndarray, b1: numpy.ndarray, b2: numpy.ndarray, b3: numpy.ndarray)
-
Stores statistics about the tri-message exchanges
ALL timestamps in microseconds
Properties
packet_id: an identifier for the packet that contains the data. Used for reporting purposes
latency1: latencies measured by timestamps 1 and 2
latency3: latencies measured by timestamps 2 and 3
offset1: offsets measured by timestamps 1 and 2
offset3: offsets measured by timestamps 2 and 3
best_latency: minimum latency that meets all criteria, default np.nan
best_offset: best offset that meets all criteria, default 0.0
best_latency_array_index: index of which latency array has the best latency, valid values are either 1 or 3, all other values are invalid, default 0
best_latency_index: index in latency array with the best latency, default None
best_latency_per_exchange_index_array: the index of which latency array has the best latency, per each exchange
num_messages: number of tri-message exchanges
Calculate latency, offset, and their qualities.
:param packet_id: an identifier for reporting purposes :param a1: array of server timestamp 1 :param a2: array of server timestamp 2 :param a3: array of server timestamp 3 :param b1: array of device timestamp 1 :param b2: array of device timestamp 2 :param b3: array of device timestamp 3
Expand source code
class TriMessageStats: """ Stores statistics about the tri-message exchanges ALL timestamps in microseconds Properties: packet_id: an identifier for the packet that contains the data. Used for reporting purposes latency1: latencies measured by timestamps 1 and 2 latency3: latencies measured by timestamps 2 and 3 offset1: offsets measured by timestamps 1 and 2 offset3: offsets measured by timestamps 2 and 3 best_latency: minimum latency that meets all criteria, default np.nan best_offset: best offset that meets all criteria, default 0.0 best_latency_array_index: index of which latency array has the best latency, valid values are either 1 or 3, all other values are invalid, default 0 best_latency_index: index in latency array with the best latency, default None best_latency_per_exchange_index_array: the index of which latency array has the best latency, per each exchange num_messages: number of tri-message exchanges """ def __init__( self, packet_id: Union[str, int], a1: np.ndarray, a2: np.ndarray, a3: np.ndarray, b1: np.ndarray, b2: np.ndarray, b3: np.ndarray, ): """ Calculate latency, offset, and their qualities. :param packet_id: an identifier for reporting purposes :param a1: array of server timestamp 1 :param a2: array of server timestamp 2 :param a3: array of server timestamp 3 :param b1: array of device timestamp 1 :param b2: array of device timestamp 2 :param b3: array of device timestamp 3 """ self.packet_id: Union[str, int] = packet_id self.num_messages: int = len(a1) # compute latencies and offsets latencies_tuple: Tuple[np.ndarray, np.ndarray] = latencies(a1, a2, a3, b1, b2, b3) self.latency1: np.ndarray = latencies_tuple[0] self.latency3: np.ndarray = latencies_tuple[1] offsets_tuple: Tuple[np.ndarray, np.ndarray] = offsets(a1, a2, a3, b1, b2, b3) self.offset1: np.ndarray = offsets_tuple[0] self.offset3: np.ndarray = offsets_tuple[1] self.best_latency: float = np.nan self.best_latency_array_index: int = 0 self.best_latency_index: Optional[int] = None self.best_offset: float = 0.0 self.find_best_latency() self.find_best_offset() self.best_latency_per_exchange_index_array: List[int] = self.find_best_exchange_latencies_index() def find_best_latency(self) -> None: """ Finds and sets the best latency among the latencies. Update is done in place, nothing is returned. """ if all(np.nan_to_num(self.latency1) == 0.0) or all(np.nan_to_num(self.latency3) == 0.0): # all latencies for one of the arrays is zero, the data is untrustworthy. set the defaults self.best_latency = np.nan self.best_latency_array_index = None self.best_latency_index = None else: # find value and index of minimum latency of nonzero, non-nan latencies d1_min: float = np.min(self.latency1[np.nonzero(np.nan_to_num(self.latency1))]) d3_min: float = np.min(self.latency3[np.nonzero(np.nan_to_num(self.latency3))]) if d3_min > d1_min: self.best_latency = d1_min # server round trip is shorter self.best_latency_array_index = 1 self.best_latency_index = int(np.where(self.latency1 == d1_min)[0][0]) else: self.best_latency = d3_min self.best_latency_array_index = 3 self.best_latency_index = int(np.where(self.latency3 == d3_min)[0][0]) def find_best_offset(self) -> None: """ Finds and sets the best offset among the offsets. Update is done in place, nothing is returned. """ if self.best_latency is None: self.find_best_latency() if self.best_latency_array_index == 1: self.best_offset = self.offset1[self.best_latency_index] elif self.best_latency_array_index == 3: self.best_offset = self.offset3[self.best_latency_index] else: self.best_offset = 0.0 def find_best_exchange_latencies_index(self) -> List[int]: """ :return: A list of the best latency index per exchange """ return [0 if self.latency1[n] < self.latency3[n] else 1 for n in range(self.num_messages)] def set_latency( self, a1_coeffs: np.ndarray, a2_coeffs: np.ndarray, a3_coeffs: np.ndarray, b1_coeffs: np.ndarray, b2_coeffs: np.ndarray, b3_coeffs: np.ndarray, ) -> None: """ set the latency, then find the best latency. Update is done in place, nothing is returned. :param a1_coeffs: server timestamps 1 :param a2_coeffs: server timestamps 2 :param a3_coeffs: server timestamps 3 :param b1_coeffs: device timestamps 1 :param b2_coeffs: device timestamps 2 :param b3_coeffs: device timestamps 3 """ # compute latencies self.latency1, self.latency3 = latencies(a1_coeffs, a2_coeffs, a3_coeffs, b1_coeffs, b2_coeffs, b3_coeffs) self.find_best_latency() def set_offset( self, a1_coeffs: np.ndarray, a2_coeffs: np.ndarray, a3_coeffs: np.ndarray, b1_coeffs: np.ndarray, b2_coeffs: np.ndarray, b3_coeffs: np.ndarray, ) -> None: """ set the offset then find the best offset. Update is done in place, nothing is returned. :param a1_coeffs: server timestamps 1 :param a2_coeffs: server timestamps 2 :param a3_coeffs: server timestamps 3 :param b1_coeffs: device timestamps 1 :param b2_coeffs: device timestamps 2 :param b3_coeffs: device timestamps 3 """ self.offset1, self.offset3 = offsets(a1_coeffs, a2_coeffs, a3_coeffs, b1_coeffs, b2_coeffs, b3_coeffs) self.find_best_offset()
Methods
def find_best_exchange_latencies_index(self) ‑> List[int]
-
:return: A list of the best latency index per exchange
Expand source code
def find_best_exchange_latencies_index(self) -> List[int]: """ :return: A list of the best latency index per exchange """ return [0 if self.latency1[n] < self.latency3[n] else 1 for n in range(self.num_messages)]
def find_best_latency(self) ‑> None
-
Finds and sets the best latency among the latencies. Update is done in place, nothing is returned.
Expand source code
def find_best_latency(self) -> None: """ Finds and sets the best latency among the latencies. Update is done in place, nothing is returned. """ if all(np.nan_to_num(self.latency1) == 0.0) or all(np.nan_to_num(self.latency3) == 0.0): # all latencies for one of the arrays is zero, the data is untrustworthy. set the defaults self.best_latency = np.nan self.best_latency_array_index = None self.best_latency_index = None else: # find value and index of minimum latency of nonzero, non-nan latencies d1_min: float = np.min(self.latency1[np.nonzero(np.nan_to_num(self.latency1))]) d3_min: float = np.min(self.latency3[np.nonzero(np.nan_to_num(self.latency3))]) if d3_min > d1_min: self.best_latency = d1_min # server round trip is shorter self.best_latency_array_index = 1 self.best_latency_index = int(np.where(self.latency1 == d1_min)[0][0]) else: self.best_latency = d3_min self.best_latency_array_index = 3 self.best_latency_index = int(np.where(self.latency3 == d3_min)[0][0])
def find_best_offset(self) ‑> None
-
Finds and sets the best offset among the offsets. Update is done in place, nothing is returned.
Expand source code
def find_best_offset(self) -> None: """ Finds and sets the best offset among the offsets. Update is done in place, nothing is returned. """ if self.best_latency is None: self.find_best_latency() if self.best_latency_array_index == 1: self.best_offset = self.offset1[self.best_latency_index] elif self.best_latency_array_index == 3: self.best_offset = self.offset3[self.best_latency_index] else: self.best_offset = 0.0
def set_latency(self, a1_coeffs: numpy.ndarray, a2_coeffs: numpy.ndarray, a3_coeffs: numpy.ndarray, b1_coeffs: numpy.ndarray, b2_coeffs: numpy.ndarray, b3_coeffs: numpy.ndarray) ‑> None
-
set the latency, then find the best latency. Update is done in place, nothing is returned.
:param a1_coeffs: server timestamps 1 :param a2_coeffs: server timestamps 2 :param a3_coeffs: server timestamps 3 :param b1_coeffs: device timestamps 1 :param b2_coeffs: device timestamps 2 :param b3_coeffs: device timestamps 3
Expand source code
def set_latency( self, a1_coeffs: np.ndarray, a2_coeffs: np.ndarray, a3_coeffs: np.ndarray, b1_coeffs: np.ndarray, b2_coeffs: np.ndarray, b3_coeffs: np.ndarray, ) -> None: """ set the latency, then find the best latency. Update is done in place, nothing is returned. :param a1_coeffs: server timestamps 1 :param a2_coeffs: server timestamps 2 :param a3_coeffs: server timestamps 3 :param b1_coeffs: device timestamps 1 :param b2_coeffs: device timestamps 2 :param b3_coeffs: device timestamps 3 """ # compute latencies self.latency1, self.latency3 = latencies(a1_coeffs, a2_coeffs, a3_coeffs, b1_coeffs, b2_coeffs, b3_coeffs) self.find_best_latency()
def set_offset(self, a1_coeffs: numpy.ndarray, a2_coeffs: numpy.ndarray, a3_coeffs: numpy.ndarray, b1_coeffs: numpy.ndarray, b2_coeffs: numpy.ndarray, b3_coeffs: numpy.ndarray) ‑> None
-
set the offset then find the best offset. Update is done in place, nothing is returned.
:param a1_coeffs: server timestamps 1 :param a2_coeffs: server timestamps 2 :param a3_coeffs: server timestamps 3 :param b1_coeffs: device timestamps 1 :param b2_coeffs: device timestamps 2 :param b3_coeffs: device timestamps 3
Expand source code
def set_offset( self, a1_coeffs: np.ndarray, a2_coeffs: np.ndarray, a3_coeffs: np.ndarray, b1_coeffs: np.ndarray, b2_coeffs: np.ndarray, b3_coeffs: np.ndarray, ) -> None: """ set the offset then find the best offset. Update is done in place, nothing is returned. :param a1_coeffs: server timestamps 1 :param a2_coeffs: server timestamps 2 :param a3_coeffs: server timestamps 3 :param b1_coeffs: device timestamps 1 :param b2_coeffs: device timestamps 2 :param b3_coeffs: device timestamps 3 """ self.offset1, self.offset3 = offsets(a1_coeffs, a2_coeffs, a3_coeffs, b1_coeffs, b2_coeffs, b3_coeffs) self.find_best_offset()