This is an automated email from the ASF dual-hosted git repository. acosentino pushed a commit to branch camel-2.x in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-2.x by this push: new d48a7ea "CAMEL-13407:CouchDbChangesetTracker fails silently on network error and does not recover" d48a7ea is described below commit d48a7ea49aeb19a513c5013f9e1646fc1cd448f2 Author: Ramu <kkaka...@redhat.com> AuthorDate: Mon Jun 3 19:33:25 2019 +0530 "CAMEL-13407:CouchDbChangesetTracker fails silently on network error and does not recover" --- .../component/couchdb/CouchDbChangesetTracker.java | 98 +++++++++++++++++----- 1 file changed, 77 insertions(+), 21 deletions(-) diff --git a/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbChangesetTracker.java b/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbChangesetTracker.java index 6857aa7..0090c27 100644 --- a/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbChangesetTracker.java +++ b/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbChangesetTracker.java @@ -20,6 +20,7 @@ import com.google.gson.JsonObject; import org.apache.camel.Exchange; import org.lightcouch.Changes; import org.lightcouch.ChangesResult; +import org.lightcouch.CouchDbException; import org.lightcouch.CouchDbInfo; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -27,6 +28,7 @@ import org.slf4j.LoggerFactory; public class CouchDbChangesetTracker implements Runnable { private static final Logger LOG = LoggerFactory.getLogger(CouchDbChangesetTracker.class); + private static final int MAX_DB_ERROR_REPEATS = 8; private volatile boolean stopped; private final CouchDbClientWrapper couchClient; @@ -38,46 +40,100 @@ public class CouchDbChangesetTracker implements Runnable { this.endpoint = endpoint; this.consumer = consumer; this.couchClient = couchClient; - initChanges(); + initChanges(null); } - void initChanges() { - String since = endpoint.getSince(); - if (since == null) { + private void initChanges(final String sequence) { + String since = sequence; + if (null == since) { CouchDbInfo dbInfo = couchClient.context().info(); since = dbInfo.getUpdateSeq(); // get latest update seq - LOG.debug("Last sequence [{}]", since); } + LOG.debug("Last sequence [{}]", since); changes = couchClient.changes().style(endpoint.getStyle()).includeDocs(true) .since(since).heartBeat(endpoint.getHeartbeat()).continuousChanges(); } - @Override public void run() { - while (changes.hasNext()) { // blocks until a feed is received - ChangesResult.Row feed = changes.next(); - if (feed.isDeleted() && !endpoint.isDeletes()) { - continue; - } - if (!feed.isDeleted() && !endpoint.isUpdates()) { - continue; + + String lastSequence = null; + + try { + while (!stopped) { + + try { + while (changes.hasNext()) { // blocks until a feed is received + ChangesResult.Row feed = changes.next(); + if (feed.isDeleted() && !endpoint.isDeletes()) { + continue; + } + if (!feed.isDeleted() && !endpoint.isUpdates()) { + continue; + } + + lastSequence = feed.getSeq(); + JsonObject doc = feed.getDoc(); + + Exchange exchange = endpoint.createExchange(lastSequence, feed.getId(), doc, feed.isDeleted()); + if (LOG.isTraceEnabled()) { + LOG.trace("Created exchange [exchange={}, _id={}, seq={}", new Object[]{exchange, feed.getId(), lastSequence}); + } + + try { + consumer.getProcessor().process(exchange); + } catch (Exception e) { + consumer.getExceptionHandler().handleException("Error processing exchange.", exchange, e); + } + } + + stopped = true; + + } catch (CouchDbException e) { + if (LOG.isDebugEnabled()) { + LOG.debug("CouchDb Exception encountered waiting for changes! Attempting to recover...", e); + } + if (!waitForStability(lastSequence)) { + throw e; + } + } } + } catch (Exception e) { + LOG.error("Unexpected error causing CouchDb change tracker to exit!", e); + } + } + + private boolean waitForStability(final String lastSequence) { - String seq = feed.getSeq(); - JsonObject doc = feed.getDoc(); + boolean problems = true; + int repeatDbErrorCount = 0; - Exchange exchange = endpoint.createExchange(seq, feed.getId(), doc, feed.isDeleted()); - if (LOG.isTraceEnabled()) { - LOG.trace("Created exchange [exchange={}, _id={}, seq={}", exchange, feed.getId(), seq); + while (problems) { + if (++repeatDbErrorCount > MAX_DB_ERROR_REPEATS) { + LOG.error("CouchDb change set listener fatal error! Retry attempts exceeded, listener must exit."); + return false; } try { - consumer.getProcessor().process(exchange); + Thread.sleep((int) ((Math.random() * 2000) + 5000)); // <2000ms,5000ms) + } catch (InterruptedException e) { + if (LOG.isDebugEnabled()) { + LOG.debug("CouchDb change set listener interrupted waiting for stability!!", e); + } + } + try { + // Fail fast operation + couchClient.context().serverVersion(); + // reset change listener + initChanges(lastSequence); + problems = false; + } catch (Exception e) { - consumer.getExceptionHandler().handleException("Error processing exchange.", exchange, e); + if (LOG.isDebugEnabled()) { + LOG.debug("Failed to get CouchDb server version and/or reset change listener! Attempt: " + repeatDbErrorCount, e); + } } } - stopped = true; + return true; } public void stop() {