Simple calculation using Apache Spark -
I have a JavaPairRDD
(String, Tuple2)
produced by a join operation. The data has the shape [userId, [(name, rating)]], for example:
output: [(user2,[(john,5)]), (user3,[(mac,3), (mac,2)]), (user1,[(phil,3), (phil,4)])]
I want to calculate the min, max, and average rating for each user, but I am not sure which transformation/action can help me here.
There are multiple ways to achieve this; the easiest is to use the aggregateByKey
method.
Here is an example that shows how to do it:
// Computes the min, max, and average rating per user using aggregateByKey.
// Accumulator layout throughout: Tuple4<max, min, sum, count>.
public static void main(String[] args) {
    SparkConf conf = new SparkConf().setAppName("Simple Application").setMaster("local[4]");
    JavaSparkContext sc = new JavaSparkContext(conf);

    String[] values = {"user2:john", "user1:phil", "user3:mac"};
    String[] ratingValues = {"user2:5", "user3:3", "user3:2", "user1:3", "user1:4"};

    JavaRDD<String> users = sc.parallelize(Arrays.asList(values));
    JavaRDD<String> ratings = sc.parallelize(Arrays.asList(ratingValues));

    // "userId:name" -> (userId, name)
    JavaPairRDD<String, String> usersPair = users.mapToPair(
            new PairFunction<String, String, String>() {
                public Tuple2<String, String> call(String s) throws Exception {
                    String[] splits = s.split(":");
                    return new Tuple2<String, String>(splits[0], splits[1]);
                }
            });

    // "userId:rating" -> (userId, rating)
    JavaPairRDD<String, Integer> ratingsPair = ratings.mapToPair(
            new PairFunction<String, String, Integer>() {
                public Tuple2<String, Integer> call(String s) throws Exception {
                    String[] splits = s.split(":");
                    return new Tuple2<String, Integer>(splits[0], Integer.parseInt(splits[1]));
                }
            });

    // Join yields (userId, (name, rating)).
    JavaPairRDD<String, Tuple2<String, Integer>> joined = usersPair.join(ratingsPair);

    JavaPairRDD<String, Tuple4<Integer, Integer, Integer, Integer>> aggregate =
            joined.aggregateByKey(
                    // Zero value: max starts at MIN_VALUE and min at MAX_VALUE so the
                    // first rating seen always replaces them.
                    new Tuple4<Integer, Integer, Integer, Integer>(
                            Integer.MIN_VALUE, Integer.MAX_VALUE, 0, 0),
                    // Sequence function: fold one (name, rating) value into the
                    // per-partition accumulator.
                    new Function2<Tuple4<Integer, Integer, Integer, Integer>,
                            Tuple2<String, Integer>,
                            Tuple4<Integer, Integer, Integer, Integer>>() {
                        public Tuple4<Integer, Integer, Integer, Integer> call(
                                Tuple4<Integer, Integer, Integer, Integer> a,
                                Tuple2<String, Integer> b) throws Exception {
                            return new Tuple4<Integer, Integer, Integer, Integer>(
                                    Math.max(a._1(), b._2()),   // running max
                                    Math.min(a._2(), b._2()),   // running min
                                    a._3() + b._2(),            // running sum
                                    a._4() + 1);                // running count
                        }
                    },
                    // Combiner function: merge two partial accumulators from
                    // different partitions.
                    new Function2<Tuple4<Integer, Integer, Integer, Integer>,
                            Tuple4<Integer, Integer, Integer, Integer>,
                            Tuple4<Integer, Integer, Integer, Integer>>() {
                        public Tuple4<Integer, Integer, Integer, Integer> call(
                                Tuple4<Integer, Integer, Integer, Integer> a,
                                Tuple4<Integer, Integer, Integer, Integer> b) throws Exception {
                            return new Tuple4<Integer, Integer, Integer, Integer>(
                                    Math.max(a._1(), b._1()),
                                    Math.min(a._2(), b._2()),
                                    a._3() + b._3(),
                                    a._4() + b._4());
                        }
                    });

    // Turn (max, min, sum, count) into (max, min, mean);
    // doubleValue() forces floating-point division instead of integer division.
    JavaRDD<Tuple2<String, Tuple3<Integer, Integer, Double>>> aggregateWithMean =
            aggregate.map(
                    new Function<Tuple2<String, Tuple4<Integer, Integer, Integer, Integer>>,
                            Tuple2<String, Tuple3<Integer, Integer, Double>>>() {
                        public Tuple2<String, Tuple3<Integer, Integer, Double>> call(
                                Tuple2<String, Tuple4<Integer, Integer, Integer, Integer>> a)
                                throws Exception {
                            Tuple3<Integer, Integer, Double> mean =
                                    new Tuple3<Integer, Integer, Double>(
                                            a._2()._1(),
                                            a._2()._2(),
                                            a._2()._3().doubleValue() / a._2()._4());
                            return new Tuple2<String, Tuple3<Integer, Integer, Double>>(
                                    a._1(), mean);
                        }
                    });

    aggregateWithMean.foreach(
            new VoidFunction<Tuple2<String, Tuple3<Integer, Integer, Double>>>() {
                public void call(Tuple2<String, Tuple3<Integer, Integer, Double>> userStats)
                        throws Exception {
                    System.out.println(userStats);
                }
            });

    // Release Spark resources before the JVM exits.
    sc.stop();
}
With the Java API this becomes messy because of the generic types. I'd recommend using the Scala API instead — it looks much nicer :-)
Comments
Post a Comment