erlang - Riak mapReduce fails with > 15 records -


problem

I've been learning Riak and ran into an issue with MapReduce. My MapReduce functions work fine when there are 15 records, but after that, it throws a stack trace error. I'm new to Riak and Erlang, so I'm unsure whether it's my code or it's Riak. Any advice on how to debug this, or on what the problem is, would be appreciated!

code

map

 -module(identity_map).
 -export([identity/3]).

 %% @doc Map phase: decode the JSON stored in a Riak object and emit its
 %% fields as one element of the phase output.
 %%
 %% The first argument is the Riak object handed to the map phase; the
 %% other two arguments are ignored. The result is a one-element list
 %% holding whatever mochijson2 wrapped in `{struct, ...}'; any other
 %% decode result crashes with badmatch.
 %%
 %% NOTE(review): riak_object:get_values/1 presumably returns a list of
 %% values (one per sibling); this relies on mochijson2:decode/1 accepting
 %% that list as iodata and on the object having a single value - confirm.
 identity(Values, _, _) ->
     JsonData = riak_object:get_values(Values),
     {struct, Objects} = mochijson2:decode(JsonData),
     [Objects].

reduce

 -module(stocks_summary).
 -export([average_high/2]).

 %% @doc Reduce phase: average the `high_d' field of the values produced
 %% by the map phase.
 %%
 %% Every element of Values must be a proplist containing the key
 %% <<"high_d">> with a numeric value; the second argument is ignored.
 %% Returns a one-element list holding the average as a float.
 %%
 %% The function assumes it sees only map-phase output. If an element is
 %% not a proplist (for example a plain number), proplists:get_value/2
 %% fails with function_clause. An empty list fails with badarith because
 %% of the division by length(Values).
 average_high(Values, _) ->
     Total = lists:foldl(
         fun(Record, Accum) ->
             High = proplists:get_value(<<"high_d">>, Record),
             Accum + High
         end,
         0,
         Values
     ),
     [Total / length(Values)].

invocation

# Submit the MapReduce job (Erlang map + reduce phases) to Riak's HTTP endpoint.
# -XPOST sets the request method and -H adds the content-type header.
curl -XPOST http://192.168.0.126:8098/mapred \
  -H 'content-type: application/json' \
  -d '{"inputs": ["stocks","goog"], "query": [{"map":{"language":"erlang","module":"identity_map","function":"identity"}}, {"reduce":{"language":"erlang","module":"stocks_summary","function":"average_high"}} ]}'

data

date,open,high,low,close,volume,adj close 2010-05-05,500.98,515.72,500.47,509.76,4566900,509.76 2010-05-04,526.52,526.74,504.21,506.37,6076300,506.37 2010-05-03,526.50,532.92,525.08,530.60,1857800,530.60 2010-04-30,531.13,537.68,525.44,525.70,2435400,525.70 2010-04-29,533.37,536.50,526.67,532.00,3058900,532.00 2010-04-28,532.10,534.83,521.03,529.19,3406100,529.19 2010-04-27,528.95,538.33,527.23,529.06,3844700,529.06 2010-04-26,544.97,544.99,529.21,531.64,4368800,531.64 2010-04-23,547.25,549.32,542.27,544.99,2089400,544.99 2010-04-22,552.00,552.50,543.35,547.06,3280700,547.06 2010-04-21,556.46,560.25,552.16,554.30,2391500,554.30 2010-04-20,554.17,559.66,551.06,555.04,2977400,555.04 2010-04-19,548.75,553.99,545.00,550.10,3894000,550.10 2010-04-16,563.00,568.81,549.63,550.15,12235500,550.15 2010-04-15,592.17,597.84,588.29,595.30,6716700,595.30 2010-04-14,590.06,592.34,584.01,589.00,3402700,589.00 2010-04-13,572.53,588.88,571.13,586.77,3845200,586.77 2010-04-12,567.35,574.00,566.22,572.73,2352400,572.73 2010-04-09,567.49,568.77,564.00,566.22,2056600,566.22 2010-04-08,563.32,569.85,560.05,567.49,1947500,567.49 2010-04-07,567.30,568.75,561.86,563.54,2581000,563.54 

sample value

{     "date_s": "2010-04-07",     "open_d": 567.3,     "high_d": 568.75,     "low_d": 561.86,     "close_d": 563.54,     "volume_i": 2581000,     "adjclose_d": 563.54 } 

stacktrace

{     "phase": 1,     "error": "{function_clause,[{proplists,get_value,[<<\"high_d\">>,555.621,undefined],[{file,\"proplists.erl\"},{line,225}]},{stocks_summary,'-average_high/2-fun-0-',2,[{file,\"stocks_summary.erl\"},{line,9}]},{lists,foldl,3,[{file,\"lists.erl\"},{line,1248}]},{stocks_summary,average_high,2,[{file,\"stocks_summary.erl\"},{line,7}]},{riak_kv_w_reduce,reduce,3,[{file,\"src/riak_kv_w_reduce.erl\"},{line,207}]},{riak_kv_w_reduce,done,1,[{file,\"src/riak_kv_w_reduce.erl\"},{line,170}]},{riak_pipe_vnode_worker,wait_for_input,...},...]}",     "input": null,     "type": null,     "stack": null } 

hypothesis

From the stack trace, it looks like the reduce is being applied to a single value as opposed to a list of tuples, which makes the problem strange, because when I put only 10-15 records in the bucket, it works fine.

The problem is with the reduce phase. The map phase is spread around the cluster and executed by many vnodes, which forward their map phase results to the node that is running the reduce. Since these results are not expected to arrive simultaneously, the reduce phase function may be run multiple times, with the input on subsequent runs being a concatenation of the previous reduce result and the newly arrived map phase results.

This means that on the second run, your reduce function receives the previous average as a plain number as the first element in the list, with the rest of the list being the JSON objects/proplists you expect.

To fix this, have your reduce function return a proplist containing the current average and the number of values seen so far. Below is one possibility; note that this example will return the final result of the MapReduce as an object/proplist instead of a number.

%% @doc Reduce phase: running average of `high_d' that can safely be fed
%% its own output again.
%%
%% Each element of Values is either
%%   * a map-phase record: a proplist with a numeric <<"high_d">>, or
%%   * a previous result of this function: a proplist with <<"average">>
%%     and <<"count">> (count defaults to 1 when absent).
%% The second argument is ignored.
%%
%% Returns `[[{<<"average">>, Avg}, {<<"count">>, N}]]', so the count
%% travels with the average and a partial result can be weighted
%% correctly on a later run. An empty input yields `[]' instead of
%% dividing by zero.
average_high([], _) ->
    [];
average_high(Values, _) ->
    {Count, Total} = lists:foldl(
        fun(Record, {Cnt, Tot}) ->
            case proplists:get_value(<<"average">>, Record, undefined) of
                undefined ->
                    %% Raw map-phase record: counts as one sample.
                    High = proplists:get_value(<<"high_d">>, Record),
                    {Cnt + 1, Tot + High};
                Ave ->
                    %% Earlier partial result: recover its sum as Ave * C.
                    C = proplists:get_value(<<"count">>, Record, 1),
                    {Cnt + C, Tot + (Ave * C)}
            end
        end,
        {0, 0},
        Values
    ),
    [[{<<"average">>, Total / Count}, {<<"count">>, Count}]].

the phase function docs specify:

A reduce function should produce a list of values, but it must also be true that the function is commutative, associative, and idempotent. That is, if the input list [a,b,c,d] is valid for a given F, then all of the following must produce the same result:

f([a,b,c,d])    f([a,d] ++ f([c,b]))   f([f([a]),f([c]),f([b]),f([d])]) 

Comments

Popular posts from this blog

Magento/PHP - Get phones on all members in a customer group -

php - Bypass Geo Redirect for specific directories -

php - .htaccess mod_rewrite for dynamic url which has domain names -