This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push: new 025549c CAMEL-14319: upgrade couchbase java client to 3.0.5 (#3944) 025549c is described below commit 025549c010e467d59ff52fde8f87e3daa0ae9cf1 Author: Matej Melko <6814482+mme...@users.noreply.github.com> AuthorDate: Fri Jun 26 07:32:48 2020 +0200 CAMEL-14319: upgrade couchbase java client to 3.0.5 (#3944) refactor component to work with newer client remove options that were removed in newer clients add possibility to specify collection and scope for newer couchbase --- camel-dependencies/pom.xml | 2 +- .../apache/camel/catalog/components/couchbase.json | 11 +- .../camel/catalog/docs/couchbase-component.adoc | 17 +- components/camel-couchbase/pom.xml | 2 +- .../couchbase/CouchbaseEndpointConfigurer.java | 51 +- .../camel/component/couchbase/couchbase.json | 11 +- .../src/main/docs/couchbase-component.adoc | 17 +- .../component/couchbase/CouchbaseComponent.java | 88 +-- .../component/couchbase/CouchbaseConstants.java | 9 +- .../component/couchbase/CouchbaseConsumer.java | 80 ++- .../component/couchbase/CouchbaseEndpoint.java | 206 +++---- .../component/couchbase/CouchbaseException.java | 74 +-- .../component/couchbase/CouchbaseProducer.java | 84 +-- ...onsumeBeerMessagesWithLimitIntegrationTest.java | 5 +- .../couchbase/CouchbaseComponentTest.java | 308 +++++----- .../component/couchbase/CouchbaseEndpointTest.java | 39 +- .../component/couchbase/CouchbaseProducerTest.java | 134 +---- .../couchbase/ProduceMessagesSimpleTest.java | 7 +- .../ProduceMessagesWithAutoIDIntegrationTest.java | 98 +-- .../couchbase/RemoveMessagesIntegrationTest.java | 2 +- .../dsl/CouchbaseEndpointBuilderFactory.java | 655 +++------------------ .../modules/ROOT/pages/couchbase-component.adoc | 17 +- parent/pom.xml | 2 +- 23 files changed, 631 insertions(+), 1288 deletions(-) diff --git a/camel-dependencies/pom.xml b/camel-dependencies/pom.xml index 9513e35..834df85 100644 --- a/camel-dependencies/pom.xml +++ 
b/camel-dependencies/pom.xml @@ -150,7 +150,7 @@ <conscrypt-uber-version>2.2.1</conscrypt-uber-version> <consul-client-version>1.3.3</consul-client-version> <corda-version>4.4</corda-version> - <couchbase-client-version>1.4.13</couchbase-client-version> + <couchbase-client-version>3.0.5</couchbase-client-version> <curator-version>4.3.0</curator-version> <cxf-codegen-plugin-version>3.3.5</cxf-codegen-plugin-version> <cxf-version>3.3.6</cxf-version> diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/couchbase.json b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/couchbase.json index c9c284f..08b28ca 100644 --- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/couchbase.json +++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/couchbase.json @@ -30,7 +30,9 @@ "hostname": { "kind": "path", "displayName": "Hostname", "group": "common", "label": "", "required": true, "type": "string", "javaType": "java.lang.String", "deprecated": false, "deprecationNote": "", "secret": false, "description": "The hostname to use" }, "port": { "kind": "path", "displayName": "Port", "group": "common", "label": "", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "secret": false, "defaultValue": "8091", "description": "The port number to use" }, "bucket": { "kind": "parameter", "displayName": "Bucket", "group": "common", "label": "", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "secret": false, "description": "The bucket to use" }, + "collection": { "kind": "parameter", "displayName": "Collection", "group": "common", "label": "", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "secret": false, "defaultValue": "_default", "description": "The collection to use" }, "key": { "kind": "parameter", "displayName": "Key", "group": "common", 
"label": "", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "secret": false, "description": "The key to use" }, + "scope": { "kind": "parameter", "displayName": "Scope", "group": "common", "label": "", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "secret": false, "defaultValue": "_default", "description": "The scope to use" }, "bridgeErrorHandler": { "kind": "parameter", "displayName": "Bridge Error Handler", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "secret": false, "defaultValue": false, "description": "Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions occurred while the consumer is trying to pickup incoming messages, or the likes, will now be processed as a message and handled b [...] "consumerProcessedStrategy": { "kind": "parameter", "displayName": "Consumer Processed Strategy", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "secret": false, "defaultValue": "none", "description": "Define the consumer Processed strategy to use" }, "descending": { "kind": "parameter", "displayName": "Descending", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "secret": false, "defaultValue": "false", "description": "Define if this operation is descending or not" }, @@ -54,15 +56,8 @@ "startingIdForInsertsFrom": { "kind": "parameter", "displayName": "Starting Id For Inserts From", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "long", "deprecated": false, "secret": false, "description": "Define the starting Id where we are doing an insert operation" }, "additionalHosts": { "kind": "parameter", "displayName": "Additional Hosts", "group": "advanced", "label": 
"advanced", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "secret": false, "description": "The additional hosts" }, "basicPropertyBinding": { "kind": "parameter", "displayName": "Basic Property Binding", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "secret": false, "defaultValue": false, "description": "Whether the endpoint should use basic property binding (Camel 2.x) or the newer property binding with additional capabilities" }, - "maxReconnectDelay": { "kind": "parameter", "displayName": "Max Reconnect Delay", "group": "advanced", "label": "advanced", "required": false, "type": "duration", "javaType": "long", "deprecated": false, "secret": false, "defaultValue": "30000", "description": "Define the max delay during a reconnection" }, - "obsPollInterval": { "kind": "parameter", "displayName": "Obs Poll Interval", "group": "advanced", "label": "advanced", "required": false, "type": "duration", "javaType": "long", "deprecated": false, "secret": false, "defaultValue": "400", "description": "Define the observation polling interval" }, - "obsTimeout": { "kind": "parameter", "displayName": "Obs Timeout", "group": "advanced", "label": "advanced", "required": false, "type": "duration", "javaType": "long", "deprecated": false, "secret": false, "defaultValue": "-1", "description": "Define the observation timeout" }, - "opQueueMaxBlockTime": { "kind": "parameter", "displayName": "Op Queue Max Block Time", "group": "advanced", "label": "advanced", "required": false, "type": "duration", "javaType": "long", "deprecated": false, "secret": false, "defaultValue": "10000", "description": "Define the max time an operation can be in queue blocked" }, - "opTimeOut": { "kind": "parameter", "displayName": "Op Time Out", "group": "advanced", "label": "advanced", "required": false, "type": "duration", "javaType": "long", "deprecated": false, "secret": false, 
"defaultValue": "2500", "description": "Define the operation timeout" }, - "readBufferSize": { "kind": "parameter", "displayName": "Read Buffer Size", "group": "advanced", "label": "advanced", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "secret": false, "defaultValue": "16384", "description": "Define the buffer size" }, - "shouldOptimize": { "kind": "parameter", "displayName": "Should Optimize", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "secret": false, "defaultValue": "false", "description": "Define if we want to use optimization or not where possible" }, + "queryTimeout": { "kind": "parameter", "displayName": "Query Timeout", "group": "advanced", "label": "advanced", "required": false, "type": "duration", "javaType": "long", "deprecated": false, "secret": false, "defaultValue": "2500", "description": "Define the operation timeout" }, "synchronous": { "kind": "parameter", "displayName": "Synchronous", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "secret": false, "defaultValue": "false", "description": "Sets whether synchronous processing should be strictly used, or Camel is allowed to use asynchronous processing (if supported)." 
}, - "timeoutExceptionThreshold": { "kind": "parameter", "displayName": "Timeout Exception Threshold", "group": "advanced", "label": "advanced", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "secret": false, "defaultValue": "998", "description": "Define the threshold for throwing a timeout Exception" }, "backoffErrorThreshold": { "kind": "parameter", "displayName": "Backoff Error Threshold", "group": "scheduler", "label": "consumer,scheduler", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "secret": false, "description": "The number of subsequent error polls (failed due some error) that should happen before the backoffMultipler should kick-in." }, "backoffIdleThreshold": { "kind": "parameter", "displayName": "Backoff Idle Threshold", "group": "scheduler", "label": "consumer,scheduler", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "secret": false, "description": "The number of subsequent idle polls that should happen before the backoffMultipler should kick-in." }, "backoffMultiplier": { "kind": "parameter", "displayName": "Backoff Multiplier", "group": "scheduler", "label": "consumer,scheduler", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "secret": false, "description": "To let the scheduled polling consumer backoff if there has been a number of subsequent idles\/errors in a row. The multiplier is then the number of polls that will be skipped before the next actual attempt is happening again. When this option [...] 
diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/couchbase-component.adoc b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/couchbase-component.adoc index 2d07347..5a63855 100644 --- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/couchbase-component.adoc +++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/couchbase-component.adoc @@ -72,14 +72,16 @@ with the following path and query parameters: |=== -=== Query Parameters (50 parameters): +=== Query Parameters (45 parameters): [width="100%",cols="2,5,^1,2",options="header"] |=== | Name | Description | Default | Type | *bucket* (common) | The bucket to use | | String +| *collection* (common) | The collection to use | _default | String | *key* (common) | The key to use | | String +| *scope* (common) | The scope to use | _default | String | *bridgeErrorHandler* (consumer) | Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions occurred while the consumer is trying to pickup incoming messages, or the likes, will now be processed as a message and handled by the routing Error Handler. By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions, that will be logged at WARN or ERROR level and ignored. 
| false | boolean | *consumerProcessedStrategy* (consumer) | Define the consumer Processed strategy to use | none | String | *descending* (consumer) | Define if this operation is descending or not | false | boolean @@ -103,15 +105,8 @@ with the following path and query parameters: | *startingIdForInsertsFrom* (producer) | Define the starting Id where we are doing an insert operation | | long | *additionalHosts* (advanced) | The additional hosts | | String | *basicPropertyBinding* (advanced) | Whether the endpoint should use basic property binding (Camel 2.x) or the newer property binding with additional capabilities | false | boolean -| *maxReconnectDelay* (advanced) | Define the max delay during a reconnection | 30000 | long -| *obsPollInterval* (advanced) | Define the observation polling interval | 400 | long -| *obsTimeout* (advanced) | Define the observation timeout | -1 | long -| *opQueueMaxBlockTime* (advanced) | Define the max time an operation can be in queue blocked | 10000 | long -| *opTimeOut* (advanced) | Define the operation timeout | 2500 | long -| *readBufferSize* (advanced) | Define the buffer size | 16384 | int -| *shouldOptimize* (advanced) | Define if we want to use optimization or not where possible | false | boolean +| *queryTimeout* (advanced) | Define the operation timeout | 2500 | long | *synchronous* (advanced) | Sets whether synchronous processing should be strictly used, or Camel is allowed to use asynchronous processing (if supported). | false | boolean -| *timeoutExceptionThreshold* (advanced) | Define the threshold for throwing a timeout Exception | 998 | int | *backoffErrorThreshold* (scheduler) | The number of subsequent error polls (failed due some error) that should happen before the backoffMultipler should kick-in. | | int | *backoffIdleThreshold* (scheduler) | The number of subsequent idle polls that should happen before the backoffMultipler should kick-in. 
| | int | *backoffMultiplier* (scheduler) | To let the scheduled polling consumer backoff if there has been a number of subsequent idles/errors in a row. The multiplier is then the number of polls that will be skipped before the next actual attempt is happening again. When this option is in use then backoffIdleThreshold and/or backoffErrorThreshold must also be configured. | | int @@ -132,9 +127,9 @@ with the following path and query parameters: // endpoint options: END == Couchbase SDK compatibility -This component is currently using a "Legacy SDK" version of couchbase-client. +Using collections and scopes is supported only for Couchbase Enterprise Server 6.5+. -In order to authenticate with newer versions of Couchbase Server 5.0 and beyond, as per instructions on the https://docs.couchbase.com/java-sdk/2.7/sdk-authentication-overview.html/[CouchBase Java SDK Authentication]: +This component is currently using Java SDK 3.x so it might not be compatible with older Couchbase servers anymore. See the compatibility https://docs.couchbase.com/java-sdk/current/project-docs/compatibility.html[page]. * The value formerly interpreted as a bucket-name is now interpreted as a username. The username must correspond to a user defined on the cluster that is being accessed. * The value formerly interpreted as a bucket-password is now interpreted as the password of the defined user. 
diff --git a/components/camel-couchbase/pom.xml b/components/camel-couchbase/pom.xml index fb5a552..dda6f96 100644 --- a/components/camel-couchbase/pom.xml +++ b/components/camel-couchbase/pom.xml @@ -38,7 +38,7 @@ <dependency> <groupId>com.couchbase.client</groupId> - <artifactId>couchbase-client</artifactId> + <artifactId>java-client</artifactId> <version>${couchbase-client-version}</version> </dependency> diff --git a/components/camel-couchbase/src/generated/java/org/apache/camel/component/couchbase/CouchbaseEndpointConfigurer.java b/components/camel-couchbase/src/generated/java/org/apache/camel/component/couchbase/CouchbaseEndpointConfigurer.java index 3bbddbd..ad6ecc8 100644 --- a/components/camel-couchbase/src/generated/java/org/apache/camel/component/couchbase/CouchbaseEndpointConfigurer.java +++ b/components/camel-couchbase/src/generated/java/org/apache/camel/component/couchbase/CouchbaseEndpointConfigurer.java @@ -34,6 +34,7 @@ public class CouchbaseEndpointConfigurer extends PropertyConfigurerSupport imple case "bridgeerrorhandler": case "bridgeErrorHandler": target.setBridgeErrorHandler(property(camelContext, boolean.class, value)); return true; case "bucket": target.setBucket(property(camelContext, java.lang.String.class, value)); return true; + case "collection": target.setCollection(property(camelContext, java.lang.String.class, value)); return true; case "consumerprocessedstrategy": case "consumerProcessedStrategy": target.setConsumerProcessedStrategy(property(camelContext, java.lang.String.class, value)); return true; case "delay": target.setDelay(property(camelContext, long.class, value)); return true; @@ -51,16 +52,6 @@ public class CouchbaseEndpointConfigurer extends PropertyConfigurerSupport imple case "lazystartproducer": case "lazyStartProducer": target.setLazyStartProducer(property(camelContext, boolean.class, value)); return true; case "limit": target.setLimit(property(camelContext, int.class, value)); return true; - case 
"maxreconnectdelay": - case "maxReconnectDelay": target.setMaxReconnectDelay(property(camelContext, java.time.Duration.class, value).toMillis()); return true; - case "obspollinterval": - case "obsPollInterval": target.setObsPollInterval(property(camelContext, java.time.Duration.class, value).toMillis()); return true; - case "obstimeout": - case "obsTimeout": target.setObsTimeout(property(camelContext, java.time.Duration.class, value).toMillis()); return true; - case "opqueuemaxblocktime": - case "opQueueMaxBlockTime": target.setOpQueueMaxBlockTime(property(camelContext, java.time.Duration.class, value).toMillis()); return true; - case "optimeout": - case "opTimeOut": target.setOpTimeOut(property(camelContext, java.time.Duration.class, value).toMillis()); return true; case "operation": target.setOperation(property(camelContext, java.lang.String.class, value)); return true; case "password": target.setPassword(property(camelContext, java.lang.String.class, value)); return true; case "persistto": @@ -71,12 +62,12 @@ public class CouchbaseEndpointConfigurer extends PropertyConfigurerSupport imple case "producerRetryAttempts": target.setProducerRetryAttempts(property(camelContext, int.class, value)); return true; case "producerretrypause": case "producerRetryPause": target.setProducerRetryPause(property(camelContext, int.class, value)); return true; + case "querytimeout": + case "queryTimeout": target.setQueryTimeout(property(camelContext, java.time.Duration.class, value).toMillis()); return true; case "rangeendkey": case "rangeEndKey": target.setRangeEndKey(property(camelContext, java.lang.String.class, value)); return true; case "rangestartkey": case "rangeStartKey": target.setRangeStartKey(property(camelContext, java.lang.String.class, value)); return true; - case "readbuffersize": - case "readBufferSize": target.setReadBufferSize(property(camelContext, int.class, value)); return true; case "repeatcount": case "repeatCount": 
target.setRepeatCount(property(camelContext, long.class, value)); return true; case "replicateto": @@ -88,10 +79,9 @@ public class CouchbaseEndpointConfigurer extends PropertyConfigurerSupport imple case "scheduler": target.setScheduler(property(camelContext, java.lang.String.class, value)); return true; case "schedulerproperties": case "schedulerProperties": target.setSchedulerProperties(property(camelContext, java.util.Map.class, value)); return true; + case "scope": target.setScope(property(camelContext, java.lang.String.class, value)); return true; case "sendemptymessagewhenidle": case "sendEmptyMessageWhenIdle": target.setSendEmptyMessageWhenIdle(property(camelContext, boolean.class, value)); return true; - case "shouldoptimize": - case "shouldOptimize": target.setShouldOptimize(property(camelContext, boolean.class, value)); return true; case "skip": target.setSkip(property(camelContext, int.class, value)); return true; case "startscheduler": case "startScheduler": target.setStartScheduler(property(camelContext, boolean.class, value)); return true; @@ -100,8 +90,6 @@ public class CouchbaseEndpointConfigurer extends PropertyConfigurerSupport imple case "synchronous": target.setSynchronous(property(camelContext, boolean.class, value)); return true; case "timeunit": case "timeUnit": target.setTimeUnit(property(camelContext, java.util.concurrent.TimeUnit.class, value)); return true; - case "timeoutexceptionthreshold": - case "timeoutExceptionThreshold": target.setTimeoutExceptionThreshold(property(camelContext, int.class, value)); return true; case "usefixeddelay": case "useFixedDelay": target.setUseFixedDelay(property(camelContext, boolean.class, value)); return true; case "username": target.setUsername(property(camelContext, java.lang.String.class, value)); return true; @@ -122,6 +110,7 @@ public class CouchbaseEndpointConfigurer extends PropertyConfigurerSupport imple answer.put("basicPropertyBinding", boolean.class); answer.put("bridgeErrorHandler", 
boolean.class); answer.put("bucket", java.lang.String.class); + answer.put("collection", java.lang.String.class); answer.put("consumerProcessedStrategy", java.lang.String.class); answer.put("delay", long.class); answer.put("descending", boolean.class); @@ -133,34 +122,28 @@ public class CouchbaseEndpointConfigurer extends PropertyConfigurerSupport imple answer.put("key", java.lang.String.class); answer.put("lazyStartProducer", boolean.class); answer.put("limit", int.class); - answer.put("maxReconnectDelay", long.class); - answer.put("obsPollInterval", long.class); - answer.put("obsTimeout", long.class); - answer.put("opQueueMaxBlockTime", long.class); - answer.put("opTimeOut", long.class); answer.put("operation", java.lang.String.class); answer.put("password", java.lang.String.class); answer.put("persistTo", int.class); answer.put("pollStrategy", org.apache.camel.spi.PollingConsumerPollStrategy.class); answer.put("producerRetryAttempts", int.class); answer.put("producerRetryPause", int.class); + answer.put("queryTimeout", long.class); answer.put("rangeEndKey", java.lang.String.class); answer.put("rangeStartKey", java.lang.String.class); - answer.put("readBufferSize", int.class); answer.put("repeatCount", long.class); answer.put("replicateTo", int.class); answer.put("runLoggingLevel", org.apache.camel.LoggingLevel.class); answer.put("scheduledExecutorService", java.util.concurrent.ScheduledExecutorService.class); answer.put("scheduler", java.lang.String.class); answer.put("schedulerProperties", java.util.Map.class); + answer.put("scope", java.lang.String.class); answer.put("sendEmptyMessageWhenIdle", boolean.class); - answer.put("shouldOptimize", boolean.class); answer.put("skip", int.class); answer.put("startScheduler", boolean.class); answer.put("startingIdForInsertsFrom", long.class); answer.put("synchronous", boolean.class); answer.put("timeUnit", java.util.concurrent.TimeUnit.class); - answer.put("timeoutExceptionThreshold", int.class); 
answer.put("useFixedDelay", boolean.class); answer.put("username", java.lang.String.class); answer.put("viewName", java.lang.String.class); @@ -186,6 +169,7 @@ public class CouchbaseEndpointConfigurer extends PropertyConfigurerSupport imple case "bridgeerrorhandler": case "bridgeErrorHandler": return target.isBridgeErrorHandler(); case "bucket": return target.getBucket(); + case "collection": return target.getCollection(); case "consumerprocessedstrategy": case "consumerProcessedStrategy": return target.getConsumerProcessedStrategy(); case "delay": return target.getDelay(); @@ -203,16 +187,6 @@ public class CouchbaseEndpointConfigurer extends PropertyConfigurerSupport imple case "lazystartproducer": case "lazyStartProducer": return target.isLazyStartProducer(); case "limit": return target.getLimit(); - case "maxreconnectdelay": - case "maxReconnectDelay": return target.getMaxReconnectDelay(); - case "obspollinterval": - case "obsPollInterval": return target.getObsPollInterval(); - case "obstimeout": - case "obsTimeout": return target.getObsTimeout(); - case "opqueuemaxblocktime": - case "opQueueMaxBlockTime": return target.getOpQueueMaxBlockTime(); - case "optimeout": - case "opTimeOut": return target.getOpTimeOut(); case "operation": return target.getOperation(); case "password": return target.getPassword(); case "persistto": @@ -223,12 +197,12 @@ public class CouchbaseEndpointConfigurer extends PropertyConfigurerSupport imple case "producerRetryAttempts": return target.getProducerRetryAttempts(); case "producerretrypause": case "producerRetryPause": return target.getProducerRetryPause(); + case "querytimeout": + case "queryTimeout": return target.getQueryTimeout(); case "rangeendkey": case "rangeEndKey": return target.getRangeEndKey(); case "rangestartkey": case "rangeStartKey": return target.getRangeStartKey(); - case "readbuffersize": - case "readBufferSize": return target.getReadBufferSize(); case "repeatcount": case "repeatCount": return 
target.getRepeatCount(); case "replicateto": @@ -240,10 +214,9 @@ public class CouchbaseEndpointConfigurer extends PropertyConfigurerSupport imple case "scheduler": return target.getScheduler(); case "schedulerproperties": case "schedulerProperties": return target.getSchedulerProperties(); + case "scope": return target.getScope(); case "sendemptymessagewhenidle": case "sendEmptyMessageWhenIdle": return target.isSendEmptyMessageWhenIdle(); - case "shouldoptimize": - case "shouldOptimize": return target.isShouldOptimize(); case "skip": return target.getSkip(); case "startscheduler": case "startScheduler": return target.isStartScheduler(); @@ -252,8 +225,6 @@ public class CouchbaseEndpointConfigurer extends PropertyConfigurerSupport imple case "synchronous": return target.isSynchronous(); case "timeunit": case "timeUnit": return target.getTimeUnit(); - case "timeoutexceptionthreshold": - case "timeoutExceptionThreshold": return target.getTimeoutExceptionThreshold(); case "usefixeddelay": case "useFixedDelay": return target.isUseFixedDelay(); case "username": return target.getUsername(); diff --git a/components/camel-couchbase/src/generated/resources/org/apache/camel/component/couchbase/couchbase.json b/components/camel-couchbase/src/generated/resources/org/apache/camel/component/couchbase/couchbase.json index c9c284f..e01ebc1 100644 --- a/components/camel-couchbase/src/generated/resources/org/apache/camel/component/couchbase/couchbase.json +++ b/components/camel-couchbase/src/generated/resources/org/apache/camel/component/couchbase/couchbase.json @@ -30,7 +30,9 @@ "hostname": { "kind": "path", "displayName": "Hostname", "group": "common", "label": "", "required": true, "type": "string", "javaType": "java.lang.String", "deprecated": false, "deprecationNote": "", "secret": false, "description": "The hostname to use" }, "port": { "kind": "path", "displayName": "Port", "group": "common", "label": "", "required": false, "type": "integer", "javaType": "int", "deprecated": 
false, "secret": false, "defaultValue": "8091", "description": "The port number to use" }, "bucket": { "kind": "parameter", "displayName": "Bucket", "group": "common", "label": "", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "secret": false, "description": "The bucket to use" }, + "collection": { "kind": "parameter", "displayName": "Collection", "group": "common", "label": "", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "secret": false, "description": "The collection to use" }, "key": { "kind": "parameter", "displayName": "Key", "group": "common", "label": "", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "secret": false, "description": "The key to use" }, + "scope": { "kind": "parameter", "displayName": "Scope", "group": "common", "label": "", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "secret": false, "description": "The scope to use" }, "bridgeErrorHandler": { "kind": "parameter", "displayName": "Bridge Error Handler", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "secret": false, "defaultValue": false, "description": "Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions occurred while the consumer is trying to pickup incoming messages, or the likes, will now be processed as a message and handled b [...] 
"consumerProcessedStrategy": { "kind": "parameter", "displayName": "Consumer Processed Strategy", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "secret": false, "defaultValue": "none", "description": "Define the consumer Processed strategy to use" }, "descending": { "kind": "parameter", "displayName": "Descending", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "secret": false, "defaultValue": "false", "description": "Define if this operation is descending or not" }, @@ -54,15 +56,8 @@ "startingIdForInsertsFrom": { "kind": "parameter", "displayName": "Starting Id For Inserts From", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "long", "deprecated": false, "secret": false, "description": "Define the starting Id where we are doing an insert operation" }, "additionalHosts": { "kind": "parameter", "displayName": "Additional Hosts", "group": "advanced", "label": "advanced", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "secret": false, "description": "The additional hosts" }, "basicPropertyBinding": { "kind": "parameter", "displayName": "Basic Property Binding", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "secret": false, "defaultValue": false, "description": "Whether the endpoint should use basic property binding (Camel 2.x) or the newer property binding with additional capabilities" }, - "maxReconnectDelay": { "kind": "parameter", "displayName": "Max Reconnect Delay", "group": "advanced", "label": "advanced", "required": false, "type": "duration", "javaType": "long", "deprecated": false, "secret": false, "defaultValue": "30000", "description": "Define the max delay during a reconnection" }, - "obsPollInterval": { "kind": "parameter", 
"displayName": "Obs Poll Interval", "group": "advanced", "label": "advanced", "required": false, "type": "duration", "javaType": "long", "deprecated": false, "secret": false, "defaultValue": "400", "description": "Define the observation polling interval" }, - "obsTimeout": { "kind": "parameter", "displayName": "Obs Timeout", "group": "advanced", "label": "advanced", "required": false, "type": "duration", "javaType": "long", "deprecated": false, "secret": false, "defaultValue": "-1", "description": "Define the observation timeout" }, - "opQueueMaxBlockTime": { "kind": "parameter", "displayName": "Op Queue Max Block Time", "group": "advanced", "label": "advanced", "required": false, "type": "duration", "javaType": "long", "deprecated": false, "secret": false, "defaultValue": "10000", "description": "Define the max time an operation can be in queue blocked" }, - "opTimeOut": { "kind": "parameter", "displayName": "Op Time Out", "group": "advanced", "label": "advanced", "required": false, "type": "duration", "javaType": "long", "deprecated": false, "secret": false, "defaultValue": "2500", "description": "Define the operation timeout" }, - "readBufferSize": { "kind": "parameter", "displayName": "Read Buffer Size", "group": "advanced", "label": "advanced", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "secret": false, "defaultValue": "16384", "description": "Define the buffer size" }, - "shouldOptimize": { "kind": "parameter", "displayName": "Should Optimize", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "secret": false, "defaultValue": "false", "description": "Define if we want to use optimization or not where possible" }, + "queryTimeout": { "kind": "parameter", "displayName": "Query Timeout", "group": "advanced", "label": "advanced", "required": false, "type": "duration", "javaType": "long", "deprecated": false, "secret": false, "defaultValue": "2500", 
"description": "Define the operation timeout in milliseconds" }, "synchronous": { "kind": "parameter", "displayName": "Synchronous", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "secret": false, "defaultValue": "false", "description": "Sets whether synchronous processing should be strictly used, or Camel is allowed to use asynchronous processing (if supported)." }, - "timeoutExceptionThreshold": { "kind": "parameter", "displayName": "Timeout Exception Threshold", "group": "advanced", "label": "advanced", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "secret": false, "defaultValue": "998", "description": "Define the threshold for throwing a timeout Exception" }, "backoffErrorThreshold": { "kind": "parameter", "displayName": "Backoff Error Threshold", "group": "scheduler", "label": "consumer,scheduler", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "secret": false, "description": "The number of subsequent error polls (failed due some error) that should happen before the backoffMultipler should kick-in." }, "backoffIdleThreshold": { "kind": "parameter", "displayName": "Backoff Idle Threshold", "group": "scheduler", "label": "consumer,scheduler", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "secret": false, "description": "The number of subsequent idle polls that should happen before the backoffMultipler should kick-in." }, "backoffMultiplier": { "kind": "parameter", "displayName": "Backoff Multiplier", "group": "scheduler", "label": "consumer,scheduler", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "secret": false, "description": "To let the scheduled polling consumer backoff if there has been a number of subsequent idles\/errors in a row. The multiplier is then the number of polls that will be skipped before the next actual attempt is happening again. 
When this option [...] diff --git a/components/camel-couchbase/src/main/docs/couchbase-component.adoc b/components/camel-couchbase/src/main/docs/couchbase-component.adoc index 2d07347..6278fd3 100644 --- a/components/camel-couchbase/src/main/docs/couchbase-component.adoc +++ b/components/camel-couchbase/src/main/docs/couchbase-component.adoc @@ -72,14 +72,16 @@ with the following path and query parameters: |=== -=== Query Parameters (50 parameters): +=== Query Parameters (45 parameters): [width="100%",cols="2,5,^1,2",options="header"] |=== | Name | Description | Default | Type | *bucket* (common) | The bucket to use | | String +| *collection* (common) | The collection to use | | String | *key* (common) | The key to use | | String +| *scope* (common) | The scope to use | | String | *bridgeErrorHandler* (consumer) | Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions occurred while the consumer is trying to pickup incoming messages, or the likes, will now be processed as a message and handled by the routing Error Handler. By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions, that will be logged at WARN or ERROR level and ignored. 
| false | boolean | *consumerProcessedStrategy* (consumer) | Define the consumer Processed strategy to use | none | String | *descending* (consumer) | Define if this operation is descending or not | false | boolean @@ -103,15 +105,8 @@ with the following path and query parameters: | *startingIdForInsertsFrom* (producer) | Define the starting Id where we are doing an insert operation | | long | *additionalHosts* (advanced) | The additional hosts | | String | *basicPropertyBinding* (advanced) | Whether the endpoint should use basic property binding (Camel 2.x) or the newer property binding with additional capabilities | false | boolean -| *maxReconnectDelay* (advanced) | Define the max delay during a reconnection | 30000 | long -| *obsPollInterval* (advanced) | Define the observation polling interval | 400 | long -| *obsTimeout* (advanced) | Define the observation timeout | -1 | long -| *opQueueMaxBlockTime* (advanced) | Define the max time an operation can be in queue blocked | 10000 | long -| *opTimeOut* (advanced) | Define the operation timeout | 2500 | long -| *readBufferSize* (advanced) | Define the buffer size | 16384 | int -| *shouldOptimize* (advanced) | Define if we want to use optimization or not where possible | false | boolean +| *queryTimeout* (advanced) | Define the operation timeout in milliseconds | 2500 | long | *synchronous* (advanced) | Sets whether synchronous processing should be strictly used, or Camel is allowed to use asynchronous processing (if supported). | false | boolean -| *timeoutExceptionThreshold* (advanced) | Define the threshold for throwing a timeout Exception | 998 | int | *backoffErrorThreshold* (scheduler) | The number of subsequent error polls (failed due some error) that should happen before the backoffMultipler should kick-in. | | int | *backoffIdleThreshold* (scheduler) | The number of subsequent idle polls that should happen before the backoffMultipler should kick-in. 
| | int | *backoffMultiplier* (scheduler) | To let the scheduled polling consumer backoff if there has been a number of subsequent idles/errors in a row. The multiplier is then the number of polls that will be skipped before the next actual attempt is happening again. When this option is in use then backoffIdleThreshold and/or backoffErrorThreshold must also be configured. | | int @@ -132,9 +127,9 @@ with the following path and query parameters: // endpoint options: END == Couchbase SDK compatibility -This component is currently using a "Legacy SDK" version of couchbase-client. +Using collections and scopes is supported only for Couchbase Enterprise Server 6.5+. -In order to authenticate with newer versions of Couchbase Server 5.0 and beyond, as per instructions on the https://docs.couchbase.com/java-sdk/2.7/sdk-authentication-overview.html/[CouchBase Java SDK Authentication]: +This component currently uses Java SDK 3.x, so it may no longer be compatible with older Couchbase servers. See the compatibility https://docs.couchbase.com/java-sdk/current/project-docs/compatibility.html[page]. * The value formerly interpreted as a bucket-name is now interpreted as a username. The username must correspond to a user defined on the cluster that is being accessed. * The value formerly interpreted as a bucket-password is now interpreted as the password of the defined user. diff --git a/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseComponent.java b/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseComponent.java index 90d72f6..d57a1cf 100644 --- a/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseComponent.java +++ b/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseComponent.java @@ -1,44 +1,44 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements.
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.component.couchbase; - -import java.util.Map; - -import org.apache.camel.CamelContext; -import org.apache.camel.spi.annotations.Component; -import org.apache.camel.support.DefaultComponent; - -/** - * Couchbase component. - */ -@Component("couchbase") -public class CouchbaseComponent extends DefaultComponent { - - public CouchbaseComponent() { - } - - public CouchbaseComponent(CamelContext context) { - super(context); - } - - @Override - protected CouchbaseEndpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception { - CouchbaseEndpoint endpoint = new CouchbaseEndpoint(uri, remaining, this); - setProperties(endpoint, parameters); - return endpoint; - } -} +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.couchbase; + +import java.util.Map; + +import org.apache.camel.CamelContext; +import org.apache.camel.spi.annotations.Component; +import org.apache.camel.support.DefaultComponent; + +/** + * Couchbase component. + */ +@Component("couchbase") +public class CouchbaseComponent extends DefaultComponent { + + public CouchbaseComponent() { + } + + public CouchbaseComponent(CamelContext context) { + super(context); + } + + @Override + protected CouchbaseEndpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception { + CouchbaseEndpoint endpoint = new CouchbaseEndpoint(uri, remaining, this); + setProperties(endpoint, parameters); + return endpoint; + } +} diff --git a/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseConstants.java b/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseConstants.java index 986c68c..45cbd5a 100644 --- a/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseConstants.java +++ b/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseConstants.java @@ -38,13 +38,8 @@ public interface CouchbaseConstants { int DEFAULT_PAUSE_BETWEEN_RETRIES = 5000; int DEFAULT_COUCHBASE_PORT = 8091; int DEFAULT_TTL = 0; - long DEFAULT_OP_TIMEOUT = 2500; - int DEFAULT_TIMEOUT_EXCEPTION_THRESHOLD = 998; - int DEFAULT_READ_BUFFER_SIZE = 16384; - long DEFAULT_OP_QUEUE_MAX_BLOCK_TIME = 10000; - long DEFAULT_MAX_RECONNECT_DELAY = 30000; - 
long DEFAULT_OBS_POLL_INTERVAL = 400; - long DEFAULT_OBS_TIMEOUT = -1; + long DEFAULT_QUERY_TIMEOUT = 2500; + String DEFAULT_CONSUME_PROCESSED_STRATEGY = "none"; } diff --git a/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseConsumer.java b/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseConsumer.java index bc7b979..883329c 100644 --- a/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseConsumer.java +++ b/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseConsumer.java @@ -16,11 +16,15 @@ */ package org.apache.camel.component.couchbase; -import com.couchbase.client.CouchbaseClient; -import com.couchbase.client.protocol.views.Query; -import com.couchbase.client.protocol.views.View; -import com.couchbase.client.protocol.views.ViewResponse; -import com.couchbase.client.protocol.views.ViewRow; +import java.util.List; + +import com.couchbase.client.java.Bucket; +import com.couchbase.client.java.Collection; +import com.couchbase.client.java.Scope; +import com.couchbase.client.java.view.ViewOptions; +import com.couchbase.client.java.view.ViewOrdering; +import com.couchbase.client.java.view.ViewResult; +import com.couchbase.client.java.view.ViewRow; import org.apache.camel.Exchange; import org.apache.camel.Processor; import org.apache.camel.support.DefaultScheduledPollConsumer; @@ -37,44 +41,54 @@ public class CouchbaseConsumer extends DefaultScheduledPollConsumer { private static final Logger LOG = LoggerFactory.getLogger(CouchbaseConsumer.class); private final CouchbaseEndpoint endpoint; - private final CouchbaseClient client; - private final View view; - private final Query query; - - public CouchbaseConsumer(CouchbaseEndpoint endpoint, CouchbaseClient client, Processor processor) { + private final Bucket bucket; + private ViewOptions viewOptions; + private Collection collection; + public 
CouchbaseConsumer(CouchbaseEndpoint endpoint, Bucket client, Processor processor) { super(endpoint, processor); - this.client = client; + this.bucket = client; this.endpoint = endpoint; - this.view = client.getView(endpoint.getDesignDocumentName(), endpoint.getViewName()); - this.query = new Query(); + Scope scope; + if (endpoint.getScope() != null) { + scope = client.scope(endpoint.getScope()); + } else { + scope = client.defaultScope(); + } + + if (endpoint.getCollection() != null) { + this.collection = scope.collection(endpoint.getCollection()); + } else { + this.collection = client.defaultCollection(); + } init(); } @Override protected void doInit() { - query.setIncludeDocs(true); - + // query.setIncludeDocs(true); + this.viewOptions = ViewOptions.viewOptions(); int limit = endpoint.getLimit(); if (limit > 0) { - query.setLimit(limit); + viewOptions.limit(limit); } int skip = endpoint.getSkip(); if (skip > 0) { - query.setSkip(skip); + viewOptions.skip(skip); } - query.setDescending(endpoint.isDescending()); + if (endpoint.isDescending()) { + viewOptions.order(ViewOrdering.DESCENDING); + } String rangeStartKey = endpoint.getRangeStartKey(); String rangeEndKey = endpoint.getRangeEndKey(); if ("".equals(rangeStartKey) || "".equals(rangeEndKey)) { return; } - query.setRange(rangeStartKey, rangeEndKey); - + viewOptions.startKey(rangeStartKey).endKey(rangeEndKey); } @Override @@ -87,28 +101,30 @@ public class CouchbaseConsumer extends DefaultScheduledPollConsumer { protected void doStop() throws Exception { LOG.info("Stopping Couchbase consumer"); super.doStop(); - if (client != null) { - client.shutdown(); + if (bucket != null) { + bucket.core().shutdown(); } } @Override protected synchronized int poll() throws Exception { - ViewResponse result = client.query(view, query); + ViewResult result = bucket.viewQuery(endpoint.getDesignDocumentName(), endpoint.getViewName(), this.viewOptions); + LOG.info("Received result set from Couchbase"); + Collection collection =
bucket.defaultCollection(); + if (LOG.isTraceEnabled()) { - LOG.trace("ViewResponse = {}", result); + LOG.trace("ViewResponse = {}", result); } String consumerProcessedStrategy = endpoint.getConsumerProcessedStrategy(); + for (ViewRow row : result.rows()) { - for (ViewRow row : result) { + String id = row.id().get(); + Object doc = collection.get(id); - String id = row.getId(); - Object doc = row.getDocument(); - - String key = row.getKey(); + String key = (String) row.keyAs(List.class).get().get(0); String designDocumentName = endpoint.getDesignDocumentName(); String viewName = endpoint.getViewName(); @@ -123,7 +139,8 @@ public class CouchbaseConsumer extends DefaultScheduledPollConsumer { if (LOG.isTraceEnabled()) { LOG.trace("Deleting doc with ID {}", id); } - client.delete(id); + + collection.remove(id); } else if ("filter".equalsIgnoreCase(consumerProcessedStrategy)) { if (LOG.isTraceEnabled()) { LOG.trace("Filtering out ID {}", id); @@ -142,11 +159,10 @@ public class CouchbaseConsumer extends DefaultScheduledPollConsumer { } } - return result.size(); + return result.rows().size(); } private void logDetails(String id, Object doc, String key, String designDocumentName, String viewName, Exchange exchange) { - if (LOG.isTraceEnabled()) { LOG.trace("Created exchange = {}", exchange); LOG.trace("Added Document in body = {}", doc); diff --git a/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseEndpoint.java b/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseEndpoint.java index 8897767..363bc0e 100644 --- a/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseEndpoint.java +++ b/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseEndpoint.java @@ -19,14 +19,18 @@ package org.apache.camel.component.couchbase; import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; +import java.time.Duration; import 
java.util.ArrayList; import java.util.Arrays; import java.util.LinkedHashSet; import java.util.List; import java.util.Set; +import java.util.stream.Collectors; -import com.couchbase.client.CouchbaseClient; -import com.couchbase.client.CouchbaseConnectionFactoryBuilder; +import com.couchbase.client.java.Bucket; +import com.couchbase.client.java.Cluster; +import com.couchbase.client.java.ClusterOptions; +import com.couchbase.client.java.env.ClusterEnvironment; import org.apache.camel.Category; import org.apache.camel.Consumer; import org.apache.camel.Processor; @@ -38,20 +42,15 @@ import org.apache.camel.spi.UriPath; import org.apache.camel.support.ScheduledPollEndpoint; import static org.apache.camel.component.couchbase.CouchbaseConstants.COUCHBASE_PUT; -import static org.apache.camel.component.couchbase.CouchbaseConstants.COUCHBASE_URI_ERROR; +import static org.apache.camel.component.couchbase.CouchbaseConstants.DEFAULT_PRODUCER_RETRIES; import static org.apache.camel.component.couchbase.CouchbaseConstants.DEFAULT_CONSUME_PROCESSED_STRATEGY; -import static org.apache.camel.component.couchbase.CouchbaseConstants.DEFAULT_COUCHBASE_PORT; import static org.apache.camel.component.couchbase.CouchbaseConstants.DEFAULT_DESIGN_DOCUMENT_NAME; -import static org.apache.camel.component.couchbase.CouchbaseConstants.DEFAULT_MAX_RECONNECT_DELAY; -import static org.apache.camel.component.couchbase.CouchbaseConstants.DEFAULT_OBS_POLL_INTERVAL; -import static org.apache.camel.component.couchbase.CouchbaseConstants.DEFAULT_OBS_TIMEOUT; -import static org.apache.camel.component.couchbase.CouchbaseConstants.DEFAULT_OP_QUEUE_MAX_BLOCK_TIME; -import static org.apache.camel.component.couchbase.CouchbaseConstants.DEFAULT_OP_TIMEOUT; -import static org.apache.camel.component.couchbase.CouchbaseConstants.DEFAULT_PAUSE_BETWEEN_RETRIES; -import static org.apache.camel.component.couchbase.CouchbaseConstants.DEFAULT_PRODUCER_RETRIES; -import static 
org.apache.camel.component.couchbase.CouchbaseConstants.DEFAULT_READ_BUFFER_SIZE; -import static org.apache.camel.component.couchbase.CouchbaseConstants.DEFAULT_TIMEOUT_EXCEPTION_THRESHOLD; import static org.apache.camel.component.couchbase.CouchbaseConstants.DEFAULT_VIEWNAME; +import static org.apache.camel.component.couchbase.CouchbaseConstants.DEFAULT_PAUSE_BETWEEN_RETRIES; +import static org.apache.camel.component.couchbase.CouchbaseConstants.DEFAULT_QUERY_TIMEOUT; +import static org.apache.camel.component.couchbase.CouchbaseConstants.COUCHBASE_URI_ERROR; +import static org.apache.camel.component.couchbase.CouchbaseConstants.DEFAULT_COUCHBASE_PORT; + /** * Query Couchbase Views with a poll strategy and/or perform various operations against Couchbase databases. @@ -71,6 +70,12 @@ public class CouchbaseEndpoint extends ScheduledPollEndpoint { @UriParam private String bucket; + @UriParam + private String collection; + + @UriParam + private String scope; + // Couchbase key @UriParam private String key; @@ -126,21 +131,8 @@ public class CouchbaseEndpoint extends ScheduledPollEndpoint { // Connection fine tuning parameters @UriParam(label = "advanced", defaultValue = "2500", javaType = "java.time.Duration") - private long opTimeOut = DEFAULT_OP_TIMEOUT; - @UriParam(label = "advanced", defaultValue = "998") - private int timeoutExceptionThreshold = DEFAULT_TIMEOUT_EXCEPTION_THRESHOLD; - @UriParam(label = "advanced", defaultValue = "16384") - private int readBufferSize = DEFAULT_READ_BUFFER_SIZE; - @UriParam(label = "advanced", defaultValue = "false") - private boolean shouldOptimize; - @UriParam(label = "advanced", defaultValue = "30000", javaType = "java.time.Duration") - private long maxReconnectDelay = DEFAULT_MAX_RECONNECT_DELAY; - @UriParam(label = "advanced", defaultValue = "10000", javaType = "java.time.Duration") - private long opQueueMaxBlockTime = DEFAULT_OP_QUEUE_MAX_BLOCK_TIME; - @UriParam(label = "advanced", defaultValue = "400", javaType = 
"java.time.Duration") - private long obsPollInterval = DEFAULT_OBS_POLL_INTERVAL; - @UriParam(label = "advanced", defaultValue = "-1", javaType = "java.time.Duration") - private long obsTimeout = DEFAULT_OBS_TIMEOUT; + private long queryTimeout = DEFAULT_QUERY_TIMEOUT; + public CouchbaseEndpoint() { } @@ -225,6 +217,28 @@ public class CouchbaseEndpoint extends ScheduledPollEndpoint { this.port = port; } + /** + * The collection to use + */ + public String getCollection() { + return this.collection; + } + + public void setCollection(String collection) { + this.collection = collection; + } + + public String getScope() { + return this.scope; + } + + /** + * The scope to use + */ + public void setScope(String scope) { + this.scope = scope; + } + public String getKey() { return key; } @@ -434,98 +448,21 @@ public class CouchbaseEndpoint extends ScheduledPollEndpoint { this.consumerProcessedStrategy = consumerProcessedStrategy; } - public long getOpTimeOut() { - return opTimeOut; - } - - /** - * Define the operation timeout - */ - public void setOpTimeOut(long opTimeOut) { - this.opTimeOut = opTimeOut; - } - - public int getTimeoutExceptionThreshold() { - return timeoutExceptionThreshold; - } - - /** - * Define the threshold for throwing a timeout Exception - */ - public void setTimeoutExceptionThreshold(int timeoutExceptionThreshold) { - this.timeoutExceptionThreshold = timeoutExceptionThreshold; - } - - public int getReadBufferSize() { - return readBufferSize; - } - - /** - * Define the buffer size - */ - public void setReadBufferSize(int readBufferSize) { - this.readBufferSize = readBufferSize; - } - - public boolean isShouldOptimize() { - return shouldOptimize; - } - - /** - * Define if we want to use optimization or not where possible - */ - public void setShouldOptimize(boolean shouldOptimize) { - this.shouldOptimize = shouldOptimize; - } - - public long getMaxReconnectDelay() { - return maxReconnectDelay; - } - - /** - * Define the max delay during a reconnection 
- */ - public void setMaxReconnectDelay(long maxReconnectDelay) { - this.maxReconnectDelay = maxReconnectDelay; - } - - public long getOpQueueMaxBlockTime() { - return opQueueMaxBlockTime; + public long getQueryTimeout() { + return queryTimeout; } /** - * Define the max time an operation can be in queue blocked + * Define the operation timeout in milliseconds */ - public void setOpQueueMaxBlockTime(long opQueueMaxBlockTime) { - this.opQueueMaxBlockTime = opQueueMaxBlockTime; - } - - public long getObsPollInterval() { - return obsPollInterval; - } - - /** - * Define the observation polling interval - */ - public void setObsPollInterval(long obsPollInterval) { - this.obsPollInterval = obsPollInterval; - } - - public long getObsTimeout() { - return obsTimeout; - } - - /** - * Define the observation timeout - */ - public void setObsTimeout(long obsTimeout) { - this.obsTimeout = obsTimeout; + public void setQueryTimeout(long queryTimeout) { + this.queryTimeout = queryTimeout; } public URI[] makeBootstrapURI() throws URISyntaxException { if (additionalHosts == null || "".equals(additionalHosts)) { - return new URI[] {new URI(protocol + "://" + hostname + ":" + port + "/pools")}; + return new URI[]{new URI(protocol + "://" + hostname + ":" + port + "/pools")}; } return getAllUris(); @@ -554,37 +491,32 @@ public class CouchbaseEndpoint extends ScheduledPollEndpoint { return uriArray; } - private CouchbaseClient createClient() throws IOException, URISyntaxException { + //create from couchbase-client + private Bucket createClient() throws IOException, URISyntaxException { List<URI> hosts = Arrays.asList(makeBootstrapURI()); + String connectionString; - CouchbaseConnectionFactoryBuilder cfb = new CouchbaseConnectionFactoryBuilder(); - - if (opTimeOut != DEFAULT_OP_TIMEOUT) { - cfb.setOpTimeout(opTimeOut); + ClusterEnvironment.Builder cfb = ClusterEnvironment.builder(); + if (queryTimeout != DEFAULT_QUERY_TIMEOUT) { + 
cfb.timeoutConfig().queryTimeout(Duration.ofMillis(queryTimeout)); } - if (timeoutExceptionThreshold != DEFAULT_TIMEOUT_EXCEPTION_THRESHOLD) { - cfb.setTimeoutExceptionThreshold(timeoutExceptionThreshold); - } - if (readBufferSize != DEFAULT_READ_BUFFER_SIZE) { - cfb.setReadBufferSize(readBufferSize); - } - if (shouldOptimize) { - cfb.setShouldOptimize(true); - } - if (maxReconnectDelay != DEFAULT_MAX_RECONNECT_DELAY) { - cfb.setMaxReconnectDelay(maxReconnectDelay); - } - if (opQueueMaxBlockTime != DEFAULT_OP_QUEUE_MAX_BLOCK_TIME) { - cfb.setOpQueueMaxBlockTime(opQueueMaxBlockTime); - } - if (obsPollInterval != DEFAULT_OBS_POLL_INTERVAL) { - cfb.setObsPollInterval(obsPollInterval); - } - if (obsTimeout != DEFAULT_OBS_TIMEOUT) { - cfb.setObsTimeout(obsTimeout); + + ClusterEnvironment env = cfb.build(); + + String addHosts = hosts.stream() + .map(URI::getHost) + .collect(Collectors.joining(",")); + + if (!addHosts.isEmpty()) { + connectionString = addHosts; + } else { + connectionString = hostname; } - return new CouchbaseClient(cfb.buildCouchbaseConnection(hosts, bucket, username, password)); + Cluster cluster = Cluster.connect(connectionString, ClusterOptions + .clusterOptions(username, password) + .environment(env)); + return cluster.bucket(bucket); } } diff --git a/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseException.java b/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseException.java index abacfc2..1193597 100644 --- a/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseException.java +++ b/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseException.java @@ -1,37 +1,37 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.component.couchbase; - -import org.apache.camel.CamelExchangeException; -import org.apache.camel.Exchange; - -/** - * Couchbase exception. - */ - -public class CouchbaseException extends CamelExchangeException { - - private static final long serialVersionUID = 1L; - - public CouchbaseException(String message, Exchange exchange) { - super(message, exchange); - } - - public CouchbaseException(String message, Exchange exchange, Throwable cause) { - super(message, exchange, cause); - } -} +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.couchbase; + +import org.apache.camel.CamelExchangeException; +import org.apache.camel.Exchange; + +/** + * Couchbase exception. + */ + +public class CouchbaseException extends CamelExchangeException { + + private static final long serialVersionUID = 1L; + + public CouchbaseException(String message, Exchange exchange) { + super(message, exchange); + } + + public CouchbaseException(String message, Exchange exchange, Throwable cause) { + super(message, exchange, cause); + } +} diff --git a/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseProducer.java b/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseProducer.java index befbf32..7ddc867 100644 --- a/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseProducer.java +++ b/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseProducer.java @@ -16,13 +16,17 @@ */ package org.apache.camel.component.couchbase; +import java.time.Duration; import java.util.Map; -import java.util.concurrent.Future; -import com.couchbase.client.CouchbaseClientIF; -import net.spy.memcached.PersistTo; -import net.spy.memcached.ReplicateTo; -import net.spy.memcached.internal.OperationFuture; +import com.couchbase.client.core.retry.BestEffortRetryStrategy; +import com.couchbase.client.java.Bucket; +import com.couchbase.client.java.Collection; +import com.couchbase.client.java.Scope; +import com.couchbase.client.java.kv.MutationResult; +import com.couchbase.client.java.kv.PersistTo; +import com.couchbase.client.java.kv.ReplicateTo; +import com.couchbase.client.java.kv.UpsertOptions; import org.apache.camel.Exchange; import org.apache.camel.support.DefaultProducer; import org.slf4j.Logger; @@ -35,6 +39,7 @@ import static org.apache.camel.component.couchbase.CouchbaseConstants.DEFAULT_TT import static org.apache.camel.component.couchbase.CouchbaseConstants.HEADER_ID; 
import static org.apache.camel.component.couchbase.CouchbaseConstants.HEADER_TTL; + /** * Couchbase producer generates various type of operations. PUT, GET, and DELETE * are currently supported @@ -44,17 +49,32 @@ public class CouchbaseProducer extends DefaultProducer { private static final Logger LOG = LoggerFactory.getLogger(CouchbaseProducer.class); private CouchbaseEndpoint endpoint; - private CouchbaseClientIF client; + private Bucket client; + private Collection collection; private long startId; private PersistTo persistTo; private ReplicateTo replicateTo; private int producerRetryAttempts; private int producerRetryPause; - public CouchbaseProducer(CouchbaseEndpoint endpoint, CouchbaseClientIF client, int persistTo, int replicateTo) throws Exception { + public CouchbaseProducer(CouchbaseEndpoint endpoint, Bucket client, int persistTo, int replicateTo) throws Exception { super(endpoint); this.endpoint = endpoint; this.client = client; + Scope scope; + + if (endpoint.getScope() != null) { + scope = client.scope(endpoint.getScope()); + } else { + scope = client.defaultScope(); + } + + if (endpoint.getCollection() != null) { + this.collection = scope.collection(endpoint.getCollection()); + } else { + this.collection = client.defaultCollection(); + } + if (endpoint.isAutoStartIdForInserts()) { this.startId = endpoint.getStartingIdForInsertsFrom(); } @@ -63,13 +83,10 @@ public class CouchbaseProducer extends DefaultProducer { switch (persistTo) { case 0: - this.persistTo = PersistTo.ZERO; + this.persistTo = PersistTo.NONE; break; case 1: - this.persistTo = PersistTo.MASTER; - break; - case 2: - this.persistTo = PersistTo.TWO; + this.persistTo = PersistTo.ACTIVE; break; case 3: this.persistTo = PersistTo.THREE; @@ -83,7 +100,7 @@ public class CouchbaseProducer extends DefaultProducer { switch (replicateTo) { case 0: - this.replicateTo = ReplicateTo.ZERO; + this.replicateTo = ReplicateTo.NONE; break; case 1: this.replicateTo = ReplicateTo.ONE; @@ -102,7 +119,7 @@ 
public class CouchbaseProducer extends DefaultProducer { @Override public void process(Exchange exchange) throws Exception { - +// Map<String, Object> headers = exchange.getIn().getHeaders(); String id = (headers.containsKey(HEADER_ID)) ? exchange.getIn().getHeader(HEADER_ID, String.class) : endpoint.getId(); @@ -119,17 +136,16 @@ public class CouchbaseProducer extends DefaultProducer { if (endpoint.getOperation().equals(COUCHBASE_PUT)) { LOG.debug("Type of operation: PUT"); Object obj = exchange.getIn().getBody(); - exchange.getOut().setBody(setDocument(id, ttl, obj, persistTo, replicateTo)); + exchange.getMessage().setBody(setDocument(id, ttl, obj, persistTo, replicateTo)); } else if (endpoint.getOperation().equals(COUCHBASE_GET)) { LOG.debug("Type of operation: GET"); - Object result = client.get(id); - exchange.getOut().setBody(result); + Object result = collection.get(id); + exchange.getMessage().setBody(result); } else if (endpoint.getOperation().equals(COUCHBASE_DELETE)) { LOG.debug("Type of operation: DELETE"); - Future<Boolean> result = client.delete(id); - exchange.getOut().setBody(result.get()); + MutationResult result = collection.remove(id); + exchange.getMessage().setBody(result.toString()); } - // cleanup the cache headers exchange.getIn().removeHeader(HEADER_ID); @@ -139,7 +155,7 @@ public class CouchbaseProducer extends DefaultProducer { protected void doStop() throws Exception { super.doStop(); if (client != null) { - client.shutdown(); + client.core().shutdown(); } } @@ -149,21 +165,17 @@ public class CouchbaseProducer extends DefaultProducer { private Boolean setDocument(String id, int expiry, Object obj, int retryAttempts, PersistTo persistTo, ReplicateTo replicateTo) throws Exception { - OperationFuture<Boolean> result = client.set(id, expiry, obj, persistTo, replicateTo); - try { - if (!result.get()) { - throw new Exception("Unable to save Document. 
" + id); - } - return true; - } catch (Exception e) { - if (retryAttempts <= 0) { - throw e; - } else { - LOG.info("Unable to save Document, retrying in " + producerRetryPause + "ms (" + retryAttempts + ")"); - Thread.sleep(producerRetryPause); - return setDocument(id, expiry, obj, retryAttempts - 1, persistTo, replicateTo); - } + UpsertOptions options = UpsertOptions.upsertOptions() + .expiry(Duration.ofSeconds(expiry)) + .durability(persistTo, replicateTo) + .timeout(Duration.ofMillis(retryAttempts * producerRetryPause)) + .retryStrategy(BestEffortRetryStrategy.withExponentialBackoff(Duration.ofMillis(producerRetryPause), Duration.ofMillis(producerRetryPause), 1)); + + MutationResult result = collection.upsert(id, obj, options); + if (LOG.isDebugEnabled()) { + LOG.debug(result.toString()); } - } + return true; + } } diff --git a/components/camel-couchbase/src/test/java/org/apache/camel/component/couchbase/ConsumeBeerMessagesWithLimitIntegrationTest.java b/components/camel-couchbase/src/test/java/org/apache/camel/component/couchbase/ConsumeBeerMessagesWithLimitIntegrationTest.java index 452cef3..3b1deb4 100644 --- a/components/camel-couchbase/src/test/java/org/apache/camel/component/couchbase/ConsumeBeerMessagesWithLimitIntegrationTest.java +++ b/components/camel-couchbase/src/test/java/org/apache/camel/component/couchbase/ConsumeBeerMessagesWithLimitIntegrationTest.java @@ -16,6 +16,8 @@ */ package org.apache.camel.component.couchbase; +import org.apache.camel.Exchange; +import org.apache.camel.Message; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.mock.MockEndpoint; import org.apache.camel.test.junit5.CamelTestSupport; @@ -39,7 +41,8 @@ public class ConsumeBeerMessagesWithLimitIntegrationTest extends CamelTestSuppor public void configure() throws Exception { // need couchbase installed on localhost with beer-sample data - 
from("couchbase:http://localhost/beer-sample?designDocumentName=beer&viewName=brewery_beers&limit=10").to("mock:result"); + from("couchbase:http://localhost/beer-sample?username=root&password=123456&designDocumentName=beer&viewName=brewery_beers&limit=10"). + to("mock:result"); } }; diff --git a/components/camel-couchbase/src/test/java/org/apache/camel/component/couchbase/CouchbaseComponentTest.java b/components/camel-couchbase/src/test/java/org/apache/camel/component/couchbase/CouchbaseComponentTest.java index b5189e3..1bedde8f 100644 --- a/components/camel-couchbase/src/test/java/org/apache/camel/component/couchbase/CouchbaseComponentTest.java +++ b/components/camel-couchbase/src/test/java/org/apache/camel/component/couchbase/CouchbaseComponentTest.java @@ -1,154 +1,154 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.camel.component.couchbase; - -import java.net.URI; -import java.util.HashMap; -import java.util.Map; - -import org.apache.camel.test.junit5.CamelTestSupport; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNotNull; - -public class CouchbaseComponentTest extends CamelTestSupport { - - private CouchbaseComponent component; - - @BeforeEach - @Override - public void setUp() throws Exception { - super.setUp(); - component = context.getComponent("couchbase", CouchbaseComponent.class); - } - - @Override - public boolean isUseRouteBuilder() { - return false; - } - - @Test - public void testEndpointCreated() throws Exception { - Map<String, Object> params = new HashMap<>(); - - String uri = "couchbase:http://localhost:9191/bucket"; - String remaining = "http://localhost:9191/bucket"; - - CouchbaseEndpoint endpoint = component.createEndpoint(uri, remaining, params); - assertNotNull(endpoint); - } - - @Test - public void testPropertiesSet() throws Exception { - Map<String, Object> params = new HashMap<>(); - params.put("username", "ugol"); - params.put("password", "pwd"); - params.put("additionalHosts", "127.0.0.1,example.com,another-host"); - params.put("persistTo", 2); - params.put("replicateTo", 3); - - String uri = "couchdb:http://localhost:91234/bucket"; - String remaining = "http://localhost:91234/bucket"; - - CouchbaseEndpoint endpoint = component.createEndpoint(uri, remaining, params); - - assertEquals("http", endpoint.getProtocol()); - assertEquals("localhost", endpoint.getHostname()); - assertEquals("bucket", endpoint.getBucket()); - assertEquals(91234, endpoint.getPort()); - assertEquals("ugol", endpoint.getUsername()); - assertEquals("pwd", endpoint.getPassword()); - assertEquals("127.0.0.1,example.com,another-host", endpoint.getAdditionalHosts()); - assertEquals(2, endpoint.getPersistTo()); - 
assertEquals(3, endpoint.getReplicateTo()); - } - - @Test - public void testCouchbaseURI() throws Exception { - Map<String, Object> params = new HashMap<>(); - String uri = "couchbase:http://localhost/bucket?param=true"; - String remaining = "http://localhost/bucket?param=true"; - - CouchbaseEndpoint endpoint = new CouchbaseComponent(context).createEndpoint(uri, remaining, params); - assertEquals(new URI("http://localhost:8091/pools"), endpoint.makeBootstrapURI()[0]); - } - - @Test - public void testCouchbaseAdditionalHosts() throws Exception { - Map<String, Object> params = new HashMap<>(); - params.put("additionalHosts", "127.0.0.1,example.com,another-host"); - String uri = "couchbase:http://localhost/bucket?param=true"; - String remaining = "http://localhost/bucket?param=true"; - - CouchbaseEndpoint endpoint = new CouchbaseComponent(context).createEndpoint(uri, remaining, params); - - URI[] endpointArray = endpoint.makeBootstrapURI(); - assertEquals(new URI("http://localhost:8091/pools"), endpointArray[0]); - assertEquals(new URI("http://127.0.0.1:8091/pools"), endpointArray[1]); - assertEquals(new URI("http://example.com:8091/pools"), endpointArray[2]); - assertEquals(new URI("http://another-host:8091/pools"), endpointArray[3]); - assertEquals(4, endpointArray.length); - } - - @Test - public void testCouchbaseAdditionalHostsWithSpaces() throws Exception { - Map<String, Object> params = new HashMap<>(); - params.put("additionalHosts", " 127.0.0.1, example.com, another-host "); - String uri = "couchbase:http://localhost/bucket?param=true"; - String remaining = "http://localhost/bucket?param=true"; - - CouchbaseEndpoint endpoint = new CouchbaseComponent(context).createEndpoint(uri, remaining, params); - - URI[] endpointArray = endpoint.makeBootstrapURI(); - assertEquals(new URI("http://localhost:8091/pools"), endpointArray[0]); - assertEquals(new URI("http://127.0.0.1:8091/pools"), endpointArray[1]); - assertEquals(new URI("http://example.com:8091/pools"), 
endpointArray[2]); - assertEquals(new URI("http://another-host:8091/pools"), endpointArray[3]); - assertEquals(4, endpointArray.length); - } - - @Test - public void testCouchbaseDuplicateAdditionalHosts() throws Exception { - Map<String, Object> params = new HashMap<>(); - params.put("additionalHosts", "127.0.0.1,localhost, localhost"); - String uri = "couchbase:http://localhost/bucket?param=true"; - String remaining = "http://localhost/bucket?param=true"; - - CouchbaseEndpoint endpoint = new CouchbaseComponent(context).createEndpoint(uri, remaining, params); - URI[] endpointArray = endpoint.makeBootstrapURI(); - assertEquals(2, endpointArray.length); - assertEquals(new URI("http://localhost:8091/pools"), endpointArray[0]); - assertEquals(new URI("http://127.0.0.1:8091/pools"), endpointArray[1]); - } - - @Test - public void testCouchbaseNullAdditionalHosts() throws Exception { - Map<String, Object> params = new HashMap<>(); - params.put("additionalHosts", null); - String uri = "couchbase:http://localhost/bucket?param=true"; - String remaining = "http://localhost/bucket?param=true"; - - CouchbaseEndpoint endpoint = new CouchbaseComponent(context).createEndpoint(uri, remaining, params); - - URI[] endpointArray = endpoint.makeBootstrapURI(); - - assertEquals(1, endpointArray.length); - } - -} +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.couchbase; + +import java.net.URI; +import java.util.HashMap; +import java.util.Map; + +import org.apache.camel.test.junit5.CamelTestSupport; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; + +public class CouchbaseComponentTest extends CamelTestSupport { + + private CouchbaseComponent component; + + @BeforeEach + @Override + public void setUp() throws Exception { + super.setUp(); + component = context.getComponent("couchbase", CouchbaseComponent.class); + } + + @Override + public boolean isUseRouteBuilder() { + return false; + } + + @Test + public void testEndpointCreated() throws Exception { + Map<String, Object> params = new HashMap<>(); + + String uri = "couchbase:http://localhost:9191/bucket"; + String remaining = "http://localhost:9191/bucket"; + + CouchbaseEndpoint endpoint = component.createEndpoint(uri, remaining, params); + assertNotNull(endpoint); + } + + @Test + public void testPropertiesSet() throws Exception { + Map<String, Object> params = new HashMap<>(); + params.put("username", "ugol"); + params.put("password", "pwd"); + params.put("additionalHosts", "127.0.0.1,example.com,another-host"); + params.put("persistTo", 2); + params.put("replicateTo", 3); + + String uri = "couchdb:http://localhost:91234/bucket"; + String remaining = "http://localhost:91234/bucket"; + + CouchbaseEndpoint endpoint = component.createEndpoint(uri, remaining, params); 
+ + assertEquals("http", endpoint.getProtocol()); + assertEquals("localhost", endpoint.getHostname()); + assertEquals("bucket", endpoint.getBucket()); + assertEquals(91234, endpoint.getPort()); + assertEquals("ugol", endpoint.getUsername()); + assertEquals("pwd", endpoint.getPassword()); + assertEquals("127.0.0.1,example.com,another-host", endpoint.getAdditionalHosts()); + assertEquals(2, endpoint.getPersistTo()); + assertEquals(3, endpoint.getReplicateTo()); + } + + @Test + public void testCouchbaseURI() throws Exception { + Map<String, Object> params = new HashMap<>(); + String uri = "couchbase:http://localhost/bucket?param=true"; + String remaining = "http://localhost/bucket?param=true"; + + CouchbaseEndpoint endpoint = new CouchbaseComponent(context).createEndpoint(uri, remaining, params); + assertEquals(new URI("http://localhost:8091/pools"), endpoint.makeBootstrapURI()[0]); + } + + @Test + public void testCouchbaseAdditionalHosts() throws Exception { + Map<String, Object> params = new HashMap<>(); + params.put("additionalHosts", "127.0.0.1,example.com,another-host"); + String uri = "couchbase:http://localhost/bucket?param=true"; + String remaining = "http://localhost/bucket?param=true"; + + CouchbaseEndpoint endpoint = new CouchbaseComponent(context).createEndpoint(uri, remaining, params); + + URI[] endpointArray = endpoint.makeBootstrapURI(); + assertEquals(new URI("http://localhost:8091/pools"), endpointArray[0]); + assertEquals(new URI("http://127.0.0.1:8091/pools"), endpointArray[1]); + assertEquals(new URI("http://example.com:8091/pools"), endpointArray[2]); + assertEquals(new URI("http://another-host:8091/pools"), endpointArray[3]); + assertEquals(4, endpointArray.length); + } + + @Test + public void testCouchbaseAdditionalHostsWithSpaces() throws Exception { + Map<String, Object> params = new HashMap<>(); + params.put("additionalHosts", " 127.0.0.1, example.com, another-host "); + String uri = "couchbase:http://localhost/bucket?param=true"; + String 
remaining = "http://localhost/bucket?param=true"; + + CouchbaseEndpoint endpoint = new CouchbaseComponent(context).createEndpoint(uri, remaining, params); + + URI[] endpointArray = endpoint.makeBootstrapURI(); + assertEquals(new URI("http://localhost:8091/pools"), endpointArray[0]); + assertEquals(new URI("http://127.0.0.1:8091/pools"), endpointArray[1]); + assertEquals(new URI("http://example.com:8091/pools"), endpointArray[2]); + assertEquals(new URI("http://another-host:8091/pools"), endpointArray[3]); + assertEquals(4, endpointArray.length); + } + + @Test + public void testCouchbaseDuplicateAdditionalHosts() throws Exception { + Map<String, Object> params = new HashMap<>(); + params.put("additionalHosts", "127.0.0.1,localhost, localhost"); + String uri = "couchbase:http://localhost/bucket?param=true"; + String remaining = "http://localhost/bucket?param=true"; + + CouchbaseEndpoint endpoint = new CouchbaseComponent(context).createEndpoint(uri, remaining, params); + URI[] endpointArray = endpoint.makeBootstrapURI(); + assertEquals(2, endpointArray.length); + assertEquals(new URI("http://localhost:8091/pools"), endpointArray[0]); + assertEquals(new URI("http://127.0.0.1:8091/pools"), endpointArray[1]); + } + + @Test + public void testCouchbaseNullAdditionalHosts() throws Exception { + Map<String, Object> params = new HashMap<>(); + params.put("additionalHosts", null); + String uri = "couchbase:http://localhost/bucket?param=true"; + String remaining = "http://localhost/bucket?param=true"; + + CouchbaseEndpoint endpoint = new CouchbaseComponent(context).createEndpoint(uri, remaining, params); + + URI[] endpointArray = endpoint.makeBootstrapURI(); + + assertEquals(1, endpointArray.length); + } + +} diff --git a/components/camel-couchbase/src/test/java/org/apache/camel/component/couchbase/CouchbaseEndpointTest.java b/components/camel-couchbase/src/test/java/org/apache/camel/component/couchbase/CouchbaseEndpointTest.java index 677996d..917750c 100644 --- 
a/components/camel-couchbase/src/test/java/org/apache/camel/component/couchbase/CouchbaseEndpointTest.java +++ b/components/camel-couchbase/src/test/java/org/apache/camel/component/couchbase/CouchbaseEndpointTest.java @@ -16,7 +16,7 @@ */ package org.apache.camel.component.couchbase; -import org.apache.camel.Exchange; +import com.couchbase.client.core.error.InvalidArgumentException; import org.apache.camel.Processor; import org.junit.jupiter.api.Test; @@ -64,7 +64,7 @@ public class CouchbaseEndpointTest { @Test public void testCouchbaseEndpointWithoutProtocol() throws Exception { - assertThrows(IllegalArgumentException.class, + assertThrows(IllegalArgumentException.class, () -> new CouchbaseEndpoint("localhost:80/bucket", "localhost:80/bucket", new CouchbaseComponent())); } @@ -75,7 +75,7 @@ public class CouchbaseEndpointTest { @Test public void testCouchbaseEndpointCreateProducer() throws Exception { - assertThrows(IllegalArgumentException.class, + assertThrows(InvalidArgumentException.class, () -> new CouchbaseEndpoint("couchbase:localhost:80/bucket", new CouchbaseComponent()).createProducer()); } @@ -84,7 +84,7 @@ public class CouchbaseEndpointTest { Processor p = exchange -> { // Nothing to do }; - assertThrows(IllegalArgumentException.class, + assertThrows(InvalidArgumentException.class, () -> new CouchbaseEndpoint("couchbase:localhost:80/bucket", new CouchbaseComponent()).createConsumer(p)); } @@ -98,6 +98,12 @@ public class CouchbaseEndpointTest { endpoint.setBucket("bucket"); assertEquals("bucket", endpoint.getBucket()); + endpoint.setCollection("collection"); + assertEquals("collection", endpoint.getCollection()); + + endpoint.setScope("scope"); + assertEquals("scope", endpoint.getScope()); + endpoint.setHostname("localhost"); assertEquals("localhost", endpoint.getHostname()); @@ -137,29 +143,8 @@ public class CouchbaseEndpointTest { endpoint.setConsumerProcessedStrategy("delete"); assertEquals("delete", endpoint.getConsumerProcessedStrategy()); - 
endpoint.setOpTimeOut(1L); - assertEquals(1L, endpoint.getOpTimeOut()); - - endpoint.setTimeoutExceptionThreshold(1); - assertEquals(1, endpoint.getTimeoutExceptionThreshold()); - - endpoint.setReadBufferSize(1); - assertEquals(1, endpoint.getReadBufferSize()); - - endpoint.setShouldOptimize(true); - assertTrue(endpoint.isShouldOptimize()); - - endpoint.setMaxReconnectDelay(1L); - assertEquals(1L, endpoint.getMaxReconnectDelay()); - - endpoint.setOpQueueMaxBlockTime(1L); - assertEquals(1L, endpoint.getOpQueueMaxBlockTime()); - - endpoint.setObsPollInterval(1L); - assertEquals(1L, endpoint.getObsPollInterval()); - - endpoint.setObsTimeout(1L); - assertEquals(1L, endpoint.getObsTimeout()); + endpoint.setQueryTimeout(1L); + assertEquals(1L, endpoint.getQueryTimeout()); endpoint.setDescending(false); } diff --git a/components/camel-couchbase/src/test/java/org/apache/camel/component/couchbase/CouchbaseProducerTest.java b/components/camel-couchbase/src/test/java/org/apache/camel/component/couchbase/CouchbaseProducerTest.java index b64316e..8d7a5cc 100644 --- a/components/camel-couchbase/src/test/java/org/apache/camel/component/couchbase/CouchbaseProducerTest.java +++ b/components/camel-couchbase/src/test/java/org/apache/camel/component/couchbase/CouchbaseProducerTest.java @@ -16,29 +16,31 @@ */ package org.apache.camel.component.couchbase; +import java.lang.reflect.Field; +import java.time.Duration; import java.util.HashMap; import java.util.Map; -import com.couchbase.client.CouchbaseClient; -import net.spy.memcached.internal.OperationFuture; +import com.couchbase.client.java.Bucket; +import com.couchbase.client.java.Collection; +import com.couchbase.client.java.Scope; +import com.couchbase.client.java.kv.MutationResult; +import com.couchbase.client.java.kv.UpsertOptions; import org.apache.camel.Exchange; import org.apache.camel.Message; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import 
org.mockito.ArgumentCaptor; import org.mockito.Mock; -import org.mockito.invocation.InvocationOnMock; import org.mockito.junit.jupiter.MockitoExtension; -import org.mockito.stubbing.Answer; import static org.apache.camel.component.couchbase.CouchbaseConstants.HEADER_TTL; +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.lenient; -import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -46,7 +48,13 @@ import static org.mockito.Mockito.when; public class CouchbaseProducerTest { @Mock - private CouchbaseClient client; + private Bucket client; + + @Mock + private Collection collection; + + @Mock + private Scope scope; @Mock private CouchbaseEndpoint endpoint; @@ -58,16 +66,21 @@ public class CouchbaseProducerTest { private Message msg; @Mock - private OperationFuture<?> response; + private MutationResult response; +// Observable<String> myStringObservable @Mock - private OperationFuture<Boolean> of; + private MutationResult of; private CouchbaseProducer producer; @BeforeEach public void before() throws Exception { lenient().when(endpoint.getProducerRetryAttempts()).thenReturn(CouchbaseConstants.DEFAULT_PRODUCER_RETRIES); + lenient().when(endpoint.getProducerRetryAttempts()).thenReturn(3); + lenient().when(endpoint.getProducerRetryPause()).thenReturn(200); + lenient().when(client.defaultCollection()).thenReturn(collection); + producer = new CouchbaseProducer(endpoint, client, 0, 0); lenient().when(exchange.getIn()).thenReturn(msg); } @@ -107,111 +120,28 @@ public class CouchbaseProducerTest { producer = new CouchbaseProducer(endpoint, client, 4, 3); } + // @Test public void testExpiryTimeIsSet() 
throws Exception { - when(of.get()).thenAnswer(new Answer<Object>() { - @Override - public Object answer(InvocationOnMock invocation) throws Exception { - return true; - - } - }); - - when(client.set(anyString(), anyInt(), any(), any(), any())).thenReturn(of); - // Mock out some headers so we can set an expiry int expiry = 5000; Map<String, Object> testHeaders = new HashMap<>(); testHeaders.put("CCB_TTL", Integer.toString(expiry)); when(msg.getHeaders()).thenReturn(testHeaders); + when(collection.upsert(anyString(), any(), any())).thenReturn(response); when(msg.getHeader(HEADER_TTL, String.class)).thenReturn(Integer.toString(expiry)); when(endpoint.getId()).thenReturn("123"); when(endpoint.getOperation()).thenReturn("CCB_PUT"); - when(exchange.getOut()).thenReturn(msg); - - producer.process(exchange); - - verify(client).set(anyString(), eq(expiry), any(), any(), any()); - } - - @Test - public void testTimeOutRetryToException() throws Exception { - - when(of.get()).thenAnswer(new Answer<Object>() { - @Override - public Object answer(InvocationOnMock invocation) throws Exception { - throw new RuntimeException("Timed out waiting for operation"); - - } - }); - - when(client.set(anyString(), anyInt(), any(), any(), any())).thenReturn(of); - when(endpoint.getId()).thenReturn("123"); - when(endpoint.getOperation()).thenReturn("CCB_PUT"); - try { - producer.process(exchange); - } catch (Exception e) { - // do nothing - verify(of, times(3)).get(); - } - - } - - @Test - public void testTimeOutRetryThenSuccess() throws Exception { - - when(of.get()).thenAnswer(new Answer<Object>() { - @Override - public Object answer(InvocationOnMock invocation) throws Exception { - throw new RuntimeException("Timed out waiting for operation"); - } - }).thenAnswer(new Answer<Object>() { - @Override - public Object answer(InvocationOnMock invocation) throws Exception { - return true; - } - }); - - when(client.set(anyString(), anyInt(), any(), any(), any())).thenReturn(of); - 
when(endpoint.getId()).thenReturn("123"); - when(endpoint.getOperation()).thenReturn("CCB_PUT"); - when(exchange.getOut()).thenReturn(msg); - - producer.process(exchange); - - verify(of, times(2)).get(); - verify(msg).setBody(true); - } - - @Test - public void testTimeOutRetryTwiceThenSuccess() throws Exception { - - when(of.get()).thenAnswer(new Answer<Object>() { - @Override - public Object answer(InvocationOnMock invocation) throws Exception { - throw new RuntimeException("Timed out waiting for operation"); - } - }).thenAnswer(new Answer<Object>() { - @Override - public Object answer(InvocationOnMock invocation) throws Exception { - throw new RuntimeException("Timed out waiting for operation"); - } - }).thenAnswer(new Answer<Object>() { - @Override - public Object answer(InvocationOnMock invocation) throws Exception { - return true; - } - }); - - when(client.set(anyString(), anyInt(), any(), any(), any())).thenReturn(of); - when(endpoint.getId()).thenReturn("123"); - when(endpoint.getOperation()).thenReturn("CCB_PUT"); - when(exchange.getOut()).thenReturn(msg); + when(exchange.getMessage()).thenReturn(msg); + ArgumentCaptor<UpsertOptions> options = ArgumentCaptor.forClass(UpsertOptions.class); producer.process(exchange); - verify(of, times(3)).get(); - verify(msg).setBody(true); + verify(collection).upsert(anyString(), any(), options.capture()); + Field privateField = UpsertOptions.class.getDeclaredField("expiry"); + privateField.setAccessible(true); + Duration exp = (Duration) privateField.get(options.getValue()); + assertEquals(expiry, exp.getSeconds()); } } diff --git a/components/camel-couchbase/src/test/java/org/apache/camel/component/couchbase/ProduceMessagesSimpleTest.java b/components/camel-couchbase/src/test/java/org/apache/camel/component/couchbase/ProduceMessagesSimpleTest.java index d5d3bdc..8ddd285 100644 --- a/components/camel-couchbase/src/test/java/org/apache/camel/component/couchbase/ProduceMessagesSimpleTest.java +++ 
b/components/camel-couchbase/src/test/java/org/apache/camel/component/couchbase/ProduceMessagesSimpleTest.java @@ -31,19 +31,20 @@ public class ProduceMessagesSimpleTest extends CamelTestSupport { MockEndpoint mock = getMockEndpoint("mock:result"); mock.expectedMessageCount(1); - template.sendBody("direct:start", "ugol"); + template.sendBody("direct:start", "couchbase persist"); assertMockEndpointsSatisfied(); + mock.message(0).body().equals("couchbase persist"); } @Override - protected RouteBuilder createRouteBuilder() throws Exception { + protected RouteBuilder createRouteBuilder() { return new RouteBuilder() { @Override public void configure() throws Exception { // need couchbase installed on localhost - from("direct:start").setHeader(CouchbaseConstants.HEADER_ID, constant("120770")).to("couchbase:http://localhost/default").to("mock:result"); + from("direct:start").setHeader(CouchbaseConstants.HEADER_ID, constant("blabla:120771")).to("couchbase:http://192.168.1.102/test?additionalHosts=localhost&username=root&password=123456").to("mock:result"); } }; diff --git a/components/camel-couchbase/src/test/java/org/apache/camel/component/couchbase/ProduceMessagesWithAutoIDIntegrationTest.java b/components/camel-couchbase/src/test/java/org/apache/camel/component/couchbase/ProduceMessagesWithAutoIDIntegrationTest.java index d509240..cb3693e 100644 --- a/components/camel-couchbase/src/test/java/org/apache/camel/component/couchbase/ProduceMessagesWithAutoIDIntegrationTest.java +++ b/components/camel-couchbase/src/test/java/org/apache/camel/component/couchbase/ProduceMessagesWithAutoIDIntegrationTest.java @@ -1,49 +1,49 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.component.couchbase; - -import org.apache.camel.builder.RouteBuilder; -import org.apache.camel.component.mock.MockEndpoint; -import org.apache.camel.test.junit5.CamelTestSupport; -import org.junit.jupiter.api.Test; - -public class ProduceMessagesWithAutoIDIntegrationTest extends CamelTestSupport { - - @Test - public void testInsert() throws Exception { - MockEndpoint mock = getMockEndpoint("mock:result"); - mock.expectedMessageCount(2); - - template.sendBody("direct:start", "ugol1"); - template.sendBody("direct:start", "ugol2"); - - assertMockEndpointsSatisfied(); - - } - - @Override - protected RouteBuilder createRouteBuilder() throws Exception { - return new RouteBuilder() { - @Override - public void configure() throws Exception { - - // need couchbase installed on localhost - from("direct:start").to("couchbase:http://localhost/default?autoStartIdForInserts=true&startingIdForInsertsFrom=1000").to("mock:result"); - } - }; - } -} +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.couchbase; + +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.test.junit5.CamelTestSupport; +import org.junit.jupiter.api.Test; + +public class ProduceMessagesWithAutoIDIntegrationTest extends CamelTestSupport { + + @Test + public void testInsert() throws Exception { + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedMessageCount(2); + + template.sendBody("direct:start", "ugol1"); + template.sendBody("direct:start", "ugol2"); + + assertMockEndpointsSatisfied(); + + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + + // need couchbase installed on localhost + from("direct:start").to("couchbase:http://localhost/test?username=root&password=123456&autoStartIdForInserts=true&startingIdForInsertsFrom=1000").to("mock:result"); + } + }; + } +} diff --git a/components/camel-couchbase/src/test/java/org/apache/camel/component/couchbase/RemoveMessagesIntegrationTest.java b/components/camel-couchbase/src/test/java/org/apache/camel/component/couchbase/RemoveMessagesIntegrationTest.java index 452dd24..0878aef 100644 --- a/components/camel-couchbase/src/test/java/org/apache/camel/component/couchbase/RemoveMessagesIntegrationTest.java +++ b/components/camel-couchbase/src/test/java/org/apache/camel/component/couchbase/RemoveMessagesIntegrationTest.java @@ -42,7 +42,7 @@ public class RemoveMessagesIntegrationTest 
extends CamelTestSupport { public void configure() throws Exception { // need couchbase installed on localhost - from("direct:start").setHeader(CouchbaseConstants.HEADER_ID, constant("120770")).to("couchbase:http://localhost/default?operation='DELETE'").to("mock:result"); + from("direct:start").setHeader(CouchbaseConstants.HEADER_ID, constant("120770")).to("couchbase:http://localhost/default?username=root&password=123456&operation='DELETE'").to("mock:result"); } }; } diff --git a/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/CouchbaseEndpointBuilderFactory.java b/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/CouchbaseEndpointBuilderFactory.java index a923939..58dda9f 100644 --- a/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/CouchbaseEndpointBuilderFactory.java +++ b/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/CouchbaseEndpointBuilderFactory.java @@ -59,6 +59,17 @@ public interface CouchbaseEndpointBuilderFactory { return this; } /** + * The collection to use. + * + * The option is a: <code>java.lang.String</code> type. + * + * Group: common + */ + default CouchbaseEndpointConsumerBuilder collection(String collection) { + doSetProperty("collection", collection); + return this; + } + /** * The key to use. * * The option is a: <code>java.lang.String</code> type. @@ -70,6 +81,17 @@ public interface CouchbaseEndpointBuilderFactory { return this; } /** + * The scope to use. + * + * The option is a: <code>java.lang.String</code> type. 
+ * + * Group: common + */ + default CouchbaseEndpointConsumerBuilder scope(String scope) { + doSetProperty("scope", scope); + return this; + } + /** * Allows for bridging the consumer to the Camel routing Error Handler, * which mean any exceptions occurred while the consumer is trying to * pickup incoming messages, or the likes, will now be processed as a @@ -805,185 +827,29 @@ public interface CouchbaseEndpointBuilderFactory { return this; } /** - * Define the max delay during a reconnection. - * - * The option is a: <code>long</code> type. - * - * Default: 30000 - * Group: advanced - */ - default AdvancedCouchbaseEndpointConsumerBuilder maxReconnectDelay( - long maxReconnectDelay) { - doSetProperty("maxReconnectDelay", maxReconnectDelay); - return this; - } - /** - * Define the max delay during a reconnection. - * - * The option will be converted to a <code>long</code> type. - * - * Default: 30000 - * Group: advanced - */ - default AdvancedCouchbaseEndpointConsumerBuilder maxReconnectDelay( - String maxReconnectDelay) { - doSetProperty("maxReconnectDelay", maxReconnectDelay); - return this; - } - /** - * Define the observation polling interval. - * - * The option is a: <code>long</code> type. - * - * Default: 400 - * Group: advanced - */ - default AdvancedCouchbaseEndpointConsumerBuilder obsPollInterval( - long obsPollInterval) { - doSetProperty("obsPollInterval", obsPollInterval); - return this; - } - /** - * Define the observation polling interval. - * - * The option will be converted to a <code>long</code> type. - * - * Default: 400 - * Group: advanced - */ - default AdvancedCouchbaseEndpointConsumerBuilder obsPollInterval( - String obsPollInterval) { - doSetProperty("obsPollInterval", obsPollInterval); - return this; - } - /** - * Define the observation timeout. - * - * The option is a: <code>long</code> type. 
- * - * Default: -1 - * Group: advanced - */ - default AdvancedCouchbaseEndpointConsumerBuilder obsTimeout( - long obsTimeout) { - doSetProperty("obsTimeout", obsTimeout); - return this; - } - /** - * Define the observation timeout. - * - * The option will be converted to a <code>long</code> type. - * - * Default: -1 - * Group: advanced - */ - default AdvancedCouchbaseEndpointConsumerBuilder obsTimeout( - String obsTimeout) { - doSetProperty("obsTimeout", obsTimeout); - return this; - } - /** - * Define the max time an operation can be in queue blocked. - * - * The option is a: <code>long</code> type. - * - * Default: 10000 - * Group: advanced - */ - default AdvancedCouchbaseEndpointConsumerBuilder opQueueMaxBlockTime( - long opQueueMaxBlockTime) { - doSetProperty("opQueueMaxBlockTime", opQueueMaxBlockTime); - return this; - } - /** - * Define the max time an operation can be in queue blocked. - * - * The option will be converted to a <code>long</code> type. - * - * Default: 10000 - * Group: advanced - */ - default AdvancedCouchbaseEndpointConsumerBuilder opQueueMaxBlockTime( - String opQueueMaxBlockTime) { - doSetProperty("opQueueMaxBlockTime", opQueueMaxBlockTime); - return this; - } - /** - * Define the operation timeout. + * Define the operation timeout in milliseconds. * * The option is a: <code>long</code> type. * * Default: 2500 * Group: advanced */ - default AdvancedCouchbaseEndpointConsumerBuilder opTimeOut( - long opTimeOut) { - doSetProperty("opTimeOut", opTimeOut); + default AdvancedCouchbaseEndpointConsumerBuilder queryTimeout( + long queryTimeout) { + doSetProperty("queryTimeout", queryTimeout); return this; } /** - * Define the operation timeout. + * Define the operation timeout in milliseconds. * * The option will be converted to a <code>long</code> type. 
* * Default: 2500 * Group: advanced */ - default AdvancedCouchbaseEndpointConsumerBuilder opTimeOut( - String opTimeOut) { - doSetProperty("opTimeOut", opTimeOut); - return this; - } - /** - * Define the buffer size. - * - * The option is a: <code>int</code> type. - * - * Default: 16384 - * Group: advanced - */ - default AdvancedCouchbaseEndpointConsumerBuilder readBufferSize( - int readBufferSize) { - doSetProperty("readBufferSize", readBufferSize); - return this; - } - /** - * Define the buffer size. - * - * The option will be converted to a <code>int</code> type. - * - * Default: 16384 - * Group: advanced - */ - default AdvancedCouchbaseEndpointConsumerBuilder readBufferSize( - String readBufferSize) { - doSetProperty("readBufferSize", readBufferSize); - return this; - } - /** - * Define if we want to use optimization or not where possible. - * - * The option is a: <code>boolean</code> type. - * - * Default: false - * Group: advanced - */ - default AdvancedCouchbaseEndpointConsumerBuilder shouldOptimize( - boolean shouldOptimize) { - doSetProperty("shouldOptimize", shouldOptimize); - return this; - } - /** - * Define if we want to use optimization or not where possible. - * - * The option will be converted to a <code>boolean</code> type. - * - * Default: false - * Group: advanced - */ - default AdvancedCouchbaseEndpointConsumerBuilder shouldOptimize( - String shouldOptimize) { - doSetProperty("shouldOptimize", shouldOptimize); + default AdvancedCouchbaseEndpointConsumerBuilder queryTimeout( + String queryTimeout) { + doSetProperty("queryTimeout", queryTimeout); return this; } /** @@ -1014,32 +880,6 @@ public interface CouchbaseEndpointBuilderFactory { doSetProperty("synchronous", synchronous); return this; } - /** - * Define the threshold for throwing a timeout Exception. - * - * The option is a: <code>int</code> type. 
- * - * Default: 998 - * Group: advanced - */ - default AdvancedCouchbaseEndpointConsumerBuilder timeoutExceptionThreshold( - int timeoutExceptionThreshold) { - doSetProperty("timeoutExceptionThreshold", timeoutExceptionThreshold); - return this; - } - /** - * Define the threshold for throwing a timeout Exception. - * - * The option will be converted to a <code>int</code> type. - * - * Default: 998 - * Group: advanced - */ - default AdvancedCouchbaseEndpointConsumerBuilder timeoutExceptionThreshold( - String timeoutExceptionThreshold) { - doSetProperty("timeoutExceptionThreshold", timeoutExceptionThreshold); - return this; - } } /** @@ -1063,6 +903,17 @@ public interface CouchbaseEndpointBuilderFactory { return this; } /** + * The collection to use. + * + * The option is a: <code>java.lang.String</code> type. + * + * Group: common + */ + default CouchbaseEndpointProducerBuilder collection(String collection) { + doSetProperty("collection", collection); + return this; + } + /** * The key to use. * * The option is a: <code>java.lang.String</code> type. @@ -1074,6 +925,17 @@ public interface CouchbaseEndpointBuilderFactory { return this; } /** + * The scope to use. + * + * The option is a: <code>java.lang.String</code> type. + * + * Group: common + */ + default CouchbaseEndpointProducerBuilder scope(String scope) { + doSetProperty("scope", scope); + return this; + } + /** * Define if we want an autostart Id when we are doing an insert * operation. * @@ -1353,185 +1215,29 @@ public interface CouchbaseEndpointBuilderFactory { return this; } /** - * Define the max delay during a reconnection. - * - * The option is a: <code>long</code> type. - * - * Default: 30000 - * Group: advanced - */ - default AdvancedCouchbaseEndpointProducerBuilder maxReconnectDelay( - long maxReconnectDelay) { - doSetProperty("maxReconnectDelay", maxReconnectDelay); - return this; - } - /** - * Define the max delay during a reconnection. 
- * - * The option will be converted to a <code>long</code> type. - * - * Default: 30000 - * Group: advanced - */ - default AdvancedCouchbaseEndpointProducerBuilder maxReconnectDelay( - String maxReconnectDelay) { - doSetProperty("maxReconnectDelay", maxReconnectDelay); - return this; - } - /** - * Define the observation polling interval. - * - * The option is a: <code>long</code> type. - * - * Default: 400 - * Group: advanced - */ - default AdvancedCouchbaseEndpointProducerBuilder obsPollInterval( - long obsPollInterval) { - doSetProperty("obsPollInterval", obsPollInterval); - return this; - } - /** - * Define the observation polling interval. - * - * The option will be converted to a <code>long</code> type. - * - * Default: 400 - * Group: advanced - */ - default AdvancedCouchbaseEndpointProducerBuilder obsPollInterval( - String obsPollInterval) { - doSetProperty("obsPollInterval", obsPollInterval); - return this; - } - /** - * Define the observation timeout. - * - * The option is a: <code>long</code> type. - * - * Default: -1 - * Group: advanced - */ - default AdvancedCouchbaseEndpointProducerBuilder obsTimeout( - long obsTimeout) { - doSetProperty("obsTimeout", obsTimeout); - return this; - } - /** - * Define the observation timeout. - * - * The option will be converted to a <code>long</code> type. - * - * Default: -1 - * Group: advanced - */ - default AdvancedCouchbaseEndpointProducerBuilder obsTimeout( - String obsTimeout) { - doSetProperty("obsTimeout", obsTimeout); - return this; - } - /** - * Define the max time an operation can be in queue blocked. - * - * The option is a: <code>long</code> type. - * - * Default: 10000 - * Group: advanced - */ - default AdvancedCouchbaseEndpointProducerBuilder opQueueMaxBlockTime( - long opQueueMaxBlockTime) { - doSetProperty("opQueueMaxBlockTime", opQueueMaxBlockTime); - return this; - } - /** - * Define the max time an operation can be in queue blocked. - * - * The option will be converted to a <code>long</code> type. 
- * - * Default: 10000 - * Group: advanced - */ - default AdvancedCouchbaseEndpointProducerBuilder opQueueMaxBlockTime( - String opQueueMaxBlockTime) { - doSetProperty("opQueueMaxBlockTime", opQueueMaxBlockTime); - return this; - } - /** - * Define the operation timeout. + * Define the operation timeout in milliseconds. * * The option is a: <code>long</code> type. * * Default: 2500 * Group: advanced */ - default AdvancedCouchbaseEndpointProducerBuilder opTimeOut( - long opTimeOut) { - doSetProperty("opTimeOut", opTimeOut); + default AdvancedCouchbaseEndpointProducerBuilder queryTimeout( + long queryTimeout) { + doSetProperty("queryTimeout", queryTimeout); return this; } /** - * Define the operation timeout. + * Define the operation timeout in milliseconds. * * The option will be converted to a <code>long</code> type. * * Default: 2500 * Group: advanced */ - default AdvancedCouchbaseEndpointProducerBuilder opTimeOut( - String opTimeOut) { - doSetProperty("opTimeOut", opTimeOut); - return this; - } - /** - * Define the buffer size. - * - * The option is a: <code>int</code> type. - * - * Default: 16384 - * Group: advanced - */ - default AdvancedCouchbaseEndpointProducerBuilder readBufferSize( - int readBufferSize) { - doSetProperty("readBufferSize", readBufferSize); - return this; - } - /** - * Define the buffer size. - * - * The option will be converted to a <code>int</code> type. - * - * Default: 16384 - * Group: advanced - */ - default AdvancedCouchbaseEndpointProducerBuilder readBufferSize( - String readBufferSize) { - doSetProperty("readBufferSize", readBufferSize); - return this; - } - /** - * Define if we want to use optimization or not where possible. - * - * The option is a: <code>boolean</code> type. 
- * - * Default: false - * Group: advanced - */ - default AdvancedCouchbaseEndpointProducerBuilder shouldOptimize( - boolean shouldOptimize) { - doSetProperty("shouldOptimize", shouldOptimize); - return this; - } - /** - * Define if we want to use optimization or not where possible. - * - * The option will be converted to a <code>boolean</code> type. - * - * Default: false - * Group: advanced - */ - default AdvancedCouchbaseEndpointProducerBuilder shouldOptimize( - String shouldOptimize) { - doSetProperty("shouldOptimize", shouldOptimize); + default AdvancedCouchbaseEndpointProducerBuilder queryTimeout( + String queryTimeout) { + doSetProperty("queryTimeout", queryTimeout); return this; } /** @@ -1562,32 +1268,6 @@ public interface CouchbaseEndpointBuilderFactory { doSetProperty("synchronous", synchronous); return this; } - /** - * Define the threshold for throwing a timeout Exception. - * - * The option is a: <code>int</code> type. - * - * Default: 998 - * Group: advanced - */ - default AdvancedCouchbaseEndpointProducerBuilder timeoutExceptionThreshold( - int timeoutExceptionThreshold) { - doSetProperty("timeoutExceptionThreshold", timeoutExceptionThreshold); - return this; - } - /** - * Define the threshold for throwing a timeout Exception. - * - * The option will be converted to a <code>int</code> type. - * - * Default: 998 - * Group: advanced - */ - default AdvancedCouchbaseEndpointProducerBuilder timeoutExceptionThreshold( - String timeoutExceptionThreshold) { - doSetProperty("timeoutExceptionThreshold", timeoutExceptionThreshold); - return this; - } } /** @@ -1612,6 +1292,17 @@ public interface CouchbaseEndpointBuilderFactory { return this; } /** + * The collection to use. + * + * The option is a: <code>java.lang.String</code> type. + * + * Group: common + */ + default CouchbaseEndpointBuilder collection(String collection) { + doSetProperty("collection", collection); + return this; + } + /** * The key to use. 
* * The option is a: <code>java.lang.String</code> type. @@ -1623,6 +1314,17 @@ public interface CouchbaseEndpointBuilderFactory { return this; } /** + * The scope to use. + * + * The option is a: <code>java.lang.String</code> type. + * + * Group: common + */ + default CouchbaseEndpointBuilder scope(String scope) { + doSetProperty("scope", scope); + return this; + } + /** * The password to use. * * The option is a: <code>java.lang.String</code> type. @@ -1697,181 +1399,28 @@ public interface CouchbaseEndpointBuilderFactory { return this; } /** - * Define the max delay during a reconnection. - * - * The option is a: <code>long</code> type. - * - * Default: 30000 - * Group: advanced - */ - default AdvancedCouchbaseEndpointBuilder maxReconnectDelay( - long maxReconnectDelay) { - doSetProperty("maxReconnectDelay", maxReconnectDelay); - return this; - } - /** - * Define the max delay during a reconnection. - * - * The option will be converted to a <code>long</code> type. - * - * Default: 30000 - * Group: advanced - */ - default AdvancedCouchbaseEndpointBuilder maxReconnectDelay( - String maxReconnectDelay) { - doSetProperty("maxReconnectDelay", maxReconnectDelay); - return this; - } - /** - * Define the observation polling interval. - * - * The option is a: <code>long</code> type. - * - * Default: 400 - * Group: advanced - */ - default AdvancedCouchbaseEndpointBuilder obsPollInterval( - long obsPollInterval) { - doSetProperty("obsPollInterval", obsPollInterval); - return this; - } - /** - * Define the observation polling interval. - * - * The option will be converted to a <code>long</code> type. - * - * Default: 400 - * Group: advanced - */ - default AdvancedCouchbaseEndpointBuilder obsPollInterval( - String obsPollInterval) { - doSetProperty("obsPollInterval", obsPollInterval); - return this; - } - /** - * Define the observation timeout. - * - * The option is a: <code>long</code> type. 
- * - * Default: -1 - * Group: advanced - */ - default AdvancedCouchbaseEndpointBuilder obsTimeout(long obsTimeout) { - doSetProperty("obsTimeout", obsTimeout); - return this; - } - /** - * Define the observation timeout. - * - * The option will be converted to a <code>long</code> type. - * - * Default: -1 - * Group: advanced - */ - default AdvancedCouchbaseEndpointBuilder obsTimeout(String obsTimeout) { - doSetProperty("obsTimeout", obsTimeout); - return this; - } - /** - * Define the max time an operation can be in queue blocked. - * - * The option is a: <code>long</code> type. - * - * Default: 10000 - * Group: advanced - */ - default AdvancedCouchbaseEndpointBuilder opQueueMaxBlockTime( - long opQueueMaxBlockTime) { - doSetProperty("opQueueMaxBlockTime", opQueueMaxBlockTime); - return this; - } - /** - * Define the max time an operation can be in queue blocked. - * - * The option will be converted to a <code>long</code> type. - * - * Default: 10000 - * Group: advanced - */ - default AdvancedCouchbaseEndpointBuilder opQueueMaxBlockTime( - String opQueueMaxBlockTime) { - doSetProperty("opQueueMaxBlockTime", opQueueMaxBlockTime); - return this; - } - /** - * Define the operation timeout. + * Define the operation timeout in milliseconds. * * The option is a: <code>long</code> type. * * Default: 2500 * Group: advanced */ - default AdvancedCouchbaseEndpointBuilder opTimeOut(long opTimeOut) { - doSetProperty("opTimeOut", opTimeOut); + default AdvancedCouchbaseEndpointBuilder queryTimeout(long queryTimeout) { + doSetProperty("queryTimeout", queryTimeout); return this; } /** - * Define the operation timeout. + * Define the operation timeout in milliseconds. * * The option will be converted to a <code>long</code> type. * * Default: 2500 * Group: advanced */ - default AdvancedCouchbaseEndpointBuilder opTimeOut(String opTimeOut) { - doSetProperty("opTimeOut", opTimeOut); - return this; - } - /** - * Define the buffer size. - * - * The option is a: <code>int</code> type. 
- * - * Default: 16384 - * Group: advanced - */ - default AdvancedCouchbaseEndpointBuilder readBufferSize( - int readBufferSize) { - doSetProperty("readBufferSize", readBufferSize); - return this; - } - /** - * Define the buffer size. - * - * The option will be converted to a <code>int</code> type. - * - * Default: 16384 - * Group: advanced - */ - default AdvancedCouchbaseEndpointBuilder readBufferSize( - String readBufferSize) { - doSetProperty("readBufferSize", readBufferSize); - return this; - } - /** - * Define if we want to use optimization or not where possible. - * - * The option is a: <code>boolean</code> type. - * - * Default: false - * Group: advanced - */ - default AdvancedCouchbaseEndpointBuilder shouldOptimize( - boolean shouldOptimize) { - doSetProperty("shouldOptimize", shouldOptimize); - return this; - } - /** - * Define if we want to use optimization or not where possible. - * - * The option will be converted to a <code>boolean</code> type. - * - * Default: false - * Group: advanced - */ - default AdvancedCouchbaseEndpointBuilder shouldOptimize( - String shouldOptimize) { - doSetProperty("shouldOptimize", shouldOptimize); + default AdvancedCouchbaseEndpointBuilder queryTimeout( + String queryTimeout) { + doSetProperty("queryTimeout", queryTimeout); return this; } /** @@ -1900,32 +1449,6 @@ public interface CouchbaseEndpointBuilderFactory { doSetProperty("synchronous", synchronous); return this; } - /** - * Define the threshold for throwing a timeout Exception. - * - * The option is a: <code>int</code> type. - * - * Default: 998 - * Group: advanced - */ - default AdvancedCouchbaseEndpointBuilder timeoutExceptionThreshold( - int timeoutExceptionThreshold) { - doSetProperty("timeoutExceptionThreshold", timeoutExceptionThreshold); - return this; - } - /** - * Define the threshold for throwing a timeout Exception. - * - * The option will be converted to a <code>int</code> type. 
- * - * Default: 998 - * Group: advanced - */ - default AdvancedCouchbaseEndpointBuilder timeoutExceptionThreshold( - String timeoutExceptionThreshold) { - doSetProperty("timeoutExceptionThreshold", timeoutExceptionThreshold); - return this; - } } public interface CouchbaseBuilders { diff --git a/docs/components/modules/ROOT/pages/couchbase-component.adoc b/docs/components/modules/ROOT/pages/couchbase-component.adoc index 7d77075..f184af5 100644 --- a/docs/components/modules/ROOT/pages/couchbase-component.adoc +++ b/docs/components/modules/ROOT/pages/couchbase-component.adoc @@ -74,14 +74,16 @@ with the following path and query parameters: |=== -=== Query Parameters (50 parameters): +=== Query Parameters (45 parameters): [width="100%",cols="2,5,^1,2",options="header"] |=== | Name | Description | Default | Type | *bucket* (common) | The bucket to use | | String +| *collection* (common) | The collection to use | _default | String | *key* (common) | The key to use | | String +| *scope* (common) | The scope to use | _default | String | *bridgeErrorHandler* (consumer) | Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions occurred while the consumer is trying to pickup incoming messages, or the likes, will now be processed as a message and handled by the routing Error Handler. By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions, that will be logged at WARN or ERROR level and ignored. 
| false | boolean | *consumerProcessedStrategy* (consumer) | Define the consumer Processed strategy to use | none | String | *descending* (consumer) | Define if this operation is descending or not | false | boolean @@ -105,15 +107,8 @@ with the following path and query parameters: | *startingIdForInsertsFrom* (producer) | Define the starting Id where we are doing an insert operation | | long | *additionalHosts* (advanced) | The additional hosts | | String | *basicPropertyBinding* (advanced) | Whether the endpoint should use basic property binding (Camel 2.x) or the newer property binding with additional capabilities | false | boolean -| *maxReconnectDelay* (advanced) | Define the max delay during a reconnection | 30000 | long -| *obsPollInterval* (advanced) | Define the observation polling interval | 400 | long -| *obsTimeout* (advanced) | Define the observation timeout | -1 | long -| *opQueueMaxBlockTime* (advanced) | Define the max time an operation can be in queue blocked | 10000 | long -| *opTimeOut* (advanced) | Define the operation timeout | 2500 | long -| *readBufferSize* (advanced) | Define the buffer size | 16384 | int -| *shouldOptimize* (advanced) | Define if we want to use optimization or not where possible | false | boolean +| *queryTimeout* (advanced) | Define the operation timeout | 2500 | long | *synchronous* (advanced) | Sets whether synchronous processing should be strictly used, or Camel is allowed to use asynchronous processing (if supported). | false | boolean -| *timeoutExceptionThreshold* (advanced) | Define the threshold for throwing a timeout Exception | 998 | int | *backoffErrorThreshold* (scheduler) | The number of subsequent error polls (failed due some error) that should happen before the backoffMultipler should kick-in. | | int | *backoffIdleThreshold* (scheduler) | The number of subsequent idle polls that should happen before the backoffMultipler should kick-in. 
| | int | *backoffMultiplier* (scheduler) | To let the scheduled polling consumer backoff if there has been a number of subsequent idles/errors in a row. The multiplier is then the number of polls that will be skipped before the next actual attempt is happening again. When this option is in use then backoffIdleThreshold and/or backoffErrorThreshold must also be configured. | | int @@ -134,9 +129,9 @@ with the following path and query parameters: // endpoint options: END == Couchbase SDK compatibility -This component is currently using a "Legacy SDK" version of couchbase-client. +Using collections and scopes is supported only for Couchbase Enterprise Server 6.5+. -In order to authenticate with newer versions of Couchbase Server 5.0 and beyond, as per instructions on the https://docs.couchbase.com/java-sdk/2.7/sdk-authentication-overview.html/[CouchBase Java SDK Authentication]: +This component is currently using Java SDK 3.x so it might not be compatible with older Couchbase servers anymore. See the compatibility https://docs.couchbase.com/java-sdk/current/project-docs/compatibility.html[page]. * The value formerly interpreted as a bucket-name is now interpreted as a username. The username must correspond to a user defined on the cluster that is being accessed. * The value formerly interpreted as a bucket-password is now interpreted as the password of the defined user. diff --git a/parent/pom.xml b/parent/pom.xml index 8901ec2..f9370b4 100644 --- a/parent/pom.xml +++ b/parent/pom.xml @@ -133,7 +133,7 @@ <consul-client-version>1.3.3</consul-client-version> <cobertura-maven-plugin-version>2.7</cobertura-maven-plugin-version> <corda-version>4.4</corda-version> - <couchbase-client-version>1.4.13</couchbase-client-version> + <couchbase-client-version>3.0.5</couchbase-client-version> <curator-version>4.3.0</curator-version> <cxf-version>3.3.6</cxf-version> <cxf-version-range>[3.3,4.0)</cxf-version-range>