[CAMEL-9393] Add ability to send a message to multiple defined connections with guaranty of delivery
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/28831913 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/28831913 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/28831913 Branch: refs/heads/master Commit: 28831913fe7b3cff6e075dc08ccddc37eeea6c42 Parents: 1d762e5 Author: Pavlo Kletsko <pklet...@gmail.com> Authored: Sat Dec 5 13:44:05 2015 +0100 Committer: Pavlo Kletsko <pklet...@gmail.com> Committed: Sat Dec 5 13:44:05 2015 +0100 ---------------------------------------------------------------------- .../websocket/WebsocketConstants.java | 4 + .../atmosphere/websocket/WebsocketConsumer.java | 19 ++ .../atmosphere/websocket/WebsocketEndpoint.java | 8 +- .../atmosphere/websocket/WebsocketProducer.java | 88 +++++--- .../WebsocketRouteWithInitParamTest.java | 206 ++++++++++++++++++- 5 files changed, 295 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/28831913/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/WebsocketConstants.java ---------------------------------------------------------------------- diff --git a/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/WebsocketConstants.java b/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/WebsocketConstants.java index b85b039..c95da88 100644 --- a/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/WebsocketConstants.java +++ b/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/WebsocketConstants.java @@ -22,13 +22,17 @@ package org.apache.camel.component.atmosphere.websocket; public final class WebsocketConstants { public static final String CONNECTION_KEY = "websocket.connectionKey"; + public static final String CONNECTION_KEY_LIST = "websocket.connectionKey.list"; public static final String SEND_TO_ALL = "websocket.sendToAll"; public static final String EVENT_TYPE = "websocket.eventType"; + public static final String ERROR_TYPE = "websocket.errorType"; public static final int ONOPEN_EVENT_TYPE = 1; public static final int ONCLOSE_EVENT_TYPE = 0; public static final int ONERROR_EVENT_TYPE = -1; + public static final int MESSAGE_NOT_SENT_ERROR_TYPE = 1; + private WebsocketConstants() { //helper class } http://git-wip-us.apache.org/repos/asf/camel/blob/28831913/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/WebsocketConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/WebsocketConsumer.java b/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/WebsocketConsumer.java index 86bd016..5bbda28 100644 --- a/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/WebsocketConsumer.java +++ b/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/WebsocketConsumer.java @@ -18,6 +18,7 @@ package org.apache.camel.component.atmosphere.websocket; import java.io.IOException; import java.util.HashMap; +import java.util.List; import java.util.Map; import javax.servlet.ServletException; import javax.servlet.http.HttpServletRequest; @@ -108,6 +109,24 @@ public class WebsocketConsumer extends ServletConsumer { }); } + public void sendNotDeliveredMessage(List<String> failedConnectionKeys, Object message) { + final Exchange exchange = getEndpoint().createExchange(); + + // set header and body + exchange.getIn().setHeader(WebsocketConstants.CONNECTION_KEY_LIST, failedConnectionKeys); + exchange.getIn().setHeader(WebsocketConstants.ERROR_TYPE, WebsocketConstants.MESSAGE_NOT_SENT_ERROR_TYPE); + exchange.getIn().setBody(message); + + // send exchange using the async routing engine + getAsyncProcessor().process(exchange, new AsyncCallback() { + public void done(boolean doneSync) { + if (exchange.getException() != null) { + getExceptionHandler().handleException("Error processing exchange", exchange, exchange.getException()); + } + } + }); + } + public boolean isEnableEventsResending() { return enableEventsResending; } http://git-wip-us.apache.org/repos/asf/camel/blob/28831913/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/WebsocketEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/WebsocketEndpoint.java b/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/WebsocketEndpoint.java index d8d803c..cdb7ff2 100644 --- a/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/WebsocketEndpoint.java +++ b/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/WebsocketEndpoint.java @@ -36,6 +36,7 @@ import org.apache.camel.spi.UriPath; public class WebsocketEndpoint extends ServletEndpoint { private WebSocketStore store; + private WebsocketConsumer websocketConsumer; @UriPath(description = "Name of websocket endpoint") @Metadata(required = "true") private String servicePath; @@ -62,7 +63,8 @@ public class WebsocketEndpoint extends ServletEndpoint { @Override public Consumer createConsumer(Processor processor) throws Exception { - return new WebsocketConsumer(this, processor); + websocketConsumer = new WebsocketConsumer(this, processor); + return websocketConsumer; } @Override @@ -95,4 +97,8 @@ public class WebsocketEndpoint extends ServletEndpoint { WebSocketStore getWebSocketStore() { return store; } + + public WebsocketConsumer getWebsocketConsumer() { + return websocketConsumer; + } } http://git-wip-us.apache.org/repos/asf/camel/blob/28831913/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/WebsocketProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/WebsocketProducer.java b/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/WebsocketProducer.java index 7fda043..9501537 100644 --- a/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/WebsocketProducer.java +++ b/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/WebsocketProducer.java @@ -18,6 +18,9 @@ package org.apache.camel.component.atmosphere.websocket; import java.io.InputStream; import java.io.Reader; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -34,6 +37,8 @@ import org.slf4j.LoggerFactory; public class WebsocketProducer extends DefaultProducer { private static final transient Logger LOG = LoggerFactory.getLogger(WebsocketProducer.class); + private List<String> notValidConnectionKeys = new ArrayList<>(); + private static ExecutorService executor = Executors.newSingleThreadExecutor(); public WebsocketProducer(WebsocketEndpoint endpoint) { @@ -71,46 +76,73 @@ public class WebsocketProducer extends DefaultProducer { } else if (message instanceof InputStream) { message = in.getBody(byte[].class); } - + log.debug("Sending to {}", message); if (getEndpoint().isSendToAll()) { log.debug("Sending to all -> {}", message); //TODO consider using atmosphere's broadcast or a more configurable async send for (final WebSocket websocket : getEndpoint().getWebSocketStore().getAllWebSockets()) { - final Object msg = message; - executor.execute(new Runnable() { - @Override - public void run() { - sendMessage(websocket, msg); - } - }); + sendMessage(websocket, message); } + } else if (in.getHeader(WebsocketConstants.CONNECTION_KEY_LIST) != null) { + List<String> connectionKeyList = in.getHeader(WebsocketConstants.CONNECTION_KEY_LIST, List.class); + messageDistributor(connectionKeyList, message); } else { - // look for connection key and get Websocket String connectionKey = in.getHeader(WebsocketConstants.CONNECTION_KEY, String.class); - if (connectionKey != null) { - WebSocket websocket = getEndpoint().getWebSocketStore().getWebSocket(connectionKey); - log.debug("Sending to connection key {} -> {}", connectionKey, message); - sendMessage(websocket, message); - } else { - throw new IllegalArgumentException("Failed to send message to single connection; connetion key not set."); - } - + messageDistributor(Arrays.asList(connectionKey), message); + } + } + + private void messageDistributor(final List<String> connectionKeyList, final Object message) { + if (connectionKeyList == null) { + throw new IllegalArgumentException("Failed to send message to multiple connections; connetion key list is not set."); + } + + notValidConnectionKeys = new ArrayList<>(); + + for (final String connectionKey : connectionKeyList) { + log.debug("Sending to connection key {} -> {}", connectionKey, message); + sendMessage(getWebSocket(connectionKey), message); + } + + if (!notValidConnectionKeys.isEmpty()) { + log.debug("Some connections have not received the message {}", message); + getEndpoint().getWebsocketConsumer().sendNotDeliveredMessage(notValidConnectionKeys, message); } } - private void sendMessage(WebSocket websocket, Object message) { - try { - if (message instanceof String) { - websocket.write((String)message); - } else if (message instanceof byte[]) { - websocket.write((byte[])message, 0, ((byte[])message).length); - } else { - // this should not happen unless one of the supported types is missing above. - LOG.error("unexpected message type {}", message == null ? null : message.getClass()); + private void sendMessage(final WebSocket websocket, final Object message) { + executor.execute(new Runnable() { + @Override + public void run() { + try { + if (message instanceof String) { + websocket.write((String) message); + } else if (message instanceof byte[]) { + websocket.write((byte[]) message, 0, ((byte[]) message).length); + } else { + // this should not happen unless one of the supported types is missing above. + LOG.error("unexpected message type {}", message == null ? null : message.getClass()); + } + } catch (Exception e) { + LOG.error("Error when writing to websocket", e); + } + } + }); + } + + private WebSocket getWebSocket(final String connectionKey) { + WebSocket websocket; + if (connectionKey == null) { + throw new IllegalArgumentException("Failed to send message to single connection; connetion key is not set."); + } else { + websocket = getEndpoint().getWebSocketStore().getWebSocket(connectionKey); + if (websocket == null) { + //collect for call back to handle not sent message(s) to guaranty delivery + notValidConnectionKeys.add(connectionKey); + log.debug("Failed to send message to single connection; connetion key is not valid. {}", connectionKey); } - } catch (Exception e) { - LOG.error("Error when writing to websocket", e); } + return websocket; } } http://git-wip-us.apache.org/repos/asf/camel/blob/28831913/components/camel-atmosphere-websocket/src/test/java/org/apache/camel/component/atmosphere/websocket/WebsocketRouteWithInitParamTest.java ---------------------------------------------------------------------- diff --git a/components/camel-atmosphere-websocket/src/test/java/org/apache/camel/component/atmosphere/websocket/WebsocketRouteWithInitParamTest.java b/components/camel-atmosphere-websocket/src/test/java/org/apache/camel/component/atmosphere/websocket/WebsocketRouteWithInitParamTest.java index f0f8182..1407cbb 100644 --- a/components/camel-atmosphere-websocket/src/test/java/org/apache/camel/component/atmosphere/websocket/WebsocketRouteWithInitParamTest.java +++ b/components/camel-atmosphere-websocket/src/test/java/org/apache/camel/component/atmosphere/websocket/WebsocketRouteWithInitParamTest.java @@ -21,8 +21,17 @@ import org.apache.camel.Processor; import org.apache.camel.builder.RouteBuilder; import org.junit.Test; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + public class WebsocketRouteWithInitParamTest extends WebsocketCamelRouterWithInitParamTestSupport { + private static final String[] EXISTED_USERS = {"Kim", "Pavlo", "Peter"}; + private static String[] BROADCAST_MESSAGE_TO = {}; + private static Map<String,String> connectionKeyUserMap = new HashMap<>(); + @Test public void testWebsocketEventsResendingEnabled() throws Exception { TestClient wsclient = new TestClient("ws://localhost:" + PORT + "/hola"); @@ -37,6 +46,99 @@ public class WebsocketRouteWithInitParamTest extends WebsocketCamelRouterWithIni wsclient.close(); } + @Test + public void testWebsocketSingleClientBroadcastMultipleClients() throws Exception { + final int AWAIT_TIME = 5; + connectionKeyUserMap.clear(); + + TestClient wsclient1 = new TestClient("ws://localhost:" + PORT + "/hola2", 2); + TestClient wsclient2 = new TestClient("ws://localhost:" + PORT + "/hola2", 2); + TestClient wsclient3 = new TestClient("ws://localhost:" + PORT + "/hola2", 2); + + wsclient1.connect(); + wsclient1.await(AWAIT_TIME); + + wsclient2.connect(); + wsclient2.await(AWAIT_TIME); + + wsclient3.connect(); + wsclient3.await(AWAIT_TIME); + + //all connections were registered in external store + assertTrue(connectionKeyUserMap.size() == EXISTED_USERS.length); + + BROADCAST_MESSAGE_TO = new String[]{EXISTED_USERS[0], EXISTED_USERS[1]}; + + wsclient1.sendTextMessage("Gambas"); + wsclient1.await(AWAIT_TIME); + + List<String> received1 = wsclient1.getReceived(String.class); + assertEquals(1, received1.size()); + + for (int i = 0; i < BROADCAST_MESSAGE_TO.length; i++) { + assertTrue(received1.get(0).contains(BROADCAST_MESSAGE_TO[i])); + } + + List<String> received2 = wsclient2.getReceived(String.class); + assertEquals(1, received2.size()); + for (int i = 0; i < BROADCAST_MESSAGE_TO.length; i++) { + assertTrue(received2.get(0).contains(BROADCAST_MESSAGE_TO[i])); + } + + List<String> received3 = wsclient3.getReceived(String.class); + assertEquals(0, received3.size()); + + wsclient1.close(); + wsclient2.close(); + wsclient3.close(); + } + + @Test + public void testWebsocketSingleClientBroadcastMultipleClientsGuaranteeDelivery() throws Exception { + final int AWAIT_TIME = 5; + connectionKeyUserMap.clear(); + + TestClient wsclient1 = new TestClient("ws://localhost:" + PORT + "/hola3", 2); + TestClient wsclient2 = new TestClient("ws://localhost:" + PORT + "/hola3", 2); + TestClient wsclient3 = new TestClient("ws://localhost:" + PORT + "/hola3", 2); + + wsclient1.connect(); + wsclient1.await(AWAIT_TIME); + + wsclient2.connect(); + wsclient2.await(AWAIT_TIME); + + wsclient3.connect(); + wsclient3.await(AWAIT_TIME); + + //all connections were registered in external store + assertTrue(connectionKeyUserMap.size() == EXISTED_USERS.length); + + wsclient2.close(); + wsclient2.await(AWAIT_TIME); + + BROADCAST_MESSAGE_TO = new String[]{EXISTED_USERS[0], EXISTED_USERS[1]}; + + wsclient1.sendTextMessage("Gambas"); + wsclient1.await(AWAIT_TIME); + + List<String> received1 = wsclient1.getReceived(String.class); + assertEquals(1, received1.size()); + + for (int i = 0; i < BROADCAST_MESSAGE_TO.length; i++) { + assertTrue(received1.get(0).contains(BROADCAST_MESSAGE_TO[i])); + } + + List<String> received2 = wsclient2.getReceived(String.class); + assertEquals(0, received2.size()); + + List<String> received3 = wsclient3.getReceived(String.class); + assertEquals(0, received3.size()); + + wsclient1.close(); + wsclient3.close(); + } + // START SNIPPET: payload protected RouteBuilder createRouteBuilder() { return new RouteBuilder() { @@ -48,16 +150,118 @@ public class WebsocketRouteWithInitParamTest extends WebsocketCamelRouterWithIni } }); - // route for events resending enabled with parameters from url + // route for events resending enabled from("atmosphere-websocket:///hola1").to("log:info").process(new Processor() { public void process(final Exchange exchange) throws Exception { checkPassedParameters(exchange); } }); + + // route for single client broadcast to multiple clients + from("atmosphere-websocket:///hola2").to("log:info") + .choice() + .when(header(WebsocketConstants.EVENT_TYPE).isEqualTo(WebsocketConstants.ONOPEN_EVENT_TYPE)) + .process(new Processor() { + public void process(final Exchange exchange) throws Exception { + createExternalConnectionRegister(exchange); + } + }) + .when(header(WebsocketConstants.EVENT_TYPE).isEqualTo(WebsocketConstants.ONCLOSE_EVENT_TYPE)) + .process(new Processor() { + public void process(final Exchange exchange) throws Exception { + removeExternalConnectionRegister(exchange); + } + }) + .when(header(WebsocketConstants.EVENT_TYPE).isEqualTo(WebsocketConstants.ONERROR_EVENT_TYPE)) + .process(new Processor() { + public void process(final Exchange exchange) throws Exception { + removeExternalConnectionRegister(exchange); + } + }) + .otherwise() + .process(new Processor() { + public void process(final Exchange exchange) throws Exception { + createBroadcastMultipleClientsResponse(exchange); + } + }).to("atmosphere-websocket:///hola2"); + + // route for single client broadcast to multiple clients guarantee delivery + from("atmosphere-websocket:///hola3").to("log:info") + .choice() + .when(header(WebsocketConstants.EVENT_TYPE).isEqualTo(WebsocketConstants.ONOPEN_EVENT_TYPE)) + .process(new Processor() { + public void process(final Exchange exchange) throws Exception { + createExternalConnectionRegister(exchange); + } + }) + .when(header(WebsocketConstants.EVENT_TYPE).isEqualTo(WebsocketConstants.ONCLOSE_EVENT_TYPE)) + .process(new Processor() { + public void process(final Exchange exchange) throws Exception { + removeExternalConnectionRegister(exchange); + } + }) + .when(header(WebsocketConstants.EVENT_TYPE).isEqualTo(WebsocketConstants.ONERROR_EVENT_TYPE)) + .process(new Processor() { + public void process(final Exchange exchange) throws Exception { + removeExternalConnectionRegister(exchange); + } + }) + .when(header(WebsocketConstants.ERROR_TYPE).isEqualTo(WebsocketConstants.MESSAGE_NOT_SENT_ERROR_TYPE)) + .process(new Processor() { + public void process(final Exchange exchange) throws Exception { + handleNotDeliveredMessage(exchange); + } + }) + .otherwise() + .process(new Processor() { + public void process(final Exchange exchange) throws Exception { + createBroadcastMultipleClientsResponse(exchange); + } + }).to("atmosphere-websocket:///hola3"); } }; } + private static void handleNotDeliveredMessage(Exchange exchange) { + List<String> connectionKeyList = exchange.getIn().getHeader(WebsocketConstants.CONNECTION_KEY_LIST, List.class); + assertEquals(1, connectionKeyList.size()); + assertEquals(connectionKeyList.get(0), connectionKeyUserMap.get(BROADCAST_MESSAGE_TO[1])); + } + + private static void createExternalConnectionRegister(Exchange exchange) { + Object connectionKey = exchange.getIn().getHeader(WebsocketConstants.CONNECTION_KEY); + + String userName = EXISTED_USERS[0]; + + if (connectionKeyUserMap.size() > 0) { + userName = EXISTED_USERS[connectionKeyUserMap.size()]; + } + + connectionKeyUserMap.put(userName, (String) connectionKey); + } + + private static void removeExternalConnectionRegister(Exchange exchange) { + // remove connectionKey from external store + } + + private static void createBroadcastMultipleClientsResponse(Exchange exchange) { + List<String> connectionKeyList = new ArrayList<>(); + Object msg = exchange.getIn().getBody(); + + String additionalMessage = ""; + + //send the message only to selected connections + for (int i = 0; i < BROADCAST_MESSAGE_TO.length; i++) { + connectionKeyList.add(connectionKeyUserMap.get(BROADCAST_MESSAGE_TO[i])); + additionalMessage += BROADCAST_MESSAGE_TO[i] + " "; + } + + additionalMessage += " Received the message: "; + + exchange.getIn().setBody(additionalMessage + msg); + exchange.getIn().setHeader(WebsocketConstants.CONNECTION_KEY_LIST, connectionKeyList); + } + private static void checkEventsResendingEnabled(Exchange exchange) { Object connectionKey = exchange.getIn().getHeader(WebsocketConstants.CONNECTION_KEY); Object eventType = exchange.getIn().getHeader(WebsocketConstants.EVENT_TYPE);