Module redvox.common.gap_and_pad_utils
This module provides tools to fill gaps and pad the ends of timestamp arrays
Expand source code
"""
This module provides tools to fill gaps and pad the ends of timestamp arrays
"""
import sys
from typing import List, Tuple, Optional, Dict
import enum
from math import modf
from dataclasses import dataclass, field
from dataclasses_json import dataclass_json
import numpy as np
import pyarrow as pa
import pyarrow.compute as pc
from redvox.common import date_time_utils as dtu
from redvox.common.errors import RedVoxExceptions
from redvox.api1000.wrapped_redvox_packet.sensors.audio import AudioCodec
from redvox.api1000.wrapped_redvox_packet.sensors.location import LocationProvider
from redvox.api1000.wrapped_redvox_packet.sensors.image import ImageCodec
from redvox.api1000.wrapped_redvox_packet.station_information import (
NetworkType,
PowerState,
CellServiceState,
WifiWakeLock,
ScreenState,
)
# percent of packet duration/sample rate required for gap to be considered a whole unit
DEFAULT_GAP_UPPER_LIMIT: float = 0.8
# percent of packet duration/sample rate required for gap to be considered nothing
DEFAULT_GAP_LOWER_LIMIT: float = 0.02
# columns for audio table
AUDIO_DF_COLUMNS = ["timestamps", "unaltered_timestamps", "microphone"]
# columns that cannot be interpolated
NON_INTERPOLATED_COLUMNS = ["compressed_audio", "image"]
# columns that are not numeric but can be interpolated
NON_NUMERIC_COLUMNS = [
"location_provider",
"image_codec",
"audio_codec",
"network_type",
"power_state",
"cell_service",
"wifi_wake_lock",
"screen_state",
]
# noinspection Mypy,DuplicatedCode
class DataPointCreationMode(enum.Enum):
"""
Type of data point to create
"""
NAN: int = 0
COPY: int = 1
INTERPOLATE: int = 2
@staticmethod
def list_names() -> List[str]:
return [n.name for n in DataPointCreationMode]
@dataclass_json
@dataclass
class AudioWithGaps:
"""
Represents methods of reconstructing audio data with or without gaps in it
Properties:
sample_interval_micros: microseconds between sample points
metadata: list of start times in microseconds since epoch UTC and the data to add
gaps: the list of start and end points of gaps (the start and end are actual data points)
errors: the errors encountered while getting the data
"""
sample_interval_micros: float
metadata: Optional[List[Tuple[float, pa.Table]]] = None
gaps: List[Tuple[float, float]] = field(default_factory=lambda: [])
errors: RedVoxExceptions = field(default_factory=lambda: RedVoxExceptions("AudioWithGaps"))
def create_timestamps(self) -> pa.Table:
"""
:return: converts the audio metadata into a data table
"""
result_array = [[], [], []]
for m in self.metadata:
timestamps = calc_evenly_sampled_timestamps(m[0], m[1].num_rows, self.sample_interval_micros)
result_array[0].extend(timestamps)
result_array[1].extend(timestamps)
result_array[2].extend(m[1]["microphone"].to_numpy())
for gs, ge in self.gaps:
fractional, whole = modf((ge - gs) / self.sample_interval_micros)
num_samples = int((whole - 1) if fractional < DEFAULT_GAP_LOWER_LIMIT else whole)
timestamps = calc_evenly_sampled_timestamps(
gs + self.sample_interval_micros, num_samples, self.sample_interval_micros
)
gap_array = [timestamps, np.full(len(timestamps), np.nan)]
result_array[0].extend(gap_array[0])
result_array[1].extend(gap_array[0])
result_array[2].extend(gap_array[1])
ptable = pa.Table.from_pydict(dict(zip(AUDIO_DF_COLUMNS, result_array)))
return pc.take(ptable, pc.sort_indices(ptable, sort_keys=[("timestamps", "ascending")]))
def add_error(self, error: str):
"""
add an error to the result
:param error: error message to add
"""
self.errors.append(error)
def calc_evenly_sampled_timestamps(start: float, samples: int, sample_interval_micros: float) -> np.array:
"""
given a start time, calculates samples amount of evenly spaced timestamps at rate_hz
:param start: float, start timestamp in microseconds
:param samples: int, number of samples
:param sample_interval_micros: float, sample interval in microseconds
:return: np.array with number of samples timestamps, evenly spaced starting at start
"""
return start + (np.arange(0, samples) * sample_interval_micros)
def check_gap_list(
gaps: List[Tuple[float, float]], start_timestamp: float = None, end_timestamp: float = None
) -> List[Tuple[float, float]]:
"""
removes any gaps where end time <= start time, consolidates overlapping gaps, and ensures that no gap
starts or ends before start_timestamp and starts or ends after end_timestamp. All timestamps are in
microseconds since epoch UTC
:param gaps: list of gaps to check
:param start_timestamp: lowest possible timestamp for a gap to start at
:param end_timestamp: lowest possible timestamp for a gap to end at
:return: list of correct, valid gaps
"""
return_gaps: List[Tuple[float, float]] = []
for gap in gaps:
if start_timestamp:
gap = (np.max([start_timestamp, gap[0]]), np.max([start_timestamp, gap[1]]))
if end_timestamp:
gap = (np.min([end_timestamp, gap[0]]), np.min([end_timestamp, gap[1]]))
if gap[0] < gap[1]:
if len(return_gaps) < 1:
return_gaps.append(gap)
for a, r_g in enumerate(return_gaps):
if (gap[0] < r_g[0] and gap[1] < r_g[0]) or (gap[0] > r_g[1] and gap[1] > r_g[1]):
return_gaps.append(gap)
break
else:
if gap[0] < r_g[0] < gap[1]:
r_g = (gap[0], r_g[1])
if gap[0] < r_g[1] < gap[1]:
r_g = (r_g[0], gap[1])
return_gaps[a] = r_g
return return_gaps
def fill_gaps(
arrow_df: pa.Table, gaps: List[Tuple[float, float]], sample_interval_micros: float, fill_mode: str = "nan"
) -> Tuple[pa.Table, List[Tuple[float, float]]]:
"""
fills gaps in the table with np.nan or interpolated values by interpolating timestamps based on the
calculated sample interval
:param arrow_df: pyarrow table with data. first column is "timestamps"
:param gaps: list of tuples of known non-inclusive start and end timestamps of the gaps
:param sample_interval_micros: known sample interval of the data points
:param fill_mode: must be one of: "nan", "interpolate", or "copy". Other inputs result in "nan". Default "nan".
Will convert input to lowercase.
:return: table without gaps and the list of gaps
"""
# extract the necessary information to compute gap size and gap timestamps
data_time_stamps = arrow_df["timestamps"].to_numpy()
if len(data_time_stamps) > 1:
data_duration = data_time_stamps[-1] - data_time_stamps[0]
expected_samples = (
np.floor(data_duration / sample_interval_micros)
+ (1 if data_duration % sample_interval_micros >= sample_interval_micros * DEFAULT_GAP_UPPER_LIMIT else 0)
) + 1
if expected_samples > len(data_time_stamps):
if fill_mode.lower() == "copy":
pcm = DataPointCreationMode["COPY"]
elif fill_mode.lower() == "interpolate":
pcm = DataPointCreationMode["INTERPOLATE"]
else:
pcm = DataPointCreationMode["NAN"]
# make it safe to alter the gap values
my_gaps = check_gap_list(gaps, data_time_stamps[0], data_time_stamps[-1])
for gap in my_gaps:
# if timestamps are around gaps, we have to update the values
before_start = np.argwhere([t <= gap[0] for t in data_time_stamps])
after_end = np.argwhere([t >= gap[1] for t in data_time_stamps])
if len(before_start) > 0:
before_start = before_start[-1][0]
# sim = gap[0] - data_time_stamps[before_start]
# result_df = add_data_points_to_df(result_df, before_start, sim, point_creation_mode=pcm)
gap = (data_time_stamps[before_start], gap[1])
else:
before_start = None
if len(after_end) > 0:
after_end = after_end[0][0]
# sim = gap[1] - data_time_stamps[after_end]
gap = (gap[0], data_time_stamps[after_end])
else:
after_end = None
num_new_points = int((gap[1] - gap[0]) / sample_interval_micros) - 1
if before_start is not None:
arrow_df = add_data_points_to_df(
arrow_df, before_start, sample_interval_micros, num_new_points, pcm
)
elif after_end is not None:
arrow_df = add_data_points_to_df(arrow_df, after_end, -sample_interval_micros, num_new_points, pcm)
indic = pc.sort_indices(arrow_df, sort_keys=[("timestamps", "ascending")])
return arrow_df.take(indic), gaps
return arrow_df, gaps
def fill_audio_gaps(
packet_data: List[Tuple[float, pa.Table]],
sample_interval_micros: float,
gap_lower_limit: float = DEFAULT_GAP_LOWER_LIMIT,
) -> AudioWithGaps:
"""
fills gaps in the table with np.nan by interpolating timestamps based on the expected sample interval
* ignores gaps with duration less than or equal to packet length * gap_lower_limit
:param packet_data: list of tuples, each tuple containing two pieces of packet information:
packet_start_timestamps; float of packet start timestamp in microseconds
and audio_data; pa.Table of data points
:param sample_interval_micros: sample interval in microseconds
:param gap_lower_limit: percentage of packet length required to disregard gap, default DEFAULT_GAP_LOWER_LIMIT
:return: AudioWithGaps object that contains a list of timestamps of the non-inclusive start and end of the gaps
and other information to recreate the audio record
"""
last_data_timestamp = packet_data[0][0]
gaps = []
result = AudioWithGaps(sample_interval_micros, packet_data)
for packet in packet_data:
samples_in_packet = packet[1].num_rows
start_ts = packet[0]
packet_length = sample_interval_micros * samples_in_packet
# check if start_ts is close to the last timestamp in data_timestamps
last_timestamp_diff = start_ts - last_data_timestamp
if last_timestamp_diff > gap_lower_limit * packet_length:
gap_start = last_data_timestamp - sample_interval_micros
last_data_timestamp = start_ts
gaps.append((gap_start, last_data_timestamp))
elif last_timestamp_diff < -gap_lower_limit * packet_length:
result.add_error(
f"Packet start timestamp: {dtu.microseconds_to_seconds(start_ts)} "
f"is before last timestamp of previous "
f"packet: {dtu.microseconds_to_seconds(last_data_timestamp - sample_interval_micros)}"
)
last_data_timestamp += samples_in_packet * sample_interval_micros
result.gaps = gaps
return result
def add_data_points_to_df(
data_table: pa.Table,
start_index: int,
sample_interval_micros: float,
num_samples_to_add: int = 1,
point_creation_mode: DataPointCreationMode = DataPointCreationMode.COPY,
) -> pa.Table:
"""
adds data points to the end of the table, starting from the index specified.
Note:
* table must not be empty
* start_index must be non-negative and less than the length of table
* num_samples_to_add must be greater than 0
* sample_interval_micros cannot be 0
* points are added onto the end and the result is not sorted
Options for point_creation_mode are:
* NAN: default values and nans
* COPY: copies of the start data point
* INTERPOLATE: interpolated values between start data point and adjacent point
:param data_table: pyarrow table to add dataless timestamps to
:param start_index: index of the table to use as starting point for creating new values
:param sample_interval_micros: sample interval in microseconds of the timestamps; use negative values to
add points before the start_index
:param num_samples_to_add: the number of timestamps to create, default 1
:param point_creation_mode: the mode of point creation to use
:return: updated table with synthetic data points
"""
if (
len(data_table) > start_index
and len(data_table) > 0
and num_samples_to_add > 0
and sample_interval_micros != 0.0
):
start_timestamp = data_table["timestamps"][start_index].as_py()
# create timestamps for every point that needs to be added
new_timestamps = start_timestamp + np.arange(1, num_samples_to_add + 1) * sample_interval_micros
if point_creation_mode == DataPointCreationMode.COPY:
# copy the start point
copy_row = data_table.slice(start_index, 1).to_pydict()
for t in new_timestamps:
copy_row["timestamps"] = [t]
# for k in copy_row.keys():
# new_dict[k].append(copy_row[k])
empty_df = pa.Table.from_pydict(copy_row)
elif point_creation_mode == DataPointCreationMode.INTERPOLATE:
# use the start point and the next point as the edges for interpolation
start_point = data_table.slice(start_index, 1).to_pydict()
numeric_start = start_point[
[col for col in data_table.schema.names if col not in NON_INTERPOLATED_COLUMNS + NON_NUMERIC_COLUMNS]
]
non_numeric_start = start_point[[col for col in data_table.schema.names if col in NON_NUMERIC_COLUMNS]]
end_point = data_table.slice(start_index + (1 if sample_interval_micros > 0 else -1), 1).to_pydict()
numeric_end = end_point[
[col for col in data_table.schema.names if col not in NON_INTERPOLATED_COLUMNS + NON_NUMERIC_COLUMNS]
]
non_numeric_end = end_point[[col for col in data_table.schema.names if col in NON_NUMERIC_COLUMNS]]
if np.abs(start_point["timestamps"] - new_timestamps[0]) <= np.abs(
end_point["timestamps"] - new_timestamps[0]
):
non_numeric_diff = non_numeric_start
else:
non_numeric_diff = non_numeric_end
numeric_diff = numeric_end - numeric_start
numeric_diff = (numeric_diff / numeric_diff["timestamps"]) * (
new_timestamps - numeric_start
) + numeric_start
if sys.version_info[0] > 3 or (sys.version_info[0] == 3 and sys.version_info[1] >= 9):
# merge dicts (python 3.9):
empty_df = pa.Table.from_pydict(numeric_diff | non_numeric_diff)
else:
# merge dicts (python 3.5 to 3.8)
empty_df = pa.Table.from_pydict({**numeric_diff, **non_numeric_diff})
else:
# add nans and defaults
empty_dict: Dict[str, List] = {}
for k in data_table.schema.names:
empty_dict[k] = []
for column_index in data_table.schema.names:
if column_index == "timestamps":
empty_dict[column_index] = new_timestamps
elif column_index == "location_provider":
empty_dict[column_index] = [LocationProvider["UNKNOWN"].value for i in range(num_samples_to_add)]
elif column_index == "image_codec":
empty_dict[column_index] = [ImageCodec["UNKNOWN"].value for i in range(num_samples_to_add)]
elif column_index == "audio_codec":
empty_dict[column_index] = [AudioCodec["UNKNOWN"].value for i in range(num_samples_to_add)]
elif column_index == "network_type":
empty_dict[column_index] = [NetworkType["UNKNOWN_NETWORK"].value for i in range(num_samples_to_add)]
elif column_index == "power_state":
empty_dict[column_index] = [
PowerState["UNKNOWN_POWER_STATE"].value for i in range(num_samples_to_add)
]
elif column_index == "cell_service":
empty_dict[column_index] = [CellServiceState["UNKNOWN"].value for i in range(num_samples_to_add)]
elif column_index == "wifi_wake_lock":
empty_dict[column_index] = [WifiWakeLock["NONE"].value for i in range(num_samples_to_add)]
elif column_index == "screen_state":
empty_dict[column_index] = [
ScreenState["UNKNOWN_SCREEN_STATE"].value for i in range(num_samples_to_add)
]
else:
empty_dict[column_index] = np.full(num_samples_to_add, np.nan).tolist()
empty_df = pa.Table.from_pydict(empty_dict)
data_table = pa.concat_tables([data_table, empty_df])
return data_table
Functions
def add_data_points_to_df(data_table: pyarrow.lib.Table, start_index: int, sample_interval_micros: float, num_samples_to_add: int = 1, point_creation_mode: DataPointCreationMode = DataPointCreationMode.COPY) ‑> pyarrow.lib.Table
-
adds data points to the end of the table, starting from the index specified. Note: * table must not be empty * start_index must be non-negative and less than the length of table * num_samples_to_add must be greater than 0 * sample_interval_micros cannot be 0 * points are added onto the end and the result is not sorted Options for point_creation_mode are: * NAN: default values and nans * COPY: copies of the start data point * INTERPOLATE: interpolated values between start data point and adjacent point
:param data_table: pyarrow table to add dataless timestamps to :param start_index: index of the table to use as starting point for creating new values :param sample_interval_micros: sample interval in microseconds of the timestamps; use negative values to add points before the start_index :param num_samples_to_add: the number of timestamps to create, default 1 :param point_creation_mode: the mode of point creation to use :return: updated table with synthetic data points
Expand source code
def add_data_points_to_df( data_table: pa.Table, start_index: int, sample_interval_micros: float, num_samples_to_add: int = 1, point_creation_mode: DataPointCreationMode = DataPointCreationMode.COPY, ) -> pa.Table: """ adds data points to the end of the table, starting from the index specified. Note: * table must not be empty * start_index must be non-negative and less than the length of table * num_samples_to_add must be greater than 0 * sample_interval_micros cannot be 0 * points are added onto the end and the result is not sorted Options for point_creation_mode are: * NAN: default values and nans * COPY: copies of the start data point * INTERPOLATE: interpolated values between start data point and adjacent point :param data_table: pyarrow table to add dataless timestamps to :param start_index: index of the table to use as starting point for creating new values :param sample_interval_micros: sample interval in microseconds of the timestamps; use negative values to add points before the start_index :param num_samples_to_add: the number of timestamps to create, default 1 :param point_creation_mode: the mode of point creation to use :return: updated table with synthetic data points """ if ( len(data_table) > start_index and len(data_table) > 0 and num_samples_to_add > 0 and sample_interval_micros != 0.0 ): start_timestamp = data_table["timestamps"][start_index].as_py() # create timestamps for every point that needs to be added new_timestamps = start_timestamp + np.arange(1, num_samples_to_add + 1) * sample_interval_micros if point_creation_mode == DataPointCreationMode.COPY: # copy the start point copy_row = data_table.slice(start_index, 1).to_pydict() for t in new_timestamps: copy_row["timestamps"] = [t] # for k in copy_row.keys(): # new_dict[k].append(copy_row[k]) empty_df = pa.Table.from_pydict(copy_row) elif point_creation_mode == DataPointCreationMode.INTERPOLATE: # use the start point and the next point as the edges for interpolation start_point = data_table.slice(start_index, 1).to_pydict() numeric_start = start_point[ [col for col in data_table.schema.names if col not in NON_INTERPOLATED_COLUMNS + NON_NUMERIC_COLUMNS] ] non_numeric_start = start_point[[col for col in data_table.schema.names if col in NON_NUMERIC_COLUMNS]] end_point = data_table.slice(start_index + (1 if sample_interval_micros > 0 else -1), 1).to_pydict() numeric_end = end_point[ [col for col in data_table.schema.names if col not in NON_INTERPOLATED_COLUMNS + NON_NUMERIC_COLUMNS] ] non_numeric_end = end_point[[col for col in data_table.schema.names if col in NON_NUMERIC_COLUMNS]] if np.abs(start_point["timestamps"] - new_timestamps[0]) <= np.abs( end_point["timestamps"] - new_timestamps[0] ): non_numeric_diff = non_numeric_start else: non_numeric_diff = non_numeric_end numeric_diff = numeric_end - numeric_start numeric_diff = (numeric_diff / numeric_diff["timestamps"]) * ( new_timestamps - numeric_start ) + numeric_start if sys.version_info[0] > 3 or (sys.version_info[0] == 3 and sys.version_info[1] >= 9): # merge dicts (python 3.9): empty_df = pa.Table.from_pydict(numeric_diff | non_numeric_diff) else: # merge dicts (python 3.5 to 3.8) empty_df = pa.Table.from_pydict({**numeric_diff, **non_numeric_diff}) else: # add nans and defaults empty_dict: Dict[str, List] = {} for k in data_table.schema.names: empty_dict[k] = [] for column_index in data_table.schema.names: if column_index == "timestamps": empty_dict[column_index] = new_timestamps elif column_index == "location_provider": empty_dict[column_index] = [LocationProvider["UNKNOWN"].value for i in range(num_samples_to_add)] elif column_index == "image_codec": empty_dict[column_index] = [ImageCodec["UNKNOWN"].value for i in range(num_samples_to_add)] elif column_index == "audio_codec": empty_dict[column_index] = [AudioCodec["UNKNOWN"].value for i in range(num_samples_to_add)] elif column_index == "network_type": empty_dict[column_index] = [NetworkType["UNKNOWN_NETWORK"].value for i in range(num_samples_to_add)] elif column_index == "power_state": empty_dict[column_index] = [ PowerState["UNKNOWN_POWER_STATE"].value for i in range(num_samples_to_add) ] elif column_index == "cell_service": empty_dict[column_index] = [CellServiceState["UNKNOWN"].value for i in range(num_samples_to_add)] elif column_index == "wifi_wake_lock": empty_dict[column_index] = [WifiWakeLock["NONE"].value for i in range(num_samples_to_add)] elif column_index == "screen_state": empty_dict[column_index] = [ ScreenState["UNKNOWN_SCREEN_STATE"].value for i in range(num_samples_to_add) ] else: empty_dict[column_index] = np.full(num_samples_to_add, np.nan).tolist() empty_df = pa.Table.from_pydict(empty_dict) data_table = pa.concat_tables([data_table, empty_df]) return data_table
def calc_evenly_sampled_timestamps(start: float, samples: int, sample_interval_micros: float) ‑>
-
given a start time, calculates samples amount of evenly spaced timestamps at rate_hz
:param start: float, start timestamp in microseconds :param samples: int, number of samples :param sample_interval_micros: float, sample interval in microseconds :return: np.array with number of samples timestamps, evenly spaced starting at start
Expand source code
def calc_evenly_sampled_timestamps(start: float, samples: int, sample_interval_micros: float) -> np.array: """ given a start time, calculates samples amount of evenly spaced timestamps at rate_hz :param start: float, start timestamp in microseconds :param samples: int, number of samples :param sample_interval_micros: float, sample interval in microseconds :return: np.array with number of samples timestamps, evenly spaced starting at start """ return start + (np.arange(0, samples) * sample_interval_micros)
def check_gap_list(gaps: List[Tuple[float, float]], start_timestamp: float = None, end_timestamp: float = None) ‑> List[Tuple[float, float]]
-
removes any gaps where end time <= start time, consolidates overlapping gaps, and ensures that no gap starts or ends before start_timestamp and starts or ends after end_timestamp. All timestamps are in microseconds since epoch UTC
:param gaps: list of gaps to check :param start_timestamp: lowest possible timestamp for a gap to start at :param end_timestamp: lowest possible timestamp for a gap to end at :return: list of correct, valid gaps
Expand source code
def check_gap_list( gaps: List[Tuple[float, float]], start_timestamp: float = None, end_timestamp: float = None ) -> List[Tuple[float, float]]: """ removes any gaps where end time <= start time, consolidates overlapping gaps, and ensures that no gap starts or ends before start_timestamp and starts or ends after end_timestamp. All timestamps are in microseconds since epoch UTC :param gaps: list of gaps to check :param start_timestamp: lowest possible timestamp for a gap to start at :param end_timestamp: lowest possible timestamp for a gap to end at :return: list of correct, valid gaps """ return_gaps: List[Tuple[float, float]] = [] for gap in gaps: if start_timestamp: gap = (np.max([start_timestamp, gap[0]]), np.max([start_timestamp, gap[1]])) if end_timestamp: gap = (np.min([end_timestamp, gap[0]]), np.min([end_timestamp, gap[1]])) if gap[0] < gap[1]: if len(return_gaps) < 1: return_gaps.append(gap) for a, r_g in enumerate(return_gaps): if (gap[0] < r_g[0] and gap[1] < r_g[0]) or (gap[0] > r_g[1] and gap[1] > r_g[1]): return_gaps.append(gap) break else: if gap[0] < r_g[0] < gap[1]: r_g = (gap[0], r_g[1]) if gap[0] < r_g[1] < gap[1]: r_g = (r_g[0], gap[1]) return_gaps[a] = r_g return return_gaps
def fill_audio_gaps(packet_data: List[Tuple[float, pyarrow.lib.Table]], sample_interval_micros: float, gap_lower_limit: float = 0.02) ‑> AudioWithGaps
-
fills gaps in the table with np.nan by interpolating timestamps based on the expected sample interval * ignores gaps with duration less than or equal to packet length * gap_lower_limit
:param packet_data: list of tuples, each tuple containing two pieces of packet information: packet_start_timestamps; float of packet start timestamp in microseconds and audio_data; pa.Table of data points :param sample_interval_micros: sample interval in microseconds :param gap_lower_limit: percentage of packet length required to disregard gap, default DEFAULT_GAP_LOWER_LIMIT :return: AudioWithGaps object that contains a list of timestamps of the non-inclusive start and end of the gaps and other information to recreate the audio record
Expand source code
def fill_audio_gaps( packet_data: List[Tuple[float, pa.Table]], sample_interval_micros: float, gap_lower_limit: float = DEFAULT_GAP_LOWER_LIMIT, ) -> AudioWithGaps: """ fills gaps in the table with np.nan by interpolating timestamps based on the expected sample interval * ignores gaps with duration less than or equal to packet length * gap_lower_limit :param packet_data: list of tuples, each tuple containing two pieces of packet information: packet_start_timestamps; float of packet start timestamp in microseconds and audio_data; pa.Table of data points :param sample_interval_micros: sample interval in microseconds :param gap_lower_limit: percentage of packet length required to disregard gap, default DEFAULT_GAP_LOWER_LIMIT :return: AudioWithGaps object that contains a list of timestamps of the non-inclusive start and end of the gaps and other information to recreate the audio record """ last_data_timestamp = packet_data[0][0] gaps = [] result = AudioWithGaps(sample_interval_micros, packet_data) for packet in packet_data: samples_in_packet = packet[1].num_rows start_ts = packet[0] packet_length = sample_interval_micros * samples_in_packet # check if start_ts is close to the last timestamp in data_timestamps last_timestamp_diff = start_ts - last_data_timestamp if last_timestamp_diff > gap_lower_limit * packet_length: gap_start = last_data_timestamp - sample_interval_micros last_data_timestamp = start_ts gaps.append((gap_start, last_data_timestamp)) elif last_timestamp_diff < -gap_lower_limit * packet_length: result.add_error( f"Packet start timestamp: {dtu.microseconds_to_seconds(start_ts)} " f"is before last timestamp of previous " f"packet: {dtu.microseconds_to_seconds(last_data_timestamp - sample_interval_micros)}" ) last_data_timestamp += samples_in_packet * sample_interval_micros result.gaps = gaps return result
def fill_gaps(arrow_df: pyarrow.lib.Table, gaps: List[Tuple[float, float]], sample_interval_micros: float, fill_mode: str = 'nan') ‑> Tuple[pyarrow.lib.Table, List[Tuple[float, float]]]
-
fills gaps in the table with np.nan or interpolated values by interpolating timestamps based on the calculated sample interval
:param arrow_df: pyarrow table with data. first column is "timestamps" :param gaps: list of tuples of known non-inclusive start and end timestamps of the gaps :param sample_interval_micros: known sample interval of the data points :param fill_mode: must be one of: "nan", "interpolate", or "copy". Other inputs result in "nan". Default "nan". Will convert input to lowercase. :return: table without gaps and the list of gaps
Expand source code
def fill_gaps( arrow_df: pa.Table, gaps: List[Tuple[float, float]], sample_interval_micros: float, fill_mode: str = "nan" ) -> Tuple[pa.Table, List[Tuple[float, float]]]: """ fills gaps in the table with np.nan or interpolated values by interpolating timestamps based on the calculated sample interval :param arrow_df: pyarrow table with data. first column is "timestamps" :param gaps: list of tuples of known non-inclusive start and end timestamps of the gaps :param sample_interval_micros: known sample interval of the data points :param fill_mode: must be one of: "nan", "interpolate", or "copy". Other inputs result in "nan". Default "nan". Will convert input to lowercase. :return: table without gaps and the list of gaps """ # extract the necessary information to compute gap size and gap timestamps data_time_stamps = arrow_df["timestamps"].to_numpy() if len(data_time_stamps) > 1: data_duration = data_time_stamps[-1] - data_time_stamps[0] expected_samples = ( np.floor(data_duration / sample_interval_micros) + (1 if data_duration % sample_interval_micros >= sample_interval_micros * DEFAULT_GAP_UPPER_LIMIT else 0) ) + 1 if expected_samples > len(data_time_stamps): if fill_mode.lower() == "copy": pcm = DataPointCreationMode["COPY"] elif fill_mode.lower() == "interpolate": pcm = DataPointCreationMode["INTERPOLATE"] else: pcm = DataPointCreationMode["NAN"] # make it safe to alter the gap values my_gaps = check_gap_list(gaps, data_time_stamps[0], data_time_stamps[-1]) for gap in my_gaps: # if timestamps are around gaps, we have to update the values before_start = np.argwhere([t <= gap[0] for t in data_time_stamps]) after_end = np.argwhere([t >= gap[1] for t in data_time_stamps]) if len(before_start) > 0: before_start = before_start[-1][0] # sim = gap[0] - data_time_stamps[before_start] # result_df = add_data_points_to_df(result_df, before_start, sim, point_creation_mode=pcm) gap = (data_time_stamps[before_start], gap[1]) else: before_start = None if len(after_end) > 0: after_end = after_end[0][0] # sim = gap[1] - data_time_stamps[after_end] gap = (gap[0], data_time_stamps[after_end]) else: after_end = None num_new_points = int((gap[1] - gap[0]) / sample_interval_micros) - 1 if before_start is not None: arrow_df = add_data_points_to_df( arrow_df, before_start, sample_interval_micros, num_new_points, pcm ) elif after_end is not None: arrow_df = add_data_points_to_df(arrow_df, after_end, -sample_interval_micros, num_new_points, pcm) indic = pc.sort_indices(arrow_df, sort_keys=[("timestamps", "ascending")]) return arrow_df.take(indic), gaps return arrow_df, gaps
Classes
class AudioWithGaps (sample_interval_micros: float, metadata: Optional[List[Tuple[float, pyarrow.lib.Table]]] = None, gaps: List[Tuple[float, float]] = <factory>, errors: RedVoxExceptions = <factory>)
-
Represents methods of reconstructing audio data with or without gaps in it
Properties
sample_interval_micros: microseconds between sample points
metadata: list of start times in microseconds since epoch UTC and the data to add
gaps: the list of start and end points of gaps (the start and end are actual data points)
errors: the errors encountered while getting the data
Expand source code
@dataclass_json @dataclass class AudioWithGaps: """ Represents methods of reconstructing audio data with or without gaps in it Properties: sample_interval_micros: microseconds between sample points metadata: list of start times in microseconds since epoch UTC and the data to add gaps: the list of start and end points of gaps (the start and end are actual data points) errors: the errors encountered while getting the data """ sample_interval_micros: float metadata: Optional[List[Tuple[float, pa.Table]]] = None gaps: List[Tuple[float, float]] = field(default_factory=lambda: []) errors: RedVoxExceptions = field(default_factory=lambda: RedVoxExceptions("AudioWithGaps")) def create_timestamps(self) -> pa.Table: """ :return: converts the audio metadata into a data table """ result_array = [[], [], []] for m in self.metadata: timestamps = calc_evenly_sampled_timestamps(m[0], m[1].num_rows, self.sample_interval_micros) result_array[0].extend(timestamps) result_array[1].extend(timestamps) result_array[2].extend(m[1]["microphone"].to_numpy()) for gs, ge in self.gaps: fractional, whole = modf((ge - gs) / self.sample_interval_micros) num_samples = int((whole - 1) if fractional < DEFAULT_GAP_LOWER_LIMIT else whole) timestamps = calc_evenly_sampled_timestamps( gs + self.sample_interval_micros, num_samples, self.sample_interval_micros ) gap_array = [timestamps, np.full(len(timestamps), np.nan)] result_array[0].extend(gap_array[0]) result_array[1].extend(gap_array[0]) result_array[2].extend(gap_array[1]) ptable = pa.Table.from_pydict(dict(zip(AUDIO_DF_COLUMNS, result_array))) return pc.take(ptable, pc.sort_indices(ptable, sort_keys=[("timestamps", "ascending")])) def add_error(self, error: str): """ add an error to the result :param error: error message to add """ self.errors.append(error)
Class variables
var errors : RedVoxExceptions
var gaps : List[Tuple[float, float]]
var metadata : Optional[List[Tuple[float, pyarrow.lib.Table]]]
var sample_interval_micros : float
Static methods
def from_dict(kvs: Union[dict, list, str, int, float, bool, ForwardRef(None)], *, infer_missing=False) ‑> ~A
-
Expand source code
@classmethod def from_dict(cls: Type[A], kvs: Json, *, infer_missing=False) -> A: return _decode_dataclass(cls, kvs, infer_missing)
def from_json(s: Union[str, bytes, bytearray], *, parse_float=None, parse_int=None, parse_constant=None, infer_missing=False, **kw) ‑> ~A
-
Expand source code
@classmethod def from_json(cls: Type[A], s: JsonData, *, parse_float=None, parse_int=None, parse_constant=None, infer_missing=False, **kw) -> A: kvs = json.loads(s, parse_float=parse_float, parse_int=parse_int, parse_constant=parse_constant, **kw) return cls.from_dict(kvs, infer_missing=infer_missing)
def schema(*, infer_missing: bool = False, only=None, exclude=(), many: bool = False, context=None, load_only=(), dump_only=(), partial: bool = False, unknown=None) ‑> dataclasses_json.mm.SchemaF[~A]
-
Expand source code
@classmethod def schema(cls: Type[A], *, infer_missing: bool = False, only=None, exclude=(), many: bool = False, context=None, load_only=(), dump_only=(), partial: bool = False, unknown=None) -> SchemaType: Schema = build_schema(cls, DataClassJsonMixin, infer_missing, partial) if unknown is None: undefined_parameter_action = _undefined_parameter_action_safe(cls) if undefined_parameter_action is not None: # We can just make use of the same-named mm keywords unknown = undefined_parameter_action.name.lower() return Schema(only=only, exclude=exclude, many=many, context=context, load_only=load_only, dump_only=dump_only, partial=partial, unknown=unknown)
Methods
def add_error(self, error: str)
-
add an error to the result :param error: error message to add
Expand source code
def add_error(self, error: str): """ add an error to the result :param error: error message to add """ self.errors.append(error)
def create_timestamps(self) ‑> pyarrow.lib.Table
-
:return: converts the audio metadata into a data table
Expand source code
def create_timestamps(self) -> pa.Table: """ :return: converts the audio metadata into a data table """ result_array = [[], [], []] for m in self.metadata: timestamps = calc_evenly_sampled_timestamps(m[0], m[1].num_rows, self.sample_interval_micros) result_array[0].extend(timestamps) result_array[1].extend(timestamps) result_array[2].extend(m[1]["microphone"].to_numpy()) for gs, ge in self.gaps: fractional, whole = modf((ge - gs) / self.sample_interval_micros) num_samples = int((whole - 1) if fractional < DEFAULT_GAP_LOWER_LIMIT else whole) timestamps = calc_evenly_sampled_timestamps( gs + self.sample_interval_micros, num_samples, self.sample_interval_micros ) gap_array = [timestamps, np.full(len(timestamps), np.nan)] result_array[0].extend(gap_array[0]) result_array[1].extend(gap_array[0]) result_array[2].extend(gap_array[1]) ptable = pa.Table.from_pydict(dict(zip(AUDIO_DF_COLUMNS, result_array))) return pc.take(ptable, pc.sort_indices(ptable, sort_keys=[("timestamps", "ascending")]))
def to_dict(self, encode_json=False) ‑> Dict[str, Union[dict, list, str, int, float, bool, ForwardRef(None)]]
-
Expand source code
def to_dict(self, encode_json=False) -> Dict[str, Json]: return _asdict(self, encode_json=encode_json)
def to_json(self, *, skipkeys: bool = False, ensure_ascii: bool = True, check_circular: bool = True, allow_nan: bool = True, indent: Union[int, str, ForwardRef(None)] = None, separators: Tuple[str, str] = None, default: Callable = None, sort_keys: bool = False, **kw) ‑> str
-
Expand source code
def to_json(self, *, skipkeys: bool = False, ensure_ascii: bool = True, check_circular: bool = True, allow_nan: bool = True, indent: Optional[Union[int, str]] = None, separators: Tuple[str, str] = None, default: Callable = None, sort_keys: bool = False, **kw) -> str: return json.dumps(self.to_dict(encode_json=False), cls=_ExtendedEncoder, skipkeys=skipkeys, ensure_ascii=ensure_ascii, check_circular=check_circular, allow_nan=allow_nan, indent=indent, separators=separators, default=default, sort_keys=sort_keys, **kw)
class DataPointCreationMode (value, names=None, *, module=None, qualname=None, type=None, start=1)
-
Type of data point to create
Expand source code
class DataPointCreationMode(enum.Enum): """ Type of data point to create """ NAN: int = 0 COPY: int = 1 INTERPOLATE: int = 2 @staticmethod def list_names() -> List[str]: return [n.name for n in DataPointCreationMode]
Ancestors
- enum.Enum
Class variables
var COPY : int
var INTERPOLATE : int
var NAN : int
Static methods
def list_names() ‑> List[str]
-
Expand source code
@staticmethod def list_names() -> List[str]: return [n.name for n in DataPointCreationMode]