This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel-examples.git
The following commit(s) were added to refs/heads/main by this push: new 3a9feb3 CAMEL-11834: Add tests for as2/aws/debezium/transformer-demo (#68) 3a9feb3 is described below commit 3a9feb3917228f7b61a3219cde140d7d12b5cd71 Author: Nicolas Filotto <essob...@users.noreply.github.com> AuthorDate: Thu Feb 3 22:31:36 2022 +0100 CAMEL-11834: Add tests for as2/aws/debezium/transformer-demo (#68) --- examples/as2/pom.xml | 7 +- .../java/org/apache/camel/example/as2/As2Test.java | 49 ++++++ .../aws/main-endpointdsl-aws2-s3-kafka/README.adoc | 9 + .../aws/main-endpointdsl-aws2-s3-kafka/pom.xml | 8 +- .../org/apache/camel/example/AwsS3KafkaTest.java | 127 ++++++++++++++ examples/aws/main-endpointdsl-aws2-s3/README.adoc | 9 + .../java/org/apache/camel/example/AwsS3Test.java | 112 ++++++++++++ .../{readme.adoc => README.adoc} | 14 +- .../{readme.adoc => README.adoc} | 16 +- .../org/apache/camel/example/MyRouteBuilder.java | 2 +- .../java/org/apache/camel/example/AwsS3Test.java | 95 ++++++++++ .../aws2-sqs-consumer/{readme.adoc => README.adoc} | 14 +- .../org/apache/camel/example/MyRouteBuilder.java | 2 +- .../java/org/apache/camel/example/AwsSQSTest.java | 109 ++++++++++++ .../README.adoc | 23 ++- .../pom.xml | 8 +- .../org/apache/camel/example/MyRouteBuilder.java | 3 +- .../src/main/resources/application.properties | 3 + .../org/apache/camel/example/KafkaAwsS3Test.java | 121 +++++++++++++ .../aws/main-endpointdsl-kafka-aws2-s3/README.adoc | 18 +- .../aws/main-endpointdsl-kafka-aws2-s3/pom.xml | 8 +- .../org/apache/camel/example/MyRouteBuilder.java | 10 +- .../src/main/resources/application.properties | 3 + .../org/apache/camel/example/KafkaAwsS3Test.java | 123 +++++++++++++ examples/aws/pom.xml | 22 ++- .../org/apache/camel/example/MyRouteBuilder.java | 6 +- .../example/azurestorageblob/Application.java | 59 +++---- .../org/apache/camel/example/MyRouteBuilder.java | 4 - examples/csimple-joor/{readme.adoc => README.adoc} | 2 +- examples/csimple/{readme.adoc => README.adoc} | 0 
.../DebeziumMySqlConsumerToAzureEventHubs.java | 5 +- .../src/main/resources/application.properties | 5 +- examples/debezium/README.adoc | 35 ++-- examples/debezium/pom.xml | 45 ++++- ...is.java => DebeziumPgSQLConsumerToKinesis.java} | 50 +++--- .../debezium/KinesisProducerToCassandra.java | 78 +++++---- .../src/main/resources/application.properties | 26 ++- .../camel/example/debezium/DebeziumTest.java | 191 +++++++++++++++++++++ .../org/apache/camel/example/debezium/db-init.cql | 10 ++ examples/transformer-demo/pom.xml | 8 +- .../example/transformer/demo/TransformerTest.java | 117 +++++++++++++ 41 files changed, 1376 insertions(+), 180 deletions(-) diff --git a/examples/as2/pom.xml b/examples/as2/pom.xml index 86f9463..007a737 100644 --- a/examples/as2/pom.xml +++ b/examples/as2/pom.xml @@ -85,7 +85,12 @@ <artifactId>slf4j-log4j12</artifactId> <version>${slf4j-version}</version> </dependency> - + <!-- for testing --> + <dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-test-spring-junit5</artifactId> + <scope>test</scope> + </dependency> </dependencies> <build> diff --git a/examples/as2/src/test/java/org/apache/camel/example/as2/As2Test.java b/examples/as2/src/test/java/org/apache/camel/example/as2/As2Test.java new file mode 100644 index 0000000..36573fc --- /dev/null +++ b/examples/as2/src/test/java/org/apache/camel/example/as2/As2Test.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.example.as2; + +import java.util.concurrent.TimeUnit; + +import org.apache.camel.builder.NotifyBuilder; +import org.apache.camel.model.ModelCamelContext; +import org.apache.camel.test.spring.junit5.CamelSpringTest; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.test.context.ContextConfiguration; + +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * A unit test checking that the Camel works as expected with AS2. + */ +@CamelSpringTest +@ContextConfiguration("/META-INF/spring/camel-context.xml") +class As2Test { + + @Autowired + ModelCamelContext context; + + @Test + void should_consume_and_produce_as2_messages() { + NotifyBuilder notify = new NotifyBuilder(context).fromRoute("server-route") + .whenCompleted(8).create(); + + assertTrue( + notify.matches(30, TimeUnit.SECONDS), "8 messages should be received by the AS2 server" + ); + } +} diff --git a/examples/aws/main-endpointdsl-aws2-s3-kafka/README.adoc b/examples/aws/main-endpointdsl-aws2-s3-kafka/README.adoc index 1d70dbe..7f6dbd1 100644 --- a/examples/aws/main-endpointdsl-aws2-s3-kafka/README.adoc +++ b/examples/aws/main-endpointdsl-aws2-s3-kafka/README.adoc @@ -12,6 +12,15 @@ This example will use the AWS default credentials Provider: https://docs.aws.ama Set your credentials accordingly. Don't forget to add the bucket name and point to the correct topic. 
+=== Build + +First compile the example by executing: + +[source,sh] +---- +$ mvn compile +---- + === How to run You can run this example using diff --git a/examples/aws/main-endpointdsl-aws2-s3-kafka/pom.xml b/examples/aws/main-endpointdsl-aws2-s3-kafka/pom.xml index 9493af6..53ad64f 100644 --- a/examples/aws/main-endpointdsl-aws2-s3-kafka/pom.xml +++ b/examples/aws/main-endpointdsl-aws2-s3-kafka/pom.xml @@ -86,7 +86,13 @@ <artifactId>logback-classic</artifactId> <version>${logback-version}</version> </dependency> - + <!-- for testing --> + <dependency> + <groupId>org.testcontainers</groupId> + <artifactId>kafka</artifactId> + <version>${testcontainers-version}</version> + <scope>test</scope> + </dependency> </dependencies> <build> diff --git a/examples/aws/main-endpointdsl-aws2-s3-kafka/src/test/java/org/apache/camel/example/AwsS3KafkaTest.java b/examples/aws/main-endpointdsl-aws2-s3-kafka/src/test/java/org/apache/camel/example/AwsS3KafkaTest.java new file mode 100644 index 0000000..e02d9a4 --- /dev/null +++ b/examples/aws/main-endpointdsl-aws2-s3-kafka/src/test/java/org/apache/camel/example/AwsS3KafkaTest.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.example; + +import java.util.concurrent.TimeUnit; + +import org.apache.camel.CamelContext; +import org.apache.camel.RoutesBuilder; +import org.apache.camel.builder.NotifyBuilder; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.aws2.s3.AWS2S3Component; +import org.apache.camel.component.aws2.s3.AWS2S3Constants; +import org.apache.camel.test.junit5.CamelTestSupport; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.testcontainers.containers.KafkaContainer; +import org.testcontainers.containers.localstack.LocalStackContainer; +import org.testcontainers.containers.wait.strategy.Wait; +import org.testcontainers.utility.DockerImageName; +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; +import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.s3.S3Client; + +import static org.apache.camel.util.PropertiesHelper.asProperties; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.testcontainers.containers.localstack.LocalStackContainer.Service.S3; + +/** + * A unit test checking that Camel can poll an Amazon S3 bucket and put the data into a Kafka topic. 
+ */ +class AwsS3KafkaTest extends CamelTestSupport { + + private static final String AWS_IMAGE = "localstack/localstack:0.13.3"; + private static final String KAFKA_IMAGE = "confluentinc/cp-kafka:6.2.2"; + private static LocalStackContainer AWS_CONTAINER; + private static KafkaContainer KAFKA_CONTAINER; + + @BeforeAll + static void init() { + AWS_CONTAINER = new LocalStackContainer(DockerImageName.parse(AWS_IMAGE)) + .withServices(S3) + .waitingFor(Wait.forLogMessage(".*Ready\\.\n", 1));; + AWS_CONTAINER.start(); + KAFKA_CONTAINER = new KafkaContainer(DockerImageName.parse(KAFKA_IMAGE)); + KAFKA_CONTAINER.start(); + } + + @AfterAll + static void destroy() { + if (AWS_CONTAINER != null) { + AWS_CONTAINER.stop(); + } + if (KAFKA_CONTAINER != null) { + KAFKA_CONTAINER.stop(); + } + } + + @Override + protected CamelContext createCamelContext() throws Exception { + CamelContext camelContext = super.createCamelContext(); + // Set the location of the configuration + camelContext.getPropertiesComponent().setLocation("classpath:application.properties"); + AWS2S3Component s3 = camelContext.getComponent("aws2-s3", AWS2S3Component.class); + s3.getConfiguration().setAmazonS3Client( + S3Client.builder() + .endpointOverride(AWS_CONTAINER.getEndpointOverride(S3)) + .credentialsProvider( + StaticCredentialsProvider.create( + AwsBasicCredentials.create(AWS_CONTAINER.getAccessKey(), AWS_CONTAINER.getSecretKey()) + ) + ) + .region(Region.of(AWS_CONTAINER.getRegion())) + .build() + ); + // Override the host and port of the broker + camelContext.getPropertiesComponent().setOverrideProperties( + asProperties( + "kafkaBrokers", String.format("%s:%d", KAFKA_CONTAINER.getHost(), KAFKA_CONTAINER.getMappedPort(9093)) + ) + ); + return camelContext; + } + + @Test + void should_poll_s3_bucket_and_push_to_kafka() { + // Add a bucket first + template.send("direct:putObject", exchange -> { + exchange.getIn().setHeader(AWS2S3Constants.KEY, "camel-content-type.txt"); + 
exchange.getIn().setHeader(AWS2S3Constants.CONTENT_TYPE, "application/text"); + exchange.getIn().setBody("Camel rocks!"); + }); + + NotifyBuilder notify = new NotifyBuilder(context).from("kafka:*").whenCompleted(1).create(); + assertTrue( + notify.matches(20, TimeUnit.SECONDS), "1 message should be completed" + ); + } + + @Override + protected RoutesBuilder[] createRouteBuilders() { + return new RoutesBuilder[]{new MyRouteBuilder(), new AddBucketRouteBuilder()}; + } + + private static class AddBucketRouteBuilder extends RouteBuilder { + + @Override + public void configure() { + from("direct:putObject").to("aws2-s3://{{bucketName}}?autoCreateBucket=true"); + } + } +} diff --git a/examples/aws/main-endpointdsl-aws2-s3/README.adoc b/examples/aws/main-endpointdsl-aws2-s3/README.adoc index 5c844e2..dc8c072 100644 --- a/examples/aws/main-endpointdsl-aws2-s3/README.adoc +++ b/examples/aws/main-endpointdsl-aws2-s3/README.adoc @@ -12,6 +12,15 @@ This example will use the AWS default credentials Provider: https://docs.aws.ama Set your credentials accordingly. Don't forget to add the bucket name. +=== Build + +First compile the example by executing: + +[source,sh] +---- +$ mvn compile +---- + === How to run You can run this example using diff --git a/examples/aws/main-endpointdsl-aws2-s3/src/test/java/org/apache/camel/example/AwsS3Test.java b/examples/aws/main-endpointdsl-aws2-s3/src/test/java/org/apache/camel/example/AwsS3Test.java new file mode 100644 index 0000000..02e0c2d --- /dev/null +++ b/examples/aws/main-endpointdsl-aws2-s3/src/test/java/org/apache/camel/example/AwsS3Test.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.example; + +import java.util.concurrent.TimeUnit; + +import org.apache.camel.CamelContext; +import org.apache.camel.RoutesBuilder; +import org.apache.camel.builder.NotifyBuilder; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.aws2.s3.AWS2S3Component; +import org.apache.camel.component.aws2.s3.AWS2S3Constants; +import org.apache.camel.test.junit5.CamelTestSupport; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.testcontainers.containers.localstack.LocalStackContainer; +import org.testcontainers.containers.wait.strategy.Wait; +import org.testcontainers.utility.DockerImageName; +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; +import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.s3.S3Client; + +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.testcontainers.containers.localstack.LocalStackContainer.Service.S3; + +/** + * A unit test checking that Camel can poll an Amazon S3 bucket. 
+ */ +class AwsS3Test extends CamelTestSupport { + + private static final String IMAGE = "localstack/localstack:0.13.3"; + private static LocalStackContainer CONTAINER; + + @BeforeAll + static void init() { + CONTAINER = new LocalStackContainer(DockerImageName.parse(IMAGE)) + .withServices(S3) + .waitingFor(Wait.forLogMessage(".*Ready\\.\n", 1));; + CONTAINER.start(); + } + + @AfterAll + static void destroy() { + if (CONTAINER != null) { + CONTAINER.stop(); + } + } + + @Override + protected CamelContext createCamelContext() throws Exception { + CamelContext camelContext = super.createCamelContext(); + // Set the location of the configuration + camelContext.getPropertiesComponent().setLocation("classpath:application.properties"); + AWS2S3Component s3 = camelContext.getComponent("aws2-s3", AWS2S3Component.class); + s3.getConfiguration().setAmazonS3Client( + S3Client.builder() + .endpointOverride(CONTAINER.getEndpointOverride(S3)) + .credentialsProvider( + StaticCredentialsProvider.create( + AwsBasicCredentials.create(CONTAINER.getAccessKey(), CONTAINER.getSecretKey()) + ) + ) + .region(Region.of(CONTAINER.getRegion())) + .build() + ); + return camelContext; + } + + @Test + void should_poll_s3_bucket() { + // Add a bucket first + template.send("direct:putObject", exchange -> { + exchange.getIn().setHeader(AWS2S3Constants.KEY, "camel-content-type.txt"); + exchange.getIn().setHeader(AWS2S3Constants.CONTENT_TYPE, "application/text"); + exchange.getIn().setBody("Camel rocks!"); + }); + + NotifyBuilder notify = new NotifyBuilder(context).from("aws2-s3:*").whenCompleted(1).create(); + assertTrue( + notify.matches(20, TimeUnit.SECONDS), "1 message should be completed" + ); + } + + @Override + protected RoutesBuilder[] createRouteBuilders() { + return new RoutesBuilder[]{new MyRouteBuilder(), new AddBucketRouteBuilder()}; + } + + private static class AddBucketRouteBuilder extends RouteBuilder { + + @Override + public void configure() throws Exception { + 
from("direct:putObject").to("aws2-s3://{{bucketName}}?autoCreateBucket=true"); + } + } +} diff --git a/examples/aws/main-endpointdsl-aws2/aws2-eventbridge-creator/readme.adoc b/examples/aws/main-endpointdsl-aws2/aws2-eventbridge-creator/README.adoc similarity index 85% rename from examples/aws/main-endpointdsl-aws2/aws2-eventbridge-creator/readme.adoc rename to examples/aws/main-endpointdsl-aws2/aws2-eventbridge-creator/README.adoc index 587115f..bf4470f 100644 --- a/examples/aws/main-endpointdsl-aws2/aws2-eventbridge-creator/readme.adoc +++ b/examples/aws/main-endpointdsl-aws2/aws2-eventbridge-creator/README.adoc @@ -10,11 +10,23 @@ Notice how you can configure Camel in the `application.properties` file. Don't forget to add your AWS Credentials and the bucket name. +=== Build + +First compile the example by executing: + +[source,sh] +---- +$ mvn compile +---- + === How to run You can run this example using - mvn camel:run +[source,sh] +---- +$ mvn camel:run +---- === Help and contributions diff --git a/examples/aws/main-endpointdsl-aws2/aws2-s3-events-inject/readme.adoc b/examples/aws/main-endpointdsl-aws2/aws2-s3-events-inject/README.adoc similarity index 79% rename from examples/aws/main-endpointdsl-aws2/aws2-s3-events-inject/readme.adoc rename to examples/aws/main-endpointdsl-aws2/aws2-s3-events-inject/README.adoc index 587115f..d696fc5 100644 --- a/examples/aws/main-endpointdsl-aws2/aws2-s3-events-inject/readme.adoc +++ b/examples/aws/main-endpointdsl-aws2/aws2-s3-events-inject/README.adoc @@ -4,17 +4,29 @@ This example shows how to use the endpoint DSL in your Camel routes to define endpoints using type safe fluent builders, which are Java methods that are compiled. -The example will poll an S3 bucket and Log the content of the file. +The example will store content into an S3 bucket. Notice how you can configure Camel in the `application.properties` file. Don't forget to add your AWS Credentials and the bucket name. 
+=== Build + +First compile the example by executing: + +[source,sh] +---- +$ mvn compile +---- + === How to run You can run this example using - mvn camel:run +[source,sh] +---- +$ mvn camel:run +---- === Help and contributions diff --git a/examples/aws/main-endpointdsl-aws2/aws2-s3-events-inject/src/main/java/org/apache/camel/example/MyRouteBuilder.java b/examples/aws/main-endpointdsl-aws2/aws2-s3-events-inject/src/main/java/org/apache/camel/example/MyRouteBuilder.java index 2e3a781..93df218 100644 --- a/examples/aws/main-endpointdsl-aws2/aws2-s3-events-inject/src/main/java/org/apache/camel/example/MyRouteBuilder.java +++ b/examples/aws/main-endpointdsl-aws2/aws2-s3-events-inject/src/main/java/org/apache/camel/example/MyRouteBuilder.java @@ -28,7 +28,7 @@ public class MyRouteBuilder extends EndpointRouteBuilder { from(timer("fire").repeatCount("1")) .setBody(constant("Camel rocks")) - .to(aws2S3("{{bucketName}}").keyName("firstfile")) + .to(aws2S3("{{bucketName}}").keyName("firstfile").autoCreateBucket(true)) .stop(); } } diff --git a/examples/aws/main-endpointdsl-aws2/aws2-s3-events-inject/src/test/java/org/apache/camel/example/AwsS3Test.java b/examples/aws/main-endpointdsl-aws2/aws2-s3-events-inject/src/test/java/org/apache/camel/example/AwsS3Test.java new file mode 100644 index 0000000..e4c410f --- /dev/null +++ b/examples/aws/main-endpointdsl-aws2/aws2-s3-events-inject/src/test/java/org/apache/camel/example/AwsS3Test.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.example; + +import java.util.concurrent.TimeUnit; + +import org.apache.camel.CamelContext; +import org.apache.camel.RoutesBuilder; +import org.apache.camel.builder.NotifyBuilder; +import org.apache.camel.component.aws2.s3.AWS2S3Component; +import org.apache.camel.test.junit5.CamelTestSupport; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.testcontainers.containers.localstack.LocalStackContainer; +import org.testcontainers.containers.wait.strategy.Wait; +import org.testcontainers.utility.DockerImageName; +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; +import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.s3.S3Client; + +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.testcontainers.containers.localstack.LocalStackContainer.Service.S3; + +/** + * A unit test checking that Camel can store content into an Amazon S3 bucket. 
+ */ +class AwsS3Test extends CamelTestSupport { + + private static final String IMAGE = "localstack/localstack:0.13.3"; + private static LocalStackContainer CONTAINER; + + @BeforeAll + static void init() { + CONTAINER = new LocalStackContainer(DockerImageName.parse(IMAGE)) + .withServices(S3) + .waitingFor(Wait.forLogMessage(".*Ready\\.\n", 1));; + CONTAINER.start(); + } + + @AfterAll + static void destroy() { + if (CONTAINER != null) { + CONTAINER.stop(); + } + } + + @Override + protected CamelContext createCamelContext() throws Exception { + CamelContext camelContext = super.createCamelContext(); + // Set the location of the configuration + camelContext.getPropertiesComponent().setLocation("classpath:application.properties"); + AWS2S3Component s3 = camelContext.getComponent("aws2-s3", AWS2S3Component.class); + s3.getConfiguration().setAmazonS3Client( + S3Client.builder() + .endpointOverride(CONTAINER.getEndpointOverride(S3)) + .credentialsProvider( + StaticCredentialsProvider.create( + AwsBasicCredentials.create(CONTAINER.getAccessKey(), CONTAINER.getSecretKey()) + ) + ) + .region(Region.of(CONTAINER.getRegion())) + .build() + ); + return camelContext; + } + + @Test + void should_store_content_into_a_s3_bucket() throws Exception { + NotifyBuilder notify = new NotifyBuilder(context).wereSentTo("aws2-s3:*").whenCompleted(1).create(); + assertTrue( + notify.matches(20, TimeUnit.SECONDS), "1 message should be completed" + ); + } + + @Override + protected RoutesBuilder createRouteBuilder() { + return new MyRouteBuilder(); + } +} diff --git a/examples/aws/main-endpointdsl-aws2/aws2-sqs-consumer/readme.adoc b/examples/aws/main-endpointdsl-aws2/aws2-sqs-consumer/README.adoc similarity index 85% rename from examples/aws/main-endpointdsl-aws2/aws2-sqs-consumer/readme.adoc rename to examples/aws/main-endpointdsl-aws2/aws2-sqs-consumer/README.adoc index f4626f9..057950f 100644 --- a/examples/aws/main-endpointdsl-aws2/aws2-sqs-consumer/readme.adoc +++ 
b/examples/aws/main-endpointdsl-aws2/aws2-sqs-consumer/README.adoc @@ -10,11 +10,23 @@ Notice how you can configure Camel in the `application.properties` file. Don't forget to add your AWS Credentials and the bucket name. +=== Build + +First compile the example by executing: + +[source,sh] +---- +$ mvn compile +---- + === How to run You can run this example using - mvn camel:run +[source,sh] +---- +$mvn camel:run +---- === Help and contributions diff --git a/examples/aws/main-endpointdsl-aws2/aws2-sqs-consumer/src/main/java/org/apache/camel/example/MyRouteBuilder.java b/examples/aws/main-endpointdsl-aws2/aws2-sqs-consumer/src/main/java/org/apache/camel/example/MyRouteBuilder.java index caa2048..cec5d0a 100644 --- a/examples/aws/main-endpointdsl-aws2/aws2-sqs-consumer/src/main/java/org/apache/camel/example/MyRouteBuilder.java +++ b/examples/aws/main-endpointdsl-aws2/aws2-sqs-consumer/src/main/java/org/apache/camel/example/MyRouteBuilder.java @@ -26,7 +26,7 @@ public class MyRouteBuilder extends EndpointRouteBuilder { @Override public void configure() throws Exception { - from(aws2Sqs("{{sqs-queue-name}}").deleteAfterRead(true)) + from(aws2Sqs("{{sqs-queue-name}}").deleteAfterRead(true).autoCreateQueue(true)) .log("${body}"); } } diff --git a/examples/aws/main-endpointdsl-aws2/aws2-sqs-consumer/src/test/java/org/apache/camel/example/AwsSQSTest.java b/examples/aws/main-endpointdsl-aws2/aws2-sqs-consumer/src/test/java/org/apache/camel/example/AwsSQSTest.java new file mode 100644 index 0000000..d6fd2a9 --- /dev/null +++ b/examples/aws/main-endpointdsl-aws2/aws2-sqs-consumer/src/test/java/org/apache/camel/example/AwsSQSTest.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.example; + +import java.util.concurrent.TimeUnit; + +import org.apache.camel.CamelContext; +import org.apache.camel.RoutesBuilder; +import org.apache.camel.builder.NotifyBuilder; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.aws2.sqs.Sqs2Component; +import org.apache.camel.test.junit5.CamelTestSupport; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.testcontainers.containers.localstack.LocalStackContainer; +import org.testcontainers.containers.wait.strategy.Wait; +import org.testcontainers.utility.DockerImageName; +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; +import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.sqs.SqsClient; + +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.testcontainers.containers.localstack.LocalStackContainer.Service.SQS; + +/** + * A unit test checking that Camel can consume messages from Amazon SQS. 
+ */ +class AwsSQSTest extends CamelTestSupport { + + private static final String IMAGE = "localstack/localstack:0.13.3"; + private static LocalStackContainer CONTAINER; + + @BeforeAll + static void init() { + CONTAINER = new LocalStackContainer(DockerImageName.parse(IMAGE)) + .withServices(SQS) + .waitingFor(Wait.forLogMessage(".*Ready\\.\n", 1));; + CONTAINER.start(); + } + + @AfterAll + static void destroy() { + if (CONTAINER != null) { + CONTAINER.stop(); + } + } + + @Override + protected CamelContext createCamelContext() throws Exception { + CamelContext camelContext = super.createCamelContext(); + // Set the location of the configuration + camelContext.getPropertiesComponent().setLocation("classpath:application.properties"); + Sqs2Component sqs = camelContext.getComponent("aws2-sqs", Sqs2Component.class); + sqs.getConfiguration().setAmazonSQSClient( + SqsClient.builder() + .endpointOverride(CONTAINER.getEndpointOverride(SQS)) + .credentialsProvider( + StaticCredentialsProvider.create( + AwsBasicCredentials.create(CONTAINER.getAccessKey(), CONTAINER.getSecretKey()) + ) + ) + .region(Region.of(CONTAINER.getRegion())) + .build() + ); + return camelContext; + } + + @Test + void should_poll_sqs_queue() { + // Add a message first + template.send("direct:publishMessage", exchange -> { + exchange.getIn().setBody("Camel rocks!"); + }); + + NotifyBuilder notify = new NotifyBuilder(context).from("aws2-sqs:*").whenCompleted(1).create(); + assertTrue( + notify.matches(20, TimeUnit.SECONDS), "1 message should be completed" + ); + } + + @Override + protected RoutesBuilder[] createRouteBuilders() { + return new RoutesBuilder[]{new MyRouteBuilder(), new PublishMessageRouteBuilder()}; + } + + private static class PublishMessageRouteBuilder extends RouteBuilder { + + @Override + public void configure() throws Exception { + from("direct:publishMessage").to("aws2-sqs://{{sqs-queue-name}}?autoCreateQueue=true"); + } + } +} diff --git 
a/examples/aws/main-endpointdsl-kafka-aws2-s3-restarting-policy/README.adoc b/examples/aws/main-endpointdsl-kafka-aws2-s3-restarting-policy/README.adoc index cafef89..af953c2 100644 --- a/examples/aws/main-endpointdsl-kafka-aws2-s3-restarting-policy/README.adoc +++ b/examples/aws/main-endpointdsl-kafka-aws2-s3-restarting-policy/README.adoc @@ -2,14 +2,16 @@ This example shows how to use the endpoint DSL in your Camel routes to define endpoints using type safe fluent builders, which are Java methods -that are compiled and it will show the AWS2-S3 stream mode. +that are compiled, and it will show the AWS2-S3 stream mode. -The example will poll one kafka topic s3.topic.1 and upload batch of 25 messages as single file into an s3 bucket (mycamel-1). +The example will poll one kafka topic s3.topic.1 and upload batch of 25 messages as single file into a s3 bucket (mycamel-1). On your bucket you'll see: +``` s3.topic.1/partition_<partition-number>/s3.topic.1.txt s3.topic.1/partition_<partition-number>/s3.topic.1-1.txt +``` and so on @@ -23,7 +25,7 @@ Don't forget to add the bucket name (already created ahead of time) and point to You'll need also a running kafka broker. You'll need to have kafkacat installed. -This example supposed the s3.topic.1 has 1 partition only. +This example supposed the `s3.topic.1` has 1 partition only. But this should work with multiple partitions too. @@ -36,6 +38,10 @@ You can run this example using $ mvn compile ---- +=== How to run + +You can run this example using + [source,sh] ---- $ mvn camel:run @@ -45,20 +51,21 @@ Now run [source,sh] ---- -$ data/burst.sh s3.topic.1 250 0 msg.txt +$ data/burst.sh s3.topic.1 250 0 data/msg.txt ---- Stop the route with CTRL + C. -At this point you should see an s3.topic.1/partition_0 folder, with 10 files. +At this point you should see a `s3.topic.1/partition_0` folder, with 10 files. 
-Restart the route and run +Restart the route and run +[source,sh] ---- -$ data/burst.sh s3.topic.1 250 0 msg.txt +$ data/burst.sh s3.topic.1 250 0 data/msg.txt ---- -Now in the same s3.topic.1/partition_0 folder, you should see 20 files correctly numbered. +Now in the same `s3.topic.1/partition_0` folder, you should see 20 files correctly numbered. === Help and contributions diff --git a/examples/aws/main-endpointdsl-kafka-aws2-s3-restarting-policy/pom.xml b/examples/aws/main-endpointdsl-kafka-aws2-s3-restarting-policy/pom.xml index a4fb4a2..aef1c5f 100644 --- a/examples/aws/main-endpointdsl-kafka-aws2-s3-restarting-policy/pom.xml +++ b/examples/aws/main-endpointdsl-kafka-aws2-s3-restarting-policy/pom.xml @@ -86,7 +86,13 @@ <artifactId>logback-classic</artifactId> <version>${logback-version}</version> </dependency> - + <!-- for testing --> + <dependency> + <groupId>org.testcontainers</groupId> + <artifactId>kafka</artifactId> + <version>${testcontainers-version}</version> + <scope>test</scope> + </dependency> </dependencies> <build> diff --git a/examples/aws/main-endpointdsl-kafka-aws2-s3-restarting-policy/src/main/java/org/apache/camel/example/MyRouteBuilder.java b/examples/aws/main-endpointdsl-kafka-aws2-s3-restarting-policy/src/main/java/org/apache/camel/example/MyRouteBuilder.java index dd9be98..82cebc3 100644 --- a/examples/aws/main-endpointdsl-kafka-aws2-s3-restarting-policy/src/main/java/org/apache/camel/example/MyRouteBuilder.java +++ b/examples/aws/main-endpointdsl-kafka-aws2-s3-restarting-policy/src/main/java/org/apache/camel/example/MyRouteBuilder.java @@ -25,11 +25,12 @@ public class MyRouteBuilder extends EndpointRouteBuilder { @Override public void configure() throws Exception { - from(kafka("{{kafkaTopic1}}").brokers("{{kafkaBrokers}}")) + from(kafka("{{kafkaTopic1}}").brokers("{{kafkaBrokers}}").seekTo("{{consumer.seekTo}}")) .log("Kafka Message is: ${body}") .toD(aws2S3("{{bucketName}}") .streamingUploadMode(true) 
.useDefaultCredentialsProvider(true) + .autoCreateBucket(true) .restartingPolicy(AWSS3RestartingPolicyEnum.lastPart) .batchMessageNumber(25).namingStrategy(AWSS3NamingStrategyEnum.progressive) .keyName("{{kafkaTopic1}}/partition_${headers.kafka.PARTITION}/{{kafkaTopic1}}.txt")); diff --git a/examples/aws/main-endpointdsl-kafka-aws2-s3-restarting-policy/src/main/resources/application.properties b/examples/aws/main-endpointdsl-kafka-aws2-s3-restarting-policy/src/main/resources/application.properties index 1bf3df7..49349b2 100644 --- a/examples/aws/main-endpointdsl-kafka-aws2-s3-restarting-policy/src/main/resources/application.properties +++ b/examples/aws/main-endpointdsl-kafka-aws2-s3-restarting-policy/src/main/resources/application.properties @@ -24,3 +24,6 @@ bucketName=camel-1 kafkaTopic1=s3.topic.1 kafkaBrokers=localhost:9092 + +# Get records from the beginning +consumer.seekTo=beginning \ No newline at end of file diff --git a/examples/aws/main-endpointdsl-kafka-aws2-s3-restarting-policy/src/test/java/org/apache/camel/example/KafkaAwsS3Test.java b/examples/aws/main-endpointdsl-kafka-aws2-s3-restarting-policy/src/test/java/org/apache/camel/example/KafkaAwsS3Test.java new file mode 100644 index 0000000..c3f4417 --- /dev/null +++ b/examples/aws/main-endpointdsl-kafka-aws2-s3-restarting-policy/src/test/java/org/apache/camel/example/KafkaAwsS3Test.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.example; + +import java.util.concurrent.TimeUnit; + +import org.apache.camel.CamelContext; +import org.apache.camel.RoutesBuilder; +import org.apache.camel.builder.NotifyBuilder; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.aws2.s3.AWS2S3Component; +import org.apache.camel.test.junit5.CamelTestSupport; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.testcontainers.containers.KafkaContainer; +import org.testcontainers.containers.localstack.LocalStackContainer; +import org.testcontainers.containers.wait.strategy.Wait; +import org.testcontainers.utility.DockerImageName; +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; +import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.s3.S3Client; + +import static org.apache.camel.util.PropertiesHelper.asProperties; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.testcontainers.containers.localstack.LocalStackContainer.Service.S3; + +/** + * A unit test checking that Camel can poll data from a Kafka topic and put it into an Amazon S3 bucket. 
+ */ +class KafkaAwsS3Test extends CamelTestSupport { + + private static final String AWS_IMAGE = "localstack/localstack:0.13.3"; + private static final String KAFKA_IMAGE = "confluentinc/cp-kafka:6.2.2"; + private static LocalStackContainer AWS_CONTAINER; + private static KafkaContainer KAFKA_CONTAINER; + + @BeforeAll + static void init() { + AWS_CONTAINER = new LocalStackContainer(DockerImageName.parse(AWS_IMAGE)) + .withServices(S3) + .waitingFor(Wait.forLogMessage(".*Ready\\.\n", 1)); + AWS_CONTAINER.start(); + KAFKA_CONTAINER = new KafkaContainer(DockerImageName.parse(KAFKA_IMAGE)); + KAFKA_CONTAINER.start(); + } + + @AfterAll + static void destroy() { + if (AWS_CONTAINER != null) { + AWS_CONTAINER.stop(); + } + if (KAFKA_CONTAINER != null) { + KAFKA_CONTAINER.stop(); + } + } + + @Override + protected CamelContext createCamelContext() throws Exception { + CamelContext camelContext = super.createCamelContext(); + // Set the location of the configuration + camelContext.getPropertiesComponent().setLocation("classpath:application.properties"); + AWS2S3Component s3 = camelContext.getComponent("aws2-s3", AWS2S3Component.class); + s3.getConfiguration().setAmazonS3Client( + S3Client.builder() + .endpointOverride(AWS_CONTAINER.getEndpointOverride(S3)) + .credentialsProvider( + StaticCredentialsProvider.create( + AwsBasicCredentials.create(AWS_CONTAINER.getAccessKey(), AWS_CONTAINER.getSecretKey()) + ) + ) + .region(Region.of(AWS_CONTAINER.getRegion())) + .build() + ); + // Override the host and port of the broker + camelContext.getPropertiesComponent().setOverrideProperties( + asProperties( + "kafkaBrokers", String.format("%s:%d", KAFKA_CONTAINER.getHost(), KAFKA_CONTAINER.getMappedPort(9093)) + ) + ); + return camelContext; + } + + @Test + void should_poll_kafka_and_push_to_s3_bucket() { + NotifyBuilder notify = new NotifyBuilder(context).from("kafka:*").whenCompleted(1).create(); + // Load data into Kafka + template.sendBody("direct:kafka", "Camel rocks in topic 
1!"); + assertTrue( + notify.matches(10, TimeUnit.SECONDS), "1 message should be completed" + ); + } + + @Override + protected RoutesBuilder[] createRouteBuilders() { + return new RoutesBuilder[]{new MyRouteBuilder(), new LoadKafkaRouteBuilder()}; + } + + private static class LoadKafkaRouteBuilder extends RouteBuilder { + + @Override + public void configure() { + from("direct:kafka").to("kafka:{{kafkaTopic1}}?brokers={{kafkaBrokers}}"); + } + } +} diff --git a/examples/aws/main-endpointdsl-kafka-aws2-s3/README.adoc b/examples/aws/main-endpointdsl-kafka-aws2-s3/README.adoc index 0e4ed22..ee5699d 100644 --- a/examples/aws/main-endpointdsl-kafka-aws2-s3/README.adoc +++ b/examples/aws/main-endpointdsl-kafka-aws2-s3/README.adoc @@ -2,18 +2,18 @@ This example shows how to use the endpoint DSL in your Camel routes to define endpoints using type safe fluent builders, which are Java methods -that are compiled and it will show the AWS2-S3 stream mode. +that are compiled, and it will show the AWS2-S3 stream mode. The example will poll two kafka topics (s3.topic.1 and s3.topic.2) and upload batch of 25 messages as single file into an s3 bucket (mycamel-1). On your bucket you'll see: - +``` s3.topic.1/s3.topic.1.txt s3.topic.1/s3.topic.1-1.txt s3.topic.2/s3.topic.2.txt s3.topic.2/s3.topic.2-1.txt - +``` and so on At the end you should have a total of 80 files. @@ -26,15 +26,19 @@ Don't forget to add the bucket name (already created ahead of time) and point to You'll need also a running kafka broker. You'll need to have kafkacat installed.
-=== How to run +=== Build -You can run this example using +First compile the example by executing: [source,sh] ---- $ mvn compile ---- +=== How to run + +You can run this example using + [source,sh] ---- $ mvn camel:run @@ -44,8 +48,8 @@ Now run [source,sh] ---- -$ data/burst.sh s3.topic.1 1000 msg.txt -$ data/burst.sh s3.topic.2 1000 msg.txt +$ data/burst.sh s3.topic.1 1000 data/msg.txt +$ data/burst.sh s3.topic.2 1000 data/msg.txt ---- You should see the bucket populated. diff --git a/examples/aws/main-endpointdsl-kafka-aws2-s3/pom.xml b/examples/aws/main-endpointdsl-kafka-aws2-s3/pom.xml index 152f584..967ddc8 100644 --- a/examples/aws/main-endpointdsl-kafka-aws2-s3/pom.xml +++ b/examples/aws/main-endpointdsl-kafka-aws2-s3/pom.xml @@ -86,7 +86,13 @@ <artifactId>logback-classic</artifactId> <version>${logback-version}</version> </dependency> - + <!-- for testing --> + <dependency> + <groupId>org.testcontainers</groupId> + <artifactId>kafka</artifactId> + <version>${testcontainers-version}</version> + <scope>test</scope> + </dependency> </dependencies> <build> diff --git a/examples/aws/main-endpointdsl-kafka-aws2-s3/src/main/java/org/apache/camel/example/MyRouteBuilder.java b/examples/aws/main-endpointdsl-kafka-aws2-s3/src/main/java/org/apache/camel/example/MyRouteBuilder.java index d51038b..89831ea 100644 --- a/examples/aws/main-endpointdsl-kafka-aws2-s3/src/main/java/org/apache/camel/example/MyRouteBuilder.java +++ b/examples/aws/main-endpointdsl-kafka-aws2-s3/src/main/java/org/apache/camel/example/MyRouteBuilder.java @@ -24,12 +24,14 @@ public class MyRouteBuilder extends EndpointRouteBuilder { @Override public void configure() throws Exception { - from(kafka("{{kafkaTopic1}}").brokers("{{kafkaBrokers}}")) + from(kafka("{{kafkaTopic1}}").brokers("{{kafkaBrokers}}") + .seekTo("{{consumer.seekTo}}")) .log("Kafka Message is: ${body}") - 
.to(aws2S3("{{bucketName}}").useDefaultCredentialsProvider(true).streamingUploadMode(true).batchMessageNumber(25).namingStrategy(AWSS3NamingStrategyEnum.progressive).keyName("{{kafkaTopic1}}/{{kafkaTopic1}}.txt")); + .to(aws2S3("{{bucketName}}").useDefaultCredentialsProvider(true).autoCreateBucket(true).streamingUploadMode(true).batchMessageNumber(25).namingStrategy(AWSS3NamingStrategyEnum.progressive).keyName("{{kafkaTopic1}}/{{kafkaTopic1}}.txt")); - from(kafka("{{kafkaTopic2}}").brokers("{{kafkaBrokers}}")) + from(kafka("{{kafkaTopic2}}").brokers("{{kafkaBrokers}}") + .seekTo("{{consumer.seekTo}}")) .log("Kafka Message is: ${body}") - .to(aws2S3("{{bucketName}}").useDefaultCredentialsProvider(true).streamingUploadMode(true).batchMessageNumber(25).namingStrategy(AWSS3NamingStrategyEnum.progressive).keyName("{{kafkaTopic2}}/{{kafkaTopic2}}.txt")); + .to(aws2S3("{{bucketName}}").useDefaultCredentialsProvider(true).autoCreateBucket(true).streamingUploadMode(true).batchMessageNumber(25).namingStrategy(AWSS3NamingStrategyEnum.progressive).keyName("{{kafkaTopic2}}/{{kafkaTopic2}}.txt")); } } diff --git a/examples/aws/main-endpointdsl-kafka-aws2-s3/src/main/resources/application.properties b/examples/aws/main-endpointdsl-kafka-aws2-s3/src/main/resources/application.properties index 1e4c766..57323ea 100644 --- a/examples/aws/main-endpointdsl-kafka-aws2-s3/src/main/resources/application.properties +++ b/examples/aws/main-endpointdsl-kafka-aws2-s3/src/main/resources/application.properties @@ -25,3 +25,6 @@ bucketName=camel-1 kafkaTopic1=s3.topic.1 kafkaTopic2=s3.topic.2 kafkaBrokers=localhost:9092 + +# Get records from the beginning +consumer.seekTo=beginning \ No newline at end of file diff --git a/examples/aws/main-endpointdsl-kafka-aws2-s3/src/test/java/org/apache/camel/example/KafkaAwsS3Test.java b/examples/aws/main-endpointdsl-kafka-aws2-s3/src/test/java/org/apache/camel/example/KafkaAwsS3Test.java new file mode 100644 index 0000000..4063ee7 --- /dev/null +++ 
b/examples/aws/main-endpointdsl-kafka-aws2-s3/src/test/java/org/apache/camel/example/KafkaAwsS3Test.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.example; + +import java.util.concurrent.TimeUnit; + +import org.apache.camel.CamelContext; +import org.apache.camel.RoutesBuilder; +import org.apache.camel.builder.NotifyBuilder; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.aws2.s3.AWS2S3Component; +import org.apache.camel.test.junit5.CamelTestSupport; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.testcontainers.containers.KafkaContainer; +import org.testcontainers.containers.localstack.LocalStackContainer; +import org.testcontainers.containers.wait.strategy.Wait; +import org.testcontainers.utility.DockerImageName; +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; +import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.s3.S3Client; + +import static org.apache.camel.util.PropertiesHelper.asProperties; +import static 
org.junit.jupiter.api.Assertions.assertTrue; +import static org.testcontainers.containers.localstack.LocalStackContainer.Service.S3; + +/** + * A unit test checking that Camel can poll data from a Kafka topic and put it into an Amazon S3 bucket. + */ +class KafkaAwsS3Test extends CamelTestSupport { + + private static final String AWS_IMAGE = "localstack/localstack:0.13.3"; + private static final String KAFKA_IMAGE = "confluentinc/cp-kafka:6.2.2"; + private static LocalStackContainer AWS_CONTAINER; + private static KafkaContainer KAFKA_CONTAINER; + + @BeforeAll + static void init() { + AWS_CONTAINER = new LocalStackContainer(DockerImageName.parse(AWS_IMAGE)) + .withServices(S3) + .waitingFor(Wait.forLogMessage(".*Ready\\.\n", 1)); + AWS_CONTAINER.start(); + KAFKA_CONTAINER = new KafkaContainer(DockerImageName.parse(KAFKA_IMAGE)); + KAFKA_CONTAINER.start(); + } + + @AfterAll + static void destroy() { + if (AWS_CONTAINER != null) { + AWS_CONTAINER.stop(); + } + if (KAFKA_CONTAINER != null) { + KAFKA_CONTAINER.stop(); + } + } + + @Override + protected CamelContext createCamelContext() throws Exception { + CamelContext camelContext = super.createCamelContext(); + // Set the location of the configuration + camelContext.getPropertiesComponent().setLocation("classpath:application.properties"); + AWS2S3Component s3 = camelContext.getComponent("aws2-s3", AWS2S3Component.class); + s3.getConfiguration().setAmazonS3Client( + S3Client.builder() + .endpointOverride(AWS_CONTAINER.getEndpointOverride(S3)) + .credentialsProvider( + StaticCredentialsProvider.create( + AwsBasicCredentials.create(AWS_CONTAINER.getAccessKey(), AWS_CONTAINER.getSecretKey()) + ) + ) + .region(Region.of(AWS_CONTAINER.getRegion())) + .build() + ); + // Override the host and port of the broker + camelContext.getPropertiesComponent().setOverrideProperties( + asProperties( + "kafkaBrokers", String.format("%s:%d", KAFKA_CONTAINER.getHost(), KAFKA_CONTAINER.getMappedPort(9093)) + ) + ); + return camelContext; + 
} + + @Test + void should_poll_kafka_and_push_to_s3_bucket() { + NotifyBuilder notify = new NotifyBuilder(context).from("kafka:*").whenCompleted(2).create(); + // Load data into Kafka + template.sendBody("direct:kafkaT1", "Camel rocks in topic 1!"); + template.sendBody("direct:kafkaT2", "Camel rocks in topic 2!"); + assertTrue( + notify.matches(10, TimeUnit.SECONDS), "2 messages should be completed" + ); + } + + @Override + protected RoutesBuilder[] createRouteBuilders() { + return new RoutesBuilder[]{new MyRouteBuilder(), new LoadKafkaRouteBuilder()}; + } + + private static class LoadKafkaRouteBuilder extends RouteBuilder { + + @Override + public void configure() { + from("direct:kafkaT1").to("kafka:{{kafkaTopic1}}?brokers={{kafkaBrokers}}"); + from("direct:kafkaT2").to("kafka:{{kafkaTopic2}}?brokers={{kafkaBrokers}}"); + } + } +} diff --git a/examples/aws/pom.xml b/examples/aws/pom.xml index 9463c65..ce7e620 100644 --- a/examples/aws/pom.xml +++ b/examples/aws/pom.xml @@ -44,5 +44,25 @@ <module>main-endpointdsl-kafka-aws2-s3</module> <module>main-endpointdsl-kafka-aws2-s3-restarting-policy</module> </modules> - + <dependencies> + <!-- for testing --> + <dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-test-junit5</artifactId> + <version>${camel.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.testcontainers</groupId> + <artifactId>localstack</artifactId> + <version>${testcontainers-version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>com.amazonaws</groupId> + <artifactId>aws-java-sdk-core</artifactId> + <version>1.12.150</version> + <scope>test</scope> + </dependency> + </dependencies> </project> diff --git a/examples/azure/azure-eventhubs/src/main/java/org/apache/camel/example/MyRouteBuilder.java b/examples/azure/azure-eventhubs/src/main/java/org/apache/camel/example/MyRouteBuilder.java index cfa6066..6a64025 100644 --- 
a/examples/azure/azure-eventhubs/src/main/java/org/apache/camel/example/MyRouteBuilder.java +++ b/examples/azure/azure-eventhubs/src/main/java/org/apache/camel/example/MyRouteBuilder.java @@ -17,10 +17,6 @@ package org.apache.camel.example; import org.apache.camel.builder.endpoint.EndpointRouteBuilder; -import org.apache.camel.component.azure.eventhubs.EventHubsConstants; - -import java.util.LinkedList; -import java.util.List; /** * To use the endpoint DSL then we must extend EndpointRouteBuilder instead of RouteBuilder @@ -28,7 +24,7 @@ import java.util.List; public class MyRouteBuilder extends EndpointRouteBuilder { @Override - public void configure() throws Exception { + public void configure() { from(azureEventhubs("{{namespaceName}}/{{eventhubName}}").sharedAccessKey("{{sharedAccessKey}}").sharedAccessName("{{sharedAccessName}}").blobAccessKey("{{blobAccessKey}}").blobAccountName("{{blobAccountName}}").blobContainerName("{{blobContainerName}}")) .log("The content is ${body}"); diff --git a/examples/azure/azure-storage-blob/src/main/java/org/apache/camel/example/azurestorageblob/Application.java b/examples/azure/azure-storage-blob/src/main/java/org/apache/camel/example/azurestorageblob/Application.java index 17e15f1..06102d9 100644 --- a/examples/azure/azure-storage-blob/src/main/java/org/apache/camel/example/azurestorageblob/Application.java +++ b/examples/azure/azure-storage-blob/src/main/java/org/apache/camel/example/azurestorageblob/Application.java @@ -38,43 +38,44 @@ public final class Application { // add routes which can be inlined as anonymous inner class // (to keep all code in a single java file for this basic example) - camel.addRoutes(new RouteBuilder() { - @Override - public void configure() throws Exception { - from("timer://runOnce?repeatCount=1&delay=0") + camel.addRoutes(createRouteBuilder()); + + // start is not blocking + camel.start(); + + // so run for 10 seconds + Thread.sleep(10_000); + } + } + + static RouteBuilder createRouteBuilder() 
{ + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("timer://runOnce?repeatCount=1&delay=0") .routeId("listBlobs") .process(exchange -> exchange.getIn() - .setBody( - new BlobServiceClientBuilder() - .endpoint(String.format("https://%s.blob.core.windows.net", ACCOUNT)) - .credential(new StorageSharedKeyCredential(ACCOUNT, ACCESS_KEY)) - .buildClient() - .getBlobContainerClient(BLOB_CONTAINER_NAME) - .listBlobs( - new ListBlobsOptions().setMaxResultsPerPage(1), - null - ) - ) + .setBody( + new BlobServiceClientBuilder() + .endpoint(String.format("https://%s.blob.core.windows.net", ACCOUNT)) + .credential(new StorageSharedKeyCredential(ACCOUNT, ACCESS_KEY)) + .buildClient() + .getBlobContainerClient(BLOB_CONTAINER_NAME) + .listBlobs( + new ListBlobsOptions().setMaxResultsPerPage(1), + null + ) + ) ) .loopDoWhile(exchange -> - exchange.getIn().getBody(Iterator.class).hasNext() + exchange.getIn().getBody(Iterator.class).hasNext() ) .process(exchange -> - exchange.getIn().setBody(exchange.getIn().getBody(Iterator.class).next()) + exchange.getIn().setBody(exchange.getIn().getBody(Iterator.class).next()) ) .log("${body.name}") .end(); - } - }); - - // start is not blocking - camel.start(); - - // so run for 10 seconds - Thread.sleep(10_000); - - // and then stop nicely - camel.stop(); - } + } + }; } } diff --git a/examples/azure/kafka-azure/src/main/java/org/apache/camel/example/MyRouteBuilder.java b/examples/azure/kafka-azure/src/main/java/org/apache/camel/example/MyRouteBuilder.java index 896700c..3b8381e 100644 --- a/examples/azure/kafka-azure/src/main/java/org/apache/camel/example/MyRouteBuilder.java +++ b/examples/azure/kafka-azure/src/main/java/org/apache/camel/example/MyRouteBuilder.java @@ -17,17 +17,13 @@ package org.apache.camel.example; import org.apache.camel.builder.RouteBuilder; -import org.apache.camel.builder.endpoint.EndpointRouteBuilder; public class MyRouteBuilder extends RouteBuilder { @Override public void 
configure() throws Exception { - - from("kafka:{{topicName}}?brokers={{brokers}}") .setHeader("CamelAzureStorageBlobBlobName", simple("${exchangeId}")) .to("azure-storage-blob://{{accountName}}/{{containerName}}/?accessKey=RAW({{accessKey}})&operation=uploadBlockBlob"); - } } diff --git a/examples/csimple-joor/readme.adoc b/examples/csimple-joor/README.adoc similarity index 89% rename from examples/csimple-joor/readme.adoc rename to examples/csimple-joor/README.adoc index 15c7cd3..e5d40d6 100644 --- a/examples/csimple-joor/readme.adoc +++ b/examples/csimple-joor/README.adoc @@ -5,7 +5,7 @@ This example shows using csimple (compiled simple) scripting language in your Ca When Camel bootstrap then each csimple scripts is compiled using the JVM compiler via the jOOR compiler library. The compilation happens once during startup. This makes the csimple language native Java compiled, with no runtime overhead. -The generated source code are in memory only and compiled at runtime. This means debugging the generated source code is not possible. See the other csimple example which uses a Maven plugin to detect csimple scripts from the source code, to genereate Java source code at build time; which can be debugged. +The generated source code are in memory only and compiled at runtime. This means debugging the generated source code is not possible. See the other csimple example which uses a Maven plugin to detect csimple scripts from the source code, to generate Java source code at build time; which can be debugged. 
=== Build diff --git a/examples/csimple/readme.adoc b/examples/csimple/README.adoc similarity index 100% rename from examples/csimple/readme.adoc rename to examples/csimple/README.adoc diff --git a/examples/debezium-eventhubs-blob/src/main/java/org/apache/camel/example/debezium/eventhubs/blob/DebeziumMySqlConsumerToAzureEventHubs.java b/examples/debezium-eventhubs-blob/src/main/java/org/apache/camel/example/debezium/eventhubs/blob/DebeziumMySqlConsumerToAzureEventHubs.java index 32861d4..5c537f3 100644 --- a/examples/debezium-eventhubs-blob/src/main/java/org/apache/camel/example/debezium/eventhubs/blob/DebeziumMySqlConsumerToAzureEventHubs.java +++ b/examples/debezium-eventhubs-blob/src/main/java/org/apache/camel/example/debezium/eventhubs/blob/DebeziumMySqlConsumerToAzureEventHubs.java @@ -50,10 +50,11 @@ public final class DebeziumMySqlConsumerToAzureEventHubs { public void configure() { // Initial Debezium route that will run and listens to the changes, // first it will perform an initial snapshot using (select * from) in case there are no offsets - // exists for the connector and then it will listens to MySQL binlogs for any DB events such as (UPDATE, INSERT and DELETE) + // exists for the connector, and then it will listen to MySQL binlogs for any DB events such as (UPDATE, INSERT and DELETE) from("debezium-mysql:{{debezium.mysql.name}}?" 
+ "databaseServerId={{debezium.mysql.databaseServerId}}" + "&databaseHostname={{debezium.mysql.databaseHostName}}" + + "&databasePort={{debezium.mysql.databasePort}}" + "&databaseUser={{debezium.mysql.databaseUser}}" + "&databasePassword={{debezium.mysql.databasePassword}}" + "&databaseServerName={{debezium.mysql.databaseServerName}}" @@ -63,7 +64,7 @@ public final class DebeziumMySqlConsumerToAzureEventHubs { + "&offsetStorageFileName={{debezium.mysql.offsetStorageFileName}}") .routeId("FromDebeziumMySql") // We will need to prepare the data for Azure EventHubs Therefore, we will hash the key to make sure our record land on the same partition - // and convert it to string, but that means we need to preserve the key information into the message body in order not to lose these information downstream. + // and convert it to string, but that means we need to preserve the key information into the message body in order not to lose this information downstream. // Note: If you'd use Kafka, most probably you will not need these transformations as you can send the key as an object and Kafka will do // the rest to hash it in the broker in order to place it in the correct topic's partition. 
.setBody(exchange -> { diff --git a/examples/debezium-eventhubs-blob/src/main/resources/application.properties b/examples/debezium-eventhubs-blob/src/main/resources/application.properties index 3510e2c..0bec099 100644 --- a/examples/debezium-eventhubs-blob/src/main/resources/application.properties +++ b/examples/debezium-eventhubs-blob/src/main/resources/application.properties @@ -17,14 +17,15 @@ debezium.mysql.name = debezium-mysql-example-01 debezium.mysql.databaseHostName = localhost +debezium.mysql.databasePort = 3306 debezium.mysql.databaseServerId = 8445698 debezium.mysql.databaseUser = debezium debezium.mysql.databasePassword = dbz debezium.mysql.databaseServerName = debezium-connector-mysql-01 -debezium.mysql.databaseHistoryFileName = dbhistory-01.data +debezium.mysql.databaseHistoryFileName = target/dbhistory-01.data debezium.mysql.databaseIncludeList = inventory debezium.mysql.tableIncludeList = inventory.products -debezium.mysql.offsetStorageFileName = offset-01.data +debezium.mysql.offsetStorageFileName = target/offset-01.data eventhubs.connectionString = {{generated_connection_string}} diff --git a/examples/debezium/README.adoc b/examples/debezium/README.adoc index b496bab..bb7d78a 100644 --- a/examples/debezium/README.adoc +++ b/examples/debezium/README.adoc @@ -7,25 +7,25 @@ An example which shows how to integrate Camel with Debezium and sink everything This project consists of the following examples: 1. Send events using Debezium component to Kinesis. - 2. Example how data can be sinked into Cassandra that produced by Debezium. + 2. Load the data produced by Debezium into Cassandra. === Prerequisites -==== MySQL -In order to stream changes from MySQL, you will need to have https://debezium.io/documentation/reference/0.9/connectors/mysql.html#enabling-the-binlog[_row-level_] binary binlog enabled. 
However, -for the sake of this example, we will use the following docker image which is setup with row enabled binary logs and some sample data: +==== PostgreSQL +In order to stream changes from PostgreSQL, you may have to https://debezium.io/documentation/reference/stable/connectors/postgresql.html#setting-up-postgresql[set up your PostgreSQL server]. However, +for the sake of this example, we will use the following docker image which is properly set up and contains some sample data: [source,sh] ---- -$ docker run -it --rm --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw debezium/example-mysql:0.9 +$ docker run -it --rm --name pgsql -p 5432:5432 -e POSTGRES_DB=debezium-db -e POSTGRES_USER=pgsql-user -e POSTGRES_PASSWORD=pgsql-pw debezium/example-postgres:1.9 ---- -The above docker image will start a MySQL server exposed to port `3306` with root password set. +The above docker image will start a PostgreSQL server exposed to port `5432`. ==== Amazon Kinesis Since we will use Kinesis to stream changes from Debezium as an example, you need to create a stream called `camel-debezium-example` in `eu-central-1`. As well, you will need to create AWS access and secret keys, once you are done from creating the keys, update the following properties in `src/main/resources/application.properties`: ``` -kinesis.accessKey ={{generated-access-key}} -kinesis.secretKey = {{generated-secret-key}} +kinesis.accessKey = generated-access-key +kinesis.secretKey = generated-secret-key ``` ==== Cassandra @@ -49,21 +49,10 @@ USE dbzSink; weight float ); ``` -*Note:* We will stream a table called `product` from MySQL docker image which is already set. Most of the configurations that will get you started with this example are already set in `application.properties`. +*Note:* We will stream a table called `product` from the PostgreSQL docker image, which is already set up.
Most of the configurations that will get you started with this example are already set in `application.properties`. === Build -Due to licensing issues, you will need to add the dependency for `mysql-conenctor-java`, just add the following to your POM file: - -[source,xml] ------------------------------------------------------------- -<dependency> - <groupId>mysql</groupId> - <artifactId>mysql-connector-java</artifactId> - <version>8.0.15</version> -</dependency> ------------------------------------------------------------- - You will need to compile this example first: [source,sh] @@ -77,18 +66,18 @@ Run the Kinesis producer first [source,sh] ---- -$ mvn compile exec:java -Pkinesis-producer +$ mvn exec:java -Pkinesis-producer ---- Run the Debezium consumer in the separate shell [source,sh] ---- -$ mvn compile exec:java -Pdebezium-consumer +$ mvn exec:java -Pdebezium-consumer ---- Initially, you will Debezium will perform a snapshot of the whitelisted tables per `application.properties`, hence you should expect -the data to be replicated into Cassandra. Once the snapshot mode is done, you can try to insert a new row, update fields, delete .. etc on MySQL whitelisted table(s), you should see +the data to be replicated into Cassandra. Once the snapshot mode is done, you can try to insert a new row, update fields, delete etc. 
on PostgreSQL whitelisted table(s), you should see the changes reflecting on Cassandra as well, you can verify that by running the following query on cqlsh: ``` select * from dbzSink.products; diff --git a/examples/debezium/pom.xml b/examples/debezium/pom.xml index 8483071..292fc28 100644 --- a/examples/debezium/pom.xml +++ b/examples/debezium/pom.xml @@ -61,7 +61,7 @@ </dependency> <dependency> <groupId>org.apache.camel</groupId> - <artifactId>camel-debezium-mysql</artifactId> + <artifactId>camel-debezium-postgres</artifactId> </dependency> <dependency> <groupId>org.apache.camel</groupId> @@ -92,6 +92,47 @@ <artifactId>log4j-slf4j-impl</artifactId> <version>${log4j2-version}</version> </dependency> + <!-- for testing --> + <dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-test-junit5</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.testcontainers</groupId> + <artifactId>localstack</artifactId> + <version>${testcontainers-version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.testcontainers</groupId> + <artifactId>postgresql</artifactId> + <version>${testcontainers-version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.testcontainers</groupId> + <artifactId>cassandra</artifactId> + <version>${testcontainers-version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>com.amazonaws</groupId> + <artifactId>aws-java-sdk-core</artifactId> + <version>1.12.150</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-sql</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.awaitility</groupId> + <artifactId>awaitility</artifactId> + <version>${awaitility-version}</version> + <scope>test</scope> + </dependency> </dependencies> <profiles> @@ -106,7 +147,7 @@ <profile> <id>debezium-consumer</id> <properties> - 
<target.main.class>org.apache.camel.example.debezium.DebeziumMySqlConsumerToKinesis</target.main.class> + <target.main.class>org.apache.camel.example.debezium.DebeziumPgSQLConsumerToKinesis</target.main.class> </properties> </profile> diff --git a/examples/debezium/src/main/java/org/apache/camel/example/debezium/DebeziumMySqlConsumerToKinesis.java b/examples/debezium/src/main/java/org/apache/camel/example/debezium/DebeziumPgSQLConsumerToKinesis.java similarity index 76% rename from examples/debezium/src/main/java/org/apache/camel/example/debezium/DebeziumMySqlConsumerToKinesis.java rename to examples/debezium/src/main/java/org/apache/camel/example/debezium/DebeziumPgSQLConsumerToKinesis.java index e97b277..1669527 100644 --- a/examples/debezium/src/main/java/org/apache/camel/example/debezium/DebeziumMySqlConsumerToKinesis.java +++ b/examples/debezium/src/main/java/org/apache/camel/example/debezium/DebeziumPgSQLConsumerToKinesis.java @@ -31,14 +31,14 @@ import org.slf4j.LoggerFactory; /** * A simple example to consume data from Debezium and send it to Kinesis */ -public final class DebeziumMySqlConsumerToKinesis { +public final class DebeziumPgSQLConsumerToKinesis { - private static final Logger LOG = LoggerFactory.getLogger(DebeziumMySqlConsumerToKinesis.class); + private static final Logger LOG = LoggerFactory.getLogger(DebeziumPgSQLConsumerToKinesis.class); // use Camel Main to set up and run Camel private static final Main MAIN = new Main(); - private DebeziumMySqlConsumerToKinesis() { + private DebeziumPgSQLConsumerToKinesis() { } public static void main(String[] args) throws Exception { @@ -46,25 +46,32 @@ public final class DebeziumMySqlConsumerToKinesis { LOG.debug("About to run Debezium integration..."); // add route - MAIN.configure().addRoutesBuilder(new RouteBuilder() { + MAIN.configure().addRoutesBuilder(createRouteBuilder()); + + // start and run Camel (block) + MAIN.run(); + } + + static RouteBuilder createRouteBuilder() { + return new RouteBuilder() 
{ public void configure() { // Initial Debezium route that will run and listen to the changes, // first it will perform an initial snapshot using (select * from) in case no offset - // exists for the connector, and then it will listen to MySQL binlogs for any DB events such as (UPDATE, INSERT and DELETE) - from("debezium-mysql:{{debezium.mysql.name}}?" - + "databaseServerId={{debezium.mysql.databaseServerId}}" - + "&databaseHostname={{debezium.mysql.databaseHostName}}" - + "&databaseUser={{debezium.mysql.databaseUser}}" - + "&databasePassword={{debezium.mysql.databasePassword}}" - + "&databaseServerName={{debezium.mysql.databaseServerName}}" - + "&databaseHistoryFileFilename={{debezium.mysql.databaseHistoryFileName}}" - + "&databaseIncludeList={{debezium.mysql.databaseIncludeList}}" - + "&tableIncludeList={{debezium.mysql.tableIncludeList}}" - + "&offsetStorageFileName={{debezium.mysql.offsetStorageFileName}}") - .routeId("FromDebeziumMySql") - // We will need to prepare the data for Kinesis, however we need to mention here is that Kinesis is bit different from Kafka in terms + // exists for the connector, and then it will listen to postgres for any DB events such as (UPDATE, INSERT and DELETE) + from("debezium-postgres:{{debezium.postgres.name}}?" 
+ + "databaseHostname={{debezium.postgres.databaseHostName}}" + + "&databasePort={{debezium.postgres.databasePort}}" + + "&databaseUser={{debezium.postgres.databaseUser}}" + + "&databasePassword={{debezium.postgres.databasePassword}}" + + "&databaseServerName={{debezium.postgres.databaseServerName}}" + + "&databaseDbname={{debezium.postgres.databaseDbname}}" + + "&schemaIncludeList={{debezium.postgres.schemaIncludeList}}" + + "&tableIncludeList={{debezium.postgres.tableIncludeList}}" + + "&offsetStorageFileName={{debezium.postgres.offsetStorageFileName}}") + .routeId("FromDebeziumPgSql") + // We will need to prepare the data for Kinesis, however we need to mention here is that Kinesis is a bit different from Kafka in terms // of the key partition which only limited to 256 byte length, depending on the size of your key, that may not be optimal. Therefore, the safer option is to hash the key - // and convert it to string, but that means we need to preserve the key information into the message body in order not to lose these information downstream. + // and convert it to string, but that means we need to preserve the key information into the message body in order not to lose this information downstream. // Note: If you'd use Kafka, most probably you will not need these transformations as you can send the key as an object and Kafka will do // the rest to hash it in the broker in order to place it in the correct topic's partition. 
.setBody(exchange -> { @@ -93,15 +100,12 @@ public final class DebeziumMySqlConsumerToKinesis { // Marshal everything to JSON, you can use any other data format such as Avro, Protobuf..etc, but in this example we will keep it to JSON for simplicity .marshal().json(JsonLibrary.Jackson) // Send our data to kinesis - .to("aws-kinesis:{{kinesis.streamName}}?accessKey=RAW({{kinesis.accessKey}})" + .to("aws2-kinesis:{{kinesis.streamName}}?accessKey=RAW({{kinesis.accessKey}})" + "&secretKey=RAW({{kinesis.secretKey}})" + "&region={{kinesis.region}}") .end(); } - }); - - // start and run Camel (block) - MAIN.run(); + }; } } diff --git a/examples/debezium/src/main/java/org/apache/camel/example/debezium/KinesisProducerToCassandra.java b/examples/debezium/src/main/java/org/apache/camel/example/debezium/KinesisProducerToCassandra.java index 02185c6..c83d30c 100644 --- a/examples/debezium/src/main/java/org/apache/camel/example/debezium/KinesisProducerToCassandra.java +++ b/examples/debezium/src/main/java/org/apache/camel/example/debezium/KinesisProducerToCassandra.java @@ -44,13 +44,20 @@ public final class KinesisProducerToCassandra { LOG.debug("About to run Kinesis to Cassandra integration..."); // add route - MAIN.configure().addRoutesBuilder(new RouteBuilder() { + MAIN.configure().addRoutesBuilder(createRouteBuilder()); + + // start and run Camel (block) + MAIN.run(); + } + + static RouteBuilder createRouteBuilder() { + return new RouteBuilder() { public void configure() { // We set the CQL templates we need, note that an UPDATE in Cassandra means an UPSERT which is what we need final String cqlUpdate = "update products set name = ?, description = ?, weight = ? 
where id = ?"; final String cqlDelete = "delete from products where id = ?"; - from("aws-kinesis:{{kinesis.streamName}}?accessKey=RAW({{kinesis.accessKey}})" + from("aws2-kinesis:{{kinesis.streamName}}?accessKey=RAW({{kinesis.accessKey}})" + "&secretKey=RAW({{kinesis.secretKey}})" + "&region={{kinesis.region}}") // Since we expect the data of the body to be ByteArr, we convert it to String using Kinesis @@ -62,45 +69,42 @@ public final class KinesisProducerToCassandra { // a header .setProperty("DBOperation", simple("${body[operation]}")) .choice() - // If we have a INSERT or UPDATE, we will need to set the body with the CQL query parameters since we are using - // camel-cassandraql component - .when(exchangeProperty("DBOperation").in("c", "u")) - .setBody(exchange -> { - final Map body = (Map) exchange.getMessage().getBody(); - final Map value = (Map) body.get("value"); - final Map key = (Map) body.get("key"); - - // We as well check for nulls - final String name = value.get("name") != null ? value.get("name").toString() : ""; - final String description = value.get("description") != null ? value.get("description").toString() : ""; - final float weight = value.get("weight") != null ? 
Float.parseFloat(value.get("weight").toString()) : 0; - - return Arrays.asList(name, description, weight, key.get("id")); - }) - // We set the appropriate query in the header so we don't run the same route twice - .setHeader("CQLQuery", constant(cqlUpdate)) - // If we have a DELETE, then we just set the id as a query parameter in the body - .when(exchangeProperty("DBOperation").isEqualTo("d")) - .setBody(exchange -> { - final Map body = (Map) exchange.getMessage().getBody(); - final Map key = (Map) body.get("key"); - - return Collections.singletonList(key.get("id")); - }) - // We set the appropriate query in the header so we don't run the same route twice - .setHeader("CQLQuery", constant(cqlDelete)) + // If we have a INSERT or UPDATE, we will need to set the body with the CQL query parameters since we are using + // camel-cassandraql component + .when(exchangeProperty("DBOperation").in("c", "u")) + .setBody(exchange -> { + final Map body = (Map) exchange.getMessage().getBody(); + final Map value = (Map) body.get("value"); + final Map key = (Map) body.get("key"); + + // We as well check for nulls + final String name = value.get("name") != null ? value.get("name").toString() : ""; + final String description = value.get("description") != null ? value.get("description").toString() : ""; + final float weight = value.get("weight") != null ? 
Float.parseFloat(value.get("weight").toString()) : 0; + + return Arrays.asList(name, description, weight, key.get("id")); + }) + // We set the appropriate query in the header so we don't run the same route twice + .setHeader("CQLQuery", constant(cqlUpdate)) + // If we have a DELETE, then we just set the id as a query parameter in the body + .when(exchangeProperty("DBOperation").isEqualTo("d")) + .setBody(exchange -> { + final Map body = (Map) exchange.getMessage().getBody(); + final Map key = (Map) body.get("key"); + + return Collections.singletonList(key.get("id")); + }) + // We set the appropriate query in the header so we don't run the same route twice + .setHeader("CQLQuery", constant(cqlDelete)) .end() .choice() - // We just make sure we ONLY handle INSERT, UPDATE and DELETE and nothing else - .when(exchangeProperty("DBOperation").in("c", "u", "d")) - // Send query to Cassandra - .recipientList(simple("cql:{{cassandra.host}}/{{cassandra.keyspace}}?cql=RAW(${header.CQLQuery})")) + // We just make sure we ONLY handle INSERT, UPDATE and DELETE and nothing else + .when(exchangeProperty("DBOperation").in("c", "u", "d")) + // Send query to Cassandra + .recipientList(simple("cql:{{cassandra.host}}/{{cassandra.keyspace}}?cql=RAW(${header.CQLQuery})")) .end(); } - }); - - // start and run Camel (block) - MAIN.run(); + }; } } diff --git a/examples/debezium/src/main/resources/application.properties b/examples/debezium/src/main/resources/application.properties index bf0a3e9..56eab2f 100644 --- a/examples/debezium/src/main/resources/application.properties +++ b/examples/debezium/src/main/resources/application.properties @@ -15,23 +15,21 @@ ## limitations under the License. 
## --------------------------------------------------------------------------- -debezium.mysql.name = debezium-mysql-example-01 +debezium.postgres.name = debezium-postgres-example-01 -debezium.mysql.databaseHostName = localhost -debezium.mysql.databaseServerId = 8445698 -debezium.mysql.databaseUser = debezium -debezium.mysql.databasePassword = dbz -debezium.mysql.databaseServerName = debezium-connector-mysql-01 -debezium.mysql.databaseHistoryFileName = dbhistory-01.data -debezium.mysql.databaseIncludeList = inventory - -debezium.mysql.tableIncludeList = inventory.products - -debezium.mysql.offsetStorageFileName = offset-01.data +debezium.postgres.databaseHostName = localhost +debezium.postgres.databasePort = 5432 +debezium.postgres.databaseUser = debezium +debezium.postgres.databasePassword = dbz +debezium.postgres.databaseServerName = debezium-connector-postgres-01 +debezium.postgres.databaseDbname = debezium-db +debezium.postgres.schemaIncludeList = inventory +debezium.postgres.tableIncludeList = inventory.products +debezium.postgres.offsetStorageFileName = target/offset-01.data kinesis.streamName = camel-debezium-example -kinesis.accessKey = {{generated-access-key}} -kinesis.secretKey = {{generated-secret-key}} +kinesis.accessKey = generated-access-key +kinesis.secretKey = generated-secret-key kinesis.region = EU_CENTRAL_1 cassandra.host = localhost:9042 diff --git a/examples/debezium/src/test/java/org/apache/camel/example/debezium/DebeziumTest.java b/examples/debezium/src/test/java/org/apache/camel/example/debezium/DebeziumTest.java new file mode 100644 index 0000000..52fe8c8 --- /dev/null +++ b/examples/debezium/src/test/java/org/apache/camel/example/debezium/DebeziumTest.java @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.example.debezium; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.List; + +import org.apache.camel.CamelContext; +import org.apache.camel.RoutesBuilder; +import org.apache.camel.builder.NotifyBuilder; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.aws2.kinesis.Kinesis2Component; +import org.apache.camel.component.sql.SqlComponent; +import org.apache.camel.test.junit5.CamelTestSupport; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.postgresql.ds.PGSimpleDataSource; +import org.testcontainers.containers.CassandraContainer; +import org.testcontainers.containers.PostgreSQLContainer; +import org.testcontainers.containers.localstack.LocalStackContainer; +import org.testcontainers.containers.wait.strategy.Wait; +import org.testcontainers.utility.DockerImageName; +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; +import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.kinesis.KinesisClient; +import software.amazon.awssdk.services.kinesis.model.CreateStreamRequest; + +import static java.util.concurrent.TimeUnit.SECONDS; +import static 
org.apache.camel.util.PropertiesHelper.asProperties; +import static org.awaitility.Awaitility.await; +import static org.hamcrest.Matchers.equalTo; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.testcontainers.containers.localstack.LocalStackContainer.Service.KINESIS; + +/** + * A unit test checking that Camel can propagate changes from one Database to another thanks to Debezium. + */ +class DebeziumTest extends CamelTestSupport { + + private static final String AWS_IMAGE = "localstack/localstack:0.13.3"; + private static final String PGSQL_IMAGE = "debezium/example-postgres:1.9"; + private static final String CASSANDRA_IMAGE = "cassandra:4.0.1"; + private static LocalStackContainer AWS_CONTAINER; + private static PostgreSQLContainer<?> PGSQL_CONTAINER; + private static CassandraContainer<?> CASSANDRA_CONTAINER; + + private static final String SOURCE_DB_NAME = "debezium-db"; + private static final String SOURCE_DB_SCHEMA = "inventory"; + private static final String SOURCE_DB_USERNAME = "pgsql-user"; + private static final String SOURCE_DB_PASSWORD = "pgsql-pw"; + + @BeforeAll + static void init() throws IOException { + Files.deleteIfExists(Path.of("target/offset-01.data")); + AWS_CONTAINER = new LocalStackContainer(DockerImageName.parse(AWS_IMAGE)) + .withServices(KINESIS) + .waitingFor(Wait.forLogMessage(".*Ready\\.\n", 1)); + AWS_CONTAINER.start(); + PGSQL_CONTAINER = new PostgreSQLContainer<>(DockerImageName.parse(PGSQL_IMAGE).asCompatibleSubstituteFor("postgres")) + .withDatabaseName(SOURCE_DB_NAME) + .withUsername(SOURCE_DB_USERNAME) + .withPassword(SOURCE_DB_PASSWORD); + PGSQL_CONTAINER.start(); + CASSANDRA_CONTAINER = new CassandraContainer<>(CASSANDRA_IMAGE) + .withInitScript("org/apache/camel/example/debezium/db-init.cql"); + CASSANDRA_CONTAINER.start(); + } + + @AfterAll + static void destroy() { + if (AWS_CONTAINER != null) { + AWS_CONTAINER.stop(); + } + if 
(PGSQL_CONTAINER != null) { + PGSQL_CONTAINER.stop(); + } + if (CASSANDRA_CONTAINER != null) { + CASSANDRA_CONTAINER.stop(); + } + } + + @Override + protected CamelContext createCamelContext() throws Exception { + CamelContext camelContext = super.createCamelContext(); + // Set the location of the configuration + camelContext.getPropertiesComponent().setLocation("classpath:application.properties"); + Kinesis2Component component = camelContext.getComponent("aws2-kinesis", Kinesis2Component.class); + KinesisClient kinesisClient = KinesisClient.builder() + .endpointOverride(AWS_CONTAINER.getEndpointOverride(KINESIS)) + .credentialsProvider( + StaticCredentialsProvider.create( + AwsBasicCredentials.create(AWS_CONTAINER.getAccessKey(), AWS_CONTAINER.getSecretKey()) + ) + ) + .region(Region.of(AWS_CONTAINER.getRegion())) + .build(); + // Create the stream + kinesisClient.createStream(CreateStreamRequest.builder().streamName("camel-debezium-example").shardCount(1).build()); + component.getConfiguration().setAmazonKinesisClient(kinesisClient); + // Override the host and port of the broker + camelContext.getPropertiesComponent().setOverrideProperties( + asProperties( + "debezium.postgres.databaseHostName", PGSQL_CONTAINER.getHost(), + "debezium.postgres.databasePort", Integer.toString(PGSQL_CONTAINER.getMappedPort(5432)), + "debezium.postgres.databaseUser", SOURCE_DB_USERNAME, + "debezium.postgres.databasePassword", SOURCE_DB_PASSWORD, + "cassandra.host", String.format("%s:%d", CASSANDRA_CONTAINER.getHost(), CASSANDRA_CONTAINER.getMappedPort(9042)) + ) + ); + return camelContext; + } + + @Test + void should_propagate_db_event_thanks_to_debezium() { + NotifyBuilder notify = new NotifyBuilder(context).from("aws2-kinesis:*").whenCompleted(3).create(); + + List<?> resultSource = template.requestBody("direct:select", null, List.class); + assertEquals(9, resultSource.size(), "We should not have additional products in source"); + await().atMost(20, SECONDS).until(() -> 
template.requestBody("direct:result", null, List.class).size(), equalTo(0)); + + template.sendBody("direct:insert", new Object[] { 1, "scooter", "Small 2-wheel yellow scooter", 5.54 }); + + resultSource = template.requestBody("direct:select", null, List.class); + assertEquals(10, resultSource.size(), "We should have one additional product in source"); + await().atMost(20, SECONDS).until(() -> template.requestBody("direct:result", null, List.class).size(), equalTo(1)); + + template.sendBody("direct:update", new Object[] { "yellow scooter", 1 }); + + resultSource = template.requestBody("direct:select", null, List.class); + assertEquals(10, resultSource.size(), "We should not have more product in source"); + await().atMost(20, SECONDS).until(() -> template.requestBody("direct:result", null, List.class).size(), equalTo(1)); + + template.sendBody("direct:delete", new Object[] { 1 }); + + resultSource = template.requestBody("direct:select", null, List.class); + assertEquals(9, resultSource.size(), "We should have one less product in source"); + await().atMost(20, SECONDS).until(() -> template.requestBody("direct:result", null, List.class).size(), equalTo(0)); + + assertTrue( + notify.matches(60, SECONDS), "3 messages should be completed" + ); + } + + @Override + protected RoutesBuilder[] createRouteBuilders() { + return new RoutesBuilder[]{ + DebeziumPgSQLConsumerToKinesis.createRouteBuilder(), KinesisProducerToCassandra.createRouteBuilder(), + new ApplyChangesToPgSQLRouteBuilder() + }; + } + + private static class ApplyChangesToPgSQLRouteBuilder extends RouteBuilder { + + @Override + public void configure() { + // required for the sql component + PGSimpleDataSource db = new PGSimpleDataSource(); + db.setServerNames(new String[]{PGSQL_CONTAINER.getHost()}); + db.setPortNumbers(new int[]{PGSQL_CONTAINER.getMappedPort(5432)}); + db.setUser(SOURCE_DB_USERNAME); + db.setPassword(SOURCE_DB_PASSWORD); + db.setDatabaseName(SOURCE_DB_NAME); + + getContext().getComponent("sql", 
SqlComponent.class).setDataSource(db); + from("direct:select").toF("sql:select * from %s.products", SOURCE_DB_SCHEMA).to("mock:query"); + from("direct:insert").toF("sql:insert into %s.products (id, name, description, weight) values (#, #, #, #)", SOURCE_DB_SCHEMA).to("mock:insert"); + from("direct:update").toF("sql:update %s.products set name=# where id=#", SOURCE_DB_SCHEMA).to("mock:update"); + from("direct:delete").toF("sql:delete from %s.products where id=#", SOURCE_DB_SCHEMA).to("mock:delete"); + from("direct:result").to("cql://{{cassandra.host}}/{{cassandra.keyspace}}?cql=select * from dbzSink.products").to("mock:result"); + } + } +} diff --git a/examples/debezium/src/test/resources/org/apache/camel/example/debezium/db-init.cql b/examples/debezium/src/test/resources/org/apache/camel/example/debezium/db-init.cql new file mode 100644 index 0000000..501a90c --- /dev/null +++ b/examples/debezium/src/test/resources/org/apache/camel/example/debezium/db-init.cql @@ -0,0 +1,10 @@ +CREATE KEYSPACE dbzSink WITH replication = {'class':'SimpleStrategy', 'replication_factor' : 1}; + +USE dbzSink; + +CREATE TABLE products ( + id int PRIMARY KEY, + name varchar, + description varchar, + weight float +); \ No newline at end of file diff --git a/examples/transformer-demo/pom.xml b/examples/transformer-demo/pom.xml index 1ea80e3..9d8ca7f 100644 --- a/examples/transformer-demo/pom.xml +++ b/examples/transformer-demo/pom.xml @@ -98,15 +98,9 @@ <!-- for testing --> <dependency> <groupId>org.apache.camel</groupId> - <artifactId>camel-test-spring</artifactId> + <artifactId>camel-test-spring-junit5</artifactId> <scope>test</scope> </dependency> - <dependency> - <groupId>xmlunit</groupId> - <artifactId>xmlunit</artifactId> - <scope>test</scope> - <version>${xmlunit-version}</version> - </dependency> </dependencies> diff --git a/examples/transformer-demo/src/test/java/org/apache/camel/example/transformer/demo/TransformerTest.java 
b/examples/transformer-demo/src/test/java/org/apache/camel/example/transformer/demo/TransformerTest.java new file mode 100644 index 0000000..83c5811 --- /dev/null +++ b/examples/transformer-demo/src/test/java/org/apache/camel/example/transformer/demo/TransformerTest.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.example.transformer.demo; + +import java.util.concurrent.TimeUnit; + +import org.apache.camel.Exchange; +import org.apache.camel.ProducerTemplate; +import org.apache.camel.builder.NotifyBuilder; +import org.apache.camel.model.ModelCamelContext; +import org.apache.camel.spi.DataType; +import org.apache.camel.spi.DataTypeAware; +import org.apache.camel.test.spring.junit5.CamelSpringTest; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.test.context.ContextConfiguration; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * A unit test checking that Camel can transform and validate data. 
+ */ +@CamelSpringTest +@ContextConfiguration("/META-INF/spring/camel-context.xml") +class TransformerTest { + + @Autowired + ProducerTemplate template; + @Autowired + ModelCamelContext context; + + @Test + void should_transform_and_validate_pojo() { + NotifyBuilder notify = new NotifyBuilder(context).fromRoute("java") + .whenCompleted(1).and() + .whenCompleted(1).wereSentTo("file:target/output*").create(); + // Given + Order order = new Order() + .setOrderId("Order-Java-0001") + .setItemId("MILK") + .setQuantity(3); + // When + OrderResponse response = template.requestBody("direct:java", order, OrderResponse.class); + + // Then + assertTrue( + notify.matches(20, TimeUnit.SECONDS), "1 message should be completed" + ); + assertNotNull(response); + assertTrue(response.isAccepted()); + assertEquals("Order-Java-0001", response.getOrderId()); + assertEquals(String.format("Order accepted:[item='%s' quantity='%s']", order.getItemId(), order.getQuantity()), response.getDescription()); + } + + @Test + void should_transform_and_validate_json() { + NotifyBuilder notify = new NotifyBuilder(context).fromRoute("json") + .whenCompleted(1).and() + .whenCompleted(1).wereSentTo("file:target/output*").create(); + // Given + String orderJson = "{\"orderId\":\"Order-JSON-0001\", \"itemId\":\"MIZUYO-KAN\", \"quantity\":\"16350\"}"; + // When + Exchange answerJson = template.send("direct:json", ex -> ((DataTypeAware) ex.getIn()).setBody(orderJson, new DataType("json"))); + + // Then + assertTrue( + notify.matches(20, TimeUnit.SECONDS), "1 message should be completed" + ); + assertNotNull(answerJson); + String response = answerJson.getIn().getBody(String.class); + assertNotNull(response); + assertTrue(response.contains("\"accepted\":true")); + assertTrue(response.contains("\"orderId\":\"Order-JSON-0001\"")); + assertTrue(response.contains("Order accepted:[item='MIZUYO-KAN' quantity='16350']")); + } + + @Test + void should_transform_and_validate_xml() { + NotifyBuilder notify = new 
NotifyBuilder(context).fromRoute("xml") + .whenCompleted(1).and() + .whenCompleted(1).wereSentTo("file:target/output*").create(); + // Given + String orderXml = "<order orderId=\"Order-XML-0001\" itemId=\"MIKAN\" quantity=\"365\"/>"; + + // When + Exchange answerXml = template.send("direct:xml", + ex -> ((DataTypeAware) ex.getIn()).setBody(orderXml, new DataType("xml:XMLOrder")) + ); + + // Then + assertTrue( + notify.matches(20, TimeUnit.SECONDS), "1 message should be completed" + ); + assertNotNull(answerXml); + String response = answerXml.getIn().getBody(String.class); + assertNotNull(response); + assertTrue(response.contains("accepted=\"true\"")); + assertTrue(response.contains("orderId=\"Order-XML-0001\"")); + assertTrue(response.contains("Order accepted:[item='MIKAN' quantity='365']")); + } +}