Repository: camel
Updated Branches:
  refs/heads/master df062eaf4 -> 75fc0671b


Updated Box.com component to support long polling for updates from Box.com, 
fixed minor issues with token handling


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/75fc0671
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/75fc0671
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/75fc0671

Branch: refs/heads/master
Commit: 75fc0671bdc118d4d8e399889543431309ae63fe
Parents: df062ea
Author: Dhiraj Bokde <dhira...@yahoo.com>
Authored: Tue Jul 1 09:29:06 2014 -0700
Committer: Dhiraj Bokde <dhira...@yahoo.com>
Committed: Tue Jul 1 09:29:06 2014 -0700

----------------------------------------------------------------------
 components/camel-box/pom.xml                    |  91 +++++-
 .../apache/camel/component/box/BoxConsumer.java | 104 ++++++-
 .../apache/camel/component/box/BoxEndpoint.java |  22 +-
 .../apache/camel/component/box/BoxProducer.java |   4 -
 .../component/box/internal/BoxClientHelper.java |   8 +-
 .../component/box/internal/BoxConstants.java    |   2 +
 .../component/box/internal/EventCallback.java   |  29 ++
 .../box/internal/LongPollingEventsManager.java  | 302 +++++++++++++++++++
 .../box/internal/OAuthHelperListener.java       |   4 +
 .../signatures/long-polling-events-manager.txt  |   1 +
 .../component/box/AbstractBoxTestSupport.java   |   1 +
 .../box/IBoxFilesManagerIntegrationTest.java    |   1 -
 ...LongPollingEventsManagerIntegrationTest.java | 108 +++++++
 13 files changed, 660 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/75fc0671/components/camel-box/pom.xml
----------------------------------------------------------------------
diff --git a/components/camel-box/pom.xml b/components/camel-box/pom.xml
index 26779de..b32661a 100644
--- a/components/camel-box/pom.xml
+++ b/components/camel-box/pom.xml
@@ -25,8 +25,6 @@
 
     <camel.osgi.export.pkg>${componentPackage}</camel.osgi.export.pkg>
     
<camel.osgi.export.service>org.apache.camel.spi.ComponentResolver;component=box</camel.osgi.export.service>
-    <httpclient.version>4.3.3</httpclient.version>
-    <httpunit.version>1.7</httpunit.version>
   </properties>
 
   <dependencies>
@@ -35,6 +33,11 @@
       <artifactId>camel-core</artifactId>
     </dependency>
     <dependency>
+      <groupId>org.apache.httpcomponents</groupId>
+      <artifactId>httpclient</artifactId>
+      <version>${httpclient4-version}</version>
+    </dependency>
+    <dependency>
       <groupId>net.box</groupId>
       <artifactId>boxjavalibv2</artifactId>
       <version>${boxjavalibv2.version}</version>
@@ -47,6 +50,34 @@
           <groupId>org.apache.httpcomponents</groupId>
           <artifactId>httpclient-cache</artifactId>
         </exclusion>
+        <exclusion>
+          <groupId>com.fasterxml.jackson.core</groupId>
+          <artifactId>jackson-annotations</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.fasterxml.jackson.core</groupId>
+          <artifactId>jackson-core</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.fasterxml.jackson.core</groupId>
+          <artifactId>jackson-databind</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>junit</groupId>
+          <artifactId>junit</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.easymock</groupId>
+          <artifactId>easymock</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>commons-io</groupId>
+          <artifactId>commons-io</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>commons-lang</groupId>
+          <artifactId>commons-lang</artifactId>
+        </exclusion>
       </exclusions>
     </dependency>
     <dependency>
@@ -54,6 +85,11 @@
       <artifactId>htmlunit</artifactId>
       <version>${htmlunit.version}</version>
     </dependency>
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-databind</artifactId>
+      <version>${jackson2-version}</version>
+    </dependency>
 <!--
     <dependency>
       <groupId>httpunit</groupId>
@@ -63,8 +99,28 @@
 -->
     <dependency>
       <groupId>org.apache.httpcomponents</groupId>
+      <artifactId>httpcore</artifactId>
+      <version>${httpcore4-version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.httpcomponents</groupId>
+      <artifactId>httpmime</artifactId>
+      <version>${httpcore4-version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.httpcomponents</groupId>
       <artifactId>httpclient-cache</artifactId>
-      <version>${httpclient.version}</version>
+      <version>${httpclient4-version}</version>
+    </dependency>
+    <dependency>
+      <groupId>commons-io</groupId>
+      <artifactId>commons-io</artifactId>
+      <version>${commons-io-version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-lang3</artifactId>
+      <version>${commons-lang3-version}</version>
     </dependency>
 
     <!-- Camel annotations in provided scope to avoid compile errors in IDEs 
-->
@@ -106,6 +162,26 @@
   <build>
     <plugins>
 
+      <!-- compile LongPollingEventsManager before generate-sources -->
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>compile-long-polling-events-mnager</id>
+            <goals>
+              <goal>compile</goal>
+            </goals>
+            <phase>generate-sources</phase>
+            <configuration>
+              <includes>
+                <include>**/LongPollingEventsManager.java</include>
+              </includes>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+
       <!-- generate Component source and test source -->
       <plugin>
         <groupId>org.apache.camel</groupId>
@@ -229,6 +305,12 @@
                   </aliases>
                 </api>
                 <api>
+                  <apiName>poll-events</apiName>
+                  
<proxyClass>org.apache.camel.component.box.internal.LongPollingEventsManager</proxyClass>
+                  
<fromSignatureFile>${project.basedir}/src/signatures/long-polling-events-manager.txt</fromSignatureFile>
+                  <excludeConfigNames>callback</excludeConfigNames>
+                </api>
+                <api>
                   <apiName>search</apiName>
                   
<proxyClass>com.box.boxjavalibv2.resourcemanagers.IBoxSearchManager</proxyClass>
                   <fromJavadoc/>
@@ -396,8 +478,9 @@
               <useFile>true</useFile>
               <forkMode>once</forkMode>
               
<forkedProcessTimeoutInSeconds>300</forkedProcessTimeoutInSeconds>
+              <excludes/>
               <includes>
-                <include>**/*Test.java</include>
+                <include>**/*IntegrationTest.java</include>
               </includes>
             </configuration>
           </plugin>

http://git-wip-us.apache.org/repos/asf/camel/blob/75fc0671/components/camel-box/src/main/java/org/apache/camel/component/box/BoxConsumer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-box/src/main/java/org/apache/camel/component/box/BoxConsumer.java
 
b/components/camel-box/src/main/java/org/apache/camel/component/box/BoxConsumer.java
index 3c2679f..f0da8a5 100644
--- 
a/components/camel-box/src/main/java/org/apache/camel/component/box/BoxConsumer.java
+++ 
b/components/camel-box/src/main/java/org/apache/camel/component/box/BoxConsumer.java
@@ -16,17 +16,115 @@
  */
 package org.apache.camel.component.box;
 
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import com.box.boxjavalibv2.dao.BoxEventCollection;
+import com.box.boxjavalibv2.dao.BoxTypedObject;
+import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
-import org.apache.camel.component.box.internal.BoxApiName;
-import org.apache.camel.util.component.AbstractApiConsumer;
+import org.apache.camel.component.box.internal.BoxConstants;
+import org.apache.camel.component.box.internal.CachedBoxClient;
+import org.apache.camel.component.box.internal.EventCallback;
+import org.apache.camel.component.box.internal.LongPollingEventsManager;
+import org.apache.camel.impl.DefaultConsumer;
+import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.component.ApiConsumerHelper;
+import org.apache.camel.util.component.ApiMethod;
+import org.apache.camel.util.component.ApiMethodHelper;
+import org.apache.camel.util.component.PropertyNamesInterceptor;
+import org.apache.camel.util.component.ResultInterceptor;
 
 /**
  * The Box consumer.
  */
-public class BoxConsumer extends AbstractApiConsumer<BoxApiName, 
BoxConfiguration> {
+//public class BoxConsumer extends AbstractApiConsumer<BoxApiName, 
BoxConfiguration> {
+public class BoxConsumer extends DefaultConsumer
+    implements PropertyNamesInterceptor, ResultInterceptor, EventCallback {
+
+    private static final String CALLBACK_PROPERTY = "callback";
+
+    private final LongPollingEventsManager apiProxy;
+    private final Map<String, Object> properties;
+    private final ApiMethod apiMethod;
+
+    private boolean splitResult = true;
+    private CachedBoxClient cachedBoxClient;
 
     public BoxConsumer(BoxEndpoint endpoint, Processor processor) {
         super(endpoint, processor);
+
+        apiMethod = ApiConsumerHelper.findMethod(endpoint, this, log);
+
+        properties = new HashMap<String, Object>();
+        properties.putAll(endpoint.getEndpointProperties());
+        properties.put(CALLBACK_PROPERTY, this);
+
+        // create the LongPollingEventsManager; polling itself is started in doStart()
+        cachedBoxClient = endpoint.getBoxClient();
+        apiProxy = new LongPollingEventsManager(cachedBoxClient,
+            endpoint.getConfiguration().getHttpParams(), 
endpoint.getExecutorService());
+    }
+
+    @Override
+    public void interceptPropertyNames(Set<String> propertyNames) {
+        propertyNames.add(CALLBACK_PROPERTY);
+    }
+
+    @Override
+    public void onEvent(BoxEventCollection events) throws Exception {
+        // convert Events to exchange and process
+        log.debug("Processed {} events for {}",
+            ApiConsumerHelper.getResultsProcessed(this, events, splitResult), 
cachedBoxClient);
+    }
+
+    @Override
+    public void onException(Exception e) {
+        
getExceptionHandler().handleException(ObjectHelper.wrapRuntimeCamelException(e));
     }
 
+    @Override
+    protected void doStart() throws Exception {
+        super.doStart();
+
+        // invoke the API method to start polling
+        ApiMethodHelper.invokeMethod(apiProxy, apiMethod, properties);
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        super.doStop();
+        apiProxy.stopPolling();
+    }
+
+    public boolean isSplitResult() {
+        return splitResult;
+    }
+
+    public void setSplitResult(boolean splitResult) {
+        this.splitResult = splitResult;
+    }
+
+    @Override
+    public Object splitResult(Object result) {
+        if (result instanceof BoxEventCollection && splitResult) {
+            BoxEventCollection eventCollection = (BoxEventCollection) result;
+            final ArrayList<BoxTypedObject> entries = 
eventCollection.getEntries();
+            return entries.toArray(new BoxTypedObject[entries.size()]);
+        }
+        return result;
+    }
+
+    @Override
+    public void interceptResult(Object result, Exchange resultExchange) {
+        if (result instanceof BoxEventCollection) {
+            BoxEventCollection boxEventCollection = (BoxEventCollection) 
result;
+            resultExchange.getIn().setHeader(BoxConstants.CHUNK_SIZE_PROPERTY,
+                boxEventCollection.getChunkSize());
+            
resultExchange.getIn().setHeader(BoxConstants.NEXT_STREAM_POSITION_PROPERTY,
+                boxEventCollection.getNextStreamPosition());
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/75fc0671/components/camel-box/src/main/java/org/apache/camel/component/box/BoxEndpoint.java
----------------------------------------------------------------------
diff --git 
a/components/camel-box/src/main/java/org/apache/camel/component/box/BoxEndpoint.java
 
b/components/camel-box/src/main/java/org/apache/camel/component/box/BoxEndpoint.java
index d6f5ade..01c2678 100644
--- 
a/components/camel-box/src/main/java/org/apache/camel/component/box/BoxEndpoint.java
+++ 
b/components/camel-box/src/main/java/org/apache/camel/component/box/BoxEndpoint.java
@@ -26,6 +26,7 @@ import org.apache.camel.Producer;
 import org.apache.camel.component.box.internal.BoxApiCollection;
 import org.apache.camel.component.box.internal.BoxApiName;
 import org.apache.camel.component.box.internal.BoxClientHelper;
+import org.apache.camel.component.box.internal.BoxConstants;
 import org.apache.camel.component.box.internal.BoxPropertiesHelper;
 import org.apache.camel.component.box.internal.CachedBoxClient;
 import org.apache.camel.spi.UriEndpoint;
@@ -61,6 +62,11 @@ public class BoxEndpoint extends 
AbstractApiEndpoint<BoxApiName, BoxConfiguratio
     }
 
     public Producer createProducer() throws Exception {
+        // validate producer APIs
+        if (getApiName() == BoxApiName.POLL_EVENTS) {
+            throw new IllegalArgumentException("Producer endpoints do not 
support endpoint prefix "
+                + BoxApiName.POLL_EVENTS.getName());
+        }
         return new BoxProducer(this);
     }
 
@@ -69,6 +75,12 @@ public class BoxEndpoint extends 
AbstractApiEndpoint<BoxApiName, BoxConfiguratio
         if (inBody != null) {
             throw new IllegalArgumentException("Option inBody is not supported 
for consumer endpoint");
         }
+
+        // validate consumer APIs
+        if (getApiName() != BoxApiName.POLL_EVENTS) {
+            throw new IllegalArgumentException("Consumer endpoint only 
supports endpoint prefix "
+                + BoxApiName.POLL_EVENTS.getName());
+        }
         final BoxConsumer consumer = new BoxConsumer(this, processor);
         // also set consumer.* properties
         configureConsumer(consumer);
@@ -80,6 +92,10 @@ public class BoxEndpoint extends 
AbstractApiEndpoint<BoxApiName, BoxConfiguratio
         return BoxPropertiesHelper.getHelper();
     }
 
+    protected String getThreadProfileName() {
+        return BoxConstants.THREAD_PROFILE_NAME;
+    }
+
     @Override
     protected void afterConfigureProperties() {
         // create client eagerly, a good way to validate configuration
@@ -120,7 +136,7 @@ public class BoxEndpoint extends 
AbstractApiEndpoint<BoxApiName, BoxConfiguratio
     }
 
     @Override
-    protected void interceptProperties(Map<String, Object> properties) {
+    public void interceptProperties(Map<String, Object> properties) {
         // set shared link and password from configuration if not set as 
header properties
         if (!properties.containsKey(SHARED_LINK_PROPERTY) && 
!ObjectHelper.isEmpty(sharedLink)) {
             properties.put(SHARED_LINK_PROPERTY, sharedLink);
@@ -227,4 +243,8 @@ public class BoxEndpoint extends 
AbstractApiEndpoint<BoxApiName, BoxConfiguratio
             super.doShutdown();
         }
     }
+
+    public CachedBoxClient getBoxClient() {
+        return cachedBoxClient;
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/75fc0671/components/camel-box/src/main/java/org/apache/camel/component/box/BoxProducer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-box/src/main/java/org/apache/camel/component/box/BoxProducer.java
 
b/components/camel-box/src/main/java/org/apache/camel/component/box/BoxProducer.java
index a1d2cf3..e08a59c 100644
--- 
a/components/camel-box/src/main/java/org/apache/camel/component/box/BoxProducer.java
+++ 
b/components/camel-box/src/main/java/org/apache/camel/component/box/BoxProducer.java
@@ -29,8 +29,4 @@ public class BoxProducer extends 
AbstractApiProducer<BoxApiName, BoxConfiguratio
     public BoxProducer(BoxEndpoint endpoint) {
         super(endpoint, BoxPropertiesHelper.getHelper());
     }
-
-    protected String getThreadProfileName() {
-        return BoxConstants.THREAD_PROFILE_NAME;
-    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/75fc0671/components/camel-box/src/main/java/org/apache/camel/component/box/internal/BoxClientHelper.java
----------------------------------------------------------------------
diff --git 
a/components/camel-box/src/main/java/org/apache/camel/component/box/internal/BoxClientHelper.java
 
b/components/camel-box/src/main/java/org/apache/camel/component/box/internal/BoxClientHelper.java
index 621132f..6df4d1a 100644
--- 
a/components/camel-box/src/main/java/org/apache/camel/component/box/internal/BoxClientHelper.java
+++ 
b/components/camel-box/src/main/java/org/apache/camel/component/box/internal/BoxClientHelper.java
@@ -206,14 +206,14 @@ public final class BoxClientHelper {
 
                 LOG.debug("Revoking OAuth refresh token for {}", 
cachedBoxClient);
 
+                // revoke OAuth token
+                
boxClient.getOAuthManager().revokeOAuth(boxClient.getAuthData().getAccessToken(),
+                    configuration.getClientId(), 
configuration.getClientSecret());
+
                 // notify the OAuthListener of revoked token
                 cachedBoxClient.getListener().onRefresh(null);
                 // mark auth data revoked
                 boxClient.getOAuthDataController().setOAuthData(null);
-
-                // revoke OAuth token
-                
boxClient.getOAuthManager().revokeOAuth(boxClient.getAuthData().getAccessToken(),
-                    configuration.getClientId(), 
configuration.getClientSecret());
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/75fc0671/components/camel-box/src/main/java/org/apache/camel/component/box/internal/BoxConstants.java
----------------------------------------------------------------------
diff --git 
a/components/camel-box/src/main/java/org/apache/camel/component/box/internal/BoxConstants.java
 
b/components/camel-box/src/main/java/org/apache/camel/component/box/internal/BoxConstants.java
index 11f584b..49a4474 100644
--- 
a/components/camel-box/src/main/java/org/apache/camel/component/box/internal/BoxConstants.java
+++ 
b/components/camel-box/src/main/java/org/apache/camel/component/box/internal/BoxConstants.java
@@ -23,6 +23,8 @@ public interface BoxConstants {
 
     // prefix for parameters when passed as exchange header properties
     String PROPERTY_PREFIX = "CamelBox.";
+    String NEXT_STREAM_POSITION_PROPERTY = PROPERTY_PREFIX + 
"nextStreamPosition";
+    String CHUNK_SIZE_PROPERTY = PROPERTY_PREFIX + "chunkSize";
 
     // thread profile name for this component
     String THREAD_PROFILE_NAME = "CamelBox";

http://git-wip-us.apache.org/repos/asf/camel/blob/75fc0671/components/camel-box/src/main/java/org/apache/camel/component/box/internal/EventCallback.java
----------------------------------------------------------------------
diff --git 
a/components/camel-box/src/main/java/org/apache/camel/component/box/internal/EventCallback.java
 
b/components/camel-box/src/main/java/org/apache/camel/component/box/internal/EventCallback.java
new file mode 100644
index 0000000..f217bf6
--- /dev/null
+++ 
b/components/camel-box/src/main/java/org/apache/camel/component/box/internal/EventCallback.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.box.internal;
+
+import com.box.boxjavalibv2.dao.BoxEventCollection;
+
+/**
+ * Callback interface to handle BoxEvents received from long polling.
+ */
+public interface EventCallback {
+
+    void onEvent(BoxEventCollection events) throws Exception;
+
+    void onException(Exception e);
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/75fc0671/components/camel-box/src/main/java/org/apache/camel/component/box/internal/LongPollingEventsManager.java
----------------------------------------------------------------------
diff --git 
a/components/camel-box/src/main/java/org/apache/camel/component/box/internal/LongPollingEventsManager.java
 
b/components/camel-box/src/main/java/org/apache/camel/component/box/internal/LongPollingEventsManager.java
new file mode 100644
index 0000000..66bb2b7
--- /dev/null
+++ 
b/components/camel-box/src/main/java/org/apache/camel/component/box/internal/LongPollingEventsManager.java
@@ -0,0 +1,302 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.box.internal;
+
+import java.net.SocketException;
+import java.net.SocketTimeoutException;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+
+import com.box.boxjavalibv2.dao.BoxCollection;
+import com.box.boxjavalibv2.dao.BoxEventCollection;
+import com.box.boxjavalibv2.dao.BoxRealTimeServer;
+import com.box.boxjavalibv2.dao.BoxTypedObject;
+import com.box.boxjavalibv2.exceptions.AuthFatalFailureException;
+import com.box.boxjavalibv2.exceptions.BoxServerException;
+import com.box.boxjavalibv2.requests.requestobjects.BoxEventRequestObject;
+import com.box.boxjavalibv2.resourcemanagers.IBoxEventsManager;
+import com.box.restclientv2.exceptions.BoxRestException;
+import com.box.restclientv2.exceptions.BoxSDKException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.util.ObjectHelper;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpHeaders;
+import org.apache.http.HttpResponse;
+import org.apache.http.HttpStatus;
+import org.apache.http.StatusLine;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.impl.client.DefaultHttpClient;
+import org.apache.http.params.BasicHttpParams;
+import org.apache.http.params.HttpConnectionParams;
+import org.apache.http.params.HttpParams;
+import org.apache.http.protocol.HttpContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Manager for monitoring events using long polling.
+ */
+@SuppressWarnings("deprecation")
+public class LongPollingEventsManager {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(LongPollingEventsManager.class);
+    private static final String RETRY_TIMEOUT = "retry_timeout";
+    private static final String MAX_RETRIES = "max_retries";
+    private static final String MESSAGE = "message";
+    private static final String NEW_CHANGE = "new_change";
+    private static final String RECONNECT = "reconnect";
+    private static final String OUT_OF_DATE = "out_of_date";
+
+    private final CachedBoxClient cachedBoxClient;
+    private final ExecutorService executorService;
+    private final BasicHttpParams httpParams;
+
+    private HttpClient httpClient;
+    private Future<?> pollFuture;
+    private HttpGet httpGet;
+    private boolean done;
+
+    public LongPollingEventsManager(CachedBoxClient boxClient,
+                                    Map<String, Object> httpParams, 
ExecutorService executorService) {
+
+        this.cachedBoxClient = boxClient;
+        this.executorService = executorService;
+
+        this.httpParams = new BasicHttpParams();
+        HttpConnectionParams.setSoKeepalive(this.httpParams, true);
+
+        if (httpParams != null) {
+            for (Map.Entry<String, Object> entry : httpParams.entrySet()) {
+                this.httpParams.setParameter(entry.getKey(), entry.getValue());
+            }
+        }
+    }
+
+    @SuppressWarnings("unused")
+    public void poll(long streamPosition, final String streamType, final int 
limit, final EventCallback callback)
+        throws BoxServerException, AuthFatalFailureException, BoxRestException 
{
+
+        // get BoxClient Event Manager
+        final IBoxEventsManager eventsManager = 
cachedBoxClient.getBoxClient().getEventsManager();
+
+        // get current stream position if requested
+        if (BoxEventRequestObject.STREAM_POSITION_NOW == streamPosition) {
+            streamPosition = getCurrentStreamPosition(eventsManager, 
streamPosition);
+        }
+
+        // validate parameters
+        ObjectHelper.notNull(streamPosition, "streamPosition");
+        ObjectHelper.notEmpty(streamType, "streamType");
+        ObjectHelper.notNull(callback, "eventCallback");
+
+        httpClient = new 
DefaultHttpClient(cachedBoxClient.getClientConnectionManager(), httpParams);
+
+        // start polling thread
+        LOG.info("Started event polling thread for " + cachedBoxClient);
+
+        final long startStreamPosition = streamPosition;
+        pollFuture = executorService.submit(new Runnable() {
+            @Override
+            public void run() {
+
+                final ObjectMapper mapper = new ObjectMapper();
+
+                long currentStreamPosition = startStreamPosition;
+                BoxRealTimeServer realTimeServer = null;
+
+                boolean retry = false;
+                int retries = 0;
+                int maxRetries = 1;
+
+                while (!done) {
+                    try {
+                        // reset the flag; it is set to true below only if no exception is thrown
+                        retry = false;
+
+                        if (realTimeServer == null) {
+
+                            // get RTS URL
+                            realTimeServer = 
getBoxRealTimeServer(currentStreamPosition, eventsManager);
+
+                            // update HTTP timeout
+                            final int requestTimeout = Integer.parseInt(
+                                
realTimeServer.getExtraData(RETRY_TIMEOUT).toString());
+                            final HttpParams params = httpClient.getParams();
+                            HttpConnectionParams.setSoTimeout(params, 
requestTimeout * 1000);
+
+                            // update maxRetries
+                            maxRetries = 
Integer.parseInt(realTimeServer.getExtraData(MAX_RETRIES).toString());
+                        }
+
+                        // create HTTP request for RTS
+                        httpGet = getPollRequest(realTimeServer.getUrl(), 
currentStreamPosition);
+
+                        // execute RTS poll
+                        HttpResponse httpResponse = null;
+                        try {
+                            httpResponse = httpClient.execute(httpGet, 
(HttpContext) null);
+                        } catch (SocketTimeoutException e) {
+                            LOG.debug("Poll timed out, retrying for " + 
cachedBoxClient);
+                        }
+
+                        if (httpResponse != null) {
+
+                            // parse response
+                            final StatusLine statusLine = 
httpResponse.getStatusLine();
+                            if (statusLine != null && 
statusLine.getStatusCode() == HttpStatus.SC_OK) {
+                                final HttpEntity entity = 
httpResponse.getEntity();
+                                @SuppressWarnings("unchecked")
+                                Map<String, String> rtsResponse = 
mapper.readValue(entity.getContent(), Map.class);
+
+                                final String message = 
rtsResponse.get(MESSAGE);
+                                if (NEW_CHANGE.equals(message)) {
+
+                                    // get events
+                                    final BoxEventRequestObject requestObject =
+                                        
BoxEventRequestObject.getEventsRequestObject(currentStreamPosition);
+                                    requestObject.setStreamType(streamType);
+                                    requestObject.setLimit(limit);
+                                    final BoxEventCollection events = 
eventsManager.getEvents(requestObject);
+
+                                    // notify callback
+                                    callback.onEvent(events);
+
+                                    // update stream position
+                                    currentStreamPosition = 
events.getNextStreamPosition();
+
+                                } else if (RECONNECT.equals(message) || 
MAX_RETRIES.equals(message)) {
+                                    LOG.debug("Long poll reconnect for " + 
cachedBoxClient);
+                                    realTimeServer = null;
+                                } else if (OUT_OF_DATE.equals(message)) {
+                                    // update currentStreamPosition
+                                    LOG.debug("Long poll out of date for " + 
cachedBoxClient);
+                                    currentStreamPosition = 
getCurrentStreamPosition(eventsManager,
+                                        
BoxEventRequestObject.STREAM_POSITION_NOW);
+                                    realTimeServer = null;
+                                } else {
+                                    throw new RuntimeCamelException("Unknown 
poll response " + message);
+                                }
+                            } else {
+                                String msg = "Unknown error";
+                                if (statusLine != null) {
+                                    msg = String.format("Error polling events 
for %s: code=%s, message=%s",
+                                        cachedBoxClient, 
statusLine.getStatusCode(), statusLine.getReasonPhrase());
+                                }
+                                throw new RuntimeCamelException(msg);
+                            }
+                        }
+
+                        // keep polling
+                        retry = true;
+
+                    } catch (InterruptedException e) {
+                        LOG.debug("Interrupted event polling thread for {}, 
exiting...", cachedBoxClient);
+                    } catch (BoxSDKException e) {
+                        callback.onException(e);
+                    } catch (RuntimeCamelException e) {
+                        callback.onException(e);
+                    } catch (SocketException e) {
+                        // TODO handle connection aborts!!!
+                        LOG.debug("Socket exception while event polling for 
{}", cachedBoxClient);
+                        retry = true;
+                        realTimeServer = null;
+                    } catch (Exception e) {
+                        callback.onException(new RuntimeCamelException("Error 
while polling for "
+                            + cachedBoxClient + ": " + e.getMessage(), e));
+                    } finally {
+                        // are we done yet?
+                        if (!retry) {
+                            done = true;
+                        } else {
+                            if (realTimeServer != null
+                                && (++retries > maxRetries)) {
+                                // make another option call
+                                realTimeServer = null;
+                            }
+                        }
+                    }
+                }
+                LOG.info("Stopped event polling thread for " + 
cachedBoxClient);
+            }
+        });
+    }
+
+    /**
+     * Issues an events request built from the given stream position and
+     * reports the next stream position carried by the returned collection.
+     *
+     * @param eventsManager events manager used to execute the request
+     * @param streamPosition stream position used to build the events request
+     * @return the next stream position of the returned event collection
+     */
+    private long getCurrentStreamPosition(IBoxEventsManager eventsManager, long streamPosition)
+        throws BoxRestException, BoxServerException, AuthFatalFailureException {
+
+        final BoxEventCollection fetched = eventsManager.getEvents(
+            BoxEventRequestObject.getEventsRequestObject(streamPosition));
+        return fetched.getNextStreamPosition();
+    }
+
+    /**
+     * Stops event polling unless it is already marked as done.
+     * <p>
+     * Marks polling as done, aborts the current HTTP GET if one is held and
+     * has not been aborted yet, cancels the polling task with interruption,
+     * and finally clears the HTTP client and task references.
+     *
+     * @throws Exception declared for callers; nothing in this method throws a checked exception itself
+     */
+    public void stopPolling() throws Exception {
+        if (!done) {
+
+            // done polling
+            done = true;
+
+            // make sure an HTTP GET is not in progress
+            if (httpGet != null && !httpGet.isAborted()) {
+                httpGet.abort();
+            }
+
+            // cancel polling thread; the reference is guarded like httpGet above,
+            // since it is reset to null below and may not refer to a task at all
+            if (pollFuture != null) {
+                if (pollFuture.cancel(true)) {
+                    LOG.info("Stopped event polling for " + cachedBoxClient);
+                } else {
+                    LOG.warn("Unable to stop event polling for " + cachedBoxClient);
+                }
+            }
+
+            httpClient = null;
+            pollFuture = null;
+        }
+    }
+
+    /**
+     * Requests the event options for the given stream position and returns
+     * the first entry when it is a {@link BoxRealTimeServer}.
+     *
+     * @param currentStreamPosition stream position used to build the options request
+     * @param eventsManager events manager used to execute the options request
+     * @return the first entry of the event options
+     * @throws RuntimeCamelException if the entry list is null or empty, or
+     *         its first entry is not a {@link BoxRealTimeServer}
+     */
+    private BoxRealTimeServer getBoxRealTimeServer(long currentStreamPosition, IBoxEventsManager eventsManager)
+        throws BoxRestException, BoxServerException, AuthFatalFailureException {
+
+        final BoxCollection options = eventsManager.getEventOptions(
+            BoxEventRequestObject.getEventsRequestObject(currentStreamPosition));
+        final ArrayList<BoxTypedObject> optionEntries = options.getEntries();
+
+        // only the first entry is examined; a missing or empty list leaves it null,
+        // which fails the instanceof check below
+        final BoxTypedObject firstEntry =
+            (optionEntries != null && !optionEntries.isEmpty()) ? optionEntries.get(0) : null;
+        if (firstEntry instanceof BoxRealTimeServer) {
+            return (BoxRealTimeServer) firstEntry;
+        }
+
+        throw new RuntimeCamelException("No Real Time Server from event options for " + cachedBoxClient);
+    }
+
+    /**
+     * Creates the GET request used for a long poll.
+     * <p>
+     * The stream position is appended to the URL as an additional
+     * {@code stream_position} query parameter, and the current access token
+     * of the cached Box client is sent as a Bearer authorization header.
+     *
+     * @param url poll URL; the position is appended with {@code &}, so the
+     *        URL is expected to contain a query string already
+     * @param currentStreamPosition stream position to poll from
+     * @return the prepared request
+     */
+    private HttpGet getPollRequest(String url, long currentStreamPosition) throws AuthFatalFailureException {
+
+        // local is deliberately not named httpGet, to keep it distinct from the field of that name
+        final HttpGet pollRequest = new HttpGet(url + "&stream_position=" + currentStreamPosition);
+        final String bearerToken = cachedBoxClient.getBoxClient().getAuthData().getAccessToken();
+        pollRequest.setHeader(HttpHeaders.AUTHORIZATION, "Bearer " + bearerToken);
+        return pollRequest;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/75fc0671/components/camel-box/src/main/java/org/apache/camel/component/box/internal/OAuthHelperListener.java
----------------------------------------------------------------------
diff --git 
a/components/camel-box/src/main/java/org/apache/camel/component/box/internal/OAuthHelperListener.java
 
b/components/camel-box/src/main/java/org/apache/camel/component/box/internal/OAuthHelperListener.java
index 684d1ba..7a62796 100644
--- 
a/components/camel-box/src/main/java/org/apache/camel/component/box/internal/OAuthHelperListener.java
+++ 
b/components/camel-box/src/main/java/org/apache/camel/component/box/internal/OAuthHelperListener.java
@@ -41,6 +41,7 @@ class OAuthHelperListener implements OAuthRefreshListener {
 
     @Override
     public void onRefresh(IAuthData newAuthData) {
+
         // look for refresh token update or revocation
         if (authSecureStorage != null
             && (newAuthData == null || 
!newAuthData.getRefreshToken().equals(refreshToken))) {
@@ -49,5 +50,8 @@ class OAuthHelperListener implements OAuthRefreshListener {
         if (configListener != null) {
             configListener.onRefresh(newAuthData);
         }
+
+        // update cached refresh token
+        refreshToken = newAuthData != null ? newAuthData.getRefreshToken() : 
null;
     }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/75fc0671/components/camel-box/src/signatures/long-polling-events-manager.txt
----------------------------------------------------------------------
diff --git 
a/components/camel-box/src/signatures/long-polling-events-manager.txt 
b/components/camel-box/src/signatures/long-polling-events-manager.txt
new file mode 100644
index 0000000..57ea578
--- /dev/null
+++ b/components/camel-box/src/signatures/long-polling-events-manager.txt
@@ -0,0 +1 @@
+void poll(long streamPosition, final String streamType, final int limit, final 
org.apache.camel.component.box.internal.EventCallback callback);
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/75fc0671/components/camel-box/src/test/java/org/apache/camel/component/box/AbstractBoxTestSupport.java
----------------------------------------------------------------------
diff --git 
a/components/camel-box/src/test/java/org/apache/camel/component/box/AbstractBoxTestSupport.java
 
b/components/camel-box/src/test/java/org/apache/camel/component/box/AbstractBoxTestSupport.java
index 7d2c33c..130bcaf 100644
--- 
a/components/camel-box/src/test/java/org/apache/camel/component/box/AbstractBoxTestSupport.java
+++ 
b/components/camel-box/src/test/java/org/apache/camel/component/box/AbstractBoxTestSupport.java
@@ -44,6 +44,7 @@ import org.junit.AfterClass;
 public abstract class AbstractBoxTestSupport extends CamelTestSupport {
 
     protected static final String CAMEL_TEST_TAG = "camel_was_here";
+    protected static final String CAMEL_TEST_FILE = "CamelTestFile";
     private static final String LINE_SEPARATOR = 
System.getProperty("line.separator");
     private static final String TEST_OPTIONS_PROPERTIES = 
"/test-options.properties";
     private static final String REFRESH_TOKEN_PROPERTY = "refreshToken";

http://git-wip-us.apache.org/repos/asf/camel/blob/75fc0671/components/camel-box/src/test/java/org/apache/camel/component/box/IBoxFilesManagerIntegrationTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-box/src/test/java/org/apache/camel/component/box/IBoxFilesManagerIntegrationTest.java
 
b/components/camel-box/src/test/java/org/apache/camel/component/box/IBoxFilesManagerIntegrationTest.java
index 03f483d..5fd8239 100644
--- 
a/components/camel-box/src/test/java/org/apache/camel/component/box/IBoxFilesManagerIntegrationTest.java
+++ 
b/components/camel-box/src/test/java/org/apache/camel/component/box/IBoxFilesManagerIntegrationTest.java
@@ -54,7 +54,6 @@ public class IBoxFilesManagerIntegrationTest extends 
AbstractBoxTestSupport {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(IBoxFilesManagerIntegrationTest.class);
     private static final String PATH_PREFIX = BoxApiName.FILES.getName();
-    private static final String CAMEL_TEST_FILE = "CamelTestFile";
     private static final BoxImageRequestObject BOX_IMAGE_REQUEST_OBJECT = 
BoxImageRequestObject.previewRequestObject();
     private static final String PNG_EXTENSION = "png";
     private static final String TEST_UPLOAD_FILE = "/log4j.properties";

http://git-wip-us.apache.org/repos/asf/camel/blob/75fc0671/components/camel-box/src/test/java/org/apache/camel/component/box/LongPollingEventsManagerIntegrationTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-box/src/test/java/org/apache/camel/component/box/LongPollingEventsManagerIntegrationTest.java
 
b/components/camel-box/src/test/java/org/apache/camel/component/box/LongPollingEventsManagerIntegrationTest.java
new file mode 100644
index 0000000..9502152
--- /dev/null
+++ 
b/components/camel-box/src/test/java/org/apache/camel/component/box/LongPollingEventsManagerIntegrationTest.java
@@ -0,0 +1,108 @@
+/*
+ * Camel Api Route test generated by camel-component-util-maven-plugin
+ * Generated on: Mon Jun 30 14:29:45 PDT 2014
+ */
+package org.apache.camel.component.box;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import com.box.boxjavalibv2.dao.BoxEvent;
+import com.box.boxjavalibv2.dao.BoxFile;
+import com.box.boxjavalibv2.requests.requestobjects.BoxItemCopyRequestObject;
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.box.internal.BoxApiName;
+import org.apache.camel.component.box.internal.BoxConstants;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Integration test for the event polling endpoint provided by
+ * org.apache.camel.component.box.internal.LongPollingEventsManager.
+ */
+public class LongPollingEventsManagerIntegrationTest extends AbstractBoxTestSupport {
+
+    private static final Logger LOG = LoggerFactory.getLogger(LongPollingEventsManagerIntegrationTest.class);
+
+    /**
+     * Copies the test file and deletes the copy, then expects at least two
+     * event messages on the mock endpoint fed by the poll route.
+     */
+    @Test
+    public void testPoll() throws Exception {
+
+        // generate file copy event
+        final BoxItemCopyRequestObject copyRequest = BoxItemCopyRequestObject.copyItemRequestObject("0");
+        copyRequest.setName(CAMEL_TEST_FILE);
+
+        final Map<String, Object> copyHeaders = new HashMap<String, Object>();
+        copyHeaders.put("CamelBox.fileId", testFileId);
+        copyHeaders.put("CamelBox.itemCopyRequest", copyRequest);
+
+        BoxFile result = requestBodyAndHeaders("direct://COPYFILE", null, copyHeaders);
+
+        assertNotNull("copyFile result", result);
+        LOG.debug("copyFile: " + result);
+
+        // generate file delete event
+        final Map<String, Object> deleteHeaders = new HashMap<String, Object>();
+        deleteHeaders.put("CamelBox.fileId", result.getId());
+        deleteHeaders.put("CamelBox.defaultRequest", null);
+        requestBodyAndHeaders("direct://DELETEFILE", null, deleteHeaders);
+
+        // wait up to 30 seconds for both events to arrive
+        final MockEndpoint boxEvents = getMockEndpoint("mock:boxEvents");
+        boxEvents.expectedMinimumMessageCount(2);
+        boxEvents.setResultWaitTime(TimeUnit.SECONDS.toMillis(30));
+        boxEvents.assertIsSatisfied();
+
+        final List<Exchange> received = boxEvents.getExchanges();
+        assertNotNull("poll result", received);
+        assertFalse("poll result", received.isEmpty());
+        LOG.debug("poll result: " + received);
+
+        for (Exchange exchange : received) {
+            assertPolledEvent(exchange);
+        }
+    }
+
+    /**
+     * Verifies a single polled exchange: both paging headers are present and
+     * the body is a {@link BoxEvent} of type copy or trash.
+     */
+    private void assertPolledEvent(Exchange exchange) {
+
+        assertNotNull("poll result " + BoxConstants.CHUNK_SIZE_PROPERTY,
+            exchange.getIn().getHeader(BoxConstants.CHUNK_SIZE_PROPERTY));
+        assertNotNull("poll result " + BoxConstants.NEXT_STREAM_POSITION_PROPERTY,
+            exchange.getIn().getHeader(BoxConstants.NEXT_STREAM_POSITION_PROPERTY));
+
+        final Object payload = exchange.getIn().getBody();
+        assertNotNull("poll result body", payload);
+        assertEquals("poll result body type", BoxEvent.class, payload.getClass());
+
+        final String eventType = ((BoxEvent) payload).getEventType();
+        final boolean copyOrTrash = BoxEvent.EVENT_TYPE_ITEM_COPY.equals(eventType)
+            || BoxEvent.EVENT_TYPE_ITEM_TRASH.equals(eventType);
+        assertTrue("poll result type", copyOrTrash);
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() {
+                final String filesUri = "box://" + BoxApiName.FILES.getName();
+
+                // test route for copyFile
+                from("direct://COPYFILE")
+                    .to(filesUri + "/copyFile");
+
+                // test route for deleteFile
+                from("direct://DELETEFILE")
+                    .to(filesUri + "/deleteFile");
+
+                // test route for poll
+                from("box://" + BoxApiName.POLL_EVENTS.getName() + "/poll?streamPosition=-1&streamType=all&limit=100")
+                    .to("mock:boxEvents");
+
+            }
+        };
+    }
+
+    @Override
+    protected CamelContext createCamelContext() throws Exception {
+        // also test revoke on shutdown
+        final CamelContext context = super.createCamelContext();
+        final BoxComponent boxComponent = (BoxComponent) context.getComponent("box");
+        boxComponent.getConfiguration().setRevokeOnShutdown(true);
+        return context;
+    }
+}

Reply via email to