This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new 93775369983 branch-3.0: [fix](Export) modify some cases of export feature #47976 (#48061) 93775369983 is described below commit 937753699836c3da3a60e21e2811b220d86d9a7d Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> AuthorDate: Mon Feb 24 11:33:14 2025 +0800 branch-3.0: [fix](Export) modify some cases of export feature #47976 (#48061) Cherry-picked from #47976 Co-authored-by: Tiewei Fang <fangtie...@selectdb.com> --- .../data/export_p0/test_export_basic.out | Bin 7131 -> 7131 bytes .../org/apache/doris/regression/suite/Suite.groovy | 56 ++++ .../suites/export_p0/test_export_basic.groovy | 304 +++++++----------- .../suites/export_p0/test_export_csv.groovy | 208 +++++-------- .../export_p0/test_export_data_consistency.groovy | 69 ++--- .../export_p0/test_export_empty_table.groovy | 142 ++++++--- .../test_export_table_with_label_retry.groovy | 80 ++--- .../suites/export_p0/test_export_view.groovy | 327 ++++++++------------ .../export/test_export_external_table.groovy | 341 ++++++++------------- 9 files changed, 665 insertions(+), 862 deletions(-) diff --git a/regression-test/data/export_p0/test_export_basic.out b/regression-test/data/export_p0/test_export_basic.out index 52aa765fc33..e9614760abd 100644 Binary files a/regression-test/data/export_p0/test_export_basic.out and b/regression-test/data/export_p0/test_export_basic.out differ diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy index 111c472a364..384c94c4bfb 100644 --- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy +++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy @@ -1054,6 +1054,62 @@ class Suite implements GroovyInterceptable { Assert.assertEquals(0, code) } + void mkdirRemotePathOnAllBE(String username, String path) { + def ipList = [:] + def portList = [:] + getBackendIpHeartbeatPort(ipList, portList) + + def executeCommand = { String cmd, Boolean mustSuc -> + try { + staticLogger.info("execute ${cmd}") + def proc = new ProcessBuilder("/bin/bash", "-c", cmd).redirectErrorStream(true).start() + int exitcode = proc.waitFor() + if (exitcode != 0) { + staticLogger.info("exit code: ${exitcode}, output\n: ${proc.text}") + if (mustSuc == true) { + Assert.assertEquals(0, exitcode) + } + } + } catch (IOException e) { + Assert.assertTrue(false, "execute timeout") + } + } + + ipList.each { beid, ip -> + String cmd = "ssh -o StrictHostKeyChecking=no ${username}@${ip} \"mkdir -p ${path}\"" + logger.info("Execute: ${cmd}".toString()) + executeCommand(cmd, false) + } + } + + void deleteRemotePathOnAllBE(String username, String path) { + def ipList = [:] + def portList = [:] + getBackendIpHeartbeatPort(ipList, portList) + + def executeCommand = { String cmd, Boolean mustSuc -> + try { + staticLogger.info("execute ${cmd}") + def proc = new ProcessBuilder("/bin/bash", "-c", cmd).redirectErrorStream(true).start() + int exitcode = proc.waitFor() + if (exitcode != 0) { + staticLogger.info("exit code: ${exitcode}, output\n: ${proc.text}") + if (mustSuc == true) { + Assert.assertEquals(0, exitcode) + } + } + } catch (IOException e) { + Assert.assertTrue(false, "execute timeout") + } + } + + ipList.each { beid, ip -> + String cmd = "ssh -o StrictHostKeyChecking=no ${username}@${ip} \"rm -r ${path}\"" + 
logger.info("Execute: ${cmd}".toString()) + executeCommand(cmd, false) + } + } + String cmd(String cmd, int timeoutSecond = 0) { var processBuilder = new ProcessBuilder() processBuilder.command("/bin/bash", "-c", cmd) diff --git a/regression-test/suites/export_p0/test_export_basic.groovy b/regression-test/suites/export_p0/test_export_basic.groovy index 152f1ab4e6e..19261c55e01 100644 --- a/regression-test/suites/export_p0/test_export_basic.groovy +++ b/regression-test/suites/export_p0/test_export_basic.groovy @@ -59,7 +59,8 @@ suite("test_export_basic", "p0") { def table_export_name = "test_export_basic" def table_load_name = "test_load_basic" - def outfile_path_prefix = """/tmp/test_export""" + def outfile_path_prefix = """/tmp/test_export_basic""" + def local_tvf_prefix = "tmp/test_export_basic" // create table and insert sql """ DROP TABLE IF EXISTS ${table_export_name} """ @@ -96,29 +97,13 @@ suite("test_export_basic", "p0") { qt_select_export """ SELECT * FROM ${table_export_name} t ORDER BY id; """ + def machine_user_name = "root" def check_path_exists = { dir_path -> - File path = new File(dir_path) - if (!path.exists()) { - assert path.mkdirs() - } else { - throw new IllegalStateException("""${dir_path} already exists! """) - } - } - - def check_file_amounts = { dir_path, amount -> - File path = new File(dir_path) - File[] files = path.listFiles() - assert files.length == amount + mkdirRemotePathOnAllBE(machine_user_name, dir_path) } def delete_files = { dir_path -> - File path = new File(dir_path) - if (path.exists()) { - for (File f: path.listFiles()) { - f.delete(); - } - path.delete(); - } + deleteRemotePathOnAllBE(machine_user_name, dir_path) } def waiting_export = { the_db, export_label -> @@ -152,7 +137,7 @@ suite("test_export_basic", "p0") { // 1. 
basic test def uuid = UUID.randomUUID().toString() - def outFilePath = """${outfile_path_prefix}_${uuid}""" + def outFilePath = """${outfile_path_prefix}/${table_export_name}_${uuid}""" def label = "label_${uuid}" try { // check export path @@ -169,9 +154,6 @@ suite("test_export_basic", "p0") { ); """ waiting_export.call(db, label) - - // check file amounts - check_file_amounts.call("${outFilePath}", 1) // check data correctness sql """ DROP TABLE IF EXISTS ${table_load_name} """ @@ -184,28 +166,23 @@ suite("test_export_basic", "p0") { DISTRIBUTED BY HASH(id) PROPERTIES("replication_num" = "1"); """ - File[] files = new File("${outFilePath}").listFiles() - String file_path = files[0].getAbsolutePath() - streamLoad { - table "${table_load_name}" - - set 'column_separator', ',' - set 'columns', 'id, Name, age' - set 'strict_mode', 'true' - - file "${file_path}" - time 10000 // limit inflight 10s - - check { result, exception, startTime, endTime -> - if (exception != null) { - throw exception - } - log.info("Stream load result: ${result}".toString()) - def json = parseJson(result) - assertEquals("success", json.Status.toLowerCase()) - assertEquals(150, json.NumberTotalRows) - assertEquals(0, json.NumberFilteredRows) - } + // use local() tvf to reload the data + def ipList = [:] + def portList = [:] + getBackendIpHeartbeatPort(ipList, portList) + ipList.each { beid, ip -> + logger.info("Begin to insert into ${table_load_name} from local()") + sql """ + insert into ${table_load_name} + select * from local( + "file_path" = "${local_tvf_prefix}/${table_export_name}_${uuid}/*", + "backend_id" = "${beid}", + "format" = "csv", + "column_separator" = "," + ); + """ + insert_res = sql "show last insert;" + logger.info("insert from local(), BE id = ${beid}, result: " + insert_res.toString()) } qt_select_load1 """ SELECT * FROM ${table_load_name} t ORDER BY id; """ @@ -215,9 +192,9 @@ suite("test_export_basic", "p0") { delete_files.call("${outFilePath}") } - // 2. test patition1 + // 2. 
test partition1 uuid = UUID.randomUUID().toString() - outFilePath = """${outfile_path_prefix}_${uuid}""" + outFilePath = """${outfile_path_prefix}/${table_export_name}_${uuid}""" label = "label_${uuid}" try { // check export path @@ -235,9 +212,6 @@ suite("test_export_basic", "p0") { ); """ waiting_export.call(db, label) - - // check file amounts - check_file_amounts.call("${outFilePath}", 1) // check data correctness sql """ DROP TABLE IF EXISTS ${table_load_name} """ @@ -250,28 +224,23 @@ suite("test_export_basic", "p0") { DISTRIBUTED BY HASH(id) PROPERTIES("replication_num" = "1"); """ - File[] files = new File("${outFilePath}").listFiles() - String file_path = files[0].getAbsolutePath() - streamLoad { - table "${table_load_name}" - - set 'column_separator', ',' - set 'columns', 'id, Name, age' - set 'strict_mode', 'true' - - file "${file_path}" - time 10000 // limit inflight 10s - - check { result, exception, startTime, endTime -> - if (exception != null) { - throw exception - } - log.info("Stream load result: ${result}".toString()) - def json = parseJson(result) - assertEquals("success", json.Status.toLowerCase()) - assertEquals(19, json.NumberTotalRows) - assertEquals(0, json.NumberFilteredRows) - } + // use local() tvf to reload the data + def ipList = [:] + def portList = [:] + getBackendIpHeartbeatPort(ipList, portList) + ipList.each { beid, ip -> + logger.info("Begin to insert into ${table_load_name} from local()") + sql """ + insert into ${table_load_name} + select * from local( + "file_path" = "${local_tvf_prefix}/${table_export_name}_${uuid}/*", + "backend_id" = "${beid}", + "format" = "csv", + "column_separator" = "," + ); + """ + insert_res = sql "show last insert;" + logger.info("insert from local(), BE id = ${beid}, result: " + insert_res.toString()) } qt_select_load2 """ SELECT * FROM ${table_load_name} t ORDER BY id; """ @@ -281,9 +250,9 @@ suite("test_export_basic", "p0") { delete_files.call("${outFilePath}") } - // 3. test patition2 + // 3. 
test partition2 uuid = UUID.randomUUID().toString() - outFilePath = """${outfile_path_prefix}_${uuid}""" + outFilePath = """${outfile_path_prefix}/${table_export_name}_${uuid}""" label = "label_${uuid}" try { // check export path @@ -301,9 +270,6 @@ suite("test_export_basic", "p0") { ); """ waiting_export.call(db, label) - - // check file amounts - check_file_amounts.call("${outFilePath}", 1) // check data correctness sql """ DROP TABLE IF EXISTS ${table_load_name} """ @@ -316,28 +282,23 @@ suite("test_export_basic", "p0") { DISTRIBUTED BY HASH(id) PROPERTIES("replication_num" = "1"); """ - File[] files = new File("${outFilePath}").listFiles() - String file_path = files[0].getAbsolutePath() - streamLoad { - table "${table_load_name}" - - set 'column_separator', ',' - set 'columns', 'id, Name, age' - set 'strict_mode', 'true' - - file "${file_path}" - time 10000 // limit inflight 10s - - check { result, exception, startTime, endTime -> - if (exception != null) { - throw exception - } - log.info("Stream load result: ${result}".toString()) - def json = parseJson(result) - assertEquals("success", json.Status.toLowerCase()) - assertEquals(50, json.NumberTotalRows) - assertEquals(0, json.NumberFilteredRows) - } + // use local() tvf to reload the data + def ipList = [:] + def portList = [:] + getBackendIpHeartbeatPort(ipList, portList) + ipList.each { beid, ip -> + logger.info("Begin to insert into ${table_load_name} from local()") + sql """ + insert into ${table_load_name} + select * from local( + "file_path" = "${local_tvf_prefix}/${table_export_name}_${uuid}/*", + "backend_id" = "${beid}", + "format" = "csv", + "column_separator" = "," + ); + """ + insert_res = sql "show last insert;" + logger.info("insert from local(), BE id = ${beid}, result: " + insert_res.toString()) } qt_select_load3 """ SELECT * FROM ${table_load_name} t ORDER BY id; """ @@ -347,9 +308,9 @@ suite("test_export_basic", "p0") { delete_files.call("${outFilePath}") } - // 4. test patition3 and where clause + // 4. 
test partition3 and where clause uuid = UUID.randomUUID().toString() - outFilePath = """${outfile_path_prefix}_${uuid}""" + outFilePath = """${outfile_path_prefix}/${table_export_name}_${uuid}""" label = "label_${uuid}" try { // check export path @@ -367,9 +328,6 @@ suite("test_export_basic", "p0") { ); """ waiting_export.call(db, label) - - // check file amounts - check_file_amounts.call("${outFilePath}", 1) // check data correctness sql """ DROP TABLE IF EXISTS ${table_load_name} """ @@ -382,31 +340,26 @@ suite("test_export_basic", "p0") { DISTRIBUTED BY HASH(id) PROPERTIES("replication_num" = "1"); """ - File[] files = new File("${outFilePath}").listFiles() - String file_path = files[0].getAbsolutePath() - streamLoad { - table "${table_load_name}" - - set 'column_separator', ',' - set 'columns', 'id, Name, age' - set 'strict_mode', 'true' - - file "${file_path}" - time 10000 // limit inflight 10s - - check { result, exception, startTime, endTime -> - if (exception != null) { - throw exception - } - log.info("Stream load result: ${result}".toString()) - def json = parseJson(result) - assertEquals("success", json.Status.toLowerCase()) - assertEquals(50, json.NumberTotalRows) - assertEquals(0, json.NumberFilteredRows) - } + // use local() tvf to reload the data + def ipList = [:] + def portList = [:] + getBackendIpHeartbeatPort(ipList, portList) + ipList.each { beid, ip -> + logger.info("Begin to insert into ${table_load_name} from local()") + sql """ + insert into ${table_load_name} + select * from local( + "file_path" = "${local_tvf_prefix}/${table_export_name}_${uuid}/*", + "backend_id" = "${beid}", + "format" = "csv", + "column_separator" = "," + ); + """ + insert_res = sql "show last insert;" + logger.info("insert from local(), BE id = ${beid}, result: " + insert_res.toString()) } - qt_select_load3 """ SELECT * FROM ${table_load_name} t ORDER BY id; """ + qt_select_load4 """ SELECT * FROM ${table_load_name} t ORDER BY id; """ } finally { try_sql("DROP TABLE IF EXISTS ${table_load_name}") @@ -415,7 +368,7 @@ suite("test_export_basic", "p0") { // 5. test order by and limit clause def uuid1 = UUID.randomUUID().toString() - outFilePath = """${outfile_path_prefix}_${uuid1}""" + outFilePath = """${outfile_path_prefix}/${table_export_name}_${uuid}""" def label1 = "label_${uuid1}" def uuid2 = UUID.randomUUID().toString() def label2 = "label_${uuid2}" @@ -447,9 +400,6 @@ suite("test_export_basic", "p0") { waiting_export.call(db, label1) waiting_export.call(db, label2) - // check file amounts - check_file_amounts.call("${outFilePath}", 2) - // check show export correctness def res = sql """ show export where STATE = "FINISHED" order by JobId desc limit 2""" assertTrue(res[0][0] > res[1][0]) @@ -461,7 +411,7 @@ suite("test_export_basic", "p0") { // 6. 
test columns property uuid = UUID.randomUUID().toString() - outFilePath = """${outfile_path_prefix}_${uuid}""" + outFilePath = """${outfile_path_prefix}/${table_export_name}_${uuid}""" label = "label_${uuid}" try { // check export path @@ -480,9 +430,6 @@ suite("test_export_basic", "p0") { ); """ waiting_export.call(db, label) - - // check file amounts - check_file_amounts.call("${outFilePath}", 1) // check data correctness sql """ DROP TABLE IF EXISTS ${table_load_name} """ @@ -494,28 +441,23 @@ suite("test_export_basic", "p0") { DISTRIBUTED BY HASH(id) PROPERTIES("replication_num" = "1"); """ - File[] files = new File("${outFilePath}").listFiles() - String file_path = files[0].getAbsolutePath() - streamLoad { - table "${table_load_name}" - - set 'column_separator', ',' - set 'columns', 'id, Name' - set 'strict_mode', 'true' - - file "${file_path}" - time 10000 // limit inflight 10s - - check { result, exception, startTime, endTime -> - if (exception != null) { - throw exception - } - log.info("Stream load result: ${result}".toString()) - def json = parseJson(result) - assertEquals("success", json.Status.toLowerCase()) - assertEquals(67, json.NumberTotalRows) - assertEquals(0, json.NumberFilteredRows) - } + // use local() tvf to reload the data + def ipList = [:] + def portList = [:] + getBackendIpHeartbeatPort(ipList, portList) + ipList.each { beid, ip -> + logger.info("Begin to insert into ${table_load_name} from local()") + sql """ + insert into ${table_load_name} (id, Name) + select * from local( + "file_path" = "${local_tvf_prefix}/${table_export_name}_${uuid}/*", + "backend_id" = "${beid}", + "format" = "csv", + "column_separator" = "," + ); + """ + insert_res = sql "show last insert;" + logger.info("insert from local(), BE id = ${beid}, result: " + insert_res.toString()) } qt_select_load6 """ SELECT * FROM ${table_load_name} t ORDER BY id; """ @@ -527,7 +469,7 @@ suite("test_export_basic", "p0") { // 7. 
test columns property 2 uuid = UUID.randomUUID().toString() - outFilePath = """${outfile_path_prefix}_${uuid}""" + outFilePath = """${outfile_path_prefix}/${table_export_name}_${uuid}""" label = "label_${uuid}" try { // check export path @@ -546,9 +488,6 @@ suite("test_export_basic", "p0") { ); """ waiting_export.call(db, label) - - // check file amounts - check_file_amounts.call("${outFilePath}", 1) // check data correctness sql """ DROP TABLE IF EXISTS ${table_load_name} """ @@ -559,28 +498,23 @@ suite("test_export_basic", "p0") { DISTRIBUTED BY HASH(id) PROPERTIES("replication_num" = "1"); """ - File[] files = new File("${outFilePath}").listFiles() - String file_path = files[0].getAbsolutePath() - streamLoad { - table "${table_load_name}" - - set 'column_separator', ',' - set 'columns', 'id' - set 'strict_mode', 'true' - - file "${file_path}" - time 10000 // limit inflight 10s - - check { result, exception, startTime, endTime -> - if (exception != null) { - throw exception - } - log.info("Stream load result: ${result}".toString()) - def json = parseJson(result) - assertEquals("success", json.Status.toLowerCase()) - assertEquals(67, json.NumberTotalRows) - assertEquals(0, json.NumberFilteredRows) - } + // use local() tvf to reload the data + def ipList = [:] + def portList = [:] + getBackendIpHeartbeatPort(ipList, portList) + ipList.each { beid, ip -> + logger.info("Begin to insert into ${table_load_name} from local()") + sql """ + insert into ${table_load_name} + select * from local( + "file_path" = "${local_tvf_prefix}/${table_export_name}_${uuid}/*", + "backend_id" = "${beid}", + "format" = "csv", + "column_separator" = "," + ); + """ + insert_res = sql "show last insert;" + logger.info("insert from local(), BE id = ${beid}, result: " + insert_res.toString()) } qt_select_load7 """ SELECT * FROM ${table_load_name} t ORDER BY id; """ @@ -599,7 +533,7 @@ suite("test_export_basic", "p0") { // 1. first export uuid = UUID.randomUUID().toString() - outFilePath = """${outfile_path_prefix}_${uuid}""" + outFilePath = """${outfile_path_prefix}/${table_export_name}_${uuid}""" label = "label_${uuid}" // check export path check_path_exists.call("${outFilePath}") diff --git a/regression-test/suites/export_p0/test_export_csv.groovy b/regression-test/suites/export_p0/test_export_csv.groovy index 93e894a4f65..09d06996b7f 100644 --- a/regression-test/suites/export_p0/test_export_csv.groovy +++ b/regression-test/suites/export_p0/test_export_csv.groovy @@ -56,7 +56,8 @@ suite("test_export_csv", "p0") { def table_export_name = "test_export_csv" def table_load_name = "test_load_csv" - def outfile_path_prefix = """/tmp/test_export""" + def outfile_path_prefix = """/tmp/test_export_csv""" + def local_tvf_prefix = "tmp/test_export_csv" // create table and insert sql """ DROP TABLE IF EXISTS ${table_export_name} """ @@ -98,30 +99,13 @@ suite("test_export_csv", "p0") { logger.info("insert result: " + insert_res.toString()) qt_select_export1 """ SELECT * FROM ${table_export_name} t ORDER BY user_id; """ - + def machine_user_name = "root" def check_path_exists = { dir_path -> - File path = new File(dir_path) - if (!path.exists()) { - assert path.mkdirs() - } else { - throw new IllegalStateException("""${dir_path} already exists! 
""") - } - } - - def check_file_amounts = { dir_path, amount -> - File path = new File(dir_path) - File[] files = path.listFiles() - assert files.length == amount + mkdirRemotePathOnAllBE(machine_user_name, dir_path) } def delete_files = { dir_path -> - File path = new File(dir_path) - if (path.exists()) { - for (File f: path.listFiles()) { - f.delete(); - } - path.delete(); - } + deleteRemotePathOnAllBE(machine_user_name, dir_path) } def waiting_export = { export_label -> @@ -140,7 +124,7 @@ suite("test_export_csv", "p0") { // 1. test more type def uuid = UUID.randomUUID().toString() - def outFilePath = """${outfile_path_prefix}_${uuid}""" + def outFilePath = "${outfile_path_prefix}" + "/${table_export_name}_${uuid}" def label = "label_${uuid}" try { // check export path @@ -157,9 +141,6 @@ suite("test_export_csv", "p0") { """ waiting_export.call(label) - // check file amounts - check_file_amounts.call("${outFilePath}", 1) - // check data correctness sql """ DROP TABLE IF EXISTS ${table_load_name} """ sql """ @@ -184,28 +165,22 @@ suite("test_export_csv", "p0") { DISTRIBUTED BY HASH(user_id) PROPERTIES("replication_num" = "1"); """ - File[] files = new File("${outFilePath}").listFiles() - String file_path = files[0].getAbsolutePath() - streamLoad { - table "${table_load_name}" - - set 'column_separator', ',' - set 'columns', 'user_id, date, datetime, city, age, sex, bool_col, int_col, bigint_col, largeint_col, float_col, double_col, char_col, decimal_col, ipv4_col, ipv6_col' - set 'strict_mode', 'true' - - file "${file_path}" - time 10000 // limit inflight 10s - - check { result, exception, startTime, endTime -> - if (exception != null) { - throw exception - } - log.info("Stream load result: ${result}".toString()) - def json = parseJson(result) - assertEquals("success", json.Status.toLowerCase()) - assertEquals(100, json.NumberTotalRows) - assertEquals(0, json.NumberFilteredRows) - } + // use local() tvf to reload the data + def ipList = [:] + def portList = [:] + getBackendIpHeartbeatPort(ipList, portList) + ipList.each { beid, ip -> + logger.info("Begin to insert into ${table_load_name} from local()") + sql """ + insert into ${table_load_name} + select * from local( + "file_path" = "${local_tvf_prefix}/${table_export_name}_${uuid}/*", + "backend_id" = "${beid}", + "format" = "csv", + "column_separator" = ","); + """ + insert_res = sql "show last insert;" + logger.info("insert from local(), BE id = ${beid}, result: " + insert_res.toString()) } qt_select_load1 """ SELECT * FROM ${table_load_name} t ORDER BY user_id; """ @@ -218,7 +193,7 @@ suite("test_export_csv", "p0") { // 2. 
test csv column_separator and line_delimiter uuid = UUID.randomUUID().toString() - outFilePath = """${outfile_path_prefix}_${uuid}""" + outFilePath = "${outfile_path_prefix}" + "/${table_export_name}_${uuid}" label = "label_${uuid}" try { // check export path @@ -236,9 +211,6 @@ suite("test_export_csv", "p0") { """ waiting_export.call(label) - // check file amounts - check_file_amounts.call("${outFilePath}", 1) - // check data correctness sql """ DROP TABLE IF EXISTS ${table_load_name} """ sql """ @@ -263,29 +235,23 @@ suite("test_export_csv", "p0") { DISTRIBUTED BY HASH(user_id) PROPERTIES("replication_num" = "1"); """ - File[] files = new File("${outFilePath}").listFiles() - String file_path = files[0].getAbsolutePath() - streamLoad { - table "${table_load_name}" - - set 'column_separator', 'ab' - set 'line_delimiter', 'cc' - set 'columns', 'user_id, date, datetime, city, age, sex, bool_col, int_col, bigint_col, largeint_col, float_col, double_col, char_col, decimal_col, ipv4_col, ipv6_col' - set 'strict_mode', 'true' - - file "${file_path}" - time 10000 // limit inflight 10s - - check { result, exception, startTime, endTime -> - if (exception != null) { - throw exception - } - log.info("Stream load result: ${result}".toString()) - def json = parseJson(result) - assertEquals("success", json.Status.toLowerCase()) - assertEquals(10, json.NumberTotalRows) - assertEquals(0, json.NumberFilteredRows) - } + // use local() tvf to reload the data + def ipList = [:] + def portList = [:] + getBackendIpHeartbeatPort(ipList, portList) + ipList.each { beid, ip -> + logger.info("Begin to insert into ${table_load_name} from local()") + sql """ + insert into ${table_load_name} + select * from local( + "file_path" = "${local_tvf_prefix}/${table_export_name}_${uuid}/*", + "backend_id" = "${beid}", + "format" = "csv", + "column_separator" = "ab", + "line_delimiter" = "cc"); + """ + insert_res = sql "show last insert;" + logger.info("insert from local(), BE id = ${beid}, result: " + insert_res.toString()) } qt_select_load2 """ SELECT * FROM ${table_load_name} t ORDER BY user_id; """ @@ -297,7 +263,7 @@ suite("test_export_csv", "p0") { // 3. 
test csv_with_names uuid = UUID.randomUUID().toString() - outFilePath = """${outfile_path_prefix}_${uuid}""" + outFilePath = "${outfile_path_prefix}" + "/${table_export_name}_${uuid}" label = "label_${uuid}" try { // check export path @@ -315,9 +281,6 @@ suite("test_export_csv", "p0") { """ waiting_export.call(label) - // check file amounts - check_file_amounts.call("${outFilePath}", 1) - // check data correctness sql """ DROP TABLE IF EXISTS ${table_load_name} """ sql """ @@ -342,30 +305,24 @@ suite("test_export_csv", "p0") { DISTRIBUTED BY HASH(user_id) PROPERTIES("replication_num" = "1"); """ - File[] files = new File("${outFilePath}").listFiles() - String file_path = files[0].getAbsolutePath() - streamLoad { - table "${table_load_name}" - - set 'column_separator', 'ab' - set 'line_delimiter', 'cc' - set 'columns', 'user_id, date, datetime, city, age, sex, bool_col, int_col, bigint_col, largeint_col, float_col, double_col, char_col, decimal_col, ipv4_col, ipv6_col' - set 'strict_mode', 'true' - set 'format', 'csv_with_names' - - file "${file_path}" - time 10000 // limit inflight 10s - - check { result, exception, startTime, endTime -> - if (exception != null) { - throw exception - } - log.info("Stream load result: ${result}".toString()) - def json = parseJson(result) - assertEquals("success", json.Status.toLowerCase()) - assertEquals(10, json.NumberTotalRows) - assertEquals(0, json.NumberFilteredRows) - } + // use local() tvf to reload the data + def ipList = [:] + def portList = [:] + getBackendIpHeartbeatPort(ipList, portList) + ipList.each { beid, ip -> + logger.info("Begin to insert into ${table_load_name} from local()") + sql """ + insert into ${table_load_name} + select * + from local( + "file_path" = "${local_tvf_prefix}/${table_export_name}_${uuid}/*", + "backend_id" = "${beid}", + "format" = "csv_with_names", + "line_delimiter" = "cc", + "column_separator" = "ab"); + """ + insert_res = sql "show last insert;" + logger.info("insert from local(), BE id = ${beid}, result: " + insert_res.toString()) } qt_select_load3 """ SELECT * FROM ${table_load_name} t ORDER BY user_id; """ @@ -377,7 +334,7 @@ suite("test_export_csv", "p0") { // 4. 
test csv_with_names_and_types uuid = UUID.randomUUID().toString() - outFilePath = """${outfile_path_prefix}_${uuid}""" + outFilePath = "${outfile_path_prefix}" + "/${table_export_name}_${uuid}" label = "label_${uuid}" try { // check export path @@ -395,9 +352,6 @@ suite("test_export_csv", "p0") { """ waiting_export.call(label) - // check file amounts - check_file_amounts.call("${outFilePath}", 1) - // check data correctness sql """ DROP TABLE IF EXISTS ${table_load_name} """ sql """ @@ -422,30 +376,24 @@ suite("test_export_csv", "p0") { DISTRIBUTED BY HASH(user_id) PROPERTIES("replication_num" = "1"); """ - File[] files = new File("${outFilePath}").listFiles() - String file_path = files[0].getAbsolutePath() - streamLoad { - table "${table_load_name}" - - set 'column_separator', 'ab' - set 'line_delimiter', 'cc' - set 'columns', 'user_id, date, datetime, city, age, sex, bool_col, int_col, bigint_col, largeint_col, float_col, double_col, char_col, decimal_col, ipv4_col, ipv6_col' - set 'strict_mode', 'true' - set 'format', 'csv_with_names_and_types' - - file "${file_path}" - time 10000 // limit inflight 10s - - check { result, exception, startTime, endTime -> - if (exception != null) { - throw exception - } - log.info("Stream load result: ${result}".toString()) - def json = parseJson(result) - assertEquals("success", json.Status.toLowerCase()) - assertEquals(10, json.NumberTotalRows) - assertEquals(0, json.NumberFilteredRows) - } + // use local() tvf to reload the data + def ipList = [:] + def portList = [:] + getBackendIpHeartbeatPort(ipList, portList) + ipList.each { beid, ip -> + logger.info("Begin to insert into ${table_load_name} from local()") + sql """ + insert into ${table_load_name} + select * + from local( + "file_path" = "${local_tvf_prefix}/${table_export_name}_${uuid}/*", + "backend_id" = "${beid}", + "format" = "csv_with_names_and_types", + "line_delimiter" = "cc", + "column_separator" = "ab"); + """ + insert_res = sql "show last insert;" + logger.info("insert from local(), BE id = ${beid}, result: " + insert_res.toString()) } qt_select_load4 """ SELECT * FROM ${table_load_name} t ORDER BY user_id; """ diff --git a/regression-test/suites/export_p0/test_export_data_consistency.groovy b/regression-test/suites/export_p0/test_export_data_consistency.groovy index cd19b082802..e1d4ba16385 100644 --- a/regression-test/suites/export_p0/test_export_data_consistency.groovy +++ b/regression-test/suites/export_p0/test_export_data_consistency.groovy @@ -59,6 +59,7 @@ suite("test_export_data_consistency", "p0") { def table_export_name = "test_export_data_consistency" def table_load_name = "test_load_data_consistency" def outfile_path_prefix = """/tmp/test_export_data_consistency""" + def local_tvf_prefix = "tmp/test_export_data_consistency" // create table and insert sql """ DROP TABLE IF EXISTS ${table_export_name} """ @@ -96,29 +97,13 @@ suite("test_export_data_consistency", "p0") { qt_select_export """ SELECT * FROM ${table_export_name} t ORDER BY id; """ + def machine_user_name = "root" def check_path_exists = { dir_path -> - File path = new File(dir_path) - if (!path.exists()) { - assert path.mkdirs() - } else { - throw new IllegalStateException("""${dir_path} already exists! 
""") - } - } - - def check_file_amounts = { dir_path, amount -> - File path = new File(dir_path) - File[] files = path.listFiles() - assert files.length == amount + mkdirRemotePathOnAllBE(machine_user_name, dir_path) } def delete_files = { dir_path -> - File path = new File(dir_path) - if (path.exists()) { - for (File f: path.listFiles()) { - f.delete(); - } - path.delete(); - } + deleteRemotePathOnAllBE(machine_user_name, dir_path) } def waiting_export = { the_db, export_label -> @@ -137,7 +122,7 @@ suite("test_export_data_consistency", "p0") { // 1. basic test def uuid = UUID.randomUUID().toString() - def outFilePath = """${outfile_path_prefix}_${uuid}""" + def outFilePath = "${outfile_path_prefix}" + "/${table_export_name}_${uuid}" def label = "label_${uuid}" try { // check export path @@ -168,9 +153,6 @@ suite("test_export_data_consistency", "p0") { // wait export waiting_export.call(db, label) - // check file amounts - check_file_amounts.call("${outFilePath}", 3) - // check data correctness sql """ DROP TABLE IF EXISTS ${table_load_name} """ sql """ @@ -182,29 +164,22 @@ suite("test_export_data_consistency", "p0") { DISTRIBUTED BY HASH(id) PROPERTIES("replication_num" = "1"); """ - File[] files = new File("${outFilePath}").listFiles() - for (exportLoadFile in files) { - String file_path = exportLoadFile.getAbsolutePath() - streamLoad { - table "${table_load_name}" - - set 'column_separator', ',' - set 'columns', 'id, name, age' - set 'strict_mode', 'true' - - file "${file_path}" - time 10000 // limit inflight 10s - - check { result, exception, startTime, endTime -> - if (exception != null) { - throw exception - } - log.info("Stream load result: ${result}".toString()) - def json = parseJson(result) - assertEquals("success", json.Status.toLowerCase()) - assertEquals(0, json.NumberFilteredRows) - } - } + // use local() tvf to reload the data + def ipList = [:] + def portList = [:] + getBackendIpHeartbeatPort(ipList, portList) + ipList.each { beid, ip -> + logger.info("Begin to insert into ${table_load_name} from local()") + sql """ + insert into ${table_load_name} + select * from local( + "file_path" = "${local_tvf_prefix}/${table_export_name}_${uuid}/*", + "backend_id" = "${beid}", + "format" = "csv", + "column_separator" = ","); + """ + insert_res = sql "show last insert;" + logger.info("insert from local(), BE id = ${beid}, result: " + insert_res.toString()) } // The partition ranges are: @@ -212,7 +187,7 @@ suite("test_export_data_consistency", "p0") { // The export task should keep partition consistency. 
def result = sql """ SELECT * FROM ${table_load_name} t WHERE id in (10,15,30,40,80,90) ORDER BY id; """ logger.info("result ${result}") - assert result.size == 6 + assert result.size() == 6 if (result[0][1] == 'test') { assert result[1][1] == 'test' } else { diff --git a/regression-test/suites/export_p0/test_export_empty_table.groovy b/regression-test/suites/export_p0/test_export_empty_table.groovy index 584c65d73bc..bd2b4ad09d1 100644 --- a/regression-test/suites/export_p0/test_export_empty_table.groovy +++ b/regression-test/suites/export_p0/test_export_empty_table.groovy @@ -56,8 +56,8 @@ suite("test_export_empty_table", "p0") { } def table_export_name = "test_export_empty_table" - def table_load_name = "test_load_empty_table" - def outfile_path_prefix = """/tmp/test_export""" + def outfile_path_prefix = """/tmp/test_export_empty_table""" + def local_tvf_prefix = "tmp/test_export_empty_table" // create table sql """ DROP TABLE IF EXISTS ${table_export_name} """ @@ -84,29 +84,13 @@ suite("test_export_empty_table", "p0") { qt_select_export1 """ SELECT * FROM ${table_export_name} t ORDER BY user_id; """ + def machine_user_name = "root" def check_path_exists = { dir_path -> - File path = new File(dir_path) - if (!path.exists()) { - assert path.mkdirs() - } else { - throw new IllegalStateException("""${dir_path} already exists! """) - } - } - - def check_file_amounts = { dir_path, amount -> - File path = new File(dir_path) - File[] files = path.listFiles() - assert files.length == amount + mkdirRemotePathOnAllBE(machine_user_name, dir_path) } def delete_files = { dir_path -> - File path = new File(dir_path) - if (path.exists()) { - for (File f: path.listFiles()) { - f.delete(); - } - path.delete(); - } + deleteRemotePathOnAllBE(machine_user_name, dir_path) } def waiting_export = { export_label -> @@ -125,7 +109,7 @@ suite("test_export_empty_table", "p0") { // 1. test csv def uuid = UUID.randomUUID().toString() - def outFilePath = """${outfile_path_prefix}_${uuid}""" + def outFilePath = "${outfile_path_prefix}/${table_export_name}_${uuid}" def label = "label_${uuid}" try { // check export path @@ -142,8 +126,25 @@ suite("test_export_empty_table", "p0") { """ waiting_export.call(label) - // check file amounts - check_file_amounts.call("${outFilePath}", 0) + // use local() tvf to check + def ipList = [:] + def portList = [:] + getBackendIpHeartbeatPort(ipList, portList) + ipList.each { beid, ip -> + test { + sql """ + select * from local( + "file_path" = "${local_tvf_prefix}/${table_export_name}_${uuid}/*", + "backend_id" = "${beid}", + "format" = "csv", + "column_separator" = ","); + """ + + // because we export the empty data, + // there is no files. + exception "get file list from backend failed." + } + } } finally { delete_files.call("${outFilePath}") } @@ -151,7 +152,7 @@ suite("test_export_empty_table", "p0") { // 2. 
test parquet uuid = UUID.randomUUID().toString() - outFilePath = """${outfile_path_prefix}_${uuid}""" + outFilePath = "${outfile_path_prefix}/${table_export_name}_${uuid}" label = "label_${uuid}" try { // check export path @@ -167,15 +168,32 @@ suite("test_export_empty_table", "p0") { """ waiting_export.call(label) - // check file amounts - check_file_amounts.call("${outFilePath}", 0) + // use local() tvf to reload the data + def ipList = [:] + def portList = [:] + getBackendIpHeartbeatPort(ipList, portList) + ipList.each { beid, ip -> + test { + sql """ + select * from local( + "file_path" = "${local_tvf_prefix}/${table_export_name}_${uuid}/*", + "backend_id" = "${beid}", + "format" = "csv", + "column_separator" = ","); + """ + + // because we export the empty data, + // there is no files. + exception "get file list from backend failed." + } + } } finally { delete_files.call("${outFilePath}") } // 3. test orc uuid = UUID.randomUUID().toString() - outFilePath = """${outfile_path_prefix}_${uuid}""" + outFilePath = "${outfile_path_prefix}/${table_export_name}_${uuid}" label = "label_${uuid}" try { // check export path @@ -191,15 +209,32 @@ suite("test_export_empty_table", "p0") { """ waiting_export.call(label) - // check file amounts - check_file_amounts.call("${outFilePath}", 0) + // use local() tvf to reload the data + def ipList = [:] + def portList = [:] + getBackendIpHeartbeatPort(ipList, portList) + ipList.each { beid, ip -> + test { + sql """ + select * from local( + "file_path" = "${local_tvf_prefix}/${table_export_name}_${uuid}/*", + "backend_id" = "${beid}", + "format" = "csv", + "column_separator" = ","); + """ + + // because we export the empty data, + // there is no files. + exception "get file list from backend failed." + } + } } finally { delete_files.call("${outFilePath}") } // 4. test csv_with_names uuid = UUID.randomUUID().toString() - outFilePath = """${outfile_path_prefix}_${uuid}""" + outFilePath = "${outfile_path_prefix}/${table_export_name}_${uuid}" label = "label_${uuid}" try { // check export path @@ -216,17 +251,33 @@ suite("test_export_empty_table", "p0") { """ waiting_export.call(label) - // check file amounts - check_file_amounts.call("${outFilePath}", 0) + // use local() tvf to reload the data + def ipList = [:] + def portList = [:] + getBackendIpHeartbeatPort(ipList, portList) + ipList.each { beid, ip -> + test { + sql """ + select * from local( + "file_path" = "${local_tvf_prefix}/${table_export_name}_${uuid}/*", + "backend_id" = "${beid}", + "format" = "csv", + "column_separator" = ","); + """ + + // because we export the empty data, + // there is no files. + exception "get file list from backend failed." + } + } } finally { - try_sql("DROP TABLE IF EXISTS ${table_load_name}") delete_files.call("${outFilePath}") } // 5. 
test csv_with_names_and_types uuid = UUID.randomUUID().toString() - outFilePath = """${outfile_path_prefix}_${uuid}""" + outFilePath = "${outfile_path_prefix}/${table_export_name}_${uuid}" label = "label_${uuid}" try { // check export path @@ -243,8 +294,25 @@ suite("test_export_empty_table", "p0") { """ waiting_export.call(label) - // check file amounts - check_file_amounts.call("${outFilePath}", 0) + // use local() tvf to reload the data + def ipList = [:] + def portList = [:] + getBackendIpHeartbeatPort(ipList, portList) + ipList.each { beid, ip -> + test { + sql """ + select * from local( + "file_path" = "${local_tvf_prefix}/${table_export_name}_${uuid}/*", + "backend_id" = "${beid}", + "format" = "csv", + "column_separator" = ","); + """ + + // because we export the empty data, + // there is no files. + exception "get file list from backend failed." + } + } } finally { delete_files.call("${outFilePath}") } diff --git a/regression-test/suites/export_p0/test_export_table_with_label_retry.groovy b/regression-test/suites/export_p0/test_export_table_with_label_retry.groovy index f4c64eb4e94..e6ac6684b5d 100644 --- a/regression-test/suites/export_p0/test_export_table_with_label_retry.groovy +++ b/regression-test/suites/export_p0/test_export_table_with_label_retry.groovy @@ -54,10 +54,11 @@ suite("test_export_table_with_label_retry", "p0") { return } - def table_export_name = "test_export_label" - def table_load_name = "test_load_label" - def wrong_outfile_path_prefix = """tmp/test_export""" - def outfile_path_prefix = """/tmp/test_export""" + def table_export_name = "test_export_table_with_label_retry" + def table_load_name = "test_load_table_with_label_retry" + def wrong_outfile_path_prefix = """mnt/disk2/ftw/projects/doris/output/be""" + def outfile_path_prefix = """/tmp/test_export_table_with_label_retry""" + def local_tvf_prefix = "tmp/test_export_table_with_label_retry" // create table and insert sql """ DROP TABLE IF EXISTS ${table_export_name} """ @@ -99,30 +100,13 @@ suite("test_export_table_with_label_retry", "p0") { logger.info("insert result: " + insert_res.toString()) qt_select_export1 """ SELECT * FROM ${table_export_name} t ORDER BY user_id; """ - + def machine_user_name = "root" def check_path_exists = { dir_path -> - File path = new File(dir_path) - if (!path.exists()) { - assert path.mkdirs() - } else { - throw new IllegalStateException("""${dir_path} already exists! 
""") - } - } - - def check_file_amounts = { dir_path, amount -> - File path = new File(dir_path) - File[] files = path.listFiles() - assert files.length == amount + mkdirRemotePathOnAllBE(machine_user_name, dir_path) } def delete_files = { dir_path -> - File path = new File(dir_path) - if (path.exists()) { - for (File f: path.listFiles()) { - f.delete(); - } - path.delete(); - } + deleteRemotePathOnAllBE(machine_user_name, dir_path) } def waiting_export_expect_failed = { export_label -> @@ -155,8 +139,8 @@ suite("test_export_table_with_label_retry", "p0") { } def uuid = UUID.randomUUID().toString() - def outFilePath = """${outfile_path_prefix}_${uuid}""" - def wrongFilePath = """${wrong_outfile_path_prefix}_${uuid}""" + def outFilePath = "${outfile_path_prefix}" + "/${table_export_name}_${uuid}" + def wrongFilePath = "${wrong_outfile_path_prefix}" + "/${table_export_name}_${uuid}" def label = "label_${uuid}" try { // check export path @@ -173,9 +157,6 @@ suite("test_export_table_with_label_retry", "p0") { """ waiting_export_expect_failed.call(label) - // check file amounts - check_file_amounts.call("${outFilePath}", 0) - // exec right export with same label again sql """ EXPORT TABLE ${table_export_name} TO "file://${outFilePath}/" @@ -188,9 +169,6 @@ suite("test_export_table_with_label_retry", "p0") { waiting_export_expect_success.call(label) - // check file amounts - check_file_amounts.call("${outFilePath}", 1) - // check data correctness sql """ DROP TABLE IF EXISTS ${table_load_name} """ sql """ @@ -215,28 +193,22 @@ suite("test_export_table_with_label_retry", "p0") { DISTRIBUTED BY HASH(user_id) PROPERTIES("replication_num" = "1"); """ - File[] files = new File("${outFilePath}").listFiles() - String file_path = files[0].getAbsolutePath() - streamLoad { - table "${table_load_name}" - - set 'column_separator', ',' - set 'columns', 'user_id, date, datetime, city, age, sex, bool_col, int_col, bigint_col, largeint_col, float_col, double_col, char_col, decimal_col, ipv4_col, ipv6_col' - set 'strict_mode', 'true' - - file "${file_path}" - time 10000 // limit inflight 10s - - check { result, exception, startTime, endTime -> - if (exception != null) { - throw exception - } - log.info("Stream load result: ${result}".toString()) - def json = parseJson(result) - assertEquals("success", json.Status.toLowerCase()) - assertEquals(100, json.NumberTotalRows) - assertEquals(0, json.NumberFilteredRows) - } + // use local() tvf to reload the data + def ipList = [:] + def portList = [:] + getBackendIpHeartbeatPort(ipList, portList) + ipList.each { beid, ip -> + logger.info("Begin to insert into ${table_load_name} from local()") + sql """ + insert into ${table_load_name} + select * from local( + "file_path" = "${local_tvf_prefix}/${table_export_name}_${uuid}/*", + "backend_id" = "${beid}", + "column_separator" = ",", + "format" = "csv"); + """ + insert_res = sql "show last insert;" + logger.info("insert from local(), BE id = ${beid}, result: " + insert_res.toString()) } qt_select_load1 """ SELECT * FROM ${table_load_name} t ORDER BY user_id; """ diff --git a/regression-test/suites/export_p0/test_export_view.groovy b/regression-test/suites/export_p0/test_export_view.groovy index dfd020b02fa..0a7f8245a11 100644 --- a/regression-test/suites/export_p0/test_export_view.groovy +++ b/regression-test/suites/export_p0/test_export_view.groovy @@ -54,27 +54,12 @@ suite("test_export_view", "p0") { return } + def machine_user_name = "root" def check_path_exists = { dir_path -> - File path = new File(dir_path) - if 
(!path.exists()) { - assert path.mkdirs() - } else { - throw new IllegalStateException("""${dir_path} already exists! """) - } - } - def check_file_amounts = { dir_path, amount -> - File path = new File(dir_path) - File[] files = path.listFiles() - assert files.length == amount + mkdirRemotePathOnAllBE(machine_user_name, dir_path) } def delete_files = { dir_path -> - File path = new File(dir_path) - if (path.exists()) { - for (File f: path.listFiles()) { - f.delete(); - } - path.delete(); - } + deleteRemotePathOnAllBE(machine_user_name, dir_path) } def waiting_export = { export_label -> while (true) { @@ -106,7 +91,8 @@ suite("test_export_view", "p0") { def table_export_name = "test_export_base_table" def table_export_view_name = "test_export_view_table" def table_load_name = "test_load_view_basic" - def outfile_path_prefix = """/tmp/test_export""" + def outfile_path_prefix = """/tmp/test_export_view""" + def local_tvf_prefix = "tmp/test_export_view" // create table and insert sql """ DROP TABLE IF EXISTS ${table_export_name} """ @@ -175,7 +161,7 @@ suite("test_export_view", "p0") { // 1. basic test def uuid = UUID.randomUUID().toString() - def outFilePath = """${outfile_path_prefix}_${uuid}""" + def outFilePath = "${outfile_path_prefix}/${table_export_view_name}_${uuid}" def label = "label_${uuid}" try { // check export path @@ -192,33 +178,27 @@ suite("test_export_view", "p0") { """ waiting_export.call(label) - // check file amounts - check_file_amounts.call("${outFilePath}", 1) // check data correctness create_load_table(table_load_name) - File[] files = new File("${outFilePath}").listFiles() - String file_path = files[0].getAbsolutePath() - streamLoad { - table "${table_load_name}" - - set 'column_separator', ',' - set 'strict_mode', 'true' - - file "${file_path}" - time 10000 // limit inflight 10s - - check { result, exception, startTime, endTime -> - if (exception != null) { - throw exception - } - log.info("Stream load result: ${result}".toString()) - def json = parseJson(result) - assertEquals("success", json.Status.toLowerCase()) - assertEquals(5, json.NumberTotalRows) - assertEquals(0, json.NumberFilteredRows) - } + // use local() tvf to reload the data + def ipList = [:] + def portList = [:] + getBackendIpHeartbeatPort(ipList, portList) + ipList.each { beid, ip -> + logger.info("Begin to insert into ${table_load_name} from local()") + sql """ + insert into ${table_load_name} + select * from local( + "file_path" = "${local_tvf_prefix}/${table_export_view_name}_${uuid}/*", + "backend_id" = "${beid}", + "format" = "csv", + "column_separator" = "," + ); + """ + insert_res = sql "show last insert;" + logger.info("insert from local(), BE id = ${beid}, result: " + insert_res.toString()) } order_qt_select_load1 """ SELECT * FROM ${table_load_name} t; """ @@ -230,7 +210,7 @@ suite("test_export_view", "p0") { // 2. 
test csv_with_names uuid = UUID.randomUUID().toString() - outFilePath = """${outfile_path_prefix}_${uuid}""" + outFilePath = "${outfile_path_prefix}" + "/${table_export_view_name}_${uuid}" label = "label_${uuid}" try { // check export path @@ -249,34 +229,27 @@ suite("test_export_view", "p0") { """ waiting_export.call(label) - // check file amounts - check_file_amounts.call("${outFilePath}", 1) // check data correctness create_load_table(table_load_name) - File[] files = new File("${outFilePath}").listFiles() - String file_path = files[0].getAbsolutePath() - streamLoad { - table "${table_load_name}" - - set 'column_separator', ',' - set 'strict_mode', 'true' - set 'format', 'csv_with_names' - - file "${file_path}" - time 10000 // limit inflight 10s - - check { result, exception, startTime, endTime -> - if (exception != null) { - throw exception - } - log.info("Stream load result: ${result}".toString()) - def json = parseJson(result) - assertEquals("success", json.Status.toLowerCase()) - assertEquals(5, json.NumberTotalRows) - assertEquals(0, json.NumberFilteredRows) - } + // use local() tvf to reload the data + def ipList = [:] + def portList = [:] + getBackendIpHeartbeatPort(ipList, portList) + ipList.each { beid, ip -> + logger.info("Begin to insert into ${table_load_name} from local()") + sql """ + insert into ${table_load_name} + select * from local( + "file_path" = "${local_tvf_prefix}/${table_export_view_name}_${uuid}/*", + "backend_id" = "${beid}", + "column_separator" = ",", + "format" = "csv_with_names" + ); + """ + insert_res = sql "show last insert;" + logger.info("insert from local(), BE id = ${beid}, result: " + insert_res.toString()) } order_qt_select_load2 """ SELECT * FROM ${table_load_name} t; """ @@ -289,7 +262,7 @@ suite("test_export_view", "p0") { // 3. 
test where clause uuid = UUID.randomUUID().toString() - outFilePath = """${outfile_path_prefix}_${uuid}""" + outFilePath = "${outfile_path_prefix}" + "/${table_export_view_name}_${uuid}" label = "label_${uuid}" try { // check export path @@ -309,34 +282,27 @@ suite("test_export_view", "p0") { """ waiting_export.call(label) - // check file amounts - check_file_amounts.call("${outFilePath}", 1) // check data correctness create_load_table(table_load_name) - File[] files = new File("${outFilePath}").listFiles() - String file_path = files[0].getAbsolutePath() - streamLoad { - table "${table_load_name}" - - set 'column_separator', ',' - set 'strict_mode', 'true' - set 'format', 'csv_with_names' - - file "${file_path}" - time 10000 // limit inflight 10s - - check { result, exception, startTime, endTime -> - if (exception != null) { - throw exception - } - log.info("Stream load result: ${result}".toString()) - def json = parseJson(result) - assertEquals("success", json.Status.toLowerCase()) - assertEquals(2, json.NumberTotalRows) - assertEquals(0, json.NumberFilteredRows) - } + // use local() tvf to reload the data + def ipList = [:] + def portList = [:] + getBackendIpHeartbeatPort(ipList, portList) + ipList.each { beid, ip -> + logger.info("Begin to insert into ${table_load_name} from local()") + sql """ + insert into ${table_load_name} + select * from local( + "file_path" = "${local_tvf_prefix}/${table_export_view_name}_${uuid}/*", + "backend_id" = "${beid}", + "column_separator" = ",", + "format" = "csv_with_names" + ); + """ + insert_res = sql "show last insert;" + logger.info("insert from local(), BE id = ${beid}, result: " + insert_res.toString()) } order_qt_select_load3 """ SELECT * FROM ${table_load_name}; """ @@ -349,7 +315,7 @@ suite("test_export_view", "p0") { // 4. 
test where clause and columns property uuid = UUID.randomUUID().toString() - outFilePath = """${outfile_path_prefix}_${uuid}""" + outFilePath = "${outfile_path_prefix}" + "/${table_export_view_name}_${uuid}" label = "label_${uuid}" try { // check export path @@ -371,35 +337,26 @@ suite("test_export_view", "p0") { waiting_export.call(label) - // check file amounts - check_file_amounts.call("${outFilePath}", 1) - // check data correctness create_load_table(table_load_name) - File[] files = new File("${outFilePath}").listFiles() - String file_path = files[0].getAbsolutePath() - streamLoad { - table "${table_load_name}" - - set 'column_separator', ',' - set 'strict_mode', 'true' - set 'columns', 'k3, s1, k1' - set 'format', 'csv_with_names' - - file "${file_path}" - time 10000 // limit inflight 10s - - check { result, exception, startTime, endTime -> - if (exception != null) { - throw exception - } - log.info("Stream load result: ${result}".toString()) - def json = parseJson(result) - assertEquals("success", json.Status.toLowerCase()) - assertEquals(2, json.NumberTotalRows) - assertEquals(0, json.NumberFilteredRows) - } + // use local() tvf to reload the data + def ipList = [:] + def portList = [:] + getBackendIpHeartbeatPort(ipList, portList) + ipList.each { beid, ip -> + logger.info("Begin to insert into ${table_load_name} from local()") + sql """ + insert into ${table_load_name} + select s1,k1, null as k2,k3 from local( + "file_path" = "${local_tvf_prefix}/${table_export_view_name}_${uuid}/*", + "backend_id" = "${beid}", + "column_separator" = ",", + "format" = "csv_with_names" + ); + """ + insert_res = sql "show last insert;" + logger.info("insert from local(), BE id = ${beid}, result: " + insert_res.toString()) } order_qt_select_load4 """ SELECT * FROM ${table_load_name} t; """ @@ -412,7 +369,7 @@ suite("test_export_view", "p0") { // 5. 
test csv_with_names_and_types uuid = UUID.randomUUID().toString() - outFilePath = """${outfile_path_prefix}_${uuid}""" + outFilePath = "${outfile_path_prefix}" + "/${table_export_view_name}_${uuid}" label = "label_${uuid}" try { // check export path @@ -431,34 +388,26 @@ suite("test_export_view", "p0") { """ waiting_export.call(label) - // check file amounts - check_file_amounts.call("${outFilePath}", 1) - // check data correctness create_load_table(table_load_name) - File[] files = new File("${outFilePath}").listFiles() - String file_path = files[0].getAbsolutePath() - streamLoad { - table "${table_load_name}" - - set 'strict_mode', 'true' - set 'format', 'csv_with_names_and_types' - set 'column_separator', ',' - - file "${file_path}" - time 10000 // limit inflight 10s - - check { result, exception, startTime, endTime -> - if (exception != null) { - throw exception - } - log.info("Stream load result: ${result}".toString()) - def json = parseJson(result) - assertEquals("success", json.Status.toLowerCase()) - assertEquals(5, json.NumberTotalRows) - assertEquals(0, json.NumberFilteredRows) - } + // use local() tvf to reload the data + def ipList = [:] + def portList = [:] + getBackendIpHeartbeatPort(ipList, portList) + ipList.each { beid, ip -> + logger.info("Begin to insert into ${table_load_name} from local()") + sql """ + insert into ${table_load_name} + select * from local( + "file_path" = "${local_tvf_prefix}/${table_export_view_name}_${uuid}/*", + "backend_id" = "${beid}", + "column_separator" = ",", + "format" = "csv_with_names_and_types" + ); + """ + insert_res = sql "show last insert;" + logger.info("insert from local(), BE id = ${beid}, result: " + insert_res.toString()) } order_qt_select_load5 """ SELECT * FROM ${table_load_name} t; """ @@ -471,7 +420,7 @@ suite("test_export_view", "p0") { // 6. 
test orc type uuid = UUID.randomUUID().toString() - outFilePath = """${outfile_path_prefix}_${uuid}""" + outFilePath = "${outfile_path_prefix}" + "/${table_export_view_name}_${uuid}" label = "label_${uuid}" try { // check export path @@ -488,33 +437,25 @@ suite("test_export_view", "p0") { """ waiting_export.call(label) - // check file amounts - check_file_amounts.call("${outFilePath}", 1) - // check data correctness create_load_table(table_load_name) - File[] files = new File("${outFilePath}").listFiles() - String file_path = files[0].getAbsolutePath() - streamLoad { - table "${table_load_name}" - - set 'strict_mode', 'true' - set 'format', 'orc' - - file "${file_path}" - time 10000 // limit inflight 10s - - check { result, exception, startTime, endTime -> - if (exception != null) { - throw exception - } - log.info("Stream load result: ${result}".toString()) - def json = parseJson(result) - assertEquals("success", json.Status.toLowerCase()) - assertEquals(5, json.NumberTotalRows) - assertEquals(0, json.NumberFilteredRows) - } + // use local() tvf to reload the data + def ipList = [:] + def portList = [:] + getBackendIpHeartbeatPort(ipList, portList) + ipList.each { beid, ip -> + logger.info("Begin to insert into ${table_load_name} from local()") + sql """ + insert into ${table_load_name} + select * from local( + "file_path" = "${local_tvf_prefix}/${table_export_view_name}_${uuid}/*", + "backend_id" = "${beid}", + "format" = "orc" + ); + """ + insert_res = sql "show last insert;" + logger.info("insert from local(), BE id = ${beid}, result: " + insert_res.toString()) } order_qt_select_load6 """ SELECT * FROM ${table_load_name} t; """ @@ -527,7 +468,7 @@ suite("test_export_view", "p0") { // 8. test orc type, where clause and columns property uuid = UUID.randomUUID().toString() - outFilePath = """${outfile_path_prefix}_${uuid}""" + outFilePath = "${outfile_path_prefix}" + "/${table_export_view_name}_${uuid}" label = "label_${uuid}" try { // check export path @@ -547,35 +488,25 @@ suite("test_export_view", "p0") { """ waiting_export.call(label) - - // check file amounts - check_file_amounts.call("${outFilePath}", 1) - // check data correctness create_load_table(table_load_name) - File[] files = new File("${outFilePath}").listFiles() - String file_path = files[0].getAbsolutePath() - streamLoad { - table "${table_load_name}" - - set 'strict_mode', 'true' - set 'columns', 'k3, s1, k1' - set 'format', 'orc' - - file "${file_path}" - time 10000 // limit inflight 10s - - check { result, exception, startTime, endTime -> - if (exception != null) { - throw exception - } - log.info("Stream load result: ${result}".toString()) - def json = parseJson(result) - assertEquals("success", json.Status.toLowerCase()) - assertEquals(2, json.NumberTotalRows) - assertEquals(0, json.NumberFilteredRows) - } + // use local() tvf to reload the data + def ipList = [:] + def portList = [:] + getBackendIpHeartbeatPort(ipList, portList) + ipList.each { beid, ip -> + logger.info("Begin to insert into ${table_load_name} from local()") + sql """ + insert into ${table_load_name} + select s1,k1, null as k2,k3 from local( + "file_path" = "${local_tvf_prefix}/${table_export_view_name}_${uuid}/*", + "backend_id" = "${beid}", + "format" = "orc" + ); + """ + insert_res = sql "show last insert;" + logger.info("insert from local(), BE id = ${beid}, result: " + insert_res.toString()) } order_qt_select_load8 """ SELECT * FROM ${table_load_name} t; """ diff --git 
diff --git a/regression-test/suites/external_table_p0/export/test_export_external_table.groovy b/regression-test/suites/external_table_p0/export/test_export_external_table.groovy
index 8125db28bf1..2b2d41e2b4d 100644
--- a/regression-test/suites/external_table_p0/export/test_export_external_table.groovy
+++ b/regression-test/suites/external_table_p0/export/test_export_external_table.groovy
@@ -54,37 +54,12 @@ suite("test_export_external_table", "p0,external,mysql,external_docker,external_
         return
     }
 
+    def machine_user_name = "root"
     def check_path_exists = { dir_path ->
-        List<List<Object>> backends = sql """ show backends """
-        assertTrue(backends.size() > 0)
-        File path = new File(dir_path)
-        if (!path.exists()) {
-            assert path.mkdirs()
-        } else {
-            throw new IllegalStateException("""${dir_path} already exists! """)
-        }
-        if (backends.size() > 1) {
-            for (List<Object> backend : backends) {
-                def be_host = backend[1]
-                def cmd="""mkdir -p ${dir_path}"""
-                sshExec("root", be_host, cmd.toString())
-            }
-        }
-
-    }
-
-    def check_file_amounts = { dir_path, amount ->
-        File path = new File(dir_path)
-        File[] files = path.listFiles()
-        assert files.length == amount
+        mkdirRemotePathOnAllBE(machine_user_name, dir_path)
     }
 
     def delete_files = { dir_path ->
-        File path = new File(dir_path)
-        if (path.exists()) {
-            for (File f: path.listFiles()) {
-                f.delete();
-            }
-            path.delete();
-        }
+        deleteRemotePathOnAllBE(machine_user_name, dir_path)
     }
 
     def waiting_export = { ctlName, dbName, export_label ->
         while (true) {
@@ -99,12 +74,12 @@ suite("test_export_external_table", "p0,external,mysql,external_docker,external_
             }
         }
     }
-
     // this table name must be `test1`, because this is an external table.
     def table_export_name = "test1"
     def table_load_name = "test_load_external__basic"
-    def outfile_path_prefix = """/tmp/test_export"""
+    def outfile_path_prefix = """/tmp/test_export_external_table"""
+    def local_tvf_prefix = "tmp/test_export_external_table"
 
     String enabled = context.config.otherConfigs.get("enableJdbcTest")
     String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
@@ -163,7 +138,7 @@ suite("test_export_external_table", "p0,external,mysql,external_docker,external_
 
     // 1. basic test
    def uuid = UUID.randomUUID().toString()
-    def outFilePath = """${outfile_path_prefix}_${uuid}"""
+    def outFilePath = "${outfile_path_prefix}" + "/${table_export_name}_${uuid}"
     def label = "label_${uuid}"
     try {
         // check export path
@@ -180,34 +155,27 @@ suite("test_export_external_table", "p0,external,mysql,external_docker,external_
         );
         """
         waiting_export.call(catalog_name, ex_db_name, label)
-
-        // check file amounts
-        check_file_amounts.call("${outFilePath}", 1)
 
         // check data correctness
         create_load_table(table_load_name)
 
-        File[] files = new File("${outFilePath}").listFiles()
-        String file_path = files[0].getAbsolutePath()
-        streamLoad {
-            table "${table_load_name}"
-
-            set 'column_separator', ','
-            set 'strict_mode', 'true'
-
-            file "${file_path}"
-            time 10000 // limit inflight 10s
-
-            check { result, exception, startTime, endTime ->
-                if (exception != null) {
-                    throw exception
-                }
-                log.info("Stream load result: ${result}".toString())
-                def json = parseJson(result)
-                assertEquals("success", json.Status.toLowerCase())
-                assertEquals(100, json.NumberTotalRows)
-                assertEquals(0, json.NumberFilteredRows)
-            }
+        // use local() tvf to reload the data
+        def ipList = [:]
+        def portList = [:]
+        getBackendIpHeartbeatPort(ipList, portList)
+        ipList.each { beid, ip ->
+            logger.info("Begin to insert into internal.${internal_db_name}.${table_load_name} from local()")
+            sql """
+                insert into internal.${internal_db_name}.${table_load_name}
+                select * from local(
+                    "file_path" = "${local_tvf_prefix}/${table_export_name}_${uuid}/*",
+                    "backend_id" = "${beid}",
+                    "format" = "csv",
+                    "column_separator" = ","
+                );
+            """
+            def insert_res = sql "show last insert;"
+            logger.info("insert from local(), BE id = ${beid}, result: " + insert_res.toString())
         }
 
         order_qt_select_load1 """ SELECT * FROM internal.${internal_db_name}.${table_load_name} order by k8; """
@@ -218,7 +186,7 @@ suite("test_export_external_table", "p0,external,mysql,external_docker,external_
 
     // 2. export external table under internal catalog
     uuid = UUID.randomUUID().toString()
-    outFilePath = """${outfile_path_prefix}_${uuid}"""
+    outFilePath = "${outfile_path_prefix}" + "/${table_export_name}_${uuid}"
     label = "label_${uuid}"
     try {
         // check export path
@@ -237,34 +205,27 @@ suite("test_export_external_table", "p0,external,mysql,external_docker,external_
         """
         waiting_export.call(catalog_name, ex_db_name, label)
-
-        // check file amounts
-        check_file_amounts.call("${outFilePath}", 1)
 
         // check data correctness
         create_load_table(table_load_name)
 
-        File[] files = new File("${outFilePath}").listFiles()
-        String file_path = files[0].getAbsolutePath()
-        streamLoad {
-            table "${table_load_name}"
-
-            set 'column_separator', ','
-            set 'strict_mode', 'true'
-
-            file "${file_path}"
-            time 10000 // limit inflight 10s
-
-            check { result, exception, startTime, endTime ->
-                if (exception != null) {
-                    throw exception
-                }
-                log.info("Stream load result: ${result}".toString())
-                def json = parseJson(result)
-                assertEquals("success", json.Status.toLowerCase())
-                assertEquals(100, json.NumberTotalRows)
-                assertEquals(0, json.NumberFilteredRows)
-            }
+        // use local() tvf to reload the data
+        def ipList = [:]
+        def portList = [:]
+        getBackendIpHeartbeatPort(ipList, portList)
+        ipList.each { beid, ip ->
+            logger.info("Begin to insert into internal.${internal_db_name}.${table_load_name} from local()")
+            sql """
+                insert into internal.${internal_db_name}.${table_load_name}
+                select * from local(
+                    "file_path" = "${local_tvf_prefix}/${table_export_name}_${uuid}/*",
+                    "backend_id" = "${beid}",
+                    "format" = "csv",
+                    "column_separator" = ","
+                );
+            """
+            def insert_res = sql "show last insert;"
+            logger.info("insert from local(), BE id = ${beid}, result: " + insert_res.toString())
         }
 
         order_qt_select_load2 """ SELECT * FROM internal.${internal_db_name}.${table_load_name} order by k8; """
@@ -276,7 +237,7 @@ suite("test_export_external_table", "p0,external,mysql,external_docker,external_
 
     sql """ switch ${catalog_name} """
 
     // 3. csv_with_names
     uuid = UUID.randomUUID().toString()
-    outFilePath = """${outfile_path_prefix}_${uuid}"""
+    outFilePath = "${outfile_path_prefix}" + "/${table_export_name}_${uuid}"
     label = "label_${uuid}"
     try {
         // check export path
@@ -293,35 +254,27 @@ suite("test_export_external_table", "p0,external,mysql,external_docker,external_
         );
         """
         waiting_export.call(catalog_name, ex_db_name, label)
-
-        // check file amounts
-        check_file_amounts.call("${outFilePath}", 1)
 
         // check data correctness
         create_load_table(table_load_name)
 
-        File[] files = new File("${outFilePath}").listFiles()
-        String file_path = files[0].getAbsolutePath()
-        streamLoad {
-            table "${table_load_name}"
-
-            set 'column_separator', ','
-            set 'strict_mode', 'true'
-            set 'format', 'csv_with_names'
-
-            file "${file_path}"
-            time 10000 // limit inflight 10s
-
-            check { result, exception, startTime, endTime ->
-                if (exception != null) {
-                    throw exception
-                }
-                log.info("Stream load result: ${result}".toString())
-                def json = parseJson(result)
-                assertEquals("success", json.Status.toLowerCase())
-                assertEquals(30, json.NumberTotalRows)
-                assertEquals(0, json.NumberFilteredRows)
-            }
+        // use local() tvf to reload the data
+        def ipList = [:]
+        def portList = [:]
+        getBackendIpHeartbeatPort(ipList, portList)
+        ipList.each { beid, ip ->
+            logger.info("Begin to insert into internal.${internal_db_name}.${table_load_name} from local()")
+            sql """
+                insert into internal.${internal_db_name}.${table_load_name}
+                select * from local(
+                    "file_path" = "${local_tvf_prefix}/${table_export_name}_${uuid}/*",
+                    "backend_id" = "${beid}",
+                    "column_separator" = ",",
+                    "format" = "csv_with_names"
+                );
+            """
+            def insert_res = sql "show last insert;"
+            logger.info("insert from local(), BE id = ${beid}, result: " + insert_res.toString())
         }
 
         order_qt_select_load3 """ SELECT * FROM internal.${internal_db_name}.${table_load_name} order by k8; """
@@ -333,7 +286,7 @@ suite("test_export_external_table", "p0,external,mysql,external_docker,external_
 
     // 4. csv_with_names_and_types
     uuid = UUID.randomUUID().toString()
-    outFilePath = """${outfile_path_prefix}_${uuid}"""
+    outFilePath = "${outfile_path_prefix}" + "/${table_export_name}_${uuid}"
     label = "label_${uuid}"
     try {
         // check export path
@@ -350,35 +303,27 @@ suite("test_export_external_table", "p0,external,mysql,external_docker,external_
         );
         """
         waiting_export.call(catalog_name, ex_db_name, label)
-
-        // check file amounts
-        check_file_amounts.call("${outFilePath}", 1)
 
         // check data correctness
         create_load_table(table_load_name)
 
-        File[] files = new File("${outFilePath}").listFiles()
-        String file_path = files[0].getAbsolutePath()
-        streamLoad {
-            table "${table_load_name}"
-
-            set 'column_separator', ','
-            set 'strict_mode', 'true'
-            set 'format', 'csv_with_names_and_types'
-
-            file "${file_path}"
-            time 10000 // limit inflight 10s
-
-            check { result, exception, startTime, endTime ->
-                if (exception != null) {
-                    throw exception
-                }
-                log.info("Stream load result: ${result}".toString())
-                def json = parseJson(result)
-                assertEquals("success", json.Status.toLowerCase())
-                assertEquals(30, json.NumberTotalRows)
-                assertEquals(0, json.NumberFilteredRows)
-            }
+        // use local() tvf to reload the data
+        def ipList = [:]
+        def portList = [:]
+        getBackendIpHeartbeatPort(ipList, portList)
+        ipList.each { beid, ip ->
+            logger.info("Begin to insert into internal.${internal_db_name}.${table_load_name} from local()")
+            sql """
+                insert into internal.${internal_db_name}.${table_load_name}
+                select * from local(
+                    "file_path" = "${local_tvf_prefix}/${table_export_name}_${uuid}/*",
+                    "backend_id" = "${beid}",
+                    "column_separator" = ",",
+                    "format" = "csv_with_names_and_types"
+                );
+            """
+            def insert_res = sql "show last insert;"
+            logger.info("insert from local(), BE id = ${beid}, result: " + insert_res.toString())
         }
 
         order_qt_select_load4 """ SELECT * FROM internal.${internal_db_name}.${table_load_name} order by k8; """
@@ -390,7 +335,7 @@ suite("test_export_external_table", "p0,external,mysql,external_docker,external_
 
     // 5. orc
     uuid = UUID.randomUUID().toString()
-    outFilePath = """${outfile_path_prefix}_${uuid}"""
+    outFilePath = "${outfile_path_prefix}" + "/${table_export_name}_${uuid}"
     label = "label_${uuid}"
     try {
         // check export path
@@ -406,34 +351,26 @@ suite("test_export_external_table", "p0,external,mysql,external_docker,external_
         );
         """
         waiting_export.call(catalog_name, ex_db_name, label)
-
-        // check file amounts
-        check_file_amounts.call("${outFilePath}", 1)
 
         // check data correctness
         create_load_table(table_load_name)
 
-        File[] files = new File("${outFilePath}").listFiles()
-        String file_path = files[0].getAbsolutePath()
-        streamLoad {
-            table "${table_load_name}"
-
-            set 'strict_mode', 'true'
-            set 'format', 'orc'
-
-            file "${file_path}"
-            time 10000 // limit inflight 10s
-
-            check { result, exception, startTime, endTime ->
-                if (exception != null) {
-                    throw exception
-                }
-                log.info("Stream load result: ${result}".toString())
-                def json = parseJson(result)
-                assertEquals("success", json.Status.toLowerCase())
-                assertEquals(30, json.NumberTotalRows)
-                assertEquals(0, json.NumberFilteredRows)
-            }
+        // use local() tvf to reload the data
+        def ipList = [:]
+        def portList = [:]
+        getBackendIpHeartbeatPort(ipList, portList)
+        ipList.each { beid, ip ->
+            logger.info("Begin to insert into internal.${internal_db_name}.${table_load_name} from local()")
+            sql """
+                insert into internal.${internal_db_name}.${table_load_name}
+                select * from local(
+                    "file_path" = "${local_tvf_prefix}/${table_export_name}_${uuid}/*",
+                    "backend_id" = "${beid}",
+                    "format" = "orc"
+                );
+            """
+            def insert_res = sql "show last insert;"
+            logger.info("insert from local(), BE id = ${beid}, result: " + insert_res.toString())
         }
 
         order_qt_select_load5 """ SELECT * FROM internal.${internal_db_name}.${table_load_name} order by k8; """
@@ -442,10 +379,9 @@ suite("test_export_external_table", "p0,external,mysql,external_docker,external_
         delete_files.call("${outFilePath}")
     }
 
-
-    // 5. parquet
+    // 6. parquet
     uuid = UUID.randomUUID().toString()
-    outFilePath = """${outfile_path_prefix}_${uuid}"""
+    outFilePath = "${outfile_path_prefix}" + "/${table_export_name}_${uuid}"
     label = "label_${uuid}"
     try {
         // check export path
@@ -461,34 +397,26 @@ suite("test_export_external_table", "p0,external,mysql,external_docker,external_
         );
         """
         waiting_export.call(catalog_name, ex_db_name, label)
-
-        // check file amounts
-        check_file_amounts.call("${outFilePath}", 1)
 
         // check data correctness
        create_load_table(table_load_name)
 
-        File[] files = new File("${outFilePath}").listFiles()
-        String file_path = files[0].getAbsolutePath()
-        streamLoad {
-            table "${table_load_name}"
-
-            set 'strict_mode', 'true'
-            set 'format', 'parquet'
-
-            file "${file_path}"
-            time 10000 // limit inflight 10s
-
-            check { result, exception, startTime, endTime ->
-                if (exception != null) {
-                    throw exception
-                }
-                log.info("Stream load result: ${result}".toString())
-                def json = parseJson(result)
-                assertEquals("success", json.Status.toLowerCase())
-                assertEquals(30, json.NumberTotalRows)
-                assertEquals(0, json.NumberFilteredRows)
-            }
+        // use local() tvf to reload the data
+        def ipList = [:]
+        def portList = [:]
+        getBackendIpHeartbeatPort(ipList, portList)
+        ipList.each { beid, ip ->
+            logger.info("Begin to insert into internal.${internal_db_name}.${table_load_name} from local()")
+            sql """
+                insert into internal.${internal_db_name}.${table_load_name}
+                select * from local(
+                    "file_path" = "${local_tvf_prefix}/${table_export_name}_${uuid}/*",
+                    "backend_id" = "${beid}",
+                    "format" = "parquet"
+                );
+            """
+            def insert_res = sql "show last insert;"
+            logger.info("insert from local(), BE id = ${beid}, result: " + insert_res.toString())
        }
 
         order_qt_select_load6 """ SELECT * FROM internal.${internal_db_name}.${table_load_name} order by k8; """
@@ -500,7 +428,7 @@ suite("test_export_external_table", "p0,external,mysql,external_docker,external_
 
     // 7. test columns property
     uuid = UUID.randomUUID().toString()
-    outFilePath = """${outfile_path_prefix}_${uuid}"""
+    outFilePath = "${outfile_path_prefix}" + "/${table_export_name}_${uuid}"
     label = "label_${uuid}"
     try {
         // check export path
@@ -518,36 +446,27 @@ suite("test_export_external_table", "p0,external,mysql,external_docker,external_
         );
         """
         waiting_export.call(catalog_name, ex_db_name, label)
-
-        // check file amounts
-        check_file_amounts.call("${outFilePath}", 1)
 
         // check data correctness
         create_load_table(table_load_name)
 
-        File[] files = new File("${outFilePath}").listFiles()
-        String file_path = files[0].getAbsolutePath()
-        streamLoad {
-            table "${table_load_name}"
-
-            set 'column_separator', ','
-            set 'strict_mode', 'true'
-            set 'format', 'csv_with_names'
-            set 'columns', 'k8, k1, k5, k3, k7'
-
-            file "${file_path}"
-            time 10000 // limit inflight 10s
-
-            check { result, exception, startTime, endTime ->
-                if (exception != null) {
-                    throw exception
-                }
-                log.info("Stream load result: ${result}".toString())
-                def json = parseJson(result)
-                assertEquals("success", json.Status.toLowerCase())
-                assertEquals(30, json.NumberTotalRows)
-                assertEquals(0, json.NumberFilteredRows)
-            }
+        // use local() tvf to reload the data
+        def ipList = [:]
+        def portList = [:]
+        getBackendIpHeartbeatPort(ipList, portList)
+        ipList.each { beid, ip ->
+            logger.info("Begin to insert into internal.${internal_db_name}.${table_load_name} from local()")
+            sql """
+                insert into internal.${internal_db_name}.${table_load_name} (k8, k1, k5, k3, k7)
+                select * from local(
+                    "file_path" = "${local_tvf_prefix}/${table_export_name}_${uuid}/*",
+                    "backend_id" = "${beid}",
+                    "format" = "csv_with_names",
+                    "column_separator" = ","
+                );
+            """
+            def insert_res = sql "show last insert;"
+            logger.info("insert from local(), BE id = ${beid}, result: " + insert_res.toString())
        }
 
        order_qt_select_load7 """ SELECT * FROM internal.${internal_db_name}.${table_load_name} order by k8; """

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org