This is an automated email from the ASF dual-hosted git repository. github-bot pushed a commit to branch camel-main in repository https://gitbox.apache.org/repos/asf/camel-quarkus.git
commit 5903707f0bd3b9f40fd698d48d0636546b77ffc9 Author: Amos Feng <zh.f...@gmail.com> AuthorDate: Fri Mar 11 17:06:30 2022 +0800 Fix #3592 add some ReflectiveClassBuildItem for camel-kafka (#3594) --- .../component/kafka/deployment/KafkaProcessor.java | 26 ++++++++++ .../quarkus/test/support/kafka/InjectKafka.java | 27 ++++++++++ .../test/support/kafka/KafkaTestResource.java | 7 +++ integration-tests/kafka/pom.xml | 4 ++ .../kafka/it/CamelKafkaHealthCheckIT.java | 23 +++++++++ .../kafka/it/CamelKafkaHealthCheckTest.java | 59 ++++++++++++++++++++++ .../kafka/it/KafkaHealthCheckProfile.java | 29 +++++++++++ 7 files changed, 175 insertions(+) diff --git a/extensions/kafka/deployment/src/main/java/org/apache/camel/quarkus/component/kafka/deployment/KafkaProcessor.java b/extensions/kafka/deployment/src/main/java/org/apache/camel/quarkus/component/kafka/deployment/KafkaProcessor.java index 90e2945..270a7a1 100644 --- a/extensions/kafka/deployment/src/main/java/org/apache/camel/quarkus/component/kafka/deployment/KafkaProcessor.java +++ b/extensions/kafka/deployment/src/main/java/org/apache/camel/quarkus/component/kafka/deployment/KafkaProcessor.java @@ -16,7 +16,9 @@ */ package org.apache.camel.quarkus.component.kafka.deployment; +import java.util.Collection; import java.util.Optional; +import java.util.stream.Stream; import io.quarkus.arc.deployment.AdditionalBeanBuildItem; import io.quarkus.deployment.Capabilities; @@ -24,19 +26,28 @@ import io.quarkus.deployment.Capability; import io.quarkus.deployment.IsNormal; import io.quarkus.deployment.annotations.BuildProducer; import io.quarkus.deployment.annotations.BuildStep; +import io.quarkus.deployment.builditem.CombinedIndexBuildItem; import io.quarkus.deployment.builditem.DevServicesLauncherConfigResultBuildItem; import io.quarkus.deployment.builditem.FeatureBuildItem; import io.quarkus.deployment.builditem.RunTimeConfigurationDefaultBuildItem; +import io.quarkus.deployment.builditem.nativeimage.ReflectiveClassBuildItem; import io.quarkus.deployment.dev.devservices.GlobalDevServicesConfig; import io.quarkus.kafka.client.deployment.KafkaBuildTimeConfig; import org.apache.camel.quarkus.component.kafka.KafkaClientFactoryProducer; import org.eclipse.microprofile.config.Config; import org.eclipse.microprofile.config.ConfigProvider; +import org.jboss.jandex.ClassInfo; +import org.jboss.jandex.DotName; +import org.jboss.jandex.IndexView; class KafkaProcessor { private static final String FEATURE = "camel-kafka"; private static final String CAMEL_KAFKA_BROKERS = "camel.component.kafka.brokers"; private static final String KAFKA_BOOTSTRAP_SERVERS = "kafka.bootstrap.servers"; + private static final DotName[] KAFKA_CLIENTS_TYPES = { + DotName.createSimple("org.apache.kafka.clients.producer.Producer"), + DotName.createSimple("org.apache.kafka.clients.consumer.Consumer") + }; @BuildStep FeatureBuildItem feature() { @@ -68,4 +79,19 @@ class KafkaProcessor { } } } + + @BuildStep + public void reflectiveClasses(CombinedIndexBuildItem combinedIndex, + BuildProducer<ReflectiveClassBuildItem> reflectiveClass) { + IndexView index = combinedIndex.getIndex(); + + Stream.of(KAFKA_CLIENTS_TYPES) + .map(index::getAllKnownImplementors) + .flatMap(Collection::stream) + .map(ClassInfo::toString) + .forEach(name -> reflectiveClass.produce(new ReflectiveClassBuildItem(false, true, name))); + + reflectiveClass + .produce(new ReflectiveClassBuildItem(false, true, "org.apache.kafka.clients.producer.internals.Sender")); + } } diff --git a/integration-tests-support/kafka/src/main/java/org/apache/camel/quarkus/test/support/kafka/InjectKafka.java b/integration-tests-support/kafka/src/main/java/org/apache/camel/quarkus/test/support/kafka/InjectKafka.java new file mode 100644 index 0000000..02dca6a --- /dev/null +++ b/integration-tests-support/kafka/src/main/java/org/apache/camel/quarkus/test/support/kafka/InjectKafka.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.quarkus.test.support.kafka; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +@Target({ ElementType.ANNOTATION_TYPE, ElementType.METHOD, ElementType.FIELD }) +@Retention(RetentionPolicy.RUNTIME) +public @interface InjectKafka { +} diff --git a/integration-tests-support/kafka/src/main/java/org/apache/camel/quarkus/test/support/kafka/KafkaTestResource.java b/integration-tests-support/kafka/src/main/java/org/apache/camel/quarkus/test/support/kafka/KafkaTestResource.java index dce93d4..9c81c69 100644 --- a/integration-tests-support/kafka/src/main/java/org/apache/camel/quarkus/test/support/kafka/KafkaTestResource.java +++ b/integration-tests-support/kafka/src/main/java/org/apache/camel/quarkus/test/support/kafka/KafkaTestResource.java @@ -63,4 +63,11 @@ public class KafkaTestResource implements QuarkusTestResourceLifecycleManager { } } } + + @Override + public void inject(TestInjector testInjector) { + testInjector.injectIntoFields(container, + new TestInjector.AnnotatedAndMatchesType(InjectKafka.class, KafkaContainer.class)); + } + } diff --git a/integration-tests/kafka/pom.xml b/integration-tests/kafka/pom.xml index e9e3e24..d298715 100644 --- a/integration-tests/kafka/pom.xml +++ b/integration-tests/kafka/pom.xml @@ -57,6 +57,10 @@ </dependency> <dependency> <groupId>org.apache.camel.quarkus</groupId> + <artifactId>camel-quarkus-microprofile-health</artifactId> + </dependency> + <dependency> + <groupId>org.apache.camel.quarkus</groupId> <artifactId>camel-quarkus-integration-tests-support-kafka</artifactId> </dependency> <dependency> diff --git a/integration-tests/kafka/src/test/java/org/apache/camel/quarkus/component/kafka/it/CamelKafkaHealthCheckIT.java b/integration-tests/kafka/src/test/java/org/apache/camel/quarkus/component/kafka/it/CamelKafkaHealthCheckIT.java new file mode 100644 index 0000000..2961643 --- /dev/null +++ b/integration-tests/kafka/src/test/java/org/apache/camel/quarkus/component/kafka/it/CamelKafkaHealthCheckIT.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.quarkus.component.kafka.it; + +import io.quarkus.test.junit.NativeImageTest; + +@NativeImageTest +public class CamelKafkaHealthCheckIT extends CamelKafkaHealthCheckTest { +} diff --git a/integration-tests/kafka/src/test/java/org/apache/camel/quarkus/component/kafka/it/CamelKafkaHealthCheckTest.java b/integration-tests/kafka/src/test/java/org/apache/camel/quarkus/component/kafka/it/CamelKafkaHealthCheckTest.java new file mode 100644 index 0000000..27bd629 --- /dev/null +++ b/integration-tests/kafka/src/test/java/org/apache/camel/quarkus/component/kafka/it/CamelKafkaHealthCheckTest.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.quarkus.component.kafka.it; + +import io.quarkus.test.common.QuarkusTestResource; +import io.quarkus.test.junit.QuarkusTest; +import io.quarkus.test.junit.TestProfile; +import io.restassured.RestAssured; +import io.restassured.http.ContentType; +import org.apache.camel.quarkus.test.support.kafka.InjectKafka; +import org.apache.camel.quarkus.test.support.kafka.KafkaTestResource; +import org.junit.jupiter.api.Test; +import org.testcontainers.containers.KafkaContainer; + +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.notNullValue; + +@QuarkusTest +@QuarkusTestResource(KafkaTestResource.class) +@TestProfile(KafkaHealthCheckProfile.class) +public class CamelKafkaHealthCheckTest { + + @InjectKafka + KafkaContainer container; + + @Test + void testHealthCheck() { + RestAssured.when().get("/q/health").then() + .contentType(ContentType.JSON) + .header("Content-Type", containsString("charset=UTF-8")) + .body("status", is("UP")); + + // stop the kafka container to test health-check DOWN + container.stop(); + + RestAssured.when().get("/q/health").then() + .contentType(ContentType.JSON) + .header("Content-Type", containsString("charset=UTF-8")) + .body("status", is("DOWN"), + "checks.find { it.name == 'camel-kafka' }.status", is("DOWN"), + "checks.find { it.name == 'camel-kafka' }.data.topic", notNullValue(), + "checks.find { it.name == 'camel-kafka' }.data.'client.id'", notNullValue()); + } +} diff --git a/integration-tests/kafka/src/test/java/org/apache/camel/quarkus/component/kafka/it/KafkaHealthCheckProfile.java b/integration-tests/kafka/src/test/java/org/apache/camel/quarkus/component/kafka/it/KafkaHealthCheckProfile.java new file mode 100644 index 0000000..620c5dc --- /dev/null +++ b/integration-tests/kafka/src/test/java/org/apache/camel/quarkus/component/kafka/it/KafkaHealthCheckProfile.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.quarkus.component.kafka.it; + +import java.util.Map; + +import io.quarkus.test.junit.QuarkusTestProfile; + +public class KafkaHealthCheckProfile implements QuarkusTestProfile { + @Override + public Map<String, String> getConfigOverrides() { + // force shutdown + return Map.of("camel.main.shutdownTimeout", "10"); + } +}