This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-kafka-connector.git
The following commit(s) were added to refs/heads/master by this push: new f5ea699 [improve] update config default value and add combine flush mode (#73) f5ea699 is described below commit f5ea699aec317bd28ff322e6d9039f3c802444d7 Author: wudi <676366...@qq.com> AuthorDate: Mon May 12 11:40:30 2025 +0800 [improve] update config default value and add combine flush mode (#73) --- .../doris/kafka/connector/DorisSinkTask.java | 2 +- .../doris/kafka/connector/cfg/DorisOptions.java | 26 +++- .../connector/cfg/DorisSinkConnectorConfig.java | 9 +- .../service/DorisCombinedSinkService.java | 143 +++++++++++++++++++++ .../connector/service/DorisDefaultSinkService.java | 14 +- .../connector/service/DorisSinkServiceFactory.java | 9 +- .../kafka/connector/utils/ConfigCheckUtils.java | 25 +++- .../doris/kafka/connector/writer/DorisWriter.java | 11 +- .../e2e/sink/stringconverter/StringMsgE2ETest.java | 62 +++++++++ .../string_converter/combine_flush_connector.json | 22 ++++ .../combine_flush_connector_2pc.json | 23 ++++ .../e2e/string_converter/combine_flush_tab.sql | 12 ++ .../e2e/string_converter/combine_flush_tab_2pc.sql | 12 ++ 13 files changed, 347 insertions(+), 23 deletions(-) diff --git a/src/main/java/org/apache/doris/kafka/connector/DorisSinkTask.java b/src/main/java/org/apache/doris/kafka/connector/DorisSinkTask.java index 576de26..5f97b29 100644 --- a/src/main/java/org/apache/doris/kafka/connector/DorisSinkTask.java +++ b/src/main/java/org/apache/doris/kafka/connector/DorisSinkTask.java @@ -55,7 +55,7 @@ public class DorisSinkTask extends SinkTask { LOG.info("kafka doris sink task start with {}", parsedConfig); this.options = new DorisOptions(parsedConfig); this.remainingRetries = options.getMaxRetries(); - this.sink = DorisSinkServiceFactory.getDorisSinkService(parsedConfig, context); + this.sink = DorisSinkServiceFactory.getDorisSinkService(parsedConfig, context, options); } /** diff --git a/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java b/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java index 922eee6..aa5402c 100644 --- a/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java +++ b/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java @@ -70,6 +70,7 @@ public class DorisOptions { private final int maxRetries; private final int retryIntervalMs; private final BehaviorOnNullValues behaviorOnNullValues; + private final boolean enableCombineFlush; public DorisOptions(Map<String, String> config) { this.name = config.get(DorisSinkConnectorConfig.NAME); @@ -84,6 +85,8 @@ public class DorisOptions { this.loadModel = LoadModel.of(config.get(DorisSinkConnectorConfig.LOAD_MODEL)); this.deliveryGuarantee = DeliveryGuarantee.of(config.get(DorisSinkConnectorConfig.DELIVERY_GUARANTEE)); + this.enableCombineFlush = + Boolean.valueOf(config.get(DorisSinkConnectorConfig.ENABLE_COMBINE_FLUSH)); this.converterMode = ConverterMode.of(config.get(DorisSinkConnectorConfig.CONVERTER_MODE)); this.schemaEvolutionMode = SchemaEvolutionMode.of( @@ -98,14 +101,21 @@ public class DorisOptions { config.get(DorisSinkConnectorConfig.TOPICS_TABLES_MAP)); this.tableNameField = config.get(DorisSinkConnectorConfig.RECORD_TABLE_NAME_FIELD); - if (config.containsKey(DorisSinkConnectorConfig.ENABLE_2PC)) { - if (Boolean.parseBoolean(config.get(DorisSinkConnectorConfig.ENABLE_2PC))) { - this.enable2PC = true; - this.force2PC = true; - } else { - this.enable2PC = false; + if (enableCombineFlush) { + LOG.info("Enable combine flush, set 2pc to false."); + this.enable2PC = false; + this.force2PC = false; + } else { + if (config.containsKey(DorisSinkConnectorConfig.ENABLE_2PC)) { + if (Boolean.parseBoolean(config.get(DorisSinkConnectorConfig.ENABLE_2PC))) { + this.enable2PC = true; + this.force2PC = true; + } else { + this.enable2PC = false; + } } } + this.enableCustomJMX = Boolean.parseBoolean(config.get(DorisSinkConnectorConfig.JMX_OPT)); this.enableDelete = Boolean.parseBoolean(config.get(DorisSinkConnectorConfig.ENABLE_DELETE)); @@ -350,4 +360,8 @@ public class DorisOptions { public BehaviorOnNullValues getBehaviorOnNullValues() { return behaviorOnNullValues; } + + public boolean isEnableCombineFlush() { + return enableCombineFlush; + } } diff --git a/src/main/java/org/apache/doris/kafka/connector/cfg/DorisSinkConnectorConfig.java b/src/main/java/org/apache/doris/kafka/connector/cfg/DorisSinkConnectorConfig.java index 5b8807e..97bc125 100644 --- a/src/main/java/org/apache/doris/kafka/connector/cfg/DorisSinkConnectorConfig.java +++ b/src/main/java/org/apache/doris/kafka/connector/cfg/DorisSinkConnectorConfig.java @@ -48,16 +48,16 @@ public class DorisSinkConnectorConfig { // Connector config private static final String CONNECTOR_CONFIG = "Connector Config"; public static final String BUFFER_COUNT_RECORDS = "buffer.count.records"; - public static final long BUFFER_COUNT_RECORDS_DEFAULT = 10000; + public static final long BUFFER_COUNT_RECORDS_DEFAULT = 50000; public static final String BUFFER_SIZE_BYTES = "buffer.size.bytes"; - public static final long BUFFER_SIZE_BYTES_DEFAULT = 5000000; + public static final long BUFFER_SIZE_BYTES_DEFAULT = 100 * 1024 * 1024; public static final long BUFFER_SIZE_BYTES_MIN = 1; public static final String TOPICS_TABLES_MAP = "doris.topic2table.map"; public static final String RECORD_TABLE_NAME_FIELD = "record.tablename.field"; public static final String LABEL_PREFIX = "label.prefix"; // Time in seconds - public static final long BUFFER_FLUSH_TIME_SEC_MIN = 10; + public static final long BUFFER_FLUSH_TIME_SEC_MIN = 1; public static final long BUFFER_FLUSH_TIME_SEC_DEFAULT = 120; public static final String BUFFER_FLUSH_TIME_SEC = "buffer.flush.time"; @@ -81,6 +81,8 @@ public class DorisSinkConnectorConfig { public static final String AUTO_REDIRECT = "auto.redirect"; public static final String DELIVERY_GUARANTEE = "delivery.guarantee"; public static final String DELIVERY_GUARANTEE_DEFAULT = DeliveryGuarantee.AT_LEAST_ONCE.name(); + public static final String ENABLE_COMBINE_FLUSH = "enable.combine.flush"; + public static final String ENABLE_COMBINE_FLUSH_DEFAULT = "false"; public static final String CONVERTER_MODE = "converter.mode"; public static final String CONVERT_MODE_DEFAULT = ConverterMode.NORMAL.getName(); @@ -130,6 +132,7 @@ public class DorisSinkConnectorConfig { setFieldToDefaultValues( config, RETRY_INTERVAL_MS, String.valueOf(RETRY_INTERVAL_MS_DEFAULT)); setFieldToDefaultValues(config, BEHAVIOR_ON_NULL_VALUES, BEHAVIOR_ON_NULL_VALUES_DEFAULT); + setFieldToDefaultValues(config, ENABLE_COMBINE_FLUSH, ENABLE_COMBINE_FLUSH_DEFAULT); } public static Map<String, String> convertToLowercase(Map<String, String> config) { diff --git a/src/main/java/org/apache/doris/kafka/connector/service/DorisCombinedSinkService.java b/src/main/java/org/apache/doris/kafka/connector/service/DorisCombinedSinkService.java new file mode 100644 index 0000000..711b405 --- /dev/null +++ b/src/main/java/org/apache/doris/kafka/connector/service/DorisCombinedSinkService.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.doris.kafka.connector.service; + +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import org.apache.doris.kafka.connector.writer.DorisWriter; +import org.apache.doris.kafka.connector.writer.StreamLoadWriter; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.connect.sink.SinkRecord; +import org.apache.kafka.connect.sink.SinkTaskContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Combined all partitions and write once. */ +public class DorisCombinedSinkService extends DorisDefaultSinkService { + private static final Logger LOG = LoggerFactory.getLogger(DorisCombinedSinkService.class); + + private final Map<String, HashMap<Integer, Long>> topicPartitionOffset; + + DorisCombinedSinkService(Map<String, String> config, SinkTaskContext context) { + super(config, context); + this.topicPartitionOffset = new HashMap<>(); + } + + /** + * Create new task + * + * @param tableName destination table name in doris + * @param topicPartition TopicPartition passed from Kafka + */ + @Override + public void startTask(final String tableName, final TopicPartition topicPartition) { + // check if the task is already started + String writerKey = getWriterKey(topicPartition.topic(), tableName); + if (writer.containsKey(writerKey)) { + LOG.info("already start task with key {}", writerKey); + } else { + String topic = topicPartition.topic(); + + // Only by topic + int partition = -1; + DorisWriter dorisWriter = + new StreamLoadWriter( + tableName, topic, partition, dorisOptions, conn, connectMonitor); + writer.put(writerKey, dorisWriter); + metricsJmxReporter.start(); + } + } + + @Override + public void insert(final Collection<SinkRecord> records) { + // note that records can be empty + for (SinkRecord record : records) { + // skip records + if (shouldSkipRecord(record)) { + continue; + } + // check topic mutating SMTs + checkTopicMutating(record); + + String topic = record.topic(); + int partition = record.kafkaPartition(); + topicPartitionOffset + .computeIfAbsent(topic, k -> new HashMap<>()) + .put(partition, record.kafkaOffset()); + // Might happen a count of record based flushing,buffer + insert(record); + } + + // check all sink writer to see if they need to be flushed + for (DorisWriter writer : writer.values()) { + // Time based flushing + if (writer.shouldFlush()) { + writer.flushBuffer(); + } + } + } + + @Override + public void insert(SinkRecord record) { + String tableName = getSinkDorisTableName(record); + String writerKey = getWriterKey(record.topic(), tableName); + // init a new topic partition + if (!writer.containsKey(writerKey)) { + startTask(tableName, new TopicPartition(record.topic(), -1)); + } + writer.get(writerKey).insert(record); + } + + @Override + public long getOffset(final TopicPartition topicPartition) { + String topic = topicPartition.topic(); + int partition = topicPartition.partition(); + if (topicPartitionOffset.containsKey(topic) + && topicPartitionOffset.get(topic).containsKey(partition)) { + return topicPartitionOffset.get(topic).get(partition); + } + return 0; + } + + @Override + public void commit(Map<TopicPartition, OffsetAndMetadata> offsets) { + // Here we force flushing the data in memory once to + // ensure that the offsets recorded in topicPartitionOffset have been flushed to doris + for (DorisWriter writer : writer.values()) { + writer.flushBuffer(); + } + } + + /** + * Parse the writer unique key + * + * @param topic topic name + * @param tableName table name + * @return writer key + */ + private String getWriterKey(String topic, String tableName) { + if (dorisOptions.getTopicMapTable(topic).equals(tableName)) { + return topic; + } + return topic + "_" + tableName; + } +} diff --git a/src/main/java/org/apache/doris/kafka/connector/service/DorisDefaultSinkService.java b/src/main/java/org/apache/doris/kafka/connector/service/DorisDefaultSinkService.java index e091264..68f0d34 100644 --- a/src/main/java/org/apache/doris/kafka/connector/service/DorisDefaultSinkService.java +++ b/src/main/java/org/apache/doris/kafka/connector/service/DorisDefaultSinkService.java @@ -64,13 +64,13 @@ import org.slf4j.LoggerFactory; public class DorisDefaultSinkService implements DorisSinkService { private static final Logger LOG = LoggerFactory.getLogger(DorisDefaultSinkService.class); - private final ConnectionProvider conn; - private final Map<String, DorisWriter> writer; - private final DorisOptions dorisOptions; - private final MetricsJmxReporter metricsJmxReporter; - private final DorisConnectMonitor connectMonitor; - private final ObjectMapper objectMapper; - private final SinkTaskContext context; + protected final ConnectionProvider conn; + protected final Map<String, DorisWriter> writer; + protected final DorisOptions dorisOptions; + protected final MetricsJmxReporter metricsJmxReporter; + protected final DorisConnectMonitor connectMonitor; + protected final ObjectMapper objectMapper; + protected final SinkTaskContext context; DorisDefaultSinkService(Map<String, String> config, SinkTaskContext context) { this.dorisOptions = new DorisOptions(config); diff --git a/src/main/java/org/apache/doris/kafka/connector/service/DorisSinkServiceFactory.java b/src/main/java/org/apache/doris/kafka/connector/service/DorisSinkServiceFactory.java index dbf7192..130ea06 100644 --- a/src/main/java/org/apache/doris/kafka/connector/service/DorisSinkServiceFactory.java +++ b/src/main/java/org/apache/doris/kafka/connector/service/DorisSinkServiceFactory.java @@ -20,13 +20,18 @@ package org.apache.doris.kafka.connector.service; import java.util.Map; +import org.apache.doris.kafka.connector.cfg.DorisOptions; import org.apache.kafka.connect.sink.SinkTaskContext; /** A factory to create {@link DorisSinkService} */ public class DorisSinkServiceFactory { public static DorisSinkService getDorisSinkService( - Map<String, String> connectorConfig, SinkTaskContext context) { - return new DorisDefaultSinkService(connectorConfig, context); + Map<String, String> connectorConfig, SinkTaskContext context, DorisOptions options) { + if (options.isEnableCombineFlush()) { + return new DorisCombinedSinkService(connectorConfig, context); + } else { + return new DorisDefaultSinkService(connectorConfig, context); + } } } diff --git a/src/main/java/org/apache/doris/kafka/connector/utils/ConfigCheckUtils.java b/src/main/java/org/apache/doris/kafka/connector/utils/ConfigCheckUtils.java index 465b7dc..e1588c3 100644 --- a/src/main/java/org/apache/doris/kafka/connector/utils/ConfigCheckUtils.java +++ b/src/main/java/org/apache/doris/kafka/connector/utils/ConfigCheckUtils.java @@ -144,7 +144,7 @@ public class ConfigCheckUtils { || isIllegalRange( bufferFlushTime, DorisSinkConnectorConfig.BUFFER_FLUSH_TIME_SEC_MIN)) { LOG.error( - "{} cannot be empty or not a number or less than 10.", + "{} cannot be empty or not a number or less than 1.", DorisSinkConnectorConfig.BUFFER_FLUSH_TIME_SEC); configIsValid = false; } @@ -162,11 +162,32 @@ public class ConfigCheckUtils { if (!validateEnumInstances(deliveryGuarantee, DeliveryGuarantee.instances())) { LOG.error( "The value of {} is an illegal parameter of {}.", - loadModel, + deliveryGuarantee, DorisSinkConnectorConfig.DELIVERY_GUARANTEE); configIsValid = false; } + String enableCombineFlush = config.get(DorisSinkConnectorConfig.ENABLE_COMBINE_FLUSH); + if (!validateEnumInstances(enableCombineFlush, new String[] {"true", "false"})) { + LOG.error( + "The value of {} is an illegal parameter of {}.", + enableCombineFlush, + DorisSinkConnectorConfig.ENABLE_COMBINE_FLUSH); + configIsValid = false; + } + + if (configIsValid + && Boolean.parseBoolean(enableCombineFlush) + && DeliveryGuarantee.EXACTLY_ONCE.name().equalsIgnoreCase(deliveryGuarantee)) { + LOG.error( + "The value of {} is not supported set {} when {} is set to {}.", + DorisSinkConnectorConfig.ENABLE_COMBINE_FLUSH, + enableCombineFlush, + DorisSinkConnectorConfig.DELIVERY_GUARANTEE, + DeliveryGuarantee.EXACTLY_ONCE.name()); + configIsValid = false; + } + String converterMode = config.get(DorisSinkConnectorConfig.CONVERTER_MODE); if (!validateEnumInstances(converterMode, ConverterMode.instances())) { LOG.error( diff --git a/src/main/java/org/apache/doris/kafka/connector/writer/DorisWriter.java b/src/main/java/org/apache/doris/kafka/connector/writer/DorisWriter.java index 348f784..9e5cfb9 100644 --- a/src/main/java/org/apache/doris/kafka/connector/writer/DorisWriter.java +++ b/src/main/java/org/apache/doris/kafka/connector/writer/DorisWriter.java @@ -116,8 +116,11 @@ public abstract class DorisWriter { protected void insertRecord(final SinkRecord record) { // discard the record if the record offset is smaller or equal to server side offset - if (record.kafkaOffset() > this.offsetPersistedInDoris.get() - && record.kafkaOffset() > processedOffset.get()) { + // when enable.combine.flush=true, No verification is required because the offsets of + // multiple partitions cannot be compared. + if (dorisOptions.isEnableCombineFlush() + || (record.kafkaOffset() > this.offsetPersistedInDoris.get() + && record.kafkaOffset() > processedOffset.get())) { SinkRecord dorisRecord = record; RecordBuffer tmpBuff = null; @@ -130,6 +133,10 @@ public abstract class DorisWriter { } if (tmpBuff != null) { + LOG.info( + "trigger flush by buffer size or count, buffer size: {}, num of records: {}", + tmpBuff.getBufferSizeBytes(), + tmpBuff.getNumOfRecords()); flush(tmpBuff); } processedOffset.set(dorisRecord.kafkaOffset()); diff --git a/src/test/java/org/apache/doris/kafka/connector/e2e/sink/stringconverter/StringMsgE2ETest.java b/src/test/java/org/apache/doris/kafka/connector/e2e/sink/stringconverter/StringMsgE2ETest.java index 6d073c2..056f513 100644 --- a/src/test/java/org/apache/doris/kafka/connector/e2e/sink/stringconverter/StringMsgE2ETest.java +++ b/src/test/java/org/apache/doris/kafka/connector/e2e/sink/stringconverter/StringMsgE2ETest.java @@ -411,6 +411,68 @@ public class StringMsgE2ETest extends AbstractStringE2ESinkTest { checkResult(expected, query1, 3); } + @Test + public void testCombineFlush() throws IOException, InterruptedException, SQLException { + initialize("src/test/resources/e2e/string_converter/combine_flush_connector.json"); + Thread.sleep(5000); + String topic = "combine_test"; + String msg = "{\"id\":1,\"name\":\"zhangsan\",\"age\":12}"; + + produceMsg2Kafka(topic, msg); + String tableSql = + loadContent("src/test/resources/e2e/string_converter/combine_flush_tab.sql"); + createTable(tableSql); + kafkaContainerService.registerKafkaConnector(connectorName, jsonMsgConnectorContent); + + String table = dorisOptions.getTopicMapTable(topic); + Statement statement = getJdbcConnection().createStatement(); + String querySql = "select * from " + database + "." + table; + LOG.info("start to query result from doris. sql={}", querySql); + ResultSet resultSet = statement.executeQuery(querySql); + + Assert.assertTrue(resultSet.next()); + + int id = resultSet.getInt("id"); + String name = resultSet.getString("name"); + int age = resultSet.getInt("age"); + LOG.info("Query result is id={}, name={}, age={}", id, name, age); + + Assert.assertEquals(1, id); + Assert.assertEquals("zhangsan", name); + Assert.assertEquals(12, age); + } + + @Test + public void testCombineFlush2PC() throws IOException, InterruptedException, SQLException { + initialize("src/test/resources/e2e/string_converter/combine_flush_connector_2pc.json"); + Thread.sleep(5000); + String topic = "combine_test_2pc"; + String msg = "{\"id\":1,\"name\":\"zhangsan\",\"age\":12}"; + + produceMsg2Kafka(topic, msg); + String tableSql = + loadContent("src/test/resources/e2e/string_converter/combine_flush_tab_2pc.sql"); + createTable(tableSql); + kafkaContainerService.registerKafkaConnector(connectorName, jsonMsgConnectorContent); + + String table = dorisOptions.getTopicMapTable(topic); + Statement statement = getJdbcConnection().createStatement(); + String querySql = "select * from " + database + "." + table; + LOG.info("start to query result from doris. sql={}", querySql); + ResultSet resultSet = statement.executeQuery(querySql); + + Assert.assertTrue(resultSet.next()); + + int id = resultSet.getInt("id"); + String name = resultSet.getString("name"); + int age = resultSet.getInt("age"); + LOG.info("Query result is id={}, name={}, age={}", id, name, age); + + Assert.assertEquals(1, id); + Assert.assertEquals("zhangsan", name); + Assert.assertEquals(12, age); + } + @AfterClass public static void closeInstance() { kafkaContainerService.deleteKafkaConnector(connectorName); diff --git a/src/test/resources/e2e/string_converter/combine_flush_connector.json b/src/test/resources/e2e/string_converter/combine_flush_connector.json new file mode 100644 index 0000000..96c3cf8 --- /dev/null +++ b/src/test/resources/e2e/string_converter/combine_flush_connector.json @@ -0,0 +1,22 @@ +{ + "name":"combine_flush_connector", + "config":{ + "connector.class":"org.apache.doris.kafka.connector.DorisSinkConnector", + "topics":"combine_test", + "tasks.max":"1", + "doris.topic2table.map": "combine_test:combine_flush_tab", + "buffer.count.records":"100", + "buffer.flush.time":"1", + "buffer.size.bytes":"10000000", + "doris.urls":"127.0.0.1", + "doris.user":"root", + "doris.password":"", + "doris.http.port":"8030", + "doris.query.port":"9030", + "doris.database":"combine_flush", + "load.model":"stream_load", + "enable.combine.flush":"true", + "key.converter":"org.apache.kafka.connect.storage.StringConverter", + "value.converter":"org.apache.kafka.connect.storage.StringConverter" + } +} \ No newline at end of file diff --git a/src/test/resources/e2e/string_converter/combine_flush_connector_2pc.json b/src/test/resources/e2e/string_converter/combine_flush_connector_2pc.json new file mode 100644 index 0000000..066ef3b --- /dev/null +++ b/src/test/resources/e2e/string_converter/combine_flush_connector_2pc.json @@ -0,0 +1,23 @@ +{ + "name":"combine_flush_connector_2pc", + "config":{ + "connector.class":"org.apache.doris.kafka.connector.DorisSinkConnector", + "topics":"combine_test_2pc", + "tasks.max":"1", + "doris.topic2table.map": "combine_test_2pc:combine_flush_tab_2pc", + "buffer.count.records":"100", + "buffer.flush.time":"1", + "buffer.size.bytes":"10000000", + "doris.urls":"127.0.0.1", + "doris.user":"root", + "doris.password":"", + "doris.http.port":"8030", + "doris.query.port":"9030", + "doris.database":"combine_flush_2pc", + "load.model":"stream_load", + "enable.combine.flush":"true", + "enable.2pc": "true", + "key.converter":"org.apache.kafka.connect.storage.StringConverter", + "value.converter":"org.apache.kafka.connect.storage.StringConverter" + } +} \ No newline at end of file diff --git a/src/test/resources/e2e/string_converter/combine_flush_tab.sql b/src/test/resources/e2e/string_converter/combine_flush_tab.sql new file mode 100644 index 0000000..88180fa --- /dev/null +++ b/src/test/resources/e2e/string_converter/combine_flush_tab.sql @@ -0,0 +1,12 @@ +-- Please note that the database here should be consistent with doris.database in the file where the connector is registered. +CREATE TABLE combine_flush.combine_flush_tab ( + id INT NULL, + name VARCHAR(100) NULL, + age INT NULL +) ENGINE=OLAP +UNIQUE KEY(`id`) +COMMENT 'OLAP' +DISTRIBUTED BY HASH(`id`) BUCKETS AUTO +PROPERTIES ( +"replication_allocation" = "tag.location.default: 1" +); \ No newline at end of file diff --git a/src/test/resources/e2e/string_converter/combine_flush_tab_2pc.sql b/src/test/resources/e2e/string_converter/combine_flush_tab_2pc.sql new file mode 100644 index 0000000..b946239 --- /dev/null +++ b/src/test/resources/e2e/string_converter/combine_flush_tab_2pc.sql @@ -0,0 +1,12 @@ +-- Please note that the database here should be consistent with doris.database in the file where the connector is registered. +CREATE TABLE combine_flush_2pc.combine_flush_tab_2pc ( + id INT NULL, + name VARCHAR(100) NULL, + age INT NULL +) ENGINE=OLAP +UNIQUE KEY(`id`) +COMMENT 'OLAP' +DISTRIBUTED BY HASH(`id`) BUCKETS AUTO +PROPERTIES ( +"replication_allocation" = "tag.location.default: 1" +); \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org