http://git-wip-us.apache.org/repos/asf/camel/blob/bb64dca0/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/TwitterEndpointEvent.java
----------------------------------------------------------------------
diff --git 
a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/TwitterEndpointEvent.java
 
b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/TwitterEndpointEvent.java
index c9edc40..54d707f 100644
--- 
a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/TwitterEndpointEvent.java
+++ 
b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/TwitterEndpointEvent.java
@@ -19,26 +19,27 @@ package org.apache.camel.component.twitter;
 import org.apache.camel.Consumer;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
-import org.apache.camel.component.twitter.consumer.TwitterConsumer;
+import 
org.apache.camel.component.twitter.consumer.AbstractTwitterConsumerHandler;
 import org.apache.camel.component.twitter.consumer.TwitterConsumerEvent;
 import org.apache.camel.component.twitter.data.EndpointType;
 import org.apache.camel.impl.DefaultEndpoint;
 
+@Deprecated
 public class TwitterEndpointEvent extends DefaultEndpoint implements 
TwitterEndpoint {
-    private final String remaining;
+    private final String kind;
 
     // only TwitterEndpointPolling is annotated
     private TwitterConfiguration properties;
 
     public TwitterEndpointEvent(String uri, String remaining, TwitterComponent 
component, TwitterConfiguration properties) {
         super(uri, component);
-        this.remaining = remaining;
+        this.kind = remaining;
         this.properties = properties;
     }
 
     @Override
     public Consumer createConsumer(Processor processor) throws Exception {
-        TwitterConsumer twitter4jConsumer = TwitterHelper.createConsumer(this, 
getEndpointUri(), remaining);
+        AbstractTwitterConsumerHandler twitter4jConsumer = 
TwitterHelper.createConsumer(this, getEndpointUri(), kind);
         return new TwitterConsumerEvent(this, processor, twitter4jConsumer);
     }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/bb64dca0/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/TwitterEndpointPolling.java
----------------------------------------------------------------------
diff --git 
a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/TwitterEndpointPolling.java
 
b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/TwitterEndpointPolling.java
index 51b739e..008167a 100644
--- 
a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/TwitterEndpointPolling.java
+++ 
b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/TwitterEndpointPolling.java
@@ -21,20 +21,26 @@ import org.apache.camel.Processor;
 import org.apache.camel.Producer;
 import org.apache.camel.api.management.ManagedAttribute;
 import org.apache.camel.api.management.ManagedResource;
-import org.apache.camel.component.twitter.consumer.TwitterConsumer;
+import 
org.apache.camel.component.twitter.consumer.AbstractTwitterConsumerHandler;
 import org.apache.camel.component.twitter.consumer.TwitterConsumerPolling;
 import org.apache.camel.component.twitter.data.EndpointType;
 import org.apache.camel.impl.DefaultPollingEndpoint;
+import org.apache.camel.spi.Metadata;
 import org.apache.camel.spi.UriEndpoint;
 import org.apache.camel.spi.UriParam;
+import org.apache.camel.spi.UriPath;
 
 /**
- * This component integrates with Twitter to send tweets or search for tweets 
and more.
+ * Use twitter-directmessage, twitter-search, twitter-streaming and 
twitter-timeline instead of this component.
+ * @deprecated
  */
+@Deprecated
 @ManagedResource(description = "Managed Twitter Endpoint")
-@UriEndpoint(firstVersion = "2.10.0", scheme = "twitter", title = "Twitter", 
syntax = "twitter:kind", consumerClass = TwitterConsumer.class, label = 
"api,social")
+@UriEndpoint(firstVersion = "2.10.0", scheme = "twitter", title = "Twitter", 
syntax = "twitter:kind", consumerClass = AbstractTwitterConsumerHandler.class, 
label = "api,social")
 public class TwitterEndpointPolling extends DefaultPollingEndpoint implements 
TwitterEndpoint {
-    private final String remaining;
+    @UriPath(description = "The kind of endpoint", enums = 
"directmessage,search,streaming/filter,streaming/sample,streaming/user"
+            + 
",timeline/home,timeline/mentions,timeline/retweetsofme,timeline/user") 
@Metadata(required = "true")
+    private final String kind;
 
     @UriParam(optionalPrefix = "consumer.", defaultValue = "" + 
TwitterConsumerPolling.DEFAULT_CONSUMER_DELAY, label = "consumer,scheduler",
             description = "Milliseconds before the next poll.")
@@ -45,7 +51,7 @@ public class TwitterEndpointPolling extends 
DefaultPollingEndpoint implements Tw
 
     public TwitterEndpointPolling(String uri, String remaining, 
TwitterComponent component, TwitterConfiguration properties) {
         super(uri, component);
-        this.remaining = remaining;
+        this.kind = remaining;
         this.properties = properties;
 
         setDelay(delay); // reconfigure the default delay
@@ -53,7 +59,7 @@ public class TwitterEndpointPolling extends 
DefaultPollingEndpoint implements Tw
 
     @Override
     public Consumer createConsumer(Processor processor) throws Exception {
-        TwitterConsumer twitter4jConsumer = TwitterHelper.createConsumer(this, 
getEndpointUri(), remaining);
+        AbstractTwitterConsumerHandler twitter4jConsumer = 
TwitterHelper.createConsumer(this, getEndpointUri(), kind);
         // update the pulling lastID with sinceId
         twitter4jConsumer.setLastId(properties.getSinceId());
         TwitterConsumerPolling tc = new TwitterConsumerPolling(this, 
processor, twitter4jConsumer);
@@ -63,7 +69,7 @@ public class TwitterEndpointPolling extends 
DefaultPollingEndpoint implements Tw
 
     @Override
     public Producer createProducer() throws Exception {
-        return TwitterHelper.createProducer(this, getEndpointUri(), remaining);
+        return TwitterHelper.createProducer(this, getEndpointUri(), kind);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/camel/blob/bb64dca0/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/TwitterHelper.java
----------------------------------------------------------------------
diff --git 
a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/TwitterHelper.java
 
b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/TwitterHelper.java
index 81fefa9..e51e08f 100644
--- 
a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/TwitterHelper.java
+++ 
b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/TwitterHelper.java
@@ -16,25 +16,32 @@
  */
 package org.apache.camel.component.twitter;
 
+import org.apache.camel.Consumer;
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
-import org.apache.camel.component.twitter.consumer.TwitterConsumer;
-import 
org.apache.camel.component.twitter.consumer.directmessage.DirectMessageConsumer;
-import org.apache.camel.component.twitter.consumer.search.SearchConsumer;
-import 
org.apache.camel.component.twitter.consumer.streaming.FilterStreamingConsumer;
-import 
org.apache.camel.component.twitter.consumer.streaming.SampleStreamingConsumer;
-import 
org.apache.camel.component.twitter.consumer.streaming.UserStreamingConsumer;
-import org.apache.camel.component.twitter.consumer.timeline.HomeConsumer;
-import org.apache.camel.component.twitter.consumer.timeline.MentionsConsumer;
-import org.apache.camel.component.twitter.consumer.timeline.RetweetsConsumer;
-import org.apache.camel.component.twitter.consumer.timeline.UserConsumer;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import 
org.apache.camel.component.twitter.consumer.AbstractTwitterConsumerHandler;
+import org.apache.camel.component.twitter.consumer.DefaultTwitterConsumer;
+import org.apache.camel.component.twitter.consumer.TwitterConsumerDirect;
+import org.apache.camel.component.twitter.consumer.TwitterConsumerEvent;
+import org.apache.camel.component.twitter.consumer.TwitterConsumerPolling;
 import org.apache.camel.component.twitter.data.ConsumerType;
 import org.apache.camel.component.twitter.data.StreamingType;
 import org.apache.camel.component.twitter.data.TimelineType;
-import org.apache.camel.component.twitter.producer.DirectMessageProducer;
-import org.apache.camel.component.twitter.producer.SearchProducer;
-import org.apache.camel.component.twitter.producer.TwitterProducer;
-import org.apache.camel.component.twitter.producer.UserProducer;
+import 
org.apache.camel.component.twitter.directmessage.DirectMessageConsumerHandler;
+import org.apache.camel.component.twitter.directmessage.DirectMessageProducer;
+import org.apache.camel.component.twitter.search.SearchConsumerHandler;
+import org.apache.camel.component.twitter.search.SearchProducer;
+import 
org.apache.camel.component.twitter.streaming.FilterStreamingConsumerHandler;
+import 
org.apache.camel.component.twitter.streaming.SampleStreamingConsumerHandler;
+import 
org.apache.camel.component.twitter.streaming.UserStreamingConsumerHandler;
+import org.apache.camel.component.twitter.timeline.HomeConsumerHandler;
+import org.apache.camel.component.twitter.timeline.MentionsConsumerHandler;
+import org.apache.camel.component.twitter.timeline.RetweetsConsumerHandler;
+import org.apache.camel.component.twitter.timeline.UserConsumerHandler;
+import org.apache.camel.component.twitter.timeline.UserProducer;
+
 import twitter4j.User;
 
 public final class TwitterHelper {
@@ -58,30 +65,31 @@ public final class TwitterHelper {
         message.setHeader(TwitterConstants.TWITTER_USER_ROLE + index, role);
     }
 
-    public static TwitterConsumer createConsumer(TwitterEndpoint te, String 
uri, String remaining) throws IllegalArgumentException {
+    @Deprecated
+    public static AbstractTwitterConsumerHandler 
createConsumer(TwitterEndpoint te, String uri, String remaining) throws 
IllegalArgumentException {
         String[] tokens = remaining.split("/");
-
+        
         if (tokens.length > 0) {
             switch (ConsumerType.fromString(tokens[0])) {
             case DIRECTMESSAGE:
-                return new DirectMessageConsumer(te);
+                return new DirectMessageConsumerHandler(te);
             case SEARCH:
                 boolean hasNoKeywords = te.getProperties().getKeywords() == 
null
                     || te.getProperties().getKeywords().trim().isEmpty();
                 if (hasNoKeywords) {
                     throw new IllegalArgumentException("Type set to SEARCH but 
no keywords were provided.");
                 } else {
-                    return new SearchConsumer(te);
+                    return new SearchConsumerHandler(te);
                 }
             case STREAMING:
                 if (tokens.length > 1) {
                     switch (StreamingType.fromString(tokens[1])) {
                     case SAMPLE:
-                        return new SampleStreamingConsumer(te);
+                        return new SampleStreamingConsumerHandler(te);
                     case FILTER:
-                        return new FilterStreamingConsumer(te);
+                        return new FilterStreamingConsumerHandler(te);
                     case USER:
-                        return new UserStreamingConsumer(te);
+                        return new UserStreamingConsumerHandler(te);
                     default:
                         break;
                     }
@@ -91,16 +99,16 @@ public final class TwitterHelper {
                 if (tokens.length > 1) {
                     switch (TimelineType.fromString(tokens[1])) {
                     case HOME:
-                        return new HomeConsumer(te);
+                        return new HomeConsumerHandler(te);
                     case MENTIONS:
-                        return new MentionsConsumer(te);
+                        return new MentionsConsumerHandler(te);
                     case RETWEETSOFME:
-                        return new RetweetsConsumer(te);
+                        return new RetweetsConsumerHandler(te);
                     case USER:
                         if (te.getProperties().getUser() == null || 
te.getProperties().getUser().trim().isEmpty()) {
                             throw new IllegalArgumentException("Fetch type set 
to USER TIMELINE but no user was set.");
                         } else {
-                            return new UserConsumer(te);
+                            return new UserConsumerHandler(te);
                         }
                     default:
                         break;
@@ -116,7 +124,24 @@ public final class TwitterHelper {
             + ". A consumer type was not provided (or an incorrect pairing was 
used).");
     }
 
-    public static TwitterProducer createProducer(TwitterEndpoint te, String 
uri, String remaining) throws IllegalArgumentException {
+    public static Consumer createConsumer(Processor processor, 
AbstractTwitterEndpoint endpoint, AbstractTwitterConsumerHandler handler) 
throws Exception {
+        Consumer answer = new DefaultTwitterConsumer(endpoint, processor, 
handler);
+        switch (endpoint.getEndpointType()) {
+        case POLLING:
+            handler.setLastId(endpoint.getProperties().getSinceId());
+            endpoint.configureConsumer(answer);
+            break;
+        case DIRECT:
+            endpoint.configureConsumer(answer);
+            break;
+        default:
+            break;
+        }
+        return answer;
+    }
+
+    @Deprecated
+    public static Producer createProducer(TwitterEndpoint te, String uri, 
String remaining) throws IllegalArgumentException {
         String[] tokens = remaining.split("/");
 
         if (tokens.length > 0) {

http://git-wip-us.apache.org/repos/asf/camel/blob/bb64dca0/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/AbstractTwitterConsumerHandler.java
----------------------------------------------------------------------
diff --git 
a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/AbstractTwitterConsumerHandler.java
 
b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/AbstractTwitterConsumerHandler.java
new file mode 100644
index 0000000..b0ec1f0
--- /dev/null
+++ 
b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/AbstractTwitterConsumerHandler.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.twitter.consumer;
+
+import java.util.List;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.component.twitter.TwitterEndpoint;
+import twitter4j.Paging;
+import twitter4j.Twitter;
+import twitter4j.TwitterException;
+
+
+public abstract class AbstractTwitterConsumerHandler {
+
+    /**
+     * Instance of TwitterEndpoint.
+     */
+    protected final TwitterEndpoint endpoint;
+
+    /**
+     * The last tweet ID received.
+     */
+    private long lastId;
+
+    protected AbstractTwitterConsumerHandler(TwitterEndpoint endpoint) {
+        this.endpoint = endpoint;
+        this.lastId = -1;
+    }
+
+    /**
+     * Called by polling consumers during each poll.  It needs to be separate
+     * from directConsume() since, as an example, streaming API polling allows
+     * tweets to build up between polls.
+     */
+    public abstract List<Exchange> pollConsume() throws TwitterException;
+
+    /**
+     * Called by direct consumers.
+     */
+    public abstract List<Exchange> directConsume() throws TwitterException;
+
+    /**
+     * Can't assume that the end of the list will be the most recent ID.
+     * The Twitter API sometimes returns them slightly out of order.
+     */
+    protected void setLastIdIfGreater(long newId) {
+        if (newId > lastId) {
+            lastId = newId;
+        }
+    }
+
+    /**
+     * Support to update the Consumer's lastId when starting the consumer
+     */
+    public void setLastId(long sinceId) {
+        lastId = sinceId;
+    }
+
+    protected Twitter getTwitter() {
+        return endpoint.getProperties().getTwitter();
+    }
+
+    protected long getLastId() {
+        return lastId;
+    }
+
+    protected Paging getLastIdPaging() {
+        return new Paging(lastId);
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/bb64dca0/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/DefaultTwitterConsumer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/DefaultTwitterConsumer.java
 
b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/DefaultTwitterConsumer.java
new file mode 100644
index 0000000..7b2b669
--- /dev/null
+++ 
b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/DefaultTwitterConsumer.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.twitter.consumer;
+
+import java.util.List;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.component.twitter.AbstractTwitterEndpoint;
+import 
org.apache.camel.component.twitter.streaming.AbstractStreamingConsumerHandler;
+import org.apache.camel.impl.ScheduledPollConsumer;
+
+/**
+ * Provides a scheduled polling consumer as well as event based consumer for 
streaming.
+ */
+public class DefaultTwitterConsumer extends ScheduledPollConsumer implements 
TwitterEventListener {
+
+    public static final long DEFAULT_CONSUMER_DELAY = 30 * 1000L;
+    private final AbstractTwitterEndpoint endpoint;
+    private final AbstractTwitterConsumerHandler handler;
+
+    public DefaultTwitterConsumer(AbstractTwitterEndpoint endpoint, Processor 
processor, AbstractTwitterConsumerHandler handler) {
+        super(endpoint, processor);
+        setDelay(DEFAULT_CONSUMER_DELAY);
+        this.endpoint = endpoint;
+        this.handler = handler;
+    }
+
+    @Override
+    public AbstractTwitterEndpoint getEndpoint() {
+        return (AbstractTwitterEndpoint) super.getEndpoint();
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        super.doStart();
+        switch (endpoint.getEndpointType()) {
+        case POLLING:
+            if (handler instanceof AbstractStreamingConsumerHandler) {
+                ((AbstractStreamingConsumerHandler) handler).start();
+            }
+            break;
+        case EVENT:
+            if (handler instanceof AbstractStreamingConsumerHandler) {
+                ((AbstractStreamingConsumerHandler) 
handler).setEventListener(this);
+                ((AbstractStreamingConsumerHandler) handler).start();
+            }
+            break;
+        default:
+            List<Exchange> exchanges = handler.directConsume();
+            for (int i = 0; i < exchanges.size(); i++) {
+                getProcessor().process(exchanges.get(i));
+            }
+        }
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        switch (endpoint.getEndpointType()) {
+        case POLLING:
+            if (handler instanceof AbstractStreamingConsumerHandler) {
+                ((AbstractStreamingConsumerHandler) handler).stop();
+            }
+            break;
+        case EVENT:
+            if (handler instanceof AbstractStreamingConsumerHandler) {
+                ((AbstractStreamingConsumerHandler) 
handler).removeEventListener(this);
+                ((AbstractStreamingConsumerHandler) handler).stop();
+            }
+            break;
+        default:
+            break;
+        }
+
+        super.doStop();
+    }
+
+    @Override
+    protected int poll() throws Exception {
+        List<Exchange> exchanges = handler.pollConsume();
+
+        int index = 0;
+        for (; index < exchanges.size(); index++) {
+            getProcessor().process(exchanges.get(index));
+        }
+
+        return index;
+    }
+
+    @Override
+    public void onEvent(Exchange exchange) {
+        if (!isRunAllowed()) {
+            return;
+        }
+
+        try {
+            getProcessor().process(exchange);
+        } catch (Exception e) {
+            exchange.setException(e);
+        }
+
+        if (exchange.getException() != null) {
+            getExceptionHandler().handleException("Error processing exchange 
on status update", exchange, exchange.getException());
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/bb64dca0/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TwitterConsumer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TwitterConsumer.java
 
b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TwitterConsumer.java
deleted file mode 100644
index 3379105..0000000
--- 
a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TwitterConsumer.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.twitter.consumer;
-
-import java.util.List;
-
-import org.apache.camel.Exchange;
-import org.apache.camel.component.twitter.TwitterEndpoint;
-import twitter4j.Paging;
-import twitter4j.Twitter;
-import twitter4j.TwitterException;
-
-
-public abstract class TwitterConsumer {
-
-    /**
-     * Instance of TwitterEndpoint.
-     */
-    protected final TwitterEndpoint endpoint;
-
-    /**
-     * The last tweet ID received.
-     */
-    private long lastId;
-
-    protected TwitterConsumer(TwitterEndpoint endpoint) {
-        this.endpoint = endpoint;
-        this.lastId = -1;
-    }
-
-    /**
-     * Called by polling consumers during each poll.  It needs to be separate
-     * from directConsume() since, as an example, streaming API polling allows
-     * tweets to build up between polls.
-     */
-    public abstract List<Exchange> pollConsume() throws TwitterException;
-
-    /**
-     * Called by direct consumers.
-     */
-    public abstract List<Exchange> directConsume() throws TwitterException;
-
-    /**
-     * Can't assume that the end of the list will be the most recent ID.
-     * The Twitter API sometimes returns them slightly out of order.
-     */
-    protected void setLastIdIfGreater(long newId) {
-        if (newId > lastId) {
-            lastId = newId;
-        }
-    }
-
-    /**
-     * Support to update the Consumer's lastId when starting the consumer
-     */
-    public void setLastId(long sinceId) {
-        lastId = sinceId;
-    }
-
-    protected Twitter getTwitter() {
-        return endpoint.getProperties().getTwitter();
-    }
-
-    protected long getLastId() {
-        return lastId;
-    }
-
-    protected Paging getLastIdPaging() {
-        return new Paging(lastId);
-    }
-}

http://git-wip-us.apache.org/repos/asf/camel/blob/bb64dca0/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TwitterConsumerDirect.java
----------------------------------------------------------------------
diff --git 
a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TwitterConsumerDirect.java
 
b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TwitterConsumerDirect.java
index 9c47259..3ae4ba6 100644
--- 
a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TwitterConsumerDirect.java
+++ 
b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TwitterConsumerDirect.java
@@ -26,11 +26,12 @@ import org.apache.camel.impl.DefaultConsumer;
 /**
  * Camel DirectConsumer implementation.
  */
+@Deprecated
 public class TwitterConsumerDirect extends DefaultConsumer {
 
-    private final TwitterConsumer twitter4jConsumer;
+    private final AbstractTwitterConsumerHandler twitter4jConsumer;
 
-    public TwitterConsumerDirect(TwitterEndpoint endpoint, Processor 
processor, TwitterConsumer twitter4jConsumer) {
+    public TwitterConsumerDirect(TwitterEndpoint endpoint, Processor 
processor, AbstractTwitterConsumerHandler twitter4jConsumer) {
         super(endpoint, processor);
 
         this.twitter4jConsumer = twitter4jConsumer;

http://git-wip-us.apache.org/repos/asf/camel/blob/bb64dca0/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TwitterConsumerEvent.java
----------------------------------------------------------------------
diff --git 
a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TwitterConsumerEvent.java
 
b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TwitterConsumerEvent.java
index f5b94a0..6c3f87c 100644
--- 
a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TwitterConsumerEvent.java
+++ 
b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TwitterConsumerEvent.java
@@ -19,13 +19,14 @@ package org.apache.camel.component.twitter.consumer;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.component.twitter.TwitterEndpoint;
-import 
org.apache.camel.component.twitter.consumer.streaming.AbstractStreamingConsumer;
+import 
org.apache.camel.component.twitter.streaming.AbstractStreamingConsumerHandler;
 import org.apache.camel.impl.DefaultConsumer;
 
+@Deprecated
 public class TwitterConsumerEvent extends DefaultConsumer implements 
TwitterEventListener {
-    private final TwitterConsumer twitter4jConsumer;
+    private final AbstractTwitterConsumerHandler twitter4jConsumer;
 
-    public TwitterConsumerEvent(TwitterEndpoint endpoint, Processor processor, 
TwitterConsumer twitter4jConsumer) {
+    public TwitterConsumerEvent(TwitterEndpoint endpoint, Processor processor, 
AbstractTwitterConsumerHandler twitter4jConsumer) {
         super(endpoint, processor);
         this.twitter4jConsumer = twitter4jConsumer;
     }
@@ -34,17 +35,17 @@ public class TwitterConsumerEvent extends DefaultConsumer 
implements TwitterEven
     protected void doStart() throws Exception {
         super.doStart();
 
-        if (twitter4jConsumer instanceof AbstractStreamingConsumer) {
-            ((AbstractStreamingConsumer) 
twitter4jConsumer).setEventListener(this);
-            ((AbstractStreamingConsumer) twitter4jConsumer).start();
+        if (twitter4jConsumer instanceof AbstractStreamingConsumerHandler) {
+            ((AbstractStreamingConsumerHandler) 
twitter4jConsumer).setEventListener(this);
+            ((AbstractStreamingConsumerHandler) twitter4jConsumer).start();
         }
     }
 
     @Override
     protected void doStop() throws Exception {
-        if (twitter4jConsumer instanceof AbstractStreamingConsumer) {
-            ((AbstractStreamingConsumer) 
twitter4jConsumer).removeEventListener(this);
-            ((AbstractStreamingConsumer) twitter4jConsumer).stop();
+        if (twitter4jConsumer instanceof AbstractStreamingConsumerHandler) {
+            ((AbstractStreamingConsumerHandler) 
twitter4jConsumer).removeEventListener(this);
+            ((AbstractStreamingConsumerHandler) twitter4jConsumer).stop();
         }
 
         super.doStop();

http://git-wip-us.apache.org/repos/asf/camel/blob/bb64dca0/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TwitterConsumerPolling.java
----------------------------------------------------------------------
diff --git 
a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TwitterConsumerPolling.java
 
b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TwitterConsumerPolling.java
index 368d6ef..0adbb3c 100644
--- 
a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TwitterConsumerPolling.java
+++ 
b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TwitterConsumerPolling.java
@@ -20,20 +20,21 @@ import java.util.List;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
+import org.apache.camel.component.twitter.TwitterEndpoint;
 import org.apache.camel.component.twitter.TwitterEndpointPolling;
-import 
org.apache.camel.component.twitter.consumer.streaming.AbstractStreamingConsumer;
-import org.apache.camel.component.twitter.util.TwitterSorter;
+import 
org.apache.camel.component.twitter.streaming.AbstractStreamingConsumerHandler;
 import org.apache.camel.impl.ScheduledPollConsumer;
 
 /**
  * Provides a scheduled polling consumer
  */
+@Deprecated
 public class TwitterConsumerPolling extends ScheduledPollConsumer {
 
     public static final long DEFAULT_CONSUMER_DELAY = 30 * 1000L;
-    private final TwitterConsumer twitter4jConsumer;
+    private final AbstractTwitterConsumerHandler twitter4jConsumer;
 
-    public TwitterConsumerPolling(TwitterEndpointPolling endpoint, Processor 
processor, TwitterConsumer twitter4jConsumer) {
+    public TwitterConsumerPolling(TwitterEndpoint endpoint, Processor 
processor, AbstractTwitterConsumerHandler twitter4jConsumer) {
         super(endpoint, processor);
         setDelay(DEFAULT_CONSUMER_DELAY);
         this.twitter4jConsumer = twitter4jConsumer;
@@ -47,15 +48,15 @@ public class TwitterConsumerPolling extends 
ScheduledPollConsumer {
     @Override
     protected void doStart() throws Exception {
         super.doStart();
-        if (twitter4jConsumer instanceof AbstractStreamingConsumer) {
-            ((AbstractStreamingConsumer) twitter4jConsumer).start();
+        if (twitter4jConsumer instanceof AbstractStreamingConsumerHandler) {
+            ((AbstractStreamingConsumerHandler) twitter4jConsumer).start();
         }
     }
 
     @Override
     protected void doStop() throws Exception {
-        if (twitter4jConsumer instanceof AbstractStreamingConsumer) {
-            ((AbstractStreamingConsumer) twitter4jConsumer).stop();
+        if (twitter4jConsumer instanceof AbstractStreamingConsumerHandler) {
+            ((AbstractStreamingConsumerHandler) twitter4jConsumer).stop();
         }
 
         super.doStop();

http://git-wip-us.apache.org/repos/asf/camel/blob/bb64dca0/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/directmessage/DirectMessageConsumer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/directmessage/DirectMessageConsumer.java
 
b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/directmessage/DirectMessageConsumer.java
deleted file mode 100644
index d0b89de..0000000
--- 
a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/directmessage/DirectMessageConsumer.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.twitter.consumer.directmessage;
-
-import java.util.List;
-
-import org.apache.camel.Exchange;
-import org.apache.camel.component.twitter.TwitterEndpoint;
-import org.apache.camel.component.twitter.consumer.TwitterConsumer;
-import org.apache.camel.component.twitter.consumer.TwitterEventType;
-import twitter4j.DirectMessage;
-import twitter4j.TwitterException;
-
-/**
- * Consumes a user's direct messages
- */
-public class DirectMessageConsumer extends TwitterConsumer {
-
-    public DirectMessageConsumer(TwitterEndpoint te) {
-        super(te);
-    }
-
-    @Override
-    public List<Exchange> pollConsume() throws TwitterException {
-        List<DirectMessage> directMessages =  
getTwitter().getDirectMessages(getLastIdPaging());
-        for (int i = 0; i < directMessages.size(); i++) {
-            setLastIdIfGreater(directMessages.get(i).getId());
-        }
-
-        return TwitterEventType.DIRECT_MESSAGE.createExchangeList(endpoint, 
directMessages);
-    }
-
-    @Override
-    public List<Exchange> directConsume() throws TwitterException {
-        return TwitterEventType.DIRECT_MESSAGE.createExchangeList(
-            endpoint,
-            getTwitter().getDirectMessages()
-        );
-    }
-}

http://git-wip-us.apache.org/repos/asf/camel/blob/bb64dca0/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/search/SearchConsumer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/search/SearchConsumer.java
 
b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/search/SearchConsumer.java
deleted file mode 100644
index 1c3a087..0000000
--- 
a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/search/SearchConsumer.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.twitter.consumer.search;
-
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.camel.Exchange;
-import org.apache.camel.component.twitter.TwitterEndpoint;
-import org.apache.camel.component.twitter.consumer.TwitterConsumer;
-import org.apache.camel.component.twitter.consumer.TwitterEventType;
-import org.apache.camel.util.ObjectHelper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import twitter4j.GeoLocation;
-import twitter4j.Query;
-import twitter4j.Query.Unit;
-import twitter4j.QueryResult;
-import twitter4j.Status;
-import twitter4j.Twitter;
-import twitter4j.TwitterException;
-
-/**
- * Consumes search requests
- */
-public class SearchConsumer extends TwitterConsumer {
-
-    private static final Logger LOG = 
LoggerFactory.getLogger(SearchConsumer.class);
-
-    public SearchConsumer(TwitterEndpoint te) {
-        super(te);
-    }
-
-    public List<Exchange> pollConsume() throws TwitterException {
-        String keywords = endpoint.getProperties().getKeywords();
-
-        Query query;
-
-        if (keywords != null && keywords.trim().length() > 0) {
-            query = new Query(keywords);
-            LOG.debug("Searching twitter with keywords: {}", keywords);
-        } else {
-            query = new Query();
-            LOG.debug("Searching twitter without keywords.");
-        }
-
-        if (endpoint.getProperties().isFilterOld()) {
-            query.setSinceId(getLastId());
-        }
-
-        return search(query);
-    }
-
-    public List<Exchange> directConsume() throws TwitterException {
-        String keywords = endpoint.getProperties().getKeywords();
-        if (keywords == null || keywords.trim().length() == 0) {
-            return Collections.emptyList();
-        }
-        Query query = new Query(keywords);
-
-        LOG.debug("Searching twitter with keywords: {}", keywords);
-        return search(query);
-    }
-
-    private List<Exchange> search(Query query) throws TwitterException {
-        Integer numberOfPages = 1;
-
-        if (ObjectHelper.isNotEmpty(endpoint.getProperties().getLang())) {
-            query.setLang(endpoint.getProperties().getLang());
-        }
-
-        if (ObjectHelper.isNotEmpty(endpoint.getProperties().getCount())) {
-            query.setCount(endpoint.getProperties().getCount());
-        }
-
-        if 
(ObjectHelper.isNotEmpty(endpoint.getProperties().getNumberOfPages())) {
-            numberOfPages = endpoint.getProperties().getNumberOfPages();
-        }
-
-        if (ObjectHelper.isNotEmpty(endpoint.getProperties().getLatitude())
-                && 
ObjectHelper.isNotEmpty(endpoint.getProperties().getLongitude())
-                && 
ObjectHelper.isNotEmpty(endpoint.getProperties().getRadius())) {
-            GeoLocation location = new 
GeoLocation(endpoint.getProperties().getLatitude(), 
endpoint.getProperties().getLongitude());
-            query.setGeoCode(location, endpoint.getProperties().getRadius(), 
Unit.valueOf(endpoint.getProperties().getDistanceMetric()));
-
-            LOG.debug("Searching with additional geolocation parameters.");
-        }
-
-        LOG.debug("Searching with {} pages.", numberOfPages);
-
-        Twitter twitter = getTwitter();
-        QueryResult qr = twitter.search(query);
-        List<Status> tweets = qr.getTweets();
-
-        for (int i = 1; i < numberOfPages; i++) {
-            if (!qr.hasNext()) {
-                break;
-            }
-
-            qr = twitter.search(qr.nextQuery());
-            tweets.addAll(qr.getTweets());
-        }
-
-        if (endpoint.getProperties().isFilterOld()) {
-            for (int i = 0; i < tweets.size(); i++) {
-                setLastIdIfGreater(tweets.get(i).getId());
-            }
-        }
-
-        return TwitterEventType.STATUS.createExchangeList(endpoint, tweets);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/camel/blob/bb64dca0/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/streaming/AbstractStreamingConsumer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/streaming/AbstractStreamingConsumer.java
 
b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/streaming/AbstractStreamingConsumer.java
deleted file mode 100644
index 7a7bfcb..0000000
--- 
a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/streaming/AbstractStreamingConsumer.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.twitter.consumer.streaming;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicReference;
-
-import org.apache.camel.Exchange;
-import org.apache.camel.Service;
-import org.apache.camel.component.twitter.TwitterEndpoint;
-import org.apache.camel.component.twitter.consumer.TwitterConsumer;
-import org.apache.camel.component.twitter.consumer.TwitterEventListener;
-import org.apache.camel.component.twitter.consumer.TwitterEventType;
-import twitter4j.Status;
-import twitter4j.StatusDeletionNotice;
-import twitter4j.StatusListener;
-import twitter4j.TwitterException;
-import twitter4j.TwitterStream;
-
-/**
- * Super class providing consuming capabilities for the streaming API.
- */
-public abstract class AbstractStreamingConsumer extends TwitterConsumer 
implements StatusListener, Service {
-    private final TwitterStream twitterStream;
-    private final List<Exchange> receivedStatuses;
-    private final AtomicReference<TwitterEventListener> twitterEventListener;
-    private boolean clear;
-
-    public AbstractStreamingConsumer(TwitterEndpoint te) {
-        super(te);
-        this.receivedStatuses = new ArrayList<>();
-        this.twitterStream = te.getProperties().createTwitterStream();
-        this.twitterStream.addListener(this);
-        this.twitterEventListener = new AtomicReference<>();
-        this.clear = true;
-    }
-
-    @Override
-    public List<Exchange> pollConsume() throws TwitterException {
-        List<Exchange> result;
-
-        synchronized (receivedStatuses) {
-            clear = true;
-            result = Collections.unmodifiableList(new 
ArrayList<>(receivedStatuses));
-        }
-
-        return result;
-    }
-
-    @Override
-    public List<Exchange> directConsume() throws TwitterException {
-        return Collections.emptyList();
-    }
-
-    @Override
-    public void onException(Exception ex) {
-    }
-
-    @Override
-    public void onStatus(Status status) {
-        onEvent(TwitterEventType.STATUS.createExchange(endpoint, status));
-    }
-
-    @Override
-    public void onDeletionNotice(StatusDeletionNotice statusDeletionNotice) {
-        // noop
-    }
-
-    @Override
-    public void onTrackLimitationNotice(int numberOfLimitedStatuses) {
-        // noop
-    }
-
-    @Override
-    public void onScrubGeo(long userId, long upToStatusId) {
-        // noop
-    }
-
-    public void setEventListener(TwitterEventListener tweeterStatusListener) {
-        twitterEventListener.set(tweeterStatusListener);
-    }
-
-    public void removeEventListener(TwitterEventListener 
tweeterStatusListener) {
-        twitterEventListener.compareAndSet(tweeterStatusListener, null);
-    }
-
-    @Override
-    public void stop() {
-        twitterStream.removeListener(this);
-        twitterStream.shutdown();
-        twitterStream.cleanUp();
-    }
-
-    protected TwitterStream getTwitterStream() {
-        return twitterStream;
-    }
-
-    protected void onEvent(Exchange exchange) {
-        TwitterEventListener listener = twitterEventListener.get();
-        if (listener != null) {
-            listener.onEvent(exchange);
-        } else {
-            synchronized (receivedStatuses) {
-                if (clear) {
-                    receivedStatuses.clear();
-                    clear = false;
-                }
-                receivedStatuses.add(exchange);
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/camel/blob/bb64dca0/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/streaming/FilterStreamingConsumer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/streaming/FilterStreamingConsumer.java
 
b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/streaming/FilterStreamingConsumer.java
deleted file mode 100644
index 62460cc..0000000
--- 
a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/streaming/FilterStreamingConsumer.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.twitter.consumer.streaming;
-
-import org.apache.camel.component.twitter.TwitterEndpoint;
-import twitter4j.FilterQuery;
-import twitter4j.StallWarning;
-
-/**
- * Consumes the filter stream
- */
-public class FilterStreamingConsumer extends AbstractStreamingConsumer {
-
-    public FilterStreamingConsumer(TwitterEndpoint endpoint) {
-        super(endpoint);
-    }
-
-    @Override
-    public void start() {
-        getTwitterStream().filter(createFilter());
-    }
-
-    @Override
-    public void onStallWarning(StallWarning stallWarning) {
-        // noop
-    }
-
-    private FilterQuery createFilter() {
-        FilterQuery filterQuery = new FilterQuery();
-        String allLocationsString = endpoint.getProperties().getLocations();
-        if (allLocationsString != null) {
-            String[] locationStrings = allLocationsString.split(";");
-            double[][] locations = new double[locationStrings.length][2];
-            for (int i = 0; i < locationStrings.length; i++) {
-                String[] coords = locationStrings[i].split(",");
-                locations[i][0] = Double.valueOf(coords[0]);
-                locations[i][1] = Double.valueOf(coords[1]);
-            }
-            filterQuery.locations(locations);
-        }
-
-        String keywords = endpoint.getProperties().getKeywords();
-        if (keywords != null && keywords.length() > 0) {
-            filterQuery.track(keywords.split(","));
-        }
-
-        String userIds = endpoint.getProperties().getUserIds();
-        if (userIds != null) {
-            String[] stringUserIds = userIds.split(",");
-            long[] longUserIds = new long[stringUserIds.length];
-            for (int i = 0; i < stringUserIds.length; i++) {
-                longUserIds[i] = Long.valueOf(stringUserIds[i]);
-            }
-            filterQuery.follow(longUserIds);
-        }
-
-        if (allLocationsString == null && keywords == null && userIds == null) 
{
-            throw new IllegalArgumentException("At least one filter parameter 
is required");
-        }
-
-        return filterQuery;
-    }
-}

http://git-wip-us.apache.org/repos/asf/camel/blob/bb64dca0/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/streaming/SampleStreamingConsumer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/streaming/SampleStreamingConsumer.java
 
b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/streaming/SampleStreamingConsumer.java
deleted file mode 100644
index e0a5b27..0000000
--- 
a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/streaming/SampleStreamingConsumer.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.twitter.consumer.streaming;
-
-import org.apache.camel.component.twitter.TwitterEndpoint;
-import twitter4j.StallWarning;
-
-/**
- * Consumes the sample stream
- */
-public class SampleStreamingConsumer extends AbstractStreamingConsumer {
-
-    public SampleStreamingConsumer(TwitterEndpoint endpoint) {
-        super(endpoint);
-    }
-
-    @Override
-    public void start() {
-        getTwitterStream().sample();
-    }
-
-    @Override
-    public void onStallWarning(StallWarning stallWarning) {
-        // noop
-    }
-}

http://git-wip-us.apache.org/repos/asf/camel/blob/bb64dca0/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/streaming/UserStreamingConsumer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/streaming/UserStreamingConsumer.java
 
b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/streaming/UserStreamingConsumer.java
deleted file mode 100644
index e04ba31..0000000
--- 
a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/streaming/UserStreamingConsumer.java
+++ /dev/null
@@ -1,226 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.twitter.consumer.streaming;
-
-import org.apache.camel.Exchange;
-import org.apache.camel.component.twitter.TwitterConstants;
-import org.apache.camel.component.twitter.TwitterEndpoint;
-import org.apache.camel.component.twitter.TwitterHelper;
-import org.apache.camel.component.twitter.consumer.TwitterEventType;
-import twitter4j.DirectMessage;
-import twitter4j.StallWarning;
-import twitter4j.Status;
-import twitter4j.User;
-import twitter4j.UserList;
-import twitter4j.UserStreamListener;
-
-public class UserStreamingConsumer extends AbstractStreamingConsumer 
implements UserStreamListener {
-
-    public UserStreamingConsumer(TwitterEndpoint endpoint) {
-        super(endpoint);
-    }
-
-    @Override
-    public void start() {
-        getTwitterStream().user();
-    }
-
-    @Override
-    public void onDeletionNotice(long directMessageId, long userId) {
-        // noop
-    }
-
-    @Override
-    public void onFriendList(long[] friendIds) {
-        // noop
-    }
-
-    @Override
-    public void onFavorite(User source, User target, Status favoritedStatus) {
-        Exchange exchange = TwitterEventType.FAVORITE.createExchange(endpoint, 
favoritedStatus);
-        TwitterHelper.setUserHeader(exchange, 1, source, "source");
-        TwitterHelper.setUserHeader(exchange, 2, target, "target");
-
-        onEvent(exchange);
-    }
-
-    @Override
-    public void onUnfavorite(User source, User target, Status 
unfavoritedStatus) {
-        Exchange exchange = 
TwitterEventType.UNFAVORITE.createExchange(endpoint, unfavoritedStatus);
-        TwitterHelper.setUserHeader(exchange, 1, source, "source");
-        TwitterHelper.setUserHeader(exchange, 2, target, "target");
-
-        onEvent(exchange);
-    }
-
-    @Override
-    public void onFollow(User source, User followedUser) {
-        Exchange exchange = TwitterEventType.FOLLOW.createExchange(endpoint);
-        TwitterHelper.setUserHeader(exchange, 1, source, "source");
-        TwitterHelper.setUserHeader(exchange, 2, followedUser, "followed");
-
-        onEvent(exchange);
-    }
-
-    @Override
-    public void onUnfollow(User source, User unfollowedUser) {
-        Exchange exchange = TwitterEventType.UNFOLLOW.createExchange(endpoint);
-        TwitterHelper.setUserHeader(exchange, 1, source, "source");
-        TwitterHelper.setUserHeader(exchange, 2, unfollowedUser, "unfollowed");
-
-        onEvent(exchange);
-    }
-
-    @Override
-    public void onDirectMessage(DirectMessage directMessage) {
-        onEvent(TwitterEventType.DIRECT_MESSAGE.createExchange(endpoint, 
directMessage));
-    }
-
-    @Override
-    public void onUserListMemberAddition(User addedMember, User listOwner, 
UserList list) {
-        Exchange exchange = 
TwitterEventType.USERLIST_MEMBER_ADDITION.createExchange(endpoint, list);
-        TwitterHelper.setUserHeader(exchange, 1, addedMember, "addedMember");
-        TwitterHelper.setUserHeader(exchange, 2, listOwner, "listOwner");
-
-        onEvent(exchange);
-    }
-
-    @Override
-    public void onUserListMemberDeletion(User deletedMember, User listOwner, 
UserList list) {
-        Exchange exchange = 
TwitterEventType.USERLIST_MEMBER_DELETION.createExchange(endpoint, list);
-        TwitterHelper.setUserHeader(exchange, 1, deletedMember, 
"deletedMember");
-        TwitterHelper.setUserHeader(exchange, 2, listOwner, "listOwner");
-
-        onEvent(exchange);
-    }
-
-    @Override
-    public void onUserListSubscription(User subscriber, User listOwner, 
UserList list) {
-        Exchange exchange = 
TwitterEventType.USERLIST_SUBSCRIPTION.createExchange(endpoint, list);
-        TwitterHelper.setUserHeader(exchange, 1, subscriber, "subscriber");
-        TwitterHelper.setUserHeader(exchange, 2, listOwner, "listOwner");
-
-        onEvent(exchange);
-    }
-
-    @Override
-    public void onUserListUnsubscription(User subscriber, User listOwner, 
UserList list) {
-        Exchange exchange = 
TwitterEventType.USERLIST_UNSUBSCRIPTION.createExchange(endpoint, list);
-        TwitterHelper.setUserHeader(exchange, 1, subscriber, "subscriber");
-        TwitterHelper.setUserHeader(exchange, 2, listOwner, "listOwner");
-
-        onEvent(exchange);
-    }
-
-    @Override
-    public void onUserListCreation(User user, UserList userList) {
-        Exchange exchange = 
TwitterEventType.USERLIST_CREATION.createExchange(endpoint, userList);
-        TwitterHelper.setUserHeader(exchange, user);
-
-        onEvent(exchange);
-    }
-
-    @Override
-    public void onUserListUpdate(User user, UserList userList) {
-        Exchange exchange = 
TwitterEventType.USERLIST_UPDATE.createExchange(endpoint, userList);
-        TwitterHelper.setUserHeader(exchange, user);
-
-        onEvent(exchange);
-    }
-
-    @Override
-    public void onUserListDeletion(User user, UserList userList) {
-        Exchange exchange = 
TwitterEventType.USERLIST_DELETETION.createExchange(endpoint, userList);
-        TwitterHelper.setUserHeader(exchange, user);
-
-        onEvent(exchange);
-    }
-
-    @Override
-    public void onUserProfileUpdate(User user) {
-        Exchange exchange = 
TwitterEventType.USER_PROFILE_UPDATE.createExchange(endpoint);
-        TwitterHelper.setUserHeader(exchange, user);
-
-        onEvent(exchange);
-    }
-
-    @Override
-    public void onUserSuspension(long suspendedUser) {
-        Exchange exchange = 
TwitterEventType.USER_SUSPENSION.createExchange(endpoint);
-        exchange.getIn().setHeader(TwitterConstants.TWITTER_USER, 
suspendedUser);
-
-        onEvent(exchange);
-    }
-
-    @Override
-    public void onUserDeletion(long deletedUser) {
-        Exchange exchange = 
TwitterEventType.USER_DELETION.createExchange(endpoint);
-        exchange.getIn().setHeader(TwitterConstants.TWITTER_USER, deletedUser);
-
-        onEvent(exchange);
-    }
-
-    @Override
-    public void onBlock(User source, User blockedUser) {
-        Exchange exchange = TwitterEventType.BLOCK.createExchange(endpoint);
-        TwitterHelper.setUserHeader(exchange, 1, source, "source");
-        TwitterHelper.setUserHeader(exchange, 2, blockedUser, "blocked");
-
-        onEvent(exchange);
-    }
-
-    @Override
-    public void onUnblock(User source, User unblockedUser) {
-        Exchange exchange = TwitterEventType.UNBLOCK.createExchange(endpoint);
-        TwitterHelper.setUserHeader(exchange, 1, source, "source");
-        TwitterHelper.setUserHeader(exchange, 2, unblockedUser, "unblocked");
-
-        onEvent(exchange);
-    }
-
-    @Override
-    public void onStallWarning(StallWarning stallWarning) {
-        // noop
-    }
-
-    @Override
-    public void onRetweetedRetweet(User source, User target, Status 
retweetedStatus) {
-        Exchange exchange = 
TwitterEventType.RETWEETED_RETWEET.createExchange(endpoint, retweetedStatus);
-        TwitterHelper.setUserHeader(exchange, 1, source, "source");
-        TwitterHelper.setUserHeader(exchange, 2, target, "target");
-
-        onEvent(exchange);
-    }
-
-    @Override
-    public void onFavoritedRetweet(User source, User target, Status 
favoritedRetweeet) {
-        Exchange exchange = 
TwitterEventType.FAVORITED_RETWEET.createExchange(endpoint, favoritedRetweeet);
-        TwitterHelper.setUserHeader(exchange, 1, source, "source");
-        TwitterHelper.setUserHeader(exchange, 2, target, "target");
-
-        onEvent(exchange);
-    }
-
-    @Override
-    public void onQuotedTweet(User source, User target, Status quotingTweet) {
-        Exchange exchange = 
TwitterEventType.QUOTED_TWEET.createExchange(endpoint, quotingTweet);
-        TwitterHelper.setUserHeader(exchange, 1, source, "source");
-        TwitterHelper.setUserHeader(exchange, 2, target, "target");
-
-        onEvent(exchange);
-    }
-}

http://git-wip-us.apache.org/repos/asf/camel/blob/bb64dca0/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/timeline/AbstractStatusConsumer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/timeline/AbstractStatusConsumer.java
 
b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/timeline/AbstractStatusConsumer.java
deleted file mode 100644
index 18f0aee..0000000
--- 
a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/timeline/AbstractStatusConsumer.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.twitter.consumer.timeline;
-
-import java.util.List;
-
-import org.apache.camel.Exchange;
-import org.apache.camel.component.twitter.TwitterEndpoint;
-import org.apache.camel.component.twitter.consumer.TwitterConsumer;
-import org.apache.camel.component.twitter.consumer.TwitterEventType;
-import twitter4j.Status;
-import twitter4j.TwitterException;
-
-/**
- * Consumes the user's home timeline.
- */
-abstract class AbstractStatusConsumer extends TwitterConsumer {
-
-    AbstractStatusConsumer(TwitterEndpoint endpoint) {
-        super(endpoint);
-    }
-
-    @Override
-    public List<Exchange> pollConsume() throws TwitterException {
-        List<Status> statusList =  doPoll();
-        for (int i = 0; i < statusList.size(); i++) {
-            setLastIdIfGreater(statusList.get(i).getId());
-        }
-
-        return TwitterEventType.STATUS.createExchangeList(endpoint, 
statusList);
-    }
-
-    @Override
-    public List<Exchange> directConsume() throws TwitterException {
-        return TwitterEventType.STATUS.createExchangeList(endpoint, 
doDirect());
-    }
-
-    protected abstract List<Status> doPoll() throws TwitterException;
-
-    protected abstract List<Status> doDirect() throws TwitterException;
-}

http://git-wip-us.apache.org/repos/asf/camel/blob/bb64dca0/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/timeline/HomeConsumer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/timeline/HomeConsumer.java
 
b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/timeline/HomeConsumer.java
deleted file mode 100644
index 252bacc..0000000
--- 
a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/timeline/HomeConsumer.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.twitter.consumer.timeline;
-
-import java.util.List;
-
-import org.apache.camel.component.twitter.TwitterEndpoint;
-import twitter4j.Status;
-import twitter4j.TwitterException;
-
-/**
- * Consumes the user's home timeline.
- */
-public class HomeConsumer extends AbstractStatusConsumer {
-
-    public HomeConsumer(TwitterEndpoint endpoint) {
-        super(endpoint);
-    }
-
-    @Override
-    protected List<Status> doPoll() throws TwitterException {
-        return getTwitter().getHomeTimeline(getLastIdPaging());
-    }
-
-    @Override
-    protected List<Status> doDirect() throws TwitterException {
-        return getTwitter().getHomeTimeline();
-    }
-}

http://git-wip-us.apache.org/repos/asf/camel/blob/bb64dca0/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/timeline/MentionsConsumer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/timeline/MentionsConsumer.java
 
b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/timeline/MentionsConsumer.java
deleted file mode 100644
index c3671c8..0000000
--- 
a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/timeline/MentionsConsumer.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.twitter.consumer.timeline;
-
-import java.util.List;
-
-import org.apache.camel.component.twitter.TwitterEndpoint;
-import twitter4j.Status;
-import twitter4j.TwitterException;
-
-/**
- * Consumes tweets in which the user has been mentioned.
- */
-public class MentionsConsumer extends AbstractStatusConsumer {
-
-    public MentionsConsumer(TwitterEndpoint endpoint) {
-        super(endpoint);
-    }
-
-    @Override
-    protected List<Status> doPoll() throws TwitterException {
-        return getTwitter().getMentionsTimeline(getLastIdPaging());
-    }
-
-    @Override
-    protected List<Status> doDirect() throws TwitterException {
-        return getTwitter().getMentionsTimeline();
-    }
-}

http://git-wip-us.apache.org/repos/asf/camel/blob/bb64dca0/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/timeline/RetweetsConsumer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/timeline/RetweetsConsumer.java
 
b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/timeline/RetweetsConsumer.java
deleted file mode 100644
index dfd0815..0000000
--- 
a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/timeline/RetweetsConsumer.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.twitter.consumer.timeline;
-
-import java.util.List;
-
-import org.apache.camel.component.twitter.TwitterEndpoint;
-import twitter4j.Status;
-import twitter4j.TwitterException;
-
-/**
- * Consumes a user's tweets that have been retweeted
- */
-public class RetweetsConsumer extends AbstractStatusConsumer {
-
-    public RetweetsConsumer(TwitterEndpoint endpoint) {
-        super(endpoint);
-    }
-
-    @Override
-    protected List<Status> doPoll() throws TwitterException {
-        return getTwitter().getRetweetsOfMe(getLastIdPaging());
-    }
-
-    @Override
-    protected List<Status> doDirect() throws TwitterException {
-        return getTwitter().getRetweetsOfMe();
-    }
-}

http://git-wip-us.apache.org/repos/asf/camel/blob/bb64dca0/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/timeline/UserConsumer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/timeline/UserConsumer.java
 
b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/timeline/UserConsumer.java
deleted file mode 100644
index 51ef02a..0000000
--- 
a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/timeline/UserConsumer.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.twitter.consumer.timeline;
-
-import java.util.List;
-
-import org.apache.camel.component.twitter.TwitterEndpoint;
-import twitter4j.Status;
-import twitter4j.TwitterException;
-
-/**
- * Consumes the timeline of a given user.
- */
-public class UserConsumer extends AbstractStatusConsumer {
-
-    public UserConsumer(TwitterEndpoint endpoint) {
-        super(endpoint);
-    }
-
-    @Override
-    protected List<Status> doPoll() throws TwitterException {
-        return 
getTwitter().getUserTimeline(endpoint.getProperties().getUser(), 
getLastIdPaging());
-    }
-
-    @Override
-    protected List<Status> doDirect() throws TwitterException {
-        return 
getTwitter().getUserTimeline(endpoint.getProperties().getUser());
-    }
-}

http://git-wip-us.apache.org/repos/asf/camel/blob/bb64dca0/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/data/TrendsType.java
----------------------------------------------------------------------
diff --git 
a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/data/TrendsType.java
 
b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/data/TrendsType.java
index 2c4f49a..8e481d4 100644
--- 
a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/data/TrendsType.java
+++ 
b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/data/TrendsType.java
@@ -18,6 +18,7 @@ package org.apache.camel.component.twitter.data;
 
 import org.apache.camel.component.twitter.TwitterHelper;
 
+@Deprecated
 public enum TrendsType {
     DAILY, WEEKLY, UNKNOWN;
 

http://git-wip-us.apache.org/repos/asf/camel/blob/bb64dca0/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/directmessage/DirectMessageConsumerHandler.java
----------------------------------------------------------------------
diff --git 
a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/directmessage/DirectMessageConsumerHandler.java
 
b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/directmessage/DirectMessageConsumerHandler.java
new file mode 100644
index 0000000..2fd125e
--- /dev/null
+++ 
b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/directmessage/DirectMessageConsumerHandler.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.twitter.directmessage;
+
+import java.util.List;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.component.twitter.TwitterEndpoint;
+import 
org.apache.camel.component.twitter.consumer.AbstractTwitterConsumerHandler;
+import org.apache.camel.component.twitter.consumer.TwitterEventType;
+import twitter4j.DirectMessage;
+import twitter4j.TwitterException;
+
+/**
+ * Consumes a user's direct messages
+ */
+public class DirectMessageConsumerHandler extends 
AbstractTwitterConsumerHandler {
+
+    public DirectMessageConsumerHandler(TwitterEndpoint te) {
+        super(te);
+    }
+
+    @Override
+    public List<Exchange> pollConsume() throws TwitterException {
+        List<DirectMessage> directMessages =  
getTwitter().getDirectMessages(getLastIdPaging());
+        for (int i = 0; i < directMessages.size(); i++) {
+            setLastIdIfGreater(directMessages.get(i).getId());
+        }
+
+        return TwitterEventType.DIRECT_MESSAGE.createExchangeList(endpoint, 
directMessages);
+    }
+
+    @Override
+    public List<Exchange> directConsume() throws TwitterException {
+        return TwitterEventType.DIRECT_MESSAGE.createExchangeList(
+            endpoint,
+            getTwitter().getDirectMessages()
+        );
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/bb64dca0/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/directmessage/DirectMessageProducer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/directmessage/DirectMessageProducer.java
 
b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/directmessage/DirectMessageProducer.java
new file mode 100644
index 0000000..078c6cd
--- /dev/null
+++ 
b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/directmessage/DirectMessageProducer.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.twitter.directmessage;
+
+import org.apache.camel.CamelExchangeException;
+import org.apache.camel.Exchange;
+import org.apache.camel.component.twitter.TwitterConstants;
+import org.apache.camel.component.twitter.TwitterEndpoint;
+import org.apache.camel.impl.DefaultProducer;
+import org.apache.camel.util.ObjectHelper;
+
+/**
+ * Produces text as a direct message.
+ */
+public class DirectMessageProducer extends DefaultProducer {
+
+    private TwitterEndpoint endpoint;
+
+    public DirectMessageProducer(TwitterEndpoint endpoint) {
+        super(endpoint);
+        this.endpoint = endpoint;
+    }
+
+    public void process(Exchange exchange) throws Exception {
+        // send direct message
+        String toUsername = endpoint.getProperties().getUser();
+        if 
(ObjectHelper.isNotEmpty(exchange.getIn().getHeader(TwitterConstants.TWITTER_USER,
 String.class))) {
+            toUsername = 
exchange.getIn().getHeader(TwitterConstants.TWITTER_USER, String.class);
+        }
+        String text = exchange.getIn().getBody(String.class);
+
+        if (toUsername.isEmpty()) {
+            throw new CamelExchangeException("Username not configured on 
TwitterEndpoint", exchange);
+        } else {
+            log.debug("Sending to: {} message: {}", toUsername, text);
+            
endpoint.getProperties().getTwitter().sendDirectMessage(toUsername, text);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/bb64dca0/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/directmessage/TwitterDirectMessageComponent.java
----------------------------------------------------------------------
diff --git 
a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/directmessage/TwitterDirectMessageComponent.java
 
b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/directmessage/TwitterDirectMessageComponent.java
new file mode 100644
index 0000000..516bb31
--- /dev/null
+++ 
b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/directmessage/TwitterDirectMessageComponent.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.twitter.directmessage;
+
+import java.util.Map;
+
+import org.apache.camel.ComponentVerifier;
+import org.apache.camel.Endpoint;
+import org.apache.camel.component.twitter.AbstractTwitterComponent;
+import org.apache.camel.component.twitter.DefaultTwitterComponentVerifier;
+import org.apache.camel.component.twitter.TwitterConfiguration;
+import org.apache.camel.spi.Metadata;
+
+/**
+ * Twitter direct message component.
+ */
+@Metadata(label = "verifiers", enums = "parameters,connectivity")
+public class TwitterDirectMessageComponent extends AbstractTwitterComponent {
+
+    protected Endpoint doCreateEndpoint(TwitterConfiguration properties, 
String uri, String remaining, Map<String, Object> parameters) throws Exception {
+        return new TwitterDirectMessageEndpoint(uri, remaining, this, 
properties);
+    }
+
+    /**
+     * Get a verifier for the twitter directmessage component.
+     */
+    public ComponentVerifier getVerifier() {
+        return new DefaultTwitterComponentVerifier(this, 
"twitter-directmessage");
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/bb64dca0/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/directmessage/TwitterDirectMessageEndpoint.java
----------------------------------------------------------------------
diff --git 
a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/directmessage/TwitterDirectMessageEndpoint.java
 
b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/directmessage/TwitterDirectMessageEndpoint.java
new file mode 100644
index 0000000..86ede93
--- /dev/null
+++ 
b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/directmessage/TwitterDirectMessageEndpoint.java
@@ -0,0 +1,44 @@
+package org.apache.camel.component.twitter.directmessage;
+
+import org.apache.camel.Consumer;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.component.twitter.AbstractTwitterEndpoint;
+import org.apache.camel.component.twitter.TwitterConfiguration;
+import org.apache.camel.component.twitter.TwitterHelper;
+import org.apache.camel.spi.Metadata;
+import org.apache.camel.spi.UriEndpoint;
+import org.apache.camel.spi.UriPath;
+
+/**
+ * The Twitter Direct Message Component consumes/produces user's direct 
messages.
+ */
+@UriEndpoint(firstVersion = "2.10.0", scheme = "twitter-directmessage", title 
= "Twitter Direct Message", syntax = "twitter-directmessage:endpointId", 
consumerClass = DirectMessageConsumerHandler.class, label = "api,social")
+public class TwitterDirectMessageEndpoint extends AbstractTwitterEndpoint {
+
+    @UriPath(description = "The endpoint ID (not used).")
+    @Metadata(required = "true")
+    private String endpointId;
+
+    public TwitterDirectMessageEndpoint(String uri, String remaining, 
TwitterDirectMessageComponent component, TwitterConfiguration properties) {
+        super(uri, component, properties);
+        this.endpointId = remaining;
+    }
+
+    @Override
+    public Producer createProducer() throws Exception {
+        if (getProperties().getUser() == null || 
getProperties().getUser().trim().isEmpty()) {
+            throw new IllegalArgumentException(
+                "Producer type set to DIRECT MESSAGE but no recipient user was 
set.");
+        } else {
+            return new DirectMessageProducer(this);
+        }
+    }
+
+    @Override
+    public Consumer createConsumer(Processor processor) throws Exception {
+        Consumer answer = TwitterHelper.createConsumer(processor, this, new 
DirectMessageConsumerHandler(this));
+        return answer;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/bb64dca0/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/producer/DirectMessageProducer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/producer/DirectMessageProducer.java
 
b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/producer/DirectMessageProducer.java
deleted file mode 100644
index 3f22526..0000000
--- 
a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/producer/DirectMessageProducer.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.twitter.producer;
-
-import org.apache.camel.CamelExchangeException;
-import org.apache.camel.Exchange;
-import org.apache.camel.component.twitter.TwitterConstants;
-import org.apache.camel.component.twitter.TwitterEndpoint;
-import org.apache.camel.util.ObjectHelper;
-
-/**
- * Produces text as a direct message.
- */
-public class DirectMessageProducer extends TwitterProducer {
-
-    public DirectMessageProducer(TwitterEndpoint endpoint) {
-        super(endpoint);
-    }
-
-    public void process(Exchange exchange) throws Exception {
-        // send direct message
-        String toUsername = endpoint.getProperties().getUser();
-        if 
(ObjectHelper.isNotEmpty(exchange.getIn().getHeader(TwitterConstants.TWITTER_USER,
 String.class))) {
-            toUsername = 
exchange.getIn().getHeader(TwitterConstants.TWITTER_USER, String.class);
-        }
-        String text = exchange.getIn().getBody(String.class);
-
-        if (toUsername.isEmpty()) {
-            throw new CamelExchangeException("Username not configured on 
TwitterEndpoint", exchange);
-        } else {
-            log.debug("Sending to: {} message: {}", toUsername, text);
-            
endpoint.getProperties().getTwitter().sendDirectMessage(toUsername, text);
-        }
-    }
-
-}

Reply via email to