Repository: camel Updated Branches: refs/heads/camel-2.19.x f4f2a1c8d -> f8d4c88f1
CAMEL-11455: Fixed camel-mongodb3 type converters, which were implemented incorrectly and could stop working on the first parsing error. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/f8d4c88f Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/f8d4c88f Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/f8d4c88f Branch: refs/heads/camel-2.19.x Commit: f8d4c88f12385422c1c63a70c30bb8f855f19c17 Parents: 0289fba Author: Claus Ibsen <davscl...@apache.org> Authored: Sun Jul 23 13:31:51 2017 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Sun Jul 23 13:38:43 2017 +0200 ---------------------------------------------------------------------- .../component/mongodb/MongoDbProducer.java | 102 +++++++-------- .../component/mongodb3/MongoDbProducer.java | 130 +++++++++---------- .../converters/MongoDbFallbackConverter.java | 5 + .../mongodb3/MongoDbConversionsTest.java | 1 - .../src/test/resources/log4j2.properties | 2 +- 5 files changed, 120 insertions(+), 120 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/f8d4c88f/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbProducer.java b/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbProducer.java index 1e340fe..6096288 100644 --- a/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbProducer.java +++ b/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbProducer.java @@ -284,32 +284,32 @@ public class MongoDbProducer extends DefaultProducer { } private Function<Exchange, Object> createDoCount() { - return exch -> { - BasicDBObject query = 
exch.getIn().getBody(BasicDBObject.class); + return exchange -> { + BasicDBObject query = exchange.getContext().getTypeConverter().tryConvertTo(BasicDBObject.class, exchange, exchange.getIn().getBody()); if (query == null) { query = new BasicDBObject(); } - return (Long) calculateCollection(exch).count(query); + return (Long) calculateCollection(exchange).count(query); }; } private Function<Exchange, Object> createDoFindAll() { - return exchange1 -> { + return exchange -> { Iterable<BasicDBObject> result; - MongoCollection<BasicDBObject> dbCol = calculateCollection(exchange1); + MongoCollection<BasicDBObject> dbCol = calculateCollection(exchange); // do not use getMandatoryBody, because if the body is empty we want to retrieve all objects in the collection BasicDBObject query = null; // do not run around looking for a type converter unless there is a need for it - if (exchange1.getIn().getBody() != null) { - query = exchange1.getIn().getBody(BasicDBObject.class); + if (exchange.getIn().getBody() != null) { + query = exchange.getContext().getTypeConverter().tryConvertTo(BasicDBObject.class, exchange, exchange.getIn().getBody()); } - BasicDBObject fieldFilter = exchange1.getIn().getHeader(MongoDbConstants.FIELDS_FILTER, BasicDBObject.class); + BasicDBObject fieldFilter = exchange.getIn().getHeader(MongoDbConstants.FIELDS_FILTER, BasicDBObject.class); // get the batch size and number to skip - Integer batchSize = exchange1.getIn().getHeader(MongoDbConstants.BATCH_SIZE, Integer.class); - Integer numToSkip = exchange1.getIn().getHeader(MongoDbConstants.NUM_TO_SKIP, Integer.class); - Integer limit = exchange1.getIn().getHeader(MongoDbConstants.LIMIT, Integer.class); - BasicDBObject sortBy = exchange1.getIn().getHeader(MongoDbConstants.SORT_BY, BasicDBObject.class); + Integer batchSize = exchange.getIn().getHeader(MongoDbConstants.BATCH_SIZE, Integer.class); + Integer numToSkip = exchange.getIn().getHeader(MongoDbConstants.NUM_TO_SKIP, Integer.class); + Integer limit = 
exchange.getIn().getHeader(MongoDbConstants.LIMIT, Integer.class); + BasicDBObject sortBy = exchange.getIn().getHeader(MongoDbConstants.SORT_BY, BasicDBObject.class); FindIterable<BasicDBObject> ret; if (query == null && fieldFilter == null) { ret = dbCol.find(new BasicDBObject()); @@ -341,7 +341,7 @@ public class MongoDbProducer extends DefaultProducer { try { result = new ArrayList<>(); ret.iterator().forEachRemaining(((List<BasicDBObject>) result)::add); - exchange1.getOut().setHeader(MongoDbConstants.RESULT_PAGE_SIZE, ((List<BasicDBObject>) result).size()); + exchange.getOut().setHeader(MongoDbConstants.RESULT_PAGE_SIZE, ((List<BasicDBObject>) result).size()); } finally { ret.iterator().close(); } @@ -353,51 +353,51 @@ public class MongoDbProducer extends DefaultProducer { } private Function<Exchange, Object> createDoInsert() { - return exchange1 -> { - MongoCollection<BasicDBObject> dbCol = calculateCollection(exchange1); - boolean singleInsert = !exchange1.getIn().getHeader(MongoDbConstants.MULTIINSERT, Boolean.FALSE, Boolean.class); + return exchange -> { + MongoCollection<BasicDBObject> dbCol = calculateCollection(exchange); + boolean singleInsert = !exchange.getIn().getHeader(MongoDbConstants.MULTIINSERT, Boolean.FALSE, Boolean.class); - Object insert = null; + Object insert; if (singleInsert) { - insert = exchange1.getIn().getBody(DBObject.class); + insert = exchange.getContext().getTypeConverter().tryConvertTo(DBObject.class, exchange, exchange.getIn().getBody()); if (insert == null) { // previous behavior: // body could not be converted to DBObject, check to see if it's of type List<DBObject> - insert = getMultiInsertBody(exchange1); + insert = getMultiInsertBody(exchange); singleInsert = false; } else if (insert instanceof BasicDBList) { singleInsert = false; } } else { - insert = getMultiInsertBody(exchange1); + insert = getMultiInsertBody(exchange); } if (singleInsert) { BasicDBObject insertObject = (BasicDBObject) insert; 
dbCol.insertOne(insertObject); - exchange1.getIn().setHeader(MongoDbConstants.OID, insertObject.get("_id")); + exchange.getIn().setHeader(MongoDbConstants.OID, insertObject.get("_id")); } else { @SuppressWarnings("unchecked") List<BasicDBObject> insertObjects = (List<BasicDBObject>) insert; dbCol.insertMany(insertObjects); List<Object> objectIdentification = new ArrayList<>(insertObjects.size()); objectIdentification.addAll(insertObjects.stream().map(insertObject -> insertObject.get("_id")).collect(Collectors.toList())); - exchange1.getIn().setHeader(MongoDbConstants.OID, objectIdentification); + exchange.getIn().setHeader(MongoDbConstants.OID, objectIdentification); } return insert; }; } - private Object getMultiInsertBody(Exchange exchange1) { + private Object getMultiInsertBody(Exchange exchange) { Object insert; // we try List first, because it should be the common case - insert = exchange1.getIn().getBody(List.class); + insert = exchange.getIn().getBody(List.class); if (insert != null) { // if the body of type List was obtained, ensure that all items are of type DBObject and cast the List to List<DBObject> - insert = attemptConvertToList((List<?>) insert, exchange1); + insert = attemptConvertToList((List<?>) insert, exchange); } else { - insert = exchange1.getIn().getBody(BasicDBList.class); + insert = exchange.getContext().getTypeConverter().tryConvertTo(BasicDBList.class, exchange, exchange.getIn().getBody()); } if (insert == null) { @@ -407,10 +407,10 @@ public class MongoDbProducer extends DefaultProducer { } private Function<Exchange, Object> createDoUpdate() { - return exchange1 -> { + return exchange -> { try { - MongoCollection<BasicDBObject> dbCol = calculateCollection(exchange1); - List<BasicDBObject> saveObj = exchange1.getIn().getMandatoryBody((Class<List<BasicDBObject>>) (Class<?>) List.class); + MongoCollection<BasicDBObject> dbCol = calculateCollection(exchange); + List<BasicDBObject> saveObj = 
exchange.getIn().getMandatoryBody((Class<List<BasicDBObject>>) (Class<?>) List.class); if (saveObj.size() != 2) { throw new CamelMongoDbException("MongoDB operation = insert, failed because body is not a List of DBObject objects with size = 2"); } @@ -418,8 +418,8 @@ public class MongoDbProducer extends DefaultProducer { BasicDBObject updateCriteria = saveObj.get(0); BasicDBObject objNew = saveObj.get(1); - Boolean multi = exchange1.getIn().getHeader(MongoDbConstants.MULTIUPDATE, Boolean.class); - Boolean upsert = exchange1.getIn().getHeader(MongoDbConstants.UPSERT, Boolean.class); + Boolean multi = exchange.getIn().getHeader(MongoDbConstants.MULTIUPDATE, Boolean.class); + Boolean upsert = exchange.getIn().getHeader(MongoDbConstants.UPSERT, Boolean.class); UpdateResult result; UpdateOptions options = new UpdateOptions(); @@ -432,7 +432,7 @@ public class MongoDbProducer extends DefaultProducer { result = dbCol.updateMany(updateCriteria, objNew, options); } if (result.isModifiedCountAvailable()) { - exchange1.getOut().setHeader(MongoDbConstants.RECORDS_AFFECTED, result.getModifiedCount()); + exchange.getOut().setHeader(MongoDbConstants.RECORDS_AFFECTED, result.getModifiedCount()); } return result; } catch (InvalidPayloadException e) { @@ -442,14 +442,14 @@ public class MongoDbProducer extends DefaultProducer { } private Function<Exchange, Object> createDoRemove() { - return exchange1 -> { + return exchange -> { try { - MongoCollection<BasicDBObject> dbCol = calculateCollection(exchange1); - BasicDBObject removeObj = exchange1.getIn().getMandatoryBody(BasicDBObject.class); + MongoCollection<BasicDBObject> dbCol = calculateCollection(exchange); + BasicDBObject removeObj = exchange.getIn().getMandatoryBody(BasicDBObject.class); DeleteResult result = dbCol.deleteMany(removeObj); if (result.wasAcknowledged()) { - exchange1.getOut().setHeader(MongoDbConstants.RECORDS_AFFECTED, result.getDeletedCount()); + exchange.getOut().setHeader(MongoDbConstants.RECORDS_AFFECTED, 
result.getDeletedCount()); } return result; } catch (InvalidPayloadException e) { @@ -459,10 +459,10 @@ public class MongoDbProducer extends DefaultProducer { } private Function<Exchange, Object> createDoAggregate() { - return exchange1 -> { + return exchange -> { try { - MongoCollection<BasicDBObject> dbCol = calculateCollection(exchange1); - DBObject query = exchange1.getIn().getMandatoryBody(DBObject.class); + MongoCollection<BasicDBObject> dbCol = calculateCollection(exchange); + DBObject query = exchange.getIn().getMandatoryBody(DBObject.class); // Impossible with java driver to get the batch size and number to skip List<BasicDBObject> dbIterator = new ArrayList<>(); @@ -487,10 +487,10 @@ public class MongoDbProducer extends DefaultProducer { } private Function<Exchange, Object> createDoCommand() { - return exchange1 -> { + return exchange -> { try { - MongoDatabase db = calculateDb(exchange1); - BasicDBObject cmdObj = exchange1.getIn().getMandatoryBody(BasicDBObject.class); + MongoDatabase db = calculateDb(exchange); + BasicDBObject cmdObj = exchange.getIn().getMandatoryBody(BasicDBObject.class); return db.runCommand(cmdObj); } catch (InvalidPayloadException e) { throw new CamelMongoDbException("Invalid payload for command", e); @@ -503,19 +503,19 @@ public class MongoDbProducer extends DefaultProducer { } private Function<Exchange, Object> createDoFindById() { - return exchange1 -> { + return exchange -> { try { - MongoCollection<BasicDBObject> dbCol = calculateCollection(exchange1); - Object id = exchange1.getIn().getMandatoryBody(); + MongoCollection<BasicDBObject> dbCol = calculateCollection(exchange); + Object id = exchange.getIn().getMandatoryBody(); BasicDBObject o = new BasicDBObject("_id", id); DBObject ret; - BasicDBObject fieldFilter = exchange1.getIn().getHeader(MongoDbConstants.FIELDS_FILTER, BasicDBObject.class); + BasicDBObject fieldFilter = exchange.getIn().getHeader(MongoDbConstants.FIELDS_FILTER, BasicDBObject.class); if (fieldFilter == 
null) { fieldFilter = new BasicDBObject(); } ret = dbCol.find(o).projection(fieldFilter).first(); - exchange1.getOut().setHeader(MongoDbConstants.RESULT_TOTAL_SIZE, ret == null ? 0 : 1); + exchange.getOut().setHeader(MongoDbConstants.RESULT_TOTAL_SIZE, ret == null ? 0 : 1); return ret; } catch (InvalidPayloadException e) { throw new CamelMongoDbException("Invalid payload for findById", e); @@ -524,15 +524,15 @@ public class MongoDbProducer extends DefaultProducer { } private Function<Exchange, Object> createDoSave() { - return exchange1 -> { + return exchange -> { try { - MongoCollection<BasicDBObject> dbCol = calculateCollection(exchange1); - BasicDBObject saveObj = exchange1.getIn().getMandatoryBody(BasicDBObject.class); + MongoCollection<BasicDBObject> dbCol = calculateCollection(exchange); + BasicDBObject saveObj = exchange.getIn().getMandatoryBody(BasicDBObject.class); UpdateOptions options = new UpdateOptions().upsert(true); BasicDBObject queryObject = new BasicDBObject("_id", saveObj.get("_id")); UpdateResult result = dbCol.replaceOne(queryObject, saveObj, options); - exchange1.getIn().setHeader(MongoDbConstants.OID, saveObj.get("_id")); + exchange.getIn().setHeader(MongoDbConstants.OID, saveObj.get("_id")); return result; } catch (InvalidPayloadException e) { throw new CamelMongoDbException("Body incorrect type for save", e); http://git-wip-us.apache.org/repos/asf/camel/blob/f8d4c88f/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoDbProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoDbProducer.java b/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoDbProducer.java index cb33fe4..edc356e 100644 --- a/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoDbProducer.java +++ 
b/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoDbProducer.java @@ -120,11 +120,7 @@ public class MongoDbProducer extends DefaultProducer { } /** - * Entry method that selects the appropriate MongoDB operation and executes - * it - * - * @param operation - * @param exchange + * Entry method that selects the appropriate MongoDB operation and executes it */ protected void invokeOperation(MongoDbOperation operation, Exchange exchange) throws Exception { Processor processor = operations.get(operation); @@ -281,17 +277,17 @@ public class MongoDbProducer extends DefaultProducer { } private Function<Exchange, Object> createDoFindOneByQuery() { - return exch -> { + return exchange -> { try { - MongoCollection<Document> dbCol = calculateCollection(exch); + MongoCollection<Document> dbCol = calculateCollection(exchange); - Bson query = exch.getIn().getHeader(CRITERIA, Bson.class); + Bson query = exchange.getIn().getHeader(CRITERIA, Bson.class); if (null == query) { - query = exch.getIn().getMandatoryBody(Bson.class); + query = exchange.getIn().getMandatoryBody(Bson.class); } - Bson sortBy = exch.getIn().getHeader(SORT_BY, Bson.class); - Bson fieldFilter = exch.getIn().getHeader(FIELDS_PROJECTION, Bson.class); + Bson sortBy = exchange.getIn().getHeader(SORT_BY, Bson.class); + Bson fieldFilter = exchange.getIn().getHeader(FIELDS_PROJECTION, Bson.class); if (fieldFilter == null) { fieldFilter = new Document(); @@ -302,7 +298,7 @@ public class MongoDbProducer extends DefaultProducer { } Document ret = dbCol.find(query).projection(fieldFilter).sort(sortBy).first(); - exch.getOut().setHeader(RESULT_TOTAL_SIZE, ret == null ? 0 : 1); + exchange.getOut().setHeader(RESULT_TOTAL_SIZE, ret == null ? 
0 : 1); return ret; } catch (InvalidPayloadException e) { throw new CamelMongoDbException("Payload is no Document", e); @@ -311,37 +307,37 @@ public class MongoDbProducer extends DefaultProducer { } private Function<Exchange, Object> createDoCount() { - return exch -> { - Bson query = exch.getIn().getHeader(CRITERIA, Bson.class); - if (null == query) { - query = exch.getIn().getBody(Bson.class); + return exchange -> { + Bson query = exchange.getIn().getHeader(CRITERIA, Bson.class); + if (query == null) { + query = exchange.getContext().getTypeConverter().tryConvertTo(Bson.class, exchange, exchange.getIn().getBody()); } if (query == null) { query = new Document(); } - return calculateCollection(exch).count(query); + return calculateCollection(exchange).count(query); }; } private Function<Exchange, Object> createDoFindAll() { - return exchange1 -> { + return exchange -> { Iterable<Document> result; - MongoCollection<Document> dbCol = calculateCollection(exchange1); + MongoCollection<Document> dbCol = calculateCollection(exchange); // do not use getMandatoryBody, because if the body is empty we want // to retrieve all objects in the collection - Bson query = exchange1.getIn().getHeader(CRITERIA, Bson.class); + Bson query = exchange.getIn().getHeader(CRITERIA, Bson.class); // do not run around looking for a type converter unless there is a // need for it - if (null == query && exchange1.getIn().getBody() != null) { - query = exchange1.getIn().getBody(Bson.class); + if (query == null && exchange.getIn().getBody() != null) { + query = exchange.getContext().getTypeConverter().tryConvertTo(Bson.class, exchange, exchange.getIn().getBody()); } - Bson fieldFilter = exchange1.getIn().getHeader(FIELDS_PROJECTION, Bson.class); + Bson fieldFilter = exchange.getIn().getHeader(FIELDS_PROJECTION, Bson.class); // get the batch size and number to skip - Integer batchSize = exchange1.getIn().getHeader(BATCH_SIZE, Integer.class); - Integer numToSkip = 
exchange1.getIn().getHeader(NUM_TO_SKIP, Integer.class); - Integer limit = exchange1.getIn().getHeader(LIMIT, Integer.class); - Document sortBy = exchange1.getIn().getHeader(SORT_BY, Document.class); + Integer batchSize = exchange.getIn().getHeader(BATCH_SIZE, Integer.class); + Integer numToSkip = exchange.getIn().getHeader(NUM_TO_SKIP, Integer.class); + Integer limit = exchange.getIn().getHeader(LIMIT, Integer.class); + Document sortBy = exchange.getIn().getHeader(SORT_BY, Document.class); FindIterable<Document> ret; if (query == null && fieldFilter == null) { ret = dbCol.find(new Document()); @@ -373,7 +369,7 @@ public class MongoDbProducer extends DefaultProducer { try { result = new ArrayList<>(); ret.iterator().forEachRemaining(((List<Document>)result)::add); - exchange1.getOut().setHeader(RESULT_PAGE_SIZE, ((List<Document>)result).size()); + exchange.getOut().setHeader(RESULT_PAGE_SIZE, ((List<Document>)result).size()); } finally { ret.iterator().close(); } @@ -385,19 +381,19 @@ public class MongoDbProducer extends DefaultProducer { } private Function<Exchange, Object> createDoInsert() { - return exchange1 -> { - MongoCollection<Document> dbCol = calculateCollection(exchange1); + return exchange -> { + MongoCollection<Document> dbCol = calculateCollection(exchange); boolean singleInsert = true; - Object insert = exchange1.getIn().getBody(Document.class); + Object insert = exchange.getContext().getTypeConverter().tryConvertTo(Document.class, exchange, exchange.getIn().getBody()); // body could not be converted to Document, check to see if it's of // type List<Document> if (insert == null) { - insert = exchange1.getIn().getBody(List.class); + insert = exchange.getIn().getBody(List.class); // if the body of type List was obtained, ensure that all items // are of type Document and cast the List to List<Document> if (insert != null) { singleInsert = false; - insert = attemptConvertToList((List<?>)insert, exchange1); + insert = attemptConvertToList((List<?>)insert, 
exchange); } else { throw new CamelMongoDbException("MongoDB operation = insert, Body is not conversible to type Document nor List<Document>"); } @@ -407,29 +403,29 @@ public class MongoDbProducer extends DefaultProducer { Document insertObject = Document.class.cast(insert); dbCol.insertOne(insertObject); - exchange1.getIn().setHeader(OID, insertObject.get(MONGO_ID)); + exchange.getIn().setHeader(OID, insertObject.get(MONGO_ID)); } else { @SuppressWarnings("unchecked") List<Document> insertObjects = (List<Document>)insert; dbCol.insertMany(insertObjects); List<Object> objectIdentification = new ArrayList<>(insertObjects.size()); objectIdentification.addAll(insertObjects.stream().map(insertObject -> insertObject.get(MONGO_ID)).collect(Collectors.toList())); - exchange1.getIn().setHeader(OID, objectIdentification); + exchange.getIn().setHeader(OID, objectIdentification); } return insert; }; } private Function<Exchange, Object> createDoUpdate() { - return exchange1 -> { + return exchange -> { try { - MongoCollection<Document> dbCol = calculateCollection(exchange1); + MongoCollection<Document> dbCol = calculateCollection(exchange); - Bson updateCriteria = exchange1.getIn().getHeader(CRITERIA, Bson.class); + Bson updateCriteria = exchange.getIn().getHeader(CRITERIA, Bson.class); Bson objNew; if (null == updateCriteria) { @SuppressWarnings("unchecked") - List<Bson> saveObj = exchange1.getIn().getMandatoryBody((Class<List<Bson>>)Class.class.cast(List.class)); + List<Bson> saveObj = exchange.getIn().getMandatoryBody((Class<List<Bson>>)Class.class.cast(List.class)); if (saveObj.size() != 2) { throw new CamelMongoDbException("MongoDB operation = insert, failed because body is not a List of Document objects with size = 2"); } @@ -437,11 +433,11 @@ public class MongoDbProducer extends DefaultProducer { updateCriteria = saveObj.get(0); objNew = saveObj.get(1); } else { - objNew = exchange1.getIn().getMandatoryBody(Bson.class); + objNew = 
exchange.getIn().getMandatoryBody(Bson.class); } - Boolean multi = exchange1.getIn().getHeader(MULTIUPDATE, Boolean.class); - Boolean upsert = exchange1.getIn().getHeader(UPSERT, Boolean.class); + Boolean multi = exchange.getIn().getHeader(MULTIUPDATE, Boolean.class); + Boolean upsert = exchange.getIn().getHeader(UPSERT, Boolean.class); UpdateResult result; UpdateOptions options = new UpdateOptions(); @@ -455,9 +451,9 @@ public class MongoDbProducer extends DefaultProducer { result = dbCol.updateMany(updateCriteria, objNew, options); } if (result.isModifiedCountAvailable()) { - exchange1.getOut().setHeader(RECORDS_AFFECTED, result.getModifiedCount()); + exchange.getOut().setHeader(RECORDS_AFFECTED, result.getModifiedCount()); } - exchange1.getOut().setHeader(RECORDS_MATCHED, result.getMatchedCount()); + exchange.getOut().setHeader(RECORDS_MATCHED, result.getMatchedCount()); return result; } catch (InvalidPayloadException e) { throw new CamelMongoDbException("Invalid payload for update", e); @@ -466,14 +462,14 @@ public class MongoDbProducer extends DefaultProducer { } private Function<Exchange, Object> createDoRemove() { - return exchange1 -> { + return exchange -> { try { - MongoCollection<Document> dbCol = calculateCollection(exchange1); - Document removeObj = exchange1.getIn().getMandatoryBody(Document.class); + MongoCollection<Document> dbCol = calculateCollection(exchange); + Document removeObj = exchange.getIn().getMandatoryBody(Document.class); DeleteResult result = dbCol.deleteMany(removeObj); if (result.wasAcknowledged()) { - exchange1.getOut().setHeader(RECORDS_AFFECTED, result.getDeletedCount()); + exchange.getOut().setHeader(RECORDS_AFFECTED, result.getDeletedCount()); } return result; } catch (InvalidPayloadException e) { @@ -483,9 +479,9 @@ public class MongoDbProducer extends DefaultProducer { } private Function<Exchange, Object> createDoAggregate() { - return exchange1 -> { + return exchange -> { try { - MongoCollection<Document> dbCol = 
calculateCollection(exchange1); + MongoCollection<Document> dbCol = calculateCollection(exchange); // Impossible with java driver to get the batch size and number // to skip @@ -493,16 +489,16 @@ public class MongoDbProducer extends DefaultProducer { AggregateIterable<Document> aggregationResult; @SuppressWarnings("unchecked") - List<Bson> query = exchange1.getIn().getMandatoryBody((Class<List<Bson>>)Class.class.cast(List.class)); + List<Bson> query = exchange.getIn().getMandatoryBody((Class<List<Bson>>)Class.class.cast(List.class)); // Allow body to be a pipeline // @see http://docs.mongodb.org/manual/core/aggregation/ - if (null != query) { + if (query != null) { List<Bson> queryList = query.stream().map(o -> (Bson)o).collect(Collectors.toList()); aggregationResult = dbCol.aggregate(queryList); } else { List<Bson> queryList = new ArrayList<>(); - queryList.add(Bson.class.cast(exchange1.getIn().getMandatoryBody(Bson.class))); + queryList.add(Bson.class.cast(exchange.getIn().getMandatoryBody(Bson.class))); aggregationResult = dbCol.aggregate(queryList); } aggregationResult.iterator().forEachRemaining(dbIterator::add); @@ -514,10 +510,10 @@ public class MongoDbProducer extends DefaultProducer { } private Function<Exchange, Object> createDoCommand() { - return exchange1 -> { + return exchange -> { try { - MongoDatabase db = calculateDb(exchange1); - Document cmdObj = exchange1.getIn().getMandatoryBody(Document.class); + MongoDatabase db = calculateDb(exchange); + Document cmdObj = exchange.getIn().getMandatoryBody(Document.class); return db.runCommand(cmdObj); } catch (InvalidPayloadException e) { throw new CamelMongoDbException("Invalid payload for command", e); @@ -530,19 +526,19 @@ public class MongoDbProducer extends DefaultProducer { } private Function<Exchange, Object> createDoFindById() { - return exchange1 -> { + return exchange -> { try { - MongoCollection<Document> dbCol = calculateCollection(exchange1); - Object id = exchange1.getIn().getMandatoryBody(); + 
MongoCollection<Document> dbCol = calculateCollection(exchange); + Object id = exchange.getIn().getMandatoryBody(); Bson o = Filters.eq(MONGO_ID, id); Document ret; - Bson fieldFilter = exchange1.getIn().getHeader(FIELDS_PROJECTION, Bson.class); + Bson fieldFilter = exchange.getIn().getHeader(FIELDS_PROJECTION, Bson.class); if (fieldFilter == null) { fieldFilter = new Document(); } ret = dbCol.find(o).projection(fieldFilter).first(); - exchange1.getOut().setHeader(RESULT_TOTAL_SIZE, ret == null ? 0 : 1); + exchange.getOut().setHeader(RESULT_TOTAL_SIZE, ret == null ? 0 : 1); return ret; } catch (InvalidPayloadException e) { throw new CamelMongoDbException("Invalid payload for findById", e); @@ -551,18 +547,18 @@ public class MongoDbProducer extends DefaultProducer { } private Function<Exchange, Object> createDoSave() { - return exchange1 -> { + return exchange -> { try { - MongoCollection<Document> dbCol = calculateCollection(exchange1); - Document saveObj = exchange1.getIn().getMandatoryBody(Document.class); + MongoCollection<Document> dbCol = calculateCollection(exchange); + Document saveObj = exchange.getIn().getMandatoryBody(Document.class); UpdateOptions options = new UpdateOptions().upsert(true); - UpdateResult result = null; + UpdateResult result; if (null == saveObj.get(MONGO_ID)) { result = dbCol.replaceOne(Filters.where("false"), saveObj, options); - exchange1.getIn().setHeader(OID, result.getUpsertedId().asObjectId().getValue()); + exchange.getIn().setHeader(OID, result.getUpsertedId().asObjectId().getValue()); } else { result = dbCol.replaceOne(eq(MONGO_ID, saveObj.get(MONGO_ID)), saveObj, options); - exchange1.getIn().setHeader(OID, saveObj.get(MONGO_ID)); + exchange.getIn().setHeader(OID, saveObj.get(MONGO_ID)); } return result; } catch (InvalidPayloadException e) { http://git-wip-us.apache.org/repos/asf/camel/blob/f8d4c88f/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/converters/MongoDbFallbackConverter.java 
---------------------------------------------------------------------- diff --git a/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/converters/MongoDbFallbackConverter.java b/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/converters/MongoDbFallbackConverter.java index 10962cf..3fbeb46 100644 --- a/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/converters/MongoDbFallbackConverter.java +++ b/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/converters/MongoDbFallbackConverter.java @@ -28,6 +28,7 @@ import org.apache.camel.Exchange; import org.apache.camel.FallbackConverter; import org.apache.camel.InvalidPayloadException; import org.apache.camel.spi.TypeConverterRegistry; +import org.bson.Document; @Converter public final class MongoDbFallbackConverter { @@ -39,6 +40,7 @@ public final class MongoDbFallbackConverter { } @FallbackConverter + @SuppressWarnings("unchecked") public static Object convertTo(Class<?> type, Exchange exchange, Object value, TypeConverterRegistry registry) throws InvalidPayloadException { @@ -73,6 +75,9 @@ public final class MongoDbFallbackConverter { if (type == DBObject.class) { Map<?, ?> m = OBJECT_MAPPER.convertValue(value, Map.class); return new BasicDBObject(m); + } else if (type == Document.class) { + Map<String, Object> m = OBJECT_MAPPER.convertValue(value, Map.class); + return new Document(m); } return null; http://git-wip-us.apache.org/repos/asf/camel/blob/f8d4c88f/components/camel-mongodb3/src/test/java/org/apache/camel/component/mongodb3/MongoDbConversionsTest.java ---------------------------------------------------------------------- diff --git a/components/camel-mongodb3/src/test/java/org/apache/camel/component/mongodb3/MongoDbConversionsTest.java b/components/camel-mongodb3/src/test/java/org/apache/camel/component/mongodb3/MongoDbConversionsTest.java index 7b7ca8b..aaefc72 100644 --- 
a/components/camel-mongodb3/src/test/java/org/apache/camel/component/mongodb3/MongoDbConversionsTest.java +++ b/components/camel-mongodb3/src/test/java/org/apache/camel/component/mongodb3/MongoDbConversionsTest.java @@ -49,7 +49,6 @@ public class MongoDbConversionsTest extends AbstractMongoDbTest { template.requestBody("direct:insertMap", m1); Document b = testCollection.find(eq(MONGO_ID, "testInsertMap")).first(); assertNotNull("No record with 'testInsertMap' _id", b); - } @Test http://git-wip-us.apache.org/repos/asf/camel/blob/f8d4c88f/components/camel-mongodb3/src/test/resources/log4j2.properties ---------------------------------------------------------------------- diff --git a/components/camel-mongodb3/src/test/resources/log4j2.properties b/components/camel-mongodb3/src/test/resources/log4j2.properties index 9c7812b..bd563fe 100644 --- a/components/camel-mongodb3/src/test/resources/log4j2.properties +++ b/components/camel-mongodb3/src/test/resources/log4j2.properties @@ -26,4 +26,4 @@ appender.out.layout.type = PatternLayout appender.out.layout.pattern = [%30.30t] %-30.30c{1} %-5p %m%n rootLogger.level = WARN rootLogger.appenderRef.file.ref = file -rootLogger.appenderRef.out.ref = out +#rootLogger.appenderRef.out.ref = out