Author: davsclaus Date: Sun Mar 25 13:51:07 2012 New Revision: 1305036 URL: http://svn.apache.org/viewvc?rev=1305036&view=rev Log: CAMEL-5117: Polished. Made it a bit easier with camel-twitter as you can configure sendToAll on endpoint.
Modified: camel/trunk/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/Twitter4JFactory.java camel/trunk/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TweeterStatusListener.java camel/trunk/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TwitterConsumerEvent.java camel/trunk/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TwitterConsumerPolling.java camel/trunk/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/streaming/StreamingConsumer.java camel/trunk/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketEndpoint.java camel/trunk/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketProducer.java camel/trunk/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketProducerTest.java camel/trunk/examples/camel-example-twitter-websocket/src/main/java/org/apache/camel/example/websocket/TwitterWebSocketRoute.java Modified: camel/trunk/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/Twitter4JFactory.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/Twitter4JFactory.java?rev=1305036&r1=1305035&r2=1305036&view=diff ============================================================================== --- camel/trunk/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/Twitter4JFactory.java (original) +++ camel/trunk/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/Twitter4JFactory.java Sun Mar 25 13:51:07 2012 @@ -110,8 +110,7 @@ public final class Twitter4JFactory { case USER: if (te.getProperties().getUser() == null || te.getProperties().getUser().trim().isEmpty()) { - throw new IllegalArgumentException( - "Fetch type set to USER TIMELINE but no user was set."); + throw new IllegalArgumentException("Fetch type set to USER TIMELINE but no user was set."); } else { return new UserConsumer(te); } @@ -176,9 +175,8 @@ public final class Twitter4JFactory { } - LOG.warn("A producer type was not provided (or an incorrect pairing was used). Defaulting to a MOCK!"); - throw new IllegalArgumentException("Cannot create any producer with uri " + uri - + "A producer type was not provided (or an incorrect pairing was used)."); + throw new IllegalArgumentException("Cannot create any producer with uri " + uri + + ". A producer type was not provided (or an incorrect pairing was used)."); } private static String[] splitUri(String uri) { Modified: camel/trunk/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TweeterStatusListener.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TweeterStatusListener.java?rev=1305036&r1=1305035&r2=1305036&view=diff ============================================================================== --- camel/trunk/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TweeterStatusListener.java (original) +++ camel/trunk/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TweeterStatusListener.java Sun Mar 25 13:51:07 2012 @@ -19,5 +19,6 @@ package org.apache.camel.component.twitt import twitter4j.Status; public interface TweeterStatusListener { + void onStatus(Status status); } Modified: camel/trunk/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TwitterConsumerEvent.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TwitterConsumerEvent.java?rev=1305036&r1=1305035&r2=1305036&view=diff ============================================================================== --- camel/trunk/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TwitterConsumerEvent.java (original) +++ camel/trunk/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TwitterConsumerEvent.java Sun Mar 25 13:51:07 2012 @@ -43,6 +43,14 @@ public class TwitterConsumerEvent extend } @Override + protected void doStop() throws Exception { + super.doStop(); + if (twitter4jConsumer instanceof StreamingConsumer) { + ((StreamingConsumer)twitter4jConsumer).unregisterTweetListener(this); + } + } + + @Override public void onStatus(Status status) { Exchange exchange = getEndpoint().createExchange(); exchange.getIn().setBody(status); Modified: camel/trunk/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TwitterConsumerPolling.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TwitterConsumerPolling.java?rev=1305036&r1=1305035&r2=1305036&view=diff ============================================================================== --- camel/trunk/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TwitterConsumerPolling.java (original) +++ camel/trunk/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TwitterConsumerPolling.java Sun Mar 25 13:51:07 2012 @@ -40,7 +40,7 @@ public class TwitterConsumerPolling exte this.twitter4jConsumer = twitter4jConsumer; int delay = endpoint.getProperties().getDelay(); - setInitialDelay(0); + setInitialDelay(1); setDelay(delay); setTimeUnit(TimeUnit.SECONDS); } Modified: camel/trunk/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/streaming/StreamingConsumer.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/streaming/StreamingConsumer.java?rev=1305036&r1=1305035&r2=1305036&view=diff ============================================================================== --- camel/trunk/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/streaming/StreamingConsumer.java (original) +++ camel/trunk/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/streaming/StreamingConsumer.java Sun Mar 25 13:51:07 2012 @@ -41,7 +41,6 @@ public class StreamingConsumer extends T private volatile boolean clear; private TweeterStatusListener tweeterStatusListener; - public StreamingConsumer(TwitterEndpoint te) { this.te = te; twitterStream = te.getProperties().getTwitterStreamInstance(); @@ -77,17 +76,25 @@ public class StreamingConsumer extends T @Override public void onDeletionNotice(StatusDeletionNotice statusDeletionNotice) { + // noop } @Override public void onTrackLimitationNotice(int numberOfLimitedStatuses) { + // noop } @Override public void onScrubGeo(long userId, long upToStatusId) { + // noop } public void registerTweetListener(TweeterStatusListener tweeterStatusListener) { this.tweeterStatusListener = tweeterStatusListener; } + + public void unregisterTweetListener(TweeterStatusListener tweeterStatusListener) { + this.tweeterStatusListener = null; + } + } Modified: camel/trunk/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketEndpoint.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketEndpoint.java?rev=1305036&r1=1305035&r2=1305036&view=diff ============================================================================== --- camel/trunk/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketEndpoint.java (original) +++ camel/trunk/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketEndpoint.java Sun Mar 25 13:51:07 2012 @@ -27,6 +27,7 @@ public class WebsocketEndpoint extends D private NodeSynchronization sync; private String remaining; private WebsocketStore memoryStore; + private Boolean sendToAll; public WebsocketEndpoint(String uri, WebsocketComponent component, String remaining) { super(uri, component); @@ -59,6 +60,14 @@ public class WebsocketEndpoint extends D return true; } + public Boolean getSendToAll() { + return sendToAll; + } + + public void setSendToAll(Boolean sendToAll) { + this.sendToAll = sendToAll; + } + @Override protected void doStart() throws Exception { ServiceHelper.startService(memoryStore); Modified: camel/trunk/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketProducer.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketProducer.java?rev=1305036&r1=1305035&r2=1305036&view=diff ============================================================================== --- camel/trunk/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketProducer.java (original) +++ camel/trunk/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketProducer.java Sun Mar 25 13:51:07 2012 @@ -20,17 +20,19 @@ import java.io.IOException; import java.util.Collection; import org.apache.camel.CamelExchangeException; -import org.apache.camel.Endpoint; import org.apache.camel.Exchange; import org.apache.camel.Message; import org.apache.camel.impl.DefaultProducer; public class WebsocketProducer extends DefaultProducer { private final WebsocketStore store; + private final Boolean sendToAll; - public WebsocketProducer(Endpoint endpoint, WebsocketStore store) { + public WebsocketProducer(WebsocketEndpoint endpoint, WebsocketStore store) { super(endpoint); this.store = store; + this.sendToAll = endpoint.getSendToAll(); + System.out.println("Configured with " + sendToAll); } @Override @@ -45,6 +47,7 @@ public class WebsocketProducer extends D String connectionKey = in.getHeader(WebsocketConstants.CONNECTION_KEY, String.class); if (connectionKey != null) { DefaultWebsocket websocket = store.get(connectionKey); + log.debug("Sending to connection key {} -> {}", connectionKey, message); sendMessage(websocket, message); } else { throw new IllegalArgumentException("Failed to send message to single connection; connetion key not set."); @@ -53,12 +56,13 @@ public class WebsocketProducer extends D } boolean isSendToAllSet(Message in) { - // header may be null; have to be careful here - Object value = in.getHeader(WebsocketConstants.SEND_TO_ALL); - return value == null ? false : (Boolean) value; + // header may be null; have to be careful here (and fallback to use sendToAll option configured from endpoint) + Boolean value = in.getHeader(WebsocketConstants.SEND_TO_ALL, sendToAll, Boolean.class); + return value == null ? false : value; } void sendToAll(WebsocketStore store, String message, Exchange exchange) throws Exception { + log.debug("Sending to all {}", message); Collection<DefaultWebsocket> websockets = store.getAll(); Exception exception = null; for (DefaultWebsocket websocket : websockets) { @@ -78,6 +82,7 @@ public class WebsocketProducer extends D void sendMessage(DefaultWebsocket websocket, String message) throws IOException { // in case there is web socket and socket connection is open - send message if (websocket != null && websocket.getConnection().isOpen()) { + log.trace("Sending to websocket {} -> {}", websocket.getConnectionKey(), message); websocket.getConnection().sendMessage(message); } } Modified: camel/trunk/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketProducerTest.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketProducerTest.java?rev=1305036&r1=1305035&r2=1305036&view=diff ============================================================================== --- camel/trunk/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketProducerTest.java (original) +++ camel/trunk/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketProducerTest.java Sun Mar 25 13:51:07 2012 @@ -20,7 +20,6 @@ import java.io.IOException; import java.util.Arrays; import java.util.Collection; -import org.apache.camel.Endpoint; import org.apache.camel.Exchange; import org.apache.camel.Message; import org.eclipse.jetty.websocket.WebSocket.Connection; @@ -52,7 +51,7 @@ public class WebsocketProducerTest { private static final String SESSION_KEY = "random-session-key"; @Mock - private Endpoint endpoint; + private WebsocketEndpoint endpoint; @Mock private WebsocketStore store; @Mock @@ -80,7 +79,7 @@ public class WebsocketProducerTest { public void testProcessSingleMessage() throws Exception { when(exchange.getIn()).thenReturn(inMessage); when(inMessage.getMandatoryBody(String.class)).thenReturn(MESSAGE); - when(inMessage.getHeader(WebsocketConstants.SEND_TO_ALL)).thenReturn(null); + when(inMessage.getHeader(WebsocketConstants.SEND_TO_ALL, false, Boolean.class)).thenReturn(false); when(inMessage.getHeader(WebsocketConstants.CONNECTION_KEY, String.class)).thenReturn(SESSION_KEY); when(store.get(SESSION_KEY)).thenReturn(defaultWebsocket1); when(defaultWebsocket1.getConnection()).thenReturn(connection); @@ -91,7 +90,7 @@ public class WebsocketProducerTest { InOrder inOrder = inOrder(endpoint, store, connection, defaultWebsocket1, defaultWebsocket2, exchange, inMessage); inOrder.verify(exchange, times(1)).getIn(); inOrder.verify(inMessage, times(1)).getMandatoryBody(String.class); - inOrder.verify(inMessage, times(1)).getHeader(WebsocketConstants.SEND_TO_ALL); + inOrder.verify(inMessage, times(1)).getHeader(WebsocketConstants.SEND_TO_ALL, false, Boolean.class); inOrder.verify(inMessage, times(1)).getHeader(WebsocketConstants.CONNECTION_KEY, String.class); inOrder.verify(store, times(1)).get(SESSION_KEY); inOrder.verify(defaultWebsocket1, times(1)).getConnection(); @@ -105,7 +104,7 @@ public class WebsocketProducerTest { public void testProcessSingleMessageWithException() throws Exception { when(exchange.getIn()).thenReturn(inMessage); when(inMessage.getMandatoryBody(String.class)).thenReturn(MESSAGE); - when(inMessage.getHeader(WebsocketConstants.SEND_TO_ALL)).thenReturn(false); + when(inMessage.getHeader(WebsocketConstants.SEND_TO_ALL, false, Boolean.class)).thenReturn(false); when(inMessage.getHeader(WebsocketConstants.CONNECTION_KEY, String.class)).thenReturn(SESSION_KEY); when(store.get(SESSION_KEY)).thenReturn(defaultWebsocket1); when(defaultWebsocket1.getConnection()).thenReturn(connection); @@ -122,7 +121,7 @@ public class WebsocketProducerTest { InOrder inOrder = inOrder(endpoint, store, connection, defaultWebsocket1, defaultWebsocket2, exchange, inMessage); inOrder.verify(exchange, times(1)).getIn(); inOrder.verify(inMessage, times(1)).getMandatoryBody(String.class); - inOrder.verify(inMessage, times(1)).getHeader(WebsocketConstants.SEND_TO_ALL); + inOrder.verify(inMessage, times(1)).getHeader(WebsocketConstants.SEND_TO_ALL, false, Boolean.class); inOrder.verify(inMessage, times(1)).getHeader(WebsocketConstants.CONNECTION_KEY, String.class); inOrder.verify(store, times(1)).get(SESSION_KEY); inOrder.verify(defaultWebsocket1, times(1)).getConnection(); @@ -136,7 +135,7 @@ public class WebsocketProducerTest { public void testProcessMultipleMessages() throws Exception { when(exchange.getIn()).thenReturn(inMessage); when(inMessage.getMandatoryBody(String.class)).thenReturn(MESSAGE); - when(inMessage.getHeader(WebsocketConstants.SEND_TO_ALL)).thenReturn(true); + when(inMessage.getHeader(WebsocketConstants.SEND_TO_ALL, false, Boolean.class)).thenReturn(true); when(store.getAll()).thenReturn(sockets); when(defaultWebsocket1.getConnection()).thenReturn(connection); when(defaultWebsocket2.getConnection()).thenReturn(connection); @@ -147,7 +146,7 @@ public class WebsocketProducerTest { InOrder inOrder = inOrder(endpoint, store, connection, defaultWebsocket1, defaultWebsocket2, exchange, inMessage); inOrder.verify(exchange, times(1)).getIn(); inOrder.verify(inMessage, times(1)).getMandatoryBody(String.class); - inOrder.verify(inMessage, times(1)).getHeader(WebsocketConstants.SEND_TO_ALL); + inOrder.verify(inMessage, times(1)).getHeader(WebsocketConstants.SEND_TO_ALL, false, Boolean.class); inOrder.verify(store, times(1)).getAll(); inOrder.verify(defaultWebsocket1, times(1)).getConnection(); inOrder.verify(connection, times(1)).isOpen(); @@ -161,10 +160,10 @@ public class WebsocketProducerTest { } @Test - public void testProcessMultipleMessagesWithExcpetion() throws Exception { + public void testProcessMultipleMessagesWithException() throws Exception { when(exchange.getIn()).thenReturn(inMessage); when(inMessage.getMandatoryBody(String.class)).thenReturn(MESSAGE); - when(inMessage.getHeader(WebsocketConstants.SEND_TO_ALL)).thenReturn(true); + when(inMessage.getHeader(WebsocketConstants.SEND_TO_ALL, false, Boolean.class)).thenReturn(true); when(store.getAll()).thenReturn(sockets); when(defaultWebsocket1.getConnection()).thenReturn(connection); when(defaultWebsocket2.getConnection()).thenReturn(connection); @@ -181,7 +180,7 @@ public class WebsocketProducerTest { InOrder inOrder = inOrder(endpoint, store, connection, defaultWebsocket1, defaultWebsocket2, exchange, inMessage); inOrder.verify(exchange, times(1)).getIn(); inOrder.verify(inMessage, times(1)).getMandatoryBody(String.class); - inOrder.verify(inMessage, times(1)).getHeader(WebsocketConstants.SEND_TO_ALL); + inOrder.verify(inMessage, times(1)).getHeader(WebsocketConstants.SEND_TO_ALL, false, Boolean.class); inOrder.verify(store, times(1)).getAll(); inOrder.verify(defaultWebsocket1, times(1)).getConnection(); inOrder.verify(connection, times(1)).isOpen(); @@ -198,7 +197,7 @@ public class WebsocketProducerTest { public void testProcessSingleMessageNoConnectionKey() throws Exception { when(exchange.getIn()).thenReturn(inMessage); when(inMessage.getBody(String.class)).thenReturn(MESSAGE); - when(inMessage.getHeader(WebsocketConstants.SEND_TO_ALL)).thenReturn(false); + when(inMessage.getHeader(WebsocketConstants.SEND_TO_ALL, false, Boolean.class)).thenReturn(false); when(inMessage.getHeader(WebsocketConstants.CONNECTION_KEY, String.class)).thenReturn(null); try { @@ -213,7 +212,7 @@ public class WebsocketProducerTest { InOrder inOrder = inOrder(endpoint, store, connection, defaultWebsocket1, defaultWebsocket2, exchange, inMessage); inOrder.verify(exchange, times(1)).getIn(); inOrder.verify(inMessage, times(1)).getMandatoryBody(String.class); - inOrder.verify(inMessage, times(1)).getHeader(WebsocketConstants.SEND_TO_ALL); + inOrder.verify(inMessage, times(1)).getHeader(WebsocketConstants.SEND_TO_ALL, false, Boolean.class); inOrder.verify(inMessage, times(1)).getHeader(WebsocketConstants.CONNECTION_KEY, String.class); inOrder.verifyNoMoreInteractions(); } @@ -234,14 +233,6 @@ public class WebsocketProducerTest { } @Test - public void testSendMessageWebsocketIsNull() throws Exception { - websocketProducer.sendMessage(null, MESSAGE); - - InOrder inOrder = inOrder(endpoint, store, connection, defaultWebsocket1, defaultWebsocket2, exchange, inMessage); - inOrder.verifyNoMoreInteractions(); - } - - @Test public void testSendMessageConnetionIsClosed() throws Exception { when(defaultWebsocket1.getConnection()).thenReturn(connection); when(connection.isOpen()).thenReturn(false); @@ -277,20 +268,20 @@ public class WebsocketProducerTest { @Test public void testIsSendToAllSet() { - when(inMessage.getHeader(WebsocketConstants.SEND_TO_ALL)).thenReturn(true, false); + when(inMessage.getHeader(WebsocketConstants.SEND_TO_ALL, false, Boolean.class)).thenReturn(true, false); assertTrue(websocketProducer.isSendToAllSet(inMessage)); assertFalse(websocketProducer.isSendToAllSet(inMessage)); InOrder inOrder = inOrder(inMessage); - inOrder.verify(inMessage, times(2)).getHeader(WebsocketConstants.SEND_TO_ALL); + inOrder.verify(inMessage, times(2)).getHeader(WebsocketConstants.SEND_TO_ALL, false, Boolean.class); inOrder.verifyNoMoreInteractions(); } @Test public void testIsSendToAllSetHeaderNull() { - when(inMessage.getHeader(WebsocketConstants.SEND_TO_ALL)).thenReturn(null); + when(inMessage.getHeader(WebsocketConstants.SEND_TO_ALL, false, Boolean.class)).thenReturn(null); assertFalse(websocketProducer.isSendToAllSet(inMessage)); InOrder inOrder = inOrder(inMessage); - inOrder.verify(inMessage, times(1)).getHeader(WebsocketConstants.SEND_TO_ALL); + inOrder.verify(inMessage, times(1)).getHeader(WebsocketConstants.SEND_TO_ALL, false, Boolean.class); inOrder.verifyNoMoreInteractions(); } Modified: camel/trunk/examples/camel-example-twitter-websocket/src/main/java/org/apache/camel/example/websocket/TwitterWebSocketRoute.java URL: http://svn.apache.org/viewvc/camel/trunk/examples/camel-example-twitter-websocket/src/main/java/org/apache/camel/example/websocket/TwitterWebSocketRoute.java?rev=1305036&r1=1305035&r2=1305036&view=diff ============================================================================== --- camel/trunk/examples/camel-example-twitter-websocket/src/main/java/org/apache/camel/example/websocket/TwitterWebSocketRoute.java (original) +++ camel/trunk/examples/camel-example-twitter-websocket/src/main/java/org/apache/camel/example/websocket/TwitterWebSocketRoute.java Sun Mar 25 13:51:07 2012 @@ -19,7 +19,6 @@ package org.apache.camel.example.websock import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.twitter.TwitterComponent; import org.apache.camel.component.websocket.WebsocketComponent; -import org.apache.camel.component.websocket.WebsocketConstants; /** * A Camel route that updates from twitter all tweets using having the search term. @@ -105,9 +104,9 @@ public class TwitterWebSocketRoute exten tc.setConsumerKey(consumerKey); tc.setConsumerSecret(consumerSecret); - // poll twitter search for new tweets, and push tweets to all web socket subscribers on camel-tweet + // poll twitter search for new tweets fromF("twitter://search?type=polling&delay=%s&keywords=%s", delay, searchTerm) - .setHeader(WebsocketConstants.SEND_TO_ALL).constant(true) - .to("websocket:camel-tweet"); + // and push tweets to all web socket subscribers on camel-tweet + .to("websocket:camel-tweet?sendToAll=true"); } }