This is an automated email from the ASF dual-hosted git repository. caiconghui pushed a commit to branch export in repository https://gitbox.apache.org/repos/asf/doris.git
commit b806a49eb9c9beb37e968b57234910c2488ed944 Author: caiconghui <55968745+caicong...@users.noreply.github.com> AuthorDate: Tue May 28 16:38:29 2024 +0800 [enhancement](export) filter empty partition before export table to remote storage (#35389) filter empty partition before export table to remote storage Co-authored-by: caiconghui1 <caicongh...@jd.com> --- .../main/java/org/apache/doris/load/ExportJob.java | 32 ++- .../export_p0/test_export_empty_table.groovy | 269 +-------------------- 2 files changed, 28 insertions(+), 273 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java index d0ccf23ae0a..b072ed2f2fd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java @@ -407,6 +407,13 @@ public class ExportJob implements Writable { ExportTaskExecutor executor = new ExportTaskExecutor(selectStmts, this); jobExecutorList.add(executor); } + + // add empty task to make export job could be finished finally if jobExecutorList is empty + // which means that export table without data + if (jobExecutorList.isEmpty()) { + ExportTaskExecutor executor = new ExportTaskExecutor(Lists.newArrayList(), this); + jobExecutorList.add(executor); + } } /** @@ -511,15 +518,23 @@ public class ExportJob implements Writable { // get partitions // user specifies partitions, already checked in ExportCommand if (!this.partitionNames.isEmpty()) { - this.partitionNames.forEach(partitionName -> partitions.add(table.getPartition(partitionName))); + this.partitionNames.forEach(partitionName -> { + Partition partition = table.getPartition(partitionName); + if (partition.hasData()) { + partitions.add(partition); + } + }); } else { - if (table.getPartitions().size() > Config.maximum_number_of_export_partitions) { - throw new UserException("The partitions number of this export job is larger than the maximum number" - + " of partitions allowed by a export job"); - } - partitions.addAll(table.getPartitions()); + table.getPartitions().forEach(partition -> { + if (partition.hasData()) { + partitions.add(partition); + } + }); + } + if (partitions.size() > Config.maximum_number_of_export_partitions) { + throw new UserException("The partitions number of this export job is larger than the maximum number" + + " of partitions allowed by a export job"); } - // get tablets for (Partition partition : partitions) { // Partition data consistency is not need to verify partition version. @@ -589,8 +604,7 @@ public class ExportJob implements Writable { List<Long> tabletsList = new ArrayList<>(flatTabletIdList.subList(start, start + tabletsNum)); List<List<Long>> tablets = new ArrayList<>(); for (int i = 0; i < tabletsList.size(); i += MAXIMUM_TABLETS_OF_OUTFILE_IN_EXPORT) { - int end = i + MAXIMUM_TABLETS_OF_OUTFILE_IN_EXPORT < tabletsList.size() - ? i + MAXIMUM_TABLETS_OF_OUTFILE_IN_EXPORT : tabletsList.size(); + int end = Math.min(i + MAXIMUM_TABLETS_OF_OUTFILE_IN_EXPORT, tabletsList.size()); tablets.add(new ArrayList<>(tabletsList.subList(i, end))); } diff --git a/regression-test/suites/export_p0/test_export_empty_table.groovy b/regression-test/suites/export_p0/test_export_empty_table.groovy index f70ff97b38b..584c65d73bc 100644 --- a/regression-test/suites/export_p0/test_export_empty_table.groovy +++ b/regression-test/suites/export_p0/test_export_empty_table.groovy @@ -143,62 +143,8 @@ suite("test_export_empty_table", "p0") { waiting_export.call(label) // check file amounts - check_file_amounts.call("${outFilePath}", 1) - - // check data correctness - sql """ DROP TABLE IF EXISTS ${table_load_name} """ - sql """ - CREATE TABLE IF NOT EXISTS ${table_load_name} ( - `user_id` INT NOT NULL COMMENT "用户id", - `date` DATE NOT NULL COMMENT "数据灌入日期时间", - `datetime` DATETIME NOT NULL COMMENT "数据灌入日期时间", - `city` VARCHAR(20) COMMENT "用户所在城市", - `age` SMALLINT COMMENT "用户年龄", - `sex` TINYINT COMMENT "用户性别", - `bool_col` boolean COMMENT "", - `int_col` int COMMENT "", - `bigint_col` bigint COMMENT "", - `float_col` float COMMENT "", - `double_col` double COMMENT "", - `char_col` CHAR(10) COMMENT "", - `decimal_col` decimal COMMENT "", - `ipv4_col` ipv4 COMMENT "", - `ipv6_col` ipv6 COMMENT "" - ) - DISTRIBUTED BY HASH(user_id) PROPERTIES("replication_num" = "1"); - """ - - File[] files = new File("${outFilePath}").listFiles() - String file_path = files[0].getAbsolutePath() - streamLoad { - table "${table_load_name}" - - set 'columns', 'user_id, date, datetime, city, age, sex, bool_col, int_col, bigint_col, float_col, double_col, char_col, decimal_col' - set 'strict_mode', 'true' - set 'format', 'csv' - set 'column_separator', ',' - - file "${file_path}" - time 10000 // limit inflight 10s - - check { result, exception, startTime, endTime -> - if (exception != null) { - throw exception - } - log.info("Stream load result: ${result}".toString()) - def json = parseJson(result) - assertEquals("success", json.Status.toLowerCase()) - assertEquals(0, json.NumberTotalRows) - assertEquals(0, json.NumberFilteredRows) - } - } - - sql """ sync; """ - - qt_select_load1 """ SELECT * FROM ${table_load_name} t ORDER BY user_id; """ - + check_file_amounts.call("${outFilePath}", 0) } finally { - try_sql("DROP TABLE IF EXISTS ${table_load_name}") delete_files.call("${outFilePath}") } @@ -222,59 +168,8 @@ suite("test_export_empty_table", "p0") { waiting_export.call(label) // check file amounts - check_file_amounts.call("${outFilePath}", 1) - - // check data correctness - sql """ DROP TABLE IF EXISTS ${table_load_name} """ - sql """ - CREATE TABLE IF NOT EXISTS ${table_load_name} ( - `user_id` INT NOT NULL COMMENT "用户id", - `date` DATE NOT NULL COMMENT "数据灌入日期时间", - `datetime` DATETIME NOT NULL COMMENT "数据灌入日期时间", - `city` VARCHAR(20) COMMENT "用户所在城市", - `age` SMALLINT COMMENT "用户年龄", - `sex` TINYINT COMMENT "用户性别", - `bool_col` boolean COMMENT "", - `int_col` int COMMENT "", - `bigint_col` bigint COMMENT "", - `float_col` float COMMENT "", - `double_col` double COMMENT "", - `char_col` CHAR(10) COMMENT "", - `decimal_col` decimal COMMENT "", - `ipv4_col` ipv4 COMMENT "", - `ipv6_col` ipv6 COMMENT "" - ) - DISTRIBUTED BY HASH(user_id) PROPERTIES("replication_num" = "1"); - """ - - File[] files = new File("${outFilePath}").listFiles() - String file_path = files[0].getAbsolutePath() - streamLoad { - table "${table_load_name}" - - set 'columns', 'user_id, date, datetime, city, age, sex, bool_col, int_col, bigint_col, float_col, double_col, char_col, decimal_col' - set 'strict_mode', 'true' - set 'format', 'parquet' - - file "${file_path}" - time 10000 // limit inflight 10s - - check { result, exception, startTime, endTime -> - if (exception != null) { - throw exception - } - log.info("Stream load result: ${result}".toString()) - def json = parseJson(result) - assertEquals("success", json.Status.toLowerCase()) - assertEquals(0, json.NumberTotalRows) - assertEquals(0, json.NumberFilteredRows) - } - } - - qt_select_load2 """ SELECT * FROM ${table_load_name} t ORDER BY user_id; """ - + check_file_amounts.call("${outFilePath}", 0) } finally { - try_sql("DROP TABLE IF EXISTS ${table_load_name}") delete_files.call("${outFilePath}") } @@ -297,59 +192,8 @@ suite("test_export_empty_table", "p0") { waiting_export.call(label) // check file amounts - check_file_amounts.call("${outFilePath}", 1) - - // check data correctness - sql """ DROP TABLE IF EXISTS ${table_load_name} """ - sql """ - CREATE TABLE IF NOT EXISTS ${table_load_name} ( - `user_id` INT NOT NULL COMMENT "用户id", - `date` DATE NOT NULL COMMENT "数据灌入日期时间", - `datetime` DATETIME NOT NULL COMMENT "数据灌入日期时间", - `city` VARCHAR(20) COMMENT "用户所在城市", - `age` SMALLINT COMMENT "用户年龄", - `sex` TINYINT COMMENT "用户性别", - `bool_col` boolean COMMENT "", - `int_col` int COMMENT "", - `bigint_col` bigint COMMENT "", - `float_col` float COMMENT "", - `double_col` double COMMENT "", - `char_col` CHAR(10) COMMENT "", - `decimal_col` decimal COMMENT "", - `ipv4_col` ipv4 COMMENT "", - `ipv6_col` ipv6 COMMENT "" - ) - DISTRIBUTED BY HASH(user_id) PROPERTIES("replication_num" = "1"); - """ - - File[] files = new File("${outFilePath}").listFiles() - String file_path = files[0].getAbsolutePath() - streamLoad { - table "${table_load_name}" - - set 'columns', 'user_id, date, datetime, city, age, sex, bool_col, int_col, bigint_col, float_col, double_col, char_col, decimal_col' - set 'strict_mode', 'true' - set 'format', 'orc' - - file "${file_path}" - time 10000 // limit inflight 10s - - check { result, exception, startTime, endTime -> - if (exception != null) { - throw exception - } - log.info("Stream load result: ${result}".toString()) - def json = parseJson(result) - assertEquals("success", json.Status.toLowerCase()) - assertEquals(0, json.NumberTotalRows) - assertEquals(0, json.NumberFilteredRows) - } - } - - qt_select_load3 """ SELECT * FROM ${table_load_name} t ORDER BY user_id; """ - + check_file_amounts.call("${outFilePath}", 0) } finally { - try_sql("DROP TABLE IF EXISTS ${table_load_name}") delete_files.call("${outFilePath}") } @@ -373,58 +217,7 @@ suite("test_export_empty_table", "p0") { waiting_export.call(label) // check file amounts - check_file_amounts.call("${outFilePath}", 1) - - // check data correctness - sql """ DROP TABLE IF EXISTS ${table_load_name} """ - sql """ - CREATE TABLE IF NOT EXISTS ${table_load_name} ( - `user_id` INT NOT NULL COMMENT "用户id", - `date` DATE NOT NULL COMMENT "数据灌入日期时间", - `datetime` DATETIME NOT NULL COMMENT "数据灌入日期时间", - `city` VARCHAR(20) COMMENT "用户所在城市", - `age` SMALLINT COMMENT "用户年龄", - `sex` TINYINT COMMENT "用户性别", - `bool_col` boolean COMMENT "", - `int_col` int COMMENT "", - `bigint_col` bigint COMMENT "", - `float_col` float COMMENT "", - `double_col` double COMMENT "", - `char_col` CHAR(10) COMMENT "", - `decimal_col` decimal COMMENT "", - `ipv4_col` ipv4 COMMENT "", - `ipv6_col` ipv6 COMMENT "" - ) - DISTRIBUTED BY HASH(user_id) PROPERTIES("replication_num" = "1"); - """ - - File[] files = new File("${outFilePath}").listFiles() - String file_path = files[0].getAbsolutePath() - streamLoad { - table "${table_load_name}" - - set 'columns', 'user_id, date, datetime, city, age, sex, bool_col, int_col, bigint_col, float_col, double_col, char_col, decimal_col' - set 'strict_mode', 'true' - set 'format', 'csv_with_names' - set 'column_separator', ',' - - file "${file_path}" - time 10000 // limit inflight 10s - - check { result, exception, startTime, endTime -> - if (exception != null) { - throw exception - } - log.info("Stream load result: ${result}".toString()) - def json = parseJson(result) - assertEquals("success", json.Status.toLowerCase()) - assertEquals(0, json.NumberTotalRows) - assertEquals(0, json.NumberFilteredRows) - } - } - - qt_select_load4 """ SELECT * FROM ${table_load_name} t ORDER BY user_id; """ - + check_file_amounts.call("${outFilePath}", 0) } finally { try_sql("DROP TABLE IF EXISTS ${table_load_name}") delete_files.call("${outFilePath}") @@ -451,60 +244,8 @@ suite("test_export_empty_table", "p0") { waiting_export.call(label) // check file amounts - check_file_amounts.call("${outFilePath}", 1) - - // check data correctness - sql """ DROP TABLE IF EXISTS ${table_load_name} """ - sql """ - CREATE TABLE IF NOT EXISTS ${table_load_name} ( - `user_id` INT NOT NULL COMMENT "用户id", - `date` DATE NOT NULL COMMENT "数据灌入日期时间", - `datetime` DATETIME NOT NULL COMMENT "数据灌入日期时间", - `city` VARCHAR(20) COMMENT "用户所在城市", - `age` SMALLINT COMMENT "用户年龄", - `sex` TINYINT COMMENT "用户性别", - `bool_col` boolean COMMENT "", - `int_col` int COMMENT "", - `bigint_col` bigint COMMENT "", - `float_col` float COMMENT "", - `double_col` double COMMENT "", - `char_col` CHAR(10) COMMENT "", - `decimal_col` decimal COMMENT "", - `ipv4_col` ipv4 COMMENT "", - `ipv6_col` ipv6 COMMENT "" - ) - DISTRIBUTED BY HASH(user_id) PROPERTIES("replication_num" = "1"); - """ - - File[] files = new File("${outFilePath}").listFiles() - String file_path = files[0].getAbsolutePath() - streamLoad { - table "${table_load_name}" - - set 'columns', 'user_id, date, datetime, city, age, sex, bool_col, int_col, bigint_col, float_col, double_col, char_col, decimal_col' - set 'strict_mode', 'true' - set 'format', 'csv_with_names_and_types' - set 'column_separator', ',' - - file "${file_path}" - time 10000 // limit inflight 10s - - check { result, exception, startTime, endTime -> - if (exception != null) { - throw exception - } - log.info("Stream load result: ${result}".toString()) - def json = parseJson(result) - assertEquals("success", json.Status.toLowerCase()) - assertEquals(0, json.NumberTotalRows) - assertEquals(0, json.NumberFilteredRows) - } - } - - qt_select_load5 """ SELECT * FROM ${table_load_name} t ORDER BY user_id; """ - + check_file_amounts.call("${outFilePath}", 0) } finally { - try_sql("DROP TABLE IF EXISTS ${table_load_name}") delete_files.call("${outFilePath}") } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org