CAMEL-8258 - Support Streaming from User Endpoint including Direct Messages
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/33b3bd5c Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/33b3bd5c Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/33b3bd5c Branch: refs/heads/master Commit: 33b3bd5c9a011788787c094e83848ca666a52724 Parents: b09eca2 Author: lburgazzoli <lburgazz...@gmail.com> Authored: Fri Feb 26 12:36:21 2016 +0100 Committer: Andrea Cosentino <anco...@gmail.com> Committed: Mon Feb 29 17:35:39 2016 +0100 ---------------------------------------------------------------------- .../component/twitter/Twitter4JFactory.java | 173 ------------------- .../component/twitter/TwitterConstants.java | 25 ++- .../twitter/TwitterEndpointDirect.java | 6 +- .../component/twitter/TwitterEndpointEvent.java | 4 +- .../twitter/TwitterEndpointPolling.java | 8 +- .../camel/component/twitter/TwitterHelper.java | 172 ++++++++++++++++++ .../twitter/consumer/TweeterStatusListener.java | 24 --- .../twitter/consumer/Twitter4JConsumer.java | 72 -------- .../twitter/consumer/TwitterConsumer.java | 86 +++++++++ .../twitter/consumer/TwitterConsumerDirect.java | 16 +- .../twitter/consumer/TwitterConsumerEvent.java | 33 ++-- .../consumer/TwitterConsumerPolling.java | 38 ++-- .../twitter/consumer/TwitterEventListener.java | 26 +++ .../twitter/consumer/TwitterEventType.java | 77 +++++++++ .../directmessage/DirectMessageConsumer.java | 28 +-- .../twitter/consumer/search/SearchConsumer.java | 52 +++--- .../streaming/AbstractStreamingConsumer.java | 128 ++++++++++++++ .../consumer/streaming/FilterConsumer.java | 77 --------- .../streaming/FilterStreamingConsumer.java | 77 +++++++++ .../consumer/streaming/SampleConsumer.java | 40 ----- .../streaming/SampleStreamingConsumer.java | 40 +++++ .../consumer/streaming/StreamingConsumer.java | 108 ------------ .../streaming/UserStreamingConsumer.java | 156 ++++++++++++----- .../timeline/AbstractStatusConsumer.java | 55 ++++++ 
.../twitter/consumer/timeline/HomeConsumer.java | 23 +-- .../consumer/timeline/MentionsConsumer.java | 23 +-- .../consumer/timeline/RetweetsConsumer.java | 23 +-- .../twitter/consumer/timeline/UserConsumer.java | 23 +-- .../component/twitter/data/ConsumerType.java | 12 +- .../component/twitter/data/EndpointType.java | 12 +- .../component/twitter/data/StreamingType.java | 11 +- .../component/twitter/data/TimelineType.java | 11 +- .../component/twitter/data/TrendsType.java | 11 +- .../twitter/producer/DirectMessageProducer.java | 10 +- .../twitter/producer/SearchProducer.java | 24 +-- .../twitter/producer/Twitter4JProducer.java | 36 ---- .../twitter/producer/TwitterProducer.java | 37 ++++ .../twitter/producer/UserProducer.java | 14 +- .../twitter/util/TwitterConverter.java | 28 ++- .../twitter/CamelTwitterTestSupport.java | 65 ++++--- .../component/twitter/UserStreamingTest.java | 49 ++++++ .../src/test/resources/log4j.properties | 1 + 42 files changed, 1112 insertions(+), 822 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/33b3bd5c/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/Twitter4JFactory.java ---------------------------------------------------------------------- diff --git a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/Twitter4JFactory.java b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/Twitter4JFactory.java deleted file mode 100644 index 9025164..0000000 --- a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/Twitter4JFactory.java +++ /dev/null @@ -1,173 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.component.twitter; - -import java.util.regex.Pattern; - -import org.apache.camel.component.twitter.consumer.Twitter4JConsumer; -import org.apache.camel.component.twitter.consumer.directmessage.DirectMessageConsumer; -import org.apache.camel.component.twitter.consumer.search.SearchConsumer; -import org.apache.camel.component.twitter.consumer.streaming.FilterConsumer; -import org.apache.camel.component.twitter.consumer.streaming.SampleConsumer; -import org.apache.camel.component.twitter.consumer.streaming.UserStreamingConsumer; -import org.apache.camel.component.twitter.consumer.timeline.HomeConsumer; -import org.apache.camel.component.twitter.consumer.timeline.MentionsConsumer; -import org.apache.camel.component.twitter.consumer.timeline.RetweetsConsumer; -import org.apache.camel.component.twitter.consumer.timeline.UserConsumer; -import org.apache.camel.component.twitter.data.ConsumerType; -import org.apache.camel.component.twitter.data.StreamingType; -import org.apache.camel.component.twitter.data.TimelineType; -import org.apache.camel.component.twitter.producer.DirectMessageProducer; -import org.apache.camel.component.twitter.producer.SearchProducer; -import org.apache.camel.component.twitter.producer.UserProducer; -import org.apache.camel.impl.DefaultProducer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Maps the endpoint URI to the 
respective Twitter4J consumer or producer. - * <p/> - * URI STRUCTURE: - * <p/> - * timeline/ - * public - * home - * friends - * user (ALSO A PRODUCER) - * mentions - * retweetsofme - * user/ - * search users (DIRECT ONLY) - * user suggestions (DIRECT ONLY) - * trends/ - * daily - * weekly - * userlist - * directmessage (ALSO A PRODUCER) - * streaming/ - * filter (POLLING ONLY) - * sample (POLLING ONLY) - * user (POLLING ONLY) - */ -public final class Twitter4JFactory { - - private static final Logger LOG = LoggerFactory.getLogger(Twitter4JFactory.class); - - private Twitter4JFactory() { - // helper class - } - - public static Twitter4JConsumer getConsumer(TwitterEndpoint te, String uri) throws IllegalArgumentException { - String[] uriSplit = splitUri(uri); - - if (uriSplit.length > 0) { - switch (ConsumerType.fromUri(uriSplit[0])) { - case DIRECTMESSAGE: - return new DirectMessageConsumer(te); - case SEARCH: - boolean hasNoKeywords = te.getProperties().getKeywords() == null - || te.getProperties().getKeywords().trim().isEmpty(); - if (hasNoKeywords) { - throw new IllegalArgumentException("Type set to SEARCH but no keywords were provided."); - } else { - return new SearchConsumer(te); - } - case STREAMING: - switch (StreamingType.fromUri(uriSplit[1])) { - case SAMPLE: - return new SampleConsumer(te); - case FILTER: - return new FilterConsumer(te); - case USER: - return new UserStreamingConsumer(te); - default: - break; - } - break; - case TIMELINE: - if (uriSplit.length > 1) { - switch (TimelineType.fromUri(uriSplit[1])) { - case HOME: - return new HomeConsumer(te); - case MENTIONS: - return new MentionsConsumer(te); - case RETWEETSOFME: - return new RetweetsConsumer(te); - case USER: - if (te.getProperties().getUser() == null || te.getProperties().getUser().trim().isEmpty()) { - throw new IllegalArgumentException("Fetch type set to USER TIMELINE but no user was set."); - } else { - return new UserConsumer(te); - } - default: - break; - } - } - break; - default: - 
break; - } - } - - throw new IllegalArgumentException("Cannot create any consumer with uri " + uri - + ". A consumer type was not provided (or an incorrect pairing was used)."); - } - - public static DefaultProducer getProducer(TwitterEndpoint te, String uri) throws IllegalArgumentException { - String[] uriSplit = splitUri(uri); - - if (uriSplit.length > 0) { - switch (ConsumerType.fromUri(uriSplit[0])) { - case DIRECTMESSAGE: - if (te.getProperties().getUser() == null || te.getProperties().getUser().trim().isEmpty()) { - throw new IllegalArgumentException( - "Producer type set to DIRECT MESSAGE but no recipient user was set."); - } else { - return new DirectMessageProducer(te); - } - case TIMELINE: - if (uriSplit.length > 1) { - switch (TimelineType.fromUri(uriSplit[1])) { - case USER: - return new UserProducer(te); - default: - break; - } - } - break; - case SEARCH: - return new SearchProducer(te); - default: - break; - } - - } - - throw new IllegalArgumentException("Cannot create any producer with uri " + uri - + ". 
A producer type was not provided (or an incorrect pairing was used)."); - } - - private static String[] splitUri(String uri) { - Pattern p1 = Pattern.compile("twitter:(//)*"); - Pattern p2 = Pattern.compile("\\?.*"); - - uri = p1.matcher(uri).replaceAll(""); - uri = p2.matcher(uri).replaceAll(""); - - return uri.split("/"); - } -} http://git-wip-us.apache.org/repos/asf/camel/blob/33b3bd5c/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/TwitterConstants.java ---------------------------------------------------------------------- diff --git a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/TwitterConstants.java b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/TwitterConstants.java index a6e845b..fd9bf27 100644 --- a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/TwitterConstants.java +++ b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/TwitterConstants.java @@ -19,19 +19,14 @@ package org.apache.camel.component.twitter; /** * Defines common constants */ -public final class TwitterConstants { - - public static final String TWITTER_KEYWORDS = "CamelTwitterKeywords"; - - public static final String TWITTER_SEARCH_LANGUAGE = "CamelTwitterSearchLanguage"; - public static final String TWITTER_COUNT = "CamelTwitterCount"; - public static final String TWITTER_NUMBER_OF_PAGES = "CamelTwitterNumberOfPages"; - public static final String TWITTER_SINCEID = "CamelTwitterSinceId"; - public static final String TWITTER_MAXID = "CamelTwitterMaxId"; - public static final String TWITTER_USER = "CamelTwitterUser"; - - private TwitterConstants() { - // utility - } - +public interface TwitterConstants { + String TWITTER_KEYWORDS = "CamelTwitterKeywords"; + String TWITTER_SEARCH_LANGUAGE = "CamelTwitterSearchLanguage"; + String TWITTER_COUNT = "CamelTwitterCount"; + String TWITTER_NUMBER_OF_PAGES = "CamelTwitterNumberOfPages"; + String TWITTER_SINCEID = 
"CamelTwitterSinceId"; + String TWITTER_MAXID = "CamelTwitterMaxId"; + String TWITTER_USER = "CamelTwitterUser"; + String TWITTER_USER_ROLE = "CamelTwitterUserRole"; + String TWITTER_EVENT_TYPE = "CamelTwitterEventType"; } http://git-wip-us.apache.org/repos/asf/camel/blob/33b3bd5c/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/TwitterEndpointDirect.java ---------------------------------------------------------------------- diff --git a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/TwitterEndpointDirect.java b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/TwitterEndpointDirect.java index bc5b350..a3f2254 100644 --- a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/TwitterEndpointDirect.java +++ b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/TwitterEndpointDirect.java @@ -23,7 +23,7 @@ import org.apache.camel.Producer; import org.apache.camel.ServiceStatus; import org.apache.camel.api.management.ManagedAttribute; import org.apache.camel.component.direct.DirectEndpoint; -import org.apache.camel.component.twitter.consumer.Twitter4JConsumer; +import org.apache.camel.component.twitter.consumer.TwitterConsumer; import org.apache.camel.component.twitter.consumer.TwitterConsumerDirect; import org.apache.camel.component.twitter.data.EndpointType; @@ -43,7 +43,7 @@ public class TwitterEndpointDirect extends DirectEndpoint implements TwitterEndp @Override public Consumer createConsumer(Processor processor) throws Exception { - Twitter4JConsumer twitter4jConsumer = Twitter4JFactory.getConsumer(this, getEndpointUri()); + TwitterConsumer twitter4jConsumer = TwitterHelper.createConsumer(this, getEndpointUri()); TwitterConsumerDirect answer = new TwitterConsumerDirect(this, processor, twitter4jConsumer); configureConsumer(answer); return answer; @@ -51,7 +51,7 @@ public class TwitterEndpointDirect extends DirectEndpoint implements TwitterEndp 
@Override public Producer createProducer() throws Exception { - return Twitter4JFactory.getProducer(this, getEndpointUri()); + return TwitterHelper.createProducer(this, getEndpointUri()); } @ManagedAttribute http://git-wip-us.apache.org/repos/asf/camel/blob/33b3bd5c/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/TwitterEndpointEvent.java ---------------------------------------------------------------------- diff --git a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/TwitterEndpointEvent.java b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/TwitterEndpointEvent.java index 0690664..0c49e17 100644 --- a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/TwitterEndpointEvent.java +++ b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/TwitterEndpointEvent.java @@ -19,7 +19,7 @@ package org.apache.camel.component.twitter; import org.apache.camel.Consumer; import org.apache.camel.Processor; import org.apache.camel.Producer; -import org.apache.camel.component.twitter.consumer.Twitter4JConsumer; +import org.apache.camel.component.twitter.consumer.TwitterConsumer; import org.apache.camel.component.twitter.consumer.TwitterConsumerEvent; import org.apache.camel.component.twitter.data.EndpointType; import org.apache.camel.impl.DefaultEndpoint; @@ -36,7 +36,7 @@ public class TwitterEndpointEvent extends DefaultEndpoint implements TwitterEndp @Override public Consumer createConsumer(Processor processor) throws Exception { - Twitter4JConsumer twitter4jConsumer = Twitter4JFactory.getConsumer(this, getEndpointUri()); + TwitterConsumer twitter4jConsumer = TwitterHelper.createConsumer(this, getEndpointUri()); return new TwitterConsumerEvent(this, processor, twitter4jConsumer); } http://git-wip-us.apache.org/repos/asf/camel/blob/33b3bd5c/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/TwitterEndpointPolling.java 
---------------------------------------------------------------------- diff --git a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/TwitterEndpointPolling.java b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/TwitterEndpointPolling.java index 250c33a..8c17d1e 100644 --- a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/TwitterEndpointPolling.java +++ b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/TwitterEndpointPolling.java @@ -21,7 +21,7 @@ import org.apache.camel.Processor; import org.apache.camel.Producer; import org.apache.camel.api.management.ManagedAttribute; import org.apache.camel.api.management.ManagedResource; -import org.apache.camel.component.twitter.consumer.Twitter4JConsumer; +import org.apache.camel.component.twitter.consumer.TwitterConsumer; import org.apache.camel.component.twitter.consumer.TwitterConsumerPolling; import org.apache.camel.component.twitter.data.EndpointType; import org.apache.camel.impl.DefaultPollingEndpoint; @@ -32,7 +32,7 @@ import org.apache.camel.spi.UriParam; * This component integrates with Twitter to send tweets or search for tweets and more. 
*/ @ManagedResource(description = "Managed Twitter Endpoint") -@UriEndpoint(scheme = "twitter", title = "Twitter", syntax = "twitter:kind", consumerClass = Twitter4JConsumer.class, label = "api,social") +@UriEndpoint(scheme = "twitter", title = "Twitter", syntax = "twitter:kind", consumerClass = TwitterConsumer.class, label = "api,social") public class TwitterEndpointPolling extends DefaultPollingEndpoint implements TwitterEndpoint { @UriParam(optionalPrefix = "consumer.", defaultValue = "" + TwitterConsumerPolling.DEFAULT_CONSUMER_DELAY, label = "consumer,scheduler", @@ -49,7 +49,7 @@ public class TwitterEndpointPolling extends DefaultPollingEndpoint implements Tw @Override public Consumer createConsumer(Processor processor) throws Exception { - Twitter4JConsumer twitter4jConsumer = Twitter4JFactory.getConsumer(this, getEndpointUri()); + TwitterConsumer twitter4jConsumer = TwitterHelper.createConsumer(this, getEndpointUri()); // update the pulling lastID with sinceId twitter4jConsumer.setLastId(properties.getSinceId()); TwitterConsumerPolling tc = new TwitterConsumerPolling(this, processor, twitter4jConsumer); @@ -59,7 +59,7 @@ public class TwitterEndpointPolling extends DefaultPollingEndpoint implements Tw @Override public Producer createProducer() throws Exception { - return Twitter4JFactory.getProducer(this, getEndpointUri()); + return TwitterHelper.createProducer(this, getEndpointUri()); } @Override http://git-wip-us.apache.org/repos/asf/camel/blob/33b3bd5c/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/TwitterHelper.java ---------------------------------------------------------------------- diff --git a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/TwitterHelper.java b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/TwitterHelper.java new file mode 100644 index 0000000..de503a5 --- /dev/null +++ 
b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/TwitterHelper.java @@ -0,0 +1,172 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.twitter; + +import java.util.regex.Pattern; + +import org.apache.camel.Exchange; +import org.apache.camel.Message; +import org.apache.camel.component.twitter.consumer.TwitterConsumer; +import org.apache.camel.component.twitter.consumer.directmessage.DirectMessageConsumer; +import org.apache.camel.component.twitter.consumer.search.SearchConsumer; +import org.apache.camel.component.twitter.consumer.streaming.FilterStreamingConsumer; +import org.apache.camel.component.twitter.consumer.streaming.SampleStreamingConsumer; +import org.apache.camel.component.twitter.consumer.streaming.UserStreamingConsumer; +import org.apache.camel.component.twitter.consumer.timeline.HomeConsumer; +import org.apache.camel.component.twitter.consumer.timeline.MentionsConsumer; +import org.apache.camel.component.twitter.consumer.timeline.RetweetsConsumer; +import org.apache.camel.component.twitter.consumer.timeline.UserConsumer; +import org.apache.camel.component.twitter.data.ConsumerType; +import 
org.apache.camel.component.twitter.data.StreamingType; +import org.apache.camel.component.twitter.data.TimelineType; +import org.apache.camel.component.twitter.producer.DirectMessageProducer; +import org.apache.camel.component.twitter.producer.SearchProducer; +import org.apache.camel.component.twitter.producer.TwitterProducer; +import org.apache.camel.component.twitter.producer.UserProducer; +import twitter4j.User; + +public final class TwitterHelper { + private TwitterHelper() { + } + + public static void setUserHeader(Exchange exchange, User user) { + setUserHeader(exchange.getIn(), user); + } + + public static void setUserHeader(Message message, User user) { + message.setHeader(TwitterConstants.TWITTER_USER, user); + } + + public static void setUserHeader(Exchange exchange, int index, User user, String role) { + setUserHeader(exchange.getIn(), index, user, role); + } + + public static void setUserHeader(Message message, int index, User user, String role) { + message.setHeader(TwitterConstants.TWITTER_USER + index, user); + message.setHeader(TwitterConstants.TWITTER_USER_ROLE + index, role); + } + + public static TwitterConsumer createConsumer(TwitterEndpoint te, String uri) throws IllegalArgumentException { + String[] uriSplit = splitUri(uri); + + if (uriSplit.length > 0) { + switch (ConsumerType.fromUri(uriSplit[0])) { + case DIRECTMESSAGE: + return new DirectMessageConsumer(te); + case SEARCH: + boolean hasNoKeywords = te.getProperties().getKeywords() == null + || te.getProperties().getKeywords().trim().isEmpty(); + if (hasNoKeywords) { + throw new IllegalArgumentException("Type set to SEARCH but no keywords were provided."); + } else { + return new SearchConsumer(te); + } + case STREAMING: + switch (StreamingType.fromUri(uriSplit[1])) { + case SAMPLE: + return new SampleStreamingConsumer(te); + case FILTER: + return new FilterStreamingConsumer(te); + case USER: + return new UserStreamingConsumer(te); + default: + break; + } + break; + case TIMELINE: + if 
(uriSplit.length > 1) { + switch (TimelineType.fromUri(uriSplit[1])) { + case HOME: + return new HomeConsumer(te); + case MENTIONS: + return new MentionsConsumer(te); + case RETWEETSOFME: + return new RetweetsConsumer(te); + case USER: + if (te.getProperties().getUser() == null || te.getProperties().getUser().trim().isEmpty()) { + throw new IllegalArgumentException("Fetch type set to USER TIMELINE but no user was set."); + } else { + return new UserConsumer(te); + } + default: + break; + } + } + break; + default: + break; + } + } + + throw new IllegalArgumentException("Cannot create any consumer with uri " + uri + + ". A consumer type was not provided (or an incorrect pairing was used)."); + } + + public static TwitterProducer createProducer(TwitterEndpoint te, String uri) throws IllegalArgumentException { + String[] uriSplit = splitUri(uri); + + if (uriSplit.length > 0) { + switch (ConsumerType.fromUri(uriSplit[0])) { + case DIRECTMESSAGE: + if (te.getProperties().getUser() == null || te.getProperties().getUser().trim().isEmpty()) { + throw new IllegalArgumentException( + "Producer type set to DIRECT MESSAGE but no recipient user was set."); + } else { + return new DirectMessageProducer(te); + } + case TIMELINE: + if (uriSplit.length > 1) { + switch (TimelineType.fromUri(uriSplit[1])) { + case USER: + return new UserProducer(te); + default: + break; + } + } + break; + case SEARCH: + return new SearchProducer(te); + default: + break; + } + + } + + throw new IllegalArgumentException("Cannot create any producer with uri " + uri + + ". 
A producer type was not provided (or an incorrect pairing was used)."); + } + + private static String[] splitUri(String uri) { + Pattern p1 = Pattern.compile("twitter:(//)*"); + Pattern p2 = Pattern.compile("\\?.*"); + + uri = p1.matcher(uri).replaceAll(""); + uri = p2.matcher(uri).replaceAll(""); + + return uri.split("/"); + } + + public static <T extends Enum<T>> T enumFromString(T[] values, String uri, T defaultValue) { + for (int i = values.length - 1; i >= 0; i--) { + if (values[i].name().equalsIgnoreCase(uri)) { + return values[i]; + } + } + + return defaultValue; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/33b3bd5c/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TweeterStatusListener.java ---------------------------------------------------------------------- diff --git a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TweeterStatusListener.java b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TweeterStatusListener.java deleted file mode 100644 index 98830f8..0000000 --- a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TweeterStatusListener.java +++ /dev/null @@ -1,24 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.component.twitter.consumer; - -import twitter4j.Status; - -public interface TweeterStatusListener { - - void onStatus(Status status); -} http://git-wip-us.apache.org/repos/asf/camel/blob/33b3bd5c/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/Twitter4JConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/Twitter4JConsumer.java b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/Twitter4JConsumer.java deleted file mode 100644 index 14ecae3..0000000 --- a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/Twitter4JConsumer.java +++ /dev/null @@ -1,72 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.camel.component.twitter.consumer; - -import java.io.Serializable; -import java.util.List; - -import org.apache.camel.component.twitter.TwitterEndpoint; - -import twitter4j.TwitterException; - - -public abstract class Twitter4JConsumer { - - /** - * Instance of TwitterEndpoint. - */ - protected TwitterEndpoint te; - - /** - * The last tweet ID received. - */ - protected long lastId = 1; - - protected Twitter4JConsumer(TwitterEndpoint te) { - this.te = te; - } - - /** - * Can't assume that the end of the list will be the most recent ID. - * The Twitter API sometimes returns them slightly out of order. - */ - protected void checkLastId(long newId) { - if (newId > lastId) { - lastId = newId; - } - } - - /** - * Called by polling consumers during each poll. It needs to be separate - * from directConsume() since, as an example, streaming API polling allows - * tweets to build up between polls. - */ - public abstract List<? extends Serializable> pollConsume() throws TwitterException; - - /** - * Called by direct consumers. - */ - public abstract List<? 
extends Serializable> directConsume() throws TwitterException; - - /** - * Support to update the Consumer's lastId when starting the consumer - * @param sinceId - */ - public void setLastId(long sinceId) { - lastId = sinceId; - } -} http://git-wip-us.apache.org/repos/asf/camel/blob/33b3bd5c/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TwitterConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TwitterConsumer.java b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TwitterConsumer.java new file mode 100644 index 0000000..b1d9b37 --- /dev/null +++ b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TwitterConsumer.java @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.twitter.consumer; + +import java.util.List; + +import org.apache.camel.Exchange; +import org.apache.camel.component.twitter.TwitterEndpoint; +import twitter4j.Paging; +import twitter4j.Twitter; +import twitter4j.TwitterException; + + +public abstract class TwitterConsumer { + + /** + * Instance of TwitterEndpoint. + */ + protected final TwitterEndpoint endpoint; + + /** + * The last tweet ID received. + */ + private long lastId; + + protected TwitterConsumer(TwitterEndpoint endpoint) { + this.endpoint = endpoint; + this.lastId = -1; + } + + /** + * Called by polling consumers during each poll. It needs to be separate + * from directConsume() since, as an example, streaming API polling allows + * tweets to build up between polls. + */ + public abstract List<Exchange> pollConsume() throws TwitterException; + + /** + * Called by direct consumers. + */ + public abstract List<Exchange> directConsume() throws TwitterException; + + /** + * Can't assume that the end of the list will be the most recent ID. + * The Twitter API sometimes returns them slightly out of order. 
+ */ + protected void setLastIdIfGreater(long newId) { + if (newId > lastId) { + lastId = newId; + } + } + + /** + * Support to update the Consumer's lastId when starting the consumer + * @param sinceId + */ + public void setLastId(long sinceId) { + lastId = sinceId; + } + + protected Twitter getTwitter() { + return endpoint.getProperties().getTwitter(); + } + + protected long getLastId() { + return lastId; + } + + protected Paging getLastIdPaging() { + return new Paging(lastId); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/33b3bd5c/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TwitterConsumerDirect.java ---------------------------------------------------------------------- diff --git a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TwitterConsumerDirect.java b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TwitterConsumerDirect.java index ced7e3b..9c47259 100644 --- a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TwitterConsumerDirect.java +++ b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TwitterConsumerDirect.java @@ -16,8 +16,7 @@ */ package org.apache.camel.component.twitter.consumer; -import java.io.Serializable; -import java.util.Iterator; +import java.util.List; import org.apache.camel.Exchange; import org.apache.camel.Processor; @@ -29,10 +28,9 @@ import org.apache.camel.impl.DefaultConsumer; */ public class TwitterConsumerDirect extends DefaultConsumer { - private Twitter4JConsumer twitter4jConsumer; + private final TwitterConsumer twitter4jConsumer; - public TwitterConsumerDirect(TwitterEndpoint endpoint, Processor processor, - Twitter4JConsumer twitter4jConsumer) { + public TwitterConsumerDirect(TwitterEndpoint endpoint, Processor processor, TwitterConsumer twitter4jConsumer) { super(endpoint, processor); this.twitter4jConsumer = twitter4jConsumer; @@ -42,11 
+40,9 @@ public class TwitterConsumerDirect extends DefaultConsumer { protected void doStart() throws Exception { super.doStart(); - Iterator<? extends Serializable> i = twitter4jConsumer.directConsume().iterator(); - while (i.hasNext()) { - Exchange e = getEndpoint().createExchange(); - e.getIn().setBody(i.next()); - getProcessor().process(e); + List<Exchange> exchanges = twitter4jConsumer.directConsume(); + for (int i = 0; i < exchanges.size(); i++) { + getProcessor().process(exchanges.get(i)); } } } http://git-wip-us.apache.org/repos/asf/camel/blob/33b3bd5c/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TwitterConsumerEvent.java ---------------------------------------------------------------------- diff --git a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TwitterConsumerEvent.java b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TwitterConsumerEvent.java index 4feaf8a..f5b94a0 100644 --- a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TwitterConsumerEvent.java +++ b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TwitterConsumerEvent.java @@ -19,15 +19,13 @@ package org.apache.camel.component.twitter.consumer; import org.apache.camel.Exchange; import org.apache.camel.Processor; import org.apache.camel.component.twitter.TwitterEndpoint; -import org.apache.camel.component.twitter.consumer.streaming.StreamingConsumer; +import org.apache.camel.component.twitter.consumer.streaming.AbstractStreamingConsumer; import org.apache.camel.impl.DefaultConsumer; -import twitter4j.Status; -public class TwitterConsumerEvent extends DefaultConsumer implements TweeterStatusListener { - private Twitter4JConsumer twitter4jConsumer; +public class TwitterConsumerEvent extends DefaultConsumer implements TwitterEventListener { + private final TwitterConsumer twitter4jConsumer; - public 
TwitterConsumerEvent(TwitterEndpoint endpoint, Processor processor, - Twitter4JConsumer twitter4jConsumer) { + public TwitterConsumerEvent(TwitterEndpoint endpoint, Processor processor, TwitterConsumer twitter4jConsumer) { super(endpoint, processor); this.twitter4jConsumer = twitter4jConsumer; } @@ -35,25 +33,28 @@ public class TwitterConsumerEvent extends DefaultConsumer implements TweeterStat @Override protected void doStart() throws Exception { super.doStart(); - if (twitter4jConsumer instanceof StreamingConsumer) { - ((StreamingConsumer) twitter4jConsumer).registerTweetListener(this); - ((StreamingConsumer) twitter4jConsumer).doStart(); + + if (twitter4jConsumer instanceof AbstractStreamingConsumer) { + ((AbstractStreamingConsumer) twitter4jConsumer).setEventListener(this); + ((AbstractStreamingConsumer) twitter4jConsumer).start(); } } @Override protected void doStop() throws Exception { - super.doStop(); - if (twitter4jConsumer instanceof StreamingConsumer) { - ((StreamingConsumer) twitter4jConsumer).unregisterTweetListener(this); - ((StreamingConsumer) twitter4jConsumer).doStop(); + if (twitter4jConsumer instanceof AbstractStreamingConsumer) { + ((AbstractStreamingConsumer) twitter4jConsumer).removeEventListener(this); + ((AbstractStreamingConsumer) twitter4jConsumer).stop(); } + + super.doStop(); } @Override - public void onStatus(Status status) { - Exchange exchange = getEndpoint().createExchange(); - exchange.getIn().setBody(status); + public void onEvent(Exchange exchange) { + if (!isRunAllowed()) { + return; + } try { getProcessor().process(exchange); http://git-wip-us.apache.org/repos/asf/camel/blob/33b3bd5c/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TwitterConsumerPolling.java ---------------------------------------------------------------------- diff --git a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TwitterConsumerPolling.java 
b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TwitterConsumerPolling.java index 6236116..364e55a 100644 --- a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TwitterConsumerPolling.java +++ b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TwitterConsumerPolling.java @@ -16,15 +16,12 @@ */ package org.apache.camel.component.twitter.consumer; -import java.io.Serializable; -import java.util.Iterator; -import java.util.concurrent.TimeUnit; +import java.util.List; import org.apache.camel.Exchange; import org.apache.camel.Processor; -import org.apache.camel.component.twitter.TwitterEndpoint; import org.apache.camel.component.twitter.TwitterEndpointPolling; -import org.apache.camel.component.twitter.consumer.streaming.StreamingConsumer; +import org.apache.camel.component.twitter.consumer.streaming.AbstractStreamingConsumer; import org.apache.camel.impl.ScheduledPollConsumer; /** @@ -33,10 +30,9 @@ import org.apache.camel.impl.ScheduledPollConsumer; public class TwitterConsumerPolling extends ScheduledPollConsumer { public static final long DEFAULT_CONSUMER_DELAY = 60 * 1000L; - private Twitter4JConsumer twitter4jConsumer; + private final TwitterConsumer twitter4jConsumer; - public TwitterConsumerPolling(TwitterEndpointPolling endpoint, Processor processor, - Twitter4JConsumer twitter4jConsumer) { + public TwitterConsumerPolling(TwitterEndpointPolling endpoint, Processor processor, TwitterConsumer twitter4jConsumer) { super(endpoint, processor); setDelay(DEFAULT_CONSUMER_DELAY); this.twitter4jConsumer = twitter4jConsumer; @@ -45,31 +41,29 @@ public class TwitterConsumerPolling extends ScheduledPollConsumer { @Override protected void doStart() throws Exception { super.doStart(); - if (twitter4jConsumer instanceof StreamingConsumer) { - ((StreamingConsumer) twitter4jConsumer).doStart(); + if (twitter4jConsumer instanceof AbstractStreamingConsumer) { + 
((AbstractStreamingConsumer) twitter4jConsumer).start(); } } @Override protected void doStop() throws Exception { - super.doStop(); - if (twitter4jConsumer instanceof StreamingConsumer) { - ((StreamingConsumer) twitter4jConsumer).doStop(); + if (twitter4jConsumer instanceof AbstractStreamingConsumer) { + ((AbstractStreamingConsumer) twitter4jConsumer).stop(); } + + super.doStop(); } + @Override protected int poll() throws Exception { - Iterator<? extends Serializable> i = twitter4jConsumer.pollConsume().iterator(); - - int total = 0; - while (i.hasNext()) { - Exchange e = getEndpoint().createExchange(); - e.getIn().setBody(i.next()); - getProcessor().process(e); + List<Exchange> exchanges = twitter4jConsumer.pollConsume(); - total++; + int index = 0; + for (; index < exchanges.size(); index++) { + getProcessor().process(exchanges.get(index)); } - return total; + return index; } } http://git-wip-us.apache.org/repos/asf/camel/blob/33b3bd5c/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TwitterEventListener.java ---------------------------------------------------------------------- diff --git a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TwitterEventListener.java b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TwitterEventListener.java new file mode 100644 index 0000000..022dbff --- /dev/null +++ b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TwitterEventListener.java @@ -0,0 +1,26 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.twitter.consumer; + +import java.util.EventListener; + +import org.apache.camel.Exchange; + +/** + * Listener that receives Twitter events, each one already wrapped in an + * {@link Exchange}. + */ +public interface TwitterEventListener extends EventListener { + + /** + * Handles a single event. + * + * @param exchange the exchange carrying the event + */ + void onEvent(Exchange exchange); +} http://git-wip-us.apache.org/repos/asf/camel/blob/33b3bd5c/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TwitterEventType.java ---------------------------------------------------------------------- diff --git a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TwitterEventType.java b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TwitterEventType.java new file mode 100644 index 0000000..77600d2 --- /dev/null +++ b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TwitterEventType.java @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.twitter.consumer;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.component.twitter.TwitterConstants;
+import org.apache.camel.component.twitter.TwitterEndpoint;
+
+/**
+ * Kinds of Twitter events. Each constant can build exchanges whose
+ * {@link TwitterConstants#TWITTER_EVENT_TYPE} header is set to the constant's
+ * name.
+ */
+public enum TwitterEventType {
+    STATUS,
+    DIRECT_MESSAGE,
+    FAVORITE,
+    UNFAVORITE,
+    FOLLOW,
+    UNFOLLOW,
+    USERLIST_MEMBER_ADDITION,
+    USERLIST_MEMBER_DELETION,
+    USERLIST_SUBSCRIPTION,
+    USERLIST_UNSUBSCRIPTION,
+    USERLIST_CREATION,
+    USERLIST_UPDATE,
+    // The spelling is kept as is: name() is written into the event type header.
+    USERLIST_DELETETION,
+    USER_PROFILE_UPDATE,
+    USER_SUSPENSION,
+    USER_DELETION,
+    BLOCK,
+    UNBLOCK,
+    RETWEETED_RETWEET,
+    FAVORITED_RETWEET,
+    QUOTED_TWEET;
+
+    /**
+     * Creates an exchange for this event type without setting a body.
+     *
+     * @param endpoint the endpoint used to create the exchange
+     * @return the new exchange
+     */
+    public Exchange createExchange(TwitterEndpoint endpoint) {
+        return createExchange(endpoint, null);
+    }
+
+    /**
+     * Creates an exchange for this event type.
+     *
+     * @param endpoint the endpoint used to create the exchange
+     * @param body the message body; left unset when {@code null}
+     * @return the new exchange, with the event type header set to {@link #name()}
+     */
+    public <T> Exchange createExchange(TwitterEndpoint endpoint, T body) {
+        Exchange created = endpoint.createExchange();
+        created.getIn().setHeader(TwitterConstants.TWITTER_EVENT_TYPE, name());
+
+        if (body == null) {
+            return created;
+        }
+
+        created.getIn().setBody(body);
+        return created;
+    }
+
+    /**
+     * Creates one exchange per element of {@code bodyList}, in list order.
+     *
+     * @param endpoint the endpoint used to create the exchanges
+     * @param bodyList the bodies to wrap; may be {@code null} or empty
+     * @return the exchanges, or an empty list when there is nothing to wrap
+     */
+    public <T> List<Exchange> createExchangeList(TwitterEndpoint endpoint, List<T> bodyList) {
+        if (bodyList == null || bodyList.isEmpty()) {
+            return Collections.emptyList();
+        }
+
+        List<Exchange> wrapped = new ArrayList<>(bodyList.size());
+        for (T body : bodyList) {
+            wrapped.add(createExchange(endpoint, body));
+        }
+
+        return wrapped;
+    }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/33b3bd5c/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/directmessage/DirectMessageConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/directmessage/DirectMessageConsumer.java b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/directmessage/DirectMessageConsumer.java index 500c6e6..d0b89de 100644 --- a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/directmessage/DirectMessageConsumer.java +++ b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/directmessage/DirectMessageConsumer.java @@ -18,31 +18,37 @@ package org.apache.camel.component.twitter.consumer.directmessage; import java.util.List; +import org.apache.camel.Exchange; import org.apache.camel.component.twitter.TwitterEndpoint; -import org.apache.camel.component.twitter.consumer.Twitter4JConsumer; - +import org.apache.camel.component.twitter.consumer.TwitterConsumer; +import org.apache.camel.component.twitter.consumer.TwitterEventType; import twitter4j.DirectMessage; -import twitter4j.Paging; import twitter4j.TwitterException; /** * Consumes a user's direct messages */ -public class DirectMessageConsumer extends Twitter4JConsumer { +public class DirectMessageConsumer extends TwitterConsumer { public DirectMessageConsumer(TwitterEndpoint te) { super(te); } - public List<DirectMessage> pollConsume() throws TwitterException { - List<DirectMessage> list = te.getProperties().getTwitter().getDirectMessages(new Paging(lastId)); - for (DirectMessage dm : list) { - checkLastId(dm.getId()); + @Override + public List<Exchange> pollConsume() throws TwitterException { + List<DirectMessage> directMessages = getTwitter().getDirectMessages(getLastIdPaging()); + for (int i = 0; i < directMessages.size(); i++) { + 
setLastIdIfGreater(directMessages.get(i).getId()); } - return list; + + return TwitterEventType.DIRECT_MESSAGE.createExchangeList(endpoint, directMessages); } - public List<DirectMessage> directConsume() throws TwitterException { - return te.getProperties().getTwitter().getDirectMessages(); + @Override + public List<Exchange> directConsume() throws TwitterException { + return TwitterEventType.DIRECT_MESSAGE.createExchangeList( + endpoint, + getTwitter().getDirectMessages() + ); } } http://git-wip-us.apache.org/repos/asf/camel/blob/33b3bd5c/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/search/SearchConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/search/SearchConsumer.java b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/search/SearchConsumer.java index 4b2cdf1..1c3a087 100644 --- a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/search/SearchConsumer.java +++ b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/search/SearchConsumer.java @@ -19,8 +19,10 @@ package org.apache.camel.component.twitter.consumer.search; import java.util.Collections; import java.util.List; +import org.apache.camel.Exchange; import org.apache.camel.component.twitter.TwitterEndpoint; -import org.apache.camel.component.twitter.consumer.Twitter4JConsumer; +import org.apache.camel.component.twitter.consumer.TwitterConsumer; +import org.apache.camel.component.twitter.consumer.TwitterEventType; import org.apache.camel.util.ObjectHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -35,7 +37,7 @@ import twitter4j.TwitterException; /** * Consumes search requests */ -public class SearchConsumer extends Twitter4JConsumer { +public class SearchConsumer extends TwitterConsumer { private static final Logger LOG = 
LoggerFactory.getLogger(SearchConsumer.class); @@ -43,8 +45,8 @@ public class SearchConsumer extends Twitter4JConsumer { super(te); } - public List<Status> pollConsume() throws TwitterException { - String keywords = te.getProperties().getKeywords(); + public List<Exchange> pollConsume() throws TwitterException { + String keywords = endpoint.getProperties().getKeywords(); Query query; @@ -56,15 +58,15 @@ public class SearchConsumer extends Twitter4JConsumer { LOG.debug("Searching twitter without keywords."); } - if (te.getProperties().isFilterOld()) { - query.setSinceId(lastId); + if (endpoint.getProperties().isFilterOld()) { + query.setSinceId(getLastId()); } return search(query); } - public List<Status> directConsume() throws TwitterException { - String keywords = te.getProperties().getKeywords(); + public List<Exchange> directConsume() throws TwitterException { + String keywords = endpoint.getProperties().getKeywords(); if (keywords == null || keywords.trim().length() == 0) { return Collections.emptyList(); } @@ -74,33 +76,33 @@ public class SearchConsumer extends Twitter4JConsumer { return search(query); } - private List<Status> search(Query query) throws TwitterException { + private List<Exchange> search(Query query) throws TwitterException { Integer numberOfPages = 1; - if (ObjectHelper.isNotEmpty(te.getProperties().getLang())) { - query.setLang(te.getProperties().getLang()); + if (ObjectHelper.isNotEmpty(endpoint.getProperties().getLang())) { + query.setLang(endpoint.getProperties().getLang()); } - if (ObjectHelper.isNotEmpty(te.getProperties().getCount())) { - query.setCount(te.getProperties().getCount()); + if (ObjectHelper.isNotEmpty(endpoint.getProperties().getCount())) { + query.setCount(endpoint.getProperties().getCount()); } - if (ObjectHelper.isNotEmpty(te.getProperties().getNumberOfPages())) { - numberOfPages = te.getProperties().getNumberOfPages(); + if (ObjectHelper.isNotEmpty(endpoint.getProperties().getNumberOfPages())) { + numberOfPages = 
endpoint.getProperties().getNumberOfPages(); } - if (ObjectHelper.isNotEmpty(te.getProperties().getLatitude()) - && ObjectHelper.isNotEmpty(te.getProperties().getLongitude()) - && ObjectHelper.isNotEmpty(te.getProperties().getRadius())) { - GeoLocation location = new GeoLocation(te.getProperties().getLatitude(), te.getProperties().getLongitude()); - query.setGeoCode(location, te.getProperties().getRadius(), Unit.valueOf(te.getProperties().getDistanceMetric())); + if (ObjectHelper.isNotEmpty(endpoint.getProperties().getLatitude()) + && ObjectHelper.isNotEmpty(endpoint.getProperties().getLongitude()) + && ObjectHelper.isNotEmpty(endpoint.getProperties().getRadius())) { + GeoLocation location = new GeoLocation(endpoint.getProperties().getLatitude(), endpoint.getProperties().getLongitude()); + query.setGeoCode(location, endpoint.getProperties().getRadius(), Unit.valueOf(endpoint.getProperties().getDistanceMetric())); LOG.debug("Searching with additional geolocation parameters."); } LOG.debug("Searching with {} pages.", numberOfPages); - Twitter twitter = te.getProperties().getTwitter(); + Twitter twitter = getTwitter(); QueryResult qr = twitter.search(query); List<Status> tweets = qr.getTweets(); @@ -113,13 +115,13 @@ public class SearchConsumer extends Twitter4JConsumer { tweets.addAll(qr.getTweets()); } - if (te.getProperties().isFilterOld()) { - for (Status t : tweets) { - checkLastId(t.getId()); + if (endpoint.getProperties().isFilterOld()) { + for (int i = 0; i < tweets.size(); i++) { + setLastIdIfGreater(tweets.get(i).getId()); } } - return tweets; + return TwitterEventType.STATUS.createExchangeList(endpoint, tweets); } } http://git-wip-us.apache.org/repos/asf/camel/blob/33b3bd5c/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/streaming/AbstractStreamingConsumer.java ---------------------------------------------------------------------- diff --git 
a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/streaming/AbstractStreamingConsumer.java b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/streaming/AbstractStreamingConsumer.java new file mode 100644 index 0000000..7a7bfcb --- /dev/null +++ b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/streaming/AbstractStreamingConsumer.java @@ -0,0 +1,128 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.camel.component.twitter.consumer.streaming;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Service;
+import org.apache.camel.component.twitter.TwitterEndpoint;
+import org.apache.camel.component.twitter.consumer.TwitterConsumer;
+import org.apache.camel.component.twitter.consumer.TwitterEventListener;
+import org.apache.camel.component.twitter.consumer.TwitterEventType;
+import twitter4j.Status;
+import twitter4j.StatusDeletionNotice;
+import twitter4j.StatusListener;
+import twitter4j.TwitterException;
+import twitter4j.TwitterStream;
+
+/**
+ * Super class providing consuming capabilities for the streaming API.
+ * <p>
+ * Incoming events are either handed straight to the registered
+ * {@link TwitterEventListener} or, when none is registered, buffered until
+ * the next {@link #pollConsume()}. Subclasses start the stream in
+ * {@code start()}.
+ */
+public abstract class AbstractStreamingConsumer extends TwitterConsumer implements StatusListener, Service {
+    private final TwitterStream twitterStream;
+    // Buffer of events not yet polled; every access is guarded by its own monitor.
+    private final List<Exchange> receivedStatuses;
+    private final AtomicReference<TwitterEventListener> twitterEventListener;
+
+    /**
+     * Creates the Twitter stream from the endpoint properties and registers
+     * this consumer as its listener.
+     *
+     * @param te the endpoint this consumer belongs to
+     */
+    public AbstractStreamingConsumer(TwitterEndpoint te) {
+        super(te);
+        this.receivedStatuses = new ArrayList<>();
+        this.twitterEventListener = new AtomicReference<>();
+        this.twitterStream = te.getProperties().createTwitterStream();
+        // Registered last, so that every field is assigned before the stream gets a reference to this.
+        this.twitterStream.addListener(this);
+    }
+
+    /**
+     * Returns the events buffered since the previous poll and empties the
+     * buffer, so that two polls with no event in between never hand out the
+     * same exchanges twice.
+     *
+     * @return an unmodifiable snapshot of the buffered exchanges, possibly empty
+     */
+    @Override
+    public List<Exchange> pollConsume() throws TwitterException {
+        List<Exchange> drained;
+
+        synchronized (receivedStatuses) {
+            drained = Collections.unmodifiableList(new ArrayList<>(receivedStatuses));
+            receivedStatuses.clear();
+        }
+
+        return drained;
+    }
+
+    /**
+     * Direct consumption is not supported for streams.
+     *
+     * @return always an empty list
+     */
+    @Override
+    public List<Exchange> directConsume() throws TwitterException {
+        return Collections.emptyList();
+    }
+
+    @Override
+    public void onException(Exception ex) {
+        // NOTE(review): stream errors are dropped silently here - consider logging them.
+    }
+
+    /**
+     * Wraps the status in a {@link TwitterEventType#STATUS} exchange and dispatches it.
+     */
+    @Override
+    public void onStatus(Status status) {
+        onEvent(TwitterEventType.STATUS.createExchange(endpoint, status));
+    }
+
+    @Override
+    public void onDeletionNotice(StatusDeletionNotice statusDeletionNotice) {
+        // noop
+    }
+
+    @Override
+    public void onTrackLimitationNotice(int numberOfLimitedStatuses) {
+        // noop
+    }
+
+    @Override
+    public void onScrubGeo(long userId, long upToStatusId) {
+        // noop
+    }
+
+    /**
+     * Registers the listener that receives events as they arrive, replacing
+     * any previous one.
+     *
+     * @param tweeterStatusListener the listener to notify
+     */
+    public void setEventListener(TwitterEventListener tweeterStatusListener) {
+        twitterEventListener.set(tweeterStatusListener);
+    }
+
+    /**
+     * Unregisters the listener, but only if it is the one currently registered.
+     *
+     * @param tweeterStatusListener the listener to remove
+     */
+    public void removeEventListener(TwitterEventListener tweeterStatusListener) {
+        twitterEventListener.compareAndSet(tweeterStatusListener, null);
+    }
+
+    /**
+     * Detaches this consumer from the stream, then shuts the stream down and
+     * cleans it up.
+     */
+    @Override
+    public void stop() {
+        twitterStream.removeListener(this);
+        twitterStream.shutdown();
+        twitterStream.cleanUp();
+    }
+
+    /**
+     * @return the stream created for this consumer
+     */
+    protected TwitterStream getTwitterStream() {
+        return twitterStream;
+    }
+
+    /**
+     * Dispatches an event: to the registered listener when there is one,
+     * otherwise into the buffer read by {@link #pollConsume()}.
+     *
+     * @param exchange the exchange carrying the event
+     */
+    protected void onEvent(Exchange exchange) {
+        TwitterEventListener listener = twitterEventListener.get();
+        if (listener != null) {
+            listener.onEvent(exchange);
+            return;
+        }
+
+        synchronized (receivedStatuses) {
+            receivedStatuses.add(exchange);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/33b3bd5c/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/streaming/FilterConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/streaming/FilterConsumer.java b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/streaming/FilterConsumer.java deleted file mode 100644 index 588a5e1..0000000 --- a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/streaming/FilterConsumer.java +++ /dev/null @@ -1,77 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.component.twitter.consumer.streaming; - -import org.apache.camel.component.twitter.TwitterEndpoint; -import twitter4j.FilterQuery; -import twitter4j.StallWarning; - -/** - * Consumes the filter stream - */ -public class FilterConsumer extends StreamingConsumer { - - public FilterConsumer(TwitterEndpoint te) { - super(te); - } - - @Override - protected void startStreaming() { - twitterStream.filter(createFilter(te)); - } - - @Override - public void onStallWarning(StallWarning stallWarning) { - // noop - } - - private FilterQuery createFilter(TwitterEndpoint te) { - FilterQuery filterQuery = new FilterQuery(); - String allLocationsString = te.getProperties().getLocations(); - if (allLocationsString != null) { - String[] locationStrings = allLocationsString.split(";"); - double[][] locations = new double[locationStrings.length][2]; - for (int i = 0; i < locationStrings.length; i++) { - String[] coords = locationStrings[i].split(","); - locations[i][0] = Double.valueOf(coords[0]); - locations[i][1] = Double.valueOf(coords[1]); - } - filterQuery.locations(locations); - } - - String keywords = te.getProperties().getKeywords(); - if (keywords != null && keywords.length() > 0) { - filterQuery.track(keywords.split(",")); - } - - String userIds = te.getProperties().getUserIds(); - if (userIds != null) { - String[] stringUserIds = userIds.split(","); - long[] 
longUserIds = new long[stringUserIds.length]; - for (int i = 0; i < stringUserIds.length; i++) { - longUserIds[i] = Long.valueOf(stringUserIds[i]); - } - filterQuery.follow(longUserIds); - } - - if (allLocationsString == null && keywords == null && userIds == null) { - throw new IllegalArgumentException("At least one filter parameter is required"); - } - - return filterQuery; - } -} http://git-wip-us.apache.org/repos/asf/camel/blob/33b3bd5c/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/streaming/FilterStreamingConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/streaming/FilterStreamingConsumer.java b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/streaming/FilterStreamingConsumer.java new file mode 100644 index 0000000..62460cc --- /dev/null +++ b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/streaming/FilterStreamingConsumer.java @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.camel.component.twitter.consumer.streaming;
+
+import org.apache.camel.component.twitter.TwitterEndpoint;
+import twitter4j.FilterQuery;
+import twitter4j.StallWarning;
+
+/**
+ * Consumes the filter stream. The filter is built from the locations,
+ * keywords and user ids configured on the endpoint properties.
+ */
+public class FilterStreamingConsumer extends AbstractStreamingConsumer {
+
+    /**
+     * @param endpoint the endpoint holding the filter configuration
+     */
+    public FilterStreamingConsumer(TwitterEndpoint endpoint) {
+        super(endpoint);
+    }
+
+    /**
+     * Builds the filter from the endpoint properties and starts streaming with it.
+     *
+     * @throws IllegalArgumentException if no filter parameter is configured or
+     *         a configured value cannot be parsed
+     */
+    @Override
+    public void start() {
+        getTwitterStream().filter(createFilter());
+    }
+
+    @Override
+    public void onStallWarning(StallWarning stallWarning) {
+        // noop
+    }
+
+    /**
+     * Translates the endpoint properties into a {@link FilterQuery}.
+     *
+     * @return the query holding every configured filter
+     * @throws IllegalArgumentException if locations, keywords and user ids are
+     *         all {@code null}, or a location or user id is malformed
+     */
+    private FilterQuery createFilter() {
+        String allLocationsString = endpoint.getProperties().getLocations();
+        String keywords = endpoint.getProperties().getKeywords();
+        String userIds = endpoint.getProperties().getUserIds();
+
+        // Fail before any parsing when there is nothing to filter on.
+        if (allLocationsString == null && keywords == null && userIds == null) {
+            throw new IllegalArgumentException("At least one filter parameter is required");
+        }
+
+        FilterQuery filterQuery = new FilterQuery();
+        if (allLocationsString != null) {
+            filterQuery.locations(parseLocations(allLocationsString));
+        }
+        if (keywords != null && keywords.length() > 0) {
+            filterQuery.track(keywords.split(","));
+        }
+        if (userIds != null) {
+            filterQuery.follow(parseUserIds(userIds));
+        }
+
+        return filterQuery;
+    }
+
+    /**
+     * Parses {@code a,b;c,d;...} into one two-element row per ';'-separated entry.
+     */
+    private static double[][] parseLocations(String allLocationsString) {
+        String[] locationStrings = allLocationsString.split(";");
+        double[][] locations = new double[locationStrings.length][2];
+        for (int i = 0; i < locationStrings.length; i++) {
+            String[] coords = locationStrings[i].split(",");
+            if (coords.length < 2) {
+                // Report the offending entry instead of an ArrayIndexOutOfBoundsException.
+                throw new IllegalArgumentException(
+                    "Invalid location '" + locationStrings[i] + "': two comma separated coordinates are required");
+            }
+            locations[i][0] = Double.parseDouble(coords[0]);
+            locations[i][1] = Double.parseDouble(coords[1]);
+        }
+        return locations;
+    }
+
+    /**
+     * Parses a comma separated list of numeric user ids; blanks around an id are ignored.
+     */
+    private static long[] parseUserIds(String userIds) {
+        String[] stringUserIds = userIds.split(",");
+        long[] longUserIds = new long[stringUserIds.length];
+        for (int i = 0; i < stringUserIds.length; i++) {
+            longUserIds[i] = Long.parseLong(stringUserIds[i].trim());
+        }
+        return longUserIds;
+    }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/33b3bd5c/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/streaming/SampleConsumer.java 
---------------------------------------------------------------------- diff --git a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/streaming/SampleConsumer.java b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/streaming/SampleConsumer.java deleted file mode 100644 index 89cc157..0000000 --- a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/streaming/SampleConsumer.java +++ /dev/null @@ -1,40 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.camel.component.twitter.consumer.streaming; - -import org.apache.camel.component.twitter.TwitterEndpoint; -import twitter4j.StallWarning; - -/** - * Consumes the sample stream - */ -public class SampleConsumer extends StreamingConsumer { - - public SampleConsumer(TwitterEndpoint te) { - super(te); - } - - @Override - protected void startStreaming() { - twitterStream.sample(); - } - - @Override - public void onStallWarning(StallWarning stallWarning) { - // noop - } -} http://git-wip-us.apache.org/repos/asf/camel/blob/33b3bd5c/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/streaming/SampleStreamingConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/streaming/SampleStreamingConsumer.java b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/streaming/SampleStreamingConsumer.java new file mode 100644 index 0000000..e0a5b27 --- /dev/null +++ b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/streaming/SampleStreamingConsumer.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.twitter.consumer.streaming; + +import org.apache.camel.component.twitter.TwitterEndpoint; +import twitter4j.StallWarning; + +/** + * Consumes the sample stream + */ +public class SampleStreamingConsumer extends AbstractStreamingConsumer { + + public SampleStreamingConsumer(TwitterEndpoint endpoint) { + super(endpoint); + } + + @Override + public void start() { + getTwitterStream().sample(); + } + + @Override + public void onStallWarning(StallWarning stallWarning) { + // noop + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/33b3bd5c/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/streaming/StreamingConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/streaming/StreamingConsumer.java b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/streaming/StreamingConsumer.java deleted file mode 100644 index abb59cd..0000000 --- a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/streaming/StreamingConsumer.java +++ /dev/null @@ -1,108 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.component.twitter.consumer.streaming; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; - -import org.apache.camel.component.twitter.TwitterEndpoint; -import org.apache.camel.component.twitter.consumer.TweeterStatusListener; -import org.apache.camel.component.twitter.consumer.Twitter4JConsumer; - -import twitter4j.Status; -import twitter4j.StatusDeletionNotice; -import twitter4j.StatusListener; -import twitter4j.TwitterException; -import twitter4j.TwitterStream; - -/** - * Super class providing consuming capabilities for the streaming API. 
- */ -public abstract class StreamingConsumer extends Twitter4JConsumer implements StatusListener { - protected final TwitterStream twitterStream; - private final List<Status> receivedStatuses = new ArrayList<Status>(); - private volatile boolean clear; - private TweeterStatusListener tweeterStatusListener; - - public StreamingConsumer(TwitterEndpoint te) { - super(te); - twitterStream = te.getProperties().createTwitterStream(); - twitterStream.addListener(this); - } - - public List<Status> pollConsume() throws TwitterException { - clear = true; - return Collections.unmodifiableList(new ArrayList<Status>(receivedStatuses)); - } - - public List<Status> directConsume() throws TwitterException { - // not used - return null; - } - - @Override - public void onException(Exception ex) { - } - - @Override - public void onStatus(Status status) { - if (tweeterStatusListener != null) { - tweeterStatusListener.onStatus(status); - } else { - if (clear) { - receivedStatuses.clear(); - clear = false; - } - receivedStatuses.add(status); - } - } - - @Override - public void onDeletionNotice(StatusDeletionNotice statusDeletionNotice) { - // noop - } - - @Override - public void onTrackLimitationNotice(int numberOfLimitedStatuses) { - // noop - } - - @Override - public void onScrubGeo(long userId, long upToStatusId) { - // noop - } - - public void registerTweetListener(TweeterStatusListener tweeterStatusListener) { - this.tweeterStatusListener = tweeterStatusListener; - } - - public void unregisterTweetListener(TweeterStatusListener tweeterStatusListener) { - this.tweeterStatusListener = null; - } - - public void doStart() { - startStreaming(); - } - - public void doStop() { - twitterStream.shutdown(); - } - - protected abstract void startStreaming(); - -} http://git-wip-us.apache.org/repos/asf/camel/blob/33b3bd5c/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/streaming/UserStreamingConsumer.java 
---------------------------------------------------------------------- diff --git a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/streaming/UserStreamingConsumer.java b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/streaming/UserStreamingConsumer.java index 751d9e9..e04ba31 100644 --- a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/streaming/UserStreamingConsumer.java +++ b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/streaming/UserStreamingConsumer.java @@ -16,7 +16,11 @@ */ package org.apache.camel.component.twitter.consumer.streaming; +import org.apache.camel.Exchange; +import org.apache.camel.component.twitter.TwitterConstants; import org.apache.camel.component.twitter.TwitterEndpoint; +import org.apache.camel.component.twitter.TwitterHelper; +import org.apache.camel.component.twitter.consumer.TwitterEventType; import twitter4j.DirectMessage; import twitter4j.StallWarning; import twitter4j.Status; @@ -24,110 +28,168 @@ import twitter4j.User; import twitter4j.UserList; import twitter4j.UserStreamListener; -public class UserStreamingConsumer extends StreamingConsumer implements UserStreamListener { +public class UserStreamingConsumer extends AbstractStreamingConsumer implements UserStreamListener { - public UserStreamingConsumer(TwitterEndpoint te) { - super(te); + public UserStreamingConsumer(TwitterEndpoint endpoint) { + super(endpoint); } @Override - protected void startStreaming() { - twitterStream.user(); + public void start() { + getTwitterStream().user(); } @Override - public void onDeletionNotice(long l, long l2) { + public void onDeletionNotice(long directMessageId, long userId) { // noop } @Override - public void onFriendList(long[] longs) { + public void onFriendList(long[] friendIds) { // noop } @Override - public void onFavorite(User user, User user2, Status status) { - // noop + public void 
onFavorite(User source, User target, Status favoritedStatus) { + Exchange exchange = TwitterEventType.FAVORITE.createExchange(endpoint, favoritedStatus); + TwitterHelper.setUserHeader(exchange, 1, source, "source"); + TwitterHelper.setUserHeader(exchange, 2, target, "target"); + + onEvent(exchange); } @Override - public void onUnfavorite(User user, User user2, Status status) { - // noop + public void onUnfavorite(User source, User target, Status unfavoritedStatus) { + Exchange exchange = TwitterEventType.UNFAVORITE.createExchange(endpoint, unfavoritedStatus); + TwitterHelper.setUserHeader(exchange, 1, source, "source"); + TwitterHelper.setUserHeader(exchange, 2, target, "target"); + + onEvent(exchange); } @Override - public void onFollow(User user, User user2) { - // noop + public void onFollow(User source, User followedUser) { + Exchange exchange = TwitterEventType.FOLLOW.createExchange(endpoint); + TwitterHelper.setUserHeader(exchange, 1, source, "source"); + TwitterHelper.setUserHeader(exchange, 2, followedUser, "followed"); + + onEvent(exchange); } @Override - public void onUnfollow(User user, User user2) { - // noop + public void onUnfollow(User source, User unfollowedUser) { + Exchange exchange = TwitterEventType.UNFOLLOW.createExchange(endpoint); + TwitterHelper.setUserHeader(exchange, 1, source, "source"); + TwitterHelper.setUserHeader(exchange, 2, unfollowedUser, "unfollowed"); + + onEvent(exchange); } @Override public void onDirectMessage(DirectMessage directMessage) { - // noop + onEvent(TwitterEventType.DIRECT_MESSAGE.createExchange(endpoint, directMessage)); } @Override - public void onUserListMemberAddition(User user, User user2, UserList userList) { - // noop + public void onUserListMemberAddition(User addedMember, User listOwner, UserList list) { + Exchange exchange = TwitterEventType.USERLIST_MEMBER_ADDITION.createExchange(endpoint, list); + TwitterHelper.setUserHeader(exchange, 1, addedMember, "addedMember"); + 
TwitterHelper.setUserHeader(exchange, 2, listOwner, "listOwner"); + + onEvent(exchange); } @Override - public void onUserListMemberDeletion(User user, User user2, UserList userList) { - // noop + public void onUserListMemberDeletion(User deletedMember, User listOwner, UserList list) { + Exchange exchange = TwitterEventType.USERLIST_MEMBER_DELETION.createExchange(endpoint, list); + TwitterHelper.setUserHeader(exchange, 1, deletedMember, "deletedMember"); + TwitterHelper.setUserHeader(exchange, 2, listOwner, "listOwner"); + + onEvent(exchange); } @Override - public void onUserListSubscription(User user, User user2, UserList userList) { - // noop + public void onUserListSubscription(User subscriber, User listOwner, UserList list) { + Exchange exchange = TwitterEventType.USERLIST_SUBSCRIPTION.createExchange(endpoint, list); + TwitterHelper.setUserHeader(exchange, 1, subscriber, "subscriber"); + TwitterHelper.setUserHeader(exchange, 2, listOwner, "listOwner"); + + onEvent(exchange); } @Override - public void onUserListUnsubscription(User user, User user2, UserList userList) { - // noop + public void onUserListUnsubscription(User subscriber, User listOwner, UserList list) { + Exchange exchange = TwitterEventType.USERLIST_UNSUBSCRIPTION.createExchange(endpoint, list); + TwitterHelper.setUserHeader(exchange, 1, subscriber, "subscriber"); + TwitterHelper.setUserHeader(exchange, 2, listOwner, "listOwner"); + + onEvent(exchange); } @Override public void onUserListCreation(User user, UserList userList) { - // noop + Exchange exchange = TwitterEventType.USERLIST_CREATION.createExchange(endpoint, userList); + TwitterHelper.setUserHeader(exchange, user); + + onEvent(exchange); } @Override public void onUserListUpdate(User user, UserList userList) { - // noop + Exchange exchange = TwitterEventType.USERLIST_UPDATE.createExchange(endpoint, userList); + TwitterHelper.setUserHeader(exchange, user); + + onEvent(exchange); } @Override public void onUserListDeletion(User user, UserList 
userList) { - // noop + Exchange exchange = TwitterEventType.USERLIST_DELETETION.createExchange(endpoint, userList); + TwitterHelper.setUserHeader(exchange, user); + + onEvent(exchange); } @Override public void onUserProfileUpdate(User user) { - // noop + Exchange exchange = TwitterEventType.USER_PROFILE_UPDATE.createExchange(endpoint); + TwitterHelper.setUserHeader(exchange, user); + + onEvent(exchange); } @Override - public void onUserSuspension(long l) { - // noop + public void onUserSuspension(long suspendedUser) { + Exchange exchange = TwitterEventType.USER_SUSPENSION.createExchange(endpoint); + exchange.getIn().setHeader(TwitterConstants.TWITTER_USER, suspendedUser); + + onEvent(exchange); } @Override - public void onUserDeletion(long l) { - // noop + public void onUserDeletion(long deletedUser) { + Exchange exchange = TwitterEventType.USER_DELETION.createExchange(endpoint); + exchange.getIn().setHeader(TwitterConstants.TWITTER_USER, deletedUser); + + onEvent(exchange); } @Override - public void onBlock(User user, User user2) { - // noop + public void onBlock(User source, User blockedUser) { + Exchange exchange = TwitterEventType.BLOCK.createExchange(endpoint); + TwitterHelper.setUserHeader(exchange, 1, source, "source"); + TwitterHelper.setUserHeader(exchange, 2, blockedUser, "blocked"); + + onEvent(exchange); } @Override - public void onUnblock(User user, User user2) { - // noop + public void onUnblock(User source, User unblockedUser) { + Exchange exchange = TwitterEventType.UNBLOCK.createExchange(endpoint); + TwitterHelper.setUserHeader(exchange, 1, source, "source"); + TwitterHelper.setUserHeader(exchange, 2, unblockedUser, "unblocked"); + + onEvent(exchange); } @Override @@ -137,20 +199,28 @@ public class UserStreamingConsumer extends StreamingConsumer implements UserStre @Override public void onRetweetedRetweet(User source, User target, Status retweetedStatus) { - // TODO Auto-generated method stub - + Exchange exchange = 
TwitterEventType.RETWEETED_RETWEET.createExchange(endpoint, retweetedStatus); + TwitterHelper.setUserHeader(exchange, 1, source, "source"); + TwitterHelper.setUserHeader(exchange, 2, target, "target"); + + onEvent(exchange); } @Override public void onFavoritedRetweet(User source, User target, Status favoritedRetweeet) { - // TODO Auto-generated method stub - + Exchange exchange = TwitterEventType.FAVORITED_RETWEET.createExchange(endpoint, favoritedRetweeet); + TwitterHelper.setUserHeader(exchange, 1, source, "source"); + TwitterHelper.setUserHeader(exchange, 2, target, "target"); + + onEvent(exchange); } @Override public void onQuotedTweet(User source, User target, Status quotingTweet) { - // TODO Auto-generated method stub - - } + Exchange exchange = TwitterEventType.QUOTED_TWEET.createExchange(endpoint, quotingTweet); + TwitterHelper.setUserHeader(exchange, 1, source, "source"); + TwitterHelper.setUserHeader(exchange, 2, target, "target"); + onEvent(exchange); + } }