This is an automated email from the ASF dual-hosted git repository.
morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new b41fcda6eae branch-3.1: [feat](outfile) support compression type for
csv format in outfile and export #55392 (#55561)
b41fcda6eae is described below
commit b41fcda6eaecf9a7b3d0a4fe239f4a3d748f7bdd
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Thu Sep 4 10:36:45 2025 +0800
branch-3.1: [feat](outfile) support compression type for csv format in
outfile and export #55392 (#55561)
Cherry-picked from #55392
Co-authored-by: Mingyu Chen (Rayner) <[email protected]>
---
be/src/pipeline/exec/result_sink_operator.h | 6 +
be/src/vec/sink/writer/vfile_result_writer.cpp | 27 ++++-
be/src/vec/sink/writer/vfile_result_writer.h | 2 +
.../org/apache/doris/analysis/OutFileClause.java | 5 +
.../java/org/apache/doris/common/util/Util.java | 4 +-
.../fileformat/CsvFileFormatProperties.java | 32 +++++
.../main/java/org/apache/doris/load/ExportJob.java | 11 +-
.../fileformat/CsvFileFormatPropertiesTest.java | 8 +-
.../fileformat/TextFileFormatPropertiesTest.java | 5 +-
gensrc/thrift/DataSinks.thrift | 3 +
regression-test/data/export_p0/test_export_csv.out | Bin 21332 -> 24555 bytes
.../data/export_p0/test_outfile_csv_compress.out | Bin 0 -> 1729 bytes
.../suites/export_p0/test_export_csv.groovy | 69 +++++++++++
.../export_p0/test_outfile_csv_compress.groovy | 131 +++++++++++++++++++++
14 files changed, 285 insertions(+), 18 deletions(-)
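For context, the regression tests added below exercise the new "compress_type" property for CSV output. A minimal usage sketch distilled from those tests follows; example_table, the bucket, the credentials and the local path are placeholders, and per CsvFileFormatProperties the accepted values for writing are plain, gz, bz2, snappyblock, lz4block and zstd:

    -- OUTFILE: write gzip-compressed CSV files (bucket/credentials are placeholders)
    SELECT * FROM example_table
    INTO OUTFILE "s3://example-bucket/outfile_"
    FORMAT AS CSV
    PROPERTIES(
        "s3.endpoint" = "<endpoint>",
        "s3.region" = "<region>",
        "s3.access_key" = "<ak>",
        "s3.secret_key" = "<sk>",
        "compress_type" = "gz"
    );

    -- EXPORT: write bzip2-compressed CSV files to a local path
    EXPORT TABLE example_table TO "file:///tmp/export_demo/"
    PROPERTIES(
        "format" = "csv",
        "compress_type" = "bz2"
    );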
diff --git a/be/src/pipeline/exec/result_sink_operator.h b/be/src/pipeline/exec/result_sink_operator.h
index 06544f9bc18..1c2b669e02f 100644
--- a/be/src/pipeline/exec/result_sink_operator.h
+++ b/be/src/pipeline/exec/result_sink_operator.h
@@ -54,6 +54,9 @@ struct ResultFileOptions {
bool is_refactor_before_flag = false;
std::string orc_schema;
TFileCompressType::type orc_compression_type;
+ // currently only for csv
+ // TODO: we should merge parquet_commpression_type/orc_compression_type/compression_type
+ TFileCompressType::type compression_type = TFileCompressType::PLAIN;
bool delete_existing_files = false;
std::string file_suffix;
@@ -116,6 +119,9 @@ struct ResultFileOptions {
if (t_opt.__isset.orc_writer_version) {
orc_writer_version = t_opt.orc_writer_version;
}
+ if (t_opt.__isset.compression_type) {
+ compression_type = t_opt.compression_type;
+ }
}
};
diff --git a/be/src/vec/sink/writer/vfile_result_writer.cpp b/be/src/vec/sink/writer/vfile_result_writer.cpp
index 555b44ea3df..ad29714cc55 100644
--- a/be/src/vec/sink/writer/vfile_result_writer.cpp
+++ b/be/src/vec/sink/writer/vfile_result_writer.cpp
@@ -126,10 +126,10 @@ Status VFileResultWriter::_create_file_writer(const std::string& file_name) {
}));
switch (_file_opts->file_format) {
case TFileFormatType::FORMAT_CSV_PLAIN:
- _vfile_writer.reset(new VCSVTransformer(_state, _file_writer_impl.get(),
-         _vec_output_expr_ctxs, _output_object_data,
-         _header_type, _header, _file_opts->column_separator,
-         _file_opts->line_delimiter, _file_opts->with_bom));
+ _vfile_writer.reset(new VCSVTransformer(
+         _state, _file_writer_impl.get(), _vec_output_expr_ctxs, _output_object_data,
+         _header_type, _header, _file_opts->column_separator, _file_opts->line_delimiter,
+         _file_opts->with_bom, _file_opts->compression_type));
break;
case TFileFormatType::FORMAT_PARQUET:
_vfile_writer.reset(new VParquetTransformer(
@@ -195,7 +195,7 @@ void VFileResultWriter::_get_file_url(std::string* file_url) {
std::string VFileResultWriter::_file_format_to_name() {
switch (_file_opts->file_format) {
case TFileFormatType::FORMAT_CSV_PLAIN:
- return "csv";
+ return "csv" + _compression_type_to_name();
case TFileFormatType::FORMAT_PARQUET:
return "parquet";
case TFileFormatType::FORMAT_ORC:
@@ -205,6 +205,23 @@ std::string VFileResultWriter::_file_format_to_name() {
}
}
+std::string VFileResultWriter::_compression_type_to_name() {
+ switch (_file_opts->compression_type) {
+ case TFileCompressType::GZ:
+ return ".gzip";
+ case TFileCompressType::BZ2:
+ return ".bzip2";
+ case TFileCompressType::SNAPPYBLOCK:
+ return ".snappy";
+ case TFileCompressType::LZ4BLOCK:
+ return ".lz4";
+ case TFileCompressType::ZSTD:
+ return ".zstd";
+ default:
+ return "";
+ }
+}
+
Status VFileResultWriter::write(RuntimeState* state, Block& block) {
if (block.rows() == 0) {
return Status::OK();
diff --git a/be/src/vec/sink/writer/vfile_result_writer.h b/be/src/vec/sink/writer/vfile_result_writer.h
index 8b611d7ceef..209e1af1d69 100644
--- a/be/src/vec/sink/writer/vfile_result_writer.h
+++ b/be/src/vec/sink/writer/vfile_result_writer.h
@@ -101,7 +101,9 @@ private:
// delete the dir of file_path
Status _delete_dir();
double _get_write_speed(int64_t write_bytes, int64_t write_time);
+ std::string _compression_type_to_name();
+private:
RuntimeState* _state; // not owned, set when init
const pipeline::ResultFileOptions* _file_opts = nullptr;
TStorageBackendType::type _storage_type;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java
index 8211f84a21c..2edc3b4660f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java
@@ -504,6 +504,11 @@ public class OutFileClause {
analyzeBrokerDesc(copiedProps);
fileFormatProperties.analyzeFileFormatProperties(copiedProps, true);
+ // check if compression type for csv is supported
+ if (fileFormatProperties instanceof CsvFileFormatProperties) {
+ CsvFileFormatProperties csvFileFormatProperties = (CsvFileFormatProperties) fileFormatProperties;
+ csvFileFormatProperties.checkSupportedCompressionType(true);
+ }
if (copiedProps.containsKey(PROP_MAX_FILE_SIZE)) {
maxFileSizeBytes = ParseUtil.analyzeDataVolume(copiedProps.get(PROP_MAX_FILE_SIZE));
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/Util.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/Util.java
index 296f76d1f79..ab14b529441 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/Util.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/Util.java
@@ -610,7 +610,7 @@ public class Util {
}
}
- public static TFileCompressType getFileCompressType(String compressType) {
+ public static TFileCompressType getFileCompressType(String compressType) throws AnalysisException {
if (Strings.isNullOrEmpty(compressType)) {
return TFileCompressType.UNKNOWN;
}
@@ -618,7 +618,7 @@ public class Util {
try {
return TFileCompressType.valueOf(upperCaseType);
} catch (IllegalArgumentException e) {
- return TFileCompressType.UNKNOWN;
+ throw new AnalysisException("Unknown compression type: " +
compressType);
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/CsvFileFormatProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/CsvFileFormatProperties.java
index 25bd0c469db..2ed59a818f4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/CsvFileFormatProperties.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/CsvFileFormatProperties.java
@@ -22,20 +22,35 @@ import org.apache.doris.common.util.Util;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.thrift.TFileAttributes;
+import org.apache.doris.thrift.TFileCompressType;
import org.apache.doris.thrift.TFileFormatType;
import org.apache.doris.thrift.TFileTextScanRangeParams;
import org.apache.doris.thrift.TResultFileSinkOptions;
import com.google.common.base.Strings;
+import com.google.common.collect.Sets;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.Map;
+import java.util.Set;
public class CsvFileFormatProperties extends FileFormatProperties {
public static final Logger LOG = LogManager.getLogger(
org.apache.doris.datasource.property.fileformat.CsvFileFormatProperties.class);
+ // supported compression types for csv writer
+ public static final Set<TFileCompressType> SUPPORTED_CSV_WRITE_COMPRESSION_TYPES = Sets.newHashSet();
+
+ static {
+ SUPPORTED_CSV_WRITE_COMPRESSION_TYPES.add(TFileCompressType.PLAIN);
+ SUPPORTED_CSV_WRITE_COMPRESSION_TYPES.add(TFileCompressType.GZ);
+ SUPPORTED_CSV_WRITE_COMPRESSION_TYPES.add(TFileCompressType.BZ2);
+ SUPPORTED_CSV_WRITE_COMPRESSION_TYPES.add(TFileCompressType.SNAPPYBLOCK);
+ SUPPORTED_CSV_WRITE_COMPRESSION_TYPES.add(TFileCompressType.LZ4BLOCK);
+ SUPPORTED_CSV_WRITE_COMPRESSION_TYPES.add(TFileCompressType.ZSTD);
+ }
+
public static final String DEFAULT_COLUMN_SEPARATOR = "\t";
public static final String DEFAULT_LINE_DELIMITER = "\n";
@@ -119,6 +134,7 @@ public class CsvFileFormatProperties extends FileFormatProperties {
throw new AnalysisException("skipLines should not be less than 0.");
}
+ // This default value is "UNKNOWN", so that the caller may infer
the compression type by suffix of file.
String compressTypeStr = getOrDefault(formatProperties,
PROP_COMPRESS_TYPE, "UNKNOWN", isRemoveOriginProperty);
compressionType = Util.getFileCompressType(compressTypeStr);
@@ -140,10 +156,26 @@ public class CsvFileFormatProperties extends FileFormatProperties {
}
}
+ public void checkSupportedCompressionType(boolean isWrite) {
+ // Currently, only check for write operation.
+ // Because we only support a subset of compression type for writing.
+ if (isWrite) {
+ // "UNKNOWN" means user does not specify the compression type
+ if (this.compressionType == TFileCompressType.UNKNOWN) {
+ this.compressionType = TFileCompressType.PLAIN;
+ }
+ if (!SUPPORTED_CSV_WRITE_COMPRESSION_TYPES.contains(this.compressionType)) {
+     throw new AnalysisException(
+             "csv compression type [" + this.compressionType.name() + "] is invalid for writing");
+ }
+ }
+ }
+
@Override
public void fullTResultFileSinkOptions(TResultFileSinkOptions sinkOptions) {
sinkOptions.setColumnSeparator(columnSeparator);
sinkOptions.setLineDelimiter(lineDelimiter);
+ sinkOptions.setCompressionType(compressionType);
}
// The method `analyzeFileFormatProperties` must have been called once before this method
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java
index 434deaf440b..be77d591ab8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java
@@ -578,13 +578,12 @@ public class ExportJob implements Writable {
if (format.equals("csv") || format.equals("csv_with_names") ||
format.equals("csv_with_names_and_types")) {
outfileProperties.put(OutFileClause.PROP_COLUMN_SEPARATOR, columnSeparator);
outfileProperties.put(OutFileClause.PROP_LINE_DELIMITER, lineDelimiter);
- } else {
- // orc / parquet
- // compressType == null means outfile will use default compression type
- if (compressType != null) {
-     outfileProperties.put(ExportCommand.COMPRESS_TYPE, compressType);
- }
}
+ // compressType == null means outfile will use default compression type
+ if (compressType != null) {
+ outfileProperties.put(ExportCommand.COMPRESS_TYPE, compressType);
+ }
+
if (!maxFileSize.isEmpty()) {
outfileProperties.put(OutFileClause.PROP_MAX_FILE_SIZE, maxFileSize);
}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/fileformat/CsvFileFormatPropertiesTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/fileformat/CsvFileFormatPropertiesTest.java
index 4b2550cfa52..1482c84055d 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/fileformat/CsvFileFormatPropertiesTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/fileformat/CsvFileFormatPropertiesTest.java
@@ -17,6 +17,7 @@
package org.apache.doris.datasource.property.fileformat;
+import org.apache.doris.common.ExceptionChecker;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.thrift.TFileCompressType;
@@ -130,17 +131,18 @@ public class CsvFileFormatPropertiesTest {
public void testAnalyzeFileFormatPropertiesInvalidCompressType() {
Map<String, String> properties = new HashMap<>();
properties.put(CsvFileFormatProperties.PROP_COMPRESS_TYPE, "invalid");
- csvFileFormatProperties.analyzeFileFormatProperties(properties, true);
- Assert.assertEquals(TFileCompressType.UNKNOWN, csvFileFormatProperties.getCompressionType());
+ ExceptionChecker.expectThrowsWithMsg(AnalysisException.class,
+         "Unknown compression type: invalid",
+         () -> csvFileFormatProperties.analyzeFileFormatProperties(properties, true));
}
@Test
public void testAnalyzeFileFormatPropertiesValidCompressType() throws AnalysisException {
Map<String, String> properties = new HashMap<>();
properties.put(CsvFileFormatProperties.PROP_COMPRESS_TYPE, "gz");
-
csvFileFormatProperties.analyzeFileFormatProperties(properties, true);
Assert.assertEquals(TFileCompressType.GZ, csvFileFormatProperties.getCompressionType());
+ ExceptionChecker.expectThrowsNoException(() -> csvFileFormatProperties.checkSupportedCompressionType(true));
}
@Test
diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/fileformat/TextFileFormatPropertiesTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/fileformat/TextFileFormatPropertiesTest.java
index b8f694b40bc..2ed04c234b7 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/fileformat/TextFileFormatPropertiesTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/fileformat/TextFileFormatPropertiesTest.java
@@ -17,6 +17,7 @@
package org.apache.doris.datasource.property.fileformat;
+import org.apache.doris.common.ExceptionChecker;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.thrift.TFileCompressType;
@@ -93,8 +94,8 @@ public class TextFileFormatPropertiesTest {
public void testAnalyzeFileFormatPropertiesInvalidCompressType() {
Map<String, String> properties = new HashMap<>();
properties.put(TextFileFormatProperties.PROP_COMPRESS_TYPE, "invalid");
- textFileFormatProperties.analyzeFileFormatProperties(properties, true);
- Assert.assertEquals(TFileCompressType.UNKNOWN, textFileFormatProperties.getCompressionType());
+ ExceptionChecker.expectThrowsWithMsg(AnalysisException.class, "Unknown compression type: invalid",
+         () -> textFileFormatProperties.analyzeFileFormatProperties(properties, true));
}
@Test
diff --git a/gensrc/thrift/DataSinks.thrift b/gensrc/thrift/DataSinks.thrift
index 73f91c84311..77862d1c5e4 100644
--- a/gensrc/thrift/DataSinks.thrift
+++ b/gensrc/thrift/DataSinks.thrift
@@ -145,6 +145,9 @@ struct TResultFileSinkOptions {
//hive write sink use int96
//export data to file use by user define properties
21: optional bool enable_int96_timestamps
+ // currently only for csv
+ // TODO: merge with parquet_compression_type and orc_compression_type
+ 22: optional PlanNodes.TFileCompressType compression_type
}
struct TMemoryScratchSink {
diff --git a/regression-test/data/export_p0/test_export_csv.out b/regression-test/data/export_p0/test_export_csv.out
index 9952ded9179..e1d8251f420 100644
Binary files a/regression-test/data/export_p0/test_export_csv.out and b/regression-test/data/export_p0/test_export_csv.out differ
diff --git a/regression-test/data/export_p0/test_outfile_csv_compress.out b/regression-test/data/export_p0/test_outfile_csv_compress.out
new file mode 100644
index 00000000000..48ae4946778
Binary files /dev/null and b/regression-test/data/export_p0/test_outfile_csv_compress.out differ
diff --git a/regression-test/suites/export_p0/test_export_csv.groovy b/regression-test/suites/export_p0/test_export_csv.groovy
index 09d06996b7f..9091fa55e6b 100644
--- a/regression-test/suites/export_p0/test_export_csv.groovy
+++ b/regression-test/suites/export_p0/test_export_csv.groovy
@@ -403,5 +403,74 @@ suite("test_export_csv", "p0") {
delete_files.call("${outFilePath}")
}
+ // 5. test csv with compression
+ uuid = UUID.randomUUID().toString()
+ outFilePath = "${outfile_path_prefix}" + "/${table_export_name}_${uuid}"
+ label = "label_${uuid}"
+ try {
+ // check export path
+ check_path_exists.call("${outFilePath}")
+
+ // exec export
+ sql """
+ EXPORT TABLE ${table_export_name} where user_id < 11 TO "file://${outFilePath}/"
+ PROPERTIES(
+ "label" = "${label}",
+ "format" = "csv",
+ "compress_type"="gz"
+ );
+ """
+ waiting_export.call(label)
+
+ // check data correctness
+ sql """ DROP TABLE IF EXISTS ${table_load_name} """
+ sql """
+ CREATE TABLE IF NOT EXISTS ${table_load_name} (
+ `user_id` LARGEINT NOT NULL COMMENT "user id",
+ `date` DATE NOT NULL COMMENT "data load date",
+ `datetime` DATETIME NOT NULL COMMENT "data load datetime",
+ `city` VARCHAR(20) COMMENT "user city",
+ `age` SMALLINT COMMENT "user age",
+ `sex` TINYINT COMMENT "user gender",
+ `bool_col` boolean COMMENT "",
+ `int_col` int COMMENT "",
+ `bigint_col` bigint COMMENT "",
+ `largeint_col` largeint COMMENT "",
+ `float_col` float COMMENT "",
+ `double_col` double COMMENT "",
+ `char_col` CHAR(10) COMMENT "",
+ `decimal_col` decimal COMMENT "",
+ `ipv4_col` ipv4 COMMENT "",
+ `ipv6_col` ipv6 COMMENT ""
+ )
+ DISTRIBUTED BY HASH(user_id) PROPERTIES("replication_num" = "1");
+ """
+
+ // use local() tvf to reload the data
+ def ipList = [:]
+ def portList = [:]
+ getBackendIpHeartbeatPort(ipList, portList)
+ ipList.each { beid, ip ->
+ logger.info("Begin to insert into ${table_load_name} from local()")
+ sql """
+ insert into ${table_load_name}
+ select *
+ from local(
+ "file_path" =
"${local_tvf_prefix}/${table_export_name}_${uuid}/*",
+ "backend_id" = "${beid}",
+ "format" = "csv",
+ "compress_type" = "gz");
+ """
+ insert_res = sql "show last insert;"
+ logger.info("insert from local(), BE id = ${beid}, result: " +
insert_res.toString())
+ }
+
+ qt_select_load5 """ SELECT * FROM ${table_load_name} t ORDER BY
user_id; """
+
+ } finally {
+ try_sql("DROP TABLE IF EXISTS ${table_load_name}")
+ delete_files.call("${outFilePath}")
+ }
+
try_sql("DROP TABLE IF EXISTS ${table_export_name}")
}
diff --git a/regression-test/suites/export_p0/test_outfile_csv_compress.groovy b/regression-test/suites/export_p0/test_outfile_csv_compress.groovy
new file mode 100644
index 00000000000..6bdbb39fe75
--- /dev/null
+++ b/regression-test/suites/export_p0/test_outfile_csv_compress.groovy
@@ -0,0 +1,131 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_outfile_csv_compress", "p0") {
+ // open nereids
+ sql """ set enable_nereids_planner=true """
+ sql """ set enable_fallback_to_original_planner=false """
+
+ String ak = getS3AK()
+ String sk = getS3SK()
+ String s3_endpoint = getS3Endpoint()
+ String region = getS3Region()
+ String bucket = context.config.otherConfigs.get("s3BucketName");
+
+ def create_table = {table_name ->
+ sql """ DROP TABLE IF EXISTS ${table_name} """
+ sql """
+ CREATE TABLE IF NOT EXISTS ${table_name} (
+ `id` int,
+ `name` varchar(128) NOT NULL COMMENT ""
+ )
+ DISTRIBUTED BY HASH(name) PROPERTIES("replication_num" = "1");
+ """
+ sql """ INSERT INTO ${table_name} values(1, 'zhangsan');"""
+ for (int i = 0; i < 20; i++) {
+ sql """ insert into ${table_name} select id + ${i}, concat(name,
id + ${i}) from ${table_name};"""
+ }
+ }
+
+ def table_name = "test_outfile_csv_compress"
+ create_table(table_name)
+
+ def outFilePath = """s3://${bucket}/outfile_"""
+ def csv_outfile_result = { the_table_name, compression_type ->
+ def result = sql """
+ select * from ${the_table_name}
+ into outfile "${outFilePath}"
+ FORMAT AS CSV
+ PROPERTIES(
+ "s3.endpoint" = "${s3_endpoint}",
+ "s3.region" = "${region}",
+ "s3.secret_key"="${sk}",
+ "s3.access_key" = "${ak}",
+ "compress_type" = "${compression_type}"
+ );
+ """
+ return result[0][3]
+ }
+
+ for (String compression_type: ["plain", "gz", "bz2", "snappyblock", "lz4block", "zstd"]) {
+ def outfile_url = csv_outfile_result(table_name, compression_type);
+ print("http://${bucket}.${s3_endpoint}${outfile_url.substring(5 +
bucket.length(), outfile_url.length() - 1)}0.")
+ qt_select """ select c1, c2 from s3(
+ "uri" =
"http://${bucket}.${s3_endpoint}${outfile_url.substring(5 + bucket.length(),
outfile_url.length() - 1)}*",
+ "ACCESS_KEY"= "${ak}",
+ "SECRET_KEY" = "${sk}",
+ "format" = "csv",
+ "provider" = "${getS3Provider()}",
+ "region" = "${region}",
+ "compress_type" = "${compression_type}"
+ ) order by c1, c2 limit 10;
+ """
+ qt_select """ select count(c1), count(c2) from s3(
+ "uri" =
"http://${bucket}.${s3_endpoint}${outfile_url.substring(5 + bucket.length(),
outfile_url.length() - 1)}*",
+ "ACCESS_KEY"= "${ak}",
+ "SECRET_KEY" = "${sk}",
+ "format" = "csv",
+ "provider" = "${getS3Provider()}",
+ "region" = "${region}",
+ "compress_type" = "${compression_type}"
+ );
+ """
+ qt_select """desc function s3(
+ "uri" =
"http://${bucket}.${s3_endpoint}${outfile_url.substring(5 + bucket.length(),
outfile_url.length() - 1)}*",
+ "ACCESS_KEY"= "${ak}",
+ "SECRET_KEY" = "${sk}",
+ "format" = "csv",
+ "provider" = "${getS3Provider()}",
+ "region" = "${region}",
+ "compress_type" = "${compression_type}"
+ );
+ """
+ }
+
+ // test invalid compression_type
+ test {
+ sql """
+ select * from ${table_name}
+ into outfile "${outFilePath}"
+ FORMAT AS CSV
+ PROPERTIES(
+ "s3.endpoint" = "${s3_endpoint}",
+ "s3.region" = "${region}",
+ "s3.secret_key"="${sk}",
+ "s3.access_key" = "${ak}",
+ "compress_type" = "invalid"
+ );
+ """
+ exception """Unknown compression type"""
+ }
+
+ // test empty table
+ sql """drop table if exists test_outfile_csv_compress_empty_table"""
+ sql """create table test_outfile_csv_compress_empty_table(k1 int)
distributed by hash(k1) buckets 1 properties("replication_num" = "1")"""
+ def empty_outfile_url =
csv_outfile_result("test_outfile_csv_compress_empty_table", "gz");
+ qt_select """desc function s3(
+ "uri" =
"http://${bucket}.${s3_endpoint}${empty_outfile_url.substring(5 +
bucket.length(), empty_outfile_url.length() - 1)}*",
+ "ACCESS_KEY"= "${ak}",
+ "SECRET_KEY" = "${sk}",
+ "format" = "csv",
+ "provider" = "${getS3Provider()}",
+ "region" = "${region}",
+ "compress_type" = "gz"
+ );
+ """
+}
+
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]