This is an automated email from the ASF dual-hosted git repository. zbendhiba pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push: new 9700b2c CAMEL-16439: improve timeout and retry strategy (#5287) 9700b2c is described below commit 9700b2caffc55c3b07414ce20a3faf5a59087815 Author: Zineb BENDHIBA <bendhiba.zi...@gmail.com> AuthorDate: Thu Apr 1 16:36:49 2021 +0200 CAMEL-16439: improve timeout and retry strategy (#5287) --- .../couchbase/CouchbaseCollectionOperation.java | 102 +++++++++++++++++++++ .../component/couchbase/CouchbaseConsumer.java | 9 +- .../component/couchbase/CouchbaseEndpoint.java | 10 ++ .../component/couchbase/CouchbaseProducer.java | 42 +++------ 4 files changed, 128 insertions(+), 35 deletions(-) diff --git a/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseCollectionOperation.java b/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseCollectionOperation.java new file mode 100644 index 0000000..58fdd5f --- /dev/null +++ b/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseCollectionOperation.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.couchbase; + +import java.time.Duration; + +import com.couchbase.client.core.retry.BestEffortRetryStrategy; +import com.couchbase.client.java.Collection; +import com.couchbase.client.java.kv.GetOptions; +import com.couchbase.client.java.kv.GetResult; +import com.couchbase.client.java.kv.MutationResult; +import com.couchbase.client.java.kv.PersistTo; +import com.couchbase.client.java.kv.RemoveOptions; +import com.couchbase.client.java.kv.ReplicateTo; +import com.couchbase.client.java.kv.UpsertOptions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public final class CouchbaseCollectionOperation { + private static final Logger LOG = LoggerFactory.getLogger(CouchbaseCollectionOperation.class); + + private CouchbaseCollectionOperation() { + } + + /** + * Adds or updates a document in a Collection + * + * @param collection + * @param id + * @param expiry + * @param obj + * @param persistTo + * @param replicateTo + * @param writeQueryTimeout + * @param producerRetryPause + * @return + */ + protected static Boolean setDocument( + Collection collection, String id, int expiry, Object obj, PersistTo persistTo, ReplicateTo replicateTo, + long writeQueryTimeout, long producerRetryPause) { + + UpsertOptions options = UpsertOptions.upsertOptions() + .expiry(Duration.ofSeconds(expiry)) + .durability(persistTo, replicateTo) + .timeout(Duration.ofMillis(writeQueryTimeout)) + .retryStrategy(BestEffortRetryStrategy.withExponentialBackoff(Duration.ofMillis(producerRetryPause), + Duration.ofMillis(producerRetryPause), 1)); + + MutationResult result = collection.upsert(id, obj, options); + if (LOG.isDebugEnabled()) { + LOG.debug(result.toString()); + } + + return true; + } + + /** + * Gets a document from a Collection + * + * @param collection + * @param id + * @param queryTimeout + * @return + */ + protected static GetResult getDocument(Collection collection, String id, long queryTimeout) { + GetOptions options = GetOptions.getOptions() + .timeout(Duration.ofMillis(queryTimeout)); + return collection.get(id, options); + } + + /** + * Removes a document from a Collection + * + * @param collection + * @param id + * @param writeQueryTimeout + * @param producerRetryPause + * @return + */ + protected static MutationResult removeDocument( + Collection collection, String id, long writeQueryTimeout, long producerRetryPause) { + RemoveOptions options = RemoveOptions.removeOptions() + .timeout(Duration.ofMillis(writeQueryTimeout)) + .retryStrategy(BestEffortRetryStrategy.withExponentialBackoff(Duration.ofMillis(producerRetryPause), + Duration.ofMillis(producerRetryPause), 1)); + return collection.remove(id, options); + } +} diff --git a/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseConsumer.java b/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseConsumer.java index 18c2e98..e3a899e 100644 --- a/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseConsumer.java +++ b/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseConsumer.java @@ -104,9 +104,6 @@ public class CouchbaseConsumer extends DefaultScheduledPollConsumer { protected synchronized int poll() throws Exception { ViewResult result = bucket.viewQuery(endpoint.getDesignDocumentName(), endpoint.getViewName(), this.viewOptions); - LOG.info("Received result set from Couchbase"); - Collection collection = bucket.defaultCollection(); - if (LOG.isTraceEnabled()) { LOG.trace("ViewResponse = {}", result); } @@ -116,7 +113,7 @@ public class CouchbaseConsumer extends DefaultScheduledPollConsumer { Object doc; String id = row.id().get(); if (endpoint.isFullDocument()) { - doc = collection.get(id); + doc = CouchbaseCollectionOperation.getDocument(collection, id, endpoint.getQueryTimeout()); } else { doc = row.valueAs(Object.class); } @@ -137,8 +134,8 @@ public class CouchbaseConsumer extends DefaultScheduledPollConsumer { if (LOG.isTraceEnabled()) { LOG.trace("Deleting doc with ID {}", id); } - - collection.remove(id); + CouchbaseCollectionOperation.removeDocument(collection, id, endpoint.getWriteQueryTimeout(), + endpoint.getProducerRetryPause()); } else if ("filter".equalsIgnoreCase(consumerProcessedStrategy)) { if (LOG.isTraceEnabled()) { LOG.trace("Filtering out ID {}", id); diff --git a/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseEndpoint.java b/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseEndpoint.java index cc294a7..6c13a2e 100644 --- a/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseEndpoint.java +++ b/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseEndpoint.java @@ -551,4 +551,14 @@ public class CouchbaseEndpoint extends ScheduledPollEndpoint { return cluster.bucket(bucket); } + + /** + * Compares retry strategy with query timeout and gets the higher value : for write operations with retry + * + * @return + */ + public long getWriteQueryTimeout() { + long retryTimeout = producerRetryAttempts * (long) producerRetryPause; + return retryTimeout > queryTimeout ? retryTimeout : queryTimeout; + } } diff --git a/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseProducer.java b/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseProducer.java index d7cae32..7454761 100644 --- a/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseProducer.java +++ b/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseProducer.java @@ -16,18 +16,16 @@ */ package org.apache.camel.component.couchbase; -import java.time.Duration; import java.util.Map; import java.util.concurrent.atomic.AtomicLong; -import com.couchbase.client.core.retry.BestEffortRetryStrategy; import com.couchbase.client.java.Bucket; import com.couchbase.client.java.Collection; import com.couchbase.client.java.Scope; +import com.couchbase.client.java.kv.GetResult; import com.couchbase.client.java.kv.MutationResult; import com.couchbase.client.java.kv.PersistTo; import com.couchbase.client.java.kv.ReplicateTo; -import com.couchbase.client.java.kv.UpsertOptions; import org.apache.camel.Exchange; import org.apache.camel.support.DefaultProducer; import org.slf4j.Logger; @@ -53,8 +51,9 @@ public class CouchbaseProducer extends DefaultProducer { private final Collection collection; private final PersistTo persistTo; private final ReplicateTo replicateTo; - private final int producerRetryAttempts; private final int producerRetryPause; + private final long queryTimeout; + private final long writeQueryTimeout; public CouchbaseProducer(CouchbaseEndpoint endpoint, Bucket client, int persistTo, int replicateTo) { super(endpoint); @@ -77,8 +76,11 @@ public class CouchbaseProducer extends DefaultProducer { if (endpoint.isAutoStartIdForInserts()) { this.startId.set(endpoint.getStartingIdForInsertsFrom()); } - this.producerRetryAttempts = endpoint.getProducerRetryAttempts(); + + // timeout and retry strategy this.producerRetryPause = endpoint.getProducerRetryPause(); + this.writeQueryTimeout = endpoint.getWriteQueryTimeout(); + this.queryTimeout = endpoint.getQueryTimeout(); switch (persistTo) { case 0: @@ -136,14 +138,17 @@ public class CouchbaseProducer extends DefaultProducer { if (endpoint.getOperation().equals(COUCHBASE_PUT)) { LOG.trace("Type of operation: PUT"); Object obj = exchange.getIn().getBody(); - exchange.getMessage().setBody(setDocument(id, ttl, obj, persistTo, replicateTo)); + Boolean result = CouchbaseCollectionOperation.setDocument(collection, id, ttl, obj, persistTo, replicateTo, + writeQueryTimeout, producerRetryPause); + exchange.getMessage().setBody(result); } else if (endpoint.getOperation().equals(COUCHBASE_GET)) { LOG.trace("Type of operation: GET"); - Object result = collection.get(id); + GetResult result = CouchbaseCollectionOperation.getDocument(collection, id, queryTimeout); exchange.getMessage().setBody(result); } else if (endpoint.getOperation().equals(COUCHBASE_DELETE)) { LOG.trace("Type of operation: DELETE"); - MutationResult result = collection.remove(id); + MutationResult result + = CouchbaseCollectionOperation.removeDocument(collection, id, writeQueryTimeout, producerRetryPause); exchange.getMessage().setBody(result.toString()); } // cleanup the cache headers @@ -158,25 +163,4 @@ public class CouchbaseProducer extends DefaultProducer { } } - private Boolean setDocument(String id, int expiry, Object obj, PersistTo persistTo, ReplicateTo replicateTo) { - return setDocument(id, expiry, obj, producerRetryAttempts, persistTo, replicateTo); - } - - private Boolean setDocument( - String id, int expiry, Object obj, int retryAttempts, PersistTo persistTo, ReplicateTo replicateTo) { - - UpsertOptions options = UpsertOptions.upsertOptions() - .expiry(Duration.ofSeconds(expiry)) - .durability(persistTo, replicateTo) - .timeout(Duration.ofMillis(retryAttempts * (long) producerRetryPause)) - .retryStrategy(BestEffortRetryStrategy.withExponentialBackoff(Duration.ofMillis(producerRetryPause), - Duration.ofMillis(producerRetryPause), 1)); - - MutationResult result = collection.upsert(id, obj, options); - if (LOG.isDebugEnabled()) { - LOG.debug(result.toString()); - } - - return true; - } }