This is an automated email from the ASF dual-hosted git repository. acosentino pushed a commit to branch camel-2.24.x in repository https://gitbox.apache.org/repos/asf/camel.git
commit 77271fd7f9c31484bfaf1e05ac95fbbd7ce7b5b7 Author: Pasquale Congiusti <pasquale.congiu...@gmail.com> AuthorDate: Mon Jul 22 12:09:46 2019 +0200 CAMEL-13776 - backport issue fixed --- .../component/mongodb3/MongoDbTailingProcess.java | 24 ++++++++++++++-------- .../MongoDbTailableCursorConsumerTest.java | 4 ---- 2 files changed, 15 insertions(+), 13 deletions(-) diff --git a/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoDbTailingProcess.java b/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoDbTailingProcess.java index 0be296f..40e8be2 100644 --- a/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoDbTailingProcess.java +++ b/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoDbTailingProcess.java @@ -114,7 +114,7 @@ public class MongoDbTailingProcess implements Runnable { if (keepRunning) { cursor.close(); if (LOG.isDebugEnabled()) { - LOG.debug("Regenerating cursor with lastVal: {}, waiting {}ms first", tailTracking.lastVal, cursorRegenerationDelay); + LOG.debug("Regenerating cursor with lastVal: {}, waiting {} ms first", tailTracking.lastVal, cursorRegenerationDelay); } if (cursorRegenerationDelayEnabled) { @@ -178,12 +178,19 @@ public class MongoDbTailingProcess implements Runnable { if (keepRunning) { LOG.debug("Cursor not found exception from MongoDB, will regenerate cursor. This is normal behaviour with tailable cursors.", e); } + } catch (IllegalStateException e) { + // this is happening when the consumer is stopped or the mongo interrupted (ie, junit ending test) + // as we cannot resume, we shutdown the thread gracefully + LOG.info("Cursor was closed, likely the consumer was stopped and closed the cursor on purpose.", e); + if (cursor != null) { + cursor.close(); + } + keepRunning = false; + } finally { + // the loop finished, persist the lastValue just in case we are shutting down + // TODO: perhaps add a functionality to persist every N records + tailTracking.persistToStore(); } - - // the loop finished, persist the lastValue just in case we are shutting - // down - // TODO: perhaps add a functionality to persist every N records - tailTracking.persistToStore(); } // no arguments, will ask DB what the last updated Id was (checking @@ -196,9 +203,8 @@ public class MongoDbTailingProcess implements Runnable { if (lastVal == null) { answer = dbCol.find().cursorType(CursorType.TailableAwait).iterator(); } else { - try (MongoCursor<Document> iterator = dbCol.find(gt(tailTracking.getIncreasingFieldName(), lastVal)).cursorType(CursorType.TailableAwait).iterator();) { - answer = iterator; - } + MongoCursor<Document> iterator = dbCol.find(gt(tailTracking.getIncreasingFieldName(), lastVal)).cursorType(CursorType.TailableAwait).iterator(); + answer = iterator; } return answer; } diff --git a/components/camel-mongodb3/src/test/java/org/apache/camel/component/mongodb3/MongoDbTailableCursorConsumerTest.java b/components/camel-mongodb3/src/test/java/org/apache/camel/component/mongodb3/MongoDbTailableCursorConsumerTest.java index 7c5edb6..1c24d88 100644 --- a/components/camel-mongodb3/src/test/java/org/apache/camel/component/mongodb3/MongoDbTailableCursorConsumerTest.java +++ b/components/camel-mongodb3/src/test/java/org/apache/camel/component/mongodb3/MongoDbTailableCursorConsumerTest.java @@ -24,7 +24,6 @@ import org.apache.camel.ServiceStatus; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.mock.MockEndpoint; import org.bson.Document; -import org.junit.Ignore; import org.junit.Test; import static com.mongodb.client.model.Filters.eq; @@ -153,7 +152,6 @@ public class MongoDbTailableCursorConsumerTest extends AbstractMongoDbTest { } @Test - @Ignore public void testPersistentTailTrack() throws Exception { assertEquals(0, cappedTestCollection.count()); final MockEndpoint mock = getMockEndpoint("mock:test"); @@ -227,7 +225,6 @@ public class MongoDbTailableCursorConsumerTest extends AbstractMongoDbTest { } @Test - @Ignore public void testPersistentTailTrackIncreasingDateField() throws Exception { assertEquals(0, cappedTestCollection.count()); final MockEndpoint mock = getMockEndpoint("mock:test"); @@ -304,7 +301,6 @@ public class MongoDbTailableCursorConsumerTest extends AbstractMongoDbTest { } @Test - @Ignore public void testCustomTailTrackLocation() throws Exception { assertEquals(0, cappedTestCollection.count()); final MockEndpoint mock = getMockEndpoint("mock:test");