Module redvox.common.parallel_utils
Module that contains utilities for working with data in parallel.
Expand source code
"""
Module that contains utilities for working with data in parallel.
"""
from enum import Enum
import multiprocessing
from multiprocessing.pool import Pool
from typing import Callable, Iterator, List, Optional, TypeVar
import numpy
import redvox.settings as settings
# Type of the elements that a mapping function consumes.
T = TypeVar("T")
# Type of the values that a mapping function produces.
R = TypeVar("R")
class MappingType(Enum):
    """
    Describes which strategy a maybe_parallel_* function used to map its input.
    """
    # Members are intentionally left unannotated: each member is a MappingType
    # instance whose .value is the string, so a ": str" annotation would be wrong.

    # Ran in parallel in a pool that the mapping function created itself.
    ParallelManaged = "ParallelManaged"
    # Ran in parallel in a pool that was supplied by the caller.
    ParallelUnmanaged = "ParallelUnmanaged"
    # Ran in the calling process with the builtin map.
    Serial = "Serial"
def maybe_parallel_map(pool: Optional[Pool],
                       map_fn: Callable[[T], R],
                       iterator: Iterator[T],
                       condition: Optional[Callable[[], bool]] = None,
                       chunk_size: int = 64,
                       usage_out: Optional[List[MappingType]] = None) -> Iterator[R]:
    """
    Maps a function over a set of values. This will either be run in parallel or serially depending on the value
    of redvox.settings.

    This function is a generator: nothing (including the condition check and pool creation) happens until the
    first result is requested.

    :param pool: An optional pool. If a pool is provided, the user is responsible for closing the pool. If the pool
                 is not provided, one is created by this function and shut down when iteration ends, whether it
                 ends normally, through an exception, or because the consumer abandoned the generator.
    :param map_fn: A function that maps each value in the provided iterator.
    :param iterator: An iterator of elements to be mapped.
    :param condition: An optional condition, that when provided, will be checked and if the condition passes, this
                      function may run in parallel. This is useful for things like, only run in parallel if more than
                      n entries are provided.
    :param chunk_size: An optional chunk size to pass to parallel maps.
    :param usage_out: When provided, a single value describing which mapping strategy was used is appended
                      to this list before the first result is produced.
    :return: A transformed iterator.
    """
    def __usage_out(mapping_type: MappingType) -> None:
        if usage_out is not None:
            usage_out.append(mapping_type)

    # If a condition is not provided, then it's always True.
    _condition: bool = True if condition is None else condition()

    if not (settings.is_parallelism_enabled() and _condition):
        # Run serially
        __usage_out(MappingType.Serial)
        for res in map(map_fn, iterator):
            yield res
        return

    # We only manage (and therefore clean up) pools that we created ourselves.
    owns_pool: bool = pool is None
    _pool: Pool = multiprocessing.Pool() if owns_pool else pool
    # Report the strategy up front, like the serial branch does, so it is recorded
    # even when the consumer stops iterating early.
    __usage_out(MappingType.ParallelManaged if owns_pool else MappingType.ParallelUnmanaged)

    finished: bool = False
    try:
        for res in _pool.imap(map_fn, iterator, chunksize=chunk_size):
            yield res
        finished = True
    finally:
        if owns_pool:
            if finished:
                # All work has been handed back; let the workers exit cleanly.
                _pool.close()
            else:
                # An error occurred or the consumer walked away: drop any outstanding work.
                _pool.terminate()
            _pool.join()
def maybe_parallel_smap(pool: Optional[Pool],
                        map_fn: Callable[[T], R],
                        iterator: List[Iterator[T]],
                        condition: Optional[Callable[[], bool]] = None,
                        chunk_size: int = 64,
                        usage_out: Optional[List[MappingType]] = None) -> Iterator[R]:
    """
    Maps a function over a set of values. This will either be run in parallel or serially depending on the value
    of redvox.settings. Accepts multiple arguments for the function.

    This function is a generator: nothing (including the condition check and pool creation) happens until the
    first result is requested.

    :param pool: An optional pool. If a pool is provided, the user is responsible for closing the pool. If the pool
                 is not provided, one is created by this function and shut down when iteration ends, whether it
                 ends normally, through an exception, or because the consumer abandoned the generator.
    :param map_fn: A function that maps the values taken from the provided iterators.
    :param iterator: A list of iterators of elements to be mapped. See the NOTE(review) in the body: the parallel
                     and serial branches currently read this list in transposed layouts.
    :param condition: An optional condition, that when provided, will be checked and if the condition passes, this
                      function may run in parallel. This is useful for things like, only run in parallel if more than
                      n entries are provided.
    :param chunk_size: An optional chunk size to pass to parallel maps.
    :param usage_out: When provided, a single value describing which mapping strategy was used is appended
                      to this list before the first result is produced.
    :return: A transformed iterator.
    """
    def __usage_out(mapping_type: MappingType) -> None:
        if usage_out is not None:
            usage_out.append(mapping_type)

    # If a condition is not provided, then it's always True.
    _condition: bool = True if condition is None else condition()

    # NOTE(review): Pool.starmap unpacks each element of `iterator` as the arguments of one call
    # (map_fn(*element)), whereas map(map_fn, *iterator) treats each element as the stream of values
    # for one positional argument. The two branches only agree when the input equals its own
    # transpose. Behavior is left unchanged here; confirm the intended layout with callers.
    if not (settings.is_parallelism_enabled() and _condition):
        # Run serially
        __usage_out(MappingType.Serial)
        for res in map(map_fn, *iterator):
            yield res
        return

    # We only manage (and therefore clean up) pools that we created ourselves.
    owns_pool: bool = pool is None
    _pool: Pool = multiprocessing.Pool() if owns_pool else pool
    # Report the strategy up front, like the serial branch does, so it is recorded
    # even when the consumer stops iterating early.
    __usage_out(MappingType.ParallelManaged if owns_pool else MappingType.ParallelUnmanaged)

    finished: bool = False
    try:
        # starmap blocks until every result is available and returns them as a list.
        for res in _pool.starmap(map_fn, iterator, chunksize=chunk_size):
            yield res
        finished = True
    finally:
        if owns_pool:
            if finished:
                # All work has been handed back; let the workers exit cleanly.
                _pool.close()
            else:
                # An error occurred or the consumer walked away: drop any outstanding work.
                _pool.terminate()
            _pool.join()
Functions
def maybe_parallel_map(pool: Optional[multiprocessing.pool.Pool], map_fn: Callable[[~T], ~R], iterator: Iterator[~T], condition: Optional[Callable[[], bool]] = None, chunk_size: int = 64, usage_out: Optional[List[MappingType]] = None) ‑> Iterator[~R]
-
Maps a function over a set of values. This will either be run in parallel or serially depending on the value of redvox.settings.
:param pool: An optional pool. If a pool is provided, the user is responsible for closing the pool. If the pool is not provided, one is created by this process and then closed at the end of this process. :param map_fn: A function that maps each value in the provided iterator. :param iterator: An iterator of elements to be mapped. :param condition: An optional condition, that when provided, will be checked and if the condition passes, this function may run in parallel. This is useful for things like, only run in parallel if more than n entries are provided. :param chunk_size: An optional chunk size to pass to parallel maps. :param usage_out: When provided, this value will be filled with a single value describing which mapping strategy was used. :return: A transformed iterator.
Expand source code
def maybe_parallel_map(pool: Optional[Pool],
                       map_fn: Callable[[T], R],
                       iterator: Iterator[T],
                       condition: Optional[Callable[[], bool]] = None,
                       chunk_size: int = 64,
                       usage_out: Optional[List[MappingType]] = None) -> Iterator[R]:
    """
    Maps a function over a set of values. This will either be run in parallel or serially depending on the value
    of redvox.settings.

    This function is a generator: nothing (including the condition check and pool creation) happens until the
    first result is requested.

    :param pool: An optional pool. If a pool is provided, the user is responsible for closing the pool. If the pool
                 is not provided, one is created by this function and shut down when iteration ends, whether it
                 ends normally, through an exception, or because the consumer abandoned the generator.
    :param map_fn: A function that maps each value in the provided iterator.
    :param iterator: An iterator of elements to be mapped.
    :param condition: An optional condition, that when provided, will be checked and if the condition passes, this
                      function may run in parallel. This is useful for things like, only run in parallel if more than
                      n entries are provided.
    :param chunk_size: An optional chunk size to pass to parallel maps.
    :param usage_out: When provided, a single value describing which mapping strategy was used is appended
                      to this list before the first result is produced.
    :return: A transformed iterator.
    """
    def __usage_out(mapping_type: MappingType) -> None:
        if usage_out is not None:
            usage_out.append(mapping_type)

    # If a condition is not provided, then it's always True.
    _condition: bool = True if condition is None else condition()

    if not (settings.is_parallelism_enabled() and _condition):
        # Run serially
        __usage_out(MappingType.Serial)
        for res in map(map_fn, iterator):
            yield res
        return

    # We only manage (and therefore clean up) pools that we created ourselves.
    owns_pool: bool = pool is None
    _pool: Pool = multiprocessing.Pool() if owns_pool else pool
    # Report the strategy up front, like the serial branch does, so it is recorded
    # even when the consumer stops iterating early.
    __usage_out(MappingType.ParallelManaged if owns_pool else MappingType.ParallelUnmanaged)

    finished: bool = False
    try:
        for res in _pool.imap(map_fn, iterator, chunksize=chunk_size):
            yield res
        finished = True
    finally:
        if owns_pool:
            if finished:
                # All work has been handed back; let the workers exit cleanly.
                _pool.close()
            else:
                # An error occurred or the consumer walked away: drop any outstanding work.
                _pool.terminate()
            _pool.join()
def maybe_parallel_smap(pool: Optional[multiprocessing.pool.Pool], map_fn: Callable[[~T], ~R], iterator: List[Iterator[~T]], condition: Optional[Callable[[], bool]] = None, chunk_size: int = 64, usage_out: Optional[List[MappingType]] = None) ‑> Iterator[~R]
-
Maps a function over a set of values. This will either be run in parallel or serially depending on the value of redvox.settings. Accepts multiple arguments for the function.
:param pool: An optional pool. If a pool is provided, the user is responsible for closing the pool. If the pool is not provided, one is created by this process and then closed at the end of this process. :param map_fn: A function that maps each value in the provided iterator. :param iterator: A list of iterators of elements to be mapped. :param condition: An optional condition, that when provided, will be checked and if the condition passes, this function may run in parallel. This is useful for things like, only run in parallel if more than n entries are provided. :param chunk_size: An optional chunk size to pass to parallel maps. :param usage_out: When provided, this value will be filled with a single value describing which mapping strategy was used. :return: A transformed iterator.
Expand source code
def maybe_parallel_smap(pool: Optional[Pool],
                        map_fn: Callable[[T], R],
                        iterator: List[Iterator[T]],
                        condition: Optional[Callable[[], bool]] = None,
                        chunk_size: int = 64,
                        usage_out: Optional[List[MappingType]] = None) -> Iterator[R]:
    """
    Maps a function over a set of values. This will either be run in parallel or serially depending on the value
    of redvox.settings. Accepts multiple arguments for the function.

    This function is a generator: nothing (including the condition check and pool creation) happens until the
    first result is requested.

    :param pool: An optional pool. If a pool is provided, the user is responsible for closing the pool. If the pool
                 is not provided, one is created by this function and shut down when iteration ends, whether it
                 ends normally, through an exception, or because the consumer abandoned the generator.
    :param map_fn: A function that maps the values taken from the provided iterators.
    :param iterator: A list of iterators of elements to be mapped. See the NOTE(review) in the body: the parallel
                     and serial branches currently read this list in transposed layouts.
    :param condition: An optional condition, that when provided, will be checked and if the condition passes, this
                      function may run in parallel. This is useful for things like, only run in parallel if more than
                      n entries are provided.
    :param chunk_size: An optional chunk size to pass to parallel maps.
    :param usage_out: When provided, a single value describing which mapping strategy was used is appended
                      to this list before the first result is produced.
    :return: A transformed iterator.
    """
    def __usage_out(mapping_type: MappingType) -> None:
        if usage_out is not None:
            usage_out.append(mapping_type)

    # If a condition is not provided, then it's always True.
    _condition: bool = True if condition is None else condition()

    # NOTE(review): Pool.starmap unpacks each element of `iterator` as the arguments of one call
    # (map_fn(*element)), whereas map(map_fn, *iterator) treats each element as the stream of values
    # for one positional argument. The two branches only agree when the input equals its own
    # transpose. Behavior is left unchanged here; confirm the intended layout with callers.
    if not (settings.is_parallelism_enabled() and _condition):
        # Run serially
        __usage_out(MappingType.Serial)
        for res in map(map_fn, *iterator):
            yield res
        return

    # We only manage (and therefore clean up) pools that we created ourselves.
    owns_pool: bool = pool is None
    _pool: Pool = multiprocessing.Pool() if owns_pool else pool
    # Report the strategy up front, like the serial branch does, so it is recorded
    # even when the consumer stops iterating early.
    __usage_out(MappingType.ParallelManaged if owns_pool else MappingType.ParallelUnmanaged)

    finished: bool = False
    try:
        # starmap blocks until every result is available and returns them as a list.
        for res in _pool.starmap(map_fn, iterator, chunksize=chunk_size):
            yield res
        finished = True
    finally:
        if owns_pool:
            if finished:
                # All work has been handed back; let the workers exit cleanly.
                _pool.close()
            else:
                # An error occurred or the consumer walked away: drop any outstanding work.
                _pool.terminate()
            _pool.join()
Classes
class MappingType (value, names=None, *, module=None, qualname=None, type=None, start=1)
-
An enumeration.
Expand source code
class MappingType(Enum):
    """
    Describes which strategy a maybe_parallel_* function used to map its input.
    """
    # Members are intentionally left unannotated: each member is a MappingType
    # instance whose .value is the string, so a ": str" annotation would be wrong.

    # Ran in parallel in a pool that the mapping function created itself.
    ParallelManaged = "ParallelManaged"
    # Ran in parallel in a pool that was supplied by the caller.
    ParallelUnmanaged = "ParallelUnmanaged"
    # Ran in the calling process with the builtin map.
    Serial = "Serial"
Ancestors
- enum.Enum
Class variables
var ParallelManaged : str
var ParallelUnmanaged : str
var Serial : str