Simple calculation using Apache Spark


I have a JavaPairRDD (String, Tuple2) that comes out of a join operation. The data looks like this: [userid, [(name, rating)]]

Output: [(user2,[(john,5)]), (user3,[(mac,3), (mac,2)]), (user1,[(phil,3), (phil,4)])]

I want to calculate the min, max, and average rating for each user. I am not sure which transformation or action can help me here.

There are multiple ways to achieve this, but the easiest is to use the aggregateByKey method.

Here is an example that shows how to do it:

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;

import scala.Tuple2;
import scala.Tuple3;
import scala.Tuple4;

// The class name is arbitrary; only main() matters here.
public class SimpleApp {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("Simple Application").setMaster("local[4]");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // Sample input in "userid:value" form.
        String[] values = {"user2:john", "user1:phil", "user3:mac"};
        String[] ratingValues = {"user2:5", "user3:3", "user3:2", "user1:3", "user1:4"};

        JavaRDD<String> users = sc.parallelize(Arrays.asList(values));
        JavaRDD<String> ratings = sc.parallelize(Arrays.asList(ratingValues));

        // (userid, name)
        JavaPairRDD<String, String> usersPair = users.mapToPair(
                new PairFunction<String, String, String>() {
                    public Tuple2<String, String> call(String s) throws Exception {
                        String[] splits = s.split(":");
                        return new Tuple2<String, String>(splits[0], splits[1]);
                    }
                }
        );

        // (userid, rating)
        JavaPairRDD<String, Integer> ratingsPair = ratings.mapToPair(
                new PairFunction<String, String, Integer>() {
                    public Tuple2<String, Integer> call(String s) throws Exception {
                        String[] splits = s.split(":");
                        return new Tuple2<String, Integer>(splits[0], Integer.parseInt(splits[1]));
                    }
                }
        );

        // (userid, (name, rating))
        JavaPairRDD<String, Tuple2<String, Integer>> joined = usersPair.join(ratingsPair);

        // The accumulator is (max, min, sum, count), so the zero value starts max at
        // MIN_VALUE and min at MAX_VALUE.
        JavaPairRDD<String, Tuple4<Integer, Integer, Integer, Integer>> aggregate = joined.aggregateByKey(
                new Tuple4<Integer, Integer, Integer, Integer>(Integer.MIN_VALUE, Integer.MAX_VALUE, 0, 0),
                // Folds one (name, rating) value into the accumulator.
                new Function2<Tuple4<Integer, Integer, Integer, Integer>, Tuple2<String, Integer>, Tuple4<Integer, Integer, Integer, Integer>>() {
                    public Tuple4<Integer, Integer, Integer, Integer> call(Tuple4<Integer, Integer, Integer, Integer> a, Tuple2<String, Integer> b) throws Exception {
                        return new Tuple4<Integer, Integer, Integer, Integer>(Math.max(a._1(), b._2()), Math.min(a._2(), b._2()), a._3() + b._2(), a._4() + 1);
                    }
                },
                // Merges two partial accumulators for the same key.
                new Function2<Tuple4<Integer, Integer, Integer, Integer>, Tuple4<Integer, Integer, Integer, Integer>, Tuple4<Integer, Integer, Integer, Integer>>() {
                    public Tuple4<Integer, Integer, Integer, Integer> call(Tuple4<Integer, Integer, Integer, Integer> a, Tuple4<Integer, Integer, Integer, Integer> b) throws Exception {
                        return new Tuple4<Integer, Integer, Integer, Integer>(Math.max(a._1(), b._1()), Math.min(a._2(), b._2()), a._3() + b._3(), a._4() + b._4());
                    }
                });

        // Turns (max, min, sum, count) into (max, min, mean).
        JavaRDD<Tuple2<String, Tuple3<Integer, Integer, Double>>> aggregateWithMean = aggregate.map(
                new Function<Tuple2<String, Tuple4<Integer, Integer, Integer, Integer>>, Tuple2<String, Tuple3<Integer, Integer, Double>>>() {
                    public Tuple2<String, Tuple3<Integer, Integer, Double>> call(Tuple2<String, Tuple4<Integer, Integer, Integer, Integer>> a) throws Exception {
                        Tuple3<Integer, Integer, Double> mean = new Tuple3<Integer, Integer, Double>(a._2()._1(), a._2()._2(), a._2()._3().doubleValue() / a._2()._4());
                        return new Tuple2<String, Tuple3<Integer, Integer, Double>>(a._1(), mean);
                    }
                }
        );

        // Prints one (userid, (max, min, mean)) tuple per user.
        aggregateWithMean.foreach(new VoidFunction<Tuple2<String, Tuple3<Integer, Integer, Double>>>() {
            public void call(Tuple2<String, Tuple3<Integer, Integer, Double>> stringTuple3Tuple2) throws Exception {
                System.out.println(stringTuple3Tuple2);
            }
        });
    }
}
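If the three arguments to aggregateByKey are hard to read through all the generics, it may help to look at the same idea without Spark. The sketch below is plain Java only: RatingStats is a hypothetical class I made up for illustration, not a Spark type. It keeps the same (max, min, sum, count) accumulator, with add standing in for the first function (fold one rating in) and merge standing in for the second (combine two partial results). The two "partitions" in main are simulated with ordinary lists.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of Spark: a plain-Java stand-in for the
// (max, min, sum, count) accumulator used with aggregateByKey.
class RatingStats {
    final int max;
    final int min;
    final int sum;
    final int count;

    RatingStats(int max, int min, int sum, int count) {
        this.max = max;
        this.min = min;
        this.sum = sum;
        this.count = count;
    }

    // Zero value: neutral starting point for max, min, sum and count.
    static RatingStats zero() {
        return new RatingStats(Integer.MIN_VALUE, Integer.MAX_VALUE, 0, 0);
    }

    // Stand-in for the first function: fold a single rating into the accumulator.
    RatingStats add(int rating) {
        return new RatingStats(Math.max(max, rating), Math.min(min, rating), sum + rating, count + 1);
    }

    // Stand-in for the second function: merge two partial accumulators.
    RatingStats merge(RatingStats other) {
        return new RatingStats(Math.max(max, other.max), Math.min(min, other.min),
                sum + other.sum, count + other.count);
    }

    // Mean is derived at the end from sum and count.
    double mean() {
        return (double) sum / count;
    }

    // Folds a list of ratings as if it were one partition.
    static RatingStats fold(List<Integer> ratings) {
        RatingStats acc = zero();
        for (int rating : ratings) {
            acc = acc.add(rating);
        }
        return acc;
    }

    public static void main(String[] args) {
        // user3's ratings from the question, split across two simulated partitions.
        RatingStats left = fold(Arrays.asList(3));
        RatingStats right = fold(Arrays.asList(2));
        RatingStats user3 = left.merge(right);
        // prints: max=3 min=2 mean=2.5
        System.out.println("max=" + user3.max + " min=" + user3.min + " mean=" + user3.mean());
    }
}
```

Keeping sum and count separate, rather than a running average, is what lets partial results be merged in any order; the mean is only computed once at the end, which is the same reason the Spark version needs the extra map step.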

With the Java API this becomes a bit messy because of the generic types. I'd recommend using the Scala API instead; it looks nicer :-)

