python - Efficiently applying a function to a grouped pandas DataFrame in parallel


I need to apply a function to groups of a very large DataFrame (of mixed data types) and would like to take advantage of multiple cores.

I can create an iterator from the groups and use the multiprocessing module, but that is not efficient, because every group, and the results of the function, must be pickled for messaging between processes.

Is there any way to avoid the pickling, or to avoid the copying of the DataFrame completely? It looks like the shared-memory functions of the multiprocessing module are limited to numpy arrays. Are there any other options?

From the comments above, it seems that this is planned for pandas at some point (there's also an interesting-looking rosetta project which I just noticed).

However, until all of that parallel functionality is incorporated into pandas, I noticed that it's very easy to write efficient, non-memory-copying parallel augmentations to pandas directly using cython + OpenMP and C++.

Here's a short example of writing a parallel groupby-sum, whose use is something like this:

    import pandas as pd
    import para_group_demo

    df = pd.DataFrame({'a': [1, 2, 1, 2, 1, 1, 0], 'b': range(7)})
    print para_group_demo.sum(df.a, df.b)

and the output is:

         sum
    key
    0      6
    1     11
    2      4

Note that, doubtlessly, this simple example's functionality will eventually be part of pandas. Some things, however, will be more natural to parallelize in C++ for some time, and it's important to be aware of how easy it is to combine this with pandas.
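For comparison, the single-threaded equivalent of this example is just pandas' built-in groupby-sum; the parallel version only becomes interesting for large inputs:

```python
import pandas as pd

df = pd.DataFrame({'a': [1, 2, 1, 2, 1, 1, 0], 'b': range(7)})

# Built-in, single-threaded equivalent of para_group_demo.sum(df.a, df.b)
result = df.groupby('a')['b'].sum()
print(result)
```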


To do this, I wrote a simple single-source-file extension whose code follows.

It starts with some imports and type definitions:

    from libc.stdint cimport int64_t, uint64_t
    from libcpp.vector cimport vector
    from libcpp.unordered_map cimport unordered_map

    cimport cython
    from cython.operator cimport dereference as deref, preincrement as inc
    from cython.parallel import prange

    import pandas as pd

    ctypedef unordered_map[int64_t, uint64_t] counts_t
    ctypedef unordered_map[int64_t, uint64_t].iterator counts_it_t
    ctypedef vector[counts_t] counts_vec_t

The C++ unordered_map type is for summing by a single thread, and the vector is for summing by all threads.

Now to the function sum. It starts off with typed memory views for fast access:

    def sum(crit, vals):
        cdef int64_t[:] crit_view = crit.values
        cdef int64_t[:] vals_view = vals.values

The function continues by dividing the work semi-equally among the threads (here hardcoded to 4), and having each thread sum the entries in its range:

        cdef uint64_t num_threads = 4
        cdef uint64_t l = len(crit)
        cdef uint64_t s = l / num_threads + 1
        cdef uint64_t i, j, e
        cdef counts_vec_t counts
        counts = counts_vec_t(num_threads)
        counts.resize(num_threads)
        with cython.boundscheck(False):
            for i in prange(num_threads, nogil=True):
                j = i * s
                e = j + s
                if e > l:
                    e = l
                while j < e:
                    counts[i][crit_view[j]] += vals_view[j]
                    inc(j)
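The chunk arithmetic can be sanity-checked in plain Python: with s = l / num_threads + 1 (C-style integer division on uint64_t), the half-open ranges [i*s, min(i*s + s, l)) partition the indices 0..l-1. A quick sketch, independent of the Cython code, using the example's length of 7:

```python
num_threads = 4
l = 7                      # length of the example input
s = l // num_threads + 1   # chunk size per thread (integer division)

covered = []
for i in range(num_threads):
    j = i * s
    e = min(j + s, l)      # clamp the last chunk, as the `if e > l` branch does
    covered.extend(range(j, e))

print(covered)  # each index 0..l-1 appears exactly once
```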

When the threads have completed, the function merges all the results (from the different ranges) into a single unordered_map:

        cdef counts_t total
        cdef counts_it_t it, e_it
        for i in range(num_threads):
            it = counts[i].begin()
            e_it = counts[i].end()
            while it != e_it:
                total[deref(it).first] += deref(it).second
                inc(it)
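The merge step is the same fold one would do over per-thread dictionaries in plain Python; collections.Counter makes it explicit. The partial sums below are made-up illustrations of what each counts[i] might hold for the example input:

```python
from collections import Counter

# Hypothetical per-thread partial sums (group key -> partial sum)
per_thread = [{1: 2, 2: 1}, {1: 9, 2: 3}, {0: 6}, {}]

total = Counter()
for partial in per_thread:
    total.update(partial)  # update() adds values for keys shared between maps

print(dict(total))  # {1: 11, 2: 4, 0: 6}
```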

All that's left is to create a DataFrame and return the results:

        key, sum_ = [], []
        it = total.begin()
        e_it = total.end()
        while it != e_it:
            key.append(deref(it).first)
            sum_.append(deref(it).second)
            inc(it)

        df = pd.DataFrame({'key': key, 'sum': sum_})
        df.set_index('key', inplace=True)
        return df
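To compile the extension, a setup.py along the following lines can be used. This is a sketch, not from the original post: it assumes the source file is named para_group_demo.pyx and a GCC/clang-compatible compiler, where the OpenMP flag is -fopenmp (MSVC uses /openmp instead).

```python
# setup.py -- build with: python setup.py build_ext --inplace
from setuptools import setup, Extension
from Cython.Build import cythonize

ext = Extension(
    'para_group_demo',
    sources=['para_group_demo.pyx'],
    language='c++',                   # needed for unordered_map / vector
    extra_compile_args=['-fopenmp'],  # enable OpenMP so prange runs in parallel
    extra_link_args=['-fopenmp'],
)

setup(ext_modules=cythonize(ext))
```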
