This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-examples.git


The following commit(s) were added to refs/heads/main by this push:
     new 3a9feb3  CAMEL-11834: Add tests for as2/aws/debezium/transformer-demo (#68)
3a9feb3 is described below

commit 3a9feb3917228f7b61a3219cde140d7d12b5cd71
Author: Nicolas Filotto <essob...@users.noreply.github.com>
AuthorDate: Thu Feb 3 22:31:36 2022 +0100

    CAMEL-11834: Add tests for as2/aws/debezium/transformer-demo (#68)
---
 examples/as2/pom.xml                               |   7 +-
 .../java/org/apache/camel/example/as2/As2Test.java |  49 ++++++
 .../aws/main-endpointdsl-aws2-s3-kafka/README.adoc |   9 +
 .../aws/main-endpointdsl-aws2-s3-kafka/pom.xml     |   8 +-
 .../org/apache/camel/example/AwsS3KafkaTest.java   | 127 ++++++++++++++
 examples/aws/main-endpointdsl-aws2-s3/README.adoc  |   9 +
 .../java/org/apache/camel/example/AwsS3Test.java   | 112 ++++++++++++
 .../{readme.adoc => README.adoc}                   |  14 +-
 .../{readme.adoc => README.adoc}                   |  16 +-
 .../org/apache/camel/example/MyRouteBuilder.java   |   2 +-
 .../java/org/apache/camel/example/AwsS3Test.java   |  95 ++++++++++
 .../aws2-sqs-consumer/{readme.adoc => README.adoc} |  14 +-
 .../org/apache/camel/example/MyRouteBuilder.java   |   2 +-
 .../java/org/apache/camel/example/AwsSQSTest.java  | 109 ++++++++++++
 .../README.adoc                                    |  23 ++-
 .../pom.xml                                        |   8 +-
 .../org/apache/camel/example/MyRouteBuilder.java   |   3 +-
 .../src/main/resources/application.properties      |   3 +
 .../org/apache/camel/example/KafkaAwsS3Test.java   | 121 +++++++++++++
 .../aws/main-endpointdsl-kafka-aws2-s3/README.adoc |  18 +-
 .../aws/main-endpointdsl-kafka-aws2-s3/pom.xml     |   8 +-
 .../org/apache/camel/example/MyRouteBuilder.java   |  10 +-
 .../src/main/resources/application.properties      |   3 +
 .../org/apache/camel/example/KafkaAwsS3Test.java   | 123 +++++++++++++
 examples/aws/pom.xml                               |  22 ++-
 .../org/apache/camel/example/MyRouteBuilder.java   |   6 +-
 .../example/azurestorageblob/Application.java      |  59 +++----
 .../org/apache/camel/example/MyRouteBuilder.java   |   4 -
 examples/csimple-joor/{readme.adoc => README.adoc} |   2 +-
 examples/csimple/{readme.adoc => README.adoc}      |   0
 .../DebeziumMySqlConsumerToAzureEventHubs.java     |   5 +-
 .../src/main/resources/application.properties      |   5 +-
 examples/debezium/README.adoc                      |  35 ++--
 examples/debezium/pom.xml                          |  45 ++++-
 ...is.java => DebeziumPgSQLConsumerToKinesis.java} |  50 +++---
 .../debezium/KinesisProducerToCassandra.java       |  78 +++++----
 .../src/main/resources/application.properties      |  26 ++-
 .../camel/example/debezium/DebeziumTest.java       | 191 +++++++++++++++++++++
 .../org/apache/camel/example/debezium/db-init.cql  |  10 ++
 examples/transformer-demo/pom.xml                  |   8 +-
 .../example/transformer/demo/TransformerTest.java  | 117 +++++++++++++
 41 files changed, 1376 insertions(+), 180 deletions(-)
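
All the new tests share one shape: start the backing service with Testcontainers, point the Camel component at the container, then assert exchange completion with a NotifyBuilder. A condensed sketch of that shared pattern (the class name, the trivial route, and the LocalStack-only setup are illustrative, not taken verbatim from any one file below):

[source,java]
----
package org.apache.camel.example;

import java.util.concurrent.TimeUnit;

import org.apache.camel.RoutesBuilder;
import org.apache.camel.builder.NotifyBuilder;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.test.junit5.CamelTestSupport;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.localstack.LocalStackContainer;
import org.testcontainers.utility.DockerImageName;

import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.testcontainers.containers.localstack.LocalStackContainer.Service.S3;

class SketchTest extends CamelTestSupport {

    private static LocalStackContainer CONTAINER;

    @BeforeAll
    static void init() {
        // Start a throwaway LocalStack instance exposing only S3
        CONTAINER = new LocalStackContainer(DockerImageName.parse("localstack/localstack:0.13.3"))
                .withServices(S3);
        CONTAINER.start();
    }

    @AfterAll
    static void destroy() {
        if (CONTAINER != null) {
            CONTAINER.stop();
        }
    }

    @Override
    protected RoutesBuilder createRouteBuilder() {
        // The real tests return the example's MyRouteBuilder here
        return new RouteBuilder() {
            @Override
            public void configure() {
                from("direct:start").log("${body}");
            }
        };
    }

    @Test
    void should_complete_one_exchange() {
        // Build the matcher before triggering the route, then wait for completion
        NotifyBuilder notify = new NotifyBuilder(context).whenCompleted(1).create();
        template.sendBody("direct:start", "Camel rocks!");
        assertTrue(notify.matches(20, TimeUnit.SECONDS), "1 message should be completed");
    }
}
----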

diff --git a/examples/as2/pom.xml b/examples/as2/pom.xml
index 86f9463..007a737 100644
--- a/examples/as2/pom.xml
+++ b/examples/as2/pom.xml
@@ -85,7 +85,12 @@
             <artifactId>slf4j-log4j12</artifactId>
             <version>${slf4j-version}</version>
         </dependency>
-
+        <!-- for testing -->
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-test-spring-junit5</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
   
     <build>
diff --git a/examples/as2/src/test/java/org/apache/camel/example/as2/As2Test.java b/examples/as2/src/test/java/org/apache/camel/example/as2/As2Test.java
new file mode 100644
index 0000000..36573fc
--- /dev/null
+++ b/examples/as2/src/test/java/org/apache/camel/example/as2/As2Test.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.example.as2;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.builder.NotifyBuilder;
+import org.apache.camel.model.ModelCamelContext;
+import org.apache.camel.test.spring.junit5.CamelSpringTest;
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.test.context.ContextConfiguration;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * A unit test checking that Camel works as expected with AS2.
+ */
+@CamelSpringTest
+@ContextConfiguration("/META-INF/spring/camel-context.xml")
+class As2Test {
+
+    @Autowired
+    ModelCamelContext context;
+
+    @Test
+    void should_consume_and_produce_as2_messages() {
+        NotifyBuilder notify = new NotifyBuilder(context).fromRoute("server-route")
+                .whenCompleted(8).create();
+
+        assertTrue(
+            notify.matches(30, TimeUnit.SECONDS), "8 messages should be received by the AS2 server"
+        );
+    }
+}
diff --git a/examples/aws/main-endpointdsl-aws2-s3-kafka/README.adoc b/examples/aws/main-endpointdsl-aws2-s3-kafka/README.adoc
index 1d70dbe..7f6dbd1 100644
--- a/examples/aws/main-endpointdsl-aws2-s3-kafka/README.adoc
+++ b/examples/aws/main-endpointdsl-aws2-s3-kafka/README.adoc
@@ -12,6 +12,15 @@ This example will use the AWS default credentials Provider: https://docs.aws.ama
 Set your credentials accordingly.
 Don't forget to add the bucket name and point to the correct topic.
 
+=== Build
+
+First compile the example by executing:
+
+[source,sh]
+----
+$ mvn compile
+----
+
 === How to run
 
 You can run this example using
diff --git a/examples/aws/main-endpointdsl-aws2-s3-kafka/pom.xml b/examples/aws/main-endpointdsl-aws2-s3-kafka/pom.xml
index 9493af6..53ad64f 100644
--- a/examples/aws/main-endpointdsl-aws2-s3-kafka/pom.xml
+++ b/examples/aws/main-endpointdsl-aws2-s3-kafka/pom.xml
@@ -86,7 +86,13 @@
             <artifactId>logback-classic</artifactId>
             <version>${logback-version}</version>
         </dependency>
-
+        <!-- for testing -->
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>kafka</artifactId>
+            <version>${testcontainers-version}</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>
diff --git a/examples/aws/main-endpointdsl-aws2-s3-kafka/src/test/java/org/apache/camel/example/AwsS3KafkaTest.java b/examples/aws/main-endpointdsl-aws2-s3-kafka/src/test/java/org/apache/camel/example/AwsS3KafkaTest.java
new file mode 100644
index 0000000..e02d9a4
--- /dev/null
+++ b/examples/aws/main-endpointdsl-aws2-s3-kafka/src/test/java/org/apache/camel/example/AwsS3KafkaTest.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.example;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.RoutesBuilder;
+import org.apache.camel.builder.NotifyBuilder;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.aws2.s3.AWS2S3Component;
+import org.apache.camel.component.aws2.s3.AWS2S3Constants;
+import org.apache.camel.test.junit5.CamelTestSupport;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.KafkaContainer;
+import org.testcontainers.containers.localstack.LocalStackContainer;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.utility.DockerImageName;
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.s3.S3Client;
+
+import static org.apache.camel.util.PropertiesHelper.asProperties;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.testcontainers.containers.localstack.LocalStackContainer.Service.S3;
+
+/**
+ * A unit test checking that Camel can poll an Amazon S3 bucket and put the data into a Kafka topic.
+ */
+class AwsS3KafkaTest extends CamelTestSupport {
+
+    private static final String AWS_IMAGE = "localstack/localstack:0.13.3";
+    private static final String KAFKA_IMAGE = "confluentinc/cp-kafka:6.2.2";
+    private static LocalStackContainer AWS_CONTAINER;
+    private static KafkaContainer KAFKA_CONTAINER;
+
+    @BeforeAll
+    static void init() {
+        AWS_CONTAINER = new LocalStackContainer(DockerImageName.parse(AWS_IMAGE))
+                .withServices(S3)
+                .waitingFor(Wait.forLogMessage(".*Ready\\.\n", 1));
+        AWS_CONTAINER.start();
+        KAFKA_CONTAINER = new KafkaContainer(DockerImageName.parse(KAFKA_IMAGE));
+        KAFKA_CONTAINER.start();
+    }
+
+    @AfterAll
+    static void destroy() {
+        if (AWS_CONTAINER != null) {
+            AWS_CONTAINER.stop();
+        }
+        if (KAFKA_CONTAINER != null) {
+            KAFKA_CONTAINER.stop();
+        }
+    }
+
+    @Override
+    protected CamelContext createCamelContext() throws Exception {
+        CamelContext camelContext = super.createCamelContext();
+        // Set the location of the configuration
+        camelContext.getPropertiesComponent().setLocation("classpath:application.properties");
+        AWS2S3Component s3 = camelContext.getComponent("aws2-s3", AWS2S3Component.class);
+        s3.getConfiguration().setAmazonS3Client(
+                S3Client.builder()
+                .endpointOverride(AWS_CONTAINER.getEndpointOverride(S3))
+                .credentialsProvider(
+                    StaticCredentialsProvider.create(
+                        AwsBasicCredentials.create(AWS_CONTAINER.getAccessKey(), AWS_CONTAINER.getSecretKey())
+                    )
+                )
+                .region(Region.of(AWS_CONTAINER.getRegion()))
+                .build()
+        );
+        // Override the host and port of the broker
+        camelContext.getPropertiesComponent().setOverrideProperties(
+            asProperties(
+                "kafkaBrokers", String.format("%s:%d", 
KAFKA_CONTAINER.getHost(), KAFKA_CONTAINER.getMappedPort(9093))
+            )
+        );
+        return camelContext;
+    }
+
+    @Test
+    void should_poll_s3_bucket_and_push_to_kafka() {
+        // Add a bucket first
+        template.send("direct:putObject", exchange -> {
+            exchange.getIn().setHeader(AWS2S3Constants.KEY, "camel-content-type.txt");
+            exchange.getIn().setHeader(AWS2S3Constants.CONTENT_TYPE, "application/text");
+            exchange.getIn().setBody("Camel rocks!");
+        });
+
+        NotifyBuilder notify = new NotifyBuilder(context).from("kafka:*").whenCompleted(1).create();
+        assertTrue(
+            notify.matches(20, TimeUnit.SECONDS), "1 message should be completed"
+        );
+    }
+
+    @Override
+    protected RoutesBuilder[] createRouteBuilders() {
+        return new RoutesBuilder[]{new MyRouteBuilder(), new AddBucketRouteBuilder()};
+    }
+
+    private static class AddBucketRouteBuilder extends RouteBuilder {
+
+        @Override
+        public void configure() {
+            from("direct:putObject").to("aws2-s3://{{bucketName}}?autoCreateBucket=true");
+        }
+    }
+}
diff --git a/examples/aws/main-endpointdsl-aws2-s3/README.adoc b/examples/aws/main-endpointdsl-aws2-s3/README.adoc
index 5c844e2..dc8c072 100644
--- a/examples/aws/main-endpointdsl-aws2-s3/README.adoc
+++ b/examples/aws/main-endpointdsl-aws2-s3/README.adoc
@@ -12,6 +12,15 @@ This example will use the AWS default credentials Provider: https://docs.aws.ama
 Set your credentials accordingly.
 Don't forget to add the bucket name.
 
+=== Build
+
+First compile the example by executing:
+
+[source,sh]
+----
+$ mvn compile
+----
+
 === How to run
 
 You can run this example using
diff --git a/examples/aws/main-endpointdsl-aws2-s3/src/test/java/org/apache/camel/example/AwsS3Test.java b/examples/aws/main-endpointdsl-aws2-s3/src/test/java/org/apache/camel/example/AwsS3Test.java
new file mode 100644
index 0000000..02e0c2d
--- /dev/null
+++ b/examples/aws/main-endpointdsl-aws2-s3/src/test/java/org/apache/camel/example/AwsS3Test.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.example;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.RoutesBuilder;
+import org.apache.camel.builder.NotifyBuilder;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.aws2.s3.AWS2S3Component;
+import org.apache.camel.component.aws2.s3.AWS2S3Constants;
+import org.apache.camel.test.junit5.CamelTestSupport;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.localstack.LocalStackContainer;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.utility.DockerImageName;
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.s3.S3Client;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.testcontainers.containers.localstack.LocalStackContainer.Service.S3;
+
+/**
+ * A unit test checking that Camel can poll an Amazon S3 bucket.
+ */
+class AwsS3Test extends CamelTestSupport {
+
+    private static final String IMAGE = "localstack/localstack:0.13.3";
+    private static LocalStackContainer CONTAINER;
+
+    @BeforeAll
+    static void init() {
+        CONTAINER = new LocalStackContainer(DockerImageName.parse(IMAGE))
+                .withServices(S3)
+                .waitingFor(Wait.forLogMessage(".*Ready\\.\n", 1));
+        CONTAINER.start();
+    }
+
+    @AfterAll
+    static void destroy() {
+        if (CONTAINER != null) {
+            CONTAINER.stop();
+        }
+    }
+
+    @Override
+    protected CamelContext createCamelContext() throws Exception {
+        CamelContext camelContext = super.createCamelContext();
+        // Set the location of the configuration
+        camelContext.getPropertiesComponent().setLocation("classpath:application.properties");
+        AWS2S3Component s3 = camelContext.getComponent("aws2-s3", AWS2S3Component.class);
+        s3.getConfiguration().setAmazonS3Client(
+                S3Client.builder()
+                .endpointOverride(CONTAINER.getEndpointOverride(S3))
+                .credentialsProvider(
+                    StaticCredentialsProvider.create(
+                        AwsBasicCredentials.create(CONTAINER.getAccessKey(), CONTAINER.getSecretKey())
+                    )
+                )
+                .region(Region.of(CONTAINER.getRegion()))
+                .build()
+        );
+        return camelContext;
+    }
+
+    @Test
+    void should_poll_s3_bucket() {
+        // Add a bucket first
+        template.send("direct:putObject", exchange -> {
+            exchange.getIn().setHeader(AWS2S3Constants.KEY, "camel-content-type.txt");
+            exchange.getIn().setHeader(AWS2S3Constants.CONTENT_TYPE, "application/text");
+            exchange.getIn().setBody("Camel rocks!");
+        });
+
+        NotifyBuilder notify = new NotifyBuilder(context).from("aws2-s3:*").whenCompleted(1).create();
+        assertTrue(
+            notify.matches(20, TimeUnit.SECONDS), "1 message should be completed"
+        );
+    }
+
+    @Override
+    protected RoutesBuilder[] createRouteBuilders() {
+        return new RoutesBuilder[]{new MyRouteBuilder(), new AddBucketRouteBuilder()};
+    }
+
+    private static class AddBucketRouteBuilder extends RouteBuilder {
+
+        @Override
+        public void configure() throws Exception {
+            from("direct:putObject").to("aws2-s3://{{bucketName}}?autoCreateBucket=true");
+        }
+    }
+}
diff --git a/examples/aws/main-endpointdsl-aws2/aws2-eventbridge-creator/readme.adoc b/examples/aws/main-endpointdsl-aws2/aws2-eventbridge-creator/README.adoc
similarity index 85%
rename from examples/aws/main-endpointdsl-aws2/aws2-eventbridge-creator/readme.adoc
rename to examples/aws/main-endpointdsl-aws2/aws2-eventbridge-creator/README.adoc
index 587115f..bf4470f 100644
--- a/examples/aws/main-endpointdsl-aws2/aws2-eventbridge-creator/readme.adoc
+++ b/examples/aws/main-endpointdsl-aws2/aws2-eventbridge-creator/README.adoc
@@ -10,11 +10,23 @@ Notice how you can configure Camel in the `application.properties` file.
 
 Don't forget to add your AWS Credentials and the bucket name.
 
+=== Build
+
+First compile the example by executing:
+
+[source,sh]
+----
+$ mvn compile
+----
+
 === How to run
 
 You can run this example using
 
-    mvn camel:run   
+[source,sh]
+----
+$ mvn camel:run
+----
 
 === Help and contributions
 
diff --git a/examples/aws/main-endpointdsl-aws2/aws2-s3-events-inject/readme.adoc b/examples/aws/main-endpointdsl-aws2/aws2-s3-events-inject/README.adoc
similarity index 79%
rename from examples/aws/main-endpointdsl-aws2/aws2-s3-events-inject/readme.adoc
rename to examples/aws/main-endpointdsl-aws2/aws2-s3-events-inject/README.adoc
index 587115f..d696fc5 100644
--- a/examples/aws/main-endpointdsl-aws2/aws2-s3-events-inject/readme.adoc
+++ b/examples/aws/main-endpointdsl-aws2/aws2-s3-events-inject/README.adoc
@@ -4,17 +4,29 @@ This example shows how to use the endpoint DSL in your Camel routes
 to define endpoints using type safe fluent builders, which are Java methods
 that are compiled.
 
-The example will poll an S3 bucket and Log the content of the file.
+The example will store content into an S3 bucket.
 
 Notice how you can configure Camel in the `application.properties` file.
 
 Don't forget to add your AWS Credentials and the bucket name.
 
+=== Build
+
+First compile the example by executing:
+
+[source,sh]
+----
+$ mvn compile
+----
+
 === How to run
 
 You can run this example using
 
-    mvn camel:run   
+[source,sh]
+----
+$ mvn camel:run
+----
 
 === Help and contributions
 
diff --git a/examples/aws/main-endpointdsl-aws2/aws2-s3-events-inject/src/main/java/org/apache/camel/example/MyRouteBuilder.java b/examples/aws/main-endpointdsl-aws2/aws2-s3-events-inject/src/main/java/org/apache/camel/example/MyRouteBuilder.java
index 2e3a781..93df218 100644
--- a/examples/aws/main-endpointdsl-aws2/aws2-s3-events-inject/src/main/java/org/apache/camel/example/MyRouteBuilder.java
+++ b/examples/aws/main-endpointdsl-aws2/aws2-s3-events-inject/src/main/java/org/apache/camel/example/MyRouteBuilder.java
@@ -28,7 +28,7 @@ public class MyRouteBuilder extends EndpointRouteBuilder {
 
        from(timer("fire").repeatCount("1"))
        .setBody(constant("Camel rocks"))
-       .to(aws2S3("{{bucketName}}").keyName("firstfile"))
+       .to(aws2S3("{{bucketName}}").keyName("firstfile").autoCreateBucket(true))
        .stop();
     }
 }
diff --git a/examples/aws/main-endpointdsl-aws2/aws2-s3-events-inject/src/test/java/org/apache/camel/example/AwsS3Test.java b/examples/aws/main-endpointdsl-aws2/aws2-s3-events-inject/src/test/java/org/apache/camel/example/AwsS3Test.java
new file mode 100644
index 0000000..e4c410f
--- /dev/null
+++ b/examples/aws/main-endpointdsl-aws2/aws2-s3-events-inject/src/test/java/org/apache/camel/example/AwsS3Test.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.example;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.RoutesBuilder;
+import org.apache.camel.builder.NotifyBuilder;
+import org.apache.camel.component.aws2.s3.AWS2S3Component;
+import org.apache.camel.test.junit5.CamelTestSupport;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.localstack.LocalStackContainer;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.utility.DockerImageName;
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.s3.S3Client;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.testcontainers.containers.localstack.LocalStackContainer.Service.S3;
+
+/**
+ * A unit test checking that Camel can store content into an Amazon S3 bucket.
+ */
+class AwsS3Test extends CamelTestSupport {
+
+    private static final String IMAGE = "localstack/localstack:0.13.3";
+    private static LocalStackContainer CONTAINER;
+
+    @BeforeAll
+    static void init() {
+        CONTAINER = new LocalStackContainer(DockerImageName.parse(IMAGE))
+                .withServices(S3)
+                .waitingFor(Wait.forLogMessage(".*Ready\\.\n", 1));
+        CONTAINER.start();
+    }
+
+    @AfterAll
+    static void destroy() {
+        if (CONTAINER != null) {
+            CONTAINER.stop();
+        }
+    }
+
+    @Override
+    protected CamelContext createCamelContext() throws Exception {
+        CamelContext camelContext = super.createCamelContext();
+        // Set the location of the configuration
+        camelContext.getPropertiesComponent().setLocation("classpath:application.properties");
+        AWS2S3Component s3 = camelContext.getComponent("aws2-s3", AWS2S3Component.class);
+        s3.getConfiguration().setAmazonS3Client(
+                S3Client.builder()
+                .endpointOverride(CONTAINER.getEndpointOverride(S3))
+                .credentialsProvider(
+                    StaticCredentialsProvider.create(
+                        AwsBasicCredentials.create(CONTAINER.getAccessKey(), CONTAINER.getSecretKey())
+                    )
+                )
+                .region(Region.of(CONTAINER.getRegion()))
+                .build()
+        );
+        return camelContext;
+    }
+
+    @Test
+    void should_store_content_into_a_s3_bucket() throws Exception {
+        NotifyBuilder notify = new NotifyBuilder(context).wereSentTo("aws2-s3:*").whenCompleted(1).create();
+        assertTrue(
+            notify.matches(20, TimeUnit.SECONDS), "1 message should be completed"
+        );
+    }
+
+    @Override
+    protected RoutesBuilder createRouteBuilder() {
+        return new MyRouteBuilder();
+    }
+}
diff --git a/examples/aws/main-endpointdsl-aws2/aws2-sqs-consumer/readme.adoc b/examples/aws/main-endpointdsl-aws2/aws2-sqs-consumer/README.adoc
similarity index 85%
rename from examples/aws/main-endpointdsl-aws2/aws2-sqs-consumer/readme.adoc
rename to examples/aws/main-endpointdsl-aws2/aws2-sqs-consumer/README.adoc
index f4626f9..057950f 100644
--- a/examples/aws/main-endpointdsl-aws2/aws2-sqs-consumer/readme.adoc
+++ b/examples/aws/main-endpointdsl-aws2/aws2-sqs-consumer/README.adoc
@@ -10,11 +10,23 @@ Notice how you can configure Camel in the `application.properties` file.
 
 Don't forget to add your AWS Credentials and the bucket name.
 
+=== Build
+
+First compile the example by executing:
+
+[source,sh]
+----
+$ mvn compile
+----
+
 === How to run
 
 You can run this example using
 
-    mvn camel:run   
+[source,sh]
+----
+$ mvn camel:run
+----
 
 === Help and contributions
 
diff --git a/examples/aws/main-endpointdsl-aws2/aws2-sqs-consumer/src/main/java/org/apache/camel/example/MyRouteBuilder.java b/examples/aws/main-endpointdsl-aws2/aws2-sqs-consumer/src/main/java/org/apache/camel/example/MyRouteBuilder.java
index caa2048..cec5d0a 100644
--- a/examples/aws/main-endpointdsl-aws2/aws2-sqs-consumer/src/main/java/org/apache/camel/example/MyRouteBuilder.java
+++ b/examples/aws/main-endpointdsl-aws2/aws2-sqs-consumer/src/main/java/org/apache/camel/example/MyRouteBuilder.java
@@ -26,7 +26,7 @@ public class MyRouteBuilder extends EndpointRouteBuilder {
     @Override
     public void configure() throws Exception {
 
-        from(aws2Sqs("{{sqs-queue-name}}").deleteAfterRead(true))
+        from(aws2Sqs("{{sqs-queue-name}}").deleteAfterRead(true).autoCreateQueue(true))
         .log("${body}");
     }
 }
diff --git a/examples/aws/main-endpointdsl-aws2/aws2-sqs-consumer/src/test/java/org/apache/camel/example/AwsSQSTest.java b/examples/aws/main-endpointdsl-aws2/aws2-sqs-consumer/src/test/java/org/apache/camel/example/AwsSQSTest.java
new file mode 100644
index 0000000..d6fd2a9
--- /dev/null
+++ b/examples/aws/main-endpointdsl-aws2/aws2-sqs-consumer/src/test/java/org/apache/camel/example/AwsSQSTest.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.example;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.RoutesBuilder;
+import org.apache.camel.builder.NotifyBuilder;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.aws2.sqs.Sqs2Component;
+import org.apache.camel.test.junit5.CamelTestSupport;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.localstack.LocalStackContainer;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.utility.DockerImageName;
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.sqs.SqsClient;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.testcontainers.containers.localstack.LocalStackContainer.Service.SQS;
+
+/**
+ * A unit test checking that Camel can consume messages from Amazon SQS.
+ */
+class AwsSQSTest extends CamelTestSupport {
+
+    private static final String IMAGE = "localstack/localstack:0.13.3";
+    private static LocalStackContainer CONTAINER;
+
+    @BeforeAll
+    static void init() {
+        CONTAINER = new LocalStackContainer(DockerImageName.parse(IMAGE))
+                .withServices(SQS)
+                .waitingFor(Wait.forLogMessage(".*Ready\\.\n", 1));
+        CONTAINER.start();
+    }
+
+    @AfterAll
+    static void destroy() {
+        if (CONTAINER != null) {
+            CONTAINER.stop();
+        }
+    }
+
+    @Override
+    protected CamelContext createCamelContext() throws Exception {
+        CamelContext camelContext = super.createCamelContext();
+        // Set the location of the configuration
+        camelContext.getPropertiesComponent().setLocation("classpath:application.properties");
+        Sqs2Component sqs = camelContext.getComponent("aws2-sqs", Sqs2Component.class);
+        sqs.getConfiguration().setAmazonSQSClient(
+                SqsClient.builder()
+                .endpointOverride(CONTAINER.getEndpointOverride(SQS))
+                .credentialsProvider(
+                    StaticCredentialsProvider.create(
+                        AwsBasicCredentials.create(CONTAINER.getAccessKey(), CONTAINER.getSecretKey())
+                    )
+                )
+                .region(Region.of(CONTAINER.getRegion()))
+                .build()
+        );
+        return camelContext;
+    }
+
+    @Test
+    void should_poll_sqs_queue() {
+        // Add a message first
+        template.send("direct:publishMessage", exchange -> {
+            exchange.getIn().setBody("Camel rocks!");
+        });
+
+        NotifyBuilder notify = new NotifyBuilder(context).from("aws2-sqs:*").whenCompleted(1).create();
+        assertTrue(
+            notify.matches(20, TimeUnit.SECONDS), "1 message should be completed"
+        );
+    }
+
+    @Override
+    protected RoutesBuilder[] createRouteBuilders() {
+        return new RoutesBuilder[]{new MyRouteBuilder(), new PublishMessageRouteBuilder()};
+    }
+
+    private static class PublishMessageRouteBuilder extends RouteBuilder {
+
+        @Override
+        public void configure() throws Exception {
+            from("direct:publishMessage").to("aws2-sqs://{{sqs-queue-name}}?autoCreateQueue=true");
+        }
+    }
+}
diff --git a/examples/aws/main-endpointdsl-kafka-aws2-s3-restarting-policy/README.adoc b/examples/aws/main-endpointdsl-kafka-aws2-s3-restarting-policy/README.adoc
index cafef89..af953c2 100644
--- a/examples/aws/main-endpointdsl-kafka-aws2-s3-restarting-policy/README.adoc
+++ b/examples/aws/main-endpointdsl-kafka-aws2-s3-restarting-policy/README.adoc
@@ -2,14 +2,16 @@
 
 This example shows how to use the endpoint DSL in your Camel routes
 to define endpoints using type safe fluent builders, which are Java methods
-that are compiled and it will show the AWS2-S3 stream mode.
+that are compiled, and it will show the AWS2-S3 stream mode.
 
-The example will poll one kafka topic s3.topic.1 and upload batch of 25 messages as single file into an s3 bucket (mycamel-1).
+The example will poll one kafka topic s3.topic.1 and upload batches of 25 messages as single files into an S3 bucket (mycamel-1).
 
 On your bucket you'll see:
 
+```
 s3.topic.1/partition_<partition-number>/s3.topic.1.txt
 s3.topic.1/partition_<partition-number>/s3.topic.1-1.txt
+```
 
 and so on
 
@@ -23,7 +25,7 @@ Don't forget to add the bucket name (already created ahead of time) and point to
 You'll need also a running kafka broker.
 You'll need to have kafkacat installed.
 
-This example supposed the s3.topic.1 has 1 partition only.
+This example supposes the `s3.topic.1` topic has 1 partition only.
 
 But this should work with multiple partitions too.
 
@@ -36,6 +38,10 @@ You can run this example using
 $ mvn compile
 ----
 
+=== How to run
+
+You can run this example using
+
 [source,sh]
 ----
 $ mvn camel:run
@@ -45,20 +51,21 @@ Now run
 
 [source,sh]
 ----
-$ data/burst.sh s3.topic.1 250 0 msg.txt
+$ data/burst.sh s3.topic.1 250 0 data/msg.txt
 ----
 
 Stop the route with CTRL + C.
 
-At this point you should see an s3.topic.1/partition_0 folder, with 10 files.
+At this point you should see an `s3.topic.1/partition_0` folder, with 10 files.
 
-Restart the route and run 
+Restart the route and run
 
+[source,sh]
 ----
-$ data/burst.sh s3.topic.1 250 0 msg.txt
+$ data/burst.sh s3.topic.1 250 0 data/msg.txt
 ----
 
-Now in the same s3.topic.1/partition_0 folder, you should see 20 files correctly numbered.
+Now in the same `s3.topic.1/partition_0` folder, you should see 20 files correctly numbered.
 
 === Help and contributions
 
diff --git a/examples/aws/main-endpointdsl-kafka-aws2-s3-restarting-policy/pom.xml b/examples/aws/main-endpointdsl-kafka-aws2-s3-restarting-policy/pom.xml
index a4fb4a2..aef1c5f 100644
--- a/examples/aws/main-endpointdsl-kafka-aws2-s3-restarting-policy/pom.xml
+++ b/examples/aws/main-endpointdsl-kafka-aws2-s3-restarting-policy/pom.xml
@@ -86,7 +86,13 @@
             <artifactId>logback-classic</artifactId>
             <version>${logback-version}</version>
         </dependency>
-
+        <!-- for testing -->
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>kafka</artifactId>
+            <version>${testcontainers-version}</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>
diff --git a/examples/aws/main-endpointdsl-kafka-aws2-s3-restarting-policy/src/main/java/org/apache/camel/example/MyRouteBuilder.java b/examples/aws/main-endpointdsl-kafka-aws2-s3-restarting-policy/src/main/java/org/apache/camel/example/MyRouteBuilder.java
index dd9be98..82cebc3 100644
--- a/examples/aws/main-endpointdsl-kafka-aws2-s3-restarting-policy/src/main/java/org/apache/camel/example/MyRouteBuilder.java
+++ b/examples/aws/main-endpointdsl-kafka-aws2-s3-restarting-policy/src/main/java/org/apache/camel/example/MyRouteBuilder.java
@@ -25,11 +25,12 @@ public class MyRouteBuilder extends EndpointRouteBuilder {
     @Override
     public void configure() throws Exception {
 
-        from(kafka("{{kafkaTopic1}}").brokers("{{kafkaBrokers}}"))
+        from(kafka("{{kafkaTopic1}}").brokers("{{kafkaBrokers}}").seekTo("{{consumer.seekTo}}"))
               .log("Kafka Message is: ${body}")
               .toD(aws2S3("{{bucketName}}")
                       .streamingUploadMode(true)
                       .useDefaultCredentialsProvider(true)
+                      .autoCreateBucket(true)
                       .restartingPolicy(AWSS3RestartingPolicyEnum.lastPart)
                       
.batchMessageNumber(25).namingStrategy(AWSS3NamingStrategyEnum.progressive)
                       
.keyName("{{kafkaTopic1}}/partition_${headers.kafka.PARTITION}/{{kafkaTopic1}}.txt"));
diff --git a/examples/aws/main-endpointdsl-kafka-aws2-s3-restarting-policy/src/main/resources/application.properties b/examples/aws/main-endpointdsl-kafka-aws2-s3-restarting-policy/src/main/resources/application.properties
index 1bf3df7..49349b2 100644
--- a/examples/aws/main-endpointdsl-kafka-aws2-s3-restarting-policy/src/main/resources/application.properties
+++ b/examples/aws/main-endpointdsl-kafka-aws2-s3-restarting-policy/src/main/resources/application.properties
@@ -24,3 +24,6 @@ bucketName=camel-1
 
 kafkaTopic1=s3.topic.1
 kafkaBrokers=localhost:9092
+
+# Get records from the beginning
+consumer.seekTo=beginning
\ No newline at end of file
diff --git a/examples/aws/main-endpointdsl-kafka-aws2-s3-restarting-policy/src/test/java/org/apache/camel/example/KafkaAwsS3Test.java b/examples/aws/main-endpointdsl-kafka-aws2-s3-restarting-policy/src/test/java/org/apache/camel/example/KafkaAwsS3Test.java
new file mode 100644
index 0000000..c3f4417
--- /dev/null
+++ b/examples/aws/main-endpointdsl-kafka-aws2-s3-restarting-policy/src/test/java/org/apache/camel/example/KafkaAwsS3Test.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.example;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.RoutesBuilder;
+import org.apache.camel.builder.NotifyBuilder;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.aws2.s3.AWS2S3Component;
+import org.apache.camel.test.junit5.CamelTestSupport;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.KafkaContainer;
+import org.testcontainers.containers.localstack.LocalStackContainer;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.utility.DockerImageName;
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.s3.S3Client;
+
+import static org.apache.camel.util.PropertiesHelper.asProperties;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.testcontainers.containers.localstack.LocalStackContainer.Service.S3;
+
+/**
+ * A unit test checking that Camel can poll data from a Kafka topic and put it into an Amazon S3 bucket.
+ */
+class KafkaAwsS3Test extends CamelTestSupport {
+
+    private static final String AWS_IMAGE = "localstack/localstack:0.13.3";
+    private static final String KAFKA_IMAGE = "confluentinc/cp-kafka:6.2.2";
+    private static LocalStackContainer AWS_CONTAINER;
+    private static KafkaContainer KAFKA_CONTAINER;
+
+    @BeforeAll
+    static void init() {
+        AWS_CONTAINER = new LocalStackContainer(DockerImageName.parse(AWS_IMAGE))
+                .withServices(S3)
+                .waitingFor(Wait.forLogMessage(".*Ready\\.\n", 1));
+        AWS_CONTAINER.start();
+        KAFKA_CONTAINER = new KafkaContainer(DockerImageName.parse(KAFKA_IMAGE));
+        KAFKA_CONTAINER.start();
+    }
+
+    @AfterAll
+    static void destroy() {
+        if (AWS_CONTAINER != null) {
+            AWS_CONTAINER.stop();
+        }
+        if (KAFKA_CONTAINER != null) {
+            KAFKA_CONTAINER.stop();
+        }
+    }
+
+    @Override
+    protected CamelContext createCamelContext() throws Exception {
+        CamelContext camelContext = super.createCamelContext();
+        // Set the location of the configuration
+        camelContext.getPropertiesComponent().setLocation("classpath:application.properties");
+        AWS2S3Component s3 = camelContext.getComponent("aws2-s3", AWS2S3Component.class);
+        s3.getConfiguration().setAmazonS3Client(
+                S3Client.builder()
+                .endpointOverride(AWS_CONTAINER.getEndpointOverride(S3))
+                .credentialsProvider(
+                    StaticCredentialsProvider.create(
+                        AwsBasicCredentials.create(AWS_CONTAINER.getAccessKey(), AWS_CONTAINER.getSecretKey())
+                    )
+                )
+                .region(Region.of(AWS_CONTAINER.getRegion()))
+                .build()
+        );
+        // Override the host and port of the broker
+        camelContext.getPropertiesComponent().setOverrideProperties(
+            asProperties(
+                "kafkaBrokers", String.format("%s:%d", 
KAFKA_CONTAINER.getHost(), KAFKA_CONTAINER.getMappedPort(9093))
+            )
+        );
+        return camelContext;
+    }
+
+    @Test
+    void should_poll_kafka_and_push_to_s3_bucket() {
+        NotifyBuilder notify = new NotifyBuilder(context).from("kafka:*").whenCompleted(1).create();
+        // Load data into Kafka
+        template.sendBody("direct:kafka", "Camel rocks in topic 1!");
+        assertTrue(
+            notify.matches(10, TimeUnit.SECONDS), "1 message should be completed"
+        );
+    }
+
+    @Override
+    protected RoutesBuilder[] createRouteBuilders() {
+        return new RoutesBuilder[]{new MyRouteBuilder(), new LoadKafkaRouteBuilder()};
+    }
+
+    private static class LoadKafkaRouteBuilder extends RouteBuilder {
+
+        @Override
+        public void configure() {
+            from("direct:kafka").to("kafka:{{kafkaTopic1}}?brokers={{kafkaBrokers}}");
+        }
+    }
+}
diff --git a/examples/aws/main-endpointdsl-kafka-aws2-s3/README.adoc b/examples/aws/main-endpointdsl-kafka-aws2-s3/README.adoc
index 0e4ed22..ee5699d 100644
--- a/examples/aws/main-endpointdsl-kafka-aws2-s3/README.adoc
+++ b/examples/aws/main-endpointdsl-kafka-aws2-s3/README.adoc
@@ -2,18 +2,18 @@
 
 This example shows how to use the endpoint DSL in your Camel routes
 to define endpoints using type safe fluent builders, which are Java methods
-that are compiled and it will show the AWS2-S3 stream mode.
+that are compiled, and it will show the AWS2-S3 stream mode.
 
 The example will poll two kafka topics (s3.topic.1 and s3.topic.2) and upload batch of 25 messages as single file into an s3 bucket (mycamel-1).
 
 On your bucket you'll see:
-
+```
 s3.topic.1/s3.topic.1.txt
 s3.topic.1/s3.topic.1-1.txt
 
 s3.topic.2/s3.topic.2.txt
 s3.topic.2/s3.topic.2-1.txt
-
+```
 and so on
 
 At the end you should have a total of 80 files.
@@ -26,15 +26,19 @@ Don't forget to add the bucket name (already created ahead of time) and point to
 You'll need also a running kafka broker.
 You'll need to have kafkacat installed.
 
-=== How to run
+=== Build
 
-You can run this example using
+First compile the example by executing:
 
 [source,sh]
 ----
 $ mvn compile
 ----
 
+=== How to run
+
+You can run this example using
+
 [source,sh]
 ----
 $ mvn camel:run
@@ -44,8 +48,8 @@ Now run
 
 [source,sh]
 ----
-$ data/burst.sh s3.topic.1 1000 msg.txt
-$ data/burst.sh s3.topic.2 1000 msg.txt
+$ data/burst.sh s3.topic.1 1000 data/msg.txt
+$ data/burst.sh s3.topic.2 1000 data/msg.txt
 ----
 
 You should see the bucket populated.
diff --git a/examples/aws/main-endpointdsl-kafka-aws2-s3/pom.xml b/examples/aws/main-endpointdsl-kafka-aws2-s3/pom.xml
index 152f584..967ddc8 100644
--- a/examples/aws/main-endpointdsl-kafka-aws2-s3/pom.xml
+++ b/examples/aws/main-endpointdsl-kafka-aws2-s3/pom.xml
@@ -86,7 +86,13 @@
             <artifactId>logback-classic</artifactId>
             <version>${logback-version}</version>
         </dependency>
-
+        <!-- for testing -->
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>kafka</artifactId>
+            <version>${testcontainers-version}</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>
diff --git a/examples/aws/main-endpointdsl-kafka-aws2-s3/src/main/java/org/apache/camel/example/MyRouteBuilder.java b/examples/aws/main-endpointdsl-kafka-aws2-s3/src/main/java/org/apache/camel/example/MyRouteBuilder.java
index d51038b..89831ea 100644
--- a/examples/aws/main-endpointdsl-kafka-aws2-s3/src/main/java/org/apache/camel/example/MyRouteBuilder.java
+++ b/examples/aws/main-endpointdsl-kafka-aws2-s3/src/main/java/org/apache/camel/example/MyRouteBuilder.java
@@ -24,12 +24,14 @@ public class MyRouteBuilder extends EndpointRouteBuilder {
     @Override
     public void configure() throws Exception {
 
-        from(kafka("{{kafkaTopic1}}").brokers("{{kafkaBrokers}}"))
+        from(kafka("{{kafkaTopic1}}").brokers("{{kafkaBrokers}}")
+                .seekTo("{{consumer.seekTo}}"))
               .log("Kafka Message is: ${body}")
-              .to(aws2S3("{{bucketName}}").useDefaultCredentialsProvider(true).streamingUploadMode(true).batchMessageNumber(25).namingStrategy(AWSS3NamingStrategyEnum.progressive).keyName("{{kafkaTopic1}}/{{kafkaTopic1}}.txt"));
+              .to(aws2S3("{{bucketName}}").useDefaultCredentialsProvider(true).autoCreateBucket(true).streamingUploadMode(true).batchMessageNumber(25).namingStrategy(AWSS3NamingStrategyEnum.progressive).keyName("{{kafkaTopic1}}/{{kafkaTopic1}}.txt"));
 
-        from(kafka("{{kafkaTopic2}}").brokers("{{kafkaBrokers}}"))
+        from(kafka("{{kafkaTopic2}}").brokers("{{kafkaBrokers}}")
+                .seekTo("{{consumer.seekTo}}"))
                 .log("Kafka Message is: ${body}")
-                .to(aws2S3("{{bucketName}}").useDefaultCredentialsProvider(true).streamingUploadMode(true).batchMessageNumber(25).namingStrategy(AWSS3NamingStrategyEnum.progressive).keyName("{{kafkaTopic2}}/{{kafkaTopic2}}.txt"));
+                .to(aws2S3("{{bucketName}}").useDefaultCredentialsProvider(true).autoCreateBucket(true).streamingUploadMode(true).batchMessageNumber(25).namingStrategy(AWSS3NamingStrategyEnum.progressive).keyName("{{kafkaTopic2}}/{{kafkaTopic2}}.txt"));
     }
 }
diff --git a/examples/aws/main-endpointdsl-kafka-aws2-s3/src/main/resources/application.properties b/examples/aws/main-endpointdsl-kafka-aws2-s3/src/main/resources/application.properties
index 1e4c766..57323ea 100644
--- a/examples/aws/main-endpointdsl-kafka-aws2-s3/src/main/resources/application.properties
+++ b/examples/aws/main-endpointdsl-kafka-aws2-s3/src/main/resources/application.properties
@@ -25,3 +25,6 @@ bucketName=camel-1
 kafkaTopic1=s3.topic.1
 kafkaTopic2=s3.topic.2
 kafkaBrokers=localhost:9092
+
+# Get records from the beginning
+consumer.seekTo=beginning
\ No newline at end of file
diff --git a/examples/aws/main-endpointdsl-kafka-aws2-s3/src/test/java/org/apache/camel/example/KafkaAwsS3Test.java b/examples/aws/main-endpointdsl-kafka-aws2-s3/src/test/java/org/apache/camel/example/KafkaAwsS3Test.java
new file mode 100644
index 0000000..4063ee7
--- /dev/null
+++ b/examples/aws/main-endpointdsl-kafka-aws2-s3/src/test/java/org/apache/camel/example/KafkaAwsS3Test.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.example;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.RoutesBuilder;
+import org.apache.camel.builder.NotifyBuilder;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.aws2.s3.AWS2S3Component;
+import org.apache.camel.test.junit5.CamelTestSupport;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.KafkaContainer;
+import org.testcontainers.containers.localstack.LocalStackContainer;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.utility.DockerImageName;
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.s3.S3Client;
+
+import static org.apache.camel.util.PropertiesHelper.asProperties;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.testcontainers.containers.localstack.LocalStackContainer.Service.S3;
+
+/**
+ * A unit test checking that Camel can poll data from a Kafka topic and put it into an Amazon S3 bucket.
+ */
+class KafkaAwsS3Test extends CamelTestSupport {
+
+    private static final String AWS_IMAGE = "localstack/localstack:0.13.3";
+    private static final String KAFKA_IMAGE = "confluentinc/cp-kafka:6.2.2";
+    private static LocalStackContainer AWS_CONTAINER;
+    private static KafkaContainer KAFKA_CONTAINER;
+
+    @BeforeAll
+    static void init() {
+        AWS_CONTAINER = new LocalStackContainer(DockerImageName.parse(AWS_IMAGE))
+                .withServices(S3)
+                .waitingFor(Wait.forLogMessage(".*Ready\\.\n", 1));
+        AWS_CONTAINER.start();
+        KAFKA_CONTAINER = new KafkaContainer(DockerImageName.parse(KAFKA_IMAGE));
+        KAFKA_CONTAINER.start();
+    }
+
+    @AfterAll
+    static void destroy() {
+        if (AWS_CONTAINER != null) {
+            AWS_CONTAINER.stop();
+        }
+        if (KAFKA_CONTAINER != null) {
+            KAFKA_CONTAINER.stop();
+        }
+    }
+
+    @Override
+    protected CamelContext createCamelContext() throws Exception {
+        CamelContext camelContext = super.createCamelContext();
+        // Set the location of the configuration
+        camelContext.getPropertiesComponent().setLocation("classpath:application.properties");
+        AWS2S3Component s3 = camelContext.getComponent("aws2-s3", AWS2S3Component.class);
+        s3.getConfiguration().setAmazonS3Client(
+                S3Client.builder()
+                .endpointOverride(AWS_CONTAINER.getEndpointOverride(S3))
+                .credentialsProvider(
+                    StaticCredentialsProvider.create(
+                        AwsBasicCredentials.create(AWS_CONTAINER.getAccessKey(), AWS_CONTAINER.getSecretKey())
+                    )
+                )
+                .region(Region.of(AWS_CONTAINER.getRegion()))
+                .build()
+        );
+        // Override the host and port of the broker
+        camelContext.getPropertiesComponent().setOverrideProperties(
+            asProperties(
+                "kafkaBrokers", String.format("%s:%d", 
KAFKA_CONTAINER.getHost(), KAFKA_CONTAINER.getMappedPort(9093))
+            )
+        );
+        return camelContext;
+    }
+
+    @Test
+    void should_poll_kafka_and_push_to_s3_bucket() {
+        NotifyBuilder notify = new NotifyBuilder(context).from("kafka:*").whenCompleted(2).create();
+        // Load data into Kafka
+        template.sendBody("direct:kafkaT1", "Camel rocks in topic 1!");
+        template.sendBody("direct:kafkaT2", "Camel rocks in topic 2!");
+        assertTrue(
+            notify.matches(10, TimeUnit.SECONDS), "2 messages should be completed"
+        );
+    }
+
+    @Override
+    protected RoutesBuilder[] createRouteBuilders() {
+        return new RoutesBuilder[]{new MyRouteBuilder(), new LoadKafkaRouteBuilder()};
+    }
+
+    private static class LoadKafkaRouteBuilder extends RouteBuilder {
+
+        @Override
+        public void configure() {
+            from("direct:kafkaT1").to("kafka:{{kafkaTopic1}}?brokers={{kafkaBrokers}}");
+            from("direct:kafkaT2").to("kafka:{{kafkaTopic2}}?brokers={{kafkaBrokers}}");
+        }
+    }
+}
diff --git a/examples/aws/pom.xml b/examples/aws/pom.xml
index 9463c65..ce7e620 100644
--- a/examples/aws/pom.xml
+++ b/examples/aws/pom.xml
@@ -44,5 +44,25 @@
         <module>main-endpointdsl-kafka-aws2-s3</module>
         <module>main-endpointdsl-kafka-aws2-s3-restarting-policy</module>
     </modules>
-
+    <dependencies>
+        <!-- for testing -->
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-test-junit5</artifactId>
+            <version>${camel.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>localstack</artifactId>
+            <version>${testcontainers-version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.amazonaws</groupId>
+            <artifactId>aws-java-sdk-core</artifactId>
+            <version>1.12.150</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
 </project>
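
With camel-test-junit5 and the Testcontainers modules now declared at this level, the new tests should run per module through the standard Maven test phase (assuming a local Docker daemon, which Testcontainers needs to start LocalStack and Kafka):

[source,sh]
----
$ mvn test
----
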
diff --git a/examples/azure/azure-eventhubs/src/main/java/org/apache/camel/example/MyRouteBuilder.java b/examples/azure/azure-eventhubs/src/main/java/org/apache/camel/example/MyRouteBuilder.java
index cfa6066..6a64025 100644
--- a/examples/azure/azure-eventhubs/src/main/java/org/apache/camel/example/MyRouteBuilder.java
+++ b/examples/azure/azure-eventhubs/src/main/java/org/apache/camel/example/MyRouteBuilder.java
@@ -17,10 +17,6 @@
 package org.apache.camel.example;
 
 import org.apache.camel.builder.endpoint.EndpointRouteBuilder;
-import org.apache.camel.component.azure.eventhubs.EventHubsConstants;
-
-import java.util.LinkedList;
-import java.util.List;
 
 /**
 * To use the endpoint DSL then we must extend EndpointRouteBuilder instead of RouteBuilder
@@ -28,7 +24,7 @@ import java.util.List;
 public class MyRouteBuilder extends EndpointRouteBuilder {
 
     @Override
-    public void configure() throws Exception {
+    public void configure() {
 
         
from(azureEventhubs("{{namespaceName}}/{{eventhubName}}").sharedAccessKey("{{sharedAccessKey}}").sharedAccessName("{{sharedAccessName}}").blobAccessKey("{{blobAccessKey}}").blobAccountName("{{blobAccountName}}").blobContainerName("{{blobContainerName}}"))
                 .log("The content is ${body}");
diff --git a/examples/azure/azure-storage-blob/src/main/java/org/apache/camel/example/azurestorageblob/Application.java b/examples/azure/azure-storage-blob/src/main/java/org/apache/camel/example/azurestorageblob/Application.java
index 17e15f1..06102d9 100644
--- a/examples/azure/azure-storage-blob/src/main/java/org/apache/camel/example/azurestorageblob/Application.java
+++ b/examples/azure/azure-storage-blob/src/main/java/org/apache/camel/example/azurestorageblob/Application.java
@@ -38,43 +38,44 @@ public final class Application {
 
             // add routes which can be inlined as anonymous inner class
             // (to keep all code in a single java file for this basic example)
-            camel.addRoutes(new RouteBuilder() {
-                @Override
-                public void configure() throws Exception {
-                    from("timer://runOnce?repeatCount=1&delay=0")
+            camel.addRoutes(createRouteBuilder());
+
+            // start is not blocking
+            camel.start();
+
+            // so run for 10 seconds
+            Thread.sleep(10_000);
+        }
+    }
+
+    static RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("timer://runOnce?repeatCount=1&delay=0")
                         .routeId("listBlobs")
                         .process(exchange -> exchange.getIn()
-                            .setBody(
-                                new BlobServiceClientBuilder()
-                                    .endpoint(String.format("https://%s.blob.core.windows.net", ACCOUNT))
-                                    .credential(new StorageSharedKeyCredential(ACCOUNT, ACCESS_KEY))
-                                    .buildClient()
-                                    .getBlobContainerClient(BLOB_CONTAINER_NAME)
-                                    .listBlobs(
-                                        new ListBlobsOptions().setMaxResultsPerPage(1),
-                                        null
-                                    )
-                            )
+                                .setBody(
+                                        new BlobServiceClientBuilder()
+                                                .endpoint(String.format("https://%s.blob.core.windows.net", ACCOUNT))
+                                                .credential(new StorageSharedKeyCredential(ACCOUNT, ACCESS_KEY))
+                                                .buildClient()
+                                                .getBlobContainerClient(BLOB_CONTAINER_NAME)
+                                                .listBlobs(
+                                                        new ListBlobsOptions().setMaxResultsPerPage(1),
+                                                        null
+                                                )
+                                )
                         )
                         .loopDoWhile(exchange ->
-                            exchange.getIn().getBody(Iterator.class).hasNext()
+                                exchange.getIn().getBody(Iterator.class).hasNext()
                         )
                         .process(exchange ->
-                            exchange.getIn().setBody(exchange.getIn().getBody(Iterator.class).next())
+                                exchange.getIn().setBody(exchange.getIn().getBody(Iterator.class).next())
                         )
                         .log("${body.name}")
                         .end();
-                }
-            });
-
-            // start is not blocking
-            camel.start();
-
-            // so run for 10 seconds
-            Thread.sleep(10_000);
-
-            // and then stop nicely
-            camel.stop();
-        }
+            }
+        };
     }
 }
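The point of extracting createRouteBuilder() in the change above is that the same route definition can now be reused from a test, as the new tests in this commit do elsewhere. A minimal sketch of such a reuse, assuming camel-test-junit5 on the classpath (the test class is hypothetical and only asserts that the route is registered, since listing real blobs would require live Azure credentials):

[source,java]
----
package org.apache.camel.example.azurestorageblob;

import org.apache.camel.RoutesBuilder;
import org.apache.camel.test.junit5.CamelTestSupport;
import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertNotNull;

// same package as Application, so the package-private factory is visible
class ApplicationRouteSketchTest extends CamelTestSupport {

    @Override
    protected RoutesBuilder createRouteBuilder() {
        // reuse the exact route definition extracted above
        return Application.createRouteBuilder();
    }

    @Test
    void route_should_be_registered() {
        assertNotNull(context.getRoute("listBlobs"), "the listBlobs route should exist");
    }
}
----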
diff --git a/examples/azure/kafka-azure/src/main/java/org/apache/camel/example/MyRouteBuilder.java b/examples/azure/kafka-azure/src/main/java/org/apache/camel/example/MyRouteBuilder.java
index 896700c..3b8381e 100644
--- a/examples/azure/kafka-azure/src/main/java/org/apache/camel/example/MyRouteBuilder.java
+++ b/examples/azure/kafka-azure/src/main/java/org/apache/camel/example/MyRouteBuilder.java
@@ -17,17 +17,13 @@
 package org.apache.camel.example;
 
 import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.builder.endpoint.EndpointRouteBuilder;
 
 public class MyRouteBuilder extends RouteBuilder {
 
     @Override
     public void configure() throws Exception {
-
-        
         from("kafka:{{topicName}}?brokers={{brokers}}")
           .setHeader("CamelAzureStorageBlobBlobName", simple("${exchangeId}"))
           .to("azure-storage-blob://{{accountName}}/{{containerName}}/?accessKey=RAW({{accessKey}})&operation=uploadBlockBlob");
-
     }
 }
diff --git a/examples/csimple-joor/readme.adoc b/examples/csimple-joor/README.adoc
similarity index 89%
rename from examples/csimple-joor/readme.adoc
rename to examples/csimple-joor/README.adoc
index 15c7cd3..e5d40d6 100644
--- a/examples/csimple-joor/readme.adoc
+++ b/examples/csimple-joor/README.adoc
@@ -5,7 +5,7 @@ This example shows using csimple (compiled simple) scripting language in your Ca
 When Camel bootstraps, each csimple script is compiled using the JVM compiler via the jOOR compiler library. The compilation happens once during startup.
 
 This makes the csimple language native Java compiled, with no runtime overhead.
-The generated source code are in memory only and compiled at runtime. This means debugging the generated source code is not possible. See the other csimple example which uses a Maven plugin to detect csimple scripts from the source code, to genereate Java source code at build time; which can be debugged.
+The generated source code is in memory only and compiled at runtime. This means debugging the generated source code is not possible. See the other csimple example, which uses a Maven plugin to detect csimple scripts in the source code and generate Java source code at build time, which can be debugged.
 
 === Build
 
diff --git a/examples/csimple/readme.adoc b/examples/csimple/README.adoc
similarity index 100%
rename from examples/csimple/readme.adoc
rename to examples/csimple/README.adoc
diff --git a/examples/debezium-eventhubs-blob/src/main/java/org/apache/camel/example/debezium/eventhubs/blob/DebeziumMySqlConsumerToAzureEventHubs.java b/examples/debezium-eventhubs-blob/src/main/java/org/apache/camel/example/debezium/eventhubs/blob/DebeziumMySqlConsumerToAzureEventHubs.java
index 32861d4..5c537f3 100644
--- a/examples/debezium-eventhubs-blob/src/main/java/org/apache/camel/example/debezium/eventhubs/blob/DebeziumMySqlConsumerToAzureEventHubs.java
+++ b/examples/debezium-eventhubs-blob/src/main/java/org/apache/camel/example/debezium/eventhubs/blob/DebeziumMySqlConsumerToAzureEventHubs.java
@@ -50,10 +50,11 @@ public final class DebeziumMySqlConsumerToAzureEventHubs {
             public void configure() {
                 // Initial Debezium route that will run and listen to the changes,
                 // first it will perform an initial snapshot using (select * from) in case there are no offsets
-                // exists for the connector and then it will listens to MySQL binlogs for any DB events such as (UPDATE, INSERT and DELETE)
+                // exists for the connector, and then it will listen to MySQL binlogs for any DB events such as (UPDATE, INSERT and DELETE)
                 from("debezium-mysql:{{debezium.mysql.name}}?"
                         + "databaseServerId={{debezium.mysql.databaseServerId}}"
                         + "&databaseHostname={{debezium.mysql.databaseHostName}}"
+                        + "&databasePort={{debezium.mysql.databasePort}}"
                         + "&databaseUser={{debezium.mysql.databaseUser}}"
                         + "&databasePassword={{debezium.mysql.databasePassword}}"
                         + "&databaseServerName={{debezium.mysql.databaseServerName}}"
@@ -63,7 +64,7 @@ public final class DebeziumMySqlConsumerToAzureEventHubs {
                         + "&offsetStorageFileName={{debezium.mysql.offsetStorageFileName}}")
                         .routeId("FromDebeziumMySql")
                         // We will need to prepare the data for Azure EventHubs. Therefore, we will hash the key to make sure our record lands on the same partition
-                        // and convert it to string, but that means we need to preserve the key information into the message body in order not to lose these information downstream.
+                        // and convert it to string, but that means we need to preserve the key information in the message body in order not to lose this information downstream.
                         // Note: If you'd use Kafka, most probably you will not need these transformations as you can send the key as an object and Kafka will do
                         // the rest to hash it in the broker in order to place it in the correct topic's partition.
                         .setBody(exchange -> {
diff --git a/examples/debezium-eventhubs-blob/src/main/resources/application.properties b/examples/debezium-eventhubs-blob/src/main/resources/application.properties
index 3510e2c..0bec099 100644
--- a/examples/debezium-eventhubs-blob/src/main/resources/application.properties
+++ b/examples/debezium-eventhubs-blob/src/main/resources/application.properties
@@ -17,14 +17,15 @@
 
 debezium.mysql.name = debezium-mysql-example-01
 debezium.mysql.databaseHostName = localhost
+debezium.mysql.databasePort = 3306
 debezium.mysql.databaseServerId = 8445698
 debezium.mysql.databaseUser = debezium
 debezium.mysql.databasePassword = dbz
 debezium.mysql.databaseServerName = debezium-connector-mysql-01
-debezium.mysql.databaseHistoryFileName = dbhistory-01.data
+debezium.mysql.databaseHistoryFileName = target/dbhistory-01.data
 debezium.mysql.databaseIncludeList = inventory
 debezium.mysql.tableIncludeList = inventory.products
-debezium.mysql.offsetStorageFileName = offset-01.data
+debezium.mysql.offsetStorageFileName = target/offset-01.data
 
 eventhubs.connectionString = {{generated_connection_string}}
 
diff --git a/examples/debezium/README.adoc b/examples/debezium/README.adoc
index b496bab..bb7d78a 100644
--- a/examples/debezium/README.adoc
+++ b/examples/debezium/README.adoc
@@ -7,25 +7,25 @@ An example which shows how to integrate Camel with Debezium and sink everything
 This project consists of the following examples:
 
   1. Send events using Debezium component to Kinesis.
-  2. Example how data can be sinked into Cassandra that produced by Debezium.
+  2. Load the data produced by Debezium into Cassandra.
 
 === Prerequisites
 
-==== MySQL
-In order to stream changes from MySQL, you will need to have https://debezium.io/documentation/reference/0.9/connectors/mysql.html#enabling-the-binlog[_row-level_] binary binlog enabled. However,
-for the sake of this example, we will use the following docker image which is setup with row enabled binary logs and some sample data:
+==== PostgreSQL
+In order to stream changes from PostgreSQL, you may have to https://debezium.io/documentation/reference/stable/connectors/postgresql.html#setting-up-postgresql[set up your PostgreSQL server]. However,
+for the sake of this example, we will use the following docker image which is properly set up and contains some sample data:
 
 [source,sh]
 ----
-$ docker run -it --rm --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw debezium/example-mysql:0.9
+$ docker run -it --rm --name pgsql -p 5432:5432 -e POSTGRES_DB=debezium-db -e POSTGRES_USER=pgsql-user -e POSTGRES_PASSWORD=pgsql-pw debezium/example-postgres:1.9
 ----
-The above docker image will start a MySQL server exposed to port `3306` with root password set.
+The above docker image will start a PostgreSQL server exposed to port `5432`.
 
 ==== Amazon Kinesis
 Since we will use Kinesis to stream changes from Debezium as an example, you need to create a stream called `camel-debezium-example` in `eu-central-1`. You will also need to create AWS access and secret keys; once you have created the keys, update the following properties in `src/main/resources/application.properties`:
 ```
-kinesis.accessKey ={{generated-access-key}}
-kinesis.secretKey = {{generated-secret-key}}
+kinesis.accessKey = generated-access-key
+kinesis.secretKey = generated-secret-key
 ```
 
 ==== Cassandra
@@ -49,21 +49,10 @@ USE dbzSink;
   weight float
 );
 ```
-*Note:* We will stream a table called `product` from MySQL docker image which is already set. Most of the configurations that will get you started with this example are already set in `application.properties`.
+*Note:* We will stream a table called `products` from the PostgreSQL docker image, which is already set up. Most of the configuration that will get you started with this example is already set in `application.properties`.
 
 === Build
 
-Due to licensing issues, you will need to add the dependency for `mysql-conenctor-java`, just add the following to your POM file:
-
-[source,xml]
-------------------------------------------------------------
-<dependency>
-    <groupId>mysql</groupId>
-    <artifactId>mysql-connector-java</artifactId>
-    <version>8.0.15</version>
-</dependency>
-------------------------------------------------------------
-
 You will need to compile this example first:
 
 [source,sh]
@@ -77,18 +66,18 @@ Run the Kinesis producer first
 
 [source,sh]
 ----
-$ mvn compile exec:java -Pkinesis-producer
+$ mvn exec:java -Pkinesis-producer
 ----
 
 Run the Debezium consumer in the separate shell
 
 [source,sh]
 ----
-$ mvn compile exec:java -Pdebezium-consumer
+$ mvn exec:java -Pdebezium-consumer
 ----
 
 Initially, Debezium will perform a snapshot of the whitelisted tables per `application.properties`, hence you should expect
-the data to be replicated into Cassandra. Once the snapshot mode is done, you can try to insert a new row, update fields, delete .. etc on  MySQL whitelisted table(s), you should see
+the data to be replicated into Cassandra. Once the snapshot mode is done, you can try to insert a new row, update fields, delete rows, etc. on the PostgreSQL whitelisted table(s); you should see
 the changes reflected in Cassandra as well; you can verify that by running the following query in cqlsh:
 ```
 select * from dbzSink.products;
diff --git a/examples/debezium/pom.xml b/examples/debezium/pom.xml
index 8483071..292fc28 100644
--- a/examples/debezium/pom.xml
+++ b/examples/debezium/pom.xml
@@ -61,7 +61,7 @@
         </dependency>
         <dependency>
             <groupId>org.apache.camel</groupId>
-            <artifactId>camel-debezium-mysql</artifactId>
+            <artifactId>camel-debezium-postgres</artifactId>
         </dependency>
         <dependency>
             <groupId>org.apache.camel</groupId>
@@ -92,6 +92,47 @@
             <artifactId>log4j-slf4j-impl</artifactId>
             <version>${log4j2-version}</version>
         </dependency>
+        <!-- for testing -->
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-test-junit5</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>localstack</artifactId>
+            <version>${testcontainers-version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>postgresql</artifactId>
+            <version>${testcontainers-version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>cassandra</artifactId>
+            <version>${testcontainers-version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.amazonaws</groupId>
+            <artifactId>aws-java-sdk-core</artifactId>
+            <version>1.12.150</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-sql</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.awaitility</groupId>
+            <artifactId>awaitility</artifactId>
+            <version>${awaitility-version}</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <profiles>
@@ -106,7 +147,7 @@
         <profile>
             <id>debezium-consumer</id>
             <properties>
-                <target.main.class>org.apache.camel.example.debezium.DebeziumMySqlConsumerToKinesis</target.main.class>
+                <target.main.class>org.apache.camel.example.debezium.DebeziumPgSQLConsumerToKinesis</target.main.class>
             </properties>
         </profile>
 
diff --git a/examples/debezium/src/main/java/org/apache/camel/example/debezium/DebeziumMySqlConsumerToKinesis.java b/examples/debezium/src/main/java/org/apache/camel/example/debezium/DebeziumPgSQLConsumerToKinesis.java
similarity index 76%
rename from examples/debezium/src/main/java/org/apache/camel/example/debezium/DebeziumMySqlConsumerToKinesis.java
rename to examples/debezium/src/main/java/org/apache/camel/example/debezium/DebeziumPgSQLConsumerToKinesis.java
index e97b277..1669527 100644
--- a/examples/debezium/src/main/java/org/apache/camel/example/debezium/DebeziumMySqlConsumerToKinesis.java
+++ b/examples/debezium/src/main/java/org/apache/camel/example/debezium/DebeziumPgSQLConsumerToKinesis.java
@@ -31,14 +31,14 @@ import org.slf4j.LoggerFactory;
 /**
  * A simple example to consume data from Debezium and send it to Kinesis
  */
-public final class DebeziumMySqlConsumerToKinesis {
+public final class DebeziumPgSQLConsumerToKinesis {
 
-    private static final Logger LOG = LoggerFactory.getLogger(DebeziumMySqlConsumerToKinesis.class);
+    private static final Logger LOG = LoggerFactory.getLogger(DebeziumPgSQLConsumerToKinesis.class);
 
     // use Camel Main to set up and run Camel
     private static final Main MAIN = new Main();
 
-    private DebeziumMySqlConsumerToKinesis() {
+    private DebeziumPgSQLConsumerToKinesis() {
     }
 
     public static void main(String[] args) throws Exception {
@@ -46,25 +46,32 @@ public final class DebeziumMySqlConsumerToKinesis {
         LOG.debug("About to run Debezium integration...");
 
         // add route
-        MAIN.configure().addRoutesBuilder(new RouteBuilder() {
+        MAIN.configure().addRoutesBuilder(createRouteBuilder());
+
+        // start and run Camel (block)
+        MAIN.run();
+    }
+
+    static RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
             public void configure() {
                 // Initial Debezium route that will run and listen to the changes,
                 // first it will perform an initial snapshot using (select * from) in case no offset
-                // exists for the connector, and then it will listen to MySQL binlogs for any DB events such as (UPDATE, INSERT and DELETE)
-                from("debezium-mysql:{{debezium.mysql.name}}?"
-                        + "databaseServerId={{debezium.mysql.databaseServerId}}"
-                        + "&databaseHostname={{debezium.mysql.databaseHostName}}"
-                        + "&databaseUser={{debezium.mysql.databaseUser}}"
-                        + "&databasePassword={{debezium.mysql.databasePassword}}"
-                        + "&databaseServerName={{debezium.mysql.databaseServerName}}"
-                        + "&databaseHistoryFileFilename={{debezium.mysql.databaseHistoryFileName}}"
-                        + "&databaseIncludeList={{debezium.mysql.databaseIncludeList}}"
-                        + "&tableIncludeList={{debezium.mysql.tableIncludeList}}"
-                        + "&offsetStorageFileName={{debezium.mysql.offsetStorageFileName}}")
-                        .routeId("FromDebeziumMySql")
-                        // We will need to prepare the data for Kinesis, however we need to mention here is that Kinesis is bit different from Kafka in terms
+                // exists for the connector, and then it will listen to PostgreSQL for any DB events such as (UPDATE, INSERT and DELETE)
+                from("debezium-postgres:{{debezium.postgres.name}}?"
+                        + "databaseHostname={{debezium.postgres.databaseHostName}}"
+                        + "&databasePort={{debezium.postgres.databasePort}}"
+                        + "&databaseUser={{debezium.postgres.databaseUser}}"
+                        + "&databasePassword={{debezium.postgres.databasePassword}}"
+                        + "&databaseServerName={{debezium.postgres.databaseServerName}}"
+                        + "&databaseDbname={{debezium.postgres.databaseDbname}}"
+                        + "&schemaIncludeList={{debezium.postgres.schemaIncludeList}}"
+                        + "&tableIncludeList={{debezium.postgres.tableIncludeList}}"
+                        + "&offsetStorageFileName={{debezium.postgres.offsetStorageFileName}}")
+                        .routeId("FromDebeziumPgSql")
+                        // We will need to prepare the data for Kinesis; however, what we need to mention here is that Kinesis is a bit different from Kafka in terms
                         // of the key partition, which is limited to 256 bytes in length; depending on the size of your key, that may not be optimal. Therefore, the safer option is to hash the key
-                        // and convert it to string, but that means we need to preserve the key information into the message body in order not to lose these information downstream.
+                        // and convert it to string, but that means we need to preserve the key information in the message body in order not to lose this information downstream.
                         // Note: If you'd use Kafka, most probably you will not need these transformations as you can send the key as an object and Kafka will do
                         // the rest to hash it in the broker in order to place it in the correct topic's partition.
                         .setBody(exchange -> {
@@ -93,15 +100,12 @@ public final class DebeziumMySqlConsumerToKinesis {
                         // Marshal everything to JSON, you can use any other data format such as Avro, Protobuf, etc., but in this example we will keep it to JSON for simplicity
                         .marshal().json(JsonLibrary.Jackson)
                         // Send our data to Kinesis
-                        .to("aws-kinesis:{{kinesis.streamName}}?accessKey=RAW({{kinesis.accessKey}})"
+                        .to("aws2-kinesis:{{kinesis.streamName}}?accessKey=RAW({{kinesis.accessKey}})"
                                 + "&secretKey=RAW({{kinesis.secretKey}})"
                                 + "&region={{kinesis.region}}")
                         .end();
             }
-        });
-
-        // start and run Camel (block)
-        MAIN.run();
+        };
     }
 
 }
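Note that the comments above refer to a setBody(...) block which the hunks elide. Purely as an illustration (this is not the file's actual code), the flattening they describe could look like the sketch below; it assumes the DebeziumConstants header names from camel-debezium-common, and it uses the same key/value/operation map keys that KinesisProducerToCassandra reads back further down.

[source,java]
----
import java.util.HashMap;
import java.util.Map;

import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.debezium.DebeziumConstants;

// hypothetical sketch, not the example's elided code
public class FlattenChangeEventSketch extends RouteBuilder {

    @Override
    public void configure() {
        from("direct:debezium-event")
                .setBody(exchange -> {
                    // keep key and operation in the body so they survive the
                    // JSON round trip through Kinesis
                    Map<String, Object> flattened = new HashMap<>();
                    flattened.put("key", exchange.getMessage().getHeader(DebeziumConstants.HEADER_KEY));
                    flattened.put("operation", exchange.getMessage().getHeader(DebeziumConstants.HEADER_OPERATION));
                    flattened.put("value", exchange.getMessage().getBody());
                    return flattened;
                })
                .to("mock:flattened");
    }
}
----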
diff --git a/examples/debezium/src/main/java/org/apache/camel/example/debezium/KinesisProducerToCassandra.java b/examples/debezium/src/main/java/org/apache/camel/example/debezium/KinesisProducerToCassandra.java
index 02185c6..c83d30c 100644
--- a/examples/debezium/src/main/java/org/apache/camel/example/debezium/KinesisProducerToCassandra.java
+++ b/examples/debezium/src/main/java/org/apache/camel/example/debezium/KinesisProducerToCassandra.java
@@ -44,13 +44,20 @@ public final class KinesisProducerToCassandra {
         LOG.debug("About to run Kinesis to Cassandra integration...");
 
         // add route
-        MAIN.configure().addRoutesBuilder(new RouteBuilder() {
+        MAIN.configure().addRoutesBuilder(createRouteBuilder());
+
+        // start and run Camel (block)
+        MAIN.run();
+    }
+
+    static RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
             public void configure() {
                 // We set the CQL templates we need; note that an UPDATE in Cassandra means an UPSERT, which is what we need
                 final String cqlUpdate = "update products set name = ?, description = ?, weight = ? where id = ?";
                 final String cqlDelete = "delete from products where id = ?";
 
-                from("aws-kinesis:{{kinesis.streamName}}?accessKey=RAW({{kinesis.accessKey}})"
+                from("aws2-kinesis:{{kinesis.streamName}}?accessKey=RAW({{kinesis.accessKey}})"
                         + "&secretKey=RAW({{kinesis.secretKey}})"
                         + "&region={{kinesis.region}}")
                         // Since we expect the data of the body to be ByteArr, we convert it to String using Kinesis
@@ -62,45 +69,42 @@ public final class KinesisProducerToCassandra {
                         // a header
                        .setProperty("DBOperation", simple("${body[operation]}"))
                         .choice()
-                            // If we have a INSERT or UPDATE, we will need to set the body with the CQL query parameters since we are using
-                            // camel-cassandraql component
-                            .when(exchangeProperty("DBOperation").in("c", "u"))
-                                .setBody(exchange -> {
-                                    final Map body = (Map) exchange.getMessage().getBody();
-                                    final Map value = (Map) body.get("value");
-                                    final Map key = (Map) body.get("key");
-
-                                    // We as well check for nulls
-                                    final String name = value.get("name") != null ? value.get("name").toString() : "";
-                                    final String description = value.get("description") != null ? value.get("description").toString() : "";
-                                    final float weight = value.get("weight") != null ? Float.parseFloat(value.get("weight").toString()) : 0;
-
-                                    return Arrays.asList(name, description, weight, key.get("id"));
-                                })
-                                // We set the appropriate query in the header so we don't run the same route twice
-                                .setHeader("CQLQuery", constant(cqlUpdate))
-                            // If we have a DELETE, then we just set the id as a query parameter in the body
-                            .when(exchangeProperty("DBOperation").isEqualTo("d"))
-                                .setBody(exchange -> {
-                                    final Map body = (Map) exchange.getMessage().getBody();
-                                    final Map key = (Map) body.get("key");
-
-                                    return Collections.singletonList(key.get("id"));
-                                })
-                                // We set the appropriate query in the header so we don't run the same route twice
-                                .setHeader("CQLQuery", constant(cqlDelete))
+                        // If we have an INSERT or UPDATE, we will need to set the body with the CQL query parameters since we are using
+                        // camel-cassandraql component
+                        .when(exchangeProperty("DBOperation").in("c", "u"))
+                        .setBody(exchange -> {
+                            final Map body = (Map) exchange.getMessage().getBody();
+                            final Map value = (Map) body.get("value");
+                            final Map key = (Map) body.get("key");
+
+                            // We also check for nulls
+                            final String name = value.get("name") != null ? value.get("name").toString() : "";
+                            final String description = value.get("description") != null ? value.get("description").toString() : "";
+                            final float weight = value.get("weight") != null ? Float.parseFloat(value.get("weight").toString()) : 0;
+
+                            return Arrays.asList(name, description, weight, key.get("id"));
+                        })
+                        // We set the appropriate query in the header so we don't run the same route twice
+                        .setHeader("CQLQuery", constant(cqlUpdate))
+                        // If we have a DELETE, then we just set the id as a query parameter in the body
+                        .when(exchangeProperty("DBOperation").isEqualTo("d"))
+                        .setBody(exchange -> {
+                            final Map body = (Map) exchange.getMessage().getBody();
+                            final Map key = (Map) body.get("key");
+
+                            return Collections.singletonList(key.get("id"));
+                        })
+                        // We set the appropriate query in the header so we don't run the same route twice
+                        .setHeader("CQLQuery", constant(cqlDelete))
                         .end()
                         .choice()
-                            // We just make sure we ONLY handle INSERT, UPDATE and DELETE and nothing else
-                            .when(exchangeProperty("DBOperation").in("c", "u", "d"))
-                                // Send query to Cassandra
-                                .recipientList(simple("cql:{{cassandra.host}}/{{cassandra.keyspace}}?cql=RAW(${header.CQLQuery})"))
+                        // We just make sure we ONLY handle INSERT, UPDATE and DELETE and nothing else
+                        .when(exchangeProperty("DBOperation").in("c", "u", "d"))
+                        // Send query to Cassandra
+                        .recipientList(simple("cql:{{cassandra.host}}/{{cassandra.keyspace}}?cql=RAW(${header.CQLQuery})"))
                         .end();
             }
-        });
-
-        // start and run Camel (block)
-        MAIN.run();
+        };
     }
 
 }
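The re-indented block above is easier to follow once reduced to its skeleton: a first choice() picks a CQL statement per operation and stashes it in a header, and a second choice() dispatches via recipientList so that only c/u/d events ever reach Cassandra. A stripped-down sketch of that routing pattern (illustrative names and mock endpoints, not the example's code):

[source,java]
----
import org.apache.camel.builder.RouteBuilder;

public class OperationRoutingSketch extends RouteBuilder {

    @Override
    public void configure() {
        from("direct:change-event")
                // an upstream step is assumed to have set the DBOperation
                // property, mirroring the setProperty(...) call above
                .choice()
                    .when(exchangeProperty("DBOperation").in("c", "u"))
                        // create and update share one target, like cqlUpdate above
                        .setHeader("Target", constant("upsert"))
                    .when(exchangeProperty("DBOperation").isEqualTo("d"))
                        .setHeader("Target", constant("delete"))
                .end()
                .choice()
                    // only c/u/d reach an endpoint; anything else is dropped
                    .when(exchangeProperty("DBOperation").in("c", "u", "d"))
                        .recipientList(simple("mock:${header.Target}"))
                .end();
    }
}
----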
diff --git a/examples/debezium/src/main/resources/application.properties b/examples/debezium/src/main/resources/application.properties
index bf0a3e9..56eab2f 100644
--- a/examples/debezium/src/main/resources/application.properties
+++ b/examples/debezium/src/main/resources/application.properties
@@ -15,23 +15,21 @@
 ## limitations under the License.
 ## ---------------------------------------------------------------------------
 
-debezium.mysql.name = debezium-mysql-example-01
+debezium.postgres.name = debezium-postgres-example-01
 
-debezium.mysql.databaseHostName = localhost
-debezium.mysql.databaseServerId = 8445698
-debezium.mysql.databaseUser = debezium
-debezium.mysql.databasePassword = dbz
-debezium.mysql.databaseServerName = debezium-connector-mysql-01
-debezium.mysql.databaseHistoryFileName = dbhistory-01.data
-debezium.mysql.databaseIncludeList = inventory
-
-debezium.mysql.tableIncludeList = inventory.products
-
-debezium.mysql.offsetStorageFileName = offset-01.data
+debezium.postgres.databaseHostName = localhost
+debezium.postgres.databasePort = 5432
+debezium.postgres.databaseUser = debezium
+debezium.postgres.databasePassword = dbz
+debezium.postgres.databaseServerName = debezium-connector-postgres-01
+debezium.postgres.databaseDbname = debezium-db
+debezium.postgres.schemaIncludeList = inventory
+debezium.postgres.tableIncludeList = inventory.products
+debezium.postgres.offsetStorageFileName = target/offset-01.data
 
 kinesis.streamName = camel-debezium-example
-kinesis.accessKey = {{generated-access-key}}
-kinesis.secretKey = {{generated-secret-key}}
+kinesis.accessKey = generated-access-key
+kinesis.secretKey = generated-secret-key
 kinesis.region = EU_CENTRAL_1
 
 cassandra.host = localhost:9042
diff --git a/examples/debezium/src/test/java/org/apache/camel/example/debezium/DebeziumTest.java b/examples/debezium/src/test/java/org/apache/camel/example/debezium/DebeziumTest.java
new file mode 100644
index 0000000..52fe8c8
--- /dev/null
+++ b/examples/debezium/src/test/java/org/apache/camel/example/debezium/DebeziumTest.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.example.debezium;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.List;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.RoutesBuilder;
+import org.apache.camel.builder.NotifyBuilder;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.aws2.kinesis.Kinesis2Component;
+import org.apache.camel.component.sql.SqlComponent;
+import org.apache.camel.test.junit5.CamelTestSupport;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.postgresql.ds.PGSimpleDataSource;
+import org.testcontainers.containers.CassandraContainer;
+import org.testcontainers.containers.PostgreSQLContainer;
+import org.testcontainers.containers.localstack.LocalStackContainer;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.utility.DockerImageName;
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.kinesis.KinesisClient;
+import software.amazon.awssdk.services.kinesis.model.CreateStreamRequest;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.camel.util.PropertiesHelper.asProperties;
+import static org.awaitility.Awaitility.await;
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.testcontainers.containers.localstack.LocalStackContainer.Service.KINESIS;
+
+/**
+ * A unit test checking that Camel can propagate changes from one database to another thanks to Debezium.
+ */
+class DebeziumTest extends CamelTestSupport {
+
+    private static final String AWS_IMAGE = "localstack/localstack:0.13.3";
+    private static final String PGSQL_IMAGE = "debezium/example-postgres:1.9";
+    private static final String CASSANDRA_IMAGE = "cassandra:4.0.1";
+    private static LocalStackContainer AWS_CONTAINER;
+    private static PostgreSQLContainer<?> PGSQL_CONTAINER;
+    private static CassandraContainer<?> CASSANDRA_CONTAINER;
+
+    private static final String SOURCE_DB_NAME = "debezium-db";
+    private static final String SOURCE_DB_SCHEMA = "inventory";
+    private static final String SOURCE_DB_USERNAME = "pgsql-user";
+    private static final String SOURCE_DB_PASSWORD = "pgsql-pw";
+
+    @BeforeAll
+    static void init() throws IOException {
+        Files.deleteIfExists(Path.of("target/offset-01.data"));
+        AWS_CONTAINER = new LocalStackContainer(DockerImageName.parse(AWS_IMAGE))
+                .withServices(KINESIS)
+                .waitingFor(Wait.forLogMessage(".*Ready\\.\n", 1));
+        AWS_CONTAINER.start();
+        PGSQL_CONTAINER = new PostgreSQLContainer<>(DockerImageName.parse(PGSQL_IMAGE).asCompatibleSubstituteFor("postgres"))
+                .withDatabaseName(SOURCE_DB_NAME)
+                .withUsername(SOURCE_DB_USERNAME)
+                .withPassword(SOURCE_DB_PASSWORD);
+        PGSQL_CONTAINER.start();
+        CASSANDRA_CONTAINER = new CassandraContainer<>(CASSANDRA_IMAGE)
+                .withInitScript("org/apache/camel/example/debezium/db-init.cql");
+        CASSANDRA_CONTAINER.start();
+    }
+
+    @AfterAll
+    static void destroy() {
+        if (AWS_CONTAINER != null) {
+            AWS_CONTAINER.stop();
+        }
+        if (PGSQL_CONTAINER != null) {
+            PGSQL_CONTAINER.stop();
+        }
+        if (CASSANDRA_CONTAINER != null) {
+            CASSANDRA_CONTAINER.stop();
+        }
+    }
+
+    @Override
+    protected CamelContext createCamelContext() throws Exception {
+        CamelContext camelContext = super.createCamelContext();
+        // Set the location of the configuration
+        camelContext.getPropertiesComponent().setLocation("classpath:application.properties");
+        Kinesis2Component component = camelContext.getComponent("aws2-kinesis", Kinesis2Component.class);
+        KinesisClient kinesisClient = KinesisClient.builder()
+                .endpointOverride(AWS_CONTAINER.getEndpointOverride(KINESIS))
+                .credentialsProvider(
+                        StaticCredentialsProvider.create(
+                                AwsBasicCredentials.create(AWS_CONTAINER.getAccessKey(), AWS_CONTAINER.getSecretKey())
+                        )
+                )
+                .region(Region.of(AWS_CONTAINER.getRegion()))
+                .build();
+        // Create the stream
+        kinesisClient.createStream(CreateStreamRequest.builder().streamName("camel-debezium-example").shardCount(1).build());
+        component.getConfiguration().setAmazonKinesisClient(kinesisClient);
+        // Override the host and port of the source database and Cassandra
+        camelContext.getPropertiesComponent().setOverrideProperties(
+            asProperties(
+                "debezium.postgres.databaseHostName", PGSQL_CONTAINER.getHost(),
+                "debezium.postgres.databasePort", Integer.toString(PGSQL_CONTAINER.getMappedPort(5432)),
+                "debezium.postgres.databaseUser", SOURCE_DB_USERNAME,
+                "debezium.postgres.databasePassword", SOURCE_DB_PASSWORD,
+                "cassandra.host", String.format("%s:%d", CASSANDRA_CONTAINER.getHost(), CASSANDRA_CONTAINER.getMappedPort(9042))
+            )
+        );
+            )
+        );
+        return camelContext;
+    }
+
+    @Test
+    void should_propagate_db_event_thanks_to_debezium() {
+        NotifyBuilder notify = new NotifyBuilder(context).from("aws2-kinesis:*").whenCompleted(3).create();
+
+        List<?> resultSource = template.requestBody("direct:select", null, List.class);
+        assertEquals(9, resultSource.size(), "We should not have additional products in source");
+        await().atMost(20, SECONDS).until(() -> template.requestBody("direct:result", null, List.class).size(), equalTo(0));
+
+        template.sendBody("direct:insert", new Object[] { 1, "scooter", "Small 2-wheel yellow scooter", 5.54 });
+
+        resultSource = template.requestBody("direct:select", null, List.class);
+        assertEquals(10, resultSource.size(), "We should have one additional product in source");
+        await().atMost(20, SECONDS).until(() -> template.requestBody("direct:result", null, List.class).size(), equalTo(1));
+
+        template.sendBody("direct:update", new Object[] { "yellow scooter", 1 });
+
+        resultSource = template.requestBody("direct:select", null, List.class);
+        assertEquals(10, resultSource.size(), "We should not have more products in source");
+        await().atMost(20, SECONDS).until(() -> template.requestBody("direct:result", null, List.class).size(), equalTo(1));
+
+        template.sendBody("direct:delete", new Object[] { 1 });
+
+        resultSource = template.requestBody("direct:select", null, List.class);
+        assertEquals(9, resultSource.size(), "We should have one less product in source");
+        await().atMost(20, SECONDS).until(() -> template.requestBody("direct:result", null, List.class).size(), equalTo(0));
+
+        assertTrue(
+            notify.matches(60, SECONDS), "3 messages should be completed"
+        );
+    }
+
+    @Override
+    protected RoutesBuilder[] createRouteBuilders() {
+        return new RoutesBuilder[]{
+                DebeziumPgSQLConsumerToKinesis.createRouteBuilder(), KinesisProducerToCassandra.createRouteBuilder(),
+            new ApplyChangesToPgSQLRouteBuilder()
+        };
+    }
+
+    private static class ApplyChangesToPgSQLRouteBuilder extends RouteBuilder {
+
+        @Override
+        public void configure() {
+            // required for the sql component
+            PGSimpleDataSource db = new PGSimpleDataSource();
+            db.setServerNames(new String[]{PGSQL_CONTAINER.getHost()});
+            db.setPortNumbers(new int[]{PGSQL_CONTAINER.getMappedPort(5432)});
+            db.setUser(SOURCE_DB_USERNAME);
+            db.setPassword(SOURCE_DB_PASSWORD);
+            db.setDatabaseName(SOURCE_DB_NAME);
+
+            getContext().getComponent("sql", SqlComponent.class).setDataSource(db);
+            from("direct:select").toF("sql:select * from %s.products", SOURCE_DB_SCHEMA).to("mock:query");
+            from("direct:insert").toF("sql:insert into %s.products (id, name, description, weight) values (#, #, #, #)", SOURCE_DB_SCHEMA).to("mock:insert");
+            from("direct:update").toF("sql:update %s.products set name=# where id=#", SOURCE_DB_SCHEMA).to("mock:update");
+            from("direct:delete").toF("sql:delete from %s.products where id=#", SOURCE_DB_SCHEMA).to("mock:delete");
+            from("direct:result").to("cql://{{cassandra.host}}/{{cassandra.keyspace}}?cql=select * from dbzSink.products").to("mock:result");
+        }
+    }
+}
diff --git a/examples/debezium/src/test/resources/org/apache/camel/example/debezium/db-init.cql b/examples/debezium/src/test/resources/org/apache/camel/example/debezium/db-init.cql
new file mode 100644
index 0000000..501a90c
--- /dev/null
+++ b/examples/debezium/src/test/resources/org/apache/camel/example/debezium/db-init.cql
@@ -0,0 +1,10 @@
+CREATE KEYSPACE dbzSink WITH replication = {'class':'SimpleStrategy', 'replication_factor' : 1};
+
+USE dbzSink;
+
+CREATE TABLE products (
+  id int PRIMARY KEY,
+  name varchar,
+  description varchar,
+  weight float
+);
\ No newline at end of file
diff --git a/examples/transformer-demo/pom.xml b/examples/transformer-demo/pom.xml
index 1ea80e3..9d8ca7f 100644
--- a/examples/transformer-demo/pom.xml
+++ b/examples/transformer-demo/pom.xml
@@ -98,15 +98,9 @@
         <!-- for testing -->
         <dependency>
             <groupId>org.apache.camel</groupId>
-            <artifactId>camel-test-spring</artifactId>
+            <artifactId>camel-test-spring-junit5</artifactId>
             <scope>test</scope>
         </dependency>
-        <dependency>
-            <groupId>xmlunit</groupId>
-            <artifactId>xmlunit</artifactId>
-            <scope>test</scope>
-            <version>${xmlunit-version}</version>
-        </dependency>
 
     </dependencies>
 
diff --git a/examples/transformer-demo/src/test/java/org/apache/camel/example/transformer/demo/TransformerTest.java b/examples/transformer-demo/src/test/java/org/apache/camel/example/transformer/demo/TransformerTest.java
new file mode 100644
index 0000000..83c5811
--- /dev/null
+++ b/examples/transformer-demo/src/test/java/org/apache/camel/example/transformer/demo/TransformerTest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.example.transformer.demo;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.NotifyBuilder;
+import org.apache.camel.model.ModelCamelContext;
+import org.apache.camel.spi.DataType;
+import org.apache.camel.spi.DataTypeAware;
+import org.apache.camel.test.spring.junit5.CamelSpringTest;
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.test.context.ContextConfiguration;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * A unit test checking that Camel can transform and validate data.
+ */
+@CamelSpringTest
+@ContextConfiguration("/META-INF/spring/camel-context.xml")
+class TransformerTest {
+
+    @Autowired
+    ProducerTemplate template;
+    @Autowired
+    ModelCamelContext context;
+
+    @Test
+    void should_transform_and_validate_pojo() {
+        NotifyBuilder notify = new NotifyBuilder(context).fromRoute("java")
+                .whenCompleted(1).and()
+                .whenCompleted(1).wereSentTo("file:target/output*").create();
+        // Given
+        Order order = new Order()
+                .setOrderId("Order-Java-0001")
+                .setItemId("MILK")
+                .setQuantity(3);
+        // When
+        OrderResponse response = template.requestBody("direct:java", order, OrderResponse.class);
+
+        // Then
+        assertTrue(
+            notify.matches(20, TimeUnit.SECONDS), "1 message should be completed"
+        );
+        assertNotNull(response);
+        assertTrue(response.isAccepted());
+        assertEquals("Order-Java-0001", response.getOrderId());
+        assertEquals(String.format("Order accepted:[item='%s' quantity='%s']", order.getItemId(), order.getQuantity()), response.getDescription());
+    }
+
+    @Test
+    void should_transform_and_validate_json() {
+        NotifyBuilder notify = new NotifyBuilder(context).fromRoute("json")
+                .whenCompleted(1).and()
+                .whenCompleted(1).wereSentTo("file:target/output*").create();
+        // Given
+        String orderJson = "{\"orderId\":\"Order-JSON-0001\", \"itemId\":\"MIZUYO-KAN\", \"quantity\":\"16350\"}";
+        // When
+        Exchange answerJson = template.send("direct:json", ex -> ((DataTypeAware) ex.getIn()).setBody(orderJson, new DataType("json")));
+
+        // Then
+        assertTrue(
+            notify.matches(20, TimeUnit.SECONDS), "1 message should be completed"
+        );
+        assertNotNull(answerJson);
+        String response = answerJson.getIn().getBody(String.class);
+        assertNotNull(response);
+        assertTrue(response.contains("\"accepted\":true"));
+        assertTrue(response.contains("\"orderId\":\"Order-JSON-0001\""));
+        assertTrue(response.contains("Order accepted:[item='MIZUYO-KAN' quantity='16350']"));
+    }
+
+    @Test
+    void should_transform_and_validate_xml() {
+        NotifyBuilder notify = new NotifyBuilder(context).fromRoute("xml")
+                .whenCompleted(1).and()
+                .whenCompleted(1).wereSentTo("file:target/output*").create();
+        // Given
+        String orderXml = "<order orderId=\"Order-XML-0001\" itemId=\"MIKAN\" quantity=\"365\"/>";
+
+        // When
+        Exchange answerXml = template.send("direct:xml",
+            ex -> ((DataTypeAware) ex.getIn()).setBody(orderXml, new DataType("xml:XMLOrder"))
+        );
+
+        // Then
+        assertTrue(
+            notify.matches(20, TimeUnit.SECONDS), "1 message should be completed"
+        );
+        assertNotNull(answerXml);
+        String response = answerXml.getIn().getBody(String.class);
+        assertNotNull(response);
+        assertTrue(response.contains("accepted=\"true\""));
+        assertTrue(response.contains("orderId=\"Order-XML-0001\""));
+        assertTrue(response.contains("Order accepted:[item='MIKAN' quantity='365']"));
+    }
+}
