Repository: camel Updated Branches: refs/heads/master df062eaf4 -> 75fc0671b
Updated Box.com component to support long polling for updates from Box.com, fixed minor issues with token handling Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/75fc0671 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/75fc0671 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/75fc0671 Branch: refs/heads/master Commit: 75fc0671bdc118d4d8e399889543431309ae63fe Parents: df062ea Author: Dhiraj Bokde <dhira...@yahoo.com> Authored: Tue Jul 1 09:29:06 2014 -0700 Committer: Dhiraj Bokde <dhira...@yahoo.com> Committed: Tue Jul 1 09:29:06 2014 -0700 ---------------------------------------------------------------------- components/camel-box/pom.xml | 91 +++++- .../apache/camel/component/box/BoxConsumer.java | 104 ++++++- .../apache/camel/component/box/BoxEndpoint.java | 22 +- .../apache/camel/component/box/BoxProducer.java | 4 - .../component/box/internal/BoxClientHelper.java | 8 +- .../component/box/internal/BoxConstants.java | 2 + .../component/box/internal/EventCallback.java | 29 ++ .../box/internal/LongPollingEventsManager.java | 302 +++++++++++++++++++ .../box/internal/OAuthHelperListener.java | 4 + .../signatures/long-polling-events-manager.txt | 1 + .../component/box/AbstractBoxTestSupport.java | 1 + .../box/IBoxFilesManagerIntegrationTest.java | 1 - ...LongPollingEventsManagerIntegrationTest.java | 108 +++++++ 13 files changed, 660 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/75fc0671/components/camel-box/pom.xml ---------------------------------------------------------------------- diff --git a/components/camel-box/pom.xml b/components/camel-box/pom.xml index 26779de..b32661a 100644 --- a/components/camel-box/pom.xml +++ b/components/camel-box/pom.xml @@ -25,8 +25,6 @@ <camel.osgi.export.pkg>${componentPackage}</camel.osgi.export.pkg> <camel.osgi.export.service>org.apache.camel.spi.ComponentResolver;component=box</camel.osgi.export.service> - <httpclient.version>4.3.3</httpclient.version> - <httpunit.version>1.7</httpunit.version> </properties> <dependencies> @@ -35,6 +33,11 @@ <artifactId>camel-core</artifactId> </dependency> <dependency> + <groupId>org.apache.httpcomponents</groupId> + <artifactId>httpclient</artifactId> + <version>${httpclient4-version}</version> + </dependency> + <dependency> <groupId>net.box</groupId> <artifactId>boxjavalibv2</artifactId> <version>${boxjavalibv2.version}</version> @@ -47,6 +50,34 @@ <groupId>org.apache.httpcomponents</groupId> <artifactId>httpclient-cache</artifactId> </exclusion> + <exclusion> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-annotations</artifactId> + </exclusion> + <exclusion> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-core</artifactId> + </exclusion> + <exclusion> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-databind</artifactId> + </exclusion> + <exclusion> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + </exclusion> + <exclusion> + <groupId>org.easymock</groupId> + <artifactId>easymock</artifactId> + </exclusion> + <exclusion> + <groupId>commons-io</groupId> + <artifactId>commons-io</artifactId> + </exclusion> + <exclusion> + <groupId>commons-lang</groupId> + <artifactId>commons-lang</artifactId> + </exclusion> </exclusions> </dependency> <dependency> @@ -54,6 +85,11 @@ <artifactId>htmlunit</artifactId> <version>${htmlunit.version}</version> </dependency> + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-databind</artifactId> + <version>${jackson2-version}</version> + </dependency> <!-- <dependency> <groupId>httpunit</groupId> @@ -63,8 +99,28 @@ --> <dependency> <groupId>org.apache.httpcomponents</groupId> + <artifactId>httpcore</artifactId> + <version>${httpcore4-version}</version> + </dependency> + <dependency> + <groupId>org.apache.httpcomponents</groupId> + <artifactId>httpmime</artifactId> + <version>${httpcore4-version}</version> + </dependency> + <dependency> + <groupId>org.apache.httpcomponents</groupId> <artifactId>httpclient-cache</artifactId> - <version>${httpclient.version}</version> + <version>${httpclient4-version}</version> + </dependency> + <dependency> + <groupId>commons-io</groupId> + <artifactId>commons-io</artifactId> + <version>${commons-io-version}</version> + </dependency> + <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-lang3</artifactId> + <version>${commons-lang3-version}</version> </dependency> <!-- Camel annotations in provided scope to avoid compile errors in IDEs --> @@ -106,6 +162,26 @@ <build> <plugins> + <!-- compile LongPollingEventsManager before generate-sources --> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + <executions> + <execution> + <id>compile-long-polling-events-mnager</id> + <goals> + <goal>compile</goal> + </goals> + <phase>generate-sources</phase> + <configuration> + <includes> + <include>**/LongPollingEventsManager.java</include> + </includes> + </configuration> + </execution> + </executions> + </plugin> + <!-- generate Component source and test source --> <plugin> <groupId>org.apache.camel</groupId> @@ -229,6 +305,12 @@ </aliases> </api> <api> + <apiName>poll-events</apiName> + <proxyClass>org.apache.camel.component.box.internal.LongPollingEventsManager</proxyClass> + <fromSignatureFile>${project.basedir}/src/signatures/long-polling-events-manager.txt</fromSignatureFile> + <excludeConfigNames>callback</excludeConfigNames> + </api> + <api> <apiName>search</apiName> <proxyClass>com.box.boxjavalibv2.resourcemanagers.IBoxSearchManager</proxyClass> <fromJavadoc/> @@ -396,8 +478,9 @@ <useFile>true</useFile> <forkMode>once</forkMode> <forkedProcessTimeoutInSeconds>300</forkedProcessTimeoutInSeconds> + <excludes/> <includes> - <include>**/*Test.java</include> + <include>**/*IntegrationTest.java</include> </includes> </configuration> </plugin> http://git-wip-us.apache.org/repos/asf/camel/blob/75fc0671/components/camel-box/src/main/java/org/apache/camel/component/box/BoxConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-box/src/main/java/org/apache/camel/component/box/BoxConsumer.java b/components/camel-box/src/main/java/org/apache/camel/component/box/BoxConsumer.java index 3c2679f..f0da8a5 100644 --- a/components/camel-box/src/main/java/org/apache/camel/component/box/BoxConsumer.java +++ b/components/camel-box/src/main/java/org/apache/camel/component/box/BoxConsumer.java @@ -16,17 +16,115 @@ */ package org.apache.camel.component.box; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +import com.box.boxjavalibv2.dao.BoxEventCollection; +import com.box.boxjavalibv2.dao.BoxTypedObject; +import org.apache.camel.Exchange; import org.apache.camel.Processor; -import org.apache.camel.component.box.internal.BoxApiName; -import org.apache.camel.util.component.AbstractApiConsumer; +import org.apache.camel.component.box.internal.BoxConstants; +import org.apache.camel.component.box.internal.CachedBoxClient; +import org.apache.camel.component.box.internal.EventCallback; +import org.apache.camel.component.box.internal.LongPollingEventsManager; +import org.apache.camel.impl.DefaultConsumer; +import org.apache.camel.util.ObjectHelper; +import org.apache.camel.util.component.ApiConsumerHelper; +import org.apache.camel.util.component.ApiMethod; +import org.apache.camel.util.component.ApiMethodHelper; +import org.apache.camel.util.component.PropertyNamesInterceptor; +import org.apache.camel.util.component.ResultInterceptor; /** * The Box consumer. */ -public class BoxConsumer extends AbstractApiConsumer<BoxApiName, BoxConfiguration> { +//public class BoxConsumer extends AbstractApiConsumer<BoxApiName, BoxConfiguration> { +public class BoxConsumer extends DefaultConsumer + implements PropertyNamesInterceptor, ResultInterceptor, EventCallback { + + private static final String CALLBACK_PROPERTY = "callback"; + + private final LongPollingEventsManager apiProxy; + private final Map<String, Object> properties; + private final ApiMethod apiMethod; + + private boolean splitResult = true; + private CachedBoxClient cachedBoxClient; public BoxConsumer(BoxEndpoint endpoint, Processor processor) { super(endpoint, processor); + + apiMethod = ApiConsumerHelper.findMethod(endpoint, this, log); + + properties = new HashMap<String, Object>(); + properties.putAll(endpoint.getEndpointProperties()); + properties.put(CALLBACK_PROPERTY, this); + + // invoke LongPollingEventsManager.poll to start event polling + cachedBoxClient = endpoint.getBoxClient(); + apiProxy = new LongPollingEventsManager(cachedBoxClient, + endpoint.getConfiguration().getHttpParams(), endpoint.getExecutorService()); + } + + @Override + public void interceptPropertyNames(Set<String> propertyNames) { + propertyNames.add(CALLBACK_PROPERTY); + } + + @Override + public void onEvent(BoxEventCollection events) throws Exception { + // convert Events to exchange and process + log.debug("Processed {} events for {}", + ApiConsumerHelper.getResultsProcessed(this, events, splitResult), cachedBoxClient); + } + + @Override + public void onException(Exception e) { + getExceptionHandler().handleException(ObjectHelper.wrapRuntimeCamelException(e)); } + @Override + protected void doStart() throws Exception { + super.doStart(); + + // invoke the API method to start polling + ApiMethodHelper.invokeMethod(apiProxy, apiMethod, properties); + } + + @Override + protected void doStop() throws Exception { + super.doStop(); + apiProxy.stopPolling(); + } + + public boolean isSplitResult() { + return splitResult; + } + + public void setSplitResult(boolean splitResult) { + this.splitResult = splitResult; + } + + @Override + public Object splitResult(Object result) { + if (result instanceof BoxEventCollection && splitResult) { + BoxEventCollection eventCollection = (BoxEventCollection) result; + final ArrayList<BoxTypedObject> entries = eventCollection.getEntries(); + return entries.toArray(new BoxTypedObject[entries.size()]); + } + return result; + } + + @Override + public void interceptResult(Object result, Exchange resultExchange) { + if (result instanceof BoxEventCollection) { + BoxEventCollection boxEventCollection = (BoxEventCollection) result; + resultExchange.getIn().setHeader(BoxConstants.CHUNK_SIZE_PROPERTY, + boxEventCollection.getChunkSize()); + resultExchange.getIn().setHeader(BoxConstants.NEXT_STREAM_POSITION_PROPERTY, + boxEventCollection.getNextStreamPosition()); + } + } } http://git-wip-us.apache.org/repos/asf/camel/blob/75fc0671/components/camel-box/src/main/java/org/apache/camel/component/box/BoxEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-box/src/main/java/org/apache/camel/component/box/BoxEndpoint.java b/components/camel-box/src/main/java/org/apache/camel/component/box/BoxEndpoint.java index d6f5ade..01c2678 100644 --- a/components/camel-box/src/main/java/org/apache/camel/component/box/BoxEndpoint.java +++ b/components/camel-box/src/main/java/org/apache/camel/component/box/BoxEndpoint.java @@ -26,6 +26,7 @@ import org.apache.camel.Producer; import org.apache.camel.component.box.internal.BoxApiCollection; import org.apache.camel.component.box.internal.BoxApiName; import org.apache.camel.component.box.internal.BoxClientHelper; +import org.apache.camel.component.box.internal.BoxConstants; import org.apache.camel.component.box.internal.BoxPropertiesHelper; import org.apache.camel.component.box.internal.CachedBoxClient; import org.apache.camel.spi.UriEndpoint; @@ -61,6 +62,11 @@ public class BoxEndpoint extends AbstractApiEndpoint<BoxApiName, BoxConfiguratio } public Producer createProducer() throws Exception { + // validate producer APIs + if (getApiName() == BoxApiName.POLL_EVENTS) { + throw new IllegalArgumentException("Producer endpoints do not support endpoint prefix " + + BoxApiName.POLL_EVENTS.getName()); + } return new BoxProducer(this); } @@ -69,6 +75,12 @@ public class BoxEndpoint extends AbstractApiEndpoint<BoxApiName, BoxConfiguratio if (inBody != null) { throw new IllegalArgumentException("Option inBody is not supported for consumer endpoint"); } + + // validate consumer APIs + if (getApiName() != BoxApiName.POLL_EVENTS) { + throw new IllegalArgumentException("Consumer endpoint only supports endpoint prefix " + + BoxApiName.POLL_EVENTS.getName()); + } final BoxConsumer consumer = new BoxConsumer(this, processor); // also set consumer.* properties configureConsumer(consumer); @@ -80,6 +92,10 @@ public class BoxEndpoint extends AbstractApiEndpoint<BoxApiName, BoxConfiguratio return BoxPropertiesHelper.getHelper(); } + protected String getThreadProfileName() { + return BoxConstants.THREAD_PROFILE_NAME; + } + @Override protected void afterConfigureProperties() { // create client eagerly, a good way to validate configuration @@ -120,7 +136,7 @@ public class BoxEndpoint extends AbstractApiEndpoint<BoxApiName, BoxConfiguratio } @Override - protected void interceptProperties(Map<String, Object> properties) { + public void interceptProperties(Map<String, Object> properties) { // set shared link and password from configuration if not set as header properties if (!properties.containsKey(SHARED_LINK_PROPERTY) && !ObjectHelper.isEmpty(sharedLink)) { properties.put(SHARED_LINK_PROPERTY, sharedLink); @@ -227,4 +243,8 @@ public class BoxEndpoint extends AbstractApiEndpoint<BoxApiName, BoxConfiguratio super.doShutdown(); } } + + public CachedBoxClient getBoxClient() { + return cachedBoxClient; + } } http://git-wip-us.apache.org/repos/asf/camel/blob/75fc0671/components/camel-box/src/main/java/org/apache/camel/component/box/BoxProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-box/src/main/java/org/apache/camel/component/box/BoxProducer.java b/components/camel-box/src/main/java/org/apache/camel/component/box/BoxProducer.java index a1d2cf3..e08a59c 100644 --- a/components/camel-box/src/main/java/org/apache/camel/component/box/BoxProducer.java +++ b/components/camel-box/src/main/java/org/apache/camel/component/box/BoxProducer.java @@ -29,8 +29,4 @@ public class BoxProducer extends AbstractApiProducer<BoxApiName, BoxConfiguratio public BoxProducer(BoxEndpoint endpoint) { super(endpoint, BoxPropertiesHelper.getHelper()); } - - protected String getThreadProfileName() { - return BoxConstants.THREAD_PROFILE_NAME; - } } http://git-wip-us.apache.org/repos/asf/camel/blob/75fc0671/components/camel-box/src/main/java/org/apache/camel/component/box/internal/BoxClientHelper.java ---------------------------------------------------------------------- diff --git a/components/camel-box/src/main/java/org/apache/camel/component/box/internal/BoxClientHelper.java b/components/camel-box/src/main/java/org/apache/camel/component/box/internal/BoxClientHelper.java index 621132f..6df4d1a 100644 --- a/components/camel-box/src/main/java/org/apache/camel/component/box/internal/BoxClientHelper.java +++ b/components/camel-box/src/main/java/org/apache/camel/component/box/internal/BoxClientHelper.java @@ -206,14 +206,14 @@ public final class BoxClientHelper { LOG.debug("Revoking OAuth refresh token for {}", cachedBoxClient); + // revoke OAuth token + boxClient.getOAuthManager().revokeOAuth(boxClient.getAuthData().getAccessToken(), + configuration.getClientId(), configuration.getClientSecret()); + // notify the OAuthListener of revoked token cachedBoxClient.getListener().onRefresh(null); // mark auth data revoked boxClient.getOAuthDataController().setOAuthData(null); - - // revoke OAuth token - boxClient.getOAuthManager().revokeOAuth(boxClient.getAuthData().getAccessToken(), - configuration.getClientId(), configuration.getClientSecret()); } } } http://git-wip-us.apache.org/repos/asf/camel/blob/75fc0671/components/camel-box/src/main/java/org/apache/camel/component/box/internal/BoxConstants.java ---------------------------------------------------------------------- diff --git a/components/camel-box/src/main/java/org/apache/camel/component/box/internal/BoxConstants.java b/components/camel-box/src/main/java/org/apache/camel/component/box/internal/BoxConstants.java index 11f584b..49a4474 100644 --- a/components/camel-box/src/main/java/org/apache/camel/component/box/internal/BoxConstants.java +++ b/components/camel-box/src/main/java/org/apache/camel/component/box/internal/BoxConstants.java @@ -23,6 +23,8 @@ public interface BoxConstants { // suffix for parameters when passed as exchange header properties String PROPERTY_PREFIX = "CamelBox."; + String NEXT_STREAM_POSITION_PROPERTY = PROPERTY_PREFIX + "nextStreamPosition"; + String CHUNK_SIZE_PROPERTY = PROPERTY_PREFIX + "chunkSize"; // thread profile name for this component String THREAD_PROFILE_NAME = "CamelBox"; http://git-wip-us.apache.org/repos/asf/camel/blob/75fc0671/components/camel-box/src/main/java/org/apache/camel/component/box/internal/EventCallback.java ---------------------------------------------------------------------- diff --git a/components/camel-box/src/main/java/org/apache/camel/component/box/internal/EventCallback.java b/components/camel-box/src/main/java/org/apache/camel/component/box/internal/EventCallback.java new file mode 100644 index 0000000..f217bf6 --- /dev/null +++ b/components/camel-box/src/main/java/org/apache/camel/component/box/internal/EventCallback.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.box.internal; + +import com.box.boxjavalibv2.dao.BoxEventCollection; + +/** + * Callback interface to handle BoxEvents received from long polling. + */ +public interface EventCallback { + + void onEvent(BoxEventCollection events) throws Exception; + + void onException(Exception e); +} http://git-wip-us.apache.org/repos/asf/camel/blob/75fc0671/components/camel-box/src/main/java/org/apache/camel/component/box/internal/LongPollingEventsManager.java ---------------------------------------------------------------------- diff --git a/components/camel-box/src/main/java/org/apache/camel/component/box/internal/LongPollingEventsManager.java b/components/camel-box/src/main/java/org/apache/camel/component/box/internal/LongPollingEventsManager.java new file mode 100644 index 0000000..66bb2b7 --- /dev/null +++ b/components/camel-box/src/main/java/org/apache/camel/component/box/internal/LongPollingEventsManager.java @@ -0,0 +1,302 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.box.internal; + +import java.net.SocketException; +import java.net.SocketTimeoutException; +import java.util.ArrayList; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; + +import com.box.boxjavalibv2.dao.BoxCollection; +import com.box.boxjavalibv2.dao.BoxEventCollection; +import com.box.boxjavalibv2.dao.BoxRealTimeServer; +import com.box.boxjavalibv2.dao.BoxTypedObject; +import com.box.boxjavalibv2.exceptions.AuthFatalFailureException; +import com.box.boxjavalibv2.exceptions.BoxServerException; +import com.box.boxjavalibv2.requests.requestobjects.BoxEventRequestObject; +import com.box.boxjavalibv2.resourcemanagers.IBoxEventsManager; +import com.box.restclientv2.exceptions.BoxRestException; +import com.box.restclientv2.exceptions.BoxSDKException; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.camel.RuntimeCamelException; +import org.apache.camel.util.ObjectHelper; +import org.apache.http.HttpEntity; +import org.apache.http.HttpHeaders; +import org.apache.http.HttpResponse; +import org.apache.http.HttpStatus; +import org.apache.http.StatusLine; +import org.apache.http.client.HttpClient; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.impl.client.DefaultHttpClient; +import org.apache.http.params.BasicHttpParams; +import org.apache.http.params.HttpConnectionParams; +import org.apache.http.params.HttpParams; +import org.apache.http.protocol.HttpContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Manager for monitoring events using long polling. + */ +@SuppressWarnings("deprecation") +public class LongPollingEventsManager { + + private static final Logger LOG = LoggerFactory.getLogger(LongPollingEventsManager.class); + private static final String RETRY_TIMEOUT = "retry_timeout"; + private static final String MAX_RETRIES = "max_retries"; + private static final String MESSAGE = "message"; + private static final String NEW_CHANGE = "new_change"; + private static final String RECONNECT = "reconnect"; + private static final String OUT_OF_DATE = "out_of_date"; + + private final CachedBoxClient cachedBoxClient; + private final ExecutorService executorService; + private final BasicHttpParams httpParams; + + private HttpClient httpClient; + private Future<?> pollFuture; + private HttpGet httpGet; + private boolean done; + + public LongPollingEventsManager(CachedBoxClient boxClient, + Map<String, Object> httpParams, ExecutorService executorService) { + + this.cachedBoxClient = boxClient; + this.executorService = executorService; + + this.httpParams = new BasicHttpParams(); + HttpConnectionParams.setSoKeepalive(this.httpParams, true); + + if (httpParams != null) { + for (Map.Entry<String, Object> entry : httpParams.entrySet()) { + this.httpParams.setParameter(entry.getKey(), entry.getValue()); + } + } + } + + @SuppressWarnings("unused") + public void poll(long streamPosition, final String streamType, final int limit, final EventCallback callback) + throws BoxServerException, AuthFatalFailureException, BoxRestException { + + // get BoxClient Event Manager + final IBoxEventsManager eventsManager = cachedBoxClient.getBoxClient().getEventsManager(); + + // get current stream position if requested + if (BoxEventRequestObject.STREAM_POSITION_NOW == streamPosition) { + streamPosition = getCurrentStreamPosition(eventsManager, streamPosition); + } + + // validate parameters + ObjectHelper.notNull(streamPosition, "streamPosition"); + ObjectHelper.notEmpty(streamType, "streamType"); + ObjectHelper.notNull(callback, "eventCallback"); + + httpClient = new DefaultHttpClient(cachedBoxClient.getClientConnectionManager(), httpParams); + + // start polling thread + LOG.info("Started event polling thread for " + cachedBoxClient); + + final long startStreamPosition = streamPosition; + pollFuture = executorService.submit(new Runnable() { + @Override + public void run() { + + final ObjectMapper mapper = new ObjectMapper(); + + long currentStreamPosition = startStreamPosition; + BoxRealTimeServer realTimeServer = null; + + boolean retry = false; + int retries = 0; + int maxRetries = 1; + + while (!done) { + try { + // set to true if no exceptions thrown + retry = false; + + if (realTimeServer == null) { + + // get RTS URL + realTimeServer = getBoxRealTimeServer(currentStreamPosition, eventsManager); + + // update HTTP timeout + final int requestTimeout = Integer.parseInt( + realTimeServer.getExtraData(RETRY_TIMEOUT).toString()); + final HttpParams params = httpClient.getParams(); + HttpConnectionParams.setSoTimeout(params, requestTimeout * 1000); + + // update maxRetries + maxRetries = Integer.parseInt(realTimeServer.getExtraData(MAX_RETRIES).toString()); + } + + // create HTTP request for RTS + httpGet = getPollRequest(realTimeServer.getUrl(), currentStreamPosition); + + // execute RTS poll + HttpResponse httpResponse = null; + try { + httpResponse = httpClient.execute(httpGet, (HttpContext) null); + } catch (SocketTimeoutException e) { + LOG.debug("Poll timed out, retrying for " + cachedBoxClient); + } + + if (httpResponse != null) { + + // parse response + final StatusLine statusLine = httpResponse.getStatusLine(); + if (statusLine != null && statusLine.getStatusCode() == HttpStatus.SC_OK) { + final HttpEntity entity = httpResponse.getEntity(); + @SuppressWarnings("unchecked") + Map<String, String> rtsResponse = mapper.readValue(entity.getContent(), Map.class); + + final String message = rtsResponse.get(MESSAGE); + if (NEW_CHANGE.equals(message)) { + + // get events + final BoxEventRequestObject requestObject = + BoxEventRequestObject.getEventsRequestObject(currentStreamPosition); + requestObject.setStreamType(streamType); + requestObject.setLimit(limit); + final BoxEventCollection events = eventsManager.getEvents(requestObject); + + // notify callback + callback.onEvent(events); + + // update stream position + currentStreamPosition = events.getNextStreamPosition(); + + } else if (RECONNECT.equals(message) || MAX_RETRIES.equals(message)) { + LOG.debug("Long poll reconnect for " + cachedBoxClient); + realTimeServer = null; + } else if (OUT_OF_DATE.equals(message)) { + // update currentStreamPosition + LOG.debug("Long poll out of date for " + cachedBoxClient); + currentStreamPosition = getCurrentStreamPosition(eventsManager, + BoxEventRequestObject.STREAM_POSITION_NOW); + realTimeServer = null; + } else { + throw new RuntimeCamelException("Unknown poll response " + message); + } + } else { + String msg = "Unknown error"; + if (statusLine != null) { + msg = String.format("Error polling events for %s: code=%s, message=%s", + cachedBoxClient, statusLine.getStatusCode(), statusLine.getReasonPhrase()); + } + throw new RuntimeCamelException(msg); + } + } + + // keep polling + retry = true; + + } catch (InterruptedException e) { + LOG.debug("Interrupted event polling thread for {}, exiting...", cachedBoxClient); + } catch (BoxSDKException e) { + callback.onException(e); + } catch (RuntimeCamelException e) { + callback.onException(e); + } catch (SocketException e) { + // TODO handle connection aborts!!! + LOG.debug("Socket exception while event polling for {}", cachedBoxClient); + retry = true; + realTimeServer = null; + } catch (Exception e) { + callback.onException(new RuntimeCamelException("Error while polling for " + + cachedBoxClient + ": " + e.getMessage(), e)); + } finally { + // are we done yet? + if (!retry) { + done = true; + } else { + if (realTimeServer != null + && (++retries > maxRetries)) { + // make another option call + realTimeServer = null; + } + } + } + } + LOG.info("Stopped event polling thread for " + cachedBoxClient); + } + }); + } + + private long getCurrentStreamPosition(IBoxEventsManager eventsManager, long streamPosition) + throws BoxRestException, BoxServerException, AuthFatalFailureException { + + final BoxEventRequestObject requestObject = + BoxEventRequestObject.getEventsRequestObject(streamPosition); + final BoxEventCollection events = eventsManager.getEvents(requestObject); + streamPosition = events.getNextStreamPosition(); + return streamPosition; + } + + public void stopPolling() throws Exception { + if (!done) { + + // done polling + done = true; + + // make sure an HTTP GET is not in progress + if (httpGet != null && !httpGet.isAborted()) { + httpGet.abort(); + } + + // cancel polling thread + if (pollFuture.cancel(true)) { + LOG.info("Stopped event polling for " + cachedBoxClient); + } else { + LOG.warn("Unable to stop event polling for " + cachedBoxClient); + } + + httpClient = null; + pollFuture = null; + } + } + + private BoxRealTimeServer getBoxRealTimeServer(long currentStreamPosition, IBoxEventsManager eventsManager) + throws BoxRestException, BoxServerException, AuthFatalFailureException { + + final BoxEventRequestObject optionsRequest = + BoxEventRequestObject.getEventsRequestObject(currentStreamPosition); + + final BoxCollection eventOptions = eventsManager.getEventOptions(optionsRequest); + final ArrayList<BoxTypedObject> entries = eventOptions.getEntries(); + + // validate options + if (entries == null || entries.size() < 1 + || !(entries.get(0) instanceof BoxRealTimeServer)) { + + throw new RuntimeCamelException("No Real Time Server from event options for " + cachedBoxClient); + } + + return (BoxRealTimeServer) entries.get(0); + } + + private HttpGet getPollRequest(String url, long currentStreamPosition) throws AuthFatalFailureException { + + final HttpGet httpGet = new HttpGet(url + "&stream_position=" + currentStreamPosition); + final String accessToken = cachedBoxClient.getBoxClient().getAuthData().getAccessToken(); + httpGet.setHeader(HttpHeaders.AUTHORIZATION, "Bearer " + accessToken); + return httpGet; + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/75fc0671/components/camel-box/src/main/java/org/apache/camel/component/box/internal/OAuthHelperListener.java ---------------------------------------------------------------------- diff --git a/components/camel-box/src/main/java/org/apache/camel/component/box/internal/OAuthHelperListener.java b/components/camel-box/src/main/java/org/apache/camel/component/box/internal/OAuthHelperListener.java index 684d1ba..7a62796 100644 --- a/components/camel-box/src/main/java/org/apache/camel/component/box/internal/OAuthHelperListener.java +++ b/components/camel-box/src/main/java/org/apache/camel/component/box/internal/OAuthHelperListener.java @@ -41,6 +41,7 @@ class OAuthHelperListener implements OAuthRefreshListener { @Override public void onRefresh(IAuthData newAuthData) { + // look for refresh token update or revocation if (authSecureStorage != null && (newAuthData == null || !newAuthData.getRefreshToken().equals(refreshToken))) { @@ -49,5 +50,8 @@ class OAuthHelperListener implements OAuthRefreshListener { if (configListener != null) { configListener.onRefresh(newAuthData); } + + // update cached refresh token + refreshToken = newAuthData != null ? newAuthData.getRefreshToken() : null; } } http://git-wip-us.apache.org/repos/asf/camel/blob/75fc0671/components/camel-box/src/signatures/long-polling-events-manager.txt ---------------------------------------------------------------------- diff --git a/components/camel-box/src/signatures/long-polling-events-manager.txt b/components/camel-box/src/signatures/long-polling-events-manager.txt new file mode 100644 index 0000000..57ea578 --- /dev/null +++ b/components/camel-box/src/signatures/long-polling-events-manager.txt @@ -0,0 +1 @@ +void poll(long streamPosition, final String streamType, final int limit, final org.apache.camel.component.box.internal.EventCallback callback); \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/75fc0671/components/camel-box/src/test/java/org/apache/camel/component/box/AbstractBoxTestSupport.java ---------------------------------------------------------------------- diff --git a/components/camel-box/src/test/java/org/apache/camel/component/box/AbstractBoxTestSupport.java b/components/camel-box/src/test/java/org/apache/camel/component/box/AbstractBoxTestSupport.java index 7d2c33c..130bcaf 100644 --- a/components/camel-box/src/test/java/org/apache/camel/component/box/AbstractBoxTestSupport.java +++ b/components/camel-box/src/test/java/org/apache/camel/component/box/AbstractBoxTestSupport.java @@ -44,6 +44,7 @@ import org.junit.AfterClass; public abstract class AbstractBoxTestSupport extends CamelTestSupport { protected static final String CAMEL_TEST_TAG = "camel_was_here"; + protected static final String CAMEL_TEST_FILE = "CamelTestFile"; private static final String LINE_SEPARATOR = System.getProperty("line.separator"); private static final String TEST_OPTIONS_PROPERTIES = "/test-options.properties"; private static final String REFRESH_TOKEN_PROPERTY = "refreshToken"; http://git-wip-us.apache.org/repos/asf/camel/blob/75fc0671/components/camel-box/src/test/java/org/apache/camel/component/box/IBoxFilesManagerIntegrationTest.java ---------------------------------------------------------------------- diff --git a/components/camel-box/src/test/java/org/apache/camel/component/box/IBoxFilesManagerIntegrationTest.java b/components/camel-box/src/test/java/org/apache/camel/component/box/IBoxFilesManagerIntegrationTest.java index 03f483d..5fd8239 100644 --- a/components/camel-box/src/test/java/org/apache/camel/component/box/IBoxFilesManagerIntegrationTest.java +++ b/components/camel-box/src/test/java/org/apache/camel/component/box/IBoxFilesManagerIntegrationTest.java @@ -54,7 +54,6 @@ public class IBoxFilesManagerIntegrationTest extends AbstractBoxTestSupport { private static final Logger LOG = LoggerFactory.getLogger(IBoxFilesManagerIntegrationTest.class); private static final String PATH_PREFIX = BoxApiName.FILES.getName(); - private static final String CAMEL_TEST_FILE = "CamelTestFile"; private static final BoxImageRequestObject BOX_IMAGE_REQUEST_OBJECT = BoxImageRequestObject.previewRequestObject(); private static final String PNG_EXTENSION = "png"; private static final String TEST_UPLOAD_FILE = "/log4j.properties"; http://git-wip-us.apache.org/repos/asf/camel/blob/75fc0671/components/camel-box/src/test/java/org/apache/camel/component/box/LongPollingEventsManagerIntegrationTest.java ---------------------------------------------------------------------- diff --git a/components/camel-box/src/test/java/org/apache/camel/component/box/LongPollingEventsManagerIntegrationTest.java b/components/camel-box/src/test/java/org/apache/camel/component/box/LongPollingEventsManagerIntegrationTest.java new file mode 100644 index 0000000..9502152 --- /dev/null +++ b/components/camel-box/src/test/java/org/apache/camel/component/box/LongPollingEventsManagerIntegrationTest.java @@ -0,0 +1,108 @@ +/* + * Camel Api Route test generated by camel-component-util-maven-plugin + * Generated on: Mon Jun 30 14:29:45 PDT 2014 + */ +package org.apache.camel.component.box; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import com.box.boxjavalibv2.dao.BoxEvent; +import com.box.boxjavalibv2.dao.BoxFile; +import com.box.boxjavalibv2.requests.requestobjects.BoxItemCopyRequestObject; +import org.apache.camel.CamelContext; +import org.apache.camel.Exchange; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.box.internal.BoxApiName; +import org.apache.camel.component.box.internal.BoxConstants; +import org.apache.camel.component.mock.MockEndpoint; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Test class for org.apache.camel.component.box.internal.LongPollingEventsManager APIs. + */ +public class LongPollingEventsManagerIntegrationTest extends AbstractBoxTestSupport { + + private static final Logger LOG = LoggerFactory.getLogger(LongPollingEventsManagerIntegrationTest.class); + + @Test + public void testPoll() throws Exception { + + // generate file copy event + final Map<String, Object> headers = new HashMap<String, Object>(); + headers.put("CamelBox.fileId", testFileId); + final BoxItemCopyRequestObject requestObject = BoxItemCopyRequestObject.copyItemRequestObject("0"); + requestObject.setName(CAMEL_TEST_FILE); + headers.put("CamelBox.itemCopyRequest", requestObject); + + BoxFile result = requestBodyAndHeaders("direct://COPYFILE", null, headers); + + assertNotNull("copyFile result", result); + LOG.debug("copyFile: " + result); + + // generate file delete event + headers.clear(); + headers.put("CamelBox.fileId", result.getId()); + headers.put("CamelBox.defaultRequest", null); + requestBodyAndHeaders("direct://DELETEFILE", null, headers); + + MockEndpoint mockEndpoint = getMockEndpoint("mock:boxEvents"); + mockEndpoint.expectedMinimumMessageCount(2); + mockEndpoint.setResultWaitTime(TimeUnit.MILLISECONDS.convert(30, TimeUnit.SECONDS)); + mockEndpoint.assertIsSatisfied(); + + final List<Exchange> exchanges = mockEndpoint.getExchanges(); + assertNotNull("poll result", exchanges); + assertFalse("poll result", exchanges.isEmpty()); + LOG.debug("poll result: " + exchanges); + + for (Exchange exchange : exchanges) { + + assertNotNull("poll result " + BoxConstants.CHUNK_SIZE_PROPERTY, + exchange.getIn().getHeader(BoxConstants.CHUNK_SIZE_PROPERTY)); + assertNotNull("poll result " + BoxConstants.NEXT_STREAM_POSITION_PROPERTY, + exchange.getIn().getHeader(BoxConstants.NEXT_STREAM_POSITION_PROPERTY)); + + final Object body = exchange.getIn().getBody(); + assertNotNull("poll result body", body); + assertEquals("poll result body type", BoxEvent.class, body.getClass()); + + final String eventType = ((BoxEvent)body).getEventType(); + assertTrue("poll result type", + BoxEvent.EVENT_TYPE_ITEM_COPY.equals(eventType) || BoxEvent.EVENT_TYPE_ITEM_TRASH.equals(eventType)); + } + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + public void configure() { + // test route for copyFile + from("direct://COPYFILE") + .to("box://" + BoxApiName.FILES.getName() + "/copyFile"); + + // test route for deleteFile + from("direct://DELETEFILE") + .to("box://" + BoxApiName.FILES.getName() + "/deleteFile"); + + // test route for poll + from("box://" + BoxApiName.POLL_EVENTS.getName() + "/poll?streamPosition=-1&streamType=all&limit=100") + .to("mock:boxEvents"); + + } + }; + } + + @Override + protected CamelContext createCamelContext() throws Exception { + // also test revoke on shutdown + final CamelContext camelContext = super.createCamelContext(); + final BoxComponent box = (BoxComponent) camelContext.getComponent("box"); + box.getConfiguration().setRevokeOnShutdown(true); + return camelContext; + } +}