http://git-wip-us.apache.org/repos/asf/camel/blob/bb64dca0/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/producer/SearchProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/producer/SearchProducer.java b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/producer/SearchProducer.java deleted file mode 100644 index a44fcb6..0000000 --- a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/producer/SearchProducer.java +++ /dev/null @@ -1,130 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.component.twitter.producer; - -import java.util.List; - -import org.apache.camel.CamelExchangeException; -import org.apache.camel.Exchange; -import org.apache.camel.component.twitter.TwitterConstants; -import org.apache.camel.component.twitter.TwitterEndpoint; -import org.apache.camel.util.ObjectHelper; -import twitter4j.Query; -import twitter4j.QueryResult; -import twitter4j.Status; -import twitter4j.Twitter; - -public class SearchProducer extends TwitterProducer { - - private volatile long lastId; - - public SearchProducer(TwitterEndpoint endpoint) { - super(endpoint); - } - - @Override - public void process(Exchange exchange) throws Exception { - long myLastId = lastId; - // KEYWORDS - // keywords from header take precedence - String keywords = exchange.getIn().getHeader(TwitterConstants.TWITTER_KEYWORDS, String.class); - if (keywords == null) { - keywords = endpoint.getProperties().getKeywords(); - } - - if (keywords == null) { - throw new CamelExchangeException("No keywords to use for query", exchange); - } - - Query query = new Query(keywords); - - // filter of older tweets - if (endpoint.getProperties().isFilterOld() && myLastId != 0) { - query.setSinceId(myLastId); - } - - // since id - Long sinceId = exchange.getIn().getHeader(TwitterConstants.TWITTER_SINCEID, Long.class); - if (sinceId == null) { - sinceId = endpoint.getProperties().getSinceId(); - } - if (ObjectHelper.isNotEmpty(sinceId)) { - query.setSinceId(sinceId); - } - - // max id - Long maxId = exchange.getIn().getHeader(TwitterConstants.TWITTER_MAXID, Long.class); - if (ObjectHelper.isNotEmpty(maxId)) { - query.setMaxId(maxId); - } - - // language - String lang = exchange.getIn().getHeader(TwitterConstants.TWITTER_SEARCH_LANGUAGE, String.class); - if (lang == null) { - lang = endpoint.getProperties().getLang(); - } - - if (ObjectHelper.isNotEmpty(lang)) { - query.setLang(lang); - } - - // number of elements per page - Integer count = exchange.getIn().getHeader(TwitterConstants.TWITTER_COUNT, Integer.class); - if (count == null) { - count = endpoint.getProperties().getCount(); - } - if (ObjectHelper.isNotEmpty(count)) { - query.setCount(count); - } - - // number of pages - Integer numberOfPages = exchange.getIn().getHeader(TwitterConstants.TWITTER_NUMBER_OF_PAGES, Integer.class); - if (numberOfPages == null) { - numberOfPages = endpoint.getProperties().getNumberOfPages(); - } - - Twitter twitter = endpoint.getProperties().getTwitter(); - log.debug("Searching twitter with keywords: {}", keywords); - QueryResult results = twitter.search(query); - List<Status> list = results.getTweets(); - - for (int i = 1; i < numberOfPages; i++) { - if (!results.hasNext()) { - break; - } - log.debug("Fetching page"); - results = twitter.search(results.nextQuery()); - list.addAll(results.getTweets()); - } - - if (endpoint.getProperties().isFilterOld()) { - for (Status t : list) { - long newId = t.getId(); - if (newId > myLastId) { - myLastId = newId; - } - } - } - - exchange.getIn().setBody(list); - // update the lastId after finished the processing - if (myLastId > lastId) { - lastId = myLastId; - } - } - -}
http://git-wip-us.apache.org/repos/asf/camel/blob/bb64dca0/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/producer/TwitterProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/producer/TwitterProducer.java b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/producer/TwitterProducer.java deleted file mode 100644 index 0f798e7..0000000 --- a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/producer/TwitterProducer.java +++ /dev/null @@ -1,37 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.component.twitter.producer; - -import org.apache.camel.component.twitter.TwitterEndpoint; -import org.apache.camel.impl.DefaultProducer; - -/** - * Abstracts common producer capabilities. - */ -public abstract class TwitterProducer extends DefaultProducer { - - /** - * Instance of TwitterEndpoint. - */ - protected TwitterEndpoint endpoint; - - protected TwitterProducer(TwitterEndpoint endpoint) { - super(endpoint); - - this.endpoint = endpoint; - } -} http://git-wip-us.apache.org/repos/asf/camel/blob/bb64dca0/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/producer/UserProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/producer/UserProducer.java b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/producer/UserProducer.java deleted file mode 100644 index 2967738..0000000 --- a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/producer/UserProducer.java +++ /dev/null @@ -1,71 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.component.twitter.producer; - -import org.apache.camel.Exchange; -import org.apache.camel.component.twitter.TwitterEndpoint; - -import twitter4j.Status; -import twitter4j.StatusUpdate; - -/** - * Produces text as a status update. - */ -public class UserProducer extends TwitterProducer { - - public UserProducer(TwitterEndpoint endpoint) { - super(endpoint); - } - - @Override - public void process(Exchange exchange) throws Exception { - // update user's status - Object in = exchange.getIn().getBody(); - Status response; - if (in instanceof StatusUpdate) { - response = updateStatus((StatusUpdate) in); - } else { - String s = exchange.getIn().getMandatoryBody(String.class); - response = updateStatus(s); - } - - /* - * Support the InOut exchange pattern in order to provide access to - * the unique identifier for the published tweet which is returned in the response - * by the Twitter REST API: https://dev.twitter.com/docs/api/1/post/statuses/update - */ - if (exchange.getPattern().isOutCapable()) { - // here we just copy the header of in message to the out message - exchange.getOut().copyFrom(exchange.getIn()); - exchange.getOut().setBody(response); - } - } - - private Status updateStatus(StatusUpdate status) throws Exception { - Status response = endpoint.getProperties().getTwitter().updateStatus(status); - log.debug("Updated status: {}", status); - log.debug("Status id: {}", response.getId()); - return response; - } - - private Status updateStatus(String status) throws Exception { - Status response = endpoint.getProperties().getTwitter().updateStatus(status); - log.debug("Updated status: {}", status); - log.debug("Status id: {}", response.getId()); - return response; - } -} http://git-wip-us.apache.org/repos/asf/camel/blob/bb64dca0/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/search/SearchConsumerHandler.java ---------------------------------------------------------------------- diff --git a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/search/SearchConsumerHandler.java b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/search/SearchConsumerHandler.java new file mode 100644 index 0000000..649b836 --- /dev/null +++ b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/search/SearchConsumerHandler.java @@ -0,0 +1,127 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.twitter.search; + +import java.util.Collections; +import java.util.List; + +import org.apache.camel.Exchange; +import org.apache.camel.component.twitter.TwitterEndpoint; +import org.apache.camel.component.twitter.consumer.AbstractTwitterConsumerHandler; +import org.apache.camel.component.twitter.consumer.TwitterEventType; +import org.apache.camel.util.ObjectHelper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import twitter4j.GeoLocation; +import twitter4j.Query; +import twitter4j.Query.Unit; +import twitter4j.QueryResult; +import twitter4j.Status; +import twitter4j.Twitter; +import twitter4j.TwitterException; + +/** + * Consumes search requests + */ +public class SearchConsumerHandler extends AbstractTwitterConsumerHandler { + + private static final Logger LOG = LoggerFactory.getLogger(SearchConsumerHandler.class); + + public SearchConsumerHandler(TwitterEndpoint te) { + super(te); + } + + public List<Exchange> pollConsume() throws TwitterException { + String keywords = endpoint.getProperties().getKeywords(); + + Query query; + + if (keywords != null && keywords.trim().length() > 0) { + query = new Query(keywords); + LOG.debug("Searching twitter with keywords: {}", keywords); + } else { + query = new Query(); + LOG.debug("Searching twitter without keywords."); + } + + if (endpoint.getProperties().isFilterOld()) { + query.setSinceId(getLastId()); + } + + return search(query); + } + + public List<Exchange> directConsume() throws TwitterException { + String keywords = endpoint.getProperties().getKeywords(); + if (keywords == null || keywords.trim().length() == 0) { + return Collections.emptyList(); + } + Query query = new Query(keywords); + + LOG.debug("Searching twitter with keywords: {}", keywords); + return search(query); + } + + private List<Exchange> search(Query query) throws TwitterException { + Integer numberOfPages = 1; + + if (ObjectHelper.isNotEmpty(endpoint.getProperties().getLang())) { + query.setLang(endpoint.getProperties().getLang()); + } + + if (ObjectHelper.isNotEmpty(endpoint.getProperties().getCount())) { + query.setCount(endpoint.getProperties().getCount()); + } + + if (ObjectHelper.isNotEmpty(endpoint.getProperties().getNumberOfPages())) { + numberOfPages = endpoint.getProperties().getNumberOfPages(); + } + + if (ObjectHelper.isNotEmpty(endpoint.getProperties().getLatitude()) + && ObjectHelper.isNotEmpty(endpoint.getProperties().getLongitude()) + && ObjectHelper.isNotEmpty(endpoint.getProperties().getRadius())) { + GeoLocation location = new GeoLocation(endpoint.getProperties().getLatitude(), endpoint.getProperties().getLongitude()); + query.setGeoCode(location, endpoint.getProperties().getRadius(), Unit.valueOf(endpoint.getProperties().getDistanceMetric())); + + LOG.debug("Searching with additional geolocation parameters."); + } + + LOG.debug("Searching with {} pages.", numberOfPages); + + Twitter twitter = getTwitter(); + QueryResult qr = twitter.search(query); + List<Status> tweets = qr.getTweets(); + + for (int i = 1; i < numberOfPages; i++) { + if (!qr.hasNext()) { + break; + } + + qr = twitter.search(qr.nextQuery()); + tweets.addAll(qr.getTweets()); + } + + if (endpoint.getProperties().isFilterOld()) { + for (int i = 0; i < tweets.size(); i++) { + setLastIdIfGreater(tweets.get(i).getId()); + } + } + + return TwitterEventType.STATUS.createExchangeList(endpoint, tweets); + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/bb64dca0/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/search/SearchProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/search/SearchProducer.java b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/search/SearchProducer.java new file mode 100644 index 0000000..30886bd --- /dev/null +++ b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/search/SearchProducer.java @@ -0,0 +1,134 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.twitter.search; + +import java.util.List; + +import org.apache.camel.CamelExchangeException; +import org.apache.camel.Exchange; +import org.apache.camel.component.twitter.TwitterConstants; +import org.apache.camel.component.twitter.TwitterEndpoint; +import org.apache.camel.impl.DefaultProducer; +import org.apache.camel.util.ObjectHelper; + +import twitter4j.Query; +import twitter4j.QueryResult; +import twitter4j.Status; +import twitter4j.Twitter; + +public class SearchProducer extends DefaultProducer { + + private volatile long lastId; + private TwitterEndpoint endpoint; + + public SearchProducer(TwitterEndpoint endpoint) { + super(endpoint); + this.endpoint = endpoint; + } + + @Override + public void process(Exchange exchange) throws Exception { + long myLastId = lastId; + // KEYWORDS + // keywords from header take precedence + String keywords = exchange.getIn().getHeader(TwitterConstants.TWITTER_KEYWORDS, String.class); + if (keywords == null) { + keywords = endpoint.getProperties().getKeywords(); + } + + if (keywords == null) { + throw new CamelExchangeException("No keywords to use for query", exchange); + } + + Query query = new Query(keywords); + + // filter of older tweets + if (endpoint.getProperties().isFilterOld() && myLastId != 0) { + query.setSinceId(myLastId); + } + + // since id + Long sinceId = exchange.getIn().getHeader(TwitterConstants.TWITTER_SINCEID, Long.class); + if (sinceId == null) { + sinceId = endpoint.getProperties().getSinceId(); + } + if (ObjectHelper.isNotEmpty(sinceId)) { + query.setSinceId(sinceId); + } + + // max id + Long maxId = exchange.getIn().getHeader(TwitterConstants.TWITTER_MAXID, Long.class); + if (ObjectHelper.isNotEmpty(maxId)) { + query.setMaxId(maxId); + } + + // language + String lang = exchange.getIn().getHeader(TwitterConstants.TWITTER_SEARCH_LANGUAGE, String.class); + if (lang == null) { + lang = endpoint.getProperties().getLang(); + } + + if (ObjectHelper.isNotEmpty(lang)) { + query.setLang(lang); + } + + // number of elements per page + Integer count = exchange.getIn().getHeader(TwitterConstants.TWITTER_COUNT, Integer.class); + if (count == null) { + count = endpoint.getProperties().getCount(); + } + if (ObjectHelper.isNotEmpty(count)) { + query.setCount(count); + } + + // number of pages + Integer numberOfPages = exchange.getIn().getHeader(TwitterConstants.TWITTER_NUMBER_OF_PAGES, Integer.class); + if (numberOfPages == null) { + numberOfPages = endpoint.getProperties().getNumberOfPages(); + } + + Twitter twitter = endpoint.getProperties().getTwitter(); + log.debug("Searching twitter with keywords: {}", keywords); + QueryResult results = twitter.search(query); + List<Status> list = results.getTweets(); + + for (int i = 1; i < numberOfPages; i++) { + if (!results.hasNext()) { + break; + } + log.debug("Fetching page"); + results = twitter.search(results.nextQuery()); + list.addAll(results.getTweets()); + } + + if (endpoint.getProperties().isFilterOld()) { + for (Status t : list) { + long newId = t.getId(); + if (newId > myLastId) { + myLastId = newId; + } + } + } + + exchange.getIn().setBody(list); + // update the lastId after finished the processing + if (myLastId > lastId) { + lastId = myLastId; + } + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/bb64dca0/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/search/TwitterSearchComponent.java ---------------------------------------------------------------------- diff --git a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/search/TwitterSearchComponent.java b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/search/TwitterSearchComponent.java new file mode 100644 index 0000000..8457887 --- /dev/null +++ b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/search/TwitterSearchComponent.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.twitter.search; + +import java.util.Map; + +import org.apache.camel.ComponentVerifier; +import org.apache.camel.Endpoint; +import org.apache.camel.VerifiableComponent; +import org.apache.camel.component.twitter.AbstractTwitterComponent; +import org.apache.camel.component.twitter.DefaultTwitterComponentVerifier; +import org.apache.camel.component.twitter.TwitterConfiguration; +import org.apache.camel.spi.Metadata; + +/** + * Twitter Search component. + */ +@Metadata(label = "verifiers", enums = "parameters,connectivity") +public class TwitterSearchComponent extends AbstractTwitterComponent { + + protected Endpoint doCreateEndpoint(TwitterConfiguration properties, String uri, String remaining, Map<String, Object> parameters) throws Exception { + return new TwitterSearchEndpoint(uri, remaining, this, properties); + } + + /** + * Get a verifier for the twitter search component. + */ + public ComponentVerifier getVerifier() { + return new DefaultTwitterComponentVerifier(this, "twitter-search"); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/bb64dca0/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/search/TwitterSearchEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/search/TwitterSearchEndpoint.java b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/search/TwitterSearchEndpoint.java new file mode 100644 index 0000000..536fe8a --- /dev/null +++ b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/search/TwitterSearchEndpoint.java @@ -0,0 +1,44 @@ +package org.apache.camel.component.twitter.search; + +import org.apache.camel.Consumer; +import org.apache.camel.Processor; +import org.apache.camel.Producer; +import org.apache.camel.component.twitter.AbstractTwitterEndpoint; +import org.apache.camel.component.twitter.TwitterConfiguration; +import org.apache.camel.component.twitter.TwitterHelper; +import org.apache.camel.spi.Metadata; +import org.apache.camel.spi.UriEndpoint; +import org.apache.camel.spi.UriPath; + +/** + * The Twitter Search component consumes search results. + */ +@UriEndpoint(firstVersion = "2.10.0", scheme = "twitter-search", title = "Twitter Search", syntax = "twitter-search:endpointId", consumerClass = SearchConsumerHandler.class, label = "api,social") +public class TwitterSearchEndpoint extends AbstractTwitterEndpoint { + + @UriPath(description = "The endpoint ID (not used).") + @Metadata(required = "true") + private String endpointId; + + public TwitterSearchEndpoint(String uri, String remaining, TwitterSearchComponent component, TwitterConfiguration properties) { + super(uri, component, properties); + this.endpointId = remaining; + } + + @Override + public Producer createProducer() throws Exception { + return new SearchProducer(this); + } + + @Override + public Consumer createConsumer(Processor processor) throws Exception { + boolean hasNoKeywords = getProperties().getKeywords() == null + || getProperties().getKeywords().trim().isEmpty(); + if (hasNoKeywords) { + throw new IllegalArgumentException("Type set to SEARCH but no keywords were provided."); + } else { + return TwitterHelper.createConsumer(processor, this, new SearchConsumerHandler(this)); + } + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/bb64dca0/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/streaming/AbstractStreamingConsumerHandler.java ---------------------------------------------------------------------- diff --git a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/streaming/AbstractStreamingConsumerHandler.java b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/streaming/AbstractStreamingConsumerHandler.java new file mode 100644 index 0000000..2c5e226 --- /dev/null +++ b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/streaming/AbstractStreamingConsumerHandler.java @@ -0,0 +1,128 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.twitter.streaming; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.camel.Exchange; +import org.apache.camel.Service; +import org.apache.camel.component.twitter.TwitterEndpoint; +import org.apache.camel.component.twitter.consumer.AbstractTwitterConsumerHandler; +import org.apache.camel.component.twitter.consumer.TwitterEventListener; +import org.apache.camel.component.twitter.consumer.TwitterEventType; +import twitter4j.Status; +import twitter4j.StatusDeletionNotice; +import twitter4j.StatusListener; +import twitter4j.TwitterException; +import twitter4j.TwitterStream; + +/** + * Super class providing consuming capabilities for the streaming API. + */ +public abstract class AbstractStreamingConsumerHandler extends AbstractTwitterConsumerHandler implements StatusListener, Service { + private final TwitterStream twitterStream; + private final List<Exchange> receivedStatuses; + private final AtomicReference<TwitterEventListener> twitterEventListener; + private boolean clear; + + public AbstractStreamingConsumerHandler(TwitterEndpoint te) { + super(te); + this.receivedStatuses = new ArrayList<>(); + this.twitterStream = te.getProperties().createTwitterStream(); + this.twitterStream.addListener(this); + this.twitterEventListener = new AtomicReference<>(); + this.clear = true; + } + + @Override + public List<Exchange> pollConsume() throws TwitterException { + List<Exchange> result; + + synchronized (receivedStatuses) { + clear = true; + result = Collections.unmodifiableList(new ArrayList<>(receivedStatuses)); + } + + return result; + } + + @Override + public List<Exchange> directConsume() throws TwitterException { + return Collections.emptyList(); + } + + @Override + public void onException(Exception ex) { + } + + @Override + public void onStatus(Status status) { + onEvent(TwitterEventType.STATUS.createExchange(endpoint, status)); + } + + @Override + public void onDeletionNotice(StatusDeletionNotice statusDeletionNotice) { + // noop + } + + @Override + public void onTrackLimitationNotice(int numberOfLimitedStatuses) { + // noop + } + + @Override + public void onScrubGeo(long userId, long upToStatusId) { + // noop + } + + public void setEventListener(TwitterEventListener tweeterStatusListener) { + twitterEventListener.set(tweeterStatusListener); + } + + public void removeEventListener(TwitterEventListener tweeterStatusListener) { + twitterEventListener.compareAndSet(tweeterStatusListener, null); + } + + @Override + public void stop() { + twitterStream.removeListener(this); + twitterStream.shutdown(); + twitterStream.cleanUp(); + } + + protected TwitterStream getTwitterStream() { + return twitterStream; + } + + protected void onEvent(Exchange exchange) { + TwitterEventListener listener = twitterEventListener.get(); + if (listener != null) { + listener.onEvent(exchange); + } else { + synchronized (receivedStatuses) { + if (clear) { + receivedStatuses.clear(); + clear = false; + } + receivedStatuses.add(exchange); + } + } + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/bb64dca0/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/streaming/FilterStreamingConsumerHandler.java ---------------------------------------------------------------------- diff --git a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/streaming/FilterStreamingConsumerHandler.java b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/streaming/FilterStreamingConsumerHandler.java new file mode 100644 index 0000000..b505d9d --- /dev/null +++ b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/streaming/FilterStreamingConsumerHandler.java @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.twitter.streaming; + +import org.apache.camel.component.twitter.TwitterEndpoint; +import twitter4j.FilterQuery; +import twitter4j.StallWarning; + +/** + * Consumes the filter stream + */ +public class FilterStreamingConsumerHandler extends AbstractStreamingConsumerHandler { + + public FilterStreamingConsumerHandler(TwitterEndpoint endpoint) { + super(endpoint); + } + + @Override + public void start() { + getTwitterStream().filter(createFilter()); + } + + @Override + public void onStallWarning(StallWarning stallWarning) { + // noop + } + + private FilterQuery createFilter() { + FilterQuery filterQuery = new FilterQuery(); + String allLocationsString = endpoint.getProperties().getLocations(); + if (allLocationsString != null) { + String[] locationStrings = allLocationsString.split(";"); + double[][] locations = new double[locationStrings.length][2]; + for (int i = 0; i < locationStrings.length; i++) { + String[] coords = locationStrings[i].split(","); + locations[i][0] = Double.valueOf(coords[0]); + locations[i][1] = Double.valueOf(coords[1]); + } + filterQuery.locations(locations); + } + + String keywords = endpoint.getProperties().getKeywords(); + if (keywords != null && keywords.length() > 0) { + filterQuery.track(keywords.split(",")); + } + + String userIds = endpoint.getProperties().getUserIds(); + if (userIds != null) { + String[] stringUserIds = userIds.split(","); + long[] longUserIds = new long[stringUserIds.length]; + for (int i = 0; i < stringUserIds.length; i++) { + longUserIds[i] = Long.valueOf(stringUserIds[i]); + } + filterQuery.follow(longUserIds); + } + + if (allLocationsString == null && keywords == null && userIds == null) { + throw new IllegalArgumentException("At least one filter parameter is required"); + } + + return filterQuery; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/bb64dca0/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/streaming/SampleStreamingConsumerHandler.java ---------------------------------------------------------------------- diff --git a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/streaming/SampleStreamingConsumerHandler.java b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/streaming/SampleStreamingConsumerHandler.java new file mode 100644 index 0000000..6adc685 --- /dev/null +++ b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/streaming/SampleStreamingConsumerHandler.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.twitter.streaming; + +import org.apache.camel.component.twitter.TwitterEndpoint; +import twitter4j.StallWarning; + +/** + * Consumes the sample stream + */ +public class SampleStreamingConsumerHandler extends AbstractStreamingConsumerHandler { + + public SampleStreamingConsumerHandler(TwitterEndpoint endpoint) { + super(endpoint); + } + + @Override + public void start() { + getTwitterStream().sample(); + } + + @Override + public void onStallWarning(StallWarning stallWarning) { + // noop + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/bb64dca0/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/streaming/TwitterStreamingComponent.java ---------------------------------------------------------------------- diff --git a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/streaming/TwitterStreamingComponent.java b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/streaming/TwitterStreamingComponent.java new file mode 100644 index 0000000..1e4c058 --- /dev/null +++ b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/streaming/TwitterStreamingComponent.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.twitter.streaming; + +import java.util.Map; + +import org.apache.camel.ComponentVerifier; +import org.apache.camel.Endpoint; +import org.apache.camel.component.twitter.AbstractTwitterComponent; +import org.apache.camel.component.twitter.DefaultTwitterComponentVerifier; +import org.apache.camel.component.twitter.TwitterConfiguration; +import org.apache.camel.spi.Metadata; + +/** + * Twitter Streaming component. + */ +@Metadata(label = "verifiers", enums = "parameters,connectivity") +public class TwitterStreamingComponent extends AbstractTwitterComponent { + + protected Endpoint doCreateEndpoint(TwitterConfiguration properties, String uri, String remaining, Map<String, Object> parameters) throws Exception { + return new TwitterStreamingEndpoint(uri, remaining, this, properties); + } + + /** + * Get a verifier for the twitter streaming component. + */ + public ComponentVerifier getVerifier() { + return new DefaultTwitterComponentVerifier(this, "twitter-streaming"); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/bb64dca0/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/streaming/TwitterStreamingEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/streaming/TwitterStreamingEndpoint.java b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/streaming/TwitterStreamingEndpoint.java new file mode 100644 index 0000000..dc932b6 --- /dev/null +++ b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/streaming/TwitterStreamingEndpoint.java @@ -0,0 +1,62 @@ +package org.apache.camel.component.twitter.streaming; + +import org.apache.camel.Consumer; +import org.apache.camel.Processor; +import org.apache.camel.Producer; +import org.apache.camel.component.twitter.AbstractTwitterEndpoint; +import org.apache.camel.component.twitter.TwitterConfiguration; +import org.apache.camel.component.twitter.TwitterHelper; +import org.apache.camel.component.twitter.consumer.AbstractTwitterConsumerHandler; +import org.apache.camel.component.twitter.data.StreamingType; +import org.apache.camel.spi.Metadata; +import org.apache.camel.spi.UriEndpoint; +import org.apache.camel.spi.UriPath; + +/** + * The Twitter Streaming component consumes twitter statuses using Streaming API. + */ +@UriEndpoint(firstVersion = "2.10.0", scheme = "twitter-streaming", title = "Twitter Streaming", syntax = "twitter-streaming:streamingType", consumerClass = AbstractStreamingConsumerHandler.class, consumerOnly = true, label = "api,social") +public class TwitterStreamingEndpoint extends AbstractTwitterEndpoint { + + @UriPath(description = "The streaming type to consume.") + @Metadata(required = "true") + private StreamingType streamingType; + + public TwitterStreamingEndpoint(String uri, String remaining, TwitterStreamingComponent component, TwitterConfiguration properties) { + super(uri, component, properties); + if (remaining == null) { + throw new IllegalArgumentException(String.format("The streaming type must be specified for '%s'", uri)); + } + this.streamingType = StreamingType.valueOf(remaining.toUpperCase()); + } + + @Override + public Producer createProducer() throws Exception { + throw new UnsupportedOperationException("Producer not supported for twitter-streaming"); + } + + @Override + public Consumer createConsumer(Processor processor) throws Exception { + AbstractTwitterConsumerHandler handler; + switch (streamingType) { + case SAMPLE: + handler = new SampleStreamingConsumerHandler(this); + break; + case FILTER: + handler = new FilterStreamingConsumerHandler(this); + break; + case USER: + handler = new UserStreamingConsumerHandler(this); + break; + default: + throw new IllegalArgumentException("Cannot create any consumer with uri " + getEndpointUri() + + ". A streaming type was not provided (or an incorrect pairing was used)."); + } + return TwitterHelper.createConsumer(processor, this, handler); + } + + public StreamingType getStreamingType() { + return streamingType; + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/bb64dca0/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/streaming/UserStreamingConsumerHandler.java ---------------------------------------------------------------------- diff --git a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/streaming/UserStreamingConsumerHandler.java b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/streaming/UserStreamingConsumerHandler.java new file mode 100644 index 0000000..1c3d06c --- /dev/null +++ b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/streaming/UserStreamingConsumerHandler.java @@ -0,0 +1,226 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.twitter.streaming; + +import org.apache.camel.Exchange; +import org.apache.camel.component.twitter.TwitterConstants; +import org.apache.camel.component.twitter.TwitterEndpoint; +import org.apache.camel.component.twitter.TwitterHelper; +import org.apache.camel.component.twitter.consumer.TwitterEventType; +import twitter4j.DirectMessage; +import twitter4j.StallWarning; +import twitter4j.Status; +import twitter4j.User; +import twitter4j.UserList; +import twitter4j.UserStreamListener; + +public class UserStreamingConsumerHandler extends AbstractStreamingConsumerHandler implements UserStreamListener { + + public UserStreamingConsumerHandler(TwitterEndpoint endpoint) { + super(endpoint); + } + + @Override + public void start() { + getTwitterStream().user(); + } + + @Override + public void onDeletionNotice(long directMessageId, long userId) { + // noop + } + + @Override + public void onFriendList(long[] friendIds) { + // noop + } + + @Override + public void onFavorite(User source, User target, Status favoritedStatus) { + Exchange exchange = TwitterEventType.FAVORITE.createExchange(endpoint, favoritedStatus); + TwitterHelper.setUserHeader(exchange, 1, source, "source"); + TwitterHelper.setUserHeader(exchange, 2, target, "target"); + + onEvent(exchange); + } + + @Override + public void onUnfavorite(User source, User target, Status unfavoritedStatus) { + Exchange exchange = TwitterEventType.UNFAVORITE.createExchange(endpoint, unfavoritedStatus); + TwitterHelper.setUserHeader(exchange, 1, source, "source"); + TwitterHelper.setUserHeader(exchange, 2, target, "target"); + + onEvent(exchange); + } + + @Override + public void onFollow(User source, User followedUser) { + Exchange exchange = TwitterEventType.FOLLOW.createExchange(endpoint); + TwitterHelper.setUserHeader(exchange, 1, source, "source"); + TwitterHelper.setUserHeader(exchange, 2, followedUser, "followed"); + + onEvent(exchange); + } + + @Override + public void onUnfollow(User source, User unfollowedUser) { + Exchange exchange = TwitterEventType.UNFOLLOW.createExchange(endpoint); + TwitterHelper.setUserHeader(exchange, 1, source, "source"); + TwitterHelper.setUserHeader(exchange, 2, unfollowedUser, "unfollowed"); + + onEvent(exchange); + } + + @Override + public void onDirectMessage(DirectMessage directMessage) { + onEvent(TwitterEventType.DIRECT_MESSAGE.createExchange(endpoint, directMessage)); + } + + @Override + public void onUserListMemberAddition(User addedMember, User listOwner, UserList list) { + Exchange exchange = TwitterEventType.USERLIST_MEMBER_ADDITION.createExchange(endpoint, list); + TwitterHelper.setUserHeader(exchange, 1, addedMember, "addedMember"); + TwitterHelper.setUserHeader(exchange, 2, listOwner, "listOwner"); + + onEvent(exchange); + } + + @Override + public void onUserListMemberDeletion(User deletedMember, User listOwner, UserList list) { + Exchange exchange = TwitterEventType.USERLIST_MEMBER_DELETION.createExchange(endpoint, list); + TwitterHelper.setUserHeader(exchange, 1, deletedMember, "deletedMember"); + TwitterHelper.setUserHeader(exchange, 2, listOwner, "listOwner"); + + onEvent(exchange); + } + + @Override + public void onUserListSubscription(User subscriber, User listOwner, UserList list) { + Exchange exchange = TwitterEventType.USERLIST_SUBSCRIPTION.createExchange(endpoint, list); + TwitterHelper.setUserHeader(exchange, 1, subscriber, "subscriber"); + TwitterHelper.setUserHeader(exchange, 2, listOwner, "listOwner"); + + onEvent(exchange); + } + + @Override + public void onUserListUnsubscription(User subscriber, User listOwner, UserList list) { + Exchange exchange = TwitterEventType.USERLIST_UNSUBSCRIPTION.createExchange(endpoint, list); + TwitterHelper.setUserHeader(exchange, 1, subscriber, "subscriber"); + TwitterHelper.setUserHeader(exchange, 2, listOwner, "listOwner"); + + onEvent(exchange); + } + + @Override + public void onUserListCreation(User user, UserList userList) { + Exchange exchange = TwitterEventType.USERLIST_CREATION.createExchange(endpoint, userList); + TwitterHelper.setUserHeader(exchange, user); + + onEvent(exchange); + } + + @Override + public void onUserListUpdate(User user, UserList userList) { + Exchange exchange = TwitterEventType.USERLIST_UPDATE.createExchange(endpoint, userList); + TwitterHelper.setUserHeader(exchange, user); + + onEvent(exchange); + } + + @Override + public void onUserListDeletion(User user, UserList userList) { + Exchange exchange = TwitterEventType.USERLIST_DELETETION.createExchange(endpoint, userList); + TwitterHelper.setUserHeader(exchange, user); + + onEvent(exchange); + } + + @Override + public void onUserProfileUpdate(User user) { + Exchange exchange = TwitterEventType.USER_PROFILE_UPDATE.createExchange(endpoint); + TwitterHelper.setUserHeader(exchange, user); + + onEvent(exchange); + } + + @Override + public void onUserSuspension(long suspendedUser) { + Exchange exchange = TwitterEventType.USER_SUSPENSION.createExchange(endpoint); + exchange.getIn().setHeader(TwitterConstants.TWITTER_USER, suspendedUser); + + onEvent(exchange); + } + + @Override + public void onUserDeletion(long deletedUser) { + Exchange exchange = TwitterEventType.USER_DELETION.createExchange(endpoint); + exchange.getIn().setHeader(TwitterConstants.TWITTER_USER, deletedUser); + + onEvent(exchange); + } + + @Override + public void onBlock(User source, User blockedUser) { + Exchange exchange = TwitterEventType.BLOCK.createExchange(endpoint); + TwitterHelper.setUserHeader(exchange, 1, source, "source"); + TwitterHelper.setUserHeader(exchange, 2, blockedUser, "blocked"); + + onEvent(exchange); + } + + @Override + public void onUnblock(User source, User unblockedUser) { + Exchange exchange = TwitterEventType.UNBLOCK.createExchange(endpoint); + TwitterHelper.setUserHeader(exchange, 1, source, "source"); + TwitterHelper.setUserHeader(exchange, 2, unblockedUser, "unblocked"); + + onEvent(exchange); + } + + @Override + public void onStallWarning(StallWarning stallWarning) { + // noop + } + + @Override + public void onRetweetedRetweet(User source, User target, Status retweetedStatus) { + Exchange exchange = TwitterEventType.RETWEETED_RETWEET.createExchange(endpoint, retweetedStatus); + TwitterHelper.setUserHeader(exchange, 1, source, "source"); + TwitterHelper.setUserHeader(exchange, 2, target, "target"); + + onEvent(exchange); + } + + @Override + public void onFavoritedRetweet(User source, User target, Status favoritedRetweeet) { + Exchange exchange = TwitterEventType.FAVORITED_RETWEET.createExchange(endpoint, favoritedRetweeet); + TwitterHelper.setUserHeader(exchange, 1, source, "source"); + TwitterHelper.setUserHeader(exchange, 2, target, "target"); + + onEvent(exchange); + } + + @Override + public void onQuotedTweet(User source, User target, Status quotingTweet) { + Exchange exchange = TwitterEventType.QUOTED_TWEET.createExchange(endpoint, quotingTweet); + TwitterHelper.setUserHeader(exchange, 1, source, "source"); + TwitterHelper.setUserHeader(exchange, 2, target, "target"); + + onEvent(exchange); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/bb64dca0/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/timeline/AbstractStatusConsumerHandler.java ---------------------------------------------------------------------- diff --git a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/timeline/AbstractStatusConsumerHandler.java b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/timeline/AbstractStatusConsumerHandler.java new file mode 100644 index 0000000..ec7d801 --- /dev/null +++ b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/timeline/AbstractStatusConsumerHandler.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.twitter.timeline; + +import java.util.List; + +import org.apache.camel.Exchange; +import org.apache.camel.component.twitter.TwitterEndpoint; +import org.apache.camel.component.twitter.consumer.AbstractTwitterConsumerHandler; +import org.apache.camel.component.twitter.consumer.TwitterEventType; +import twitter4j.Status; +import twitter4j.TwitterException; + +/** + * Consumes the user's home timeline. + */ +abstract class AbstractStatusConsumerHandler extends AbstractTwitterConsumerHandler { + + AbstractStatusConsumerHandler(TwitterEndpoint endpoint) { + super(endpoint); + } + + @Override + public List<Exchange> pollConsume() throws TwitterException { + List<Status> statusList = doPoll(); + for (int i = 0; i < statusList.size(); i++) { + setLastIdIfGreater(statusList.get(i).getId()); + } + + return TwitterEventType.STATUS.createExchangeList(endpoint, statusList); + } + + @Override + public List<Exchange> directConsume() throws TwitterException { + return TwitterEventType.STATUS.createExchangeList(endpoint, doDirect()); + } + + protected abstract List<Status> doPoll() throws TwitterException; + + protected abstract List<Status> doDirect() throws TwitterException; +} http://git-wip-us.apache.org/repos/asf/camel/blob/bb64dca0/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/timeline/HomeConsumerHandler.java ---------------------------------------------------------------------- diff --git a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/timeline/HomeConsumerHandler.java b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/timeline/HomeConsumerHandler.java new file mode 100644 index 0000000..585ab03 --- /dev/null +++ b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/timeline/HomeConsumerHandler.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.twitter.timeline; + +import java.util.List; + +import org.apache.camel.component.twitter.TwitterEndpoint; +import twitter4j.Status; +import twitter4j.TwitterException; + +/** + * Consumes the user's home timeline. + */ +public class HomeConsumerHandler extends AbstractStatusConsumerHandler { + + public HomeConsumerHandler(TwitterEndpoint endpoint) { + super(endpoint); + } + + @Override + protected List<Status> doPoll() throws TwitterException { + return getTwitter().getHomeTimeline(getLastIdPaging()); + } + + @Override + protected List<Status> doDirect() throws TwitterException { + return getTwitter().getHomeTimeline(); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/bb64dca0/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/timeline/MentionsConsumerHandler.java ---------------------------------------------------------------------- diff --git a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/timeline/MentionsConsumerHandler.java b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/timeline/MentionsConsumerHandler.java new file mode 100644 index 0000000..95a2576 --- /dev/null +++ b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/timeline/MentionsConsumerHandler.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.twitter.timeline; + +import java.util.List; + +import org.apache.camel.component.twitter.TwitterEndpoint; +import twitter4j.Status; +import twitter4j.TwitterException; + +/** + * Consumes tweets in which the user has been mentioned. + */ +public class MentionsConsumerHandler extends AbstractStatusConsumerHandler { + + public MentionsConsumerHandler(TwitterEndpoint endpoint) { + super(endpoint); + } + + @Override + protected List<Status> doPoll() throws TwitterException { + return getTwitter().getMentionsTimeline(getLastIdPaging()); + } + + @Override + protected List<Status> doDirect() throws TwitterException { + return getTwitter().getMentionsTimeline(); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/bb64dca0/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/timeline/RetweetsConsumerHandler.java ---------------------------------------------------------------------- diff --git a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/timeline/RetweetsConsumerHandler.java b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/timeline/RetweetsConsumerHandler.java new file mode 100644 index 0000000..e603ebe --- /dev/null +++ b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/timeline/RetweetsConsumerHandler.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.twitter.timeline; + +import java.util.List; + +import org.apache.camel.component.twitter.TwitterEndpoint; +import twitter4j.Status; +import twitter4j.TwitterException; + +/** + * Consumes a user's tweets that have been retweeted + */ +public class RetweetsConsumerHandler extends AbstractStatusConsumerHandler { + + public RetweetsConsumerHandler(TwitterEndpoint endpoint) { + super(endpoint); + } + + @Override + protected List<Status> doPoll() throws TwitterException { + return getTwitter().getRetweetsOfMe(getLastIdPaging()); + } + + @Override + protected List<Status> doDirect() throws TwitterException { + return getTwitter().getRetweetsOfMe(); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/bb64dca0/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/timeline/TwitterTimelineComponent.java ---------------------------------------------------------------------- diff --git a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/timeline/TwitterTimelineComponent.java b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/timeline/TwitterTimelineComponent.java new file mode 100644 index 0000000..6af0e6d --- /dev/null +++ b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/timeline/TwitterTimelineComponent.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.twitter.timeline; + +import java.util.Map; + +import org.apache.camel.ComponentVerifier; +import org.apache.camel.Endpoint; +import org.apache.camel.component.twitter.AbstractTwitterComponent; +import org.apache.camel.component.twitter.DefaultTwitterComponentVerifier; +import org.apache.camel.component.twitter.TwitterConfiguration; +import org.apache.camel.spi.Metadata; + +/** + * Twitter Timeline component. + */ +@Metadata(label = "verifiers", enums = "parameters,connectivity") +public class TwitterTimelineComponent extends AbstractTwitterComponent { + + @Override + protected Endpoint doCreateEndpoint(TwitterConfiguration properties, String uri, String remaining, Map<String, Object> parameters) throws Exception { + return new TwitterTimelineEndpoint(uri, remaining, this, properties); + } + + /** + * Get a verifier for the twitter timeline component. + */ + public ComponentVerifier getVerifier() { + return new DefaultTwitterComponentVerifier(this, "twitter-timeline"); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/bb64dca0/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/timeline/TwitterTimelineEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/timeline/TwitterTimelineEndpoint.java b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/timeline/TwitterTimelineEndpoint.java new file mode 100644 index 0000000..5d030e7 --- /dev/null +++ b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/timeline/TwitterTimelineEndpoint.java @@ -0,0 +1,79 @@ +package org.apache.camel.component.twitter.timeline; + +import org.apache.camel.Consumer; +import org.apache.camel.Processor; +import org.apache.camel.Producer; +import org.apache.camel.component.twitter.AbstractTwitterEndpoint; +import org.apache.camel.component.twitter.TwitterConfiguration; +import org.apache.camel.component.twitter.TwitterHelper; +import org.apache.camel.component.twitter.consumer.AbstractTwitterConsumerHandler; +import org.apache.camel.component.twitter.data.TimelineType; +import org.apache.camel.spi.Metadata; +import org.apache.camel.spi.UriEndpoint; +import org.apache.camel.spi.UriPath; + +/** + * The Twitter Timeline component consumes twitter timeline or update the status of specific user. + */ +@UriEndpoint(firstVersion = "2.10.0", scheme = "twitter-timeline", title = "Twitter Timeline", syntax = "twitter-timeline:timelineType", consumerClass = AbstractStatusConsumerHandler.class, label = "api,social") +public class TwitterTimelineEndpoint extends AbstractTwitterEndpoint { + + @UriPath(description = "The timeline type to produce/consume.") + @Metadata(required = "true") + private TimelineType timelineType; + + public TwitterTimelineEndpoint(String uri, String remaining, TwitterTimelineComponent component, TwitterConfiguration properties) { + super(uri, component, properties); + if (remaining == null) { + throw new IllegalArgumentException(String.format("The timeline type must be specified for '%s'", uri)); + } + this.timelineType = TimelineType.valueOf(remaining.toUpperCase()); + } + + @Override + public Producer createProducer() throws Exception { + switch (timelineType) { + case USER: + return new UserProducer(this); + default: + throw new IllegalArgumentException("Cannot create any producer with uri " + getEndpointUri() + + ". A producer type was not provided (or an incorrect pairing was used)."); + } + } + + @Override + public Consumer createConsumer(Processor processor) throws Exception { + AbstractTwitterConsumerHandler handler = null; + switch (timelineType) { + case HOME: + handler = new HomeConsumerHandler(this); + break; + case MENTIONS: + handler = new MentionsConsumerHandler(this); + break; + case RETWEETSOFME: + handler = new RetweetsConsumerHandler(this); + break; + case USER: + if (getProperties().getUser() == null || getProperties().getUser().trim().isEmpty()) { + throw new IllegalArgumentException("Fetch type set to USER TIMELINE but no user was set."); + } else { + handler = new UserConsumerHandler(this); + break; + } + default: + break; + } + if (handler != null) { + return TwitterHelper.createConsumer(processor, this, handler); + } + throw new IllegalArgumentException("Cannot create any consumer with uri " + getEndpointUri() + + ". A consumer type was not provided (or an incorrect pairing was used)."); + + } + + public TimelineType getTimelineType() { + return timelineType; + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/bb64dca0/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/timeline/UserConsumerHandler.java ---------------------------------------------------------------------- diff --git a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/timeline/UserConsumerHandler.java b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/timeline/UserConsumerHandler.java new file mode 100644 index 0000000..ad13b75 --- /dev/null +++ b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/timeline/UserConsumerHandler.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.twitter.timeline; + +import java.util.List; + +import org.apache.camel.component.twitter.TwitterEndpoint; +import twitter4j.Status; +import twitter4j.TwitterException; + +/** + * Consumes the timeline of a given user. + */ +public class UserConsumerHandler extends AbstractStatusConsumerHandler { + + public UserConsumerHandler(TwitterEndpoint endpoint) { + super(endpoint); + } + + @Override + protected List<Status> doPoll() throws TwitterException { + return getTwitter().getUserTimeline(endpoint.getProperties().getUser(), getLastIdPaging()); + } + + @Override + protected List<Status> doDirect() throws TwitterException { + return getTwitter().getUserTimeline(endpoint.getProperties().getUser()); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/bb64dca0/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/timeline/UserProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/timeline/UserProducer.java b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/timeline/UserProducer.java new file mode 100644 index 0000000..63641d2 --- /dev/null +++ b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/timeline/UserProducer.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.twitter.timeline; + +import org.apache.camel.Exchange; +import org.apache.camel.component.twitter.TwitterEndpoint; +import org.apache.camel.impl.DefaultProducer; + +import twitter4j.Status; +import twitter4j.StatusUpdate; + +/** + * Produces text as a status update. + */ +public class UserProducer extends DefaultProducer { + + private TwitterEndpoint endpoint; + + public UserProducer(TwitterEndpoint endpoint) { + super(endpoint); + this.endpoint = endpoint; + } + + @Override + public void process(Exchange exchange) throws Exception { + // update user's status + Object in = exchange.getIn().getBody(); + Status response; + if (in instanceof StatusUpdate) { + response = updateStatus((StatusUpdate) in); + } else { + String s = exchange.getIn().getMandatoryBody(String.class); + response = updateStatus(s); + } + + /* + * Support the InOut exchange pattern in order to provide access to + * the unique identifier for the published tweet which is returned in the response + * by the Twitter REST API: https://dev.twitter.com/docs/api/1/post/statuses/update + */ + if (exchange.getPattern().isOutCapable()) { + // here we just copy the header of in message to the out message + exchange.getOut().copyFrom(exchange.getIn()); + exchange.getOut().setBody(response); + } + } + + private Status updateStatus(StatusUpdate status) throws Exception { + Status response = endpoint.getProperties().getTwitter().updateStatus(status); + log.debug("Updated status: {}", status); + log.debug("Status id: {}", response.getId()); + return response; + } + + private Status updateStatus(String status) throws Exception { + Status response = endpoint.getProperties().getTwitter().updateStatus(status); + log.debug("Updated status: {}", status); + log.debug("Status id: {}", response.getId()); + return response; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/bb64dca0/components/camel-twitter/src/main/resources/META-INF/services/org/apache/camel/component/twitter-directmessage ---------------------------------------------------------------------- diff --git a/components/camel-twitter/src/main/resources/META-INF/services/org/apache/camel/component/twitter-directmessage b/components/camel-twitter/src/main/resources/META-INF/services/org/apache/camel/component/twitter-directmessage new file mode 100644 index 0000000..346f90c --- /dev/null +++ b/components/camel-twitter/src/main/resources/META-INF/services/org/apache/camel/component/twitter-directmessage @@ -0,0 +1,18 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +class=org.apache.camel.component.twitter.directmessage.TwitterDirectMessageComponent http://git-wip-us.apache.org/repos/asf/camel/blob/bb64dca0/components/camel-twitter/src/main/resources/META-INF/services/org/apache/camel/component/twitter-search ---------------------------------------------------------------------- diff --git a/components/camel-twitter/src/main/resources/META-INF/services/org/apache/camel/component/twitter-search b/components/camel-twitter/src/main/resources/META-INF/services/org/apache/camel/component/twitter-search new file mode 100644 index 0000000..0770613 --- /dev/null +++ b/components/camel-twitter/src/main/resources/META-INF/services/org/apache/camel/component/twitter-search @@ -0,0 +1,18 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +class=org.apache.camel.component.twitter.search.TwitterSearchComponent http://git-wip-us.apache.org/repos/asf/camel/blob/bb64dca0/components/camel-twitter/src/main/resources/META-INF/services/org/apache/camel/component/twitter-streaming ---------------------------------------------------------------------- diff --git a/components/camel-twitter/src/main/resources/META-INF/services/org/apache/camel/component/twitter-streaming b/components/camel-twitter/src/main/resources/META-INF/services/org/apache/camel/component/twitter-streaming new file mode 100644 index 0000000..8de261c --- /dev/null +++ b/components/camel-twitter/src/main/resources/META-INF/services/org/apache/camel/component/twitter-streaming @@ -0,0 +1,18 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +class=org.apache.camel.component.twitter.streaming.TwitterStreamingComponent http://git-wip-us.apache.org/repos/asf/camel/blob/bb64dca0/components/camel-twitter/src/main/resources/META-INF/services/org/apache/camel/component/twitter-timeline ---------------------------------------------------------------------- diff --git a/components/camel-twitter/src/main/resources/META-INF/services/org/apache/camel/component/twitter-timeline b/components/camel-twitter/src/main/resources/META-INF/services/org/apache/camel/component/twitter-timeline new file mode 100644 index 0000000..79c571f --- /dev/null +++ b/components/camel-twitter/src/main/resources/META-INF/services/org/apache/camel/component/twitter-timeline @@ -0,0 +1,18 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +class=org.apache.camel.component.twitter.timeline.TwitterTimelineComponent http://git-wip-us.apache.org/repos/asf/camel/blob/bb64dca0/components/camel-twitter/src/test/java/org/apache/camel/component/twitter/CamelComponentVerifierTest.java ---------------------------------------------------------------------- diff --git a/components/camel-twitter/src/test/java/org/apache/camel/component/twitter/CamelComponentVerifierTest.java b/components/camel-twitter/src/test/java/org/apache/camel/component/twitter/CamelComponentVerifierTest.java index f317b78..f131ab8 100644 --- a/components/camel-twitter/src/test/java/org/apache/camel/component/twitter/CamelComponentVerifierTest.java +++ b/components/camel-twitter/src/test/java/org/apache/camel/component/twitter/CamelComponentVerifierTest.java @@ -16,15 +16,19 @@ */ package org.apache.camel.component.twitter; -import java.util.*; +import static org.apache.camel.ComponentVerifier.VerificationError.asAttribute; + +import java.util.Collections; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; import org.apache.camel.ComponentVerifier; import org.apache.camel.ComponentVerifier.VerificationError; +import org.apache.camel.component.twitter.search.TwitterSearchComponent; import org.junit.Assert; import org.junit.Test; -import static org.apache.camel.ComponentVerifier.VerificationError.asAttribute; - public class CamelComponentVerifierTest extends CamelTwitterTestSupport { @Override public boolean isUseRouteBuilder() { @@ -33,8 +37,8 @@ public class CamelComponentVerifierTest extends CamelTwitterTestSupport { @Test public void testConnectivity() { - TwitterComponent component = context().getComponent("twitter", TwitterComponent.class); - TwitterComponentVerifier verifier = (TwitterComponentVerifier)component.getVerifier(); + TwitterSearchComponent component = context().getComponent("twitter-search", TwitterSearchComponent.class); + DefaultTwitterComponentVerifier verifier = (DefaultTwitterComponentVerifier)component.getVerifier(); Map<String, Object> parameters = getParameters(); ComponentVerifier.Result result = verifier.verify(ComponentVerifier.Scope.CONNECTIVITY, parameters); @@ -44,8 +48,8 @@ public class CamelComponentVerifierTest extends CamelTwitterTestSupport { @Test public void testInvalidKeyConfiguration() { - TwitterComponent component = context().getComponent("twitter", TwitterComponent.class); - TwitterComponentVerifier verifier = (TwitterComponentVerifier)component.getVerifier(); + TwitterSearchComponent component = context().getComponent("twitter-search", TwitterSearchComponent.class); + DefaultTwitterComponentVerifier verifier = (DefaultTwitterComponentVerifier)component.getVerifier(); Map<String, Object> parameters = getParameters(); parameters.put("consumerKey", "invalid"); @@ -61,8 +65,8 @@ public class CamelComponentVerifierTest extends CamelTwitterTestSupport { @Test public void testInvalidTokenConfiguration() { - TwitterComponent component = context().getComponent("twitter", TwitterComponent.class); - TwitterComponentVerifier verifier = (TwitterComponentVerifier)component.getVerifier(); + TwitterSearchComponent component = context().getComponent("twitter-search", TwitterSearchComponent.class); + DefaultTwitterComponentVerifier verifier = (DefaultTwitterComponentVerifier)component.getVerifier(); Map<String, Object> parameters = getParameters(); parameters.put("accessToken", "invalid"); @@ -80,8 +84,8 @@ public class CamelComponentVerifierTest extends CamelTwitterTestSupport { @Test public void testEmptyConfiguration() { - TwitterComponent component = context().getComponent("twitter", TwitterComponent.class); - TwitterComponentVerifier verifier = (TwitterComponentVerifier)component.getVerifier(); + TwitterSearchComponent component = context().getComponent("twitter-search", TwitterSearchComponent.class); + DefaultTwitterComponentVerifier verifier = (DefaultTwitterComponentVerifier)component.getVerifier(); { // Parameters validation @@ -91,7 +95,7 @@ public class CamelComponentVerifierTest extends CamelTwitterTestSupport { Assert.assertEquals(5, result.getErrors().size()); List<String> expected = new LinkedList<>(); - expected.add("kind"); + expected.add("endpointId"); expected.add("consumerKey"); expected.add("consumerSecret"); expected.add("accessToken"); http://git-wip-us.apache.org/repos/asf/camel/blob/bb64dca0/components/camel-twitter/src/test/java/org/apache/camel/component/twitter/DirectMessageDirectTest.java ---------------------------------------------------------------------- diff --git a/components/camel-twitter/src/test/java/org/apache/camel/component/twitter/DirectMessageDirectTest.java b/components/camel-twitter/src/test/java/org/apache/camel/component/twitter/DirectMessageDirectTest.java index 1054d45..8c7cf14 100644 --- a/components/camel-twitter/src/test/java/org/apache/camel/component/twitter/DirectMessageDirectTest.java +++ b/components/camel-twitter/src/test/java/org/apache/camel/component/twitter/DirectMessageDirectTest.java @@ -16,16 +16,35 @@ */ package org.apache.camel.component.twitter; +import java.util.Date; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import twitter4j.Twitter; + /** * consumes tweets */ public class DirectMessageDirectTest extends CamelTwitterConsumerTestSupport { + + @Override + public void setUp() throws Exception { + super.setUp(); + /* Uncomment when you need a test direct message + TwitterConfiguration properties = new TwitterConfiguration(); + properties.setConsumerKey(consumerKey); + properties.setConsumerSecret(consumerSecret); + properties.setAccessToken(accessToken); + properties.setAccessTokenSecret(accessTokenSecret); + Twitter twitter = properties.getTwitter(); + twitter.sendDirectMessage(twitter.getScreenName(), "Test Direct Message: " + new Date().toString()); + */ + } + @Override protected String getUri() { - return "twitter://directmessage?type=direct&"; + return "twitter-directmessage://foo?type=direct&"; } @Override http://git-wip-us.apache.org/repos/asf/camel/blob/bb64dca0/components/camel-twitter/src/test/java/org/apache/camel/component/twitter/DirectMessagePollingTest.java ---------------------------------------------------------------------- diff --git a/components/camel-twitter/src/test/java/org/apache/camel/component/twitter/DirectMessagePollingTest.java b/components/camel-twitter/src/test/java/org/apache/camel/component/twitter/DirectMessagePollingTest.java index dc8c8f2..8c6938b 100644 --- a/components/camel-twitter/src/test/java/org/apache/camel/component/twitter/DirectMessagePollingTest.java +++ b/components/camel-twitter/src/test/java/org/apache/camel/component/twitter/DirectMessagePollingTest.java @@ -16,16 +16,34 @@ */ package org.apache.camel.component.twitter; +import java.util.Date; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import twitter4j.Twitter; + /** * consumes tweets */ public class DirectMessagePollingTest extends CamelTwitterConsumerTestSupport { @Override + public void setUp() throws Exception { + super.setUp(); + /* Uncomment when you need a test direct message + TwitterConfiguration properties = new TwitterConfiguration(); + properties.setConsumerKey(consumerKey); + properties.setConsumerSecret(consumerSecret); + properties.setAccessToken(accessToken); + properties.setAccessTokenSecret(accessTokenSecret); + Twitter twitter = properties.getTwitter(); + twitter.sendDirectMessage(twitter.getScreenName(), "Test Direct Message: " + new Date().toString()); + */ + } + + @Override protected String getUri() { - return "twitter://directmessage?type=polling&"; + return "twitter-directmessage://foo?type=polling&"; } @Override http://git-wip-us.apache.org/repos/asf/camel/blob/bb64dca0/components/camel-twitter/src/test/java/org/apache/camel/component/twitter/HomeTimeLineDirectTest.java ---------------------------------------------------------------------- diff --git a/components/camel-twitter/src/test/java/org/apache/camel/component/twitter/HomeTimeLineDirectTest.java b/components/camel-twitter/src/test/java/org/apache/camel/component/twitter/HomeTimeLineDirectTest.java index 5bd2f8f..fb14be4 100644 --- a/components/camel-twitter/src/test/java/org/apache/camel/component/twitter/HomeTimeLineDirectTest.java +++ b/components/camel-twitter/src/test/java/org/apache/camel/component/twitter/HomeTimeLineDirectTest.java @@ -26,7 +26,7 @@ public class HomeTimeLineDirectTest extends CamelTwitterConsumerTestSupport { @Override protected String getUri() { - return "twitter://timeline/home?type=direct&"; + return "twitter-timeline://home?type=direct&"; } @Override