This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 93775369983 branch-3.0: [fix](Export) modify some cases of export feature #47976 (#48061)
93775369983 is described below

commit 937753699836c3da3a60e21e2811b220d86d9a7d
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Mon Feb 24 11:33:14 2025 +0800

    branch-3.0: [fix](Export) modify some cases of export feature #47976 (#48061)
    
    Cherry-picked from #47976
    
    Co-authored-by: Tiewei Fang <fangtie...@selectdb.com>
---
 .../data/export_p0/test_export_basic.out           | Bin 7131 -> 7131 bytes
 .../org/apache/doris/regression/suite/Suite.groovy |  56 ++++
 .../suites/export_p0/test_export_basic.groovy      | 304 +++++++-----------
 .../suites/export_p0/test_export_csv.groovy        | 208 +++++--------
 .../export_p0/test_export_data_consistency.groovy  |  69 ++---
 .../export_p0/test_export_empty_table.groovy       | 142 ++++++---
 .../test_export_table_with_label_retry.groovy      |  80 ++---
 .../suites/export_p0/test_export_view.groovy       | 327 ++++++++------------
 .../export/test_export_external_table.groovy       | 341 ++++++++-------------
 9 files changed, 665 insertions(+), 862 deletions(-)
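The change repeated across the suites below swaps client-side java.io.File checks and streamLoad reloads for SSH-based directory helpers plus reloads through Doris's local() table-valued function, so the assertions also hold when backends run on remote hosts. A minimal sketch of the reload pattern the diff adopts, assuming a hypothetical load table my_load_table and export directory tmp/my_export_dir (getBackendIpHeartbeatPort is the existing framework helper used throughout the diff):

    def ipList = [:]
    def portList = [:]
    getBackendIpHeartbeatPort(ipList, portList)
    ipList.each { beid, ip ->
        // local() lists and reads files on the BE named by backend_id, so the
        // exported CSV is re-imported without any client-side stream load.
        sql """
            insert into my_load_table
            select * from local(
                "file_path" = "tmp/my_export_dir/*",
                "backend_id" = "${beid}",
                "format" = "csv",
                "column_separator" = ","
            );
        """
    }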

diff --git a/regression-test/data/export_p0/test_export_basic.out b/regression-test/data/export_p0/test_export_basic.out
index 52aa765fc33..e9614760abd 100644
Binary files a/regression-test/data/export_p0/test_export_basic.out and b/regression-test/data/export_p0/test_export_basic.out differ
diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
index 111c472a364..384c94c4bfb 100644
--- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
+++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
@@ -1054,6 +1054,62 @@ class Suite implements GroovyInterceptable {
         Assert.assertEquals(0, code)
     }
 
+    void mkdirRemotePathOnAllBE(String username, String path) {
+        def ipList = [:]
+        def portList = [:]
+        getBackendIpHeartbeatPort(ipList, portList)
+
+        def executeCommand = { String cmd, Boolean mustSuc ->
+            try {
+                staticLogger.info("execute ${cmd}")
+                def proc = new ProcessBuilder("/bin/bash", "-c", cmd).redirectErrorStream(true).start()
+                int exitcode = proc.waitFor()
+                if (exitcode != 0) {
+                    staticLogger.info("exit code: ${exitcode}, output\n: ${proc.text}")
+                    if (mustSuc == true) {
+                        Assert.assertEquals(0, exitcode)
+                    }
+                }
+            } catch (IOException e) {
+                Assert.assertTrue(false, "execute timeout")
+            }
+        }
+
+        ipList.each { beid, ip ->
+            String cmd = "ssh -o StrictHostKeyChecking=no ${username}@${ip} \"mkdir -p ${path}\""
+            logger.info("Execute: ${cmd}".toString())
+            executeCommand(cmd, false)
+        }
+    }
+
+    void deleteRemotePathOnAllBE(String username, String path) {
+        def ipList = [:]
+        def portList = [:]
+        getBackendIpHeartbeatPort(ipList, portList)
+
+        def executeCommand = { String cmd, Boolean mustSuc ->
+            try {
+                staticLogger.info("execute ${cmd}")
+                def proc = new ProcessBuilder("/bin/bash", "-c", cmd).redirectErrorStream(true).start()
+                int exitcode = proc.waitFor()
+                if (exitcode != 0) {
+                    staticLogger.info("exit code: ${exitcode}, output\n: ${proc.text}")
+                    if (mustSuc == true) {
+                        Assert.assertEquals(0, exitcode)
+                    }
+                }
+            } catch (IOException e) {
+                Assert.assertTrue(false, "execute timeout")
+            }
+        }
+
+        ipList.each { beid, ip ->
+            String cmd = "ssh -o StrictHostKeyChecking=no ${username}@${ip} \"rm -r ${path}\""
+            logger.info("Execute: ${cmd}".toString())
+            executeCommand(cmd, false)
+        }
+    }
+
     String cmd(String cmd, int timeoutSecond = 0) {
         var processBuilder = new ProcessBuilder()
         processBuilder.command("/bin/bash", "-c", cmd)
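The suites below wrap these two helpers in per-suite closures; a typical call site, following the pattern this diff introduces (the directory name is illustrative, and machine_user_name is assumed to be an SSH user accepted by every BE host):

    def machine_user_name = "root"
    def check_path_exists = { dir_path ->
        // create the export directory on every BE host over SSH
        mkdirRemotePathOnAllBE(machine_user_name, dir_path)
    }
    def delete_files = { dir_path ->
        // remove the exported files from every BE host during cleanup
        deleteRemotePathOnAllBE(machine_user_name, dir_path)
    }

    check_path_exists.call("/tmp/test_export_demo")   // before EXPORT runs
    delete_files.call("/tmp/test_export_demo")        // in the finally block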
diff --git a/regression-test/suites/export_p0/test_export_basic.groovy b/regression-test/suites/export_p0/test_export_basic.groovy
index 152f1ab4e6e..19261c55e01 100644
--- a/regression-test/suites/export_p0/test_export_basic.groovy
+++ b/regression-test/suites/export_p0/test_export_basic.groovy
@@ -59,7 +59,8 @@ suite("test_export_basic", "p0") {
     
     def table_export_name = "test_export_basic"
     def table_load_name = "test_load_basic"
-    def outfile_path_prefix = """/tmp/test_export"""
+    def outfile_path_prefix = """/tmp/test_export_basic"""
+    def local_tvf_prefix = "tmp/test_export_basic"
 
     // create table and insert
     sql """ DROP TABLE IF EXISTS ${table_export_name} """
@@ -96,29 +97,13 @@ suite("test_export_basic", "p0") {
     qt_select_export """ SELECT * FROM ${table_export_name} t ORDER BY id; """
 
 
+    def machine_user_name = "root"
     def check_path_exists = { dir_path ->
-        File path = new File(dir_path)
-        if (!path.exists()) {
-            assert path.mkdirs()
-        } else {
-            throw new IllegalStateException("""${dir_path} already exists! """)
-        }
-    }
-
-    def check_file_amounts = { dir_path, amount ->
-        File path = new File(dir_path)
-        File[] files = path.listFiles()
-        assert files.length == amount
+        mkdirRemotePathOnAllBE(machine_user_name, dir_path)
     }
 
     def delete_files = { dir_path ->
-        File path = new File(dir_path)
-        if (path.exists()) {
-            for (File f: path.listFiles()) {
-                f.delete();
-            }
-            path.delete();
-        }
+        deleteRemotePathOnAllBE(machine_user_name, dir_path)
     }
 
     def waiting_export = { the_db, export_label ->
@@ -152,7 +137,7 @@ suite("test_export_basic", "p0") {
 
     // 1. basic test
     def uuid = UUID.randomUUID().toString()
-    def outFilePath = """${outfile_path_prefix}_${uuid}"""
+    def outFilePath = """${outfile_path_prefix}/${table_export_name}_${uuid}"""
     def label = "label_${uuid}"
     try {
         // check export path
@@ -169,9 +154,6 @@ suite("test_export_basic", "p0") {
             );
         """
         waiting_export.call(db, label)
-        
-        // check file amounts
-        check_file_amounts.call("${outFilePath}", 1)
 
         // check data correctness
         sql """ DROP TABLE IF EXISTS ${table_load_name} """
@@ -184,28 +166,23 @@ suite("test_export_basic", "p0") {
             DISTRIBUTED BY HASH(id) PROPERTIES("replication_num" = "1");
         """
 
-        File[] files = new File("${outFilePath}").listFiles()
-        String file_path = files[0].getAbsolutePath()
-        streamLoad {
-            table "${table_load_name}"
-
-            set 'column_separator', ','
-            set 'columns', 'id, Name, age'
-            set 'strict_mode', 'true'
-
-            file "${file_path}"
-            time 10000 // limit inflight 10s
-
-            check { result, exception, startTime, endTime ->
-                if (exception != null) {
-                    throw exception
-                }
-                log.info("Stream load result: ${result}".toString())
-                def json = parseJson(result)
-                assertEquals("success", json.Status.toLowerCase())
-                assertEquals(150, json.NumberTotalRows)
-                assertEquals(0, json.NumberFilteredRows)
-            }
+        // use local() tvf to reload the data
+        def ipList = [:]
+        def portList = [:]
+        getBackendIpHeartbeatPort(ipList, portList)
+        ipList.each { beid, ip ->
+            logger.info("Begin to insert into ${table_load_name} from local()")
+            sql """
+                insert into ${table_load_name}
+                select * from local(
+                    "file_path" = "${local_tvf_prefix}/${table_export_name}_${uuid}/*",
+                    "backend_id" = "${beid}",
+                    "format" = "csv",
+                    "column_separator" = ","
+                );         
+            """ 
+            insert_res = sql "show last insert;"
+            logger.info("insert from local(), BE id = ${beid}, result: " + insert_res.toString())
         }
 
         qt_select_load1 """ SELECT * FROM ${table_load_name} t ORDER BY id; """
@@ -215,9 +192,9 @@ suite("test_export_basic", "p0") {
         delete_files.call("${outFilePath}")
     }
 
-    // 2. test patition1
+    // 2. test partition1
     uuid = UUID.randomUUID().toString()
-    outFilePath = """${outfile_path_prefix}_${uuid}"""
+    outFilePath = """${outfile_path_prefix}/${table_export_name}_${uuid}"""
     label = "label_${uuid}"
     try {
         // check export path
@@ -235,9 +212,6 @@ suite("test_export_basic", "p0") {
             );
         """
         waiting_export.call(db, label)
-        
-        // check file amounts
-        check_file_amounts.call("${outFilePath}", 1)
 
         // check data correctness
         sql """ DROP TABLE IF EXISTS ${table_load_name} """
@@ -250,28 +224,23 @@ suite("test_export_basic", "p0") {
             DISTRIBUTED BY HASH(id) PROPERTIES("replication_num" = "1");
         """
 
-        File[] files = new File("${outFilePath}").listFiles()
-        String file_path = files[0].getAbsolutePath()
-        streamLoad {
-            table "${table_load_name}"
-
-            set 'column_separator', ','
-            set 'columns', 'id, Name, age'
-            set 'strict_mode', 'true'
-
-            file "${file_path}"
-            time 10000 // limit inflight 10s
-
-            check { result, exception, startTime, endTime ->
-                if (exception != null) {
-                    throw exception
-                }
-                log.info("Stream load result: ${result}".toString())
-                def json = parseJson(result)
-                assertEquals("success", json.Status.toLowerCase())
-                assertEquals(19, json.NumberTotalRows)
-                assertEquals(0, json.NumberFilteredRows)
-            }
+        // use local() tvf to reload the data
+        def ipList = [:]
+        def portList = [:]
+        getBackendIpHeartbeatPort(ipList, portList)
+        ipList.each { beid, ip ->
+            logger.info("Begin to insert into ${table_load_name} from local()")
+            sql """
+                insert into ${table_load_name}
+                select * from local(
+                    "file_path" = "${local_tvf_prefix}/${table_export_name}_${uuid}/*",
+                    "backend_id" = "${beid}",
+                    "format" = "csv",
+                    "column_separator" = ","
+                );         
+            """ 
+            insert_res = sql "show last insert;"
+            logger.info("insert from local(), BE id = ${beid}, result: " + insert_res.toString())
         }
 
         qt_select_load2 """ SELECT * FROM ${table_load_name} t ORDER BY id; """
@@ -281,9 +250,9 @@ suite("test_export_basic", "p0") {
         delete_files.call("${outFilePath}")
     }
 
-    // 3. test patition2
+    // 3. test partition2
     uuid = UUID.randomUUID().toString()
-    outFilePath = """${outfile_path_prefix}_${uuid}"""
+    outFilePath = """${outfile_path_prefix}/${table_export_name}_${uuid}"""
     label = "label_${uuid}"
     try {
         // check export path
@@ -301,9 +270,6 @@ suite("test_export_basic", "p0") {
             );
         """
         waiting_export.call(db, label)
-        
-        // check file amounts
-        check_file_amounts.call("${outFilePath}", 1)
 
         // check data correctness
         sql """ DROP TABLE IF EXISTS ${table_load_name} """
@@ -316,28 +282,23 @@ suite("test_export_basic", "p0") {
             DISTRIBUTED BY HASH(id) PROPERTIES("replication_num" = "1");
         """
 
-        File[] files = new File("${outFilePath}").listFiles()
-        String file_path = files[0].getAbsolutePath()
-        streamLoad {
-            table "${table_load_name}"
-
-            set 'column_separator', ','
-            set 'columns', 'id, Name, age'
-            set 'strict_mode', 'true'
-
-            file "${file_path}"
-            time 10000 // limit inflight 10s
-
-            check { result, exception, startTime, endTime ->
-                if (exception != null) {
-                    throw exception
-                }
-                log.info("Stream load result: ${result}".toString())
-                def json = parseJson(result)
-                assertEquals("success", json.Status.toLowerCase())
-                assertEquals(50, json.NumberTotalRows)
-                assertEquals(0, json.NumberFilteredRows)
-            }
+        // use local() tvf to reload the data
+        def ipList = [:]
+        def portList = [:]
+        getBackendIpHeartbeatPort(ipList, portList)
+        ipList.each { beid, ip ->
+            logger.info("Begin to insert into ${table_load_name} from local()")
+            sql """
+                insert into ${table_load_name}
+                select * from local(
+                    "file_path" = "${local_tvf_prefix}/${table_export_name}_${uuid}/*",
+                    "backend_id" = "${beid}",
+                    "format" = "csv",
+                    "column_separator" = ","
+                );         
+            """ 
+            insert_res = sql "show last insert;"
+            logger.info("insert from local(), BE id = ${beid}, result: " + insert_res.toString())
         }
 
         qt_select_load3 """ SELECT * FROM ${table_load_name} t ORDER BY id; """
@@ -347,9 +308,9 @@ suite("test_export_basic", "p0") {
         delete_files.call("${outFilePath}")
     }
 
-     // 4. test patition3 and where clause
+    // 4. test partition3 and where clause
     uuid = UUID.randomUUID().toString()
-    outFilePath = """${outfile_path_prefix}_${uuid}"""
+    outFilePath = """${outfile_path_prefix}/${table_export_name}_${uuid}"""
     label = "label_${uuid}"
     try {
         // check export path
@@ -367,9 +328,6 @@ suite("test_export_basic", "p0") {
             );
         """
         waiting_export.call(db, label)
-        
-        // check file amounts
-        check_file_amounts.call("${outFilePath}", 1)
 
         // check data correctness
         sql """ DROP TABLE IF EXISTS ${table_load_name} """
@@ -382,31 +340,26 @@ suite("test_export_basic", "p0") {
             DISTRIBUTED BY HASH(id) PROPERTIES("replication_num" = "1");
         """
 
-        File[] files = new File("${outFilePath}").listFiles()
-        String file_path = files[0].getAbsolutePath()
-        streamLoad {
-            table "${table_load_name}"
-
-            set 'column_separator', ','
-            set 'columns', 'id, Name, age'
-            set 'strict_mode', 'true'
-
-            file "${file_path}"
-            time 10000 // limit inflight 10s
-
-            check { result, exception, startTime, endTime ->
-                if (exception != null) {
-                    throw exception
-                }
-                log.info("Stream load result: ${result}".toString())
-                def json = parseJson(result)
-                assertEquals("success", json.Status.toLowerCase())
-                assertEquals(50, json.NumberTotalRows)
-                assertEquals(0, json.NumberFilteredRows)
-            }
+        // use local() tvf to reload the data
+        def ipList = [:]
+        def portList = [:]
+        getBackendIpHeartbeatPort(ipList, portList)
+        ipList.each { beid, ip ->
+            logger.info("Begin to insert into ${table_load_name} from local()")
+            sql """
+                insert into ${table_load_name}
+                select * from local(
+                    "file_path" = "${local_tvf_prefix}/${table_export_name}_${uuid}/*",
+                    "backend_id" = "${beid}",
+                    "format" = "csv",
+                    "column_separator" = ","
+                );         
+            """ 
+            insert_res = sql "show last insert;"
+            logger.info("insert from local(), BE id = ${beid}, result: " + insert_res.toString())
         }
 
-        qt_select_load3 """ SELECT * FROM ${table_load_name} t ORDER BY id; """
+        qt_select_load4 """ SELECT * FROM ${table_load_name} t ORDER BY id; """
     
     } finally {
         try_sql("DROP TABLE IF EXISTS ${table_load_name}")
@@ -415,7 +368,7 @@ suite("test_export_basic", "p0") {
 
     // 5. test order by and limit clause
     def uuid1 = UUID.randomUUID().toString()
-    outFilePath = """${outfile_path_prefix}_${uuid1}"""
+    outFilePath = """${outfile_path_prefix}/${table_export_name}_${uuid}"""
     def label1 = "label_${uuid1}"
     def uuid2 = UUID.randomUUID().toString()
     def label2 = "label_${uuid2}"
@@ -447,9 +400,6 @@ suite("test_export_basic", "p0") {
         waiting_export.call(db, label1)
         waiting_export.call(db, label2)
 
-        // check file amounts
-        check_file_amounts.call("${outFilePath}", 2)
-
         // check show export correctness
         def res = sql """ show export where STATE = "FINISHED" order by JobId desc limit 2"""
         assertTrue(res[0][0] > res[1][0])
@@ -461,7 +411,7 @@ suite("test_export_basic", "p0") {
 
     // 6. test columns property
     uuid = UUID.randomUUID().toString()
-    outFilePath = """${outfile_path_prefix}_${uuid}"""
+    outFilePath = """${outfile_path_prefix}/${table_export_name}_${uuid}"""
     label = "label_${uuid}"
     try {
         // check export path
@@ -480,9 +430,6 @@ suite("test_export_basic", "p0") {
             );
         """
         waiting_export.call(db, label)
-        
-        // check file amounts
-        check_file_amounts.call("${outFilePath}", 1)
 
         // check data correctness
         sql """ DROP TABLE IF EXISTS ${table_load_name} """
@@ -494,28 +441,23 @@ suite("test_export_basic", "p0") {
             DISTRIBUTED BY HASH(id) PROPERTIES("replication_num" = "1");
         """
 
-        File[] files = new File("${outFilePath}").listFiles()
-        String file_path = files[0].getAbsolutePath()
-        streamLoad {
-            table "${table_load_name}"
-
-            set 'column_separator', ','
-            set 'columns', 'id, Name'
-            set 'strict_mode', 'true'
-
-            file "${file_path}"
-            time 10000 // limit inflight 10s
-
-            check { result, exception, startTime, endTime ->
-                if (exception != null) {
-                    throw exception
-                }
-                log.info("Stream load result: ${result}".toString())
-                def json = parseJson(result)
-                assertEquals("success", json.Status.toLowerCase())
-                assertEquals(67, json.NumberTotalRows)
-                assertEquals(0, json.NumberFilteredRows)
-            }
+        // use local() tvf to reload the data
+        def ipList = [:]
+        def portList = [:]
+        getBackendIpHeartbeatPort(ipList, portList)
+        ipList.each { beid, ip ->
+            logger.info("Begin to insert into ${table_load_name} from local()")
+            sql """
+                insert into ${table_load_name} (id, Name)
+                select * from local(
+                    "file_path" = "${local_tvf_prefix}/${table_export_name}_${uuid}/*",
+                    "backend_id" = "${beid}",
+                    "format" = "csv",
+                    "column_separator" = ","
+                );         
+            """ 
+            insert_res = sql "show last insert;"
+            logger.info("insert from local(), BE id = ${beid}, result: " + insert_res.toString())
         }
 
         qt_select_load6 """ SELECT * FROM ${table_load_name} t ORDER BY id; """
@@ -527,7 +469,7 @@ suite("test_export_basic", "p0") {
 
     // 7. test columns property 2
     uuid = UUID.randomUUID().toString()
-    outFilePath = """${outfile_path_prefix}_${uuid}"""
+    outFilePath = """${outfile_path_prefix}/${table_export_name}_${uuid}"""
     label = "label_${uuid}"
     try {
         // check export path
@@ -546,9 +488,6 @@ suite("test_export_basic", "p0") {
             );
         """
         waiting_export.call(db, label)
-        
-        // check file amounts
-        check_file_amounts.call("${outFilePath}", 1)
 
         // check data correctness
         sql """ DROP TABLE IF EXISTS ${table_load_name} """
@@ -559,28 +498,23 @@ suite("test_export_basic", "p0") {
             DISTRIBUTED BY HASH(id) PROPERTIES("replication_num" = "1");
         """
 
-        File[] files = new File("${outFilePath}").listFiles()
-        String file_path = files[0].getAbsolutePath()
-        streamLoad {
-            table "${table_load_name}"
-
-            set 'column_separator', ','
-            set 'columns', 'id'
-            set 'strict_mode', 'true'
-
-            file "${file_path}"
-            time 10000 // limit inflight 10s
-
-            check { result, exception, startTime, endTime ->
-                if (exception != null) {
-                    throw exception
-                }
-                log.info("Stream load result: ${result}".toString())
-                def json = parseJson(result)
-                assertEquals("success", json.Status.toLowerCase())
-                assertEquals(67, json.NumberTotalRows)
-                assertEquals(0, json.NumberFilteredRows)
-            }
+        // use local() tvf to reload the data
+        def ipList = [:]
+        def portList = [:]
+        getBackendIpHeartbeatPort(ipList, portList)
+        ipList.each { beid, ip ->
+            logger.info("Begin to insert into ${table_load_name} from local()")
+            sql """
+                insert into ${table_load_name}
+                select * from local(
+                    "file_path" = "${local_tvf_prefix}/${table_export_name}_${uuid}/*",
+                    "backend_id" = "${beid}",
+                    "format" = "csv",
+                    "column_separator" = ","
+                );         
+            """ 
+            insert_res = sql "show last insert;"
+            logger.info("insert from local(), BE id = ${beid}, result: " + insert_res.toString())
         }
 
         qt_select_load7 """ SELECT * FROM ${table_load_name} t ORDER BY id; """
@@ -599,7 +533,7 @@ suite("test_export_basic", "p0") {
 
         // 1. first export
         uuid = UUID.randomUUID().toString()
-        outFilePath = """${outfile_path_prefix}_${uuid}"""
+        outFilePath = """${outfile_path_prefix}/${table_export_name}_${uuid}"""
         label = "label_${uuid}"
         // check export path
         check_path_exists.call("${outFilePath}")
diff --git a/regression-test/suites/export_p0/test_export_csv.groovy b/regression-test/suites/export_p0/test_export_csv.groovy
index 93e894a4f65..09d06996b7f 100644
--- a/regression-test/suites/export_p0/test_export_csv.groovy
+++ b/regression-test/suites/export_p0/test_export_csv.groovy
@@ -56,7 +56,8 @@ suite("test_export_csv", "p0") {
 
     def table_export_name = "test_export_csv"
     def table_load_name = "test_load_csv"
-    def outfile_path_prefix = """/tmp/test_export"""
+    def outfile_path_prefix = """/tmp/test_export_csv"""
+    def local_tvf_prefix = "tmp/test_export_csv"
 
     // create table and insert
     sql """ DROP TABLE IF EXISTS ${table_export_name} """
@@ -98,30 +99,13 @@ suite("test_export_csv", "p0") {
     logger.info("insert result: " + insert_res.toString())
     qt_select_export1 """ SELECT * FROM ${table_export_name} t ORDER BY user_id; """
 
-
+    def machine_user_name = "root"
     def check_path_exists = { dir_path ->
-        File path = new File(dir_path)
-        if (!path.exists()) {
-            assert path.mkdirs()
-        } else {
-            throw new IllegalStateException("""${dir_path} already exists! """)
-        }
-    }
-
-    def check_file_amounts = { dir_path, amount ->
-        File path = new File(dir_path)
-        File[] files = path.listFiles()
-        assert files.length == amount
+        mkdirRemotePathOnAllBE(machine_user_name, dir_path)
     }
 
     def delete_files = { dir_path ->
-        File path = new File(dir_path)
-        if (path.exists()) {
-            for (File f: path.listFiles()) {
-                f.delete();
-            }
-            path.delete();
-        }
+        deleteRemotePathOnAllBE(machine_user_name, dir_path)
     }
 
     def waiting_export = { export_label ->
@@ -140,7 +124,7 @@ suite("test_export_csv", "p0") {
 
     // 1. test more type
     def uuid = UUID.randomUUID().toString()
-    def outFilePath = """${outfile_path_prefix}_${uuid}"""
+    def outFilePath = "${outfile_path_prefix}" + "/${table_export_name}_${uuid}"
     def label = "label_${uuid}"
     try {
         // check export path
@@ -157,9 +141,6 @@ suite("test_export_csv", "p0") {
         """
         waiting_export.call(label)
         
-        // check file amounts
-        check_file_amounts.call("${outFilePath}", 1)
-
         // check data correctness
         sql """ DROP TABLE IF EXISTS ${table_load_name} """
         sql """
@@ -184,28 +165,22 @@ suite("test_export_csv", "p0") {
             DISTRIBUTED BY HASH(user_id) PROPERTIES("replication_num" = "1");
         """
 
-        File[] files = new File("${outFilePath}").listFiles()
-        String file_path = files[0].getAbsolutePath()
-        streamLoad {
-            table "${table_load_name}"
-
-            set 'column_separator', ','
-            set 'columns', 'user_id, date, datetime, city, age, sex, bool_col, int_col, bigint_col, largeint_col, float_col, double_col, char_col, decimal_col, ipv4_col, ipv6_col'
-            set 'strict_mode', 'true'
-
-            file "${file_path}"
-            time 10000 // limit inflight 10s
-
-            check { result, exception, startTime, endTime ->
-                if (exception != null) {
-                    throw exception
-                }
-                log.info("Stream load result: ${result}".toString())
-                def json = parseJson(result)
-                assertEquals("success", json.Status.toLowerCase())
-                assertEquals(100, json.NumberTotalRows)
-                assertEquals(0, json.NumberFilteredRows)
-            }
+        // use local() tvf to reload the data
+        def ipList = [:]
+        def portList = [:]
+        getBackendIpHeartbeatPort(ipList, portList)
+        ipList.each { beid, ip ->
+            logger.info("Begin to insert into ${table_load_name} from local()")
+            sql """
+                insert into ${table_load_name}
+                select * from local(
+                    "file_path" = "${local_tvf_prefix}/${table_export_name}_${uuid}/*",
+                    "backend_id" = "${beid}",
+                    "format" = "csv",
+                    "column_separator" = ",");         
+                """ 
+            insert_res = sql "show last insert;"
+            logger.info("insert from local(), BE id = ${beid}, result: " + insert_res.toString())
         }
 
         qt_select_load1 """ SELECT * FROM ${table_load_name} t ORDER BY user_id; """
@@ -218,7 +193,7 @@ suite("test_export_csv", "p0") {
 
     // 2. test csv column_separator and line_delimiter
     uuid = UUID.randomUUID().toString()
-    outFilePath = """${outfile_path_prefix}_${uuid}"""
+    outFilePath = "${outfile_path_prefix}" + "/${table_export_name}_${uuid}"
     label = "label_${uuid}"
     try {
         // check export path
@@ -236,9 +211,6 @@ suite("test_export_csv", "p0") {
         """
         waiting_export.call(label)
         
-        // check file amounts
-        check_file_amounts.call("${outFilePath}", 1)
-
         // check data correctness
         sql """ DROP TABLE IF EXISTS ${table_load_name} """
         sql """
@@ -263,29 +235,23 @@ suite("test_export_csv", "p0") {
             DISTRIBUTED BY HASH(user_id) PROPERTIES("replication_num" = "1");
         """
 
-        File[] files = new File("${outFilePath}").listFiles()
-        String file_path = files[0].getAbsolutePath()
-        streamLoad {
-            table "${table_load_name}"
-
-            set 'column_separator', 'ab'
-            set 'line_delimiter', 'cc'
-            set 'columns', 'user_id, date, datetime, city, age, sex, bool_col, int_col, bigint_col, largeint_col, float_col, double_col, char_col, decimal_col, ipv4_col, ipv6_col'
-            set 'strict_mode', 'true'
-
-            file "${file_path}"
-            time 10000 // limit inflight 10s
-
-            check { result, exception, startTime, endTime ->
-                if (exception != null) {
-                    throw exception
-                }
-                log.info("Stream load result: ${result}".toString())
-                def json = parseJson(result)
-                assertEquals("success", json.Status.toLowerCase())
-                assertEquals(10, json.NumberTotalRows)
-                assertEquals(0, json.NumberFilteredRows)
-            }
+        // use local() tvf to reload the data
+        def ipList = [:]
+        def portList = [:]
+        getBackendIpHeartbeatPort(ipList, portList)
+        ipList.each { beid, ip ->
+            logger.info("Begin to insert into ${table_load_name} from local()")
+            sql """
+                insert into ${table_load_name}
+                select * from local(
+                    "file_path" = "${local_tvf_prefix}/${table_export_name}_${uuid}/*",
+                    "backend_id" = "${beid}",
+                    "format" = "csv",
+                    "column_separator" = "ab",
+                    "line_delimiter" = "cc");         
+                """ 
+            insert_res = sql "show last insert;"
+            logger.info("insert from local(), BE id = ${beid}, result: " + insert_res.toString())
         }
 
         qt_select_load2 """ SELECT * FROM ${table_load_name} t ORDER BY user_id; """
@@ -297,7 +263,7 @@ suite("test_export_csv", "p0") {
 
     // 3. test csv_with_names
     uuid = UUID.randomUUID().toString()
-    outFilePath = """${outfile_path_prefix}_${uuid}"""
+    outFilePath = "${outfile_path_prefix}" + "/${table_export_name}_${uuid}"
     label = "label_${uuid}"
     try {
         // check export path
@@ -315,9 +281,6 @@ suite("test_export_csv", "p0") {
         """
         waiting_export.call(label)
         
-        // check file amounts
-        check_file_amounts.call("${outFilePath}", 1)
-
         // check data correctness
         sql """ DROP TABLE IF EXISTS ${table_load_name} """
         sql """
@@ -342,30 +305,24 @@ suite("test_export_csv", "p0") {
             DISTRIBUTED BY HASH(user_id) PROPERTIES("replication_num" = "1");
         """
 
-        File[] files = new File("${outFilePath}").listFiles()
-        String file_path = files[0].getAbsolutePath()
-        streamLoad {
-            table "${table_load_name}"
-
-            set 'column_separator', 'ab'
-            set 'line_delimiter', 'cc'
-            set 'columns', 'user_id, date, datetime, city, age, sex, bool_col, int_col, bigint_col, largeint_col, float_col, double_col, char_col, decimal_col, ipv4_col, ipv6_col'
-            set 'strict_mode', 'true'
-            set 'format', 'csv_with_names'
-
-            file "${file_path}"
-            time 10000 // limit inflight 10s
-
-            check { result, exception, startTime, endTime ->
-                if (exception != null) {
-                    throw exception
-                }
-                log.info("Stream load result: ${result}".toString())
-                def json = parseJson(result)
-                assertEquals("success", json.Status.toLowerCase())
-                assertEquals(10, json.NumberTotalRows)
-                assertEquals(0, json.NumberFilteredRows)
-            }
+        // use local() tvf to reload the data
+        def ipList = [:]
+        def portList = [:]
+        getBackendIpHeartbeatPort(ipList, portList)
+        ipList.each { beid, ip ->
+           logger.info("Begin to insert into ${table_load_name} from local()")
+           sql """
+                insert into ${table_load_name}
+                select *
+                from local(
+                    "file_path" = "${local_tvf_prefix}/${table_export_name}_${uuid}/*",
+                    "backend_id" = "${beid}",
+                    "format" = "csv_with_names",
+                    "line_delimiter" = "cc",
+                    "column_separator" = "ab");         
+                """ 
+            insert_res = sql "show last insert;"
+            logger.info("insert from local(), BE id = ${beid}, result: " + insert_res.toString())
         }
 
         qt_select_load3 """ SELECT * FROM ${table_load_name} t ORDER BY user_id; """
@@ -377,7 +334,7 @@ suite("test_export_csv", "p0") {
 
     // 4. test csv_with_names_and_types
     uuid = UUID.randomUUID().toString()
-    outFilePath = """${outfile_path_prefix}_${uuid}"""
+    outFilePath = "${outfile_path_prefix}" + "/${table_export_name}_${uuid}"
     label = "label_${uuid}"
     try {
         // check export path
@@ -395,9 +352,6 @@ suite("test_export_csv", "p0") {
         """
         waiting_export.call(label)
         
-        // check file amounts
-        check_file_amounts.call("${outFilePath}", 1)
-
         // check data correctness
         sql """ DROP TABLE IF EXISTS ${table_load_name} """
         sql """
@@ -422,30 +376,24 @@ suite("test_export_csv", "p0") {
             DISTRIBUTED BY HASH(user_id) PROPERTIES("replication_num" = "1");
         """
 
-        File[] files = new File("${outFilePath}").listFiles()
-        String file_path = files[0].getAbsolutePath()
-        streamLoad {
-            table "${table_load_name}"
-
-            set 'column_separator', 'ab'
-            set 'line_delimiter', 'cc'
-            set 'columns', 'user_id, date, datetime, city, age, sex, bool_col, int_col, bigint_col, largeint_col, float_col, double_col, char_col, decimal_col, ipv4_col, ipv6_col'
-            set 'strict_mode', 'true'
-            set 'format', 'csv_with_names_and_types'
-
-            file "${file_path}"
-            time 10000 // limit inflight 10s
-
-            check { result, exception, startTime, endTime ->
-                if (exception != null) {
-                    throw exception
-                }
-                log.info("Stream load result: ${result}".toString())
-                def json = parseJson(result)
-                assertEquals("success", json.Status.toLowerCase())
-                assertEquals(10, json.NumberTotalRows)
-                assertEquals(0, json.NumberFilteredRows)
-            }
+        // use local() tvf to reload the data
+        def ipList = [:]
+        def portList = [:]
+        getBackendIpHeartbeatPort(ipList, portList)
+        ipList.each { beid, ip ->
+           logger.info("Begin to insert into ${table_load_name} from local()")
+           sql """
+                insert into ${table_load_name}
+                select *
+                from local(
+                    "file_path" = "${local_tvf_prefix}/${table_export_name}_${uuid}/*",
+                    "backend_id" = "${beid}",
+                    "format" = "csv_with_names_and_types",
+                    "line_delimiter" = "cc",
+                    "column_separator" = "ab");         
+                """ 
+            insert_res = sql "show last insert;"
+            logger.info("insert from local(), BE id = ${beid}, result: " + insert_res.toString())
         }
 
         qt_select_load4 """ SELECT * FROM ${table_load_name} t ORDER BY user_id; """
diff --git a/regression-test/suites/export_p0/test_export_data_consistency.groovy b/regression-test/suites/export_p0/test_export_data_consistency.groovy
index cd19b082802..e1d4ba16385 100644
--- a/regression-test/suites/export_p0/test_export_data_consistency.groovy
+++ b/regression-test/suites/export_p0/test_export_data_consistency.groovy
@@ -59,6 +59,7 @@ suite("test_export_data_consistency", "p0") {
     def table_export_name = "test_export_data_consistency"
     def table_load_name = "test_load_data_consistency"
     def outfile_path_prefix = """/tmp/test_export_data_consistency"""
+    def local_tvf_prefix = "tmp/test_export_data_consistency"
 
     // create table and insert
     sql """ DROP TABLE IF EXISTS ${table_export_name} """
@@ -96,29 +97,13 @@ suite("test_export_data_consistency", "p0") {
     qt_select_export """ SELECT * FROM ${table_export_name} t ORDER BY id; """
 
 
+    def machine_user_name = "root"
     def check_path_exists = { dir_path ->
-        File path = new File(dir_path)
-        if (!path.exists()) {
-            assert path.mkdirs()
-        } else {
-            throw new IllegalStateException("""${dir_path} already exists! """)
-        }
-    }
-
-    def check_file_amounts = { dir_path, amount ->
-        File path = new File(dir_path)
-        File[] files = path.listFiles()
-        assert files.length == amount
+        mkdirRemotePathOnAllBE(machine_user_name, dir_path)
     }
 
     def delete_files = { dir_path ->
-        File path = new File(dir_path)
-        if (path.exists()) {
-            for (File f: path.listFiles()) {
-                f.delete();
-            }
-            path.delete();
-        }
+        deleteRemotePathOnAllBE(machine_user_name, dir_path)
     }
 
     def waiting_export = { the_db, export_label ->
@@ -137,7 +122,7 @@ suite("test_export_data_consistency", "p0") {
 
     // 1. basic test
     def uuid = UUID.randomUUID().toString()
-    def outFilePath = """${outfile_path_prefix}_${uuid}"""
+    def outFilePath = "${outfile_path_prefix}" + "/${table_export_name}_${uuid}"
     def label = "label_${uuid}"
     try {
         // check export path
@@ -168,9 +153,6 @@ suite("test_export_data_consistency", "p0") {
         // wait export
         waiting_export.call(db, label)
 
-        // check file amounts
-        check_file_amounts.call("${outFilePath}", 3)
-
         // check data correctness
         sql """ DROP TABLE IF EXISTS ${table_load_name} """
         sql """
@@ -182,29 +164,22 @@ suite("test_export_data_consistency", "p0") {
             DISTRIBUTED BY HASH(id) PROPERTIES("replication_num" = "1");
         """
 
-        File[] files = new File("${outFilePath}").listFiles()
-        for (exportLoadFile in files) {
-            String file_path = exportLoadFile.getAbsolutePath()
-            streamLoad {
-                table "${table_load_name}"
-
-                set 'column_separator', ','
-                set 'columns', 'id, name, age'
-                set 'strict_mode', 'true'
-
-                file "${file_path}"
-                time 10000 // limit inflight 10s
-
-                check { result, exception, startTime, endTime ->
-                    if (exception != null) {
-                        throw exception
-                    }
-                    log.info("Stream load result: ${result}".toString())
-                    def json = parseJson(result)
-                    assertEquals("success", json.Status.toLowerCase())
-                    assertEquals(0, json.NumberFilteredRows)
-                }
-            }
+        // use local() tvf to reload the data
+        def ipList = [:]
+        def portList = [:]
+        getBackendIpHeartbeatPort(ipList, portList)
+        ipList.each { beid, ip ->
+           logger.info("Begin to insert into ${table_load_name} from local()")
+           sql """
+                insert into ${table_load_name}
+                select * from local(
+                    "file_path" = "${local_tvf_prefix}/${table_export_name}_${uuid}/*",
+                    "backend_id" = "${beid}",
+                    "format" = "csv",
+                    "column_separator" = ",");         
+                """ 
+            insert_res = sql "show last insert;"
+            logger.info("insert from local(), BE id = ${beid}, result: " + insert_res.toString())
         }
 
         // The partition ranges are:
@@ -212,7 +187,7 @@ suite("test_export_data_consistency", "p0") {
         // The export task should keep partition consistency.
         def result = sql """ SELECT * FROM ${table_load_name} t WHERE id in (10,15,30,40,80,90) ORDER BY id; """
         logger.info("result ${result}")
-        assert result.size == 6
+        assert result.size() == 6
         if (result[0][1] == 'test') {
             assert result[1][1] == 'test'
         } else {
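The sampled ids are drawn in pairs from the same partitions, so a consistent export snapshot must return one name value per pair; the hunk's context ends inside the else branch above, but the property being asserted can be sketched as follows (the pairwise grouping of ids 10/15, 30/40, 80/90 is an assumption for illustration):

    // rows 0..5 correspond to ids (10,15,30,40,80,90) ordered by id; rows in
    // the same partition must agree if the export snapshot was consistent.
    [[0, 1], [2, 3], [4, 5]].each { a, b ->
        assert result[a][1] == result[b][1]
    }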
diff --git a/regression-test/suites/export_p0/test_export_empty_table.groovy b/regression-test/suites/export_p0/test_export_empty_table.groovy
index 584c65d73bc..bd2b4ad09d1 100644
--- a/regression-test/suites/export_p0/test_export_empty_table.groovy
+++ b/regression-test/suites/export_p0/test_export_empty_table.groovy
@@ -56,8 +56,8 @@ suite("test_export_empty_table", "p0") {
     }
 
     def table_export_name = "test_export_empty_table"
-    def table_load_name = "test_load_empty_table"
-    def outfile_path_prefix = """/tmp/test_export"""
+    def outfile_path_prefix = """/tmp/test_export_empty_table"""
+    def local_tvf_prefix = "tmp/test_export_empty_table"
 
     // create table
     sql """ DROP TABLE IF EXISTS ${table_export_name} """
@@ -84,29 +84,13 @@ suite("test_export_empty_table", "p0") {
    
     qt_select_export1 """ SELECT * FROM ${table_export_name} t ORDER BY user_id; """
 
+    def machine_user_name = "root"
     def check_path_exists = { dir_path ->
-        File path = new File(dir_path)
-        if (!path.exists()) {
-            assert path.mkdirs()
-        } else {
-            throw new IllegalStateException("""${dir_path} already exists! """)
-        }
-    }
-
-    def check_file_amounts = { dir_path, amount ->
-        File path = new File(dir_path)
-        File[] files = path.listFiles()
-        assert files.length == amount
+        mkdirRemotePathOnAllBE(machine_user_name, dir_path)
     }
 
     def delete_files = { dir_path ->
-        File path = new File(dir_path)
-        if (path.exists()) {
-            for (File f: path.listFiles()) {
-                f.delete();
-            }
-            path.delete();
-        }
+        deleteRemotePathOnAllBE(machine_user_name, dir_path)
     }
 
     def waiting_export = { export_label ->
@@ -125,7 +109,7 @@ suite("test_export_empty_table", "p0") {
 
     // 1. test csv
     def uuid = UUID.randomUUID().toString()
-    def outFilePath = """${outfile_path_prefix}_${uuid}"""
+    def outFilePath = "${outfile_path_prefix}/${table_export_name}_${uuid}"
     def label = "label_${uuid}"
     try {
         // check export path
@@ -142,8 +126,25 @@ suite("test_export_empty_table", "p0") {
         """
         waiting_export.call(label)
         
-        // check file amounts
-        check_file_amounts.call("${outFilePath}", 0)
+        // use local() tvf to check
+        def ipList = [:]
+        def portList = [:]
+        getBackendIpHeartbeatPort(ipList, portList)
+        ipList.each { beid, ip ->
+            test {
+                sql """
+                    select * from local(
+                    "file_path" = "${local_tvf_prefix}/${table_export_name}_${uuid}/*",
+                    "backend_id" = "${beid}",
+                    "format" = "csv",
+                    "column_separator" = ",");
+                """
+
+                // because we export the empty data,
+                // there is no files.
+                exception "get file list from backend failed."
+            }
+        }
     } finally {
         delete_files.call("${outFilePath}")
     }
@@ -151,7 +152,7 @@ suite("test_export_empty_table", "p0") {
 
     // 2. test parquet
     uuid = UUID.randomUUID().toString()
-    outFilePath = """${outfile_path_prefix}_${uuid}"""
+    outFilePath = "${outfile_path_prefix}/${table_export_name}_${uuid}"
     label = "label_${uuid}"
     try {
         // check export path
@@ -167,15 +168,32 @@ suite("test_export_empty_table", "p0") {
         """
         waiting_export.call(label)
         
-        // check file amounts
-        check_file_amounts.call("${outFilePath}", 0)
+        // use local() tvf to reload the data
+        def ipList = [:]
+        def portList = [:]
+        getBackendIpHeartbeatPort(ipList, portList)
+        ipList.each { beid, ip ->
+            test {
+                sql """
+                    select * from local(
+                    "file_path" = "${local_tvf_prefix}/${table_export_name}_${uuid}/*",
+                    "backend_id" = "${beid}",
+                    "format" = "csv",
+                    "column_separator" = ",");
+                """
+
+                // because we export the empty data,
+                // there is no files.
+                exception "get file list from backend failed."
+            }
+         }
     } finally {
         delete_files.call("${outFilePath}")
     }
 
     // 3. test orc
     uuid = UUID.randomUUID().toString()
-    outFilePath = """${outfile_path_prefix}_${uuid}"""
+    outFilePath = "${outfile_path_prefix}/${table_export_name}_${uuid}"
     label = "label_${uuid}"
     try {
         // check export path
@@ -191,15 +209,32 @@ suite("test_export_empty_table", "p0") {
         """
         waiting_export.call(label)
         
-        // check file amounts
-        check_file_amounts.call("${outFilePath}", 0)
+        // use local() tvf to reload the data
+        def ipList = [:]
+        def portList = [:]
+        getBackendIpHeartbeatPort(ipList, portList)
+        ipList.each { beid, ip ->
+            test {
+                sql """
+                    select * from local(
+                    "file_path" = "${local_tvf_prefix}/${table_export_name}_${uuid}/*",
+                    "backend_id" = "${beid}",
+                    "format" = "csv",
+                    "column_separator" = ",");
+                """
+
+                // because we export the empty data,
+                // there is no files.
+                exception "get file list from backend failed."
+            }
+        }
     } finally {
         delete_files.call("${outFilePath}")
     }
 
     // 4. test csv_with_names
     uuid = UUID.randomUUID().toString()
-    outFilePath = """${outfile_path_prefix}_${uuid}"""
+    outFilePath = "${outfile_path_prefix}/${table_export_name}_${uuid}"
     label = "label_${uuid}"
     try {
         // check export path
@@ -216,17 +251,33 @@ suite("test_export_empty_table", "p0") {
         """
         waiting_export.call(label)
         
-        // check file amounts
-        check_file_amounts.call("${outFilePath}", 0)
+        // use local() tvf to reload the data
+        def ipList = [:]
+        def portList = [:]
+        getBackendIpHeartbeatPort(ipList, portList)
+        ipList.each { beid, ip ->
+            test {
+                sql """
+                    select * from local(
+                    "file_path" = "${local_tvf_prefix}/${table_export_name}_${uuid}/*",
+                    "backend_id" = "${beid}",
+                    "format" = "csv",
+                    "column_separator" = ",");
+                """
+
+                // because we export the empty data,
+                // there is no files.
+                exception "get file list from backend failed."
+            }
+        }
     } finally {
-        try_sql("DROP TABLE IF EXISTS ${table_load_name}")
         delete_files.call("${outFilePath}")
     }
 
 
     // 5. test csv_with_names_and_types
     uuid = UUID.randomUUID().toString()
-    outFilePath = """${outfile_path_prefix}_${uuid}"""
+    outFilePath = "${outfile_path_prefix}/${table_export_name}_${uuid}"
     label = "label_${uuid}"
     try {
         // check export path
@@ -243,8 +294,25 @@ suite("test_export_empty_table", "p0") {
         """
         waiting_export.call(label)
         
-        // check file amounts
-        check_file_amounts.call("${outFilePath}", 0)
+        // use local() tvf to reload the data
+        def ipList = [:]
+        def portList = [:]
+        getBackendIpHeartbeatPort(ipList, portList)
+        ipList.each { beid, ip ->
+            test {
+                sql """
+                    select * from local(
+                    "file_path" = "${local_tvf_prefix}/${table_export_name}_${uuid}/*",
+                    "backend_id" = "${beid}",
+                    "format" = "csv",
+                    "column_separator" = ",");
+                """
+
+                // because we export the empty data,
+                // there is no files.
+                exception "get file list from backend failed."
+            }
+        }
     } finally {
         delete_files.call("${outFilePath}")
     }
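Since an empty export writes no files, the suite asserts the failure of local() instead of reloading: the framework's test block passes only when the statement fails with the expected message. The idiom, condensed from the hunks above (path and backend id illustrative):

    test {
        // listing a directory with no exported files must fail on the BE
        sql """
            select * from local(
                "file_path" = "tmp/empty_export_dir/*",
                "backend_id" = "${beid}",
                "format" = "csv",
                "column_separator" = ",");
        """
        // expected error substring when no files match on the backend
        exception "get file list from backend failed."
    }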
diff --git a/regression-test/suites/export_p0/test_export_table_with_label_retry.groovy b/regression-test/suites/export_p0/test_export_table_with_label_retry.groovy
index f4c64eb4e94..e6ac6684b5d 100644
--- a/regression-test/suites/export_p0/test_export_table_with_label_retry.groovy
+++ b/regression-test/suites/export_p0/test_export_table_with_label_retry.groovy
@@ -54,10 +54,11 @@ suite("test_export_table_with_label_retry", "p0") {
         return
     }
 
-    def table_export_name = "test_export_label"
-    def table_load_name = "test_load_label"
-    def wrong_outfile_path_prefix = """tmp/test_export"""
-    def outfile_path_prefix = """/tmp/test_export"""
+    def table_export_name = "test_export_table_with_label_retry"
+    def table_load_name = "test_load_table_with_label_retry"
+    def wrong_outfile_path_prefix = """mnt/disk2/ftw/projects/doris/output/be"""
+    def outfile_path_prefix = """/tmp/test_export_table_with_label_retry"""
+    def local_tvf_prefix = "tmp/test_export_table_with_label_retry"
 
     // create table and insert
     sql """ DROP TABLE IF EXISTS ${table_export_name} """
@@ -99,30 +100,13 @@ suite("test_export_table_with_label_retry", "p0") {
     logger.info("insert result: " + insert_res.toString())
     qt_select_export1 """ SELECT * FROM ${table_export_name} t ORDER BY user_id; """
 
-
+    def machine_user_name = "root"
     def check_path_exists = { dir_path ->
-        File path = new File(dir_path)
-        if (!path.exists()) {
-            assert path.mkdirs()
-        } else {
-            throw new IllegalStateException("""${dir_path} already exists! """)
-        }
-    }
-
-    def check_file_amounts = { dir_path, amount ->
-        File path = new File(dir_path)
-        File[] files = path.listFiles()
-        assert files.length == amount
+        mkdirRemotePathOnAllBE(machine_user_name, dir_path)
     }
 
     def delete_files = { dir_path ->
-        File path = new File(dir_path)
-        if (path.exists()) {
-            for (File f: path.listFiles()) {
-                f.delete();
-            }
-            path.delete();
-        }
+        deleteRemotePathOnAllBE(machine_user_name, dir_path)
     }
 
     def waiting_export_expect_failed = { export_label ->
@@ -155,8 +139,8 @@ suite("test_export_table_with_label_retry", "p0") {
     }
 
     def uuid = UUID.randomUUID().toString()
-    def outFilePath = """${outfile_path_prefix}_${uuid}"""
-    def wrongFilePath = """${wrong_outfile_path_prefix}_${uuid}"""
+    def outFilePath = "${outfile_path_prefix}" + "/${table_export_name}_${uuid}"
+    def wrongFilePath = "${wrong_outfile_path_prefix}" + "/${table_export_name}_${uuid}"
     def label = "label_${uuid}"
     try {
         // check export path
@@ -173,9 +157,6 @@ suite("test_export_table_with_label_retry", "p0") {
         """
         waiting_export_expect_failed.call(label)
 
-        // check file amounts
-        check_file_amounts.call("${outFilePath}", 0)
-
         // exec right export with same label again
         sql """
             EXPORT TABLE ${table_export_name} TO "file://${outFilePath}/"
@@ -188,9 +169,6 @@ suite("test_export_table_with_label_retry", "p0") {
 
         waiting_export_expect_success.call(label)
 
-        // check file amounts
-        check_file_amounts.call("${outFilePath}", 1)
-
         // check data correctness
         sql """ DROP TABLE IF EXISTS ${table_load_name} """
         sql """
@@ -215,28 +193,22 @@ suite("test_export_table_with_label_retry", "p0") {
             DISTRIBUTED BY HASH(user_id) PROPERTIES("replication_num" = "1");
         """
 
-        File[] files = new File("${outFilePath}").listFiles()
-        String file_path = files[0].getAbsolutePath()
-        streamLoad {
-            table "${table_load_name}"
-
-            set 'column_separator', ','
-            set 'columns', 'user_id, date, datetime, city, age, sex, bool_col, int_col, bigint_col, largeint_col, float_col, double_col, char_col, decimal_col, ipv4_col, ipv6_col'
-            set 'strict_mode', 'true'
-
-            file "${file_path}"
-            time 10000 // limit inflight 10s
-
-            check { result, exception, startTime, endTime ->
-                if (exception != null) {
-                    throw exception
-                }
-                log.info("Stream load result: ${result}".toString())
-                def json = parseJson(result)
-                assertEquals("success", json.Status.toLowerCase())
-                assertEquals(100, json.NumberTotalRows)
-                assertEquals(0, json.NumberFilteredRows)
-            }
+        // use local() tvf to reload the data
+        def ipList = [:]
+        def portList = [:]
+        getBackendIpHeartbeatPort(ipList, portList)
+        ipList.each { beid, ip ->
+            logger.info("Begin to insert into ${table_load_name} from local()")
+            sql """
+                insert into ${table_load_name}
+                select * from local(
+                    "file_path" = "${local_tvf_prefix}/${table_export_name}_${uuid}/*",
+                    "backend_id" = "${beid}",
+                    "column_separator" = ",",
+                    "format" = "csv");         
+                """ 
+            insert_res = sql "show last insert;"
+            logger.info("insert from local(), BE id = ${beid}, result: " + insert_res.toString())
         }
 
         qt_select_load1 """ SELECT * FROM ${table_load_name} t ORDER BY user_id; """
diff --git a/regression-test/suites/export_p0/test_export_view.groovy b/regression-test/suites/export_p0/test_export_view.groovy
index dfd020b02fa..0a7f8245a11 100644
--- a/regression-test/suites/export_p0/test_export_view.groovy
+++ b/regression-test/suites/export_p0/test_export_view.groovy
@@ -54,27 +54,12 @@ suite("test_export_view", "p0") {
         return
     }
 
+    def machine_user_name = "root"
     def check_path_exists = { dir_path ->
-        File path = new File(dir_path)
-        if (!path.exists()) {
-            assert path.mkdirs()
-        } else {
-            throw new IllegalStateException("""${dir_path} already exists! """)
-        }
-    }
-    def check_file_amounts = { dir_path, amount ->
-        File path = new File(dir_path)
-        File[] files = path.listFiles()
-        assert files.length == amount
+        mkdirRemotePathOnAllBE(machine_user_name, dir_path)
     }
     def delete_files = { dir_path ->
-        File path = new File(dir_path)
-        if (path.exists()) {
-            for (File f: path.listFiles()) {
-                f.delete();
-            }
-            path.delete();
-        }
+        deleteRemotePathOnAllBE(machine_user_name, dir_path)
     }
     def waiting_export = { export_label ->
         while (true) {
@@ -106,7 +91,8 @@ suite("test_export_view", "p0") {
     def table_export_name = "test_export_base_table"
     def table_export_view_name = "test_export_view_table"
     def table_load_name = "test_load_view_basic"
-    def outfile_path_prefix = """/tmp/test_export"""
+    def outfile_path_prefix = """/tmp/test_export_view"""
+    def local_tvf_prefix = "tmp/test_export_view"
 
     // create table and insert
     sql """ DROP TABLE IF EXISTS ${table_export_name} """
@@ -175,7 +161,7 @@ suite("test_export_view", "p0") {
 
     // 1. basic test
     def uuid = UUID.randomUUID().toString()
-    def outFilePath = """${outfile_path_prefix}_${uuid}"""
+    def outFilePath = "${outfile_path_prefix}/${table_export_view_name}_${uuid}"
     def label = "label_${uuid}"
     try {
         // check export path
@@ -192,33 +178,27 @@ suite("test_export_view", "p0") {
         """
         waiting_export.call(label)
         
-        // check file amounts
-        check_file_amounts.call("${outFilePath}", 1)
 
         // check data correctness
         create_load_table(table_load_name)
 
-        File[] files = new File("${outFilePath}").listFiles()
-        String file_path = files[0].getAbsolutePath()
-        streamLoad {
-            table "${table_load_name}"
-
-            set 'column_separator', ','
-            set 'strict_mode', 'true'
-
-            file "${file_path}"
-            time 10000 // limit inflight 10s
-
-            check { result, exception, startTime, endTime ->
-                if (exception != null) {
-                    throw exception
-                }
-                log.info("Stream load result: ${result}".toString())
-                def json = parseJson(result)
-                assertEquals("success", json.Status.toLowerCase())
-                assertEquals(5, json.NumberTotalRows)
-                assertEquals(0, json.NumberFilteredRows)
-            }
+        // use local() tvf to reload the data
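+        // one insert per backend id, since local() only reads files that live
+        // on the BE identified by "backend_id"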
+        def ipList = [:]
+        def portList = [:]
+        getBackendIpHeartbeatPort(ipList, portList)
+        ipList.each { beid, ip ->
+            logger.info("Begin to insert into ${table_load_name} from local()")
+            sql """
+                insert into ${table_load_name}
+                select * from local(
+                    "file_path" = 
"${local_tvf_prefix}/${table_export_view_name}_${uuid}/*",
+                    "backend_id" = "${beid}",
+                    "format" = "csv",
+                    "column_separator" = ","
+                );         
+            """ 
+            insert_res = sql "show last insert;"
+            logger.info("insert from local(), BE id = ${beid}, result: " + 
insert_res.toString())
         }
 
         order_qt_select_load1 """ SELECT * FROM ${table_load_name} t; """
@@ -230,7 +210,7 @@ suite("test_export_view", "p0") {
 
     // 2. test csv_with_names
     uuid = UUID.randomUUID().toString()
-    outFilePath = """${outfile_path_prefix}_${uuid}"""
+    outFilePath = "${outfile_path_prefix}" + 
"/${table_export_view_name}_${uuid}"
     label = "label_${uuid}"
     try {
         // check export path
@@ -249,34 +229,27 @@ suite("test_export_view", "p0") {
         """
         waiting_export.call(label)
         
-        // check file amounts
-        check_file_amounts.call("${outFilePath}", 1)
 
         // check data correctness
         create_load_table(table_load_name)
 
-        File[] files = new File("${outFilePath}").listFiles()
-        String file_path = files[0].getAbsolutePath()
-        streamLoad {
-            table "${table_load_name}"
-
-            set 'column_separator', ','
-            set 'strict_mode', 'true'
-            set 'format', 'csv_with_names'
-
-            file "${file_path}"
-            time 10000 // limit inflight 10s
-
-            check { result, exception, startTime, endTime ->
-                if (exception != null) {
-                    throw exception
-                }
-                log.info("Stream load result: ${result}".toString())
-                def json = parseJson(result)
-                assertEquals("success", json.Status.toLowerCase())
-                assertEquals(5, json.NumberTotalRows)
-                assertEquals(0, json.NumberFilteredRows)
-            }
+        // use local() tvf to reload the data
+        def ipList = [:]
+        def portList = [:]
+        getBackendIpHeartbeatPort(ipList, portList)
+        ipList.each { beid, ip ->
+            logger.info("Begin to insert into ${table_load_name} from local()")
+            sql """
+                insert into ${table_load_name}
+                select * from local(
+                    "file_path" = 
"${local_tvf_prefix}/${table_export_view_name}_${uuid}/*",
+                    "backend_id" = "${beid}",
+                    "column_separator" = ",",
+                    "format" = "csv_with_names"
+                );         
+            """ 
+            insert_res = sql "show last insert;"
+            logger.info("insert from local(), BE id = ${beid}, result: " + 
insert_res.toString())
         }
 
         order_qt_select_load2 """ SELECT * FROM ${table_load_name} t; """
@@ -289,7 +262,7 @@ suite("test_export_view", "p0") {
     
     // 3. test where clause
     uuid = UUID.randomUUID().toString()
-    outFilePath = """${outfile_path_prefix}_${uuid}"""
+    outFilePath = "${outfile_path_prefix}" + 
"/${table_export_view_name}_${uuid}"
     label = "label_${uuid}"
     try {
         // check export path
@@ -309,34 +282,27 @@ suite("test_export_view", "p0") {
         """
         waiting_export.call(label)
         
-        // check file amounts
-        check_file_amounts.call("${outFilePath}", 1)
 
         // check data correctness
         create_load_table(table_load_name)
 
-        File[] files = new File("${outFilePath}").listFiles()
-        String file_path = files[0].getAbsolutePath()
-        streamLoad {
-            table "${table_load_name}"
-
-            set 'column_separator', ','
-            set 'strict_mode', 'true'
-            set 'format', 'csv_with_names'
-
-            file "${file_path}"
-            time 10000 // limit inflight 10s
-
-            check { result, exception, startTime, endTime ->
-                if (exception != null) {
-                    throw exception
-                }
-                log.info("Stream load result: ${result}".toString())
-                def json = parseJson(result)
-                assertEquals("success", json.Status.toLowerCase())
-                assertEquals(2, json.NumberTotalRows)
-                assertEquals(0, json.NumberFilteredRows)
-            }
+        // use local() tvf to reload the data
+        def ipList = [:]
+        def portList = [:]
+        getBackendIpHeartbeatPort(ipList, portList)
+        ipList.each { beid, ip ->
+            logger.info("Begin to insert into ${table_load_name} from local()")
+            sql """
+                insert into ${table_load_name}
+                select * from local(
+                    "file_path" = 
"${local_tvf_prefix}/${table_export_view_name}_${uuid}/*",
+                    "backend_id" = "${beid}",
+                    "column_separator" = ",",
+                    "format" = "csv_with_names"
+                );         
+            """ 
+            insert_res = sql "show last insert;"
+            logger.info("insert from local(), BE id = ${beid}, result: " + 
insert_res.toString())
         }
 
         order_qt_select_load3 """ SELECT * FROM ${table_load_name}; """
@@ -349,7 +315,7 @@ suite("test_export_view", "p0") {
 
     // 4. test where clause and columns property
     uuid = UUID.randomUUID().toString()
-    outFilePath = """${outfile_path_prefix}_${uuid}"""
+    outFilePath = "${outfile_path_prefix}" + 
"/${table_export_view_name}_${uuid}"
     label = "label_${uuid}"
     try {
         // check export path
@@ -371,35 +337,26 @@ suite("test_export_view", "p0") {
 
         waiting_export.call(label)
         
-        // check file amounts
-        check_file_amounts.call("${outFilePath}", 1)
-
         // check data correctness
         create_load_table(table_load_name)
 
-        File[] files = new File("${outFilePath}").listFiles()
-        String file_path = files[0].getAbsolutePath()
-        streamLoad {
-            table "${table_load_name}"
-
-            set 'column_separator', ','
-            set 'strict_mode', 'true'
-            set 'columns', 'k3, s1, k1'
-            set 'format', 'csv_with_names'
-
-            file "${file_path}"
-            time 10000 // limit inflight 10s
-
-            check { result, exception, startTime, endTime ->
-                if (exception != null) {
-                    throw exception
-                }
-                log.info("Stream load result: ${result}".toString())
-                def json = parseJson(result)
-                assertEquals("success", json.Status.toLowerCase())
-                assertEquals(2, json.NumberTotalRows)
-                assertEquals(0, json.NumberFilteredRows)
-            }
+        // use local() tvf to reload the data
+        def ipList = [:]
+        def portList = [:]
+        getBackendIpHeartbeatPort(ipList, portList)
+        ipList.each { beid, ip ->
+            logger.info("Begin to insert into ${table_load_name} from local()")
+            sql """
+                insert into ${table_load_name}
+                select s1, k1, null as k2, k3 from local(
+                    "file_path" = "${local_tvf_prefix}/${table_export_view_name}_${uuid}/*",
+                    "backend_id" = "${beid}",
+                    "column_separator" = ",",
+                    "format" = "csv_with_names"
+                );
+            """
+            insert_res = sql "show last insert;"
+            logger.info("insert from local(), BE id = ${beid}, result: " + insert_res.toString())
         }
 
         order_qt_select_load4 """ SELECT * FROM ${table_load_name} t; """
@@ -412,7 +369,7 @@ suite("test_export_view", "p0") {
 
     // 5. test csv_with_names_and_types
     uuid = UUID.randomUUID().toString()
-    outFilePath = """${outfile_path_prefix}_${uuid}"""
+    outFilePath = "${outfile_path_prefix}" + 
"/${table_export_view_name}_${uuid}"
     label = "label_${uuid}"
     try {
         // check export path
@@ -431,34 +388,26 @@ suite("test_export_view", "p0") {
         """
         waiting_export.call(label)
         
-        // check file amounts
-        check_file_amounts.call("${outFilePath}", 1)
-
         // check data correctness
         create_load_table(table_load_name)
 
-        File[] files = new File("${outFilePath}").listFiles()
-        String file_path = files[0].getAbsolutePath()
-        streamLoad {
-            table "${table_load_name}"
-
-            set 'strict_mode', 'true'
-            set 'format', 'csv_with_names_and_types'
-            set 'column_separator', ','
-
-            file "${file_path}"
-            time 10000 // limit inflight 10s
-
-            check { result, exception, startTime, endTime ->
-                if (exception != null) {
-                    throw exception
-                }
-                log.info("Stream load result: ${result}".toString())
-                def json = parseJson(result)
-                assertEquals("success", json.Status.toLowerCase())
-                assertEquals(5, json.NumberTotalRows)
-                assertEquals(0, json.NumberFilteredRows)
-            }
+        // use local() tvf to reload the data
+        def ipList = [:]
+        def portList = [:]
+        getBackendIpHeartbeatPort(ipList, portList)
+        ipList.each { beid, ip ->
+            logger.info("Begin to insert into ${table_load_name} from local()")
+            sql """
+                insert into ${table_load_name}
+                select * from local(
+                    "file_path" = 
"${local_tvf_prefix}/${table_export_view_name}_${uuid}/*",
+                    "backend_id" = "${beid}",
+                    "column_separator" = ",",
+                    "format" = "csv_with_names_and_types"
+                );         
+            """ 
+            insert_res = sql "show last insert;"
+            logger.info("insert from local(), BE id = ${beid}, result: " + 
insert_res.toString())
         }
 
         order_qt_select_load5 """ SELECT * FROM ${table_load_name} t; """
@@ -471,7 +420,7 @@ suite("test_export_view", "p0") {
 
     // 6. test orc type
     uuid = UUID.randomUUID().toString()
-    outFilePath = """${outfile_path_prefix}_${uuid}"""
+    outFilePath = "${outfile_path_prefix}" + 
"/${table_export_view_name}_${uuid}"
     label = "label_${uuid}"
     try {
         // check export path
@@ -488,33 +437,25 @@ suite("test_export_view", "p0") {
         """
         waiting_export.call(label)
         
-        // check file amounts
-        check_file_amounts.call("${outFilePath}", 1)
-
         // check data correctness
         create_load_table(table_load_name)
 
-        File[] files = new File("${outFilePath}").listFiles()
-        String file_path = files[0].getAbsolutePath()
-        streamLoad {
-            table "${table_load_name}"
-
-            set 'strict_mode', 'true'
-            set 'format', 'orc'
-
-            file "${file_path}"
-            time 10000 // limit inflight 10s
-
-            check { result, exception, startTime, endTime ->
-                if (exception != null) {
-                    throw exception
-                }
-                log.info("Stream load result: ${result}".toString())
-                def json = parseJson(result)
-                assertEquals("success", json.Status.toLowerCase())
-                assertEquals(5, json.NumberTotalRows)
-                assertEquals(0, json.NumberFilteredRows)
-            }
+        // use local() tvf to reload the data
+        def ipList = [:]
+        def portList = [:]
+        getBackendIpHeartbeatPort(ipList, portList)
+        ipList.each { beid, ip ->
+            logger.info("Begin to insert into ${table_load_name} from local()")
+            sql """
+                insert into ${table_load_name}
+                select * from local(
+                    "file_path" = 
"${local_tvf_prefix}/${table_export_view_name}_${uuid}/*",
+                    "backend_id" = "${beid}",
+                    "format" = "orc"
+                );         
+            """ 
+            insert_res = sql "show last insert;"
+            logger.info("insert from local(), BE id = ${beid}, result: " + 
insert_res.toString())
         }
 
         order_qt_select_load6 """ SELECT * FROM ${table_load_name} t; """
@@ -527,7 +468,7 @@ suite("test_export_view", "p0") {
 
     // 8. test orc type, where clause and columns property
     uuid = UUID.randomUUID().toString()
-    outFilePath = """${outfile_path_prefix}_${uuid}"""
+    outFilePath = "${outfile_path_prefix}" + 
"/${table_export_view_name}_${uuid}"
     label = "label_${uuid}"
     try {
         // check export path
@@ -547,35 +488,25 @@ suite("test_export_view", "p0") {
         """
 
         waiting_export.call(label)
-        
-        // check file amounts
-        check_file_amounts.call("${outFilePath}", 1)
-
         // check data correctness
         create_load_table(table_load_name)
 
-        File[] files = new File("${outFilePath}").listFiles()
-        String file_path = files[0].getAbsolutePath()
-        streamLoad {
-            table "${table_load_name}"
-
-            set 'strict_mode', 'true'
-            set 'columns', 'k3, s1, k1'
-            set 'format', 'orc'
-
-            file "${file_path}"
-            time 10000 // limit inflight 10s
-
-            check { result, exception, startTime, endTime ->
-                if (exception != null) {
-                    throw exception
-                }
-                log.info("Stream load result: ${result}".toString())
-                def json = parseJson(result)
-                assertEquals("success", json.Status.toLowerCase())
-                assertEquals(2, json.NumberTotalRows)
-                assertEquals(0, json.NumberFilteredRows)
-            }
+        // use local() tvf to reload the data
+        def ipList = [:]
+        def portList = [:]
+        getBackendIpHeartbeatPort(ipList, portList)
+        ipList.each { beid, ip ->
+            logger.info("Begin to insert into ${table_load_name} from local()")
+            sql """
+                insert into ${table_load_name}
+                select s1, k1, null as k2, k3 from local(
+                    "file_path" = "${local_tvf_prefix}/${table_export_view_name}_${uuid}/*",
+                    "backend_id" = "${beid}",
+                    "format" = "orc"
+                );
+            """
+            insert_res = sql "show last insert;"
+            logger.info("insert from local(), BE id = ${beid}, result: " + insert_res.toString())
         }
 
         order_qt_select_load8 """ SELECT * FROM ${table_load_name} t; """
diff --git a/regression-test/suites/external_table_p0/export/test_export_external_table.groovy b/regression-test/suites/external_table_p0/export/test_export_external_table.groovy
index 8125db28bf1..2b2d41e2b4d 100644
--- a/regression-test/suites/external_table_p0/export/test_export_external_table.groovy
+++ b/regression-test/suites/external_table_p0/export/test_export_external_table.groovy
@@ -54,37 +54,12 @@ suite("test_export_external_table", "p0,external,mysql,external_docker,external_
         return
     }
 
+    def machine_user_name = "root"
     def check_path_exists = { dir_path ->
-        List<List<Object>> backends =  sql """ show backends """
-        assertTrue(backends.size() > 0)
-        File path = new File(dir_path)
-        if (!path.exists()) {
-            assert path.mkdirs()
-        } else {
-            throw new IllegalStateException("""${dir_path} already exists! """)
-        }
-        if (backends.size() > 1) {
-            for (List<Object> backend : backends) {
-                def be_host = backend[1]
-                def cmd="""mkdir -p ${dir_path}"""
-                sshExec("root", be_host, cmd.toString())
-            }
-        }
-
-    }
-    def check_file_amounts = { dir_path, amount ->
-        File path = new File(dir_path)
-        File[] files = path.listFiles()
-        assert files.length == amount
+        mkdirRemotePathOnAllBE(machine_user_name, dir_path)
     }
     def delete_files = { dir_path ->
-        File path = new File(dir_path)
-        if (path.exists()) {
-            for (File f: path.listFiles()) {
-                f.delete();
-            }
-            path.delete();
-        }
+        deleteRemotePathOnAllBE(machine_user_name, dir_path)
     }
     def waiting_export = { ctlName, dbName, export_label ->
         while (true) {
@@ -99,12 +74,12 @@ suite("test_export_external_table", "p0,external,mysql,external_docker,external_
             }
         }
     }
-    
 
     // this table name must be `test1`, because this is an external table.
     def table_export_name = "test1"
     def table_load_name = "test_load_external__basic"
-    def outfile_path_prefix = """/tmp/test_export"""
+    def outfile_path_prefix = """/tmp/test_export_external_table"""
+    def local_tvf_prefix = "tmp/test_export_external_table"
 
     String enabled = context.config.otherConfigs.get("enableJdbcTest")
     String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
@@ -163,7 +138,7 @@ suite("test_export_external_table", "p0,external,mysql,external_docker,external_
 
         // 1. basic test
         def uuid = UUID.randomUUID().toString()
-        def outFilePath = """${outfile_path_prefix}_${uuid}"""
+        def outFilePath = "${outfile_path_prefix}" + 
"/${table_export_name}_${uuid}" 
         def label = "label_${uuid}"
         try {
             // check export path
@@ -180,34 +155,27 @@ suite("test_export_external_table", "p0,external,mysql,external_docker,external_
                 );
             """
             waiting_export.call(catalog_name, ex_db_name, label)
-            
-            // check file amounts
-            check_file_amounts.call("${outFilePath}", 1)
 
             // check data correctness
             create_load_table(table_load_name)
 
-            File[] files = new File("${outFilePath}").listFiles()
-            String file_path = files[0].getAbsolutePath()
-            streamLoad {
-                table "${table_load_name}"
-
-                set 'column_separator', ','
-                set 'strict_mode', 'true'
-
-                file "${file_path}"
-                time 10000 // limit inflight 10s
-
-                check { result, exception, startTime, endTime ->
-                    if (exception != null) {
-                        throw exception
-                    }
-                    log.info("Stream load result: ${result}".toString())
-                    def json = parseJson(result)
-                    assertEquals("success", json.Status.toLowerCase())
-                    assertEquals(100, json.NumberTotalRows)
-                    assertEquals(0, json.NumberFilteredRows)
-                }
+            // use local() tvf to reload the data
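+            // reload from every BE; each insert is verified via 'show last insert' below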
+            def ipList = [:]
+            def portList = [:]
+            getBackendIpHeartbeatPort(ipList, portList)
+            ipList.each { beid, ip ->
+                logger.info("Begin to insert into 
internal.${internal_db_name}.${table_load_name} from local()")
+                sql """
+                    insert into  
internal.${internal_db_name}.${table_load_name}
+                    select * from local(
+                        "file_path" = 
"${local_tvf_prefix}/${table_export_name}_${uuid}/*",
+                        "backend_id" = "${beid}",
+                        "format" = "csv",
+                        "column_separator" = ","
+                    );         
+                """ 
+                def insert_res = sql "show last insert;"
+                logger.info("insert from local(), BE id = ${beid}, result: " + 
insert_res.toString())
             }
 
             order_qt_select_load1 """ SELECT * FROM internal.${internal_db_name}.${table_load_name} order by k8; """
@@ -218,7 +186,7 @@ suite("test_export_external_table", "p0,external,mysql,external_docker,external_
 
         // 2. export external table under internal catalog
         uuid = UUID.randomUUID().toString()
-        outFilePath = """${outfile_path_prefix}_${uuid}"""
+        outFilePath = "${outfile_path_prefix}" + 
"/${table_export_name}_${uuid}" 
         label = "label_${uuid}"
         try {
             // check export path
@@ -237,34 +205,27 @@ suite("test_export_external_table", "p0,external,mysql,external_docker,external_
             """
 
             waiting_export.call(catalog_name, ex_db_name, label)
-            
-            // check file amounts
-            check_file_amounts.call("${outFilePath}", 1)
 
             // check data correctness
             create_load_table(table_load_name)
 
-            File[] files = new File("${outFilePath}").listFiles()
-            String file_path = files[0].getAbsolutePath()
-            streamLoad {
-                table "${table_load_name}"
-
-                set 'column_separator', ','
-                set 'strict_mode', 'true'
-
-                file "${file_path}"
-                time 10000 // limit inflight 10s
-
-                check { result, exception, startTime, endTime ->
-                    if (exception != null) {
-                        throw exception
-                    }
-                    log.info("Stream load result: ${result}".toString())
-                    def json = parseJson(result)
-                    assertEquals("success", json.Status.toLowerCase())
-                    assertEquals(100, json.NumberTotalRows)
-                    assertEquals(0, json.NumberFilteredRows)
-                }
+            // use local() tvf to reload the data
+            def ipList = [:]
+            def portList = [:]
+            getBackendIpHeartbeatPort(ipList, portList)
+            ipList.each { beid, ip ->
+                logger.info("Begin to insert into 
internal.${internal_db_name}.${table_load_name} from local()")
+                sql """
+                    insert into  
internal.${internal_db_name}.${table_load_name}
+                    select * from local(
+                        "file_path" = 
"${local_tvf_prefix}/${table_export_name}_${uuid}/*",
+                        "backend_id" = "${beid}",
+                        "format" = "csv",
+                        "column_separator" = ","
+                    );         
+                """ 
+                def insert_res = sql "show last insert;"
+                logger.info("insert from local(), BE id = ${beid}, result: " + 
insert_res.toString())
             }
 
             order_qt_select_load2 """ SELECT * FROM internal.${internal_db_name}.${table_load_name} order by k8; """
@@ -276,7 +237,7 @@ suite("test_export_external_table", "p0,external,mysql,external_docker,external_
         sql """ switch ${catalog_name} """
         // 3. csv_with_names
         uuid = UUID.randomUUID().toString()
-        outFilePath = """${outfile_path_prefix}_${uuid}"""
+        outFilePath = "${outfile_path_prefix}" + 
"/${table_export_name}_${uuid}"
         label = "label_${uuid}"
         try {
             // check export path
@@ -293,35 +254,27 @@ suite("test_export_external_table", "p0,external,mysql,external_docker,external_
                 );
             """
             waiting_export.call(catalog_name, ex_db_name, label)
-            
-            // check file amounts
-            check_file_amounts.call("${outFilePath}", 1)
 
             // check data correctness
             create_load_table(table_load_name)
 
-            File[] files = new File("${outFilePath}").listFiles()
-            String file_path = files[0].getAbsolutePath()
-            streamLoad {
-                table "${table_load_name}"
-
-                set 'column_separator', ','
-                set 'strict_mode', 'true'
-                set 'format', 'csv_with_names'
-
-                file "${file_path}"
-                time 10000 // limit inflight 10s
-
-                check { result, exception, startTime, endTime ->
-                    if (exception != null) {
-                        throw exception
-                    }
-                    log.info("Stream load result: ${result}".toString())
-                    def json = parseJson(result)
-                    assertEquals("success", json.Status.toLowerCase())
-                    assertEquals(30, json.NumberTotalRows)
-                    assertEquals(0, json.NumberFilteredRows)
-                }
+            // use local() tvf to reload the data
+            def ipList = [:]
+            def portList = [:]
+            getBackendIpHeartbeatPort(ipList, portList)
+            ipList.each { beid, ip ->
+                logger.info("Begin to insert into 
internal.${internal_db_name}.${table_load_name} from local()")
+                sql """
+                    insert into  
internal.${internal_db_name}.${table_load_name}
+                    select * from local(
+                        "file_path" = 
"${local_tvf_prefix}/${table_export_name}_${uuid}/*",
+                        "backend_id" = "${beid}",
+                        "column_separator" = ",",
+                        "format" = "csv_with_names"
+                    );         
+                """ 
+                def insert_res = sql "show last insert;"
+                logger.info("insert from local(), BE id = ${beid}, result: " + 
insert_res.toString())
             }
 
             order_qt_select_load3 """ SELECT * FROM internal.${internal_db_name}.${table_load_name} order by k8; """
@@ -333,7 +286,7 @@ suite("test_export_external_table", "p0,external,mysql,external_docker,external_
 
         // 4. csv_with_names_and_types
         uuid = UUID.randomUUID().toString()
-        outFilePath = """${outfile_path_prefix}_${uuid}"""
+        outFilePath = "${outfile_path_prefix}" + 
"/${table_export_name}_${uuid}"
         label = "label_${uuid}"
         try {
             // check export path
@@ -350,35 +303,27 @@ suite("test_export_external_table", "p0,external,mysql,external_docker,external_
                 );
             """
             waiting_export.call(catalog_name, ex_db_name, label)
-            
-            // check file amounts
-            check_file_amounts.call("${outFilePath}", 1)
 
             // check data correctness
             create_load_table(table_load_name)
 
-            File[] files = new File("${outFilePath}").listFiles()
-            String file_path = files[0].getAbsolutePath()
-            streamLoad {
-                table "${table_load_name}"
-
-                set 'column_separator', ','
-                set 'strict_mode', 'true'
-                set 'format', 'csv_with_names_and_types'
-
-                file "${file_path}"
-                time 10000 // limit inflight 10s
-
-                check { result, exception, startTime, endTime ->
-                    if (exception != null) {
-                        throw exception
-                    }
-                    log.info("Stream load result: ${result}".toString())
-                    def json = parseJson(result)
-                    assertEquals("success", json.Status.toLowerCase())
-                    assertEquals(30, json.NumberTotalRows)
-                    assertEquals(0, json.NumberFilteredRows)
-                }
+            // use local() tvf to reload the data
+            def ipList = [:]
+            def portList = [:]
+            getBackendIpHeartbeatPort(ipList, portList)
+            ipList.each { beid, ip ->
+                logger.info("Begin to insert into  
internal.${internal_db_name}.${table_load_name} from local()")
+                sql """
+                    insert into  
internal.${internal_db_name}.${table_load_name}
+                    select * from local(
+                        "file_path" = 
"${local_tvf_prefix}/${table_export_name}_${uuid}/*",
+                        "backend_id" = "${beid}",
+                        "column_separator" = ",",
+                        "format" = "csv_with_names_and_types"
+                    );         
+                """ 
+                def insert_res = sql "show last insert;"
+                logger.info("insert from local(), BE id = ${beid}, result: " + 
insert_res.toString())
             }
 
             order_qt_select_load4 """ SELECT * FROM internal.${internal_db_name}.${table_load_name} order by k8; """
@@ -390,7 +335,7 @@ suite("test_export_external_table", "p0,external,mysql,external_docker,external_
 
         // 5. orc
         uuid = UUID.randomUUID().toString()
-        outFilePath = """${outfile_path_prefix}_${uuid}"""
+        outFilePath = "${outfile_path_prefix}" + 
"/${table_export_name}_${uuid}"
         label = "label_${uuid}"
         try {
             // check export path
@@ -406,34 +351,26 @@ suite("test_export_external_table", "p0,external,mysql,external_docker,external_
                 );
             """
             waiting_export.call(catalog_name, ex_db_name, label)
-            
-            // check file amounts
-            check_file_amounts.call("${outFilePath}", 1)
 
             // check data correctness
             create_load_table(table_load_name)
 
-            File[] files = new File("${outFilePath}").listFiles()
-            String file_path = files[0].getAbsolutePath()
-            streamLoad {
-                table "${table_load_name}"
-
-                set 'strict_mode', 'true'
-                set 'format', 'orc'
-
-                file "${file_path}"
-                time 10000 // limit inflight 10s
-
-                check { result, exception, startTime, endTime ->
-                    if (exception != null) {
-                        throw exception
-                    }
-                    log.info("Stream load result: ${result}".toString())
-                    def json = parseJson(result)
-                    assertEquals("success", json.Status.toLowerCase())
-                    assertEquals(30, json.NumberTotalRows)
-                    assertEquals(0, json.NumberFilteredRows)
-                }
+            // use local() tvf to reload the data
+            def ipList = [:]
+            def portList = [:]
+            getBackendIpHeartbeatPort(ipList, portList)
+            ipList.each { beid, ip ->
+                logger.info("Begin to insert into  
internal.${internal_db_name}.${table_load_name} from local()")
+                sql """
+                    insert into  
internal.${internal_db_name}.${table_load_name}
+                    select * from local(
+                        "file_path" = 
"${local_tvf_prefix}/${table_export_name}_${uuid}/*",
+                        "backend_id" = "${beid}",
+                        "format" = "orc"
+                    );         
+                """ 
+                def insert_res = sql "show last insert;"
+                logger.info("insert from local(), BE id = ${beid}, result: " + 
insert_res.toString())
             }
 
             order_qt_select_load5 """ SELECT * FROM internal.${internal_db_name}.${table_load_name} order by k8; """
@@ -442,10 +379,9 @@ suite("test_export_external_table", "p0,external,mysql,external_docker,external_
             delete_files.call("${outFilePath}")
         }
 
-
-        // 5. parquet
+        // 6. parquet
         uuid = UUID.randomUUID().toString()
-        outFilePath = """${outfile_path_prefix}_${uuid}"""
+        outFilePath = "${outfile_path_prefix}" + 
"/${table_export_name}_${uuid}"
         label = "label_${uuid}"
         try {
             // check export path
@@ -461,34 +397,26 @@ suite("test_export_external_table", "p0,external,mysql,external_docker,external_
                 );
             """
             waiting_export.call(catalog_name, ex_db_name, label)
-            
-            // check file amounts
-            check_file_amounts.call("${outFilePath}", 1)
 
             // check data correctness
             create_load_table(table_load_name)
 
-            File[] files = new File("${outFilePath}").listFiles()
-            String file_path = files[0].getAbsolutePath()
-            streamLoad {
-                table "${table_load_name}"
-
-                set 'strict_mode', 'true'
-                set 'format', 'parquet'
-
-                file "${file_path}"
-                time 10000 // limit inflight 10s
-
-                check { result, exception, startTime, endTime ->
-                    if (exception != null) {
-                        throw exception
-                    }
-                    log.info("Stream load result: ${result}".toString())
-                    def json = parseJson(result)
-                    assertEquals("success", json.Status.toLowerCase())
-                    assertEquals(30, json.NumberTotalRows)
-                    assertEquals(0, json.NumberFilteredRows)
-                }
+            // use local() tvf to reload the data
+            def ipList = [:]
+            def portList = [:]
+            getBackendIpHeartbeatPort(ipList, portList)
+            ipList.each { beid, ip ->
+                logger.info("Begin to insert into  
internal.${internal_db_name}.${table_load_name} from local()")
+                sql """
+                    insert into internal.${internal_db_name}.${table_load_name}
+                    select * from local(
+                        "file_path" = 
"${local_tvf_prefix}/${table_export_name}_${uuid}/*",
+                        "backend_id" = "${beid}",
+                        "format" = "parquet"
+                    );         
+                """ 
+                def insert_res = sql "show last insert;"
+                logger.info("insert from local(), BE id = ${beid}, result: " + 
insert_res.toString())
             }
 
             order_qt_select_load6 """ SELECT * FROM internal.${internal_db_name}.${table_load_name} order by k8; """
@@ -500,7 +428,7 @@ suite("test_export_external_table", "p0,external,mysql,external_docker,external_
 
         // 7. test columns property
         uuid = UUID.randomUUID().toString()
-        outFilePath = """${outfile_path_prefix}_${uuid}"""
+        outFilePath = "${outfile_path_prefix}" + 
"/${table_export_name}_${uuid}"
         label = "label_${uuid}"
         try {
             // check export path
@@ -518,36 +446,27 @@ suite("test_export_external_table", "p0,external,mysql,external_docker,external_
                 );
             """
             waiting_export.call(catalog_name, ex_db_name, label)
-            
-            // check file amounts
-            check_file_amounts.call("${outFilePath}", 1)
 
             // check data correctness
             create_load_table(table_load_name)
 
-            File[] files = new File("${outFilePath}").listFiles()
-            String file_path = files[0].getAbsolutePath()
-            streamLoad {
-                table "${table_load_name}"
-
-                set 'column_separator', ','
-                set 'strict_mode', 'true'
-                set 'format', 'csv_with_names'
-                set 'columns', 'k8, k1, k5, k3, k7'
-
-                file "${file_path}"
-                time 10000 // limit inflight 10s
-
-                check { result, exception, startTime, endTime ->
-                    if (exception != null) {
-                        throw exception
-                    }
-                    log.info("Stream load result: ${result}".toString())
-                    def json = parseJson(result)
-                    assertEquals("success", json.Status.toLowerCase())
-                    assertEquals(30, json.NumberTotalRows)
-                    assertEquals(0, json.NumberFilteredRows)
-                }
+            // use local() tvf to reload the data
+            def ipList = [:]
+            def portList = [:]
+            getBackendIpHeartbeatPort(ipList, portList)
+            ipList.each { beid, ip ->
+                logger.info("Begin to insert into 
internal.${internal_db_name}.${table_load_name} from local()")
+                sql """
+                    insert into 
internal.${internal_db_name}.${table_load_name} (k8, k1, k5, k3, k7)
+                    select * from local(
+                        "file_path" = 
"${local_tvf_prefix}/${table_export_name}_${uuid}/*",
+                        "backend_id" = "${beid}",
+                        "format" = "csv_with_names",
+                        "column_separator" = ","
+                    );         
+                """ 
+                def insert_res = sql "show last insert;"
+                logger.info("insert from local(), BE id = ${beid}, result: " + 
insert_res.toString())
             }
 
             order_qt_select_load7 """ SELECT * FROM internal.${internal_db_name}.${table_load_name} order by k8; """
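
For reference, the per-BE reload pattern these suites now share looks roughly
like the sketch below. This is a minimal approximation, not the authoritative
framework API: getBackendIpHeartbeatPort, sql, and logger are the regression
framework's own helpers, and the ${...} variables are assumed to be in scope
exactly as in the hunks above.

    // Sketch: reload files exported to each BE's local disk via the local() TVF.
    def ipList = [:]      // filled with backend id -> host ip
    def portList = [:]    // filled with backend id -> heartbeat port
    getBackendIpHeartbeatPort(ipList, portList)
    ipList.each { beid, ip ->
        sql """
            insert into ${table_load_name}
            select * from local(
                "file_path" = "${local_tvf_prefix}/${table_export_name}_${uuid}/*",
                "backend_id" = "${beid}",
                "format" = "csv",
                "column_separator" = ",");
        """
        def insert_res = sql "show last insert;"
        logger.info("reloaded on BE ${beid}: " + insert_res.toString())
    }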


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

