Source code for mtalg.alg.elementwise_algebra

import concurrent.futures
import numpy as np
import mtalg.core.threads
from typing import Optional, Union
from numbers import Number

argmax = lambda iterable: max(enumerate(iterable), key=lambda x: x[1])[0]


def _add_inplace(x, y): x += y


def _sub_inplace(x, y): x -= y


def _mul_inplace(x, y): x *= y


def _div_inplace(x, y): x /= y


def _pow_inplace(x, y): x **= y


[docs] def add(a: Union[np.ndarray, Number], b: Union[np.ndarray, Number], num_threads: Optional[int] = None, direction: str = 'left'): """Add multithreaded. Modifies a or b in-place depending on the direction. Args: a: Numpy array or scalar. b: Numpy array or scalar. num_threads: Number of threads to be used, overrides threads as set by :func:`~mtalg.set_num_threads`. direction: 'left' or 'right' to decide if a or b is modified. Examples: Add b to a; a is modified in-place. >>> a = mtalg.random.standard_normal(size=(10_000, 5_000)) >>> b = mtalg.random.uniform(size=(10_000, 5_000), low=0, high=10) >>> mtalg.add(a, b, direction='left') Add a to b; b is modified in-place. >>> mtalg.add(a, b, direction='right') Add a to b using 4 threads. >>> mtalg.add(a, b, num_threads=4, direction='right') """ __multithreaded_opr_direction(a, b, _add_inplace, num_threads, direction=direction)
[docs] def sub(a: Union[np.ndarray, Number], b: Union[np.ndarray, Number], num_threads: Optional[int] = None, direction: str = 'left'): """Subtract multithreaded. Modifies a or b in-place depending on the direction. Args: a: Numpy array or scalar. b: Numpy array or scalar. num_threads: Number of threads to be used, overrides threads as set by :func:`~mtalg.set_num_threads`. direction: 'left' or 'right' to decide if a or b is modified. Examples: Subtract b from a; a is modified in-place. >>> a = mtalg.random.standard_normal(size=(10_000, 5_000)) >>> b = mtalg.random.uniform(size=(10_000, 5_000), low=0, high=10) >>> mtalg.sub(a, b, direction='left') Subtract a from b; b is modified in-place. >>> mtalg.sub(a, b, direction='right') Subtract a from b using 4 threads. >>> mtalg.sub(a, b, num_threads=4, direction='right') """ __multithreaded_opr_direction(a, b, _sub_inplace, num_threads, direction=direction)
[docs] def mul(a: Union[np.ndarray, Number], b: Union[np.ndarray, Number], num_threads: Optional[int] = None, direction: str = 'left'): """Multiply multithreaded. Modifies a or b in-place depending on the direction. Args: a: Numpy array or scalar. b: Numpy array or scalar. num_threads: Number of threads to be used, overrides threads as set by :func:`~mtalg.set_num_threads`. direction: 'left' or 'right' to decide if a or b is modified. Examples: Multiply a by b; a is modified in-place. >>> a = mtalg.random.standard_normal(size=(10_000, 5_000)) >>> b = mtalg.random.uniform(size=(10_000, 5_000), low=0, high=10) >>> mtalg.mul(a, b, direction='left') Multiply b by a; b is modified in-place. >>> mtalg.mul(a, b, direction='right') Multiply b by a using 4 threads. >>> mtalg.mul(a, b, num_threads=4, direction='right') """ __multithreaded_opr_direction(a, b, _mul_inplace, num_threads, direction=direction)
[docs] def div(a: Union[np.ndarray, Number], b: Union[np.ndarray, Number], num_threads: Optional[int] = None, direction: str = 'left'): """Divide multithreaded. Modifies a or b in-place depending on the direction. Args: a: Numpy array or scalar. b: Numpy array or scalar. num_threads: Number of threads to be used, overrides threads as set by :func:`~mtalg.set_num_threads`. direction: 'left' or 'right' to decide if a or b is modified. Examples: Divide a by b; a is modified in-place. >>> a = mtalg.random.standard_normal(size=(10_000, 5_000)) >>> b = mtalg.random.uniform(size=(10_000, 5_000), low=0, high=10) >>> mtalg.div(a, b, direction='left') Divide b by a; b is modified in-place. >>> mtalg.div(a, b, direction='right') Divide b by a using 4 threads. >>> mtalg.div(a, b, num_threads=4, direction='right') """ __multithreaded_opr_direction(a, b, _div_inplace, num_threads, direction=direction)
[docs] def pow(a: Union[np.ndarray, Number], b: Union[np.ndarray, Number], num_threads: Optional[int] = None, direction: str = 'left'): """Raise to power multithreaded. Modifies a or b in-place depending on the direction. Args: a: Numpy array or scalar. b: Numpy array or scalar. num_threads: Number of threads to be used, overrides threads as set by :func:`~mtalg.set_num_threads`. direction: 'left' or 'right' to decide if a or b is modified. Examples: Raise a to the bth power; a is modified in-place. >>> a = mtalg.random.standard_normal(size=(10_000, 5_000)) >>> b = mtalg.random.uniform(size=(10_000, 5_000), low=0, high=10) >>> mtalg.pow(a, b, direction='left') Raise b to the ath power; b is modified in-place. >>> mtalg.pow(a, b, direction='right') Raise b to the ath power b using 4 threads. >>> mtalg.pow(a, b, num_threads=4, direction='right') """ __multithreaded_opr_direction(a, b, _pow_inplace, num_threads, direction=direction)
def __multithreaded_opr_direction(a, b, opr, num_threads, direction='left'): if direction == 'left': __multithreaded_opr(a, b, opr, num_threads=num_threads) elif direction == 'right': __multithreaded_opr(b, a, opr, num_threads=num_threads) else: raise ValueError(f"Invalid direction {direction}. Must take value either 'left' or 'right'. ") def __multithreaded_opr(a, b, opr, num_threads: int = None): """Modifies a in-place. Beats numpy from around 1e7 operations onwards. Args: a (numpy.array): Left array to be summed. Modified in place. b (numpy.array): Right array to be summed. num_threads (int or None): Number of threads. """ a_scalar = isinstance(a, (int, float, complex, np.integer, np.floating)) b_scalar = isinstance(b, (int, float, complex, np.integer, np.floating)) if not a_scalar and not b_scalar: scalar = False assert a.shape == b.shape, 'Shapes of both arrays must be the same' else: scalar = True if a_scalar: raise ValueError('Cannot modify scalar in-place') shape = a.shape shp_max = argmax(shape) num_threads = num_threads or mtalg.core.threads._global_num_threads assert num_threads > 0 and isinstance(num_threads, int), \ f'Number of threads must be an integer > 0, found: {num_threads}' steps = [(t * (shape[shp_max] // num_threads), (t + 1) * (shape[shp_max] // num_threads)) if t < (num_threads - 1) else (t * (shape[shp_max] // num_threads), shape[shp_max]) for t in range(num_threads)] if scalar: def _fill(first, last): opr(a[(slice(None),) * shp_max + (slice(first, last),)], b) else: def _fill(first, last): opr(a[(slice(None),) * shp_max + (slice(first, last),)], b[(slice(None),) * shp_max + (slice(first, last),)]) with concurrent.futures.ThreadPoolExecutor(num_threads) as executor: futures = [executor.submit(_fill, steps[i][0], steps[i][1]) for i in range(num_threads)] for fut in concurrent.futures.as_completed(futures): fut.result() def std(a: np.ndarray) -> Number: """Numba version of np.std. Args: a: np.ndarray. Returns: standard deviation. """ try: from numba import njit except ModuleNotFoundError: raise ModuleNotFoundError("Optional dependency missing: 'numba'.\n" "Install via pip as `pip install numba` or via conda as `conda install numba`.") @njit(parallel=True) def std_numba(x): return np.std(x) return std_numba(a)