yingbo-wu opened a new issue #1112:
URL: https://github.com/apache/camel-kafka-connector/issues/1112


   ```
   {
       "connector.class": 
"org.apache.camel.kafkaconnector.mongodb.CamelMongodbSourceConnector",
       "tasks.max": "1",
       "topics": "mongo.test-object.save.source",
       "camel.source.endpoint.consumerType": "changeStreams",
       "camel.source.path.connectionBean": "mongo",
       "errors.deadletterqueue.context.headers.enable": "true",
       "camel.source.endpoint.collection": "test_object",
       "camel.source.endpoint.database": "ac-test",
       "errors.deadletterqueue.topic.name": 
"mongo.test-object.save.source.deadletter",
       "value.converter.schemas.enable": "false",
       "errors.tolerance": "all",
       "errors.deadletterqueue.topic.replication.factor": "1",
       "**camel.source.endpoint.streamFilter**": 
"{'$match':{'$or':[{'operationType':'insert'},{'operationType':'update'},{'operationType':'replace'}]}}",
       "value.converter": "org.apache.kafka.connect.json.JsonConverter",
       "errors.log.enable": "true",
       "camel.source.endpoint.mongoConnection": 
"#class:com.mongodb.client.MongoClients#create('mongodb://root:root@192.168.23.164:27017/?authSource=admin')",
       "key.converter": "org.apache.kafka.connect.storage.StringConverter",
       "camel.source.marshal": "org.apache.camel.component.gson.GsonDataFormat",
       "camel.source.pollingConsumerQueueSize": "10000",
       "camel.source.contentLogLevel": "INFO"
   }
   ```
   
   The **camel.source.endpoint.streamFilter** Setting an incorrect JSON format 
does not result in an exception
   So my guess is that this setting doesn't work
   
   In the end I surmised that it was the Camel-MongoDB component problem
   The following is part of the Camel-MongoDB code
   org/apache/camel/component/mongodb3/MongoDbChangeStreamsConsumer.java
   ```
   String streamFilter = (String) 
getRoute().getProperties().get(STREAM_FILTER_PROPERTY);
   List<BsonDocument> bsonFilter = null;
   if (ObjectHelper.isNotEmpty(streamFilter)) {
       bsonFilter = singletonList(BsonDocument.parse(streamFilter));
   }
   ```
   The **camel.source.endpoint.streamFilter** is URI parameters
   But camel-mongodb Is to read content from properties
   Whether the Camle-Mongo component has an incompatible connector configuration


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to