This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 0864a71777c5116ea66dc8cd058f4ce06797b223
Author: Claus Ibsen <claus.ib...@gmail.com>
AuthorDate: Thu Nov 4 16:41:49 2021 +0100

    CAMEL-17618: camel-health - Route health check with consumer should be DOWN until first poll executed
---
 .../TelegramConsumerHealthCheckErrorTest.java      | 12 +++++++---
 .../TelegramConsumerHealthCheckOkTest.java         |  2 --
 .../camel/support/ScheduledPollConsumer.java       | 27 +++++++++++++++++++++-
 .../support/ScheduledPollConsumerHealthCheck.java  | 13 ++++++++---
 4 files changed, 45 insertions(+), 9 deletions(-)

diff --git a/components/camel-telegram/src/test/java/org/apache/camel/component/telegram/TelegramConsumerHealthCheckErrorTest.java b/components/camel-telegram/src/test/java/org/apache/camel/component/telegram/TelegramConsumerHealthCheckErrorTest.java
index 76f6711..885e61b 100644
--- a/components/camel-telegram/src/test/java/org/apache/camel/component/telegram/TelegramConsumerHealthCheckErrorTest.java
+++ b/components/camel-telegram/src/test/java/org/apache/camel/component/telegram/TelegramConsumerHealthCheckErrorTest.java
@@ -74,12 +74,18 @@ public class TelegramConsumerHealthCheckErrorTest extends TelegramTestSupport {
         Assertions.assertEquals(HealthCheck.State.DOWN, rc.getState());
         String msg = rc.getMessage().get();
         long count = (long) rc.getDetails().get(HealthCheck.FAILURE_ERROR_COUNT);
-        Assertions.assertEquals("Consumer failed polling " + count + " times route: telegram (telegram://bots)", msg);
+        if (count == 0) {
+            Assertions.assertEquals("Consumer has not yet polled route: telegram (telegram://bots)", msg);
+        } else {
+            Assertions.assertEquals("Consumer failed polling " + count + " times route: telegram (telegram://bots)", msg);
+        }
         Assertions.assertEquals("telegram://bots?authorizationToken=mock-token",
                 rc.getDetails().get(HealthCheck.FAILURE_ENDPOINT_URI));
 
-        Throwable e = rc.getError().get();
-        Assertions.assertTrue(e.getMessage().contains("401 Unauthorized"));
+        if (rc.getError().isPresent()) {
+            Throwable e = rc.getError().get();
+            Assertions.assertTrue(e.getMessage().contains("401 Unauthorized"));
+        }
     }
 
     @Override
diff --git a/components/camel-telegram/src/test/java/org/apache/camel/component/telegram/TelegramConsumerHealthCheckOkTest.java b/components/camel-telegram/src/test/java/org/apache/camel/component/telegram/TelegramConsumerHealthCheckOkTest.java
index a193d22..b68db20 100644
--- a/components/camel-telegram/src/test/java/org/apache/camel/component/telegram/TelegramConsumerHealthCheckOkTest.java
+++ b/components/camel-telegram/src/test/java/org/apache/camel/component/telegram/TelegramConsumerHealthCheckOkTest.java
@@ -54,8 +54,6 @@ public class TelegramConsumerHealthCheckOkTest extends TelegramTestSupport {
         HealthCheckRegistry hcr = context.getExtension(HealthCheckRegistry.class);
         HealthCheckRepository repo = hcr.getRepository("routes").get();
 
-        repo.stream().forEach(h -> Assertions.assertEquals(HealthCheck.State.UP, h.call().getState()));
-
         endpoint.expectedMinimumMessageCount(2);
         endpoint.expectedBodiesReceived("message1", "message2");
 
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/ScheduledPollConsumer.java b/core/camel-support/src/main/java/org/apache/camel/support/ScheduledPollConsumer.java
index c585956..3131442 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/ScheduledPollConsumer.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/ScheduledPollConsumer.java
@@ -71,6 +71,7 @@ public abstract class ScheduledPollConsumer extends DefaultConsumer
     private volatile int backoffCounter;
     private volatile long idleCounter;
     private volatile long errorCounter;
+    private volatile long successCounter;
     private volatile Throwable lastError;
     private final AtomicLong counter = new AtomicLong();
 
@@ -156,6 +157,7 @@ public abstract class ScheduledPollConsumer extends DefaultConsumer
                 idleCounter = 0;
                 errorCounter = 0;
                 backoffCounter = 0;
+                successCounter = 0;
                 LOG.trace("doRun() backoff finished, resetting counters.");
             }
         }
@@ -250,14 +252,17 @@ public abstract class ScheduledPollConsumer extends DefaultConsumer
 
         if (cause != null) {
             idleCounter = 0;
+            successCounter = 0;
             errorCounter++;
             lastError = cause;
         } else {
             idleCounter = polledMessages == 0 ? ++idleCounter : 0;
+            successCounter++;
             errorCounter = 0;
             lastError = null;
         }
-        LOG.trace("doRun() done with idleCounter={}, errorCounter={}", idleCounter, errorCounter);
+        LOG.trace("doRun() done with idleCounter={}, successCounter={}, errorCounter={}", idleCounter, successCounter,
+                errorCounter);
 
         // avoid this thread to throw exceptions because the thread pool wont re-schedule a new thread
     }
@@ -429,12 +434,31 @@ public abstract class ScheduledPollConsumer extends DefaultConsumer
     /**
      * Gets the error counter. If the counter is > 0 that means the consumer failed polling for the last N number of
      * times. When the consumer is successfully again, then the error counter resets to zero.
+     *
+     * @see #getSuccessCounter()
      */
     protected long getErrorCounter() {
         return errorCounter;
     }
 
     /**
+     * Gets the success counter. If the success is > 0 that means the consumer succeeded polling for the last N number
+     * of times. When the consumer is failing again, then the success counter resets to zero.
+     *
+     * @see #getErrorCounter()
+     */
+    protected long getSuccessCounter() {
+        return successCounter;
+    }
+
+    /**
+     * Gets the total number of polls run.
+     */
+    protected long getCounter() {
+        return counter.get();
+    }
+
+    /**
      * Gets the last caused error (exception) for the last poll that failed. When the consumer is successfully again,
      * then the error resets to null.
      */
@@ -544,6 +568,7 @@ public abstract class ScheduledPollConsumer extends DefaultConsumer
         backoffCounter = 0;
         idleCounter = 0;
         errorCounter = 0;
+        successCounter = 0;
         counter.set(0);
 
         super.doStop();
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/ScheduledPollConsumerHealthCheck.java b/core/camel-support/src/main/java/org/apache/camel/support/ScheduledPollConsumerHealthCheck.java
index 73cb31b..3839ca0 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/ScheduledPollConsumerHealthCheck.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/ScheduledPollConsumerHealthCheck.java
@@ -49,17 +49,24 @@ public class ScheduledPollConsumerHealthCheck implements HealthCheck {
         builder.detail(FAILURE_ENDPOINT_URI, consumer.getEndpoint().getEndpointUri());
 
         long ec = consumer.getErrorCounter();
+        long cnt = consumer.getCounter();
         Throwable cause = consumer.getLastError();
 
-        boolean healthy = ec == 0;
+        // can only be healthy if we have at least one poll and there are no errors
+        boolean healthy = cnt > 0 && ec == 0;
         if (healthy) {
             builder.up();
         } else {
             builder.down();
             builder.detail(FAILURE_ERROR_COUNT, ec);
             String rid = consumer.getRouteId();
-            String msg = "Consumer failed polling %s times route: %s (%s)";
-            builder.message(String.format(msg, ec, rid, sanitizedUri));
+            if (ec > 0) {
+                String msg = "Consumer failed polling %s times route: %s (%s)";
+                builder.message(String.format(msg, ec, rid, sanitizedUri));
+            } else {
+                String msg = "Consumer has not yet polled route: %s (%s)";
+                builder.message(String.format(msg, rid, sanitizedUri));
+            }
             builder.error(cause);
         }
 

Reply via email to