Module redvox.tests.more_tests
Expand source code
import timeit
import os.path
import numpy as np
import pprint
import pickle
import psutil
import pandas as pd
import redpandas.redpd_df
import redpandas.redpd_filter as rpd_filter
import redpandas.redpd_xcorr as rpd_xcorr
import redpandas.redpd_plot.wiggles as rpd_plt
import redpandas.redpd_tfr as rpd_tfr
import redpandas.redpd_plot.mesh as rpd_plt_tfr
from redvox.common.data_window import DataWindow, DataWindowConfig
import redvox.common.date_time_utils as dt
import redvox.settings as settings
from redvox.common.event_stream import EventStreams
from redvox.common.api_reader import ApiReader
import redvox.api1000.wrapped_redvox_packet.wrapped_packet as wp
def main():
    """
    Ad-hoc benchmarking / debugging driver: builds a RedVox DataWindow from
    hard-coded local paths, times the load, and prints per-station ML data.

    NOTE(review): this is scratch code. Execution stops at the first
    ``exit(1)`` below, so the plotting section and the RedPandas episode
    setup that follow it are unreachable as written.
    """
    # Author's timing notes for DataWindow creation:
    # compressed data to DW is 4.68-4.32 times
    # up to 6 times for 8k sample rate
    # higher sample rate means larger multiplication
    # 2.5 mins with disk write to temp (lower RAM Usage)
    # 1.5 mins with mem write (high ram usage)
    settings.set_parallelism_enabled(False)  # force serial processing for this run
    print("parallel: ", settings.is_parallelism_enabled())
    save_path = "/Users/tyler/Documents/ml_export"  # NOTE(review): assigned but never used
    # path = "/Users/tyler/Documents/pyarrowreadertest/inl_test"
    # path = "/Users/tyler/Documents/snl_21_9_27"
    # path = "/Users/tyler/Documents/tss_test"
    path = "/Users/tyler/Documents/ml_export/test_wamv"
    out_pkl = os.path.join(path, "out22.pkl")  # destination for the pickled audio dump below
    config = DataWindowConfig(
        path,
        True,  # positional flag -- presumably structured-layout; confirm against DataWindowConfig signature
        # use_model_correction=False,
        # station_ids=["1637610022"]
        # station_ids=["1637610022"]
        # station_ids=["1637681007"]
        # station_ids=["1637665009"]
        # start_datetime=dt.datetime_from_epoch_seconds_utc(1661203100),
        # end_datetime=dt.datetime_from_epoch_seconds_utc(1661203200)
    )
    # NOTE(review): `s` here is a timer start time; it is later shadowed by the
    # station loop variable `s` in the (unreachable) plotting section below.
    s = timeit.default_timer()
    # sf_test = DataWindow.deserialize("/Users/tyler/Documents/pyarrowreadertest/large_test/large_test_lz4.pkl.lz4")
    # lz4 = 1542448.119 KB uncompressed
    # parquet = 1565174.924 KB uncompressed
    sf_test = DataWindow("test", config=config, out_type="NONE", make_runme=False, debug=False)
    # sf_test.save()
    # sf_test = DataWindow.load(os.path.join(save_path, "test.json"))
    e = timeit.default_timer()
    # sf_test = DataWindow.deserialize("/Users/tyler/Documents/duplicate_test/dw_1641329393000278_3.pkl.lz4")
    # print("audio_rate", sf_test.first_station().audio_sample_rate_nominal_hz())
    print("load dw", e - s)  # wall-clock seconds to build the DataWindow
    # Print the top-3 ML results for the first 3 windows of every station.
    for t in sf_test.stations():
        print(t.id())
        for i in range(3):
            print(t.event_data().ml_data.windows[i].retain_top(3))
    exit(1)  # NOTE(review): hard stop -- everything below is dead code
    # sf_test = DataWindow.deserialize("/Users/tyler/Documents/perf_tests/dw_1661472735001221_1.pkl.lz4")
    # print(sf_test.first_station().audio_sensor().get_microphone_data()[-50:-1])
    import matplotlib.pyplot as plt  # local import keeps matplotlib off the reachable path
    print("Plotting data")
    plts = plt.subplots(1, 1, figsize=(8, 6), sharex=True)
    fig = plts[0]  # NOTE(review): unused
    axes = plts[1]
    axes.set_title("Audio Data")
    cur_i = 0  # NOTE(review): incremented below but never read
    for s in sf_test.stations():
        print(s.id())
        # if s.id() == "1637610022":
        #     ltcs = s.timesync_data().best_latency_per_exchange()
        #     new_best_latency = np.min([n for n in ltcs if n > 500])
        #     s.timesync_data()._best_latency = new_best_latency
        #     s.timesync_data()._best_latency_index = int(np.where(ltcs == new_best_latency)[0][0])
        #     # s.timesync_data()._best_offset = s.timesync_data().offsets()[1][s.timesync_data().best_latency_index()]
        #     s.timesync_data()._best_offset = -50464006.0
        #     s._correct_timestamps = True
        #     s.update_timestamps()
        print(s.first_data_timestamp())
        print(s.last_data_timestamp())
        # axes.plot(s.audio_sensor().data_timestamps() - 1632753420000000 + new_offset,
        #           s.audio_sensor().get_microphone_data())
        with open(out_pkl, "wb") as outs:
            pickle.dump(
                [
                    # NOTE(review): `new_offset` is never defined anywhere in this
                    # function (it only appears in commented-out lines) -- this
                    # statement would raise NameError if this section were reached.
                    s.audio_sensor().data_timestamps() - 1632753420000000 + new_offset,
                    s.audio_sensor().get_microphone_data(),
                ],
                outs,
            )
        axes.set_ylabel(s.id())
        cur_i += 1
        # tsync = s.timesync_data()
        #
        # best_exchanges = tsync.sync_exchanges()
        #
        # print("server:")
        # print(best_exchanges[0][tsync.best_latency_index()])
        # print(best_exchanges[1][tsync.best_latency_index()])
        # print(best_exchanges[2][tsync.best_latency_index()])
        # print("device:")
        # print(best_exchanges[3][tsync.best_latency_index()])
        # print(best_exchanges[4][tsync.best_latency_index()])
        # print(best_exchanges[5][tsync.best_latency_index()])
        # print(s.location_sensor().get_location_provider_data())
        # print(s.location_sensor().data_timestamps())
        # print(s.location_sensor().get_gps_timestamps_data())
    plt.xlabel("Time")
    plt.show()
    exit(1)
    # asdfasdf = [1.0000014 ** n for n in range(9999999)]
    # print(asdfasdf.__sizeof__())
    # RedPandas episode setup -- also unreachable (see exit(1) calls above).
    EPISODE_START_DATETIME = dt.datetime_from_epoch_seconds_utc(1568132940)
    EPISODE_END_DATETIME = dt.datetime_from_epoch_seconds_utc(1568132984)
    INPUT_DIR = "/Users/tyler/Downloads/data"
    # # Absolute path for output pickle and parquet files
    RPD_DIR = "rpd_files_sdk31"
    OUTPUT_DIR = os.path.join(INPUT_DIR, RPD_DIR)
    # NOTE(review): dw_config is built but never consumed -- the DataWindow
    # construction that would use it is commented out below.
    dw_config = DataWindowConfig(
        input_dir=INPUT_DIR,
        start_datetime=EPISODE_START_DATETIME,
        end_datetime=EPISODE_END_DATETIME,
        start_buffer_td=dt.timedelta(minutes=3),
        station_ids=["0000000021"],
    )
    # station_ids=['0000000011', '0000000012', '0000000013', '0000000021',
    #              '0000000022', '0000000023', '0000000024'])
    # dw = DataWindow(event_name="rdvx_flight2", config=dw_config, debug=False,
    #                 out_dir=OUTPUT_DIR, out_type="NONE")
    # dw.save()
    # asdahfs = dw.load(os.path.join(OUTPUT_DIR, "rdvx_flight2.json"))
    # print(asdahfs.pretty())
    exit(1)


if __name__ == "__main__":
    main()
Functions
def main()
-
Expand source code
def main(): # compressed data to DW is 4.68-4.32 times # up to 6 times for 8k sample rate # higher sample rate means larger multiplication # 2.5 mins with disk write to temp (lower RAM Usage) # 1.5 mins with mem write (high ram usage) settings.set_parallelism_enabled(False) print("parallel: ", settings.is_parallelism_enabled()) save_path = "/Users/tyler/Documents/ml_export" # path = "/Users/tyler/Documents/pyarrowreadertest/inl_test" # path = "/Users/tyler/Documents/snl_21_9_27" # path = "/Users/tyler/Documents/tss_test" path = "/Users/tyler/Documents/ml_export/test_wamv" out_pkl = os.path.join(path, "out22.pkl") config = DataWindowConfig( path, True, # use_model_correction=False, # station_ids=["1637610022"] # station_ids=["1637610022"] # station_ids=["1637681007"] # station_ids=["1637665009"] # start_datetime=dt.datetime_from_epoch_seconds_utc(1661203100), # end_datetime=dt.datetime_from_epoch_seconds_utc(1661203200) ) s = timeit.default_timer() # sf_test = DataWindow.deserialize("/Users/tyler/Documents/pyarrowreadertest/large_test/large_test_lz4.pkl.lz4") # lz4 = 1542448.119 KB uncompressed # parquet = 1565174.924 KB uncompressed sf_test = DataWindow("test", config=config, out_type="NONE", make_runme=False, debug=False) # sf_test.save() # sf_test = DataWindow.load(os.path.join(save_path, "test.json")) e = timeit.default_timer() # sf_test = DataWindow.deserialize("/Users/tyler/Documents/duplicate_test/dw_1641329393000278_3.pkl.lz4") # print("audio_rate", sf_test.first_station().audio_sample_rate_nominal_hz()) print("load dw", e - s) for t in sf_test.stations(): print(t.id()) for i in range(3): print(t.event_data().ml_data.windows[i].retain_top(3)) exit(1) # sf_test = DataWindow.deserialize("/Users/tyler/Documents/perf_tests/dw_1661472735001221_1.pkl.lz4") # print(sf_test.first_station().audio_sensor().get_microphone_data()[-50:-1]) import matplotlib.pyplot as plt print("Plotting data") plts = plt.subplots(1, 1, figsize=(8, 6), sharex=True) fig = plts[0] axes = 
plts[1] axes.set_title("Audio Data") cur_i = 0 for s in sf_test.stations(): print(s.id()) # if s.id() == "1637610022": # ltcs = s.timesync_data().best_latency_per_exchange() # new_best_latency = np.min([n for n in ltcs if n > 500]) # s.timesync_data()._best_latency = new_best_latency # s.timesync_data()._best_latency_index = int(np.where(ltcs == new_best_latency)[0][0]) # # s.timesync_data()._best_offset = s.timesync_data().offsets()[1][s.timesync_data().best_latency_index()] # s.timesync_data()._best_offset = -50464006.0 # s._correct_timestamps = True # s.update_timestamps() print(s.first_data_timestamp()) print(s.last_data_timestamp()) # axes.plot(s.audio_sensor().data_timestamps() - 1632753420000000 + new_offset, # s.audio_sensor().get_microphone_data()) with open(out_pkl, "wb") as outs: pickle.dump( [ s.audio_sensor().data_timestamps() - 1632753420000000 + new_offset, s.audio_sensor().get_microphone_data(), ], outs, ) axes.set_ylabel(s.id()) cur_i += 1 # tsync = s.timesync_data() # # best_exchanges = tsync.sync_exchanges() # # print("server:") # print(best_exchanges[0][tsync.best_latency_index()]) # print(best_exchanges[1][tsync.best_latency_index()]) # print(best_exchanges[2][tsync.best_latency_index()]) # print("device:") # print(best_exchanges[3][tsync.best_latency_index()]) # print(best_exchanges[4][tsync.best_latency_index()]) # print(best_exchanges[5][tsync.best_latency_index()]) # print(s.location_sensor().get_location_provider_data()) # print(s.location_sensor().data_timestamps()) # print(s.location_sensor().get_gps_timestamps_data()) plt.xlabel("Time") plt.show() exit(1) # asdfasdf = [1.0000014 ** n for n in range(9999999)] # print(asdfasdf.__sizeof__()) EPISODE_START_DATETIME = dt.datetime_from_epoch_seconds_utc(1568132940) EPISODE_END_DATETIME = dt.datetime_from_epoch_seconds_utc(1568132984) INPUT_DIR = "/Users/tyler/Downloads/data" # # Absolute path for output pickle and parquet files RPD_DIR = "rpd_files_sdk31" OUTPUT_DIR = os.path.join(INPUT_DIR, 
RPD_DIR) dw_config = DataWindowConfig( input_dir=INPUT_DIR, start_datetime=EPISODE_START_DATETIME, end_datetime=EPISODE_END_DATETIME, start_buffer_td=dt.timedelta(minutes=3), station_ids=["0000000021"], ) # station_ids=['0000000011', '0000000012', '0000000013', '0000000021', # '0000000022', '0000000023', '0000000024']) # dw = DataWindow(event_name="rdvx_flight2", config=dw_config, debug=False, # out_dir=OUTPUT_DIR, out_type="NONE") # dw.save() # asdahfs = dw.load(os.path.join(OUTPUT_DIR, "rdvx_flight2.json")) # print(asdahfs.pretty()) exit(1)