This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch camel-3.x
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/camel-3.x by this push:
     new 92b299e61af CAMEL-19470 camel-mongodb : add option fullDocument for 
changeStreams… (#10434)
92b299e61af is described below

commit 92b299e61af44a278ab25a2bd107762d54f4ecf6
Author: GarridoKevin <[email protected]>
AuthorDate: Mon Jun 19 18:32:07 2023 +0200

    CAMEL-19470 camel-mongodb : add option fullDocument for changeStreams… 
(#10434)
    
    * CAMEL-19470 camel-mongodb : add option fullDocument for changeStreams 
consumer.
    
    * formatting
---
 .../mongodb/MongoDbEndpointConfigurer.java         |  6 ++++
 .../mongodb/MongoDbEndpointUriFactory.java         |  3 +-
 .../apache/camel/component/mongodb/mongodb.json    |  1 +
 .../mongodb/MongoDbChangeStreamsConsumer.java      |  9 +++--
 .../mongodb/MongoDbChangeStreamsThread.java        | 10 ++++--
 .../camel/component/mongodb/MongoDbEndpoint.java   | 14 ++++++++
 .../MongoDbChangeStreamsConsumerIT.java            | 38 ++++++++++++++++++++++
 .../src/test/resources/mongodb.test.properties     |  1 +
 8 files changed, 75 insertions(+), 7 deletions(-)

diff --git 
a/components/camel-mongodb/src/generated/java/org/apache/camel/component/mongodb/MongoDbEndpointConfigurer.java
 
b/components/camel-mongodb/src/generated/java/org/apache/camel/component/mongodb/MongoDbEndpointConfigurer.java
index a1ad1303fd9..04752c47e85 100644
--- 
a/components/camel-mongodb/src/generated/java/org/apache/camel/component/mongodb/MongoDbEndpointConfigurer.java
+++ 
b/components/camel-mongodb/src/generated/java/org/apache/camel/component/mongodb/MongoDbEndpointConfigurer.java
@@ -40,6 +40,8 @@ public class MongoDbEndpointConfigurer extends 
PropertyConfigurerSupport impleme
         case "exceptionHandler": 
target.setExceptionHandler(property(camelContext, 
org.apache.camel.spi.ExceptionHandler.class, value)); return true;
         case "exchangepattern":
         case "exchangePattern": 
target.setExchangePattern(property(camelContext, 
org.apache.camel.ExchangePattern.class, value)); return true;
+        case "fulldocument":
+        case "fullDocument": target.setFullDocument(property(camelContext, 
java.lang.String.class, value)); return true;
         case "hosts": target.setHosts(property(camelContext, 
java.lang.String.class, value)); return true;
         case "lazystartproducer":
         case "lazyStartProducer": 
target.setLazyStartProducer(property(camelContext, boolean.class, value)); 
return true;
@@ -96,6 +98,8 @@ public class MongoDbEndpointConfigurer extends 
PropertyConfigurerSupport impleme
         case "exceptionHandler": return 
org.apache.camel.spi.ExceptionHandler.class;
         case "exchangepattern":
         case "exchangePattern": return org.apache.camel.ExchangePattern.class;
+        case "fulldocument":
+        case "fullDocument": return java.lang.String.class;
         case "hosts": return java.lang.String.class;
         case "lazystartproducer":
         case "lazyStartProducer": return boolean.class;
@@ -153,6 +157,8 @@ public class MongoDbEndpointConfigurer extends 
PropertyConfigurerSupport impleme
         case "exceptionHandler": return target.getExceptionHandler();
         case "exchangepattern":
         case "exchangePattern": return target.getExchangePattern();
+        case "fulldocument":
+        case "fullDocument": return target.getFullDocument();
         case "hosts": return target.getHosts();
         case "lazystartproducer":
         case "lazyStartProducer": return target.isLazyStartProducer();
diff --git 
a/components/camel-mongodb/src/generated/java/org/apache/camel/component/mongodb/MongoDbEndpointUriFactory.java
 
b/components/camel-mongodb/src/generated/java/org/apache/camel/component/mongodb/MongoDbEndpointUriFactory.java
index 7cf54650e8a..6cd0e20f744 100644
--- 
a/components/camel-mongodb/src/generated/java/org/apache/camel/component/mongodb/MongoDbEndpointUriFactory.java
+++ 
b/components/camel-mongodb/src/generated/java/org/apache/camel/component/mongodb/MongoDbEndpointUriFactory.java
@@ -21,7 +21,7 @@ public class MongoDbEndpointUriFactory extends 
org.apache.camel.support.componen
     private static final Set<String> SECRET_PROPERTY_NAMES;
     private static final Set<String> MULTI_VALUE_PREFIXES;
     static {
-        Set<String> props = new HashSet<>(29);
+        Set<String> props = new HashSet<>(30);
         props.add("authSource");
         props.add("bridgeErrorHandler");
         props.add("collection");
@@ -34,6 +34,7 @@ public class MongoDbEndpointUriFactory extends 
org.apache.camel.support.componen
         props.add("dynamicity");
         props.add("exceptionHandler");
         props.add("exchangePattern");
+        props.add("fullDocument");
         props.add("hosts");
         props.add("lazyStartProducer");
         props.add("mongoConnection");
diff --git 
a/components/camel-mongodb/src/generated/resources/org/apache/camel/component/mongodb/mongodb.json
 
b/components/camel-mongodb/src/generated/resources/org/apache/camel/component/mongodb/mongodb.json
index 0f0c9f99fdf..8b1718b8ac5 100644
--- 
a/components/camel-mongodb/src/generated/resources/org/apache/camel/component/mongodb/mongodb.json
+++ 
b/components/camel-mongodb/src/generated/resources/org/apache/camel/component/mongodb/mongodb.json
@@ -73,6 +73,7 @@
     "readPreference": { "kind": "parameter", "displayName": "Read Preference", 
"group": "advanced", "label": "advanced", "required": false, "type": "string", 
"javaType": "java.lang.String", "enum": [ "PRIMARY", "PRIMARY_PREFERRED", 
"SECONDARY", "SECONDARY_PREFERRED", "NEAREST" ], "deprecated": false, 
"autowired": false, "secret": false, "defaultValue": "PRIMARY", "description": 
"Configure how MongoDB clients route read operations to the members of a 
replica set. Possible values are PRIMA [...]
     "writeConcern": { "kind": "parameter", "displayName": "Write Concern", 
"group": "advanced", "label": "advanced", "required": false, "type": "string", 
"javaType": "java.lang.String", "enum": [ "ACKNOWLEDGED", "W1", "W2", "W3", 
"UNACKNOWLEDGED", "JOURNALED", "MAJORITY" ], "deprecated": false, "autowired": 
false, "secret": false, "defaultValue": "ACKNOWLEDGED", "description": 
"Configure the connection bean with the level of acknowledgment requested from 
MongoDB for write operations to a [...]
     "writeResultAsHeader": { "kind": "parameter", "displayName": "Write Result 
As Header", "group": "advanced", "label": "advanced", "required": false, 
"type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": 
false, "secret": false, "defaultValue": false, "description": "In write 
operations, it determines whether instead of returning WriteResult as the body 
of the OUT message, we transfer the IN message to the OUT and attach the 
WriteResult as a header." },
+    "fullDocument": { "kind": "parameter", "displayName": "Full Document", 
"group": "changeStream", "label": "consumer,changeStream", "required": false, 
"type": "string", "javaType": "java.lang.String", "enum": [ "default", 
"updateLookup" ], "deprecated": false, "autowired": false, "secret": false, 
"defaultValue": "default", "description": "Specifies whether changeStream 
consumer includes a copy of the full document when modified by update 
operations. Possible values are default and updat [...]
     "streamFilter": { "kind": "parameter", "displayName": "Stream Filter", 
"group": "changeStream", "label": "consumer,changeStream", "required": false, 
"type": "string", "javaType": "java.lang.String", "deprecated": false, 
"autowired": false, "secret": false, "description": "Filter condition for 
change streams consumer." },
     "authSource": { "kind": "parameter", "displayName": "Auth Source", 
"group": "security", "label": "security", "required": false, "type": "string", 
"javaType": "java.lang.String", "deprecated": false, "autowired": false, 
"secret": false, "description": "The database name associated with the user's 
credentials." },
     "password": { "kind": "parameter", "displayName": "Password", "group": 
"security", "label": "security", "required": false, "type": "string", 
"javaType": "java.lang.String", "deprecated": false, "autowired": false, 
"secret": true, "description": "User password for mongodb connection" },
diff --git 
a/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbChangeStreamsConsumer.java
 
b/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbChangeStreamsConsumer.java
index b76b8ef889b..66625cb1758 100644
--- 
a/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbChangeStreamsConsumer.java
+++ 
b/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbChangeStreamsConsumer.java
@@ -19,6 +19,7 @@ package org.apache.camel.component.mongodb;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
 
+import com.mongodb.client.model.changestream.FullDocument;
 import org.apache.camel.Processor;
 import org.apache.camel.support.DefaultConsumer;
 import org.apache.camel.util.ObjectHelper;
@@ -62,9 +63,11 @@ public class MongoDbChangeStreamsConsumer extends 
DefaultConsumer {
             bsonFilter = singletonList(BsonDocument.parse(streamFilter));
         }
 
-        executor = 
endpoint.getCamelContext().getExecutorServiceManager().newFixedThreadPool(this, 
endpoint.getEndpointUri(),
-                1);
-        changeStreamsThread = new MongoDbChangeStreamsThread(endpoint, this, 
bsonFilter);
+        FullDocument fullDocumentOption = 
FullDocument.fromString(endpoint.getFullDocument());
+
+        executor = 
endpoint.getCamelContext().getExecutorServiceManager().newFixedThreadPool(this,
+                endpoint.getEndpointUri(), 1);
+        changeStreamsThread = new MongoDbChangeStreamsThread(endpoint, this, 
bsonFilter, fullDocumentOption);
         changeStreamsThread.init();
         executor.execute(changeStreamsThread);
     }
diff --git 
a/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbChangeStreamsThread.java
 
b/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbChangeStreamsThread.java
index 71847976047..a948dadc52f 100644
--- 
a/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbChangeStreamsThread.java
+++ 
b/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbChangeStreamsThread.java
@@ -22,6 +22,7 @@ import com.mongodb.MongoException;
 import com.mongodb.client.ChangeStreamIterable;
 import com.mongodb.client.MongoCursor;
 import com.mongodb.client.model.changestream.ChangeStreamDocument;
+import com.mongodb.client.model.changestream.FullDocument;
 import com.mongodb.client.model.changestream.OperationType;
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
@@ -34,10 +35,13 @@ import static 
org.apache.camel.component.mongodb.MongoDbConstants.MONGO_ID;
 class MongoDbChangeStreamsThread extends MongoAbstractConsumerThread {
     private List<BsonDocument> bsonFilter;
     private BsonDocument resumeToken;
+    private FullDocument fullDocument;
 
-    MongoDbChangeStreamsThread(MongoDbEndpoint endpoint, 
MongoDbChangeStreamsConsumer consumer, List<BsonDocument> bsonFilter) {
+    MongoDbChangeStreamsThread(MongoDbEndpoint endpoint, 
MongoDbChangeStreamsConsumer consumer,
+                               List<BsonDocument> bsonFilter, FullDocument 
fullDocument) {
         super(endpoint, consumer);
         this.bsonFilter = bsonFilter;
+        this.fullDocument = fullDocument;
     }
 
     @Override
@@ -48,8 +52,8 @@ class MongoDbChangeStreamsThread extends 
MongoAbstractConsumerThread {
     @Override
     protected MongoCursor initializeCursor() {
         ChangeStreamIterable<Document> iterable = bsonFilter != null
-                ? dbCol.watch(bsonFilter)
-                : dbCol.watch();
+                ? dbCol.watch(bsonFilter).fullDocument(fullDocument)
+                : dbCol.watch().fullDocument(fullDocument);
 
         if (resumeToken != null) {
             iterable = iterable.resumeAfter(resumeToken);
diff --git 
a/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbEndpoint.java
 
b/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbEndpoint.java
index 99c8dced9e7..b4fc89b8f47 100644
--- 
a/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbEndpoint.java
+++ 
b/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbEndpoint.java
@@ -107,6 +107,8 @@ public class MongoDbEndpoint extends DefaultEndpoint {
     private String tailTrackIncreasingField;
     @UriParam(label = "consumer,changeStream")
     private String streamFilter;
+    @UriParam(label = "consumer,changeStream", enums = "default,updateLookup", 
defaultValue = "default")
+    private String fullDocument = "default";
     // persistent tail tracking
     @UriParam(label = "consumer,tail")
     private boolean persistentTailTracking;
@@ -662,6 +664,18 @@ public class MongoDbEndpoint extends DefaultEndpoint {
         this.streamFilter = streamFilter;
     }
 
+    public String getFullDocument() {
+        return fullDocument;
+    }
+
+    /**
+     * Specifies whether changeStream consumer includes a copy of the full 
document when modified by update operations.
+     * Possible values are default and updateLookup.
+     */
+    public void setFullDocument(String fullDocument) {
+        this.fullDocument = fullDocument;
+    }
+
     /**
      * Configure the connection bean with the level of acknowledgment 
requested from MongoDB for write operations to a
      * standalone mongod, replicaset or cluster. Possible values are 
ACKNOWLEDGED, W1, W2, W3, UNACKNOWLEDGED, JOURNALED
diff --git 
a/components/camel-mongodb/src/test/java/org/apache/camel/component/mongodb/integration/MongoDbChangeStreamsConsumerIT.java
 
b/components/camel-mongodb/src/test/java/org/apache/camel/component/mongodb/integration/MongoDbChangeStreamsConsumerIT.java
index f0f3b47aae6..8278108bb19 100644
--- 
a/components/camel-mongodb/src/test/java/org/apache/camel/component/mongodb/integration/MongoDbChangeStreamsConsumerIT.java
+++ 
b/components/camel-mongodb/src/test/java/org/apache/camel/component/mongodb/integration/MongoDbChangeStreamsConsumerIT.java
@@ -95,6 +95,39 @@ public class MongoDbChangeStreamsConsumerIT extends 
AbstractMongoDbITSupport {
         context.getRouteController().stopRoute(consumerRouteId);
     }
 
+    @Test
+    public void updateWithFullDocumentTest() throws Exception {
+        assertEquals(0, mongoCollection.countDocuments());
+        MockEndpoint mock = getMockEndpoint("mock:test");
+        mock.expectedMessageCount(1);
+
+        String consumerRouteId = "updateWithFullDocumentConsumer";
+        addTestRoutes();
+        context.getRouteController().startRoute(consumerRouteId);
+
+        ObjectId objectId1 = new ObjectId();
+        ObjectId objectId2 = new ObjectId();
+        Thread t = new Thread(() -> {
+            mongoCollection.insertOne(new Document("_id", 
objectId1).append("property", "random value"));
+            mongoCollection.insertOne(new Document("_id", 
objectId2).append("property", "another value"));
+            mongoCollection.updateOne(new Document("_id", objectId1),
+                    new Document("$set", new Document("property", 
"filterOk")));
+            mongoCollection.updateOne(new Document("_id", objectId2),
+                    new Document("$set", new Document("property", 
"filterNotOk")));
+        });
+
+        t.start();
+        t.join();
+
+        mock.assertIsSatisfied();
+
+        Exchange updateExchange = mock.getExchanges().get(0);
+        Document actualDocument = 
updateExchange.getIn().getBody(Document.class);
+        assertEquals("filterOk", actualDocument.get("property"));
+        assertEquals(objectId1, updateExchange.getIn().getHeader("_id"));
+        context.getRouteController().stopRoute(consumerRouteId);
+    }
+
     @Test
     public void operationTypeAndIdHeaderTest() throws Exception {
         assertEquals(0, mongoCollection.countDocuments());
@@ -145,6 +178,11 @@ public class MongoDbChangeStreamsConsumerIT extends 
AbstractMongoDbITSupport {
                         .id("filterConsumer")
                         .autoStartup(false)
                         .to("mock:test");
+
+                
from("mongodb:myDb?consumerType=changeStreams&database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&streamFilter={{filter.update}}&fullDocument=updateLookup")
+                        .id("updateWithFullDocumentConsumer")
+                        .autoStartup(false)
+                        .to("mock:test");
             }
         });
     }
diff --git 
a/components/camel-mongodb/src/test/resources/mongodb.test.properties 
b/components/camel-mongodb/src/test/resources/mongodb.test.properties
index a80317cf82b..c1e81f65950 100644
--- a/components/camel-mongodb/src/test/resources/mongodb.test.properties
+++ b/components/camel-mongodb/src/test/resources/mongodb.test.properties
@@ -21,3 +21,4 @@ mongodb.testCollection=camelTest
 mongodb.cappedTestCollection=camelTestCapped
 
 myStreamFilter = { '$match':{'$or':[{'fullDocument.string': 'value2'}]} }
+filter.update={'$match':{'$and':[{'operationType': 'update'}, 
{'fullDocument.property': 'filterOk'}]} }

Reply via email to