This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new c958b937d85 [improve][io] Upgrade Debezium to 3.4.2 and Kafka 
Client/Connect to 4.1.1 (#25335)
c958b937d85 is described below

commit c958b937d85b901526383fcd853da6afc4d3b15e
Author: Lari Hotari <[email protected]>
AuthorDate: Wed Mar 18 23:58:01 2026 +0200

    [improve][io] Upgrade Debezium to 3.4.2 and Kafka Client/Connect to 4.1.1 
(#25335)
---
 pom.xml                                            |  8 ++---
 pulsar-io/debezium/core/pom.xml                    | 32 +++++++++++++++--
 pulsar-io/kafka-connect-adaptor/pom.xml            | 24 +++++++++++++
 .../kafka/connect/AbstractKafkaConnectSource.java  |  6 ++--
 .../kafka/connect/PulsarIOSourceTaskContext.java   |  6 ++++
 .../io/kafka/connect/PulsarKafkaSinkContext.java   |  6 ++++
 .../kafka/connect/PulsarKafkaSinkTaskContext.java  |  6 ++++
 tests/integration/pom.xml                          |  2 ++
 .../containers/DebeziumMongoDbContainer.java       | 14 +++++---
 .../containers/DebeziumMySQLContainer.java         |  3 +-
 .../containers/DebeziumPostgreSqlContainer.java    |  3 +-
 .../integration/io/sinks/KafkaSinkTester.java      | 14 ++++----
 .../io/sources/AvroKafkaSourceTest.java            | 11 +++---
 .../integration/io/sources/KafkaSourceTester.java  | 30 +++-------------
 .../debezium/DebeziumMongoDbSourceTester.java      | 41 +++++++++++++++++++++-
 .../debezium/DebeziumMsSqlSourceTester.java        |  2 +-
 .../debezium/DebeziumMySqlSourceTester.java        | 35 +++++++-----------
 .../debezium/DebeziumOracleDbSourceTester.java     |  2 +-
 18 files changed, 165 insertions(+), 80 deletions(-)

diff --git a/pom.xml b/pom.xml
index cc47242f89d..c2b7abbb643 100644
--- a/pom.xml
+++ b/pom.xml
@@ -234,7 +234,7 @@ flexible messaging model and an intuitive client 
API.</description>
     <hbc-core.version>2.2.0</hbc-core.version>
     <cassandra.version>3.11.2</cassandra.version>
     <aerospike-client.version>4.5.0</aerospike-client.version>
-    <kafka-client.version>3.9.1</kafka-client.version>
+    <kafka-client.version>4.1.1</kafka-client.version>
     <rabbitmq-client.version>5.28.0</rabbitmq-client.version>
     <aws-sdk.version>1.12.788</aws-sdk.version>
     <aws-sdk2.version>2.32.28</aws-sdk2.version>
@@ -243,14 +243,14 @@ flexible messaging model and an intuitive client 
API.</description>
     <jclouds.version>2.6.0</jclouds.version>
     <guice.version>5.1.0</guice.version>
     <sqlite-jdbc.version>3.47.1.0</sqlite-jdbc.version>
-    <postgresql-jdbc.version>42.7.7</postgresql-jdbc.version>
+    <postgresql-jdbc.version>42.7.10</postgresql-jdbc.version>
     <clickhouse-jdbc.version>0.4.6</clickhouse-jdbc.version>
     <mariadb-jdbc.version>3.5.5</mariadb-jdbc.version>
     <openmldb-jdbc.version>0.4.4-hotfix1</openmldb-jdbc.version>
     <json-smart.version>2.5.2</json-smart.version>
     <opensearch.version>2.19.4</opensearch.version>
     <elasticsearch-java.version>8.15.3</elasticsearch-java.version>
-    <debezium.version>3.2.5.Final</debezium.version>
+    <debezium.version>3.4.2.Final</debezium.version>
     
<debezium.postgresql.version>${postgresql-jdbc.version}</debezium.postgresql.version>
     <debezium.mysql.version>9.4.0</debezium.mysql.version>
     <jsonwebtoken.version>0.13.0</jsonwebtoken.version>
@@ -261,7 +261,7 @@ flexible messaging model and an intuitive client 
API.</description>
     <hbase.version>2.6.3-hadoop3</hbase.version>
     <guava.version>33.4.8-jre</guava.version>
     <prometheus-jmx.version>0.16.1</prometheus-jmx.version>
-    <confluent.version>7.9.2</confluent.version>
+    <confluent.version>8.1.1</confluent.version>
     <aircompressor.version>2.0.3</aircompressor.version>
     <asynchttpclient.version>2.12.4</asynchttpclient.version>
     <commons-lang3.version>3.19.0</commons-lang3.version>
diff --git a/pulsar-io/debezium/core/pom.xml b/pulsar-io/debezium/core/pom.xml
index 2470dfc8e85..1860f76f2ee 100644
--- a/pulsar-io/debezium/core/pom.xml
+++ b/pulsar-io/debezium/core/pom.xml
@@ -78,17 +78,45 @@
           <artifactId>kafka-log4j-appender</artifactId>
         </exclusion>
         <exclusion>
-          <artifactId>jose4j</artifactId>
-          <groupId>org.bitbucket.b_c</groupId>
+          <groupId>org.slf4</groupId>
+          <artifactId>*</artifactId>
         </exclusion>
         <exclusion>
           <groupId>org.eclipse.jetty</groupId>
           <artifactId>*</artifactId>
         </exclusion>
+        <exclusion>
+          <groupId>org.eclipse.jetty.ee10</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+        <exclusion>
+          <artifactId>jose4j</artifactId>
+          <groupId>org.bitbucket.b_c</groupId>
+        </exclusion>
         <exclusion>
           <groupId>org.lz4</groupId>
           <artifactId>lz4-java</artifactId>
         </exclusion>
+        <exclusion>
+          <groupId>com.fasterxml.jackson.jakarta.rs</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.glassfish.jersey.containers</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.glassfish.jersey.inject</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>javax.xml.bind</groupId>
+          <artifactId>jaxb-api</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>javax.activation</groupId>
+          <artifactId>activation</artifactId>
+        </exclusion>
       </exclusions>
     </dependency>
 
diff --git a/pulsar-io/kafka-connect-adaptor/pom.xml 
b/pulsar-io/kafka-connect-adaptor/pom.xml
index 3b125330b02..55808b4cf21 100644
--- a/pulsar-io/kafka-connect-adaptor/pom.xml
+++ b/pulsar-io/kafka-connect-adaptor/pom.xml
@@ -85,6 +85,10 @@
           <groupId>org.eclipse.jetty</groupId>
           <artifactId>*</artifactId>
         </exclusion>
+        <exclusion>
+          <groupId>org.eclipse.jetty.ee10</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
         <exclusion>
           <artifactId>jose4j</artifactId>
           <groupId>org.bitbucket.b_c</groupId>
@@ -93,6 +97,26 @@
           <groupId>org.lz4</groupId>
           <artifactId>lz4-java</artifactId>
         </exclusion>
+        <exclusion>
+          <groupId>com.fasterxml.jackson.jakarta.rs</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.glassfish.jersey.containers</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.glassfish.jersey.inject</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>javax.xml.bind</groupId>
+          <artifactId>jaxb-api</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>javax.activation</groupId>
+          <artifactId>activation</artifactId>
+        </exclusion>
       </exclusions>
     </dependency>
 
diff --git 
a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java
 
b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java
index 6ed9e7891b2..47f691af121 100644
--- 
a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java
+++ 
b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java
@@ -22,7 +22,7 @@ import static 
com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 import io.confluent.connect.avro.AvroConverter;
 import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
-import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
+import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -105,11 +105,11 @@ public abstract class AbstractKafkaConnectSource<T> 
implements Source<T> {
 
         if (keyConverter instanceof AvroConverter) {
             keyConverter = new AvroConverter(new MockSchemaRegistryClient());
-            
config.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "mock");
+            
config.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "mock");
         }
         if (valueConverter instanceof AvroConverter) {
             valueConverter = new AvroConverter(new MockSchemaRegistryClient());
-            
config.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "mock");
+            
config.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "mock");
         }
         keyConverter.configure(config, true);
         valueConverter.configure(config, false);
diff --git 
a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarIOSourceTaskContext.java
 
b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarIOSourceTaskContext.java
index ab74ac4aa29..9afc4d50fbd 100644
--- 
a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarIOSourceTaskContext.java
+++ 
b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarIOSourceTaskContext.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.io.kafka.connect;
 
 import java.util.Map;
+import org.apache.kafka.common.metrics.PluginMetrics;
 import org.apache.kafka.connect.source.SourceTaskContext;
 import org.apache.kafka.connect.storage.OffsetStorageReader;
 
@@ -41,4 +42,9 @@ class PulsarIOSourceTaskContext implements SourceTaskContext {
     public OffsetStorageReader offsetStorageReader() {
         return reader;
     }
+
+    @Override
+    public PluginMetrics pluginMetrics() {
+        return null;
+    }
 }
diff --git 
a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaSinkContext.java
 
b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaSinkContext.java
index d92ae8eeb6d..dabc6e527a3 100644
--- 
a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaSinkContext.java
+++ 
b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaSinkContext.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.io.kafka.connect;
 
 import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.common.metrics.PluginMetrics;
 import org.apache.kafka.connect.connector.ConnectorContext;
 
 @Slf4j
@@ -33,4 +34,9 @@ public class PulsarKafkaSinkContext implements 
ConnectorContext {
     public void raiseError(Exception e) {
         throw new UnsupportedOperationException("not implemented", e);
     }
+
+    @Override
+    public PluginMetrics pluginMetrics() {
+        return null;
+    }
 }
diff --git 
a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaSinkTaskContext.java
 
b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaSinkTaskContext.java
index 760799e0daa..76ff8c0c0a1 100644
--- 
a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaSinkTaskContext.java
+++ 
b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaSinkTaskContext.java
@@ -37,6 +37,7 @@ import java.util.function.Function;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.metrics.PluginMetrics;
 import org.apache.kafka.connect.sink.SinkTaskContext;
 import org.apache.kafka.connect.storage.OffsetBackingStore;
 import org.apache.pulsar.client.api.PulsarClientException;
@@ -233,6 +234,11 @@ public class PulsarKafkaSinkTaskContext implements 
SinkTaskContext {
         log.warn("requestCommit() is called but is not supported currently.");
     }
 
+    @Override
+    public PluginMetrics pluginMetrics() {
+        return null;
+    }
+
     public void flushOffsets(Map<TopicPartition, OffsetAndMetadata> offsets) 
throws Exception {
         Map<ByteBuffer, ByteBuffer> offsetMap = 
Maps.newHashMapWithExpectedSize(offsets.size());
 
diff --git a/tests/integration/pom.xml b/tests/integration/pom.xml
index ee940a36e7c..ebac4506ebc 100644
--- a/tests/integration/pom.xml
+++ b/tests/integration/pom.xml
@@ -377,7 +377,9 @@
               </argLine>
               <systemPropertyVariables>
                 <confluent.version>${confluent.version}</confluent.version>
+                <kafka.version>${kafka-client.version}</kafka.version>
                 <jacoco.version>${jacoco-maven-plugin.version}</jacoco.version>
+                <debezium.version>${debezium.version}</debezium.version>
                 
<integrationtest.coverage.enabled>${integrationtest.coverage.enabled}</integrationtest.coverage.enabled>
                 
<integrationtest.coverage.dir>${integrationtest.coverage.dir}</integrationtest.coverage.dir>
                 
<inttest.asyncprofiler.opts>${inttest.asyncprofiler.opts}</inttest.asyncprofiler.opts>
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/DebeziumMongoDbContainer.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/DebeziumMongoDbContainer.java
index 6fa2e9ff471..29d153955e7 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/DebeziumMongoDbContainer.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/DebeziumMongoDbContainer.java
@@ -19,18 +19,19 @@
 package org.apache.pulsar.tests.integration.containers;
 
 import org.testcontainers.containers.wait.strategy.HostPortWaitStrategy;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.containers.wait.strategy.WaitAllStrategy;
 
 public class DebeziumMongoDbContainer extends 
ChaosContainer<DebeziumMongoDbContainer> {
 
     public static final String NAME = "debezium-mongodb-example";
 
     public static final Integer[] PORTS = { 27017 };
-    private static final String IMAGE_NAME = 
"debezium/example-mongodb:3.0.0.Final";
+    private static final String IMAGE_NAME =
+            "quay.io/debezium/example-mongodb:" + 
System.getProperty("debezium.version", "3.4.2.Final");
 
     public DebeziumMongoDbContainer(String clusterName) {
         super(clusterName, IMAGE_NAME);
-        this.withEnv("MONGODB_USER", "mongodb");
-        this.withEnv("MONGODB_PASSWORD", "mongodb");
     }
     @Override
     public String getContainerName() {
@@ -42,10 +43,15 @@ public class DebeziumMongoDbContainer extends 
ChaosContainer<DebeziumMongoDbCont
         super.configure();
         this.withNetworkAliases(NAME)
                 .withExposedPorts(PORTS)
+                .withEnv("MONGODB_USER", "debezium")
+                .withEnv("MONGODB_PASSWORD", "dbz")
                 .withCreateContainerCmdModifier(createContainerCmd -> {
                     createContainerCmd.withHostName(NAME);
                     createContainerCmd.withName(getContainerName());
                 })
-                .waitingFor(new HostPortWaitStrategy());
+                .waitingFor(new WaitAllStrategy()
+                        .withStrategy(new HostPortWaitStrategy())
+                        .withStrategy(Wait.forLogMessage(".*MongoDB init 
process complete.*", 1))
+                );
     }
 }
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/DebeziumMySQLContainer.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/DebeziumMySQLContainer.java
index cf59cda868d..28c118a3e76 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/DebeziumMySQLContainer.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/DebeziumMySQLContainer.java
@@ -26,7 +26,8 @@ public class DebeziumMySQLContainer extends 
ChaosContainer<DebeziumMySQLContaine
     public static final String NAME = "debezium-mysql-example";
     static final Integer[] PORTS = { 3306 };
 
-    private static final String IMAGE_NAME = 
"debezium/example-mysql:3.0.0.Final";
+    private static final String IMAGE_NAME =
+            "quay.io/debezium/example-mysql:" + 
System.getProperty("debezium.version", "3.4.2.Final");
 
     public DebeziumMySQLContainer(String clusterName) {
         super(clusterName, IMAGE_NAME);
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/DebeziumPostgreSqlContainer.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/DebeziumPostgreSqlContainer.java
index 4fd391fd926..2dbc259cf73 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/DebeziumPostgreSqlContainer.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/DebeziumPostgreSqlContainer.java
@@ -26,7 +26,8 @@ public class DebeziumPostgreSqlContainer extends 
ChaosContainer<DebeziumPostgreS
     public static final String NAME = "debezium-postgresql-example";
     static final Integer[] PORTS = { 5432 };
 
-    private static final String IMAGE_NAME = 
"debezium/example-postgres:3.0.0.Final";
+    private static final String IMAGE_NAME =
+            "quay.io/debezium/example-postgres:" + 
System.getProperty("debezium.version", "3.4.2.Final");
 
     public DebeziumPostgreSqlContainer(String clusterName) {
         super(clusterName, IMAGE_NAME);
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/KafkaSinkTester.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/KafkaSinkTester.java
index dbcb1639c11..6bb98964930 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/KafkaSinkTester.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/KafkaSinkTester.java
@@ -34,7 +34,7 @@ import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
 import org.testcontainers.containers.Container.ExecResult;
-import org.testcontainers.containers.KafkaContainer;
+import org.testcontainers.kafka.KafkaContainer;
 import org.testcontainers.utility.DockerImageName;
 
 /**
@@ -42,7 +42,7 @@ import org.testcontainers.utility.DockerImageName;
  */
 @Slf4j
 public class KafkaSinkTester extends SinkTester<KafkaContainer> {
-    public static final String CONFLUENT_PLATFORM_VERSION = 
System.getProperty("confluent.version", "7.8.2");
+    public static final String KAFKA_VERSION = 
System.getProperty("kafka.version", "4.1.1");
 
     private final String kafkaTopicName;
     private KafkaConsumer<String, String> kafkaConsumer;
@@ -55,18 +55,16 @@ public class KafkaSinkTester extends 
SinkTester<KafkaContainer> {
         String suffix = randomName(8) + "_" + System.currentTimeMillis();
         this.kafkaTopicName = "kafka_sink_topic_" + suffix;
 
-        sinkConfig.put("bootstrapServers", networkAlias + ":9092");
+        sinkConfig.put("bootstrapServers", networkAlias + ":9093");
         sinkConfig.put("acks", "all");
         sinkConfig.put("batchSize", 1L);
         sinkConfig.put("maxRequestSize", 1048576L);
         sinkConfig.put("topic", kafkaTopicName);
     }
 
-    @SuppressWarnings("deprecation")
     @Override
     protected KafkaContainer createSinkService(PulsarCluster cluster) {
-        return new 
KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:" + 
CONFLUENT_PLATFORM_VERSION))
-                .withEmbeddedZookeeper()
+        return new KafkaContainer(DockerImageName.parse("apache/kafka:" + 
KAFKA_VERSION))
                 .withNetworkAliases(containerName)
                 .withCreateContainerCmdModifier(createContainerCmd -> 
createContainerCmd
                         .withName(containerName)
@@ -76,10 +74,10 @@ public class KafkaSinkTester extends 
SinkTester<KafkaContainer> {
     @Override
     public void prepareSink() throws Exception {
         ExecResult execResult = serviceContainer.execInContainer(
-                "/usr/bin/kafka-topics",
+                "/opt/kafka/bin/kafka-topics.sh",
                 "--create",
                 "--bootstrap-server",
-                "localhost:9092",
+                "localhost:9093",
                 "--partitions",
                 "1",
                 "--replication-factor",
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/AvroKafkaSourceTest.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/AvroKafkaSourceTest.java
index 771bb5b5396..b5cb3f268fc 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/AvroKafkaSourceTest.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/AvroKafkaSourceTest.java
@@ -58,9 +58,9 @@ import 
org.apache.pulsar.tests.integration.topologies.PulsarCluster;
 import org.awaitility.Awaitility;
 import org.testcontainers.containers.Container.ExecResult;
 import org.testcontainers.containers.GenericContainer;
-import org.testcontainers.containers.KafkaContainer;
 import org.testcontainers.containers.wait.strategy.Wait;
 import org.testcontainers.images.builder.Transferable;
+import org.testcontainers.kafka.ConfluentKafkaContainer;
 import org.testcontainers.utility.DockerImageName;
 import org.testng.Assert;
 import org.testng.annotations.Test;
@@ -75,7 +75,7 @@ import org.testng.annotations.Test;
  */
 @Slf4j
 public class AvroKafkaSourceTest extends PulsarFunctionsTestBase {
-    public static final String CONFLUENT_PLATFORM_VERSION = 
System.getProperty("confluent.version", "7.8.2");
+    public static final String CONFLUENT_PLATFORM_VERSION = 
System.getProperty("confluent.version", "8.1.1");
 
     private static final String SOURCE_TYPE = "kafka";
 
@@ -126,7 +126,7 @@ public class AvroKafkaSourceTest extends 
PulsarFunctionsTestBase {
         );
     }
 
-    private class EnhancedKafkaContainer extends KafkaContainer {
+    private class EnhancedKafkaContainer extends ConfluentKafkaContainer {
 
         public EnhancedKafkaContainer(DockerImageName dockerImageName) {
             super(dockerImageName);
@@ -138,7 +138,7 @@ public class AvroKafkaSourceTest extends 
PulsarFunctionsTestBase {
             // because we want the Kafka Broker to advertise itself
             // with the docker network address
             // otherwise the Kafka Schema Registry won't work
-            return "PLAINTEXT://" + kafkaContainerName + ":9093";
+            return kafkaContainerName + ":9093";
         }
 
     }
@@ -146,7 +146,6 @@ public class AvroKafkaSourceTest extends 
PulsarFunctionsTestBase {
     protected EnhancedKafkaContainer createKafkaContainer(PulsarCluster 
cluster) {
         return (EnhancedKafkaContainer) new EnhancedKafkaContainer(
                 DockerImageName.parse("confluentinc/cp-kafka:" + 
CONFLUENT_PLATFORM_VERSION))
-                .withEmbeddedZookeeper()
                 .withCreateContainerCmdModifier(createContainerCmd -> 
createContainerCmd
                         .withName(kafkaContainerName)
                 );
@@ -380,7 +379,7 @@ public class AvroKafkaSourceTest extends 
PulsarFunctionsTestBase {
         // and execute it
         String bashFileTemplate = "echo '" + payload + "' "
                 + "| /usr/bin/kafka-avro-console-producer "
-                + "--broker-list " + getBootstrapServersOnDockerNetwork() + " "
+                + "--bootstrap-server " + getBootstrapServersOnDockerNetwork() 
+ " "
                 + "--property 'value.schema=" + schemaDef + "' "
                 + "--property schema.registry.url=" + 
getRegistryAddressInDockerNetwork() + " "
                 + "--topic " + kafkaTopicName;
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/KafkaSourceTester.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/KafkaSourceTester.java
index e3fe0f9a5bd..e315079272c 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/KafkaSourceTester.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/KafkaSourceTester.java
@@ -21,20 +21,16 @@ package org.apache.pulsar.tests.integration.io.sources;
 import static 
org.apache.pulsar.tests.integration.topologies.PulsarClusterTestBase.randomName;
 import static org.testng.Assert.assertTrue;
 import com.google.common.collect.ImmutableMap;
-import java.util.Arrays;
 import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.UUID;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.testcontainers.containers.Container.ExecResult;
-import org.testcontainers.containers.KafkaContainer;
+import org.testcontainers.kafka.KafkaContainer;
 
 /**
  * A tester for testing kafka source.
@@ -48,14 +44,12 @@ public class KafkaSourceTester extends 
SourceTester<KafkaContainer> {
 
     private KafkaContainer kafkaContainer;
 
-    private KafkaConsumer<String, String> kafkaConsumer;
-
     public KafkaSourceTester(String containerName) {
         super(SOURCE_TYPE);
         String suffix = randomName(8) + "_" + System.currentTimeMillis();
         this.kafkaTopicName = "kafka_source_topic_" + suffix;
 
-        sourceConfig.put("bootstrapServers", containerName + ":9092");
+        sourceConfig.put("bootstrapServers", containerName + ":9093");
         sourceConfig.put("groupId", "test-source-group");
         sourceConfig.put("fetchMinBytes", 1L);
         sourceConfig.put("autoCommitIntervalMs", 10L);
@@ -73,10 +67,10 @@ public class KafkaSourceTester extends 
SourceTester<KafkaContainer> {
     @Override
     public void prepareSource() throws Exception {
         ExecResult execResult = kafkaContainer.execInContainer(
-            "/usr/bin/kafka-topics",
+            "/opt/kafka/bin/kafka-topics.sh",
             "--create",
             "--bootstrap-server",
-            "localhost:9092",
+            "localhost:9093",
             "--partitions",
             "1",
             "--replication-factor",
@@ -86,17 +80,6 @@ public class KafkaSourceTester extends 
SourceTester<KafkaContainer> {
         assertTrue(
             execResult.getStdout().contains("Created topic"),
             execResult.getStdout());
-
-        kafkaConsumer = new KafkaConsumer<>(
-            ImmutableMap.of(
-                ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
kafkaContainer.getBootstrapServers(),
-                ConsumerConfig.GROUP_ID_CONFIG, "source-test-" + randomName(8),
-                ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"
-            ),
-            new StringDeserializer(),
-            new StringDeserializer()
-        );
-        kafkaConsumer.subscribe(Arrays.asList(kafkaTopicName));
         log.info("Successfully subscribe to kafka topic {}", kafkaTopicName);
     }
 
@@ -145,9 +128,6 @@ public class KafkaSourceTester extends 
SourceTester<KafkaContainer> {
 
     @Override
     public void close() throws Exception {
-        if (kafkaConsumer != null) {
-            kafkaConsumer.close();
-            kafkaConsumer = null;
-        }
+
     }
 }
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMongoDbSourceTester.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMongoDbSourceTester.java
index 18655e6efeb..7fb6bde9d87 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMongoDbSourceTester.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMongoDbSourceTester.java
@@ -23,6 +23,7 @@ import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.tests.integration.containers.DebeziumMongoDbContainer;
 import org.apache.pulsar.tests.integration.containers.PulsarContainer;
+import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
 import org.apache.pulsar.tests.integration.io.sources.SourceTester;
 import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
 
@@ -63,8 +64,46 @@ public class DebeziumMongoDbSourceTester extends 
SourceTester<DebeziumMongoDbCon
 
     @Override
     public void prepareSource() throws Exception {
+        waitForMongoDbReady();
         this.debeziumMongoDbContainer.execCmd("bash", "-c", 
"/usr/local/bin/init-inventory.sh");
-        log.info("debezium mongodb server already contains preconfigured 
data.");
+        waitForReplicaSetPrimary();
+    }
+
+    private void waitForMongoDbReady() throws Exception {
+        log.info("Waiting for MongoDB to be ready...");
+        for (int i = 0; i < 50; i++) {
+            try {
+                ContainerExecResult result = 
this.debeziumMongoDbContainer.execCmd(
+                        "/bin/bash", "-c",
+                        "mongosh --quiet --eval \"db.adminCommand('ping').ok\" 
localhost:27017 | grep 1");
+                if (result.getExitCode() == 0) {
+                    log.info("MongoDB ready after {} seconds", i);
+                    return;
+                }
+            } catch (Exception e) {
+                log.debug("MongoDB readiness check attempt {} failed: {}", i + 
1, e.getMessage());
+            }
+            Thread.sleep(1000);
+        }
+        throw new RuntimeException("MongoDB not ready after 50 seconds");
+    }
+
+    private void waitForReplicaSetPrimary() throws Exception {
+        log.info("Waiting for MongoDB replica set primary to be ready...");
+        for (int i = 0; i < 60; i++) {
+            try {
+                ContainerExecResult result = 
this.debeziumMongoDbContainer.execCmd("/bin/bash", "-c",
+                        "mongosh --quiet --eval 'db.hello().isWritablePrimary' 
localhost:27017");
+                if (result.getStdout().trim().contains("true")) {
+                    log.info("MongoDB replica set primary ready after {} 
seconds", i);
+                    return;
+                }
+            } catch (Exception e) {
+                log.debug("MongoDB primary check attempt {} failed: {}", i + 
1, e.getMessage());
+            }
+            Thread.sleep(1000);
+        }
+        throw new RuntimeException("MongoDB replica set primary not ready 
after 60 seconds");
     }
 
     @Override
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMsSqlSourceTester.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMsSqlSourceTester.java
index f2ca3de9749..a28d26acc10 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMsSqlSourceTester.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMsSqlSourceTester.java
@@ -60,7 +60,7 @@ public class DebeziumMsSqlSourceTester extends 
SourceTester<DebeziumMsSqlContain
         sourceConfig.put("database.password", 
DebeziumMsSqlContainer.SA_PASSWORD);
         sourceConfig.put("database.names", "TestDB");
         sourceConfig.put("database.encrypt", "false");
-        sourceConfig.put("snapshot.mode", "schema_only");
+        sourceConfig.put("snapshot.mode", "no_data");
         sourceConfig.put("schema.history.internal.pulsar.topic", 
"debezium-schema-history-mssql");
         sourceConfig.put("schema.history.internal.pulsar.service.url", 
pulsarServiceUrl);
         sourceConfig.put("topic.prefix", "mssql");
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMySqlSourceTester.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMySqlSourceTester.java
index 590c3b6c308..dc7765550e6 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMySqlSourceTester.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMySqlSourceTester.java
@@ -83,37 +83,26 @@ public class DebeziumMySqlSourceTester extends 
SourceTester<DebeziumMySQLContain
 
     @Override
     public void prepareInsertEvent() throws Exception {
-        this.debeziumMySqlContainer.execCmd(
-                "/bin/bash", "-c",
-                "mysql -h 127.0.0.1 -u root -pdebezium -e 'SELECT * FROM 
inventory.products'");
-        this.debeziumMySqlContainer.execCmd(
-                "/bin/bash", "-c",
-                "mysql -h 127.0.0.1 -u root -pdebezium "
-                        + "-e \"INSERT INTO inventory.products(name, 
description, weight) "
-                        + "values('test-debezium', 'This is description', 
2.0)\"");
+        executeSql("SELECT * FROM inventory.products");
+        executeSql("INSERT INTO inventory.products(name, description, weight) "
+                + "values('test-debezium', 'This is description', 2.0)");
+    }
+
+    private void executeSql(String sqlStatement) throws Exception {
+        this.debeziumMySqlContainer.execCmd("mysql", "-u", "root", 
"-pdebezium", "-e", sqlStatement);
     }
 
     @Override
     public void prepareUpdateEvent() throws Exception {
-        this.debeziumMySqlContainer.execCmd(
-                "/bin/bash", "-c",
-                "mysql -h 127.0.0.1 -u root -pdebezium "
-                        + "-e \"UPDATE inventory.products set 
description='update description', weight=10 "
-                        + "WHERE name='test-debezium'\"");
+        executeSql("UPDATE inventory.products set description='update 
description', weight=10 "
+                + "WHERE name='test-debezium'");
     }
 
     @Override
     public void prepareDeleteEvent() throws Exception {
-        this.debeziumMySqlContainer.execCmd(
-                "/bin/bash", "-c",
-                "mysql -h 127.0.0.1 -u root -pdebezium -e 'SELECT * FROM 
inventory.products'");
-        this.debeziumMySqlContainer.execCmd(
-                "/bin/bash", "-c",
-                "mysql -h 127.0.0.1 -u root -pdebezium "
-                        + "-e \"DELETE FROM inventory.products WHERE 
name='test-debezium'\"");
-        this.debeziumMySqlContainer.execCmd(
-                "/bin/bash", "-c",
-                "mysql -h 127.0.0.1 -u root -pdebezium -e 'SELECT * FROM 
inventory.products'");
+        executeSql("SELECT * FROM inventory.products");
+        executeSql("DELETE FROM inventory.products WHERE 
name='test-debezium'");
+        executeSql("SELECT * FROM inventory.products");
     }
 
 
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumOracleDbSourceTester.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumOracleDbSourceTester.java
index ab76ccefcb5..00c84b6aa3a 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumOracleDbSourceTester.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumOracleDbSourceTester.java
@@ -60,7 +60,7 @@ public class DebeziumOracleDbSourceTester extends 
SourceTester<DebeziumOracleDbC
         sourceConfig.put("database.password", "dbz");
         sourceConfig.put("topic.prefix", "XE");
         sourceConfig.put("database.dbname", "XE");
-        sourceConfig.put("snapshot.mode", "schema_only");
+        sourceConfig.put("snapshot.mode", "no_data");
 
         sourceConfig.put("schema.include.list", "inv");
         sourceConfig.put("schema.history.internal.pulsar.service.url", 
pulsarServiceUrl);


Reply via email to