Module libquantum.utils

This module contains general utilities that can work with values containing nans.

Expand source code
This module contains general utilities that can work with values containing nans.

from enum import Enum
from typing import Tuple, Union

import numpy as np
from scipy import signal

from scipy.integrate import cumulative_trapezoid
from libquantum.scales import EPSILON
from redvox.common import date_time_utils as dt

def taper_tukey(sig_wf_or_time: np.ndarray,
                fraction_cosine: float) -> np.ndarray:
    Constructs a symmetric Tukey window with the same dimensions as a time or signal numpy array.
    fraction_cosine = 0 is a rectangular window, 1 is a Hann window

    :param sig_wf_or_time: input signal or time
    :param fraction_cosine: fraction of the window inside the cosine tapered window, shared between the head and tail
    :return: tukey taper window amplitude
    return, alpha=fraction_cosine, sym=True)

def datetime_now_epoch_s() -> float:
    Returns the invocation Unix time in seconds

    :return: The current epoch timestamp as seconds since the epoch UTC
    return dt.datetime_to_epoch_seconds_utc(

def datetime_now_epoch_micros() -> float:
    Returns the invocation Unix time in microseconds

    :return: The current epoch timestamp as microseconds since the epoch UTC
    return dt.datetime_to_epoch_microseconds_utc(

# Integrals and derivatives
def integrate_cumtrapz(timestamps_s: np.ndarray,
                       sensor_wf: np.ndarray,
                       initial_value: float = 0) -> np.ndarray:
    Cumulative trapazoid integration using scipy.integrate.cumulative_trapezoid
    Initiated by Kei 2106, work in progress. See blast_derivative_integral for validation.

    :param timestamps_s: timestamps corresponding to the data in seconds
    :param sensor_wf: data to integrate using cumulative trapezoid
    :param initial_value: the value to add in the initial of the integrated data to match length of input (default is 0)
    :return: integrated data with the same length as the input
    integrated_data = cumulative_trapezoid(x=timestamps_s,
    return integrated_data

def derivative_gradient(timestamps_s: np.ndarray,
                        sensor_wf: np.ndarray) -> np.ndarray:
    Derivative using gradient

    :param timestamps_s: timestamps corresponding to the data in seconds
    :param sensor_wf: data to integrate using cumulative trapezoid
    :return: derivative data with the same length as the input
    # derivative_data = np.gradient(sensor_wf)/np.gradient(timestamps_s)
    derivative_data = np.gradient(sensor_wf, timestamps_s)

    return derivative_data

def derivative_diff(timestamps_s: np.ndarray,
                    sensor_wf: np.ndarray) -> np.ndarray:
    Derivative using diff

    :param timestamps_s: timestamps corresponding to the data in seconds
    :param sensor_wf: data to integrate using cumulative trapezoid
    :return: derivative data with the same length as the input. Hold/repeat last value

    derivative_data0 = np.diff(sensor_wf)/np.diff(timestamps_s)
    derivative_data = np.append(derivative_data0, derivative_data0[-1])

    return derivative_data

class ExtractionType(Enum):
    Enumeration of valid extraction types.

    ARGMAX = max of the absolute value of the signal
    SIGMAX = fancier signal picker, from POSITIVE max
    BITMAX = fancier signal picker, from ABSOLUTE max
    ARGMAX: str = "argmax"
    SIGMAX: str = "sigmax"
    BITMAX: str = "bitmax"

def sig_extract(sig: np.ndarray,
                time_epoch_s: np.ndarray,
                intro_s: float,
                outro_s: float,
                pick_bits_below_max: float = 1.,
                pick_time_interval_s: float = 1.,
                extract_type: ExtractionType = ExtractionType.ARGMAX) -> Tuple[np.ndarray, np.ndarray, float]:
    Extract signal and time relative to reference index

    :param sig: input signal
    :param time_epoch_s: signal epoch time
    :param intro_s: time before pick
    :param outro_s: time after pick
    :param pick_bits_below_max: pick treshold in bits below max
    :param pick_time_interval_s: pick time interval between adjacent max point
    :param extract_type: Type of extraction, see ExtractionType class
    :return: extracted signal np.ndarray, extracted signal timestamps np.ndarray, and pick time

    sig_sample_interval_s = np.mean(np.diff(time_epoch_s))

    if extract_type == ExtractionType.ARGMAX:
        index_max = np.argmax(np.abs(sig))
        # Max pick
        pick_time_epoch_s = time_epoch_s[index_max]
    elif extract_type == ExtractionType.SIGMAX:
        time_index_pick_all = \
            picker_signal_max_index(sig=sig, sig_sample_rate_hz=1./sig_sample_interval_s,
                                    bits_pick=pick_bits_below_max, time_interval_s=pick_time_interval_s)
        # First pick
        pick_time_epoch_s = time_epoch_s[time_index_pick_all[0]]
    elif extract_type == ExtractionType.BITMAX:
        time_index_pick_all = \
            picker_signal_bit_index(sig=sig, sig_sample_rate_hz=1./sig_sample_interval_s,
                                    bits_pick=pick_bits_below_max, time_interval_s=pick_time_interval_s)
        # First pick
        pick_time_epoch_s = time_epoch_s[time_index_pick_all[0]]
        print('Unexpected extraction type to sig_extract, return max')
        index_max = np.argmax(np.abs(sig))
        # Max pick
        pick_time_epoch_s = time_epoch_s[index_max]

    epoch_s_start = pick_time_epoch_s - intro_s
    epoch_s_stop = pick_time_epoch_s + outro_s

    intro_index = np.argmin(np.abs(time_epoch_s - epoch_s_start))
    outro_index = np.argmin(np.abs(time_epoch_s - epoch_s_stop))
    sig_wf = sig[intro_index: outro_index]
    sig_epoch_s = time_epoch_s[intro_index: outro_index]

    return sig_wf, sig_epoch_s, pick_time_epoch_s

def sig_frame(sig: np.ndarray,
              time_epoch_s: np.ndarray,
              epoch_s_start: float,
              epoch_s_stop: float) -> Tuple[np.ndarray, np.ndarray]:
    Frame one-component signal within start and stop epoch times

    :param sig: input signal
    :param time_epoch_s: input epoch time in seconds
    :param epoch_s_start: start epoch time
    :param epoch_s_stop: stop epoch time
    :return: truncated time series and time
    intro_index = np.argmin(np.abs(time_epoch_s - epoch_s_start))
    outro_index = np.argmin(np.abs(time_epoch_s - epoch_s_stop))
    sig_wf = sig[intro_index: outro_index]
    sig_epoch_s = time_epoch_s[intro_index: outro_index]

    return sig_wf, sig_epoch_s

def sig3c_frame(sig3c: np.ndarray,
                time_epoch_s: np.ndarray,
                epoch_s_start: float,
                epoch_s_stop: float) -> Tuple[np.ndarray, np.ndarray]:
    Frame three-component signal within start and stop epoch times

    :param sig3c: input signal with three components
    :param time_epoch_s: input epoch time in seconds
    :param epoch_s_start: start epoch time
    :param epoch_s_stop: stop epoch time
    :return: truncated time series and time
    intro_index = np.argmin(np.abs(time_epoch_s - epoch_s_start))
    outro_index = np.argmin(np.abs(time_epoch_s - epoch_s_stop))
    sig_wf = sig3c[:, intro_index: outro_index]
    sig_epoch_s = time_epoch_s[intro_index: outro_index]

    return sig_wf, sig_epoch_s

def dbepsilon(x: np.ndarray) -> np.ndarray:
    Converts the absolute value of a time series to dB

    :param x: time series
    :return: ndarray
    y = 10 * np.log10(np.abs(x ** 2) + EPSILON)
    return y

def dbepsilon_max(x: np.ndarray) -> float:
    Returns the max of the absolute value of a time series to dB

    :param x: time series
    :return: float
    y = 10 * np.log10(np.max(np.abs(x ** 2)) + EPSILON)
    return y

def log2epsilon(x: np.ndarray) -> np.ndarray:
    log 2 of the absolute value of linear amplitude, with EPSILON to avoid singularities

    :param x: time series or fft - not power
    :return: ndarray
    y = np.log2(np.abs(x) + EPSILON)
    return y

def log2epsilon_max(x: np.ndarray) -> float:
    max of the log 2 of absolute value of linear amplitude, with EPSILON to avoid singularities

    :param x: time series or fft - not power
    :return: float
    y = np.max(np.log2(np.abs(x) + EPSILON))
    return y

Picker modules

def picker_signal_max_index(sig: np.array,
                            sig_sample_rate_hz: float,
                            bits_pick: float,
                            time_interval_s: float) -> np.array:
    Finds the picker index for the POSITIVE max of a signal

    :param sig: array of waveform data
    :param sig_sample_rate_hz: float sample rate of reference sensor
    :param bits_pick: detection threshold in bits loss
    :param time_interval_s: min time interval between events
    :return: picker index

    # Compute the distance
    distance_points = int(time_interval_s * sig_sample_rate_hz)
    height_min = np.max(sig) - 2 ** bits_pick
    time_index_pick, _ = signal.find_peaks(sig, height=height_min, distance=distance_points)

    return time_index_pick

def picker_signal_bit_index(sig: np.array,
                            sig_sample_rate_hz: float,
                            bits_pick: float,
                            time_interval_s: float) -> np.ndarray:
    Finds the picker index from the ABSOLUTE max in bits

    :param sig: array of waveform data
    :param sig_sample_rate_hz: float sample rate of reference sensor
    :param bits_pick: detection treshold in db loss
    :param time_interval_s: min time interval between events
    :return: picker index

    # Compute the distance
    distance_points = int(time_interval_s * sig_sample_rate_hz)
    sig_bit = log2epsilon(sig)
    height_min = log2epsilon_max(sig) - bits_pick
    index_pick, _ = signal.find_peaks(sig_bit, height=height_min, distance=distance_points)

    return index_pick

def picker_comb(sig_pick: np.ndarray,
                index_pick: np.ndarray) -> np.ndarray:
    Constructs a comb function from the picks

    :param sig_pick: 1D record corresponding to the picks
    :param index_pick: indexes for the picks
    :return: comb with unit amplitude
    comb = np.zeros(sig_pick.shape)
    comb[index_pick] = np.ones(index_pick.shape)
    return comb

Matrix transformations

def sum_columns(sxx: np.ndarray) -> np.ndarray:
    Sum over all the columns in a 1D or 2D array

    :param sxx: input vector or matrix
    :return: ndarray with sum
    if not isinstance(sxx, np.ndarray):
        raise TypeError('Input must be array.')
    elif len(sxx) == 0:
        raise ValueError('Cannot compute on empty array.')

    if np.isnan(sxx).any() or np.isinf(sxx).any():
        sxx = np.nan_to_num(sxx)

    if len(sxx.shape) == 1:
        sum_c = np.sum(sxx, axis=0)
    elif len(sxx.shape) == 2:
        sum_c = np.sum(sxx, axis=1)
        raise TypeError('Cannot handle an array of shape {}.'.format(str(sxx.shape)))

    return sum_c

def mean_columns(sxx: np.ndarray) -> np.ndarray:
    Compute the mean of the columns in a 1D or 2D array

    :param sxx: input vector or matrix
    :return: ndarray with mean
    if not isinstance(sxx, np.ndarray):
        raise TypeError('Input must be array.')
    elif len(sxx) == 0:
        raise ValueError('Cannot compute on empty array.')

    if np.isnan(sxx).any() or np.isinf(sxx).any():
        sxx = np.nan_to_num(sxx)

    if len(sxx.shape) == 1:
        sum_c = np.mean(sxx, axis=0)
    elif len(sxx.shape) == 2:
        sum_c = np.mean(sxx, axis=1)
        raise TypeError('Cannot handle an array of shape {}.'.format(str(sxx.shape)))

    return sum_c

def just_tile(array1d_in: Union[float, np.ndarray],
              shape_out: tuple) -> np.ndarray:
    Constructs tiled array from 1D array to the shape specified by shape_out

    :param array1d_in: 1D array or vector
    :param shape_out: Tuple with output array shape.
    :return: ndarray
    if len(shape_out) == 1:
        tiled_matrix = np.tile(array1d_in, (shape_out[0]))
    elif len(shape_out) == 2:
        tiled_matrix = np.tile(array1d_in, (shape_out[1], 1)).T
        raise TypeError('Cannot handle an array of shape {}.'.format(str(array1d_in.shape)))

    return tiled_matrix

def sum_tile(sxx: np.ndarray) -> np.ndarray:
    Compute the sum of the columns in a 1D or 2D array and then re-tile to the original size

    :param sxx: input vector or matrix
    :return: ndarray of sum
    sum_c = sum_columns(sxx)

    # create array of repeated values of PSD with dimensions that match those of energy array
    if len(sxx.shape) == 1:
        sum_c_matrix = np.tile(sum_c, (sxx.shape[0]))
    elif len(sxx.shape) == 2:
        sum_c_matrix = np.tile(sum_c, (sxx.shape[1], 1)).T
        raise TypeError('Cannot handle an array of shape {}.'.format(str(sxx.shape)))

    return sum_c_matrix

def mean_tile(sxx: np.ndarray,
              shape_out: np.ndarray) -> np.ndarray:
    Compute the mean of the columns in a 1D or 2D array and then re-tile to the original size

    :param sxx: input vector or matrix
    :param shape_out: shape of output vector or matrix
    :return: ndarray of mean
    sum_c = mean_columns(sxx)

    # create array of repeated values of PSD with dimensions that match those of energy array
    if len(shape_out) == 1:
        sum_c_matrix = np.tile(sum_c, (shape_out[0]))
    elif len(shape_out) == 2:
        sum_c_matrix = np.tile(sum_c, (shape_out[1], 1)).T
        raise TypeError('Cannot handle an array of shape {}.'.format(str(sxx.shape)))

    return sum_c_matrix

def d1tile_x_d2(d1: Union[float, np.ndarray],
                d2: np.ndarray) -> np.ndarray:
    Create array of repeated values with dimensions that match those of energy array
    Useful to multiply frequency-dependent values to frequency-time matrices

    :param d1: 1D input vector, nominally frequency/scale multipliers
    :param d2: 2D array, first dimension should be that same as d1
    :return: array with matching values
    shape_out = d2.shape

    if len(shape_out) == 1:
        d1_matrix = np.tile(d1, (shape_out[0]))
    elif len(shape_out) == 2:
        d1_matrix = np.tile(d1, (shape_out[1], 1)).T
        raise TypeError('Cannot handle an array of shape {}.'.format(str(d1.shape)))

    if d1_matrix.shape == d2.shape:
        d1_x_d2 = d1_matrix * d2
        raise TypeError('Cannot handle an array of shape {}.'.format(str(d1.shape)))
    return d1_x_d2


def d1tile_x_d2(d1: Union[float, numpy.ndarray], d2: numpy.ndarray) ‑> numpy.ndarray

Create array of repeated values with dimensions that match those of energy array Useful to multiply frequency-dependent values to frequency-time matrices

:param d1: 1D input vector, nominally frequency/scale multipliers :param d2: 2D array, first dimension should be that same as d1 :return: array with matching values

Expand source code
def d1tile_x_d2(d1: Union[float, np.ndarray],
                d2: np.ndarray) -> np.ndarray:
    Create array of repeated values with dimensions that match those of energy array
    Useful to multiply frequency-dependent values to frequency-time matrices

    :param d1: 1D input vector, nominally frequency/scale multipliers
    :param d2: 2D array, first dimension should be that same as d1
    :return: array with matching values
    shape_out = d2.shape

    if len(shape_out) == 1:
        d1_matrix = np.tile(d1, (shape_out[0]))
    elif len(shape_out) == 2:
        d1_matrix = np.tile(d1, (shape_out[1], 1)).T
        raise TypeError('Cannot handle an array of shape {}.'.format(str(d1.shape)))

    if d1_matrix.shape == d2.shape:
        d1_x_d2 = d1_matrix * d2
        raise TypeError('Cannot handle an array of shape {}.'.format(str(d1.shape)))
    return d1_x_d2
def datetime_now_epoch_micros() ‑> float

Returns the invocation Unix time in microseconds

:return: The current epoch timestamp as microseconds since the epoch UTC

Expand source code
def datetime_now_epoch_micros() -> float:
    Returns the invocation Unix time in microseconds

    :return: The current epoch timestamp as microseconds since the epoch UTC
    return dt.datetime_to_epoch_microseconds_utc(
def datetime_now_epoch_s() ‑> float

Returns the invocation Unix time in seconds

:return: The current epoch timestamp as seconds since the epoch UTC

Expand source code
def datetime_now_epoch_s() -> float:
    Returns the invocation Unix time in seconds

    :return: The current epoch timestamp as seconds since the epoch UTC
    return dt.datetime_to_epoch_seconds_utc(
def dbepsilon(x: numpy.ndarray) ‑> numpy.ndarray

Converts the absolute value of a time series to dB

:param x: time series :return: ndarray

Expand source code
def dbepsilon(x: np.ndarray) -> np.ndarray:
    Converts the absolute value of a time series to dB

    :param x: time series
    :return: ndarray
    y = 10 * np.log10(np.abs(x ** 2) + EPSILON)
    return y
def dbepsilon_max(x: numpy.ndarray) ‑> float

Returns the max of the absolute value of a time series to dB

:param x: time series :return: float

Expand source code
def dbepsilon_max(x: np.ndarray) -> float:
    Returns the max of the absolute value of a time series to dB

    :param x: time series
    :return: float
    y = 10 * np.log10(np.max(np.abs(x ** 2)) + EPSILON)
    return y
def derivative_diff(timestamps_s: numpy.ndarray, sensor_wf: numpy.ndarray) ‑> numpy.ndarray

Derivative using diff

:param timestamps_s: timestamps corresponding to the data in seconds :param sensor_wf: data to integrate using cumulative trapezoid :return: derivative data with the same length as the input. Hold/repeat last value

Expand source code
def derivative_diff(timestamps_s: np.ndarray,
                    sensor_wf: np.ndarray) -> np.ndarray:
    Derivative using diff

    :param timestamps_s: timestamps corresponding to the data in seconds
    :param sensor_wf: data to integrate using cumulative trapezoid
    :return: derivative data with the same length as the input. Hold/repeat last value

    derivative_data0 = np.diff(sensor_wf)/np.diff(timestamps_s)
    derivative_data = np.append(derivative_data0, derivative_data0[-1])

    return derivative_data
def derivative_gradient(timestamps_s: numpy.ndarray, sensor_wf: numpy.ndarray) ‑> numpy.ndarray

Derivative using gradient

:param timestamps_s: timestamps corresponding to the data in seconds :param sensor_wf: data to integrate using cumulative trapezoid :return: derivative data with the same length as the input

Expand source code
def derivative_gradient(timestamps_s: np.ndarray,
                        sensor_wf: np.ndarray) -> np.ndarray:
    Derivative using gradient

    :param timestamps_s: timestamps corresponding to the data in seconds
    :param sensor_wf: data to integrate using cumulative trapezoid
    :return: derivative data with the same length as the input
    # derivative_data = np.gradient(sensor_wf)/np.gradient(timestamps_s)
    derivative_data = np.gradient(sensor_wf, timestamps_s)

    return derivative_data
def integrate_cumtrapz(timestamps_s: numpy.ndarray, sensor_wf: numpy.ndarray, initial_value: float = 0) ‑> numpy.ndarray

Cumulative trapazoid integration using scipy.integrate.cumulative_trapezoid Initiated by Kei 2106, work in progress. See blast_derivative_integral for validation.

:param timestamps_s: timestamps corresponding to the data in seconds :param sensor_wf: data to integrate using cumulative trapezoid :param initial_value: the value to add in the initial of the integrated data to match length of input (default is 0) :return: integrated data with the same length as the input

Expand source code
def integrate_cumtrapz(timestamps_s: np.ndarray,
                       sensor_wf: np.ndarray,
                       initial_value: float = 0) -> np.ndarray:
    Cumulative trapazoid integration using scipy.integrate.cumulative_trapezoid
    Initiated by Kei 2106, work in progress. See blast_derivative_integral for validation.

    :param timestamps_s: timestamps corresponding to the data in seconds
    :param sensor_wf: data to integrate using cumulative trapezoid
    :param initial_value: the value to add in the initial of the integrated data to match length of input (default is 0)
    :return: integrated data with the same length as the input
    integrated_data = cumulative_trapezoid(x=timestamps_s,
    return integrated_data
def just_tile(array1d_in: Union[float, numpy.ndarray], shape_out: tuple) ‑> numpy.ndarray

Constructs tiled array from 1D array to the shape specified by shape_out

:param array1d_in: 1D array or vector :param shape_out: Tuple with output array shape. :return: ndarray

Expand source code
def just_tile(array1d_in: Union[float, np.ndarray],
              shape_out: tuple) -> np.ndarray:
    Constructs tiled array from 1D array to the shape specified by shape_out

    :param array1d_in: 1D array or vector
    :param shape_out: Tuple with output array shape.
    :return: ndarray
    if len(shape_out) == 1:
        tiled_matrix = np.tile(array1d_in, (shape_out[0]))
    elif len(shape_out) == 2:
        tiled_matrix = np.tile(array1d_in, (shape_out[1], 1)).T
        raise TypeError('Cannot handle an array of shape {}.'.format(str(array1d_in.shape)))

    return tiled_matrix
def log2epsilon(x: numpy.ndarray) ‑> numpy.ndarray

log 2 of the absolute value of linear amplitude, with EPSILON to avoid singularities

:param x: time series or fft - not power :return: ndarray

Expand source code
def log2epsilon(x: np.ndarray) -> np.ndarray:
    log 2 of the absolute value of linear amplitude, with EPSILON to avoid singularities

    :param x: time series or fft - not power
    :return: ndarray
    y = np.log2(np.abs(x) + EPSILON)
    return y
def log2epsilon_max(x: numpy.ndarray) ‑> float

max of the log 2 of absolute value of linear amplitude, with EPSILON to avoid singularities

:param x: time series or fft - not power :return: float

Expand source code
def log2epsilon_max(x: np.ndarray) -> float:
    max of the log 2 of absolute value of linear amplitude, with EPSILON to avoid singularities

    :param x: time series or fft - not power
    :return: float
    y = np.max(np.log2(np.abs(x) + EPSILON))
    return y
def mean_columns(sxx: numpy.ndarray) ‑> numpy.ndarray

Compute the mean of the columns in a 1D or 2D array

:param sxx: input vector or matrix :return: ndarray with mean

Expand source code
def mean_columns(sxx: np.ndarray) -> np.ndarray:
    Compute the mean of the columns in a 1D or 2D array

    :param sxx: input vector or matrix
    :return: ndarray with mean
    if not isinstance(sxx, np.ndarray):
        raise TypeError('Input must be array.')
    elif len(sxx) == 0:
        raise ValueError('Cannot compute on empty array.')

    if np.isnan(sxx).any() or np.isinf(sxx).any():
        sxx = np.nan_to_num(sxx)

    if len(sxx.shape) == 1:
        sum_c = np.mean(sxx, axis=0)
    elif len(sxx.shape) == 2:
        sum_c = np.mean(sxx, axis=1)
        raise TypeError('Cannot handle an array of shape {}.'.format(str(sxx.shape)))

    return sum_c
def mean_tile(sxx: numpy.ndarray, shape_out: numpy.ndarray) ‑> numpy.ndarray

Compute the mean of the columns in a 1D or 2D array and then re-tile to the original size

:param sxx: input vector or matrix :param shape_out: shape of output vector or matrix :return: ndarray of mean

Expand source code
def mean_tile(sxx: np.ndarray,
              shape_out: np.ndarray) -> np.ndarray:
    Compute the mean of the columns in a 1D or 2D array and then re-tile to the original size

    :param sxx: input vector or matrix
    :param shape_out: shape of output vector or matrix
    :return: ndarray of mean
    sum_c = mean_columns(sxx)

    # create array of repeated values of PSD with dimensions that match those of energy array
    if len(shape_out) == 1:
        sum_c_matrix = np.tile(sum_c, (shape_out[0]))
    elif len(shape_out) == 2:
        sum_c_matrix = np.tile(sum_c, (shape_out[1], 1)).T
        raise TypeError('Cannot handle an array of shape {}.'.format(str(sxx.shape)))

    return sum_c_matrix
def picker_comb(sig_pick: numpy.ndarray, index_pick: numpy.ndarray) ‑> numpy.ndarray

Constructs a comb function from the picks

:param sig_pick: 1D record corresponding to the picks :param index_pick: indexes for the picks :return: comb with unit amplitude

Expand source code
def picker_comb(sig_pick: np.ndarray,
                index_pick: np.ndarray) -> np.ndarray:
    Constructs a comb function from the picks

    :param sig_pick: 1D record corresponding to the picks
    :param index_pick: indexes for the picks
    :return: comb with unit amplitude
    comb = np.zeros(sig_pick.shape)
    comb[index_pick] = np.ones(index_pick.shape)
    return comb
def picker_signal_bit_index(sig: , sig_sample_rate_hz: float, bits_pick: float, time_interval_s: float) ‑> numpy.ndarray

Finds the picker index from the ABSOLUTE max in bits

:param sig: array of waveform data :param sig_sample_rate_hz: float sample rate of reference sensor :param bits_pick: detection treshold in db loss :param time_interval_s: min time interval between events :return: picker index

Expand source code
def picker_signal_bit_index(sig: np.array,
                            sig_sample_rate_hz: float,
                            bits_pick: float,
                            time_interval_s: float) -> np.ndarray:
    Finds the picker index from the ABSOLUTE max in bits

    :param sig: array of waveform data
    :param sig_sample_rate_hz: float sample rate of reference sensor
    :param bits_pick: detection treshold in db loss
    :param time_interval_s: min time interval between events
    :return: picker index

    # Compute the distance
    distance_points = int(time_interval_s * sig_sample_rate_hz)
    sig_bit = log2epsilon(sig)
    height_min = log2epsilon_max(sig) - bits_pick
    index_pick, _ = signal.find_peaks(sig_bit, height=height_min, distance=distance_points)

    return index_pick
def picker_signal_max_index(sig: , sig_sample_rate_hz: float, bits_pick: float, time_interval_s: float) ‑> 

Finds the picker index for the POSITIVE max of a signal

:param sig: array of waveform data :param sig_sample_rate_hz: float sample rate of reference sensor :param bits_pick: detection threshold in bits loss :param time_interval_s: min time interval between events :return: picker index

Expand source code
def picker_signal_max_index(sig: np.array,
                            sig_sample_rate_hz: float,
                            bits_pick: float,
                            time_interval_s: float) -> np.array:
    Finds the picker index for the POSITIVE max of a signal

    :param sig: array of waveform data
    :param sig_sample_rate_hz: float sample rate of reference sensor
    :param bits_pick: detection threshold in bits loss
    :param time_interval_s: min time interval between events
    :return: picker index

    # Compute the distance
    distance_points = int(time_interval_s * sig_sample_rate_hz)
    height_min = np.max(sig) - 2 ** bits_pick
    time_index_pick, _ = signal.find_peaks(sig, height=height_min, distance=distance_points)

    return time_index_pick
def sig3c_frame(sig3c: numpy.ndarray, time_epoch_s: numpy.ndarray, epoch_s_start: float, epoch_s_stop: float) ‑> Tuple[numpy.ndarray, numpy.ndarray]

Frame three-component signal within start and stop epoch times

:param sig3c: input signal with three components :param time_epoch_s: input epoch time in seconds :param epoch_s_start: start epoch time :param epoch_s_stop: stop epoch time :return: truncated time series and time

Expand source code
def sig3c_frame(sig3c: np.ndarray,
                time_epoch_s: np.ndarray,
                epoch_s_start: float,
                epoch_s_stop: float) -> Tuple[np.ndarray, np.ndarray]:
    Frame three-component signal within start and stop epoch times

    :param sig3c: input signal with three components
    :param time_epoch_s: input epoch time in seconds
    :param epoch_s_start: start epoch time
    :param epoch_s_stop: stop epoch time
    :return: truncated time series and time
    intro_index = np.argmin(np.abs(time_epoch_s - epoch_s_start))
    outro_index = np.argmin(np.abs(time_epoch_s - epoch_s_stop))
    sig_wf = sig3c[:, intro_index: outro_index]
    sig_epoch_s = time_epoch_s[intro_index: outro_index]

    return sig_wf, sig_epoch_s
def sig_extract(sig: numpy.ndarray, time_epoch_s: numpy.ndarray, intro_s: float, outro_s: float, pick_bits_below_max: float = 1.0, pick_time_interval_s: float = 1.0, extract_type: ExtractionType = ExtractionType.ARGMAX) ‑> Tuple[numpy.ndarray, numpy.ndarray, float]

Extract signal and time relative to reference index

:param sig: input signal :param time_epoch_s: signal epoch time :param intro_s: time before pick :param outro_s: time after pick :param pick_bits_below_max: pick treshold in bits below max :param pick_time_interval_s: pick time interval between adjacent max point :param extract_type: Type of extraction, see ExtractionType class :return: extracted signal np.ndarray, extracted signal timestamps np.ndarray, and pick time

Expand source code
def sig_extract(sig: np.ndarray,
                time_epoch_s: np.ndarray,
                intro_s: float,
                outro_s: float,
                pick_bits_below_max: float = 1.,
                pick_time_interval_s: float = 1.,
                extract_type: ExtractionType = ExtractionType.ARGMAX) -> Tuple[np.ndarray, np.ndarray, float]:
    Extract signal and time relative to reference index

    :param sig: input signal
    :param time_epoch_s: signal epoch time
    :param intro_s: time before pick
    :param outro_s: time after pick
    :param pick_bits_below_max: pick treshold in bits below max
    :param pick_time_interval_s: pick time interval between adjacent max point
    :param extract_type: Type of extraction, see ExtractionType class
    :return: extracted signal np.ndarray, extracted signal timestamps np.ndarray, and pick time

    sig_sample_interval_s = np.mean(np.diff(time_epoch_s))

    if extract_type == ExtractionType.ARGMAX:
        index_max = np.argmax(np.abs(sig))
        # Max pick
        pick_time_epoch_s = time_epoch_s[index_max]
    elif extract_type == ExtractionType.SIGMAX:
        time_index_pick_all = \
            picker_signal_max_index(sig=sig, sig_sample_rate_hz=1./sig_sample_interval_s,
                                    bits_pick=pick_bits_below_max, time_interval_s=pick_time_interval_s)
        # First pick
        pick_time_epoch_s = time_epoch_s[time_index_pick_all[0]]
    elif extract_type == ExtractionType.BITMAX:
        time_index_pick_all = \
            picker_signal_bit_index(sig=sig, sig_sample_rate_hz=1./sig_sample_interval_s,
                                    bits_pick=pick_bits_below_max, time_interval_s=pick_time_interval_s)
        # First pick
        pick_time_epoch_s = time_epoch_s[time_index_pick_all[0]]
        print('Unexpected extraction type to sig_extract, return max')
        index_max = np.argmax(np.abs(sig))
        # Max pick
        pick_time_epoch_s = time_epoch_s[index_max]

    epoch_s_start = pick_time_epoch_s - intro_s
    epoch_s_stop = pick_time_epoch_s + outro_s

    intro_index = np.argmin(np.abs(time_epoch_s - epoch_s_start))
    outro_index = np.argmin(np.abs(time_epoch_s - epoch_s_stop))
    sig_wf = sig[intro_index: outro_index]
    sig_epoch_s = time_epoch_s[intro_index: outro_index]

    return sig_wf, sig_epoch_s, pick_time_epoch_s
def sig_frame(sig: numpy.ndarray, time_epoch_s: numpy.ndarray, epoch_s_start: float, epoch_s_stop: float) ‑> Tuple[numpy.ndarray, numpy.ndarray]

Frame one-component signal within start and stop epoch times

:param sig: input signal :param time_epoch_s: input epoch time in seconds :param epoch_s_start: start epoch time :param epoch_s_stop: stop epoch time :return: truncated time series and time

Expand source code
def sig_frame(sig: np.ndarray,
              time_epoch_s: np.ndarray,
              epoch_s_start: float,
              epoch_s_stop: float) -> Tuple[np.ndarray, np.ndarray]:
    Frame one-component signal within start and stop epoch times

    :param sig: input signal
    :param time_epoch_s: input epoch time in seconds
    :param epoch_s_start: start epoch time
    :param epoch_s_stop: stop epoch time
    :return: truncated time series and time
    intro_index = np.argmin(np.abs(time_epoch_s - epoch_s_start))
    outro_index = np.argmin(np.abs(time_epoch_s - epoch_s_stop))
    sig_wf = sig[intro_index: outro_index]
    sig_epoch_s = time_epoch_s[intro_index: outro_index]

    return sig_wf, sig_epoch_s
def sum_columns(sxx: numpy.ndarray) ‑> numpy.ndarray

Sum over all the columns in a 1D or 2D array

:param sxx: input vector or matrix :return: ndarray with sum

Expand source code
def sum_columns(sxx: np.ndarray) -> np.ndarray:
    Sum over all the columns in a 1D or 2D array

    :param sxx: input vector or matrix
    :return: ndarray with sum
    if not isinstance(sxx, np.ndarray):
        raise TypeError('Input must be array.')
    elif len(sxx) == 0:
        raise ValueError('Cannot compute on empty array.')

    if np.isnan(sxx).any() or np.isinf(sxx).any():
        sxx = np.nan_to_num(sxx)

    if len(sxx.shape) == 1:
        sum_c = np.sum(sxx, axis=0)
    elif len(sxx.shape) == 2:
        sum_c = np.sum(sxx, axis=1)
        raise TypeError('Cannot handle an array of shape {}.'.format(str(sxx.shape)))

    return sum_c
def sum_tile(sxx: numpy.ndarray) ‑> numpy.ndarray

Compute the sum of the columns in a 1D or 2D array and then re-tile to the original size

:param sxx: input vector or matrix :return: ndarray of sum

Expand source code
def sum_tile(sxx: np.ndarray) -> np.ndarray:
    Compute the sum of the columns in a 1D or 2D array and then re-tile to the original size

    :param sxx: input vector or matrix
    :return: ndarray of sum
    sum_c = sum_columns(sxx)

    # create array of repeated values of PSD with dimensions that match those of energy array
    if len(sxx.shape) == 1:
        sum_c_matrix = np.tile(sum_c, (sxx.shape[0]))
    elif len(sxx.shape) == 2:
        sum_c_matrix = np.tile(sum_c, (sxx.shape[1], 1)).T
        raise TypeError('Cannot handle an array of shape {}.'.format(str(sxx.shape)))

    return sum_c_matrix
def taper_tukey(sig_wf_or_time: numpy.ndarray, fraction_cosine: float) ‑> numpy.ndarray

Constructs a symmetric Tukey window with the same dimensions as a time or signal numpy array. fraction_cosine = 0 is a rectangular window, 1 is a Hann window

:param sig_wf_or_time: input signal or time :param fraction_cosine: fraction of the window inside the cosine tapered window, shared between the head and tail :return: tukey taper window amplitude

Expand source code
def taper_tukey(sig_wf_or_time: np.ndarray,
                fraction_cosine: float) -> np.ndarray:
    Constructs a symmetric Tukey window with the same dimensions as a time or signal numpy array.
    fraction_cosine = 0 is a rectangular window, 1 is a Hann window

    :param sig_wf_or_time: input signal or time
    :param fraction_cosine: fraction of the window inside the cosine tapered window, shared between the head and tail
    :return: tukey taper window amplitude
    return, alpha=fraction_cosine, sym=True)


class ExtractionType (value, names=None, *, module=None, qualname=None, type=None, start=1)

Enumeration of valid extraction types.

ARGMAX = max of the absolute value of the signal SIGMAX = fancier signal picker, from POSITIVE max BITMAX = fancier signal picker, from ABSOLUTE max

Expand source code
class ExtractionType(Enum):
    Enumeration of valid extraction types.

    ARGMAX = max of the absolute value of the signal
    SIGMAX = fancier signal picker, from POSITIVE max
    BITMAX = fancier signal picker, from ABSOLUTE max
    ARGMAX: str = "argmax"
    SIGMAX: str = "sigmax"
    BITMAX: str = "bitmax"


  • enum.Enum

Class variables

var ARGMAX : str
var BITMAX : str
var SIGMAX : str