This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.1 by this push:
new 0bb6ae3a1e1 branch-4.1: [fix](outfile) handle delete_existing_files
before parallel export #61223 (#61726)
0bb6ae3a1e1 is described below
commit 0bb6ae3a1e113e8b6e2f062b5144ea3279ee34e3
Author: Socrates <[email protected]>
AuthorDate: Thu Mar 26 09:17:04 2026 +0800
branch-4.1: [fix](outfile) handle delete_existing_files before parallel
export #61223 (#61726)
Cherry-pick #61223 to branch-4.1
### What problem does this PR solve?
- Related PR: #61223
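Previously, with parallel OUTFILE enabled, each BE file writer deleted the
target directory itself when `delete_existing_files=true`, so the parallel
writers could race on the deletion. This change handles
`delete_existing_files=true` for remote OUTFILE targets once in FE, before
the parallel export starts; clears the delete flag before the sink options
are sent to BE; and rejects `file:///` targets combined with
`delete_existing_files=true`, aligning OUTFILE behavior with EXPORT.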
### Cherry-pick commit
- `15766536326` - [fix](outfile) handle delete_existing_files before
parallel export (#61223)
---
be/src/exec/operator/result_sink_operator.h | 4 +
be/src/exec/sink/writer/vfile_result_writer.cpp | 4 +-
.../org/apache/doris/analysis/OutFileClause.java | 10 +-
.../org/apache/doris/common/util/BrokerUtil.java | 12 ++
.../main/java/org/apache/doris/load/ExportMgr.java | 5 +-
.../commands/insert/InsertIntoTVFCommand.java | 11 +-
.../org/apache/doris/planner/ResultFileSink.java | 5 +
.../java/org/apache/doris/qe/StmtExecutor.java | 40 ++++++-
.../java/org/apache/doris/qe/StmtExecutorTest.java | 25 ++++
gensrc/thrift/DataSinks.thrift | 4 +-
.../suites/export_p0/test_outfile.groovy | 14 +++
...t_outfile_parallel_delete_existing_files.groovy | 131 +++++++++++++++++++++
12 files changed, 244 insertions(+), 21 deletions(-)
diff --git a/be/src/exec/operator/result_sink_operator.h
b/be/src/exec/operator/result_sink_operator.h
index 752e6a367c9..7fdbca0b8dc 100644
--- a/be/src/exec/operator/result_sink_operator.h
+++ b/be/src/exec/operator/result_sink_operator.h
@@ -57,6 +57,9 @@ struct ResultFileOptions {
// TODO: we should merge
parquet_commpression_type/orc_compression_type/compression_type
TFileCompressType::type compression_type = TFileCompressType::PLAIN;
+ // Deprecated compatibility flag. New FE handles outfile
delete_existing_files in FE
+ // and clears this field before sending the result sink to BE. Keep
reading it here
+ // only for compatibility with older FE during rolling upgrade.
bool delete_existing_files = false;
std::string file_suffix;
//Bring BOM when exporting to CSV format
@@ -70,6 +73,7 @@ struct ResultFileOptions {
line_delimiter = t_opt.__isset.line_delimiter ? t_opt.line_delimiter :
"\n";
max_file_size_bytes =
t_opt.__isset.max_file_size_bytes ? t_opt.max_file_size_bytes
: max_file_size_bytes;
+ // Deprecated compatibility path. New FE should already have cleared
this flag.
delete_existing_files =
t_opt.__isset.delete_existing_files ?
t_opt.delete_existing_files : false;
file_suffix = t_opt.file_suffix;
diff --git a/be/src/exec/sink/writer/vfile_result_writer.cpp
b/be/src/exec/sink/writer/vfile_result_writer.cpp
index d7e4149c551..c198eaa7e21 100644
--- a/be/src/exec/sink/writer/vfile_result_writer.cpp
+++ b/be/src/exec/sink/writer/vfile_result_writer.cpp
@@ -96,7 +96,9 @@ Status VFileResultWriter::open(RuntimeState* state,
RuntimeProfile* profile) {
_file_opts->orc_writer_version < 1) {
return Status::InternalError("orc writer version is less than 1.");
}
- // Delete existing files
+ // Deprecated compatibility path. New FE already deletes the target
directory in FE
+ // and clears delete_existing_files before BE execution. Keep this branch
only for
+ // requests from older FE versions during rolling upgrade.
if (_file_opts->delete_existing_files) {
RETURN_IF_ERROR(_delete_dir());
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java
index cc7ecf52c4e..c0fc647fd43 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java
@@ -187,6 +187,10 @@ public class OutFileClause {
return maxFileSizeBytes;
}
+ public boolean shouldDeleteExistingFiles() {
+ return deleteExistingFiles;
+ }
+
public BrokerDesc getBrokerDesc() {
return brokerDesc;
}
@@ -454,7 +458,6 @@ public class OutFileClause {
+ ", should use " + expectType + ", but the definition type is
" + orcType);
}
-
private void analyzeForParquetFormat(List<Expr> resultExprs, List<String>
colLabels) throws AnalysisException {
if (this.parquetSchemas.isEmpty()) {
genParquetColumnName(resultExprs, colLabels);
@@ -540,6 +543,9 @@ public class OutFileClause {
+ " To enable this feature, you need to add
`enable_delete_existing_files=true`"
+ " in fe.conf");
}
+ if (deleteExistingFiles && isLocalOutput) {
+ throw new AnalysisException("Local file system does not
support delete existing files");
+ }
copiedProps.remove(PROP_DELETE_EXISTING_FILES);
}
@@ -769,5 +775,3 @@ public class OutFileClause {
return sinkOptions;
}
}
-
-
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java
b/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java
index e5b7c1a7f45..e44d1d9ea4f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java
@@ -119,6 +119,18 @@ public class BrokerUtil {
}
}
+ public static void deleteParentDirectoryWithFileSystem(String path,
BrokerDesc brokerDesc) throws UserException {
+ deleteDirectoryWithFileSystem(extractParentDirectory(path),
brokerDesc);
+ }
+
+ public static String extractParentDirectory(String path) {
+ int lastSlash = path.lastIndexOf('/');
+ if (lastSlash >= 0) {
+ return path.substring(0, lastSlash + 1);
+ }
+ return path;
+ }
+
public static String printBroker(String brokerName, TNetworkAddress
address) {
return brokerName + "[" + address.toString() + "]";
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java
index 8de5a1c95c6..c2b1be2a230 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java
@@ -114,9 +114,7 @@ public class ExportMgr {
try {
// delete existing files
if (Boolean.parseBoolean(job.getDeleteExistingFiles())) {
- String fullPath = job.getExportPath();
- BrokerUtil.deleteDirectoryWithFileSystem(fullPath.substring(0,
fullPath.lastIndexOf('/') + 1),
- job.getBrokerDesc());
+
BrokerUtil.deleteParentDirectoryWithFileSystem(job.getExportPath(),
job.getBrokerDesc());
}
// ATTN: Must add task after edit log, otherwise the job may
finish before adding job.
for (int i = 0; i < job.getCopiedTaskExecutors().size(); i++) {
@@ -554,4 +552,3 @@ public class ExportMgr {
return size;
}
}
-
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTVFCommand.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTVFCommand.java
index 4847cd028fa..fae7747b5d4 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTVFCommand.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTVFCommand.java
@@ -23,6 +23,7 @@ import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.Status;
import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.BrokerUtil;
import org.apache.doris.datasource.property.storage.StorageProperties;
import org.apache.doris.fs.FileSystemFactory;
import org.apache.doris.fs.remote.RemoteFileSystem;
@@ -176,7 +177,7 @@ public class InsertIntoTVFCommand extends Command
implements ForwardWithSync, Ex
throws Exception {
String filePath = props.get("file_path");
// Extract parent directory from prefix path:
s3://bucket/path/to/prefix_ -> s3://bucket/path/to/
- String parentDir = extractParentDirectory(filePath);
+ String parentDir = BrokerUtil.extractParentDirectory(filePath);
LOG.info("TVF sink: deleting existing files in directory: {}",
parentDir);
// Copy props for building StorageProperties (exclude write-specific
params)
@@ -198,12 +199,4 @@ public class InsertIntoTVFCommand extends Command
implements ForwardWithSync, Ex
+ parentDir + ": " + deleteStatus.getErrMsg());
}
}
-
- private static String extractParentDirectory(String prefixPath) {
- int lastSlash = prefixPath.lastIndexOf('/');
- if (lastSlash >= 0) {
- return prefixPath.substring(0, lastSlash + 1);
- }
- return prefixPath;
- }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/ResultFileSink.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/ResultFileSink.java
index 0ba8311097a..520e34e27c2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/ResultFileSink.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ResultFileSink.java
@@ -88,6 +88,11 @@ public class ResultFileSink extends DataSink {
fileSinkOptions.setBrokerAddresses(Lists.newArrayList(new
TNetworkAddress(ip, port)));
}
+ public void setDeleteExistingFiles(boolean deleteExistingFiles) {
+ Preconditions.checkNotNull(fileSinkOptions);
+ fileSinkOptions.setDeleteExistingFiles(deleteExistingFiles);
+ }
+
public void resetByDataStreamSink(DataStreamSink dataStreamSink) {
exchNodeId = dataStreamSink.getExchNodeId();
outputPartition = dataStreamSink.getOutputPartition();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index 1a7d4df0c49..b39d5b0d62b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -54,6 +54,7 @@ import org.apache.doris.common.profile.Profile;
import org.apache.doris.common.profile.ProfileManager.ProfileType;
import org.apache.doris.common.profile.SummaryProfile;
import org.apache.doris.common.profile.SummaryProfile.SummaryBuilder;
+import org.apache.doris.common.util.BrokerUtil;
import org.apache.doris.common.util.DebugPointUtil;
import org.apache.doris.common.util.DebugPointUtil.DebugPoint;
import org.apache.doris.common.util.DebugUtil;
@@ -105,8 +106,10 @@ import
org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalSqlCache;
import org.apache.doris.planner.GroupCommitScanNode;
import org.apache.doris.planner.OlapScanNode;
+import org.apache.doris.planner.PlanFragment;
import org.apache.doris.planner.PlanNode;
import org.apache.doris.planner.Planner;
+import org.apache.doris.planner.ResultFileSink;
import org.apache.doris.planner.ScanNode;
import org.apache.doris.proto.Data;
import org.apache.doris.proto.InternalService;
@@ -1298,8 +1301,16 @@ public class StmtExecutor {
}
coordBase.setIsProfileSafeStmt(this.isProfileSafeStmt());
+ OutFileClause outFileClause = null;
+ if (isOutfileQuery) {
+ outFileClause = queryStmt.getOutFileClause();
+ Preconditions.checkState(outFileClause != null, "OUTFILE query
must have OutFileClause");
+ }
try {
+ if (outFileClause != null) {
+ deleteExistingOutfileFilesInFe(outFileClause);
+ }
coordBase.exec();
profile.getSummaryProfile().setQueryScheduleFinishTime(TimeUtils.getStartTimeMs());
updateProfile(false);
@@ -1339,8 +1350,8 @@ public class StmtExecutor {
sendFields(queryStmt.getColLabels(),
queryStmt.getFieldInfos(),
getReturnTypes(queryStmt));
} else {
- if
(!Strings.isNullOrEmpty(queryStmt.getOutFileClause().getSuccessFileName())) {
-
outfileWriteSuccess(queryStmt.getOutFileClause());
+ if
(!Strings.isNullOrEmpty(outFileClause.getSuccessFileName())) {
+ outfileWriteSuccess(outFileClause);
}
sendFields(OutFileClause.RESULT_COL_NAMES,
OutFileClause.RESULT_COL_TYPES);
}
@@ -1482,6 +1493,31 @@ public class StmtExecutor {
}
}
+ private void deleteExistingOutfileFilesInFe(OutFileClause outFileClause)
throws UserException {
+ // Handle directory cleanup once in FE so parallel outfile writers
never race on deletion.
+ if (!outFileClause.shouldDeleteExistingFiles()) {
+ return;
+ }
+ Preconditions.checkState(outFileClause.getBrokerDesc() != null,
+ "delete_existing_files requires a remote outfile sink");
+ Preconditions.checkState(outFileClause.getBrokerDesc().storageType()
!= StorageType.LOCAL,
+ "delete_existing_files is not supported for local outfile
sinks");
+
BrokerUtil.deleteParentDirectoryWithFileSystem(outFileClause.getFilePath(),
outFileClause.getBrokerDesc());
+ clearDeleteExistingFilesInPlan();
+ }
+
+ private void clearDeleteExistingFilesInPlan() {
+ ResultFileSink resultFileSink = null;
+ for (PlanFragment fragment : planner.getFragments()) {
+ if (fragment.getSink() instanceof ResultFileSink) {
+ Preconditions.checkState(resultFileSink == null, "OUTFILE
query should have only one ResultFileSink");
+ resultFileSink = (ResultFileSink) fragment.getSink();
+ }
+ }
+ Preconditions.checkState(resultFileSink != null, "OUTFILE query must
have ResultFileSink");
+ resultFileSink.setDeleteExistingFiles(false);
+ }
+
public static void syncLoadForTablets(List<List<Backend>> backendsList,
List<Long> allTabletIds) {
backendsList.forEach(backends -> backends.forEach(backend -> {
if (backend.isAlive()) {
diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/StmtExecutorTest.java
b/fe/fe-core/src/test/java/org/apache/doris/qe/StmtExecutorTest.java
index 05af66313b1..3e3f4ba9d49 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/qe/StmtExecutorTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/qe/StmtExecutorTest.java
@@ -24,6 +24,9 @@ import org.apache.doris.common.Config;
import org.apache.doris.common.FeConstants;
import org.apache.doris.mysql.MysqlChannel;
import org.apache.doris.mysql.MysqlSerializer;
+import org.apache.doris.planner.PlanFragment;
+import org.apache.doris.planner.Planner;
+import org.apache.doris.planner.ResultFileSink;
import org.apache.doris.qe.CommonResultSet.CommonResultSetMetaData;
import org.apache.doris.qe.ConnectContext.ConnectType;
import org.apache.doris.utframe.TestWithFeService;
@@ -35,6 +38,8 @@ import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import java.io.IOException;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
@@ -276,4 +281,24 @@ public class StmtExecutorTest extends TestWithFeService {
StmtExecutor executor = new StmtExecutor(mockCtx, stmt, false);
executor.sendBinaryResultRow(resultSet);
}
+
+ @Test
+ public void testClearDeleteExistingFilesInPlan() throws Exception {
+ Planner planner = Mockito.mock(Planner.class);
+ PlanFragment fragment = Mockito.mock(PlanFragment.class);
+ ResultFileSink resultFileSink = Mockito.mock(ResultFileSink.class);
+ Mockito.when(fragment.getSink()).thenReturn(resultFileSink);
+
Mockito.when(planner.getFragments()).thenReturn(Lists.newArrayList(fragment));
+
+ StmtExecutor executor = new StmtExecutor(connectContext, "");
+ Field plannerField = StmtExecutor.class.getDeclaredField("planner");
+ plannerField.setAccessible(true);
+ plannerField.set(executor, planner);
+
+ Method clearMethod =
StmtExecutor.class.getDeclaredMethod("clearDeleteExistingFilesInPlan");
+ clearMethod.setAccessible(true);
+ clearMethod.invoke(executor);
+
+ Mockito.verify(resultFileSink).setDeleteExistingFiles(false);
+ }
}
diff --git a/gensrc/thrift/DataSinks.thrift b/gensrc/thrift/DataSinks.thrift
index 8add59d47af..842765273e0 100644
--- a/gensrc/thrift/DataSinks.thrift
+++ b/gensrc/thrift/DataSinks.thrift
@@ -132,7 +132,7 @@ struct TResultFileSinkOptions {
14: optional TParquetVersion parquet_version
15: optional string orc_schema
- 16: optional bool delete_existing_files;
+ 16: optional bool delete_existing_files; // deprecated: FE now handles
outfile cleanup and clears this flag before BE execution; kept for
compatibility with older FE
17: optional string file_suffix;
18: optional bool with_bom;
@@ -480,7 +480,7 @@ struct TTVFTableSink {
7: optional string column_separator
8: optional string line_delimiter
9: optional i64 max_file_size_bytes
- 10: optional bool delete_existing_files
+ 10: optional bool delete_existing_files // deprecated: FE handles TVF
cleanup before execution and always sends false
11: optional map<string, string> hadoop_config
12: optional PlanNodes.TFileCompressType compression_type
13: optional i64 backend_id // local TVF: specify BE
diff --git a/regression-test/suites/export_p0/test_outfile.groovy
b/regression-test/suites/export_p0/test_outfile.groovy
index f33500f5883..35c5e0b681b 100644
--- a/regression-test/suites/export_p0/test_outfile.groovy
+++ b/regression-test/suites/export_p0/test_outfile.groovy
@@ -44,11 +44,15 @@ suite("test_outfile") {
assertEquals(response.msg, "success")
def configJson = response.data.rows
boolean enableOutfileToLocal = false
+ boolean enableDeleteExistingFiles = false
for (Object conf: configJson) {
assert conf instanceof Map
if (((Map<String, String>) conf).get("Name").toLowerCase() ==
"enable_outfile_to_local") {
enableOutfileToLocal = ((Map<String, String>)
conf).get("Value").toLowerCase() == "true"
}
+ if (((Map<String, String>) conf).get("Name").toLowerCase() ==
"enable_delete_existing_files") {
+ enableDeleteExistingFiles = ((Map<String, String>)
conf).get("Value").toLowerCase() == "true"
+ }
}
if (!enableOutfileToLocal) {
logger.warn("Please set enable_outfile_to_local to true to run
test_outfile")
@@ -233,4 +237,14 @@ suite("test_outfile") {
path.delete();
}
}
+
+ if (enableDeleteExistingFiles) {
+ test {
+ sql """
+ SELECT 1 INTO OUTFILE
"file://${outFile}/test_outfile_delete_existing_files_${uuid}/"
+ PROPERTIES("delete_existing_files" = "true");
+ """
+ exception "Local file system does not support delete existing
files"
+ }
+ }
}
diff --git
a/regression-test/suites/export_p0/test_outfile_parallel_delete_existing_files.groovy
b/regression-test/suites/export_p0/test_outfile_parallel_delete_existing_files.groovy
new file mode 100644
index 00000000000..655146598a9
--- /dev/null
+++
b/regression-test/suites/export_p0/test_outfile_parallel_delete_existing_files.groovy
@@ -0,0 +1,131 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.codehaus.groovy.runtime.IOGroovyMethods
+
+suite("test_outfile_parallel_delete_existing_files", "p0") {
+ StringBuilder strBuilder = new StringBuilder()
+ strBuilder.append("curl --location-trusted -u " + context.config.jdbcUser
+ ":" + context.config.jdbcPassword)
+ if
((context.config.otherConfigs.get("enableTLS")?.toString()?.equalsIgnoreCase("true"))
?: false) {
+ strBuilder.append(" https://" + context.config.feHttpAddress +
"/rest/v1/config/fe")
+ strBuilder.append(" --cert " +
context.config.otherConfigs.get("trustCert")
+ + " --cacert " + context.config.otherConfigs.get("trustCACert")
+ + " --key " + context.config.otherConfigs.get("trustCAKey"))
+ } else {
+ strBuilder.append(" http://" + context.config.feHttpAddress +
"/rest/v1/config/fe")
+ }
+
+ def process = strBuilder.toString().execute()
+ def code = process.waitFor()
+ def err = IOGroovyMethods.getText(new BufferedReader(new
InputStreamReader(process.getErrorStream())))
+ def out = process.getText()
+ logger.info("Request FE Config: code=" + code + ", out=" + out + ", err="
+ err)
+ assertEquals(code, 0)
+ def response = parseJson(out.trim())
+ assertEquals(response.code, 0)
+ assertEquals(response.msg, "success")
+ boolean enableDeleteExistingFiles = false
+ for (Object conf : response.data.rows) {
+ assert conf instanceof Map
+ if (((Map<String, String>) conf).get("Name").toLowerCase() ==
"enable_delete_existing_files") {
+ enableDeleteExistingFiles = ((Map<String, String>)
conf).get("Value").toLowerCase() == "true"
+ }
+ }
+ if (!enableDeleteExistingFiles) {
+ logger.warn("Please set enable_delete_existing_files to true to run
test_outfile_parallel_delete_existing_files")
+ return
+ }
+
+ String ak = getS3AK()
+ String sk = getS3SK()
+ String s3Endpoint = getS3Endpoint()
+ String region = getS3Region()
+ String bucket = context.config.otherConfigs.get("s3BucketName")
+ String provider = getS3Provider()
+ String tableName = "test_outfile_parallel_delete_existing_files"
+ String uuid = UUID.randomUUID().toString()
+ String outFilePath =
"${bucket}/outfile/parallel_delete_existing_files/${uuid}/exp_"
+
+ def exportToS3 = { String filterSql, boolean deleteExistingFiles ->
+ String deleteProperty = deleteExistingFiles ?
"\"delete_existing_files\" = \"true\"," : ""
+ sql """
+ SELECT * FROM ${tableName} ${filterSql}
+ INTO OUTFILE "s3://${outFilePath}"
+ FORMAT AS csv
+ PROPERTIES (
+ ${deleteProperty}
+ "column_separator" = ",",
+ "s3.endpoint" = "${s3Endpoint}",
+ "s3.region" = "${region}",
+ "s3.secret_key" = "${sk}",
+ "s3.access_key" = "${ak}",
+ "provider" = "${provider}"
+ );
+ """
+ }
+
+ try {
+ sql """ set enable_parallel_outfile = true """
+ sql """ set parallel_pipeline_task_num = 8 """
+
+ sql """ DROP TABLE IF EXISTS ${tableName} """
+ sql """
+ CREATE TABLE ${tableName} (
+ id INT NOT NULL,
+ name STRING NOT NULL,
+ score INT NOT NULL
+ )
+ DUPLICATE KEY(id)
+ DISTRIBUTED BY HASH(id) BUCKETS 16
+ PROPERTIES("replication_num" = "1");
+ """
+
+ sql """
+ INSERT INTO ${tableName}
+ SELECT
+ number AS id,
+ concat('name_', cast(number AS string)) AS name,
+ cast(number % 97 AS int) AS score
+ FROM numbers("number" = "20000");
+ """
+
+ def expected = sql """ SELECT count(*), sum(id), sum(score) FROM
${tableName}; """
+
+ exportToS3("WHERE id < 5000", false)
+ exportToS3("", true)
+
+ def actual = sql """
+ SELECT count(*), sum(id), sum(score) FROM S3(
+ "uri" = "s3://${outFilePath}*",
+ "s3.endpoint" = "${s3Endpoint}",
+ "s3.region" = "${region}",
+ "s3.secret_key" = "${sk}",
+ "s3.access_key" = "${ak}",
+ "provider" = "${provider}",
+ "format" = "csv",
+ "column_separator" = ",",
+ "csv_schema" = "id:int;name:string;score:int"
+ );
+ """
+
+ assertEquals(expected[0][0], actual[0][0])
+ assertEquals(expected[0][1], actual[0][1])
+ assertEquals(expected[0][2], actual[0][2])
+ } finally {
+ try_sql(""" set enable_parallel_outfile = false """)
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]