This is an automated email from the ASF dual-hosted git repository.

zbendhiba pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new 9700b2c  CAMEL-16439: improve timeout and retry strategy (#5287)
9700b2c is described below

commit 9700b2caffc55c3b07414ce20a3faf5a59087815
Author: Zineb BENDHIBA <bendhiba.zi...@gmail.com>
AuthorDate: Thu Apr 1 16:36:49 2021 +0200

    CAMEL-16439: improve timeout and retry strategy (#5287)
---
 .../couchbase/CouchbaseCollectionOperation.java    | 102 +++++++++++++++++++++
 .../component/couchbase/CouchbaseConsumer.java     |   9 +-
 .../component/couchbase/CouchbaseEndpoint.java     |  10 ++
 .../component/couchbase/CouchbaseProducer.java     |  42 +++------
 4 files changed, 128 insertions(+), 35 deletions(-)

diff --git 
a/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseCollectionOperation.java
 
b/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseCollectionOperation.java
new file mode 100644
index 0000000..58fdd5f
--- /dev/null
+++ 
b/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseCollectionOperation.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.couchbase;
+
+import java.time.Duration;
+
+import com.couchbase.client.core.retry.BestEffortRetryStrategy;
+import com.couchbase.client.java.Collection;
+import com.couchbase.client.java.kv.GetOptions;
+import com.couchbase.client.java.kv.GetResult;
+import com.couchbase.client.java.kv.MutationResult;
+import com.couchbase.client.java.kv.PersistTo;
+import com.couchbase.client.java.kv.RemoveOptions;
+import com.couchbase.client.java.kv.ReplicateTo;
+import com.couchbase.client.java.kv.UpsertOptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public final class CouchbaseCollectionOperation {
+    private static final Logger LOG = 
LoggerFactory.getLogger(CouchbaseCollectionOperation.class);
+
+    private CouchbaseCollectionOperation() {
+    }
+
+    /**
+     * Adds or updates a document in a Collection
+     *
+     * @param  collection
+     * @param  id
+     * @param  expiry
+     * @param  obj
+     * @param  persistTo
+     * @param  replicateTo
+     * @param  writeQueryTimeout
+     * @param  producerRetryPause
+     * @return
+     */
+    protected static Boolean setDocument(
+            Collection collection, String id, int expiry, Object obj, 
PersistTo persistTo, ReplicateTo replicateTo,
+            long writeQueryTimeout, long producerRetryPause) {
+
+        UpsertOptions options = UpsertOptions.upsertOptions()
+                .expiry(Duration.ofSeconds(expiry))
+                .durability(persistTo, replicateTo)
+                .timeout(Duration.ofMillis(writeQueryTimeout))
+                
.retryStrategy(BestEffortRetryStrategy.withExponentialBackoff(Duration.ofMillis(producerRetryPause),
+                        Duration.ofMillis(producerRetryPause), 1));
+
+        MutationResult result = collection.upsert(id, obj, options);
+        if (LOG.isDebugEnabled()) {
+            LOG.debug(result.toString());
+        }
+
+        return true;
+    }
+
+    /**
+     * Gets a document from a Collection
+     *
+     * @param  collection
+     * @param  id
+     * @param  queryTimeout
+     * @return
+     */
+    protected static GetResult getDocument(Collection collection, String id, 
long queryTimeout) {
+        GetOptions options = GetOptions.getOptions()
+                .timeout(Duration.ofMillis(queryTimeout));
+        return collection.get(id, options);
+    }
+
+    /**
+     * Removes a document from a Collection
+     *
+     * @param  collection
+     * @param  id
+     * @param  writeQueryTimeout
+     * @param  producerRetryPause
+     * @return
+     */
+    protected static MutationResult removeDocument(
+            Collection collection, String id, long writeQueryTimeout, long 
producerRetryPause) {
+        RemoveOptions options = RemoveOptions.removeOptions()
+                .timeout(Duration.ofMillis(writeQueryTimeout))
+                
.retryStrategy(BestEffortRetryStrategy.withExponentialBackoff(Duration.ofMillis(producerRetryPause),
+                        Duration.ofMillis(producerRetryPause), 1));
+        return collection.remove(id, options);
+    }
+}
diff --git 
a/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseConsumer.java
 
b/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseConsumer.java
index 18c2e98..e3a899e 100644
--- 
a/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseConsumer.java
+++ 
b/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseConsumer.java
@@ -104,9 +104,6 @@ public class CouchbaseConsumer extends 
DefaultScheduledPollConsumer {
     protected synchronized int poll() throws Exception {
         ViewResult result = bucket.viewQuery(endpoint.getDesignDocumentName(), 
endpoint.getViewName(), this.viewOptions);
 
-        LOG.info("Received result set from Couchbase");
-        Collection collection = bucket.defaultCollection();
-
         if (LOG.isTraceEnabled()) {
             LOG.trace("ViewResponse =  {}", result);
         }
@@ -116,7 +113,7 @@ public class CouchbaseConsumer extends 
DefaultScheduledPollConsumer {
             Object doc;
             String id = row.id().get();
             if (endpoint.isFullDocument()) {
-                doc = collection.get(id);
+                doc = CouchbaseCollectionOperation.getDocument(collection, id, 
endpoint.getQueryTimeout());
             } else {
                 doc = row.valueAs(Object.class);
             }
@@ -137,8 +134,8 @@ public class CouchbaseConsumer extends 
DefaultScheduledPollConsumer {
                     if (LOG.isTraceEnabled()) {
                         LOG.trace("Deleting doc with ID {}", id);
                     }
-
-                    collection.remove(id);
+                    CouchbaseCollectionOperation.removeDocument(collection, 
id, endpoint.getWriteQueryTimeout(),
+                            endpoint.getProducerRetryPause());
                 } else if 
("filter".equalsIgnoreCase(consumerProcessedStrategy)) {
                     if (LOG.isTraceEnabled()) {
                         LOG.trace("Filtering out ID {}", id);
diff --git 
a/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseEndpoint.java
 
b/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseEndpoint.java
index cc294a7..6c13a2e 100644
--- 
a/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseEndpoint.java
+++ 
b/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseEndpoint.java
@@ -551,4 +551,14 @@ public class CouchbaseEndpoint extends 
ScheduledPollEndpoint {
 
         return cluster.bucket(bucket);
     }
+
+    /**
+     * Compares retry strategy with query timeout and gets the higher value : 
for write operations with retry
+     *
+     * @return
+     */
+    public long getWriteQueryTimeout() {
+        long retryTimeout = producerRetryAttempts * (long) producerRetryPause;
+        return retryTimeout > queryTimeout ? retryTimeout : queryTimeout;
+    }
 }
diff --git 
a/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseProducer.java
 
b/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseProducer.java
index d7cae32..7454761 100644
--- 
a/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseProducer.java
+++ 
b/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseProducer.java
@@ -16,18 +16,16 @@
  */
 package org.apache.camel.component.couchbase;
 
-import java.time.Duration;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicLong;
 
-import com.couchbase.client.core.retry.BestEffortRetryStrategy;
 import com.couchbase.client.java.Bucket;
 import com.couchbase.client.java.Collection;
 import com.couchbase.client.java.Scope;
+import com.couchbase.client.java.kv.GetResult;
 import com.couchbase.client.java.kv.MutationResult;
 import com.couchbase.client.java.kv.PersistTo;
 import com.couchbase.client.java.kv.ReplicateTo;
-import com.couchbase.client.java.kv.UpsertOptions;
 import org.apache.camel.Exchange;
 import org.apache.camel.support.DefaultProducer;
 import org.slf4j.Logger;
@@ -53,8 +51,9 @@ public class CouchbaseProducer extends DefaultProducer {
     private final Collection collection;
     private final PersistTo persistTo;
     private final ReplicateTo replicateTo;
-    private final int producerRetryAttempts;
     private final int producerRetryPause;
+    private final long queryTimeout;
+    private final long writeQueryTimeout;
 
     public CouchbaseProducer(CouchbaseEndpoint endpoint, Bucket client, int 
persistTo, int replicateTo) {
         super(endpoint);
@@ -77,8 +76,11 @@ public class CouchbaseProducer extends DefaultProducer {
         if (endpoint.isAutoStartIdForInserts()) {
             this.startId.set(endpoint.getStartingIdForInsertsFrom());
         }
-        this.producerRetryAttempts = endpoint.getProducerRetryAttempts();
+
+        // timeout and retry strategy
         this.producerRetryPause = endpoint.getProducerRetryPause();
+        this.writeQueryTimeout = endpoint.getWriteQueryTimeout();
+        this.queryTimeout = endpoint.getQueryTimeout();
 
         switch (persistTo) {
             case 0:
@@ -136,14 +138,17 @@ public class CouchbaseProducer extends DefaultProducer {
         if (endpoint.getOperation().equals(COUCHBASE_PUT)) {
             LOG.trace("Type of operation: PUT");
             Object obj = exchange.getIn().getBody();
-            exchange.getMessage().setBody(setDocument(id, ttl, obj, persistTo, 
replicateTo));
+            Boolean result = 
CouchbaseCollectionOperation.setDocument(collection, id, ttl, obj, persistTo, 
replicateTo,
+                    writeQueryTimeout, producerRetryPause);
+            exchange.getMessage().setBody(result);
         } else if (endpoint.getOperation().equals(COUCHBASE_GET)) {
             LOG.trace("Type of operation: GET");
-            Object result = collection.get(id);
+            GetResult result = 
CouchbaseCollectionOperation.getDocument(collection, id, queryTimeout);
             exchange.getMessage().setBody(result);
         } else if (endpoint.getOperation().equals(COUCHBASE_DELETE)) {
             LOG.trace("Type of operation: DELETE");
-            MutationResult result = collection.remove(id);
+            MutationResult result
+                    = CouchbaseCollectionOperation.removeDocument(collection, 
id, writeQueryTimeout, producerRetryPause);
             exchange.getMessage().setBody(result.toString());
         }
         // cleanup the cache headers
@@ -158,25 +163,4 @@ public class CouchbaseProducer extends DefaultProducer {
         }
     }
 
-    private Boolean setDocument(String id, int expiry, Object obj, PersistTo 
persistTo, ReplicateTo replicateTo) {
-        return setDocument(id, expiry, obj, producerRetryAttempts, persistTo, 
replicateTo);
-    }
-
-    private Boolean setDocument(
-            String id, int expiry, Object obj, int retryAttempts, PersistTo 
persistTo, ReplicateTo replicateTo) {
-
-        UpsertOptions options = UpsertOptions.upsertOptions()
-                .expiry(Duration.ofSeconds(expiry))
-                .durability(persistTo, replicateTo)
-                .timeout(Duration.ofMillis(retryAttempts * (long) 
producerRetryPause))
-                
.retryStrategy(BestEffortRetryStrategy.withExponentialBackoff(Duration.ofMillis(producerRetryPause),
-                        Duration.ofMillis(producerRetryPause), 1));
-
-        MutationResult result = collection.upsert(id, obj, options);
-        if (LOG.isDebugEnabled()) {
-            LOG.debug(result.toString());
-        }
-
-        return true;
-    }
 }

Reply via email to