This is an automated email from the ASF dual-hosted git repository. fmariani pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push: new 77b58e811ea CAMEL-20800: Use cloudant instead of LightCouch 77b58e811ea is described below commit 77b58e811eadca9da3a08ab1eaf109041af3a06b Author: Croway <federico.mariani.1...@gmail.com> AuthorDate: Fri Jun 14 11:59:22 2024 +0200 CAMEL-20800: Use cloudant instead of LightCouch --- .../apache/camel/catalog/components/couchdb.json | 17 +-- components/camel-couchdb/pom.xml | 13 +- .../couchdb/CouchDbEndpointConfigurer.java | 6 + .../couchdb/CouchDbEndpointUriFactory.java | 3 +- .../apache/camel/component/couchdb/couchdb.json | 17 +-- .../component/couchdb/CouchDbChangesetTracker.java | 144 --------------------- .../component/couchdb/CouchDbClientWrapper.java | 118 +++++++++++------ .../camel/component/couchdb/CouchDbConsumer.java | 78 +++++++---- .../camel/component/couchdb/CouchDbEndpoint.java | 41 ++++-- .../camel/component/couchdb/CouchDbProducer.java | 55 +++++--- .../couchdb/CouchDbChangesetTrackerTest.java | 122 ----------------- .../component/couchdb/CouchDbProducerTest.java | 54 +++++--- .../couchdb/integration/CouchDbCrudIT.java | 12 +- .../dsl/CouchDbEndpointBuilderFactory.java | 36 ++++++ .../camel/kotlin/components/CouchdbUriDsl.kt | 18 +++ 15 files changed, 324 insertions(+), 410 deletions(-) diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/couchdb.json b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/couchdb.json index cc5807374a3..c7a44cc4fa1 100644 --- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/couchdb.json +++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/couchdb.json @@ -43,13 +43,14 @@ "createDatabase": { "index": 4, "kind": "parameter", "displayName": "Create Database", "group": "common", "label": "", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Creates the database if it does not already exist" }, "deletes": { "index": 5, "kind": "parameter", "displayName": "Deletes", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "description": "Document deletes are published as events" }, "heartbeat": { "index": 6, "kind": "parameter", "displayName": "Heartbeat", "group": "consumer", "label": "consumer", "required": false, "type": "duration", "javaType": "long", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "30000", "description": "How often to send an empty message to keep socket alive in millis" }, - "style": { "index": 7, "kind": "parameter", "displayName": "Style", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "enum": [ "all_docs", "main_only" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "main_only", "description": "Specifies how many revisions are returned in the changes array. The default, main_only, will only return the current winning revision; all_docs will return all leaf revi [...] - "updates": { "index": 8, "kind": "parameter", "displayName": "Updates", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "description": "Document inserts\/updates are published as events" }, - "bridgeErrorHandler": { "index": 9, "kind": "parameter", "displayName": "Bridge Error Handler", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions (if possible) occurred while the Camel consumer is trying to pickup incoming [...] - "exceptionHandler": { "index": 10, "kind": "parameter", "displayName": "Exception Handler", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.spi.ExceptionHandler", "optionalPrefix": "consumer.", "deprecated": false, "autowired": false, "secret": false, "description": "To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this option is not in use. By de [...] - "exchangePattern": { "index": 11, "kind": "parameter", "displayName": "Exchange Pattern", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.ExchangePattern", "enum": [ "InOnly", "InOut" ], "deprecated": false, "autowired": false, "secret": false, "description": "Sets the exchange pattern when the consumer creates an exchange." }, - "lazyStartProducer": { "index": 12, "kind": "parameter", "displayName": "Lazy Start Producer", "group": "producer (advanced)", "label": "producer,advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a produ [...] - "password": { "index": 13, "kind": "parameter", "displayName": "Password", "group": "security", "label": "security", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": true, "description": "Password for authenticated databases" }, - "username": { "index": 14, "kind": "parameter", "displayName": "Username", "group": "security", "label": "security", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": true, "description": "Username in case of authenticated databases" } + "maxMessagesPerPoll": { "index": 7, "kind": "parameter", "displayName": "Max Messages Per Poll", "group": "consumer", "label": "consumer", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 10, "description": "Gets the maximum number of messages as a limit to poll at each polling. Gets the maximum number of messages as a limit to poll at each polling. The default value is 10. Use 0 or a negative number to [...] + "style": { "index": 8, "kind": "parameter", "displayName": "Style", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "enum": [ "all_docs", "main_only" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "main_only", "description": "Specifies how many revisions are returned in the changes array. The default, main_only, will only return the current winning revision; all_docs will return all leaf revi [...] + "updates": { "index": 9, "kind": "parameter", "displayName": "Updates", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "description": "Document inserts\/updates are published as events" }, + "bridgeErrorHandler": { "index": 10, "kind": "parameter", "displayName": "Bridge Error Handler", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions (if possible) occurred while the Camel consumer is trying to pickup incoming [...] + "exceptionHandler": { "index": 11, "kind": "parameter", "displayName": "Exception Handler", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.spi.ExceptionHandler", "optionalPrefix": "consumer.", "deprecated": false, "autowired": false, "secret": false, "description": "To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this option is not in use. By de [...] + "exchangePattern": { "index": 12, "kind": "parameter", "displayName": "Exchange Pattern", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.ExchangePattern", "enum": [ "InOnly", "InOut" ], "deprecated": false, "autowired": false, "secret": false, "description": "Sets the exchange pattern when the consumer creates an exchange." }, + "lazyStartProducer": { "index": 13, "kind": "parameter", "displayName": "Lazy Start Producer", "group": "producer (advanced)", "label": "producer,advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a produ [...] + "password": { "index": 14, "kind": "parameter", "displayName": "Password", "group": "security", "label": "security", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": true, "description": "Password for authenticated databases" }, + "username": { "index": 15, "kind": "parameter", "displayName": "Username", "group": "security", "label": "security", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": true, "description": "Username in case of authenticated databases" } } } diff --git a/components/camel-couchdb/pom.xml b/components/camel-couchdb/pom.xml index 1b595ddfa6c..38843980de4 100644 --- a/components/camel-couchdb/pom.xml +++ b/components/camel-couchdb/pom.xml @@ -47,16 +47,9 @@ <artifactId>camel-core-processor</artifactId> </dependency> <dependency> - <groupId>org.lightcouch</groupId> - <artifactId>lightcouch</artifactId> - <version>${lightcouch-version}</version> - </dependency> - <!-- prefer to use the httpclient version used by Camel components to align - the version --> - <dependency> - <groupId>org.apache.httpcomponents</groupId> - <artifactId>httpclient</artifactId> - <version>${httpclient4-version}</version> + <groupId>com.ibm.cloud</groupId> + <artifactId>cloudant</artifactId> + <version>0.8.6</version> </dependency> <dependency> <groupId>org.apache.camel</groupId> diff --git a/components/camel-couchdb/src/generated/java/org/apache/camel/component/couchdb/CouchDbEndpointConfigurer.java b/components/camel-couchdb/src/generated/java/org/apache/camel/component/couchdb/CouchDbEndpointConfigurer.java index 703779b71aa..1f2528ed587 100644 --- a/components/camel-couchdb/src/generated/java/org/apache/camel/component/couchdb/CouchDbEndpointConfigurer.java +++ b/components/camel-couchdb/src/generated/java/org/apache/camel/component/couchdb/CouchDbEndpointConfigurer.java @@ -35,6 +35,8 @@ public class CouchDbEndpointConfigurer extends PropertyConfigurerSupport impleme case "heartbeat": target.setHeartbeat(property(camelContext, java.time.Duration.class, value).toMillis()); return true; case "lazystartproducer": case "lazyStartProducer": target.setLazyStartProducer(property(camelContext, boolean.class, value)); return true; + case "maxmessagesperpoll": + case "maxMessagesPerPoll": target.setMaxMessagesPerPoll(property(camelContext, int.class, value)); return true; case "password": target.setPassword(property(camelContext, java.lang.String.class, value)); return true; case "style": target.setStyle(property(camelContext, java.lang.String.class, value)); return true; case "updates": target.setUpdates(property(camelContext, boolean.class, value)); return true; @@ -58,6 +60,8 @@ public class CouchDbEndpointConfigurer extends PropertyConfigurerSupport impleme case "heartbeat": return long.class; case "lazystartproducer": case "lazyStartProducer": return boolean.class; + case "maxmessagesperpoll": + case "maxMessagesPerPoll": return int.class; case "password": return java.lang.String.class; case "style": return java.lang.String.class; case "updates": return boolean.class; @@ -82,6 +86,8 @@ public class CouchDbEndpointConfigurer extends PropertyConfigurerSupport impleme case "heartbeat": return target.getHeartbeat(); case "lazystartproducer": case "lazyStartProducer": return target.isLazyStartProducer(); + case "maxmessagesperpoll": + case "maxMessagesPerPoll": return target.getMaxMessagesPerPoll(); case "password": return target.getPassword(); case "style": return target.getStyle(); case "updates": return target.isUpdates(); diff --git a/components/camel-couchdb/src/generated/java/org/apache/camel/component/couchdb/CouchDbEndpointUriFactory.java b/components/camel-couchdb/src/generated/java/org/apache/camel/component/couchdb/CouchDbEndpointUriFactory.java index ade397b722a..e0b194904f3 100644 --- a/components/camel-couchdb/src/generated/java/org/apache/camel/component/couchdb/CouchDbEndpointUriFactory.java +++ b/components/camel-couchdb/src/generated/java/org/apache/camel/component/couchdb/CouchDbEndpointUriFactory.java @@ -23,7 +23,7 @@ public class CouchDbEndpointUriFactory extends org.apache.camel.support.componen private static final Set<String> SECRET_PROPERTY_NAMES; private static final Set<String> MULTI_VALUE_PREFIXES; static { - Set<String> props = new HashSet<>(15); + Set<String> props = new HashSet<>(16); props.add("bridgeErrorHandler"); props.add("createDatabase"); props.add("database"); @@ -33,6 +33,7 @@ public class CouchDbEndpointUriFactory extends org.apache.camel.support.componen props.add("heartbeat"); props.add("hostname"); props.add("lazyStartProducer"); + props.add("maxMessagesPerPoll"); props.add("password"); props.add("port"); props.add("protocol"); diff --git a/components/camel-couchdb/src/generated/resources/META-INF/org/apache/camel/component/couchdb/couchdb.json b/components/camel-couchdb/src/generated/resources/META-INF/org/apache/camel/component/couchdb/couchdb.json index cc5807374a3..c7a44cc4fa1 100644 --- a/components/camel-couchdb/src/generated/resources/META-INF/org/apache/camel/component/couchdb/couchdb.json +++ b/components/camel-couchdb/src/generated/resources/META-INF/org/apache/camel/component/couchdb/couchdb.json @@ -43,13 +43,14 @@ "createDatabase": { "index": 4, "kind": "parameter", "displayName": "Create Database", "group": "common", "label": "", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Creates the database if it does not already exist" }, "deletes": { "index": 5, "kind": "parameter", "displayName": "Deletes", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "description": "Document deletes are published as events" }, "heartbeat": { "index": 6, "kind": "parameter", "displayName": "Heartbeat", "group": "consumer", "label": "consumer", "required": false, "type": "duration", "javaType": "long", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "30000", "description": "How often to send an empty message to keep socket alive in millis" }, - "style": { "index": 7, "kind": "parameter", "displayName": "Style", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "enum": [ "all_docs", "main_only" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "main_only", "description": "Specifies how many revisions are returned in the changes array. The default, main_only, will only return the current winning revision; all_docs will return all leaf revi [...] - "updates": { "index": 8, "kind": "parameter", "displayName": "Updates", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "description": "Document inserts\/updates are published as events" }, - "bridgeErrorHandler": { "index": 9, "kind": "parameter", "displayName": "Bridge Error Handler", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions (if possible) occurred while the Camel consumer is trying to pickup incoming [...] - "exceptionHandler": { "index": 10, "kind": "parameter", "displayName": "Exception Handler", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.spi.ExceptionHandler", "optionalPrefix": "consumer.", "deprecated": false, "autowired": false, "secret": false, "description": "To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this option is not in use. By de [...] - "exchangePattern": { "index": 11, "kind": "parameter", "displayName": "Exchange Pattern", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.ExchangePattern", "enum": [ "InOnly", "InOut" ], "deprecated": false, "autowired": false, "secret": false, "description": "Sets the exchange pattern when the consumer creates an exchange." }, - "lazyStartProducer": { "index": 12, "kind": "parameter", "displayName": "Lazy Start Producer", "group": "producer (advanced)", "label": "producer,advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a produ [...] - "password": { "index": 13, "kind": "parameter", "displayName": "Password", "group": "security", "label": "security", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": true, "description": "Password for authenticated databases" }, - "username": { "index": 14, "kind": "parameter", "displayName": "Username", "group": "security", "label": "security", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": true, "description": "Username in case of authenticated databases" } + "maxMessagesPerPoll": { "index": 7, "kind": "parameter", "displayName": "Max Messages Per Poll", "group": "consumer", "label": "consumer", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 10, "description": "Gets the maximum number of messages as a limit to poll at each polling. Gets the maximum number of messages as a limit to poll at each polling. The default value is 10. Use 0 or a negative number to [...] + "style": { "index": 8, "kind": "parameter", "displayName": "Style", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "enum": [ "all_docs", "main_only" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "main_only", "description": "Specifies how many revisions are returned in the changes array. The default, main_only, will only return the current winning revision; all_docs will return all leaf revi [...] + "updates": { "index": 9, "kind": "parameter", "displayName": "Updates", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "description": "Document inserts\/updates are published as events" }, + "bridgeErrorHandler": { "index": 10, "kind": "parameter", "displayName": "Bridge Error Handler", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions (if possible) occurred while the Camel consumer is trying to pickup incoming [...] + "exceptionHandler": { "index": 11, "kind": "parameter", "displayName": "Exception Handler", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.spi.ExceptionHandler", "optionalPrefix": "consumer.", "deprecated": false, "autowired": false, "secret": false, "description": "To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this option is not in use. By de [...] + "exchangePattern": { "index": 12, "kind": "parameter", "displayName": "Exchange Pattern", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.ExchangePattern", "enum": [ "InOnly", "InOut" ], "deprecated": false, "autowired": false, "secret": false, "description": "Sets the exchange pattern when the consumer creates an exchange." }, + "lazyStartProducer": { "index": 13, "kind": "parameter", "displayName": "Lazy Start Producer", "group": "producer (advanced)", "label": "producer,advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a produ [...] + "password": { "index": 14, "kind": "parameter", "displayName": "Password", "group": "security", "label": "security", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": true, "description": "Password for authenticated databases" }, + "username": { "index": 15, "kind": "parameter", "displayName": "Username", "group": "security", "label": "security", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": true, "description": "Username in case of authenticated databases" } } } diff --git a/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbChangesetTracker.java b/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbChangesetTracker.java deleted file mode 100644 index 735e43f47ac..00000000000 --- a/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbChangesetTracker.java +++ /dev/null @@ -1,144 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.component.couchdb; - -import java.time.Duration; - -import com.google.gson.JsonObject; -import org.apache.camel.Exchange; -import org.apache.camel.support.task.BlockingTask; -import org.apache.camel.support.task.Tasks; -import org.apache.camel.support.task.budget.Budgets; -import org.lightcouch.Changes; -import org.lightcouch.ChangesResult; -import org.lightcouch.CouchDbException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class CouchDbChangesetTracker implements Runnable { - - private static final Logger LOG = LoggerFactory.getLogger(CouchDbChangesetTracker.class); - private static final int MAX_DB_ERROR_REPEATS = 8; - - private volatile boolean stopped; - private final CouchDbClientWrapper couchClient; - private final CouchDbEndpoint endpoint; - private final CouchDbConsumer consumer; - private Changes changes; - - public CouchDbChangesetTracker(CouchDbEndpoint endpoint, CouchDbConsumer consumer, CouchDbClientWrapper couchClient) { - this.endpoint = endpoint; - this.consumer = consumer; - this.couchClient = couchClient; - } - - private void initChanges(final String sequence) { - String since = sequence; - if (null == since) { - since = couchClient.getLatestUpdateSequence(); - } - changes = couchClient.changes().style(endpoint.getStyle()).includeDocs(true) - .since(since).heartBeat(endpoint.getHeartbeat()).continuousChanges(); - } - - @Override - public void run() { - String lastSequence = null; - initChanges(null); - - try { - while (!stopped) { - - try { - while (changes.hasNext()) { // blocks until a feed is received - ChangesResult.Row feed = changes.next(); - if (feed.isDeleted() && !endpoint.isDeletes()) { - continue; - } - if (!feed.isDeleted() && !endpoint.isUpdates()) { - continue; - } - - lastSequence = feed.getSeq(); - JsonObject doc = feed.getDoc(); - - Exchange exchange = consumer.createExchange(lastSequence, feed.getId(), doc, feed.isDeleted()); - - if (LOG.isTraceEnabled()) { - LOG.trace("Created exchange [exchange={}, _id={}, seq={}", exchange, feed.getId(), lastSequence); - } - - try { - consumer.getProcessor().process(exchange); - } catch (Exception e) { - consumer.getExceptionHandler().handleException("Error processing exchange.", exchange, e); - } finally { - consumer.releaseExchange(exchange, false); - } - } - - stopped = true; - - } catch (CouchDbException e) { - if (LOG.isDebugEnabled()) { - LOG.debug("CouchDb Exception encountered waiting for changes! Attempting to recover...", e); - } - if (endpoint.isRunAllowed() || !endpoint.isShutdown() || !consumer.isStopped()) { - if (!waitForStability(lastSequence)) { - throw e; - } - } else { - LOG.debug("Skipping the stability check because shutting down or running is not allowed at the moment"); - } - } - } - } catch (Exception e) { - LOG.error("Unexpected error causing CouchDb change tracker to exit!", e); - } - } - - private boolean waitForStability(final String lastSequence) { - BlockingTask task = Tasks.foregroundTask() - .withBudget(Budgets.iterationBudget() - .withMaxIterations(MAX_DB_ERROR_REPEATS) - .withInterval(Duration.ofSeconds(3)) - .build()) - .withName("couchdb-wait-for-stability") - .build(); - - return task.run(this::stabilityCheck, lastSequence); - } - - private boolean stabilityCheck(String lastSequence) { - try { - // Fail fast operation - couchClient.context().serverVersion(); - // reset change listener - initChanges(lastSequence); - - return true; - } catch (Exception e) { - LOG.debug("Failed to get CouchDb server version and/or reset change listener", e); - } - - return false; - } - - public void stop() { - changes.stop(); - } -} diff --git a/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbClientWrapper.java b/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbClientWrapper.java index 7254d1eb417..e35f0fb1693 100644 --- a/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbClientWrapper.java +++ b/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbClientWrapper.java @@ -16,72 +16,112 @@ */ package org.apache.camel.component.couchdb; -import java.io.IOException; - -import com.google.gson.JsonObject; -import org.apache.http.HttpResponse; -import org.apache.http.client.methods.HttpGet; -import org.apache.http.util.EntityUtils; -import org.lightcouch.Changes; -import org.lightcouch.CouchDbClient; -import org.lightcouch.CouchDbContext; -import org.lightcouch.CouchDbException; -import org.lightcouch.Response; +import com.ibm.cloud.cloudant.v1.Cloudant; +import com.ibm.cloud.cloudant.v1.model.*; +import com.ibm.cloud.sdk.core.http.Response; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** - * Necessary to allow mockito to mock this client. Once LightCouch library adds an interface for the client, this class - * can be removed. + * Necessary to allow mockito to mock this client. */ public class CouchDbClientWrapper { + private static final Logger log = LoggerFactory.getLogger(CouchDbEndpoint.class); - private final CouchDbClient client; + private final Cloudant client; + private final String dbName; - public CouchDbClientWrapper(CouchDbClient client) { + public CouchDbClientWrapper(Cloudant client, String dbName, boolean createDatabase) { this.client = client; + this.dbName = dbName; + + initDatabase(createDatabase); } - public Response update(Object doc) { - return client.update(doc); + public void initDatabase(boolean createDatabase) { + if (createDatabase) { + boolean alreadyCreated = false; + for (String db : client.getAllDbs().execute().getResult()) { + if (db.equals(dbName)) { + alreadyCreated = true; + break; + } + } + + if (!alreadyCreated) { + PutDatabaseOptions putDatabaseOptions = new PutDatabaseOptions.Builder() + .db(dbName) + .build(); + + client.putDatabase(putDatabaseOptions).execute(); + } + + log.debug("Database {} created", dbName); + } } - public Response save(Object doc) { - return client.save(doc); + public Response<DocumentResult> update(Document doc) { + PostDocumentOptions postDocumentOptions = new PostDocumentOptions.Builder() + .document(doc) + .db(dbName) + .build(); + + return client.postDocument(postDocumentOptions).execute(); } - public Response remove(Object doc) { - return client.remove(doc); + public Response save(Document doc) { + PutDocumentOptions putDocumentOptions = new PutDocumentOptions.Builder() + .document(doc) + .docId(doc.getId()) + .db(dbName) + .build(); + + return client.putDocument(putDocumentOptions).execute(); } - public Changes changes() { - return client.changes(); + public Response removeByIdAndRev(String id, String rev) { + DeleteDocumentOptions deleteDocumentOptions = new DeleteDocumentOptions.Builder() + .docId(id) + .rev(rev) + .db(dbName) + .build(); + + return client.deleteDocument(deleteDocumentOptions).execute(); } - public Object get(String id) { - return client.find(id); + public Response<ChangesResult> pollChanges(String style, String since, long heartBeat, long maxMessagesPerPoll) { + PostChangesOptions postChangesOptions = new PostChangesOptions.Builder() + .db(dbName) + .since(since) + .limit(maxMessagesPerPoll) + .build(); + + return client.postChanges(postChangesOptions).execute(); } - public CouchDbContext context() { - return client.context(); + public Response get(String id) { + GetDocumentOptions getDocumentOptions = new GetDocumentOptions.Builder() + .docId(id) + .db(dbName) + .build(); + + return client.getDocument(getDocumentOptions).execute(); } /** * In CouchDB 2.3.x, the purge_seq field type was changed from number to string. As such, calling - * {@link org.lightcouch.CouchDbContext#info()} was throwing an exception. This method workarounds the issue by - * getting the update_seq field while ignoring the purge_seq field. + * {@link CouchDbContext#info()} was throwing an exception. This method workarounds the issue by getting the + * update_seq field while ignoring the purge_seq field. * * @return The latest update sequence */ public String getLatestUpdateSequence() { - HttpGet getDbInfoRequest = new HttpGet(client.getDBUri()); - try { - HttpResponse getDbInfoResponse = client.executeRequest(getDbInfoRequest); - String dbInfoString = EntityUtils.toString(getDbInfoResponse.getEntity()); - JsonObject dbInfo = client.getGson().fromJson(dbInfoString, JsonObject.class); - return dbInfo.get("update_seq").getAsString(); - } catch (IOException e) { - getDbInfoRequest.abort(); - throw new CouchDbException("Error executing request to fetch the latest update sequence. ", e); - } + GetDatabaseInformationOptions getDatabaseInformationOptions = new GetDatabaseInformationOptions.Builder() + .db(dbName) + .build(); + + return client.getDatabaseInformation(getDatabaseInformationOptions).execute() + .getResult().getUpdateSeq(); } } diff --git a/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbConsumer.java b/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbConsumer.java index 5ce86edd3dc..5a78470a087 100644 --- a/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbConsumer.java +++ b/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbConsumer.java @@ -16,30 +16,40 @@ */ package org.apache.camel.component.couchdb; +import java.util.Queue; import java.util.concurrent.ExecutorService; -import com.google.gson.JsonObject; +import com.google.gson.JsonParser; +import com.ibm.cloud.cloudant.v1.model.ChangesResult; +import com.ibm.cloud.cloudant.v1.model.ChangesResultItem; +import com.ibm.cloud.sdk.core.http.Response; import org.apache.camel.Exchange; import org.apache.camel.Processor; import org.apache.camel.resume.ResumeAware; import org.apache.camel.resume.ResumeStrategy; -import org.apache.camel.support.DefaultConsumer; +import org.apache.camel.support.ScheduledBatchPollingConsumer; import org.apache.camel.support.resume.ResumeStrategyHelper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import static org.apache.camel.component.couchdb.CouchDbConstants.COUCHDB_RESUME_ACTION; -public class CouchDbConsumer extends DefaultConsumer implements ResumeAware<ResumeStrategy> { +public class CouchDbConsumer extends ScheduledBatchPollingConsumer implements ResumeAware<ResumeStrategy> { + private static final Logger LOG = LoggerFactory.getLogger(CouchDbConsumer.class); private final CouchDbClientWrapper couchClient; private final CouchDbEndpoint endpoint; private ExecutorService executor; - private CouchDbChangesetTracker task; private ResumeStrategy resumeStrategy; + private String since; + private String lastSequence = null; public CouchDbConsumer(CouchDbEndpoint endpoint, CouchDbClientWrapper couchClient, Processor processor) { super(endpoint, processor); this.couchClient = couchClient; this.endpoint = endpoint; + + since = couchClient.getLatestUpdateSequence(); } @Override @@ -52,40 +62,64 @@ public class CouchDbConsumer extends DefaultConsumer implements ResumeAware<Resu return resumeStrategy; } - public Exchange createExchange(String seq, String id, JsonObject obj, boolean deleted) { + public Exchange createExchange(String seq, String id, ChangesResultItem changesResultItem, boolean deleted) { Exchange exchange = createExchange(false); exchange.getIn().setHeader(CouchDbConstants.HEADER_DATABASE, endpoint.getDatabase()); exchange.getIn().setHeader(CouchDbConstants.HEADER_SEQ, seq); exchange.getIn().setHeader(CouchDbConstants.HEADER_DOC_ID, id); - exchange.getIn().setHeader(CouchDbConstants.HEADER_DOC_REV, obj.get("_rev").getAsString()); exchange.getIn().setHeader(CouchDbConstants.HEADER_METHOD, deleted ? "DELETE" : "UPDATE"); - exchange.getIn().setBody(obj); + exchange.getIn().setBody(new JsonParser().parseString(changesResultItem.toString())); return exchange; } + @Override + protected int poll() throws Exception { + Response<ChangesResult> changesResultResponse + = couchClient.pollChanges(endpoint.getStyle(), since, endpoint.getHeartbeat(), getMaxMessagesPerPoll()); + + for (ChangesResultItem changesResultItem : changesResultResponse.getResult().getResults()) { + if (changesResultItem.isDeleted() != null) { + if (changesResultItem.isDeleted() && !endpoint.isDeletes()) { + continue; + } + if (!changesResultItem.isDeleted() && !endpoint.isUpdates()) { + continue; + } + } + + lastSequence = changesResultItem.getSeq(); + + Exchange exchange = this.createExchange(lastSequence, changesResultItem.getId(), changesResultItem, + changesResultItem.isDeleted() == null ? false : changesResultItem.isDeleted()); + + if (LOG.isTraceEnabled()) { + LOG.trace("Created exchange [exchange={}, _id={}, seq={}", exchange, changesResultItem.getId(), lastSequence); + } + + try { + this.getProcessor().process(exchange); + } catch (Exception e) { + this.getExceptionHandler().handleException("Error processing exchange.", exchange, e); + } finally { + // Update since with latest seq, the messages are ordered + since = changesResultItem.getSeq(); + this.releaseExchange(exchange, false); + } + } + + return changesResultResponse.getResult().getResults().size(); + } + @Override protected void doStart() throws Exception { ResumeStrategyHelper.resume(getEndpoint().getCamelContext(), this, resumeStrategy, COUCHDB_RESUME_ACTION); super.doStart(); - executor = endpoint.getCamelContext().getExecutorServiceManager().newFixedThreadPool(this, endpoint.getEndpointUri(), - 1); - task = new CouchDbChangesetTracker(endpoint, this, couchClient); - executor.submit(task); - } @Override - protected void doStop() throws Exception { - super.doStop(); - if (task != null) { - task.stop(); - } - if (executor != null) { - endpoint.getCamelContext().getExecutorServiceManager().shutdownNow(executor); - executor = null; - } + public int processBatch(Queue<Object> exchanges) throws Exception { + return 0; } - } diff --git a/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbEndpoint.java b/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbEndpoint.java index f80c8bc9bd0..3e20b723169 100644 --- a/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbEndpoint.java +++ b/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbEndpoint.java @@ -19,17 +19,16 @@ package org.apache.camel.component.couchdb; import java.net.URI; import java.util.Map; +import com.ibm.cloud.cloudant.v1.Cloudant; +import com.ibm.cloud.sdk.core.security.Authenticator; +import com.ibm.cloud.sdk.core.security.BasicAuthenticator; +import com.ibm.cloud.sdk.core.security.NoAuthAuthenticator; import org.apache.camel.Category; import org.apache.camel.Consumer; import org.apache.camel.Processor; import org.apache.camel.Producer; -import org.apache.camel.spi.EndpointServiceLocation; -import org.apache.camel.spi.Metadata; -import org.apache.camel.spi.UriEndpoint; -import org.apache.camel.spi.UriParam; -import org.apache.camel.spi.UriPath; +import org.apache.camel.spi.*; import org.apache.camel.support.DefaultEndpoint; -import org.lightcouch.CouchDbClient; /** * Consume changesets for inserts, updates and deletes in a CouchDB database, as well as get, save, update and delete @@ -71,6 +70,8 @@ public class CouchDbEndpoint extends DefaultEndpoint implements EndpointServiceL private boolean deletes = true; @UriParam(label = "consumer", defaultValue = "true") private boolean updates = true; + @UriParam(label = "consumer", defaultValue = "10") + private int maxMessagesPerPoll = 10; public CouchDbEndpoint() { } @@ -119,6 +120,7 @@ public class CouchDbEndpoint extends DefaultEndpoint implements EndpointServiceL @Override public Consumer createConsumer(Processor processor) throws Exception { CouchDbConsumer answer = new CouchDbConsumer(this, createClient(), processor); + answer.setMaxMessagesPerPoll(getMaxMessagesPerPoll()); configureConsumer(answer); return answer; } @@ -129,8 +131,17 @@ public class CouchDbEndpoint extends DefaultEndpoint implements EndpointServiceL } protected CouchDbClientWrapper createClient() { - return new CouchDbClientWrapper( - new CouchDbClient(database, createDatabase, protocol, hostname, port, username, password)); + Authenticator authenticator; + if (username == null) { + authenticator = new NoAuthAuthenticator(); + } else { + authenticator = new BasicAuthenticator(username, password); + } + + Cloudant cloudant = new Cloudant("camel-couchdb", authenticator); + cloudant.setServiceUrl(getServiceUrl()); + + return new CouchDbClientWrapper(cloudant, database, createDatabase); } public String getProtocol() { @@ -255,4 +266,18 @@ public class CouchDbEndpoint extends DefaultEndpoint implements EndpointServiceL public void setUpdates(boolean updates) { this.updates = updates; } + + public int getMaxMessagesPerPoll() { + return maxMessagesPerPoll; + } + + /** + * Gets the maximum number of messages as a limit to poll at each polling. + * <p/> + * Gets the maximum number of messages as a limit to poll at each polling. The default value is 10. Use 0 or a + * negative number to set it as unlimited. + */ + public void setMaxMessagesPerPoll(int maxMessagesPerPoll) { + this.maxMessagesPerPoll = maxMessagesPerPoll; + } } diff --git a/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbProducer.java b/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbProducer.java index f50784bd773..914f7d605bd 100644 --- a/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbProducer.java +++ b/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbProducer.java @@ -16,15 +16,19 @@ */ package org.apache.camel.component.couchdb; +import java.util.UUID; + import com.google.gson.JsonElement; import com.google.gson.JsonObject; import com.google.gson.JsonParser; import com.google.gson.JsonSyntaxException; +import com.ibm.cloud.cloudant.v1.model.Document; +import com.ibm.cloud.cloudant.v1.model.DocumentResult; +import com.ibm.cloud.sdk.core.http.Response; import org.apache.camel.Exchange; import org.apache.camel.InvalidPayloadException; import org.apache.camel.support.DefaultProducer; import org.apache.camel.util.ObjectHelper; -import org.lightcouch.Response; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,28 +48,28 @@ public class CouchDbProducer extends DefaultProducer { JsonElement json = getBodyAsJsonElement(exchange); String operation = exchange.getIn().getHeader(CouchDbConstants.HEADER_METHOD, String.class); if (ObjectHelper.isEmpty(operation)) { - Response save = saveJsonElement(json); + Response<DocumentResult> save = saveJsonElement(json); if (save == null) { throw new CouchDbException("Could not save document [unknown reason]", exchange); } if (LOG.isTraceEnabled()) { - LOG.trace("Document saved [_id={}, _rev={}]", save.getId(), save.getRev()); + LOG.trace("Document saved [_id={}, _rev={}]", save.getResult().getId(), save.getResult().getRev()); } - exchange.getIn().setHeader(CouchDbConstants.HEADER_DOC_REV, save.getRev()); - exchange.getIn().setHeader(CouchDbConstants.HEADER_DOC_ID, save.getId()); + exchange.getIn().setHeader(CouchDbConstants.HEADER_DOC_REV, save.getResult().getRev()); + exchange.getIn().setHeader(CouchDbConstants.HEADER_DOC_ID, save.getResult().getId()); } else { if (operation.equalsIgnoreCase(CouchDbOperations.DELETE.toString())) { - Response delete = deleteJsonElement(json); + Response<DocumentResult> delete = deleteJsonElement(json); if (delete == null) { throw new CouchDbException("Could not delete document [unknown reason]", exchange); } if (LOG.isTraceEnabled()) { - LOG.trace("Document saved [_id={}, _rev={}]", delete.getId(), delete.getRev()); + LOG.trace("Document saved [_id={}, _rev={}]", delete.getResult().getId(), delete.getResult().getRev()); } - exchange.getIn().setHeader(CouchDbConstants.HEADER_DOC_REV, delete.getRev()); - exchange.getIn().setHeader(CouchDbConstants.HEADER_DOC_ID, delete.getId()); + exchange.getIn().setHeader(CouchDbConstants.HEADER_DOC_REV, delete.getResult().getRev()); + exchange.getIn().setHeader(CouchDbConstants.HEADER_DOC_ID, delete.getResult().getId()); } if (operation.equalsIgnoreCase(CouchDbOperations.GET.toString())) { String docId = exchange.getIn().getHeader(CouchDbConstants.HEADER_DOC_ID, String.class); @@ -93,23 +97,37 @@ public class CouchDbProducer extends DefaultProducer { } } else if (body instanceof JsonElement) { return (JsonElement) body; + } else if (body instanceof Document document) { + return new JsonParser().parse(document.toString()); } else { throw new InvalidPayloadException(exchange, body != null ? body.getClass() : null); } } - private Response saveJsonElement(JsonElement json) { - Response save; + private Response<DocumentResult> saveJsonElement(JsonElement json) { + Response save = null; if (json instanceof JsonObject) { JsonObject obj = (JsonObject) json; + Document.Builder documentBuilder = new Document.Builder(); + for (String key : obj.keySet()) { + if (key.equals("_id")) { + documentBuilder.id(obj.get(key).getAsString()); + } else { + documentBuilder.add(key, obj.get(key)); + } + } + + Document document = documentBuilder.build(); + if (document.getId() == null) { + document.setId(UUID.randomUUID().toString()); + } if (obj.get("_rev") == null) { - save = couchClient.save(json); + save = couchClient.save(document); } else { - save = couchClient.update(json); + save = couchClient.update(document); } - } else { - save = couchClient.save(json); } + return save; } @@ -117,16 +135,17 @@ public class CouchDbProducer extends DefaultProducer { Response delete; if (json instanceof JsonObject) { JsonObject obj = (JsonObject) json; - delete = couchClient.remove(obj); + delete = couchClient.removeByIdAndRev(obj.get("_id").getAsString(), obj.get("_rev").getAsString()); } else { - delete = couchClient.remove(json); + delete = couchClient.removeByIdAndRev(json.getAsJsonObject().get("_id").getAsString(), + json.getAsJsonObject().get("_rev").getAsString()); } return delete; } private Object getElement(String id) { Object response; - response = couchClient.get(id); + response = couchClient.get(id).getResult(); return response; } } diff --git a/components/camel-couchdb/src/test/java/org/apache/camel/component/couchdb/CouchDbChangesetTrackerTest.java b/components/camel-couchdb/src/test/java/org/apache/camel/component/couchdb/CouchDbChangesetTrackerTest.java deleted file mode 100644 index 7ae1c4999e0..00000000000 --- a/components/camel-couchdb/src/test/java/org/apache/camel/component/couchdb/CouchDbChangesetTrackerTest.java +++ /dev/null @@ -1,122 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.component.couchdb; - -import org.apache.camel.Exchange; -import org.apache.camel.Processor; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; -import org.lightcouch.Changes; -import org.lightcouch.ChangesResult.Row; -import org.lightcouch.CouchDbContext; -import org.lightcouch.CouchDbInfo; -import org.mockito.ArgumentMatchers; -import org.mockito.Mock; -import org.mockito.junit.jupiter.MockitoExtension; - -import static org.mockito.ArgumentMatchers.anyLong; -import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -@ExtendWith(MockitoExtension.class) -public class CouchDbChangesetTrackerTest { - - @Mock - private Changes changes; - @Mock - private CouchDbClientWrapper client; - @Mock - private CouchDbEndpoint endpoint; - @Mock - private CouchDbConsumer consumer; - @Mock - private CouchDbContext context; - @Mock - private CouchDbInfo info; - @Mock - private Row row3; - @Mock - private Row row2; - @Mock - private Row row1; - @Mock - private Exchange exchange1; - @Mock - private Exchange exchange2; - @Mock - private Exchange exchange3; - @Mock - private Processor processor; - - private CouchDbChangesetTracker tracker; - - @BeforeEach - public void before() { - when(endpoint.isUpdates()).thenReturn(true); - - when(client.getLatestUpdateSequence()).thenReturn("100"); - when(client.changes()).thenReturn(changes); - when(changes.continuousChanges()).thenReturn(changes); - when(changes.includeDocs(true)).thenReturn(changes); - when(changes.since(anyString())).thenReturn(changes); - when(changes.heartBeat(anyLong())).thenReturn(changes); - when(changes.style(ArgumentMatchers.isNull())).thenReturn(changes); - - when(row1.getSeq()).thenReturn("seq1"); - - when(row1.getId()).thenReturn("id1"); - - tracker = new CouchDbChangesetTracker(endpoint, consumer, client); - } - - @Test - void testExchangeCreatedWithCorrectProperties() throws Exception { - when(row2.getSeq()).thenReturn("seq2"); - when(row3.getSeq()).thenReturn("seq3"); - when(row2.getId()).thenReturn("id2"); - when(row3.getId()).thenReturn("id3"); - when(changes.hasNext()).thenReturn(true, true, true, false); - when(changes.next()).thenReturn(row1, row2, row3); - when(consumer.createExchange("seq1", "id1", null, false)).thenReturn(exchange1); - when(consumer.createExchange("seq2", "id2", null, false)).thenReturn(exchange2); - when(consumer.createExchange("seq3", "id3", null, false)).thenReturn(exchange3); - when(consumer.getProcessor()).thenReturn(processor); - - tracker.run(); - - verify(consumer).createExchange("seq1", "id1", null, false); - verify(processor).process(exchange1); - verify(consumer).createExchange("seq2", "id2", null, false); - verify(processor).process(exchange2); - verify(consumer).createExchange("seq3", "id3", null, false); - verify(processor).process(exchange3); - } - - @Test - void testProcessorInvoked() throws Exception { - when(changes.hasNext()).thenReturn(true, false); - when(changes.next()).thenReturn(row1); - when(consumer.getProcessor()).thenReturn(processor); - - tracker.run(); - - verify(consumer).createExchange("seq1", "id1", null, false); - verify(processor).process(ArgumentMatchers.isNull()); - } -} diff --git a/components/camel-couchdb/src/test/java/org/apache/camel/component/couchdb/CouchDbProducerTest.java b/components/camel-couchdb/src/test/java/org/apache/camel/component/couchdb/CouchDbProducerTest.java index 4e9ffea65eb..593fd6c85c6 100644 --- a/components/camel-couchdb/src/test/java/org/apache/camel/component/couchdb/CouchDbProducerTest.java +++ b/components/camel-couchdb/src/test/java/org/apache/camel/component/couchdb/CouchDbProducerTest.java @@ -18,15 +18,17 @@ package org.apache.camel.component.couchdb; import java.util.UUID; -import com.google.gson.JsonElement; import com.google.gson.JsonObject; +import com.ibm.cloud.cloudant.v1.model.Document; +import com.ibm.cloud.cloudant.v1.model.DocumentResult; +import com.ibm.cloud.sdk.core.http.Response; import org.apache.camel.Exchange; import org.apache.camel.InvalidPayloadException; import org.apache.camel.Message; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; -import org.lightcouch.Response; +import org.mockito.Answers; import org.mockito.Mock; import org.mockito.invocation.InvocationOnMock; import org.mockito.junit.jupiter.MockitoExtension; @@ -35,8 +37,7 @@ import org.mockito.stubbing.Answer; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; +import static org.mockito.Mockito.*; @ExtendWith(MockitoExtension.class) public class CouchDbProducerTest { @@ -53,8 +54,8 @@ public class CouchDbProducerTest { @Mock private Message msg; - @Mock - private Response response; + @Mock(answer = Answers.RETURNS_DEEP_STUBS) + private Response<DocumentResult> response; private CouchDbProducer producer; @@ -77,14 +78,17 @@ public class CouchDbProducerTest { String id = UUID.randomUUID().toString(); String rev = UUID.randomUUID().toString(); - JsonObject doc = new JsonObject(); - doc.addProperty("_id", id); - doc.addProperty("_rev", rev); + Document doc = new Document.Builder() + .add("_rev", rev) + .id(id) + .build(); + DocumentResult documentResult = mock(DocumentResult.class, Answers.RETURNS_DEEP_STUBS); when(msg.getMandatoryBody()).thenReturn(doc); when(client.update(doc)).thenReturn(response); - when(response.getId()).thenReturn(id); - when(response.getRev()).thenReturn(rev); + when(response.getResult()).thenReturn(documentResult); + when(response.getResult().getId()).thenReturn(id); + when(response.getResult().getRev()).thenReturn(rev); producer.process(exchange); verify(msg).setHeader(CouchDbConstants.HEADER_DOC_ID, id); @@ -104,15 +108,18 @@ public class CouchDbProducerTest { String id = UUID.randomUUID().toString(); String rev = UUID.randomUUID().toString(); - JsonObject doc = new JsonObject(); - doc.addProperty("_id", id); - doc.addProperty("_rev", rev); + Document doc = new Document.Builder() + .id(id) + .add("_rev", rev) + .build(); + DocumentResult documentResult = mock(DocumentResult.class, Answers.RETURNS_DEEP_STUBS); when(msg.getHeader(CouchDbConstants.HEADER_METHOD, String.class)).thenReturn("DELETE"); when(msg.getMandatoryBody()).thenReturn(doc); - when(client.remove(doc)).thenReturn(response); - when(response.getId()).thenReturn(id); - when(response.getRev()).thenReturn(rev); + when(client.removeByIdAndRev(id, rev)).thenReturn(response); + when(response.getResult()).thenReturn(documentResult); + when(response.getResult().getId()).thenReturn(id); + when(response.getResult().getRev()).thenReturn(rev); producer.process(exchange); verify(msg).setHeader(CouchDbConstants.HEADER_DOC_ID, id); @@ -142,12 +149,17 @@ public class CouchDbProducerTest { @Override public Response answer(InvocationOnMock invocation) { - assertTrue(invocation.getArguments()[0] instanceof JsonElement, - invocation.getArguments()[0].getClass() + " but wanted " + JsonElement.class); - return new Response(); + assertTrue(invocation.getArguments()[0] instanceof Document, + invocation.getArguments()[0].getClass() + " but wanted " + Document.class); + + DocumentResult documentResult = mock(DocumentResult.class); + Response response = mock(Response.class); + when(response.getResult()).thenReturn(documentResult); + + return response; } }); producer.process(exchange); - verify(client).save(any(JsonObject.class)); + verify(client).save(any(Document.class)); } } diff --git a/components/camel-couchdb/src/test/java/org/apache/camel/component/couchdb/integration/CouchDbCrudIT.java b/components/camel-couchdb/src/test/java/org/apache/camel/component/couchdb/integration/CouchDbCrudIT.java index 9307f2ec34a..0f45bbcd4d6 100644 --- a/components/camel-couchdb/src/test/java/org/apache/camel/component/couchdb/integration/CouchDbCrudIT.java +++ b/components/camel-couchdb/src/test/java/org/apache/camel/component/couchdb/integration/CouchDbCrudIT.java @@ -29,20 +29,13 @@ import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.couchdb.CouchDbConstants; import org.apache.camel.component.couchdb.CouchDbOperations; import org.apache.camel.component.mock.MockEndpoint; -import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.*; import org.junit.jupiter.api.MethodOrderer.OrderAnnotation; -import org.junit.jupiter.api.Nested; -import org.junit.jupiter.api.Order; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.TestInstance; import org.junit.jupiter.api.TestInstance.Lifecycle; -import org.junit.jupiter.api.TestMethodOrder; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNotEquals; -import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.*; @TestInstance(Lifecycle.PER_CLASS) public class CouchDbCrudIT extends CouchDbTestSupport { @@ -103,6 +96,7 @@ public class CouchDbCrudIT extends CouchDbTestSupport { // Creating a document should trigger an update notification mockUpdateNotifications.expectedHeaderReceived(CouchDbConstants.HEADER_METHOD, "UPDATE"); mockUpdateNotifications.expectedMessageCount(1); + createExchange = template.request(couchDbIn, e -> e.getMessage().setBody(testDocument)); assertNotNull(getDocumentId(createExchange)); diff --git a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/CouchDbEndpointBuilderFactory.java b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/CouchDbEndpointBuilderFactory.java index 2084b5d278f..8d68ecdab61 100644 --- a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/CouchDbEndpointBuilderFactory.java +++ b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/CouchDbEndpointBuilderFactory.java @@ -134,6 +134,42 @@ public interface CouchDbEndpointBuilderFactory { doSetProperty("heartbeat", heartbeat); return this; } + /** + * Gets the maximum number of messages as a limit to poll at each + * polling. Gets the maximum number of messages as a limit to poll at + * each polling. The default value is 10. Use 0 or a negative number to + * set it as unlimited. + * + * The option is a: <code>int</code> type. + * + * Default: 10 + * Group: consumer + * + * @param maxMessagesPerPoll the value to set + * @return the dsl builder + */ + default CouchDbEndpointConsumerBuilder maxMessagesPerPoll(int maxMessagesPerPoll) { + doSetProperty("maxMessagesPerPoll", maxMessagesPerPoll); + return this; + } + /** + * Gets the maximum number of messages as a limit to poll at each + * polling. Gets the maximum number of messages as a limit to poll at + * each polling. The default value is 10. Use 0 or a negative number to + * set it as unlimited. + * + * The option will be converted to a <code>int</code> type. + * + * Default: 10 + * Group: consumer + * + * @param maxMessagesPerPoll the value to set + * @return the dsl builder + */ + default CouchDbEndpointConsumerBuilder maxMessagesPerPoll(String maxMessagesPerPoll) { + doSetProperty("maxMessagesPerPoll", maxMessagesPerPoll); + return this; + } /** * Specifies how many revisions are returned in the changes array. The * default, main_only, will only return the current winning revision; diff --git a/dsl/camel-kotlin-api/src/generated/kotlin/org/apache/camel/kotlin/components/CouchdbUriDsl.kt b/dsl/camel-kotlin-api/src/generated/kotlin/org/apache/camel/kotlin/components/CouchdbUriDsl.kt index c85f495ac98..339623f21db 100644 --- a/dsl/camel-kotlin-api/src/generated/kotlin/org/apache/camel/kotlin/components/CouchdbUriDsl.kt +++ b/dsl/camel-kotlin-api/src/generated/kotlin/org/apache/camel/kotlin/components/CouchdbUriDsl.kt @@ -125,6 +125,24 @@ public class CouchdbUriDsl( it.property("heartbeat", heartbeat) } + /** + * Gets the maximum number of messages as a limit to poll at each polling. Gets the maximum number + * of messages as a limit to poll at each polling. The default value is 10. Use 0 or a negative + * number to set it as unlimited. + */ + public fun maxMessagesPerPoll(maxMessagesPerPoll: String) { + it.property("maxMessagesPerPoll", maxMessagesPerPoll) + } + + /** + * Gets the maximum number of messages as a limit to poll at each polling. Gets the maximum number + * of messages as a limit to poll at each polling. The default value is 10. Use 0 or a negative + * number to set it as unlimited. + */ + public fun maxMessagesPerPoll(maxMessagesPerPoll: Int) { + it.property("maxMessagesPerPoll", maxMessagesPerPoll.toString()) + } + /** * Specifies how many revisions are returned in the changes array. The default, main_only, will * only return the current winning revision; all_docs will return all leaf revisions (including