Source code for pyace.paralleldataexecutor

import logging

log = logging.getLogger(__name__)
log.setLevel(logging.DEBUG)

import warnings
from concurrent.futures import Future
from functools import partial
import multiprocessing
from threading import Lock

from concurrent.futures import Executor
from concurrent.futures import ProcessPoolExecutor

import pandas as pd

# very random name to avoid global namespace collision
LOCAL_DATAFRAME_VARIALBE_NAME = "dh3ah3k5sk3n9v62m3l2"

import __main__



# https://stackoverflow.com/questions/10434593/dummyexecutor-for-pythons-futures
[docs]class SerialExecutor(Executor):
[docs] def __init__(self, initializer=None, initargs=None): self.initializer = initializer self.initargs = initargs self._shutdown = False self._shutdownLock = Lock() self._initialize()
def _initialize(self): self.initializer(*self.initargs)
[docs] def submit(self, fn, *args, **kwargs): with self._shutdownLock: if self._shutdown: raise RuntimeError('cannot schedule new futures after shutdown') f = Future() try: result = fn(*args, **kwargs) except BaseException as e: f.set_exception(e) else: f.set_result(result) return f
[docs] def shutdown(self, wait=True): with self._shutdownLock: self._shutdown = True
[docs]def split_by_batches(l, batch_size): return [l[i:i + batch_size] for i in range(0, len(l), batch_size)]
[docs]def local_dataframe_initializer(df): setattr(__main__, LOCAL_DATAFRAME_VARIALBE_NAME, df)
[docs]def batch_function_wrapper(batch_indices, pure_row_func): # import __main__ _local_df = getattr(__main__, LOCAL_DATAFRAME_VARIALBE_NAME) batch_df = _local_df.loc[batch_indices] if isinstance(batch_df, pd.Series): return batch_df.map(pure_row_func) elif isinstance(batch_df, pd.DataFrame): return batch_df.apply(pure_row_func, axis=1)
[docs]class ParallelDataExecutor: MODE_PROCESS = "process" MODE_MPI = "mpi" MODE_SERIAL = "serial"
[docs] def __init__(self, distributed_data: pd.DataFrame, parallel_mode: str = MODE_SERIAL, index_col: str = None, batch_size: int = None, n_workers: int = None): self._distributed_df = distributed_data if index_col is not None: self._distributed_df.set_index(index_col) self._data_index = self._distributed_df.index if len(set(self._data_index)) < len(self._data_index): raise ValueError("Non-unique indices found in dataset") self._data_indices_batches = None self.parallel_mode = parallel_mode self._n_workers = n_workers if self.parallel_mode != ParallelDataExecutor.MODE_PROCESS: warnings.warn("ParallelDataExecutor: n_workers ({}) would be ignored for parallel_mode={}".format(n_workers, parallel_mode)) self._executor = None if batch_size is None: self.batch_size = len(distributed_data) else: self.batch_size = batch_size self._actual_batch_size = None
def _split_data_indices_batches(self): self._data_indices_batches = split_by_batches(self._data_index, self._actual_batch_size)
[docs] def start_executor(self): if self._executor is not None: self.stop_executor() if self.parallel_mode == ParallelDataExecutor.MODE_MPI: try: from mpi4py import MPI from mpi4py.futures import MPIPoolExecutor except ImportError: raise ImportError( 'Could not import mpi4py to run in "mpi" parallel mode. Please install mpi4py or use another mode') comm = MPI.COMM_WORLD size = comm.Get_size() self._n_workers = max(1, size - 1) self._executor = MPIPoolExecutor(globals={LOCAL_DATAFRAME_VARIALBE_NAME: self._distributed_df}) elif self.parallel_mode == ParallelDataExecutor.MODE_PROCESS: if self._n_workers is None: self._n_workers = multiprocessing.cpu_count() self._executor = ProcessPoolExecutor(max_workers=self._n_workers, initializer=local_dataframe_initializer, initargs=(self._distributed_df,)) elif self.parallel_mode == ParallelDataExecutor.MODE_SERIAL: self._n_workers = 1 self._executor = SerialExecutor(initializer=local_dataframe_initializer, initargs=(self._distributed_df,)) else: raise ValueError("Unrecognized parallel_mode='{}'".format(self.parallel_mode)) self._actual_batch_size = max(1, min(self.batch_size, len(self._data_index) // self._n_workers)) log.info("Start executor in '{}' mode with {} workers, actual batch size is {}" \ .format(self.parallel_mode, self._n_workers, self._actual_batch_size)) self._split_data_indices_batches()
[docs] def stop_executor(self): if self._executor is not None: self._executor.shutdown() self._executor = None
[docs] def map(self, pure_row_func=None, wrapped_pure_func=None): if self._executor is None: self.start_executor() res_series = pd.Series(index=self._data_index, dtype=object) if wrapped_pure_func is None: wrapped_pure_func = partial(batch_function_wrapper, pure_row_func=pure_row_func) for ind_range, val in zip(self._data_indices_batches, self._executor.map(wrapped_pure_func, self._data_indices_batches)): res_series[ind_range] = val return res_series