This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch camel-3.x
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-3.x by this push:
new 92b299e61af CAMEL-19470 camel-mongodb : add option fullDocument for
changeStreams… (#10434)
92b299e61af is described below
commit 92b299e61af44a278ab25a2bd107762d54f4ecf6
Author: GarridoKevin <[email protected]>
AuthorDate: Mon Jun 19 18:32:07 2023 +0200
CAMEL-19470 camel-mongodb : add option fullDocument for changeStreams…
(#10434)
* CAMEL-19470 camel-mongodb : add option fullDocument for changeStreams
consumer.
* formatting
---
.../mongodb/MongoDbEndpointConfigurer.java | 6 ++++
.../mongodb/MongoDbEndpointUriFactory.java | 3 +-
.../apache/camel/component/mongodb/mongodb.json | 1 +
.../mongodb/MongoDbChangeStreamsConsumer.java | 9 +++--
.../mongodb/MongoDbChangeStreamsThread.java | 10 ++++--
.../camel/component/mongodb/MongoDbEndpoint.java | 14 ++++++++
.../MongoDbChangeStreamsConsumerIT.java | 38 ++++++++++++++++++++++
.../src/test/resources/mongodb.test.properties | 1 +
8 files changed, 75 insertions(+), 7 deletions(-)
diff --git
a/components/camel-mongodb/src/generated/java/org/apache/camel/component/mongodb/MongoDbEndpointConfigurer.java
b/components/camel-mongodb/src/generated/java/org/apache/camel/component/mongodb/MongoDbEndpointConfigurer.java
index a1ad1303fd9..04752c47e85 100644
---
a/components/camel-mongodb/src/generated/java/org/apache/camel/component/mongodb/MongoDbEndpointConfigurer.java
+++
b/components/camel-mongodb/src/generated/java/org/apache/camel/component/mongodb/MongoDbEndpointConfigurer.java
@@ -40,6 +40,8 @@ public class MongoDbEndpointConfigurer extends
PropertyConfigurerSupport impleme
case "exceptionHandler":
target.setExceptionHandler(property(camelContext,
org.apache.camel.spi.ExceptionHandler.class, value)); return true;
case "exchangepattern":
case "exchangePattern":
target.setExchangePattern(property(camelContext,
org.apache.camel.ExchangePattern.class, value)); return true;
+ case "fulldocument":
+ case "fullDocument": target.setFullDocument(property(camelContext,
java.lang.String.class, value)); return true;
case "hosts": target.setHosts(property(camelContext,
java.lang.String.class, value)); return true;
case "lazystartproducer":
case "lazyStartProducer":
target.setLazyStartProducer(property(camelContext, boolean.class, value));
return true;
@@ -96,6 +98,8 @@ public class MongoDbEndpointConfigurer extends
PropertyConfigurerSupport impleme
case "exceptionHandler": return
org.apache.camel.spi.ExceptionHandler.class;
case "exchangepattern":
case "exchangePattern": return org.apache.camel.ExchangePattern.class;
+ case "fulldocument":
+ case "fullDocument": return java.lang.String.class;
case "hosts": return java.lang.String.class;
case "lazystartproducer":
case "lazyStartProducer": return boolean.class;
@@ -153,6 +157,8 @@ public class MongoDbEndpointConfigurer extends
PropertyConfigurerSupport impleme
case "exceptionHandler": return target.getExceptionHandler();
case "exchangepattern":
case "exchangePattern": return target.getExchangePattern();
+ case "fulldocument":
+ case "fullDocument": return target.getFullDocument();
case "hosts": return target.getHosts();
case "lazystartproducer":
case "lazyStartProducer": return target.isLazyStartProducer();
diff --git
a/components/camel-mongodb/src/generated/java/org/apache/camel/component/mongodb/MongoDbEndpointUriFactory.java
b/components/camel-mongodb/src/generated/java/org/apache/camel/component/mongodb/MongoDbEndpointUriFactory.java
index 7cf54650e8a..6cd0e20f744 100644
---
a/components/camel-mongodb/src/generated/java/org/apache/camel/component/mongodb/MongoDbEndpointUriFactory.java
+++
b/components/camel-mongodb/src/generated/java/org/apache/camel/component/mongodb/MongoDbEndpointUriFactory.java
@@ -21,7 +21,7 @@ public class MongoDbEndpointUriFactory extends
org.apache.camel.support.componen
private static final Set<String> SECRET_PROPERTY_NAMES;
private static final Set<String> MULTI_VALUE_PREFIXES;
static {
- Set<String> props = new HashSet<>(29);
+ Set<String> props = new HashSet<>(30);
props.add("authSource");
props.add("bridgeErrorHandler");
props.add("collection");
@@ -34,6 +34,7 @@ public class MongoDbEndpointUriFactory extends
org.apache.camel.support.componen
props.add("dynamicity");
props.add("exceptionHandler");
props.add("exchangePattern");
+ props.add("fullDocument");
props.add("hosts");
props.add("lazyStartProducer");
props.add("mongoConnection");
diff --git
a/components/camel-mongodb/src/generated/resources/org/apache/camel/component/mongodb/mongodb.json
b/components/camel-mongodb/src/generated/resources/org/apache/camel/component/mongodb/mongodb.json
index 0f0c9f99fdf..8b1718b8ac5 100644
---
a/components/camel-mongodb/src/generated/resources/org/apache/camel/component/mongodb/mongodb.json
+++
b/components/camel-mongodb/src/generated/resources/org/apache/camel/component/mongodb/mongodb.json
@@ -73,6 +73,7 @@
"readPreference": { "kind": "parameter", "displayName": "Read Preference",
"group": "advanced", "label": "advanced", "required": false, "type": "string",
"javaType": "java.lang.String", "enum": [ "PRIMARY", "PRIMARY_PREFERRED",
"SECONDARY", "SECONDARY_PREFERRED", "NEAREST" ], "deprecated": false,
"autowired": false, "secret": false, "defaultValue": "PRIMARY", "description":
"Configure how MongoDB clients route read operations to the members of a
replica set. Possible values are PRIMA [...]
"writeConcern": { "kind": "parameter", "displayName": "Write Concern",
"group": "advanced", "label": "advanced", "required": false, "type": "string",
"javaType": "java.lang.String", "enum": [ "ACKNOWLEDGED", "W1", "W2", "W3",
"UNACKNOWLEDGED", "JOURNALED", "MAJORITY" ], "deprecated": false, "autowired":
false, "secret": false, "defaultValue": "ACKNOWLEDGED", "description":
"Configure the connection bean with the level of acknowledgment requested from
MongoDB for write operations to a [...]
"writeResultAsHeader": { "kind": "parameter", "displayName": "Write Result
As Header", "group": "advanced", "label": "advanced", "required": false,
"type": "boolean", "javaType": "boolean", "deprecated": false, "autowired":
false, "secret": false, "defaultValue": false, "description": "In write
operations, it determines whether instead of returning WriteResult as the body
of the OUT message, we transfer the IN message to the OUT and attach the
WriteResult as a header." },
+ "fullDocument": { "kind": "parameter", "displayName": "Full Document",
"group": "changeStream", "label": "consumer,changeStream", "required": false,
"type": "string", "javaType": "java.lang.String", "enum": [ "default",
"updateLookup" ], "deprecated": false, "autowired": false, "secret": false,
"defaultValue": "default", "description": "Specifies whether changeStream
consumer include a copy of the full document when modified by update
operations. Possible values are default and updat [...]
"streamFilter": { "kind": "parameter", "displayName": "Stream Filter",
"group": "changeStream", "label": "consumer,changeStream", "required": false,
"type": "string", "javaType": "java.lang.String", "deprecated": false,
"autowired": false, "secret": false, "description": "Filter condition for
change streams consumer." },
"authSource": { "kind": "parameter", "displayName": "Auth Source",
"group": "security", "label": "security", "required": false, "type": "string",
"javaType": "java.lang.String", "deprecated": false, "autowired": false,
"secret": false, "description": "The database name associated with the user's
credentials." },
"password": { "kind": "parameter", "displayName": "Password", "group":
"security", "label": "security", "required": false, "type": "string",
"javaType": "java.lang.String", "deprecated": false, "autowired": false,
"secret": true, "description": "User password for mongodb connection" },
diff --git
a/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbChangeStreamsConsumer.java
b/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbChangeStreamsConsumer.java
index b76b8ef889b..66625cb1758 100644
---
a/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbChangeStreamsConsumer.java
+++
b/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbChangeStreamsConsumer.java
@@ -19,6 +19,7 @@ package org.apache.camel.component.mongodb;
import java.util.List;
import java.util.concurrent.ExecutorService;
+import com.mongodb.client.model.changestream.FullDocument;
import org.apache.camel.Processor;
import org.apache.camel.support.DefaultConsumer;
import org.apache.camel.util.ObjectHelper;
@@ -62,9 +63,11 @@ public class MongoDbChangeStreamsConsumer extends
DefaultConsumer {
bsonFilter = singletonList(BsonDocument.parse(streamFilter));
}
- executor =
endpoint.getCamelContext().getExecutorServiceManager().newFixedThreadPool(this,
endpoint.getEndpointUri(),
- 1);
- changeStreamsThread = new MongoDbChangeStreamsThread(endpoint, this,
bsonFilter);
+ FullDocument fullDocumentOption =
FullDocument.fromString(endpoint.getFullDocument());
+
+ executor =
endpoint.getCamelContext().getExecutorServiceManager().newFixedThreadPool(this,
+ endpoint.getEndpointUri(), 1);
+ changeStreamsThread = new MongoDbChangeStreamsThread(endpoint, this,
bsonFilter, fullDocumentOption);
changeStreamsThread.init();
executor.execute(changeStreamsThread);
}
diff --git
a/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbChangeStreamsThread.java
b/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbChangeStreamsThread.java
index 71847976047..a948dadc52f 100644
---
a/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbChangeStreamsThread.java
+++
b/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbChangeStreamsThread.java
@@ -22,6 +22,7 @@ import com.mongodb.MongoException;
import com.mongodb.client.ChangeStreamIterable;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.model.changestream.ChangeStreamDocument;
+import com.mongodb.client.model.changestream.FullDocument;
import com.mongodb.client.model.changestream.OperationType;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
@@ -34,10 +35,13 @@ import static
org.apache.camel.component.mongodb.MongoDbConstants.MONGO_ID;
class MongoDbChangeStreamsThread extends MongoAbstractConsumerThread {
private List<BsonDocument> bsonFilter;
private BsonDocument resumeToken;
+ private FullDocument fullDocument;
- MongoDbChangeStreamsThread(MongoDbEndpoint endpoint,
MongoDbChangeStreamsConsumer consumer, List<BsonDocument> bsonFilter) {
+ MongoDbChangeStreamsThread(MongoDbEndpoint endpoint,
MongoDbChangeStreamsConsumer consumer,
+ List<BsonDocument> bsonFilter, FullDocument
fullDocument) {
super(endpoint, consumer);
this.bsonFilter = bsonFilter;
+ this.fullDocument = fullDocument;
}
@Override
@@ -48,8 +52,8 @@ class MongoDbChangeStreamsThread extends
MongoAbstractConsumerThread {
@Override
protected MongoCursor initializeCursor() {
ChangeStreamIterable<Document> iterable = bsonFilter != null
- ? dbCol.watch(bsonFilter)
- : dbCol.watch();
+ ? dbCol.watch(bsonFilter).fullDocument(fullDocument)
+ : dbCol.watch().fullDocument(fullDocument);
if (resumeToken != null) {
iterable = iterable.resumeAfter(resumeToken);
diff --git
a/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbEndpoint.java
b/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbEndpoint.java
index 99c8dced9e7..b4fc89b8f47 100644
---
a/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbEndpoint.java
+++
b/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbEndpoint.java
@@ -107,6 +107,8 @@ public class MongoDbEndpoint extends DefaultEndpoint {
private String tailTrackIncreasingField;
@UriParam(label = "consumer,changeStream")
private String streamFilter;
+ @UriParam(label = "consumer,changeStream", enums = "default,updateLookup",
defaultValue = "default")
+ private String fullDocument = "default";
// persistent tail tracking
@UriParam(label = "consumer,tail")
private boolean persistentTailTracking;
@@ -662,6 +664,18 @@ public class MongoDbEndpoint extends DefaultEndpoint {
this.streamFilter = streamFilter;
}
+ public String getFullDocument() {
+ return fullDocument;
+ }
+
+ /**
+ * Specifies whether the changeStream consumer includes a copy of the full
document when modified by update operations.
+ * Possible values are default and updateLookup.
+ */
+ public void setFullDocument(String fullDocument) {
+ this.fullDocument = fullDocument;
+ }
+
/**
* Configure the connection bean with the level of acknowledgment
requested from MongoDB for write operations to a
* standalone mongod, replicaset or cluster. Possible values are
ACKNOWLEDGED, W1, W2, W3, UNACKNOWLEDGED, JOURNALED
diff --git
a/components/camel-mongodb/src/test/java/org/apache/camel/component/mongodb/integration/MongoDbChangeStreamsConsumerIT.java
b/components/camel-mongodb/src/test/java/org/apache/camel/component/mongodb/integration/MongoDbChangeStreamsConsumerIT.java
index f0f3b47aae6..8278108bb19 100644
---
a/components/camel-mongodb/src/test/java/org/apache/camel/component/mongodb/integration/MongoDbChangeStreamsConsumerIT.java
+++
b/components/camel-mongodb/src/test/java/org/apache/camel/component/mongodb/integration/MongoDbChangeStreamsConsumerIT.java
@@ -95,6 +95,39 @@ public class MongoDbChangeStreamsConsumerIT extends
AbstractMongoDbITSupport {
context.getRouteController().stopRoute(consumerRouteId);
}
+ @Test
+ public void updateWithFullDocumentTest() throws Exception {
+ assertEquals(0, mongoCollection.countDocuments());
+ MockEndpoint mock = getMockEndpoint("mock:test");
+ mock.expectedMessageCount(1);
+
+ String consumerRouteId = "updateWithFullDocumentConsumer";
+ addTestRoutes();
+ context.getRouteController().startRoute(consumerRouteId);
+
+ ObjectId objectId1 = new ObjectId();
+ ObjectId objectId2 = new ObjectId();
+ Thread t = new Thread(() -> {
+ mongoCollection.insertOne(new Document("_id",
objectId1).append("property", "random value"));
+ mongoCollection.insertOne(new Document("_id",
objectId2).append("property", "another value"));
+ mongoCollection.updateOne(new Document("_id", objectId1),
+ new Document("$set", new Document("property",
"filterOk")));
+ mongoCollection.updateOne(new Document("_id", objectId2),
+ new Document("$set", new Document("property",
"filterNotOk")));
+ });
+
+ t.start();
+ t.join();
+
+ mock.assertIsSatisfied();
+
+ Exchange updateExchange = mock.getExchanges().get(0);
+ Document actualDocument =
updateExchange.getIn().getBody(Document.class);
+ assertEquals("filterOk", actualDocument.get("property"));
+ assertEquals(objectId1, updateExchange.getIn().getHeader("_id"));
+ context.getRouteController().stopRoute(consumerRouteId);
+ }
+
@Test
public void operationTypeAndIdHeaderTest() throws Exception {
assertEquals(0, mongoCollection.countDocuments());
@@ -145,6 +178,11 @@ public class MongoDbChangeStreamsConsumerIT extends
AbstractMongoDbITSupport {
.id("filterConsumer")
.autoStartup(false)
.to("mock:test");
+
+
from("mongodb:myDb?consumerType=changeStreams&database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&streamFilter={{filter.update}}&fullDocument=updateLookup")
+ .id("updateWithFullDocumentConsumer")
+ .autoStartup(false)
+ .to("mock:test");
}
});
}
diff --git
a/components/camel-mongodb/src/test/resources/mongodb.test.properties
b/components/camel-mongodb/src/test/resources/mongodb.test.properties
index a80317cf82b..c1e81f65950 100644
--- a/components/camel-mongodb/src/test/resources/mongodb.test.properties
+++ b/components/camel-mongodb/src/test/resources/mongodb.test.properties
@@ -21,3 +21,4 @@ mongodb.testCollection=camelTest
mongodb.cappedTestCollection=camelTestCapped
myStreamFilter = { '$match':{'$or':[{'fullDocument.string': 'value2'}]} }
+filter.update={'$match':{'$and':[{'operationType': 'update'},
{'fullDocument.property': 'filterOk'}]} }