python - Efficiently applying a function to a grouped pandas DataFrame in parallel
I often need to apply a function to the groups of a very large DataFrame (of mixed data types) and would like to take advantage of multiple cores.

I can create an iterator from the groups and use the multiprocessing module, but that is not efficient, because every group, and the results of the function, must be pickled for messaging between processes.

Is there any way to avoid the pickling, or to avoid copying the DataFrame completely? It looks like the shared-memory functions of the multiprocessing module are limited to numpy arrays. Are there any other options?
From the comments above, it seems that this has been planned for pandas for some time (there's also an interesting-looking rosetta project which I just noticed).

However, until every parallel functionality is incorporated into pandas, I noticed that it's very easy to write efficient, non-memory-copying parallel augmentations to pandas directly using cython + OpenMP and C++.

Here's a short example of writing a parallel groupby-sum, whose use is something like this:
```python
import pandas as pd
import para_group_demo

df = pd.DataFrame({'a': [1, 2, 1, 2, 1, 1, 0], 'b': range(7)})
print para_group_demo.sum(df.a, df.b)
```
and the output is:

```
     sum
key
0      6
1     11
2      4
```
Note: doubtlessly, this simple example's functionality will eventually be part of pandas. Some things, however, will be more natural to parallelize in C++ for some time, and it's important to be aware of how easy it is to combine this with pandas.
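Note that the extension has to be compiled with OpenMP support for prange to actually run in parallel. A minimal build sketch, assuming the extension source below is saved as para_group_demo.pyx and a gcc-style toolchain (-fopenmp; MSVC uses /openmp instead):

```python
from setuptools import Extension, setup
from Cython.Build import cythonize

ext = Extension(
    'para_group_demo',
    sources=['para_group_demo.pyx'],
    language='c++',                   # the extension uses libcpp containers
    extra_compile_args=['-fopenmp'],  # assumed gcc/clang flag; /openmp on MSVC
    extra_link_args=['-fopenmp'],
)

setup(ext_modules=cythonize([ext]))
```

Build in place with `python setup.py build_ext --inplace`.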
To do this, I wrote a simple single-source-file extension whose code follows.

It starts off with some imports and type definitions:
```cython
from libc.stdint cimport int64_t, uint64_t
from libcpp.vector cimport vector
from libcpp.unordered_map cimport unordered_map
cimport cython
from cython.operator cimport dereference as deref, preincrement as inc
from cython.parallel import prange

import pandas as pd

ctypedef unordered_map[int64_t, uint64_t] counts_t
ctypedef unordered_map[int64_t, uint64_t].iterator counts_it_t
ctypedef vector[counts_t] counts_vec_t
```
The C++ unordered_map type is for summing by a single thread, and the vector is for summing by all threads.
Now to the function sum. It starts off with typed memory views for fast access:
```cython
def sum(crit, vals):
    cdef int64_t[:] crit_view = crit.values
    cdef int64_t[:] vals_view = vals.values
```
The function continues by dividing the work semi-equally among the threads (here hardcoded to 4), and having each thread sum the entries in its range:
```cython
    cdef uint64_t num_threads = 4
    cdef uint64_t l = len(crit)
    cdef uint64_t s = l / num_threads + 1
    cdef uint64_t i, j, e
    cdef counts_vec_t counts
    counts = counts_vec_t(num_threads)
    counts.resize(num_threads)
    with cython.boundscheck(False):
        for i in prange(num_threads, nogil=True):
            j = i * s
            e = j + s
            if e > l:
                e = l
            while j < e:
                counts[i][crit_view[j]] += vals_view[j]
                inc(j)
```
When the threads have completed, the function merges all the results (from the different ranges) into a single unordered_map:
```cython
    cdef counts_t total
    cdef counts_it_t it, e_it
    for i in range(num_threads):
        it = counts[i].begin()
        e_it = counts[i].end()
        while it != e_it:
            total[deref(it).first] += deref(it).second
            inc(it)
```
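This merge is just a sequential reduction of per-thread partial results. The same pattern in plain Python, with hypothetical per-thread dictionaries standing in for counts:

```python
from collections import Counter

# hypothetical per-thread partial sums, as the parallel loop above produces
partials = [{1: 6, 2: 1}, {0: 6, 1: 5, 2: 3}]

total = Counter()
for part in partials:
    for key, val in part.items():
        total[key] += val

print(dict(total))
```

With these inputs, total maps keys 0, 1 and 2 to 6, 11 and 4, matching the example output above.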
All that's left is to create a DataFrame and return the results:
```cython
    key, sum_ = [], []
    it = total.begin()
    e_it = total.end()
    while it != e_it:
        key.append(deref(it).first)
        sum_.append(deref(it).second)
        inc(it)

    df = pd.DataFrame({'key': key, 'sum': sum_})
    df.set_index('key', inplace=True)
    return df
```
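As a sanity check, the extension's result for the example above should agree with what serial pandas produces:

```python
import pandas as pd

df = pd.DataFrame({'a': [1, 2, 1, 2, 1, 1, 0], 'b': range(7)})

# serial equivalent of the parallel groupby-sum above
expected = df.groupby('a')['b'].sum()
print(expected)  # a=0 -> 6, a=1 -> 11, a=2 -> 4
```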