scala - Splitting Spark stream by delimiter
I am trying to split a Spark stream based on a delimiter and save each of these chunks to a new file.

Each of my RDDs appears to be partitioned according to the delimiter.

I am having difficulty configuring one delimiter message per RDD, or being able to save each partition individually to a new part-000... file.

Any help is appreciated.
val sparkConf = new SparkConf().setAppName("dataSink").setMaster("local[8]").set("spark.files.overwrite", "false")
val ssc = new StreamingContext(sparkConf, Seconds(2))

class RouteConsumer extends Actor with ActorHelper with Consumer {
  def endpointUri = "rabbitmq://server:5672/myexc?declare=false&queue=in_hl7_q"
  def receive = {
    case msg: CamelMessage =>
      val m = msg.withBodyAs[String]
      store(m.body)
  }
}

val dstream = ssc.actorStream[String](Props(new RouteConsumer()), "SparkReceiverActor")
val splitStream = dstream.flatMap(_.split("MSH|^~\\&"))
splitStream.foreachRDD(rdd => rdd.saveAsTextFile("file:///home/user/spark/data"))

ssc.start()
ssc.awaitTermination()
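(An aside on the code above, not part of the original question: String.split compiles its argument as a regular expression, so the | and ^ in "MSH|^~\\&" act as alternation and an anchor rather than literal characters. A minimal sketch of splitting on the literal HL7 header, assuming that is the intent:)

import java.util.regex.Pattern

// Quote the delimiter so "MSH|^~\&" is matched literally instead of
// being parsed as the regex alternation  MSH | ^~\&
val delimiter = Pattern.quote("MSH|^~\\&")
val splitStream = dstream.flatMap(_.split(delimiter))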
You can't control which part-NNNNN (partition) file gets which output, but you can write to different directories. The "easiest" way to do this sort of column splitting is with separate map statements (like SELECT statements), something like this, assuming you'll have n array elements after splitting:
...
val dstream2 = dstream.map(_.split("..."))  // as above, but with map instead of flatMap
dstream2.cache()                            // important, since what follows does repeated reads of this stream

val dstreams = new Array[DStream[String]](n)
for (i <- 0 to n - 1) {
  dstreams(i) = dstream2.map(array => array(i) /* or similar */)
  dstreams(i).saveAsTextFiles(rootDir + "/" + i)
}

ssc.start()
ssc.awaitTermination()
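A note on the output layout (based on Spark Streaming's documented behavior, not stated in the original answer): saveAsTextFiles(prefix) appends the batch time in milliseconds to the prefix, so each batch interval gets its own directory per stream. With a hypothetical rootDir of file:///home/user/spark/out, the loop above would produce something like:

file:///home/user/spark/out/0-1418000000000/part-00000
file:///home/user/spark/out/1-1418000000000/part-00000
...

The dstream2.cache() call matters here because each of the n saveAsTextFiles operations triggers its own pass over the stream; without caching, dstream2 would be recomputed from the source n times.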