Author: davsclaus
Date: Sun Mar 25 13:51:07 2012
New Revision: 1305036

URL: http://svn.apache.org/viewvc?rev=1305036&view=rev
Log:
CAMEL-5117: Polished. Made it a bit easier with camel-twitter as you can 
configure sendToAll on endpoint.

Modified:
    
camel/trunk/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/Twitter4JFactory.java
    
camel/trunk/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TweeterStatusListener.java
    
camel/trunk/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TwitterConsumerEvent.java
    
camel/trunk/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TwitterConsumerPolling.java
    
camel/trunk/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/streaming/StreamingConsumer.java
    
camel/trunk/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketEndpoint.java
    
camel/trunk/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketProducer.java
    
camel/trunk/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketProducerTest.java
    
camel/trunk/examples/camel-example-twitter-websocket/src/main/java/org/apache/camel/example/websocket/TwitterWebSocketRoute.java

Modified: 
camel/trunk/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/Twitter4JFactory.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/Twitter4JFactory.java?rev=1305036&r1=1305035&r2=1305036&view=diff
==============================================================================
--- 
camel/trunk/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/Twitter4JFactory.java
 (original)
+++ 
camel/trunk/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/Twitter4JFactory.java
 Sun Mar 25 13:51:07 2012
@@ -110,8 +110,7 @@ public final class Twitter4JFactory {
                     case USER:
                         if (te.getProperties().getUser() == null
                             || te.getProperties().getUser().trim().isEmpty()) {
-                            throw new IllegalArgumentException(
-                                                               "Fetch type set to USER TIMELINE but no user was set.");
+                            throw new IllegalArgumentException("Fetch type set to USER TIMELINE but no user was set.");
                         } else {
                             return new UserConsumer(te);
                         }
@@ -176,9 +175,8 @@ public final class Twitter4JFactory {
             
         }
 
-        LOG.warn("A producer type was not provided (or an incorrect pairing was used). Defaulting to a MOCK!");
-        throw new IllegalArgumentException("Cannot create any producer with uri " + uri 
-                                           + "A producer type was not provided (or an incorrect pairing was used).");
+        throw new IllegalArgumentException("Cannot create any producer with uri " + uri
+                                           + ". A producer type was not provided (or an incorrect pairing was used).");
     }
 
     private static String[] splitUri(String uri) {

Modified: 
camel/trunk/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TweeterStatusListener.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TweeterStatusListener.java?rev=1305036&r1=1305035&r2=1305036&view=diff
==============================================================================
--- 
camel/trunk/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TweeterStatusListener.java
 (original)
+++ 
camel/trunk/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TweeterStatusListener.java
 Sun Mar 25 13:51:07 2012
@@ -19,5 +19,6 @@ package org.apache.camel.component.twitt
 import twitter4j.Status;
 
 public interface TweeterStatusListener {
+
     void onStatus(Status status);
 }

Modified: 
camel/trunk/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TwitterConsumerEvent.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TwitterConsumerEvent.java?rev=1305036&r1=1305035&r2=1305036&view=diff
==============================================================================
--- 
camel/trunk/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TwitterConsumerEvent.java
 (original)
+++ 
camel/trunk/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TwitterConsumerEvent.java
 Sun Mar 25 13:51:07 2012
@@ -43,6 +43,14 @@ public class TwitterConsumerEvent extend
     }
 
     @Override
+    protected void doStop() throws Exception {
+        super.doStop();
+        if (twitter4jConsumer instanceof StreamingConsumer) {
+            ((StreamingConsumer)twitter4jConsumer).unregisterTweetListener(this);
+        }
+    }
+
+    @Override
     public void onStatus(Status status) {
         Exchange exchange = getEndpoint().createExchange();
         exchange.getIn().setBody(status);

Modified: 
camel/trunk/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TwitterConsumerPolling.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TwitterConsumerPolling.java?rev=1305036&r1=1305035&r2=1305036&view=diff
==============================================================================
--- 
camel/trunk/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TwitterConsumerPolling.java
 (original)
+++ 
camel/trunk/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TwitterConsumerPolling.java
 Sun Mar 25 13:51:07 2012
@@ -40,7 +40,7 @@ public class TwitterConsumerPolling exte
         this.twitter4jConsumer = twitter4jConsumer;
 
         int delay = endpoint.getProperties().getDelay();
-        setInitialDelay(0);
+        setInitialDelay(1);
         setDelay(delay);
         setTimeUnit(TimeUnit.SECONDS);
     }

Modified: 
camel/trunk/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/streaming/StreamingConsumer.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/streaming/StreamingConsumer.java?rev=1305036&r1=1305035&r2=1305036&view=diff
==============================================================================
--- 
camel/trunk/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/streaming/StreamingConsumer.java
 (original)
+++ 
camel/trunk/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/streaming/StreamingConsumer.java
 Sun Mar 25 13:51:07 2012
@@ -41,7 +41,6 @@ public class StreamingConsumer extends T
     private volatile boolean clear;
     private TweeterStatusListener tweeterStatusListener;
 
-
     public StreamingConsumer(TwitterEndpoint te) {
         this.te = te;
         twitterStream = te.getProperties().getTwitterStreamInstance();
@@ -77,17 +76,25 @@ public class StreamingConsumer extends T
 
     @Override
     public void onDeletionNotice(StatusDeletionNotice statusDeletionNotice) {
+        // noop
     }
 
     @Override
     public void onTrackLimitationNotice(int numberOfLimitedStatuses) {
+        // noop
     }
 
     @Override
     public void onScrubGeo(long userId, long upToStatusId) {
+        // noop
     }
 
     public void registerTweetListener(TweeterStatusListener tweeterStatusListener) {
         this.tweeterStatusListener = tweeterStatusListener;
     }
+
+    public void unregisterTweetListener(TweeterStatusListener tweeterStatusListener) {
+        this.tweeterStatusListener = null;
+    }
+
 }

Modified: 
camel/trunk/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketEndpoint.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketEndpoint.java?rev=1305036&r1=1305035&r2=1305036&view=diff
==============================================================================
--- 
camel/trunk/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketEndpoint.java
 (original)
+++ 
camel/trunk/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketEndpoint.java
 Sun Mar 25 13:51:07 2012
@@ -27,6 +27,7 @@ public class WebsocketEndpoint extends D
     private NodeSynchronization sync;
     private String remaining;
     private WebsocketStore memoryStore;
+    private Boolean sendToAll;
 
     public WebsocketEndpoint(String uri, WebsocketComponent component, String remaining) {
         super(uri, component);
@@ -59,6 +60,14 @@ public class WebsocketEndpoint extends D
         return true;
     }
 
+    public Boolean getSendToAll() {
+        return sendToAll;
+    }
+
+    public void setSendToAll(Boolean sendToAll) {
+        this.sendToAll = sendToAll;
+    }
+
     @Override
     protected void doStart() throws Exception {
         ServiceHelper.startService(memoryStore);

Modified: 
camel/trunk/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketProducer.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketProducer.java?rev=1305036&r1=1305035&r2=1305036&view=diff
==============================================================================
--- 
camel/trunk/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketProducer.java
 (original)
+++ 
camel/trunk/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketProducer.java
 Sun Mar 25 13:51:07 2012
@@ -20,17 +20,19 @@ import java.io.IOException;
 import java.util.Collection;
 
 import org.apache.camel.CamelExchangeException;
-import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
 import org.apache.camel.impl.DefaultProducer;
 
 public class WebsocketProducer extends DefaultProducer {
     private final WebsocketStore store;
+    private final Boolean sendToAll;
 
-    public WebsocketProducer(Endpoint endpoint, WebsocketStore store) {
+    public WebsocketProducer(WebsocketEndpoint endpoint, WebsocketStore store) {
         super(endpoint);
         this.store = store;
+        this.sendToAll = endpoint.getSendToAll();
+        System.out.println("Configured with " + sendToAll);
     }
 
     @Override
@@ -45,6 +47,7 @@ public class WebsocketProducer extends D
             String connectionKey = in.getHeader(WebsocketConstants.CONNECTION_KEY, String.class);
             if (connectionKey != null) {
                 DefaultWebsocket websocket = store.get(connectionKey);
+                log.debug("Sending to connection key {} -> {}", connectionKey, message);
                 sendMessage(websocket, message);
             } else {
                 throw new IllegalArgumentException("Failed to send message to single connection; connetion key not set.");
@@ -53,12 +56,13 @@ public class WebsocketProducer extends D
     }
 
     boolean isSendToAllSet(Message in) {
-        // header may be null; have to be careful here
-        Object value = in.getHeader(WebsocketConstants.SEND_TO_ALL);
-        return value == null ? false : (Boolean) value;
+        // header may be null; have to be careful here (and fallback to use sendToAll option configured from endpoint)
+        Boolean value = in.getHeader(WebsocketConstants.SEND_TO_ALL, sendToAll, Boolean.class);
+        return value == null ? false : value;
     }
 
     void sendToAll(WebsocketStore store, String message, Exchange exchange) throws Exception {
+        log.debug("Sending to all {}", message);
         Collection<DefaultWebsocket> websockets = store.getAll();
         Exception exception = null;
         for (DefaultWebsocket websocket : websockets) {
@@ -78,6 +82,7 @@ public class WebsocketProducer extends D
     void sendMessage(DefaultWebsocket websocket, String message) throws IOException {
         // in case there is web socket and socket connection is open - send message
         if (websocket != null && websocket.getConnection().isOpen()) {
+            log.trace("Sending to websocket {} -> {}", websocket.getConnectionKey(), message);
             websocket.getConnection().sendMessage(message);
         }
     }

Modified: 
camel/trunk/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketProducerTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketProducerTest.java?rev=1305036&r1=1305035&r2=1305036&view=diff
==============================================================================
--- 
camel/trunk/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketProducerTest.java
 (original)
+++ 
camel/trunk/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketProducerTest.java
 Sun Mar 25 13:51:07 2012
@@ -20,7 +20,6 @@ import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collection;
 
-import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
 import org.eclipse.jetty.websocket.WebSocket.Connection;
@@ -52,7 +51,7 @@ public class WebsocketProducerTest {
     private static final String SESSION_KEY = "random-session-key";
 
     @Mock
-    private Endpoint endpoint;
+    private WebsocketEndpoint endpoint;
     @Mock
     private WebsocketStore store;
     @Mock
@@ -80,7 +79,7 @@ public class WebsocketProducerTest {
     public void testProcessSingleMessage() throws Exception {
         when(exchange.getIn()).thenReturn(inMessage);
         when(inMessage.getMandatoryBody(String.class)).thenReturn(MESSAGE);
-        when(inMessage.getHeader(WebsocketConstants.SEND_TO_ALL)).thenReturn(null);
+        when(inMessage.getHeader(WebsocketConstants.SEND_TO_ALL, false, Boolean.class)).thenReturn(false);
         when(inMessage.getHeader(WebsocketConstants.CONNECTION_KEY, String.class)).thenReturn(SESSION_KEY);
         when(store.get(SESSION_KEY)).thenReturn(defaultWebsocket1);
         when(defaultWebsocket1.getConnection()).thenReturn(connection);
@@ -91,7 +90,7 @@ public class WebsocketProducerTest {
         InOrder inOrder = inOrder(endpoint, store, connection, defaultWebsocket1, defaultWebsocket2, exchange, inMessage);
         inOrder.verify(exchange, times(1)).getIn();
         inOrder.verify(inMessage, times(1)).getMandatoryBody(String.class);
-        inOrder.verify(inMessage, times(1)).getHeader(WebsocketConstants.SEND_TO_ALL);
+        inOrder.verify(inMessage, times(1)).getHeader(WebsocketConstants.SEND_TO_ALL, false, Boolean.class);
         inOrder.verify(inMessage, times(1)).getHeader(WebsocketConstants.CONNECTION_KEY, String.class);
         inOrder.verify(store, times(1)).get(SESSION_KEY);
         inOrder.verify(defaultWebsocket1, times(1)).getConnection();
@@ -105,7 +104,7 @@ public class WebsocketProducerTest {
     public void testProcessSingleMessageWithException() throws Exception {
         when(exchange.getIn()).thenReturn(inMessage);
         when(inMessage.getMandatoryBody(String.class)).thenReturn(MESSAGE);
-        when(inMessage.getHeader(WebsocketConstants.SEND_TO_ALL)).thenReturn(false);
+        when(inMessage.getHeader(WebsocketConstants.SEND_TO_ALL, false, Boolean.class)).thenReturn(false);
         when(inMessage.getHeader(WebsocketConstants.CONNECTION_KEY, String.class)).thenReturn(SESSION_KEY);
         when(store.get(SESSION_KEY)).thenReturn(defaultWebsocket1);
         when(defaultWebsocket1.getConnection()).thenReturn(connection);
@@ -122,7 +121,7 @@ public class WebsocketProducerTest {
         InOrder inOrder = inOrder(endpoint, store, connection, defaultWebsocket1, defaultWebsocket2, exchange, inMessage);
         inOrder.verify(exchange, times(1)).getIn();
         inOrder.verify(inMessage, times(1)).getMandatoryBody(String.class);
-        inOrder.verify(inMessage, times(1)).getHeader(WebsocketConstants.SEND_TO_ALL);
+        inOrder.verify(inMessage, times(1)).getHeader(WebsocketConstants.SEND_TO_ALL, false, Boolean.class);
         inOrder.verify(inMessage, times(1)).getHeader(WebsocketConstants.CONNECTION_KEY, String.class);
         inOrder.verify(store, times(1)).get(SESSION_KEY);
         inOrder.verify(defaultWebsocket1, times(1)).getConnection();
@@ -136,7 +135,7 @@ public class WebsocketProducerTest {
     public void testProcessMultipleMessages() throws Exception {
         when(exchange.getIn()).thenReturn(inMessage);
         when(inMessage.getMandatoryBody(String.class)).thenReturn(MESSAGE);
-        when(inMessage.getHeader(WebsocketConstants.SEND_TO_ALL)).thenReturn(true);
+        when(inMessage.getHeader(WebsocketConstants.SEND_TO_ALL, false, Boolean.class)).thenReturn(true);
         when(store.getAll()).thenReturn(sockets);
         when(defaultWebsocket1.getConnection()).thenReturn(connection);
         when(defaultWebsocket2.getConnection()).thenReturn(connection);
@@ -147,7 +146,7 @@ public class WebsocketProducerTest {
         InOrder inOrder = inOrder(endpoint, store, connection, defaultWebsocket1, defaultWebsocket2, exchange, inMessage);
         inOrder.verify(exchange, times(1)).getIn();
         inOrder.verify(inMessage, times(1)).getMandatoryBody(String.class);
-        inOrder.verify(inMessage, times(1)).getHeader(WebsocketConstants.SEND_TO_ALL);
+        inOrder.verify(inMessage, times(1)).getHeader(WebsocketConstants.SEND_TO_ALL, false, Boolean.class);
         inOrder.verify(store, times(1)).getAll();
         inOrder.verify(defaultWebsocket1, times(1)).getConnection();
         inOrder.verify(connection, times(1)).isOpen();
@@ -161,10 +160,10 @@ public class WebsocketProducerTest {
     }
 
     @Test
-    public void testProcessMultipleMessagesWithExcpetion() throws Exception {
+    public void testProcessMultipleMessagesWithException() throws Exception {
         when(exchange.getIn()).thenReturn(inMessage);
         when(inMessage.getMandatoryBody(String.class)).thenReturn(MESSAGE);
-        when(inMessage.getHeader(WebsocketConstants.SEND_TO_ALL)).thenReturn(true);
+        when(inMessage.getHeader(WebsocketConstants.SEND_TO_ALL, false, Boolean.class)).thenReturn(true);
         when(store.getAll()).thenReturn(sockets);
         when(defaultWebsocket1.getConnection()).thenReturn(connection);
         when(defaultWebsocket2.getConnection()).thenReturn(connection);
@@ -181,7 +180,7 @@ public class WebsocketProducerTest {
         InOrder inOrder = inOrder(endpoint, store, connection, defaultWebsocket1, defaultWebsocket2, exchange, inMessage);
         inOrder.verify(exchange, times(1)).getIn();
         inOrder.verify(inMessage, times(1)).getMandatoryBody(String.class);
-        inOrder.verify(inMessage, times(1)).getHeader(WebsocketConstants.SEND_TO_ALL);
+        inOrder.verify(inMessage, times(1)).getHeader(WebsocketConstants.SEND_TO_ALL, false, Boolean.class);
         inOrder.verify(store, times(1)).getAll();
         inOrder.verify(defaultWebsocket1, times(1)).getConnection();
         inOrder.verify(connection, times(1)).isOpen();
@@ -198,7 +197,7 @@ public class WebsocketProducerTest {
     public void testProcessSingleMessageNoConnectionKey() throws Exception {
         when(exchange.getIn()).thenReturn(inMessage);
         when(inMessage.getBody(String.class)).thenReturn(MESSAGE);
-        when(inMessage.getHeader(WebsocketConstants.SEND_TO_ALL)).thenReturn(false);
+        when(inMessage.getHeader(WebsocketConstants.SEND_TO_ALL, false, Boolean.class)).thenReturn(false);
         when(inMessage.getHeader(WebsocketConstants.CONNECTION_KEY, String.class)).thenReturn(null);
 
         try {
@@ -213,7 +212,7 @@ public class WebsocketProducerTest {
         InOrder inOrder = inOrder(endpoint, store, connection, defaultWebsocket1, defaultWebsocket2, exchange, inMessage);
         inOrder.verify(exchange, times(1)).getIn();
         inOrder.verify(inMessage, times(1)).getMandatoryBody(String.class);
-        inOrder.verify(inMessage, times(1)).getHeader(WebsocketConstants.SEND_TO_ALL);
+        inOrder.verify(inMessage, times(1)).getHeader(WebsocketConstants.SEND_TO_ALL, false, Boolean.class);
         inOrder.verify(inMessage, times(1)).getHeader(WebsocketConstants.CONNECTION_KEY, String.class);
         inOrder.verifyNoMoreInteractions();
     }
@@ -234,14 +233,6 @@ public class WebsocketProducerTest {
     }
 
     @Test
-    public void testSendMessageWebsocketIsNull() throws Exception {
-        websocketProducer.sendMessage(null, MESSAGE);
-
-        InOrder inOrder = inOrder(endpoint, store, connection, defaultWebsocket1, defaultWebsocket2, exchange, inMessage);
-        inOrder.verifyNoMoreInteractions();
-    }
-
-    @Test
     public void testSendMessageConnetionIsClosed() throws Exception {
         when(defaultWebsocket1.getConnection()).thenReturn(connection);
         when(connection.isOpen()).thenReturn(false);
@@ -277,20 +268,20 @@ public class WebsocketProducerTest {
 
     @Test
     public void testIsSendToAllSet() {
-        when(inMessage.getHeader(WebsocketConstants.SEND_TO_ALL)).thenReturn(true, false);
+        when(inMessage.getHeader(WebsocketConstants.SEND_TO_ALL, false, Boolean.class)).thenReturn(true, false);
         assertTrue(websocketProducer.isSendToAllSet(inMessage));
         assertFalse(websocketProducer.isSendToAllSet(inMessage));
         InOrder inOrder = inOrder(inMessage);
-        inOrder.verify(inMessage, times(2)).getHeader(WebsocketConstants.SEND_TO_ALL);
+        inOrder.verify(inMessage, times(2)).getHeader(WebsocketConstants.SEND_TO_ALL, false, Boolean.class);
         inOrder.verifyNoMoreInteractions();
     }
 
     @Test
     public void testIsSendToAllSetHeaderNull() {
-        when(inMessage.getHeader(WebsocketConstants.SEND_TO_ALL)).thenReturn(null);
+        when(inMessage.getHeader(WebsocketConstants.SEND_TO_ALL, false, Boolean.class)).thenReturn(null);
         assertFalse(websocketProducer.isSendToAllSet(inMessage));
         InOrder inOrder = inOrder(inMessage);
-        inOrder.verify(inMessage, times(1)).getHeader(WebsocketConstants.SEND_TO_ALL);
+        inOrder.verify(inMessage, times(1)).getHeader(WebsocketConstants.SEND_TO_ALL, false, Boolean.class);
         inOrder.verifyNoMoreInteractions();
     }
 

Modified: 
camel/trunk/examples/camel-example-twitter-websocket/src/main/java/org/apache/camel/example/websocket/TwitterWebSocketRoute.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/examples/camel-example-twitter-websocket/src/main/java/org/apache/camel/example/websocket/TwitterWebSocketRoute.java?rev=1305036&r1=1305035&r2=1305036&view=diff
==============================================================================
--- 
camel/trunk/examples/camel-example-twitter-websocket/src/main/java/org/apache/camel/example/websocket/TwitterWebSocketRoute.java
 (original)
+++ 
camel/trunk/examples/camel-example-twitter-websocket/src/main/java/org/apache/camel/example/websocket/TwitterWebSocketRoute.java
 Sun Mar 25 13:51:07 2012
@@ -19,7 +19,6 @@ package org.apache.camel.example.websock
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.twitter.TwitterComponent;
 import org.apache.camel.component.websocket.WebsocketComponent;
-import org.apache.camel.component.websocket.WebsocketConstants;
 
 /**
  * A Camel route that updates from twitter all tweets using having the search term.
@@ -105,9 +104,9 @@ public class TwitterWebSocketRoute exten
         tc.setConsumerKey(consumerKey);
         tc.setConsumerSecret(consumerSecret);
 
-        // poll twitter search for new tweets, and push tweets to all web socket subscribers on camel-tweet
+        // poll twitter search for new tweets
         fromF("twitter://search?type=polling&delay=%s&keywords=%s", delay, searchTerm)
-            .setHeader(WebsocketConstants.SEND_TO_ALL).constant(true)
-            .to("websocket:camel-tweet");
+            // and push tweets to all web socket subscribers on camel-tweet
+            .to("websocket:camel-tweet?sendToAll=true");
     }
 }


Reply via email to