Module redvox.tests.moar_test

Testing DataWindowArrow

Expand source code
"""
Testing DataWindowArrow
"""
from typing import Tuple
import os.path
import timeit
from pathlib import Path

import tempfile
import lz4.frame
import pickle
import numpy as np
from sklearn.linear_model import LinearRegression

from redvox.common.data_window import DataWindow
from redvox.common import station
from redvox.common import sensor_data
from redvox.common import packet_to_pyarrow
from redvox.common import gap_and_pad_utils
from redvox.common import sensor_reader_utils
from redvox.common import timesync
from redvox.common.data_window_io import DataWindowOutputType
from redvox.common.data_window import EventOrigin
from redvox.common.data_window import DataWindowConfig
from redvox.common.api_reader import ApiReader
import redvox.common.date_time_utils as dt
from redvox.common import run_me
from redvox.common.event_stream import Event
from redvox.api1000.wrapped_redvox_packet.wrapped_packet import WrappedRedvoxPacketM
from redvox.common.offset_model import simple_offset_weighted_linear_regression, get_offset_at_new_time, \
    offset_weighted_linear_regression, OffsetModel
import redvox.settings as settings

# Module-level side effect: switch the redvox parallelism setting off as soon as this
# module is imported or run, before any of the code below executes.
settings.set_parallelism_enabled(False)

# Fixed collection of ten-digit identifier strings (presumably station ids -- confirm
# against the data set they were taken from). Order is kept exactly as first written.
IDS_LIST = [
    "1637112001",
    "1637112002",
    "1637621001",
    "1637620010",
    "1637620004",
    "1637621005",
    "1637621002",
    "1637621003",
    "1637621009",
    "1637665009",
    "1637620002",
    "1637620003",
    "1637620009",
    "1637110702",
    "1637112602",
    "1637620001",
    "1637110701",
    "1637112301",
    "1637112631",
    "1637620006",
    "1637621006",
    "1637621007",
    "1637621010",
    "1637620008",
    "1637621004",
    "1637665010",
    "1637665004",
]


# The Weighted Linear Regression Function for offsets
def offset_linear_regression(
        offsets: np.ndarray, times: np.ndarray
) -> Tuple[float, float, float]:
    """
    Computes and returns the slope and intercept for the offset function (offset = slope * time + intercept).
    The function uses sklearn's (unweighted) LinearRegression and also returns the R2 score.

    NaN offsets are ignored, together with the times at the same positions.  If no valid offset
    remains the result is (0.0, 0.0, 0.0).  If fewer than 30 valid offsets remain no line is fit:
    the slope and score are 0.0 and the intercept is the median of the valid offsets.

    :param offsets: array of offsets; may contain NaN
    :param times: array of times, one per offset
    :return:  slope, intercept, score
    """
    # Fewer valid samples than this are not fit; the median offset is reported instead.
    min_samples_for_fit = 30

    offsets = np.asarray(offsets)
    valid = ~np.isnan(offsets)

    # Nothing usable: empty input, or every offset is NaN.
    if not valid.any():
        return 0.0, 0.0, 0.0

    # Drop NaN offsets up front: a single NaN would otherwise turn the median into NaN
    # and make LinearRegression.fit reject the input.
    valid_offsets = offsets[valid]

    if len(valid_offsets) < min_samples_for_fit:
        return 0.0, float(np.median(valid_offsets)), 0.0

    # Keep only the times that belong to the surviving offsets.
    valid_times = np.asarray(times)[valid]

    # Ordinary (unweighted) least squares with time as the single feature column.
    ls = LinearRegression()
    ls.fit(
        X=valid_times.reshape(-1, 1), y=valid_offsets.reshape(-1, 1)
    )

    # get the score of the model
    score = get_lr_score(model=ls, offsets=valid_offsets, times=valid_times)

    # y was fit as a column vector, so coef_ is 2-D and intercept_ is 1-D;
    # unwrap them to plain floats to match the declared return type.
    return float(ls.coef_[0][0]), float(ls.intercept_[0]), score


# The score for Linear Regression Function
def get_lr_score(
        model: LinearRegression, offsets: np.ndarray, times: np.ndarray
) -> float:
    """
    Computes and returns an R2 score for a fitted linear regression using the model's score method.
    The best value is 1.0.  Negative values are adjusted to be 0.0.

    :param model: The fitted linear regression model; it is scored on times as a single feature column
    :param offsets: array of observed offsets, one per entry in times
    :param times: array of times the offsets belong to
    :return: score, never below 0.0
    """
    # score() calls predict() on X itself and compares that prediction with y, so X has to be
    # the feature matrix (the times).  Passing offsets that were already predicted would run
    # them through the model a second time and score the wrong values.
    score = model.score(X=times.reshape(-1, 1), y=offsets)

    # Adjust the score so negative values are cast to 0.0
    return max(float(score), 0.0)


if __name__ == "__main__":
    # Manual check script: build one DataWindow from a directory and load another from a
    # serialized file, then print the offset models of every station in each.
    print("parallel on:", settings.is_parallelism_enabled())

    # NOTE: both locations below are hard-coded, machine-specific paths; edit before running elsewhere.
    data_dir = "/Users/tyler/Documents/model_data_test"

    # The second positional argument (True) is presumably the structured-layout flag --
    # confirm against DataWindowConfig.
    dwm = DataWindow("gps_test", config=DataWindowConfig(data_dir, True))

    for s in dwm.stations():
        print(f"id: {s.id()}")
        print(f"gpsoffset: {s.gps_offset_model()}\ntimesync: {s.timesync_data().offset_model()}")

    # NOTE(review): the extension suggests an lz4-compressed pickle; only deserialize files
    # from a trusted source.
    serialized_path = "/Users/tyler/Downloads/dw_1654621740000266_2.pkl.lz4"

    dwm = DataWindow.deserialize(serialized_path)

    for s in dwm.stations():
        print(f"id: {s.id()}")
        print(f"gpsoffset: {s.gps_offset_model()}")

    # Exit with status 0 without relying on the interactive-only exit() helper from site.
    raise SystemExit(0)

Functions

def get_lr_score(model: sklearn.linear_model._base.LinearRegression, offsets: numpy.ndarray, times: numpy.ndarray) ‑> float

Computes and returns an R2 score for the linear regression using sklearn's score method. The best value is 1.0, and 0.0 corresponds to a function with no slope. Negative values are also adjusted to be 0.0.

:param model: The linear regression model
:param offsets: array of offsets corresponding to the best latencies per packet
:param times: array of device times corresponding to the best latencies per packet
:return: score

Expand source code
def get_lr_score(
        model: LinearRegression, offsets: np.ndarray, times: np.ndarray
) -> float:
    """
    Computes and returns an R2 score for a fitted linear regression using the model's score method.
    The best value is 1.0.  Negative values are adjusted to be 0.0.

    :param model: The fitted linear regression model; it is scored on times as a single feature column
    :param offsets: array of observed offsets, one per entry in times
    :param times: array of times the offsets belong to
    :return: score, never below 0.0
    """
    # score() calls predict() on X itself and compares that prediction with y, so X has to be
    # the feature matrix (the times).  Passing offsets that were already predicted would run
    # them through the model a second time and score the wrong values.
    score = model.score(X=times.reshape(-1, 1), y=offsets)

    # Adjust the score so negative values are cast to 0.0
    return max(float(score), 0.0)
def offset_linear_regression(offsets: numpy.ndarray, times: numpy.ndarray) ‑> Tuple[float, float, float]

Computes and returns the slope and intercept for the offset function (offset = slope * time + intercept). The intercept is based on first UTC time 0; all units are in microseconds. The function uses sklearn's LinearRegression and also returns the R2 score.

:param offsets: array of offsets randomly sampled from gnss
:param times: array of device times randomly sampled from gnss
:return: slope, intercept, score

Expand source code
def offset_linear_regression(
        offsets: np.ndarray, times: np.ndarray
) -> Tuple[float, float, float]:
    """
    Computes and returns the slope and intercept for the offset function (offset = slope * time + intercept).
    The function uses sklearn's (unweighted) LinearRegression and also returns the R2 score.

    NaN offsets are ignored, together with the times at the same positions.  If no valid offset
    remains the result is (0.0, 0.0, 0.0).  If fewer than 30 valid offsets remain no line is fit:
    the slope and score are 0.0 and the intercept is the median of the valid offsets.

    :param offsets: array of offsets; may contain NaN
    :param times: array of times, one per offset
    :return:  slope, intercept, score
    """
    # Fewer valid samples than this are not fit; the median offset is reported instead.
    min_samples_for_fit = 30

    offsets = np.asarray(offsets)
    valid = ~np.isnan(offsets)

    # Nothing usable: empty input, or every offset is NaN.
    if not valid.any():
        return 0.0, 0.0, 0.0

    # Drop NaN offsets up front: a single NaN would otherwise turn the median into NaN
    # and make LinearRegression.fit reject the input.
    valid_offsets = offsets[valid]

    if len(valid_offsets) < min_samples_for_fit:
        return 0.0, float(np.median(valid_offsets)), 0.0

    # Keep only the times that belong to the surviving offsets.
    valid_times = np.asarray(times)[valid]

    # Ordinary (unweighted) least squares with time as the single feature column.
    ls = LinearRegression()
    ls.fit(
        X=valid_times.reshape(-1, 1), y=valid_offsets.reshape(-1, 1)
    )

    # get the score of the model
    score = get_lr_score(model=ls, offsets=valid_offsets, times=valid_times)

    # y was fit as a column vector, so coef_ is 2-D and intercept_ is 1-D;
    # unwrap them to plain floats to match the declared return type.
    return float(ls.coef_[0][0]), float(ls.intercept_[0]), score