Module redvox.common.timesync

Modules for extracting time synchronization statistics for API 900 and 1000 data. Expects API M packets due to the versatility of the packet. Also includes functions for correcting time arrays. ALL timestamps are in microseconds unless otherwise stated.

Expand source code
"""
Modules for extracting time synchronization statistics for API 900 and 1000 data.
Expects API M packets due to the versatility of the packet.
Also includes functions for correcting time arrays.
ALL timestamps are in microseconds unless otherwise stated.
"""

from functools import reduce
from typing import List, Optional, Union
from pathlib import Path
import os

# noinspection Mypy
import numpy as np
import pyarrow as pa
import pyarrow.dataset as ds

import redvox.common.timesync_io as io
from redvox.common.io import json_file_to_dict
import redvox.api900.lib.api900_pb2 as api900_pb2
from redvox.api1000.proto.redvox_api_m_pb2 import RedvoxPacketM
from redvox.api900.lib.api900_pb2 import RedvoxPacket
import redvox.api900.reader_utils as util_900
from redvox.common.offset_model import OffsetModel
from redvox.common import tri_message_stats as tms

class TimeSync:
    """
    Stores latencies, offsets, and other timesync related information about a single station.
    ALL timestamps are in microseconds unless otherwise stated.
    Timestamps are never updated from the raw values provided from the data.

    Properties:
        _time_sync_exchanges_list: list of lists, timestamps that form the time sync exchanges, default empty list

        _latencies: np.ndarray, calculated latencies of the exchanges.  Must be two equal length arrays,
        default empty np.ndarray

        _best_latency: float, best latency of the data, default np.nan

        _mean_latency: float, mean latency, default np.nan

        _latency_std: float, standard deviation of latencies, default np.nan

        _offsets: np.ndarray, calculated offsets of the exchanges.  Must be two equal length arrays,
        default empty np.ndarray

        _best_offset: float, best offset of the data, default 0.0

        _mean_offset: float, mean offset, default np.nan

        _offset_std: float, standard deviation of offsets, default np.nan

        _data_start: float, start timestamp of the data, default np.nan

        _data_end: float, end timestamp of the data, default np.nan

        _best_latency_index: int, index in latencies/sync exchanges array that contains the best latency,
        default np.nan

        _best_msg_array_index: int, indicates which latency array has the best latency.
        Must be 1 or 3, other values are invalid.  Default 0

        _offset_model: OffsetModel, used to correct timestamps.

        arrow_dir: str, directory to save arrow file in, default "." (current dir)

        arrow_file: str, base name of file to save data as, default "timesync"
    """

    def __init__(
        self,
        time_sync_exchanges_list: Optional[List[float]] = None,
        latencies: Optional[np.ndarray] = None,
        best_latency: float = np.nan,
        mean_latency: float = np.nan,
        latency_std: float = np.nan,
        offsets: Optional[np.ndarray] = None,
        best_offset: float = 0.0,
        mean_offset: float = np.nan,
        offset_std: float = np.nan,
        data_start: float = np.nan,
        data_end: float = np.nan,
        best_latency_index: int = np.nan,
        best_array_index: int = 0,
        arrow_dir: str = ".",
        arrow_file_name: str = "timesync",
    ):
        """
        Initialize properties

        :param time_sync_exchanges_list: optional timesync exchanges of the packet as a flat list, default None
        :param latencies: optional arrays of latencies from exchanges; must be two equal length arrays, default None
        :param best_latency: the best latency of set, default np.nan
        :param mean_latency: the mean of all latencies, default np.nan
        :param latency_std: the standard deviation of all latencies, default np.nan
        :param offsets: optional arrays of offsets from exchanges; must be two equal length arrays, default None
        :param best_offset: the best offset of the packet, default 0.0
        :param mean_offset: the mean of all offsets, default np.nan
        :param offset_std: the standard deviation of all offsets, default np.nan
        :param data_start: the start timestamp of the data, default np.nan
        :param data_end: the end timestamp of the data, default np.nan
        :param best_latency_index: index of the best latency in latencies array, default np.nan
        :param best_array_index: index of the best latency array; must be either 1 (first array) or 3 (second array),
                                    any other value is invalid, default 0
        :param arrow_dir: directory to save timesync data in, default "."
        :param arrow_file_name: name of file to save timesync data as.  do not include extensions.  default "timesync"
        """
        self._latencies: np.ndarray = latencies
        self._best_latency: float = best_latency
        self._mean_latency: float = mean_latency
        self._latency_std: float = latency_std
        self._offsets: np.ndarray = offsets
        self._best_offset: float = best_offset
        self._mean_offset: float = mean_offset
        self._offset_std: float = offset_std
        self._best_latency_index: int = best_latency_index
        self._best_msg_array_index: int = best_array_index
        self._data_start: float = data_start
        self._data_end: float = data_end
        self.arrow_dir: str = arrow_dir
        self.arrow_file: str = arrow_file_name

        # Always define these two attributes so accessors never hit an AttributeError,
        # even when latencies are supplied together with exchanges and no stats are computed.
        self._best_exchange_index_list = []
        self._offset_model: OffsetModel = OffsetModel.empty_model()

        if time_sync_exchanges_list is None or len(time_sync_exchanges_list) < 1:
            # one empty list per exchange timestamp (a1, a2, a3, b1, b2, b3)
            self._time_sync_exchanges_list = [[], [], [], [], [], []]
        else:
            # the flat list is grouped into exchanges of 6 values, then transposed so that
            # row i holds the i-th timestamp of every exchange
            self._time_sync_exchanges_list = np.transpose(
                [time_sync_exchanges_list[i : i + 6] for i in range(0, len(time_sync_exchanges_list), 6)]
            )
            if self._latencies is None:
                # NOTE(review): as written, a supplied (non-NaN) data_start/data_end is replaced by the
                # first/last exchange timestamp, while a missing one stays NaN.  Confirm this is intended
                # and not an inverted condition.
                if not np.isnan(self._data_start):
                    self._data_start = self.get_exchange_timestamps(4)[0]
                if not np.isnan(self._data_end):
                    self._data_end = self.get_exchange_timestamps(5)[-1]
                # NOTE(review): the listing lost the statement that derives the stats when none are
                # supplied; computing them here mirrors process_exchanges().  Confirm placement.
                self._stats_from_exchanges()

    def __repr__(self):
        return (
            f"best_latency_index: {self._best_latency_index}, "
            f"best_latency: {self._best_latency}, "
            f"mean_latency: {self._mean_latency}, "
            f"latency_std: {self._latency_std}, "
            f"best_offset: {self._best_offset}, "
            f"mean_offset: {self._mean_offset}, "
            f"offset_std: {self._offset_std}"
        )

    def __str__(self):
        # the human readable form is the same text as the repr
        return self.__repr__()

    def as_dict(self) -> dict:
        """
        :return: TimeSync as a dictionary
        """
        return {
            "best_latency_index": self._best_latency_index,
            "best_msg_array_index": self._best_msg_array_index,
            "best_latency": self._best_latency,
            "mean_latency": self._mean_latency,
            "latency_std": self._latency_std,
            "best_offset": self._best_offset,
            "mean_offset": self._mean_offset,
            "offset_std": self._offset_std,
            "data_start": self._data_start,
            "data_end": self._data_end,
            "arrow_dir": self.arrow_dir,
            "arrow_file_name": self.arrow_file,
        }

    def to_json(self):
        """
        :return: TimeSync as json string
        """
        return io.to_json(self)

    def to_json_file(self, file_name: Optional[str] = None) -> Path:
        """
        saves the timesync data as json and data in the same directory.

        :param file_name: the optional base file name.  Do not include a file extension.
                            If None, the default name is chosen by the timesync_io helper.
        :return: path to json file
        """
        return io.to_json_file(self, file_name)

    @staticmethod
    def from_json_file(file_path: str) -> "TimeSync":
        """
        convert contents of json file to TimeSync data

        :param file_path: full path of file to load data from.
        :return: TimeSync object
        """
        json_data = json_file_to_dict(file_path)
        # NOTE(review): the dataset options and the constructor arguments were lost from the listing.
        # They are rebuilt from the keys written by as_dict() and the columns written by
        # data_as_pyarrow(); confirm against the writer in timesync_io.
        data = ds.dataset(
            os.path.join(json_data["arrow_dir"], json_data["arrow_file_name"] + ".parquet"),
            format="parquet",
        ).to_table()
        result = TimeSync(
            latencies=np.array((data["latencies1"].to_numpy(), data["latencies3"].to_numpy())),
            best_latency=json_data["best_latency"],
            mean_latency=json_data["mean_latency"],
            latency_std=json_data["latency_std"],
            offsets=np.array((data["offsets1"].to_numpy(), data["offsets3"].to_numpy())),
            best_offset=json_data["best_offset"],
            mean_offset=json_data["mean_offset"],
            offset_std=json_data["offset_std"],
            data_start=json_data["data_start"],
            data_end=json_data["data_end"],
            best_latency_index=json_data["best_latency_index"],
            best_array_index=json_data["best_msg_array_index"],
            arrow_dir=json_data["arrow_dir"],
            arrow_file_name=json_data["arrow_file_name"],
        )
        # restore the raw exchanges without recomputing the stored statistics
        result.set_sync_exchanges([data[col].to_numpy() for col in ("a1", "a2", "a3", "b1", "b2", "b3")])
        return result

    @staticmethod
    def from_dict(ts_dict: dict, tse_data: List) -> "TimeSync":
        """
        create TimeSync from dictionary

        :param ts_dict: dictionary of metadata
        :param tse_data: list of time sync exchanges
        :return: TimeSync object
        """
        # Keyword arguments are required here: passed positionally, best_latency would land in the
        # `latencies` parameter and best_offset in the `best_latency` parameter.
        return TimeSync(tse_data, best_latency=ts_dict["best_latency"], best_offset=ts_dict["best_offset"])

    def data_as_pyarrow(self) -> "pa.Table":
        """
        :return: convert timesync exchanges, latencies, and offsets into a pyarrow table
        """
        return pa.Table.from_pydict(
            {
                "a1": self._time_sync_exchanges_list[0],
                "a2": self._time_sync_exchanges_list[1],
                "a3": self._time_sync_exchanges_list[2],
                "b1": self._time_sync_exchanges_list[3],
                "b2": self._time_sync_exchanges_list[4],
                "b3": self._time_sync_exchanges_list[5],
                "latencies1": self._latencies[0],
                "latencies3": self._latencies[1],
                "offsets1": self._offsets[0],
                "offsets3": self._offsets[1],
            }
        )

    def _stats_from_exchanges(self):
        """
        Compute the tri-message stats from the data
        """
        if self.num_tri_messages() < 1:
            # no exchanges: reset every statistic to its default value
            self._latencies = np.array(([], []))
            self._offsets = np.array(([], []))
            self._best_latency = np.nan
            self._mean_latency = np.nan
            self._latency_std = np.nan
            self._best_offset = 0
            self._mean_offset = np.nan
            self._offset_std = np.nan
            self._best_latency_index = np.nan
            self._best_msg_array_index = 0
            self._offset_model = OffsetModel.empty_model()
            self._best_exchange_index_list = []
        else:
            # compute tri message data from time sync exchanges
            # NOTE(review): the argument list was lost from the listing; this assumes the signature
            # TriMessageStats(packet_id, a1, a2, a3, b1, b2, b3) -- confirm against tri_message_stats.
            tse = tms.TriMessageStats(
                "",
                self._time_sync_exchanges_list[0],
                self._time_sync_exchanges_list[1],
                self._time_sync_exchanges_list[2],
                self._time_sync_exchanges_list[3],
                self._time_sync_exchanges_list[4],
                self._time_sync_exchanges_list[5],
            )
            self._latencies = np.array((tse.latency1, tse.latency3))
            self._offsets = np.array((tse.offset1, tse.offset3))
            if not np.isnan(self._data_start) and not np.isnan(self._data_end):
                # NOTE(review): the argument list was lost from the listing; this assumes
                # OffsetModel(latencies, offsets, times, start_time, end_time) -- confirm against offset_model.
                self._offset_model = OffsetModel(
                    self._latencies.flatten(),
                    self._offsets.flatten(),
                    self.get_device_exchanges_timestamps(),
                    self._data_start,
                    self._data_end,
                )
                # Compute the statistics for latency using the model
                self._mean_latency = self._offset_model.mean_latency
                self._latency_std = self._offset_model.std_dev_latency
            else:
                self._offset_model = OffsetModel.empty_model()
                # Compute the statistics for latency using the exchanges
                self._mean_latency = np.mean([*self._latencies[0], *self._latencies[1]])
                self._latency_std = np.std([*self._latencies[0], *self._latencies[1]])
            # compute the rest of the statistics
            self._mean_offset = np.mean([*self._offsets[0], *self._offsets[1]])
            self._offset_std = np.std([*self._offsets[0], *self._offsets[1]])
            self._best_latency_index = tse.best_latency_index
            self._best_msg_array_index = tse.best_latency_array_index
            self._best_latency = tse.best_latency
            self._best_offset = tse.best_offset
            self._best_exchange_index_list = tse.best_latency_per_exchange_index_array

    def process_exchanges(self, force: bool = False):
        """
        Use the sync exchanges to compute the time sync stats (latencies, best latency, offsets, etc).

        :param force: if True, will overwrite any existing values.
        """
        if self._latencies is None or force:
            self._stats_from_exchanges()

    def get_exchange_timestamps(self, index: int) -> np.ndarray:
        """
        :param index: index of timestamps to return.  0, 1, 2 are server timestamps.  3, 4, 5 are device.
                        Any value not from 0 to 5 will be converted to 0
        :return: timestamps from the chosen index.
        """
        if index < 0 or index > 5:
            index = 0
        return self._time_sync_exchanges_list[index]

    def get_device_exchanges_timestamps(self) -> np.ndarray:
        """
        :return: timestamps of sync exchanges initiated by the device
        """
        # np.asarray accepts both the plain lists of an empty TimeSync and the ndarray rows of a
        # populated one; calling .tolist() directly fails on the former.
        return np.concatenate(
            (np.asarray(self._time_sync_exchanges_list[3]), np.asarray(self._time_sync_exchanges_list[5]))
        )

    def num_tri_messages(self) -> int:
        """
        :return: number of tri-message exchanges
        """
        return np.size(self._time_sync_exchanges_list, 1)

    def get_best_latency_timestamp(self) -> float:
        """
        :return: timestamp of best latency, or np.nan if no best latency.
        """
        if not np.isnan(self._best_latency_index):
            # cast guards against an index stored as a float
            best_index = int(self._best_latency_index)
            if self._best_msg_array_index == 1:
                return self._time_sync_exchanges_list[3][best_index]
            elif self._best_msg_array_index == 3:
                return self._time_sync_exchanges_list[5][best_index]
        return np.nan

    def append_timesync_arrow(self, new_data: "TimeSync"):
        """
        adds timesync data from new_data to current

        The statistics are not recomputed here; call process_exchanges(force=True) afterwards
        to refresh them.

        :param new_data: another TimeSync object
        """
        # Rebuild the container as a list of rows: assigning a longer row into a fixed-shape
        # 2-D ndarray raises a broadcast error.
        self._time_sync_exchanges_list = [
            np.append(self._time_sync_exchanges_list[row], new_data._time_sync_exchanges_list[row])
            for row in range(6)
        ]
        # keep the earliest known start and the latest known end
        if np.isnan(self._data_start):
            self._data_start = new_data._data_start
        elif not np.isnan(new_data._data_start):
            self._data_start = np.min([self._data_start, new_data._data_start])
        if np.isnan(self._data_end):
            self._data_end = new_data._data_end
        elif not np.isnan(new_data._data_end):
            self._data_end = np.max([self._data_end, new_data._data_end])

    def from_raw_packets(self, packets: "List[Union[RedvoxPacketM, RedvoxPacket]]") -> "TimeSync":
        """
        converts packets into TimeSync data, then performs analysis

        :param packets: list of RedvoxPacketM and RedvoxPacket to convert
        :return: modified version of self
        """
        if not packets:
            # nothing to convert; leave the object untouched instead of failing on packets[0]
            return self

        all_exchanges: List[float] = []
        if isinstance(packets[0], RedvoxPacketM):
            self._data_start = packets[0].timing_information.packet_start_mach_timestamp
        else:
            self._data_start = packets[0].app_file_start_timestamp_machine
        if isinstance(packets[-1], RedvoxPacketM):
            self._data_end = packets[-1].timing_information.packet_end_mach_timestamp
        else:
            # NOTE(review): for API 900 the end is taken from the START timestamp of the last packet,
            # as in the original listing -- confirm this is intended.
            self._data_end = packets[-1].app_file_start_timestamp_machine

        # NOTE(review): the exchange-extraction calls were lost from the listing; the field and helper
        # names below (timing_information.synch_exchanges, util_900.extract_payload) must be confirmed.
        packet: Union[RedvoxPacketM, RedvoxPacket]
        for packet in packets:
            if isinstance(packet, RedvoxPacketM):
                for ex in packet.timing_information.synch_exchanges:
                    all_exchanges.extend((ex.a1, ex.a2, ex.a3, ex.b1, ex.b2, ex.b3))
            else:
                # Get synch exchanges
                ch: api900_pb2.UnevenlySampledChannel
                for ch in packet.unevenly_sampled_channels:
                    if api900_pb2.TIME_SYNCHRONIZATION in ch.channel_types:
                        all_exchanges.extend(util_900.extract_payload(ch))

        if len(all_exchanges) > 0:
            self._time_sync_exchanges_list = np.transpose(
                [all_exchanges[i : i + 6] for i in range(0, len(all_exchanges), 6)]
            )
            self._stats_from_exchanges()
        return self

    def sync_exchanges(self) -> np.ndarray:
        """
        :return: time sync exchanges
        """
        return self._time_sync_exchanges_list

    def set_sync_exchanges(self, exchanges: List[np.ndarray]):
        """
        Set the sync exchanges to be the list of exchanges.
        The list of exchanges must have 6 np.arrays, each equal length, and the lists must be in the order of:
        Server1, Server2, Server3, Device1, Device2, Device3

        :param exchanges: 6 lists of equal length of timestamps
        """
        self._time_sync_exchanges_list = exchanges

    def latencies(self) -> np.ndarray:
        """
        :return: latencies as two np.arrays
        """
        return self._latencies

    def best_latency(self) -> float:
        """
        :return: best latency of data
        """
        return self._best_latency

    def mean_latency(self) -> float:
        """
        :return: mean latency of data
        """
        return self._mean_latency

    def latency_std(self) -> float:
        """
        :return: standard deviation of latency
        """
        return self._latency_std

    def best_latency_index(self) -> int:
        """
        :return: index/position of the best latency or np.nan if no best latency exists
        """
        return self._best_latency_index

    def best_latency_per_exchange(self) -> np.ndarray:
        """
        :return: the best latency per sync exchange as a numpy array
        """
        return np.array([self._latencies[self._best_exchange_index_list[n]][n] for n in range(self.num_tri_messages())])

    def offsets(self) -> np.ndarray:
        """
        :return: offsets as two np.arrays
        """
        return self._offsets

    def best_offset(self) -> float:
        """
        :return: best offset of data
        """
        return self._best_offset

    def mean_offset(self) -> float:
        """
        :return: mean offset of data
        """
        return self._mean_offset

    def offset_std(self) -> float:
        """
        :return: standard deviation of offset
        """
        return self._offset_std

    def best_offset_per_exchange(self) -> np.ndarray:
        """
        :return: the best offset per sync exchange as a numpy array
        """
        return np.array([self._offsets[self._best_exchange_index_list[n]][n] for n in range(self.num_tri_messages())])

    def offset_model(self) -> "OffsetModel":
        """
        :return: OffsetModel of the TimeSync
        """
        return self._offset_model

    def data_start_timestamp(self) -> float:
        """
        :return: timestamp of first data point
        """
        return self._data_start

    def data_end_timestamp(self) -> float:
        """
        :return: timestamp of last data point
        """
        return self._data_end


class RedvoxPacket (*args, **kwargs)

A ProtocolMessage


  • google._upb._message.Message
  • google.protobuf.message.Message

Class variables

class RedvoxPacketM (*args, **kwargs)

A ProtocolMessage


  • google._upb._message.Message
  • google.protobuf.message.Message

Class variables

var DoubleSamplePayload

A ProtocolMessage

var EventStream

A ProtocolMessage

var MetadataEntry

A ProtocolMessage

var SamplePayload

A ProtocolMessage

var Sensors

A ProtocolMessage

var StationInformation

A ProtocolMessage

var SummaryStatistics

A ProtocolMessage

var TimingInformation

A ProtocolMessage

var TimingPayload

A ProtocolMessage

var Unit
class TimeSync (time_sync_exchanges_list: Optional[List[float]] = None, latencies: Optional[numpy.ndarray] = None, best_latency: float = nan, mean_latency: float = nan, latency_std: float = nan, offsets: Optional[numpy.ndarray] = None, best_offset: float = 0.0, mean_offset: float = nan, offset_std: float = nan, data_start: float = nan, data_end: float = nan, best_latency_index: int = nan, best_array_index: int = 0, arrow_dir: str = '.', arrow_file_name: str = 'timesync')

Stores latencies, offsets, and other timesync related information about a single station. ALL timestamps are in microseconds unless otherwise stated. Timestamps are never updated from the raw values provided from the data.


_time_sync_exchanges_list: list of lists, timestamps that form the time sync exchanges, default empty list

_latencies: np.ndarray, calculated latencies of the exchanges. Must be two equal length arrays, default empty np.ndarray

_best_latency: float, best latency of the data, default np.nan

_mean_latency: float, mean latency, default np.nan

_latency_std: float, standard deviation of latencies, default np.nan

_offsets: np.ndarray, calculated offsets of the exchanges. Must be two equal length arrays, default empty np.ndarray

_best_offset: float, best offset of the data, default 0.0

_mean_offset: float, mean offset, default np.nan

_offset_std: float, standard deviation of offsets, default np.nan

_data_start: float, start timestamp of the data, default np.nan

_data_end: float, end timestamp of the data, default np.nan

_best_latency_index: int, index in latencies/sync exchanges array that contains the best latency, default np.nan

_best_msg_array_index: int, indicates which latency array has the best latency. Must be 1 or 3, other values are invalid. Default 0

_offset_model: OffsetModel, used to correct timestamps.

_arrow_dir: str, directory to save arrow file in, default "." (current dir)

_arrow_file: str, base name of file to save data as, default "timesync"

Initialize properties

:param time_sync_exchanges_list: optional timesync exchanges of the packet as a flat list, default None :param latencies: optional arrays of latencies from exchanges; must be two equal length arrays, default None :param best_latency: the best latency of set, default np.nan :param mean_latency: the mean of all latencies, default np.nan :param latency_std: the standard deviation of all latencies, default np.nan :param offsets: optional arrays of offsets from exchanges; must be two equal length arrays, default None :param best_offset: the best offset of the packet, default 0.0 :param mean_offset: the mean of all offsets, default np.nan :param offset_std: the standard deviation of all offsets, default np.nan :param data_start: the start timestamp of the data, default np.nan :param data_end: the end timestamp of the data, default np.nan :param best_latency_index: index of the best latency in latencies array, default np.nan :param best_array_index: index of the best latency array; must be either 1 (first array) or 3 (second array), any other value is invalid, default 0 :param arrow_dir: directory to save timesync data in, default "." :param arrow_file_name: name of file to save timesync data as. do not include extensions. default "timesync"

Expand source code
class TimeSync:
    Stores latencies, offsets, and other timesync related information about a single station
    ALL timestamps in microseconds unless otherwise stated
    Timestamps are never updated from the raw values provided from the data

        _time_sync_exchanges_list: list of lists, timestamps that form the time sync exchanges, default empty list

        _latencies: np.ndarray, calculated latencies of the exchanges.  Must be two equal length arrays,
        default empty np.ndarray

        _best_latency: float, best latency of the data, default np.nan

        _mean_latency: float, mean latency, default np.nan

        _latency_std: float, standard deviation of latencies, default np.nan

        _offsets: np.ndarray, calculated offsets of the exchanges.  Must be two equal length arrays,
        default empty np.ndarray

        _best_offset: float, best offset of the data, default 0.0

        _mean_offset: float, mean offset, default np.nan

        _offset_std: float, standard deviation of offsets, default np.nan

        _data_start: float, start timestamp of the data, default np.nan

        _data_end: float, end timestamp of the data, default np.nan

        _best_latency_index: int, index in latencies/sync exchanges array that contains the best latency,
        default np.nan

        _best_msg_array_index: int, indicates which latency array has the best latency.
        Must be 1 or 3, other values are invalid.  Default 0

        _offset_model: OffsetModel, used to correct timestamps.

        _arrow_dir: str, directory to save arrow file in, default "." (current dir)

        _arrow_file: str, base name of file to save data as, default "timesync"

    def __init__(
        time_sync_exchanges_list: Optional[List[float]] = None,
        latencies: Optional[np.ndarray] = None,
        best_latency: float = np.nan,
        mean_latency: float = np.nan,
        latency_std: float = np.nan,
        offsets: Optional[np.ndarray] = None,
        best_offset: float = 0.0,
        mean_offset: float = np.nan,
        offset_std: float = np.nan,
        data_start: float = np.nan,
        data_end: float = np.nan,
        best_latency_index: int = np.nan,
        best_array_index: int = 0,
        arrow_dir: str = ".",
        arrow_file_name: str = "timesync",
        Initialize properties

        :param time_sync_exchanges_list: optional timesync exchanges of the packet as a flat list, default None
        :param latencies: optional arrays of latencies from exchanges; must be two equal length arrays, default None
        :param best_latency: the best latency of set, default np.nan
        :param mean_latency: the mean of all latencies, default np.nan
        :param latency_std: the standard deviation of all latencies, default np.nan
        :param offsets: optional arrays of offsets from exchanges; must be two equal length arrays, default None
        :param best_offset: the best offset of the packet, default 0.0
        :param mean_offset: the mean of all offsets, default np.nan
        :param offset_std: the standard deviation of all offsets, default np.nan
        :param data_start: the start timestamp of the data, default np.nan
        :param data_end: the end timestamp of the data, default np.nan
        :param best_latency_index: index of the best latency in latencies array, default np.nan
        :param best_array_index: index of the best latency array; must be either 1 (first array) or 3 (second array),
                                    any other value is invalid, default 0
        :param arrow_dir: directory to save timesync data in, default "."
        :param arrow_file_name: name of file to save timesync data as.  do not include extensions.  default "timesync"
        self._latencies: np.ndarray = latencies
        self._best_latency: float = best_latency
        self._mean_latency: float = mean_latency
        self._latency_std: float = latency_std
        self._offsets: np.ndarray = offsets
        self._best_offset: float = best_offset
        self._mean_offset: float = mean_offset
        self._offset_std: float = offset_std
        self._best_latency_index: int = best_latency_index
        self._best_msg_array_index: int = best_array_index
        self._data_start: float = data_start
        self._data_end: float = data_end
        self.arrow_dir: str = arrow_dir
        self.arrow_file: str = arrow_file_name

        if time_sync_exchanges_list is None or len(time_sync_exchanges_list) < 1:
            self._time_sync_exchanges_list = [[], [], [], [], [], []]
            self._best_exchange_index_list = []
            self._offset_model: OffsetModel = OffsetModel.empty_model()
            self._time_sync_exchanges_list = np.transpose(
                [time_sync_exchanges_list[i : i + 6] for i in range(0, len(time_sync_exchanges_list), 6)]
            if self._latencies is None:
                if not np.isnan(self._data_start):
                    self._data_start = self.get_exchange_timestamps(4)[0]
                if not np.isnan(self._data_end):
                    self._data_end = self.get_exchange_timestamps(5)[-1]

    def __repr__(self):
        return (
            f"best_latency_index: {self._best_latency_index}, "
            f"best_latency: {self._best_latency}, "
            f"mean_latency: {self._mean_latency}, "
            f"latency_std: {self._latency_std}, "
            f"best_offset: {self._best_offset}, "
            f"mean_offset: {self._mean_offset}, "
            f"offset_std: {self._offset_std}"

    def __str__(self):
        return (
            f"best_latency_index: {self._best_latency_index}, "
            f"best_latency: {self._best_latency}, "
            f"mean_latency: {self._mean_latency}, "
            f"latency_std: {self._latency_std}, "
            f"best_offset: {self._best_offset}, "
            f"mean_offset: {self._mean_offset}, "
            f"offset_std: {self._offset_std}"

    def as_dict(self) -> dict:
        :return: TimeSync as a dictionary
        return {
            "best_latency_index": self._best_latency_index,
            "best_msg_array_index": self._best_msg_array_index,
            "best_latency": self._best_latency,
            "mean_latency": self._mean_latency,
            "latency_std": self._latency_std,
            "best_offset": self._best_offset,
            "mean_offset": self._mean_offset,
            "offset_std": self._offset_std,
            "data_start": self._data_start,
            "data_end": self._data_end,
            "arrow_dir": self.arrow_dir,
            "arrow_file_name": self.arrow_file,

    def to_json(self):
        :return: TimeSync as json string
        return io.to_json(self)

    def to_json_file(self, file_name: Optional[str] = None) -> Path:
        saves the timesync data as json and data in the same directory.

        :param file_name: the optional base file name.  Do not include a file extension.
                            If None, a default file name is created using this format:
        :return: path to json file
        return io.to_json_file(self, file_name)

    def from_json_file(file_path: str) -> "TimeSync":
        convert contents of json file to TimeSync data

        :param file_path: full path of file to load data from.
        :return: TimeSyncArrow object
        json_data = json_file_to_dict(file_path)
        data = ds.dataset(
            os.path.join(json_data["arrow_dir"], json_data["arrow_file_name"] + ".parquet"),
        result = TimeSync(
            np.array((data["latencies1"], data["latencies3"])),
            np.array((data["offsets1"], data["offsets3"])),
        return result

    def from_dict(ts_dict: dict, tse_data: List) -> "TimeSync":
        create TimeSync frp, dictionary

        :param ts_dict: dictionary of metadata
        :param tse_data: list of time sync exchanges
        :return: TimeSync object
        return TimeSync(tse_data, ts_dict["best_latency"], ts_dict["best_offset"])

    def data_as_pyarrow(self) -> pa.Table:
        :return: convert timesync exchanges, latencies, and offsets into a pyarrow table
        return pa.Table.from_pydict(
                "a1": self._time_sync_exchanges_list[0],
                "a2": self._time_sync_exchanges_list[1],
                "a3": self._time_sync_exchanges_list[2],
                "b1": self._time_sync_exchanges_list[3],
                "b2": self._time_sync_exchanges_list[4],
                "b3": self._time_sync_exchanges_list[5],
                "latencies1": self._latencies[0],
                "latencies3": self._latencies[1],
                "offsets1": self._offsets[0],
                "offsets3": self._offsets[1],

    def _stats_from_exchanges(self):
        """
        Compute the tri-message stats from the data.

        With no tri-message exchanges every statistic is reset to its empty default;
        otherwise latencies, offsets and their summary statistics are derived from
        the exchanges.
        """
        # no exchanges: reset everything to the "empty" values
        if self.num_tri_messages() < 1:
            self._latencies = np.array(([], []))
            self._offsets = np.array(([], []))
            self._best_latency = np.nan
            self._mean_latency = np.nan
            self._latency_std = np.nan
            self._best_offset = 0
            self._mean_offset = np.nan
            self._offset_std = np.nan
            self._best_latency_index = np.nan
            self._best_msg_array_index = 0
            self._offset_model = OffsetModel.empty_model()
            self._best_exchange_index_list = []
            # NOTE(review): an `else:` header appears to be missing before the lines below;
            # as written they would overwrite the defaults just assigned -- confirm.
            # compute tri message data from time sync exchanges
            # NOTE(review): the arguments and closing parenthesis of this call are not visible.
            tse = tms.TriMessageStats(
            self._latencies = np.array((tse.latency1, tse.latency3))
            self._offsets = np.array((tse.offset1, tse.offset3))
            # an offset model is only built when both data start and data end are known (not NaN)
            if not np.isnan(self._data_start) and not np.isnan(self._data_end):
                # NOTE(review): the arguments and closing parenthesis of this call are not visible.
                self._offset_model = OffsetModel(
                # Compute the statistics for latency using the model
                self._mean_latency = self._offset_model.mean_latency
                self._latency_std = self._offset_model.std_dev_latency
                # NOTE(review): an `else:` header appears to be missing before the empty-model
                # fallback below -- confirm.
                self._offset_model = OffsetModel.empty_model()
                # Compute the statistics for latency using the exchanges
                self._mean_latency = np.mean([*self._latencies[0], *self._latencies[1]])
                self._latency_std = np.std([*self._latencies[0], *self._latencies[1]])
            # compute the rest of the statistics
            # offsets from both arrays are pooled into one sample for the mean and std
            self._mean_offset = np.mean([*self._offsets[0], *self._offsets[1]])
            self._offset_std = np.std([*self._offsets[0], *self._offsets[1]])
            self._best_latency_index = tse.best_latency_index
            self._best_msg_array_index = tse.best_latency_array_index
            self._best_latency = tse.best_latency
            self._best_offset = tse.best_offset
            self._best_exchange_index_list = tse.best_latency_per_exchange_index_array

    def process_exchanges(self, force: bool = False):
        """
        Use the sync exchanges to compute the time sync stats (latencies, best latency, offsets, etc).

        :param force: if True, will overwrite any existing values.
        """
        # only act when no latencies are stored yet or the caller forces it
        # NOTE(review): the body of this `if` is not visible here; presumably it calls
        # self._stats_from_exchanges() -- confirm against the original module.
        if self._latencies is None or force:

    def get_exchange_timestamps(self, index: int) -> np.array:
        :param index: index of timestamps to return.  0, 1, 2 are server timestamps.  3, 4, 5 are device.
                        Any value not from 0 to 5 will be converted to 0
        :return: timestamps from the chosen index.
        if index < 0 or index > 5:
            index = 0
        return self._time_sync_exchanges_list[index]

    def get_device_exchanges_timestamps(self) -> np.array:
        :return: timestamps of sync exchanges initiated by the device
        return np.concatenate((self._time_sync_exchanges_list[3].tolist(), self._time_sync_exchanges_list[5].tolist()))

    def num_tri_messages(self) -> int:
        """
        :return: number of tri-message exchanges
        """
        # np.size(..., 1) is the length along axis 1, i.e. the number of entries per row
        return np.size(self._time_sync_exchanges_list, 1)

    def get_best_latency_timestamp(self) -> float:
        :return: timestamp of best latency, or np.nan if no best latency.
        if not np.isnan(self._best_latency_index):
            if self._best_msg_array_index == 1:
                return self._time_sync_exchanges_list[3][self._best_latency_index]
            elif self._best_msg_array_index == 3:
                return self._time_sync_exchanges_list[5][self._best_latency_index]
        return np.nan

    def append_timesync_arrow(self, new_data: "TimeSync"):
        adds timesync data from new_data to current

        :param new_data: another TimeSyncArrow object
        self._time_sync_exchanges_list[0] = np.append(
            self._time_sync_exchanges_list[0], new_data._time_sync_exchanges_list[0]
        self._time_sync_exchanges_list[1] = np.append(
            self._time_sync_exchanges_list[1], new_data._time_sync_exchanges_list[1]
        self._time_sync_exchanges_list[2] = np.append(
            self._time_sync_exchanges_list[2], new_data._time_sync_exchanges_list[2]
        self._time_sync_exchanges_list[3] = np.append(
            self._time_sync_exchanges_list[3], new_data._time_sync_exchanges_list[3]
        self._time_sync_exchanges_list[4] = np.append(
            self._time_sync_exchanges_list[4], new_data._time_sync_exchanges_list[4]
        self._time_sync_exchanges_list[5] = np.append(
            self._time_sync_exchanges_list[5], new_data._time_sync_exchanges_list[5]
        if np.isnan(self._data_start):
            self._data_start = new_data._data_start
        elif not np.isnan(new_data._data_start):
            self._data_start = np.min([self._data_start, new_data._data_start])
        if np.isnan(self._data_end):
            self._data_end = new_data._data_end
        elif not np.isnan(new_data._data_end):
            self._data_end = np.max([self._data_end, new_data._data_end])

    def from_raw_packets(self, packets: List[Union[RedvoxPacketM, RedvoxPacket]]) -> "TimeSync":
        """
        Read the data start/end timestamps and the time sync exchanges from raw packets
        and store them on this object.

        :param packets: list of RedvoxPacketM and RedvoxPacket to convert
        :return: modified version of self
        """
        # flat list of exchange timestamps; regrouped into sets of six at the end
        all_exchanges: List[float] = []
        # data start comes from the first packet
        # NOTE(review): both assignments below target _data_start with no visible `else:`;
        # the second presumably belongs to the non-RedvoxPacketM case -- confirm.
        if isinstance(packets[0], RedvoxPacketM):
            self._data_start = packets[0].timing_information.packet_start_mach_timestamp
            self._data_start = packets[0].app_file_start_timestamp_machine
        # data end comes from the last packet
        # NOTE(review): same missing `else:` as above; the fallback also reads the file
        # *start* timestamp for the data end -- confirm this is intended.
        if isinstance(packets[-1], RedvoxPacketM):
            self._data_end = packets[-1].timing_information.packet_end_mach_timestamp
            self._data_end = packets[-1].app_file_start_timestamp_machine

        packet: Union[RedvoxPacketM, RedvoxPacket]
        for packet in packets:
            # NOTE(review): the statement wrapping the lambda below, the `else:` header and
            # the body of the innermost `if` are not visible here -- confirm against the
            # original module.
            if isinstance(packet, RedvoxPacketM):
                        lambda acc, ex: acc + [ex.a1, ex.a2, ex.a3, ex.b1, ex.b2, ex.b3],
                # Get synch exchanges
                ch: api900_pb2.UnevenlySampledChannel
                for ch in packet.unevenly_sampled_channels:
                    if api900_pb2.TIME_SYNCHRONIZATION in ch.channel_types:

        # split the flat list into groups of six values, then transpose the groups
        # NOTE(review): the closing parenthesis of np.transpose( is not visible here.
        if len(all_exchanges) > 0:
            self._time_sync_exchanges_list = np.transpose(
                [all_exchanges[i : i + 6] for i in range(0, len(all_exchanges), 6)]
        return self

    def sync_exchanges(self) -> np.ndarray:
        """
        :return: time sync exchanges (the stored object itself, not a copy)
        """
        return self._time_sync_exchanges_list

    def set_sync_exchanges(self, exchanges: List[np.array]):
        """
        Set the sync exchanges to be the list of exchanges.
        The list of exchanges must have 6 np.arrays, each equal length, and the lists must be in the order of:
        Server1, Server2, Server3, Device1, Device2, Device3

        :param exchanges: 6 lists of equal length of timestamps
        """
        # stored as given: no validation is done and no statistics are recomputed here
        self._time_sync_exchanges_list = exchanges

    def latencies(self) -> np.ndarray:
        """
        :return: latencies as two np.arrays
        """
        return self._latencies

    def best_latency(self) -> float:
        """
        :return: best latency of data
        """
        return self._best_latency

    def mean_latency(self) -> float:
        """
        :return: mean latency of data
        """
        return self._mean_latency

    def latency_std(self) -> float:
        """
        :return: standard deviation of latency
        """
        return self._latency_std

    def best_latency_index(self) -> int:
        """
        :return: index/position of the best latency or np.nan if no best latency exists
        """
        # NOTE: the np.nan case is a float, despite the int annotation
        return self._best_latency_index

    def best_latency_per_exchange(self) -> np.array:
        :return: the best latency per sync exchange as a numpy array
        return np.array([self._latencies[self._best_exchange_index_list[n]][n] for n in range(self.num_tri_messages())])

    def offsets(self) -> np.ndarray:
        """
        :return: offsets as two np.arrays
        """
        return self._offsets

    def best_offset(self) -> float:
        """
        :return: best offset of data
        """
        return self._best_offset

    def mean_offset(self) -> float:
        """
        :return: mean offset of data
        """
        return self._mean_offset

    def offset_std(self) -> float:
        """
        :return: standard deviation of offset
        """
        return self._offset_std

    def best_offset_per_exchange(self) -> np.array:
        :return: the best offset per sync exchange as a numpy array
        return np.array([self._offsets[self._best_exchange_index_list[n]][n] for n in range(self.num_tri_messages())])

    def offset_model(self) -> OffsetModel:
        """
        :return: OffsetModel of the TimeSync
        """
        return self._offset_model

    def data_start_timestamp(self) -> float:
        """
        :return: timestamp of first data point
        """
        return self._data_start

    def data_end_timestamp(self) -> float:
        """
        :return: timestamp of last data point
        """
        return self._data_end

Static methods

def from_dict(ts_dict: dict, tse_data: List) ‑> TimeSync

Create a TimeSync from a dictionary.

:param ts_dict: dictionary of metadata :param tse_data: list of time sync exchanges :return: TimeSync object

Expand source code
def from_dict(ts_dict: dict, tse_data: List) -> "TimeSync":
    """
    Create a TimeSync from a metadata dictionary and a list of time sync exchanges.

    :param ts_dict: dictionary of metadata; its "best_latency" and "best_offset" entries are read
    :param tse_data: list of time sync exchanges
    :return: TimeSync object
    """
    best_latency = ts_dict["best_latency"]
    best_offset = ts_dict["best_offset"]
    return TimeSync(tse_data, best_latency, best_offset)
def from_json_file(file_path: str) ‑> TimeSync

convert contents of json file to TimeSync data

:param file_path: full path of file to load data from. :return: TimeSync object

Expand source code
def from_json_file(file_path: str) -> "TimeSync":
    """
    Convert the contents of a json file to TimeSync data.

    :param file_path: full path of file to load data from.
    :return: TimeSync object
    """
    # the json metadata supplies the directory and base name of the .parquet file to read
    json_data = json_file_to_dict(file_path)
    # NOTE(review): the remaining arguments and closing parentheses of ds.dataset( and
    # TimeSync( are not visible here -- confirm against the original module.
    data = ds.dataset(
        os.path.join(json_data["arrow_dir"], json_data["arrow_file_name"] + ".parquet"),
    result = TimeSync(
        np.array((data["latencies1"], data["latencies3"])),
        np.array((data["offsets1"], data["offsets3"])),
    return result


def append_timesync_arrow(self, new_data: TimeSync)

adds timesync data from new_data to current

:param new_data: another TimeSync object

Expand source code
def append_timesync_arrow(self, new_data: "TimeSync"):
    """
    Add the timesync data from new_data to this object.

    Each of the six exchange timestamp rows is extended with the matching row of
    new_data, and the data start/end timestamps are widened to cover both objects.

    :param new_data: another TimeSync object
    """
    # rows 0 through 5 hold the six timestamps of the exchanges
    for row in range(6):
        self._time_sync_exchanges_list[row] = np.append(
            self._time_sync_exchanges_list[row], new_data._time_sync_exchanges_list[row]
        )
    # NaN marks an unset start: adopt the other object's value, otherwise keep the earliest
    if np.isnan(self._data_start):
        self._data_start = new_data._data_start
    elif not np.isnan(new_data._data_start):
        self._data_start = np.min([self._data_start, new_data._data_start])
    # same rule for the end, keeping the latest
    if np.isnan(self._data_end):
        self._data_end = new_data._data_end
    elif not np.isnan(new_data._data_end):
        self._data_end = np.max([self._data_end, new_data._data_end])
def as_dict(self) ‑> dict

:return: TimeSync as a dictionary

Expand source code
def as_dict(self) -> dict:
    """
    Collect the summary statistics, data start/end timestamps and arrow file location
    into a dictionary. The exchanges, latencies and offsets arrays are not included.

    :return: TimeSync as a dictionary
    """
    return {
        "best_latency_index": self._best_latency_index,
        "best_msg_array_index": self._best_msg_array_index,
        "best_latency": self._best_latency,
        "mean_latency": self._mean_latency,
        "latency_std": self._latency_std,
        "best_offset": self._best_offset,
        "mean_offset": self._mean_offset,
        "offset_std": self._offset_std,
        "data_start": self._data_start,
        "data_end": self._data_end,
        "arrow_dir": self.arrow_dir,
        "arrow_file_name": self.arrow_file,
    }
def best_latency(self) ‑> float

:return: best latency of data

Expand source code
def best_latency(self) -> float:
    """
    :return: best latency of data
    """
    return self._best_latency
def best_latency_index(self) ‑> int

:return: index/position of the best latency or np.nan if no best latency exists

Expand source code
def best_latency_index(self) -> int:
    """
    :return: index/position of the best latency or np.nan if no best latency exists
    """
    # NOTE: the np.nan case is a float, despite the int annotation
    return self._best_latency_index
def best_latency_per_exchange(self) ‑> numpy.ndarray

:return: the best latency per sync exchange as a numpy array

Expand source code
def best_latency_per_exchange(self) -> np.array:
    """
    For every tri-message exchange, take the latency from whichever latency array
    the best-exchange index list selects for that exchange.

    :return: the best latency per sync exchange as a numpy array
    """
    picked = []
    for exchange_idx in range(self.num_tri_messages()):
        array_idx = self._best_exchange_index_list[exchange_idx]
        picked.append(self._latencies[array_idx][exchange_idx])
    return np.array(picked)
def best_offset(self) ‑> float

:return: best offset of data

Expand source code
def best_offset(self) -> float:
    """
    :return: best offset of data
    """
    return self._best_offset
def best_offset_per_exchange(self) ‑> numpy.ndarray

:return: the best offset per sync exchange as a numpy array

Expand source code
def best_offset_per_exchange(self) -> np.array:
    """
    For every tri-message exchange, take the offset from whichever offset array
    the best-exchange index list selects for that exchange.

    :return: the best offset per sync exchange as a numpy array
    """
    picked = []
    for exchange_idx in range(self.num_tri_messages()):
        array_idx = self._best_exchange_index_list[exchange_idx]
        picked.append(self._offsets[array_idx][exchange_idx])
    return np.array(picked)
def data_as_pyarrow(self) ‑> pyarrow.lib.Table

:return: convert timesync exchanges, latencies, and offsets into a pyarrow table

Expand source code
def data_as_pyarrow(self) -> pa.Table:
    """
    Convert the timesync exchanges, latencies, and offsets into a pyarrow table.

    :return: pyarrow table with columns a1, a2, a3, b1, b2, b3 (rows 0 through 5 of the
             time sync exchanges), latencies1, latencies3, offsets1 and offsets3
    """
    exchanges = self._time_sync_exchanges_list
    return pa.Table.from_pydict(
        {
            "a1": exchanges[0],
            "a2": exchanges[1],
            "a3": exchanges[2],
            "b1": exchanges[3],
            "b2": exchanges[4],
            "b3": exchanges[5],
            "latencies1": self._latencies[0],
            "latencies3": self._latencies[1],
            "offsets1": self._offsets[0],
            "offsets3": self._offsets[1],
        }
    )
def data_end_timestamp(self) ‑> float

:return: timestamp of last data point

Expand source code
def data_end_timestamp(self) -> float:
    """
    :return: timestamp of last data point
    """
    return self._data_end
def data_start_timestamp(self) ‑> float

:return: timestamp of first data point

Expand source code
def data_start_timestamp(self) -> float:
    """
    :return: timestamp of first data point
    """
    return self._data_start
def from_raw_packets(self, packets: List[Union[src.redvox_api_m.redvox_api_m_pb2.RedvoxPacketM, api900_pb2.RedvoxPacket]]) ‑> TimeSync

converts packets into TimeSyncData objects, then performs analysis

:param packets: list of RedvoxPacketM and RedvoxPacket to convert :return: modified version of self

Expand source code
def from_raw_packets(self, packets: List[Union[RedvoxPacketM, RedvoxPacket]]) -> "TimeSync":
    """
    Read the data start/end timestamps and the time sync exchanges from raw packets
    and store them on this object.

    :param packets: list of RedvoxPacketM and RedvoxPacket to convert
    :return: modified version of self
    """
    # flat list of exchange timestamps; regrouped into sets of six at the end
    all_exchanges: List[float] = []
    # data start comes from the first packet
    # NOTE(review): both assignments below target _data_start with no visible `else:`;
    # the second presumably belongs to the non-RedvoxPacketM case -- confirm.
    if isinstance(packets[0], RedvoxPacketM):
        self._data_start = packets[0].timing_information.packet_start_mach_timestamp
        self._data_start = packets[0].app_file_start_timestamp_machine
    # data end comes from the last packet
    # NOTE(review): same missing `else:` as above; the fallback also reads the file
    # *start* timestamp for the data end -- confirm this is intended.
    if isinstance(packets[-1], RedvoxPacketM):
        self._data_end = packets[-1].timing_information.packet_end_mach_timestamp
        self._data_end = packets[-1].app_file_start_timestamp_machine

    packet: Union[RedvoxPacketM, RedvoxPacket]
    for packet in packets:
        # NOTE(review): the statement wrapping the lambda below, the `else:` header and
        # the body of the innermost `if` are not visible here -- confirm against the
        # original module.
        if isinstance(packet, RedvoxPacketM):
                    lambda acc, ex: acc + [ex.a1, ex.a2, ex.a3, ex.b1, ex.b2, ex.b3],
            # Get synch exchanges
            ch: api900_pb2.UnevenlySampledChannel
            for ch in packet.unevenly_sampled_channels:
                if api900_pb2.TIME_SYNCHRONIZATION in ch.channel_types:

    # split the flat list into groups of six values, then transpose the groups
    # NOTE(review): the closing parenthesis of np.transpose( is not visible here.
    if len(all_exchanges) > 0:
        self._time_sync_exchanges_list = np.transpose(
            [all_exchanges[i : i + 6] for i in range(0, len(all_exchanges), 6)]
    return self
def get_best_latency_timestamp(self) ‑> float

:return: timestamp of best latency, or np.nan if no best latency.

Expand source code
def get_best_latency_timestamp(self) -> float:
    """
    Look up the timestamp at the best latency index: row 3 of the exchanges when
    the best message array index is 1, row 5 when it is 3.

    :return: timestamp of best latency, or np.nan if no best latency.
    """
    if np.isnan(self._best_latency_index):
        return np.nan
    if self._best_msg_array_index == 1:
        return self._time_sync_exchanges_list[3][self._best_latency_index]
    if self._best_msg_array_index == 3:
        return self._time_sync_exchanges_list[5][self._best_latency_index]
    return np.nan
def get_device_exchanges_timestamps(self) ‑> numpy.ndarray

:return: timestamps of sync exchanges initiated by the device

Expand source code
def get_device_exchanges_timestamps(self) -> np.array:
    """
    Join rows 3 and 5 of the time sync exchanges into a single array (row 3 first).

    :return: timestamps of sync exchanges initiated by the device
    """
    exchanges = self._time_sync_exchanges_list
    row_three = exchanges[3].tolist()
    row_five = exchanges[5].tolist()
    return np.concatenate((row_three, row_five))
def get_exchange_timestamps(self, index: int) ‑> numpy.ndarray

:param index: index of timestamps to return. 0, 1, 2 are server timestamps. 3, 4, 5 are device. Any value not from 0 to 5 will be converted to 0 :return: timestamps from the chosen index.

Expand source code
def get_exchange_timestamps(self, index: int) -> np.array:
    """
    Return one row of timestamps from the time sync exchanges.

    :param index: index of timestamps to return.  0, 1, 2 are server timestamps.  3, 4, 5 are device.
                    Any value not from 0 to 5 will be converted to 0
    :return: timestamps from the chosen index.
    """
    out_of_range = index < 0 or index > 5
    row = 0 if out_of_range else index
    return self._time_sync_exchanges_list[row]
def latencies(self) ‑> numpy.ndarray

:return: latencies as two np.arrays

Expand source code
def latencies(self) -> np.ndarray:
    """
    :return: latencies as two np.arrays
    """
    return self._latencies
def latency_std(self) ‑> float

:return: standard deviation of latency

Expand source code
def latency_std(self) -> float:
    """
    :return: standard deviation of latency
    """
    return self._latency_std
def mean_latency(self) ‑> float

:return: mean latency of data

Expand source code
def mean_latency(self) -> float:
    """
    :return: mean latency of data
    """
    return self._mean_latency
def mean_offset(self) ‑> float

:return: mean offset of data

Expand source code
def mean_offset(self) -> float:
    """
    :return: mean offset of data
    """
    return self._mean_offset
def num_tri_messages(self) ‑> int

:return: number of tri-message exchanges

Expand source code
def num_tri_messages(self) -> int:
    """
    :return: number of tri-message exchanges
    """
    # np.size(..., 1) is the length along axis 1, i.e. the number of entries per row
    return np.size(self._time_sync_exchanges_list, 1)
def offset_model(self) ‑> OffsetModel

:return: OffsetModel of the TimeSync

Expand source code
def offset_model(self) -> OffsetModel:
    """
    :return: OffsetModel of the TimeSync
    """
    return self._offset_model
def offset_std(self) ‑> float

:return: standard deviation of offset

Expand source code
def offset_std(self) -> float:
    """
    :return: standard deviation of offset
    """
    return self._offset_std
def offsets(self) ‑> numpy.ndarray

:return: offsets as two np.arrays

Expand source code
def offsets(self) -> np.ndarray:
    """
    :return: offsets as two np.arrays
    """
    return self._offsets
def process_exchanges(self, force: bool = False)

Use the sync exchanges to compute the time sync stats (latencies, best latency, offsets, etc).

:param force: if True, will overwrite any existing values.

Expand source code
def process_exchanges(self, force: bool = False):
    """
    Use the sync exchanges to compute the time sync stats (latencies, best latency, offsets, etc).

    :param force: if True, will overwrite any existing values.
    """
    # only act when no latencies are stored yet or the caller forces it
    # NOTE(review): the body of this `if` is not visible here; presumably it calls
    # self._stats_from_exchanges() -- confirm against the original module.
    if self._latencies is None or force:
def set_sync_exchanges(self, exchanges: List[numpy.ndarray])

Set the sync exchanges to be the list of exchanges. The list of exchanges must have 6 np.arrays, each equal length, and the lists must be in the order of: Server1, Server2, Server3, Device1, Device2, Device3

:param exchanges: 6 lists of equal length of timestamps

Expand source code
def set_sync_exchanges(self, exchanges: List[np.array]):
    """
    Set the sync exchanges to be the list of exchanges.
    The list of exchanges must have 6 np.arrays, each equal length, and the lists must be in the order of:
    Server1, Server2, Server3, Device1, Device2, Device3

    :param exchanges: 6 lists of equal length of timestamps
    """
    # stored as given: no validation is done and no statistics are recomputed here
    self._time_sync_exchanges_list = exchanges
def sync_exchanges(self) ‑> numpy.ndarray

:return: time sync exchanges

Expand source code
def sync_exchanges(self) -> np.ndarray:
    """
    :return: time sync exchanges (the stored object itself, not a copy)
    """
    return self._time_sync_exchanges_list
def to_json(self)

:return: TimeSync as json string

Expand source code
def to_json(self):
    """
    :return: TimeSync as json string
    """
    # serialization is delegated to the timesync io module
    return io.to_json(self)
def to_json_file(self, file_name: Optional[str] = None) ‑> pathlib.Path

saves the timesync data as json and data in the same directory.

:param file_name: the optional base file name. Do not include a file extension. If None, a default file name is created using this format: timesync.json :return: path to json file

Expand source code
def to_json_file(self, file_name: Optional[str] = None) -> Path:
    """
    saves the timesync data as json and data in the same directory.

    :param file_name: the optional base file name.  Do not include a file extension.
                        If None, a default file name is created using this format:
                        timesync.json
    :return: path to json file
    """
    # writing is delegated to the timesync io module
    return io.to_json_file(self, file_name)