handleWebSocketMessages • Akka HTTP

Signature

def handleWebSocketMessages(handler: Flow[Message, Message, Any]): Route

Description

The directive first checks whether the request is a valid WebSocket handshake request. If it is, the directive completes the request with the passed handler. Otherwise, the request is rejected with an ExpectedWebSocketRequestRejection.
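As a rough illustration of the rejection behaviour described above, the following sketch places a plain-HTTP fallback after the directive, so that a non-WebSocket request to the same path gets an explicit response. It is not taken from the Akka HTTP sources: the object name `EchoFallbackExample`, the `/echo` path, the echo handler, and the response text are all assumptions chosen for illustration.

```scala
import akka.http.scaladsl.model.StatusCodes
import akka.http.scaladsl.model.ws.Message
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route
import akka.stream.scaladsl.Flow

// Hypothetical example object; names and path are illustrative only.
object EchoFallbackExample {
  // Sends every incoming message back to the client unchanged.
  val echo: Flow[Message, Message, Any] = Flow[Message]

  val route: Route =
    path("echo") {
      concat(
        // Handles the request if it is a valid WebSocket handshake,
        // otherwise rejects with ExpectedWebSocketRequestRejection.
        handleWebSocketMessages(echo),
        // Tried only when the directive above rejected the request.
        complete(StatusCodes.BadRequest, "This endpoint only accepts WebSocket connections")
      )
    }
}
```

Without such a fallback, the rejection is passed on to whichever rejection handler is in effect for the route.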

WebSocket subprotocols offered in the Sec-WebSocket-Protocol header of the request are ignored. To support several protocols, use the handleWebSocketMessagesForProtocol directive instead.
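To make the contrast concrete, here is a minimal sketch of a route that picks a handler by subprotocol using handleWebSocketMessagesForProtocol. The object name `SubprotocolExample`, the `/services` path, the two handlers, and the subprotocol names "upper" and "echo" are assumptions made up for this illustration.

```scala
import akka.http.scaladsl.model.ws.{ BinaryMessage, Message, TextMessage }
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route
import akka.stream.scaladsl.Flow

// Hypothetical example object; names, path, and subprotocols are illustrative only.
object SubprotocolExample {
  // Sends every incoming message back unchanged.
  val echoService: Flow[Message, Message, Any] = Flow[Message]

  // Upper-cases text messages; binary messages are echoed unchanged,
  // so their data stream is still consumed by the outgoing side.
  val upperService: Flow[Message, Message, Any] =
    Flow[Message].map {
      case tm: TextMessage   => TextMessage(tm.textStream.map(_.toUpperCase))
      case bm: BinaryMessage => bm
    }

  val route: Route =
    path("services") {
      concat(
        // Each alternative matches only if the client offered that subprotocol.
        handleWebSocketMessagesForProtocol(upperService, "upper"),
        handleWebSocketMessagesForProtocol(echoService, "echo")
      )
    }
}
```

A client that offers "echo" in its Sec-WebSocket-Protocol header would be served by `echoService` in this sketch, whereas handleWebSocketMessages would accept the handshake regardless of the offered subprotocols.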

For more information about the WebSocket support, see Server-Side WebSocket Support.

Example

Scala

source`import akka.http.scaladsl.model.ws.{ BinaryMessage, Message, TextMessage }
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.testkit.WSProbe
import akka.stream.scaladsl.{ Flow, Sink, Source }
import akka.util.ByteString
import scala.concurrent.duration._

// Note: this snippet is meant to run inside a route test (e.g. a spec mixing in
// ScalatestRouteTest), which provides WS, check, and the implicit materializer.

def greeter: Flow[Message, Message, Any] =
  Flow[Message].mapConcat {
    case tm: TextMessage =>
      TextMessage(Source.single("Hello ") ++ tm.textStream ++ Source.single("!")) :: Nil
    case bm: BinaryMessage =>
      // ignore binary messages but drain content to avoid the stream being clogged
      bm.dataStream.runWith(Sink.ignore)
      Nil
  }

val websocketRoute =
  path("greeter") {
    handleWebSocketMessages(greeter)
  }

// tests:
// create a testing probe representing the client-side
val wsClient = WSProbe()

// WS creates a WebSocket request for testing
WS("/greeter", wsClient.flow) ~> websocketRoute ~>
  check {
    // check response for WS Upgrade headers
    isWebSocketUpgrade shouldEqual true

    // manually run a WS conversation
    wsClient.sendMessage("Peter")
    wsClient.expectMessage("Hello Peter!")

    wsClient.sendMessage(BinaryMessage(ByteString("abcdef")))
    wsClient.expectNoMessage(100.millis)

    wsClient.sendMessage("John")
    wsClient.expectMessage("Hello John!")

    wsClient.sendCompletion()
    wsClient.expectCompletion()
  }`

Java

source`import akka.NotUsed;
import akka.http.javadsl.model.StatusCodes;
import akka.http.javadsl.model.Uri;
import akka.http.javadsl.model.ws.BinaryMessage;
import akka.http.javadsl.model.ws.Message;
import akka.http.javadsl.model.ws.TextMessage;
import akka.http.javadsl.server.Route;
import akka.http.javadsl.testkit.WSProbe;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import akka.util.ByteString;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
import scala.concurrent.duration.FiniteDuration;

import static akka.http.javadsl.server.Directives.path;
import static akka.http.javadsl.server.Directives.handleWebSocketMessages;

final Flow<Message, Message, NotUsed> greeter = Flow.of(Message.class).mapConcat(msg -> {
  if (msg instanceof TextMessage) {
    final TextMessage tm = (TextMessage) msg;
    final TextMessage ret =
      TextMessage.create(Source.single("Hello ").concat(tm.getStreamedText()).concat(Source.single("!")));
    return Collections.singletonList(ret);
  } else if (msg instanceof BinaryMessage) {
    // ignore binary messages but drain content to avoid the stream being clogged
    final BinaryMessage bm = (BinaryMessage) msg;
    bm.getStreamedData().runWith(Sink.ignore(), materializer());
    return Collections.emptyList();
  } else {
    throw new IllegalArgumentException("Unsupported message type!");
  }
});

final Route websocketRoute = path("greeter", () ->
  handleWebSocketMessages(greeter)
);

// create a testing probe representing the client-side
final WSProbe wsClient = WSProbe.create(system(), materializer());

// WS creates a WebSocket request for testing
testRoute(websocketRoute).run(WS(Uri.create("/greeter"), wsClient.flow(), materializer()))
  .assertStatusCode(StatusCodes.SWITCHING_PROTOCOLS);

// manually run a WS conversation
wsClient.sendMessage("Peter");
wsClient.expectMessage("Hello Peter!");

wsClient.sendMessage(BinaryMessage.create(ByteString.fromString("abcdef")));
wsClient.expectNoMessage(FiniteDuration.create(100, TimeUnit.MILLISECONDS));

wsClient.sendMessage("John");
wsClient.expectMessage("Hello John!");

wsClient.sendCompletion();
wsClient.expectCompletion();`
