Module redvox.api900.timesync.api900_timesync

Modules for extracting time synchronization statistics for API 900 data. Also includes functions for correcting time arrays.

Expand source code
"""
Modules for extracting time synchronization statistics for API 900 data.
Also includes functions for correcting time arrays.
"""

from typing import List, Optional, Tuple

# noinspection Mypy
import numpy as np

from redvox.api900.reader import WrappedRedvoxPacket
from redvox.api900.reader_utils import empty_array
from redvox.api900.timesync import tri_message_stats
from redvox.common import date_time_utils as dt
from redvox.common import file_statistics as fh
from redvox.common import stats_helper


class TimeSyncData:
    """
    Stores latencies, revised start times, time sync server offsets, and difference in adjusted machine time and
    acquisition server time
    Note that inputs to this must be continuous data; any change in sensor mach_time_zero or sample_rate
    requires a new TimeSyncData object
    ALL times in microseconds
    properties:
        rev_start_times: revised machine start times (per packet)
        server_acquisition_times: time packet arrived at server (per packet)
        latencies: latencies (per packet)
        best_latency: the lowest latency among packets
        best_latency_index: index in latencies array that contains the best latency
        latency_stats: StatsContainer for latencies
        offsets: list of calculated offsets (per packet)
        best_offset: the offset that is paired with the lowest latency
        offset_stats: StatsContainer for offsets
        num_packets: the number of packets being analyzed
        sample_rate_hz: the sample rate in Hz of all packets
        packet_duration: the duration of all packets
        mach_time_zero: the start time of the app in machine time
        num_tri_messages: number of tri-message exchanges (per packet)
        tri_message_coeffs: list of 6-tuples containing the tri-message exchange coefficients (up to 1 tuple per packet)
        best_tri_msg_indices: the index of the best tri-message for each of the tri_message_coeffs tuples
        acquire_travel_time: calculated time it took packet to reach server (up to 1 per packet)
        bad_packets: indices of packets with an undefined or 0 latency
    """

    def __init__(self, wrapped_packets: List[WrappedRedvoxPacket] = None):
        """
        Initialize properties, assume all packets share the same sample rate and are continuous
        :param wrapped_packets: list of data packets
        """
        self.latency_stats: stats_helper.StatsContainer = stats_helper.StatsContainer("latency")
        self.offset_stats: stats_helper.StatsContainer = stats_helper.StatsContainer("offset")
        if wrapped_packets is None or len(wrapped_packets) < 1:
            self.rev_start_times: np.ndarray = np.ndarray((0, 0))
            self.server_acquisition_times: np.ndarray = np.ndarray((0, 0))
            self.latencies: np.ndarray = np.ndarray((0, 0))
            self.best_latency: Optional[float] = None
            self.best_latency_index: Optional[int] = None
            self.offsets: np.ndarray = np.ndarray((0, 0))
            self.best_offset: float = 0.0
            self.num_packets: int = 0
            self.sample_rate_hz: Optional[float] = None
            self.packet_duration: int = 0
            self.mach_time_zero: Optional[float] = None
            self.num_tri_messages: np.ndarray = np.ndarray((0, 0))
            self.tri_message_coeffs: List[Tuple] = []
            self.best_tri_msg_indices: List[int] = []
            self.acquire_travel_time: np.ndarray = np.ndarray((0, 0))
            self.bad_packets: List[int] = []
        else:
            self.get_time_sync_data(wrapped_packets)

    def get_time_sync_data(self, wrapped_packets: List[WrappedRedvoxPacket]):
        """
        Sets the time statistics between machine, time server, and acquisition server for the packets given.
        :param wrapped_packets: wrapped packets with same sample rate
        """
        self.server_acquisition_times = np.zeros(len(wrapped_packets))  # array of server acquisition times
        self.rev_start_times = np.zeros(len(wrapped_packets))   # array of app start times
        self.num_tri_messages = np.zeros(len(wrapped_packets))  # number of tri-message exchanges per packet
        self.latencies = np.zeros(len(wrapped_packets))  # array of minimum latencies
        self.offsets = np.zeros(len(wrapped_packets))  # array of offset applied to machine time to get sync time
        self.num_packets = len(wrapped_packets)  # number of packets in list
        self.tri_message_coeffs = []    # a list of tri-message coefficients
        self.best_tri_msg_indices = []  # a list of the best latency index in each set of tri-message coefficients
        self.bad_packets = []   # list of packets that contain invalid data
        sample_rates = np.zeros(len(wrapped_packets))       # list of sample rates, should all be the same
        mach_time_zeros = np.zeros(len(wrapped_packets))    # list of mach time zeros, should all the be same

        # get the server acquisition time, app start time, and tri message stats
        for i, wrapped_packet in enumerate(wrapped_packets):
            # pass the mach_time_zero and sample_rate into arrays
            sample_rates[i] = wrapped_packet.microphone_sensor().sample_rate_hz()
            mach_time_zeros[i] = wrapped_packet.mach_time_zero()
            self.server_acquisition_times[i] = wrapped_packet.server_timestamp_epoch_microseconds_utc()
            self.rev_start_times[i] = wrapped_packet.app_file_start_timestamp_epoch_microseconds_utc()
            self._compute_tri_message_stats(wrapped_packet, i)

        # check sample rate and mach time zero (if it exists) for changes.  if it changed, sync will not work.
        if not self._validate_sensor_settings(sample_rates, mach_time_zeros):
            raise Exception("ERROR: Sensor settings changed; separate data based on changes and re-analyze")
        self.find_bad_packets()
        self.evaluate_latencies_and_offsets()
        # set the packet duration
        self.packet_duration = dt.seconds_to_microseconds(fh.get_duration_seconds_from_sample_rate(
            int(self.sample_rate_hz)))
        # apply duration to app start to get packet end time, then subtract
        # that from server acquire time to get travel time to acquisition server
        # ASSUMING that acquisition and time-sync server have the same time source
        self.acquire_travel_time = self.server_acquisition_times - (self.rev_start_times + self.packet_duration)

    def evaluate_latencies_and_offsets(self):
        """
        evaluates the goodness of latencies and offsets.
        adjusts rev_start_times if there is a best offset
        """
        # if no decoders synced properly, all latencies are NaNs or all indices are bad
        if np.isnan(self.latencies).all() or len(self.bad_packets) == self.num_packets:
            # make everything empty or None, and best offset is 0 (no change)
            self.latencies = []
            self.offsets = []
            self.best_latency = None
            self.best_latency_index = None
            self.best_offset = 0.0
        else:
            # convert all NaN and negative values from latencies into zero
            self.latencies = np.nan_to_num(self.latencies)
            self.latencies[self.latencies < 0] = 0.0
            # find and set minimum latency based on non-zero latencies
            self.best_latency = np.min(self.get_valid_latencies())
            self.latency_stats.best_value = self.best_latency
            self.best_latency_index = np.where(self.latencies == self.best_latency)[0][0]
            # find all non-NaN, non-zero offsets
            good_offsets = self.get_valid_offsets()

            if good_offsets is not None and len(good_offsets) > 0:
                # there must be some good offsets; convert all NaN offsets to 0
                self.offsets = np.nan_to_num(self.offsets)
                # set best offset based on best latency
                self.best_offset = self.offsets[self.best_latency_index]
                self.offset_stats.best_value = self.best_offset
                # fix all packet start times by adding the packet's offset
                self.rev_start_times += self.best_offset
            else:
                # set offset properties to empty or None
                self.offsets = []

    def _compute_tri_message_stats(self, packet: WrappedRedvoxPacket, index: int):
        """
        helper function to compute tri message stats of the time sync data
        :param packet: a packet to compute tri message stats from
        :param index: the index to add the data to
        """
        # if there is a time sync channel with payload, find the minimum latency and its corresponding offset
        if packet.has_time_synchronization_sensor():
            timesync_channel = packet.time_synchronization_sensor()
            coeffs = timesync_channel.payload_values()
            # set the number of tri-message exchanges in the packet
            self.num_tri_messages[index] = int(len(coeffs) / 6)
            coeffs_tuple: Tuple[
                np.ndarray,
                np.ndarray,
                np.ndarray,
                np.ndarray,
                np.ndarray,
                np.ndarray] = tri_message_stats.transmit_receive_timestamps_microsec(coeffs)
            self.tri_message_coeffs.append(coeffs_tuple)  # save the tri-message exchanges for later
            a1_coeffs: np.ndarray = coeffs_tuple[0]
            a2_coeffs: np.ndarray = coeffs_tuple[1]
            a3_coeffs: np.ndarray = coeffs_tuple[2]
            b1_coeffs: np.ndarray = coeffs_tuple[3]
            b2_coeffs: np.ndarray = coeffs_tuple[4]
            b3_coeffs: np.ndarray = coeffs_tuple[5]
            # get tri message data via TriMessageStats class
            tms = tri_message_stats.TriMessageStats(packet.redvox_id(),
                                                    a1_coeffs,
                                                    a2_coeffs,
                                                    a3_coeffs,
                                                    b1_coeffs,
                                                    b2_coeffs,
                                                    b3_coeffs)
            # check if we actually have exchanges to evaluate
            if self.num_tri_messages[index] > 0:
                # Concatenate d1 and d3 arrays, and o1 and o3 arrays when passing values into stats class
                #  note the last parameter multiplies the number of exchanges by 2, as there are two latencies
                #  and offsets calculated per exchange
                # noinspection PyTypeChecker
                self.latency_stats.add(np.mean([*tms.latency1, *tms.latency3]), np.std([*tms.latency1, *tms.latency3]),
                                       self.num_tri_messages[index] * 2)
                # noinspection PyTypeChecker
                self.offset_stats.add(np.mean([*tms.offset1, *tms.offset3]), np.std([*tms.offset1, *tms.offset3]),
                                      self.num_tri_messages[index] * 2)
                # set the best latency and offset based on the packet's metadata, or tri-message stats if no metadata
                latency: Optional[float] = packet.best_latency()
                # pass the location of the best calculated latency
                self.best_tri_msg_indices.append(tms.best_latency_index)
                if latency is None:
                    # no metadata
                    self.latencies[index] = tms.best_latency
                else:
                    self.latencies[index] = latency
                offset: Optional[float] = packet.best_offset()
                if offset is None:
                    # no metadata
                    self.offsets[index] = tms.best_offset
                else:
                    self.offsets[index] = offset
                # everything has been computed, time to return
                return
        # If here, there are no exchanges to read or there is no sync channel.
        # write default or empty values to the correct properties
        self.latencies[index] = 0.0
        self.offsets[index] = 0.0
        self.tri_message_coeffs.append((empty_array(), empty_array(), empty_array(),
                                        empty_array(), empty_array(), empty_array()))
        # pass the location of the best latency
        self.best_tri_msg_indices.append(-1)
        # noinspection PyTypeChecker
        self.latency_stats.add(0, 0, 0)
        # noinspection PyTypeChecker
        self.offset_stats.add(0, 0, 0)

    def _validate_sensor_settings(self, sample_rates: np.array, mach_time_zeros: np.array) -> bool:
        """
        Examine all sample rates and mach time zeros to ensure that sensor settings do not change
        Sets the sample rate and mach time zero if there is no change
        :param sample_rates: sample rates of all packets from sensor
        :param mach_time_zeros: machine time zero of all packets from sensor
        :return: True if sensor settings do not change
        """
        # set the sample rate and mach time zero
        self.sample_rate_hz = sample_rates[0]
        self.mach_time_zero = mach_time_zeros[0]
        if len(sample_rates) > 1:
            # if there's more than 1 value, we need to compare them
            for i in range(1, len(sample_rates)):
                if self.sample_rate_hz != sample_rates[i]:
                    print("ERROR: sample rate in data packets has changed.")
                    return False
                # process only non-nan mach time zeros
                if not np.isnan(self.mach_time_zero) and self.mach_time_zero != mach_time_zeros[i]:
                    print("ERROR: mach time zero in data packets has changed.")
                    return False
        return True

    def find_bad_packets(self):
        """
        Find bad packets and mark them using the bad_packets property
        Assuming all packets have been processed and all bad packets are marked with a 0 (or are negative)
        """
        for idx in range(len(self.latencies)):  # mark bad indices (they have a 0 or less value)
            if self.latencies[idx] <= 0:
                self.bad_packets.append(idx)

    def get_ratio_bad_packets(self) -> float:
        """
        Return the ratio of packets with bad latency calculations over total packets
        :return: num packets with bad latency calculations / total packets
        """
        if self.num_packets < 1:
            return 0
        return len(self.bad_packets) / float(self.num_packets)

    def get_latency_mean(self) -> Optional[float]:
        """
        return the mean of all latencies, and None if the latencies are invalid.
        :return: the mean of all latencies
        """
        if self.best_latency is None:
            return None
        else:
            return self.latency_stats.mean_of_means()

    def get_latency_std_dev(self) -> Optional[float]:
        """
        return the standard deviation (std_dev) of all latencies, and None if the latencies are invalid.
        :return: the std dev of all latencies
        """
        if self.best_latency is None:
            return None
        else:
            return self.latency_stats.total_std_dev()

    def get_valid_latencies(self, latency_array: np.array = None) -> np.array:
        """
        takes latencies, converts NaNs and negatives to 0, then returns non-zero latencies
        :param latency_array: optional array to clean instead of self.latencies; default None
        :return: non-NaN, non-zero latencies
        """
        if latency_array is None:
            clean_latencies = np.nan_to_num(self.latencies)  # replace NaNs with zeros
        else:
            clean_latencies = np.nan_to_num(latency_array)  # replace NaNs with zeros
        clean_latencies[clean_latencies < 0] = 0  # replace negative latencies with 0
        return clean_latencies[clean_latencies != 0]  # return only non-zero latencies

    def get_offset_mean(self) -> Optional[float]:
        """
        return the mean of all offsets, and 0.0 if the offsets or latencies are invalid.
        :return: the mean of all offsets
        """
        if self.best_latency is None or self.best_offset == 0.0:
            return 0.0
        else:
            return self.offset_stats.mean_of_means()

    def get_offset_std_dev(self) -> Optional[float]:
        """
        return the standard deviation (std_dev) of all offsets, and 0.0 if the offsets or latencies are invalid.
        :return: the std dev of all offsets
        """
        if self.best_latency is None or self.best_offset == 0.0:
            return 0.0
        else:
            return self.offset_stats.total_std_dev()

    def get_valid_offsets(self, offset_array: np.array = None) -> np.array:
        """
        takes valid offsets (based on bad_packets), converts NaNs to 0, then returns non-zero offsets
        :param offset_array: optional array to clean; default None
        :return: non-NaN, non-zero offsets
        """
        if offset_array is None:
            if len(self.bad_packets) > 0:
                valids = []
                for i in range(len(self.offsets)):
                    if i not in self.bad_packets:
                        valids.append(self.offsets[i])
                clean_offsets = np.nan_to_num(valids)  # replace NaNs with zeros
            else:
                clean_offsets = np.nan_to_num(self.offsets)
        else:
            clean_offsets = np.nan_to_num(offset_array)
        return clean_offsets[clean_offsets != 0]  # return only non-zero offsets

    def get_valid_rev_start_times(self) -> np.array:
        """
        return the array of valid (based on bad_packets) revised start times
        :return: array of valid revised start times
        """
        if len(self.bad_packets) > 0:  # return only the start times not associated with a bad packet
            valids = []
            for i in range(len(self.rev_start_times)):
                if i not in self.bad_packets:  # this is a good packet
                    valids.append(self.rev_start_times[i])
            return np.array(valids)
        else:
            return self.rev_start_times

    def get_best_rev_start_time(self) -> int:
        """
        return the revised start time associated with the lowest latency
        :return: revised start time in microseconds associated with the lowest latency
        """
        return self.rev_start_times[self.best_latency_index]


def validate_sensors(wrapped_packets: List[WrappedRedvoxPacket]) -> bool:
    """
    Examine all sample rates and mach time zeros to ensure that sensor settings do not change
    :param wrapped_packets: a list of wrapped redvox packets to read
    :return: True if sensor settings do not change
    """
    # check that we have packets to read
    num_packets = len(wrapped_packets)
    if num_packets < 1:
        print("ERROR: no data to validate.")
        return False
    # if we have more than one packet, we need to validate the data
    elif num_packets > 1:
        sample_rates = np.zeros(num_packets)
        mach_time_zeros = np.zeros(num_packets)
        for j, wrapped_packet in enumerate(wrapped_packets):
            sample_rates[j] = wrapped_packet.microphone_sensor().sample_rate_hz()
            mach_time_zeros[j] = wrapped_packet.mach_time_zero()
        for i in range(1, len(sample_rates)):
            if sample_rates[0] != sample_rates[i]:
                print("ERROR: sample rate in data packets has changed.")
                return False
            # process only non-nan mach time zeros
            if not np.isnan(mach_time_zeros[0]) and mach_time_zeros[0] != mach_time_zeros[i]:
                print("ERROR: mach time zero in data packets has changed.")
                return False
    # we get here if all packets have the same sample rate and mach time zero
    return True


def update_evenly_sampled_time_array(tsd: TimeSyncData, num_samples: float = None,
                                     time_start_array: np.array = None) -> np.ndarray:
    """
    Correct evenly sampled times using updated time_start_array values as the focal point.
    Expects tsd to have the same number of packets as elements in time_start_array
    Inserts data gaps where necessary before building array.
    Throws an exception if the number of packets in tsd does not match the length of time_start_array

    :param tsd: TimeSyncData object that contains the information needed to update the time array
    :param num_samples: number of samples in one file; optional, uses number based on sample rate if not given
    :param time_start_array: the list of timestamps to correct in seconds; optional, uses the start times in the
                             TimeSyncData object if not given
    :return: Revised time array in epoch seconds
    """
    if time_start_array is None:
        # replace the time_start_array with values from tsd; convert tsd times to seconds
        time_start_array = tsd.rev_start_times / dt.MICROSECONDS_IN_SECOND
    num_files = tsd.num_packets
    # the TimeSyncData must have the same number of packets as the number of elements in time_start_array
    if num_files != len(time_start_array):
        # alert the user, then quit
        raise Exception("ERROR: Attempted to update a time array that doesn't contain "
                        "the same number of elements as the TimeSyncData!")

    if num_samples is None:
        num_samples = fh.get_num_points_from_sample_rate(tsd.sample_rate_hz)
    t_dt = 1.0 / tsd.sample_rate_hz

    # Use TimeSyncData object to find best start index.
    # Samples before will be the number of decoders before a0 times the number of samples in a file.
    # Samples after will be the number of decoders after a0 times the number of samples in a file minus 1;
    # the minus one represents the best a0.
    decoder_idx = tsd.best_latency_index
    samples_before = int(decoder_idx * num_samples)
    samples_after = round((num_files - decoder_idx) * num_samples) - 1
    best_start_sec = time_start_array[decoder_idx]

    # build the time arrays separately in epoch seconds, then join into one
    # add 1 to include the actual a0 sample, then add 1 again to skip the a0 sample; this avoids repetition
    timesec_before = np.vectorize(lambda t: best_start_sec - t * t_dt)(list(range(int(samples_before + 1))))
    timesec_before = timesec_before[::-1]  # reverse 'before' times so they increase from earliest start time
    timesec_after = np.vectorize(lambda t: best_start_sec + t * t_dt)(list(range(1, int(samples_after + 1))))
    timesec_rev = np.concatenate([timesec_before, timesec_after])

    return timesec_rev


def update_time_array(tsd: TimeSyncData, time_array: np.array) -> np.ndarray:
    """
    Correct timestamps in time_array using information from TimeSyncData
    :param tsd: TimeSyncData object that contains the information needed to update the time array
    :param time_array: the list of timestamps to correct in seconds
    :return: Revised time array in epoch seconds
    """
    return time_array + (tsd.best_offset / dt.MICROSECONDS_IN_SECOND)


def sync_packet_time_900(wrapped_packets_fs: list, verbose: bool = False):
    """
    Correct the timestamps for api900 data based on minimum latency and best offset of the ensemble.

    :param wrapped_packets_fs: wrapped packets with same sample rate and from same device
    :param verbose: if True, prints statements to assess goodness of time sync
    """

    # assume each packet has a best latency and offset
    latencies = []  # array of minimum latencies
    offsets = []  # array of offset applied to machine time to get sync time

    # latencies and offsets
    for packet in wrapped_packets_fs:
        if packet.has_time_synchronization_sensor():
            # set the best latency and offset based on the packet's metadata
            if packet.best_latency() is not None and packet.best_offset() is not None:
                latencies.append(packet.best_latency())
                offsets.append(packet.best_offset())
            else:
                # metadata not set, use packet contents
                timesync_channel = packet.time_synchronization_sensor()
                coeffs = timesync_channel.payload_values()
                coeffs_tuple: Tuple[
                    np.ndarray,
                    np.ndarray,
                    np.ndarray,
                    np.ndarray,
                    np.ndarray,
                    np.ndarray] = tri_message_stats.transmit_receive_timestamps_microsec(coeffs)
                a1_coeffs: np.ndarray = coeffs_tuple[0]
                a2_coeffs: np.ndarray = coeffs_tuple[1]
                a3_coeffs: np.ndarray = coeffs_tuple[2]
                b1_coeffs: np.ndarray = coeffs_tuple[3]
                b2_coeffs: np.ndarray = coeffs_tuple[4]
                b3_coeffs: np.ndarray = coeffs_tuple[5]
                # get tri message data via TriMessageStats class
                tms = tri_message_stats.TriMessageStats(packet.redvox_id(),
                                                        a1_coeffs,
                                                        a2_coeffs,
                                                        a3_coeffs,
                                                        b1_coeffs,
                                                        b2_coeffs,
                                                        b3_coeffs)
                latencies.append(tms.best_latency)
                offsets.append(tms.best_offset)
        # if there is no time sync channel (usually no communication between device and server), default to 0
        else:
            latencies.append(0)
            offsets.append(0)

    # convert NaN latencies to 0
    latencies = np.nan_to_num(latencies)
    # convert any negative latency to 0.
    latencies[latencies < 0] = 0

    if len(np.nonzero(latencies)[0]) == 0:  # if no decoders synced properly, all latencies are 0
        sync_server = wrapped_packets_fs[0].time_synchronization_server()
        if verbose:
            print('\n{} did not achieve sync with time sync server {}!'.format(
                wrapped_packets_fs[0].redvox_id(), sync_server))
        offset = 0
    else:
        d1_min = np.min(latencies[latencies != 0])
        best_d1_index = np.where(latencies == d1_min)[0][0]
        # correct each packet's timestamps
        offset = int(offsets[best_d1_index])

    for wrapped_packet in wrapped_packets_fs:
        # update the machine file start time and mach time zero
        wrapped_packet.set_app_file_start_timestamp_epoch_microseconds_utc(
            wrapped_packet.app_file_start_timestamp_epoch_microseconds_utc() + offset)
        wrapped_packet.set_app_file_start_timestamp_machine(
            wrapped_packet.app_file_start_timestamp_machine() + offset)
        if wrapped_packet.mach_time_zero() is not None:
            wrapped_packet.set_mach_time_zero(wrapped_packet.mach_time_zero() + offset)
        # update microphone start time
        if wrapped_packet.has_microphone_sensor():
            wrapped_packet.microphone_sensor().set_first_sample_timestamp_epoch_microseconds_utc(
                wrapped_packet.microphone_sensor().first_sample_timestamp_epoch_microseconds_utc() + offset)
        # correct the times in unevenly sampled channels using the packets' best offset
        wrapped_packet.update_uneven_sensor_timestamps(offset)
        wrapped_packet.set_is_synch_corrected(True)

Functions

def sync_packet_time_900(wrapped_packets_fs: list, verbose: bool = False)

Correct the timestamps for api900 data based on minimum latency and best offset of the ensemble.

:param wrapped_packets_fs: wrapped packets with same sample rate and from same device :param verbose: if True, prints statements to assess goodness of time sync

Expand source code
def sync_packet_time_900(wrapped_packets_fs: list, verbose: bool = False):
    """
    Correct the timestamps for api900 data based on minimum latency and best offset of the ensemble.

    :param wrapped_packets_fs: wrapped packets with same sample rate and from same device
    :param verbose: if True, prints statements to assess goodness of time sync
    """

    # assume each packet has a best latency and offset
    latencies = []  # array of minimum latencies
    offsets = []  # array of offset applied to machine time to get sync time

    # latencies and offsets
    for packet in wrapped_packets_fs:
        if packet.has_time_synchronization_sensor():
            # set the best latency and offset based on the packet's metadata
            if packet.best_latency() is not None and packet.best_offset() is not None:
                latencies.append(packet.best_latency())
                offsets.append(packet.best_offset())
            else:
                # metadata not set, use packet contents
                timesync_channel = packet.time_synchronization_sensor()
                coeffs = timesync_channel.payload_values()
                coeffs_tuple: Tuple[
                    np.ndarray,
                    np.ndarray,
                    np.ndarray,
                    np.ndarray,
                    np.ndarray,
                    np.ndarray] = tri_message_stats.transmit_receive_timestamps_microsec(coeffs)
                a1_coeffs: np.ndarray = coeffs_tuple[0]
                a2_coeffs: np.ndarray = coeffs_tuple[1]
                a3_coeffs: np.ndarray = coeffs_tuple[2]
                b1_coeffs: np.ndarray = coeffs_tuple[3]
                b2_coeffs: np.ndarray = coeffs_tuple[4]
                b3_coeffs: np.ndarray = coeffs_tuple[5]
                # get tri message data via TriMessageStats class
                tms = tri_message_stats.TriMessageStats(packet.redvox_id(),
                                                        a1_coeffs,
                                                        a2_coeffs,
                                                        a3_coeffs,
                                                        b1_coeffs,
                                                        b2_coeffs,
                                                        b3_coeffs)
                latencies.append(tms.best_latency)
                offsets.append(tms.best_offset)
        # if there is no time sync channel (usually no communication between device and server), default to 0
        else:
            latencies.append(0)
            offsets.append(0)

    # convert NaN latencies to 0
    latencies = np.nan_to_num(latencies)
    # convert any negative latency to 0.
    latencies[latencies < 0] = 0

    if len(np.nonzero(latencies)[0]) == 0:  # if no decoders synced properly, all latencies are 0
        sync_server = wrapped_packets_fs[0].time_synchronization_server()
        if verbose:
            print('\n{} did not achieve sync with time sync server {}!'.format(
                wrapped_packets_fs[0].redvox_id(), sync_server))
        offset = 0
    else:
        d1_min = np.min(latencies[latencies != 0])
        best_d1_index = np.where(latencies == d1_min)[0][0]
        # correct each packet's timestamps
        offset = int(offsets[best_d1_index])

    for wrapped_packet in wrapped_packets_fs:
        # update the machine file start time and mach time zero
        wrapped_packet.set_app_file_start_timestamp_epoch_microseconds_utc(
            wrapped_packet.app_file_start_timestamp_epoch_microseconds_utc() + offset)
        wrapped_packet.set_app_file_start_timestamp_machine(
            wrapped_packet.app_file_start_timestamp_machine() + offset)
        if wrapped_packet.mach_time_zero() is not None:
            wrapped_packet.set_mach_time_zero(wrapped_packet.mach_time_zero() + offset)
        # update microphone start time
        if wrapped_packet.has_microphone_sensor():
            wrapped_packet.microphone_sensor().set_first_sample_timestamp_epoch_microseconds_utc(
                wrapped_packet.microphone_sensor().first_sample_timestamp_epoch_microseconds_utc() + offset)
        # correct the times in unevenly sampled channels using the packets' best offset
        wrapped_packet.update_uneven_sensor_timestamps(offset)
        wrapped_packet.set_is_synch_corrected(True)
def update_evenly_sampled_time_array(tsd: TimeSyncData, num_samples: float = None, time_start_array:  = None) ‑> numpy.ndarray

Correct evenly sampled times using updated time_start_array values as the focal point. Expects tsd to have the same number of packets as elements in time_start_array Inserts data gaps where necessary before building array. Throws an exception if the number of packets in tsd does not match the length of time_start_array

:param tsd: TimeSyncData object that contains the information needed to update the time array :param num_samples: number of samples in one file; optional, uses number based on sample rate if not given :param time_start_array: the list of timestamps to correct in seconds; optional, uses the start times in the TimeSyncData object if not given :return: Revised time array in epoch seconds

Expand source code
def update_evenly_sampled_time_array(tsd: TimeSyncData, num_samples: float = None,
                                     time_start_array: np.array = None) -> np.ndarray:
    """
    Correct evenly sampled times using updated time_start_array values as the focal point.
    Expects tsd to have the same number of packets as elements in time_start_array
    Inserts data gaps where necessary before building array.
    Throws an exception if the number of packets in tsd does not match the length of time_start_array

    :param tsd: TimeSyncData object that contains the information needed to update the time array
    :param num_samples: number of samples in one file; optional, uses number based on sample rate if not given
    :param time_start_array: the list of timestamps to correct in seconds; optional, uses the start times in the
                             TimeSyncData object if not given
    :return: Revised time array in epoch seconds
    """
    if time_start_array is None:
        # replace the time_start_array with values from tsd; convert tsd times to seconds
        time_start_array = tsd.rev_start_times / dt.MICROSECONDS_IN_SECOND
    num_files = tsd.num_packets
    # the TimeSyncData must have the same number of packets as the number of elements in time_start_array
    if num_files != len(time_start_array):
        # alert the user, then quit
        raise Exception("ERROR: Attempted to update a time array that doesn't contain "
                        "the same number of elements as the TimeSyncData!")

    if num_samples is None:
        num_samples = fh.get_num_points_from_sample_rate(tsd.sample_rate_hz)
    t_dt = 1.0 / tsd.sample_rate_hz

    # Use TimeSyncData object to find best start index.
    # Samples before will be the number of decoders before a0 times the number of samples in a file.
    # Samples after will be the number of decoders after a0 times the number of samples in a file minus 1;
    # the minus one represents the best a0.
    decoder_idx = tsd.best_latency_index
    samples_before = int(decoder_idx * num_samples)
    samples_after = round((num_files - decoder_idx) * num_samples) - 1
    best_start_sec = time_start_array[decoder_idx]

    # build the time arrays separately in epoch seconds, then join into one
    # add 1 to include the actual a0 sample, then add 1 again to skip the a0 sample; this avoids repetition
    timesec_before = np.vectorize(lambda t: best_start_sec - t * t_dt)(list(range(int(samples_before + 1))))
    timesec_before = timesec_before[::-1]  # reverse 'before' times so they increase from earliest start time
    timesec_after = np.vectorize(lambda t: best_start_sec + t * t_dt)(list(range(1, int(samples_after + 1))))
    timesec_rev = np.concatenate([timesec_before, timesec_after])

    return timesec_rev
def update_time_array(tsd: TimeSyncData, time_array: ) ‑> numpy.ndarray

Correct timestamps in time_array using information from TimeSyncData :param tsd: TimeSyncData object that contains the information needed to update the time array :param time_array: the list of timestamps to correct in seconds :return: Revised time array in epoch seconds

Expand source code
def update_time_array(tsd: TimeSyncData, time_array: np.array) -> np.ndarray:
    """
    Correct timestamps in time_array using information from TimeSyncData
    :param tsd: TimeSyncData object that contains the information needed to update the time array
    :param time_array: the list of timestamps to correct in seconds
    :return: Revised time array in epoch seconds
    """
    return time_array + (tsd.best_offset / dt.MICROSECONDS_IN_SECOND)
def validate_sensors(wrapped_packets: List[WrappedRedvoxPacket]) ‑> bool

Examine all sample rates and mach time zeros to ensure that sensor settings do not change :param wrapped_packets: a list of wrapped redvox packets to read :return: True if sensor settings do not change

Expand source code
def validate_sensors(wrapped_packets: List[WrappedRedvoxPacket]) -> bool:
    """
    Examine all sample rates and mach time zeros to ensure that sensor settings do not change
    :param wrapped_packets: a list of wrapped redvox packets to read
    :return: True if sensor settings do not change
    """
    # check that we have packets to read
    num_packets = len(wrapped_packets)
    if num_packets < 1:
        print("ERROR: no data to validate.")
        return False
    # if we have more than one packet, we need to validate the data
    elif num_packets > 1:
        sample_rates = np.zeros(num_packets)
        mach_time_zeros = np.zeros(num_packets)
        for j, wrapped_packet in enumerate(wrapped_packets):
            sample_rates[j] = wrapped_packet.microphone_sensor().sample_rate_hz()
            mach_time_zeros[j] = wrapped_packet.mach_time_zero()
        for i in range(1, len(sample_rates)):
            if sample_rates[0] != sample_rates[i]:
                print("ERROR: sample rate in data packets has changed.")
                return False
            # process only non-nan mach time zeros
            if not np.isnan(mach_time_zeros[0]) and mach_time_zeros[0] != mach_time_zeros[i]:
                print("ERROR: mach time zero in data packets has changed.")
                return False
    # we get here if all packets have the same sample rate and mach time zero
    return True

Classes

class TimeSyncData (wrapped_packets: List[WrappedRedvoxPacket] = None)

Stores latencies, revised start times, time sync server offsets, and difference in adjusted machine time and acquisition server time Note that inputs to this must be continuous data; any change in sensor mach_time_zero or sample_rate requires a new TimeSyncData object ALL times in microseconds properties: rev_start_times: revised machine start times (per packet) server_acquisition_times: time packet arrived at server (per packet) latencies: latencies (per packet) best_latency: the lowest latency among packets best_latency_index: index in latencies array that contains the best latency latency_stats: StatsContainer for latencies offsets: list of calculated offsets (per packet) best_offset: the offset that is paired with the lowest latency offset_stats: StatsContainer for offsets num_packets: the number of packets being analyzed sample_rate_hz: the sample rate in Hz of all packets packet_duration: the duration of all packets mach_time_zero: the start time of the app in machine time num_tri_messages: number of tri-message exchanges (per packet) tri_message_coeffs: list of 6-tuples containing the tri-message exchange coefficients (up to 1 tuple per packet) best_tri_msg_indices: the index of the best tri-message for each of the tri_message_coeffs tuples acquire_travel_time: calculated time it took packet to reach server (up to 1 per packet) bad_packets: indices of packets with an undefined or 0 latency

Initialize properties, assume all packets share the same sample rate and are continuous :param wrapped_packets: list of data packets

Expand source code
class TimeSyncData:
    """
    Stores latencies, revised start times, time sync server offsets, and difference in adjusted machine time and
    acquisition server time
    Note that inputs to this must be continuous data; any change in sensor mach_time_zero or sample_rate
    requires a new TimeSyncData object
    ALL times in microseconds
    properties:
        rev_start_times: revised machine start times (per packet)
        server_acquisition_times: time packet arrived at server (per packet)
        latencies: latencies (per packet)
        best_latency: the lowest latency among packets
        best_latency_index: index in latencies array that contains the best latency
        latency_stats: StatsContainer for latencies
        offsets: list of calculated offsets (per packet)
        best_offset: the offset that is paired with the lowest latency
        offset_stats: StatsContainer for offsets
        num_packets: the number of packets being analyzed
        sample_rate_hz: the sample rate in Hz of all packets
        packet_duration: the duration of all packets
        mach_time_zero: the start time of the app in machine time
        num_tri_messages: number of tri-message exchanges (per packet)
        tri_message_coeffs: list of 6-tuples containing the tri-message exchange coefficients (up to 1 tuple per packet)
        best_tri_msg_indices: the index of the best tri-message for each of the tri_message_coeffs tuples
        acquire_travel_time: calculated time it took packet to reach server (up to 1 per packet)
        bad_packets: indices of packets with an undefined or 0 latency
    """

    def __init__(self, wrapped_packets: List[WrappedRedvoxPacket] = None):
        """
        Initialize properties, assume all packets share the same sample rate and are continuous
        :param wrapped_packets: list of data packets
        """
        self.latency_stats: stats_helper.StatsContainer = stats_helper.StatsContainer("latency")
        self.offset_stats: stats_helper.StatsContainer = stats_helper.StatsContainer("offset")
        if wrapped_packets is None or len(wrapped_packets) < 1:
            self.rev_start_times: np.ndarray = np.ndarray((0, 0))
            self.server_acquisition_times: np.ndarray = np.ndarray((0, 0))
            self.latencies: np.ndarray = np.ndarray((0, 0))
            self.best_latency: Optional[float] = None
            self.best_latency_index: Optional[int] = None
            self.offsets: np.ndarray = np.ndarray((0, 0))
            self.best_offset: float = 0.0
            self.num_packets: int = 0
            self.sample_rate_hz: Optional[float] = None
            self.packet_duration: int = 0
            self.mach_time_zero: Optional[float] = None
            self.num_tri_messages: np.ndarray = np.ndarray((0, 0))
            self.tri_message_coeffs: List[Tuple] = []
            self.best_tri_msg_indices: List[int] = []
            self.acquire_travel_time: np.ndarray = np.ndarray((0, 0))
            self.bad_packets: List[int] = []
        else:
            self.get_time_sync_data(wrapped_packets)

    def get_time_sync_data(self, wrapped_packets: List[WrappedRedvoxPacket]):
        """
        Sets the time statistics between machine, time server, and acquisition server for the packets given.
        :param wrapped_packets: wrapped packets with same sample rate
        """
        self.server_acquisition_times = np.zeros(len(wrapped_packets))  # array of server acquisition times
        self.rev_start_times = np.zeros(len(wrapped_packets))   # array of app start times
        self.num_tri_messages = np.zeros(len(wrapped_packets))  # number of tri-message exchanges per packet
        self.latencies = np.zeros(len(wrapped_packets))  # array of minimum latencies
        self.offsets = np.zeros(len(wrapped_packets))  # array of offset applied to machine time to get sync time
        self.num_packets = len(wrapped_packets)  # number of packets in list
        self.tri_message_coeffs = []    # a list of tri-message coefficients
        self.best_tri_msg_indices = []  # a list of the best latency index in each set of tri-message coefficients
        self.bad_packets = []   # list of packets that contain invalid data
        sample_rates = np.zeros(len(wrapped_packets))       # list of sample rates, should all be the same
        mach_time_zeros = np.zeros(len(wrapped_packets))    # list of mach time zeros, should all the be same

        # get the server acquisition time, app start time, and tri message stats
        for i, wrapped_packet in enumerate(wrapped_packets):
            # pass the mach_time_zero and sample_rate into arrays
            sample_rates[i] = wrapped_packet.microphone_sensor().sample_rate_hz()
            mach_time_zeros[i] = wrapped_packet.mach_time_zero()
            self.server_acquisition_times[i] = wrapped_packet.server_timestamp_epoch_microseconds_utc()
            self.rev_start_times[i] = wrapped_packet.app_file_start_timestamp_epoch_microseconds_utc()
            self._compute_tri_message_stats(wrapped_packet, i)

        # check sample rate and mach time zero (if it exists) for changes.  if it changed, sync will not work.
        if not self._validate_sensor_settings(sample_rates, mach_time_zeros):
            raise Exception("ERROR: Sensor settings changed; separate data based on changes and re-analyze")
        self.find_bad_packets()
        self.evaluate_latencies_and_offsets()
        # set the packet duration
        self.packet_duration = dt.seconds_to_microseconds(fh.get_duration_seconds_from_sample_rate(
            int(self.sample_rate_hz)))
        # apply duration to app start to get packet end time, then subtract
        # that from server acquire time to get travel time to acquisition server
        # ASSUMING that acquisition and time-sync server have the same time source
        self.acquire_travel_time = self.server_acquisition_times - (self.rev_start_times + self.packet_duration)

    def evaluate_latencies_and_offsets(self):
        """
        evaluates the goodness of latencies and offsets.
        adjusts rev_start_times if there is a best offset
        """
        # if no decoders synced properly, all latencies are NaNs or all indices are bad
        if np.isnan(self.latencies).all() or len(self.bad_packets) == self.num_packets:
            # make everything empty or None, and best offset is 0 (no change)
            self.latencies = []
            self.offsets = []
            self.best_latency = None
            self.best_latency_index = None
            self.best_offset = 0.0
        else:
            # convert all NaN and negative values from latencies into zero
            self.latencies = np.nan_to_num(self.latencies)
            self.latencies[self.latencies < 0] = 0.0
            # find and set minimum latency based on non-zero latencies
            self.best_latency = np.min(self.get_valid_latencies())
            self.latency_stats.best_value = self.best_latency
            self.best_latency_index = np.where(self.latencies == self.best_latency)[0][0]
            # find all non-NaN, non-zero offsets
            good_offsets = self.get_valid_offsets()

            if good_offsets is not None and len(good_offsets) > 0:
                # there must be some good offsets; convert all NaN offsets to 0
                self.offsets = np.nan_to_num(self.offsets)
                # set best offset based on best latency
                self.best_offset = self.offsets[self.best_latency_index]
                self.offset_stats.best_value = self.best_offset
                # fix all packet start times by adding the packet's offset
                self.rev_start_times += self.best_offset
            else:
                # set offset properties to empty or None
                self.offsets = []

    def _compute_tri_message_stats(self, packet: WrappedRedvoxPacket, index: int):
        """
        helper function to compute tri message stats of the time sync data
        :param packet: a packet to compute tri message stats from
        :param index: the index to add the data to
        """
        # if there is a time sync channel with payload, find the minimum latency and its corresponding offset
        if packet.has_time_synchronization_sensor():
            timesync_channel = packet.time_synchronization_sensor()
            coeffs = timesync_channel.payload_values()
            # set the number of tri-message exchanges in the packet
            self.num_tri_messages[index] = int(len(coeffs) / 6)
            coeffs_tuple: Tuple[
                np.ndarray,
                np.ndarray,
                np.ndarray,
                np.ndarray,
                np.ndarray,
                np.ndarray] = tri_message_stats.transmit_receive_timestamps_microsec(coeffs)
            self.tri_message_coeffs.append(coeffs_tuple)  # save the tri-message exchanges for later
            a1_coeffs: np.ndarray = coeffs_tuple[0]
            a2_coeffs: np.ndarray = coeffs_tuple[1]
            a3_coeffs: np.ndarray = coeffs_tuple[2]
            b1_coeffs: np.ndarray = coeffs_tuple[3]
            b2_coeffs: np.ndarray = coeffs_tuple[4]
            b3_coeffs: np.ndarray = coeffs_tuple[5]
            # get tri message data via TriMessageStats class
            tms = tri_message_stats.TriMessageStats(packet.redvox_id(),
                                                    a1_coeffs,
                                                    a2_coeffs,
                                                    a3_coeffs,
                                                    b1_coeffs,
                                                    b2_coeffs,
                                                    b3_coeffs)
            # check if we actually have exchanges to evaluate
            if self.num_tri_messages[index] > 0:
                # Concatenate d1 and d3 arrays, and o1 and o3 arrays when passing values into stats class
                #  note the last parameter multiplies the number of exchanges by 2, as there are two latencies
                #  and offsets calculated per exchange
                # noinspection PyTypeChecker
                self.latency_stats.add(np.mean([*tms.latency1, *tms.latency3]), np.std([*tms.latency1, *tms.latency3]),
                                       self.num_tri_messages[index] * 2)
                # noinspection PyTypeChecker
                self.offset_stats.add(np.mean([*tms.offset1, *tms.offset3]), np.std([*tms.offset1, *tms.offset3]),
                                      self.num_tri_messages[index] * 2)
                # set the best latency and offset based on the packet's metadata, or tri-message stats if no metadata
                latency: Optional[float] = packet.best_latency()
                # pass the location of the best calculated latency
                self.best_tri_msg_indices.append(tms.best_latency_index)
                if latency is None:
                    # no metadata
                    self.latencies[index] = tms.best_latency
                else:
                    self.latencies[index] = latency
                offset: Optional[float] = packet.best_offset()
                if offset is None:
                    # no metadata
                    self.offsets[index] = tms.best_offset
                else:
                    self.offsets[index] = offset
                # everything has been computed, time to return
                return
        # If here, there are no exchanges to read or there is no sync channel.
        # write default or empty values to the correct properties
        self.latencies[index] = 0.0
        self.offsets[index] = 0.0
        self.tri_message_coeffs.append((empty_array(), empty_array(), empty_array(),
                                        empty_array(), empty_array(), empty_array()))
        # pass the location of the best latency
        self.best_tri_msg_indices.append(-1)
        # noinspection PyTypeChecker
        self.latency_stats.add(0, 0, 0)
        # noinspection PyTypeChecker
        self.offset_stats.add(0, 0, 0)

    def _validate_sensor_settings(self, sample_rates: np.array, mach_time_zeros: np.array) -> bool:
        """
        Examine all sample rates and mach time zeros to ensure that sensor settings do not change
        Sets the sample rate and mach time zero if there is no change
        :param sample_rates: sample rates of all packets from sensor
        :param mach_time_zeros: machine time zero of all packets from sensor
        :return: True if sensor settings do not change
        """
        # set the sample rate and mach time zero
        self.sample_rate_hz = sample_rates[0]
        self.mach_time_zero = mach_time_zeros[0]
        if len(sample_rates) > 1:
            # if there's more than 1 value, we need to compare them
            for i in range(1, len(sample_rates)):
                if self.sample_rate_hz != sample_rates[i]:
                    print("ERROR: sample rate in data packets has changed.")
                    return False
                # process only non-nan mach time zeros
                if not np.isnan(self.mach_time_zero) and self.mach_time_zero != mach_time_zeros[i]:
                    print("ERROR: mach time zero in data packets has changed.")
                    return False
        return True

    def find_bad_packets(self):
        """
        Find bad packets and mark them using the bad_packets property
        Assuming all packets have been processed and all bad packets are marked with a 0 (or are negative)
        """
        for idx in range(len(self.latencies)):  # mark bad indices (they have a 0 or less value)
            if self.latencies[idx] <= 0:
                self.bad_packets.append(idx)

    def get_ratio_bad_packets(self) -> float:
        """
        Return the ratio of packets with bad latency calculations over total packets
        :return: num packets with bad latency calculations / total packets
        """
        if self.num_packets < 1:
            return 0
        return len(self.bad_packets) / float(self.num_packets)

    def get_latency_mean(self) -> Optional[float]:
        """
        return the mean of all latencies, and None if the latencies are invalid.
        :return: the mean of all latencies
        """
        if self.best_latency is None:
            return None
        else:
            return self.latency_stats.mean_of_means()

    def get_latency_std_dev(self) -> Optional[float]:
        """
        return the standard deviation (std_dev) of all latencies, and None if the latencies are invalid.
        :return: the std dev of all latencies
        """
        if self.best_latency is None:
            return None
        else:
            return self.latency_stats.total_std_dev()

    def get_valid_latencies(self, latency_array: np.array = None) -> np.array:
        """
        takes latencies, converts NaNs and negatives to 0, then returns non-zero latencies
        :param latency_array: optional array to clean instead of self.latencies; default None
        :return: non-NaN, non-zero latencies
        """
        if latency_array is None:
            clean_latencies = np.nan_to_num(self.latencies)  # replace NaNs with zeros
        else:
            clean_latencies = np.nan_to_num(latency_array)  # replace NaNs with zeros
        clean_latencies[clean_latencies < 0] = 0  # replace negative latencies with 0
        return clean_latencies[clean_latencies != 0]  # return only non-zero latencies

    def get_offset_mean(self) -> Optional[float]:
        """
        return the mean of all offsets, and 0.0 if the offsets or latencies are invalid.
        :return: the mean of all offsets
        """
        if self.best_latency is None or self.best_offset == 0.0:
            return 0.0
        else:
            return self.offset_stats.mean_of_means()

    def get_offset_std_dev(self) -> Optional[float]:
        """
        return the standard deviation (std_dev) of all offsets, and 0.0 if the offsets or latencies are invalid.
        :return: the std dev of all offsets
        """
        if self.best_latency is None or self.best_offset == 0.0:
            return 0.0
        else:
            return self.offset_stats.total_std_dev()

    def get_valid_offsets(self, offset_array: np.array = None) -> np.array:
        """
        takes valid offsets (based on bad_packets), converts NaNs to 0, then returns non-zero offsets
        :param offset_array: optional array to clean; default None
        :return: non-NaN, non-zero offsets
        """
        if offset_array is None:
            if len(self.bad_packets) > 0:
                valids = []
                for i in range(len(self.offsets)):
                    if i not in self.bad_packets:
                        valids.append(self.offsets[i])
                clean_offsets = np.nan_to_num(valids)  # replace NaNs with zeros
            else:
                clean_offsets = np.nan_to_num(self.offsets)
        else:
            clean_offsets = np.nan_to_num(offset_array)
        return clean_offsets[clean_offsets != 0]  # return only non-zero offsets

    def get_valid_rev_start_times(self) -> np.array:
        """
        return the array of valid (based on bad_packets) revised start times
        :return: array of valid revised start times
        """
        if len(self.bad_packets) > 0:  # return only the start times not associated with a bad packet
            valids = []
            for i in range(len(self.rev_start_times)):
                if i not in self.bad_packets:  # this is a good packet
                    valids.append(self.rev_start_times[i])
            return np.array(valids)
        else:
            return self.rev_start_times

    def get_best_rev_start_time(self) -> int:
        """
        return the revised start time associated with the lowest latency
        :return: revised start time in microseconds associated with the lowest latency
        """
        return self.rev_start_times[self.best_latency_index]

Methods

def evaluate_latencies_and_offsets(self)

evaluates the goodness of latencies and offsets. adjusts rev_start_times if there is a best offset

Expand source code
def evaluate_latencies_and_offsets(self):
    """
    evaluates the goodness of latencies and offsets.
    adjusts rev_start_times if there is a best offset
    """
    # if no decoders synced properly, all latencies are NaNs or all indices are bad
    if np.isnan(self.latencies).all() or len(self.bad_packets) == self.num_packets:
        # make everything empty or None, and best offset is 0 (no change)
        self.latencies = []
        self.offsets = []
        self.best_latency = None
        self.best_latency_index = None
        self.best_offset = 0.0
    else:
        # convert all NaN and negative values from latencies into zero
        self.latencies = np.nan_to_num(self.latencies)
        self.latencies[self.latencies < 0] = 0.0
        # find and set minimum latency based on non-zero latencies
        self.best_latency = np.min(self.get_valid_latencies())
        self.latency_stats.best_value = self.best_latency
        self.best_latency_index = np.where(self.latencies == self.best_latency)[0][0]
        # find all non-NaN, non-zero offsets
        good_offsets = self.get_valid_offsets()

        if good_offsets is not None and len(good_offsets) > 0:
            # there must be some good offsets; convert all NaN offsets to 0
            self.offsets = np.nan_to_num(self.offsets)
            # set best offset based on best latency
            self.best_offset = self.offsets[self.best_latency_index]
            self.offset_stats.best_value = self.best_offset
            # fix all packet start times by adding the packet's offset
            self.rev_start_times += self.best_offset
        else:
            # set offset properties to empty or None
            self.offsets = []
def find_bad_packets(self)

Find bad packets and mark them using the bad_packets property Assuming all packets have been processed and all bad packets are marked with a 0 (or are negative)

Expand source code
def find_bad_packets(self):
    """
    Find bad packets and mark them using the bad_packets property
    Assuming all packets have been processed and all bad packets are marked with a 0 (or are negative)
    """
    for idx in range(len(self.latencies)):  # mark bad indices (they have a 0 or less value)
        if self.latencies[idx] <= 0:
            self.bad_packets.append(idx)
def get_best_rev_start_time(self) ‑> int

return the revised start time associated with the lowest latency :return: revised start time in microseconds associated with the lowest latency

Expand source code
def get_best_rev_start_time(self) -> int:
    """
    return the revised start time associated with the lowest latency
    :return: revised start time in microseconds associated with the lowest latency
    """
    return self.rev_start_times[self.best_latency_index]
def get_latency_mean(self) ‑> Optional[float]

return the mean of all latencies, and None if the latencies are invalid. :return: the mean of all latencies

Expand source code
def get_latency_mean(self) -> Optional[float]:
    """
    return the mean of all latencies, and None if the latencies are invalid.
    :return: the mean of all latencies
    """
    if self.best_latency is None:
        return None
    else:
        return self.latency_stats.mean_of_means()
def get_latency_std_dev(self) ‑> Optional[float]

return the standard deviation (std_dev) of all latencies, and None if the latencies are invalid. :return: the std dev of all latencies

Expand source code
def get_latency_std_dev(self) -> Optional[float]:
    """
    return the standard deviation (std_dev) of all latencies, and None if the latencies are invalid.
    :return: the std dev of all latencies
    """
    if self.best_latency is None:
        return None
    else:
        return self.latency_stats.total_std_dev()
def get_offset_mean(self) ‑> Optional[float]

return the mean of all offsets, and 0.0 if the offsets or latencies are invalid. :return: the mean of all offsets

Expand source code
def get_offset_mean(self) -> Optional[float]:
    """
    return the mean of all offsets, and 0.0 if the offsets or latencies are invalid.
    :return: the mean of all offsets
    """
    if self.best_latency is None or self.best_offset == 0.0:
        return 0.0
    else:
        return self.offset_stats.mean_of_means()
def get_offset_std_dev(self) ‑> Optional[float]

return the standard deviation (std_dev) of all offsets, and 0.0 if the offsets or latencies are invalid. :return: the std dev of all offsets

Expand source code
def get_offset_std_dev(self) -> Optional[float]:
    """
    return the standard deviation (std_dev) of all offsets, and 0.0 if the offsets or latencies are invalid.
    :return: the std dev of all offsets
    """
    if self.best_latency is None or self.best_offset == 0.0:
        return 0.0
    else:
        return self.offset_stats.total_std_dev()
def get_ratio_bad_packets(self) ‑> float

Return the ratio of packets with bad latency calculations over total packets :return: num packets with bad latency calculations / total packets

Expand source code
def get_ratio_bad_packets(self) -> float:
    """
    Return the ratio of packets with bad latency calculations over total packets
    :return: num packets with bad latency calculations / total packets
    """
    if self.num_packets < 1:
        return 0
    return len(self.bad_packets) / float(self.num_packets)
def get_time_sync_data(self, wrapped_packets: List[WrappedRedvoxPacket])

Sets the time statistics between machine, time server, and acquisition server for the packets given. :param wrapped_packets: wrapped packets with same sample rate

Expand source code
def get_time_sync_data(self, wrapped_packets: List[WrappedRedvoxPacket]):
    """
    Sets the time statistics between machine, time server, and acquisition server for the packets given.
    :param wrapped_packets: wrapped packets with same sample rate
    """
    self.server_acquisition_times = np.zeros(len(wrapped_packets))  # array of server acquisition times
    self.rev_start_times = np.zeros(len(wrapped_packets))   # array of app start times
    self.num_tri_messages = np.zeros(len(wrapped_packets))  # number of tri-message exchanges per packet
    self.latencies = np.zeros(len(wrapped_packets))  # array of minimum latencies
    self.offsets = np.zeros(len(wrapped_packets))  # array of offset applied to machine time to get sync time
    self.num_packets = len(wrapped_packets)  # number of packets in list
    self.tri_message_coeffs = []    # a list of tri-message coefficients
    self.best_tri_msg_indices = []  # a list of the best latency index in each set of tri-message coefficients
    self.bad_packets = []   # list of packets that contain invalid data
    sample_rates = np.zeros(len(wrapped_packets))       # list of sample rates, should all be the same
    mach_time_zeros = np.zeros(len(wrapped_packets))    # list of mach time zeros, should all the be same

    # get the server acquisition time, app start time, and tri message stats
    for i, wrapped_packet in enumerate(wrapped_packets):
        # pass the mach_time_zero and sample_rate into arrays
        sample_rates[i] = wrapped_packet.microphone_sensor().sample_rate_hz()
        mach_time_zeros[i] = wrapped_packet.mach_time_zero()
        self.server_acquisition_times[i] = wrapped_packet.server_timestamp_epoch_microseconds_utc()
        self.rev_start_times[i] = wrapped_packet.app_file_start_timestamp_epoch_microseconds_utc()
        self._compute_tri_message_stats(wrapped_packet, i)

    # check sample rate and mach time zero (if it exists) for changes.  if it changed, sync will not work.
    if not self._validate_sensor_settings(sample_rates, mach_time_zeros):
        raise Exception("ERROR: Sensor settings changed; separate data based on changes and re-analyze")
    self.find_bad_packets()
    self.evaluate_latencies_and_offsets()
    # set the packet duration
    self.packet_duration = dt.seconds_to_microseconds(fh.get_duration_seconds_from_sample_rate(
        int(self.sample_rate_hz)))
    # apply duration to app start to get packet end time, then subtract
    # that from server acquire time to get travel time to acquisition server
    # ASSUMING that acquisition and time-sync server have the same time source
    self.acquire_travel_time = self.server_acquisition_times - (self.rev_start_times + self.packet_duration)
def get_valid_latencies(self, latency_array:  = None) ‑> 

takes latencies, converts NaNs and negatives to 0, then returns non-zero latencies :param latency_array: optional array to clean instead of self.latencies; default None :return: non-NaN, non-zero latencies

Expand source code
def get_valid_latencies(self, latency_array: np.array = None) -> np.array:
    """
    takes latencies, converts NaNs and negatives to 0, then returns non-zero latencies
    :param latency_array: optional array to clean instead of self.latencies; default None
    :return: non-NaN, non-zero latencies
    """
    if latency_array is None:
        clean_latencies = np.nan_to_num(self.latencies)  # replace NaNs with zeros
    else:
        clean_latencies = np.nan_to_num(latency_array)  # replace NaNs with zeros
    clean_latencies[clean_latencies < 0] = 0  # replace negative latencies with 0
    return clean_latencies[clean_latencies != 0]  # return only non-zero latencies
def get_valid_offsets(self, offset_array:  = None) ‑> 

takes valid offsets (based on bad_packets), converts NaNs to 0, then returns non-zero offsets :param offset_array: optional array to clean; default None :return: non-NaN, non-zero offsets

Expand source code
def get_valid_offsets(self, offset_array: np.array = None) -> np.array:
    """
    takes valid offsets (based on bad_packets), converts NaNs to 0, then returns non-zero offsets
    :param offset_array: optional array to clean; default None
    :return: non-NaN, non-zero offsets
    """
    if offset_array is None:
        if len(self.bad_packets) > 0:
            valids = []
            for i in range(len(self.offsets)):
                if i not in self.bad_packets:
                    valids.append(self.offsets[i])
            clean_offsets = np.nan_to_num(valids)  # replace NaNs with zeros
        else:
            clean_offsets = np.nan_to_num(self.offsets)
    else:
        clean_offsets = np.nan_to_num(offset_array)
    return clean_offsets[clean_offsets != 0]  # return only non-zero offsets
def get_valid_rev_start_times(self) ‑> 

return the array of valid (based on bad_packets) revised start times :return: array of valid revised start times

Expand source code
def get_valid_rev_start_times(self) -> np.array:
    """
    return the array of valid (based on bad_packets) revised start times
    :return: array of valid revised start times
    """
    if len(self.bad_packets) > 0:  # return only the start times not associated with a bad packet
        valids = []
        for i in range(len(self.rev_start_times)):
            if i not in self.bad_packets:  # this is a good packet
                valids.append(self.rev_start_times[i])
        return np.array(valids)
    else:
        return self.rev_start_times