This is an automated email from the ASF dual-hosted git repository. aldettinger pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel-quarkus.git
The following commit(s) were added to refs/heads/master by this push: new 26d7968 CAMEL-QUARKUS-1720: Added Postgres replication slot native support 26d7968 is described below commit 26d796852d637723b481891219c5f6cde242d7ca Author: aldettinger <aldettin...@gmail.com> AuthorDate: Tue Oct 6 16:30:24 2020 +0200 CAMEL-QUARKUS-1720: Added Postgres replication slot native support --- .../reference/extensions/pg-replication-slot.adoc | 8 +- .../reference/components/pg-replication-slot.adoc | 6 +- extensions-jvm/pom.xml | 1 - .../pg-replication-slot/deployment/pom.xml | 8 +- .../deployment/PgReplicationSlotProcessor.java | 17 ++-- .../pg-replication-slot/pom.xml | 1 - .../pg-replication-slot/runtime/pom.xml | 9 ++- .../slot/PostgresDriverRegistrationRecorder.java | 22 +++-- .../main/resources/META-INF/quarkus-extension.yaml | 3 +- extensions/pom.xml | 1 + .../pg-replication-slot}/pom.xml | 51 +++++++++++- .../slot/it/PgReplicationSlotResource.java | 28 +++---- .../slot/it/PgReplicationSlotRoute.java | 45 +++++++++++ .../replication/slot/it/PgReplicationSlotIT.java | 16 +--- .../replication/slot/it/PgReplicationSlotTest.java | 93 ++++++++++++++++++++++ .../slot/it/PgReplicationSlotTestResource.java | 75 +++++++++++++++++ integration-tests/pom.xml | 1 + tooling/scripts/test-categories.yaml | 1 + 18 files changed, 315 insertions(+), 71 deletions(-) diff --git a/docs/modules/ROOT/pages/reference/extensions/pg-replication-slot.adoc b/docs/modules/ROOT/pages/reference/extensions/pg-replication-slot.adoc index e4fb55d..6abc2d7 100644 --- a/docs/modules/ROOT/pages/reference/extensions/pg-replication-slot.adoc +++ b/docs/modules/ROOT/pages/reference/extensions/pg-replication-slot.adoc @@ -2,15 +2,15 @@ // This file was generated by camel-quarkus-maven-plugin:update-extension-doc-page = PostgresSQL Replication Slot :cq-artifact-id: camel-quarkus-pg-replication-slot -:cq-native-supported: false -:cq-status: Preview +:cq-native-supported: true +:cq-status: Stable :cq-description: Poll for PostgreSQL Write-Ahead Log (WAL) records using Streaming Replication Slots. :cq-deprecated: false :cq-jvm-since: 1.1.0 -:cq-native-since: n/a +:cq-native-since: 1.2.0 [.badges] -[.badge-key]##JVM since##[.badge-supported]##1.1.0## [.badge-key]##Native##[.badge-unsupported]##unsupported## +[.badge-key]##JVM since##[.badge-supported]##1.1.0## [.badge-key]##Native since##[.badge-supported]##1.2.0## Poll for PostgreSQL Write-Ahead Log (WAL) records using Streaming Replication Slots. diff --git a/docs/modules/ROOT/partials/reference/components/pg-replication-slot.adoc b/docs/modules/ROOT/partials/reference/components/pg-replication-slot.adoc index dd86a2c..ac51a77 100644 --- a/docs/modules/ROOT/partials/reference/components/pg-replication-slot.adoc +++ b/docs/modules/ROOT/partials/reference/components/pg-replication-slot.adoc @@ -2,11 +2,11 @@ // This file was generated by camel-quarkus-maven-plugin:update-extension-doc-page :cq-artifact-id: camel-quarkus-pg-replication-slot :cq-artifact-id-base: pg-replication-slot -:cq-native-supported: false -:cq-status: Preview +:cq-native-supported: true +:cq-status: Stable :cq-deprecated: false :cq-jvm-since: 1.1.0 -:cq-native-since: n/a +:cq-native-since: 1.2.0 :cq-camel-part-name: pg-replication-slot :cq-camel-part-title: PostgresSQL Replication Slot :cq-camel-part-description: Poll for PostgreSQL Write-Ahead Log (WAL) records using Streaming Replication Slots. diff --git a/extensions-jvm/pom.xml b/extensions-jvm/pom.xml index d86a722..e838f10 100644 --- a/extensions-jvm/pom.xml +++ b/extensions-jvm/pom.xml @@ -115,7 +115,6 @@ <module>ognl</module> <module>openstack</module> <module>optaplanner</module> - <module>pg-replication-slot</module> <module>pgevent</module> <module>printer</module> <module>protobuf</module> diff --git a/extensions-jvm/pg-replication-slot/deployment/pom.xml b/extensions/pg-replication-slot/deployment/pom.xml similarity index 94% rename from extensions-jvm/pg-replication-slot/deployment/pom.xml rename to extensions/pg-replication-slot/deployment/pom.xml index e5e06a0..c64e515 100644 --- a/extensions-jvm/pg-replication-slot/deployment/pom.xml +++ b/extensions/pg-replication-slot/deployment/pom.xml @@ -34,11 +34,15 @@ <dependencies> <dependency> <groupId>org.apache.camel.quarkus</groupId> - <artifactId>camel-quarkus-core-deployment</artifactId> + <artifactId>camel-quarkus-pg-replication-slot</artifactId> </dependency> <dependency> <groupId>org.apache.camel.quarkus</groupId> - <artifactId>camel-quarkus-pg-replication-slot</artifactId> + <artifactId>camel-quarkus-core-deployment</artifactId> + </dependency> + <dependency> + <groupId>io.quarkus</groupId> + <artifactId>quarkus-jdbc-postgresql-deployment</artifactId> </dependency> </dependencies> diff --git a/extensions-jvm/pg-replication-slot/deployment/src/main/java/org/apache/camel/quarkus/component/pg/replication/slot/deployment/PgReplicationSlotProcessor.java b/extensions/pg-replication-slot/deployment/src/main/java/org/apache/camel/quarkus/component/pg/replication/slot/deployment/PgReplicationSlotProcessor.java similarity index 68% rename from extensions-jvm/pg-replication-slot/deployment/src/main/java/org/apache/camel/quarkus/component/pg/replication/slot/deployment/PgReplicationSlotProcessor.java rename to extensions/pg-replication-slot/deployment/src/main/java/org/apache/camel/quarkus/component/pg/replication/slot/deployment/PgReplicationSlotProcessor.java index d2253c0..f0bf2f0 100644 --- a/extensions-jvm/pg-replication-slot/deployment/src/main/java/org/apache/camel/quarkus/component/pg/replication/slot/deployment/PgReplicationSlotProcessor.java +++ b/extensions/pg-replication-slot/deployment/src/main/java/org/apache/camel/quarkus/component/pg/replication/slot/deployment/PgReplicationSlotProcessor.java @@ -20,13 +20,10 @@ import io.quarkus.deployment.annotations.BuildStep; import io.quarkus.deployment.annotations.ExecutionTime; import io.quarkus.deployment.annotations.Record; import io.quarkus.deployment.builditem.FeatureBuildItem; -import io.quarkus.deployment.pkg.steps.NativeBuild; -import org.apache.camel.quarkus.core.JvmOnlyRecorder; -import org.jboss.logging.Logger; +import org.apache.camel.quarkus.component.pg.replication.slot.PostgresDriverRegistrationRecorder; class PgReplicationSlotProcessor { - private static final Logger LOG = Logger.getLogger(PgReplicationSlotProcessor.class); private static final String FEATURE = "camel-pg-replication-slot"; @BuildStep @@ -34,13 +31,9 @@ class PgReplicationSlotProcessor { return new FeatureBuildItem(FEATURE); } - /** - * Remove this once this extension starts supporting the native mode. - */ - @BuildStep(onlyIf = NativeBuild.class) - @Record(value = ExecutionTime.RUNTIME_INIT) - void warnJvmInNative(JvmOnlyRecorder recorder) { - JvmOnlyRecorder.warnJvmInNative(LOG, FEATURE); // warn at build time - recorder.warnJvmInNative(FEATURE); // warn at runtime + @BuildStep + @Record(ExecutionTime.STATIC_INIT) + public void registerPostgresDriver(PostgresDriverRegistrationRecorder recorder) { + recorder.registerPostgresDriver(); } } diff --git a/extensions-jvm/pg-replication-slot/pom.xml b/extensions/pg-replication-slot/pom.xml similarity index 97% rename from extensions-jvm/pg-replication-slot/pom.xml rename to extensions/pg-replication-slot/pom.xml index 8e4bab7..9246673 100644 --- a/extensions-jvm/pg-replication-slot/pom.xml +++ b/extensions/pg-replication-slot/pom.xml @@ -35,6 +35,5 @@ <modules> <module>deployment</module> <module>runtime</module> - <module>integration-test</module> </modules> </project> diff --git a/extensions-jvm/pg-replication-slot/runtime/pom.xml b/extensions/pg-replication-slot/runtime/pom.xml similarity index 94% rename from extensions-jvm/pg-replication-slot/runtime/pom.xml rename to extensions/pg-replication-slot/runtime/pom.xml index dde6f2f..fb3a427 100644 --- a/extensions-jvm/pg-replication-slot/runtime/pom.xml +++ b/extensions/pg-replication-slot/runtime/pom.xml @@ -34,6 +34,7 @@ <properties> <camel.quarkus.jvmSince>1.1.0</camel.quarkus.jvmSince> + <camel.quarkus.nativeSince>1.2.0</camel.quarkus.nativeSince> </properties> <dependencyManagement> @@ -50,12 +51,16 @@ <dependencies> <dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-pg-replication-slot</artifactId> + </dependency> + <dependency> <groupId>org.apache.camel.quarkus</groupId> <artifactId>camel-quarkus-core</artifactId> </dependency> <dependency> - <groupId>org.apache.camel</groupId> - <artifactId>camel-pg-replication-slot</artifactId> + <groupId>io.quarkus</groupId> + <artifactId>quarkus-jdbc-postgresql</artifactId> </dependency> </dependencies> diff --git a/extensions-jvm/pg-replication-slot/integration-test/src/test/java/org/apache/camel/quarkus/component/pg/replication/slot/it/PgReplicationSlotTest.java b/extensions/pg-replication-slot/runtime/src/main/java/org/apache/camel/quarkus/component/pg/replication/slot/PostgresDriverRegistrationRecorder.java similarity index 63% copy from extensions-jvm/pg-replication-slot/integration-test/src/test/java/org/apache/camel/quarkus/component/pg/replication/slot/it/PgReplicationSlotTest.java copy to extensions/pg-replication-slot/runtime/src/main/java/org/apache/camel/quarkus/component/pg/replication/slot/PostgresDriverRegistrationRecorder.java index 292d87e..56b57f3 100644 --- a/extensions-jvm/pg-replication-slot/integration-test/src/test/java/org/apache/camel/quarkus/component/pg/replication/slot/it/PgReplicationSlotTest.java +++ b/extensions/pg-replication-slot/runtime/src/main/java/org/apache/camel/quarkus/component/pg/replication/slot/PostgresDriverRegistrationRecorder.java @@ -14,21 +14,19 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.camel.quarkus.component.pg.replication.slot.it; +package org.apache.camel.quarkus.component.pg.replication.slot; -import io.quarkus.test.junit.QuarkusTest; -import io.restassured.RestAssured; -import org.junit.jupiter.api.Test; +import io.quarkus.runtime.annotations.Recorder; +import org.postgresql.Driver; -@QuarkusTest -class PgReplicationSlotTest { +/** + * Forces the Postgres driver registration to workaround https://github.com/quarkusio/quarkus/issues/12116. + */ +@Recorder +public class PostgresDriverRegistrationRecorder { - @Test - public void loadComponentPgReplicationSlot() { - /* A simple autogenerated test */ - RestAssured.get("/pg-replication-slot/load/component/pg-replication-slot") - .then() - .statusCode(200); + public void registerPostgresDriver() { + new Driver(); } } diff --git a/extensions-jvm/pg-replication-slot/runtime/src/main/resources/META-INF/quarkus-extension.yaml b/extensions/pg-replication-slot/runtime/src/main/resources/META-INF/quarkus-extension.yaml similarity index 97% rename from extensions-jvm/pg-replication-slot/runtime/src/main/resources/META-INF/quarkus-extension.yaml rename to extensions/pg-replication-slot/runtime/src/main/resources/META-INF/quarkus-extension.yaml index 9d48377..03b3c95 100644 --- a/extensions-jvm/pg-replication-slot/runtime/src/main/resources/META-INF/quarkus-extension.yaml +++ b/extensions/pg-replication-slot/runtime/src/main/resources/META-INF/quarkus-extension.yaml @@ -24,9 +24,8 @@ name: "Camel PostgresSQL Replication Slot" description: "Poll for PostgreSQL Write-Ahead Log (WAL) records using Streaming Replication Slots" metadata: - unlisted: true guide: "https://camel.apache.org/camel-quarkus/latest/reference/extensions/pg-replication-slot.html" categories: - "integration" status: - - "preview" + - "stable" diff --git a/extensions/pom.xml b/extensions/pom.xml index 6dbc4ab..ea11b97 100644 --- a/extensions/pom.xml +++ b/extensions/pom.xml @@ -158,6 +158,7 @@ <module>opentracing</module> <module>paho</module> <module>pdf</module> + <module>pg-replication-slot</module> <module>platform-http</module> <module>quartz</module> <module>qute</module> diff --git a/extensions-jvm/pg-replication-slot/integration-test/pom.xml b/integration-tests/pg-replication-slot/pom.xml similarity index 67% rename from extensions-jvm/pg-replication-slot/integration-test/pom.xml rename to integration-tests/pg-replication-slot/pom.xml index b6c14a2..fa7b20e 100644 --- a/extensions-jvm/pg-replication-slot/integration-test/pom.xml +++ b/integration-tests/pg-replication-slot/pom.xml @@ -23,13 +23,12 @@ <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.apache.camel.quarkus</groupId> - <artifactId>camel-quarkus-build-parent-it</artifactId> + <artifactId>camel-quarkus-integration-tests</artifactId> <version>1.2.0-SNAPSHOT</version> - <relativePath>../../../poms/build-parent-it/pom.xml</relativePath> </parent> - <artifactId>camel-quarkus-pg-replication-slot-integration-test</artifactId> - <name>Camel Quarkus :: PostgresSQL Replication Slot :: Integration Test</name> + <artifactId>camel-quarkus-integration-test-pg-replication-slot</artifactId> + <name>Camel Quarkus :: Integration Tests :: PostgresSQL Replication Slot</name> <description>Integration tests for Camel Quarkus PostgresSQL Replication Slot extension</description> <dependencyManagement> @@ -53,6 +52,10 @@ <groupId>io.quarkus</groupId> <artifactId>quarkus-resteasy</artifactId> </dependency> + <dependency> + <groupId>io.quarkus</groupId> + <artifactId>quarkus-resteasy-jackson</artifactId> + </dependency> <!-- test dependencies --> <dependency> @@ -65,6 +68,16 @@ <artifactId>rest-assured</artifactId> <scope>test</scope> </dependency> + <dependency> + <groupId>org.awaitility</groupId> + <artifactId>awaitility</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.camel.quarkus</groupId> + <artifactId>camel-quarkus-integration-testcontainers-support</artifactId> + <scope>test</scope> + </dependency> <!-- The following dependencies guarantee that this module is built after them. You can update them by running `mvn process-resources -Pformat -N` from the source tree root directory --> <dependency> @@ -97,4 +110,34 @@ </plugin> </plugins> </build> + + <profiles> + <profile> + <id>native</id> + <activation> + <property> + <name>native</name> + </property> + </activation> + <properties> + <quarkus.package.type>native</quarkus.package.type> + </properties> + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-failsafe-plugin</artifactId> + <executions> + <execution> + <goals> + <goal>integration-test</goal> + <goal>verify</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + </build> + </profile> + </profiles> </project> diff --git a/extensions-jvm/pg-replication-slot/integration-test/src/main/java/org/apache/camel/quarkus/component/pg/replication/slot/it/PgReplicationSlotResource.java b/integration-tests/pg-replication-slot/src/main/java/org/apache/camel/quarkus/component/pg/replication/slot/it/PgReplicationSlotResource.java similarity index 62% rename from extensions-jvm/pg-replication-slot/integration-test/src/main/java/org/apache/camel/quarkus/component/pg/replication/slot/it/PgReplicationSlotResource.java rename to integration-tests/pg-replication-slot/src/main/java/org/apache/camel/quarkus/component/pg/replication/slot/it/PgReplicationSlotResource.java index d484c0c..82ee180 100644 --- a/extensions-jvm/pg-replication-slot/integration-test/src/main/java/org/apache/camel/quarkus/component/pg/replication/slot/it/PgReplicationSlotResource.java +++ b/integration-tests/pg-replication-slot/src/main/java/org/apache/camel/quarkus/component/pg/replication/slot/it/PgReplicationSlotResource.java @@ -16,15 +16,14 @@ */ package org.apache.camel.quarkus.component.pg.replication.slot.it; +import java.util.concurrent.ConcurrentLinkedQueue; + import javax.enterprise.context.ApplicationScoped; -import javax.inject.Inject; import javax.ws.rs.GET; import javax.ws.rs.Path; import javax.ws.rs.Produces; import javax.ws.rs.core.MediaType; -import javax.ws.rs.core.Response; -import org.apache.camel.CamelContext; import org.jboss.logging.Logger; @Path("/pg-replication-slot") @@ -33,19 +32,18 @@ public class PgReplicationSlotResource { private static final Logger LOG = Logger.getLogger(PgReplicationSlotResource.class); - private static final String COMPONENT_PG_REPLICATION_SLOT = "pg-replication-slot"; - @Inject - CamelContext context; + private final ConcurrentLinkedQueue<String> replicationEvents = new ConcurrentLinkedQueue<>(); + + void logReplicationEvent(String event) { + LOG.debugf("Calling logReplicationEvent(\"%s\")", event); + replicationEvents.add(event); + } - @Path("/load/component/pg-replication-slot") + @Path("/get-events") @GET - @Produces(MediaType.TEXT_PLAIN) - public Response loadComponentPgReplicationSlot() throws Exception { - /* This is an autogenerated test */ - if (context.getComponent(COMPONENT_PG_REPLICATION_SLOT) != null) { - return Response.ok().build(); - } - LOG.warnf("Could not load [%s] from the Camel context", COMPONENT_PG_REPLICATION_SLOT); - return Response.status(500, COMPONENT_PG_REPLICATION_SLOT + " could not be loaded from the Camel context").build(); + @Produces(MediaType.APPLICATION_JSON) + public ConcurrentLinkedQueue<String> getReplicationEvents() { + LOG.debug("Calling getReplicationEvents"); + return replicationEvents; } } diff --git a/integration-tests/pg-replication-slot/src/main/java/org/apache/camel/quarkus/component/pg/replication/slot/it/PgReplicationSlotRoute.java b/integration-tests/pg-replication-slot/src/main/java/org/apache/camel/quarkus/component/pg/replication/slot/it/PgReplicationSlotRoute.java new file mode 100644 index 0000000..51863c2 --- /dev/null +++ b/integration-tests/pg-replication-slot/src/main/java/org/apache/camel/quarkus/component/pg/replication/slot/it/PgReplicationSlotRoute.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.quarkus.component.pg.replication.slot.it; + +import javax.enterprise.context.ApplicationScoped; +import javax.inject.Inject; + +import org.apache.camel.builder.RouteBuilder; + +@ApplicationScoped +public class PgReplicationSlotRoute extends RouteBuilder { + + public static final String PG_AUTHORITY_CFG_KEY = "quarkus.camel.pg-replication-slot.test.authority"; + public static final String PG_DBNAME_CFG_KEY = "quarkus.camel.pg-replication-slot.test.db-name"; + public static final String PG_PASSRD_CFG_KEY = "quarkus.camel.pg-replication-slot.test.password"; + public static final String PG_USER_CFG_KEY = "quarkus.camel.pg-replication-slot.test.user"; + + private static final String URI_FORMAT = "pg-replication-slot://{{%s}}/{{%s}}/{{%s}}_test_slot:test_decoding?user={{%s}}&password={{%s}}&slotOptions.skip-empty-xacts=true&slotOptions.include-xids=false"; + + @Inject + PgReplicationSlotResource resource; + + @Override + public void configure() { + fromF(URI_FORMAT, PG_AUTHORITY_CFG_KEY, PG_DBNAME_CFG_KEY, PG_DBNAME_CFG_KEY, PG_USER_CFG_KEY, PG_PASSRD_CFG_KEY) + .process(e -> { + resource.logReplicationEvent(e.getMessage().getBody(String.class)); + }); + } + +} diff --git a/extensions-jvm/pg-replication-slot/integration-test/src/test/java/org/apache/camel/quarkus/component/pg/replication/slot/it/PgReplicationSlotTest.java b/integration-tests/pg-replication-slot/src/test/java/org/apache/camel/quarkus/component/pg/replication/slot/it/PgReplicationSlotIT.java similarity index 68% rename from extensions-jvm/pg-replication-slot/integration-test/src/test/java/org/apache/camel/quarkus/component/pg/replication/slot/it/PgReplicationSlotTest.java rename to integration-tests/pg-replication-slot/src/test/java/org/apache/camel/quarkus/component/pg/replication/slot/it/PgReplicationSlotIT.java index 292d87e..709c655 100644 --- a/extensions-jvm/pg-replication-slot/integration-test/src/test/java/org/apache/camel/quarkus/component/pg/replication/slot/it/PgReplicationSlotTest.java +++ b/integration-tests/pg-replication-slot/src/test/java/org/apache/camel/quarkus/component/pg/replication/slot/it/PgReplicationSlotIT.java @@ -16,19 +16,9 @@ */ package org.apache.camel.quarkus.component.pg.replication.slot.it; -import io.quarkus.test.junit.QuarkusTest; -import io.restassured.RestAssured; -import org.junit.jupiter.api.Test; +import io.quarkus.test.junit.NativeImageTest; -@QuarkusTest -class PgReplicationSlotTest { - - @Test - public void loadComponentPgReplicationSlot() { - /* A simple autogenerated test */ - RestAssured.get("/pg-replication-slot/load/component/pg-replication-slot") - .then() - .statusCode(200); - } +@NativeImageTest +class PgReplicationSlotIT extends PgReplicationSlotTest { } diff --git a/integration-tests/pg-replication-slot/src/test/java/org/apache/camel/quarkus/component/pg/replication/slot/it/PgReplicationSlotTest.java b/integration-tests/pg-replication-slot/src/test/java/org/apache/camel/quarkus/component/pg/replication/slot/it/PgReplicationSlotTest.java new file mode 100644 index 0000000..331ceae --- /dev/null +++ b/integration-tests/pg-replication-slot/src/test/java/org/apache/camel/quarkus/component/pg/replication/slot/it/PgReplicationSlotTest.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.quarkus.component.pg.replication.slot.it; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.Properties; +import java.util.concurrent.TimeUnit; + +import io.quarkus.test.common.QuarkusTestResource; +import io.quarkus.test.junit.QuarkusTest; +import io.restassured.http.ContentType; +import org.eclipse.microprofile.config.ConfigProvider; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +import static io.restassured.RestAssured.given; +import static org.apache.camel.quarkus.component.pg.replication.slot.it.PgReplicationSlotRoute.PG_AUTHORITY_CFG_KEY; +import static org.apache.camel.quarkus.component.pg.replication.slot.it.PgReplicationSlotRoute.PG_DBNAME_CFG_KEY; +import static org.apache.camel.quarkus.component.pg.replication.slot.it.PgReplicationSlotRoute.PG_PASSRD_CFG_KEY; +import static org.apache.camel.quarkus.component.pg.replication.slot.it.PgReplicationSlotRoute.PG_USER_CFG_KEY; +import static org.awaitility.Awaitility.await; +import static org.junit.jupiter.api.Assertions.assertEquals; + +@QuarkusTestResource(PgReplicationSlotTestResource.class) +@QuarkusTest +class PgReplicationSlotTest { + + private static Connection connection; + + @BeforeAll + public static void setUp() throws SQLException { + String authority = ConfigProvider.getConfig().getValue(PG_AUTHORITY_CFG_KEY, String.class); + String dbName = ConfigProvider.getConfig().getValue(PG_DBNAME_CFG_KEY, String.class); + String user = ConfigProvider.getConfig().getValue(PG_USER_CFG_KEY, String.class); + String password = ConfigProvider.getConfig().getValue(PG_PASSRD_CFG_KEY, String.class); + + String url = String.format("jdbc:postgresql://%s/%s", authority, dbName); + Properties props = new Properties(); + props.setProperty("user", user); + props.setProperty("password", password); + + connection = DriverManager.getConnection(url, props); + try (Statement statement = connection.createStatement()) { + statement.execute("CREATE TABLE IF NOT EXISTS camel_test_table(id int);"); + } + } + + @AfterAll + public static void tearDown() throws SQLException { + connection.close(); + } + + @Test + public void insertsShouldTriggerReplicationEvents() throws SQLException { + + try (Statement statement = connection.createStatement()) { + statement.execute("INSERT INTO camel_test_table(id) VALUES(1984);"); + statement.execute("INSERT INTO camel_test_table(id) VALUES(1998);"); + } + + await().atMost(10L, TimeUnit.SECONDS).until(() -> { + return given().contentType(ContentType.JSON).get("/pg-replication-slot/get-events").path("size()").equals(6); + }); + String[] results = given().contentType(ContentType.JSON).get("/pg-replication-slot/get-events").then().extract() + .as(String[].class); + assertEquals(6, results.length); + assertEquals("BEGIN", results[0]); + assertEquals("table public.camel_test_table: INSERT: id[integer]:1984", results[1]); + assertEquals("COMMIT", results[2]); + assertEquals("BEGIN", results[3]); + assertEquals("table public.camel_test_table: INSERT: id[integer]:1998", results[4]); + assertEquals("COMMIT", results[5]); + } + +} diff --git a/integration-tests/pg-replication-slot/src/test/java/org/apache/camel/quarkus/component/pg/replication/slot/it/PgReplicationSlotTestResource.java b/integration-tests/pg-replication-slot/src/test/java/org/apache/camel/quarkus/component/pg/replication/slot/it/PgReplicationSlotTestResource.java new file mode 100644 index 0000000..b99d067 --- /dev/null +++ b/integration-tests/pg-replication-slot/src/test/java/org/apache/camel/quarkus/component/pg/replication/slot/it/PgReplicationSlotTestResource.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.quarkus.component.pg.replication.slot.it; + +import java.util.Map; + +import org.apache.camel.quarkus.testcontainers.ContainerResourceLifecycleManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.containers.wait.strategy.Wait; +import org.testcontainers.utility.TestcontainersConfiguration; + +import static org.apache.camel.quarkus.component.pg.replication.slot.it.PgReplicationSlotRoute.PG_AUTHORITY_CFG_KEY; +import static org.apache.camel.quarkus.component.pg.replication.slot.it.PgReplicationSlotRoute.PG_DBNAME_CFG_KEY; +import static org.apache.camel.quarkus.component.pg.replication.slot.it.PgReplicationSlotRoute.PG_PASSRD_CFG_KEY; +import static org.apache.camel.quarkus.component.pg.replication.slot.it.PgReplicationSlotRoute.PG_USER_CFG_KEY; +import static org.apache.camel.util.CollectionHelper.mapOf; + +public class PgReplicationSlotTestResource implements ContainerResourceLifecycleManager { + private static final Logger LOG = LoggerFactory.getLogger(PgReplicationSlotTestResource.class); + private static final int POSTGRES_PORT = 5432; + private static final String POSTGRES_IMAGE = "postgres:13.0"; + private static final String POSTGRES_DB_NAME = "camel_db"; + private static final String POSTGRES_PASSWORD = "postgres-password"; + private static final String POSTGRES_USER = "postgres-user"; + + private GenericContainer pgContainer; + + @Override + public Map<String, String> start() { + LOG.info(TestcontainersConfiguration.getInstance().toString()); + + // Setup the Postgres container with replication enabled + pgContainer = new GenericContainer(POSTGRES_IMAGE).withCommand("postgres -c wal_level=logical") + .withExposedPorts(POSTGRES_PORT).withEnv("POSTGRES_USER", POSTGRES_USER) + .withEnv("POSTGRES_PASSWORD", POSTGRES_PASSWORD).withEnv("POSTGRES_DB", POSTGRES_DB_NAME) + .withLogConsumer(new Slf4jLogConsumer(LOG)).waitingFor(Wait.forListeningPort()); + pgContainer.start(); + + // Print Postgres server connectivity information + String pgAuthority = pgContainer.getContainerIpAddress() + ":" + pgContainer.getMappedPort(POSTGRES_PORT); + LOG.debug("Postgres database available at " + pgAuthority); + + return mapOf(PG_AUTHORITY_CFG_KEY, pgAuthority, PG_DBNAME_CFG_KEY, POSTGRES_DB_NAME, PG_USER_CFG_KEY, POSTGRES_USER, + PG_PASSRD_CFG_KEY, POSTGRES_PASSWORD); + } + + @Override + public void stop() { + try { + if (pgContainer != null) { + pgContainer.stop(); + } + } catch (Exception ex) { + LOG.error("An issue occured while stopping the PgReplicationSlotTestResource", ex); + } + } +} diff --git a/integration-tests/pom.xml b/integration-tests/pom.xml index 2b30730..f4352fb 100644 --- a/integration-tests/pom.xml +++ b/integration-tests/pom.xml @@ -131,6 +131,7 @@ <module>openapi-java</module> <module>opentracing</module> <module>pdf</module> + <module>pg-replication-slot</module> <module>platform-http</module> <module>platform-http-engine</module> <module>quartz</module> diff --git a/tooling/scripts/test-categories.yaml b/tooling/scripts/test-categories.yaml index db2d809..4ea4150 100644 --- a/tooling/scripts/test-categories.yaml +++ b/tooling/scripts/test-categories.yaml @@ -49,6 +49,7 @@ database: - jpa - kudu - mongodb + - pg-replication-slot - sql - arangodb dataformats: