This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new c958b937d85 [improve][io] Upgrade Debezium to 3.4.2 and Kafka
Client/Connect to 4.1.1 (#25335)
c958b937d85 is described below
commit c958b937d85b901526383fcd853da6afc4d3b15e
Author: Lari Hotari <[email protected]>
AuthorDate: Wed Mar 18 23:58:01 2026 +0200
[improve][io] Upgrade Debezium to 3.4.2 and Kafka Client/Connect to 4.1.1
(#25335)
---
pom.xml | 8 ++---
pulsar-io/debezium/core/pom.xml | 32 +++++++++++++++--
pulsar-io/kafka-connect-adaptor/pom.xml | 24 +++++++++++++
.../kafka/connect/AbstractKafkaConnectSource.java | 6 ++--
.../kafka/connect/PulsarIOSourceTaskContext.java | 6 ++++
.../io/kafka/connect/PulsarKafkaSinkContext.java | 6 ++++
.../kafka/connect/PulsarKafkaSinkTaskContext.java | 6 ++++
tests/integration/pom.xml | 2 ++
.../containers/DebeziumMongoDbContainer.java | 14 +++++---
.../containers/DebeziumMySQLContainer.java | 3 +-
.../containers/DebeziumPostgreSqlContainer.java | 3 +-
.../integration/io/sinks/KafkaSinkTester.java | 14 ++++----
.../io/sources/AvroKafkaSourceTest.java | 11 +++---
.../integration/io/sources/KafkaSourceTester.java | 30 +++-------------
.../debezium/DebeziumMongoDbSourceTester.java | 41 +++++++++++++++++++++-
.../debezium/DebeziumMsSqlSourceTester.java | 2 +-
.../debezium/DebeziumMySqlSourceTester.java | 35 +++++++-----------
.../debezium/DebeziumOracleDbSourceTester.java | 2 +-
18 files changed, 165 insertions(+), 80 deletions(-)
diff --git a/pom.xml b/pom.xml
index cc47242f89d..c2b7abbb643 100644
--- a/pom.xml
+++ b/pom.xml
@@ -234,7 +234,7 @@ flexible messaging model and an intuitive client
API.</description>
<hbc-core.version>2.2.0</hbc-core.version>
<cassandra.version>3.11.2</cassandra.version>
<aerospike-client.version>4.5.0</aerospike-client.version>
- <kafka-client.version>3.9.1</kafka-client.version>
+ <kafka-client.version>4.1.1</kafka-client.version>
<rabbitmq-client.version>5.28.0</rabbitmq-client.version>
<aws-sdk.version>1.12.788</aws-sdk.version>
<aws-sdk2.version>2.32.28</aws-sdk2.version>
@@ -243,14 +243,14 @@ flexible messaging model and an intuitive client
API.</description>
<jclouds.version>2.6.0</jclouds.version>
<guice.version>5.1.0</guice.version>
<sqlite-jdbc.version>3.47.1.0</sqlite-jdbc.version>
- <postgresql-jdbc.version>42.7.7</postgresql-jdbc.version>
+ <postgresql-jdbc.version>42.7.10</postgresql-jdbc.version>
<clickhouse-jdbc.version>0.4.6</clickhouse-jdbc.version>
<mariadb-jdbc.version>3.5.5</mariadb-jdbc.version>
<openmldb-jdbc.version>0.4.4-hotfix1</openmldb-jdbc.version>
<json-smart.version>2.5.2</json-smart.version>
<opensearch.version>2.19.4</opensearch.version>
<elasticsearch-java.version>8.15.3</elasticsearch-java.version>
- <debezium.version>3.2.5.Final</debezium.version>
+ <debezium.version>3.4.2.Final</debezium.version>
<debezium.postgresql.version>${postgresql-jdbc.version}</debezium.postgresql.version>
<debezium.mysql.version>9.4.0</debezium.mysql.version>
<jsonwebtoken.version>0.13.0</jsonwebtoken.version>
@@ -261,7 +261,7 @@ flexible messaging model and an intuitive client
API.</description>
<hbase.version>2.6.3-hadoop3</hbase.version>
<guava.version>33.4.8-jre</guava.version>
<prometheus-jmx.version>0.16.1</prometheus-jmx.version>
- <confluent.version>7.9.2</confluent.version>
+ <confluent.version>8.1.1</confluent.version>
<aircompressor.version>2.0.3</aircompressor.version>
<asynchttpclient.version>2.12.4</asynchttpclient.version>
<commons-lang3.version>3.19.0</commons-lang3.version>
diff --git a/pulsar-io/debezium/core/pom.xml b/pulsar-io/debezium/core/pom.xml
index 2470dfc8e85..1860f76f2ee 100644
--- a/pulsar-io/debezium/core/pom.xml
+++ b/pulsar-io/debezium/core/pom.xml
@@ -78,17 +78,45 @@
<artifactId>kafka-log4j-appender</artifactId>
</exclusion>
<exclusion>
- <artifactId>jose4j</artifactId>
- <groupId>org.bitbucket.b_c</groupId>
+ <groupId>org.slf4</groupId>
+ <artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>org.eclipse.jetty</groupId>
<artifactId>*</artifactId>
</exclusion>
+ <exclusion>
+ <groupId>org.eclipse.jetty.ee10</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ <exclusion>
+ <artifactId>jose4j</artifactId>
+ <groupId>org.bitbucket.b_c</groupId>
+ </exclusion>
<exclusion>
<groupId>org.lz4</groupId>
<artifactId>lz4-java</artifactId>
</exclusion>
+ <exclusion>
+ <groupId>com.fasterxml.jackson.jakarta.rs</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.glassfish.jersey.containers</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.glassfish.jersey.inject</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>javax.xml.bind</groupId>
+ <artifactId>jaxb-api</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>javax.activation</groupId>
+ <artifactId>activation</artifactId>
+ </exclusion>
</exclusions>
</dependency>
diff --git a/pulsar-io/kafka-connect-adaptor/pom.xml
b/pulsar-io/kafka-connect-adaptor/pom.xml
index 3b125330b02..55808b4cf21 100644
--- a/pulsar-io/kafka-connect-adaptor/pom.xml
+++ b/pulsar-io/kafka-connect-adaptor/pom.xml
@@ -85,6 +85,10 @@
<groupId>org.eclipse.jetty</groupId>
<artifactId>*</artifactId>
</exclusion>
+ <exclusion>
+ <groupId>org.eclipse.jetty.ee10</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
<exclusion>
<artifactId>jose4j</artifactId>
<groupId>org.bitbucket.b_c</groupId>
@@ -93,6 +97,26 @@
<groupId>org.lz4</groupId>
<artifactId>lz4-java</artifactId>
</exclusion>
+ <exclusion>
+ <groupId>com.fasterxml.jackson.jakarta.rs</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.glassfish.jersey.containers</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.glassfish.jersey.inject</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>javax.xml.bind</groupId>
+ <artifactId>jaxb-api</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>javax.activation</groupId>
+ <artifactId>activation</artifactId>
+ </exclusion>
</exclusions>
</dependency>
diff --git
a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java
b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java
index 6ed9e7891b2..47f691af121 100644
---
a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java
+++
b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java
@@ -22,7 +22,7 @@ import static
com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import io.confluent.connect.avro.AvroConverter;
import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
-import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
+import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
@@ -105,11 +105,11 @@ public abstract class AbstractKafkaConnectSource<T>
implements Source<T> {
if (keyConverter instanceof AvroConverter) {
keyConverter = new AvroConverter(new MockSchemaRegistryClient());
-
config.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "mock");
+
config.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "mock");
}
if (valueConverter instanceof AvroConverter) {
valueConverter = new AvroConverter(new MockSchemaRegistryClient());
-
config.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "mock");
+
config.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "mock");
}
keyConverter.configure(config, true);
valueConverter.configure(config, false);
diff --git
a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarIOSourceTaskContext.java
b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarIOSourceTaskContext.java
index ab74ac4aa29..9afc4d50fbd 100644
---
a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarIOSourceTaskContext.java
+++
b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarIOSourceTaskContext.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.io.kafka.connect;
import java.util.Map;
+import org.apache.kafka.common.metrics.PluginMetrics;
import org.apache.kafka.connect.source.SourceTaskContext;
import org.apache.kafka.connect.storage.OffsetStorageReader;
@@ -41,4 +42,9 @@ class PulsarIOSourceTaskContext implements SourceTaskContext {
public OffsetStorageReader offsetStorageReader() {
return reader;
}
+
+ @Override
+ public PluginMetrics pluginMetrics() {
+ return null;
+ }
}
diff --git
a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaSinkContext.java
b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaSinkContext.java
index d92ae8eeb6d..dabc6e527a3 100644
---
a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaSinkContext.java
+++
b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaSinkContext.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.io.kafka.connect;
import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.common.metrics.PluginMetrics;
import org.apache.kafka.connect.connector.ConnectorContext;
@Slf4j
@@ -33,4 +34,9 @@ public class PulsarKafkaSinkContext implements
ConnectorContext {
public void raiseError(Exception e) {
throw new UnsupportedOperationException("not implemented", e);
}
+
+ @Override
+ public PluginMetrics pluginMetrics() {
+ return null;
+ }
}
diff --git
a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaSinkTaskContext.java
b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaSinkTaskContext.java
index 760799e0daa..76ff8c0c0a1 100644
---
a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaSinkTaskContext.java
+++
b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaSinkTaskContext.java
@@ -37,6 +37,7 @@ import java.util.function.Function;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.metrics.PluginMetrics;
import org.apache.kafka.connect.sink.SinkTaskContext;
import org.apache.kafka.connect.storage.OffsetBackingStore;
import org.apache.pulsar.client.api.PulsarClientException;
@@ -233,6 +234,11 @@ public class PulsarKafkaSinkTaskContext implements
SinkTaskContext {
log.warn("requestCommit() is called but is not supported currently.");
}
+ @Override
+ public PluginMetrics pluginMetrics() {
+ return null;
+ }
+
public void flushOffsets(Map<TopicPartition, OffsetAndMetadata> offsets)
throws Exception {
Map<ByteBuffer, ByteBuffer> offsetMap =
Maps.newHashMapWithExpectedSize(offsets.size());
diff --git a/tests/integration/pom.xml b/tests/integration/pom.xml
index ee940a36e7c..ebac4506ebc 100644
--- a/tests/integration/pom.xml
+++ b/tests/integration/pom.xml
@@ -377,7 +377,9 @@
</argLine>
<systemPropertyVariables>
<confluent.version>${confluent.version}</confluent.version>
+ <kafka.version>${kafka-client.version}</kafka.version>
<jacoco.version>${jacoco-maven-plugin.version}</jacoco.version>
+ <debezium.version>${debezium.version}</debezium.version>
<integrationtest.coverage.enabled>${integrationtest.coverage.enabled}</integrationtest.coverage.enabled>
<integrationtest.coverage.dir>${integrationtest.coverage.dir}</integrationtest.coverage.dir>
<inttest.asyncprofiler.opts>${inttest.asyncprofiler.opts}</inttest.asyncprofiler.opts>
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/DebeziumMongoDbContainer.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/DebeziumMongoDbContainer.java
index 6fa2e9ff471..29d153955e7 100644
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/DebeziumMongoDbContainer.java
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/DebeziumMongoDbContainer.java
@@ -19,18 +19,19 @@
package org.apache.pulsar.tests.integration.containers;
import org.testcontainers.containers.wait.strategy.HostPortWaitStrategy;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.containers.wait.strategy.WaitAllStrategy;
public class DebeziumMongoDbContainer extends
ChaosContainer<DebeziumMongoDbContainer> {
public static final String NAME = "debezium-mongodb-example";
public static final Integer[] PORTS = { 27017 };
- private static final String IMAGE_NAME =
"debezium/example-mongodb:3.0.0.Final";
+ private static final String IMAGE_NAME =
+ "quay.io/debezium/example-mongodb:" +
System.getProperty("debezium.version", "3.4.2.Final");
public DebeziumMongoDbContainer(String clusterName) {
super(clusterName, IMAGE_NAME);
- this.withEnv("MONGODB_USER", "mongodb");
- this.withEnv("MONGODB_PASSWORD", "mongodb");
}
@Override
public String getContainerName() {
@@ -42,10 +43,15 @@ public class DebeziumMongoDbContainer extends
ChaosContainer<DebeziumMongoDbCont
super.configure();
this.withNetworkAliases(NAME)
.withExposedPorts(PORTS)
+ .withEnv("MONGODB_USER", "debezium")
+ .withEnv("MONGODB_PASSWORD", "dbz")
.withCreateContainerCmdModifier(createContainerCmd -> {
createContainerCmd.withHostName(NAME);
createContainerCmd.withName(getContainerName());
})
- .waitingFor(new HostPortWaitStrategy());
+ .waitingFor(new WaitAllStrategy()
+ .withStrategy(new HostPortWaitStrategy())
+ .withStrategy(Wait.forLogMessage(".*MongoDB init
process complete.*", 1))
+ );
}
}
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/DebeziumMySQLContainer.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/DebeziumMySQLContainer.java
index cf59cda868d..28c118a3e76 100644
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/DebeziumMySQLContainer.java
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/DebeziumMySQLContainer.java
@@ -26,7 +26,8 @@ public class DebeziumMySQLContainer extends
ChaosContainer<DebeziumMySQLContaine
public static final String NAME = "debezium-mysql-example";
static final Integer[] PORTS = { 3306 };
- private static final String IMAGE_NAME =
"debezium/example-mysql:3.0.0.Final";
+ private static final String IMAGE_NAME =
+ "quay.io/debezium/example-mysql:" +
System.getProperty("debezium.version", "3.4.2.Final");
public DebeziumMySQLContainer(String clusterName) {
super(clusterName, IMAGE_NAME);
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/DebeziumPostgreSqlContainer.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/DebeziumPostgreSqlContainer.java
index 4fd391fd926..2dbc259cf73 100644
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/DebeziumPostgreSqlContainer.java
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/DebeziumPostgreSqlContainer.java
@@ -26,7 +26,8 @@ public class DebeziumPostgreSqlContainer extends
ChaosContainer<DebeziumPostgreS
public static final String NAME = "debezium-postgresql-example";
static final Integer[] PORTS = { 5432 };
- private static final String IMAGE_NAME =
"debezium/example-postgres:3.0.0.Final";
+ private static final String IMAGE_NAME =
+ "quay.io/debezium/example-postgres:" +
System.getProperty("debezium.version", "3.4.2.Final");
public DebeziumPostgreSqlContainer(String clusterName) {
super(clusterName, IMAGE_NAME);
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/KafkaSinkTester.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/KafkaSinkTester.java
index dbcb1639c11..6bb98964930 100644
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/KafkaSinkTester.java
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/KafkaSinkTester.java
@@ -34,7 +34,7 @@ import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
import org.testcontainers.containers.Container.ExecResult;
-import org.testcontainers.containers.KafkaContainer;
+import org.testcontainers.kafka.KafkaContainer;
import org.testcontainers.utility.DockerImageName;
/**
@@ -42,7 +42,7 @@ import org.testcontainers.utility.DockerImageName;
*/
@Slf4j
public class KafkaSinkTester extends SinkTester<KafkaContainer> {
- public static final String CONFLUENT_PLATFORM_VERSION =
System.getProperty("confluent.version", "7.8.2");
+ public static final String KAFKA_VERSION =
System.getProperty("kafka.version", "4.1.1");
private final String kafkaTopicName;
private KafkaConsumer<String, String> kafkaConsumer;
@@ -55,18 +55,16 @@ public class KafkaSinkTester extends
SinkTester<KafkaContainer> {
String suffix = randomName(8) + "_" + System.currentTimeMillis();
this.kafkaTopicName = "kafka_sink_topic_" + suffix;
- sinkConfig.put("bootstrapServers", networkAlias + ":9092");
+ sinkConfig.put("bootstrapServers", networkAlias + ":9093");
sinkConfig.put("acks", "all");
sinkConfig.put("batchSize", 1L);
sinkConfig.put("maxRequestSize", 1048576L);
sinkConfig.put("topic", kafkaTopicName);
}
- @SuppressWarnings("deprecation")
@Override
protected KafkaContainer createSinkService(PulsarCluster cluster) {
- return new
KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:" +
CONFLUENT_PLATFORM_VERSION))
- .withEmbeddedZookeeper()
+ return new KafkaContainer(DockerImageName.parse("apache/kafka:" +
KAFKA_VERSION))
.withNetworkAliases(containerName)
.withCreateContainerCmdModifier(createContainerCmd ->
createContainerCmd
.withName(containerName)
@@ -76,10 +74,10 @@ public class KafkaSinkTester extends
SinkTester<KafkaContainer> {
@Override
public void prepareSink() throws Exception {
ExecResult execResult = serviceContainer.execInContainer(
- "/usr/bin/kafka-topics",
+ "/opt/kafka/bin/kafka-topics.sh",
"--create",
"--bootstrap-server",
- "localhost:9092",
+ "localhost:9093",
"--partitions",
"1",
"--replication-factor",
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/AvroKafkaSourceTest.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/AvroKafkaSourceTest.java
index 771bb5b5396..b5cb3f268fc 100644
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/AvroKafkaSourceTest.java
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/AvroKafkaSourceTest.java
@@ -58,9 +58,9 @@ import
org.apache.pulsar.tests.integration.topologies.PulsarCluster;
import org.awaitility.Awaitility;
import org.testcontainers.containers.Container.ExecResult;
import org.testcontainers.containers.GenericContainer;
-import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.images.builder.Transferable;
+import org.testcontainers.kafka.ConfluentKafkaContainer;
import org.testcontainers.utility.DockerImageName;
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -75,7 +75,7 @@ import org.testng.annotations.Test;
*/
@Slf4j
public class AvroKafkaSourceTest extends PulsarFunctionsTestBase {
- public static final String CONFLUENT_PLATFORM_VERSION =
System.getProperty("confluent.version", "7.8.2");
+ public static final String CONFLUENT_PLATFORM_VERSION =
System.getProperty("confluent.version", "8.1.1");
private static final String SOURCE_TYPE = "kafka";
@@ -126,7 +126,7 @@ public class AvroKafkaSourceTest extends
PulsarFunctionsTestBase {
);
}
- private class EnhancedKafkaContainer extends KafkaContainer {
+ private class EnhancedKafkaContainer extends ConfluentKafkaContainer {
public EnhancedKafkaContainer(DockerImageName dockerImageName) {
super(dockerImageName);
@@ -138,7 +138,7 @@ public class AvroKafkaSourceTest extends
PulsarFunctionsTestBase {
// because we want the Kafka Broker to advertise itself
// with the docker network address
// otherwise the Kafka Schema Registry won't work
- return "PLAINTEXT://" + kafkaContainerName + ":9093";
+ return kafkaContainerName + ":9093";
}
}
@@ -146,7 +146,6 @@ public class AvroKafkaSourceTest extends
PulsarFunctionsTestBase {
protected EnhancedKafkaContainer createKafkaContainer(PulsarCluster
cluster) {
return (EnhancedKafkaContainer) new EnhancedKafkaContainer(
DockerImageName.parse("confluentinc/cp-kafka:" +
CONFLUENT_PLATFORM_VERSION))
- .withEmbeddedZookeeper()
.withCreateContainerCmdModifier(createContainerCmd ->
createContainerCmd
.withName(kafkaContainerName)
);
@@ -380,7 +379,7 @@ public class AvroKafkaSourceTest extends
PulsarFunctionsTestBase {
// and execute it
String bashFileTemplate = "echo '" + payload + "' "
+ "| /usr/bin/kafka-avro-console-producer "
- + "--broker-list " + getBootstrapServersOnDockerNetwork() + " "
+ + "--bootstrap-server " + getBootstrapServersOnDockerNetwork()
+ " "
+ "--property 'value.schema=" + schemaDef + "' "
+ "--property schema.registry.url=" +
getRegistryAddressInDockerNetwork() + " "
+ "--topic " + kafkaTopicName;
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/KafkaSourceTester.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/KafkaSourceTester.java
index e3fe0f9a5bd..e315079272c 100644
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/KafkaSourceTester.java
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/KafkaSourceTester.java
@@ -21,20 +21,16 @@ package org.apache.pulsar.tests.integration.io.sources;
import static
org.apache.pulsar.tests.integration.topologies.PulsarClusterTestBase.randomName;
import static org.testng.Assert.assertTrue;
import com.google.common.collect.ImmutableMap;
-import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.UUID;
import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.testcontainers.containers.Container.ExecResult;
-import org.testcontainers.containers.KafkaContainer;
+import org.testcontainers.kafka.KafkaContainer;
/**
* A tester for testing kafka source.
@@ -48,14 +44,12 @@ public class KafkaSourceTester extends
SourceTester<KafkaContainer> {
private KafkaContainer kafkaContainer;
- private KafkaConsumer<String, String> kafkaConsumer;
-
public KafkaSourceTester(String containerName) {
super(SOURCE_TYPE);
String suffix = randomName(8) + "_" + System.currentTimeMillis();
this.kafkaTopicName = "kafka_source_topic_" + suffix;
- sourceConfig.put("bootstrapServers", containerName + ":9092");
+ sourceConfig.put("bootstrapServers", containerName + ":9093");
sourceConfig.put("groupId", "test-source-group");
sourceConfig.put("fetchMinBytes", 1L);
sourceConfig.put("autoCommitIntervalMs", 10L);
@@ -73,10 +67,10 @@ public class KafkaSourceTester extends
SourceTester<KafkaContainer> {
@Override
public void prepareSource() throws Exception {
ExecResult execResult = kafkaContainer.execInContainer(
- "/usr/bin/kafka-topics",
+ "/opt/kafka/bin/kafka-topics.sh",
"--create",
"--bootstrap-server",
- "localhost:9092",
+ "localhost:9093",
"--partitions",
"1",
"--replication-factor",
@@ -86,17 +80,6 @@ public class KafkaSourceTester extends
SourceTester<KafkaContainer> {
assertTrue(
execResult.getStdout().contains("Created topic"),
execResult.getStdout());
-
- kafkaConsumer = new KafkaConsumer<>(
- ImmutableMap.of(
- ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
kafkaContainer.getBootstrapServers(),
- ConsumerConfig.GROUP_ID_CONFIG, "source-test-" + randomName(8),
- ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"
- ),
- new StringDeserializer(),
- new StringDeserializer()
- );
- kafkaConsumer.subscribe(Arrays.asList(kafkaTopicName));
log.info("Successfully subscribe to kafka topic {}", kafkaTopicName);
}
@@ -145,9 +128,6 @@ public class KafkaSourceTester extends
SourceTester<KafkaContainer> {
@Override
public void close() throws Exception {
- if (kafkaConsumer != null) {
- kafkaConsumer.close();
- kafkaConsumer = null;
- }
+
}
}
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMongoDbSourceTester.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMongoDbSourceTester.java
index 18655e6efeb..7fb6bde9d87 100644
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMongoDbSourceTester.java
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMongoDbSourceTester.java
@@ -23,6 +23,7 @@ import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.tests.integration.containers.DebeziumMongoDbContainer;
import org.apache.pulsar.tests.integration.containers.PulsarContainer;
+import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
import org.apache.pulsar.tests.integration.io.sources.SourceTester;
import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
@@ -63,8 +64,46 @@ public class DebeziumMongoDbSourceTester extends
SourceTester<DebeziumMongoDbCon
@Override
public void prepareSource() throws Exception {
+ waitForMongoDbReady();
this.debeziumMongoDbContainer.execCmd("bash", "-c",
"/usr/local/bin/init-inventory.sh");
- log.info("debezium mongodb server already contains preconfigured
data.");
+ waitForReplicaSetPrimary();
+ }
+
+ private void waitForMongoDbReady() throws Exception {
+ log.info("Waiting for MongoDB to be ready...");
+ for (int i = 0; i < 50; i++) {
+ try {
+ ContainerExecResult result =
this.debeziumMongoDbContainer.execCmd(
+ "/bin/bash", "-c",
+ "mongosh --quiet --eval \"db.adminCommand('ping').ok\"
localhost:27017 | grep 1");
+ if (result.getExitCode() == 0) {
+ log.info("MongoDB ready after {} seconds", i);
+ return;
+ }
+ } catch (Exception e) {
+ log.debug("MongoDB readiness check attempt {} failed: {}", i +
1, e.getMessage());
+ }
+ Thread.sleep(1000);
+ }
+ throw new RuntimeException("MongoDB not ready after 50 seconds");
+ }
+
+ private void waitForReplicaSetPrimary() throws Exception {
+ log.info("Waiting for MongoDB replica set primary to be ready...");
+ for (int i = 0; i < 60; i++) {
+ try {
+ ContainerExecResult result =
this.debeziumMongoDbContainer.execCmd("/bin/bash", "-c",
+ "mongosh --quiet --eval 'db.hello().isWritablePrimary'
localhost:27017");
+ if (result.getStdout().trim().contains("true")) {
+ log.info("MongoDB replica set primary ready after {}
seconds", i);
+ return;
+ }
+ } catch (Exception e) {
+ log.debug("MongoDB primary check attempt {} failed: {}", i +
1, e.getMessage());
+ }
+ Thread.sleep(1000);
+ }
+ throw new RuntimeException("MongoDB replica set primary not ready
after 60 seconds");
}
@Override
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMsSqlSourceTester.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMsSqlSourceTester.java
index f2ca3de9749..a28d26acc10 100644
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMsSqlSourceTester.java
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMsSqlSourceTester.java
@@ -60,7 +60,7 @@ public class DebeziumMsSqlSourceTester extends
SourceTester<DebeziumMsSqlContain
sourceConfig.put("database.password",
DebeziumMsSqlContainer.SA_PASSWORD);
sourceConfig.put("database.names", "TestDB");
sourceConfig.put("database.encrypt", "false");
- sourceConfig.put("snapshot.mode", "schema_only");
+ sourceConfig.put("snapshot.mode", "no_data");
sourceConfig.put("schema.history.internal.pulsar.topic",
"debezium-schema-history-mssql");
sourceConfig.put("schema.history.internal.pulsar.service.url",
pulsarServiceUrl);
sourceConfig.put("topic.prefix", "mssql");
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMySqlSourceTester.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMySqlSourceTester.java
index 590c3b6c308..dc7765550e6 100644
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMySqlSourceTester.java
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMySqlSourceTester.java
@@ -83,37 +83,26 @@ public class DebeziumMySqlSourceTester extends
SourceTester<DebeziumMySQLContain
@Override
public void prepareInsertEvent() throws Exception {
- this.debeziumMySqlContainer.execCmd(
- "/bin/bash", "-c",
- "mysql -h 127.0.0.1 -u root -pdebezium -e 'SELECT * FROM
inventory.products'");
- this.debeziumMySqlContainer.execCmd(
- "/bin/bash", "-c",
- "mysql -h 127.0.0.1 -u root -pdebezium "
- + "-e \"INSERT INTO inventory.products(name,
description, weight) "
- + "values('test-debezium', 'This is description',
2.0)\"");
+ executeSql("SELECT * FROM inventory.products");
+ executeSql("INSERT INTO inventory.products(name, description, weight) "
+ + "values('test-debezium', 'This is description', 2.0)");
+ }
+
+ private void executeSql(String sqlStatement) throws Exception {
+ this.debeziumMySqlContainer.execCmd("mysql", "-u", "root",
"-pdebezium", "-e", sqlStatement);
}
@Override
public void prepareUpdateEvent() throws Exception {
- this.debeziumMySqlContainer.execCmd(
- "/bin/bash", "-c",
- "mysql -h 127.0.0.1 -u root -pdebezium "
- + "-e \"UPDATE inventory.products set
description='update description', weight=10 "
- + "WHERE name='test-debezium'\"");
+ executeSql("UPDATE inventory.products set description='update
description', weight=10 "
+ + "WHERE name='test-debezium'");
}
@Override
public void prepareDeleteEvent() throws Exception {
- this.debeziumMySqlContainer.execCmd(
- "/bin/bash", "-c",
- "mysql -h 127.0.0.1 -u root -pdebezium -e 'SELECT * FROM
inventory.products'");
- this.debeziumMySqlContainer.execCmd(
- "/bin/bash", "-c",
- "mysql -h 127.0.0.1 -u root -pdebezium "
- + "-e \"DELETE FROM inventory.products WHERE
name='test-debezium'\"");
- this.debeziumMySqlContainer.execCmd(
- "/bin/bash", "-c",
- "mysql -h 127.0.0.1 -u root -pdebezium -e 'SELECT * FROM
inventory.products'");
+ executeSql("SELECT * FROM inventory.products");
+ executeSql("DELETE FROM inventory.products WHERE
name='test-debezium'");
+ executeSql("SELECT * FROM inventory.products");
}
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumOracleDbSourceTester.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumOracleDbSourceTester.java
index ab76ccefcb5..00c84b6aa3a 100644
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumOracleDbSourceTester.java
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumOracleDbSourceTester.java
@@ -60,7 +60,7 @@ public class DebeziumOracleDbSourceTester extends
SourceTester<DebeziumOracleDbC
sourceConfig.put("database.password", "dbz");
sourceConfig.put("topic.prefix", "XE");
sourceConfig.put("database.dbname", "XE");
- sourceConfig.put("snapshot.mode", "schema_only");
+ sourceConfig.put("snapshot.mode", "no_data");
sourceConfig.put("schema.include.list", "inv");
sourceConfig.put("schema.history.internal.pulsar.service.url",
pulsarServiceUrl);