This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new e4bbe0f2846 [fix][test][branch-4.0] Backport Pulsar IO Debezium
connector test framework changes
e4bbe0f2846 is described below
commit e4bbe0f2846ca084953588815415a1d5e341679f
Author: Lari Hotari <[email protected]>
AuthorDate: Thu Apr 16 10:31:53 2026 +0300
[fix][test][branch-4.0] Backport Pulsar IO Debezium connector test
framework changes
---
tests/integration/pom.xml | 10 +++-
.../containers/DebeziumMongoDbContainer.java | 7 +--
.../containers/DebeziumMySQLContainer.java | 3 +-
.../containers/DebeziumPostgreSqlContainer.java | 3 +-
.../debezium/DebeziumMongoDbSourceTester.java | 53 +++++++++++++++++++---
.../debezium/DebeziumMySqlSourceTester.java | 35 +++++---------
.../debezium/PulsarDebeziumSourcesTest.java | 2 +-
7 files changed, 75 insertions(+), 38 deletions(-)
diff --git a/tests/integration/pom.xml b/tests/integration/pom.xml
index 75213fb73b4..ff51bd29f9a 100644
--- a/tests/integration/pom.xml
+++ b/tests/integration/pom.xml
@@ -302,10 +302,16 @@
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<argLine>${testJacocoAgentArgument} -XX:+ExitOnOutOfMemoryError
-Xmx1G -XX:MaxDirectMemorySize=1G
- -Dconfluent.version=${confluent.version}
-Djacoco.version=${jacoco-maven-plugin.version}
-
-Dintegrationtest.coverage.enabled=${integrationtest.coverage.enabled}
-Dintegrationtest.coverage.dir=${integrationtest.coverage.dir}
${test.additional.args}
</argLine>
+ <systemPropertyVariables>
+ <confluent.version>${confluent.version}</confluent.version>
+ <kafka.version>${kafka-client.version}</kafka.version>
+ <jacoco.version>${jacoco-maven-plugin.version}</jacoco.version>
+ <debezium.version>${debezium.version}</debezium.version>
+
<integrationtest.coverage.enabled>${integrationtest.coverage.enabled}</integrationtest.coverage.enabled>
+
<integrationtest.coverage.dir>${integrationtest.coverage.dir}</integrationtest.coverage.dir>
+ </systemPropertyVariables>
<skipTests>false</skipTests>
<suiteXmlFiles>
<file>src/test/resources/${integrationTestSuiteFile}</file>
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/DebeziumMongoDbContainer.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/DebeziumMongoDbContainer.java
index 481725d145b..35806517c6a 100644
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/DebeziumMongoDbContainer.java
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/DebeziumMongoDbContainer.java
@@ -25,12 +25,11 @@ public class DebeziumMongoDbContainer extends
ChaosContainer<DebeziumMongoDbCont
public static final String NAME = "debezium-mongodb-example";
public static final Integer[] PORTS = { 27017 };
- private static final String IMAGE_NAME = "debezium/example-mongodb:0.10";
+ private static final String IMAGE_NAME =
+ "quay.io/debezium/example-mongodb:" +
System.getProperty("debezium.version", "1.9.7.Final");
public DebeziumMongoDbContainer(String clusterName) {
super(clusterName, IMAGE_NAME);
- this.withEnv("MONGODB_USER", "mongodb");
- this.withEnv("MONGODB_PASSWORD", "mongodb");
}
@Override
public String getContainerName() {
@@ -42,6 +41,8 @@ public class DebeziumMongoDbContainer extends
ChaosContainer<DebeziumMongoDbCont
super.configure();
this.withNetworkAliases(NAME)
.withExposedPorts(PORTS)
+ .withEnv("MONGODB_USER", "debezium")
+ .withEnv("MONGODB_PASSWORD", "dbz")
.withCreateContainerCmdModifier(createContainerCmd -> {
createContainerCmd.withHostName(NAME);
createContainerCmd.withName(getContainerName());
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/DebeziumMySQLContainer.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/DebeziumMySQLContainer.java
index 27d624a6c82..7dbd0f4f858 100644
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/DebeziumMySQLContainer.java
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/DebeziumMySQLContainer.java
@@ -26,7 +26,8 @@ public class DebeziumMySQLContainer extends
ChaosContainer<DebeziumMySQLContaine
public static final String NAME = "debezium-mysql-example";
static final Integer[] PORTS = { 3306 };
- private static final String IMAGE_NAME = "debezium/example-mysql:0.8";
+ private static final String IMAGE_NAME =
+ "quay.io/debezium/example-mysql:" +
System.getProperty("debezium.version", "1.9.7.Final");
public DebeziumMySQLContainer(String clusterName) {
super(clusterName, IMAGE_NAME);
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/DebeziumPostgreSqlContainer.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/DebeziumPostgreSqlContainer.java
index 479869c4183..b86b22937c2 100644
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/DebeziumPostgreSqlContainer.java
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/DebeziumPostgreSqlContainer.java
@@ -26,7 +26,8 @@ public class DebeziumPostgreSqlContainer extends
ChaosContainer<DebeziumPostgreS
public static final String NAME = "debezium-postgresql-example";
static final Integer[] PORTS = { 5432 };
- private static final String IMAGE_NAME = "debezium/example-postgres:0.10";
+ private static final String IMAGE_NAME =
+ "quay.io/debezium/example-postgres:" +
System.getProperty("debezium.version", "1.9.7.Final");
public DebeziumPostgreSqlContainer(String clusterName) {
super(clusterName, IMAGE_NAME);
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMongoDbSourceTester.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMongoDbSourceTester.java
index 4c562f3e244..35cb3e53923 100644
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMongoDbSourceTester.java
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMongoDbSourceTester.java
@@ -23,6 +23,7 @@ import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.tests.integration.containers.DebeziumMongoDbContainer;
import org.apache.pulsar.tests.integration.containers.PulsarContainer;
+import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
import org.apache.pulsar.tests.integration.io.sources.SourceTester;
import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
@@ -62,17 +63,55 @@ public class DebeziumMongoDbSourceTester extends
SourceTester<DebeziumMongoDbCon
@Override
public void prepareSource() throws Exception {
+ waitForMongoDbReady();
this.debeziumMongoDbContainer.execCmd("bash", "-c",
"/usr/local/bin/init-inventory.sh");
- log.info("debezium mongodb server already contains preconfigured
data.");
+ waitForReplicaSetPrimary();
+ }
+
+ private void waitForMongoDbReady() throws Exception {
+ log.info("Waiting for MongoDB to be ready...");
+ for (int i = 0; i < 50; i++) {
+ try {
+ ContainerExecResult result =
this.debeziumMongoDbContainer.execCmd(
+ "/bin/bash", "-c",
+ "mongosh --quiet --eval \"db.adminCommand('ping').ok\"
localhost:27017 | grep 1");
+ if (result.getExitCode() == 0) {
+ log.info("MongoDB ready after {} seconds", i);
+ return;
+ }
+ } catch (Exception e) {
+ log.debug("MongoDB readiness check attempt {} failed: {}", i +
1, e.getMessage());
+ }
+ Thread.sleep(1000);
+ }
+ throw new RuntimeException("MongoDB not ready after 50 seconds");
+ }
+
+ private void waitForReplicaSetPrimary() throws Exception {
+ log.info("Waiting for MongoDB replica set primary to be ready...");
+ for (int i = 0; i < 60; i++) {
+ try {
+ ContainerExecResult result =
this.debeziumMongoDbContainer.execCmd("/bin/bash", "-c",
+ "mongosh --quiet --eval 'db.hello().isWritablePrimary'
localhost:27017");
+ if (result.getStdout().trim().contains("true")) {
+ log.info("MongoDB replica set primary ready after {}
seconds", i);
+ return;
+ }
+ } catch (Exception e) {
+ log.debug("MongoDB primary check attempt {} failed: {}", i +
1, e.getMessage());
+ }
+ Thread.sleep(1000);
+ }
+ throw new RuntimeException("MongoDB replica set primary not ready
after 60 seconds");
}
@Override
public void prepareInsertEvent() throws Exception {
this.debeziumMongoDbContainer.execCmd("/bin/bash", "-c",
- "mongo -u debezium -p dbz --authenticationDatabase admin
localhost:27017/inventory "
+ "mongosh -u debezium -p dbz --authenticationDatabase admin
localhost:27017/inventory "
+ "--eval 'db.products.find()'");
this.debeziumMongoDbContainer.execCmd("/bin/bash", "-c",
- "mongo -u debezium -p dbz --authenticationDatabase admin
localhost:27017/inventory "
+ "mongosh -u debezium -p dbz --authenticationDatabase admin
localhost:27017/inventory "
+ "--eval 'db.products.insert({ "
+ "_id : NumberLong(\"110\"),"
+ "name : \"test-debezium\","
@@ -84,20 +123,20 @@ public class DebeziumMongoDbSourceTester extends
SourceTester<DebeziumMongoDbCon
@Override
public void prepareDeleteEvent() throws Exception {
this.debeziumMongoDbContainer.execCmd("/bin/bash", "-c",
- "mongo -u debezium -p dbz --authenticationDatabase admin
localhost:27017/inventory "
+ "mongosh -u debezium -p dbz --authenticationDatabase admin
localhost:27017/inventory "
+ "--eval 'db.products.find()'");
this.debeziumMongoDbContainer.execCmd("/bin/bash", "-c",
- "mongo -u debezium -p dbz --authenticationDatabase admin
localhost:27017/inventory "
+ "mongosh -u debezium -p dbz --authenticationDatabase admin
localhost:27017/inventory "
+ "--eval 'db.products.deleteOne({name :
\"test-debezium-update\"})'");
}
@Override
public void prepareUpdateEvent() throws Exception {
this.debeziumMongoDbContainer.execCmd("/bin/bash", "-c",
- "mongo -u debezium -p dbz --authenticationDatabase admin
localhost:27017/inventory "
+ "mongosh -u debezium -p dbz --authenticationDatabase admin
localhost:27017/inventory "
+ "--eval 'db.products.find()'");
this.debeziumMongoDbContainer.execCmd("/bin/bash", "-c",
- "mongo -u debezium -p dbz --authenticationDatabase admin
localhost:27017/inventory "
+ "mongosh -u debezium -p dbz --authenticationDatabase admin
localhost:27017/inventory "
+ "--eval 'db.products.update({"
+ "_id : 110},"
+ "{$set:{name:\"test-debezium-update\", description:
\"this is update description\"}})'");
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMySqlSourceTester.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMySqlSourceTester.java
index 2c457cd2fb9..18a1aa0ae00 100644
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMySqlSourceTester.java
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMySqlSourceTester.java
@@ -83,37 +83,26 @@ public class DebeziumMySqlSourceTester extends
SourceTester<DebeziumMySQLContain
@Override
public void prepareInsertEvent() throws Exception {
- this.debeziumMySqlContainer.execCmd(
- "/bin/bash", "-c",
- "mysql -h 127.0.0.1 -u root -pdebezium -e 'SELECT * FROM
inventory.products'");
- this.debeziumMySqlContainer.execCmd(
- "/bin/bash", "-c",
- "mysql -h 127.0.0.1 -u root -pdebezium "
- + "-e \"INSERT INTO inventory.products(name,
description, weight) "
- + "values('test-debezium', 'This is description',
2.0)\"");
+ executeSql("SELECT * FROM inventory.products");
+ executeSql("INSERT INTO inventory.products(name, description, weight) "
+ + "values('test-debezium', 'This is description', 2.0)");
+ }
+
+ private void executeSql(String sqlStatement) throws Exception {
+ this.debeziumMySqlContainer.execCmd("mysql", "-u", "root",
"-pdebezium", "-e", sqlStatement);
}
@Override
public void prepareUpdateEvent() throws Exception {
- this.debeziumMySqlContainer.execCmd(
- "/bin/bash", "-c",
- "mysql -h 127.0.0.1 -u root -pdebezium "
- + "-e \"UPDATE inventory.products set
description='update description', weight=10 "
- + "WHERE name='test-debezium'\"");
+ executeSql("UPDATE inventory.products set description='update
description', weight=10 "
+ + "WHERE name='test-debezium'");
}
@Override
public void prepareDeleteEvent() throws Exception {
- this.debeziumMySqlContainer.execCmd(
- "/bin/bash", "-c",
- "mysql -h 127.0.0.1 -u root -pdebezium -e 'SELECT * FROM
inventory.products'");
- this.debeziumMySqlContainer.execCmd(
- "/bin/bash", "-c",
- "mysql -h 127.0.0.1 -u root -pdebezium "
- + "-e \"DELETE FROM inventory.products WHERE
name='test-debezium'\"");
- this.debeziumMySqlContainer.execCmd(
- "/bin/bash", "-c",
- "mysql -h 127.0.0.1 -u root -pdebezium -e 'SELECT * FROM
inventory.products'");
+ executeSql("SELECT * FROM inventory.products");
+ executeSql("DELETE FROM inventory.products WHERE
name='test-debezium'");
+ executeSql("SELECT * FROM inventory.products");
}
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/PulsarDebeziumSourcesTest.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/PulsarDebeziumSourcesTest.java
index 50160d94eef..2d8c78dd83d 100644
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/PulsarDebeziumSourcesTest.java
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/PulsarDebeziumSourcesTest.java
@@ -91,7 +91,7 @@ public class PulsarDebeziumSourcesTest extends
PulsarIOTestBase {
+ "-" + functionRuntimeType + "-" + randomName(8);
// This is the binlog count that contained in mysql container.
- final int numMessages = 47;
+ final int numMessages = 52;
@Cleanup
PulsarClient client = PulsarClient.builder()