Module redvox.api900.location_analyzer
Finds a station's best location and compares it against a surveyed point, if one is provided. Station location data can be loaded from rdvxz files or input manually. The surveyed point can come from Google Earth or any other positioning tool.
PLEASE NOTE: latitude and longitude measurements are always in degrees (deg); altitude and accuracy measurements are always in meters (m); barometer measurements are always in kiloPascals (kPa). Exceptions to this will ALWAYS be noted in comments and variable names.
barometric formula source: https://www.math24.net/barometric-formula/ barometric formula P(h) = P0 * e**(h * (-Mg/RT)) where h is a height in meters, P(h) is pressure in kPa at h and P0 is sea-level pressure in kPa Mg/RT is a constant based on assumptions of average earth based values.
Haversine equation constants from site: https://movable-type.co.uk/scripts/gis-faq-5.1.html
Expand source code
"""
Finds a station's best location and compares it against a surveyed point, if one is provided.
Station location data can be loaded from rdvxz files or input manually.
The surveyed point can come from Google Earth or any other positioning tool.
PLEASE NOTE:
latitude and longitude measurements are always in degrees (deg)
altitude and accuracy measurements are always in meters (m)
barometer measurements are always in kiloPascals (kPa)
exceptions to this will ALWAYS be noted in comments and variable names
barometric formula source: https://www.math24.net/barometric-formula/
barometric formula P(h) = P0 * e**(h * (-Mg/RT))
where h is a height in meters, P(h) is pressure in kPa at h and P0 is sea-level pressure in kPa
Mg/RT is a constant based on assumptions of average earth based values.
Haversine equation constants from site: https://movable-type.co.uk/scripts/gis-faq-5.1.html
"""
import pandas as pd
import numpy as np
from typing import List, Dict, Optional, Tuple
from fastkml import kml, styles
from fastkml.geometry import Point
from redvox.api900 import reader
from redvox.common.constants import (
EPSILON,
DEG_TO_RAD,
AVG_SEA_LEVEL_PRESSURE_KPA,
EARTH_RADIUS_M,
STANDARD_TEMPERATURE_K,
MOLAR_MASS_AIR_KG_PER_MOL,
GRAVITY_M_PER_S2,
UNIVERSAL_GAS_CONSTANT_KG_M2_PER_K_MOL_S2,
)
# Instruments are only so accurate: anything closer to a given point than the distances below
# can feasibly be treated as being at that point.
# default horizontal distance in meters for something to be included with a given point
DEFAULT_INCLUSION_HORIZONTAL_M = 100.0
# default vertical distance in meters for something to be included with a given point
DEFAULT_INCLUSION_VERTICAL_M = 50.0
# default vertical distance in meters, computed via barometer measurements, for something to be
# included with a given point
DEFAULT_INCLUSION_VERTICAL_BAR_M = 10.0

# minimum keys of a survey dictionary
# lat: latitude, lon: longitude, alt: altitude, bar: barometer reading
SURVEY_KEYS = ["lat", "lon", "alt", "bar"]
# optional keys of a survey dictionary
# sea_bar: pressure reading at sea level
OPTIONAL_SURVEY_KEYS = ["sea_bar"]

# row labels (index) of the GPS data frame
GPS_DATA_INDICES = [
    "latitude",
    "longitude",
    "altitude",
    "accuracy",
]
# columns of the data frame describing the gps data point closest to the surveyed point
CLOSEST_TO_SURVEY_COLUMNS = [
    "closest acc",
    "closest lat",
    "closest lon",
    "closest alt",
    "closest bar",
    "distance",
]
# columns of the mean location data frame
MEAN_LOC_COLUMNS = [
    "mean acc",
    "mean lat",
    "mean lon",
    "mean alt",
    "mean bar",
]
# columns of the standard deviation (std) data frame
STD_LOC_COLUMNS = [
    "std acc",
    "std lat",
    "std lon",
    "std alt",
    "std bar",
]
# columns of the station info data frame
STATION_INFO_COLUMNS = [
    "os",
    "sample rate",
]
# columns of the master data frame: station info, closest point, means and stds, in that order
MASTER_COLUMNS = [
    *STATION_INFO_COLUMNS,
    *CLOSEST_TO_SURVEY_COLUMNS,
    *MEAN_LOC_COLUMNS,
    *STD_LOC_COLUMNS,
]

# validation methods that can be utilized, keyed by a short name
VALIDATION_METHODS = {
    "sol": "close to solution",
    "mean": "close to mean",
}
class DataHolder:
    """
    Stores a list of float data and keeps track of the "best value" of the data set.
    Values equal to 0 are replaced with EPSILON whenever data is added or set.
    Properties:
        * id: a string identifier for the data
        * _data: private data storage; all values must be floats
        * best_value: the value that best represents the data set
    """

    def __init__(self, name: str):
        """
        sets up the DataHolder
        :param name: a string identifier for the data
        """
        self.id = name
        self._data = []
        self.best_value = None

    def add(self, new_data: float):
        """
        adds one element to the data; a value of 0 is stored as EPSILON
        :param new_data: float value to add
        """
        # only the incoming value needs checking; rescanning the whole list on every add is quadratic
        self._data.append(EPSILON if new_data == 0.0 else new_data)

    def set_data(self, new_data: List[float]):
        """
        overwrites the stored data with a copy of new_data; values of 0 are stored as EPSILON
        :param new_data: the new list of floats to overwrite the existing data with
        """
        # copy so that later add() calls and zero replacement never modify the caller's list
        self._data = list(new_data)
        self.replace_zeroes_with_epsilon()

    def replace_zeroes_with_epsilon(self):
        """
        replaces all 0 values in the stored data with EPSILON
        """
        for index, value in enumerate(self._data):
            if value == 0.0:
                self._data[index] = EPSILON

    def get_mean(self) -> float:
        """
        :return: the mean of the data
        """
        return np.mean(self._data)

    def get_std(self) -> float:
        """
        :return: the standard deviation of the data
        """
        return np.std(self._data)

    def get_data(self) -> List[float]:
        """
        :return: the data
        """
        return self._data

    def get_len_data(self) -> int:
        """
        :return: the number of stored data values
        """
        return len(self._data)
class GPSDataHolder:
    """
    holds gps data (latitude, longitude, altitude, and accuracy) and barometric data
    uses a dataframe to organize the gps data; rows are labeled with GPS_DATA_INDICES
    Properties:
        * gps_df: a dataframe to hold all the gps data
        * barometer: a DataHolder for barometer data, or None if no barometer data was set
        * id: string identifier for the data set
        * os_type: string identifier for the operating system of the data set
        * mic_samp_rate_hz: float sample rate of station microphone in hz
        * best_data_index: the index that corresponds to the best representative of the data
    """

    # substitute used when a barometer statistic is zero or unavailable
    _BAR_FALLBACK = 0.00000000001

    def __init__(
        self,
        name: str,
        opsys: str,
        data: Optional[List[List[float]]] = None,
        mic_samp_rate_hz: float = 80.0,
        bar: Optional[DataHolder] = None,
    ):
        """
        sets up the GPSDataHolder
        :param name: string identifier for the data set
        :param opsys: string identifier for the data set's operating system
        :param data: the data as a list of list of floats, default None
        :param mic_samp_rate_hz: float sample rate of the microphone in hz, default 80 hz
        :param bar: barometer DataHolder, default None
        """
        self.gps_df = pd.DataFrame(data, index=GPS_DATA_INDICES)
        self.barometer = bar
        self.id = name
        self.os_type = opsys
        self.mic_samp_rate_hz = mic_samp_rate_hz
        self.best_data_index = 0

    def clone(self) -> "GPSDataHolder":
        """
        :return: a copy of the GPSDataHolder. the gps dataframe is copied so changes to the clone's
                 dataframe do not affect the original; the barometer DataHolder is shared with the original
        """
        new_gps_dh = GPSDataHolder(self.id, self.os_type, None, self.mic_samp_rate_hz, self.barometer)
        new_gps_dh.gps_df = self.gps_df.copy()
        new_gps_dh.best_data_index = self.best_data_index
        return new_gps_dh

    def set_data(self, new_data: Optional[List[List[float]]] = None):
        """
        set gps location data. data is expected to be 4 lists: latitude values, longitude values, altitude values,
        and accuracy values
        :param new_data: list of list of floats that represent the gps data, default None
        """
        self.gps_df = pd.DataFrame(new_data, index=GPS_DATA_INDICES)

    def set_metadata(
        self, new_id: Optional[str] = None, new_os: Optional[str] = None, new_mic_samp_rate_hz: Optional[float] = None
    ):
        """
        set metadata fields: id, os_type and mic_sample_rate_hz. fields given as None are left unchanged
        :param new_id: the new string identifier for the data set, default None
        :param new_os: the new string identifier for the data set's os, default None
        :param new_mic_samp_rate_hz: float of new microphone sample rate in hz, default None
        """
        if new_id is not None:
            self.id = new_id
        if new_os is not None:
            self.os_type = new_os
        if new_mic_samp_rate_hz is not None:
            self.mic_samp_rate_hz = new_mic_samp_rate_hz

    @classmethod
    def _nonzero_bar(cls, value: Optional[float]) -> float:
        """
        :param value: a barometer statistic, or None if there is no barometer
        :return: the value, or the tiny fallback value if it is None or 0
        """
        if value is None or value == 0:
            return cls._BAR_FALLBACK
        return value

    def get_mean_all(self) -> Dict[str, float]:
        """
        :return: means of the accuracy, latitude, longitude, altitude, and barometer.
                 the barometer mean is a tiny non-zero value if it would be 0 or if there is no barometer
        """
        bar_mean = self._nonzero_bar(None if self.barometer is None else self.barometer.get_mean())
        means = {}
        for key, row in (("acc", "accuracy"), ("lat", "latitude"), ("lon", "longitude"), ("alt", "altitude")):
            means[key] = self.gps_df.loc[row].mean()
        means["bar"] = bar_mean
        return means

    def get_std_all(self) -> Dict[str, float]:
        """
        :return: standard deviations of the accuracy, latitude, longitude, altitude, and barometer.
                 gps stds that are NaN are reported as 0; the barometer std is a tiny non-zero value
                 if it would be 0 or if there is no barometer
        """
        bar_std = self._nonzero_bar(None if self.barometer is None else self.barometer.get_std())
        stds = {}
        for key, row in (("acc", "accuracy"), ("lat", "latitude"), ("lon", "longitude"), ("alt", "altitude")):
            row_std = self.gps_df.loc[row].std()
            stds[key] = 0 if np.isnan(row_std) else row_std
        stds["bar"] = bar_std
        return stds

    def set_barometer(self, bar_data: List[float]):
        """
        sets the barometer DataHolder. uses the mean of the data as the best value
        :param bar_data: list of floats to set barometer data as
        """
        self.barometer = DataHolder("barometer")
        self.barometer.set_data(bar_data)
        # take the mean from the stored data so best_value always agrees with barometer.get_mean()
        self.barometer.best_value = self.barometer.get_mean()

    def get_size(self) -> Tuple[int, int]:
        """
        :return: the amount of gps and barometer data points; the barometer amount is 0 if there is no barometer
        """
        bar_count = 0 if self.barometer is None else self.barometer.get_len_data()
        # each column of the dataframe is one gps data point
        return self.gps_df.shape[1], bar_count
class LocationAnalyzer:
    """
    stores location information, which can be analyzed later
    contains functions to find mean, standard deviation (std) and validation of data
    use one analyzer per real location point. one analyzer can accommodate multiple stations per survey point
    the real location dictionary must contain the minimum keys listed in SURVEY_KEYS
    the real location dictionary may contain other keys than ones listed in SURVEY_KEYS
    keys in OPTIONAL_SURVEY_KEYS have special meaning and can only be used as this program intends to use them
    uses dataframes with station id as the index
    Properties:
        * all_stations_info_df: dataframe with metadata about all stations
        * all_stations_mean_df: dataframe with means from all stations
        * all_stations_std_df: dataframe with stds from all stations
        * all_stations_closest_df: dataframe with the closest point to the real location and its distance to the
                                   real location for all stations
        * invalid_points: a list of gps points that are blacklisted, or None
        * _real_location: the surveyed point that the station is located at
        * all_gps_data: a list of all GPSDataHolders that form the data set
        * valid_gps_data: a list of all GPSDataHolders that pass validation checks
    """

    def __init__(
        self,
        wrapped_packets: Optional[List[List[reader.WrappedRedvoxPacket]]] = None,
        real_location: Optional[Dict[str, float]] = None,
        invalid_points: Optional[List[Dict[str, float]]] = None,
    ):
        """
        set up the LocationAnalyzer
        :param wrapped_packets: a list of wrapped redvox packet lists to analyze, default None
        :param real_location: dictionary containing the real location of the station, default None
        :param invalid_points: list of gps points that should not be in the data set, default None
        """
        self.all_stations_closest_df = pd.DataFrame([], columns=CLOSEST_TO_SURVEY_COLUMNS)
        self.all_stations_mean_df = pd.DataFrame([], columns=MEAN_LOC_COLUMNS)
        self.all_stations_std_df = pd.DataFrame([], columns=STD_LOC_COLUMNS)
        self.all_stations_info_df = pd.DataFrame([], columns=STATION_INFO_COLUMNS)
        self.invalid_points = invalid_points
        self.all_gps_data = []
        self.valid_gps_data = []
        self._real_location = real_location
        # if given packets, load the location data of each list of packets (one list per station)
        if wrapped_packets is not None:
            for wrapped_device_packets in wrapped_packets:
                self.get_loc_from_packets(wrapped_device_packets)

    def set_real_location(self, survey: Optional[Dict[str, float]] = None):
        """
        set the real location
        :param survey: dictionary containing the station's location, default None
        """
        self._real_location = survey

    def get_real_location(self) -> Optional[Dict[str, float]]:
        """
        :return: the station's real location, or None if it has not been set
        """
        return self._real_location

    def get_all_dataframes(self) -> pd.DataFrame:
        """
        :return: all 4 dataframes (info, closest, mean, std) fused together column-wise
        """
        frames = [
            self.all_stations_info_df,
            self.all_stations_closest_df,
            self.all_stations_mean_df,
            self.all_stations_std_df,
        ]
        return pd.concat(frames, axis=1)

    def get_stats_dataframes(self) -> pd.DataFrame:
        """
        :return: station info, mean and std dataframes fused together column-wise
        """
        frames = [self.all_stations_info_df, self.all_stations_mean_df, self.all_stations_std_df]
        return pd.concat(frames, axis=1)

    def get_loc_from_packets(self, w_p: List[reader.WrappedRedvoxPacket]):
        """
        store the location information and their mean and std using a collection of wrapped redvox packets
        assumes a list of redvox packets shares 1 device id; metadata is read from the first packet
        :param w_p: a list of wrapped redvox packets to read
        """
        # extract the information from the packets
        first_packet = w_p[0]
        sample_rate = first_packet.microphone_sensor().sample_rate_hz()
        dev_os_type = first_packet.device_os()
        idd = first_packet.redvox_id()
        packet_gps_data = load_position_data(w_p)
        # compute mean and std of the location
        mean_loc = packet_gps_data.get_mean_all()
        std_loc = packet_gps_data.get_std_all()
        # store the information; value order must match STD_LOC_COLUMNS and MEAN_LOC_COLUMNS
        stat_keys = ("acc", "lat", "lon", "alt", "bar")
        self.all_gps_data.append(packet_gps_data)
        self.all_stations_info_df.loc[idd] = [dev_os_type, sample_rate]
        self.all_stations_std_df.loc[idd] = [std_loc[key] for key in stat_keys]
        self.all_stations_mean_df.loc[idd] = [mean_loc[key] for key in stat_keys]

    def analyze_data(self, write_output: bool = False):
        """
        analyze data, then compare data to the real location
        if no real location exists, the mean of the stations' mean locations is used as the real location
        output is written to temp.csv if enabled
        :param write_output: boolean to write any debugging output, default False
        """
        self.validate_all()
        # if there's no real location, make the mean the real location
        if self._real_location is None:
            means = self.all_stations_mean_df
            self._real_location = {
                "lat": np.mean(means["mean lat"]),
                "lon": np.mean(means["mean lon"]),
                "alt": np.mean(means["mean alt"]),
                "bar": np.mean(means["mean bar"]),
            }
        self.compare_with_real_location()
        # print results
        if write_output:
            self.print_to_csv("temp.csv")

    def get_barometric_heights(self, sea_pressure: float = AVG_SEA_LEVEL_PRESSURE_KPA) -> pd.DataFrame:
        """
        for each station, compute the barometric height using the mean barometer reading
        :param sea_pressure: the local sea pressure in kPa, default AVG_SEA_LEVEL_PRESSURE_KPA
        :return: a dataframe with the barometric heights in meters and station id as the index
        """
        mean_bar_by_station = self.all_stations_mean_df["mean bar"].to_dict()
        bar_heights = {
            station_id: compute_barometric_height(mean_bar, sea_pressure)
            for station_id, mean_bar in mean_bar_by_station.items()
        }
        barometric_heights = pd.DataFrame(bar_heights, index=["bar height"], columns=self.all_stations_mean_df.index)
        return barometric_heights.T

    def validate_all(
        self,
        validation_ranges: Tuple[float, float, float] = (
            DEFAULT_INCLUSION_HORIZONTAL_M,
            DEFAULT_INCLUSION_VERTICAL_M,
            DEFAULT_INCLUSION_VERTICAL_BAR_M,
        ),
    ):
        """
        check all data in the data set against the blacklisted points (invalid_points)
        stations with at least one remaining gps point are stored in valid_gps_data
        :param validation_ranges: tuple of floats that the data values are compared against for validation
        """
        # validation always assumes nothing is valid when it starts, so empty out the valid_gps_data
        self.valid_gps_data = []
        # invalid_points defaults to None, but validate() takes the length of the points it is given
        blacklist = self.invalid_points if self.invalid_points is not None else []
        for station in self.all_gps_data:
            validated_gps = validate(station, validation_ranges, "blacklist", blacklist)
            if validated_gps.get_size()[0] != 0:
                self.valid_gps_data.append(validated_gps)

    def compare_with_real_location(self):
        """
        find the closest valid data point to the real location. information is stored in the data frames,
        replacing their previous contents
        """
        # compute closest point to real location
        result = compute_distance_all(self._real_location, self.valid_gps_data)
        self.all_stations_closest_df = result[CLOSEST_TO_SURVEY_COLUMNS]
        self.all_stations_info_df = result[STATION_INFO_COLUMNS]
        self.all_stations_mean_df = result[MEAN_LOC_COLUMNS]
        self.all_stations_std_df = result[STD_LOC_COLUMNS]

    def print_location_df(self, info_type: Optional[str] = None, os_type: Optional[str] = None):
        """
        print a single dataframe or a group of dataframes
        :param info_type: string denoting the type or group of dataframes to output: "real", "info", "std",
                          "mean" or "all"; any other value prints the info, mean and std dataframes, default None
        :param os_type: string denoting the os of the stations to output, default None
        """
        if info_type == "real":
            print_station_df(self.all_stations_closest_df, os_type)
        elif info_type == "info":
            print_station_df(self.all_stations_info_df, os_type)
        elif info_type == "std":
            print_station_df(self.all_stations_std_df, os_type)
        elif info_type == "mean":
            print_station_df(self.all_stations_mean_df, os_type)
        elif info_type == "all":
            print_station_df(self.get_all_dataframes(), os_type)
        else:
            # fuse statistical dataframes together
            print_station_df(self.get_stats_dataframes(), os_type)

    def print_to_csv(self, path: str, os_type: Optional[str] = None, debug: Optional[bool] = False):
        """
        write all dataframes, fused together, to a csv file
        :param path: string containing full path and file name
        :param os_type: string denoting the os of the stations to output ("Android" or "iOS");
                        any other value outputs all stations, default None
        :param debug: if true, output debug statements, default False
        """
        # fuse all dataframes together
        result = self.get_all_dataframes()
        if os_type == "Android":
            get_all_android_station(result).to_csv(path)
        elif os_type == "iOS":
            get_all_ios_station(result).to_csv(path)
        else:
            os_type = "all"
            result.to_csv(path)
        if debug:
            print("Printed {} station data to {}.".format(os_type, path))
def get_all_ios_station(station_df: pd.DataFrame) -> pd.DataFrame:
    """
    select the rows of a station dataframe whose "os" column is "iOS"
    :param station_df: the dataframe to search
    :return: a dataframe with all data related to iOS stations in the dataframe
    """
    is_ios = station_df["os"] == "iOS"
    return station_df.loc[is_ios]
def get_all_android_station(station_df: pd.DataFrame) -> pd.DataFrame:
    """
    select the rows of a station dataframe whose "os" column is "Android"
    :param station_df: the dataframe to search
    :return: a dataframe with all data related to android stations in the dataframe
    """
    is_android = station_df["os"] == "Android"
    return station_df.loc[is_android]
def print_station_df(station_df: pd.DataFrame, os_type: Optional[str] = None):
    """
    print a dataframe, optionally keeping only the stations of one os type
    :param station_df: a dataframe to search
    :param os_type: os type to filter on ("Android" or "iOS"); any other value prints everything, default None
    """
    if os_type == "iOS":
        selected = get_all_ios_station(station_df)
    elif os_type == "Android":
        selected = get_all_android_station(station_df)
    else:
        selected = station_df
    print(selected)
def load_position_data(w_p: List[reader.WrappedRedvoxPacket]) -> GPSDataHolder:
    """
    read the barometer and location data of every packet into one GPSDataHolder
    packets without a barometer or location sensor contribute a single 0.0 default value per channel
    and a warning is printed
    the id, os and microphone sample rate of the result are taken from the first packet
    :param w_p: list of wrapped packets to read
    :return: all gps data from the packets in a GPSDataHolder
    :raises Exception: if anything goes wrong while reading the packets
    :raises IndexError: if the list of packets is empty
    """
    # one list each for latitude, longitude, altitude and accuracy
    gps_data = [[], [], [], []]
    bar_data = []
    packet = None
    packet_name = None
    try:
        for packet in w_p:
            packet_name = packet.default_filename()
            if packet.has_barometer_sensor():
                bar_chan = packet.barometer_sensor()  # load barometer data
                bar_data.extend(bar_chan.payload_values())
            else:
                # add defaults
                bar_data.extend([0.0])
                print("WARNING: {} Barometer empty, using default values!".format(packet_name))
            if packet.has_location_sensor():
                # load each channel's data into the container
                loc_chan = packet.location_sensor()
                gps_data[0].extend(loc_chan.payload_values_latitude())
                gps_data[1].extend(loc_chan.payload_values_longitude())
                gps_data[2].extend(loc_chan.payload_values_altitude())
                gps_data[3].extend(loc_chan.payload_values_accuracy())
            else:
                # add defaults
                for channel in gps_data:
                    channel.extend([0.0])
                print("WARNING: {} Location empty, using default values!".format(packet_name))
    except Exception as error:
        # chain the original exception so its traceback is kept as the direct cause
        if packet is not None:
            error_string = "Something went wrong while reading location data from file: {}. Original message: {}"
            raise Exception(error_string.format(packet_name, error)) from error
        raise Exception("No packet found in file. Original message: {}".format(error)) from error
    if packet is None:
        # the loop never ran: indexing the first packet below would fail without explanation
        raise IndexError("No packets were given to read location data from.")
    # load data into data holder; metadata comes from the first packet
    first_packet = w_p[0]
    gps_dfh = GPSDataHolder(
        str(first_packet.redvox_id()),
        first_packet.device_os(),
        gps_data,
        first_packet.microphone_sensor().sample_rate_hz(),
    )
    gps_dfh.set_barometer(bar_data)
    return gps_dfh
def compute_barometric_height(
    barometric_pressure: float,
    sea_pressure: float = AVG_SEA_LEVEL_PRESSURE_KPA,
    standard_temp: float = STANDARD_TEMPERATURE_K,
    molar_air_mass: float = MOLAR_MASS_AIR_KG_PER_MOL,
    gravity: float = GRAVITY_M_PER_S2,
    gas_constant: float = UNIVERSAL_GAS_CONSTANT_KG_M2_PER_K_MOL_S2,
) -> float:
    """
    compute the height of a single point from its barometric pressure and the sea-level pressure
    barometric equation from https://www.math24.net/barometric-formula/
        P(h) = P0 * e**(-Mgh/RT), therefore h = ln(P0/P(h)) / (Mg/RT)
    a pressure of exactly 0 is replaced with EPSILON
    :param barometric_pressure: pressure at a station in kPa
    :param sea_pressure: pressure at sea level in kPa (P0), default AVG_SEA_LEVEL_PRESSURE_KPA
    :param standard_temp: surface temperature in K (T), default STANDARD_TEMPERATURE_K
    :param molar_air_mass: molar mass of air in kg/mol (M), default MOLAR_MASS_AIR_KG_PER_MOL
    :param gravity: the acceleration of gravity in m/s2 (g), default GRAVITY_M_PER_S2
    :param gas_constant: the universal gas constant in (kg * m2)/(K * mol * s2) (R),
                            default UNIVERSAL_GAS_CONSTANT_KG_M2_PER_K_MOL_S2
    :return: height of station in meters
    """
    # the ratio is divided and passed to a logarithm, so neither pressure may be 0
    safe_sea_pressure = EPSILON if sea_pressure == 0.0 else sea_pressure
    safe_station_pressure = EPSILON if barometric_pressure == 0.0 else barometric_pressure
    pressure_ratio = safe_sea_pressure / safe_station_pressure
    # Mg/RT
    height_coefficient = (molar_air_mass * gravity) / (standard_temp * gas_constant)
    return np.log(pressure_ratio) / height_coefficient
def compute_barometric_height_array(
    barometric_pressure: np.array,
    sea_pressure: float = AVG_SEA_LEVEL_PRESSURE_KPA,
    standard_temp: float = STANDARD_TEMPERATURE_K,
    molar_air_mass: float = MOLAR_MASS_AIR_KG_PER_MOL,
    gravity: float = GRAVITY_M_PER_S2,
    gas_constant: float = UNIVERSAL_GAS_CONSTANT_KG_M2_PER_K_MOL_S2,
) -> np.array:
    """
    compute height of many points using each station's barometric pressure and the sea-level pressure
    uses h = ln(P0/P(h)) / (Mg/RT); pressures of exactly 0 are treated as EPSILON
    the input array is not modified
    :param barometric_pressure: array of pressures at stations in kPa
    :param sea_pressure: pressure at sea level in kPa, default AVG_SEA_LEVEL_PRESSURE_KPA
    :param standard_temp: surface temperature in K, default STANDARD_TEMPERATURE_K
    :param molar_air_mass: molar mass of air in kg/mol, default MOLAR_MASS_AIR_KG_PER_MOL
    :param gravity: the acceleration of gravity in m/s2, default GRAVITY_M_PER_S2
    :param gas_constant: the universal gas constant in (kg * m2)/(K * mol * s2),
                            default UNIVERSAL_GAS_CONSTANT_KG_M2_PER_K_MOL_S2
    :return: the height of each station in meters
    """
    # due to log function, we can't let sea_pressure or barometric_pressure be 0
    if sea_pressure == 0.0:
        sea_pressure = EPSILON
    # work in floats so EPSILON is not truncated back to 0 when the input holds integers;
    # np.where builds a new array, so the caller's data is left untouched
    pressures = np.asarray(barometric_pressure, dtype=float)
    pressures = np.where(pressures == 0.0, EPSILON, pressures)
    barometric_height = np.log(sea_pressure / pressures) / (
        (molar_air_mass * gravity) / (standard_temp * gas_constant)
    )
    return barometric_height
def get_component_dist_to_point(point: Dict[str, float], gps_data: pd.Series, bar_mean: float) -> (float, float, float):
    """
    compute the separate distance components from one gps data point to the chosen point
    the horizontal component uses the haversine formula
    the barometer component compares the height computed from bar_mean against the point's altitude
    :param point: dict with location to compute distance to; uses keys "lat", "lon" and "alt"
    :param gps_data: series with gps data of one point; uses "latitude", "longitude" and "altitude"
    :param bar_mean: the mean barometer reading
    :return: the distance in meters of the horizontal and vertical gps components and barometer readings
    """
    # horizontal component via the haversine formula
    lat_diff = gps_data["latitude"] - point["lat"]
    lon_diff = gps_data["longitude"] - point["lon"]
    lat_term = np.sin(lat_diff * DEG_TO_RAD / 2.0) ** 2.0
    lon_term = (
        np.cos(point["lat"] * DEG_TO_RAD)
        * np.cos(gps_data["latitude"] * DEG_TO_RAD)
        * np.sin(lon_diff * DEG_TO_RAD / 2.0) ** 2.0
    )
    haversine = lat_term + lon_term
    # cap the arcsin argument at 1.0
    central_angle = 2.0 * np.arcsin(np.min([1.0, np.sqrt(haversine)]))
    horizontal = EARTH_RADIUS_M * central_angle
    # vertical component from the gps altitude
    vertical = np.abs(gps_data["altitude"] - point["alt"])
    # vertical component from the height computed with the barometer reading
    vertical_bar = np.abs(compute_barometric_height(bar_mean) - point["alt"])
    return horizontal, vertical, vertical_bar
def get_gps_dist_to_location(
    point: Dict[str, float], gps_dataholder: GPSDataHolder, bar_alt: Optional[float] = None
) -> np.array:
    """
    compute the distance from every gps point of a data holder to the chosen point
    the horizontal part uses the haversine formula and is combined with the altitude difference
    :param point: dict with location to compute distance to; uses keys "lat", "lon" and "alt"
    :param gps_dataholder: all the gps data points to compute distance from
    :param bar_alt: height as measured by a barometer; if given, it is used for every point
                    instead of the gps altitudes, default None
    :return: array of all distances in meters from gps point to chosen point
    """
    gps_frame = gps_dataholder.gps_df
    latitudes = gps_frame.loc["latitude"].to_numpy()
    longitudes = gps_frame.loc["longitude"].to_numpy()
    # a barometer altitude, when given, takes the place of the gps altitudes
    station_alt = gps_frame.loc["altitude"].to_numpy() if bar_alt is None else bar_alt
    # haversine formula for the horizontal distance
    lat_diff = latitudes - point["lat"]
    lon_diff = longitudes - point["lon"]
    lat_term = np.sin(lat_diff * DEG_TO_RAD / 2.0) ** 2.0
    lon_term = (
        np.cos(point["lat"] * DEG_TO_RAD)
        * np.cos(latitudes * DEG_TO_RAD)
        * np.sin(lon_diff * DEG_TO_RAD / 2.0) ** 2.0
    )
    haversine = lat_term + lon_term
    central_angle = 2 * np.arcsin(np.minimum([1.0], np.sqrt(haversine)))
    horizontal = EARTH_RADIUS_M * central_angle
    # combine the horizontal and vertical parts
    squared_distance = horizontal**2 + (point["alt"] - station_alt) ** 2
    return np.sqrt(squared_distance)
def validate_blacklist(
    gps_data: pd.Series,
    point: Dict[str, float],
    bar_mean: float,
    inclusion_ranges: Tuple[float, float, float] = (
        DEFAULT_INCLUSION_HORIZONTAL_M,
        DEFAULT_INCLUSION_VERTICAL_M,
        DEFAULT_INCLUSION_VERTICAL_BAR_M,
    ),
) -> bool:
    """
    check that a gps data point is not in the vicinity of a blacklisted point
    a point is in the vicinity when it is within the horizontal range AND within either the gps vertical
    range or the barometer vertical range; this function returns True for every other case
    :param gps_data: data to compare
    :param point: the point that is blacklisted
    :param bar_mean: the mean of the barometer measurements
    :param inclusion_ranges: horizontal, vertical and barometer-vertical distances in meters
                                from the blacklisted point to be considered close enough
    :return: True if point is not in blacklisted point's vicinity
    """
    # calculate distance from gps data to invalid point
    h_dist, v_dist, v_bar_dist = get_component_dist_to_point(point, gps_data, bar_mean)
    # being far enough away horizontally is sufficient on its own; otherwise the point must be
    # outside both vertical ranges. requiring horizontal AND vertical separation would reject
    # points that are far away but happen to share the blacklisted point's altitude
    return h_dist > inclusion_ranges[0] or (v_dist > inclusion_ranges[1] and v_bar_dist > inclusion_ranges[2])
def validate_near_point(
    gps_data: pd.Series,
    point: Dict[str, float],
    bar_mean: float,
    inclusion_ranges: Tuple[float, float, float] = (
        DEFAULT_INCLUSION_HORIZONTAL_M,
        DEFAULT_INCLUSION_VERTICAL_M,
        DEFAULT_INCLUSION_VERTICAL_BAR_M,
    ),
) -> bool:
    """
    check that a gps data point is in the vicinity of the chosen point: within the horizontal range
    and within either the gps vertical range or the barometer vertical range
    :param gps_data: data to compare
    :param point: the chosen point to compare against
    :param bar_mean: the mean of the barometer measurements
    :param inclusion_ranges: horizontal, vertical and barometer-vertical distances in meters
                                from the chosen point to be considered close enough
    :return: True if point is within the chosen point's vicinity
    """
    horizontal, vertical, vertical_bar = get_component_dist_to_point(point, gps_data, bar_mean)
    # the horizontal check must pass before the vertical checks matter
    within_horizontal = horizontal <= inclusion_ranges[0]
    if not within_horizontal:
        return within_horizontal
    # either vertical measurement being close enough is sufficient
    return vertical <= inclusion_ranges[1] or vertical_bar <= inclusion_ranges[2]
def point_on_line_side(line_points: Tuple[Dict[str, float], Dict[str, float]], point: Dict[str, float]) -> float:
    """
    determine which side of a line a point lies on
    algorithm from: http://geomalgorithms.com/a03-_inclusion.html
    :param line_points: two coordinates (dicts with "lat" and "lon") that define a line, in order
    :param point: point to test (dict with "lat" and "lon")
    :return: < 0 for right side, == 0 for on line, > 0 for left side
    """
    line_start = line_points[0]
    line_end = line_points[1]
    line_dlon = line_end["lon"] - line_start["lon"]
    line_dlat = line_end["lat"] - line_start["lat"]
    point_dlon = point["lon"] - line_start["lon"]
    point_dlat = point["lat"] - line_start["lat"]
    return line_dlon * point_dlat - point_dlon * line_dlat
def validate_point_in_polygon(point: Dict[str, float], edges: List[Dict[str, float]]) -> bool:
    """
    Use winding number algorithm to determine if a point is in a polygon (or on the edge)
    a winding number of 0 means the point is outside the polygon
    algorithm from: http://geomalgorithms.com/a03-_inclusion.html
    :param point: coordinates of the point to compare
    :param edges: list of coordinates of the edges of the polygon, with the last edge equal to the first
    :return: True if point is in the polygon
    """
    winding_number = 0
    # walk over each pair of consecutive coordinates
    for start, end in zip(edges, edges[1:]):
        if start["lat"] <= point["lat"]:
            # segment goes from at-or-below the point to above it
            if end["lat"] > point["lat"] and point_on_line_side((start, end), point) >= 0:
                winding_number += 1
        elif end["lat"] <= point["lat"] and point_on_line_side((start, end), point) <= 0:
            # segment goes from above the point to at-or-below it
            winding_number -= 1
    return winding_number != 0
def validate(
    data_to_test: GPSDataHolder,
    inclusion_ranges: Tuple[float, float, float] = (
        DEFAULT_INCLUSION_HORIZONTAL_M,
        DEFAULT_INCLUSION_VERTICAL_M,
        DEFAULT_INCLUSION_VERTICAL_BAR_M,
    ),
    validation_type: str = None,
    validation_points: List[Dict[str, float]] = None,
    debug: bool = False,
) -> GPSDataHolder:
    """
    validation master function. Can perform any kind of validation requested
    for "sol", "solution" or "mean" validation, a gps point is kept only if it is close to every validation point
    for any other validation type, the validation points are a blacklist and a gps point is kept only if it is
    not in the vicinity of any of them
    :param data_to_test: gps data to validate
    :param inclusion_ranges: ranges to include a data point with a validation point
    :param validation_type: the kind of validation to perform, default None
    :param validation_points: the points to validate against, default None
    :param debug: if True, output debugging information, default False
    :return: all valid gps data; if there are no validation points, data_to_test itself is returned
    """
    # check if we even have points to compare against
    if validation_points is None or len(validation_points) < 1:
        return data_to_test  # no points to check, everything is good
    # "sol" is the short name listed in VALIDATION_METHODS; "solution" is kept for existing callers
    if validation_type in ("sol", "solution", "mean"):
        point_check = validate_near_point
    else:
        point_check = validate_blacklist
    gps_columns = data_to_test.gps_df.columns
    # the barometer mean is the same for every comparison, so compute it once (only if it is needed)
    bar_mean = data_to_test.barometer.get_mean() if len(gps_columns) > 0 else None
    # keep only the gps points that pass the check against every validation point.
    # the check of one gps point does not depend on the other gps points, so one pass is enough
    validated_gps_data = [[], [], [], []]
    for gps_point in gps_columns:
        # extract data to test
        gps_data = data_to_test.gps_df[gps_point]
        # all() stops at the first validation point that fails
        if all(point_check(gps_data, point, bar_mean, inclusion_ranges) for point in validation_points):
            validated_gps_data[0].append(gps_data["latitude"])
            validated_gps_data[1].append(gps_data["longitude"])
            validated_gps_data[2].append(gps_data["altitude"])
            validated_gps_data[3].append(gps_data["accuracy"])
    # create the object to return; the barometer DataHolder is shared with data_to_test
    validated_gps = GPSDataHolder(
        data_to_test.id,
        data_to_test.os_type,
        validated_gps_data,
        data_to_test.mic_samp_rate_hz,
        data_to_test.barometer,
    )
    # print message if user allows it
    if debug:
        print("{} data validated".format(validated_gps.id))
    return validated_gps
def compute_distance_all(point: Dict[str, float], all_gps_data: List[GPSDataHolder]) -> pd.DataFrame:
    """
    compare the distances from the gps points of every data holder to the chosen point
    :param point: the point to compute distance to
    :param all_gps_data: the gps data points to compute distance from
    :return: dataframe containing all information about the gps points' distance to the chosen point,
             with one row per key returned by compute_distance and MASTER_COLUMNS as the columns
    """
    per_station = {}
    for station_data in all_gps_data:
        # later results replace earlier ones that use the same key
        for station_id, station_values in compute_distance(point, station_data).items():
            per_station[station_id] = station_values
    table = pd.DataFrame(per_station, index=MASTER_COLUMNS)
    return table.transpose()
def compute_distance(point: dict, gps_data: GPSDataHolder) -> dict:
    """
    summarize how close a station's gps and barometer measurements are to a chosen point
    side effect: gps_data.barometer.best_value is set to the barometer reading whose computed
    height is closest to the chosen point's altitude
    :param point: the chosen point to compute distance to. Uses the keys "lat", "lon" and "alt";
                  a sea-level pressure in kPa may be given under "sea_bar" (the spelling "sea bar" is also accepted)
    :param gps_data: the data points to compute distance from
    :return: dictionary containing all information about the gps points' distance to the chosen point,
             stored as a single list under the station's id
    """
    station_id = gps_data.id
    gps_loc = gps_data.get_mean_all()
    bar_readings = gps_data.barometer.get_data()
    # the sea-level pressure used to be tested under the key "sea bar" but read from "sea_bar", which either
    # raised a KeyError or silently ignored the value; look the value up under both spellings instead
    sea_bar = point.get("sea_bar")
    if sea_bar is None:
        sea_bar = point.get("sea bar")
    # convert every barometer reading to a height; a copy is passed so the stored readings are untouched
    if sea_bar is not None:
        bar_alt_tmp = compute_barometric_height_array(np.array(bar_readings), sea_bar)
    else:
        bar_alt_tmp = compute_barometric_height_array(np.array(bar_readings))
    # the best barometer reading is the one whose height is closest to the point's altitude
    best_bar_index = np.argmin(np.abs(bar_alt_tmp - point["alt"]))
    gps_data.barometer.best_value = bar_readings[best_bar_index]
    bar_alt = bar_alt_tmp[best_bar_index]
    # for all gps coords, find closest to solution
    dist_array = get_gps_dist_to_location(point, gps_data)
    min_index = np.argmin(dist_array)
    # compute distance using best barometer measurement in place of the gps altitude
    dist_array_bar = get_gps_dist_to_location(point, gps_data, bar_alt)
    min_bar_index = np.argmin(dist_array_bar)
    # compare minimum of pure gps and gps with barometer; the barometer minimum must be measured against
    # the pure gps minimum (it was previously compared against the barometer distances themselves)
    if dist_array_bar[min_bar_index] < dist_array[min_index]:
        min_index = min_bar_index
        dist_array = dist_array_bar
    # finding the std of the distances is basically finding the std of accuracy
    acc_std = np.std(dist_array)
    lat_std = np.std(np.abs(point["lat"] - gps_data.gps_df.loc["latitude"].to_numpy()))
    lon_std = np.std(np.abs(point["lon"] - gps_data.gps_df.loc["longitude"].to_numpy()))
    alt_std = np.std(np.abs(point["alt"] - gps_data.gps_df.loc["altitude"].to_numpy()))
    bar_std = gps_data.barometer.get_std()
    # put data into dictionary to store in data frames later
    # order: station info, best (closest) values, mean values, standard deviations
    return {
        station_id: [
            gps_data.os_type,
            gps_data.mic_samp_rate_hz,
            gps_data.gps_df.loc["accuracy", min_index],
            gps_data.gps_df.loc["latitude", min_index],
            gps_data.gps_df.loc["longitude", min_index],
            gps_data.gps_df.loc["altitude", min_index],
            gps_data.barometer.best_value,
            dist_array[min_index],
            gps_loc["acc"],
            gps_loc["lat"],
            gps_loc["lon"],
            gps_loc["alt"],
            gps_loc["bar"],
            acc_std,
            lat_std,
            lon_std,
            alt_std,
            bar_std,
        ]
    }
def load_kml(kml_file: str) -> Dict[str, Dict[str, float]]:
    """
    read the named locations stored in a kml file
    :param kml_file: full path of the file to load data from
    :return: dictionary of locations with identifiers
    """
    with open(kml_file, "r", encoding="utf-8") as kml_handle:
        raw_text = kml_handle.read()
    parsed = kml.KML()
    # the parser is given the document as utf8 bytes
    parsed.from_string(raw_text.encode("utf8"))
    # the locations are the features nested inside the first top-level feature
    top_level = list(parsed.features())
    return {
        place.name: {"lon": place.geometry.x, "lat": place.geometry.y, "alt": place.geometry.z}
        for place in list(top_level[0].features())
    }
def write_kml(kml_file: str, master_dict: Dict[str, Dict[str, float]]):
    """
    write one placemark per entry of master_dict into a kml file
    :param kml_file: full path of kml file to write data to
    :param master_dict: the dictionary of information to write
    """
    namespace = "{http://www.opengis.net/kml/2.2}"
    # the kml structure and the document it holds
    kml_root = kml.KML(ns=namespace)
    document = kml.Document(namespace, "1")
    # one style shared by all placemarks, referenced below as "#2"
    shared_style = styles.Style(id="2")
    icon_style = styles.IconStyle(id="3", color="ff0000ff")
    icon_style.icon_href = "http://maps.google.com/mapfiles/kml/shapes/placemark_circle.png"
    shared_style.append_style(icon_style)
    document.append_style(shared_style)
    # ids 1 to 3 are taken by the document and its styles, so placemark ids count up from 4
    for placemark_id, (station_name, info) in enumerate(master_dict.items(), start=4):
        # the description shows the os and sample rate of the station
        description = "{} {}hz".format(info["os"], str(info["sample rate"]))
        placemark = kml.Placemark(
            namespace, id=str(placemark_id), name=station_name, description=description, styleUrl="#2"
        )
        # the placemark sits at the station's mean position
        placemark.geometry = Point(info["mean lon"], info["mean lat"], info["mean alt"])
        document.append(placemark)
    kml_root.append(document)
    # write the xml declaration first, then the nicely formatted kml
    with open(kml_file, "w", encoding="utf-8") as out_file:
        out_file.write('<?xml version="1.0" encoding="UTF-8"?>\n')
        out_file.write(kml_root.to_string(prettyprint=True))
Functions
def compute_barometric_height(barometric_pressure: float, sea_pressure: float = 101.325, standard_temp: float = 288.15, molar_air_mass: float = 0.02896, gravity: float = 9.80665, gas_constant: float = 8.3143) ‑> float
-
compute height of a single point using a station's barometric and sea-level pressure
barometric equation from https://www.math24.net/barometric-formula/ :param barometric_pressure: pressure at a station in kPa :param sea_pressure: pressure at sea level in kPa, default AVG_SEA_LEVEL_PRESSURE_KPA :param standard_temp: surface temperature in K, default STANDARD_TEMPERATURE_K :param molar_air_mass: molar mass of air in kg/mol, default MOLAR_MASS_AIR_KG_PER_MOL :param gravity: the acceleration of gravity in m/s2, default GRAVITY_M_PER_S2 :param gas_constant: the universal gas constant in (kg * m2)/(K * mol * s2), default UNIVERSAL_GAS_CONSTANT_KG_M2_PER_K_MOL_S2 :return: height of station in meters
Expand source code
def compute_barometric_height(
    barometric_pressure: float,
    sea_pressure: float = AVG_SEA_LEVEL_PRESSURE_KPA,
    standard_temp: float = STANDARD_TEMPERATURE_K,
    molar_air_mass: float = MOLAR_MASS_AIR_KG_PER_MOL,
    gravity: float = GRAVITY_M_PER_S2,
    gas_constant: float = UNIVERSAL_GAS_CONSTANT_KG_M2_PER_K_MOL_S2,
) -> float:
    """
    convert one barometric pressure reading into a height using the barometric formula
    barometric equation from https://www.math24.net/barometric-formula/
    :param barometric_pressure: pressure at a station in kPa
    :param sea_pressure: pressure at sea level in kPa, default AVG_SEA_LEVEL_PRESSURE_KPA
    :param standard_temp: surface temperature in K, default STANDARD_TEMPERATURE_K
    :param molar_air_mass: molar mass of air in kg/mol, default MOLAR_MASS_AIR_KG_PER_MOL
    :param gravity: the acceleration of gravity in m/s2, default GRAVITY_M_PER_S2
    :param gas_constant: the universal gas constant in (kg * m2)/(K * mol * s2),
                         default UNIVERSAL_GAS_CONSTANT_KG_M2_PER_K_MOL_S2
    :return: height of station in meters
    """
    # P(h) = P0 * e**(-Mgh/RT), therefore h = ln(P0/P(h)) / (Mg/RT)
    # a pressure of exactly 0 is swapped for EPSILON so the ratio and its logarithm stay defined
    reference_pressure = EPSILON if sea_pressure == 0.0 else sea_pressure
    station_pressure = EPSILON if barometric_pressure == 0.0 else barometric_pressure
    # Mg/RT
    scale = (molar_air_mass * gravity) / (standard_temp * gas_constant)
    return np.log(reference_pressure / station_pressure) / scale
def compute_barometric_height_array(barometric_pressure: np.array, sea_pressure: float = 101.325, standard_temp: float = 288.15, molar_air_mass: float = 0.02896, gravity: float = 9.80665, gas_constant: float = 8.3143) ‑> np.array
-
compute height of many points using each station's barometric and sea-level pressure :param barometric_pressure: array of pressures at stations in kPa :param sea_pressure: pressure at sea level in kPa, default AVG_SEA_LEVEL_PRESSURE_KPA :param standard_temp: surface temperature in K, default STANDARD_TEMPERATURE_K :param molar_air_mass: molar mass of air in kg/mol, default MOLAR_MASS_AIR_KG_PER_MOL :param gravity: the acceleration of gravity in m/s2, default GRAVITY_M_PER_S2 :param gas_constant: the universal gas constant in (kg * m2)/(K * mol * s2), default UNIVERSAL_GAS_CONSTANT_KG_M2_PER_K_MOL_S2 :return: the height of each station in meters
Expand source code
def compute_barometric_height_array(
    barometric_pressure: np.array,
    sea_pressure: float = AVG_SEA_LEVEL_PRESSURE_KPA,
    standard_temp: float = STANDARD_TEMPERATURE_K,
    molar_air_mass: float = MOLAR_MASS_AIR_KG_PER_MOL,
    gravity: float = GRAVITY_M_PER_S2,
    gas_constant: float = UNIVERSAL_GAS_CONSTANT_KG_M2_PER_K_MOL_S2,
) -> np.array:
    """
    compute height of many points using each station's barometric and sea-level pressure
    the input is never modified; zero pressures are replaced in a working copy
    :param barometric_pressure: array of pressures at stations in kPa
    :param sea_pressure: pressure at sea level in kPa, default AVG_SEA_LEVEL_PRESSURE_KPA
    :param standard_temp: surface temperature in K, default STANDARD_TEMPERATURE_K
    :param molar_air_mass: molar mass of air in kg/mol, default MOLAR_MASS_AIR_KG_PER_MOL
    :param gravity: the acceleration of gravity in m/s2, default GRAVITY_M_PER_S2
    :param gas_constant: the universal gas constant in (kg * m2)/(K * mol * s2),
                         default UNIVERSAL_GAS_CONSTANT_KG_M2_PER_K_MOL_S2
    :return: the height of each station in meters
    """
    # due to log function, we can't let sea_pressure or barometric_pressure be 0
    if sea_pressure == 0.0:
        sea_pressure = EPSILON
    # work on floats so EPSILON is not truncated to 0 when integer pressures are supplied
    pressures = np.asarray(barometric_pressure, dtype=float)
    # np.where builds a new array, so the caller's data is left as it was
    pressures = np.where(pressures == 0.0, EPSILON, pressures)
    # h = ln(P0/P(h)) / (Mg/RT)
    scale = (molar_air_mass * gravity) / (standard_temp * gas_constant)
    return np.log(sea_pressure / pressures) / scale
def compute_distance(point: dict, gps_data: GPSDataHolder) ‑> dict
-
:param point: the chosen point to compute distance to :param gps_data: the data points to compute distance from :return: dictionary containing all information about the gps points' distance to the chosen point
Expand source code
def compute_distance(point: dict, gps_data: GPSDataHolder) -> dict:
    """
    summarize how close a station's gps and barometer measurements are to a chosen point
    side effect: gps_data.barometer.best_value is set to the barometer reading whose computed
    height is closest to the chosen point's altitude
    :param point: the chosen point to compute distance to. Uses the keys "lat", "lon" and "alt";
                  a sea-level pressure in kPa may be given under "sea_bar" (the spelling "sea bar" is also accepted)
    :param gps_data: the data points to compute distance from
    :return: dictionary containing all information about the gps points' distance to the chosen point,
             stored as a single list under the station's id
    """
    station_id = gps_data.id
    gps_loc = gps_data.get_mean_all()
    bar_readings = gps_data.barometer.get_data()
    # the sea-level pressure used to be tested under the key "sea bar" but read from "sea_bar", which either
    # raised a KeyError or silently ignored the value; look the value up under both spellings instead
    sea_bar = point.get("sea_bar")
    if sea_bar is None:
        sea_bar = point.get("sea bar")
    # convert every barometer reading to a height; a copy is passed so the stored readings are untouched
    if sea_bar is not None:
        bar_alt_tmp = compute_barometric_height_array(np.array(bar_readings), sea_bar)
    else:
        bar_alt_tmp = compute_barometric_height_array(np.array(bar_readings))
    # the best barometer reading is the one whose height is closest to the point's altitude
    best_bar_index = np.argmin(np.abs(bar_alt_tmp - point["alt"]))
    gps_data.barometer.best_value = bar_readings[best_bar_index]
    bar_alt = bar_alt_tmp[best_bar_index]
    # for all gps coords, find closest to solution
    dist_array = get_gps_dist_to_location(point, gps_data)
    min_index = np.argmin(dist_array)
    # compute distance using best barometer measurement in place of the gps altitude
    dist_array_bar = get_gps_dist_to_location(point, gps_data, bar_alt)
    min_bar_index = np.argmin(dist_array_bar)
    # compare minimum of pure gps and gps with barometer; the barometer minimum must be measured against
    # the pure gps minimum (it was previously compared against the barometer distances themselves)
    if dist_array_bar[min_bar_index] < dist_array[min_index]:
        min_index = min_bar_index
        dist_array = dist_array_bar
    # finding the std of the distances is basically finding the std of accuracy
    acc_std = np.std(dist_array)
    lat_std = np.std(np.abs(point["lat"] - gps_data.gps_df.loc["latitude"].to_numpy()))
    lon_std = np.std(np.abs(point["lon"] - gps_data.gps_df.loc["longitude"].to_numpy()))
    alt_std = np.std(np.abs(point["alt"] - gps_data.gps_df.loc["altitude"].to_numpy()))
    bar_std = gps_data.barometer.get_std()
    # put data into dictionary to store in data frames later
    # order: station info, best (closest) values, mean values, standard deviations
    return {
        station_id: [
            gps_data.os_type,
            gps_data.mic_samp_rate_hz,
            gps_data.gps_df.loc["accuracy", min_index],
            gps_data.gps_df.loc["latitude", min_index],
            gps_data.gps_df.loc["longitude", min_index],
            gps_data.gps_df.loc["altitude", min_index],
            gps_data.barometer.best_value,
            dist_array[min_index],
            gps_loc["acc"],
            gps_loc["lat"],
            gps_loc["lon"],
            gps_loc["alt"],
            gps_loc["bar"],
            acc_std,
            lat_std,
            lon_std,
            alt_std,
            bar_std,
        ]
    }
def compute_distance_all(point: Dict[str, float], all_gps_data: List[GPSDataHolder]) ‑> pandas.core.frame.DataFrame
-
:param point: the point to compute distance to :param all_gps_data: the gps data points to compute distance from :return: dataframe containing all information about the gps points' distance to the chosen point
Expand source code
def compute_distance_all(point: Dict[str, float], all_gps_data: List[GPSDataHolder]) -> pd.DataFrame:
    """
    build a table describing how far each station's gps data is from a chosen point
    :param point: the point to compute distance to
    :param all_gps_data: the gps data points to compute distance from
    :return: dataframe containing all information about the gps points' distance to the chosen point
    """
    # compute_distance returns a dictionary keyed by station id; merge the results of every station
    # (a later station with the same id replaces an earlier one)
    per_station = {
        station_id: summary
        for holder in all_gps_data
        for station_id, summary in compute_distance(point, holder).items()
    }
    # the values are labelled with MASTER_COLUMNS, then transposed so each station becomes a row
    return pd.DataFrame(per_station, index=MASTER_COLUMNS).transpose()
def get_all_android_station(station_df: pandas.core.frame.DataFrame) ‑> pandas.core.frame.DataFrame
-
:param station_df: the dataframe to search :return: a dataframe with all data related to android stations in the dataframe
Expand source code
def get_all_android_station(station_df: pd.DataFrame) -> pd.DataFrame:
    """
    select the rows of a dataframe whose "os" column is exactly "Android"
    :param station_df: the dataframe to search
    :return: a dataframe with all data related to android stations in the dataframe
    """
    is_android = station_df["os"] == "Android"
    return station_df.loc[is_android]
def get_all_ios_station(station_df: pandas.core.frame.DataFrame) ‑> pandas.core.frame.DataFrame
-
:param station_df: the dataframe to search :return: a dataframe with all data related to iOS stations in the dataframe
Expand source code
def get_all_ios_station(station_df: pd.DataFrame) -> pd.DataFrame:
    """
    select the rows of a dataframe whose "os" column is exactly "iOS"
    :param station_df: the dataframe to search
    :return: a dataframe with all data related to iOS stations in the dataframe
    """
    is_ios = station_df["os"] == "iOS"
    return station_df.loc[is_ios]
def get_component_dist_to_point(point: Dict[str, float], gps_data: pandas.core.series.Series, bar_mean: float) ‑> (float, float, float)
-
compute distance from the gps data point to the chosen point using haversine formula :param point: dict with location to compute distance to :param gps_data: series with gps data of one point :param bar_mean: the mean barometer reading :return: the distance in meters of the horizontal and vertical gps components and barometer readings
Expand source code
def get_component_dist_to_point(
    point: Dict[str, float], gps_data: pd.Series, bar_mean: float
) -> Tuple[float, float, float]:
    """
    compute distance from the gps data point to the chosen point using haversine formula
    :param point: dict with location to compute distance to; uses the keys "lat", "lon" and "alt"
    :param gps_data: series with gps data of one point; uses "latitude", "longitude" and "altitude"
    :param bar_mean: the mean barometer reading
    :return: the distance in meters of the horizontal and vertical gps components and barometer readings
    """
    # horizontal distance, use haversine formula
    dlon = gps_data["longitude"] - point["lon"]
    dlat = gps_data["latitude"] - point["lat"]
    haver = np.sin(dlat * DEG_TO_RAD / 2.0) ** 2.0 + (
        np.cos(point["lat"] * DEG_TO_RAD)
        * np.cos(gps_data["latitude"] * DEG_TO_RAD)
        * np.sin(dlon * DEG_TO_RAD / 2.0) ** 2.0
    )
    # the square root is capped at 1 to keep it inside the domain of arcsin
    c = 2.0 * np.arcsin(np.min([1.0, np.sqrt(haver)]))
    h_dist = EARTH_RADIUS_M * c
    # vertical distance using the gps altitude
    v_dist = np.abs(gps_data["altitude"] - point["alt"])
    # vertical distance using the height computed from the mean barometer reading
    v_bar_dist = np.abs(compute_barometric_height(bar_mean) - point["alt"])
    return h_dist, v_dist, v_bar_dist
def get_gps_dist_to_location(point: Dict[str, float], gps_dataholder: GPSDataHolder, bar_alt: Optional[float] = None) ‑> np.array
-
compute distance from multiple gps points to the chosen point using haversine formula :param point: dict with location to compute distance to :param gps_dataholder: all the gps data points to compute distance from :param bar_alt: height as measured by a barometer, default None :return: array of all distances in meters from gps point to chosen point
Expand source code
def get_gps_dist_to_location(
    point: Dict[str, float], gps_dataholder: GPSDataHolder, bar_alt: Optional[float] = None
) -> np.array:
    """
    compute the distance from every gps point of a station to the chosen point;
    the horizontal part uses the haversine formula and is combined with the altitude difference
    :param point: dict with location to compute distance to
    :param gps_dataholder: all the gps data points to compute distance from
    :param bar_alt: height as measured by a barometer, default None
    :return: array of all distances in meters from gps point to chosen point
    """
    gps_df = gps_dataholder.gps_df
    latitudes = gps_df.loc["latitude"].to_numpy()
    # if given a barometer altitude value, use that instead of the gps altitude
    station_alt = gps_df.loc["altitude"].to_numpy() if bar_alt is None else bar_alt
    # haversine formula for the horizontal distance
    dlon = gps_df.loc["longitude"].to_numpy() - point["lon"]
    dlat = latitudes - point["lat"]
    haver = np.sin(dlat * DEG_TO_RAD / 2.0) ** 2.0 + (
        np.cos(point["lat"] * DEG_TO_RAD) * np.cos(latitudes * DEG_TO_RAD) * np.sin(dlon * DEG_TO_RAD / 2.0) ** 2.0
    )
    # the square root is capped at 1 to keep it inside the domain of arcsin
    c = 2 * np.arcsin(np.minimum([1.0], np.sqrt(haver)))
    h_dist = EARTH_RADIUS_M * c
    # combine horizontal and vertical distance
    return np.sqrt(h_dist**2 + (point["alt"] - station_alt) ** 2)
def load_kml(kml_file: str) ‑> Dict[str, Dict[str, float]]
-
load location from a kml file :param kml_file: full path of the file to load data from :return: dictionary of locations with identifiers
Expand source code
def load_kml(kml_file: str) -> Dict[str, Dict[str, float]]:
    """
    read the named locations stored in a kml file
    :param kml_file: full path of the file to load data from
    :return: dictionary of locations with identifiers
    """
    with open(kml_file, "r", encoding="utf-8") as kml_handle:
        raw_text = kml_handle.read()
    parsed = kml.KML()
    # the parser is given the document as utf8 bytes
    parsed.from_string(raw_text.encode("utf8"))
    # the locations are the features nested inside the first top-level feature
    top_level = list(parsed.features())
    return {
        place.name: {"lon": place.geometry.x, "lat": place.geometry.y, "alt": place.geometry.z}
        for place in list(top_level[0].features())
    }
def load_position_data(w_p: List[WrappedRedvoxPacket]) ‑> GPSDataHolder
-
:param w_p: list of wrapped packets to read :return: all gps data from the packets in a GPSDataHolder
Expand source code
def load_position_data(w_p: List[reader.WrappedRedvoxPacket]) -> GPSDataHolder:
    """
    gather the location and barometer data of all packets into one GPSDataHolder
    packets without a barometer or location sensor contribute a single 0.0 default per channel
    and print a warning. The id, os and microphone sample rate are taken from the first packet.
    :param w_p: list of wrapped packets to read
    :return: all gps data from the packets in a GPSDataHolder
    :raises Exception: if anything goes wrong while reading the packets; the original error is chained as the cause
    :raises IndexError: if w_p is an empty list, since the first packet is needed to identify the station
    """
    # latitude, longitude, altitude, accuracy
    gps_data = [[], [], [], []]
    bar_data = []
    # remembered outside the loop so the error message can name the packet being read
    packet = None
    packet_name = None
    try:
        for packet in w_p:
            packet_name = packet.default_filename()
            if packet.has_barometer_sensor():
                # load barometer data
                bar_data.extend(packet.barometer_sensor().payload_values())
            else:
                # add defaults
                bar_data.extend([0.0])
                print("WARNING: {} Barometer empty, using default values!".format(packet_name))
            if packet.has_location_sensor():
                # load each channel's data into the container
                loc_chan = packet.location_sensor()
                gps_data[0].extend(loc_chan.payload_values_latitude())
                gps_data[1].extend(loc_chan.payload_values_longitude())
                gps_data[2].extend(loc_chan.payload_values_altitude())
                gps_data[3].extend(loc_chan.payload_values_accuracy())
            else:
                # add defaults
                for channel in gps_data:
                    channel.extend([0.0])
                print("WARNING: {} Location empty, using default values!".format(packet_name))
    except Exception as error:
        # keep the original exception as the explicit cause so its traceback is not lost
        if packet is not None:
            error_string = "Something went wrong while reading location data from file: {}. " "Original message: {}"
            raise Exception(error_string.format(packet_name, error)) from error
        raise Exception("No packet found in file. Original message: {}".format(error)) from error
    # load data into data holder
    first_packet = w_p[0]
    gps_dfh = GPSDataHolder(
        str(first_packet.redvox_id()),
        first_packet.device_os(),
        gps_data,
        first_packet.microphone_sensor().sample_rate_hz(),
    )
    gps_dfh.set_barometer(bar_data)
    return gps_dfh
def point_on_line_side(line_points: Tuple[Dict[str, float], Dict[str, float]], point: Dict[str, float]) ‑> float
-
check which side of a line the point is on
algorithm from: http://geomalgorithms.com/a03-_inclusion.html :param line_points: two coordinates that define a line :param point: point to test :return: < 0 for right side, == 0 for on line, > 0 for left side
Expand source code
def point_on_line_side(line_points: Tuple[Dict[str, float], Dict[str, float]], point: Dict[str, float]) -> float:
    """
    determine the side of a directed line that a point lies on
    algorithm from: http://geomalgorithms.com/a03-_inclusion.html
    :param line_points: two coordinates that define a line
    :param point: point to test
    :return: < 0 for right side, == 0 for on line, > 0 for left side
    """
    start = line_points[0]
    end = line_points[1]
    # cross product of (end - start) and (point - start), with lon as x and lat as y
    line_dlon = end["lon"] - start["lon"]
    line_dlat = end["lat"] - start["lat"]
    return line_dlon * (point["lat"] - start["lat"]) - (point["lon"] - start["lon"]) * line_dlat
def print_station_df(station_df: pandas.core.frame.DataFrame, os_type: Optional[str] = None)
-
print a dataframe, filtering on the station's os type :param station_df: a dataframe to search :param os_type: os type to filter on, default None
Expand source code
def print_station_df(station_df: pd.DataFrame, os_type: Optional[str] = None):
    """
    print a dataframe, optionally restricted to the stations of one os type
    :param station_df: a dataframe to search
    :param os_type: os type to filter on, default None
    """
    # only "Android" and "iOS" filter the output; any other value prints everything
    if os_type == "Android":
        to_show = get_all_android_station(station_df)
    elif os_type == "iOS":
        to_show = get_all_ios_station(station_df)
    else:
        to_show = station_df
    print(to_show)
def validate(data_to_test: GPSDataHolder, inclusion_ranges: Tuple[float, float, float] = (100.0, 50.0, 10.0), validation_type: str = None, validation_points: List[Dict[str, float]] = None, debug: bool = False) ‑> GPSDataHolder
-
validation master function. Can perform any kind of validation requested :param data_to_test: gps data to validate :param inclusion_ranges: ranges to include a data point with a validation point :param validation_type: the kind of validation to perform, default None :param validation_points: the points to validate against, default None :param debug: if True, output debugging information, default False :return: all valid gps data
Expand source code
def validate(
    data_to_test: GPSDataHolder,
    inclusion_ranges: Tuple[float, float, float] = (
        DEFAULT_INCLUSION_HORIZONTAL_M,
        DEFAULT_INCLUSION_VERTICAL_M,
        DEFAULT_INCLUSION_VERTICAL_BAR_M,
    ),
    validation_type: str = None,
    validation_points: List[Dict[str, float]] = None,
    debug: bool = False,
) -> GPSDataHolder:
    """
    validation master function. Can perform any kind of validation requested
    a validation_type of "solution" or "mean" keeps the gps points that are near every validation point;
    any other validation_type treats the validation points as a blacklist and keeps the gps points that
    are away from every one of them
    :param data_to_test: gps data to validate
    :param inclusion_ranges: ranges to include a data point with a validation point
    :param validation_type: the kind of validation to perform, default None
    :param validation_points: the points to validate against, default None
    :param debug: if True, output debugging information, default False
    :return: all valid gps data; data_to_test itself when there are no validation points
    """
    # validation_points defaults to None, which used to fail on len(); None and an empty list both mean
    # there is nothing to check against, so everything is good
    if not validation_points:
        return data_to_test
    # the kind of check is the same for every point, so pick it once
    if validation_type == "solution" or validation_type == "mean":
        point_check = validate_near_point
    else:
        point_check = validate_blacklist
    # the barometer mean does not change while validating, so compute it once
    bar_mean = data_to_test.barometer.get_mean()
    need_to_test_gps = data_to_test.clone()
    while True:  # keep going until the data doesn't change
        # NOTE(review): each pass re-tests data_to_test rather than need_to_test_gps, so the second pass
        # always reproduces the first and ends the loop; kept as-is to preserve behavior
        validated_gps_data = [[], [], [], []]
        for gps_point in data_to_test.gps_df.columns:
            # extract data to test
            gps_data = data_to_test.gps_df[gps_point]
            # a point is valid only if it passes the check for every validation point;
            # all() stops at the first failure
            if all(point_check(gps_data, point, bar_mean, inclusion_ranges) for point in validation_points):
                validated_gps_data[0].append(gps_data["latitude"])
                validated_gps_data[1].append(gps_data["longitude"])
                validated_gps_data[2].append(gps_data["altitude"])
                validated_gps_data[3].append(gps_data["accuracy"])
        # create the object to return.
        validated_gps = GPSDataHolder(
            data_to_test.id,
            data_to_test.os_type,
            validated_gps_data,
            data_to_test.mic_samp_rate_hz,
            data_to_test.barometer,
        )
        if validated_gps.get_size() == need_to_test_gps.get_size():
            # print message if user allows it
            if debug:
                print("{} data validated".format(validated_gps.id))
            # if data does not change, we are done validating
            return validated_gps
        # use the new data to update the old data
        need_to_test_gps = validated_gps.clone()
def validate_blacklist(gps_data: pandas.core.series.Series, point: Dict[str, float], bar_mean: float, inclusion_ranges: Tuple[float, float, float] = (100.0, 50.0, 10.0)) ‑> bool
-
:param gps_data: data to compare :param point: the point that is blacklisted :param bar_mean: the mean of the barometer measurements :param inclusion_ranges: distance from blacklisted point to be considered close enough :return: True if point is not in blacklisted point's vicinity
Expand source code
def validate_blacklist(
    gps_data: pd.Series,
    point: Dict[str, float],
    bar_mean: float,
    inclusion_ranges: Tuple[float, float, float] = (
        DEFAULT_INCLUSION_HORIZONTAL_M,
        DEFAULT_INCLUSION_VERTICAL_M,
        DEFAULT_INCLUSION_VERTICAL_BAR_M,
    ),
) -> bool:
    """
    check that a gps point is far enough away from a blacklisted point
    :param gps_data: data to compare
    :param point: the point that is blacklisted
    :param bar_mean: the mean of the barometer measurements
    :param inclusion_ranges: distance from blacklisted point to be considered close enough,
                             in the order horizontal, vertical, vertical by barometer
    :return: True if point is not in blacklisted point's vicinity
    """
    # calculate distance from gps data to invalid point
    h_dist, v_dist, v_bar_dist = get_component_dist_to_point(point, gps_data, bar_mean)
    # the point must be outside the horizontal range...
    far_horizontally = h_dist > inclusion_ranges[0]
    if not far_horizontally:
        return far_horizontally
    # ...and outside at least one of the two vertical ranges
    # NOTE(review): this is stricter than the negation of validate_near_point - confirm that is intended
    return v_dist > inclusion_ranges[1] or v_bar_dist > inclusion_ranges[2]
def validate_near_point(gps_data: pandas.core.series.Series, point: Dict[str, float], bar_mean: float, inclusion_ranges: Tuple[float, float, float] = (100.0, 50.0, 10.0)) ‑> bool
-
:param gps_data: data to compare :param point: the chosen point to compare against :param bar_mean: the mean of the barometer measurements :param inclusion_ranges: distance from chosen point to be considered close enough :return: True if point is within the chosen point's vicinity
Expand source code
def validate_near_point(
    gps_data: pd.Series,
    point: Dict[str, float],
    bar_mean: float,
    inclusion_ranges: Tuple[float, float, float] = (
        DEFAULT_INCLUSION_HORIZONTAL_M,
        DEFAULT_INCLUSION_VERTICAL_M,
        DEFAULT_INCLUSION_VERTICAL_BAR_M,
    ),
) -> bool:
    """
    check that a gps point is close enough to a chosen point
    :param gps_data: data to compare
    :param point: the chosen point to compare against
    :param bar_mean: the mean of the barometer measurements
    :param inclusion_ranges: distance from chosen point to be considered close enough,
                             in the order horizontal, vertical, vertical by barometer
    :return: True if point is within the chosen point's vicinity
    """
    # calculate distance from gps data to point
    h_dist, v_dist, v_bar_dist = get_component_dist_to_point(point, gps_data, bar_mean)
    # the point must be within the horizontal range...
    near_horizontally = h_dist <= inclusion_ranges[0]
    if not near_horizontally:
        return near_horizontally
    # ...and within at least one of the two vertical ranges
    return v_dist <= inclusion_ranges[1] or v_bar_dist <= inclusion_ranges[2]
def validate_point_in_polygon(point: Dict[str, float], edges: List[Dict[str, float]]) ‑> bool
-
Use winding number algorithm to determine if a point is in a polygon (or on the edge)
if winding number is 0, point is outside polygon
algorithm from: http://geomalgorithms.com/a03-_inclusion.html :param point: coordinates of the point to compare :param edges: list of coordinates of the edges of the polygon, with the last edge equal to the first :return: True if point is in the polygon
Expand source code
def validate_point_in_polygon(point: Dict[str, float], edges: List[Dict[str, float]]) -> bool:
    """
    Use winding number algorithm to determine if a point is in a polygon (or on the edge)
    if winding number is 0, point is outside polygon
    algorithm from: http://geomalgorithms.com/a03-_inclusion.html
    :param point: coordinates of the point to compare
    :param edges: list of coordinates of the edges of the polygon, with the last edge equal to the first
    :return: True if point is in the polygon
    """
    winding_number = 0
    # walk over each pair of consecutive coordinates, i.e. each side of the polygon
    for start, end in zip(edges, edges[1:]):
        if start["lat"] <= point["lat"]:
            # side crosses the point's latitude going up; count it if the point is left of or on the side
            if end["lat"] > point["lat"] and point_on_line_side((start, end), point) >= 0:
                winding_number += 1
        elif end["lat"] <= point["lat"] and point_on_line_side((start, end), point) <= 0:
            # side crosses the point's latitude going down with the point right of or on the side
            winding_number -= 1
    return winding_number != 0
def write_kml(kml_file: str, master_dict: Dict[str, Dict[str, float]])
-
put information from master_dict into a kml file :param kml_file: full path of kml file to write data to :param master_dict: the dictionary of information to write
Expand source code
def write_kml(kml_file: str, master_dict: Dict[str, Dict[str, float]]):
    """
    write one placemark per entry of master_dict into a kml file
    :param kml_file: full path of kml file to write data to
    :param master_dict: the dictionary of information to write
    """
    namespace = "{http://www.opengis.net/kml/2.2}"
    # the kml structure and the document it holds
    kml_root = kml.KML(ns=namespace)
    document = kml.Document(namespace, "1")
    # one style shared by all placemarks, referenced below as "#2"
    shared_style = styles.Style(id="2")
    icon_style = styles.IconStyle(id="3", color="ff0000ff")
    icon_style.icon_href = "http://maps.google.com/mapfiles/kml/shapes/placemark_circle.png"
    shared_style.append_style(icon_style)
    document.append_style(shared_style)
    # ids 1 to 3 are taken by the document and its styles, so placemark ids count up from 4
    for placemark_id, (station_name, info) in enumerate(master_dict.items(), start=4):
        # the description shows the os and sample rate of the station
        description = "{} {}hz".format(info["os"], str(info["sample rate"]))
        placemark = kml.Placemark(
            namespace, id=str(placemark_id), name=station_name, description=description, styleUrl="#2"
        )
        # the placemark sits at the station's mean position
        placemark.geometry = Point(info["mean lon"], info["mean lat"], info["mean alt"])
        document.append(placemark)
    kml_root.append(document)
    # write the xml declaration first, then the nicely formatted kml
    with open(kml_file, "w", encoding="utf-8") as out_file:
        out_file.write('<?xml version="1.0" encoding="UTF-8"?>\n')
        out_file.write(kml_root.to_string(prettyprint=True))
Classes
class DataHolder (name: str)
-
Stores an array of float data. The data is privatized for security. It also keeps track of the "best value" of the data set.
Properties
- id: a string identifier for the data
- _data: private data storage; all values must be floats
- best_value: the value that best represents the data set
sets up the DataHolder :param name: a string identifier for the data
Expand source code
class DataHolder:
    """
    Stores a list of float data and keeps track of the "best value" of the data set.
    Values equal to 0 are replaced with EPSILON every time data is added or set.

    Properties:
        * id: a string identifier for the data
        * _data: private data storage; all values must be floats
        * best_value: the value that best represents the data set
    """

    def __init__(self, name: str):
        """
        sets up an empty DataHolder

        :param name: a string identifier for the data
        """
        self.id = name
        self._data = []
        self.best_value = None

    def add(self, new_data: float):
        """
        appends one element to the data, then replaces any zeroes in the data

        :param new_data: float value to add
        """
        values = self._data
        values.append(new_data)
        self.replace_zeroes_with_epsilon()

    def set_data(self, new_data: List[float]):
        """
        overwrites the stored data with new_data, then replaces any zeroes in the data
        new_data is stored by reference, so zero replacement and later calls to add() modify the caller's list

        :param new_data: the new list of floats to overwrite the existing data with
        """
        self._data = new_data
        self.replace_zeroes_with_epsilon()

    def replace_zeroes_with_epsilon(self):
        """
        replaces, in place, every value equal to 0 with EPSILON
        """
        values = self._data
        for position, value in enumerate(values):
            if value == 0.0:
                values[position] = EPSILON

    def get_mean(self) -> float:
        """
        :return: the mean of the data, computed with numpy
        """
        values = self._data
        return np.mean(values)

    def get_std(self) -> float:
        """
        :return: the standard deviation of the data, computed with numpy
        """
        values = self._data
        return np.std(values)

    def get_data(self) -> List[float]:
        """
        :return: the stored data (the internal list itself, not a copy)
        """
        return self._data

    def get_len_data(self) -> int:
        """
        :return: the number of stored data values
        """
        count = len(self._data)
        return count
Methods
def add(self, new_data: float)
-
adds one element to the data :param new_data: float value to add
Expand source code
def add(self, new_data: float):
    """
    appends one element to the data, then replaces any zeroes in the data

    :param new_data: float value to add
    """
    values = self._data
    values.append(new_data)
    self.replace_zeroes_with_epsilon()
def get_data(self) ‑> List[float]
-
:return: the data
Expand source code
def get_data(self) -> List[float]:
    """
    :return: the stored data (the internal list itself, not a copy)
    """
    stored = self._data
    return stored
def get_len_data(self) ‑> int
-
:return: the length of the data array
Expand source code
def get_len_data(self) -> int:
    """
    :return: the number of stored data values
    """
    count = len(self._data)
    return count
def get_mean(self) ‑> float
-
:return: the mean of the data
Expand source code
def get_mean(self) -> float:
    """
    :return: the mean of the data, computed with numpy
    """
    values = self._data
    return np.mean(values)
def get_std(self) ‑> float
-
:return: the standard deviation of the data
Expand source code
def get_std(self) -> float:
    """
    :return: the standard deviation of the data, computed with numpy
    """
    values = self._data
    return np.std(values)
def replace_zeroes_with_epsilon(self)
-
replaces all 0 values in the data with extremely tiny values
Expand source code
def replace_zeroes_with_epsilon(self):
    """
    replaces, in place, every value equal to 0 with EPSILON
    """
    values = self._data
    for position, value in enumerate(values):
        if value == 0.0:
            values[position] = EPSILON
def set_data(self, new_data: List[float])
-
overwrites the stored data with the new_data :param new_data: the new list of floats to overwrite the existing data with
Expand source code
def set_data(self, new_data: List[float]):
    """
    overwrites the stored data with new_data, then replaces any zeroes in the data
    new_data is stored by reference, so the zero replacement also modifies the caller's list

    :param new_data: the new list of floats to overwrite the existing data with
    """
    # keep the caller's list itself; no copy is made
    setattr(self, "_data", new_data)
    self.replace_zeroes_with_epsilon()
class GPSDataHolder (name: str, opsys: str, data: Optional[List[List[float]]] = None, mic_samp_rate_hz: float = 80.0, bar: Optional[DataHolder] = None)
-
holds gps data (latitude, longitude, altitude, and accuracy) and barometric data uses a dataframe to organize the gps data
Properties
- gps_df: a dataframe to hold all the gps data
- barometer: a DataHolder for barometer data
- id: string identifier for the data set
- os_type: string identifier for the operating system of the data set
- mic_samp_rate_hz: float sample rate of station microphone in hz
- best_data_index: the index that corresponds to the best representative of the data
sets up the GPSDataHolder :param name: string identifier for the data set :param opsys: string identifier for the data set's operating system :param data: the data as a list of list of floats, default None :param mic_samp_rate_hz: float sample rate of the microphone in hz, default 80 hz :param bar: barometer DataHolder, default None
Expand source code
class GPSDataHolder:
    """
    holds gps data (latitude, longitude, altitude, and accuracy) and barometric data
    uses a dataframe to organize the gps data

    Properties:
        * gps_df: a dataframe to hold all the gps data
        * barometer: a DataHolder for barometer data; may be None
        * id: string identifier for the data set
        * os_type: string identifier for the operating system of the data set
        * mic_samp_rate_hz: float sample rate of station microphone in hz
        * best_data_index: the index that corresponds to the best representative of the data
    """

    def __init__(
        self,
        name: str,
        opsys: str,
        data: Optional[List[List[float]]] = None,
        mic_samp_rate_hz: float = 80.0,
        bar: Optional[DataHolder] = None,
    ):
        """
        sets up the GPSDataHolder

        :param name: string identifier for the data set
        :param opsys: string identifier for the data set's operating system
        :param data: the data as a list of list of floats, default None
        :param mic_samp_rate_hz: float sample rate of the microphone in hz, default 80 hz
        :param bar: barometer DataHolder, default None
        """
        self.gps_df = pd.DataFrame(data, index=GPS_DATA_INDICES)
        self.barometer = bar
        self.id = name
        self.os_type = opsys
        self.mic_samp_rate_hz = mic_samp_rate_hz
        self.best_data_index = 0

    def clone(self):
        """
        the gps dataframe is copied, so changes to the clone's gps data do not affect the original
        the barometer DataHolder is shared with the original

        :return: a copy of the GPSDataHolder
        """
        new_gps_dh = GPSDataHolder(self.id, self.os_type, None, self.mic_samp_rate_hz, self.barometer)
        # copy the frame; assigning self.gps_df directly would make both holders share one dataframe
        new_gps_dh.gps_df = self.gps_df.copy()
        new_gps_dh.best_data_index = self.best_data_index
        return new_gps_dh

    def set_data(self, new_data: Optional[List[List[float]]] = None):
        """
        set gps location data.
        data is expected to be 4 lists: latitude values, longitude values, altitude values, and accuracy values

        :param new_data: list of list of floats that represent the gps data, default None
        """
        self.gps_df = pd.DataFrame(new_data, index=GPS_DATA_INDICES)

    def set_metadata(
        self, new_id: Optional[str] = None, new_os: Optional[str] = None, new_mic_samp_rate_hz: Optional[float] = None
    ):
        """
        set metadata fields: id, os_type and mic_sample_rate_hz
        fields whose new value is None are left unchanged

        :param new_id: the new string identifier for the data set, default None
        :param new_os: the new string identifier for the data set's os, default None
        :param new_mic_samp_rate_hz: float of new microphone sample rate in hz, default None
        """
        if new_id is not None:
            self.id = new_id
        if new_os is not None:
            self.os_type = new_os
        if new_mic_samp_rate_hz is not None:
            self.mic_samp_rate_hz = new_mic_samp_rate_hz

    def get_mean_all(self) -> Dict[str, float]:
        """
        a barometer mean of 0, or a missing barometer, is reported as 0.00000000001

        :return: means of the latitude, longitude, altitude, accuracy, and barometer
        """
        # the barometer defaults to None in the constructor, so guard before calling into it
        bar_mean = None if self.barometer is None else self.barometer.get_mean()
        if bar_mean is None or bar_mean == 0:
            bar_mean = 0.00000000001
        lat_mean = self.gps_df.loc["latitude"].mean()
        lon_mean = self.gps_df.loc["longitude"].mean()
        alt_mean = self.gps_df.loc["altitude"].mean()
        acc_mean = self.gps_df.loc["accuracy"].mean()
        return {"acc": acc_mean, "lat": lat_mean, "lon": lon_mean, "alt": alt_mean, "bar": bar_mean}

    def get_std_all(self) -> Dict[str, float]:
        """
        gps standard deviations that pandas reports as NaN are returned as 0
        a barometer standard deviation of 0, or a missing barometer, is reported as 0.00000000001

        :return: standard deviations of the latitude, longitude, altitude, accuracy, and barometer
        """
        # the barometer defaults to None in the constructor, so guard before calling into it
        bar_std = None if self.barometer is None else self.barometer.get_std()
        if bar_std is None or bar_std == 0:
            bar_std = 0.00000000001
        stds = {}
        for key, row in (("acc", "accuracy"), ("lat", "latitude"), ("lon", "longitude"), ("alt", "altitude")):
            row_std = self.gps_df.loc[row].std()
            stds[key] = 0 if np.isnan(row_std) else row_std
        stds["bar"] = bar_std
        return stds

    def set_barometer(self, bar_data: List[float]):
        """
        sets the barometer DataHolder. uses the mean of the data as the best value

        :param bar_data: list of floats to set barometer data as
        """
        self.barometer = DataHolder("barometer")
        self.barometer.set_data(bar_data)
        self.barometer.best_value = np.mean(bar_data)

    def get_size(self) -> Tuple[int, int]:
        """
        a missing barometer counts as 0 barometer data points

        :return: the amount of gps and barometer data points
        """
        bar_count = 0 if self.barometer is None else self.barometer.get_len_data()
        # every row of gps_df has one entry per gps sample, so the first row's size is the gps count
        return self.gps_df.iloc[0].size, bar_count
Methods
def clone(self)
-
:return: an exact copy of the GPSDataHolder
Expand source code
def clone(self):
    """
    the gps dataframe is copied, so changes to the clone's gps data do not affect the original
    the barometer DataHolder is shared with the original

    :return: a copy of the GPSDataHolder
    """
    new_gps_dh = GPSDataHolder(self.id, self.os_type, None, self.mic_samp_rate_hz, self.barometer)
    # copy the frame; assigning self.gps_df directly would make both holders share one dataframe
    new_gps_dh.gps_df = self.gps_df.copy()
    new_gps_dh.best_data_index = self.best_data_index
    return new_gps_dh
def get_mean_all(self) ‑> Dict[str, float]
-
:return: means of the latitude, longitude, altitude, accuracy, and barometer
Expand source code
def get_mean_all(self) -> Dict[str, float]:
    """
    a barometer mean of 0, or a missing barometer, is reported as 0.00000000001

    :return: means of the latitude, longitude, altitude, accuracy, and barometer
    """
    # the barometer may be None, so guard before calling into it
    bar_mean = None if self.barometer is None else self.barometer.get_mean()
    if bar_mean is None or bar_mean == 0:
        bar_mean = 0.00000000001
    lat_mean = self.gps_df.loc["latitude"].mean()
    lon_mean = self.gps_df.loc["longitude"].mean()
    alt_mean = self.gps_df.loc["altitude"].mean()
    acc_mean = self.gps_df.loc["accuracy"].mean()
    return {"acc": acc_mean, "lat": lat_mean, "lon": lon_mean, "alt": alt_mean, "bar": bar_mean}
def get_size(self) ‑> (int, int)
-
:return: the amount of gps and barometer data points
Expand source code
def get_size(self) -> Tuple[int, int]:
    """
    a missing barometer counts as 0 barometer data points

    :return: the amount of gps and barometer data points
    """
    # the barometer may be None, so guard before calling into it
    bar_count = 0 if self.barometer is None else self.barometer.get_len_data()
    # every row of gps_df has one entry per gps sample, so the first row's size is the gps count
    return self.gps_df.iloc[0].size, bar_count
def get_std_all(self) ‑> Dict[str, float]
-
:return: standard deviations of the latitude, longitude, altitude, accuracy, and barometer
Expand source code
def get_std_all(self) -> Dict[str, float]:
    """
    gps standard deviations that pandas reports as NaN are returned as 0
    a barometer standard deviation of 0, or a missing barometer, is reported as 0.00000000001

    :return: standard deviations of the latitude, longitude, altitude, accuracy, and barometer
    """
    # the barometer may be None, so guard before calling into it
    bar_std = None if self.barometer is None else self.barometer.get_std()
    if bar_std is None or bar_std == 0:
        bar_std = 0.00000000001
    stds = {}
    for key, row in (("acc", "accuracy"), ("lat", "latitude"), ("lon", "longitude"), ("alt", "altitude")):
        row_std = self.gps_df.loc[row].std()
        stds[key] = 0 if np.isnan(row_std) else row_std
    stds["bar"] = bar_std
    return stds
def set_barometer(self, bar_data: List[float])
-
sets the barometer DataHolder. uses the mean of the data as the best value :param bar_data: list of floats to set barometer data as
Expand source code
def set_barometer(self, bar_data: List[float]):
    """
    replaces the barometer with a new DataHolder named "barometer" that stores bar_data
    the best value is the mean of bar_data, computed after the DataHolder has stored the data

    :param bar_data: list of floats to set barometer data as
    """
    holder = DataHolder("barometer")
    self.barometer = holder
    holder.set_data(bar_data)
    holder.best_value = np.mean(bar_data)
def set_data(self, new_data: Optional[List[List[float]]] = None)
-
set gps location data. data is expected to be 4 lists: latitude values, longitude values, altitude values, and accuracy values :param new_data: list of list of floats that represent the gps data, default None
Expand source code
def set_data(self, new_data: Optional[List[List[float]]] = None):
    """
    replace the gps dataframe with one built from new_data, using GPS_DATA_INDICES as the row index
    data is expected to be 4 lists: latitude values, longitude values, altitude values, and accuracy values

    :param new_data: list of list of floats that represent the gps data, default None
    """
    replacement = pd.DataFrame(new_data, index=GPS_DATA_INDICES)
    self.gps_df = replacement
def set_metadata(self, new_id: Optional[str] = None, new_os: Optional[str] = None, new_mic_samp_rate_hz: Optional[float] = None)
-
set metadata fields: id, os_type and mic_sample_rate_hz :param new_id: the new string identifier for the data set, default None :param new_os: the new string identifier for the data set's os, default None :param new_mic_samp_rate_hz: float of new microphone sample rate in hz, default None
Expand source code
def set_metadata(
    self, new_id: Optional[str] = None, new_os: Optional[str] = None, new_mic_samp_rate_hz: Optional[float] = None
):
    """
    set metadata fields: id, os_type and mic_sample_rate_hz
    fields whose new value is None are left unchanged

    :param new_id: the new string identifier for the data set, default None
    :param new_os: the new string identifier for the data set's os, default None
    :param new_mic_samp_rate_hz: float of new microphone sample rate in hz, default None
    """
    updates = (
        ("id", new_id),
        ("os_type", new_os),
        ("mic_samp_rate_hz", new_mic_samp_rate_hz),
    )
    for attribute, value in updates:
        if value is not None:
            setattr(self, attribute, value)
class LocationAnalyzer (wrapped_packets: List[List[WrappedRedvoxPacket]] = None, real_location: Optional[Dict[str, float]] = None, invalid_points: Optional[List[Dict[str, float]]] = None)
-
stores location information, which can be analyzed later contains functions to find mean, standard deviation (std) and validation of data use one analyzer per real location point. one analyzer can accommodate multiple stations per survey point the real location dictionary must contain the minimum keys listed in SURVEY_KEYS the real location dictionary may contain other keys than ones listed in SURVEY_KEYS keys in OPTIONAL_SURVEY_KEYS have special meaning and can only be used as this program intends to use them uses dataframes with station id as the index
Properties
- all_stations_info_df: dataframe with metadata about all stations
- all_stations_mean_df: dataframe with means from all stations
- all_stations_std_df: dataframe with stds from all stations
- all_stations_closest_df: dataframe with the closest point to the real location and its distance to the real location for all stations
- invalid_points: a list of gps points that are blacklisted
- _real_location: the surveyed point that the station is located at. privatized for security
- all_gps_data: a list of all GPSDataHolders that form the data set
- valid_gps_data: a list of all GPSDataHolders that pass validation checks
set up the LocationAnalyzer :param wrapped_packets: a list of wrapped redvox packet lists to analyze, default None :param real_location: dictionary containing the real location of the station, default None :param invalid_points: list of gps points that should not be in the data set, default None
Expand source code
class LocationAnalyzer:
    """
    stores location information, which can be analyzed later
    contains functions to find mean, standard deviation (std) and validation of data
    use one analyzer per real location point. one analyzer can accommodate multiple stations per survey point
    the real location dictionary must contain the minimum keys listed in SURVEY_KEYS
    the real location dictionary may contain other keys than ones listed in SURVEY_KEYS
    keys in OPTIONAL_SURVEY_KEYS have special meaning and can only be used as this program intends to use them
    uses dataframes with station id as the index

    Properties:
        * all_stations_info_df: dataframe with metadata about all stations
        * all_stations_mean_df: dataframe with means from all stations
        * all_stations_std_df: dataframe with stds from all stations
        * all_stations_closest_df: dataframe with the closest point to the real location
          and its distance to the real location for all stations
        * invalid_points: a list of gps points that are blacklisted
        * _real_location: the surveyed point that the station is located at. privatized for security
        * all_gps_data: a list of all GPSDataHolders that form the data set
        * valid_gps_data: a list of all GPSDataHolders that pass validation checks
    """

    def __init__(
        self,
        wrapped_packets: Optional[List[List[reader.WrappedRedvoxPacket]]] = None,
        real_location: Optional[Dict[str, float]] = None,
        invalid_points: Optional[List[Dict[str, float]]] = None,
    ):
        """
        set up the LocationAnalyzer

        :param wrapped_packets: a list of wrapped redvox packet lists to analyze, default None
        :param real_location: dictionary containing the real location of the station, default None
        :param invalid_points: list of gps points that should not be in the data set, default None
        """
        self.all_stations_closest_df = pd.DataFrame([], columns=CLOSEST_TO_SURVEY_COLUMNS)
        self.all_stations_mean_df = pd.DataFrame([], columns=MEAN_LOC_COLUMNS)
        self.all_stations_std_df = pd.DataFrame([], columns=STD_LOC_COLUMNS)
        self.all_stations_info_df = pd.DataFrame([], columns=STATION_INFO_COLUMNS)
        self.invalid_points = invalid_points
        self.all_gps_data = []
        self.valid_gps_data = []
        self._real_location = real_location
        # if given packets, load the location data of each station from them
        if wrapped_packets is not None:
            for wrapped_device_packets in wrapped_packets:
                self.get_loc_from_packets(wrapped_device_packets)

    def set_real_location(self, survey: Optional[Dict[str, float]] = None):
        """
        set the real location

        :param survey: dictionary containing the station's location, default None
        """
        self._real_location = survey

    def get_real_location(self) -> Dict[str, float]:
        """
        :return: the station's real location
        """
        return self._real_location

    def get_all_dataframes(self) -> pd.DataFrame:
        """
        :return: all 4 dataframes fused together, joined by station id
        """
        frames = [
            self.all_stations_info_df,
            self.all_stations_closest_df,
            self.all_stations_mean_df,
            self.all_stations_std_df,
        ]
        return pd.concat(frames, axis=1)

    def get_stats_dataframes(self) -> pd.DataFrame:
        """
        :return: station info, mean and std dataframes fused together
        """
        frames = [self.all_stations_info_df, self.all_stations_mean_df, self.all_stations_std_df]
        return pd.concat(frames, axis=1)

    def get_loc_from_packets(self, w_p: List[reader.WrappedRedvoxPacket]):
        """
        store the location information and their mean and std using a collection of wrapped redvox packets
        assumes a list of redvox packets shares 1 device id
        an empty list is ignored

        :param w_p: a list of wrapped redvox packets to read
        """
        # the metadata is read from the first packet, so there is nothing to load without one
        if not w_p:
            return
        # extract the information from the packets
        first_packet = w_p[0]
        sample_rate = first_packet.microphone_sensor().sample_rate_hz()
        dev_os_type = first_packet.device_os()
        idd = first_packet.redvox_id()
        packet_gps_data = load_position_data(w_p)
        # compute mean location
        mean_loc = packet_gps_data.get_mean_all()
        std_loc = packet_gps_data.get_std_all()
        # store the information, one row per station id
        stat_keys = ("acc", "lat", "lon", "alt", "bar")
        self.all_gps_data.append(packet_gps_data)
        self.all_stations_info_df.loc[idd] = [dev_os_type, sample_rate]
        self.all_stations_std_df.loc[idd] = [std_loc[key] for key in stat_keys]
        self.all_stations_mean_df.loc[idd] = [mean_loc[key] for key in stat_keys]

    def analyze_data(self, write_output: bool = False):
        """
        validate the data, then compare the valid data to the real location
        if no real location is set, the mean of the station means is used as the real location
        output is written to "temp.csv" if enabled

        :param write_output: boolean to write any debugging output, default False
        """
        self.validate_all()
        # if there's no real location, make the mean the real location
        if self._real_location is None:
            means = self.all_stations_mean_df
            self._real_location = {
                "lat": np.mean(means["mean lat"]),
                "lon": np.mean(means["mean lon"]),
                "alt": np.mean(means["mean alt"]),
                "bar": np.mean(means["mean bar"]),
            }
        self.compare_with_real_location()
        # print results
        if write_output:
            self.print_to_csv("temp.csv")

    def get_barometric_heights(self, sea_pressure: float = AVG_SEA_LEVEL_PRESSURE_KPA) -> pd.DataFrame:
        """
        for each station, compute the barometric height using the mean

        :param sea_pressure: the local sea pressure in kPa, default AVG_SEA_LEVEL_PRESSURE_KPA
        :return: a dataframe with the barometric heights in meters and station id as the index
        """
        mean_pressures = self.all_stations_mean_df["mean bar"].T.to_dict()
        bar_heights = {
            station: compute_barometric_height(pressure, sea_pressure)
            for station, pressure in mean_pressures.items()
        }
        barometric_heights = pd.DataFrame(bar_heights, index=["bar height"], columns=self.all_stations_mean_df.index)
        # transpose so that the station ids form the index
        return barometric_heights.T

    def validate_all(
        self,
        validation_ranges: Tuple[float, float, float] = (
            DEFAULT_INCLUSION_HORIZONTAL_M,
            DEFAULT_INCLUSION_VERTICAL_M,
            DEFAULT_INCLUSION_VERTICAL_BAR_M,
        ),
    ):
        """
        check that all data in the data set are valid. Remove outliers and strange values

        :param validation_ranges: tuple of floats that the data values are compared against for validation
        """
        # validation always assumes nothing is valid when it starts, so empty out the valid_gps_data
        self.valid_gps_data = []
        for station in self.all_gps_data:
            validated_gps = validate(station, validation_ranges, "blacklist", self.invalid_points)
            # keep only stations that still have gps points after validation
            if validated_gps.get_size()[0] != 0:
                self.valid_gps_data.append(validated_gps)

    def compare_with_real_location(self):
        """
        find the closest valid data point to the real location.
        information is stored in the data frames
        """
        # compute closest point to real location
        result = compute_distance_all(self._real_location, self.valid_gps_data)
        self.all_stations_closest_df = result[CLOSEST_TO_SURVEY_COLUMNS]
        self.all_stations_info_df = result[STATION_INFO_COLUMNS]
        self.all_stations_mean_df = result[MEAN_LOC_COLUMNS]
        self.all_stations_std_df = result[STD_LOC_COLUMNS]

    def print_location_df(self, info_type: Optional[str] = None, os_type: Optional[str] = None):
        """
        print a single dataframe or a group of dataframes
        info_type may be "real", "info", "std", "mean" or "all"; anything else prints the info, mean and std dataframes

        :param info_type: string denoting the type or group of dataframes to output, default None
        :param os_type: string denoting the os of the stations to output, default None
        """
        if info_type == "real":
            print_station_df(self.all_stations_closest_df, os_type)
        elif info_type == "info":
            print_station_df(self.all_stations_info_df, os_type)
        elif info_type == "std":
            print_station_df(self.all_stations_std_df, os_type)
        elif info_type == "mean":
            print_station_df(self.all_stations_mean_df, os_type)
        elif info_type == "all":
            print_station_df(self.get_all_dataframes(), os_type)
        else:
            # fuse statistical dataframes together
            print_station_df(self.get_stats_dataframes(), os_type)

    def print_to_csv(self, path: str, os_type: Optional[str] = None, debug: Optional[bool] = False):
        """
        print dataframes to csv files in path
        os_type "Android" or "iOS" limits the output to those stations; anything else writes all stations

        :param path: string containing full path and file name
        :param os_type: string denoting the os of the stations to output, default None
        :param debug: if true, output debug statements, default False
        """
        # fuse all dataframes together
        result = self.get_all_dataframes()
        if os_type == "Android":
            get_all_android_station(result).to_csv(path)
        elif os_type == "iOS":
            get_all_ios_station(result).to_csv(path)
        else:
            os_type = "all"
            result.to_csv(path)
        if debug:
            print("Printed {} station data to {}.".format(os_type, path))
Methods
def analyze_data(self, write_output: bool = False)
-
analyze data, then if a real location exists, compare data to real location
output is written if enabled :param write_output: boolean to write any debugging output, default False
Expand source code
def analyze_data(self, write_output: bool = False):
    """
    validate the data, then compare the valid data to the real location
    if no real location is set, the mean of the station means is used as the real location
    output is written to "temp.csv" if enabled

    :param write_output: boolean to write any debugging output, default False
    """
    self.validate_all()
    # without a real location, fall back to the mean over all stations
    if self._real_location is None:
        station_means = self.all_stations_mean_df
        self._real_location = {
            key: np.mean(station_means["mean " + key]) for key in ("lat", "lon", "alt", "bar")
        }
    self.compare_with_real_location()
    # print results
    if write_output:
        self.print_to_csv("temp.csv")
def compare_with_real_location(self)
-
find the closest valid data point to the real location. information is stored in the data frames
Expand source code
def compare_with_real_location(self):
    """
    compute the distances between the valid gps data and the real location,
    then replace the closest, info, mean and std dataframes with the matching columns of the result
    """
    # compute closest point to real location
    combined = compute_distance_all(self._real_location, self.valid_gps_data)
    column_groups = (
        ("all_stations_closest_df", CLOSEST_TO_SURVEY_COLUMNS),
        ("all_stations_info_df", STATION_INFO_COLUMNS),
        ("all_stations_mean_df", MEAN_LOC_COLUMNS),
        ("all_stations_std_df", STD_LOC_COLUMNS),
    )
    for attribute, columns in column_groups:
        setattr(self, attribute, combined[columns])
def get_all_dataframes(self) ‑> pandas.core.frame.DataFrame
-
:return: all 4 dataframes fused together, joined by station id
Expand source code
def get_all_dataframes(self) -> pd.DataFrame:
    """
    :return: the info, closest, mean and std dataframes concatenated column-wise, in that order
    """
    return pd.concat(
        [
            self.all_stations_info_df,
            self.all_stations_closest_df,
            self.all_stations_mean_df,
            self.all_stations_std_df,
        ],
        axis=1,
    )
def get_barometric_heights(self, sea_pressure: float = 101.325) ‑> pandas.core.frame.DataFrame
-
for each station, compute the barometric height using the mean :param sea_pressure: the local sea pressure in kPa, default AVG_SEA_LEVEL_PRESSURE_KPA :return: a dataframe with the barometric heights in meters and station id as the index
Expand source code
def get_barometric_heights(self, sea_pressure: float = AVG_SEA_LEVEL_PRESSURE_KPA) -> pd.DataFrame:
    """
    for each station, compute the barometric height from its mean barometer value

    :param sea_pressure: the local sea pressure in kPa, default AVG_SEA_LEVEL_PRESSURE_KPA
    :return: a dataframe with the barometric heights in meters and station id as the index
    """
    mean_pressures = self.all_stations_mean_df["mean bar"].T.to_dict()
    heights = {
        station: compute_barometric_height(pressure, sea_pressure)
        for station, pressure in mean_pressures.items()
    }
    frame = pd.DataFrame(heights, index=["bar height"], columns=self.all_stations_mean_df.index)
    # transpose so that the station ids form the index
    return frame.T
def get_loc_from_packets(self, w_p: List[WrappedRedvoxPacket])
-
store the location information and their mean and std using a collection of wrapped redvox packets
assumes a list of redvox packets shares 1 device id :param w_p: a list of wrapped redvox packets to read
Expand source code
def get_loc_from_packets(self, w_p: List[reader.WrappedRedvoxPacket]):
    """
    store the location information and their mean and std using a collection of wrapped redvox packets
    assumes a list of redvox packets shares 1 device id
    an empty list is ignored

    :param w_p: a list of wrapped redvox packets to read
    """
    # the metadata is read from the first packet, so there is nothing to load without one
    if not w_p:
        return
    # extract the information from the packets
    first_packet = w_p[0]
    sample_rate = first_packet.microphone_sensor().sample_rate_hz()
    dev_os_type = first_packet.device_os()
    idd = first_packet.redvox_id()
    packet_gps_data = load_position_data(w_p)
    # compute mean location
    mean_loc = packet_gps_data.get_mean_all()
    std_loc = packet_gps_data.get_std_all()
    # store the information, one row per station id
    stat_keys = ("acc", "lat", "lon", "alt", "bar")
    self.all_gps_data.append(packet_gps_data)
    self.all_stations_info_df.loc[idd] = [dev_os_type, sample_rate]
    self.all_stations_std_df.loc[idd] = [std_loc[key] for key in stat_keys]
    self.all_stations_mean_df.loc[idd] = [mean_loc[key] for key in stat_keys]
def get_real_location(self) ‑> Dict[str, float]
-
:return: the station's real location
Expand source code
def get_real_location(self) -> Dict[str, float]:
    """
    :return: the stored real location of the station
    """
    location = self._real_location
    return location
def get_stats_dataframes(self) ‑> pandas.core.frame.DataFrame
-
:return: station info, mean and std dataframes fused together
Expand source code
def get_stats_dataframes(self) -> pd.DataFrame:
    """
    :return: the info, mean and std dataframes concatenated column-wise, in that order
    """
    return pd.concat(
        [self.all_stations_info_df, self.all_stations_mean_df, self.all_stations_std_df],
        axis=1,
    )
def print_location_df(self, info_type: Optional[str] = None, os_type: Optional[str] = None)
-
print a single dataframe or a group of dataframes :param info_type: string denoting the type or group of dataframes to output, default None :param os_type: string denoting the os of the stations to output, default None
Expand source code
def print_location_df(self, info_type: Optional[str] = None, os_type: Optional[str] = None):
    """
    print a single dataframe or a group of dataframes
    info_type may be "real", "info", "std", "mean" or "all"; anything else prints the info, mean and std dataframes

    :param info_type: string denoting the type or group of dataframes to output, default None
    :param os_type: string denoting the os of the stations to output, default None
    """
    single_frames = (
        ("real", "all_stations_closest_df"),
        ("info", "all_stations_info_df"),
        ("std", "all_stations_std_df"),
        ("mean", "all_stations_mean_df"),
    )
    for label, attribute in single_frames:
        if info_type == label:
            selected = getattr(self, attribute)
            break
    else:
        if info_type == "all":
            selected = self.get_all_dataframes()
        else:
            # fuse statistical dataframes together
            selected = self.get_stats_dataframes()
    print_station_df(selected, os_type)
def print_to_csv(self, path: str, os_type: Optional[str] = None, debug: Optional[bool] = False)
-
print dataframes to csv files in path :param path: string containing full path and file name :param os_type: string denoting the os of the stations to output, default None :param debug: if true, output debug statements, default False
Expand source code
def print_to_csv(self, path: str, os_type: Optional[str] = None, debug: Optional[bool] = False):
    """
    write the fused dataframes to a csv file
    os_type "Android" or "iOS" limits the output to those stations; anything else writes all stations

    :param path: string containing full path and file name
    :param os_type: string denoting the os of the stations to output, default None
    :param debug: if true, output debug statements, default False
    """
    # fuse all dataframes together
    combined = self.get_all_dataframes()
    if os_type == "Android":
        selected = get_all_android_station(combined)
    elif os_type == "iOS":
        selected = get_all_ios_station(combined)
    else:
        # any other value means no filtering; the label is only used in the debug message
        os_type = "all"
        selected = combined
    selected.to_csv(path)
    if debug:
        print("Printed {} station data to {}.".format(os_type, path))
def set_real_location(self, survey: Dict[str, float] = None)
-
set the real location :param survey: dictionary containing the station's location, default None
Expand source code
def set_real_location(self, survey: Dict[str, float] = None):
    """
    store survey as the real location; passing None clears it

    :param survey: dictionary containing the station's location, default None
    """
    setattr(self, "_real_location", survey)
def validate_all(self, validation_ranges: Tuple[float, float, float] = (100.0, 50.0, 10.0))
-
check that all data in the data set are valid. Remove outliers and strange values :param validation_ranges: tuple of floats that the data values are compared against for validation
Expand source code
def validate_all(
    self,
    validation_ranges: Tuple[float, float, float] = (
        DEFAULT_INCLUSION_HORIZONTAL_M,
        DEFAULT_INCLUSION_VERTICAL_M,
        DEFAULT_INCLUSION_VERTICAL_BAR_M,
    ),
):
    """
    run every station in all_gps_data through validate() in "blacklist" mode with the invalid points
    stations that still have gps points afterwards are stored in valid_gps_data

    :param validation_ranges: tuple of floats that the data values are compared against for validation
    """
    # validation always assumes nothing is valid when it starts, so start from an empty list
    accepted = self.valid_gps_data = []
    for candidate in self.all_gps_data:
        checked = validate(candidate, validation_ranges, "blacklist", self.invalid_points)
        # the first element of get_size() is the number of gps points left
        if checked.get_size()[0] == 0:
            continue
        accepted.append(checked)