This is an automated email from the ASF dual-hosted git repository.

caiconghui pushed a commit to branch export
in repository https://gitbox.apache.org/repos/asf/doris.git

commit b806a49eb9c9beb37e968b57234910c2488ed944
Author: caiconghui <55968745+caicong...@users.noreply.github.com>
AuthorDate: Tue May 28 16:38:29 2024 +0800

    [enhancement](export) filter empty partition before export table to remote storage (#35389)
    
    Filter empty partitions before exporting a table to remote storage.
    Co-authored-by: caiconghui1 <caicongh...@jd.com>
---
 .../main/java/org/apache/doris/load/ExportJob.java |  32 ++-
 .../export_p0/test_export_empty_table.groovy       | 269 +--------------------
 2 files changed, 28 insertions(+), 273 deletions(-)
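
    For readers skimming the diff below: the core change is to drop partitions that
    contain no data before building export tasks, and to apply the partition-count
    limit to the filtered list rather than to all partitions of the table. A minimal,
    self-contained sketch of that idea follows; the Partition class, hasData() and
    maxAllowedPartitions used here are simplified stand-ins for illustration, not the
    actual Doris FE classes or Config entry.

    import java.util.ArrayList;
    import java.util.List;

    // Sketch only: keep partitions that contain data, then enforce the
    // partition-count limit on the filtered result (the diff below does this
    // with Config.maximum_number_of_export_partitions).
    public class EmptyPartitionFilterSketch {

        static final class Partition {
            final String name;
            final boolean hasData;

            Partition(String name, boolean hasData) {
                this.name = name;
                this.hasData = hasData;
            }

            boolean hasData() {
                return hasData;
            }
        }

        static List<Partition> filterNonEmpty(List<Partition> candidates, int maxAllowedPartitions) {
            List<Partition> kept = new ArrayList<>();
            for (Partition p : candidates) {
                if (p.hasData()) {          // empty partitions are skipped entirely
                    kept.add(p);
                }
            }
            if (kept.size() > maxAllowedPartitions) {
                throw new IllegalStateException("too many non-empty partitions for one export job");
            }
            return kept;
        }

        public static void main(String[] args) {
            List<Partition> partitions = List.of(
                    new Partition("p1", true),
                    new Partition("p2", false),  // empty, dropped before export
                    new Partition("p3", true));
            System.out.println(filterNonEmpty(partitions, 10).size()); // prints 2
        }
    }

    When every partition of the exported table is empty, the filtered list is itself
    empty; the first hunk below adds a single empty ExportTaskExecutor in that case so
    the job can still reach a finished state.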

diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java
index d0ccf23ae0a..b072ed2f2fd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java
@@ -407,6 +407,13 @@ public class ExportJob implements Writable {
             ExportTaskExecutor executor = new ExportTaskExecutor(selectStmts, this);
             jobExecutorList.add(executor);
         }
+
+        // add an empty task so the export job can still finish when jobExecutorList is empty,
+        // which means the exported table has no data
+        if (jobExecutorList.isEmpty()) {
+            ExportTaskExecutor executor = new ExportTaskExecutor(Lists.newArrayList(), this);
+            jobExecutorList.add(executor);
+        }
     }
 
     /**
@@ -511,15 +518,23 @@ public class ExportJob implements Writable {
             // get partitions
             // user specifies partitions, already checked in ExportCommand
             if (!this.partitionNames.isEmpty()) {
-                this.partitionNames.forEach(partitionName -> partitions.add(table.getPartition(partitionName)));
+                this.partitionNames.forEach(partitionName -> {
+                    Partition partition = table.getPartition(partitionName);
+                    if (partition.hasData()) {
+                        partitions.add(partition);
+                    }
+                });
             } else {
-                if (table.getPartitions().size() > Config.maximum_number_of_export_partitions) {
-                    throw new UserException("The partitions number of this export job is larger than the maximum number"
-                            + " of partitions allowed by a export job");
-                }
-                partitions.addAll(table.getPartitions());
+                table.getPartitions().forEach(partition -> {
+                    if (partition.hasData()) {
+                        partitions.add(partition);
+                    }
+                });
+            }
+            if (partitions.size() > Config.maximum_number_of_export_partitions) {
+                throw new UserException("The partitions number of this export job is larger than the maximum number"
+                        + " of partitions allowed by a export job");
             }
-
             // get tablets
             for (Partition partition : partitions) {
                 // Partition data consistency is not need to verify partition version.
@@ -589,8 +604,7 @@ public class ExportJob implements Writable {
             List<Long> tabletsList = new ArrayList<>(flatTabletIdList.subList(start, start + tabletsNum));
             List<List<Long>> tablets = new ArrayList<>();
             for (int i = 0; i < tabletsList.size(); i += MAXIMUM_TABLETS_OF_OUTFILE_IN_EXPORT) {
-                int end = i + MAXIMUM_TABLETS_OF_OUTFILE_IN_EXPORT < tabletsList.size()
-                        ? i + MAXIMUM_TABLETS_OF_OUTFILE_IN_EXPORT : tabletsList.size();
+                int end = Math.min(i + MAXIMUM_TABLETS_OF_OUTFILE_IN_EXPORT, tabletsList.size());
                 tablets.add(new ArrayList<>(tabletsList.subList(i, end)));
             }
 
diff --git a/regression-test/suites/export_p0/test_export_empty_table.groovy b/regression-test/suites/export_p0/test_export_empty_table.groovy
index f70ff97b38b..584c65d73bc 100644
--- a/regression-test/suites/export_p0/test_export_empty_table.groovy
+++ b/regression-test/suites/export_p0/test_export_empty_table.groovy
@@ -143,62 +143,8 @@ suite("test_export_empty_table", "p0") {
         waiting_export.call(label)
         
         // check file amounts
-        check_file_amounts.call("${outFilePath}", 1)
-
-        // check data correctness
-        sql """ DROP TABLE IF EXISTS ${table_load_name} """
-        sql """
-        CREATE TABLE IF NOT EXISTS ${table_load_name} (
-            `user_id` INT NOT NULL COMMENT "用户id",
-            `date` DATE NOT NULL COMMENT "数据灌入日期时间",
-            `datetime` DATETIME NOT NULL COMMENT "数据灌入日期时间",
-            `city` VARCHAR(20) COMMENT "用户所在城市",
-            `age` SMALLINT COMMENT "用户年龄",
-            `sex` TINYINT COMMENT "用户性别",
-            `bool_col` boolean COMMENT "",
-            `int_col` int COMMENT "",
-            `bigint_col` bigint COMMENT "",
-            `float_col` float COMMENT "",
-            `double_col` double COMMENT "",
-            `char_col` CHAR(10) COMMENT "",
-            `decimal_col` decimal COMMENT "",
-            `ipv4_col` ipv4 COMMENT "",
-            `ipv6_col` ipv6 COMMENT ""
-            )
-            DISTRIBUTED BY HASH(user_id) PROPERTIES("replication_num" = "1");
-        """
-
-        File[] files = new File("${outFilePath}").listFiles()
-        String file_path = files[0].getAbsolutePath()
-        streamLoad {
-            table "${table_load_name}"
-
-            set 'columns', 'user_id, date, datetime, city, age, sex, bool_col, int_col, bigint_col, float_col, double_col, char_col, decimal_col'
-            set 'strict_mode', 'true'
-            set 'format', 'csv'
-            set 'column_separator', ','
-
-            file "${file_path}"
-            time 10000 // limit inflight 10s
-
-            check { result, exception, startTime, endTime ->
-                if (exception != null) {
-                    throw exception
-                }
-                log.info("Stream load result: ${result}".toString())
-                def json = parseJson(result)
-                assertEquals("success", json.Status.toLowerCase())
-                assertEquals(0, json.NumberTotalRows)
-                assertEquals(0, json.NumberFilteredRows)
-            }
-        }
-
-        sql """ sync; """
-
-        qt_select_load1 """ SELECT * FROM ${table_load_name} t ORDER BY user_id; """
-    
+        check_file_amounts.call("${outFilePath}", 0)
     } finally {
-        try_sql("DROP TABLE IF EXISTS ${table_load_name}")
         delete_files.call("${outFilePath}")
     }
 
@@ -222,59 +168,8 @@ suite("test_export_empty_table", "p0") {
         waiting_export.call(label)
         
         // check file amounts
-        check_file_amounts.call("${outFilePath}", 1)
-
-        // check data correctness
-        sql """ DROP TABLE IF EXISTS ${table_load_name} """
-        sql """
-        CREATE TABLE IF NOT EXISTS ${table_load_name} (
-            `user_id` INT NOT NULL COMMENT "用户id",
-            `date` DATE NOT NULL COMMENT "数据灌入日期时间",
-            `datetime` DATETIME NOT NULL COMMENT "数据灌入日期时间",
-            `city` VARCHAR(20) COMMENT "用户所在城市",
-            `age` SMALLINT COMMENT "用户年龄",
-            `sex` TINYINT COMMENT "用户性别",
-            `bool_col` boolean COMMENT "",
-            `int_col` int COMMENT "",
-            `bigint_col` bigint COMMENT "",
-            `float_col` float COMMENT "",
-            `double_col` double COMMENT "",
-            `char_col` CHAR(10) COMMENT "",
-            `decimal_col` decimal COMMENT "",
-            `ipv4_col` ipv4 COMMENT "",
-            `ipv6_col` ipv6 COMMENT ""
-            )
-            DISTRIBUTED BY HASH(user_id) PROPERTIES("replication_num" = "1");
-        """
-
-        File[] files = new File("${outFilePath}").listFiles()
-        String file_path = files[0].getAbsolutePath()
-        streamLoad {
-            table "${table_load_name}"
-
-            set 'columns', 'user_id, date, datetime, city, age, sex, bool_col, int_col, bigint_col, float_col, double_col, char_col, decimal_col'
-            set 'strict_mode', 'true'
-            set 'format', 'parquet'
-
-            file "${file_path}"
-            time 10000 // limit inflight 10s
-
-            check { result, exception, startTime, endTime ->
-                if (exception != null) {
-                    throw exception
-                }
-                log.info("Stream load result: ${result}".toString())
-                def json = parseJson(result)
-                assertEquals("success", json.Status.toLowerCase())
-                assertEquals(0, json.NumberTotalRows)
-                assertEquals(0, json.NumberFilteredRows)
-            }
-        }
-
-        qt_select_load2 """ SELECT * FROM ${table_load_name} t ORDER BY user_id; """
-    
+        check_file_amounts.call("${outFilePath}", 0)
     } finally {
-        try_sql("DROP TABLE IF EXISTS ${table_load_name}")
         delete_files.call("${outFilePath}")
     }
 
@@ -297,59 +192,8 @@ suite("test_export_empty_table", "p0") {
         waiting_export.call(label)
         
         // check file amounts
-        check_file_amounts.call("${outFilePath}", 1)
-
-        // check data correctness
-        sql """ DROP TABLE IF EXISTS ${table_load_name} """
-        sql """
-        CREATE TABLE IF NOT EXISTS ${table_load_name} (
-            `user_id` INT NOT NULL COMMENT "用户id",
-            `date` DATE NOT NULL COMMENT "数据灌入日期时间",
-            `datetime` DATETIME NOT NULL COMMENT "数据灌入日期时间",
-            `city` VARCHAR(20) COMMENT "用户所在城市",
-            `age` SMALLINT COMMENT "用户年龄",
-            `sex` TINYINT COMMENT "用户性别",
-            `bool_col` boolean COMMENT "",
-            `int_col` int COMMENT "",
-            `bigint_col` bigint COMMENT "",
-            `float_col` float COMMENT "",
-            `double_col` double COMMENT "",
-            `char_col` CHAR(10) COMMENT "",
-            `decimal_col` decimal COMMENT "",
-            `ipv4_col` ipv4 COMMENT "",
-            `ipv6_col` ipv6 COMMENT ""
-            )
-            DISTRIBUTED BY HASH(user_id) PROPERTIES("replication_num" = "1");
-        """
-
-        File[] files = new File("${outFilePath}").listFiles()
-        String file_path = files[0].getAbsolutePath()
-        streamLoad {
-            table "${table_load_name}"
-
-            set 'columns', 'user_id, date, datetime, city, age, sex, bool_col, int_col, bigint_col, float_col, double_col, char_col, decimal_col'
-            set 'strict_mode', 'true'
-            set 'format', 'orc'
-
-            file "${file_path}"
-            time 10000 // limit inflight 10s
-
-            check { result, exception, startTime, endTime ->
-                if (exception != null) {
-                    throw exception
-                }
-                log.info("Stream load result: ${result}".toString())
-                def json = parseJson(result)
-                assertEquals("success", json.Status.toLowerCase())
-                assertEquals(0, json.NumberTotalRows)
-                assertEquals(0, json.NumberFilteredRows)
-            }
-        }
-
-        qt_select_load3 """ SELECT * FROM ${table_load_name} t ORDER BY user_id; """
-    
+        check_file_amounts.call("${outFilePath}", 0)
     } finally {
-        try_sql("DROP TABLE IF EXISTS ${table_load_name}")
         delete_files.call("${outFilePath}")
     }
 
@@ -373,58 +217,7 @@ suite("test_export_empty_table", "p0") {
         waiting_export.call(label)
         
         // check file amounts
-        check_file_amounts.call("${outFilePath}", 1)
-
-        // check data correctness
-        sql """ DROP TABLE IF EXISTS ${table_load_name} """
-        sql """
-        CREATE TABLE IF NOT EXISTS ${table_load_name} (
-            `user_id` INT NOT NULL COMMENT "用户id",
-            `date` DATE NOT NULL COMMENT "数据灌入日期时间",
-            `datetime` DATETIME NOT NULL COMMENT "数据灌入日期时间",
-            `city` VARCHAR(20) COMMENT "用户所在城市",
-            `age` SMALLINT COMMENT "用户年龄",
-            `sex` TINYINT COMMENT "用户性别",
-            `bool_col` boolean COMMENT "",
-            `int_col` int COMMENT "",
-            `bigint_col` bigint COMMENT "",
-            `float_col` float COMMENT "",
-            `double_col` double COMMENT "",
-            `char_col` CHAR(10) COMMENT "",
-            `decimal_col` decimal COMMENT "",
-            `ipv4_col` ipv4 COMMENT "",
-            `ipv6_col` ipv6 COMMENT ""
-            )
-            DISTRIBUTED BY HASH(user_id) PROPERTIES("replication_num" = "1");
-        """
-
-        File[] files = new File("${outFilePath}").listFiles()
-        String file_path = files[0].getAbsolutePath()
-        streamLoad {
-            table "${table_load_name}"
-
-            set 'columns', 'user_id, date, datetime, city, age, sex, bool_col, int_col, bigint_col, float_col, double_col, char_col, decimal_col'
-            set 'strict_mode', 'true'
-            set 'format', 'csv_with_names'
-            set 'column_separator', ','
-
-            file "${file_path}"
-            time 10000 // limit inflight 10s
-
-            check { result, exception, startTime, endTime ->
-                if (exception != null) {
-                    throw exception
-                }
-                log.info("Stream load result: ${result}".toString())
-                def json = parseJson(result)
-                assertEquals("success", json.Status.toLowerCase())
-                assertEquals(0, json.NumberTotalRows)
-                assertEquals(0, json.NumberFilteredRows)
-            }
-        }
-
-        qt_select_load4 """ SELECT * FROM ${table_load_name} t ORDER BY user_id; """
-    
+        check_file_amounts.call("${outFilePath}", 0)
     } finally {
         try_sql("DROP TABLE IF EXISTS ${table_load_name}")
         delete_files.call("${outFilePath}")
@@ -451,60 +244,8 @@ suite("test_export_empty_table", "p0") {
         waiting_export.call(label)
         
         // check file amounts
-        check_file_amounts.call("${outFilePath}", 1)
-
-        // check data correctness
-        sql """ DROP TABLE IF EXISTS ${table_load_name} """
-        sql """
-        CREATE TABLE IF NOT EXISTS ${table_load_name} (
-            `user_id` INT NOT NULL COMMENT "用户id",
-            `date` DATE NOT NULL COMMENT "数据灌入日期时间",
-            `datetime` DATETIME NOT NULL COMMENT "数据灌入日期时间",
-            `city` VARCHAR(20) COMMENT "用户所在城市",
-            `age` SMALLINT COMMENT "用户年龄",
-            `sex` TINYINT COMMENT "用户性别",
-            `bool_col` boolean COMMENT "",
-            `int_col` int COMMENT "",
-            `bigint_col` bigint COMMENT "",
-            `float_col` float COMMENT "",
-            `double_col` double COMMENT "",
-            `char_col` CHAR(10) COMMENT "",
-            `decimal_col` decimal COMMENT "",
-            `ipv4_col` ipv4 COMMENT "",
-            `ipv6_col` ipv6 COMMENT ""
-            )
-            DISTRIBUTED BY HASH(user_id) PROPERTIES("replication_num" = "1");
-        """
-
-        File[] files = new File("${outFilePath}").listFiles()
-        String file_path = files[0].getAbsolutePath()
-        streamLoad {
-            table "${table_load_name}"
-
-            set 'columns', 'user_id, date, datetime, city, age, sex, bool_col, int_col, bigint_col, float_col, double_col, char_col, decimal_col'
-            set 'strict_mode', 'true'
-            set 'format', 'csv_with_names_and_types'
-            set 'column_separator', ','
-
-            file "${file_path}"
-            time 10000 // limit inflight 10s
-
-            check { result, exception, startTime, endTime ->
-                if (exception != null) {
-                    throw exception
-                }
-                log.info("Stream load result: ${result}".toString())
-                def json = parseJson(result)
-                assertEquals("success", json.Status.toLowerCase())
-                assertEquals(0, json.NumberTotalRows)
-                assertEquals(0, json.NumberFilteredRows)
-            }
-        }
-
-        qt_select_load5 """ SELECT * FROM ${table_load_name} t ORDER BY user_id; """
-    
+        check_file_amounts.call("${outFilePath}", 0)
     } finally {
-        try_sql("DROP TABLE IF EXISTS ${table_load_name}")
         delete_files.call("${outFilePath}")
     }
 

