This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 2329e8ba3f31872a994aba576b00cbe71e7d748e
Author: Nick Houghton <nhoug...@gmail.com>
AuthorDate: Wed Jul 31 15:20:51 2019 +1000

    Expose WebSocketChannel and WebSocketHttpExchange to the Camel Exchange
---
 .../camel/component/undertow/UndertowConstants.java   |  2 ++
 .../camel/component/undertow/UndertowConsumer.java    | 19 ++++++++++++++++---
 .../undertow/handlers/CamelWebSocketHandler.java      | 12 ++++++------
 .../undertow/ws/UndertowWsConsumerRouteTest.java      | 17 +++++++++++++++--
 4 files changed, 39 insertions(+), 11 deletions(-)

diff --git a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowConstants.java b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowConstants.java
index 20203cb..c53a039 100644
--- a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowConstants.java
+++ b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowConstants.java
@@ -23,6 +23,8 @@ public final class UndertowConstants {
     public static final String SEND_TO_ALL = "websocket.sendToAll";
     public static final String EVENT_TYPE = "websocket.eventType";
     public static final String EVENT_TYPE_ENUM = "websocket.eventTypeEnum";
+    public static final String CHANNEL = "websocket.channel";
+    public static final String EXCHANGE = "websocket.exchange";
 
     /**
      * WebSocket peers related events the {@link UndertowConsumer} sends to the Camel route.
diff --git a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowConsumer.java b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowConsumer.java index 72b9e49..02b6f07 100644 --- a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowConsumer.java +++ b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowConsumer.java @@ -36,6 +36,8 @@ import io.undertow.util.Methods; import io.undertow.util.MimeMappings; import io.undertow.util.StatusCodes; import io.undertow.websockets.core.WebSocketChannel; +import io.undertow.websockets.spi.WebSocketHttpExchange; + import org.apache.camel.AsyncCallback; import org.apache.camel.Exchange; import org.apache.camel.Message; @@ -198,14 +200,18 @@ public class UndertowConsumer extends DefaultConsumer implements HttpHandler { * {@code connectionKey}. * * @param connectionKey an identifier of {@link WebSocketChannel} through which the {@code message} was received + * @param channel the {@link WebSocketChannel} through which the {@code message} was received * @param message the message received via the {@link WebSocketChannel} */ - public void sendMessage(final String connectionKey, final Object message) { + public void sendMessage(final String connectionKey, WebSocketChannel channel, final Object message) { final Exchange exchange = getEndpoint().createExchange(); // set header and body exchange.getIn().setHeader(UndertowConstants.CONNECTION_KEY, connectionKey); + if(channel != null) { + exchange.getIn().setHeader(UndertowConstants.CHANNEL, channel); + } exchange.getIn().setBody(message); // send exchange using the async routing engine @@ -223,16 +229,23 @@ public class UndertowConsumer extends DefaultConsumer implements HttpHandler { * Send a notification related a WebSocket peer. 
* * @param connectionKey of WebSocket peer + * @param transportExchange the exchange for the websocket transport, only available for ON_OPEN events + * @param channel the {@link WebSocketChannel} through which the {@code message} was received * @param eventType the type of the event */ - public void sendEventNotification(String connectionKey, EventType eventType) { + public void sendEventNotification(String connectionKey, WebSocketHttpExchange transportExchange, WebSocketChannel channel, EventType eventType) { final Exchange exchange = getEndpoint().createExchange(); final Message in = exchange.getIn(); in.setHeader(UndertowConstants.CONNECTION_KEY, connectionKey); in.setHeader(UndertowConstants.EVENT_TYPE, eventType.getCode()); in.setHeader(UndertowConstants.EVENT_TYPE_ENUM, eventType); - + if(channel != null){ + in.setHeader(UndertowConstants.CHANNEL, channel); + } + if(transportExchange != null){ + in.setHeader(UndertowConstants.EXCHANGE, transportExchange); + } // send exchange using the async routing engine getAsyncProcessor().process(exchange, new AsyncCallback() { public void done(boolean doneSync) { diff --git a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/handlers/CamelWebSocketHandler.java b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/handlers/CamelWebSocketHandler.java index f6065d6..5e67b98 100644 --- a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/handlers/CamelWebSocketHandler.java +++ b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/handlers/CamelWebSocketHandler.java @@ -85,7 +85,7 @@ public class CamelWebSocketHandler implements HttpHandler { @Override public void handleEvent(WebSocketChannel channel) { sendEventNotificationIfNeeded((String) channel.getAttribute(UndertowConstants.CONNECTION_KEY), - EventType.ONCLOSE); + null, channel, EventType.ONCLOSE); } }; this.delegate = Handlers.websocket(callback); @@ -181,12 +181,12 @@ 
public class CamelWebSocketHandler implements HttpHandler { } } - void sendEventNotificationIfNeeded(String connectionKey, EventType eventType) { + void sendEventNotificationIfNeeded(String connectionKey, WebSocketHttpExchange transportExchange, WebSocketChannel channel, EventType eventType) { synchronized (consumerLock) { synchronized (consumerLock) { if (consumer != null) { if (consumer.getEndpoint().isFireWebSocketChannelEvents()) { - consumer.sendEventNotification(connectionKey, eventType); + consumer.sendEventNotification(connectionKey, transportExchange, channel, eventType); } } else { LOG.debug("No consumer to handle a peer {} event type {}", connectionKey, eventType); @@ -315,7 +315,7 @@ public class CamelWebSocketHandler implements HttpHandler { synchronized (consumerLock) { if (consumer != null) { final Object outMsg = consumer.getEndpoint().isUseStreaming() ? new ByteArrayInputStream(bytes) : bytes; - consumer.sendMessage(connectionKey, outMsg); + consumer.sendMessage(connectionKey, channel, outMsg); } else { LOG.debug("No consumer to handle message received: {}", message); } @@ -337,7 +337,7 @@ public class CamelWebSocketHandler implements HttpHandler { synchronized (consumerLock) { if (consumer != null) { final Object outMsg = consumer.getEndpoint().isUseStreaming() ? 
new StringReader(text) : text; - consumer.sendMessage(connectionKey, outMsg); + consumer.sendMessage(connectionKey, channel, outMsg); } else { LOG.debug("No consumer to handle message received: {}", message); } @@ -362,7 +362,7 @@ public class CamelWebSocketHandler implements HttpHandler { channel.setAttribute(UndertowConstants.CONNECTION_KEY, connectionKey); channel.getReceiveSetter().set(receiveListener); channel.addCloseTask(closeListener); - sendEventNotificationIfNeeded(connectionKey, EventType.ONOPEN); + sendEventNotificationIfNeeded(connectionKey, exchange, channel, EventType.ONOPEN); channel.resumeReceives(); } diff --git a/components/camel-undertow/src/test/java/org/apache/camel/component/undertow/ws/UndertowWsConsumerRouteTest.java b/components/camel-undertow/src/test/java/org/apache/camel/component/undertow/ws/UndertowWsConsumerRouteTest.java index 11b7654..573ebd1 100644 --- a/components/camel-undertow/src/test/java/org/apache/camel/component/undertow/ws/UndertowWsConsumerRouteTest.java +++ b/components/camel-undertow/src/test/java/org/apache/camel/component/undertow/ws/UndertowWsConsumerRouteTest.java @@ -16,6 +16,9 @@ */ package org.apache.camel.component.undertow.ws; +import io.undertow.websockets.core.WebSocketChannel; +import io.undertow.websockets.spi.WebSocketHttpExchange; + import java.io.InputStream; import java.io.Reader; import java.util.ArrayList; @@ -125,7 +128,9 @@ public class UndertowWsConsumerRouteTest extends BaseUndertowTest { result.await(60, TimeUnit.SECONDS); List<Exchange> exchanges = result.getReceivedExchanges(); Assert.assertEquals(1, exchanges.size()); - Object body = result.getReceivedExchanges().get(0).getIn().getBody(); + Exchange exchange = result.getReceivedExchanges().get(0); + assertNotNull(exchange.getIn().getHeader(UndertowConstants.CHANNEL)); + Object body = exchange.getIn().getBody(); Assert.assertTrue("body is " + body.getClass().getName(), body instanceof Reader); Reader r = (Reader) body; 
Assert.assertEquals("Test", IOConverter.toString(r)); @@ -208,7 +213,9 @@ public class UndertowWsConsumerRouteTest extends BaseUndertowTest { result.await(60, TimeUnit.SECONDS); List<Exchange> exchanges = result.getReceivedExchanges(); Assert.assertEquals(1, exchanges.size()); - Object body = result.getReceivedExchanges().get(0).getIn().getBody(); + Exchange exchange = result.getReceivedExchanges().get(0); + assertNotNull(exchange.getIn().getHeader(UndertowConstants.CHANNEL)); + Object body = exchange.getIn().getBody(); Assert.assertTrue("body is " + body.getClass().getName(), body instanceof InputStream); InputStream in = (InputStream) body; Assert.assertArrayEquals(testmessage, IOConverter.toBytes(in)); @@ -376,6 +383,12 @@ public class UndertowWsConsumerRouteTest extends BaseUndertowTest { final Message in = exchange.getIn(); final String key = (String) in.getHeader(UndertowConstants.CONNECTION_KEY); Assert.assertNotNull(key); + final WebSocketChannel channel = in.getHeader(UndertowConstants.CHANNEL, WebSocketChannel.class); + Assert.assertNotNull(channel); + if(in.getHeader(UndertowConstants.EVENT_TYPE_ENUM, EventType.class) == EventType.ONOPEN){ + final WebSocketHttpExchange transportExchange = in.getHeader(UndertowConstants.EXCHANGE, WebSocketHttpExchange.class); + Assert.assertNotNull(transportExchange); + } List<String> messages = connections.get(key); if (messages == null) { messages = new ArrayList<>();