Module redvox.cli.cli
This module provides a command line interface (CLI) for converting, viewing, and downloading RedVox data files.
Expand source code
"""
This module provides a command line interface (CLI) for converting, viewing, and downloading RedVox data files.
"""
import argparse
import logging
import os.path
import sys
from typing import Dict, List, Optional, Any, Callable
from redvox.api1000.wrapped_redvox_packet.sensors.image import Image, ImageCodec
from redvox.api1000.wrapped_redvox_packet.wrapped_packet import WrappedRedvoxPacketM
from redvox.cloud.data_api import DataRangeReqType
import redvox.cloud.client as cloud_client
from redvox.cloud.config import RedVoxConfig
import redvox.cloud.data_api as data_api
import redvox.cli.conversions as conversions
import redvox.cli.data_req as data_req
import redvox.common.io as io
from redvox.common.gui import cloud_data_retrieval
# pylint: disable=C0103
# Module-level logger named after this module; main() configures logging via logging.basicConfig.
log = logging.getLogger(__name__)
def map_or_default(val: Any, apply: Callable[[Any], Any], default: Any) -> Any:
    """
    Transforms a value with the given function unless the value is None.
    :param val: The value to transform; may be None.
    :param apply: Function invoked with val when val is not None.
    :param default: Value returned as-is when val is None.
    :return: apply(val) when val is not None, default otherwise.
    """
    return default if val is None else apply(val)
def check_path(
    path: str, path_is_file: bool = True, file_ext: Optional[str] = None
) -> bool:
    """
    Tests whether a path refers to an existing file or an existing directory.
    :param path: The path to test.
    :param path_is_file: When True the path must be a regular file, when False it must be a directory.
    :param file_ext: Optional suffix the file name must end with (only consulted for files).
    :return: True if the path exists as the requested kind (and matches file_ext when given), False otherwise.
    """
    # Directory mode never looks at the extension.
    if not path_is_file:
        return os.path.isdir(path)

    if not os.path.isfile(path):
        return False

    if file_ext is None:
        return True

    file_name: str = os.path.basename(path)
    return file_name.endswith(file_ext)
def check_files(paths: List[str], file_ext: Optional[str] = None) -> bool:
    """
    Verifies that every given path is an existing file, optionally with a required extension.
    Each offending path is logged at error level.
    :param paths: The file paths to verify.
    :param file_ext: Optional extension every file name must end with.
    :return: True if all paths pass the check, False if at least one does not.
    """
    invalid_paths: List[str] = [
        candidate
        for candidate in paths
        if not check_path(candidate, file_ext=file_ext)
    ]

    if not invalid_paths:
        return True

    log.error("%d invalid paths found", len(invalid_paths))
    for invalid_path in invalid_paths:
        log.error("Invalid path %s", invalid_path)
    return False
def check_out_dir(out_dir: Optional[str] = None) -> bool:
    """
    Validates an optional output directory.
    :param out_dir: The directory to validate; None means no directory was supplied.
    :return: True if out_dir is None or is an existing directory, False (after logging an error) otherwise.
    """
    # An omitted directory is always acceptable.
    if out_dir is None:
        return True

    if check_path(out_dir, path_is_file=False):
        return True

    log.error("out_dir is invalid: %s", out_dir)
    return False
def determine_exit(status: bool) -> None:
    """
    Logs the exit status and terminates the CLI through sys.exit.
    :param status: A truthy value exits with code 0, a falsy value exits with code 1.
    """
    if not status:
        log.error("Exiting with status = 1")
        sys.exit(1)

    log.info("Exiting with status = 0")
    sys.exit(0)
def rdvxz_to_rdvxm(args) -> None:
    """
    CLI handler for the rdvxz -> rdvxm conversion.
    Exits through determine_exit with a failure when the inputs are invalid, otherwise with the
    result of the conversion.
    :param args: Args from argparse (uses rdvxz_paths and out_dir).
    """
    # `and` short-circuits, so the output directory is only checked when the files are valid.
    inputs_ok: bool = check_files(args.rdvxz_paths, ".rdvxz") and check_out_dir(
        args.out_dir
    )
    if not inputs_ok:
        determine_exit(False)

    converted = conversions.rdvxz_to_rdvxm(args.rdvxz_paths, args.out_dir)
    determine_exit(converted)
def rdvxm_to_rdvxz(args) -> None:
    """
    CLI handler for the rdvxm -> rdvxz conversion.
    Exits through determine_exit with a failure when the inputs are invalid, otherwise with the
    result of the conversion.
    :param args: Args from argparse (uses rdvxm_paths and out_dir).
    """
    # `and` short-circuits, so the output directory is only checked when the files are valid.
    inputs_ok: bool = check_files(args.rdvxm_paths, ".rdvxm") and check_out_dir(
        args.out_dir
    )
    if not inputs_ok:
        determine_exit(False)

    converted = conversions.rdvxm_to_rdvxz(args.rdvxm_paths, args.out_dir)
    determine_exit(converted)
def rdvxz_to_json_args(args) -> None:
    """
    CLI handler for the rdvxz -> json conversion.
    Exits through determine_exit with a failure when the inputs are invalid, otherwise with the
    result of the conversion.
    :param args: Args from argparse (uses rdvxz_paths and out_dir).
    """
    # `and` short-circuits, so the output directory is only checked when the files are valid.
    inputs_ok: bool = check_files(args.rdvxz_paths, ".rdvxz") and check_out_dir(
        args.out_dir
    )
    if not inputs_ok:
        determine_exit(False)

    converted = conversions.rdvxz_to_json(args.rdvxz_paths, args.out_dir)
    determine_exit(converted)
def rdvxm_to_json_args(args) -> None:
    """
    CLI handler for the rdvxm -> json conversion.
    Exits through determine_exit with a failure when the inputs are invalid, otherwise with the
    result of the conversion.
    :param args: Args from argparse (uses rdvxm_paths and out_dir).
    """
    # `and` short-circuits, so the output directory is only checked when the files are valid.
    inputs_ok: bool = check_files(args.rdvxm_paths, ".rdvxm") and check_out_dir(
        args.out_dir
    )
    if not inputs_ok:
        determine_exit(False)

    converted = conversions.rdvxm_to_json(args.rdvxm_paths, args.out_dir)
    determine_exit(converted)
def json_to_rdvxz_args(args) -> None:
    """
    CLI handler for the json -> rdvxz conversion.
    Exits through determine_exit with a failure when the inputs are invalid, otherwise with the
    result of the conversion.
    :param args: Args from argparse (uses json_paths and out_dir).
    """
    # `and` short-circuits, so the output directory is only checked when the files are valid.
    inputs_ok: bool = check_files(args.json_paths, ".json") and check_out_dir(
        args.out_dir
    )
    if not inputs_ok:
        determine_exit(False)

    converted = conversions.json_to_rdvxz(args.json_paths, args.out_dir)
    determine_exit(converted)
def json_to_rdvxm_args(args) -> None:
    """
    CLI handler for the json -> rdvxm conversion.
    Exits through determine_exit with a failure when the inputs are invalid, otherwise with the
    result of the conversion.
    :param args: Args from argparse (uses json_paths and out_dir).
    """
    # `and` short-circuits, so the output directory is only checked when the files are valid.
    inputs_ok: bool = check_files(args.json_paths, ".json") and check_out_dir(
        args.out_dir
    )
    if not inputs_ok:
        determine_exit(False)

    converted = conversions.json_to_rdvxm(args.json_paths, args.out_dir)
    determine_exit(converted)
def rdvxz_print_stdout_args(args) -> None:
    """
    CLI handler that prints rdvxz files to stdout.
    Exits through determine_exit with a failure when any path is invalid, otherwise with the
    result of the print routine.
    :param args: Args from argparse (uses rdvxz_paths).
    """
    rdvxz_paths = args.rdvxz_paths
    if not check_files(rdvxz_paths, ".rdvxz"):
        determine_exit(False)

    printed = conversions.rdvxz_print_stdout(rdvxz_paths)
    determine_exit(printed)
def rdvxm_print_stdout_args(args) -> None:
    """
    CLI handler that prints rdvxm files to stdout.
    Exits through determine_exit with a failure when any path is invalid, otherwise with the
    result of the print routine.
    :param args: Args from argparse (uses rdvxm_paths).
    """
    rdvxm_paths = args.rdvxm_paths
    if not check_files(rdvxm_paths, ".rdvxm"):
        determine_exit(False)

    printed = conversions.rdvxm_print_stdout(rdvxm_paths)
    determine_exit(printed)
def validate_rdvxm_args(args) -> None:
    """
    CLI handler that validates rdvxm files.
    Exits through determine_exit with a failure when any path is invalid, otherwise with the
    result of the validation routine.
    :param args: Args from argparse (uses rdvxm_paths).
    """
    rdvxm_paths = args.rdvxm_paths
    if not check_files(rdvxm_paths, ".rdvxm"):
        determine_exit(False)

    validated = conversions.validate_rdvxm(rdvxm_paths)
    determine_exit(validated)
def data_req_args(args) -> None:
    """
    CLI handler for the data-req command.
    Validates the output directory and the credentials, rebuilds a RedVoxConfig from the parsed
    arguments, and exits through determine_exit with the result of data_req.make_data_req.
    :param args: Args from argparse (uses out_dir, api_type, email, password, protocol, host, port,
                 secret_token, req_start_s, req_end_s, station_ids, retries, timeout and
                 disable_timing_correction).
    """
    if not check_out_dir(args.out_dir):
        determine_exit(False)

    api_type: DataRangeReqType = DataRangeReqType[args.api_type]

    # Rebuild RedVox config from potentially optional passed in args
    # (plain string literals: these messages have nothing to interpolate)
    if args.email is None:
        log.error(
            "The argument 'email' is required, but was not found in the environment or provided."
        )
        determine_exit(False)

    if args.password is None:
        log.error(
            "The argument 'password' is required, but was not found in the environment or provided."
        )
        determine_exit(False)

    redvox_config: RedVoxConfig = RedVoxConfig(
        args.email,
        args.password,
        args.protocol,
        args.host,
        args.port,
        args.secret_token,
    )

    determine_exit(
        data_req.make_data_req(
            args.out_dir,
            redvox_config,
            args.req_start_s,
            args.req_end_s,
            args.station_ids,
            api_type,
            args.retries,
            args.timeout,
            # The CLI flag disables the correction, so its value is inverted here.
            not args.disable_timing_correction,
        )
    )
def data_req_report(
    redvox_config: RedVoxConfig,
    report_id: str,
    out_dir: str,
    retries: int,
) -> bool:
    """
    Uses the built-in cloud based HTTP API to generate a signed URL for report data and then downloads the report data.
    :param redvox_config: The configuration used to construct the cloud client.
    :param report_id: The full RedVox report id.
    :param out_dir: The output directory to place the report distribution.
    :param retries: Number of times to attempt to retry the download on failed attempts.
    :return: True if a response was received and its download was invoked, False otherwise.
    """
    client = cloud_client.CloudClient(redvox_config)
    try:
        resp: Optional[data_api.ReportDataResp] = client.request_report_data(report_id)
    finally:
        # Always release the client, even when the request raises.
        client.close()

    if resp:
        resp.download_fs(out_dir, retries)
        return True

    log.error("Response was None")
    return False
def data_req_report_args(args) -> None:
    """
    CLI handler for the data-req-report command.
    Validates the output directory and the credentials, rebuilds a RedVoxConfig from the parsed
    arguments, and exits through determine_exit with the result of data_req_report.
    :param args: Args from argparse (uses out_dir, email, password, protocol, host, port,
                 secret_token, report_id and retries).
    """
    if not check_out_dir(args.out_dir):
        determine_exit(False)

    # Rebuild RedVox config from potentially optional passed in args
    # (plain string literals: these messages have nothing to interpolate)
    if args.email is None:
        log.error(
            "The argument 'email' is required, but was not found in the environment or provided."
        )
        determine_exit(False)

    if args.password is None:
        log.error(
            "The argument 'password' is required, but was not found in the environment or provided."
        )
        determine_exit(False)

    redvox_config: RedVoxConfig = RedVoxConfig(
        args.email,
        args.password,
        args.protocol,
        args.host,
        args.port,
        args.secret_token,
    )

    determine_exit(
        data_req_report(
            redvox_config,
            args.report_id,
            args.out_dir,
            args.retries,
        )
    )
def gallery(rdvxm_paths: List[str]) -> bool:
    """
    Displays a gallery of images from the combined images collected from the given paths.
    :param rdvxm_paths: Paths to collect images from.
    :return: True if this completes successfully, False if an ImportError occurred
             (reported as a warning about missing GUI dependencies).
    """
    try:
        # Imported lazily so the rest of the CLI works without the optional GUI dependencies.
        from redvox.api1000.gui.image_viewer import start_gui

        # Create a new image sensor to hold images from all packets
        image: Image = Image.new()
        # noinspection PyTypeChecker
        image.set_image_codec(ImageCodec.JPG)

        # Packets are loaded one at a time rather than all being held in memory at once.
        for rdvxm_path in rdvxm_paths:
            packet: WrappedRedvoxPacketM = WrappedRedvoxPacketM.from_compressed_path(
                rdvxm_path
            )
            image_sensor: Optional[Image] = packet.get_sensors().get_image()
            if image_sensor is None:
                continue
            image.get_timestamps().append_timestamps(
                image_sensor.get_timestamps().get_timestamps()
            )
            image.append_values(image_sensor.get_samples())

        start_gui(image)
        return True
    except ImportError:
        import warnings

        warnings.warn(
            "GUI dependencies are not installed. Install the 'GUI' extra to enable this functionality."
        )
        # Report the failure explicitly instead of falling through with an implicit None.
        return False
def gallery_args(args) -> None:
    """
    CLI handler that opens the image gallery.
    Exits through determine_exit with a failure when any path is not an existing file, otherwise
    with the result of gallery.
    :param args: Args from argparse (uses rdvxm_paths).
    """
    rdvxm_paths = args.rdvxm_paths
    # No extension is enforced here; the paths only have to be existing files.
    if not check_files(rdvxm_paths):
        determine_exit(False)

    displayed = gallery(rdvxm_paths)
    determine_exit(displayed)
def sort_unstructured(
    input_dir: str, out_dir: Optional[str] = None, copy: bool = True
) -> bool:
    """
    Delegates to io.sort_unstructured_redvox_data to sort the files found in input_dir.
    :param input_dir: Directory passed to the sorter as its input.
    :param out_dir: Directory passed to the sorter as its output; "." is used when None.
    :param copy: Forwarded to the sorter as its copy keyword argument.
    :return: Always True once the sorter returns.
    """
    target_dir: str = "." if out_dir is None else out_dir
    io.sort_unstructured_redvox_data(input_dir, target_dir, copy=copy)
    return True
def sort_unstructured_args(args) -> None:
    """
    CLI handler for the sort-unstructured command.
    Exits through determine_exit with a failure when the input or output directory is invalid,
    otherwise with the result of sort_unstructured.
    :param args: Args from argparse (uses input_dir, out_dir and mv).
    """
    # Validate the input directory directly so the error names the right argument;
    # check_out_dir would report it as "out_dir is invalid".
    if args.input_dir is None or not check_path(args.input_dir, path_is_file=False):
        log.error("input_dir is invalid: %s", args.input_dir)
        determine_exit(False)

    if not check_out_dir(args.out_dir):
        determine_exit(False)

    # --mv requests a move, so copying is enabled only when the flag is absent.
    determine_exit(sort_unstructured(args.input_dir, args.out_dir, not args.mv))
# Help text shared by every converter's optional --out-dir/-o argument.
_SAME_DIR_OUT_HELP: str = (
    "Optional output directory (will use same directory as source files by "
    "default)"
)


def _add_file_command(
    sub_parser,
    name: str,
    command_help: str,
    paths_dest: str,
    paths_help: str,
    handler: Callable[[Any], None],
    with_out_dir: bool = True,
) -> None:
    """
    Registers a sub-command that accepts one or more file paths.
    :param sub_parser: The object returned by ArgumentParser.add_subparsers.
    :param name: The sub-command name.
    :param command_help: Help text for the sub-command.
    :param paths_dest: Name of the positional argument collecting the paths.
    :param paths_help: Help text for the positional paths argument.
    :param handler: Function stored as the sub-command's `func` default.
    :param with_out_dir: When True an optional --out-dir/-o argument is added.
    """
    command_parser = sub_parser.add_parser(name, help=command_help)
    command_parser.add_argument(paths_dest, help=paths_help, nargs="+")
    if with_out_dir:
        command_parser.add_argument("--out-dir", "-o", help=_SAME_DIR_OUT_HELP)
    command_parser.set_defaults(func=handler)


def _add_cloud_out_dir_arg(parser: argparse.ArgumentParser) -> None:
    """Adds the --out-dir argument (default ".") used by the cloud download commands."""
    parser.add_argument(
        "--out-dir",
        help="The output directory that RedVox files will be written to (default=.)",
        default=".",
    )


def _add_credential_args(
    parser: argparse.ArgumentParser, redvox_config: "Optional[RedVoxConfig]"
) -> None:
    """Adds --email and --password, defaulting to the discovered config when one exists."""
    parser.add_argument(
        "--email",
        help="redvox.io account email",
        default=map_or_default(redvox_config, lambda config: config.username, None),
    )
    parser.add_argument(
        "--password",
        help="redvox.io account password",
        default=map_or_default(redvox_config, lambda config: config.password, None),
    )


def _add_retries_arg(parser: argparse.ArgumentParser) -> None:
    """Adds the --retries argument (integer 0-5, default 1)."""
    parser.add_argument(
        "--retries",
        help="The number of times the client should retry getting a file on failure "
        "(default=1)",
        default=1,
        choices=set(range(0, 6)),
        type=int,
    )


def _add_server_args(
    parser: argparse.ArgumentParser,
    redvox_config: "Optional[RedVoxConfig]",
    protocol_help: str,
) -> None:
    """Adds --host, --port, --protocol and --secret-token, defaulting to the discovered config."""
    parser.add_argument(
        "--host",
        help="Data server host",
        default=map_or_default(redvox_config, lambda config: config.host, "redvox.io"),
    )
    parser.add_argument(
        "--port",
        type=int,
        help="Data server port",
        default=map_or_default(redvox_config, lambda config: config.port, 8080),
    )
    parser.add_argument(
        "--protocol",
        help=protocol_help,
        choices=["https", "http"],
        default=map_or_default(redvox_config, lambda config: config.protocol, "https"),
    )
    parser.add_argument(
        "--secret-token",
        help="A shared secret token provided by RedVox required for accessing the data "
        "request service",
        default=map_or_default(redvox_config, lambda config: config.secret_token, None),
    )


def main():
    """
    Entry point into the CLI.
    Builds the argument parser with one sub-command per tool, configures logging from the
    -v/--verbose count, and dispatches to the selected sub-command's handler. Exits with
    status 1 if the handler raises an Exception.
    """
    # Values from a discovered config (if any) become defaults for the cloud arguments.
    redvox_config: Optional[RedVoxConfig] = RedVoxConfig.find()

    parser: argparse.ArgumentParser = argparse.ArgumentParser(
        "redvox-cli",
        description="Command line tools for viewing, converting,"
        " and downloading RedVox data.",
    )
    parser.add_argument(
        "--verbose", "-v", help="Enable verbose logging", action="count", default=0
    )

    sub_parser = parser.add_subparsers()
    sub_parser.required = True
    sub_parser.dest = "command"

    # Cloud data retrieval
    cloud_download_parser = sub_parser.add_parser("cloud-download")
    cloud_download_parser.set_defaults(func=lambda _: cloud_data_retrieval.run_gui())

    # Gallery
    gallery_parser = sub_parser.add_parser("gallery")
    gallery_parser.add_argument(
        "rdvxm_paths", help="One or more rdvxm files", nargs="+"
    )
    gallery_parser.set_defaults(func=gallery_args)

    # rdvxz -> rdvxm
    _add_file_command(
        sub_parser,
        "rdvxz-to-rdvxm",
        "Convert rdvxz (API 900) to rdvxm (API 1000/M) files",
        "rdvxz_paths",
        "One or more rdvxz files to convert to rdvxm files",
        rdvxz_to_rdvxm,
    )

    # rdvxm -> rdvxz
    _add_file_command(
        sub_parser,
        "rdvxm-to-rdvxz",
        "Convert rdvxm (API 1000/M) to rdvxz (API 900) files",
        "rdvxm_paths",
        "One or more rdvxm files to convert to rdvxz files",
        rdvxm_to_rdvxz,
    )

    # rdvxz -> json
    _add_file_command(
        sub_parser,
        "rdvxz-to-json",
        "Convert rdvxz files to json files",
        "rdvxz_paths",
        "One or more rdvxz files to convert to json files",
        rdvxz_to_json_args,
    )

    # rdvxm -> json
    _add_file_command(
        sub_parser,
        "rdvxm-to-json",
        "Convert rdvxm files to json files",
        "rdvxm_paths",
        "One or more rdvxm files to convert to json files",
        rdvxm_to_json_args,
    )

    # json -> rdvxz
    _add_file_command(
        sub_parser,
        "json-to-rdvxz",
        "Convert json files to rdvxz files",
        "json_paths",
        "One or more json files to convert to rdvxz files",
        json_to_rdvxz_args,
    )

    # json -> rdvxm
    _add_file_command(
        sub_parser,
        "json-to-rdvxm",
        "Convert json files to rdvxm files",
        "json_paths",
        "One or more json files to convert to rdvxm files",
        json_to_rdvxm_args,
    )

    # sort unstructured data into structured data
    sort_unstructured_parser = sub_parser.add_parser(
        "sort-unstructured",
        help="Sorts unstructured RedVox files into their structured counterpart",
    )
    sort_unstructured_parser.add_argument(
        "input_dir",
        help="Directory containing RedVox files to sort into a structured layout",
    )
    sort_unstructured_parser.add_argument(
        "--out-dir",
        "-o",
        help="Optional output directory (current working directory by default)",
    )
    sort_unstructured_parser.add_argument(
        "--mv",
        help="When set, file contents will be moved to the structured layout rather than copied.",
        action="store_true",
    )
    sort_unstructured_parser.set_defaults(func=sort_unstructured_args)

    # print rdvxz
    _add_file_command(
        sub_parser,
        "print-z",
        "Print contents of rdvxz files to stdout",
        "rdvxz_paths",
        "One or more rdvxz files to print",
        rdvxz_print_stdout_args,
        with_out_dir=False,
    )

    # print rdvxm
    _add_file_command(
        sub_parser,
        "print-m",
        "Print contents of rdvxm files to stdout",
        "rdvxm_paths",
        "One or more rdvxm files to print",
        rdvxm_print_stdout_args,
        with_out_dir=False,
    )

    # validation
    _add_file_command(
        sub_parser,
        "validate-m",
        "Validate the structure of API M files",
        "rdvxm_paths",
        "One or more rdvxm files to validate",
        validate_rdvxm_args,
        with_out_dir=False,
    )

    # data_req
    data_req_parser = sub_parser.add_parser(
        "data-req", help="Request bulk RedVox data from RedVox servers"
    )
    _add_credential_args(data_req_parser, redvox_config)
    _add_cloud_out_dir_arg(data_req_parser)
    data_req_parser.add_argument(
        "--disable-timing-correction",
        help="Disables query timing correction",
        default=False,
        action="store_true",
    )
    _add_retries_arg(data_req_parser)
    _add_server_args(data_req_parser, redvox_config, "One of either http or https")
    data_req_parser.add_argument(
        "--api-type",
        help="Data API to be retrieved",
        choices=["API_900", "API_1000", "API_900_1000"],
        default="API_900_1000",
    )
    data_req_parser.add_argument(
        "--timeout",
        help="Read timeout in seconds (default=10 seconds)",
        type=int,
        default=10,
    )
    data_req_parser.add_argument(
        "req_start_s",
        type=int,
        help="Data request start as number of seconds since the epoch UTC",
    )
    data_req_parser.add_argument(
        "req_end_s",
        type=int,
        help="Data request end as number of seconds since the epoch UTC",
    )
    data_req_parser.add_argument(
        "station_ids", nargs="+", help="A list of RedVox ids delimited by a space"
    )
    data_req_parser.set_defaults(func=data_req_args)

    # data req report
    data_req_report_parser = sub_parser.add_parser(
        "data-req-report", help="Request bulk RedVox data from the RedVox servers"
    )
    _add_cloud_out_dir_arg(data_req_report_parser)
    _add_credential_args(data_req_report_parser, redvox_config)
    _add_retries_arg(data_req_report_parser)
    _add_server_args(
        data_req_report_parser,
        redvox_config,
        "One of either http or https (default https)",
    )
    data_req_report_parser.add_argument(
        "report_id",
        type=str,
        help="The full report id that data is being requested for",
    )
    data_req_report_parser.set_defaults(func=data_req_report_args)

    # Parse the args
    args = parser.parse_args()

    # Setup logging
    log_levels: Dict[int, str] = {0: "WARN", 1: "INFO", 2: "DEBUG"}
    # Clamp the count so that -vvv (or more) stays at the most verbose level
    # instead of silently dropping back to WARN.
    log_level: str = log_levels[min(args.verbose, max(log_levels))]

    logging.basicConfig(
        level=log_level,
        format="[%(levelname)s:%(process)d:%(filename)s:%(module)s:%(funcName)s:%(lineno)d:%(asctime)s]"
        " %(message)s",
    )

    log.info("Running with args=%s and log_level=%s", str(args), log_level)

    # Try calling the appropriate handler
    # pylint: disable=W0703
    try:
        args.func(args)
    except Exception as e:
        # Top-level boundary: SystemExit raised by the handlers is not an Exception
        # and therefore passes through untouched.
        log.error("Encountered an error: %s", str(e))
        sys.exit(1)
# Run the CLI only when this module is executed as a script, not when it is imported.
if __name__ == "__main__":
    main()
Functions
def check_files(paths: List[str], file_ext: Optional[str] = None) ‑> bool
-
Checks the given files to determine if they exist. :param paths: The paths to check. :param file_ext: An optional file extension to filter against. :return: True if all paths exist, False otherwise
Expand source code
def check_files(paths: List[str], file_ext: Optional[str] = None) -> bool: """ Checks this given files to determine if they exist. :param paths: The paths to check. :param file_ext: An optional file extension to filter against. :return: True if all paths exist, False otherwise """ invalid_paths: List[str] = list( filter(lambda path: not check_path(path, file_ext=file_ext), paths) ) if len(invalid_paths) > 0: log.error("%d invalid paths found", len(invalid_paths)) for invalid_path in invalid_paths: log.error("Invalid path %s", invalid_path) return False return True
def check_out_dir(out_dir: Optional[str] = None) ‑> bool
-
Checks if a given directory exists. :param out_dir: The directory to check. :return: True if it exists, False otherwise.
Expand source code
def check_out_dir(out_dir: Optional[str] = None) -> bool: """ Checks if a given directory exists. :param out_dir: The directory to check. :return: True if it exists, False otherwise. """ if out_dir is not None and not check_path(out_dir, path_is_file=False): log.error("out_dir is invalid: %s", out_dir) return False return True
def check_path(path: str, path_is_file: bool = True, file_ext: Optional[str] = None) ‑> bool
-
Checks that the passed in path exists. :param file_ext: Optional extension to check. :param path: The path to check. :param path_is_file: The path is a file when True or a directory when False. :return: True if the path exists, False otherwise.
Expand source code
def check_path( path: str, path_is_file: bool = True, file_ext: Optional[str] = None ) -> bool: """ Checks that the passed in path exists. :param file_ext: Optional extension to check. :param path: The path to check. :param path_is_file: The path is a file when True or a directory when False. :return: True if the path exists, False otherwise. """ if path_is_file: return os.path.isfile(path) and ( file_ext is None or os.path.basename(path).endswith(file_ext) ) else: return os.path.isdir(path)
def data_req_args(args) ‑> None
-
Wrapper function that calls the data_req. :param args: Args from argparse.
Expand source code
def data_req_args(args) -> None: """ Wrapper function that calls the data_req. :param args: Args from argparse. """ if not check_out_dir(args.out_dir): determine_exit(False) api_type: DataRangeReqType = DataRangeReqType[args.api_type] # Rebuild RedVox config from potentially optional passed in args if args.email is None: log.error( f"The argument 'email' is required, but was not found in the environment or provided." ) determine_exit(False) if args.password is None: log.error( f"The argument 'password' is required, but was not found in the environment or provided." ) determine_exit(False) redvox_config: RedVoxConfig = RedVoxConfig( args.email, args.password, args.protocol, args.host, args.port, args.secret_token, ) determine_exit( data_req.make_data_req( args.out_dir, redvox_config, args.req_start_s, args.req_end_s, args.station_ids, api_type, args.retries, args.timeout, not args.disable_timing_correction ) )
def data_req_report(redvox_config: RedVoxConfig, report_id: str, out_dir: str, retries: int) ‑> bool
-
Uses the built-in cloud based HTTP API to generate a signed URL for report data and then downloads the report data. :param report_id: The full RedVox report id. :param out_dir: The output directory to place the report distribution. :param retries: Number of times to attempt to retry the download on failed attempts.
Expand source code
def data_req_report( redvox_config: RedVoxConfig, report_id: str, out_dir: str, retries: int, ) -> bool: """ Uses the built-in cloud based HTTP API to generate a signed URL for report data and then downloads the report data. :param report_id: The full RedVox report id. :param out_dir: The output directory to play the report distribution. :param retries: Number of times to attempt to retry the download on failed attempts. """ client = cloud_client.CloudClient(redvox_config) resp: Optional[data_api.ReportDataResp] = client.request_report_data(report_id) client.close() if resp: resp.download_fs(out_dir, retries) return True log.error("Response was None") return False
def data_req_report_args(args) ‑> None
-
Wrapper function that calls the data_req_report. :param args: Args from argparse.
Expand source code
def data_req_report_args(args) -> None: """ Wrapper function that calls the data_req_report. :param args: Args from argparse. """ if not check_out_dir(args.out_dir): determine_exit(False) # Rebuild RedVox config from potentially optional passed in args if args.email is None: log.error( f"The argument 'email' is required, but was not found in the environment or provided." ) determine_exit(False) if args.password is None: log.error( f"The argument 'password' is required, but was not found in the environment or provided." ) determine_exit(False) redvox_config: RedVoxConfig = RedVoxConfig( args.email, args.password, args.protocol, args.host, args.port, args.secret_token, ) determine_exit( data_req_report( redvox_config, args.report_id, args.out_dir, args.retries, ) )
def determine_exit(status: bool) ‑> None
-
Determine the exit status and exit the CLI. :param status: True will exit with a status of 0 and False will exit with a status of 1.
Expand source code
def determine_exit(status: bool) -> None: """ Determine the exit status and exit the CLI. :param status: True will exit with a status of 0 and False will exit with a status of 1. """ if status: log.info("Exiting with status = 0") sys.exit(0) log.error("Exiting with status = 1") sys.exit(1)
def gallery(rdvxm_paths: List[str]) ‑> bool
-
Displays a gallery of images from the combined images collected from the given paths. :param rdvxm_paths: Paths to collect images from. :return: True if this completes successfully, False otherwise
Expand source code
def gallery(rdvxm_paths: List[str]) -> bool: """ Displays a gallery of images from the combined images collected from the given paths. :param rdvxm_paths: Paths to collect images from. :return: True if this completes successfully, False otherwise """ # Create a new image sensor to hold images from all packets try: from redvox.api1000.gui.image_viewer import start_gui image: Image = Image.new() # noinspection PyTypeChecker image.set_image_codec(ImageCodec.JPG) packets: List[WrappedRedvoxPacketM] = list( map(WrappedRedvoxPacketM.from_compressed_path, rdvxm_paths) ) for packet in packets: image_sensor: Optional[Image] = packet.get_sensors().get_image() if image_sensor is not None: image.get_timestamps().append_timestamps( image_sensor.get_timestamps().get_timestamps() ) image.append_values(image_sensor.get_samples()) start_gui(image) return True except ImportError: import warnings warnings.warn("GUI dependencies are not installed. Install the 'GUI' extra to enable this functionality.")
def gallery_args(args) ‑> None
-
CLI function for opening image gallery. :param args: Args for passing to gallery function.
Expand source code
def gallery_args(args) -> None: """ CLI function for opening image gallery. :param args: Args for passing to gallery function. """ if not check_files(args.rdvxm_paths): determine_exit(False) determine_exit(gallery(args.rdvxm_paths))
def json_to_rdvxm_args(args) ‑> None
-
Wrapper function that calls the to_rdvxm conversion. :param args: Args from argparse.
Expand source code
def json_to_rdvxm_args(args) -> None: """ Wrapper function that calls the to_rdvxm conversion. :param args: Args from argparse. """ if not check_files(args.json_paths, ".json"): determine_exit(False) if not check_out_dir(args.out_dir): determine_exit(False) determine_exit(conversions.json_to_rdvxm(args.json_paths, args.out_dir))
def json_to_rdvxz_args(args) ‑> None
-
Wrapper function that calls the to_rdvxz conversion. :param args: Args from argparse.
Expand source code
def json_to_rdvxz_args(args) -> None: """ Wrapper function that calls the to_rdvxz conversion. :param args: Args from argparse. """ if not check_files(args.json_paths, ".json"): determine_exit(False) if not check_out_dir(args.out_dir): determine_exit(False) determine_exit(conversions.json_to_rdvxz(args.json_paths, args.out_dir))
def main()
-
Entry point into the CLI.
Expand source code
def main(): """ Entry point into the CLI. """ redvox_config: Optional[RedVoxConfig] = RedVoxConfig.find() parser: argparse.ArgumentParser = argparse.ArgumentParser( "redvox-cli", description="Command line tools for viewing, converting," " and downloading RedVox data.", ) parser.add_argument( "--verbose", "-v", help="Enable verbose logging", action="count", default=0 ) sub_parser = parser.add_subparsers() sub_parser.required = True sub_parser.dest = "command" # Cloud data retrieval cloud_download_parser = sub_parser.add_parser("cloud-download") cloud_download_parser.set_defaults(func=lambda _: cloud_data_retrieval.run_gui()) # Gallery gallery_parser = sub_parser.add_parser("gallery") gallery_parser.add_argument( "rdvxm_paths", help="One or more rdvxm files", nargs="+" ) gallery_parser.set_defaults(func=gallery_args) # rdvxz -> rdvxm rdvxz_to_rdvxm_parser = sub_parser.add_parser( "rdvxz-to-rdvxm", help="Convert rdvxz (API 900) to rdvxm (API 1000/M) files" ) rdvxz_to_rdvxm_parser.add_argument( "rdvxz_paths", help="One or more rdvxz files to convert to json files", nargs="+", ) rdvxz_to_rdvxm_parser.add_argument( "--out-dir", "-o", help="Optional output directory (will use same directory as source files by " "default)", ) rdvxz_to_rdvxm_parser.set_defaults(func=rdvxz_to_rdvxm) # rdvxm -> rdvxz rdvxm_to_rdvxz_parser = sub_parser.add_parser( "rdvxm-to-rdvxz", help="Convert rdvxm (API 1000/M) to rdvxx (API 900) files" ) rdvxm_to_rdvxz_parser.add_argument( "rdvxm_paths", help="One or more rdvxm files to convert to json files", nargs="+", ) rdvxm_to_rdvxz_parser.add_argument( "--out-dir", "-o", help="Optional output directory (will use same directory as source files by " "default)", ) rdvxm_to_rdvxz_parser.set_defaults(func=rdvxm_to_rdvxz) # rdvxz -> json rdvxz_to_json_parser = sub_parser.add_parser( "rdvxz-to-json", help="Convert rdvxz files to json files" ) rdvxz_to_json_parser.add_argument( "rdvxz_paths", help="One or more rdvxz files to convert to json files", 
nargs="+", ) rdvxz_to_json_parser.add_argument( "--out-dir", "-o", help="Optional output directory (will use same directory as source files by " "default)", ) rdvxz_to_json_parser.set_defaults(func=rdvxz_to_json_args) # rdvxm -> json rdvxm_to_json_parser = sub_parser.add_parser( "rdvxm-to-json", help="Convert rdvxm files to json files" ) rdvxm_to_json_parser.add_argument( "rdvxm_paths", help="One or more rdvxm files to convert to json files", nargs="+", ) rdvxm_to_json_parser.add_argument( "--out-dir", "-o", help="Optional output directory (will use same directory as source files by " "default)", ) rdvxm_to_json_parser.set_defaults(func=rdvxm_to_json_args) # json -> rdvxz json_to_rdvxz_parser = sub_parser.add_parser( "json-to-rdvxz", help="Convert json files to rdvxz files" ) json_to_rdvxz_parser.add_argument( "json_paths", help="One or more json files to convert to rdvxz files", nargs="+" ) json_to_rdvxz_parser.add_argument( "--out-dir", "-o", help="Optional output directory (will use same directory as source files by " "default)", ) json_to_rdvxz_parser.set_defaults(func=json_to_rdvxz_args) # json -> rdvxm json_to_rdvxm_parser = sub_parser.add_parser( "json-to-rdvxm", help="Convert json files to rdvxm files" ) json_to_rdvxm_parser.add_argument( "json_paths", help="One or more json files to convert to rdvxm files", nargs="+" ) json_to_rdvxm_parser.add_argument( "--out-dir", "-o", help="Optional output directory (will use same directory as source files by " "default)", ) json_to_rdvxm_parser.set_defaults(func=json_to_rdvxm_args) # sort unstructured data into structured data sort_unstructured_parser = sub_parser.add_parser( "sort-unstructured", help="Sorts unstructured RedVox files into their structured counterpart", ) sort_unstructured_parser.add_argument( "input_dir", help="Directory containing RedVox files to sort into a structured layout", ) sort_unstructured_parser.add_argument( "--out-dir", "-o", help="Optional output directory (current working directory by 
default)", ) sort_unstructured_parser.add_argument( "--mv", help="When set, file contents will be moved to the structured layout rather than copied.", action="store_true", ) sort_unstructured_parser.set_defaults(func=sort_unstructured_args) # print rdvxz rdvxz_print_parser = sub_parser.add_parser( "print-z", help="Print contents of rdvxz files to stdout" ) rdvxz_print_parser.add_argument( "rdvxz_paths", help="One or more rdvxz files to print", nargs="+" ) rdvxz_print_parser.set_defaults(func=rdvxz_print_stdout_args) # print rdvxm rdvxm_print_parser = sub_parser.add_parser( "print-m", help="Print contents of rdvxm files to stdout" ) rdvxm_print_parser.add_argument( "rdvxm_paths", help="One or more rdvxm files to print", nargs="+" ) rdvxm_print_parser.set_defaults(func=rdvxm_print_stdout_args) # validation rdvxm_validation_parser = sub_parser.add_parser( "validate-m", help="Validate the structure of API M files" ) rdvxm_validation_parser.add_argument( "rdvxm_paths", help="One or more rdvxm files to print", nargs="+" ) rdvxm_validation_parser.set_defaults(func=validate_rdvxm_args) # data_req data_req_parser = sub_parser.add_parser( "data-req", help="Request bulk RedVox data from RedVox servers" ) data_req_parser.add_argument( "--email", help="redvox.io account email", default=map_or_default(redvox_config, lambda config: config.username, None), ) data_req_parser.add_argument( "--password", help="redvox.io account password", default=map_or_default(redvox_config, lambda config: config.password, None), ) data_req_parser.add_argument( "--out-dir", help="The output directory that RedVox files will be written to (default=.)", default=".", ) data_req_parser.add_argument( "--disable-timing-correction", help="Disables query timing correction", default=False, action="store_true" ) data_req_parser.add_argument( "--retries", help="The number of times the client should retry getting a file on failure " "(default=1)", default=1, choices=set(range(0, 6)), type=int, ) 
data_req_parser.add_argument( "--host", help="Data server host", default=map_or_default(redvox_config, lambda config: config.host, "redvox.io"), ) data_req_parser.add_argument( "--port", type=int, help="Data server port", default=map_or_default(redvox_config, lambda config: config.port, 8080), ) data_req_parser.add_argument( "--protocol", help="One of either http or https", choices=["https", "http"], default=map_or_default(redvox_config, lambda config: config.protocol, "https"), ) data_req_parser.add_argument( "--secret-token", help="A shared secret token provided by RedVox required for accessing the data " "request service", default=map_or_default(redvox_config, lambda config: config.secret_token, None), ) data_req_parser.add_argument( "--api-type", help="Data API to be retrieved", choices=["API_900", "API_1000", "API_900_1000"], default="API_900_1000", ) data_req_parser.add_argument( "--timeout", help="Read timeout in seconds (default=10 seconds)", type=int, default=10 ) data_req_parser.add_argument( "req_start_s", type=int, help="Data request start as number of seconds since the epoch UTC", ) data_req_parser.add_argument( "req_end_s", type=int, help="Data request end as number of seconds since the epoch UTC", ) data_req_parser.add_argument( "station_ids", nargs="+", help="A list of RedVox ids delimited by a space" ) data_req_parser.set_defaults(func=data_req_args) # data req report data_req_report_parser = sub_parser.add_parser( "data-req-report", help="Request bulk RedVox data from the RedVox servers" ) data_req_report_parser.add_argument( "--out-dir", help="The output directory that RedVox files will be written to (default=.)", default=".", ) data_req_report_parser.add_argument( "--email", help="redvox.io account email", default=map_or_default(redvox_config, lambda config: config.username, None), ) data_req_report_parser.add_argument( "--password", help="redvox.io account password", default=map_or_default(redvox_config, lambda config: config.password, None), ) 
data_req_report_parser.add_argument( "--retries", help="The number of times the client should retry getting a file on failure " "(default=1)", default=1, choices=set(range(0, 6)), type=int, ) data_req_report_parser.add_argument( "--host", help="Data server host", default=map_or_default(redvox_config, lambda config: config.host, "redvox.io"), ) data_req_report_parser.add_argument( "--port", type=int, help="Data server port", default=map_or_default(redvox_config, lambda config: config.port, 8080), ) data_req_report_parser.add_argument( "--protocol", help="One of either http or https (default https)", choices=["https", "http"], default=map_or_default(redvox_config, lambda config: config.protocol, "https"), ) data_req_report_parser.add_argument( "--secret-token", help="A shared secret token provided by RedVox required for accessing the data " "request service", default=map_or_default(redvox_config, lambda config: config.secret_token, None), ) data_req_report_parser.add_argument( "report_id", type=str, help="The full report id that data is being requested for", ) data_req_report_parser.set_defaults(func=data_req_report_args) # Parse the args args = parser.parse_args() # Setup logging log_levels: Dict[int, str] = {0: "WARN", 1: "INFO", 2: "DEBUG"} log_level: str = ( log_levels[args.verbose] if args.verbose in log_levels else log_levels[0] ) logging.basicConfig( level=log_level, format="[%(levelname)s:%(process)d:%(filename)s:%(module)s:%(funcName)s:%(lineno)d:%(asctime)s]" " %(message)s", ) log.info("Running with args=%s and log_level=%s", str(args), log_level) # Try calling the appropriate handler # pylint: disable=W0703 try: args.func(args) except Exception as e: log.error("Encountered an error: %s", str(e)) sys.exit(1)
def map_or_default(val: Any, apply: Callable[[Any], Any], default: Any) ‑> Any
-
Expand source code
def map_or_default(val: Any, apply: Callable[[Any], Any], default: Any) -> Any:
    """
    Transforms a value with the given function, falling back to a default when the value is None.
    :param val: The value to transform; may be None.
    :param apply: Function that is called with val when val is not None.
    :param default: The value returned when val is None.
    :return: default when val is None, otherwise the result of apply(val).
    """
    # Only None triggers the default; other falsy values (0, "", []) are still passed to apply.
    return default if val is None else apply(val)
def rdvxm_print_stdout_args(args) ‑> None
-
Wrapper function that prints the contents of rdvxm files to stdout. :param args: Args from argparse.
Expand source code
def rdvxm_print_stdout_args(args) -> None:
    """
    Command line handler that prints rdvxm files to stdout.
    :param args: Args from argparse; args.rdvxm_paths holds the files to print.
    """
    rdvxm_paths = args.rdvxm_paths

    # Every input must exist and carry the .rdvxm extension.
    inputs_ok = check_files(rdvxm_paths, ".rdvxm")
    if not inputs_ok:
        determine_exit(False)

    printed = conversions.rdvxm_print_stdout(rdvxm_paths)
    determine_exit(printed)
def rdvxm_to_json_args(args) ‑> None
-
Wrapper function that calls the to_json conversion. :param args: Args from argparse.
Expand source code
def rdvxm_to_json_args(args) -> None:
    """
    Command line handler that converts rdvxm files to json files.
    :param args: Args from argparse; reads args.rdvxm_paths and args.out_dir.
    """
    rdvxm_paths = args.rdvxm_paths

    # Every input must exist and carry the .rdvxm extension.
    inputs_ok = check_files(rdvxm_paths, ".rdvxm")
    if not inputs_ok:
        determine_exit(False)

    # The output directory is only inspected after the input check has run.
    out_dir = args.out_dir
    out_dir_ok = check_out_dir(out_dir)
    if not out_dir_ok:
        determine_exit(False)

    converted = conversions.rdvxm_to_json(rdvxm_paths, out_dir)
    determine_exit(converted)
def rdvxm_to_rdvxz(args) ‑> None
-
Convert rdvxm to rdvxz :param args: Args from argparse.
Expand source code
def rdvxm_to_rdvxz(args) -> None:
    """
    Command line handler that converts rdvxm files to rdvxz files.
    :param args: Args from argparse; reads args.rdvxm_paths and args.out_dir.
    """
    rdvxm_paths = args.rdvxm_paths

    # Every input must exist and carry the .rdvxm extension.
    inputs_ok = check_files(rdvxm_paths, ".rdvxm")
    if not inputs_ok:
        determine_exit(False)

    # The output directory is only inspected after the input check has run.
    out_dir = args.out_dir
    out_dir_ok = check_out_dir(out_dir)
    if not out_dir_ok:
        determine_exit(False)

    converted = conversions.rdvxm_to_rdvxz(rdvxm_paths, out_dir)
    determine_exit(converted)
def rdvxz_print_stdout_args(args) ‑> None
-
Wrapper function that prints the contents of rdvxz files to stdout. :param args: Args from argparse.
Expand source code
def rdvxz_print_stdout_args(args) -> None:
    """
    Command line handler that prints rdvxz files to stdout.
    :param args: Args from argparse; args.rdvxz_paths holds the files to print.
    """
    rdvxz_paths = args.rdvxz_paths

    # Every input must exist and carry the .rdvxz extension.
    inputs_ok = check_files(rdvxz_paths, ".rdvxz")
    if not inputs_ok:
        determine_exit(False)

    printed = conversions.rdvxz_print_stdout(rdvxz_paths)
    determine_exit(printed)
def rdvxz_to_json_args(args) ‑> None
-
Wrapper function that calls the to_json conversion. :param args: Args from argparse.
Expand source code
def rdvxz_to_json_args(args) -> None:
    """
    Command line handler that converts rdvxz files to json files.
    :param args: Args from argparse; reads args.rdvxz_paths and args.out_dir.
    """
    rdvxz_paths = args.rdvxz_paths

    # Every input must exist and carry the .rdvxz extension.
    inputs_ok = check_files(rdvxz_paths, ".rdvxz")
    if not inputs_ok:
        determine_exit(False)

    # The output directory is only inspected after the input check has run.
    out_dir = args.out_dir
    out_dir_ok = check_out_dir(out_dir)
    if not out_dir_ok:
        determine_exit(False)

    converted = conversions.rdvxz_to_json(rdvxz_paths, out_dir)
    determine_exit(converted)
def rdvxz_to_rdvxm(args) ‑> None
-
Convert rdvxz to rdvxm :param args: Args from argparse.
Expand source code
def rdvxz_to_rdvxm(args) -> None:
    """
    Command line handler that converts rdvxz files to rdvxm files.
    :param args: Args from argparse; reads args.rdvxz_paths and args.out_dir.
    """
    rdvxz_paths = args.rdvxz_paths

    # Every input must exist and carry the .rdvxz extension.
    inputs_ok = check_files(rdvxz_paths, ".rdvxz")
    if not inputs_ok:
        determine_exit(False)

    # The output directory is only inspected after the input check has run.
    out_dir = args.out_dir
    out_dir_ok = check_out_dir(out_dir)
    if not out_dir_ok:
        determine_exit(False)

    converted = conversions.rdvxz_to_rdvxm(rdvxz_paths, out_dir)
    determine_exit(converted)
def sort_unstructured(input_dir: str, out_dir: Optional[str] = None, copy: bool = True) ‑> bool
-
Expand source code
def sort_unstructured(
    input_dir: str, out_dir: Optional[str] = None, copy: bool = True
) -> bool:
    """
    Sorts unstructured RedVox files into a structured layout.
    :param input_dir: Directory containing the RedVox files to sort.
    :param out_dir: Directory to write the structured layout to; "." is used when None.
    :param copy: Forwarded unchanged as the copy keyword of io.sort_unstructured_redvox_data.
    :return: Always True once the sort call returns.
    """
    if out_dir is None:
        target_dir = "."
    else:
        target_dir = out_dir

    io.sort_unstructured_redvox_data(input_dir, target_dir, copy=copy)
    return True
def sort_unstructured_args(args) ‑> None
-
Expand source code
def sort_unstructured_args(args) -> None:
    """
    Command line handler that sorts unstructured RedVox files into a structured layout.
    :param args: Args from argparse; reads args.input_dir, args.out_dir, and args.mv.
    """
    # NOTE(review): the input directory is validated with check_out_dir, the same helper
    # used for the output directory -- confirm that helper is appropriate for inputs.
    source_dir = args.input_dir
    if not check_out_dir(source_dir):
        determine_exit(False)

    dest_dir = args.out_dir
    if not check_out_dir(dest_dir):
        determine_exit(False)

    # --mv requests a move, so copying is the inverse of that flag.
    copy_files = not args.mv
    sorted_ok = sort_unstructured(source_dir, dest_dir, copy_files)
    determine_exit(sorted_ok)
def validate_rdvxm_args(args) ‑> None
-
Wrapper function that validates the structure of rdvxm (API M) files. :param args: Args from argparse
Expand source code
def validate_rdvxm_args(args) -> None:
    """
    Command line handler that validates rdvxm files.
    :param args: Args from argparse; args.rdvxm_paths holds the files to validate.
    """
    rdvxm_paths = args.rdvxm_paths

    # Every input must exist and carry the .rdvxm extension.
    inputs_ok = check_files(rdvxm_paths, ".rdvxm")
    if not inputs_ok:
        determine_exit(False)

    validated = conversions.validate_rdvxm(rdvxm_paths)
    determine_exit(validated)