This is an automated email from the ASF dual-hosted git repository. nfilotto pushed a commit to branch essobedo/532/add-integration-test-for-debezium-mongodb in repository https://gitbox.apache.org/repos/asf/camel-karaf.git
commit 9b1680f0504e700886ffeb4b2537ae8a847311ff Author: Nicolas Filotto <nicolas.filo...@qlik.com> AuthorDate: Wed Oct 23 21:04:32 2024 +0200 Ref #532: Add camel-debezium-mongodb integration test --- .../camel-debezium/camel-debezium-mongodb/pom.xml | 14 ++ features/src/main/feature/camel-features.xml | 3 + tests/features/camel-debezium-mongodb/pom.xml | 45 +++++ .../test/CamelDebeziumMongodbRouteSupplier.java | 89 ++++++++++ .../camel/itest/CamelDebeziumMongodbITest.java | 47 +++++ .../karaf/camel/itest/MongoContainerResource.java | 192 +++++++++++++++++++++ tests/features/pom.xml | 1 + 7 files changed, 391 insertions(+) diff --git a/components/camel-debezium/camel-debezium-mongodb/pom.xml b/components/camel-debezium/camel-debezium-mongodb/pom.xml index a677a407e..7837b3cf6 100644 --- a/components/camel-debezium/camel-debezium-mongodb/pom.xml +++ b/components/camel-debezium/camel-debezium-mongodb/pom.xml @@ -53,6 +53,18 @@ </exclusion> </exclusions> </dependency> + <!-- Must be removed once the split package issue is fixed in Camel --> + <dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-debezium-common</artifactId> + <version>${camel-version}</version> + <exclusions> + <exclusion> + <groupId>org.apache.camel</groupId> + <artifactId>*</artifactId> + </exclusion> + </exclusions> + </dependency> </dependencies> <build> @@ -69,6 +81,8 @@ <configuration> <artifactSet> <includes> + <!-- Must be removed once the split package issue is fixed in Camel --> + <include>org.apache.camel:camel-debezium-common</include> <include>org.apache.camel:camel-debezium-mongodb</include> </includes> </artifactSet> diff --git a/features/src/main/feature/camel-features.xml b/features/src/main/feature/camel-features.xml index dcd7d7f6b..bec373cc1 100644 --- a/features/src/main/feature/camel-features.xml +++ b/features/src/main/feature/camel-features.xml @@ -944,6 +944,9 @@ <feature name='camel-debezium-mongodb' version='${project.version}' start-level='50'> <feature version='${camel-osgi-version-range}'>camel-debezium-common</feature> <bundle dependency='true'>wrap:mvn:io.debezium/debezium-connector-mongodb/${debezium-version}</bundle> + <bundle dependency='true'>mvn:org.mongodb/mongodb-driver-sync/${auto-detect-version}</bundle> + <bundle dependency='true'>mvn:org.mongodb/mongodb-driver-core/${auto-detect-version}</bundle> + <bundle dependency='true'>mvn:org.mongodb/bson/${auto-detect-version}</bundle> <bundle>mvn:org.apache.camel.karaf/camel-debezium-mongodb/${project.version}</bundle> </feature> <feature name='camel-debezium-mysql' version='${project.version}' start-level='50'> diff --git a/tests/features/camel-debezium-mongodb/pom.xml b/tests/features/camel-debezium-mongodb/pom.xml new file mode 100644 index 000000000..169e7395c --- /dev/null +++ b/tests/features/camel-debezium-mongodb/pom.xml @@ -0,0 +1,45 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.camel.karaf</groupId> + <artifactId>camel-karaf-features-test</artifactId> + <version>4.8.0-SNAPSHOT</version> + </parent> + + <artifactId>camel-debezium-mongodb-test</artifactId> + <name>Apache Camel :: Karaf :: Tests :: Features :: Debezium MongoDB</name> + + <dependencies> + <dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-debezium-mongodb</artifactId> + <version>${camel-version}</version> + </dependency> + <dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-mongodb</artifactId> + <version>${camel-version}</version> + </dependency> + </dependencies> +</project> \ No newline at end of file diff --git a/tests/features/camel-debezium-mongodb/src/main/java/org/apache/karaf/camel/test/CamelDebeziumMongodbRouteSupplier.java b/tests/features/camel-debezium-mongodb/src/main/java/org/apache/karaf/camel/test/CamelDebeziumMongodbRouteSupplier.java new file mode 100644 index 000000000..f0c0f26a1 --- /dev/null +++ b/tests/features/camel-debezium-mongodb/src/main/java/org/apache/karaf/camel/test/CamelDebeziumMongodbRouteSupplier.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.karaf.camel.test; + +import java.util.function.Function; + +import com.mongodb.ConnectionString; +import com.mongodb.MongoClientSettings; +import com.mongodb.MongoCredential; +import com.mongodb.client.MongoClient; +import com.mongodb.client.MongoClients; +import org.apache.camel.CamelContext; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mongodb.MongoDbComponent; +import org.apache.camel.model.RouteDefinition; +import org.apache.karaf.camel.itests.AbstractCamelSingleFeatureResultMockBasedRouteSupplier; +import org.apache.karaf.camel.itests.CamelRouteSupplier; +import org.bson.Document; +import org.osgi.service.component.annotations.Component; + +import static org.apache.camel.builder.Builder.constant; +import static org.apache.camel.builder.Builder.simple; +import static org.apache.camel.component.mongodb.MongoDbConstants.MONGO_ID; + +@Component( + name = "karaf-camel-debezium-mongodb-test", + immediate = true, + service = CamelRouteSupplier.class +) +public class CamelDebeziumMongodbRouteSupplier extends AbstractCamelSingleFeatureResultMockBasedRouteSupplier { + + @Override + public void configure(CamelContext context) { + MongoClient db = MongoClients.create( + MongoClientSettings.builder() + .credential(MongoCredential.createCredential(System.getProperty("mongo.username"), + System.getProperty("mongo.authentication.database"), System.getProperty("mongo.password").toCharArray())) + .applyConnectionString(new ConnectionString(System.getProperty("mongo.connection"))) + .build()); + context.getRegistry().bind("db", db); + context.getComponent("mongodb", MongoDbComponent.class).setMongoConnection(null); + } + + @Override + protected Function<RouteBuilder, RouteDefinition> consumerRoute() { + return builder -> + (RouteDefinition) builder.fromF("debezium-mongodb:debezium-mongodb-example-01?mongodbConnectionString=%s" + + "&mongodbAuthsource=%s&mongodbUser=%s&mongodbPassword=%s" + + "&topicPrefix=embedded-debezium&offsetStorageFileName=offset-01.data&offsetStorage=org.apache.kafka.connect.storage.FileOffsetBackingStore" + + "&databaseIncludeList=%s&schemaHistoryInternalFileFilename=schema-history-01.data", + System.getProperty("mongo.connection"), + System.getProperty("mongo.authentication.database"), System.getProperty("mongo.username"), System.getProperty("mongo.password"), + System.getProperty("mongo.database")) + .log("received message ${body}") + .choice() + .when(simple("${body.toString} contains 'weight'")) + .process(exchange -> exchange.getIn().setBody(exchange.getIn().getBody(Document.class).get("_id"))) + .otherwise() + .stop() + .endChoice() + .end(); + } + + @Override + protected void configureProducer(RouteBuilder builder, RouteDefinition producerRoute) { + producerRoute + .log("insert new product") + .setBody(constant(new Document(MONGO_ID, 1L) + .append("name", "scooter") + .append("description", "Small 2-wheel yellow scooter") + .append("quantity", 5) + .append("weight", 5.54).toJson())) + .toF("mongodb:db?database=%s&collection=%s&operation=insert", System.getProperty("mongo.database"), System.getProperty("mongo.collection")); + } +} + diff --git a/tests/features/camel-debezium-mongodb/src/test/java/org/apache/karaf/camel/itest/CamelDebeziumMongodbITest.java b/tests/features/camel-debezium-mongodb/src/test/java/org/apache/karaf/camel/itest/CamelDebeziumMongodbITest.java new file mode 100644 index 000000000..802597db1 --- /dev/null +++ b/tests/features/camel-debezium-mongodb/src/test/java/org/apache/karaf/camel/itest/CamelDebeziumMongodbITest.java @@ -0,0 +1,47 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.karaf.camel.itest; + +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.karaf.camel.itests.AbstractCamelSingleFeatureResultMockBasedRouteITest; +import org.apache.karaf.camel.itests.CamelKarafTestHint; +import org.apache.karaf.camel.itests.PaxExamWithExternalResource; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.ops4j.pax.exam.spi.reactors.ExamReactorStrategy; +import org.ops4j.pax.exam.spi.reactors.PerClass; + +@CamelKarafTestHint(externalResourceProvider = CamelDebeziumMongodbITest.ExternalResourceProviders.class, + additionalRequiredFeatures = "camel-mongodb", retryOnFailure = true) +@RunWith(PaxExamWithExternalResource.class) +@ExamReactorStrategy(PerClass.class) +public class CamelDebeziumMongodbITest extends AbstractCamelSingleFeatureResultMockBasedRouteITest { + + @Override + public void configureMock(MockEndpoint mock) { + mock.expectedBodiesReceivedInAnyOrder("101", "102", "103", "104", "105", "106", "107", "108", "109", "1"); + } + + @Test + public void testResultMock() throws Exception { + assertMockEndpointsSatisfied(); + } + + public static final class ExternalResourceProviders { + + public static MongoContainerResource createMongoDBContainer() { + return new MongoContainerResource(); + } + } +} \ No newline at end of file diff --git a/tests/features/camel-debezium-mongodb/src/test/java/org/apache/karaf/camel/itest/MongoContainerResource.java b/tests/features/camel-debezium-mongodb/src/test/java/org/apache/karaf/camel/itest/MongoContainerResource.java new file mode 100644 index 000000000..29599c0c1 --- /dev/null +++ b/tests/features/camel-debezium-mongodb/src/test/java/org/apache/karaf/camel/itest/MongoContainerResource.java @@ -0,0 +1,192 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.karaf.camel.itest; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.regex.Pattern; + +import org.apache.karaf.camel.itests.ExternalResource; + +/** + * A resource that manages the lifecycle of the MongoDB container for testing purposes. The integration test has many + * limitations, such as the requirement to use the MongoDB port which could cause conflicts with concurrent builds but + * the retry on failure mode is enabled for this test to limit the risk of build failure. + * For some unknown reason, if the container is managed by test containers, the connection to the database fails, reason + * why this class has been added as workaround. + */ +class MongoContainerResource implements ExternalResource { + + private static final String CONTAINER_NAME = "mongo"; + private static final String DEBEZIUM_VERSION = "2.7"; + private static final String MONGO_DB_IMAGE = "quay.io/debezium/example-mongodb"; + private static final String AUTHENTICATION_DB_NAME = "admin"; + private static final String SOURCE_DB_NAME = "inventory"; + private static final String SOURCE_COLLECTION_NAME = "products"; + private static final String SOURCE_DB_USERNAME = "debezium"; + private static final String SOURCE_DB_PASSWORD = "dbz"; + private static final int MONGODB_PORT = 27017; + + @Override + public void before() { + startContainer(); + intContainer(); + } + + private static void startContainer() { + try { + Command command = new Command( + "docker", "run", "--rm", "--name", CONTAINER_NAME, "-p", "%d:%d".formatted(MONGODB_PORT, MONGODB_PORT), + "-e", "MONGODB_USER=%s".formatted(SOURCE_DB_USERNAME), "-e", "MONGODB_PASSWORD=%s".formatted(SOURCE_DB_PASSWORD), + "%s:%s".formatted(MONGO_DB_IMAGE, DEBEZIUM_VERSION) + ); + command.waitForResult("(?i).*waiting for connections.*"); + } catch (Exception e) { + throw new RuntimeException("Error starting MongoDB container", e); + } + } + + private static void intContainer() { + try { + Command command = new Command("docker", "exec", CONTAINER_NAME, "bash", "-c", "/usr/local/bin/init-inventory.sh -h localhost"); + command.waitForResult(); + } catch (Exception e) { + throw new RuntimeException("Error initializing MongoDB container", e); + } + } + + private static void stopContainer() { + try { + Command command = new Command("docker", "stop", CONTAINER_NAME); + command.waitForResult(); + } catch (Exception e) { + throw new RuntimeException("Error stopping MongoDB container", e); + } + } + + @Override + public void after() { + stopContainer(); + } + + @Override + public Map<String, String> properties() { + return Map.of( + "mongo.connection", "mongodb://localhost:%d/?replicaSet=rs0".formatted(MONGODB_PORT), + "mongo.collection", SOURCE_COLLECTION_NAME, + "mongo.authentication.database", AUTHENTICATION_DB_NAME, + "mongo.database", SOURCE_DB_NAME, + "mongo.username", SOURCE_DB_USERNAME, + "mongo.password", SOURCE_DB_PASSWORD + ); + } + + private static class Command { + private final Process process; + + Command(String... command) throws IOException { + this.process = new ProcessBuilder(command).start(); + } + + void waitForResult() { + waitForResult( + CompletableFuture.runAsync(this::checkExitCode), CompletableFuture.runAsync(this::checkError) + ); + } + + void waitForResult(String startupPattern) { + waitForResult( + CompletableFuture.supplyAsync(this::checkExitCode), + CompletableFuture.supplyAsync(() -> checkStartupPattern(startupPattern)), + CompletableFuture.supplyAsync(this::checkError) + ); + } + + private void waitForResult(CompletableFuture<?>... cfs) { + ProcessResult result = (ProcessResult) CompletableFuture.anyOf(cfs) + .join(); + if (result != null && !result.isSuccess()) { + throw new RuntimeException(result.getError()); + } + } + + private ProcessResult checkExitCode() { + try { + int exitCode = process.waitFor(); + if (exitCode == 0) { + return new ProcessResult(true); + } + return new ProcessResult("Process exited with code %d".formatted(exitCode)); + } catch (InterruptedException e) { + return new ProcessResult("The thread was interrupted"); + } catch (RuntimeException e) { + return new ProcessResult("Could not check the exit code: %s".formatted(e.getMessage())); + } + } + + private ProcessResult checkStartupPattern(String startupPattern) { + try (BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream()))) { + Pattern pattern = Pattern.compile(startupPattern); + String line; + while ((line = reader.readLine()) != null) { + if (pattern.matcher(line).matches()) { + return new ProcessResult(true); + } + } + } catch (Exception e) { + return new ProcessResult("Could not check the status pattern: %s".formatted(e.getMessage())); + } + return new ProcessResult("The startup pattern %s could not be found".formatted(startupPattern)); + } + + private ProcessResult checkError() { + try (BufferedReader reader = new BufferedReader(new InputStreamReader(process.getErrorStream()))) { + return new ProcessResult(reader.readLine()); + } catch (Exception e) { + return new ProcessResult("Could not check the error messages: %s".formatted(e.getMessage())); + } + } + } + + private static class ProcessResult { + private final boolean success; + private final String error; + + private ProcessResult(boolean success) { + this.success = success; + this.error = null; + } + + private ProcessResult(String error) { + this.success = false; + this.error = error; + } + + public boolean isSuccess() { + return success; + } + + public String getError() { + return Optional.ofNullable(error).orElse("Process failed"); + } + } +} diff --git a/tests/features/pom.xml b/tests/features/pom.xml index c898781b8..cf6581887 100644 --- a/tests/features/pom.xml +++ b/tests/features/pom.xml @@ -68,6 +68,7 @@ <module>camel-crypto</module> <module>camel-csv</module> <module>camel-cxf</module> + <module>camel-debezium-mongodb</module> <module>camel-debezium-mysql</module> <module>camel-debezium-postgres</module> <module>camel-disruptor</module>