# Expand source code
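"""
Scratch script that loads a serialized DataWindow and prints, for each station, its id, first and last data
timestamps, a few microphone samples and details of its "inference" event stream.

* There is a known issue when exiting programs in which a DataWindow has been created. This issue can be mitigated
  by explicitly deleting the DataWindow once it is no longer needed.
```
# example code
my_dw: DataWindow
# do stuff with my_dw
del my_dw
```
"""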
import os
from redvox.common import data_window as dw
import redvox.common.date_time_utils as dtu
if __name__ == "__main__":
    # Scratch entry point: deserialize a DataWindow from disk and print a short summary of every station
    # (id, first/last data timestamps, a few microphone samples, and the "inference" event stream).
    # All paths are specific to the original author's machine; adjust them before running.

    # --- Reference example (disabled): inspect the "yamnet" event stream of another serialized DataWindow ---
    # my_dw = "/Users/tyler/Downloads/dw_1647024780000029_1.pkl.lz4"
    # dw_test = dw.DataWindow.deserialize(my_dw)
    #
    # st = dw_test.first_station()
    #
    # evs = st.event_data()
    #
    # if "yamnet" in evs.get_stream_names():
    #     print(evs.get_stream("yamnet").events[0].get_timestamp())
    #     print(evs.get_stream("yamnet").events[-1].get_timestamp())
    #
    # print('test')
    # exit(1)

    # Directory that holds the serialized DataWindow read below.
    mydir = "/Users/tyler/Downloads"
    # Output directory; only referenced by the disabled examples below.
    out_dir = "/Users/tyler/Documents/pyarrowreadertest/inl_test2"
    # # ev_name = "examples"
    # ev_name2 = "real_data"
    #
    # print(os.getcwd())

    # --- Reference example (disabled): build a DataWindow from raw data, save it, then load it back ---
    # cfg = dw.DataWindowConfig(input_dir=os.path.join(mydir), structured_layout=True, station_ids=["1637110702"],
    #                           start_datetime=dtu.datetime_from_epoch_seconds_utc(1656368600),
    #                           end_datetime=dtu.datetime_from_epoch_seconds_utc(1656369120))
    #
    # dw_e = dw.DataWindow("events_test", config=cfg,
    #                      # output_dir=out_dir, out_type="PARQUET",
    #                      debug=True)
    # dw_e.save()
    #
    # dw_e = dw.DataWindow.load(os.path.join(out_dir, "events_test.json"))

    dw_e = dw.DataWindow.deserialize(os.path.join(mydir, "dw_1661195700001081_1.pkl.lz4"))
    try:
        # print(dw_e)
        # st = dw_e.first_station()
        for st in dw_e.stations():
            print(st.id())
            print(st.first_data_timestamp(), st.last_data_timestamp())

            audio = st.audio_sensor()
            # NOTE(review): presumably audio_sensor() returns None for a station without audio data -- confirm.
            # Guarded so one such station does not abort the summary of the remaining stations.
            if audio is not None:
                vals = audio.get_microphone_data()
                # First two and last two microphone samples.
                print(vals[0], vals[1], vals[-2], vals[-1])

            res = st.event_data().get_stream("inference")
            if res:
                print(res.input_sample_rate, res.samples_per_window, res.samples_per_hop, res.model_version)
                if res.has_events():
                    # print(res.get_data_column("fail"))
                    # Timestamps of the first and last events in the stream.
                    print(res.get_event(0).get_timestamp(), res.get_event(-1).get_timestamp())
                    # print(res.get_event(0).get_schema(), res.get_event(-1).get_schema())
                    # print(res.get_data_column("class_0"))
                    # print(res.get_data_column("score_0"))
                # print(res)
            print("*********************************")

            # --- Reference examples (disabled): schema validation and per-event item/column access ---
            # res.set_schema({"oops": ["will fail", "not properly formatted"]})
            #
            # if res:
            #     ev = res.get_event()
            #
            #     if ev:
            #         print(ev.get_byte_item("failure"))
            #         print(ev.get_numeric_item("score"))
            #         print(ev.get_string_item("class"))
            #
            #         print(ev.get_byte_column("failure"))
            #         print(ev.get_numeric_column("score"))
            #         print(ev.get_string_column("class"))
            #
            #     print(res)
    finally:
        # Apply the mitigation described in the module docstring: explicitly delete the DataWindow before the
        # program exits. Done in `finally` so it also happens when the loop above raises.
        del dw_e