Module redvox.tests.event_stream_test

  • There is a known issue when exiting programs in which a DataWindow has been created. This issue can be mitigated by explicitly deleting the DataWindow that was created:

        # example code
        my_dw: DataWindow
        # do stuff with my_dw
        del my_dw
Expand source code
"""
* There is a known issue when exiting programs in which a DataWindow has been created.  This issue can be mitigated by
  explicitly deleting the DataWindow that was created.
  ```
  # example code
  my_dw: DataWindow
  # do stuff with my_dw
  del my_dw
  ```
"""

import os

from redvox.common import data_window as dw
import redvox.common.date_time_utils as dtu


if __name__ == "__main__":
    # Manual smoke test: load a serialized DataWindow from a local path and
    # print, for every station, its id, first/last data timestamps, a few
    # microphone samples, and a summary of its "inference" event stream.
    #
    # The paths below are developer-specific; adjust them before running.

    # --- Earlier experiment (kept for reference): inspect a "yamnet" stream ---
    # my_dw = "/Users/tyler/Downloads/dw_1647024780000029_1.pkl.lz4"
    # dw_test = dw.DataWindow.deserialize(my_dw)
    #
    # st = dw_test.first_station()
    #
    # evs = st.event_data()
    #
    # if "yamnet" in evs.get_stream_names():
    #     print(evs.get_stream("yamnet").events[0].get_timestamp())
    #     print(evs.get_stream("yamnet").events[-1].get_timestamp())
    #
    # print('test')
    # exit(1)

    mydir = "/Users/tyler/Downloads"
    # Only referenced by the commented-out save/load recipe below.
    out_dir = "/Users/tyler/Documents/pyarrowreadertest/inl_test2"

    # --- Alternative: build the DataWindow from raw files instead of a pickle ---
    # ev_name = "examples"
    # ev_name2 = "real_data"
    #
    # print(os.getcwd())
    #
    # cfg = dw.DataWindowConfig(input_dir=os.path.join(mydir), structured_layout=True, station_ids=["1637110702"],
    #                           start_datetime=dtu.datetime_from_epoch_seconds_utc(1656368600),
    #                           end_datetime=dtu.datetime_from_epoch_seconds_utc(1656369120))
    #
    # dw_e = dw.DataWindow("events_test", config=cfg,
    #                      # output_dir=out_dir, out_type="PARQUET",
    #                      debug=True)
    #
    # dw_e.save()
    #
    # dw_e = dw.DataWindow.load(os.path.join(out_dir, "events_test.json"))

    dw_e = dw.DataWindow.deserialize(os.path.join(mydir, "dw_1661195700001081_1.pkl.lz4"))
    # print(dw_e)

    try:
        # st = dw_e.first_station()
        for st in dw_e.stations():
            print(st.id())
            print(st.first_data_timestamp(), st.last_data_timestamp())

            # Guard against stations without an audio sensor so one such
            # station does not abort the dump of the remaining stations.
            audio = st.audio_sensor()
            if audio is not None:
                vals = audio.get_microphone_data()
                # First two and last two samples as a quick sanity check.
                print(vals[0], vals[1], vals[-2], vals[-1])
            else:
                print("no audio sensor")

            res = st.event_data().get_stream("inference")
            if res:
                print(res.input_sample_rate, res.samples_per_window, res.samples_per_hop, res.model_version)
                if res.has_events():
                    # print(res.get_data_column("fail"))
                    print(res.get_event(0).get_timestamp(), res.get_event(-1).get_timestamp())
                    # print(res.get_event(0).get_schema(), res.get_event(-1).get_schema())
                # print(res.get_data_column("class_0"))
                # print(res.get_data_column("score_0"))
                # print(res)
            print("*********************************")

            # --- Further event-API probes (kept for reference) ---
            # res.set_schema({"oops": ["will fail", "not properly formatted"]})
            #
            # if res:
            #     ev = res.get_event()
            #
            #     if ev:
            #         print(ev.get_byte_item("failure"))
            #         print(ev.get_numeric_item("score"))
            #         print(ev.get_string_item("class"))
            #
            #         print(ev.get_byte_column("failure"))
            #         print(ev.get_numeric_column("score"))
            #         print(ev.get_string_column("class"))
            #
            #     print(res)
    finally:
        # Known issue (see the module docstring): explicitly delete the
        # DataWindow before the program exits. Done in `finally` so the
        # cleanup also happens when the inspection above raises.
        del dw_e