scala - Splitting Spark stream by delimiter


I am trying to split a Spark stream based on a delimiter and save each of these chunks to a new file.

Each of the RDDs appears to be partitioned according to the delimiter.

I am having difficulty configuring one delimited message per RDD, or alternatively, being able to save each partition individually to a new part-000... file.

Any help would be appreciated.

import akka.actor.{Actor, Props}
import akka.camel.{CamelMessage, Consumer}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.receiver.ActorHelper

val sparkConf = new SparkConf().setAppName("datasink").setMaster("local[8]").set("spark.files.overwrite", "false")
val ssc = new StreamingContext(sparkConf, Seconds(2))

// Camel consumer actor that receives messages from RabbitMQ and stores them in the stream
class RouteConsumer extends Actor with ActorHelper with Consumer {
  def endpointUri = "rabbitmq://server:5672/myexc?declare=false&queue=in_hl7_q"
  def receive = {
    case msg: CamelMessage =>
      val m = msg.withBodyAs[String]
      store(m.body)
  }
}

val dstream = ssc.actorStream[String](Props(new RouteConsumer()), "SparkReceiverActor")
val splitStream = dstream.flatMap(_.split("MSH|^~\\&"))
splitStream.foreachRDD(rdd => rdd.saveAsTextFile("file:///home/user/spark/data"))

ssc.start()
ssc.awaitTermination()
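One caveat worth noting here: String.split treats its argument as a regular expression, and both | and ^ are regex metacharacters, so "MSH|^~\\&" is parsed as an alternation rather than as the literal HL7 header. A minimal sketch of splitting on the literal delimiter instead, using Pattern.quote (the rest of the pipeline stays as above):

import java.util.regex.Pattern

// Quote the HL7 segment header so split() matches it literally
// instead of treating '|' and '^' as regex operators.
val headerPattern = Pattern.quote("MSH|^~\\&")
val splitStream = dstream.flatMap(_.split(headerPattern))

Note that split consumes the delimiter, so each resulting chunk will no longer begin with the MSH header.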

You can't control which part-NNNNN (partition) file the output goes to, but you can write to different directories. The "easiest" way to do this sort of column splitting is with separate map statements (like projections in SELECT statements), something like this, assuming you'll have n array elements after splitting:

...
val dstream2 = dstream.map(_.split("...")) // as above, but map instead of flatMap
dstream2.cache() // important for what follows: repeated reads of this stream
val dstreams = new Array[DStream[String]](n)
for (i <- 0 to n - 1) {
  dstreams(i) = dstream2.map(array => array(i) /* or similar */)
  dstreams(i).saveAsTextFiles(rootDir + "/" + i)
}
ssc.start()
ssc.awaitTermination()
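For reference, saveAsTextFiles(prefix) writes a separate output directory per batch interval, named prefix-<time in ms>, each containing its own part-NNNNN files. A minimal sketch of collecting one column's output back into a single RDD (rootDir as above; the exact glob is an assumption about your layout):

// Each batch produces a directory like <rootDir>/<i>-<timeInMs>/part-00000...
// A glob over the prefix reads every batch for a given column, e.g. column 0:
val sc = ssc.sparkContext
val column0 = sc.textFile(rootDir + "/0-*/part-*")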

