This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new b1f3e412b87 CAMEL-19470 camel-mongodb : add option fullDocument for 
changeStreams... (#10436)
b1f3e412b87 is described below

commit b1f3e412b87f4b5fe9979d0573ae3df9ac1672a8
Author: GarridoKevin <137059017+garridoke...@users.noreply.github.com>
AuthorDate: Wed Jun 21 13:33:22 2023 +0200

    CAMEL-19470 camel-mongodb : add option fullDocument for changeStreams... 
(#10436)
    
    * CAMEL-19470 camel-mongodb : add option fullDocument for changeStreams 
consumer.
    
    * formatting
    
    * fixes for camel-4
    
    * corrections after remarks
    
    * corrections after remarks
---
 .../mongodb/MongoDbEndpointConfigurer.java         |  6 +++
 .../mongodb/MongoDbEndpointUriFactory.java         |  3 +-
 .../apache/camel/component/mongodb/mongodb.json    | 21 +++++-----
 .../mongodb/MongoDbChangeStreamsConsumer.java      |  4 +-
 .../mongodb/MongoDbChangeStreamsThread.java        |  5 ++-
 .../camel/component/mongodb/MongoDbEndpoint.java   | 15 ++++++++
 .../MongoDbChangeStreamsConsumerIT.java            | 45 +++++++++++++++++++---
 .../src/test/resources/mongodb.test.properties     |  1 +
 8 files changed, 80 insertions(+), 20 deletions(-)

diff --git 
a/components/camel-mongodb/src/generated/java/org/apache/camel/component/mongodb/MongoDbEndpointConfigurer.java
 
b/components/camel-mongodb/src/generated/java/org/apache/camel/component/mongodb/MongoDbEndpointConfigurer.java
index a1ad1303fd9..04752c47e85 100644
--- 
a/components/camel-mongodb/src/generated/java/org/apache/camel/component/mongodb/MongoDbEndpointConfigurer.java
+++ 
b/components/camel-mongodb/src/generated/java/org/apache/camel/component/mongodb/MongoDbEndpointConfigurer.java
@@ -40,6 +40,8 @@ public class MongoDbEndpointConfigurer extends 
PropertyConfigurerSupport impleme
         case "exceptionHandler": 
target.setExceptionHandler(property(camelContext, 
org.apache.camel.spi.ExceptionHandler.class, value)); return true;
         case "exchangepattern":
         case "exchangePattern": 
target.setExchangePattern(property(camelContext, 
org.apache.camel.ExchangePattern.class, value)); return true;
+        case "fulldocument":
+        case "fullDocument": target.setFullDocument(property(camelContext, 
java.lang.String.class, value)); return true;
         case "hosts": target.setHosts(property(camelContext, 
java.lang.String.class, value)); return true;
         case "lazystartproducer":
         case "lazyStartProducer": 
target.setLazyStartProducer(property(camelContext, boolean.class, value)); 
return true;
@@ -96,6 +98,8 @@ public class MongoDbEndpointConfigurer extends 
PropertyConfigurerSupport impleme
         case "exceptionHandler": return 
org.apache.camel.spi.ExceptionHandler.class;
         case "exchangepattern":
         case "exchangePattern": return org.apache.camel.ExchangePattern.class;
+        case "fulldocument":
+        case "fullDocument": return java.lang.String.class;
         case "hosts": return java.lang.String.class;
         case "lazystartproducer":
         case "lazyStartProducer": return boolean.class;
@@ -153,6 +157,8 @@ public class MongoDbEndpointConfigurer extends 
PropertyConfigurerSupport impleme
         case "exceptionHandler": return target.getExceptionHandler();
         case "exchangepattern":
         case "exchangePattern": return target.getExchangePattern();
+        case "fulldocument":
+        case "fullDocument": return target.getFullDocument();
         case "hosts": return target.getHosts();
         case "lazystartproducer":
         case "lazyStartProducer": return target.isLazyStartProducer();
diff --git 
a/components/camel-mongodb/src/generated/java/org/apache/camel/component/mongodb/MongoDbEndpointUriFactory.java
 
b/components/camel-mongodb/src/generated/java/org/apache/camel/component/mongodb/MongoDbEndpointUriFactory.java
index 7cf54650e8a..6cd0e20f744 100644
--- 
a/components/camel-mongodb/src/generated/java/org/apache/camel/component/mongodb/MongoDbEndpointUriFactory.java
+++ 
b/components/camel-mongodb/src/generated/java/org/apache/camel/component/mongodb/MongoDbEndpointUriFactory.java
@@ -21,7 +21,7 @@ public class MongoDbEndpointUriFactory extends 
org.apache.camel.support.componen
     private static final Set<String> SECRET_PROPERTY_NAMES;
     private static final Set<String> MULTI_VALUE_PREFIXES;
     static {
-        Set<String> props = new HashSet<>(29);
+        Set<String> props = new HashSet<>(30);
         props.add("authSource");
         props.add("bridgeErrorHandler");
         props.add("collection");
@@ -34,6 +34,7 @@ public class MongoDbEndpointUriFactory extends 
org.apache.camel.support.componen
         props.add("dynamicity");
         props.add("exceptionHandler");
         props.add("exchangePattern");
+        props.add("fullDocument");
         props.add("hosts");
         props.add("lazyStartProducer");
         props.add("mongoConnection");
diff --git 
a/components/camel-mongodb/src/generated/resources/org/apache/camel/component/mongodb/mongodb.json
 
b/components/camel-mongodb/src/generated/resources/org/apache/camel/component/mongodb/mongodb.json
index 681b813fbe3..1fdee0797f5 100644
--- 
a/components/camel-mongodb/src/generated/resources/org/apache/camel/component/mongodb/mongodb.json
+++ 
b/components/camel-mongodb/src/generated/resources/org/apache/camel/component/mongodb/mongodb.json
@@ -73,15 +73,16 @@
     "readPreference": { "index": 16, "kind": "parameter", "displayName": "Read 
Preference", "group": "advanced", "label": "advanced", "required": false, 
"type": "string", "javaType": "java.lang.String", "enum": [ "PRIMARY", 
"PRIMARY_PREFERRED", "SECONDARY", "SECONDARY_PREFERRED", "NEAREST" ], 
"deprecated": false, "autowired": false, "secret": false, "defaultValue": 
"PRIMARY", "description": "Configure how MongoDB clients route read operations 
to the members of a replica set. Possible val [...]
     "writeConcern": { "index": 17, "kind": "parameter", "displayName": "Write 
Concern", "group": "advanced", "label": "advanced", "required": false, "type": 
"string", "javaType": "java.lang.String", "enum": [ "ACKNOWLEDGED", "W1", "W2", 
"W3", "UNACKNOWLEDGED", "JOURNALED", "MAJORITY" ], "deprecated": false, 
"autowired": false, "secret": false, "defaultValue": "ACKNOWLEDGED", 
"description": "Configure the connection bean with the level of acknowledgment 
requested from MongoDB for write op [...]
     "writeResultAsHeader": { "index": 18, "kind": "parameter", "displayName": 
"Write Result As Header", "group": "advanced", "label": "advanced", "required": 
false, "type": "boolean", "javaType": "boolean", "deprecated": false, 
"autowired": false, "secret": false, "defaultValue": false, "description": "In 
write operations, it determines whether instead of returning WriteResult as the 
body of the OUT message, we transfer the IN message to the OUT and attach the 
WriteResult as a header." },
-    "streamFilter": { "index": 19, "kind": "parameter", "displayName": "Stream 
Filter", "group": "changeStream", "label": "consumer,changeStream", "required": 
false, "type": "string", "javaType": "java.lang.String", "deprecated": false, 
"autowired": false, "secret": false, "description": "Filter condition for 
change streams consumer." },
-    "authSource": { "index": 20, "kind": "parameter", "displayName": "Auth 
Source", "group": "security", "label": "security", "required": false, "type": 
"string", "javaType": "java.lang.String", "deprecated": false, "autowired": 
false, "secret": false, "description": "The database name associated with the 
user's credentials." },
-    "password": { "index": 21, "kind": "parameter", "displayName": "Password", 
"group": "security", "label": "security", "required": false, "type": "string", 
"javaType": "java.lang.String", "deprecated": false, "autowired": false, 
"secret": true, "description": "User password for mongodb connection" },
-    "username": { "index": 22, "kind": "parameter", "displayName": "Username", 
"group": "security", "label": "security", "required": false, "type": "string", 
"javaType": "java.lang.String", "deprecated": false, "autowired": false, 
"secret": true, "description": "Username for mongodb connection" },
-    "persistentId": { "index": 23, "kind": "parameter", "displayName": 
"Persistent Id", "group": "tail", "label": "consumer,tail", "required": false, 
"type": "string", "javaType": "java.lang.String", "deprecated": false, 
"autowired": false, "secret": false, "description": "One tail tracking 
collection can host many trackers for several tailable consumers. To keep them 
separate, each tracker should have its own unique persistentId." },
-    "persistentTailTracking": { "index": 24, "kind": "parameter", 
"displayName": "Persistent Tail Tracking", "group": "tail", "label": 
"consumer,tail", "required": false, "type": "boolean", "javaType": "boolean", 
"deprecated": false, "autowired": false, "secret": false, "defaultValue": 
false, "description": "Enable persistent tail tracking, which is a mechanism to 
keep track of the last consumed message across system restarts. The next time 
the system is up, the endpoint will recover the [...]
-    "tailTrackCollection": { "index": 25, "kind": "parameter", "displayName": 
"Tail Track Collection", "group": "tail", "label": "consumer,tail", "required": 
false, "type": "string", "javaType": "java.lang.String", "deprecated": false, 
"autowired": false, "secret": false, "description": "Collection where tail 
tracking information will be persisted. If not specified, 
MongoDbTailTrackingConfig#DEFAULT_COLLECTION will be used by default." },
-    "tailTrackDb": { "index": 26, "kind": "parameter", "displayName": "Tail 
Track Db", "group": "tail", "label": "consumer,tail", "required": false, 
"type": "string", "javaType": "java.lang.String", "deprecated": false, 
"autowired": false, "secret": false, "description": "Indicates what database 
the tail tracking mechanism will persist to. If not specified, the current 
database will be picked by default. Dynamicity will not be taken into account 
even if enabled, i.e. the tail tracking da [...]
-    "tailTrackField": { "index": 27, "kind": "parameter", "displayName": "Tail 
Track Field", "group": "tail", "label": "consumer,tail", "required": false, 
"type": "string", "javaType": "java.lang.String", "deprecated": false, 
"autowired": false, "secret": false, "description": "Field where the last 
tracked value will be placed. If not specified, 
MongoDbTailTrackingConfig#DEFAULT_FIELD will be used by default." },
-    "tailTrackIncreasingField": { "index": 28, "kind": "parameter", 
"displayName": "Tail Track Increasing Field", "group": "tail", "label": 
"consumer,tail", "required": false, "type": "string", "javaType": 
"java.lang.String", "deprecated": false, "autowired": false, "secret": false, 
"description": "Correlation field in the incoming record which is of increasing 
nature and will be used to position the tailing cursor every time it is 
generated. The cursor will be (re)created with a query o [...]
+    "fullDocument": { "index": 19, "kind": "parameter", "displayName": "Full 
Document", "group": "changeStream", "label": "consumer,changeStream", 
"required": false, "type": "object", "javaType": 
"com.mongodb.client.model.changestream.FullDocument", "enum": [ "default", 
"updateLookup", "required", "whenAvailable" ], "deprecated": false, 
"autowired": false, "secret": false, "defaultValue": "default", "description": 
"Specifies whether changeStream consumer include a copy of the full docume [...]
+    "streamFilter": { "index": 20, "kind": "parameter", "displayName": "Stream 
Filter", "group": "changeStream", "label": "consumer,changeStream", "required": 
false, "type": "string", "javaType": "java.lang.String", "deprecated": false, 
"autowired": false, "secret": false, "description": "Filter condition for 
change streams consumer." },
+    "authSource": { "index": 21, "kind": "parameter", "displayName": "Auth 
Source", "group": "security", "label": "security", "required": false, "type": 
"string", "javaType": "java.lang.String", "deprecated": false, "autowired": 
false, "secret": false, "description": "The database name associated with the 
user's credentials." },
+    "password": { "index": 22, "kind": "parameter", "displayName": "Password", 
"group": "security", "label": "security", "required": false, "type": "string", 
"javaType": "java.lang.String", "deprecated": false, "autowired": false, 
"secret": true, "description": "User password for mongodb connection" },
+    "username": { "index": 23, "kind": "parameter", "displayName": "Username", 
"group": "security", "label": "security", "required": false, "type": "string", 
"javaType": "java.lang.String", "deprecated": false, "autowired": false, 
"secret": true, "description": "Username for mongodb connection" },
+    "persistentId": { "index": 24, "kind": "parameter", "displayName": 
"Persistent Id", "group": "tail", "label": "consumer,tail", "required": false, 
"type": "string", "javaType": "java.lang.String", "deprecated": false, 
"autowired": false, "secret": false, "description": "One tail tracking 
collection can host many trackers for several tailable consumers. To keep them 
separate, each tracker should have its own unique persistentId." },
+    "persistentTailTracking": { "index": 25, "kind": "parameter", 
"displayName": "Persistent Tail Tracking", "group": "tail", "label": 
"consumer,tail", "required": false, "type": "boolean", "javaType": "boolean", 
"deprecated": false, "autowired": false, "secret": false, "defaultValue": 
false, "description": "Enable persistent tail tracking, which is a mechanism to 
keep track of the last consumed message across system restarts. The next time 
the system is up, the endpoint will recover the [...]
+    "tailTrackCollection": { "index": 26, "kind": "parameter", "displayName": 
"Tail Track Collection", "group": "tail", "label": "consumer,tail", "required": 
false, "type": "string", "javaType": "java.lang.String", "deprecated": false, 
"autowired": false, "secret": false, "description": "Collection where tail 
tracking information will be persisted. If not specified, 
MongoDbTailTrackingConfig#DEFAULT_COLLECTION will be used by default." },
+    "tailTrackDb": { "index": 27, "kind": "parameter", "displayName": "Tail 
Track Db", "group": "tail", "label": "consumer,tail", "required": false, 
"type": "string", "javaType": "java.lang.String", "deprecated": false, 
"autowired": false, "secret": false, "description": "Indicates what database 
the tail tracking mechanism will persist to. If not specified, the current 
database will be picked by default. Dynamicity will not be taken into account 
even if enabled, i.e. the tail tracking da [...]
+    "tailTrackField": { "index": 28, "kind": "parameter", "displayName": "Tail 
Track Field", "group": "tail", "label": "consumer,tail", "required": false, 
"type": "string", "javaType": "java.lang.String", "deprecated": false, 
"autowired": false, "secret": false, "description": "Field where the last 
tracked value will be placed. If not specified, 
MongoDbTailTrackingConfig#DEFAULT_FIELD will be used by default." },
+    "tailTrackIncreasingField": { "index": 29, "kind": "parameter", 
"displayName": "Tail Track Increasing Field", "group": "tail", "label": 
"consumer,tail", "required": false, "type": "string", "javaType": 
"java.lang.String", "deprecated": false, "autowired": false, "secret": false, 
"description": "Correlation field in the incoming record which is of increasing 
nature and will be used to position the tailing cursor every time it is 
generated. The cursor will be (re)created with a query o [...]
   }
 }
diff --git 
a/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbChangeStreamsConsumer.java
 
b/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbChangeStreamsConsumer.java
index b76b8ef889b..044d370887a 100644
--- 
a/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbChangeStreamsConsumer.java
+++ 
b/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbChangeStreamsConsumer.java
@@ -62,8 +62,8 @@ public class MongoDbChangeStreamsConsumer extends 
DefaultConsumer {
             bsonFilter = singletonList(BsonDocument.parse(streamFilter));
         }
 
-        executor = 
endpoint.getCamelContext().getExecutorServiceManager().newFixedThreadPool(this, 
endpoint.getEndpointUri(),
-                1);
+        executor = 
endpoint.getCamelContext().getExecutorServiceManager().newFixedThreadPool(this,
+                endpoint.getEndpointUri(), 1);
         changeStreamsThread = new MongoDbChangeStreamsThread(endpoint, this, 
bsonFilter);
         changeStreamsThread.init();
         executor.execute(changeStreamsThread);
diff --git 
a/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbChangeStreamsThread.java
 
b/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbChangeStreamsThread.java
index a772bfee11f..4be722baa59 100644
--- 
a/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbChangeStreamsThread.java
+++ 
b/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbChangeStreamsThread.java
@@ -35,7 +35,8 @@ class MongoDbChangeStreamsThread extends 
MongoAbstractConsumerThread {
     private List<BsonDocument> bsonFilter;
     private BsonDocument resumeToken;
 
-    MongoDbChangeStreamsThread(MongoDbEndpoint endpoint, 
MongoDbChangeStreamsConsumer consumer, List<BsonDocument> bsonFilter) {
+    MongoDbChangeStreamsThread(MongoDbEndpoint endpoint, 
MongoDbChangeStreamsConsumer consumer,
+                               List<BsonDocument> bsonFilter) {
         super(endpoint, consumer);
         this.bsonFilter = bsonFilter;
     }
@@ -51,6 +52,8 @@ class MongoDbChangeStreamsThread extends 
MongoAbstractConsumerThread {
                 ? dbCol.watch(bsonFilter)
                 : dbCol.watch();
 
+        iterable.fullDocument(endpoint.getFullDocument());
+
         if (resumeToken != null) {
             iterable = iterable.resumeAfter(resumeToken);
         }
diff --git 
a/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbEndpoint.java
 
b/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbEndpoint.java
index 5ee082c8f74..46386636a22 100644
--- 
a/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbEndpoint.java
+++ 
b/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbEndpoint.java
@@ -30,6 +30,7 @@ import com.mongodb.client.MongoClient;
 import com.mongodb.client.MongoClients;
 import com.mongodb.client.MongoCollection;
 import com.mongodb.client.MongoDatabase;
+import com.mongodb.client.model.changestream.FullDocument;
 import org.apache.camel.Category;
 import org.apache.camel.Consumer;
 import org.apache.camel.Processor;
@@ -107,6 +108,8 @@ public class MongoDbEndpoint extends DefaultEndpoint {
     private String tailTrackIncreasingField;
     @UriParam(label = "consumer,changeStream")
     private String streamFilter;
+    @UriParam(label = "consumer,changeStream", enums = 
"default,updateLookup,required,whenAvailable", defaultValue = "default")
+    private FullDocument fullDocument = FullDocument.DEFAULT;
     // persistent tail tracking
     @UriParam(label = "consumer,tail")
     private boolean persistentTailTracking;
@@ -662,6 +665,18 @@ public class MongoDbEndpoint extends DefaultEndpoint {
         this.streamFilter = streamFilter;
     }
 
+    public FullDocument getFullDocument() {
+        return fullDocument;
+    }
+
+    /**
+     * Specifies whether changeStream consumer include a copy of the full 
document when modified by update operations.
+     * Possible values are default, updateLookup, required and whenAvailable.
+     */
+    public void setFullDocument(FullDocument fullDocument) {
+        this.fullDocument = fullDocument;
+    }
+
     /**
      * Configure the connection bean with the level of acknowledgment 
requested from MongoDB for write operations to a
      * standalone mongod, replicaset or cluster. Possible values are 
ACKNOWLEDGED, W1, W2, W3, UNACKNOWLEDGED, JOURNALED
diff --git 
a/components/camel-mongodb/src/test/java/org/apache/camel/component/mongodb/integration/MongoDbChangeStreamsConsumerIT.java
 
b/components/camel-mongodb/src/test/java/org/apache/camel/component/mongodb/integration/MongoDbChangeStreamsConsumerIT.java
index 1be2b816a8f..591ba00e3a9 100644
--- 
a/components/camel-mongodb/src/test/java/org/apache/camel/component/mongodb/integration/MongoDbChangeStreamsConsumerIT.java
+++ 
b/components/camel-mongodb/src/test/java/org/apache/camel/component/mongodb/integration/MongoDbChangeStreamsConsumerIT.java
@@ -16,8 +16,7 @@
  */
 package org.apache.camel.component.mongodb.integration;
 
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
+import java.util.concurrent.CompletableFuture;
 
 import com.mongodb.client.MongoCollection;
 import com.mongodb.client.model.CreateCollectionOptions;
@@ -45,7 +44,6 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 public class MongoDbChangeStreamsConsumerIT extends AbstractMongoDbITSupport 
implements ConfigurableRoute {
 
     private MongoCollection<Document> mongoCollection;
-    private ExecutorService executorService = 
Executors.newSingleThreadExecutor();
 
     /*
      * NOTE: in the case of this test, we *DO* want to recreate everything 
after the test has executed, so that when
@@ -76,7 +74,7 @@ public class MongoDbChangeStreamsConsumerIT extends 
AbstractMongoDbITSupport imp
         String consumerRouteId = "simpleConsumer";
         context.getRouteController().startRoute(consumerRouteId);
 
-        Executors.newSingleThreadExecutor().submit(this::singleInsert).get();
+        CompletableFuture.runAsync(this::singleInsert);
 
         mock.assertIsSatisfied();
         context.getRouteController().stopRoute(consumerRouteId);
@@ -98,7 +96,7 @@ public class MongoDbChangeStreamsConsumerIT extends 
AbstractMongoDbITSupport imp
         String consumerRouteId = "filterConsumer";
         context.getRouteController().startRoute(consumerRouteId);
 
-        executorService.submit(this::singleInsert).get();
+        CompletableFuture.runAsync(this::singleInsert);
 
         mock.assertIsSatisfied();
 
@@ -109,6 +107,36 @@ public class MongoDbChangeStreamsConsumerIT extends 
AbstractMongoDbITSupport imp
 
     @Order(3)
     @Test
+    public void updateWithFullDocumentTest() throws Exception {
+        assertEquals(0, mongoCollection.countDocuments());
+        MockEndpoint mock = contextExtension.getMockEndpoint("mock:test");
+        mock.expectedMessageCount(1);
+
+        String consumerRouteId = "updateWithFullDocumentConsumer";
+        context.getRouteController().startRoute(consumerRouteId);
+
+        ObjectId objectId1 = new ObjectId();
+        ObjectId objectId2 = new ObjectId();
+        CompletableFuture.runAsync(() -> {
+            mongoCollection.insertOne(new Document("_id", 
objectId1).append("property", "random value"));
+            mongoCollection.insertOne(new Document("_id", 
objectId2).append("property", "another value"));
+            mongoCollection.updateOne(new Document("_id", objectId1),
+                    new Document("$set", new Document("property", 
"filterOk")));
+            mongoCollection.updateOne(new Document("_id", objectId2),
+                    new Document("$set", new Document("property", 
"filterNotOk")));
+        });
+
+        mock.assertIsSatisfied();
+
+        Exchange updateExchange = mock.getExchanges().get(0);
+        Document actualDocument = 
updateExchange.getIn().getBody(Document.class);
+        assertEquals("filterOk", actualDocument.get("property"));
+        assertEquals(objectId1, updateExchange.getIn().getHeader("_id"));
+        context.getRouteController().stopRoute(consumerRouteId);
+    }
+
+    @Order(4)
+    @Test
     public void operationTypeAndIdHeaderTest() throws Exception {
         Assumptions.assumeTrue(0 == mongoCollection.countDocuments(), "The 
collection should have no documents");
         MockEndpoint mock = contextExtension.getMockEndpoint("mock:test");
@@ -120,7 +148,7 @@ public class MongoDbChangeStreamsConsumerIT extends 
AbstractMongoDbITSupport imp
         context.getRouteController().startRoute(consumerRouteId);
 
         ObjectId objectId = new ObjectId();
-        Executors.newSingleThreadExecutor().submit(() -> 
insertAndDelete(objectId)).get();
+        CompletableFuture.runAsync(() -> insertAndDelete(objectId));
 
         mock.assertIsSatisfied();
 
@@ -160,6 +188,11 @@ public class MongoDbChangeStreamsConsumerIT extends 
AbstractMongoDbITSupport imp
                         .id("filterConsumer")
                         .autoStartup(false)
                         .to("mock:test");
+
+                
from("mongodb:myDb?consumerType=changeStreams&database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&streamFilter={{filter.update}}&fullDocument=updateLookup")
+                        .id("updateWithFullDocumentConsumer")
+                        .autoStartup(false)
+                        .to("mock:test");
             }
         });
     }
diff --git 
a/components/camel-mongodb/src/test/resources/mongodb.test.properties 
b/components/camel-mongodb/src/test/resources/mongodb.test.properties
index a80317cf82b..c1e81f65950 100644
--- a/components/camel-mongodb/src/test/resources/mongodb.test.properties
+++ b/components/camel-mongodb/src/test/resources/mongodb.test.properties
@@ -21,3 +21,4 @@ mongodb.testCollection=camelTest
 mongodb.cappedTestCollection=camelTestCapped
 
 myStreamFilter = { '$match':{'$or':[{'fullDocument.string': 'value2'}]} }
+filter.update={'$match':{'$and':[{'operationType': 'update'}, 
{'fullDocument.property': 'filterOk'}]} }

Reply via email to