This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-kafka-connector.git

The following commit(s) were added to refs/heads/master by this push:
     new d65a505  [Improve](case) add case for debezium update, delete and avro convert (#65)
d65a505 is described below

commit d65a5055d1ea9328efec3a97b97710fa06f6d2ba
Author: wudi <676366...@qq.com>
AuthorDate: Mon Apr 7 10:22:44 2025 +0800

    [Improve](case) add case for debezium update, delete and avro convert (#65)
---
 .../connector/e2e/kafka/KafkaContainerService.java |   4 +
 .../e2e/kafka/KafkaContainerServiceImpl.java       |  50 +++++-
 .../e2e/sink/AbstractKafka2DorisSink.java          |  44 ++++-
 .../e2e/sink/avro/AbstractAvroE2ESinkTest.java     |  98 ++++++++++
 .../connector/e2e/sink/avro/AvroMsgE2ETest.java    | 198 +++++++++++++++++++++
 .../e2e/sink/stringconverter/StringMsgE2ETest.java |  61 ++++---
 .../e2e/avro_converter/confluent_avro_convert.json |  23 +++
 .../e2e/avro_converter/confluent_avro_tab.sql      |  11 ++
 .../e2e/avro_converter/doris_avro_convert.json     |  21 +++
 .../e2e/avro_converter/doris_avro_tab.sql          |  11 ++
 .../e2e/string_converter/debezium_dml_event.json   |  25 +++
 .../string_converter/debezium_dml_event_tab.sql    |  11 ++
 12 files changed, 529 insertions(+), 28 deletions(-)

diff --git a/src/test/java/org/apache/doris/kafka/connector/e2e/kafka/KafkaContainerService.java b/src/test/java/org/apache/doris/kafka/connector/e2e/kafka/KafkaContainerService.java
index 45616c4..2e9dc8d 100644
--- a/src/test/java/org/apache/doris/kafka/connector/e2e/kafka/KafkaContainerService.java
+++ b/src/test/java/org/apache/doris/kafka/connector/e2e/kafka/KafkaContainerService.java
@@ -25,6 +25,10 @@ public interface KafkaContainerService {
 
     void startContainer();
 
+    void startSchemaRegistry();
+
+    String getSchemaRegistryUrl();
+
     void startConnector();
 
     String getInstanceHostAndPort();
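
Note: the Avro e2e tests added below are the consumers of these two new methods.
A minimal caller, sketched from how AbstractKafka2DorisSink wires things up:

    KafkaContainerService kafkaContainerService = new KafkaContainerServiceImpl();
    kafkaContainerService.startContainer();       // existing: boots the Kafka broker container
    kafkaContainerService.startSchemaRegistry();  // new: boots confluentinc/cp-schema-registry
    // new: host-reachable URL of the registry, e.g. http://localhost:<mapped-port>
    String registryUrl = kafkaContainerService.getSchemaRegistryUrl();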
diff --git a/src/test/java/org/apache/doris/kafka/connector/e2e/kafka/KafkaContainerServiceImpl.java b/src/test/java/org/apache/doris/kafka/connector/e2e/kafka/KafkaContainerServiceImpl.java
index 4e38ab3..ec39f49 100644
--- a/src/test/java/org/apache/doris/kafka/connector/e2e/kafka/KafkaContainerServiceImpl.java
+++ b/src/test/java/org/apache/doris/kafka/connector/e2e/kafka/KafkaContainerServiceImpl.java
@@ -26,6 +26,7 @@ import java.net.HttpURLConnection;
 import java.net.URL;
 import java.nio.file.Files;
 import java.nio.file.Paths;
+import java.time.Duration;
 import java.util.Objects;
 import java.util.Properties;
 import java.util.concurrent.ExecutorService;
@@ -41,7 +42,10 @@ import org.apache.http.impl.client.HttpClients;
 import org.apache.kafka.connect.cli.ConnectDistributed;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.GenericContainer;
 import org.testcontainers.containers.KafkaContainer;
+import org.testcontainers.containers.Network;
+import org.testcontainers.containers.wait.strategy.Wait;
 import org.testcontainers.utility.DockerImageName;
 
 public class KafkaContainerServiceImpl implements KafkaContainerService {
@@ -60,9 +64,12 @@ public class KafkaContainerServiceImpl implements KafkaContainerService {
     private String kafkaServerHost;
     private int kafkaServerPort;
     private static final String CONNECT_PORT = "8083";
+    private static final String REGISTRY_PORT = "8081";
     private static final String BOOTSTRAP_SERVERS = "bootstrap.servers";
     private final ExecutorService executorService = Executors.newSingleThreadExecutor();
     private static final int MAX_RETRIES = 5;
+    private GenericContainer schemaRegistryContainer;
+    private static Network network = Network.SHARED;
 
     @Override
     public String getInstanceHostAndPort() {
@@ -148,7 +155,7 @@ public class KafkaContainerServiceImpl implements KafkaContainerService {
     public void startContainer() {
         LOG.info("kafka server is about to be initialized.");
         kafkaContainer = new KafkaContainer(DockerImageName.parse(KAFKA_IMAGE));
-
+        kafkaContainer.withNetwork(network).withNetworkAliases("kafka");
         kafkaContainer.start();
         try {
             Thread.sleep(5000);
@@ -166,12 +173,53 @@ public class KafkaContainerServiceImpl implements KafkaContainerService {
                 kafkaServerPort);
     }
 
+    @Override
+    public void startSchemaRegistry() {
+        LOG.info("start schema registry.");
+        schemaRegistryContainer =
+                new GenericContainer<>("confluentinc/cp-schema-registry:7.6.1")
+                        .withNetwork(network)
+                        .withExposedPorts(8081)
+                        .withNetworkAliases("schema-registry")
+                        .withEnv("SCHEMA_REGISTRY_HOST_NAME", 
"schema-registry")
+                        .withEnv(
+                                "SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS",
+                                "PLAINTEXT://kafka:9092")
+                        .withEnv("SCHEMA_REGISTRY_LISTENERS", 
"http://0.0.0.0:8081";)
+                        .waitingFor(
+                                
Wait.forListeningPort().withStartupTimeout(Duration.ofMinutes(2)));
+
+        schemaRegistryContainer.start();
+        try {
+            Thread.sleep(5000);
+        } catch (InterruptedException e) {
+            throw new DorisException(e);
+        }
+        LOG.info(
+                "start schema registry successfuly, with "
+                        + schemaRegistryContainer.getHost()
+                        + ":"
+                        + schemaRegistryContainer.getMappedPort(8081));
+    }
+
+    @Override
+    public String getSchemaRegistryUrl() {
+        return "http://";
+                + schemaRegistryContainer.getHost()
+                + ":"
+                + schemaRegistryContainer.getMappedPort(8081);
+    }
+
     @Override
     public void close() {
         LOG.info("Kafka server is about to be shut down.");
         shutdownConnector();
         kafkaContainer.close();
         LOG.info("Kafka server shuts down successfully.");
+        if (schemaRegistryContainer != null) {
+            schemaRegistryContainer.close();
+            LOG.info("Kafka schema registry shuts down successfully.");
+        }
     }
 
     private void shutdownConnector() {
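
Networking note: inside the shared Docker network the registry reaches the broker
through the "kafka" alias (PLAINTEXT://kafka:9092), while tests reach the registry
from the host via getHost() plus the mapped 8081 port. The fixed Thread.sleep(5000)
after start() could also be replaced by an HTTP readiness probe; a sketch, assuming
the registry's standard REST endpoint GET /subjects (this helper is not part of the
patch):

    private static void awaitRegistryReady(String registryUrl) throws InterruptedException {
        // Poll GET <registryUrl>/subjects; the registry answers 200 once it is up.
        for (int i = 0; i < 30; i++) {
            try {
                HttpURLConnection conn =
                        (HttpURLConnection) new URL(registryUrl + "/subjects").openConnection();
                if (conn.getResponseCode() == 200) {
                    return;
                }
            } catch (IOException ignored) {
                // not accepting connections yet
            }
            Thread.sleep(1000);
        }
        throw new IllegalStateException("schema registry did not become ready in time");
    }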
diff --git a/src/test/java/org/apache/doris/kafka/connector/e2e/sink/AbstractKafka2DorisSink.java b/src/test/java/org/apache/doris/kafka/connector/e2e/sink/AbstractKafka2DorisSink.java
index ba919b6..78fd626 100644
--- a/src/test/java/org/apache/doris/kafka/connector/e2e/sink/AbstractKafka2DorisSink.java
+++ b/src/test/java/org/apache/doris/kafka/connector/e2e/sink/AbstractKafka2DorisSink.java
@@ -28,8 +28,12 @@ import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.sql.Connection;
 import java.sql.DriverManager;
+import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
 import java.util.Objects;
 import java.util.stream.Collectors;
 import org.apache.commons.lang3.StringUtils;
@@ -39,6 +43,7 @@ import org.apache.doris.kafka.connector.e2e.kafka.KafkaContainerService;
 import org.apache.doris.kafka.connector.e2e.kafka.KafkaContainerServiceImpl;
 import org.apache.doris.kafka.connector.exception.DorisException;
 import org.junit.AfterClass;
+import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -72,6 +77,17 @@ public abstract class AbstractKafka2DorisSink {
         kafkaInstanceHostAndPort = kafkaContainerService.getInstanceHostAndPort();
     }
 
+    protected static void initSchemaRegistry() {
+        if (Objects.isNull(kafkaContainerService)) {
+            return;
+        }
+        kafkaContainerService.startSchemaRegistry();
+    }
+
+    protected static String getSchemaRegistryUrl() {
+        return kafkaContainerService.getSchemaRegistryUrl();
+    }
+
     protected static Connection getJdbcConnection() {
         try {
             return DriverManager.getConnection(
@@ -152,7 +168,31 @@ public abstract class AbstractKafka2DorisSink {
 
     @AfterClass
     public static void close() {
-        kafkaContainerService.close();
-        dorisContainerService.close();
+        // Containers are closed automatically by Testcontainers, so they can be reused across multiple IT cases
+        // kafkaContainerService.close();
+        // dorisContainerService.close();
+    }
+
+    public void checkResult(List<String> expected, String query, int 
columnSize) throws Exception {
+        List<String> actual = new ArrayList<>();
+
+        try (Statement statement = getJdbcConnection().createStatement()) {
+            ResultSet sinkResultSet = statement.executeQuery(query);
+            while (sinkResultSet.next()) {
+                List<String> row = new ArrayList<>();
+                for (int i = 1; i <= columnSize; i++) {
+                    Object value = sinkResultSet.getObject(i);
+                    if (value == null) {
+                        row.add("null");
+                    } else {
+                        row.add(value.toString());
+                    }
+                }
+                actual.add(StringUtils.join(row, ","));
+            }
+        }
+        LOG.info("expected result: {}", Arrays.toString(expected.toArray()));
+        LOG.info("actual result: {}", Arrays.toString(actual.toArray()));
+        Assert.assertArrayEquals(expected.toArray(), actual.toArray());
     }
 }
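
The checkResult helper is hoisted here from StringMsgE2ETest (removed further down)
so the new Avro tests can share it. A typical call site, as used in AvroMsgE2ETest:

    // Each row comes back as its columns joined with ","; SQL NULL is rendered as "null".
    List<String> expected = Arrays.asList("1,kafka,30", "2,doris,18");
    String query = "select id,name,age from avro_convert.doris_avro_tab order by id";
    checkResult(expected, query, 3); // 3 = number of columns to compare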
diff --git a/src/test/java/org/apache/doris/kafka/connector/e2e/sink/avro/AbstractAvroE2ESinkTest.java b/src/test/java/org/apache/doris/kafka/connector/e2e/sink/avro/AbstractAvroE2ESinkTest.java
new file mode 100644
index 0000000..d6e11df
--- /dev/null
+++ b/src/test/java/org/apache/doris/kafka/connector/e2e/sink/avro/AbstractAvroE2ESinkTest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.doris.kafka.connector.e2e.sink.avro;
+
+import java.util.Objects;
+import java.util.Properties;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.doris.kafka.connector.e2e.sink.AbstractKafka2DorisSink;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Produce messages with avro. 1.Manage your own schema 2.Use a schema registry */
+public abstract class AbstractAvroE2ESinkTest extends AbstractKafka2DorisSink {
+    private static final Logger LOG = LoggerFactory.getLogger(AbstractAvroE2ESinkTest.class);
+    private static KafkaProducer<String, byte[]> producer;
+    private static KafkaProducer<String, GenericRecord> avroProducer;
+
+    public static void initByteProducer() {
+        if (Objects.nonNull(producer)) {
+            return;
+        }
+        // Producer properties
+        Properties producerProperties = new Properties();
+        producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaInstanceHostAndPort);
+        producerProperties.put(
+                ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
+                org.apache.kafka.common.serialization.StringSerializer.class);
+        producerProperties.put(
+                ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
+                org.apache.kafka.common.serialization.ByteArraySerializer.class);
+        producer = new KafkaProducer<>(producerProperties);
+        LOG.info("kafka producer started successfully.");
+    }
+
+    public static void initAvroProducer() {
+        if (Objects.nonNull(avroProducer)) {
+            return;
+        }
+        // Producer properties
+        Properties producerProperties = new Properties();
+        producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaInstanceHostAndPort);
+        producerProperties.put(
+                ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
+                org.apache.kafka.common.serialization.StringSerializer.class);
+        producerProperties.put(
+                ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
+                io.confluent.kafka.serializers.KafkaAvroSerializer.class);
+        producerProperties.put("schema.registry.url", getSchemaRegistryUrl());
+        avroProducer = new KafkaProducer<>(producerProperties);
+        LOG.info("kafka avro producer started successfully.");
+    }
+
+    protected void produceMsg2Kafka(String topic, GenericRecord value) {
+        LOG.info("Kafka avro producer will produce msg. topic={}, msg={}", 
topic, value);
+        ProducerRecord<String, GenericRecord> record = new 
ProducerRecord<>(topic, value);
+        avroProducer.send(
+                record,
+                (recordMetadata, e) ->
+                        LOG.info(
+                                "Send avro Callback is {}, with error is ",
+                                recordMetadata.offset(),
+                                e));
+        LOG.info("Kafka avro producer produced msg successfully. topic={}, 
msg={}", topic, value);
+    }
+
+    protected void produceMsg2Kafka(String topic, byte[] value) {
+        LOG.info("Kafka producer will produce msg. topic={}, msg={}", topic, 
value);
+
+        ProducerRecord<String, byte[]> record = new ProducerRecord<>(topic, 
value);
+        producer.send(
+                record,
+                (recordMetadata, e) ->
+                        LOG.info(
+                                "Send Callback is {}, with error is ", 
recordMetadata.offset(), e));
+
+        LOG.info("Kafka producer produced msg successfully. topic={}, msg={}", 
topic, value);
+    }
+}
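
Both producers above feed records built from src/test/resources/decode/avro/user.avsc,
which predates this patch and is not shown in the diff. Judging from the fields the
tests populate (id, name, age), it presumably looks like the sketch below; the exact
record name/namespace in the repo may differ:

    {
      "type": "record",
      "name": "user",
      "fields": [
        {"name": "id",   "type": "int"},
        {"name": "name", "type": "string"},
        {"name": "age",  "type": "int"}
      ]
    }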
diff --git a/src/test/java/org/apache/doris/kafka/connector/e2e/sink/avro/AvroMsgE2ETest.java b/src/test/java/org/apache/doris/kafka/connector/e2e/sink/avro/AvroMsgE2ETest.java
new file mode 100644
index 0000000..d920903
--- /dev/null
+++ b/src/test/java/org/apache/doris/kafka/connector/e2e/sink/avro/AvroMsgE2ETest.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.doris.kafka.connector.e2e.sink.avro;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.net.URL;
+import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.doris.kafka.connector.cfg.DorisOptions;
+import org.apache.doris.kafka.connector.cfg.DorisSinkConnectorConfig;
+import org.apache.doris.kafka.connector.exception.DorisException;
+import org.apache.doris.kafka.connector.utils.ConfigCheckUtils;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AvroMsgE2ETest extends AbstractAvroE2ESinkTest {
+    private static final Logger LOG = LoggerFactory.getLogger(AvroMsgE2ETest.class);
+    private static String connectorName;
+    private static String jsonMsgConnectorContent;
+    private static DorisOptions dorisOptions;
+    private static String database;
+
+    @BeforeClass
+    public static void setUp() {
+        initServer();
+    }
+
+    public static void initialize(String connectorPath) {
+        jsonMsgConnectorContent = loadContent(connectorPath);
+        JsonNode rootNode = null;
+        try {
+            rootNode = objectMapper.readTree(jsonMsgConnectorContent);
+        } catch (IOException e) {
+            throw new DorisException("Failed to read content body.", e);
+        }
+        connectorName = rootNode.get(NAME).asText();
+        JsonNode configNode = rootNode.get(CONFIG);
+        Map<String, String> configMap = objectMapper.convertValue(configNode, 
Map.class);
+        configMap.put(ConfigCheckUtils.TASK_ID, "1");
+        Map<String, String> lowerCaseConfigMap =
+                DorisSinkConnectorConfig.convertToLowercase(configMap);
+        DorisSinkConnectorConfig.setDefaultValues(lowerCaseConfigMap);
+        dorisOptions = new DorisOptions(lowerCaseConfigMap);
+        database = dorisOptions.getDatabase();
+        createDatabase(database);
+        setTimeZone();
+    }
+
+    private static void setTimeZone() {
+        executeSql(getJdbcConnection(), "set global time_zone = 
'Asia/Shanghai'");
+    }
+
+    @Test
+    public void testConfluentAvroConvert() throws Exception {
+        LOG.info("starting to testConfluentAvroConvert test");
+        initSchemaRegistry();
+        initAvroProducer();
+        initialize("src/test/resources/e2e/avro_converter/confluent_avro_convert.json");
+
+        // replace the schema registry url in the connector config
+        String connectJson =
+                loadContent("src/test/resources/e2e/avro_converter/confluent_avro_convert.json");
+        JsonNode jsonNode = new ObjectMapper().readTree(connectJson);
+        ObjectNode configNode = (ObjectNode) jsonNode.get("config");
+
+        configNode.put("key.converter.schema.registry.url", getSchemaRegistryUrl());
+        configNode.put("value.converter.schema.registry.url", getSchemaRegistryUrl());
+        jsonMsgConnectorContent = new ObjectMapper().writeValueAsString(jsonNode);
+
+        String topic = "avro-user-confluent";
+        Schema.Parser parser = new Schema.Parser();
+        Schema schema = parser.parse(loadContent("src/test/resources/decode/avro/user.avsc"));
+
+        GenericRecord user1 = new GenericData.Record(schema);
+        user1.put("id", 3);
+        user1.put("name", "kafka-confluent");
+        user1.put("age", 38);
+        produceMsg2Kafka(topic, user1);
+
+        GenericRecord user2 = new GenericData.Record(schema);
+        user2.put("id", 4);
+        user2.put("name", "doris-confluent");
+        user2.put("age", 58);
+        produceMsg2Kafka(topic, user2);
+
+        String tableSql =
+                loadContent("src/test/resources/e2e/avro_converter/confluent_avro_tab.sql");
+        createTable(tableSql);
+        kafkaContainerService.registerKafkaConnector(connectorName, jsonMsgConnectorContent);
+        Thread.sleep(30000);
+
+        String table = dorisOptions.getTopicMapTable(topic);
+        List<String> expected = Arrays.asList("3,kafka-confluent,38", "4,doris-confluent,58");
+        String query = String.format("select id,name,age from %s.%s order by id", database, table);
+        checkResult(expected, query, 3);
+    }
+
+    @Test
+    public void testDorisAvroConvert() throws Exception {
+        LOG.info("starting to testDorisAvroConvert test");
+        initByteProducer();
+        initialize("src/test/resources/e2e/avro_converter/doris_avro_convert.json");
+        // replace the avro schema file path in the connector config
+        String connectJson =
+                loadContent("src/test/resources/e2e/avro_converter/doris_avro_convert.json");
+        JsonNode jsonNode = new ObjectMapper().readTree(connectJson);
+        ObjectNode configNode = (ObjectNode) jsonNode.get("config");
+
+        String absolutePath = getAbsolutePath("decode/avro/user.avsc");
+        configNode.put(
+                "value.converter.avro.topic2schema.filepath", "avro-user:file://" + absolutePath);
+        jsonMsgConnectorContent = new ObjectMapper().writeValueAsString(jsonNode);
+
+        String topic = "avro-user";
+        Schema.Parser parser = new Schema.Parser();
+        Schema schema = parser.parse(loadContent("src/test/resources/decode/avro/user.avsc"));
+
+        GenericRecord user1 = new GenericData.Record(schema);
+        user1.put("id", 1);
+        user1.put("name", "kafka");
+        user1.put("age", 30);
+        produceMsg2Kafka(topic, convertAvro2Byte(user1, schema));
+
+        GenericRecord user2 = new GenericData.Record(schema);
+        user2.put("id", 2);
+        user2.put("name", "doris");
+        user2.put("age", 18);
+        produceMsg2Kafka(topic, convertAvro2Byte(user2, schema));
+
+        String tableSql = loadContent("src/test/resources/e2e/avro_converter/doris_avro_tab.sql");
+        createTable(tableSql);
+        kafkaContainerService.registerKafkaConnector(connectorName, jsonMsgConnectorContent);
+        Thread.sleep(30000);
+
+        String table = dorisOptions.getTopicMapTable(topic);
+        List<String> expected = Arrays.asList("1,kafka,30", "2,doris,18");
+        String query = String.format("select id,name,age from %s.%s order by id", database, table);
+        checkResult(expected, query, 3);
+        checkResult(expected, query, 3);
+    }
+
+    @AfterClass
+    public static void closeInstance() {
+        kafkaContainerService.deleteKafkaConnector(connectorName);
+    }
+
+    public static String getAbsolutePath(String fileName) {
+        ClassLoader classLoader = AvroMsgE2ETest.class.getClassLoader();
+        URL resource = classLoader.getResource(fileName);
+        if (resource != null) {
+            return Paths.get(resource.getPath()).toAbsolutePath().toString();
+        } else {
+            return null;
+        }
+    }
+
+    public static byte[] convertAvro2Byte(GenericRecord data, Schema schema) throws IOException {
+        DatumWriter<GenericRecord> writer = new GenericDatumWriter<>(schema);
+        ByteArrayOutputStream out = new ByteArrayOutputStream();
+        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
+        writer.write(data, encoder);
+        encoder.flush();
+        byte[] avroBytes = out.toByteArray();
+        return avroBytes;
+    }
+}
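
A detail worth noting when comparing the two tests: convertAvro2Byte emits raw Avro
binary, which DorisAvroConverter decodes against the schema file named in
value.converter.avro.topic2schema.filepath. KafkaAvroSerializer (used in
testConfluentAvroConvert) instead wraps each record in Confluent's wire format:

    byte 0     : magic byte (0x00)
    bytes 1-4  : schema id (big-endian int) assigned by the schema registry
    bytes 5... : Avro binary encoding of the record

That framing is why the Confluent path needs schema.registry.url in its converter
config while the Doris path only needs a local schema file.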
diff --git a/src/test/java/org/apache/doris/kafka/connector/e2e/sink/stringconverter/StringMsgE2ETest.java b/src/test/java/org/apache/doris/kafka/connector/e2e/sink/stringconverter/StringMsgE2ETest.java
index dba6d54..8c4f207 100644
--- a/src/test/java/org/apache/doris/kafka/connector/e2e/sink/stringconverter/StringMsgE2ETest.java
+++ b/src/test/java/org/apache/doris/kafka/connector/e2e/sink/stringconverter/StringMsgE2ETest.java
@@ -24,12 +24,10 @@ import java.io.IOException;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.doris.kafka.connector.cfg.DorisOptions;
 import org.apache.doris.kafka.connector.cfg.DorisSinkConnectorConfig;
 import org.apache.doris.kafka.connector.exception.DorisException;
@@ -190,6 +188,42 @@ public class StringMsgE2ETest extends AbstractStringE2ESinkTest {
         checkResult(expected, query, 51);
     }
 
+    @Test
+    public void testDebeziumIngestionDMLEvent() throws Exception {
+        initialize("src/test/resources/e2e/string_converter/debezium_dml_event.json");
+        String insert1 =
+                "{\"schema\":{\"type\":\"struct\",\"fields\":[{\"type\":\"struct\",\"fields\":[{\"type\":\"int32\",\"optional\":false,\"field\":\"id\"},{\"type\":\"string\",\"optional\":false,\"field\":\"name\"},{\"type\":\"int32\",\"optional\":false,\"field\":\"age\"}],\"optional\":true,\"name\":\"mysql-server-1.inventory.customers.Value\",\"field\":\"before\"},{\"type\":\"struct\",\"fields\":[{\"type\":\"int32\",\"optional\":false,\"field\":\"id\"},{\"type\":\"string\",\"optional\":fal [...]
+        String topic = "debezium_dml_event";
+        produceMsg2Kafka(topic, insert1);
+        String tableSql =
+                loadContent("src/test/resources/e2e/string_converter/debezium_dml_event_tab.sql");
+        createTable(tableSql);
+        Thread.sleep(2000);
+        kafkaContainerService.registerKafkaConnector(connectorName, jsonMsgConnectorContent);
+
+        String table = dorisOptions.getTopicMapTable(topic);
+        List<String> expected = Arrays.asList("1,zhangsan,18");
+        Thread.sleep(20000);
+        String query = String.format("select * from %s.%s order by id", database, table);
+        checkResult(expected, query, 3);
+
+        // update
+        String update =
+                "{\"schema\":{\"type\":\"struct\",\"fields\":[{\"type\":\"struct\",\"fields\":[{\"type\":\"int32\",\"optional\":false,\"field\":\"id\"},{\"type\":\"string\",\"optional\":false,\"field\":\"name\"},{\"type\":\"int32\",\"optional\":false,\"field\":\"age\"}],\"optional\":true,\"name\":\"mysql-server-1.inventory.customers.Value\",\"field\":\"before\"},{\"type\":\"struct\",\"fields\":[{\"type\":\"int32\",\"optional\":false,\"field\":\"id\"},{\"type\":\"string\",\"optional\":fal [...]
+        produceMsg2Kafka(topic, update);
+        Thread.sleep(20000);
+        List<String> expectedUpdate = Arrays.asList("1,zhangsan,48");
+        checkResult(expectedUpdate, query, 3);
+
+        // delete
+        String delete =
+                "{\"schema\":{\"type\":\"struct\",\"fields\":[{\"type\":\"struct\",\"fields\":[{\"type\":\"int32\",\"optional\":false,\"field\":\"id\"},{\"type\":\"string\",\"optional\":false,\"field\":\"name\"},{\"type\":\"int32\",\"optional\":false,\"field\":\"age\"}],\"optional\":true,\"name\":\"mysql-server-1.inventory.customers.Value\",\"field\":\"before\"},{\"type\":\"struct\",\"fields\":[{\"type\":\"int32\",\"optional\":false,\"field\":\"id\"},{\"type\":\"string\",\"optional\":fal [...]
+        produceMsg2Kafka(topic, delete);
+        Thread.sleep(20000);
+        List<String> expectedDelete = Arrays.asList();
+        checkResult(expectedDelete, query, 3);
+    }
+
     @Test
     public void testTimeExampleTypes() throws Exception {
         initialize("src/test/resources/e2e/string_converter/time_types.json");
@@ -272,29 +306,6 @@ public class StringMsgE2ETest extends AbstractStringE2ESinkTest {
         checkResult(expectedResult, query1, 3);
     }
 
-    public void checkResult(List<String> expected, String query, int 
columnSize) throws Exception {
-        List<String> actual = new ArrayList<>();
-
-        try (Statement statement = getJdbcConnection().createStatement()) {
-            ResultSet sinkResultSet = statement.executeQuery(query);
-            while (sinkResultSet.next()) {
-                List<String> row = new ArrayList<>();
-                for (int i = 1; i <= columnSize; i++) {
-                    Object value = sinkResultSet.getObject(i);
-                    if (value == null) {
-                        row.add("null");
-                    } else {
-                        row.add(value.toString());
-                    }
-                }
-                actual.add(StringUtils.join(row, ","));
-            }
-        }
-        LOG.info("expected result: {}", Arrays.toString(expected.toArray()));
-        LOG.info("actual result: {}", Arrays.toString(actual.toArray()));
-        Assert.assertArrayEquals(expected.toArray(), actual.toArray());
-    }
-
     @AfterClass
     public static void closeInstance() {
         kafkaContainerService.deleteKafkaConnector(connectorName);
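
The three Debezium event strings above are truncated by the mail archive ("[...]").
For orientation, a Debezium row-level change event is an envelope roughly shaped as
below (illustrative skeleton only; the schema block and source metadata are abridged,
and ts_ms is a placeholder value):

    {
      "schema": { "type": "struct", "fields": [ ... ] },
      "payload": {
        "before": null,
        "after": { "id": 1, "name": "zhangsan", "age": 18 },
        "source": { ... },
        "op": "c",
        "ts_ms": 1700000000000
      }
    }

op is "c" for insert, "u" for update (both before and after set), and "d" for delete
(after is null), which converter.mode=debezium_ingestion maps onto Doris upserts and,
with enable.delete=true, deletes against the UNIQUE KEY table created below.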
diff --git a/src/test/resources/e2e/avro_converter/confluent_avro_convert.json b/src/test/resources/e2e/avro_converter/confluent_avro_convert.json
new file mode 100644
index 0000000..88dbd11
--- /dev/null
+++ b/src/test/resources/e2e/avro_converter/confluent_avro_convert.json
@@ -0,0 +1,23 @@
+{
+  "name":"confluent-avro-test",
+  "config":{
+  "connector.class":"org.apache.doris.kafka.connector.DorisSinkConnector",
+  "topics":"avro-user-confluent",
+  "tasks.max":"1",
+  "doris.topic2table.map": "avro-user-confluent:confluent_avro_tab",
+  "buffer.count.records":"1",
+  "buffer.flush.time":"10",
+  "buffer.size.bytes":"10000000",
+  "doris.urls":"127.0.0.1",
+  "doris.user":"root",
+  "doris.password":"",
+  "doris.http.port":"8030",
+  "doris.query.port":"9030",
+  "doris.database":"confluent_avro_convert",
+  "load.model":"stream_load",
+  "key.converter":"io.confluent.connect.avro.AvroConverter",
+  "key.converter.schema.registry.url":"http://127.0.0.1:8081";,
+  "value.converter":"io.confluent.connect.avro.AvroConverter",
+  "value.converter.schema.registry.url":"http://127.0.0.1:8081";
+  }
+}
\ No newline at end of file
diff --git a/src/test/resources/e2e/avro_converter/confluent_avro_tab.sql b/src/test/resources/e2e/avro_converter/confluent_avro_tab.sql
new file mode 100644
index 0000000..5864783
--- /dev/null
+++ b/src/test/resources/e2e/avro_converter/confluent_avro_tab.sql
@@ -0,0 +1,11 @@
+CREATE TABLE confluent_avro_convert.confluent_avro_tab
+(
+    `id`    INT,
+    `name`  VARCHAR(256),
+    `age`   SMALLINT
+)DUPLICATE KEY(`id`)
+DISTRIBUTED BY HASH(`id`) BUCKETS 1
+PROPERTIES (
+"replication_allocation" = "tag.location.default: 1",
+"light_schema_change" = "true"
+);
\ No newline at end of file
diff --git a/src/test/resources/e2e/avro_converter/doris_avro_convert.json b/src/test/resources/e2e/avro_converter/doris_avro_convert.json
new file mode 100644
index 0000000..0a44ef8
--- /dev/null
+++ b/src/test/resources/e2e/avro_converter/doris_avro_convert.json
@@ -0,0 +1,21 @@
+{
+  "name":"avro_sink-connector",
+  "config":{
+    "connector.class":"org.apache.doris.kafka.connector.DorisSinkConnector",
+    "topics":"avro-user",
+    "tasks.max":"1",
+    "doris.topic2table.map": "avro-user:doris_avro_tab",
+    "buffer.count.records":"1",
+    "buffer.flush.time":"10",
+    "buffer.size.bytes":"10000000",
+    "doris.urls":"127.0.0.1",
+    "doris.user":"root",
+    "doris.password":"",
+    "doris.http.port":"8030",
+    "doris.query.port":"9030",
+    "doris.database":"avro_convert",
+    "key.converter":"org.apache.kafka.connect.storage.StringConverter",
+    "value.converter":"org.apache.doris.kafka.connector.decode.avro.DorisAvroConverter",
+    "value.converter.avro.topic2schema.filepath":"avro-user:file:///opt/avro_user.avsc"
+  }
+}
\ No newline at end of file
diff --git a/src/test/resources/e2e/avro_converter/doris_avro_tab.sql b/src/test/resources/e2e/avro_converter/doris_avro_tab.sql
new file mode 100644
index 0000000..a665c32
--- /dev/null
+++ b/src/test/resources/e2e/avro_converter/doris_avro_tab.sql
@@ -0,0 +1,11 @@
+CREATE TABLE avro_convert.doris_avro_tab
+(
+    `id`    INT,
+    `name`  VARCHAR(256),
+    `age`   INT
+)DUPLICATE KEY(`id`)
+DISTRIBUTED BY HASH(`id`) BUCKETS 1
+PROPERTIES (
+"replication_allocation" = "tag.location.default: 1",
+"light_schema_change" = "true"
+);
\ No newline at end of file
diff --git a/src/test/resources/e2e/string_converter/debezium_dml_event.json b/src/test/resources/e2e/string_converter/debezium_dml_event.json
new file mode 100644
index 0000000..73ee118
--- /dev/null
+++ b/src/test/resources/e2e/string_converter/debezium_dml_event.json
@@ -0,0 +1,25 @@
+{
+  "name":"debezium_dml_event",
+  "config": {
+    "connector.class":"org.apache.doris.kafka.connector.DorisSinkConnector",
+    "topics":"debezium_dml_event",
+    "tasks.max":"1",
+    "doris.topic2table.map": "debezium_dml_event:debezium_dml_event_tab",
+    "buffer.count.records":"1",
+    "buffer.flush.time":"10",
+    "buffer.size.bytes":"10000000",
+    "doris.urls":"127.0.0.1",
+    "doris.user":"root",
+    "doris.password":"",
+    "doris.http.port":"8030",
+    "doris.query.port":"9030",
+    "doris.database":"debezium_ingestion_msg",
+    "converter.mode": "debezium_ingestion",
+    "load.model":"stream_load",
+    "delivery.guarantee":"exactly_once",
+    "enable.2pc": "true",
+    "enable.delete": "true",
+    "key.converter":"org.apache.kafka.connect.json.JsonConverter",
+    "value.converter":"org.apache.kafka.connect.json.JsonConverter"
+  }
+}
\ No newline at end of file
diff --git a/src/test/resources/e2e/string_converter/debezium_dml_event_tab.sql b/src/test/resources/e2e/string_converter/debezium_dml_event_tab.sql
new file mode 100644
index 0000000..a3c7db4
--- /dev/null
+++ b/src/test/resources/e2e/string_converter/debezium_dml_event_tab.sql
@@ -0,0 +1,11 @@
+CREATE TABLE debezium_ingestion_msg.debezium_dml_event_tab
+(
+    `id`    INT,
+    `name`  VARCHAR(256),
+    `age`   SMALLINT
+)UNIQUE KEY(`id`)
+DISTRIBUTED BY HASH(`id`) BUCKETS 1
+PROPERTIES (
+"replication_allocation" = "tag.location.default: 1",
+"light_schema_change" = "true"
+);
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org
