Repository: camel Updated Branches: refs/heads/master 6f995a7d1 -> 74e75a537
CAMEL-10283: camel-websocket producer should not use blocking send operations. This can cause sendToAll to block while attempting to send to many clients. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/83f67061 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/83f67061 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/83f67061 Branch: refs/heads/master Commit: 83f67061a0cca04e81ee5766ced34bda4e99f443 Parents: 6f995a7 Author: Claus Ibsen <davscl...@apache.org> Authored: Sun Sep 4 11:37:00 2016 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Sun Sep 4 11:37:00 2016 +0200 ---------------------------------------------------------------------- .../src/main/docs/websocket-component.adoc | 3 +- .../component/websocket/WebsocketEndpoint.java | 14 +++ .../component/websocket/WebsocketProducer.java | 47 +++++++-- .../WebsocketProducerRouteExampleTest.java | 3 +- .../websocket/WebsocketProducerTest.java | 103 ++++++------------- 5 files changed, 87 insertions(+), 83 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/83f67061/components/camel-websocket/src/main/docs/websocket-component.adoc ---------------------------------------------------------------------- diff --git a/components/camel-websocket/src/main/docs/websocket-component.adoc b/components/camel-websocket/src/main/docs/websocket-component.adoc index 9d9cad2..a473bec 100644 --- a/components/camel-websocket/src/main/docs/websocket-component.adoc +++ b/components/camel-websocket/src/main/docs/websocket-component.adoc @@ -72,7 +72,7 @@ The Jetty Websocket component supports 12 options which are listed below. 
// endpoint options: START -The Jetty Websocket component supports 20 endpoint options which are listed below: +The Jetty Websocket component supports 21 endpoint options which are listed below: {% raw %} [width="100%",cols="2,1,1m,1m,5",options="header"] @@ -86,6 +86,7 @@ The Jetty Websocket component supports 20 endpoint options which are listed belo | sessionSupport | consumer | false | boolean | Whether to enable session support which enables HttpSession for each http request. | staticResources | consumer | | String | Set a resource path for static resources (such as .html files etc). The resources can be loaded from classpath if you prefix with classpath: otherwise the resources is loaded from file system or from JAR files. For example to load from root classpath use classpath:. or classpath:WEB-INF/static If not configured (eg null) then no static resource is in use. | exceptionHandler | consumer (advanced) | | ExceptionHandler | To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this options is not in use. By default the consumer will deal with exceptions that will be logged at WARN/ERROR level and ignored. +| sendTimeout | producer | 60000 | Integer | Timeout in milliseconds when sending to a websocket channel. The default timeout is 60000 (60 seconds). | sendToAll | producer | | Boolean | To send to all websocket subscribers. Can be used to configure on endpoint level instead of having to use the WebsocketConstants.SEND_TO_ALL header on the message. 
| bufferSize | advanced | 8192 | Integer | Set the buffer size of the websocketServlet which is also the max frame byte size (default 8192) | exchangePattern | advanced | InOnly | ExchangePattern | Sets the default exchange pattern when creating an exchange http://git-wip-us.apache.org/repos/asf/camel/blob/83f67061/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketEndpoint.java b/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketEndpoint.java index 11e1eba..0ce9f49 100644 --- a/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketEndpoint.java +++ b/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketEndpoint.java @@ -54,6 +54,8 @@ public class WebsocketEndpoint extends DefaultEndpoint { @UriParam(label = "producer") private Boolean sendToAll; + @UriParam(label = "producer", defaultValue = "60000") + private Integer sendTimeout = 60000; @UriParam(label = "monitoring") private boolean enableJmx; @UriParam(label = "consumer") @@ -188,6 +190,18 @@ public class WebsocketEndpoint extends DefaultEndpoint { this.sendToAll = sendToAll; } + public Integer getSendTimeout() { + return sendTimeout; + } + + /** + * Timeout in milliseconds when sending to a websocket channel. + * The default timeout is 60000 (60 seconds). 
+ */ + public void setSendTimeout(Integer sendTimeout) { + this.sendTimeout = sendTimeout; + } + public String getProtocol() { return uri.getScheme(); } http://git-wip-us.apache.org/repos/asf/camel/blob/83f67061/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketProducer.java b/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketProducer.java index 18753a6..6f50317 100644 --- a/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketProducer.java +++ b/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketProducer.java @@ -19,11 +19,17 @@ package org.apache.camel.component.websocket; import java.io.IOException; import java.nio.ByteBuffer; import java.util.Collection; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import org.apache.camel.CamelExchangeException; import org.apache.camel.Exchange; +import org.apache.camel.ExchangeTimedOutException; import org.apache.camel.Message; import org.apache.camel.impl.DefaultProducer; +import org.apache.camel.util.StopWatch; public class WebsocketProducer extends DefaultProducer implements WebsocketProducerConsumer { @@ -52,9 +58,15 @@ public class WebsocketProducer extends DefaultProducer implements WebsocketProdu if (connectionKey != null) { DefaultWebsocket websocket = store.get(connectionKey); log.debug("Sending to connection key {} -> {}", connectionKey, message); - sendMessage(websocket, message); + Future<Void> future = sendMessage(websocket, message); + if (future != null) { + future.get(endpoint.getSendTimeout(), TimeUnit.MILLISECONDS); + if (!future.isDone()) { + throw new 
ExchangeTimedOutException(exchange, endpoint.getSendTimeout(), "Failed to send message to the connection"); + } + } } else { - throw new IllegalArgumentException("Failed to send message to single connection; connetion key not set."); + throw new IllegalArgumentException("Failed to send message to single connection; connection key not set."); } } } @@ -86,31 +98,54 @@ public class WebsocketProducer extends DefaultProducer implements WebsocketProdu log.debug("Sending to all {}", message); Collection<DefaultWebsocket> websockets = store.getAll(); Exception exception = null; + + List<Future> futures = new CopyOnWriteArrayList<>(); for (DefaultWebsocket websocket : websockets) { try { - sendMessage(websocket, message); + Future<Void> future = sendMessage(websocket, message); + if (future != null) { + futures.add(future); + } } catch (Exception e) { if (exception == null) { exception = new CamelExchangeException("Failed to deliver message to one or more recipients.", exchange, e); } } } + + // check if they are all done within the timed out period + StopWatch watch = new StopWatch(); + while (!futures.isEmpty() && watch.taken() < endpoint.getSendTimeout()) { + // remove all that are done/cancelled + for (Future future : futures) { + if (future.isDone() || future.isCancelled()) { + futures.remove(future); + } + } + // TODO sleep a bit until done to avoid burning cpu cycles + } + if (!futures.isEmpty()) { + exception = new CamelExchangeException("Failed to deliver message within " + endpoint.getSendTimeout() + " millis to one or more recipients.", exchange); + } + if (exception != null) { throw exception; } } - void sendMessage(DefaultWebsocket websocket, Object message) throws IOException { + Future<Void> sendMessage(DefaultWebsocket websocket, Object message) throws IOException { + Future<Void> future = null; // in case there is web socket and socket connection is open - send message if (websocket != null && websocket.getSession().isOpen()) { log.trace("Sending to 
websocket {} -> {}", websocket.getConnectionKey(), message); if (message instanceof String) { - websocket.getSession().getRemote().sendString((String) message); + future = websocket.getSession().getRemote().sendStringByFuture((String) message); } else if (message instanceof byte[]) { ByteBuffer buf = ByteBuffer.wrap((byte[]) message); - websocket.getSession().getRemote().sendBytes(buf); + future = websocket.getSession().getRemote().sendBytesByFuture(buf); } } + return future; } //Store is set/unset upon connect/disconnect of the producer http://git-wip-us.apache.org/repos/asf/camel/blob/83f67061/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketProducerRouteExampleTest.java ---------------------------------------------------------------------- diff --git a/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketProducerRouteExampleTest.java b/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketProducerRouteExampleTest.java index 5f8026a..2c52cbd 100644 --- a/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketProducerRouteExampleTest.java +++ b/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketProducerRouteExampleTest.java @@ -61,6 +61,7 @@ public class WebsocketProducerRouteExampleTest extends CamelTestSupport { WebSocket websocket = c.prepareGet("ws://localhost:" + port + "/shop").execute( new WebSocketUpgradeHandler.Builder() .addWebSocketListener(new WebSocketTextListener() { + @Override public void onMessage(String message) { received.add(message); @@ -68,7 +69,6 @@ public class WebsocketProducerRouteExampleTest extends CamelTestSupport { latch.countDown(); } - @Override public void onOpen(WebSocket websocket) { } @@ -112,7 +112,6 @@ public class WebsocketProducerRouteExampleTest extends CamelTestSupport { latch.countDown(); } - @Override public void onOpen(WebSocket websocket) { } 
http://git-wip-us.apache.org/repos/asf/camel/blob/83f67061/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketProducerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketProducerTest.java b/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketProducerTest.java index 989ae0b..4e362c8 100644 --- a/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketProducerTest.java +++ b/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketProducerTest.java @@ -19,6 +19,9 @@ package org.apache.camel.component.websocket; import java.io.IOException; import java.util.Arrays; import java.util.Collection; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import org.apache.camel.Exchange; import org.apache.camel.Message; @@ -67,8 +70,11 @@ public class WebsocketProducerTest { private Message inMessage; @Mock private RemoteEndpoint remoteEndpoint; + @Mock + private Future<Void> future; - private IOException exception = new IOException("BAD NEWS EVERYONE!"); + private IOException ioException = new IOException("BAD NEWS EVERYONE!"); + private ExecutionException exception = new ExecutionException("Failure", ioException); private WebsocketProducer websocketProducer; private Collection<DefaultWebsocket> sockets; @@ -101,7 +107,7 @@ public class WebsocketProducerTest { inOrder.verify(defaultWebsocket1, times(1)).getSession(); inOrder.verify(session, times(1)).isOpen(); inOrder.verify(defaultWebsocket1, times(1)).getSession(); - inOrder.verify(remoteEndpoint, times(1)).sendString(MESSAGE); + inOrder.verify(remoteEndpoint, times(1)).sendStringByFuture(MESSAGE); inOrder.verifyNoMoreInteractions(); } @@ -115,13 +121,14 @@ public class WebsocketProducerTest { 
when(defaultWebsocket1.getSession()).thenReturn(session); when(session.isOpen()).thenReturn(true); when(session.getRemote()).thenReturn(remoteEndpoint); - doThrow(exception).when(remoteEndpoint).sendString(MESSAGE); + when(remoteEndpoint.sendStringByFuture(MESSAGE)).thenReturn(future); + doThrow(exception).when(future).get(60000, TimeUnit.MILLISECONDS); try { websocketProducer.process(exchange); fail("Exception expected"); - } catch (IOException ioe) { - assertEquals(exception, ioe); + } catch (Exception e) { + // expected } InOrder inOrder = inOrder(endpoint, store, session, defaultWebsocket1, defaultWebsocket2, exchange, inMessage, remoteEndpoint); @@ -133,7 +140,8 @@ public class WebsocketProducerTest { inOrder.verify(defaultWebsocket1, times(1)).getSession(); inOrder.verify(session, times(1)).isOpen(); inOrder.verify(defaultWebsocket1, times(1)).getSession(); - inOrder.verify(remoteEndpoint, times(1)).sendString(MESSAGE); + inOrder.verify(remoteEndpoint, times(1)).sendStringByFuture(MESSAGE); + inOrder.verify(endpoint, times(2)).getSendTimeout(); inOrder.verifyNoMoreInteractions(); } @@ -158,46 +166,11 @@ public class WebsocketProducerTest { inOrder.verify(defaultWebsocket1, times(1)).getSession(); inOrder.verify(session, times(1)).isOpen(); inOrder.verify(defaultWebsocket1, times(1)).getSession(); - inOrder.verify(remoteEndpoint, times(1)).sendString(MESSAGE); - inOrder.verify(defaultWebsocket2, times(1)).getSession(); - inOrder.verify(session, times(1)).isOpen(); - inOrder.verify(defaultWebsocket2, times(1)).getSession(); - inOrder.verify(remoteEndpoint, times(1)).sendString(MESSAGE); - inOrder.verifyNoMoreInteractions(); - } - - @Test - public void testProcessMultipleMessagesWithException() throws Exception { - when(exchange.getIn()).thenReturn(inMessage); - when(inMessage.getMandatoryBody()).thenReturn(MESSAGE); - when(inMessage.getHeader(WebsocketConstants.SEND_TO_ALL, false, Boolean.class)).thenReturn(true); - when(store.getAll()).thenReturn(sockets); - 
when(defaultWebsocket1.getSession()).thenReturn(session); - when(defaultWebsocket2.getSession()).thenReturn(session); - when(session.getRemote()).thenReturn(remoteEndpoint); - doThrow(exception).when(remoteEndpoint).sendString(MESSAGE); - when(session.isOpen()).thenReturn(true); - - try { - websocketProducer.process(exchange); - fail("Exception expected"); - } catch (Exception e) { - assertEquals(exception, e.getCause()); - } - - InOrder inOrder = inOrder(endpoint, store, session, defaultWebsocket1, defaultWebsocket2, exchange, inMessage, remoteEndpoint); - inOrder.verify(exchange, times(1)).getIn(); - inOrder.verify(inMessage, times(1)).getMandatoryBody(); - inOrder.verify(inMessage, times(1)).getHeader(WebsocketConstants.SEND_TO_ALL, false, Boolean.class); - inOrder.verify(store, times(1)).getAll(); - inOrder.verify(defaultWebsocket1, times(1)).getSession(); - inOrder.verify(session, times(1)).isOpen(); - inOrder.verify(defaultWebsocket1, times(1)).getSession(); - inOrder.verify(remoteEndpoint, times(1)).sendString(MESSAGE); + inOrder.verify(remoteEndpoint, times(1)).sendStringByFuture(MESSAGE); inOrder.verify(defaultWebsocket2, times(1)).getSession(); inOrder.verify(session, times(1)).isOpen(); inOrder.verify(defaultWebsocket2, times(1)).getSession(); - inOrder.verify(remoteEndpoint, times(1)).sendString(MESSAGE); + inOrder.verify(remoteEndpoint, times(1)).sendStringByFuture(MESSAGE); inOrder.verifyNoMoreInteractions(); } @@ -237,7 +210,7 @@ public class WebsocketProducerTest { inOrder.verify(defaultWebsocket1, times(1)).getSession(); inOrder.verify(session, times(1)).isOpen(); inOrder.verify(defaultWebsocket1, times(1)).getSession(); - inOrder.verify(remoteEndpoint, times(1)).sendString(MESSAGE); + inOrder.verify(remoteEndpoint, times(1)).sendStringByFuture(MESSAGE); inOrder.verifyNoMoreInteractions(); } @@ -256,28 +229,6 @@ public class WebsocketProducerTest { } @Test - public void testSendMessageWithException() throws Exception { - 
when(defaultWebsocket1.getSession()).thenReturn(session); - when(session.isOpen()).thenReturn(true); - when(session.getRemote()).thenReturn(remoteEndpoint); - doThrow(exception).when(remoteEndpoint).sendString(MESSAGE); - - try { - websocketProducer.sendMessage(defaultWebsocket1, MESSAGE); - fail("Exception expected"); - } catch (IOException ioe) { - assertEquals(exception, ioe); - } - - InOrder inOrder = inOrder(endpoint, store, session, defaultWebsocket1, defaultWebsocket2, exchange, inMessage, remoteEndpoint); - inOrder.verify(defaultWebsocket1, times(1)).getSession(); - inOrder.verify(session, times(1)).isOpen(); - inOrder.verify(defaultWebsocket1, times(1)).getSession(); - inOrder.verify(remoteEndpoint, times(1)).sendString(MESSAGE); - inOrder.verifyNoMoreInteractions(); - } - - @Test public void testIsSendToAllSet() { when(inMessage.getHeader(WebsocketConstants.SEND_TO_ALL, false, Boolean.class)).thenReturn(true, false); assertTrue(websocketProducer.isSendToAllSet(inMessage)); @@ -311,28 +262,32 @@ public class WebsocketProducerTest { inOrder.verify(defaultWebsocket1, times(1)).getSession(); inOrder.verify(session, times(1)).isOpen(); inOrder.verify(defaultWebsocket1, times(1)).getSession(); - inOrder.verify(remoteEndpoint, times(1)).sendString(MESSAGE); + inOrder.verify(remoteEndpoint, times(1)).sendStringByFuture(MESSAGE); inOrder.verify(defaultWebsocket2, times(1)).getSession(); inOrder.verify(session, times(1)).isOpen(); inOrder.verify(defaultWebsocket2, times(1)).getSession(); - inOrder.verify(remoteEndpoint, times(1)).sendString(MESSAGE); + inOrder.verify(remoteEndpoint, times(1)).sendStringByFuture(MESSAGE); inOrder.verifyNoMoreInteractions(); } @Test - public void testSendToAllWithExcpetion() throws Exception { + public void testSendToAllWithException() throws Exception { + when(exchange.getIn()).thenReturn(inMessage); + when(inMessage.getMandatoryBody()).thenReturn(MESSAGE); + when(inMessage.getHeader(WebsocketConstants.SEND_TO_ALL, false, 
Boolean.class)).thenReturn(true); when(store.getAll()).thenReturn(sockets); when(defaultWebsocket1.getSession()).thenReturn(session); when(defaultWebsocket2.getSession()).thenReturn(session); when(session.getRemote()).thenReturn(remoteEndpoint); - doThrow(exception).when(remoteEndpoint).sendString(MESSAGE); when(session.isOpen()).thenReturn(true); + when(remoteEndpoint.sendStringByFuture(MESSAGE)).thenReturn(future); + doThrow(exception).when(future).get(60000, TimeUnit.MILLISECONDS); try { - websocketProducer.sendToAll(store, MESSAGE, exchange); + websocketProducer.process(exchange); fail("Exception expected"); } catch (Exception e) { - assertEquals(exception, e.getCause()); + // expected } InOrder inOrder = inOrder(store, session, defaultWebsocket1, defaultWebsocket2, remoteEndpoint); @@ -340,11 +295,11 @@ public class WebsocketProducerTest { inOrder.verify(defaultWebsocket1, times(1)).getSession(); inOrder.verify(session, times(1)).isOpen(); inOrder.verify(defaultWebsocket1, times(1)).getSession(); - inOrder.verify(remoteEndpoint, times(1)).sendString(MESSAGE); + inOrder.verify(remoteEndpoint, times(1)).sendStringByFuture(MESSAGE); inOrder.verify(defaultWebsocket2, times(1)).getSession(); inOrder.verify(session, times(1)).isOpen(); inOrder.verify(defaultWebsocket2, times(1)).getSession(); - inOrder.verify(remoteEndpoint, times(1)).sendString(MESSAGE); + inOrder.verify(remoteEndpoint, times(1)).sendStringByFuture(MESSAGE); inOrder.verifyNoMoreInteractions(); } }