Scala 2.11 - How do I create a TCP receiver that only consumes messages using Akka Streams?
We are on akka-stream-experimental_2.11 1.0.
Inspired by an example, we wrote a TCP receiver as follows:
    def bind(address: String, port: Int, target: ActorRef)
            (implicit system: ActorSystem, actorMaterializer: ActorMaterializer): Future[ServerBinding] = {
      val sink = Sink.foreach[Tcp.IncomingConnection] { conn =>
        val serverFlow = Flow[ByteString]
          .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 256, allowTruncation = true))
          .map(message => {
            target ? new Message(message)
            ByteString.empty
          })
        conn handleWith serverFlow
      }
      val connections = Tcp().bind(address, port)
      connections.to(sink).run()
    }
However, our intention is to have the receiver not respond at all and only sink the message. (The TCP message publisher does not care about the response.)
Is it even possible to not respond at all, given that akka.stream.scaladsl.Tcp.IncomingConnection takes a flow of type Flow[ByteString, ByteString, Unit]?
If yes, some guidance would be much appreciated. Thanks in advance.
One attempt, shown below, passes my unit tests, but I am not sure if it is the best idea:
    def bind(address: String, port: Int, target: ActorRef)
            (implicit system: ActorSystem, actorMaterializer: ActorMaterializer): Future[ServerBinding] = {
      val sink = Sink.foreach[Tcp.IncomingConnection] { conn =>
        val targetSubscriber = ActorSubscriber[Message](system.actorOf(Props(new TargetSubscriber(target))))
        val targetSink = Flow[ByteString]
          .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 256, allowTruncation = true))
          .map(Message(_))
          .to(Sink(targetSubscriber))
        conn.flow.to(targetSink).runWith(Source(Promise().future))
      }
      val connections = Tcp().bind(address, port)
      connections.to(sink).run()
    }
You are on the right track. To keep the possibility of closing the connection at some point, you may want to keep the promise and complete it later on. Once it is completed with an element, that element is published by the source. However, since you don't want any element to be published on the connection, you can use drop(1) to make sure the source will never emit an element.
Here's an updated version of your example (untested):
    val promise = Promise[ByteString]()
    // this source will complete when the promise is fulfilled,
    // or it will complete with an error if the promise is completed with an error
    val completionSource = Source(promise.future).drop(1)

    completionSource    // only used to complete later
      .via(conn.flow)   // I reordered the flow for better readability (arguably)
      .runWith(targetSink)

    // to close the connection later, complete the promise:
    def closeConnection() = promise.success(ByteString.empty) // dummy element, will be dropped

    // alternatively, to fail the connection later, complete with an error
    def failConnection() = promise.failure(new RuntimeException)
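The close/fail mechanism above rests on ordinary scala.concurrent.Promise behaviour, which can be tried out without Akka. The following is only a minimal sketch using the Scala standard library: the object name PromiseCompletionSketch and the helpers outcome, closeNormally and closeWithError are hypothetical names made up for illustration, and a String stands in for the ByteString dummy element.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._
import scala.util.Try

object PromiseCompletionSketch {
  // Hypothetical helper: block briefly on the future and capture
  // either its value or its exception as a Try.
  def outcome[T](future: Future[T]): Try[T] =
    Try(Await.result(future, 1.second))

  // Analogue of closeConnection(): fulfil the promise with a dummy value.
  // In the stream, drop(1) would discard this value before completion.
  def closeNormally(): Try[String] = {
    val promise = Promise[String]()
    promise.success("dummy")
    outcome(promise.future)
  }

  // Analogue of failConnection(): complete the promise with an error,
  // so the future (and a source built from it) fails instead.
  def closeWithError(): Try[String] = {
    val promise = Promise[String]()
    promise.failure(new RuntimeException("connection failed"))
    outcome(promise.future)
  }

  def main(args: Array[String]): Unit = {
    println(closeNormally())            // a Success wrapping the dummy value
    println(closeWithError().isFailure) // the failure path
  }
}
```

Until one of the two completion calls is made, the future stays pending, which is what should keep the source (and therefore the outgoing side of the connection) open while the receiver only consumes.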