This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push: new b1f3e412b87 CAMEL-19470 camel-mongodb : add option fullDocument for changeStreams... (#10436) b1f3e412b87 is described below commit b1f3e412b87f4b5fe9979d0573ae3df9ac1672a8 Author: GarridoKevin <137059017+garridoke...@users.noreply.github.com> AuthorDate: Wed Jun 21 13:33:22 2023 +0200 CAMEL-19470 camel-mongodb : add option fullDocument for changeStreams... (#10436) * CAMEL-19470 camel-mongodb : add option fullDocument for changeStreams consumer. * formatting * fixes for camel-4 * corrections after remarks * corrections after remarks --- .../mongodb/MongoDbEndpointConfigurer.java | 6 +++ .../mongodb/MongoDbEndpointUriFactory.java | 3 +- .../apache/camel/component/mongodb/mongodb.json | 21 +++++----- .../mongodb/MongoDbChangeStreamsConsumer.java | 4 +- .../mongodb/MongoDbChangeStreamsThread.java | 5 ++- .../camel/component/mongodb/MongoDbEndpoint.java | 15 ++++++++ .../MongoDbChangeStreamsConsumerIT.java | 45 +++++++++++++++++++--- .../src/test/resources/mongodb.test.properties | 1 + 8 files changed, 80 insertions(+), 20 deletions(-) diff --git a/components/camel-mongodb/src/generated/java/org/apache/camel/component/mongodb/MongoDbEndpointConfigurer.java b/components/camel-mongodb/src/generated/java/org/apache/camel/component/mongodb/MongoDbEndpointConfigurer.java index a1ad1303fd9..04752c47e85 100644 --- a/components/camel-mongodb/src/generated/java/org/apache/camel/component/mongodb/MongoDbEndpointConfigurer.java +++ b/components/camel-mongodb/src/generated/java/org/apache/camel/component/mongodb/MongoDbEndpointConfigurer.java @@ -40,6 +40,8 @@ public class MongoDbEndpointConfigurer extends PropertyConfigurerSupport impleme case "exceptionHandler": target.setExceptionHandler(property(camelContext, org.apache.camel.spi.ExceptionHandler.class, value)); return true; case "exchangepattern": case "exchangePattern": target.setExchangePattern(property(camelContext, 
org.apache.camel.ExchangePattern.class, value)); return true; + case "fulldocument": + case "fullDocument": target.setFullDocument(property(camelContext, java.lang.String.class, value)); return true; case "hosts": target.setHosts(property(camelContext, java.lang.String.class, value)); return true; case "lazystartproducer": case "lazyStartProducer": target.setLazyStartProducer(property(camelContext, boolean.class, value)); return true; @@ -96,6 +98,8 @@ public class MongoDbEndpointConfigurer extends PropertyConfigurerSupport impleme case "exceptionHandler": return org.apache.camel.spi.ExceptionHandler.class; case "exchangepattern": case "exchangePattern": return org.apache.camel.ExchangePattern.class; + case "fulldocument": + case "fullDocument": return java.lang.String.class; case "hosts": return java.lang.String.class; case "lazystartproducer": case "lazyStartProducer": return boolean.class; @@ -153,6 +157,8 @@ public class MongoDbEndpointConfigurer extends PropertyConfigurerSupport impleme case "exceptionHandler": return target.getExceptionHandler(); case "exchangepattern": case "exchangePattern": return target.getExchangePattern(); + case "fulldocument": + case "fullDocument": return target.getFullDocument(); case "hosts": return target.getHosts(); case "lazystartproducer": case "lazyStartProducer": return target.isLazyStartProducer(); diff --git a/components/camel-mongodb/src/generated/java/org/apache/camel/component/mongodb/MongoDbEndpointUriFactory.java b/components/camel-mongodb/src/generated/java/org/apache/camel/component/mongodb/MongoDbEndpointUriFactory.java index 7cf54650e8a..6cd0e20f744 100644 --- a/components/camel-mongodb/src/generated/java/org/apache/camel/component/mongodb/MongoDbEndpointUriFactory.java +++ b/components/camel-mongodb/src/generated/java/org/apache/camel/component/mongodb/MongoDbEndpointUriFactory.java @@ -21,7 +21,7 @@ public class MongoDbEndpointUriFactory extends org.apache.camel.support.componen private static final Set<String> 
SECRET_PROPERTY_NAMES; private static final Set<String> MULTI_VALUE_PREFIXES; static { - Set<String> props = new HashSet<>(29); + Set<String> props = new HashSet<>(30); props.add("authSource"); props.add("bridgeErrorHandler"); props.add("collection"); @@ -34,6 +34,7 @@ public class MongoDbEndpointUriFactory extends org.apache.camel.support.componen props.add("dynamicity"); props.add("exceptionHandler"); props.add("exchangePattern"); + props.add("fullDocument"); props.add("hosts"); props.add("lazyStartProducer"); props.add("mongoConnection"); diff --git a/components/camel-mongodb/src/generated/resources/org/apache/camel/component/mongodb/mongodb.json b/components/camel-mongodb/src/generated/resources/org/apache/camel/component/mongodb/mongodb.json index 681b813fbe3..1fdee0797f5 100644 --- a/components/camel-mongodb/src/generated/resources/org/apache/camel/component/mongodb/mongodb.json +++ b/components/camel-mongodb/src/generated/resources/org/apache/camel/component/mongodb/mongodb.json @@ -73,15 +73,16 @@ "readPreference": { "index": 16, "kind": "parameter", "displayName": "Read Preference", "group": "advanced", "label": "advanced", "required": false, "type": "string", "javaType": "java.lang.String", "enum": [ "PRIMARY", "PRIMARY_PREFERRED", "SECONDARY", "SECONDARY_PREFERRED", "NEAREST" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "PRIMARY", "description": "Configure how MongoDB clients route read operations to the members of a replica set. Possible val [...] 
"writeConcern": { "index": 17, "kind": "parameter", "displayName": "Write Concern", "group": "advanced", "label": "advanced", "required": false, "type": "string", "javaType": "java.lang.String", "enum": [ "ACKNOWLEDGED", "W1", "W2", "W3", "UNACKNOWLEDGED", "JOURNALED", "MAJORITY" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "ACKNOWLEDGED", "description": "Configure the connection bean with the level of acknowledgment requested from MongoDB for write op [...] "writeResultAsHeader": { "index": 18, "kind": "parameter", "displayName": "Write Result As Header", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "In write operations, it determines whether instead of returning WriteResult as the body of the OUT message, we transfer the IN message to the OUT and attach the WriteResult as a header." }, - "streamFilter": { "index": 19, "kind": "parameter", "displayName": "Stream Filter", "group": "changeStream", "label": "consumer,changeStream", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "Filter condition for change streams consumer." }, - "authSource": { "index": 20, "kind": "parameter", "displayName": "Auth Source", "group": "security", "label": "security", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "The database name associated with the user's credentials." 
}, - "password": { "index": 21, "kind": "parameter", "displayName": "Password", "group": "security", "label": "security", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": true, "description": "User password for mongodb connection" }, - "username": { "index": 22, "kind": "parameter", "displayName": "Username", "group": "security", "label": "security", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": true, "description": "Username for mongodb connection" }, - "persistentId": { "index": 23, "kind": "parameter", "displayName": "Persistent Id", "group": "tail", "label": "consumer,tail", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "One tail tracking collection can host many trackers for several tailable consumers. To keep them separate, each tracker should have its own unique persistentId." }, - "persistentTailTracking": { "index": 24, "kind": "parameter", "displayName": "Persistent Tail Tracking", "group": "tail", "label": "consumer,tail", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Enable persistent tail tracking, which is a mechanism to keep track of the last consumed message across system restarts. The next time the system is up, the endpoint will recover the [...] - "tailTrackCollection": { "index": 25, "kind": "parameter", "displayName": "Tail Track Collection", "group": "tail", "label": "consumer,tail", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "Collection where tail tracking information will be persisted. If not specified, MongoDbTailTrackingConfig#DEFAULT_COLLECTION will be used by default." 
}, - "tailTrackDb": { "index": 26, "kind": "parameter", "displayName": "Tail Track Db", "group": "tail", "label": "consumer,tail", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "Indicates what database the tail tracking mechanism will persist to. If not specified, the current database will be picked by default. Dynamicity will not be taken into account even if enabled, i.e. the tail tracking da [...] - "tailTrackField": { "index": 27, "kind": "parameter", "displayName": "Tail Track Field", "group": "tail", "label": "consumer,tail", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "Field where the last tracked value will be placed. If not specified, MongoDbTailTrackingConfig#DEFAULT_FIELD will be used by default." }, - "tailTrackIncreasingField": { "index": 28, "kind": "parameter", "displayName": "Tail Track Increasing Field", "group": "tail", "label": "consumer,tail", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "Correlation field in the incoming record which is of increasing nature and will be used to position the tailing cursor every time it is generated. The cursor will be (re)created with a query o [...] + "fullDocument": { "index": 19, "kind": "parameter", "displayName": "Full Document", "group": "changeStream", "label": "consumer,changeStream", "required": false, "type": "object", "javaType": "com.mongodb.client.model.changestream.FullDocument", "enum": [ "default", "updateLookup", "required", "whenAvailable" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "default", "description": "Specifies whether changeStream consumer include a copy of the full docume [...] 
+ "streamFilter": { "index": 20, "kind": "parameter", "displayName": "Stream Filter", "group": "changeStream", "label": "consumer,changeStream", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "Filter condition for change streams consumer." }, + "authSource": { "index": 21, "kind": "parameter", "displayName": "Auth Source", "group": "security", "label": "security", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "The database name associated with the user's credentials." }, + "password": { "index": 22, "kind": "parameter", "displayName": "Password", "group": "security", "label": "security", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": true, "description": "User password for mongodb connection" }, + "username": { "index": 23, "kind": "parameter", "displayName": "Username", "group": "security", "label": "security", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": true, "description": "Username for mongodb connection" }, + "persistentId": { "index": 24, "kind": "parameter", "displayName": "Persistent Id", "group": "tail", "label": "consumer,tail", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "One tail tracking collection can host many trackers for several tailable consumers. To keep them separate, each tracker should have its own unique persistentId." 
}, + "persistentTailTracking": { "index": 25, "kind": "parameter", "displayName": "Persistent Tail Tracking", "group": "tail", "label": "consumer,tail", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Enable persistent tail tracking, which is a mechanism to keep track of the last consumed message across system restarts. The next time the system is up, the endpoint will recover the [...] + "tailTrackCollection": { "index": 26, "kind": "parameter", "displayName": "Tail Track Collection", "group": "tail", "label": "consumer,tail", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "Collection where tail tracking information will be persisted. If not specified, MongoDbTailTrackingConfig#DEFAULT_COLLECTION will be used by default." }, + "tailTrackDb": { "index": 27, "kind": "parameter", "displayName": "Tail Track Db", "group": "tail", "label": "consumer,tail", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "Indicates what database the tail tracking mechanism will persist to. If not specified, the current database will be picked by default. Dynamicity will not be taken into account even if enabled, i.e. the tail tracking da [...] + "tailTrackField": { "index": 28, "kind": "parameter", "displayName": "Tail Track Field", "group": "tail", "label": "consumer,tail", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "Field where the last tracked value will be placed. If not specified, MongoDbTailTrackingConfig#DEFAULT_FIELD will be used by default." 
}, + "tailTrackIncreasingField": { "index": 29, "kind": "parameter", "displayName": "Tail Track Increasing Field", "group": "tail", "label": "consumer,tail", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "Correlation field in the incoming record which is of increasing nature and will be used to position the tailing cursor every time it is generated. The cursor will be (re)created with a query o [...] } } diff --git a/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbChangeStreamsConsumer.java b/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbChangeStreamsConsumer.java index b76b8ef889b..044d370887a 100644 --- a/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbChangeStreamsConsumer.java +++ b/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbChangeStreamsConsumer.java @@ -62,8 +62,8 @@ public class MongoDbChangeStreamsConsumer extends DefaultConsumer { bsonFilter = singletonList(BsonDocument.parse(streamFilter)); } - executor = endpoint.getCamelContext().getExecutorServiceManager().newFixedThreadPool(this, endpoint.getEndpointUri(), - 1); + executor = endpoint.getCamelContext().getExecutorServiceManager().newFixedThreadPool(this, + endpoint.getEndpointUri(), 1); changeStreamsThread = new MongoDbChangeStreamsThread(endpoint, this, bsonFilter); changeStreamsThread.init(); executor.execute(changeStreamsThread); diff --git a/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbChangeStreamsThread.java b/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbChangeStreamsThread.java index a772bfee11f..4be722baa59 100644 --- a/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbChangeStreamsThread.java +++ 
b/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbChangeStreamsThread.java @@ -35,7 +35,8 @@ class MongoDbChangeStreamsThread extends MongoAbstractConsumerThread { private List<BsonDocument> bsonFilter; private BsonDocument resumeToken; - MongoDbChangeStreamsThread(MongoDbEndpoint endpoint, MongoDbChangeStreamsConsumer consumer, List<BsonDocument> bsonFilter) { + MongoDbChangeStreamsThread(MongoDbEndpoint endpoint, MongoDbChangeStreamsConsumer consumer, + List<BsonDocument> bsonFilter) { super(endpoint, consumer); this.bsonFilter = bsonFilter; } @@ -51,6 +52,8 @@ class MongoDbChangeStreamsThread extends MongoAbstractConsumerThread { ? dbCol.watch(bsonFilter) : dbCol.watch(); + iterable.fullDocument(endpoint.getFullDocument()); + if (resumeToken != null) { iterable = iterable.resumeAfter(resumeToken); } diff --git a/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbEndpoint.java b/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbEndpoint.java index 5ee082c8f74..46386636a22 100644 --- a/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbEndpoint.java +++ b/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbEndpoint.java @@ -30,6 +30,7 @@ import com.mongodb.client.MongoClient; import com.mongodb.client.MongoClients; import com.mongodb.client.MongoCollection; import com.mongodb.client.MongoDatabase; +import com.mongodb.client.model.changestream.FullDocument; import org.apache.camel.Category; import org.apache.camel.Consumer; import org.apache.camel.Processor; @@ -107,6 +108,8 @@ public class MongoDbEndpoint extends DefaultEndpoint { private String tailTrackIncreasingField; @UriParam(label = "consumer,changeStream") private String streamFilter; + @UriParam(label = "consumer,changeStream", enums = "default,updateLookup,required,whenAvailable", defaultValue = "default") + private FullDocument fullDocument = 
FullDocument.DEFAULT; // persistent tail tracking @UriParam(label = "consumer,tail") private boolean persistentTailTracking; @@ -662,6 +665,18 @@ public class MongoDbEndpoint extends DefaultEndpoint { this.streamFilter = streamFilter; } + public FullDocument getFullDocument() { + return fullDocument; + } + + /** + * Specifies whether changeStream consumer include a copy of the full document when modified by update operations. + * Possible values are default, updateLookup, required and whenAvailable. + */ + public void setFullDocument(FullDocument fullDocument) { + this.fullDocument = fullDocument; + } + /** * Configure the connection bean with the level of acknowledgment requested from MongoDB for write operations to a * standalone mongod, replicaset or cluster. Possible values are ACKNOWLEDGED, W1, W2, W3, UNACKNOWLEDGED, JOURNALED diff --git a/components/camel-mongodb/src/test/java/org/apache/camel/component/mongodb/integration/MongoDbChangeStreamsConsumerIT.java b/components/camel-mongodb/src/test/java/org/apache/camel/component/mongodb/integration/MongoDbChangeStreamsConsumerIT.java index 1be2b816a8f..591ba00e3a9 100644 --- a/components/camel-mongodb/src/test/java/org/apache/camel/component/mongodb/integration/MongoDbChangeStreamsConsumerIT.java +++ b/components/camel-mongodb/src/test/java/org/apache/camel/component/mongodb/integration/MongoDbChangeStreamsConsumerIT.java @@ -16,8 +16,7 @@ */ package org.apache.camel.component.mongodb.integration; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; +import java.util.concurrent.CompletableFuture; import com.mongodb.client.MongoCollection; import com.mongodb.client.model.CreateCollectionOptions; @@ -45,7 +44,6 @@ import static org.junit.jupiter.api.Assertions.assertTrue; public class MongoDbChangeStreamsConsumerIT extends AbstractMongoDbITSupport implements ConfigurableRoute { private MongoCollection<Document> mongoCollection; - private ExecutorService executorService = 
Executors.newSingleThreadExecutor(); /* * NOTE: in the case of this test, we *DO* want to recreate everything after the test has executed, so that when @@ -76,7 +74,7 @@ public class MongoDbChangeStreamsConsumerIT extends AbstractMongoDbITSupport imp String consumerRouteId = "simpleConsumer"; context.getRouteController().startRoute(consumerRouteId); - Executors.newSingleThreadExecutor().submit(this::singleInsert).get(); + CompletableFuture.runAsync(this::singleInsert); mock.assertIsSatisfied(); context.getRouteController().stopRoute(consumerRouteId); @@ -98,7 +96,7 @@ public class MongoDbChangeStreamsConsumerIT extends AbstractMongoDbITSupport imp String consumerRouteId = "filterConsumer"; context.getRouteController().startRoute(consumerRouteId); - executorService.submit(this::singleInsert).get(); + CompletableFuture.runAsync(this::singleInsert); mock.assertIsSatisfied(); @@ -109,6 +107,36 @@ public class MongoDbChangeStreamsConsumerIT extends AbstractMongoDbITSupport imp @Order(3) @Test + public void updateWithFullDocumentTest() throws Exception { + assertEquals(0, mongoCollection.countDocuments()); + MockEndpoint mock = contextExtension.getMockEndpoint("mock:test"); + mock.expectedMessageCount(1); + + String consumerRouteId = "updateWithFullDocumentConsumer"; + context.getRouteController().startRoute(consumerRouteId); + + ObjectId objectId1 = new ObjectId(); + ObjectId objectId2 = new ObjectId(); + CompletableFuture.runAsync(() -> { + mongoCollection.insertOne(new Document("_id", objectId1).append("property", "random value")); + mongoCollection.insertOne(new Document("_id", objectId2).append("property", "another value")); + mongoCollection.updateOne(new Document("_id", objectId1), + new Document("$set", new Document("property", "filterOk"))); + mongoCollection.updateOne(new Document("_id", objectId2), + new Document("$set", new Document("property", "filterNotOk"))); + }); + + mock.assertIsSatisfied(); + + Exchange updateExchange = mock.getExchanges().get(0); + 
Document actualDocument = updateExchange.getIn().getBody(Document.class); + assertEquals("filterOk", actualDocument.get("property")); + assertEquals(objectId1, updateExchange.getIn().getHeader("_id")); + context.getRouteController().stopRoute(consumerRouteId); + } + + @Order(4) + @Test public void operationTypeAndIdHeaderTest() throws Exception { Assumptions.assumeTrue(0 == mongoCollection.countDocuments(), "The collection should have no documents"); MockEndpoint mock = contextExtension.getMockEndpoint("mock:test"); @@ -120,7 +148,7 @@ public class MongoDbChangeStreamsConsumerIT extends AbstractMongoDbITSupport imp context.getRouteController().startRoute(consumerRouteId); ObjectId objectId = new ObjectId(); - Executors.newSingleThreadExecutor().submit(() -> insertAndDelete(objectId)).get(); + CompletableFuture.runAsync(() -> insertAndDelete(objectId)); mock.assertIsSatisfied(); @@ -160,6 +188,11 @@ public class MongoDbChangeStreamsConsumerIT extends AbstractMongoDbITSupport imp .id("filterConsumer") .autoStartup(false) .to("mock:test"); + + from("mongodb:myDb?consumerType=changeStreams&database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&streamFilter={{filter.update}}&fullDocument=updateLookup") + .id("updateWithFullDocumentConsumer") + .autoStartup(false) + .to("mock:test"); } }); } diff --git a/components/camel-mongodb/src/test/resources/mongodb.test.properties b/components/camel-mongodb/src/test/resources/mongodb.test.properties index a80317cf82b..c1e81f65950 100644 --- a/components/camel-mongodb/src/test/resources/mongodb.test.properties +++ b/components/camel-mongodb/src/test/resources/mongodb.test.properties @@ -21,3 +21,4 @@ mongodb.testCollection=camelTest mongodb.cappedTestCollection=camelTestCapped myStreamFilter = { '$match':{'$or':[{'fullDocument.string': 'value2'}]} } +filter.update={'$match':{'$and':[{'operationType': 'update'}, {'fullDocument.property': 'filterOk'}]} }