Scala 2.11 - How do I create a TCP receiver that only consumes messages using Akka Streams?


We are on akka-stream-experimental_2.11 1.0.

Inspired by an example, we wrote a TCP receiver as follows:

    def bind(address: String, port: Int, target: ActorRef)
            (implicit system: ActorSystem, actorMaterializer: ActorMaterializer): Future[ServerBinding] = {
      val sink = Sink.foreach[Tcp.IncomingConnection] { conn =>
        val serverFlow = Flow[ByteString]
          .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 256, allowTruncation = true))
          .map(message => {
            target ? new Message(message); ByteString.empty
          })
        conn handleWith serverFlow
      }

      val connections = Tcp().bind(address, port)
      connections.to(sink).run()
    }

However, our intention is to have the receiver not respond at all and only sink the message. (The TCP message publisher does not care about the response.)

Is this possible, that is, to not respond at all, given that akka.stream.scaladsl.Tcp.IncomingConnection takes a flow of type Flow[ByteString, ByteString, Unit]?

If yes, some guidance would be much appreciated. Thanks in advance.

One attempt, shown below, passes our unit tests, but we are not sure whether it is the best idea:

    def bind(address: String, port: Int, target: ActorRef)
            (implicit system: ActorSystem, actorMaterializer: ActorMaterializer): Future[ServerBinding] = {
      val sink = Sink.foreach[Tcp.IncomingConnection] { conn =>

        val targetSubscriber = ActorSubscriber[Message](system.actorOf(Props(new TargetSubscriber(target))))

        val targetSink = Flow[ByteString]
          .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 256, allowTruncation = true))
          .map(Message(_))
          .to(Sink(targetSubscriber))

        conn.flow.to(targetSink).runWith(Source(Promise().future))
      }

      val connections = Tcp().bind(address, port)
      connections.to(sink).run()
    }

You are on the right track. To keep the possibility of closing the connection at some point, you may want to keep the promise and complete it later on. Once it is completed with an element, that element is published by the source. However, since you don't want any element to be published on the connection, you can use drop(1) to make sure the source never emits an element.

Here's an updated version of your example (untested):

    val promise = Promise[ByteString]()
    // this source completes when the promise is fulfilled,
    // or completes with an error if the promise is completed with an error
    val completionSource = Source(promise.future).drop(1)

    completionSource  // only used to complete later
      .via(conn.flow) // reordered the flow for better readability (arguably)
      .runWith(targetSink)

    // to close the connection later, complete the promise:
    def closeConnection() = promise.success(ByteString.empty) // dummy element, will be dropped

    // alternatively, to fail the connection later, complete with an error
    def failConnection() = promise.failure(new RuntimeException)
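For readers on a newer Akka release than the 1.0 experimental build in the question, the same idea can probably be written with `Source.maybe`, which materializes a `Promise[Option[ByteString]]` and emits nothing until that promise is completed. The following is only a sketch under assumptions: it targets Akka 2.6 or later (where an implicit `ActorSystem` supplies the materializer), and `ReceiveOnlyServer` and the `handle` callback are hypothetical names standing in for the forwarding to the target actor.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Framing, Sink, Source, Tcp}
import akka.util.ByteString

import scala.concurrent.{Future, Promise}

// Hypothetical helper, not part of the original question or answer.
object ReceiveOnlyServer {

  // `handle` is an assumed stand-in for sending each message to the target actor.
  def bind(address: String, port: Int)(handle: String => Unit)(
      implicit system: ActorSystem): Future[Tcp.ServerBinding] =
    Tcp(system)
      .bind(address, port)
      .to(Sink.foreach[Tcp.IncomingConnection] { conn =>
        // Inbound side: split on newlines, decode, and hand each line to `handle`.
        val inbound = Framing
          .delimiter(ByteString("\n"), maximumFrameLength = 256, allowTruncation = true)
          .map(_.utf8String)
          .to(Sink.foreach(handle))

        // Outbound side: Source.maybe never emits unless the promise is
        // completed with Some(element), so nothing is written back.
        val closer: Promise[Option[ByteString]] =
          Source.maybe[ByteString].via(conn.flow).to(inbound).run()

        // Completing `closer` with None later would end the outbound side
        // without sending anything; failing it would fail the connection.
        ()
      })
      .run()
}
```

Compared with `Source(promise.future).drop(1)`, this avoids the dummy element: `closer.success(None)` completes the source empty, and `closer.failure(...)` plays the role of `failConnection()`.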
