This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new e4bbe0f2846 [fix][test][branch-4.0] Backport Pulsar IO Debezium connector test framework changes
e4bbe0f2846 is described below

commit e4bbe0f2846ca084953588815415a1d5e341679f
Author: Lari Hotari <[email protected]>
AuthorDate: Thu Apr 16 10:31:53 2026 +0300

    [fix][test][branch-4.0] Backport Pulsar IO Debezium connector test framework changes
---
 tests/integration/pom.xml                          | 10 +++-
 .../containers/DebeziumMongoDbContainer.java       |  7 +--
 .../containers/DebeziumMySQLContainer.java         |  3 +-
 .../containers/DebeziumPostgreSqlContainer.java    |  3 +-
 .../debezium/DebeziumMongoDbSourceTester.java      | 53 +++++++++++++++++++---
 .../debezium/DebeziumMySqlSourceTester.java        | 35 +++++---------
 .../debezium/PulsarDebeziumSourcesTest.java        |  2 +-
 7 files changed, 75 insertions(+), 38 deletions(-)

diff --git a/tests/integration/pom.xml b/tests/integration/pom.xml
index 75213fb73b4..ff51bd29f9a 100644
--- a/tests/integration/pom.xml
+++ b/tests/integration/pom.xml
@@ -302,10 +302,16 @@
             <artifactId>maven-surefire-plugin</artifactId>
             <configuration>
               <argLine>${testJacocoAgentArgument} -XX:+ExitOnOutOfMemoryError -Xmx1G -XX:MaxDirectMemorySize=1G
-              -Dconfluent.version=${confluent.version} -Djacoco.version=${jacoco-maven-plugin.version}
-              -Dintegrationtest.coverage.enabled=${integrationtest.coverage.enabled} -Dintegrationtest.coverage.dir=${integrationtest.coverage.dir}
               ${test.additional.args}
               </argLine>
+              <systemPropertyVariables>
+                <confluent.version>${confluent.version}</confluent.version>
+                <kafka.version>${kafka-client.version}</kafka.version>
+                <jacoco.version>${jacoco-maven-plugin.version}</jacoco.version>
+                <debezium.version>${debezium.version}</debezium.version>
+                <integrationtest.coverage.enabled>${integrationtest.coverage.enabled}</integrationtest.coverage.enabled>
+                <integrationtest.coverage.dir>${integrationtest.coverage.dir}</integrationtest.coverage.dir>
+              </systemPropertyVariables>
               <skipTests>false</skipTests>
               <suiteXmlFiles>
                 <file>src/test/resources/${integrationTestSuiteFile}</file>
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/DebeziumMongoDbContainer.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/DebeziumMongoDbContainer.java
index 481725d145b..35806517c6a 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/DebeziumMongoDbContainer.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/DebeziumMongoDbContainer.java
@@ -25,12 +25,11 @@ public class DebeziumMongoDbContainer extends 
ChaosContainer<DebeziumMongoDbCont
     public static final String NAME = "debezium-mongodb-example";
 
     public static final Integer[] PORTS = { 27017 };
-    private static final String IMAGE_NAME = "debezium/example-mongodb:0.10";
+    private static final String IMAGE_NAME =
+            "quay.io/debezium/example-mongodb:" + System.getProperty("debezium.version", "1.9.7.Final");
 
     public DebeziumMongoDbContainer(String clusterName) {
         super(clusterName, IMAGE_NAME);
-        this.withEnv("MONGODB_USER", "mongodb");
-        this.withEnv("MONGODB_PASSWORD", "mongodb");
     }
     @Override
     public String getContainerName() {
@@ -42,6 +41,8 @@ public class DebeziumMongoDbContainer extends 
ChaosContainer<DebeziumMongoDbCont
         super.configure();
         this.withNetworkAliases(NAME)
                 .withExposedPorts(PORTS)
+                .withEnv("MONGODB_USER", "debezium")
+                .withEnv("MONGODB_PASSWORD", "dbz")
                 .withCreateContainerCmdModifier(createContainerCmd -> {
                     createContainerCmd.withHostName(NAME);
                     createContainerCmd.withName(getContainerName());
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/DebeziumMySQLContainer.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/DebeziumMySQLContainer.java
index 27d624a6c82..7dbd0f4f858 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/DebeziumMySQLContainer.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/DebeziumMySQLContainer.java
@@ -26,7 +26,8 @@ public class DebeziumMySQLContainer extends 
ChaosContainer<DebeziumMySQLContaine
     public static final String NAME = "debezium-mysql-example";
     static final Integer[] PORTS = { 3306 };
 
-    private static final String IMAGE_NAME = "debezium/example-mysql:0.8";
+    private static final String IMAGE_NAME =
+            "quay.io/debezium/example-mysql:" + System.getProperty("debezium.version", "1.9.7.Final");
 
     public DebeziumMySQLContainer(String clusterName) {
         super(clusterName, IMAGE_NAME);
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/DebeziumPostgreSqlContainer.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/DebeziumPostgreSqlContainer.java
index 479869c4183..b86b22937c2 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/DebeziumPostgreSqlContainer.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/DebeziumPostgreSqlContainer.java
@@ -26,7 +26,8 @@ public class DebeziumPostgreSqlContainer extends 
ChaosContainer<DebeziumPostgreS
     public static final String NAME = "debezium-postgresql-example";
     static final Integer[] PORTS = { 5432 };
 
-    private static final String IMAGE_NAME = "debezium/example-postgres:0.10";
+    private static final String IMAGE_NAME =
+            "quay.io/debezium/example-postgres:" + System.getProperty("debezium.version", "1.9.7.Final");
 
     public DebeziumPostgreSqlContainer(String clusterName) {
         super(clusterName, IMAGE_NAME);
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMongoDbSourceTester.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMongoDbSourceTester.java
index 4c562f3e244..35cb3e53923 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMongoDbSourceTester.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMongoDbSourceTester.java
@@ -23,6 +23,7 @@ import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.tests.integration.containers.DebeziumMongoDbContainer;
 import org.apache.pulsar.tests.integration.containers.PulsarContainer;
+import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
 import org.apache.pulsar.tests.integration.io.sources.SourceTester;
 import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
 
@@ -62,17 +63,55 @@ public class DebeziumMongoDbSourceTester extends 
SourceTester<DebeziumMongoDbCon
 
     @Override
     public void prepareSource() throws Exception {
+        waitForMongoDbReady();
         this.debeziumMongoDbContainer.execCmd("bash", "-c", "/usr/local/bin/init-inventory.sh");
-        log.info("debezium mongodb server already contains preconfigured 
data.");
+        waitForReplicaSetPrimary();
+    }
+
+    private void waitForMongoDbReady() throws Exception {
+        log.info("Waiting for MongoDB to be ready...");
+        for (int i = 0; i < 50; i++) {
+            try {
+                ContainerExecResult result = 
this.debeziumMongoDbContainer.execCmd(
+                        "/bin/bash", "-c",
+                        "mongosh --quiet --eval \"db.adminCommand('ping').ok\" localhost:27017 | grep 1");
+                if (result.getExitCode() == 0) {
+                    log.info("MongoDB ready after {} seconds", i);
+                    return;
+                }
+            } catch (Exception e) {
+                log.debug("MongoDB readiness check attempt {} failed: {}", i + 
1, e.getMessage());
+            }
+            Thread.sleep(1000);
+        }
+        throw new RuntimeException("MongoDB not ready after 50 seconds");
+    }
+
+    private void waitForReplicaSetPrimary() throws Exception {
+        log.info("Waiting for MongoDB replica set primary to be ready...");
+        for (int i = 0; i < 60; i++) {
+            try {
+                ContainerExecResult result = 
this.debeziumMongoDbContainer.execCmd("/bin/bash", "-c",
+                        "mongosh --quiet --eval 'db.hello().isWritablePrimary' localhost:27017");
+                if (result.getStdout().trim().contains("true")) {
+                    log.info("MongoDB replica set primary ready after {} 
seconds", i);
+                    return;
+                }
+            } catch (Exception e) {
+                log.debug("MongoDB primary check attempt {} failed: {}", i + 
1, e.getMessage());
+            }
+            Thread.sleep(1000);
+        }
+        throw new RuntimeException("MongoDB replica set primary not ready 
after 60 seconds");
     }
 
     @Override
     public void prepareInsertEvent() throws Exception {
         this.debeziumMongoDbContainer.execCmd("/bin/bash", "-c",
-                "mongo -u debezium -p dbz --authenticationDatabase admin 
localhost:27017/inventory "
+                "mongosh -u debezium -p dbz --authenticationDatabase admin 
localhost:27017/inventory "
                         + "--eval 'db.products.find()'");
         this.debeziumMongoDbContainer.execCmd("/bin/bash", "-c",
-                "mongo -u debezium -p dbz --authenticationDatabase admin 
localhost:27017/inventory "
+                "mongosh -u debezium -p dbz --authenticationDatabase admin 
localhost:27017/inventory "
                         + "--eval 'db.products.insert({ "
                         + "_id : NumberLong(\"110\"),"
                         + "name : \"test-debezium\","
@@ -84,20 +123,20 @@ public class DebeziumMongoDbSourceTester extends 
SourceTester<DebeziumMongoDbCon
     @Override
     public void prepareDeleteEvent() throws Exception {
         this.debeziumMongoDbContainer.execCmd("/bin/bash", "-c",
-                "mongo -u debezium -p dbz --authenticationDatabase admin 
localhost:27017/inventory "
+                "mongosh -u debezium -p dbz --authenticationDatabase admin 
localhost:27017/inventory "
                         + "--eval 'db.products.find()'");
         this.debeziumMongoDbContainer.execCmd("/bin/bash", "-c",
-                "mongo -u debezium -p dbz --authenticationDatabase admin 
localhost:27017/inventory "
+                "mongosh -u debezium -p dbz --authenticationDatabase admin 
localhost:27017/inventory "
                         + "--eval 'db.products.deleteOne({name : 
\"test-debezium-update\"})'");
     }
 
     @Override
     public void prepareUpdateEvent() throws Exception {
         this.debeziumMongoDbContainer.execCmd("/bin/bash", "-c",
-                "mongo -u debezium -p dbz --authenticationDatabase admin 
localhost:27017/inventory "
+                "mongosh -u debezium -p dbz --authenticationDatabase admin 
localhost:27017/inventory "
                         + "--eval 'db.products.find()'");
         this.debeziumMongoDbContainer.execCmd("/bin/bash", "-c",
-                "mongo -u debezium -p dbz --authenticationDatabase admin 
localhost:27017/inventory "
+                "mongosh -u debezium -p dbz --authenticationDatabase admin 
localhost:27017/inventory "
                         + "--eval 'db.products.update({"
                         + "_id : 110},"
                         + "{$set:{name:\"test-debezium-update\", description: 
\"this is update description\"}})'");
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMySqlSourceTester.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMySqlSourceTester.java
index 2c457cd2fb9..18a1aa0ae00 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMySqlSourceTester.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMySqlSourceTester.java
@@ -83,37 +83,26 @@ public class DebeziumMySqlSourceTester extends 
SourceTester<DebeziumMySQLContain
 
     @Override
     public void prepareInsertEvent() throws Exception {
-        this.debeziumMySqlContainer.execCmd(
-                "/bin/bash", "-c",
-                "mysql -h 127.0.0.1 -u root -pdebezium -e 'SELECT * FROM 
inventory.products'");
-        this.debeziumMySqlContainer.execCmd(
-                "/bin/bash", "-c",
-                "mysql -h 127.0.0.1 -u root -pdebezium "
-                        + "-e \"INSERT INTO inventory.products(name, 
description, weight) "
-                        + "values('test-debezium', 'This is description', 
2.0)\"");
+        executeSql("SELECT * FROM inventory.products");
+        executeSql("INSERT INTO inventory.products(name, description, weight) "
+                + "values('test-debezium', 'This is description', 2.0)");
+    }
+
+    private void executeSql(String sqlStatement) throws Exception {
+        this.debeziumMySqlContainer.execCmd("mysql", "-u", "root", 
"-pdebezium", "-e", sqlStatement);
     }
 
     @Override
     public void prepareUpdateEvent() throws Exception {
-        this.debeziumMySqlContainer.execCmd(
-                "/bin/bash", "-c",
-                "mysql -h 127.0.0.1 -u root -pdebezium "
-                        + "-e \"UPDATE inventory.products set 
description='update description', weight=10 "
-                        + "WHERE name='test-debezium'\"");
+        executeSql("UPDATE inventory.products set description='update 
description', weight=10 "
+                + "WHERE name='test-debezium'");
     }
 
     @Override
     public void prepareDeleteEvent() throws Exception {
-        this.debeziumMySqlContainer.execCmd(
-                "/bin/bash", "-c",
-                "mysql -h 127.0.0.1 -u root -pdebezium -e 'SELECT * FROM 
inventory.products'");
-        this.debeziumMySqlContainer.execCmd(
-                "/bin/bash", "-c",
-                "mysql -h 127.0.0.1 -u root -pdebezium "
-                        + "-e \"DELETE FROM inventory.products WHERE 
name='test-debezium'\"");
-        this.debeziumMySqlContainer.execCmd(
-                "/bin/bash", "-c",
-                "mysql -h 127.0.0.1 -u root -pdebezium -e 'SELECT * FROM 
inventory.products'");
+        executeSql("SELECT * FROM inventory.products");
+        executeSql("DELETE FROM inventory.products WHERE 
name='test-debezium'");
+        executeSql("SELECT * FROM inventory.products");
     }
 
 
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/PulsarDebeziumSourcesTest.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/PulsarDebeziumSourcesTest.java
index 50160d94eef..2d8c78dd83d 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/PulsarDebeziumSourcesTest.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/PulsarDebeziumSourcesTest.java
@@ -91,7 +91,7 @@ public class PulsarDebeziumSourcesTest extends 
PulsarIOTestBase {
                 + "-" + functionRuntimeType + "-" + randomName(8);
 
         // This is the binlog count that contained in mysql container.
-        final int numMessages = 47;
+        final int numMessages = 52;
 
         @Cleanup
         PulsarClient client = PulsarClient.builder()

Reply via email to