erlang - Riak mapReduce fails with > 15 records -
problem
I've been learning Riak and ran into an issue with MapReduce. My MapReduce functions work fine when there are 15 records, but after that, they throw a stack trace error. I'm new to Riak and Erlang, so I'm unsure whether it's my code or it's Riak. Any advice on how to debug this, or on what the problem is, would be appreciated!
code
map
%% Map phase module: turns a stored JSON object into an Erlang proplist.
-module(identity_map).
-export([identity/3]).

%% identity(Values, KeyData, Arg) -> [Proplist]
%%
%% Reads the stored data with riak_object:get_values/1, decodes it with
%% mochijson2:decode/1 and returns the decoded members wrapped in a
%% single-element list. The second and third arguments are ignored.
%%
%% Crashes with badmatch if the decoded JSON is not an object, i.e. if
%% mochijson2 does not return a {struct, Members} tuple.
%%
%% NOTE(review): get_values/1 presumably returns a list with one entry per
%% sibling; the decode relies on that list being acceptable input for
%% mochijson2:decode/1 -- confirm, or switch to riak_object:get_value/1.
identity(Values, _, _) ->
    JsonData = riak_object:get_values(Values),
    {struct, Objects} = mochijson2:decode(JsonData),
    [Objects].
reduce
%% Reduce phase module: averages the "high_d" field of the map phase output.
-module(stocks_summary).
-export([average_high/2]).

%% average_high(Values, Arg) -> [Average]
%%
%% Values is expected to be a list of proplists, each carrying a numeric
%% <<"high_d">> entry (the shape produced by the map phase). The second
%% argument is ignored. Returns a single-element list holding the mean.
%%
%% NOTE: every element of Values is passed to proplists:get_value/2, so the
%% function fails with function_clause if any element is not a list (for
%% example a plain number), and with badarith if Values is empty.
average_high(Values, _) ->
    Total = lists:foldl(
        fun(Record, Accum) ->
            High = proplists:get_value(<<"high_d">>, Record),
            Accum + High
        end,
        0,
        Values),
    [Total / length(Values)].
invocation
# Submit the MapReduce job to Riak's HTTP /mapred endpoint.
# curl options are case-sensitive: -X sets the request method and -H adds a
# header (lowercase -x would select a proxy and lowercase -h prints help).
curl -XPOST http://192.168.0.126:8098/mapred \
  -H 'content-type: application/json' \
  -d '{"inputs": ["stocks","goog"], "query": [{"map":{"language":"erlang","module":"identity_map","function":"identity"}}, {"reduce":{"language":"erlang","module":"stocks_summary","function":"average_high"}} ]}'
data
date,open,high,low,close,volume,adj close 2010-05-05,500.98,515.72,500.47,509.76,4566900,509.76 2010-05-04,526.52,526.74,504.21,506.37,6076300,506.37 2010-05-03,526.50,532.92,525.08,530.60,1857800,530.60 2010-04-30,531.13,537.68,525.44,525.70,2435400,525.70 2010-04-29,533.37,536.50,526.67,532.00,3058900,532.00 2010-04-28,532.10,534.83,521.03,529.19,3406100,529.19 2010-04-27,528.95,538.33,527.23,529.06,3844700,529.06 2010-04-26,544.97,544.99,529.21,531.64,4368800,531.64 2010-04-23,547.25,549.32,542.27,544.99,2089400,544.99 2010-04-22,552.00,552.50,543.35,547.06,3280700,547.06 2010-04-21,556.46,560.25,552.16,554.30,2391500,554.30 2010-04-20,554.17,559.66,551.06,555.04,2977400,555.04 2010-04-19,548.75,553.99,545.00,550.10,3894000,550.10 2010-04-16,563.00,568.81,549.63,550.15,12235500,550.15 2010-04-15,592.17,597.84,588.29,595.30,6716700,595.30 2010-04-14,590.06,592.34,584.01,589.00,3402700,589.00 2010-04-13,572.53,588.88,571.13,586.77,3845200,586.77 2010-04-12,567.35,574.00,566.22,572.73,2352400,572.73 2010-04-09,567.49,568.77,564.00,566.22,2056600,566.22 2010-04-08,563.32,569.85,560.05,567.49,1947500,567.49 2010-04-07,567.30,568.75,561.86,563.54,2581000,563.54
sample value
{ "date_s": "2010-04-07", "open_d": 567.3, "high_d": 568.75, "low_d": 561.86, "close_d": 563.54, "volume_i": 2581000, "adjclose_d": 563.54 }
stacktrace
{ "phase": 1, "error": "{function_clause,[{proplists,get_value,[<<\"high_d\">>,555.621,undefined],[{file,\"proplists.erl\"},{line,225}]},{stocks_summary,'-average_high/2-fun-0-',2,[{file,\"stocks_summary.erl\"},{line,9}]},{lists,foldl,3,[{file,\"lists.erl\"},{line,1248}]},{stocks_summary,average_high,2,[{file,\"stocks_summary.erl\"},{line,7}]},{riak_kv_w_reduce,reduce,3,[{file,\"src/riak_kv_w_reduce.erl\"},{line,207}]},{riak_kv_w_reduce,done,1,[{file,\"src/riak_kv_w_reduce.erl\"},{line,170}]},{riak_pipe_vnode_worker,wait_for_input,...},...]}", "input": null, "type": null, "stack": null }
hypothesis
From the stack trace, it looks like the reduce is being applied to a value as opposed to a list of tuples, which makes the problem strange, because when I put only 10-15 records in the bucket, it works fine.
The problem is in the reduce phase. The map phase is spread around the cluster and executed by many vnodes, which forward their map phase results to the node running the reduce. Since these results are not expected to arrive simultaneously, the reduce phase function may be run multiple times, with the input on subsequent runs being the concatenation of the previous reduce result and the newly arrived map phase results.
This means that on the second run, your reduce function receives the previous average as a plain number as the first element in the list, with the rest of the list being the JSON objects/proplists you expect.
To fix this, have your reduce function return a proplist containing the current average and the number of values seen so far. Below is one possibility; note that this example returns the final result of the MapReduce as an object/proplist instead of a number.
%% average_high(Values, Arg) -> [] | [[{<<"average">>, Avg}, {<<"count">>, N}]]
%%
%% Re-reduce-safe average of the <<"high_d">> field. Each element of Values
%% is a proplist that is either
%%   * a raw record carrying <<"high_d">>, counted once, or
%%   * a previous result of this function carrying <<"average">> and
%%     <<"count">>, which is weighted by its count.
%% The result carries both the running average and the number of records it
%% covers, so it can be fed back into the function together with new records.
%% The second argument is ignored.
%%
%% An empty input yields an empty list rather than dividing by zero.
average_high([], _) ->
    [];
average_high(Values, _) ->
    {Count, Total} = lists:foldl(
        fun(Record, {Cnt, Tot}) ->
            case proplists:get_value(<<"average">>, Record, undefined) of
                undefined ->
                    %% Raw record from the map phase.
                    High = proplists:get_value(<<"high_d">>, Record),
                    {Cnt + 1, Tot + High};
                Ave ->
                    %% Earlier partial result: expand it back into a sum.
                    C = proplists:get_value(<<"count">>, Record, 1),
                    {Cnt + C, Tot + (Ave * C)}
            end
        end,
        {0, 0},
        Values),
    [[{<<"average">>, Total / Count}, {<<"count">>, Count}]].
the phase function docs specify:
A reduce function should produce a list of values, but it must also be true that the function is commutative, associative, and idempotent. That is, if the input list [a,b,c,d] is valid for a given f, then all of the following must produce the same result:
f([a,b,c,d])
f([a,d] ++ f([c,b]))
f([f([a]),f([c]),f([b]),f([d])])
Comments
Post a Comment