http://git-wip-us.apache.org/repos/asf/camel/blob/bb64dca0/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/TwitterEndpointEvent.java ---------------------------------------------------------------------- diff --git a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/TwitterEndpointEvent.java b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/TwitterEndpointEvent.java index c9edc40..54d707f 100644 --- a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/TwitterEndpointEvent.java +++ b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/TwitterEndpointEvent.java @@ -19,26 +19,27 @@ package org.apache.camel.component.twitter; import org.apache.camel.Consumer; import org.apache.camel.Processor; import org.apache.camel.Producer; -import org.apache.camel.component.twitter.consumer.TwitterConsumer; +import org.apache.camel.component.twitter.consumer.AbstractTwitterConsumerHandler; import org.apache.camel.component.twitter.consumer.TwitterConsumerEvent; import org.apache.camel.component.twitter.data.EndpointType; import org.apache.camel.impl.DefaultEndpoint; +@Deprecated public class TwitterEndpointEvent extends DefaultEndpoint implements TwitterEndpoint { - private final String remaining; + private final String kind; // only TwitterEndpointPolling is annotated private TwitterConfiguration properties; public TwitterEndpointEvent(String uri, String remaining, TwitterComponent component, TwitterConfiguration properties) { super(uri, component); - this.remaining = remaining; + this.kind = remaining; this.properties = properties; } @Override public Consumer createConsumer(Processor processor) throws Exception { - TwitterConsumer twitter4jConsumer = TwitterHelper.createConsumer(this, getEndpointUri(), remaining); + AbstractTwitterConsumerHandler twitter4jConsumer = TwitterHelper.createConsumer(this, getEndpointUri(), kind); return new 
TwitterConsumerEvent(this, processor, twitter4jConsumer); }
http://git-wip-us.apache.org/repos/asf/camel/blob/bb64dca0/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/TwitterEndpointPolling.java ---------------------------------------------------------------------- diff --git a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/TwitterEndpointPolling.java b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/TwitterEndpointPolling.java index 51b739e..008167a 100644 --- a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/TwitterEndpointPolling.java +++ b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/TwitterEndpointPolling.java @@ -21,20 +21,26 @@ import org.apache.camel.Processor; import org.apache.camel.Producer; import org.apache.camel.api.management.ManagedAttribute; import org.apache.camel.api.management.ManagedResource; -import org.apache.camel.component.twitter.consumer.TwitterConsumer; +import org.apache.camel.component.twitter.consumer.AbstractTwitterConsumerHandler; import org.apache.camel.component.twitter.consumer.TwitterConsumerPolling; import org.apache.camel.component.twitter.data.EndpointType; import org.apache.camel.impl.DefaultPollingEndpoint; +import org.apache.camel.spi.Metadata; import org.apache.camel.spi.UriEndpoint; import org.apache.camel.spi.UriParam; +import org.apache.camel.spi.UriPath; /** - * This component integrates with Twitter to send tweets or search for tweets and more. + * Use twitter-directmessage, twitter-search, twitter-streaming and twitter-timeline instead of this component. 
+ * @deprecated */ +@Deprecated @ManagedResource(description = "Managed Twitter Endpoint") -@UriEndpoint(firstVersion = "2.10.0", scheme = "twitter", title = "Twitter", syntax = "twitter:kind", consumerClass = TwitterConsumer.class, label = "api,social") +@UriEndpoint(firstVersion = "2.10.0", scheme = "twitter", title = "Twitter", syntax = "twitter:kind", consumerClass = AbstractTwitterConsumerHandler.class, label = "api,social") public class TwitterEndpointPolling extends DefaultPollingEndpoint implements TwitterEndpoint { - private final String remaining; + @UriPath(description = "The kind of endpoint", enums = "directmessage,search,streaming/filter,streaming/sample,streaming/user" + + ",timeline/home,timeline/mentions,timeline/retweetsofme,timeline/user") @Metadata(required = "true") + private final String kind; @UriParam(optionalPrefix = "consumer.", defaultValue = "" + TwitterConsumerPolling.DEFAULT_CONSUMER_DELAY, label = "consumer,scheduler", description = "Milliseconds before the next poll.") @@ -45,7 +51,7 @@ public class TwitterEndpointPolling extends DefaultPollingEndpoint implements Tw public TwitterEndpointPolling(String uri, String remaining, TwitterComponent component, TwitterConfiguration properties) { super(uri, component); - this.remaining = remaining; + this.kind = remaining; this.properties = properties; setDelay(delay); // reconfigure the default delay @@ -53,7 +59,7 @@ public class TwitterEndpointPolling extends DefaultPollingEndpoint implements Tw @Override public Consumer createConsumer(Processor processor) throws Exception { - TwitterConsumer twitter4jConsumer = TwitterHelper.createConsumer(this, getEndpointUri(), remaining); + AbstractTwitterConsumerHandler twitter4jConsumer = TwitterHelper.createConsumer(this, getEndpointUri(), kind); // update the pulling lastID with sinceId twitter4jConsumer.setLastId(properties.getSinceId()); TwitterConsumerPolling tc = new TwitterConsumerPolling(this, processor, twitter4jConsumer); @@ -63,7 +69,7 @@ 
public class TwitterEndpointPolling extends DefaultPollingEndpoint implements Tw @Override public Producer createProducer() throws Exception { - return TwitterHelper.createProducer(this, getEndpointUri(), remaining); + return TwitterHelper.createProducer(this, getEndpointUri(), kind); } @Override http://git-wip-us.apache.org/repos/asf/camel/blob/bb64dca0/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/TwitterHelper.java ---------------------------------------------------------------------- diff --git a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/TwitterHelper.java b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/TwitterHelper.java index 81fefa9..e51e08f 100644 --- a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/TwitterHelper.java +++ b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/TwitterHelper.java @@ -16,25 +16,32 @@ */ package org.apache.camel.component.twitter; +import org.apache.camel.Consumer; import org.apache.camel.Exchange; import org.apache.camel.Message; -import org.apache.camel.component.twitter.consumer.TwitterConsumer; -import org.apache.camel.component.twitter.consumer.directmessage.DirectMessageConsumer; -import org.apache.camel.component.twitter.consumer.search.SearchConsumer; -import org.apache.camel.component.twitter.consumer.streaming.FilterStreamingConsumer; -import org.apache.camel.component.twitter.consumer.streaming.SampleStreamingConsumer; -import org.apache.camel.component.twitter.consumer.streaming.UserStreamingConsumer; -import org.apache.camel.component.twitter.consumer.timeline.HomeConsumer; -import org.apache.camel.component.twitter.consumer.timeline.MentionsConsumer; -import org.apache.camel.component.twitter.consumer.timeline.RetweetsConsumer; -import org.apache.camel.component.twitter.consumer.timeline.UserConsumer; +import org.apache.camel.Processor; +import org.apache.camel.Producer; +import 
org.apache.camel.component.twitter.consumer.AbstractTwitterConsumerHandler; +import org.apache.camel.component.twitter.consumer.DefaultTwitterConsumer; +import org.apache.camel.component.twitter.consumer.TwitterConsumerDirect; +import org.apache.camel.component.twitter.consumer.TwitterConsumerEvent; +import org.apache.camel.component.twitter.consumer.TwitterConsumerPolling; import org.apache.camel.component.twitter.data.ConsumerType; import org.apache.camel.component.twitter.data.StreamingType; import org.apache.camel.component.twitter.data.TimelineType; -import org.apache.camel.component.twitter.producer.DirectMessageProducer; -import org.apache.camel.component.twitter.producer.SearchProducer; -import org.apache.camel.component.twitter.producer.TwitterProducer; -import org.apache.camel.component.twitter.producer.UserProducer; +import org.apache.camel.component.twitter.directmessage.DirectMessageConsumerHandler; +import org.apache.camel.component.twitter.directmessage.DirectMessageProducer; +import org.apache.camel.component.twitter.search.SearchConsumerHandler; +import org.apache.camel.component.twitter.search.SearchProducer; +import org.apache.camel.component.twitter.streaming.FilterStreamingConsumerHandler; +import org.apache.camel.component.twitter.streaming.SampleStreamingConsumerHandler; +import org.apache.camel.component.twitter.streaming.UserStreamingConsumerHandler; +import org.apache.camel.component.twitter.timeline.HomeConsumerHandler; +import org.apache.camel.component.twitter.timeline.MentionsConsumerHandler; +import org.apache.camel.component.twitter.timeline.RetweetsConsumerHandler; +import org.apache.camel.component.twitter.timeline.UserConsumerHandler; +import org.apache.camel.component.twitter.timeline.UserProducer; + import twitter4j.User; public final class TwitterHelper { @@ -58,30 +65,31 @@ public final class TwitterHelper { message.setHeader(TwitterConstants.TWITTER_USER_ROLE + index, role); } - public static TwitterConsumer 
createConsumer(TwitterEndpoint te, String uri, String remaining) throws IllegalArgumentException { + @Deprecated + public static AbstractTwitterConsumerHandler createConsumer(TwitterEndpoint te, String uri, String remaining) throws IllegalArgumentException { String[] tokens = remaining.split("/"); - + if (tokens.length > 0) { switch (ConsumerType.fromString(tokens[0])) { case DIRECTMESSAGE: - return new DirectMessageConsumer(te); + return new DirectMessageConsumerHandler(te); case SEARCH: boolean hasNoKeywords = te.getProperties().getKeywords() == null || te.getProperties().getKeywords().trim().isEmpty(); if (hasNoKeywords) { throw new IllegalArgumentException("Type set to SEARCH but no keywords were provided."); } else { - return new SearchConsumer(te); + return new SearchConsumerHandler(te); } case STREAMING: if (tokens.length > 1) { switch (StreamingType.fromString(tokens[1])) { case SAMPLE: - return new SampleStreamingConsumer(te); + return new SampleStreamingConsumerHandler(te); case FILTER: - return new FilterStreamingConsumer(te); + return new FilterStreamingConsumerHandler(te); case USER: - return new UserStreamingConsumer(te); + return new UserStreamingConsumerHandler(te); default: break; } @@ -91,16 +99,16 @@ public final class TwitterHelper { if (tokens.length > 1) { switch (TimelineType.fromString(tokens[1])) { case HOME: - return new HomeConsumer(te); + return new HomeConsumerHandler(te); case MENTIONS: - return new MentionsConsumer(te); + return new MentionsConsumerHandler(te); case RETWEETSOFME: - return new RetweetsConsumer(te); + return new RetweetsConsumerHandler(te); case USER: if (te.getProperties().getUser() == null || te.getProperties().getUser().trim().isEmpty()) { throw new IllegalArgumentException("Fetch type set to USER TIMELINE but no user was set."); } else { - return new UserConsumer(te); + return new UserConsumerHandler(te); } default: break; @@ -116,7 +124,24 @@ public final class TwitterHelper { + ". 
A consumer type was not provided (or an incorrect pairing was used)."); } - public static TwitterProducer createProducer(TwitterEndpoint te, String uri, String remaining) throws IllegalArgumentException { + public static Consumer createConsumer(Processor processor, AbstractTwitterEndpoint endpoint, AbstractTwitterConsumerHandler handler) throws Exception { + Consumer answer = new DefaultTwitterConsumer(endpoint, processor, handler); + switch (endpoint.getEndpointType()) { + case POLLING: + handler.setLastId(endpoint.getProperties().getSinceId()); + endpoint.configureConsumer(answer); + break; + case DIRECT: + endpoint.configureConsumer(answer); + break; + default: + break; + } + return answer; + } + + @Deprecated + public static Producer createProducer(TwitterEndpoint te, String uri, String remaining) throws IllegalArgumentException { String[] tokens = remaining.split("/"); if (tokens.length > 0) { http://git-wip-us.apache.org/repos/asf/camel/blob/bb64dca0/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/AbstractTwitterConsumerHandler.java ---------------------------------------------------------------------- diff --git a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/AbstractTwitterConsumerHandler.java b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/AbstractTwitterConsumerHandler.java new file mode 100644 index 0000000..b0ec1f0 --- /dev/null +++ b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/AbstractTwitterConsumerHandler.java @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.twitter.consumer; + +import java.util.List; + +import org.apache.camel.Exchange; +import org.apache.camel.component.twitter.TwitterEndpoint; +import twitter4j.Paging; +import twitter4j.Twitter; +import twitter4j.TwitterException; + + +public abstract class AbstractTwitterConsumerHandler { + + /** + * Instance of TwitterEndpoint. + */ + protected final TwitterEndpoint endpoint; + + /** + * The last tweet ID received. + */ + private long lastId; + + protected AbstractTwitterConsumerHandler(TwitterEndpoint endpoint) { + this.endpoint = endpoint; + this.lastId = -1; + } + + /** + * Called by polling consumers during each poll. It needs to be separate + * from directConsume() since, as an example, streaming API polling allows + * tweets to build up between polls. + */ + public abstract List<Exchange> pollConsume() throws TwitterException; + + /** + * Called by direct consumers. + */ + public abstract List<Exchange> directConsume() throws TwitterException; + + /** + * Can't assume that the end of the list will be the most recent ID. + * The Twitter API sometimes returns them slightly out of order. 
+ */ + protected void setLastIdIfGreater(long newId) { + if (newId > lastId) { + lastId = newId; + } + } + + /** + * Support to update the Consumer's lastId when starting the consumer + */ + public void setLastId(long sinceId) { + lastId = sinceId; + } + + protected Twitter getTwitter() { + return endpoint.getProperties().getTwitter(); + } + + protected long getLastId() { + return lastId; + } + + protected Paging getLastIdPaging() { + return new Paging(lastId); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/bb64dca0/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/DefaultTwitterConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/DefaultTwitterConsumer.java b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/DefaultTwitterConsumer.java new file mode 100644 index 0000000..7b2b669 --- /dev/null +++ b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/DefaultTwitterConsumer.java @@ -0,0 +1,121 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.twitter.consumer; + +import java.util.List; + +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.component.twitter.AbstractTwitterEndpoint; +import org.apache.camel.component.twitter.streaming.AbstractStreamingConsumerHandler; +import org.apache.camel.impl.ScheduledPollConsumer; + +/** + * Provides a scheduled polling consumer as well as event based consumer for streaming. + */ +public class DefaultTwitterConsumer extends ScheduledPollConsumer implements TwitterEventListener { + + public static final long DEFAULT_CONSUMER_DELAY = 30 * 1000L; + private final AbstractTwitterEndpoint endpoint; + private final AbstractTwitterConsumerHandler handler; + + public DefaultTwitterConsumer(AbstractTwitterEndpoint endpoint, Processor processor, AbstractTwitterConsumerHandler handler) { + super(endpoint, processor); + setDelay(DEFAULT_CONSUMER_DELAY); + this.endpoint = endpoint; + this.handler = handler; + } + + @Override + public AbstractTwitterEndpoint getEndpoint() { + return (AbstractTwitterEndpoint) super.getEndpoint(); + } + + @Override + protected void doStart() throws Exception { + super.doStart(); + switch (endpoint.getEndpointType()) { + case POLLING: + if (handler instanceof AbstractStreamingConsumerHandler) { + ((AbstractStreamingConsumerHandler) handler).start(); + } + break; + case EVENT: + if (handler instanceof AbstractStreamingConsumerHandler) { + ((AbstractStreamingConsumerHandler) handler).setEventListener(this); + ((AbstractStreamingConsumerHandler) handler).start(); + } + break; + default: + List<Exchange> exchanges = handler.directConsume(); + for (int i = 0; i < exchanges.size(); i++) { + getProcessor().process(exchanges.get(i)); + } + } + } + + @Override + protected void doStop() throws Exception { + switch (endpoint.getEndpointType()) { + case POLLING: + if (handler instanceof AbstractStreamingConsumerHandler) { + ((AbstractStreamingConsumerHandler) handler).stop(); + 
} + break; + case EVENT: + if (handler instanceof AbstractStreamingConsumerHandler) { + ((AbstractStreamingConsumerHandler) handler).removeEventListener(this); + ((AbstractStreamingConsumerHandler) handler).stop(); + } + break; + default: + break; + } + + super.doStop(); + } + + @Override + protected int poll() throws Exception { + List<Exchange> exchanges = handler.pollConsume(); + + int index = 0; + for (; index < exchanges.size(); index++) { + getProcessor().process(exchanges.get(index)); + } + + return index; + } + + @Override + public void onEvent(Exchange exchange) { + if (!isRunAllowed()) { + return; + } + + try { + getProcessor().process(exchange); + } catch (Exception e) { + exchange.setException(e); + } + + if (exchange.getException() != null) { + getExceptionHandler().handleException("Error processing exchange on status update", exchange, exchange.getException()); + } + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/bb64dca0/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TwitterConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TwitterConsumer.java b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TwitterConsumer.java deleted file mode 100644 index 3379105..0000000 --- a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TwitterConsumer.java +++ /dev/null @@ -1,85 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.component.twitter.consumer; - -import java.util.List; - -import org.apache.camel.Exchange; -import org.apache.camel.component.twitter.TwitterEndpoint; -import twitter4j.Paging; -import twitter4j.Twitter; -import twitter4j.TwitterException; - - -public abstract class TwitterConsumer { - - /** - * Instance of TwitterEndpoint. - */ - protected final TwitterEndpoint endpoint; - - /** - * The last tweet ID received. - */ - private long lastId; - - protected TwitterConsumer(TwitterEndpoint endpoint) { - this.endpoint = endpoint; - this.lastId = -1; - } - - /** - * Called by polling consumers during each poll. It needs to be separate - * from directConsume() since, as an example, streaming API polling allows - * tweets to build up between polls. - */ - public abstract List<Exchange> pollConsume() throws TwitterException; - - /** - * Called by direct consumers. - */ - public abstract List<Exchange> directConsume() throws TwitterException; - - /** - * Can't assume that the end of the list will be the most recent ID. - * The Twitter API sometimes returns them slightly out of order. 
- */ - protected void setLastIdIfGreater(long newId) { - if (newId > lastId) { - lastId = newId; - } - } - - /** - * Support to update the Consumer's lastId when starting the consumer - */ - public void setLastId(long sinceId) { - lastId = sinceId; - } - - protected Twitter getTwitter() { - return endpoint.getProperties().getTwitter(); - } - - protected long getLastId() { - return lastId; - } - - protected Paging getLastIdPaging() { - return new Paging(lastId); - } -} http://git-wip-us.apache.org/repos/asf/camel/blob/bb64dca0/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TwitterConsumerDirect.java ---------------------------------------------------------------------- diff --git a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TwitterConsumerDirect.java b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TwitterConsumerDirect.java index 9c47259..3ae4ba6 100644 --- a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TwitterConsumerDirect.java +++ b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TwitterConsumerDirect.java @@ -26,11 +26,12 @@ import org.apache.camel.impl.DefaultConsumer; /** * Camel DirectConsumer implementation. 
*/ +@Deprecated public class TwitterConsumerDirect extends DefaultConsumer { - private final TwitterConsumer twitter4jConsumer; + private final AbstractTwitterConsumerHandler twitter4jConsumer; - public TwitterConsumerDirect(TwitterEndpoint endpoint, Processor processor, TwitterConsumer twitter4jConsumer) { + public TwitterConsumerDirect(TwitterEndpoint endpoint, Processor processor, AbstractTwitterConsumerHandler twitter4jConsumer) { super(endpoint, processor); this.twitter4jConsumer = twitter4jConsumer; http://git-wip-us.apache.org/repos/asf/camel/blob/bb64dca0/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TwitterConsumerEvent.java ---------------------------------------------------------------------- diff --git a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TwitterConsumerEvent.java b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TwitterConsumerEvent.java index f5b94a0..6c3f87c 100644 --- a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TwitterConsumerEvent.java +++ b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TwitterConsumerEvent.java @@ -19,13 +19,14 @@ package org.apache.camel.component.twitter.consumer; import org.apache.camel.Exchange; import org.apache.camel.Processor; import org.apache.camel.component.twitter.TwitterEndpoint; -import org.apache.camel.component.twitter.consumer.streaming.AbstractStreamingConsumer; +import org.apache.camel.component.twitter.streaming.AbstractStreamingConsumerHandler; import org.apache.camel.impl.DefaultConsumer; +@Deprecated public class TwitterConsumerEvent extends DefaultConsumer implements TwitterEventListener { - private final TwitterConsumer twitter4jConsumer; + private final AbstractTwitterConsumerHandler twitter4jConsumer; - public TwitterConsumerEvent(TwitterEndpoint endpoint, Processor processor, TwitterConsumer twitter4jConsumer) 
{ + public TwitterConsumerEvent(TwitterEndpoint endpoint, Processor processor, AbstractTwitterConsumerHandler twitter4jConsumer) { super(endpoint, processor); this.twitter4jConsumer = twitter4jConsumer; } @@ -34,17 +35,17 @@ public class TwitterConsumerEvent extends DefaultConsumer implements TwitterEven protected void doStart() throws Exception { super.doStart(); - if (twitter4jConsumer instanceof AbstractStreamingConsumer) { - ((AbstractStreamingConsumer) twitter4jConsumer).setEventListener(this); - ((AbstractStreamingConsumer) twitter4jConsumer).start(); + if (twitter4jConsumer instanceof AbstractStreamingConsumerHandler) { + ((AbstractStreamingConsumerHandler) twitter4jConsumer).setEventListener(this); + ((AbstractStreamingConsumerHandler) twitter4jConsumer).start(); } } @Override protected void doStop() throws Exception { - if (twitter4jConsumer instanceof AbstractStreamingConsumer) { - ((AbstractStreamingConsumer) twitter4jConsumer).removeEventListener(this); - ((AbstractStreamingConsumer) twitter4jConsumer).stop(); + if (twitter4jConsumer instanceof AbstractStreamingConsumerHandler) { + ((AbstractStreamingConsumerHandler) twitter4jConsumer).removeEventListener(this); + ((AbstractStreamingConsumerHandler) twitter4jConsumer).stop(); } super.doStop(); http://git-wip-us.apache.org/repos/asf/camel/blob/bb64dca0/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TwitterConsumerPolling.java ---------------------------------------------------------------------- diff --git a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TwitterConsumerPolling.java b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TwitterConsumerPolling.java index 368d6ef..0adbb3c 100644 --- a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TwitterConsumerPolling.java +++ 
b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TwitterConsumerPolling.java @@ -20,20 +20,21 @@ import java.util.List; import org.apache.camel.Exchange; import org.apache.camel.Processor; +import org.apache.camel.component.twitter.TwitterEndpoint; import org.apache.camel.component.twitter.TwitterEndpointPolling; -import org.apache.camel.component.twitter.consumer.streaming.AbstractStreamingConsumer; -import org.apache.camel.component.twitter.util.TwitterSorter; +import org.apache.camel.component.twitter.streaming.AbstractStreamingConsumerHandler; import org.apache.camel.impl.ScheduledPollConsumer; /** * Provides a scheduled polling consumer */ +@Deprecated public class TwitterConsumerPolling extends ScheduledPollConsumer { public static final long DEFAULT_CONSUMER_DELAY = 30 * 1000L; - private final TwitterConsumer twitter4jConsumer; + private final AbstractTwitterConsumerHandler twitter4jConsumer; - public TwitterConsumerPolling(TwitterEndpointPolling endpoint, Processor processor, TwitterConsumer twitter4jConsumer) { + public TwitterConsumerPolling(TwitterEndpoint endpoint, Processor processor, AbstractTwitterConsumerHandler twitter4jConsumer) { super(endpoint, processor); setDelay(DEFAULT_CONSUMER_DELAY); this.twitter4jConsumer = twitter4jConsumer; @@ -47,15 +48,15 @@ public class TwitterConsumerPolling extends ScheduledPollConsumer { @Override protected void doStart() throws Exception { super.doStart(); - if (twitter4jConsumer instanceof AbstractStreamingConsumer) { - ((AbstractStreamingConsumer) twitter4jConsumer).start(); + if (twitter4jConsumer instanceof AbstractStreamingConsumerHandler) { + ((AbstractStreamingConsumerHandler) twitter4jConsumer).start(); } } @Override protected void doStop() throws Exception { - if (twitter4jConsumer instanceof AbstractStreamingConsumer) { - ((AbstractStreamingConsumer) twitter4jConsumer).stop(); + if (twitter4jConsumer instanceof AbstractStreamingConsumerHandler) { + 
((AbstractStreamingConsumerHandler) twitter4jConsumer).stop(); } super.doStop(); http://git-wip-us.apache.org/repos/asf/camel/blob/bb64dca0/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/directmessage/DirectMessageConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/directmessage/DirectMessageConsumer.java b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/directmessage/DirectMessageConsumer.java deleted file mode 100644 index d0b89de..0000000 --- a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/directmessage/DirectMessageConsumer.java +++ /dev/null @@ -1,54 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.camel.component.twitter.consumer.directmessage; - -import java.util.List; - -import org.apache.camel.Exchange; -import org.apache.camel.component.twitter.TwitterEndpoint; -import org.apache.camel.component.twitter.consumer.TwitterConsumer; -import org.apache.camel.component.twitter.consumer.TwitterEventType; -import twitter4j.DirectMessage; -import twitter4j.TwitterException; - -/** - * Consumes a user's direct messages - */ -public class DirectMessageConsumer extends TwitterConsumer { - - public DirectMessageConsumer(TwitterEndpoint te) { - super(te); - } - - @Override - public List<Exchange> pollConsume() throws TwitterException { - List<DirectMessage> directMessages = getTwitter().getDirectMessages(getLastIdPaging()); - for (int i = 0; i < directMessages.size(); i++) { - setLastIdIfGreater(directMessages.get(i).getId()); - } - - return TwitterEventType.DIRECT_MESSAGE.createExchangeList(endpoint, directMessages); - } - - @Override - public List<Exchange> directConsume() throws TwitterException { - return TwitterEventType.DIRECT_MESSAGE.createExchangeList( - endpoint, - getTwitter().getDirectMessages() - ); - } -} http://git-wip-us.apache.org/repos/asf/camel/blob/bb64dca0/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/search/SearchConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/search/SearchConsumer.java b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/search/SearchConsumer.java deleted file mode 100644 index 1c3a087..0000000 --- a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/search/SearchConsumer.java +++ /dev/null @@ -1,127 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.component.twitter.consumer.search; - -import java.util.Collections; -import java.util.List; - -import org.apache.camel.Exchange; -import org.apache.camel.component.twitter.TwitterEndpoint; -import org.apache.camel.component.twitter.consumer.TwitterConsumer; -import org.apache.camel.component.twitter.consumer.TwitterEventType; -import org.apache.camel.util.ObjectHelper; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import twitter4j.GeoLocation; -import twitter4j.Query; -import twitter4j.Query.Unit; -import twitter4j.QueryResult; -import twitter4j.Status; -import twitter4j.Twitter; -import twitter4j.TwitterException; - -/** - * Consumes search requests - */ -public class SearchConsumer extends TwitterConsumer { - - private static final Logger LOG = LoggerFactory.getLogger(SearchConsumer.class); - - public SearchConsumer(TwitterEndpoint te) { - super(te); - } - - public List<Exchange> pollConsume() throws TwitterException { - String keywords = endpoint.getProperties().getKeywords(); - - Query query; - - if (keywords != null && keywords.trim().length() > 0) { - query = new Query(keywords); - LOG.debug("Searching twitter with keywords: {}", keywords); - } else { - query = new Query(); - LOG.debug("Searching twitter without keywords."); - } - - 
if (endpoint.getProperties().isFilterOld()) { - query.setSinceId(getLastId()); - } - - return search(query); - } - - public List<Exchange> directConsume() throws TwitterException { - String keywords = endpoint.getProperties().getKeywords(); - if (keywords == null || keywords.trim().length() == 0) { - return Collections.emptyList(); - } - Query query = new Query(keywords); - - LOG.debug("Searching twitter with keywords: {}", keywords); - return search(query); - } - - private List<Exchange> search(Query query) throws TwitterException { - Integer numberOfPages = 1; - - if (ObjectHelper.isNotEmpty(endpoint.getProperties().getLang())) { - query.setLang(endpoint.getProperties().getLang()); - } - - if (ObjectHelper.isNotEmpty(endpoint.getProperties().getCount())) { - query.setCount(endpoint.getProperties().getCount()); - } - - if (ObjectHelper.isNotEmpty(endpoint.getProperties().getNumberOfPages())) { - numberOfPages = endpoint.getProperties().getNumberOfPages(); - } - - if (ObjectHelper.isNotEmpty(endpoint.getProperties().getLatitude()) - && ObjectHelper.isNotEmpty(endpoint.getProperties().getLongitude()) - && ObjectHelper.isNotEmpty(endpoint.getProperties().getRadius())) { - GeoLocation location = new GeoLocation(endpoint.getProperties().getLatitude(), endpoint.getProperties().getLongitude()); - query.setGeoCode(location, endpoint.getProperties().getRadius(), Unit.valueOf(endpoint.getProperties().getDistanceMetric())); - - LOG.debug("Searching with additional geolocation parameters."); - } - - LOG.debug("Searching with {} pages.", numberOfPages); - - Twitter twitter = getTwitter(); - QueryResult qr = twitter.search(query); - List<Status> tweets = qr.getTweets(); - - for (int i = 1; i < numberOfPages; i++) { - if (!qr.hasNext()) { - break; - } - - qr = twitter.search(qr.nextQuery()); - tweets.addAll(qr.getTweets()); - } - - if (endpoint.getProperties().isFilterOld()) { - for (int i = 0; i < tweets.size(); i++) { - setLastIdIfGreater(tweets.get(i).getId()); - } - } - - 
return TwitterEventType.STATUS.createExchangeList(endpoint, tweets); - } - -} http://git-wip-us.apache.org/repos/asf/camel/blob/bb64dca0/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/streaming/AbstractStreamingConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/streaming/AbstractStreamingConsumer.java b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/streaming/AbstractStreamingConsumer.java deleted file mode 100644 index 7a7bfcb..0000000 --- a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/streaming/AbstractStreamingConsumer.java +++ /dev/null @@ -1,128 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.camel.component.twitter.consumer.streaming; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.concurrent.atomic.AtomicReference; - -import org.apache.camel.Exchange; -import org.apache.camel.Service; -import org.apache.camel.component.twitter.TwitterEndpoint; -import org.apache.camel.component.twitter.consumer.TwitterConsumer; -import org.apache.camel.component.twitter.consumer.TwitterEventListener; -import org.apache.camel.component.twitter.consumer.TwitterEventType; -import twitter4j.Status; -import twitter4j.StatusDeletionNotice; -import twitter4j.StatusListener; -import twitter4j.TwitterException; -import twitter4j.TwitterStream; - -/** - * Super class providing consuming capabilities for the streaming API. - */ -public abstract class AbstractStreamingConsumer extends TwitterConsumer implements StatusListener, Service { - private final TwitterStream twitterStream; - private final List<Exchange> receivedStatuses; - private final AtomicReference<TwitterEventListener> twitterEventListener; - private boolean clear; - - public AbstractStreamingConsumer(TwitterEndpoint te) { - super(te); - this.receivedStatuses = new ArrayList<>(); - this.twitterStream = te.getProperties().createTwitterStream(); - this.twitterStream.addListener(this); - this.twitterEventListener = new AtomicReference<>(); - this.clear = true; - } - - @Override - public List<Exchange> pollConsume() throws TwitterException { - List<Exchange> result; - - synchronized (receivedStatuses) { - clear = true; - result = Collections.unmodifiableList(new ArrayList<>(receivedStatuses)); - } - - return result; - } - - @Override - public List<Exchange> directConsume() throws TwitterException { - return Collections.emptyList(); - } - - @Override - public void onException(Exception ex) { - } - - @Override - public void onStatus(Status status) { - onEvent(TwitterEventType.STATUS.createExchange(endpoint, status)); - } - - @Override - public 
void onDeletionNotice(StatusDeletionNotice statusDeletionNotice) { - // noop - } - - @Override - public void onTrackLimitationNotice(int numberOfLimitedStatuses) { - // noop - } - - @Override - public void onScrubGeo(long userId, long upToStatusId) { - // noop - } - - public void setEventListener(TwitterEventListener tweeterStatusListener) { - twitterEventListener.set(tweeterStatusListener); - } - - public void removeEventListener(TwitterEventListener tweeterStatusListener) { - twitterEventListener.compareAndSet(tweeterStatusListener, null); - } - - @Override - public void stop() { - twitterStream.removeListener(this); - twitterStream.shutdown(); - twitterStream.cleanUp(); - } - - protected TwitterStream getTwitterStream() { - return twitterStream; - } - - protected void onEvent(Exchange exchange) { - TwitterEventListener listener = twitterEventListener.get(); - if (listener != null) { - listener.onEvent(exchange); - } else { - synchronized (receivedStatuses) { - if (clear) { - receivedStatuses.clear(); - clear = false; - } - receivedStatuses.add(exchange); - } - } - } -} http://git-wip-us.apache.org/repos/asf/camel/blob/bb64dca0/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/streaming/FilterStreamingConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/streaming/FilterStreamingConsumer.java b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/streaming/FilterStreamingConsumer.java deleted file mode 100644 index 62460cc..0000000 --- a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/streaming/FilterStreamingConsumer.java +++ /dev/null @@ -1,77 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.component.twitter.consumer.streaming; - -import org.apache.camel.component.twitter.TwitterEndpoint; -import twitter4j.FilterQuery; -import twitter4j.StallWarning; - -/** - * Consumes the filter stream - */ -public class FilterStreamingConsumer extends AbstractStreamingConsumer { - - public FilterStreamingConsumer(TwitterEndpoint endpoint) { - super(endpoint); - } - - @Override - public void start() { - getTwitterStream().filter(createFilter()); - } - - @Override - public void onStallWarning(StallWarning stallWarning) { - // noop - } - - private FilterQuery createFilter() { - FilterQuery filterQuery = new FilterQuery(); - String allLocationsString = endpoint.getProperties().getLocations(); - if (allLocationsString != null) { - String[] locationStrings = allLocationsString.split(";"); - double[][] locations = new double[locationStrings.length][2]; - for (int i = 0; i < locationStrings.length; i++) { - String[] coords = locationStrings[i].split(","); - locations[i][0] = Double.valueOf(coords[0]); - locations[i][1] = Double.valueOf(coords[1]); - } - filterQuery.locations(locations); - } - - String keywords = endpoint.getProperties().getKeywords(); - if (keywords != null && keywords.length() > 0) { - filterQuery.track(keywords.split(",")); - } - - String 
userIds = endpoint.getProperties().getUserIds(); - if (userIds != null) { - String[] stringUserIds = userIds.split(","); - long[] longUserIds = new long[stringUserIds.length]; - for (int i = 0; i < stringUserIds.length; i++) { - longUserIds[i] = Long.valueOf(stringUserIds[i]); - } - filterQuery.follow(longUserIds); - } - - if (allLocationsString == null && keywords == null && userIds == null) { - throw new IllegalArgumentException("At least one filter parameter is required"); - } - - return filterQuery; - } -} http://git-wip-us.apache.org/repos/asf/camel/blob/bb64dca0/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/streaming/SampleStreamingConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/streaming/SampleStreamingConsumer.java b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/streaming/SampleStreamingConsumer.java deleted file mode 100644 index e0a5b27..0000000 --- a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/streaming/SampleStreamingConsumer.java +++ /dev/null @@ -1,40 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.component.twitter.consumer.streaming; - -import org.apache.camel.component.twitter.TwitterEndpoint; -import twitter4j.StallWarning; - -/** - * Consumes the sample stream - */ -public class SampleStreamingConsumer extends AbstractStreamingConsumer { - - public SampleStreamingConsumer(TwitterEndpoint endpoint) { - super(endpoint); - } - - @Override - public void start() { - getTwitterStream().sample(); - } - - @Override - public void onStallWarning(StallWarning stallWarning) { - // noop - } -} http://git-wip-us.apache.org/repos/asf/camel/blob/bb64dca0/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/streaming/UserStreamingConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/streaming/UserStreamingConsumer.java b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/streaming/UserStreamingConsumer.java deleted file mode 100644 index e04ba31..0000000 --- a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/streaming/UserStreamingConsumer.java +++ /dev/null @@ -1,226 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.component.twitter.consumer.streaming; - -import org.apache.camel.Exchange; -import org.apache.camel.component.twitter.TwitterConstants; -import org.apache.camel.component.twitter.TwitterEndpoint; -import org.apache.camel.component.twitter.TwitterHelper; -import org.apache.camel.component.twitter.consumer.TwitterEventType; -import twitter4j.DirectMessage; -import twitter4j.StallWarning; -import twitter4j.Status; -import twitter4j.User; -import twitter4j.UserList; -import twitter4j.UserStreamListener; - -public class UserStreamingConsumer extends AbstractStreamingConsumer implements UserStreamListener { - - public UserStreamingConsumer(TwitterEndpoint endpoint) { - super(endpoint); - } - - @Override - public void start() { - getTwitterStream().user(); - } - - @Override - public void onDeletionNotice(long directMessageId, long userId) { - // noop - } - - @Override - public void onFriendList(long[] friendIds) { - // noop - } - - @Override - public void onFavorite(User source, User target, Status favoritedStatus) { - Exchange exchange = TwitterEventType.FAVORITE.createExchange(endpoint, favoritedStatus); - TwitterHelper.setUserHeader(exchange, 1, source, "source"); - TwitterHelper.setUserHeader(exchange, 2, target, "target"); - - onEvent(exchange); - } - - @Override - public void onUnfavorite(User source, User target, Status unfavoritedStatus) { - Exchange exchange = TwitterEventType.UNFAVORITE.createExchange(endpoint, unfavoritedStatus); - TwitterHelper.setUserHeader(exchange, 1, source, "source"); - 
TwitterHelper.setUserHeader(exchange, 2, target, "target"); - - onEvent(exchange); - } - - @Override - public void onFollow(User source, User followedUser) { - Exchange exchange = TwitterEventType.FOLLOW.createExchange(endpoint); - TwitterHelper.setUserHeader(exchange, 1, source, "source"); - TwitterHelper.setUserHeader(exchange, 2, followedUser, "followed"); - - onEvent(exchange); - } - - @Override - public void onUnfollow(User source, User unfollowedUser) { - Exchange exchange = TwitterEventType.UNFOLLOW.createExchange(endpoint); - TwitterHelper.setUserHeader(exchange, 1, source, "source"); - TwitterHelper.setUserHeader(exchange, 2, unfollowedUser, "unfollowed"); - - onEvent(exchange); - } - - @Override - public void onDirectMessage(DirectMessage directMessage) { - onEvent(TwitterEventType.DIRECT_MESSAGE.createExchange(endpoint, directMessage)); - } - - @Override - public void onUserListMemberAddition(User addedMember, User listOwner, UserList list) { - Exchange exchange = TwitterEventType.USERLIST_MEMBER_ADDITION.createExchange(endpoint, list); - TwitterHelper.setUserHeader(exchange, 1, addedMember, "addedMember"); - TwitterHelper.setUserHeader(exchange, 2, listOwner, "listOwner"); - - onEvent(exchange); - } - - @Override - public void onUserListMemberDeletion(User deletedMember, User listOwner, UserList list) { - Exchange exchange = TwitterEventType.USERLIST_MEMBER_DELETION.createExchange(endpoint, list); - TwitterHelper.setUserHeader(exchange, 1, deletedMember, "deletedMember"); - TwitterHelper.setUserHeader(exchange, 2, listOwner, "listOwner"); - - onEvent(exchange); - } - - @Override - public void onUserListSubscription(User subscriber, User listOwner, UserList list) { - Exchange exchange = TwitterEventType.USERLIST_SUBSCRIPTION.createExchange(endpoint, list); - TwitterHelper.setUserHeader(exchange, 1, subscriber, "subscriber"); - TwitterHelper.setUserHeader(exchange, 2, listOwner, "listOwner"); - - onEvent(exchange); - } - - @Override - public void 
onUserListUnsubscription(User subscriber, User listOwner, UserList list) { - Exchange exchange = TwitterEventType.USERLIST_UNSUBSCRIPTION.createExchange(endpoint, list); - TwitterHelper.setUserHeader(exchange, 1, subscriber, "subscriber"); - TwitterHelper.setUserHeader(exchange, 2, listOwner, "listOwner"); - - onEvent(exchange); - } - - @Override - public void onUserListCreation(User user, UserList userList) { - Exchange exchange = TwitterEventType.USERLIST_CREATION.createExchange(endpoint, userList); - TwitterHelper.setUserHeader(exchange, user); - - onEvent(exchange); - } - - @Override - public void onUserListUpdate(User user, UserList userList) { - Exchange exchange = TwitterEventType.USERLIST_UPDATE.createExchange(endpoint, userList); - TwitterHelper.setUserHeader(exchange, user); - - onEvent(exchange); - } - - @Override - public void onUserListDeletion(User user, UserList userList) { - Exchange exchange = TwitterEventType.USERLIST_DELETETION.createExchange(endpoint, userList); - TwitterHelper.setUserHeader(exchange, user); - - onEvent(exchange); - } - - @Override - public void onUserProfileUpdate(User user) { - Exchange exchange = TwitterEventType.USER_PROFILE_UPDATE.createExchange(endpoint); - TwitterHelper.setUserHeader(exchange, user); - - onEvent(exchange); - } - - @Override - public void onUserSuspension(long suspendedUser) { - Exchange exchange = TwitterEventType.USER_SUSPENSION.createExchange(endpoint); - exchange.getIn().setHeader(TwitterConstants.TWITTER_USER, suspendedUser); - - onEvent(exchange); - } - - @Override - public void onUserDeletion(long deletedUser) { - Exchange exchange = TwitterEventType.USER_DELETION.createExchange(endpoint); - exchange.getIn().setHeader(TwitterConstants.TWITTER_USER, deletedUser); - - onEvent(exchange); - } - - @Override - public void onBlock(User source, User blockedUser) { - Exchange exchange = TwitterEventType.BLOCK.createExchange(endpoint); - TwitterHelper.setUserHeader(exchange, 1, source, "source"); - 
TwitterHelper.setUserHeader(exchange, 2, blockedUser, "blocked"); - - onEvent(exchange); - } - - @Override - public void onUnblock(User source, User unblockedUser) { - Exchange exchange = TwitterEventType.UNBLOCK.createExchange(endpoint); - TwitterHelper.setUserHeader(exchange, 1, source, "source"); - TwitterHelper.setUserHeader(exchange, 2, unblockedUser, "unblocked"); - - onEvent(exchange); - } - - @Override - public void onStallWarning(StallWarning stallWarning) { - // noop - } - - @Override - public void onRetweetedRetweet(User source, User target, Status retweetedStatus) { - Exchange exchange = TwitterEventType.RETWEETED_RETWEET.createExchange(endpoint, retweetedStatus); - TwitterHelper.setUserHeader(exchange, 1, source, "source"); - TwitterHelper.setUserHeader(exchange, 2, target, "target"); - - onEvent(exchange); - } - - @Override - public void onFavoritedRetweet(User source, User target, Status favoritedRetweeet) { - Exchange exchange = TwitterEventType.FAVORITED_RETWEET.createExchange(endpoint, favoritedRetweeet); - TwitterHelper.setUserHeader(exchange, 1, source, "source"); - TwitterHelper.setUserHeader(exchange, 2, target, "target"); - - onEvent(exchange); - } - - @Override - public void onQuotedTweet(User source, User target, Status quotingTweet) { - Exchange exchange = TwitterEventType.QUOTED_TWEET.createExchange(endpoint, quotingTweet); - TwitterHelper.setUserHeader(exchange, 1, source, "source"); - TwitterHelper.setUserHeader(exchange, 2, target, "target"); - - onEvent(exchange); - } -} http://git-wip-us.apache.org/repos/asf/camel/blob/bb64dca0/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/timeline/AbstractStatusConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/timeline/AbstractStatusConsumer.java 
b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/timeline/AbstractStatusConsumer.java deleted file mode 100644 index 18f0aee..0000000 --- a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/timeline/AbstractStatusConsumer.java +++ /dev/null @@ -1,55 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.component.twitter.consumer.timeline; - -import java.util.List; - -import org.apache.camel.Exchange; -import org.apache.camel.component.twitter.TwitterEndpoint; -import org.apache.camel.component.twitter.consumer.TwitterConsumer; -import org.apache.camel.component.twitter.consumer.TwitterEventType; -import twitter4j.Status; -import twitter4j.TwitterException; - -/** - * Consumes the user's home timeline. 
- */ -abstract class AbstractStatusConsumer extends TwitterConsumer { - - AbstractStatusConsumer(TwitterEndpoint endpoint) { - super(endpoint); - } - - @Override - public List<Exchange> pollConsume() throws TwitterException { - List<Status> statusList = doPoll(); - for (int i = 0; i < statusList.size(); i++) { - setLastIdIfGreater(statusList.get(i).getId()); - } - - return TwitterEventType.STATUS.createExchangeList(endpoint, statusList); - } - - @Override - public List<Exchange> directConsume() throws TwitterException { - return TwitterEventType.STATUS.createExchangeList(endpoint, doDirect()); - } - - protected abstract List<Status> doPoll() throws TwitterException; - - protected abstract List<Status> doDirect() throws TwitterException; -} http://git-wip-us.apache.org/repos/asf/camel/blob/bb64dca0/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/timeline/HomeConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/timeline/HomeConsumer.java b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/timeline/HomeConsumer.java deleted file mode 100644 index 252bacc..0000000 --- a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/timeline/HomeConsumer.java +++ /dev/null @@ -1,43 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.component.twitter.consumer.timeline; - -import java.util.List; - -import org.apache.camel.component.twitter.TwitterEndpoint; -import twitter4j.Status; -import twitter4j.TwitterException; - -/** - * Consumes the user's home timeline. - */ -public class HomeConsumer extends AbstractStatusConsumer { - - public HomeConsumer(TwitterEndpoint endpoint) { - super(endpoint); - } - - @Override - protected List<Status> doPoll() throws TwitterException { - return getTwitter().getHomeTimeline(getLastIdPaging()); - } - - @Override - protected List<Status> doDirect() throws TwitterException { - return getTwitter().getHomeTimeline(); - } -} http://git-wip-us.apache.org/repos/asf/camel/blob/bb64dca0/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/timeline/MentionsConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/timeline/MentionsConsumer.java b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/timeline/MentionsConsumer.java deleted file mode 100644 index c3671c8..0000000 --- a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/timeline/MentionsConsumer.java +++ /dev/null @@ -1,43 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.component.twitter.consumer.timeline; - -import java.util.List; - -import org.apache.camel.component.twitter.TwitterEndpoint; -import twitter4j.Status; -import twitter4j.TwitterException; - -/** - * Consumes tweets in which the user has been mentioned. - */ -public class MentionsConsumer extends AbstractStatusConsumer { - - public MentionsConsumer(TwitterEndpoint endpoint) { - super(endpoint); - } - - @Override - protected List<Status> doPoll() throws TwitterException { - return getTwitter().getMentionsTimeline(getLastIdPaging()); - } - - @Override - protected List<Status> doDirect() throws TwitterException { - return getTwitter().getMentionsTimeline(); - } -} http://git-wip-us.apache.org/repos/asf/camel/blob/bb64dca0/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/timeline/RetweetsConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/timeline/RetweetsConsumer.java b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/timeline/RetweetsConsumer.java deleted file mode 100644 index dfd0815..0000000 --- a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/timeline/RetweetsConsumer.java +++ /dev/null @@ -1,43 +0,0 @@ -/** - * Licensed to the 
Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.component.twitter.consumer.timeline; - -import java.util.List; - -import org.apache.camel.component.twitter.TwitterEndpoint; -import twitter4j.Status; -import twitter4j.TwitterException; - -/** - * Consumes a user's tweets that have been retweeted - */ -public class RetweetsConsumer extends AbstractStatusConsumer { - - public RetweetsConsumer(TwitterEndpoint endpoint) { - super(endpoint); - } - - @Override - protected List<Status> doPoll() throws TwitterException { - return getTwitter().getRetweetsOfMe(getLastIdPaging()); - } - - @Override - protected List<Status> doDirect() throws TwitterException { - return getTwitter().getRetweetsOfMe(); - } -} http://git-wip-us.apache.org/repos/asf/camel/blob/bb64dca0/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/timeline/UserConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/timeline/UserConsumer.java b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/timeline/UserConsumer.java deleted file mode 100644 index 51ef02a..0000000 --- 
a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/timeline/UserConsumer.java +++ /dev/null @@ -1,43 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.component.twitter.consumer.timeline; - -import java.util.List; - -import org.apache.camel.component.twitter.TwitterEndpoint; -import twitter4j.Status; -import twitter4j.TwitterException; - -/** - * Consumes the timeline of a given user. 
- */ -public class UserConsumer extends AbstractStatusConsumer { - - public UserConsumer(TwitterEndpoint endpoint) { - super(endpoint); - } - - @Override - protected List<Status> doPoll() throws TwitterException { - return getTwitter().getUserTimeline(endpoint.getProperties().getUser(), getLastIdPaging()); - } - - @Override - protected List<Status> doDirect() throws TwitterException { - return getTwitter().getUserTimeline(endpoint.getProperties().getUser()); - } -} http://git-wip-us.apache.org/repos/asf/camel/blob/bb64dca0/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/data/TrendsType.java ---------------------------------------------------------------------- diff --git a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/data/TrendsType.java b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/data/TrendsType.java index 2c4f49a..8e481d4 100644 --- a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/data/TrendsType.java +++ b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/data/TrendsType.java @@ -18,6 +18,7 @@ package org.apache.camel.component.twitter.data; import org.apache.camel.component.twitter.TwitterHelper; +@Deprecated public enum TrendsType { DAILY, WEEKLY, UNKNOWN; http://git-wip-us.apache.org/repos/asf/camel/blob/bb64dca0/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/directmessage/DirectMessageConsumerHandler.java ---------------------------------------------------------------------- diff --git a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/directmessage/DirectMessageConsumerHandler.java b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/directmessage/DirectMessageConsumerHandler.java new file mode 100644 index 0000000..2fd125e --- /dev/null +++ 
b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/directmessage/DirectMessageConsumerHandler.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.twitter.directmessage;
+
+import java.util.List;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.component.twitter.TwitterEndpoint;
+import org.apache.camel.component.twitter.consumer.AbstractTwitterConsumerHandler;
+import org.apache.camel.component.twitter.consumer.TwitterEventType;
+import twitter4j.DirectMessage;
+import twitter4j.TwitterException;
+
+/**
+ * Handler that consumes the direct messages of a user.
+ */
+public class DirectMessageConsumerHandler extends AbstractTwitterConsumerHandler {
+
+    public DirectMessageConsumerHandler(TwitterEndpoint te) {
+        super(te);
+    }
+
+    /**
+     * Polls direct messages with the paging from {@code getLastIdPaging()} and passes
+     * every message id to {@code setLastIdIfGreater} before building the exchanges.
+     */
+    @Override
+    public List<Exchange> pollConsume() throws TwitterException {
+        List<DirectMessage> fetched = getTwitter().getDirectMessages(getLastIdPaging());
+        for (DirectMessage message : fetched) {
+            setLastIdIfGreater(message.getId());
+        }
+
+        return TwitterEventType.DIRECT_MESSAGE.createExchangeList(endpoint, fetched);
+    }
+
+    @Override
+    public List<Exchange> directConsume() 
throws TwitterException {
+        return TwitterEventType.DIRECT_MESSAGE.createExchangeList(
+            endpoint,
+            getTwitter().getDirectMessages()
+        );
+    }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/bb64dca0/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/directmessage/DirectMessageProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/directmessage/DirectMessageProducer.java b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/directmessage/DirectMessageProducer.java
new file mode 100644
index 0000000..078c6cd
--- /dev/null
+++ b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/directmessage/DirectMessageProducer.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.twitter.directmessage;
+
+import org.apache.camel.CamelExchangeException;
+import org.apache.camel.Exchange;
+import org.apache.camel.component.twitter.TwitterConstants;
+import org.apache.camel.component.twitter.TwitterEndpoint;
+import org.apache.camel.impl.DefaultProducer;
+import org.apache.camel.util.ObjectHelper;
+
+/**
+ * Produces text as a direct message.
+ */
+public class DirectMessageProducer extends DefaultProducer {
+
+    private TwitterEndpoint endpoint;
+
+    public DirectMessageProducer(TwitterEndpoint endpoint) {
+        super(endpoint);
+        this.endpoint = endpoint;
+    }
+
+    /**
+     * Sends the message body, converted to a String, as a direct message.
+     * <p/>
+     * The recipient is the user configured on the endpoint, unless the
+     * {@link TwitterConstants#TWITTER_USER} header holds a non-empty value,
+     * which then takes precedence.
+     *
+     * @param exchange carries the message text and, optionally, the recipient header
+     * @throws CamelExchangeException if no recipient could be determined
+     */
+    @Override
+    public void process(Exchange exchange) throws Exception {
+        // recipient: endpoint configuration first, overridden by a non-empty header
+        String toUsername = endpoint.getProperties().getUser();
+        String headerUser = exchange.getIn().getHeader(TwitterConstants.TWITTER_USER, String.class);
+        if (ObjectHelper.isNotEmpty(headerUser)) {
+            toUsername = headerUser;
+        }
+        String text = exchange.getIn().getBody(String.class);
+
+        // null-safe check: the configured user may be null, and calling isEmpty() on it
+        // directly would raise a NullPointerException instead of the error below
+        if (ObjectHelper.isEmpty(toUsername)) {
+            throw new CamelExchangeException("Username not configured on TwitterEndpoint", exchange);
+        }
+
+        log.debug("Sending to: {} message: {}", toUsername, text);
+        endpoint.getProperties().getTwitter().sendDirectMessage(toUsername, text);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/bb64dca0/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/directmessage/TwitterDirectMessageComponent.java
----------------------------------------------------------------------
diff --git a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/directmessage/TwitterDirectMessageComponent.java b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/directmessage/TwitterDirectMessageComponent.java
new file mode 100644
index 0000000..516bb31
--- /dev/null
+++ 
b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/directmessage/TwitterDirectMessageComponent.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.twitter.directmessage;
+
+import java.util.Map;
+
+import org.apache.camel.ComponentVerifier;
+import org.apache.camel.Endpoint;
+import org.apache.camel.component.twitter.AbstractTwitterComponent;
+import org.apache.camel.component.twitter.DefaultTwitterComponentVerifier;
+import org.apache.camel.component.twitter.TwitterConfiguration;
+import org.apache.camel.spi.Metadata;
+
+/**
+ * Twitter direct message component; the endpoints it creates are {@link TwitterDirectMessageEndpoint} instances.
+ */
+@Metadata(label = "verifiers", enums = "parameters,connectivity")
+public class TwitterDirectMessageComponent extends AbstractTwitterComponent {
+
+    protected Endpoint doCreateEndpoint(TwitterConfiguration properties, String uri, String remaining, Map<String, Object> parameters) throws Exception {
+        return new TwitterDirectMessageEndpoint(uri, remaining, this, properties); // the parameters map is not consulted here
+    }
+
+    /**
+     * Get a verifier for the twitter directmessage component. 
+     */
+    public ComponentVerifier getVerifier() {
+        return new DefaultTwitterComponentVerifier(this, "twitter-directmessage");
+    }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/bb64dca0/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/directmessage/TwitterDirectMessageEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/directmessage/TwitterDirectMessageEndpoint.java b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/directmessage/TwitterDirectMessageEndpoint.java
new file mode 100644
index 0000000..86ede93
--- /dev/null
+++ b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/directmessage/TwitterDirectMessageEndpoint.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.twitter.directmessage;
+
+import org.apache.camel.Consumer;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.component.twitter.AbstractTwitterEndpoint;
+import org.apache.camel.component.twitter.TwitterConfiguration;
+import org.apache.camel.component.twitter.TwitterHelper;
+import org.apache.camel.spi.Metadata;
+import org.apache.camel.spi.UriEndpoint;
+import org.apache.camel.spi.UriPath;
+
+/**
+ * The Twitter Direct Message Component consumes/produces user's direct messages.
+ */
+@UriEndpoint(firstVersion = "2.10.0", scheme = "twitter-directmessage", title = "Twitter Direct Message", syntax = "twitter-directmessage:endpointId", consumerClass = DirectMessageConsumerHandler.class, label = "api,social")
+public class TwitterDirectMessageEndpoint extends AbstractTwitterEndpoint {
+
+    @UriPath(description = "The endpoint ID (not used).")
+    @Metadata(required = "true")
+    private String endpointId;
+
+    public TwitterDirectMessageEndpoint(String uri, String remaining, TwitterDirectMessageComponent component, TwitterConfiguration properties) {
+        super(uri, component, properties);
+        this.endpointId = remaining;
+    }
+
+    /**
+     * Creates the producer that sends direct messages.
+     *
+     * @throws IllegalArgumentException if no recipient user is configured on this endpoint
+     */
+    @Override
+    public Producer createProducer() throws Exception {
+        String user = getProperties().getUser();
+        // a recipient user must be configured before a producer can be created
+        if (user == null || user.trim().isEmpty()) {
+            throw new IllegalArgumentException(
+                "Producer type set to DIRECT MESSAGE but no recipient user was set.");
+        }
+        return new DirectMessageProducer(this);
+    }
+
+    /**
+     * Creates the consumer through {@link TwitterHelper}, backed by a {@link DirectMessageConsumerHandler}.
+     */
+    @Override
+    public Consumer createConsumer(Processor processor) throws Exception {
+        return TwitterHelper.createConsumer(processor, this, new DirectMessageConsumerHandler(this));
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/bb64dca0/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/producer/DirectMessageProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/producer/DirectMessageProducer.java b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/producer/DirectMessageProducer.java
deleted file mode 100644
index 3f22526..0000000
--- a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/producer/DirectMessageProducer.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.component.twitter.producer; - -import org.apache.camel.CamelExchangeException; -import org.apache.camel.Exchange; -import org.apache.camel.component.twitter.TwitterConstants; -import org.apache.camel.component.twitter.TwitterEndpoint; -import org.apache.camel.util.ObjectHelper; - -/** - * Produces text as a direct message. - */ -public class DirectMessageProducer extends TwitterProducer { - - public DirectMessageProducer(TwitterEndpoint endpoint) { - super(endpoint); - } - - public void process(Exchange exchange) throws Exception { - // send direct message - String toUsername = endpoint.getProperties().getUser(); - if (ObjectHelper.isNotEmpty(exchange.getIn().getHeader(TwitterConstants.TWITTER_USER, String.class))) { - toUsername = exchange.getIn().getHeader(TwitterConstants.TWITTER_USER, String.class); - } - String text = exchange.getIn().getBody(String.class); - - if (toUsername.isEmpty()) { - throw new CamelExchangeException("Username not configured on TwitterEndpoint", exchange); - } else { - log.debug("Sending to: {} message: {}", toUsername, text); - endpoint.getProperties().getTwitter().sendDirectMessage(toUsername, text); - } - } - -}