This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-kafka-connector.git
The following commit(s) were added to refs/heads/master by this push: new d65a505 [Improve](case) add case for debezium update, delete and avro convert (#65) d65a505 is described below commit d65a5055d1ea9328efec3a97b97710fa06f6d2ba Author: wudi <676366...@qq.com> AuthorDate: Mon Apr 7 10:22:44 2025 +0800 [Improve](case) add case for debezium update, delete and avro convert (#65) --- .../connector/e2e/kafka/KafkaContainerService.java | 4 + .../e2e/kafka/KafkaContainerServiceImpl.java | 50 +++++- .../e2e/sink/AbstractKafka2DorisSink.java | 44 ++++- .../e2e/sink/avro/AbstractAvroE2ESinkTest.java | 98 ++++++++++ .../connector/e2e/sink/avro/AvroMsgE2ETest.java | 198 +++++++++++++++++++++ .../e2e/sink/stringconverter/StringMsgE2ETest.java | 61 ++++--- .../e2e/avro_converter/confluent_avro_convert.json | 23 +++ .../e2e/avro_converter/confluent_avro_tab.sql | 11 ++ .../e2e/avro_converter/doris_avro_convert.json | 21 +++ .../e2e/avro_converter/doris_avro_tab.sql | 11 ++ .../e2e/string_converter/debezium_dml_event.json | 25 +++ .../string_converter/debezium_dml_event_tab.sql | 11 ++ 12 files changed, 529 insertions(+), 28 deletions(-) diff --git a/src/test/java/org/apache/doris/kafka/connector/e2e/kafka/KafkaContainerService.java b/src/test/java/org/apache/doris/kafka/connector/e2e/kafka/KafkaContainerService.java index 45616c4..2e9dc8d 100644 --- a/src/test/java/org/apache/doris/kafka/connector/e2e/kafka/KafkaContainerService.java +++ b/src/test/java/org/apache/doris/kafka/connector/e2e/kafka/KafkaContainerService.java @@ -25,6 +25,10 @@ public interface KafkaContainerService { void startContainer(); + void startSchemaRegistry(); + + String getSchemaRegistryUrl(); + void startConnector(); String getInstanceHostAndPort(); diff --git a/src/test/java/org/apache/doris/kafka/connector/e2e/kafka/KafkaContainerServiceImpl.java b/src/test/java/org/apache/doris/kafka/connector/e2e/kafka/KafkaContainerServiceImpl.java index 4e38ab3..ec39f49 100644 --- 
a/src/test/java/org/apache/doris/kafka/connector/e2e/kafka/KafkaContainerServiceImpl.java +++ b/src/test/java/org/apache/doris/kafka/connector/e2e/kafka/KafkaContainerServiceImpl.java @@ -26,6 +26,7 @@ import java.net.HttpURLConnection; import java.net.URL; import java.nio.file.Files; import java.nio.file.Paths; +import java.time.Duration; import java.util.Objects; import java.util.Properties; import java.util.concurrent.ExecutorService; @@ -41,7 +42,10 @@ import org.apache.http.impl.client.HttpClients; import org.apache.kafka.connect.cli.ConnectDistributed; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.testcontainers.containers.GenericContainer; import org.testcontainers.containers.KafkaContainer; +import org.testcontainers.containers.Network; +import org.testcontainers.containers.wait.strategy.Wait; import org.testcontainers.utility.DockerImageName; public class KafkaContainerServiceImpl implements KafkaContainerService { @@ -60,9 +64,12 @@ public class KafkaContainerServiceImpl implements KafkaContainerService { private String kafkaServerHost; private int kafkaServerPort; private static final String CONNECT_PORT = "8083"; + private static final String REGISTRY_PORT = "8081"; private static final String BOOTSTRAP_SERVERS = "bootstrap.servers"; private final ExecutorService executorService = Executors.newSingleThreadExecutor(); private static final int MAX_RETRIES = 5; + private GenericContainer schemaRegistryContainer; + private static Network network = Network.SHARED; @Override public String getInstanceHostAndPort() { @@ -148,7 +155,7 @@ public class KafkaContainerServiceImpl implements KafkaContainerService { public void startContainer() { LOG.info("kafka server is about to be initialized."); kafkaContainer = new KafkaContainer(DockerImageName.parse(KAFKA_IMAGE)); - + kafkaContainer.withNetwork(network).withNetworkAliases("kafka"); kafkaContainer.start(); try { Thread.sleep(5000); @@ -166,12 +173,53 @@ public class 
KafkaContainerServiceImpl implements KafkaContainerService { kafkaServerPort); } + @Override + public void startSchemaRegistry() { + LOG.info("start schema registry."); + schemaRegistryContainer = + new GenericContainer<>("confluentinc/cp-schema-registry:7.6.1") + .withNetwork(network) + .withExposedPorts(8081) + .withNetworkAliases("schema-registry") + .withEnv("SCHEMA_REGISTRY_HOST_NAME", "schema-registry") + .withEnv( + "SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS", + "PLAINTEXT://kafka:9092") + .withEnv("SCHEMA_REGISTRY_LISTENERS", "http://0.0.0.0:8081") + .waitingFor( + Wait.forListeningPort().withStartupTimeout(Duration.ofMinutes(2))); + + schemaRegistryContainer.start(); + try { + Thread.sleep(5000); + } catch (InterruptedException e) { + throw new DorisException(e); + } + LOG.info( + "start schema registry successfuly, with " + + schemaRegistryContainer.getHost() + + ":" + + schemaRegistryContainer.getMappedPort(8081)); + } + + @Override + public String getSchemaRegistryUrl() { + return "http://" + + schemaRegistryContainer.getHost() + + ":" + + schemaRegistryContainer.getMappedPort(8081); + } + @Override public void close() { LOG.info("Kafka server is about to be shut down."); shutdownConnector(); kafkaContainer.close(); LOG.info("Kafka server shuts down successfully."); + if (schemaRegistryContainer != null) { + schemaRegistryContainer.close(); + LOG.info("Kafka schema registry shuts down successfully."); + } } private void shutdownConnector() { diff --git a/src/test/java/org/apache/doris/kafka/connector/e2e/sink/AbstractKafka2DorisSink.java b/src/test/java/org/apache/doris/kafka/connector/e2e/sink/AbstractKafka2DorisSink.java index ba919b6..78fd626 100644 --- a/src/test/java/org/apache/doris/kafka/connector/e2e/sink/AbstractKafka2DorisSink.java +++ b/src/test/java/org/apache/doris/kafka/connector/e2e/sink/AbstractKafka2DorisSink.java @@ -28,8 +28,12 @@ import java.nio.file.Files; import java.nio.file.Paths; import java.sql.Connection; import 
java.sql.DriverManager; +import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; import java.util.Objects; import java.util.stream.Collectors; import org.apache.commons.lang3.StringUtils; @@ -39,6 +43,7 @@ import org.apache.doris.kafka.connector.e2e.kafka.KafkaContainerService; import org.apache.doris.kafka.connector.e2e.kafka.KafkaContainerServiceImpl; import org.apache.doris.kafka.connector.exception.DorisException; import org.junit.AfterClass; +import org.junit.Assert; import org.junit.BeforeClass; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -72,6 +77,17 @@ public abstract class AbstractKafka2DorisSink { kafkaInstanceHostAndPort = kafkaContainerService.getInstanceHostAndPort(); } + protected static void initSchemaRegistry() { + if (Objects.isNull(kafkaContainerService)) { + return; + } + kafkaContainerService.startSchemaRegistry(); + } + + protected static String getSchemaRegistryUrl() { + return kafkaContainerService.getSchemaRegistryUrl(); + } + protected static Connection getJdbcConnection() { try { return DriverManager.getConnection( @@ -152,7 +168,31 @@ public abstract class AbstractKafka2DorisSink { @AfterClass public static void close() { - kafkaContainerService.close(); - dorisContainerService.close(); + // Closed automatically, multiple itcases can be reused + // kafkaContainerService.close(); + // dorisContainerService.close(); + } + + public void checkResult(List<String> expected, String query, int columnSize) throws Exception { + List<String> actual = new ArrayList<>(); + + try (Statement statement = getJdbcConnection().createStatement()) { + ResultSet sinkResultSet = statement.executeQuery(query); + while (sinkResultSet.next()) { + List<String> row = new ArrayList<>(); + for (int i = 1; i <= columnSize; i++) { + Object value = sinkResultSet.getObject(i); + if (value == null) { + row.add("null"); + } else { + 
row.add(value.toString()); + } + } + actual.add(StringUtils.join(row, ",")); + } + } + LOG.info("expected result: {}", Arrays.toString(expected.toArray())); + LOG.info("actual result: {}", Arrays.toString(actual.toArray())); + Assert.assertArrayEquals(expected.toArray(), actual.toArray()); } } diff --git a/src/test/java/org/apache/doris/kafka/connector/e2e/sink/avro/AbstractAvroE2ESinkTest.java b/src/test/java/org/apache/doris/kafka/connector/e2e/sink/avro/AbstractAvroE2ESinkTest.java new file mode 100644 index 0000000..d6e11df --- /dev/null +++ b/src/test/java/org/apache/doris/kafka/connector/e2e/sink/avro/AbstractAvroE2ESinkTest.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.doris.kafka.connector.e2e.sink.avro; + +import java.util.Objects; +import java.util.Properties; +import org.apache.avro.generic.GenericRecord; +import org.apache.doris.kafka.connector.e2e.sink.AbstractKafka2DorisSink; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Produce messages with avro. 
1.Manage your own schema 2.Use a schema registry */ +public abstract class AbstractAvroE2ESinkTest extends AbstractKafka2DorisSink { + private static final Logger LOG = LoggerFactory.getLogger(AbstractAvroE2ESinkTest.class); + private static KafkaProducer<String, byte[]> producer; + private static KafkaProducer<String, GenericRecord> avroProducer; + + public static void initByteProducer() { + if (Objects.nonNull(producer)) { + return; + } + // Producer properties + Properties producerProperties = new Properties(); + producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaInstanceHostAndPort); + producerProperties.put( + ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, + org.apache.kafka.common.serialization.StringSerializer.class); + producerProperties.put( + ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, + org.apache.kafka.common.serialization.ByteArraySerializer.class); + producer = new KafkaProducer<>(producerProperties); + LOG.info("kafka producer started successfully."); + } + + public static void initAvroProducer() { + if (Objects.nonNull(avroProducer)) { + return; + } + // Producer properties + Properties producerProperties = new Properties(); + producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaInstanceHostAndPort); + producerProperties.put( + ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, + org.apache.kafka.common.serialization.StringSerializer.class); + producerProperties.put( + ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, + io.confluent.kafka.serializers.KafkaAvroSerializer.class); + producerProperties.put("schema.registry.url", getSchemaRegistryUrl()); + avroProducer = new KafkaProducer<>(producerProperties); + LOG.info("kafka avro producer started successfully."); + } + + protected void produceMsg2Kafka(String topic, GenericRecord value) { + LOG.info("Kafka avro producer will produce msg. 
topic={}, msg={}", topic, value); + ProducerRecord<String, GenericRecord> record = new ProducerRecord<>(topic, value); + avroProducer.send( + record, + (recordMetadata, e) -> + LOG.info( + "Send avro Callback is {}, with error is ", + recordMetadata.offset(), + e)); + LOG.info("Kafka avro producer produced msg successfully. topic={}, msg={}", topic, value); + } + + protected void produceMsg2Kafka(String topic, byte[] value) { + LOG.info("Kafka producer will produce msg. topic={}, msg={}", topic, value); + + ProducerRecord<String, byte[]> record = new ProducerRecord<>(topic, value); + producer.send( + record, + (recordMetadata, e) -> + LOG.info( + "Send Callback is {}, with error is ", recordMetadata.offset(), e)); + + LOG.info("Kafka producer produced msg successfully. topic={}, msg={}", topic, value); + } +} diff --git a/src/test/java/org/apache/doris/kafka/connector/e2e/sink/avro/AvroMsgE2ETest.java b/src/test/java/org/apache/doris/kafka/connector/e2e/sink/avro/AvroMsgE2ETest.java new file mode 100644 index 0000000..d920903 --- /dev/null +++ b/src/test/java/org/apache/doris/kafka/connector/e2e/sink/avro/AvroMsgE2ETest.java @@ -0,0 +1,198 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.doris.kafka.connector.e2e.sink.avro; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.net.URL; +import java.nio.file.Paths; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericDatumWriter; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.BinaryEncoder; +import org.apache.avro.io.DatumWriter; +import org.apache.avro.io.EncoderFactory; +import org.apache.doris.kafka.connector.cfg.DorisOptions; +import org.apache.doris.kafka.connector.cfg.DorisSinkConnectorConfig; +import org.apache.doris.kafka.connector.exception.DorisException; +import org.apache.doris.kafka.connector.utils.ConfigCheckUtils; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class AvroMsgE2ETest extends AbstractAvroE2ESinkTest { + private static final Logger LOG = LoggerFactory.getLogger(AvroMsgE2ETest.class); + private static String connectorName; + private static String jsonMsgConnectorContent; + private static DorisOptions dorisOptions; + private static String database; + + @BeforeClass + public static void setUp() { + initServer(); + } + + public static void initialize(String connectorPath) { + jsonMsgConnectorContent = loadContent(connectorPath); + JsonNode rootNode = null; + try { + rootNode = objectMapper.readTree(jsonMsgConnectorContent); + } catch (IOException e) { + throw new DorisException("Failed to read content body.", e); + } + connectorName = rootNode.get(NAME).asText(); + JsonNode configNode = rootNode.get(CONFIG); + Map<String, String> configMap = objectMapper.convertValue(configNode, Map.class); + 
configMap.put(ConfigCheckUtils.TASK_ID, "1"); + Map<String, String> lowerCaseConfigMap = + DorisSinkConnectorConfig.convertToLowercase(configMap); + DorisSinkConnectorConfig.setDefaultValues(lowerCaseConfigMap); + dorisOptions = new DorisOptions(lowerCaseConfigMap); + database = dorisOptions.getDatabase(); + createDatabase(database); + setTimeZone(); + } + + private static void setTimeZone() { + executeSql(getJdbcConnection(), "set global time_zone = 'Asia/Shanghai'"); + } + + @Test + public void testConfluentAvroConvert() throws Exception { + LOG.info("starting to testConfluentAvroConvert test"); + initSchemaRegistry(); + initAvroProducer(); + initialize("src/test/resources/e2e/avro_converter/confluent_avro_convert.json"); + + // replace file path + String connectJson = + loadContent("src/test/resources/e2e/avro_converter/confluent_avro_convert.json"); + JsonNode jsonNode = new ObjectMapper().readTree(connectJson); + ObjectNode configNode = (ObjectNode) jsonNode.get("config"); + + configNode.put("key.converter.schema.registry.url", getSchemaRegistryUrl()); + configNode.put("value.converter.schema.registry.url", getSchemaRegistryUrl()); + jsonMsgConnectorContent = new ObjectMapper().writeValueAsString(jsonNode); + + String topic = "avro-user-confluent"; + Schema.Parser parser = new Schema.Parser(); + Schema schema = parser.parse(loadContent("src/test/resources/decode/avro/user.avsc")); + + GenericRecord user1 = new GenericData.Record(schema); + user1.put("id", 3); + user1.put("name", "kafka-confluent"); + user1.put("age", 38); + produceMsg2Kafka(topic, user1); + + GenericRecord user2 = new GenericData.Record(schema); + user2.put("id", 4); + user2.put("name", "doris-confluent"); + user2.put("age", 58); + produceMsg2Kafka(topic, user2); + + String tableSql = + loadContent("src/test/resources/e2e/avro_converter/confluent_avro_tab.sql"); + createTable(tableSql); + kafkaContainerService.registerKafkaConnector(connectorName, jsonMsgConnectorContent); + 
Thread.sleep(30000); + + String table = dorisOptions.getTopicMapTable(topic); + List<String> expected = Arrays.asList("3,kafka-confluent,38", "4,doris-confluent,58"); + String query = String.format("select id,name,age from %s.%s order by id", database, table); + checkResult(expected, query, 3); + } + + @Test + public void testDorisAvroConvert() throws Exception { + LOG.info("starting to testDorisAvroConvert test"); + initByteProducer(); + initialize("src/test/resources/e2e/avro_converter/doris_avro_convert.json"); + // replace file path + String connectJson = + loadContent("src/test/resources/e2e/avro_converter/doris_avro_convert.json"); + JsonNode jsonNode = new ObjectMapper().readTree(connectJson); + ObjectNode configNode = (ObjectNode) jsonNode.get("config"); + + String absolutePath = getAbsolutePath("decode/avro/user.avsc"); + configNode.put( + "value.converter.avro.topic2schema.filepath", "avro-user:file://" + absolutePath); + jsonMsgConnectorContent = new ObjectMapper().writeValueAsString(jsonNode); + + String topic = "avro-user"; + Schema.Parser parser = new Schema.Parser(); + Schema schema = parser.parse(loadContent("src/test/resources/decode/avro/user.avsc")); + + GenericRecord user1 = new GenericData.Record(schema); + user1.put("id", 1); + user1.put("name", "kafka"); + user1.put("age", 30); + produceMsg2Kafka(topic, convertAvro2Byte(user1, schema)); + + GenericRecord user2 = new GenericData.Record(schema); + user2.put("id", 2); + user2.put("name", "doris"); + user2.put("age", 18); + produceMsg2Kafka(topic, convertAvro2Byte(user2, schema)); + + String tableSql = loadContent("src/test/resources/e2e/avro_converter/doris_avro_tab.sql"); + createTable(tableSql); + kafkaContainerService.registerKafkaConnector(connectorName, jsonMsgConnectorContent); + Thread.sleep(30000); + + String table = dorisOptions.getTopicMapTable(topic); + List<String> expected = Arrays.asList("1,kafka,30", "2,doris,18"); + String query = String.format("select id,name,age from %s.%s 
order by id", database, table); + checkResult(expected, query, 3); + } + + @AfterClass + public static void closeInstance() { + kafkaContainerService.deleteKafkaConnector(connectorName); + } + + public static String getAbsolutePath(String fileName) { + ClassLoader classLoader = AvroMsgE2ETest.class.getClassLoader(); + URL resource = classLoader.getResource(fileName); + if (resource != null) { + return Paths.get(resource.getPath()).toAbsolutePath().toString(); + } else { + return null; + } + } + + public static byte[] convertAvro2Byte(GenericRecord data, Schema schema) throws IOException { + DatumWriter<GenericRecord> writer = new GenericDatumWriter<>(schema); + ByteArrayOutputStream out = new ByteArrayOutputStream(); + BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null); + writer.write(data, encoder); + encoder.flush(); + byte[] avroBytes = out.toByteArray(); + return avroBytes; + } +} diff --git a/src/test/java/org/apache/doris/kafka/connector/e2e/sink/stringconverter/StringMsgE2ETest.java b/src/test/java/org/apache/doris/kafka/connector/e2e/sink/stringconverter/StringMsgE2ETest.java index dba6d54..8c4f207 100644 --- a/src/test/java/org/apache/doris/kafka/connector/e2e/sink/stringconverter/StringMsgE2ETest.java +++ b/src/test/java/org/apache/doris/kafka/connector/e2e/sink/stringconverter/StringMsgE2ETest.java @@ -24,12 +24,10 @@ import java.io.IOException; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; -import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Map; -import org.apache.commons.lang3.StringUtils; import org.apache.doris.kafka.connector.cfg.DorisOptions; import org.apache.doris.kafka.connector.cfg.DorisSinkConnectorConfig; import org.apache.doris.kafka.connector.exception.DorisException; @@ -190,6 +188,42 @@ public class StringMsgE2ETest extends AbstractStringE2ESinkTest { checkResult(expected, query, 51); } + @Test + public void 
testDebeziumIngestionDMLEvent() throws Exception { + initialize("src/test/resources/e2e/string_converter/debezium_dml_event.json"); + String insert1 = + "{\"schema\":{\"type\":\"struct\",\"fields\":[{\"type\":\"struct\",\"fields\":[{\"type\":\"int32\",\"optional\":false,\"field\":\"id\"},{\"type\":\"string\",\"optional\":false,\"field\":\"name\"},{\"type\":\"int32\",\"optional\":false,\"field\":\"age\"}],\"optional\":true,\"name\":\"mysql-server-1.inventory.customers.Value\",\"field\":\"before\"},{\"type\":\"struct\",\"fields\":[{\"type\":\"int32\",\"optional\":false,\"field\":\"id\"},{\"type\":\"string\",\"optional\":fal [...] + String topic = "debezium_dml_event"; + produceMsg2Kafka(topic, insert1); + String tableSql = + loadContent("src/test/resources/e2e/string_converter/debezium_dml_event_tab.sql"); + createTable(tableSql); + Thread.sleep(2000); + kafkaContainerService.registerKafkaConnector(connectorName, jsonMsgConnectorContent); + + String table = dorisOptions.getTopicMapTable(topic); + List<String> expected = Arrays.asList("1,zhangsan,18"); + Thread.sleep(20000); + String query = String.format("select * from %s.%s order by id", database, table); + checkResult(expected, query, 3); + + // update + String update = + "{\"schema\":{\"type\":\"struct\",\"fields\":[{\"type\":\"struct\",\"fields\":[{\"type\":\"int32\",\"optional\":false,\"field\":\"id\"},{\"type\":\"string\",\"optional\":false,\"field\":\"name\"},{\"type\":\"int32\",\"optional\":false,\"field\":\"age\"}],\"optional\":true,\"name\":\"mysql-server-1.inventory.customers.Value\",\"field\":\"before\"},{\"type\":\"struct\",\"fields\":[{\"type\":\"int32\",\"optional\":false,\"field\":\"id\"},{\"type\":\"string\",\"optional\":fal [...] 
+ produceMsg2Kafka(topic, update); + Thread.sleep(20000); + List<String> expectedUpdate = Arrays.asList("1,zhangsan,48"); + checkResult(expectedUpdate, query, 3); + + // delete + String delete = + "{\"schema\":{\"type\":\"struct\",\"fields\":[{\"type\":\"struct\",\"fields\":[{\"type\":\"int32\",\"optional\":false,\"field\":\"id\"},{\"type\":\"string\",\"optional\":false,\"field\":\"name\"},{\"type\":\"int32\",\"optional\":false,\"field\":\"age\"}],\"optional\":true,\"name\":\"mysql-server-1.inventory.customers.Value\",\"field\":\"before\"},{\"type\":\"struct\",\"fields\":[{\"type\":\"int32\",\"optional\":false,\"field\":\"id\"},{\"type\":\"string\",\"optional\":fal [...] + produceMsg2Kafka(topic, delete); + Thread.sleep(20000); + List<String> expectedDelete = Arrays.asList(); + checkResult(expectedDelete, query, 3); + } + @Test public void testTimeExampleTypes() throws Exception { initialize("src/test/resources/e2e/string_converter/time_types.json"); @@ -272,29 +306,6 @@ public class StringMsgE2ETest extends AbstractStringE2ESinkTest { checkResult(expectedResult, query1, 3); } - public void checkResult(List<String> expected, String query, int columnSize) throws Exception { - List<String> actual = new ArrayList<>(); - - try (Statement statement = getJdbcConnection().createStatement()) { - ResultSet sinkResultSet = statement.executeQuery(query); - while (sinkResultSet.next()) { - List<String> row = new ArrayList<>(); - for (int i = 1; i <= columnSize; i++) { - Object value = sinkResultSet.getObject(i); - if (value == null) { - row.add("null"); - } else { - row.add(value.toString()); - } - } - actual.add(StringUtils.join(row, ",")); - } - } - LOG.info("expected result: {}", Arrays.toString(expected.toArray())); - LOG.info("actual result: {}", Arrays.toString(actual.toArray())); - Assert.assertArrayEquals(expected.toArray(), actual.toArray()); - } - @AfterClass public static void closeInstance() { kafkaContainerService.deleteKafkaConnector(connectorName); diff --git 
a/src/test/resources/e2e/avro_converter/confluent_avro_convert.json b/src/test/resources/e2e/avro_converter/confluent_avro_convert.json new file mode 100644 index 0000000..88dbd11 --- /dev/null +++ b/src/test/resources/e2e/avro_converter/confluent_avro_convert.json @@ -0,0 +1,23 @@ +{ + "name":"confluent-avro-test", + "config":{ + "connector.class":"org.apache.doris.kafka.connector.DorisSinkConnector", + "topics":"avro-user-confluent", + "tasks.max":"1", + "doris.topic2table.map": "avro-user-confluent:confluent_avro_tab", + "buffer.count.records":"1", + "buffer.flush.time":"10", + "buffer.size.bytes":"10000000", + "doris.urls":"127.0.0.1", + "doris.user":"root", + "doris.password":"", + "doris.http.port":"8030", + "doris.query.port":"9030", + "doris.database":"confluent_avro_convert", + "load.model":"stream_load", + "key.converter":"io.confluent.connect.avro.AvroConverter", + "key.converter.schema.registry.url":"http://127.0.0.1:8081", + "value.converter":"io.confluent.connect.avro.AvroConverter", + "value.converter.schema.registry.url":"http://127.0.0.1:8081" + } +} \ No newline at end of file diff --git a/src/test/resources/e2e/avro_converter/confluent_avro_tab.sql b/src/test/resources/e2e/avro_converter/confluent_avro_tab.sql new file mode 100644 index 0000000..5864783 --- /dev/null +++ b/src/test/resources/e2e/avro_converter/confluent_avro_tab.sql @@ -0,0 +1,11 @@ +CREATE TABLE confluent_avro_convert.confluent_avro_tab +( + `id` INT, + `name` VARCHAR(256), + `age` SMALLINT, +)DUPLICATE KEY(`id`) +DISTRIBUTED BY HASH(`id`) BUCKETS 1 +PROPERTIES ( +"replication_allocation" = "tag.location.default: 1", +"light_schema_change" = "true" +); \ No newline at end of file diff --git a/src/test/resources/e2e/avro_converter/doris_avro_convert.json b/src/test/resources/e2e/avro_converter/doris_avro_convert.json new file mode 100644 index 0000000..0a44ef8 --- /dev/null +++ b/src/test/resources/e2e/avro_converter/doris_avro_convert.json @@ -0,0 +1,21 @@ +{ + 
"name":"avro_sink-connector", + "config":{ + "connector.class":"org.apache.doris.kafka.connector.DorisSinkConnector", + "topics":"avro-user", + "tasks.max":"1", + "doris.topic2table.map": "avro-user:doris_avro_tab", + "buffer.count.records":"1", + "buffer.flush.time":"10", + "buffer.size.bytes":"10000000", + "doris.urls":"127.0.0.1", + "doris.user":"root", + "doris.password":"", + "doris.http.port":"8030", + "doris.query.port":"9030", + "doris.database":"avro_convert", + "key.converter":"org.apache.kafka.connect.storage.StringConverter", + "value.converter":"org.apache.doris.kafka.connector.decode.avro.DorisAvroConverter", + "value.converter.avro.topic2schema.filepath":"avro-user:file:///opt/avro_user.avsc" + } +} \ No newline at end of file diff --git a/src/test/resources/e2e/avro_converter/doris_avro_tab.sql b/src/test/resources/e2e/avro_converter/doris_avro_tab.sql new file mode 100644 index 0000000..a665c32 --- /dev/null +++ b/src/test/resources/e2e/avro_converter/doris_avro_tab.sql @@ -0,0 +1,11 @@ +CREATE TABLE avro_convert.doris_avro_tab +( + `id` INT, + `name` VARCHAR(256), + `age` INT, +)DUPLICATE KEY(`id`) +DISTRIBUTED BY HASH(`id`) BUCKETS 1 +PROPERTIES ( +"replication_allocation" = "tag.location.default: 1", +"light_schema_change" = "true" +); \ No newline at end of file diff --git a/src/test/resources/e2e/string_converter/debezium_dml_event.json b/src/test/resources/e2e/string_converter/debezium_dml_event.json new file mode 100644 index 0000000..73ee118 --- /dev/null +++ b/src/test/resources/e2e/string_converter/debezium_dml_event.json @@ -0,0 +1,25 @@ +{ + "name":"debezium_dml_event", + "config": { + "connector.class":"org.apache.doris.kafka.connector.DorisSinkConnector", + "topics":"debezium_dml_event", + "tasks.max":"1", + "doris.topic2table.map": "debezium_dml_event:debezium_dml_event_tab", + "buffer.count.records":"1", + "buffer.flush.time":"10", + "buffer.size.bytes":"10000000", + "doris.urls":"127.0.0.1", + "doris.user":"root", + 
"doris.password":"", + "doris.http.port":"8030", + "doris.query.port":"9030", + "doris.database":"debezium_ingestion_msg", + "converter.mode": "debezium_ingestion", + "load.model":"stream_load", + "delivery.guarantee":"exactly_once", + "enable.2pc": "true", + "enable.delete": "true", + "key.converter":"org.apache.kafka.connect.json.JsonConverter", + "value.converter":"org.apache.kafka.connect.json.JsonConverter" + } +} \ No newline at end of file diff --git a/src/test/resources/e2e/string_converter/debezium_dml_event_tab.sql b/src/test/resources/e2e/string_converter/debezium_dml_event_tab.sql new file mode 100644 index 0000000..a3c7db4 --- /dev/null +++ b/src/test/resources/e2e/string_converter/debezium_dml_event_tab.sql @@ -0,0 +1,11 @@ +CREATE TABLE debezium_ingestion_msg.debezium_dml_event_tab +( + `id` INT, + `name` VARCHAR(256), + `age` SMALLINT, +)UNIQUE KEY(`id`) +DISTRIBUTED BY HASH(`id`) BUCKETS 1 +PROPERTIES ( +"replication_allocation" = "tag.location.default: 1", +"light_schema_change" = "true" +); \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org