This is an automated email from the ASF dual-hosted git repository. orpiske pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
commit fcb501324fb55b7083a86d47d80fd7c59e680043 Author: Rinaldo Pitzer Júnior <16694899+rinaldo...@users.noreply.github.com> AuthorDate: Mon Aug 1 15:07:42 2022 -0300 CAMEL-18333: camel-kafka: add better error message to the health-check This extracts the message from the root cause exception and adds it to the general health check message. This message should contain more relevant information to the user, like telling that the bootstrap servers are incorrect or unreachable, or that a port number is invalid. Those messages would not be present in the default health check message, making it harder for the user to diagnose, especially in environments where the component is running inside a pod and the user doesn't necessarily have access to the logs. --- .../camel/component/kafka/TaskHealthState.java | 13 ++ .../KafkaConsumerBadPortHealthCheckIT.java | 193 ++++++++++++++++++++ ...fkaConsumerBadPortSupervisingHealthCheckIT.java | 201 +++++++++++++++++++++ .../KafkaConsumerUnresolvableHealthCheckIT.java | 193 ++++++++++++++++++++ 4 files changed, 600 insertions(+) diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/TaskHealthState.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/TaskHealthState.java index 1ce9a130d6e..f8f0f74437a 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/TaskHealthState.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/TaskHealthState.java @@ -93,6 +93,19 @@ public class TaskHealthState { msg += " (recovery in progress using " + time + " intervals)."; } + if (lastError != null) { + msg += " - Error: " + extractRootCause(lastError).getMessage(); + } + return msg; } + + private Throwable extractRootCause(Throwable throwable) { + Throwable rootCause = throwable; + while (rootCause.getCause() != null && rootCause.getCause() != rootCause) { + rootCause = rootCause.getCause(); + } + return rootCause; + } + } diff --git 
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerBadPortHealthCheckIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerBadPortHealthCheckIT.java new file mode 100644 index 00000000000..0ca75d0b2b4 --- /dev/null +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerBadPortHealthCheckIT.java @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.kafka.integration; + +import java.util.Collection; +import java.util.Collections; +import java.util.Properties; +import java.util.concurrent.TimeUnit; + +import org.apache.camel.BindToRegistry; +import org.apache.camel.CamelContext; +import org.apache.camel.Endpoint; +import org.apache.camel.EndpointInject; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.kafka.KafkaComponent; +import org.apache.camel.component.kafka.MockConsumerInterceptor; +import org.apache.camel.component.kafka.serde.DefaultKafkaHeaderDeserializer; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.health.HealthCheck; +import org.apache.camel.health.HealthCheckHelper; +import org.apache.camel.health.HealthCheckRegistry; +import org.apache.camel.impl.health.DefaultHealthCheckRegistry; +import org.apache.camel.test.infra.kafka.services.KafkaService; +import org.apache.camel.test.infra.kafka.services.KafkaServiceFactory; +import org.apache.camel.test.junit5.CamelTestSupport; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.header.internals.RecordHeader; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.MethodOrderer; +import org.junit.jupiter.api.Order; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInstance; +import org.junit.jupiter.api.TestMethodOrder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.testcontainers.shaded.org.awaitility.Awaitility.await; + +@TestMethodOrder(MethodOrderer.OrderAnnotation.class) +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +public class KafkaConsumerBadPortHealthCheckIT extends CamelTestSupport { + public static final String TOPIC = 
"test-health"; + + public static KafkaService service = KafkaServiceFactory.createService(); + + protected static AdminClient kafkaAdminClient; + + private static final Logger LOG = LoggerFactory.getLogger(KafkaConsumerBadPortHealthCheckIT.class); + + @BindToRegistry("myHeaderDeserializer") + private MyKafkaHeaderDeserializer deserializer = new MyKafkaHeaderDeserializer(); + + @EndpointInject("kafka:" + TOPIC + + "?groupId=group1&autoOffsetReset=earliest&keyDeserializer=org.apache.kafka.common.serialization.StringDeserializer&" + + "valueDeserializer=org.apache.kafka.common.serialization.StringDeserializer" + + "&autoCommitIntervalMs=1000&sessionTimeoutMs=30000&autoCommitEnable=true&interceptorClasses=org.apache.camel.component.kafka.MockConsumerInterceptor") + private Endpoint from; + @EndpointInject("mock:result") + private MockEndpoint to; + + private org.apache.kafka.clients.producer.KafkaProducer<String, String> producer; + + @BeforeEach + public void before() { + Properties props = BaseEmbeddedKafkaTestSupport.getDefaultProperties(service); + producer = new org.apache.kafka.clients.producer.KafkaProducer<>(props); + MockConsumerInterceptor.recordsCaptured.clear(); + } + + @BeforeAll + public static void beforeClass() { + service.initialize(); + + LOG.info("### Embedded Kafka cluster broker list: {}", service.getBootstrapServers()); + System.setProperty("bootstrapServers", service.getBootstrapServers()); + System.setProperty("brokers", service.getBootstrapServers()); + } + + @AfterAll + public static void afterClass() { + service.shutdown(); + } + + @BeforeEach + public void setKafkaAdminClient() { + if (kafkaAdminClient == null) { + kafkaAdminClient = BaseEmbeddedKafkaTestSupport.createAdminClient(service); + } + } + + @AfterEach + public void after() { + if (producer != null) { + producer.close(); + } + // clean all test topics + kafkaAdminClient.deleteTopics(Collections.singletonList(TOPIC)).all(); + } + + @Override + protected CamelContext 
createCamelContext() throws Exception { + CamelContext context = super.createCamelContext(); + context.getPropertiesComponent().setLocation("ref:prop"); + + KafkaComponent kafka = new KafkaComponent(context); + kafka.init(); + kafka.getConfiguration().setBrokers(service.getBootstrapServers() + 123); + context.addComponent("kafka", kafka); + + // install health check manually (yes a bit cumbersome) + HealthCheckRegistry registry = new DefaultHealthCheckRegistry(); + registry.setCamelContext(context); + Object hc = registry.resolveById("context"); + registry.register(hc); + hc = registry.resolveById("routes"); + registry.register(hc); + hc = registry.resolveById("consumers"); + registry.register(hc); + context.setExtension(HealthCheckRegistry.class, registry); + + return context; + } + + @Override + protected RouteBuilder createRouteBuilder() { + return new RouteBuilder() { + + @Override + public void configure() { + from(from) + .process(exchange -> LOG.trace("Captured on the processor: {}", exchange.getMessage().getBody())) + .routeId("test-health-it").to(to); + } + }; + } + + @Order(1) + @Test + public void kafkaConsumerHealthCheck() throws InterruptedException { + // health-check liveness should be UP + Collection<HealthCheck.Result> res = HealthCheckHelper.invokeLiveness(context); + boolean up = res.stream().allMatch(r -> r.getState().equals(HealthCheck.State.UP)); + Assertions.assertTrue(up, "liveness check"); + + // health-check readiness should be down + await().atMost(20, TimeUnit.SECONDS).untilAsserted(() -> { + Collection<HealthCheck.Result> res2 = HealthCheckHelper.invokeReadiness(context); + boolean up2 = res2.stream().allMatch(r -> { + return r.getState().equals(HealthCheck.State.DOWN) && + r.getMessage().stream().allMatch(msg -> msg.contains("port")); + }); + Assertions.assertTrue(up2, "readiness check"); + }); + + String propagatedHeaderKey = "PropagatedCustomHeader"; + byte[] propagatedHeaderValue = "propagated header value".getBytes(); + 
to.expectedMessageCount(0); + to.expectedMinimumMessageCount(0); + to.expectedNoHeaderReceived(); + + for (int k = 0; k < 5; k++) { + String msg = "message-" + k; + ProducerRecord<String, String> data = new ProducerRecord<>(TOPIC, "1", msg); + data.headers().add(new RecordHeader("CamelSkippedHeader", "skipped header value".getBytes())); + data.headers().add(new RecordHeader(propagatedHeaderKey, propagatedHeaderValue)); + producer.send(data); + } + + to.assertIsSatisfied(3000); + } + + private static class MyKafkaHeaderDeserializer extends DefaultKafkaHeaderDeserializer { + } +} diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerBadPortSupervisingHealthCheckIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerBadPortSupervisingHealthCheckIT.java new file mode 100644 index 00000000000..31727cae3fd --- /dev/null +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerBadPortSupervisingHealthCheckIT.java @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.kafka.integration; + +import java.util.Collection; +import java.util.Collections; +import java.util.Properties; +import java.util.concurrent.TimeUnit; + +import org.apache.camel.BindToRegistry; +import org.apache.camel.CamelContext; +import org.apache.camel.Endpoint; +import org.apache.camel.EndpointInject; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.kafka.KafkaComponent; +import org.apache.camel.component.kafka.MockConsumerInterceptor; +import org.apache.camel.component.kafka.serde.DefaultKafkaHeaderDeserializer; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.health.HealthCheck; +import org.apache.camel.health.HealthCheckHelper; +import org.apache.camel.health.HealthCheckRegistry; +import org.apache.camel.impl.engine.DefaultSupervisingRouteController; +import org.apache.camel.impl.health.DefaultHealthCheckRegistry; +import org.apache.camel.spi.SupervisingRouteController; +import org.apache.camel.test.infra.kafka.services.KafkaService; +import org.apache.camel.test.infra.kafka.services.KafkaServiceFactory; +import org.apache.camel.test.junit5.CamelTestSupport; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.header.internals.RecordHeader; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.MethodOrderer; +import org.junit.jupiter.api.Order; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInstance; +import org.junit.jupiter.api.TestMethodOrder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.testcontainers.shaded.org.awaitility.Awaitility.await; + +@TestMethodOrder(MethodOrderer.OrderAnnotation.class) 
+@TestInstance(TestInstance.Lifecycle.PER_CLASS) +public class KafkaConsumerBadPortSupervisingHealthCheckIT extends CamelTestSupport { + public static final String TOPIC = "test-health"; + + public static KafkaService service = KafkaServiceFactory.createService(); + + protected static AdminClient kafkaAdminClient; + + private static final Logger LOG = LoggerFactory.getLogger(KafkaConsumerBadPortSupervisingHealthCheckIT.class); + + @BindToRegistry("myHeaderDeserializer") + private MyKafkaHeaderDeserializer deserializer = new MyKafkaHeaderDeserializer(); + + @EndpointInject("kafka:" + TOPIC + + "?groupId=group1&autoOffsetReset=earliest&keyDeserializer=org.apache.kafka.common.serialization.StringDeserializer&" + + "valueDeserializer=org.apache.kafka.common.serialization.StringDeserializer" + + "&autoCommitIntervalMs=1000&sessionTimeoutMs=30000&autoCommitEnable=true&interceptorClasses=org.apache.camel.component.kafka.MockConsumerInterceptor") + private Endpoint from; + @EndpointInject("mock:result") + private MockEndpoint to; + + private org.apache.kafka.clients.producer.KafkaProducer<String, String> producer; + + @BeforeEach + public void before() { + Properties props = BaseEmbeddedKafkaTestSupport.getDefaultProperties(service); + producer = new org.apache.kafka.clients.producer.KafkaProducer<>(props); + MockConsumerInterceptor.recordsCaptured.clear(); + } + + @BeforeAll + public static void beforeClass() { + service.initialize(); + + LOG.info("### Embedded Kafka cluster broker list: {}", service.getBootstrapServers()); + System.setProperty("bootstrapServers", service.getBootstrapServers()); + System.setProperty("brokers", service.getBootstrapServers()); + } + + @AfterAll + public static void afterClass() { + service.shutdown(); + } + + @BeforeEach + public void setKafkaAdminClient() { + if (kafkaAdminClient == null) { + kafkaAdminClient = BaseEmbeddedKafkaTestSupport.createAdminClient(service); + } + } + + @AfterEach + public void after() { + if (producer != null) { 
+ producer.close(); + } + // clean all test topics + kafkaAdminClient.deleteTopics(Collections.singletonList(TOPIC)).all(); + } + + @Override + protected CamelContext createCamelContext() throws Exception { + CamelContext context = super.createCamelContext(); + context.getPropertiesComponent().setLocation("ref:prop"); + + context.setRouteController(new DefaultSupervisingRouteController()); + SupervisingRouteController src = context.getRouteController().supervising(); + src.setBackOffDelay(3); + src.setBackOffMaxAttempts(3); + src.setInitialDelay(3); + + KafkaComponent kafka = new KafkaComponent(context); + kafka.init(); + kafka.getConfiguration().setBrokers(service.getBootstrapServers() + 123); + context.addComponent("kafka", kafka); + + // install health check manually (yes a bit cumbersome) + HealthCheckRegistry registry = new DefaultHealthCheckRegistry(); + registry.setCamelContext(context); + Object hc = registry.resolveById("context"); + registry.register(hc); + hc = registry.resolveById("routes"); + registry.register(hc); + hc = registry.resolveById("consumers"); + registry.register(hc); + context.setExtension(HealthCheckRegistry.class, registry); + + return context; + } + + @Override + protected RouteBuilder createRouteBuilder() { + return new RouteBuilder() { + + @Override + public void configure() { + from(from) + .process(exchange -> LOG.trace("Captured on the processor: {}", exchange.getMessage().getBody())) + .routeId("test-health-it").to(to); + } + }; + } + + @Order(1) + @Test + public void kafkaConsumerHealthCheck() throws InterruptedException { + // health-check liveness should be UP + Collection<HealthCheck.Result> res = HealthCheckHelper.invokeLiveness(context); + boolean up = res.stream().allMatch(r -> r.getState().equals(HealthCheck.State.UP)); + Assertions.assertTrue(up, "liveness check"); + + // health-check readiness should be down + await().atMost(20, TimeUnit.SECONDS).untilAsserted(() -> { + Collection<HealthCheck.Result> res2 = 
HealthCheckHelper.invokeReadiness(context); + boolean up2 = res2.stream().allMatch(r -> { + return r.getState().equals(HealthCheck.State.DOWN) && + r.getMessage().stream().allMatch(msg -> msg.contains("port")); + }); + Assertions.assertTrue(up2, "readiness check"); + }); + + String propagatedHeaderKey = "PropagatedCustomHeader"; + byte[] propagatedHeaderValue = "propagated header value".getBytes(); + to.expectedMessageCount(0); + to.expectedMinimumMessageCount(0); + to.expectedNoHeaderReceived(); + + for (int k = 0; k < 5; k++) { + String msg = "message-" + k; + ProducerRecord<String, String> data = new ProducerRecord<>(TOPIC, "1", msg); + data.headers().add(new RecordHeader("CamelSkippedHeader", "skipped header value".getBytes())); + data.headers().add(new RecordHeader(propagatedHeaderKey, propagatedHeaderValue)); + producer.send(data); + } + + to.assertIsSatisfied(3000); + } + + private static class MyKafkaHeaderDeserializer extends DefaultKafkaHeaderDeserializer { + } +} diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerUnresolvableHealthCheckIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerUnresolvableHealthCheckIT.java new file mode 100644 index 00000000000..c0395ec7f6c --- /dev/null +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerUnresolvableHealthCheckIT.java @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.kafka.integration; + +import java.util.Collection; +import java.util.Collections; +import java.util.Properties; +import java.util.concurrent.TimeUnit; + +import org.apache.camel.BindToRegistry; +import org.apache.camel.CamelContext; +import org.apache.camel.Endpoint; +import org.apache.camel.EndpointInject; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.kafka.KafkaComponent; +import org.apache.camel.component.kafka.MockConsumerInterceptor; +import org.apache.camel.component.kafka.serde.DefaultKafkaHeaderDeserializer; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.health.HealthCheck; +import org.apache.camel.health.HealthCheckHelper; +import org.apache.camel.health.HealthCheckRegistry; +import org.apache.camel.impl.health.DefaultHealthCheckRegistry; +import org.apache.camel.test.infra.kafka.services.KafkaService; +import org.apache.camel.test.infra.kafka.services.KafkaServiceFactory; +import org.apache.camel.test.junit5.CamelTestSupport; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.header.internals.RecordHeader; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.MethodOrderer; +import org.junit.jupiter.api.Order; +import org.junit.jupiter.api.Test; +import 
org.junit.jupiter.api.TestInstance; +import org.junit.jupiter.api.TestMethodOrder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.testcontainers.shaded.org.awaitility.Awaitility.await; + +@TestMethodOrder(MethodOrderer.OrderAnnotation.class) +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +public class KafkaConsumerUnresolvableHealthCheckIT extends CamelTestSupport { + public static final String TOPIC = "test-health"; + + public static KafkaService service = KafkaServiceFactory.createService(); + + protected static AdminClient kafkaAdminClient; + + private static final Logger LOG = LoggerFactory.getLogger(KafkaConsumerUnresolvableHealthCheckIT.class); + + @BindToRegistry("myHeaderDeserializer") + private MyKafkaHeaderDeserializer deserializer = new MyKafkaHeaderDeserializer(); + + @EndpointInject("kafka:" + TOPIC + + "?groupId=group1&autoOffsetReset=earliest&keyDeserializer=org.apache.kafka.common.serialization.StringDeserializer&" + + "valueDeserializer=org.apache.kafka.common.serialization.StringDeserializer" + + "&autoCommitIntervalMs=1000&sessionTimeoutMs=30000&autoCommitEnable=true&interceptorClasses=org.apache.camel.component.kafka.MockConsumerInterceptor") + private Endpoint from; + @EndpointInject("mock:result") + private MockEndpoint to; + + private org.apache.kafka.clients.producer.KafkaProducer<String, String> producer; + + @BeforeEach + public void before() { + Properties props = BaseEmbeddedKafkaTestSupport.getDefaultProperties(service); + producer = new org.apache.kafka.clients.producer.KafkaProducer<>(props); + MockConsumerInterceptor.recordsCaptured.clear(); + } + + @BeforeAll + public static void beforeClass() { + service.initialize(); + + LOG.info("### Embedded Kafka cluster broker list: {}", service.getBootstrapServers()); + System.setProperty("bootstrapServers", service.getBootstrapServers()); + System.setProperty("brokers", service.getBootstrapServers()); + } + + @AfterAll + public static void afterClass() { + 
service.shutdown(); + } + + @BeforeEach + public void setKafkaAdminClient() { + if (kafkaAdminClient == null) { + kafkaAdminClient = BaseEmbeddedKafkaTestSupport.createAdminClient(service); + } + } + + @AfterEach + public void after() { + if (producer != null) { + producer.close(); + } + // clean all test topics + kafkaAdminClient.deleteTopics(Collections.singletonList(TOPIC)).all(); + } + + @Override + protected CamelContext createCamelContext() throws Exception { + CamelContext context = super.createCamelContext(); + context.getPropertiesComponent().setLocation("ref:prop"); + + KafkaComponent kafka = new KafkaComponent(context); + kafka.init(); + kafka.getConfiguration().setBrokers(service.getBootstrapServers().replace("localhost", "locaIhost")); + context.addComponent("kafka", kafka); + + // install health check manually (yes a bit cumbersome) + HealthCheckRegistry registry = new DefaultHealthCheckRegistry(); + registry.setCamelContext(context); + Object hc = registry.resolveById("context"); + registry.register(hc); + hc = registry.resolveById("routes"); + registry.register(hc); + hc = registry.resolveById("consumers"); + registry.register(hc); + context.setExtension(HealthCheckRegistry.class, registry); + + return context; + } + + @Override + protected RouteBuilder createRouteBuilder() { + return new RouteBuilder() { + + @Override + public void configure() { + from(from) + .process(exchange -> LOG.trace("Captured on the processor: {}", exchange.getMessage().getBody())) + .routeId("test-health-it").to(to); + } + }; + } + + @Order(1) + @Test + public void kafkaConsumerHealthCheck() throws InterruptedException { + // health-check liveness should be UP + Collection<HealthCheck.Result> res = HealthCheckHelper.invokeLiveness(context); + boolean up = res.stream().allMatch(r -> r.getState().equals(HealthCheck.State.UP)); + Assertions.assertTrue(up, "liveness check"); + + // health-check readiness should be down + await().atMost(20, TimeUnit.SECONDS).untilAsserted(() -> 
{ + Collection<HealthCheck.Result> res2 = HealthCheckHelper.invokeReadiness(context); + boolean up2 = res2.stream().allMatch(r -> { + return r.getState().equals(HealthCheck.State.DOWN) && + r.getMessage().stream().allMatch(msg -> msg.contains("bootstrap")); + }); + Assertions.assertTrue(up2, "readiness check"); + }); + + String propagatedHeaderKey = "PropagatedCustomHeader"; + byte[] propagatedHeaderValue = "propagated header value".getBytes(); + to.expectedMessageCount(0); + to.expectedMinimumMessageCount(0); + to.expectedNoHeaderReceived(); + + for (int k = 0; k < 5; k++) { + String msg = "message-" + k; + ProducerRecord<String, String> data = new ProducerRecord<>(TOPIC, "1", msg); + data.headers().add(new RecordHeader("CamelSkippedHeader", "skipped header value".getBytes())); + data.headers().add(new RecordHeader(propagatedHeaderKey, propagatedHeaderValue)); + producer.send(data); + } + + to.assertIsSatisfied(3000); + } + + private static class MyKafkaHeaderDeserializer extends DefaultKafkaHeaderDeserializer { + } +}