Module redvox.cloud.data_client
This module provides a custom work-stealing process pool for downloading API M data in parallel.
"""
This module provides a custom work-stealing process pool for downloading API M data in parallel.
"""
from dataclasses import dataclass
import time
from typing import List, Optional
from multiprocessing import cpu_count, Manager, Process, Queue
import queue
import requests
from redvox.cloud.data_io import download_file
@dataclass
class DownloadResult:
"""
The result of downloading a file.
"""
data_key: str
resp_len: int
skipped: bool = False
def download_process(
input_queue: Queue, result_queue: Queue, out_dir: str, retries: int
) -> None:
"""
A function that runs in a separate process for downloading RedVox data.
:param input_queue: A shared queue containing the list of items to be downloaded.
:param result_queue: A queue used to send results from the process to the caller.
:param out_dir: The directory downloaded data should be stored to.
:param retries: Number of times to retry a failed download.
"""
session: requests.Session = requests.Session()
try:
# While there is still data in the queue, retrieve it.
while True:
url: str = input_queue.get_nowait()
data_key: str
resp_len: int
try:
data_key, resp_len = download_file(url, session, out_dir, retries)
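                # Block until the result can be enqueued (block=True, timeout=None).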
result_queue.put(DownloadResult(data_key, resp_len), True, None)
except FileExistsError:
print(f"File already exists, skipping...")
result_queue.put(DownloadResult("", 0, skipped=True), True, None)
continue
# Thrown when the queue is empty
except queue.Empty:
session.close()
def download_files(
urls: List[str],
out_dir: str,
retries: int,
num_processes: int = cpu_count(),
out_queue: Optional[Queue] = None,
) -> None:
"""
Downloads files in parallel from the provided URLs.
:param out_queue: If provided, send results to this queue instead of printing them
:param urls: URLs to files to retrieve.
:param out_dir: The base output directory where files should be stored.
:param retries: The number of times to retry a failed download.
:param num_processes: Number of processes to create for downloading data.
"""
manager: Manager = Manager()
url_queue: Queue = manager.Queue(len(urls))
result_queue: Queue = manager.Queue(len(urls))
processes: List[Process] = []
# Add all URLs to shared queue
for url in urls:
url_queue.put(url)
# Create the process pool
for _ in range(num_processes):
process: Process = Process(
target=download_process, args=(url_queue, result_queue, out_dir, retries)
)
processes.append(process)
process.start()
# Display download status
i: int = 0
total_bytes: int = 0
start_time = time.monotonic_ns()
while i < len(urls):
res: DownloadResult = result_queue.get(True, None)
if res.skipped:
i += 1
continue
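        # Estimate time remaining by scaling elapsed seconds by the
        # fraction of files still outstanding.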
timestamp = time.monotonic_ns()
time_range = (timestamp - start_time) / 1_000_000_000.0
percentage: float = (float(i + 1) / float(len(urls))) * 100.0
remaining: float = ((100.0 / percentage) * time_range) - time_range
total_bytes += res.resp_len
out_str: str = f"\r[{(i + 1):5} / {len(urls):5}] [{percentage:04.1f}%] [{total_bytes:10} bytes] " \
f"[est time remaining {remaining:06.1f}s] {res.data_key:>55}"
if out_queue is None:
print(out_str)
else:
out_queue.put(out_str, False)
i += 1
if out_queue is not None:
out_queue.put("done", False)
# Wait for all processes in pool to finish
for process in processes:
process.join()
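A minimal usage sketch of the module (the URLs and output directory below are placeholders; real URLs would come from a RedVox cloud data request):

from multiprocessing import cpu_count

from redvox.cloud.data_client import download_files

if __name__ == "__main__":
    # Placeholder URLs; real ones are returned by a RedVox data request.
    urls = [
        "https://example.com/data/file_0001.rdvxm",
        "https://example.com/data/file_0002.rdvxm",
    ]

    # Download into ./data, retrying each failed download up to 3 times,
    # with one worker process per CPU core. Progress is printed to stdout.
    download_files(urls, "./data", 3, num_processes=cpu_count())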
Functions
def download_files(urls: List[str], out_dir: str, retries: int, num_processes: int = cpu_count(), out_queue: Optional[Queue] = None) -> None

Downloads files in parallel from the provided URLs.

:param urls: URLs to files to retrieve.
:param out_dir: The base output directory where files should be stored.
:param retries: The number of times to retry a failed download.
:param num_processes: Number of processes to create for downloading data.
:param out_queue: If provided, send results to this queue instead of printing them.
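A sketch of capturing progress via out_queue instead of printing (placeholder URL; in practice a consumer would usually drain the queue concurrently, e.g. from a UI thread):

from multiprocessing import Manager

from redvox.cloud.data_client import download_files

if __name__ == "__main__":
    urls = ["https://example.com/data/file_0001.rdvxm"]  # placeholder

    manager = Manager()
    out_queue = manager.Queue()

    # Progress strings are queued rather than printed, followed by a
    # final "done" sentinel once every result has been reported.
    download_files(urls, "./data", 3, out_queue=out_queue)

    while True:
        msg = out_queue.get()
        if msg == "done":
            break
        print(msg)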
def download_process(input_queue: Queue, result_queue: Queue, out_dir: str, retries: int) -> None

A function that runs in a separate process for downloading RedVox data.

:param input_queue: A shared queue containing the list of items to be downloaded.
:param result_queue: A queue used to send results from the process to the caller.
:param out_dir: The directory downloaded data should be stored to.
:param retries: Number of times to retry a failed download.
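A sketch of running a single worker directly, mirroring what download_files does internally (placeholder URL and output directory; assumes the download succeeds or the file already exists):

from multiprocessing import Manager, Process

from redvox.cloud.data_client import DownloadResult, download_process

if __name__ == "__main__":
    manager = Manager()
    input_queue = manager.Queue()
    result_queue = manager.Queue()

    # Placeholder URL; a real one would come from a RedVox data request.
    input_queue.put("https://example.com/data/file_0001.rdvxm")

    # The worker pulls URLs until the queue is empty, then closes its
    # HTTP session and exits.
    worker = Process(
        target=download_process, args=(input_queue, result_queue, "./data", 3)
    )
    worker.start()
    worker.join()

    res: DownloadResult = result_queue.get()
    print(res.data_key, res.resp_len, res.skipped)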
Classes
class DownloadResult(data_key: str, resp_len: int, skipped: bool = False)

The result of downloading a file.
Class variables
var data_key : str
var resp_len : int
var skipped : bool
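For instance, results pulled from the result queue can be aggregated like so (hypothetical values):

from redvox.cloud.data_client import DownloadResult

results = [
    DownloadResult("api_m/file_0001.rdvxm", 4096),
    DownloadResult("", 0, skipped=True),
]

# Total bytes across completed (non-skipped) downloads.
total_bytes = sum(r.resp_len for r in results if not r.skipped)
print(total_bytes)  # 4096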