This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
commit 0864a71777c5116ea66dc8cd058f4ce06797b223 Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Thu Nov 4 16:41:49 2021 +0100 CAMEL-17618: camel-health - Route health check with consumer should be DOWN until first poll executed --- .../TelegramConsumerHealthCheckErrorTest.java | 12 +++++++--- .../TelegramConsumerHealthCheckOkTest.java | 2 -- .../camel/support/ScheduledPollConsumer.java | 27 +++++++++++++++++++++- .../support/ScheduledPollConsumerHealthCheck.java | 13 ++++++++--- 4 files changed, 45 insertions(+), 9 deletions(-) diff --git a/components/camel-telegram/src/test/java/org/apache/camel/component/telegram/TelegramConsumerHealthCheckErrorTest.java b/components/camel-telegram/src/test/java/org/apache/camel/component/telegram/TelegramConsumerHealthCheckErrorTest.java index 76f6711..885e61b 100644 --- a/components/camel-telegram/src/test/java/org/apache/camel/component/telegram/TelegramConsumerHealthCheckErrorTest.java +++ b/components/camel-telegram/src/test/java/org/apache/camel/component/telegram/TelegramConsumerHealthCheckErrorTest.java @@ -74,12 +74,18 @@ public class TelegramConsumerHealthCheckErrorTest extends TelegramTestSupport { Assertions.assertEquals(HealthCheck.State.DOWN, rc.getState()); String msg = rc.getMessage().get(); long count = (long) rc.getDetails().get(HealthCheck.FAILURE_ERROR_COUNT); - Assertions.assertEquals("Consumer failed polling " + count + " times route: telegram (telegram://bots)", msg); + if (count == 0) { + Assertions.assertEquals("Consumer has not yet polled route: telegram (telegram://bots)", msg); + } else { + Assertions.assertEquals("Consumer failed polling " + count + " times route: telegram (telegram://bots)", msg); + } Assertions.assertEquals("telegram://bots?authorizationToken=mock-token", rc.getDetails().get(HealthCheck.FAILURE_ENDPOINT_URI)); - Throwable e = rc.getError().get(); - Assertions.assertTrue(e.getMessage().contains("401 Unauthorized")); + if (rc.getError().isPresent()) { + Throwable e = rc.getError().get(); + Assertions.assertTrue(e.getMessage().contains("401 Unauthorized")); + } } @Override diff --git a/components/camel-telegram/src/test/java/org/apache/camel/component/telegram/TelegramConsumerHealthCheckOkTest.java b/components/camel-telegram/src/test/java/org/apache/camel/component/telegram/TelegramConsumerHealthCheckOkTest.java index a193d22..b68db20 100644 --- a/components/camel-telegram/src/test/java/org/apache/camel/component/telegram/TelegramConsumerHealthCheckOkTest.java +++ b/components/camel-telegram/src/test/java/org/apache/camel/component/telegram/TelegramConsumerHealthCheckOkTest.java @@ -54,8 +54,6 @@ public class TelegramConsumerHealthCheckOkTest extends TelegramTestSupport { HealthCheckRegistry hcr = context.getExtension(HealthCheckRegistry.class); HealthCheckRepository repo = hcr.getRepository("routes").get(); - repo.stream().forEach(h -> Assertions.assertEquals(HealthCheck.State.UP, h.call().getState())); - endpoint.expectedMinimumMessageCount(2); endpoint.expectedBodiesReceived("message1", "message2"); diff --git a/core/camel-support/src/main/java/org/apache/camel/support/ScheduledPollConsumer.java b/core/camel-support/src/main/java/org/apache/camel/support/ScheduledPollConsumer.java index c585956..3131442 100644 --- a/core/camel-support/src/main/java/org/apache/camel/support/ScheduledPollConsumer.java +++ b/core/camel-support/src/main/java/org/apache/camel/support/ScheduledPollConsumer.java @@ -71,6 +71,7 @@ public abstract class ScheduledPollConsumer extends DefaultConsumer private volatile int backoffCounter; private volatile long idleCounter; private volatile long errorCounter; + private volatile long successCounter; private volatile Throwable lastError; private final AtomicLong counter = new AtomicLong(); @@ -156,6 +157,7 @@ public abstract class ScheduledPollConsumer extends DefaultConsumer idleCounter = 0; errorCounter = 0; backoffCounter = 0; + successCounter = 0; LOG.trace("doRun() backoff finished, resetting counters."); } } @@ -250,14 +252,17 @@ public abstract class ScheduledPollConsumer extends DefaultConsumer if (cause != null) { idleCounter = 0; + successCounter = 0; errorCounter++; lastError = cause; } else { idleCounter = polledMessages == 0 ? ++idleCounter : 0; + successCounter++; errorCounter = 0; lastError = null; } - LOG.trace("doRun() done with idleCounter={}, errorCounter={}", idleCounter, errorCounter); + LOG.trace("doRun() done with idleCounter={}, successCounter={}, errorCounter={}", idleCounter, successCounter, + errorCounter); // avoid this thread to throw exceptions because the thread pool wont re-schedule a new thread } @@ -429,12 +434,31 @@ public abstract class ScheduledPollConsumer extends DefaultConsumer /** * Gets the error counter. If the counter is > 0 that means the consumer failed polling for the last N number of * times. When the consumer is successfully again, then the error counter resets to zero. + * + * @see #getSuccessCounter() */ protected long getErrorCounter() { return errorCounter; } /** + * Gets the success counter. If the success is > 0 that means the consumer succeeded polling for the last N number + * of times. When the consumer is failing again, then the success counter resets to zero. + * + * @see #getErrorCounter() + */ + protected long getSuccessCounter() { + return successCounter; + } + + /** + * Gets the total number of polls run. + */ + protected long getCounter() { + return counter.get(); + } + + /** * Gets the last caused error (exception) for the last poll that failed. When the consumer is successfully again, * then the error resets to null. */ @@ -544,6 +568,7 @@ public abstract class ScheduledPollConsumer extends DefaultConsumer backoffCounter = 0; idleCounter = 0; errorCounter = 0; + successCounter = 0; counter.set(0); super.doStop(); diff --git a/core/camel-support/src/main/java/org/apache/camel/support/ScheduledPollConsumerHealthCheck.java b/core/camel-support/src/main/java/org/apache/camel/support/ScheduledPollConsumerHealthCheck.java index 73cb31b..3839ca0 100644 --- a/core/camel-support/src/main/java/org/apache/camel/support/ScheduledPollConsumerHealthCheck.java +++ b/core/camel-support/src/main/java/org/apache/camel/support/ScheduledPollConsumerHealthCheck.java @@ -49,17 +49,24 @@ public class ScheduledPollConsumerHealthCheck implements HealthCheck { builder.detail(FAILURE_ENDPOINT_URI, consumer.getEndpoint().getEndpointUri()); long ec = consumer.getErrorCounter(); + long cnt = consumer.getCounter(); Throwable cause = consumer.getLastError(); - boolean healthy = ec == 0; + // can only be healthy if we have at least one poll and there are no errors + boolean healthy = cnt > 0 && ec == 0; if (healthy) { builder.up(); } else { builder.down(); builder.detail(FAILURE_ERROR_COUNT, ec); String rid = consumer.getRouteId(); - String msg = "Consumer failed polling %s times route: %s (%s)"; - builder.message(String.format(msg, ec, rid, sanitizedUri)); + if (ec > 0) { + String msg = "Consumer failed polling %s times route: %s (%s)"; + builder.message(String.format(msg, ec, rid, sanitizedUri)); + } else { + String msg = "Consumer has not yet polled route: %s (%s)"; + builder.message(String.format(msg, rid, sanitizedUri)); + } builder.error(cause); }