This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 0aeb768bf97 [Fix](export/outfile) Support compression when exporting 
data to Parquet / ORC. (#37167)
0aeb768bf97 is described below

commit 0aeb768bf97a17fc74e874f13f73cf812f718656
Author: Tiewei Fang <43782773+bepppo...@users.noreply.github.com>
AuthorDate: Wed Jul 3 10:53:57 2024 +0800

    [Fix](export/outfile) Support compression when exporting data to Parquet / 
ORC. (#37167)
    
    bp: #36490
---
 be/src/vec/runtime/vorc_transformer.cpp            |  50 +++--
 be/src/vec/runtime/vorc_transformer.h              |  11 +-
 be/src/vec/runtime/vparquet_transformer.cpp        |  31 +--
 be/src/vec/sink/vresult_sink.h                     |   4 +
 .../writer/iceberg/viceberg_partition_writer.cpp   |  27 +--
 be/src/vec/sink/writer/vfile_result_writer.cpp     |   6 +-
 be/src/vec/sink/writer/vhive_partition_writer.cpp  |  27 +--
 .../java/org/apache/doris/analysis/ExportStmt.java |   7 +
 .../org/apache/doris/analysis/OutFileClause.java   |  58 +++--
 .../main/java/org/apache/doris/load/ExportJob.java |   9 +
 .../trees/plans/commands/ExportCommand.java        |   8 +-
 gensrc/thrift/DataSinks.thrift                     |   2 +
 .../export_p0/test_parquet_orc_compression.out     | 248 +++++++++++++++++++++
 .../export_p0/test_parquet_orc_compression.groovy  | 177 +++++++++++++++
 14 files changed, 553 insertions(+), 112 deletions(-)

diff --git a/be/src/vec/runtime/vorc_transformer.cpp 
b/be/src/vec/runtime/vorc_transformer.cpp
index 54c2bb59923..09bae276d65 100644
--- a/be/src/vec/runtime/vorc_transformer.cpp
+++ b/be/src/vec/runtime/vorc_transformer.cpp
@@ -107,39 +107,27 @@ void VOrcOutputStream::set_written_len(int64_t 
written_len) {
 }
 
 VOrcTransformer::VOrcTransformer(RuntimeState* state, doris::io::FileWriter* 
file_writer,
-                                 const VExprContextSPtrs& output_vexpr_ctxs,
-                                 const std::string& schema, bool 
output_object_data)
-        : VFileFormatTransformer(state, output_vexpr_ctxs, output_object_data),
-          _file_writer(file_writer),
-          _write_options(new orc::WriterOptions()),
-          _schema_str(&schema),
-          _iceberg_schema(nullptr) {
-    _write_options->setTimezoneName(_state->timezone());
-    _write_options->setUseTightNumericVector(true);
-}
-
-VOrcTransformer::VOrcTransformer(RuntimeState* state, doris::io::FileWriter* 
file_writer,
-                                 const VExprContextSPtrs& output_vexpr_ctxs,
+                                 const VExprContextSPtrs& output_vexpr_ctxs, 
std::string schema,
                                  std::vector<std::string> column_names, bool 
output_object_data,
-                                 orc::CompressionKind compression,
+                                 TFileCompressType::type compress_type,
                                  const iceberg::Schema* iceberg_schema)
         : VFileFormatTransformer(state, output_vexpr_ctxs, output_object_data),
           _file_writer(file_writer),
           _column_names(std::move(column_names)),
           _write_options(new orc::WriterOptions()),
-          _schema_str(nullptr),
+          _schema_str(std::move(schema)),
           _iceberg_schema(iceberg_schema) {
     _write_options->setTimezoneName(_state->timezone());
     _write_options->setUseTightNumericVector(true);
-    _write_options->setCompression(compression);
+    set_compression_type(compress_type);
 }
 
 Status VOrcTransformer::open() {
-    if (_schema_str != nullptr) {
+    if (!_schema_str.empty()) {
         try {
-            _schema = orc::Type::buildTypeFromString(*_schema_str);
+            _schema = orc::Type::buildTypeFromString(_schema_str);
         } catch (const std::exception& e) {
-            return Status::InternalError("Orc build schema from \"{}\" failed: 
{}", *_schema_str,
+            return Status::InternalError("Orc build schema from \"{}\" failed: 
{}", _schema_str,
                                          e.what());
         }
     } else {
@@ -171,6 +159,30 @@ Status VOrcTransformer::open() {
     return Status::OK();
 }
 
+void VOrcTransformer::set_compression_type(const TFileCompressType::type& 
compress_type) {
+    switch (compress_type) {
+    case TFileCompressType::PLAIN: {
+        
_write_options->setCompression(orc::CompressionKind::CompressionKind_NONE);
+        break;
+    }
+    case TFileCompressType::SNAPPYBLOCK: {
+        
_write_options->setCompression(orc::CompressionKind::CompressionKind_SNAPPY);
+        break;
+    }
+    case TFileCompressType::ZLIB: {
+        
_write_options->setCompression(orc::CompressionKind::CompressionKind_ZLIB);
+        break;
+    }
+    case TFileCompressType::ZSTD: {
+        
_write_options->setCompression(orc::CompressionKind::CompressionKind_ZSTD);
+        break;
+    }
+    default: {
+        
_write_options->setCompression(orc::CompressionKind::CompressionKind_ZLIB);
+    }
+    }
+}
+
 std::unique_ptr<orc::Type> VOrcTransformer::_build_orc_type(
         const TypeDescriptor& type_descriptor, const iceberg::NestedField* 
nested_field) {
     std::unique_ptr<orc::Type> type;
diff --git a/be/src/vec/runtime/vorc_transformer.h 
b/be/src/vec/runtime/vorc_transformer.h
index 554a1401f61..134e949e76c 100644
--- a/be/src/vec/runtime/vorc_transformer.h
+++ b/be/src/vec/runtime/vorc_transformer.h
@@ -79,13 +79,9 @@ private:
 class VOrcTransformer final : public VFileFormatTransformer {
 public:
     VOrcTransformer(RuntimeState* state, doris::io::FileWriter* file_writer,
-                    const VExprContextSPtrs& output_vexpr_ctxs, const 
std::string& schema,
-                    bool output_object_data);
-
-    VOrcTransformer(RuntimeState* state, doris::io::FileWriter* file_writer,
-                    const VExprContextSPtrs& output_vexpr_ctxs,
+                    const VExprContextSPtrs& output_vexpr_ctxs, std::string 
schema,
                     std::vector<std::string> column_names, bool 
output_object_data,
-                    orc::CompressionKind compression,
+                    TFileCompressType::type compression,
                     const iceberg::Schema* iceberg_schema = nullptr);
 
     ~VOrcTransformer() = default;
@@ -99,6 +95,7 @@ public:
     int64_t written_len() override;
 
 private:
+    void set_compression_type(const TFileCompressType::type& compress_type);
     std::unique_ptr<orc::Type> _build_orc_type(const TypeDescriptor& 
type_descriptor,
                                                const iceberg::NestedField* 
nested_field);
 
@@ -113,7 +110,7 @@ private:
     std::vector<std::string> _column_names;
     std::unique_ptr<orc::OutputStream> _output_stream;
     std::unique_ptr<orc::WriterOptions> _write_options;
-    const std::string* _schema_str;
+    std::string _schema_str;
     std::unique_ptr<orc::Type> _schema;
     std::unique_ptr<orc::Writer> _writer;
 
diff --git a/be/src/vec/runtime/vparquet_transformer.cpp 
b/be/src/vec/runtime/vparquet_transformer.cpp
index fbb24e00968..116a898c4f1 100644
--- a/be/src/vec/runtime/vparquet_transformer.cpp
+++ b/be/src/vec/runtime/vparquet_transformer.cpp
@@ -147,39 +147,40 @@ void ParquetBuildHelper::build_compression_type(
         const TParquetCompressionType::type& compression_type) {
     switch (compression_type) {
     case TParquetCompressionType::SNAPPY: {
-        builder.compression(parquet::Compression::SNAPPY);
+        builder.compression(arrow::Compression::SNAPPY);
         break;
     }
     case TParquetCompressionType::GZIP: {
-        builder.compression(parquet::Compression::GZIP);
+        builder.compression(arrow::Compression::GZIP);
         break;
     }
     case TParquetCompressionType::BROTLI: {
-        builder.compression(parquet::Compression::BROTLI);
+        builder.compression(arrow::Compression::BROTLI);
         break;
     }
     case TParquetCompressionType::ZSTD: {
-        builder.compression(parquet::Compression::ZSTD);
+        builder.compression(arrow::Compression::ZSTD);
         break;
     }
     case TParquetCompressionType::LZ4: {
-        builder.compression(parquet::Compression::LZ4);
-        break;
-    }
-    case TParquetCompressionType::LZO: {
-        builder.compression(parquet::Compression::LZO);
-        break;
-    }
-    case TParquetCompressionType::BZ2: {
-        builder.compression(parquet::Compression::BZ2);
+        builder.compression(arrow::Compression::LZ4);
         break;
     }
+    // Arrow does not support the LZO and BZ2 compression types.
+    // case TParquetCompressionType::LZO: {
+    //     builder.compression(arrow::Compression::LZO);
+    //     break;
+    // }
+    // case TParquetCompressionType::BZ2: {
+    //     builder.compression(arrow::Compression::BZ2);
+    //     break;
+    // }
     case TParquetCompressionType::UNCOMPRESSED: {
-        builder.compression(parquet::Compression::UNCOMPRESSED);
+        builder.compression(arrow::Compression::UNCOMPRESSED);
         break;
     }
     default:
-        builder.compression(parquet::Compression::UNCOMPRESSED);
+        builder.compression(arrow::Compression::SNAPPY);
     }
 }
 
diff --git a/be/src/vec/sink/vresult_sink.h b/be/src/vec/sink/vresult_sink.h
index c5b092bcd41..a3563bbb501 100644
--- a/be/src/vec/sink/vresult_sink.h
+++ b/be/src/vec/sink/vresult_sink.h
@@ -67,6 +67,7 @@ struct ResultFileOptions {
     //Now the code version is 1.1.2, so when the version is after 1.2, could 
remove this code.
     bool is_refactor_before_flag = false;
     std::string orc_schema;
+    TFileCompressType::type orc_compression_type;
 
     bool delete_existing_files = false;
     std::string file_suffix;
@@ -119,6 +120,9 @@ struct ResultFileOptions {
         if (t_opt.__isset.orc_schema) {
             orc_schema = t_opt.orc_schema;
         }
+        if (t_opt.__isset.orc_compression_type) {
+            orc_compression_type = t_opt.orc_compression_type;
+        }
     }
 };
 
diff --git a/be/src/vec/sink/writer/iceberg/viceberg_partition_writer.cpp 
b/be/src/vec/sink/writer/iceberg/viceberg_partition_writer.cpp
index f3c951effda..e2dccd0345b 100644
--- a/be/src/vec/sink/writer/iceberg/viceberg_partition_writer.cpp
+++ b/be/src/vec/sink/writer/iceberg/viceberg_partition_writer.cpp
@@ -87,32 +87,9 @@ Status VIcebergPartitionWriter::open(RuntimeState* state, 
RuntimeProfile* profil
         return _file_format_transformer->open();
     }
     case TFileFormatType::FORMAT_ORC: {
-        orc::CompressionKind orc_compression_type;
-        switch (_compress_type) {
-        case TFileCompressType::PLAIN: {
-            orc_compression_type = orc::CompressionKind::CompressionKind_NONE;
-            break;
-        }
-        case TFileCompressType::SNAPPYBLOCK: {
-            orc_compression_type = 
orc::CompressionKind::CompressionKind_SNAPPY;
-            break;
-        }
-        case TFileCompressType::ZLIB: {
-            orc_compression_type = orc::CompressionKind::CompressionKind_ZLIB;
-            break;
-        }
-        case TFileCompressType::ZSTD: {
-            orc_compression_type = orc::CompressionKind::CompressionKind_ZSTD;
-            break;
-        }
-        default: {
-            return Status::InternalError("Unsupported compress type {} with 
orc", _compress_type);
-        }
-        }
-
         _file_format_transformer.reset(
-                new VOrcTransformer(state, _file_writer.get(), 
_write_output_expr_ctxs,
-                                    _write_column_names, false, 
orc_compression_type, &_schema));
+                new VOrcTransformer(state, _file_writer.get(), 
_write_output_expr_ctxs, "",
+                                    _write_column_names, false, 
_compress_type, &_schema));
         return _file_format_transformer->open();
     }
     default: {
diff --git a/be/src/vec/sink/writer/vfile_result_writer.cpp 
b/be/src/vec/sink/writer/vfile_result_writer.cpp
index 64fa5313f30..acaa05963c1 100644
--- a/be/src/vec/sink/writer/vfile_result_writer.cpp
+++ b/be/src/vec/sink/writer/vfile_result_writer.cpp
@@ -124,9 +124,9 @@ Status VFileResultWriter::_create_file_writer(const 
std::string& file_name) {
                 _file_opts->parquet_version, _output_object_data));
         break;
     case TFileFormatType::FORMAT_ORC:
-        _vfile_writer.reset(new VOrcTransformer(_state, 
_file_writer_impl.get(),
-                                                _vec_output_expr_ctxs, 
_file_opts->orc_schema,
-                                                _output_object_data));
+        _vfile_writer.reset(new VOrcTransformer(
+                _state, _file_writer_impl.get(), _vec_output_expr_ctxs, 
_file_opts->orc_schema, {},
+                _output_object_data, _file_opts->orc_compression_type));
         break;
     default:
         return Status::InternalError("unsupported file format: {}", 
_file_opts->file_format);
diff --git a/be/src/vec/sink/writer/vhive_partition_writer.cpp 
b/be/src/vec/sink/writer/vhive_partition_writer.cpp
index f05acece2e8..64d1d9b4266 100644
--- a/be/src/vec/sink/writer/vhive_partition_writer.cpp
+++ b/be/src/vec/sink/writer/vhive_partition_writer.cpp
@@ -92,32 +92,9 @@ Status VHivePartitionWriter::open(RuntimeState* state, 
RuntimeProfile* profile)
         return _file_format_transformer->open();
     }
     case TFileFormatType::FORMAT_ORC: {
-        orc::CompressionKind orc_compression_type;
-        switch (_hive_compress_type) {
-        case TFileCompressType::PLAIN: {
-            orc_compression_type = orc::CompressionKind::CompressionKind_NONE;
-            break;
-        }
-        case TFileCompressType::SNAPPYBLOCK: {
-            orc_compression_type = 
orc::CompressionKind::CompressionKind_SNAPPY;
-            break;
-        }
-        case TFileCompressType::ZLIB: {
-            orc_compression_type = orc::CompressionKind::CompressionKind_ZLIB;
-            break;
-        }
-        case TFileCompressType::ZSTD: {
-            orc_compression_type = orc::CompressionKind::CompressionKind_ZSTD;
-            break;
-        }
-        default: {
-            return Status::InternalError("Unsupported type {} with orc", 
_hive_compress_type);
-        }
-        }
-
         _file_format_transformer.reset(
-                new VOrcTransformer(state, _file_writer.get(), 
_write_output_expr_ctxs,
-                                    _write_column_names, false, 
orc_compression_type));
+                new VOrcTransformer(state, _file_writer.get(), 
_write_output_expr_ctxs, "",
+                                    _write_column_names, false, 
_hive_compress_type));
         return _file_format_transformer->open();
     }
     default: {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java
index 3efda3bf8f8..855379cbc37 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java
@@ -64,6 +64,7 @@ public class ExportStmt extends StatementBase {
     public static final String PARALLELISM = "parallelism";
     public static final String LABEL = "label";
     public static final String DATA_CONSISTENCY = "data_consistency";
+    public static final String COMPRESS_TYPE = "compress_type";
 
     private static final String DEFAULT_COLUMN_SEPARATOR = "\t";
     private static final String DEFAULT_LINE_DELIMITER = "\n";
@@ -81,6 +82,7 @@ public class ExportStmt extends StatementBase {
             .add(PropertyAnalyzer.PROPERTIES_LINE_DELIMITER)
             .add(PropertyAnalyzer.PROPERTIES_TIMEOUT)
             .add("format")
+            .add(COMPRESS_TYPE)
             .build();
 
     private TableName tblName;
@@ -107,6 +109,7 @@ public class ExportStmt extends StatementBase {
     private String deleteExistingFiles;
     private String withBom;
     private String dataConsistency = ExportJob.CONSISTENT_PARTITION;
+    private String compressionType;
     private SessionVariable sessionVariables;
 
     private String qualifiedUser;
@@ -234,6 +237,7 @@ public class ExportStmt extends StatementBase {
         exportJob.setDeleteExistingFiles(this.deleteExistingFiles);
         exportJob.setWithBom(this.withBom);
         exportJob.setDataConsistency(this.dataConsistency);
+        exportJob.setCompressType(this.compressionType);
 
         if (columns != null) {
             Splitter split = Splitter.on(',').trimResults().omitEmptyStrings();
@@ -376,6 +380,9 @@ public class ExportStmt extends StatementBase {
                         + ExportJob.CONSISTENT_PARTITION + "`/`" + 
ExportJob.CONSISTENT_NONE + "`");
             }
         }
+
+        // compress_type
+        this.compressionType = properties.getOrDefault(COMPRESS_TYPE, "");
     }
 
     private void checkColumns() throws DdlException {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java
index 58b4898db02..03849abb97e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java
@@ -37,6 +37,7 @@ import org.apache.doris.common.util.Util;
 import org.apache.doris.datasource.property.PropertyConverter;
 import org.apache.doris.datasource.property.constants.S3Properties;
 import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.thrift.TFileCompressType;
 import org.apache.doris.thrift.TFileFormatType;
 import org.apache.doris.thrift.TParquetCompressionType;
 import org.apache.doris.thrift.TParquetDataType;
@@ -70,6 +71,7 @@ public class OutFileClause {
     public static final Map<String, TParquetRepetitionType> 
PARQUET_REPETITION_TYPE_MAP = Maps.newHashMap();
     public static final Map<String, TParquetDataType> PARQUET_DATA_TYPE_MAP = 
Maps.newHashMap();
     public static final Map<String, TParquetCompressionType> 
PARQUET_COMPRESSION_TYPE_MAP = Maps.newHashMap();
+    public static final Map<String, TFileCompressType> 
ORC_COMPRESSION_TYPE_MAP = Maps.newHashMap();
     public static final Map<String, TParquetVersion> PARQUET_VERSION_MAP = 
Maps.newHashMap();
     public static final Set<String> ORC_DATA_TYPE = Sets.newHashSet();
     public static final String FILE_NUMBER = "FileNumber";
@@ -106,9 +108,15 @@ public class OutFileClause {
         PARQUET_COMPRESSION_TYPE_MAP.put("brotli", 
TParquetCompressionType.BROTLI);
         PARQUET_COMPRESSION_TYPE_MAP.put("zstd", TParquetCompressionType.ZSTD);
         PARQUET_COMPRESSION_TYPE_MAP.put("lz4", TParquetCompressionType.LZ4);
-        PARQUET_COMPRESSION_TYPE_MAP.put("lzo", TParquetCompressionType.LZO);
-        PARQUET_COMPRESSION_TYPE_MAP.put("bz2", TParquetCompressionType.BZ2);
-        PARQUET_COMPRESSION_TYPE_MAP.put("default", 
TParquetCompressionType.UNCOMPRESSED);
+        // Arrow does not support the LZO and BZ2 compression types.
+        // PARQUET_COMPRESSION_TYPE_MAP.put("lzo", 
TParquetCompressionType.LZO);
+        // PARQUET_COMPRESSION_TYPE_MAP.put("bz2", 
TParquetCompressionType.BZ2);
+        PARQUET_COMPRESSION_TYPE_MAP.put("plain", 
TParquetCompressionType.UNCOMPRESSED);
+
+        ORC_COMPRESSION_TYPE_MAP.put("plain", TFileCompressType.PLAIN);
+        ORC_COMPRESSION_TYPE_MAP.put("snappy", TFileCompressType.SNAPPYBLOCK);
+        ORC_COMPRESSION_TYPE_MAP.put("zlib", TFileCompressType.ZLIB);
+        ORC_COMPRESSION_TYPE_MAP.put("zstd", TFileCompressType.ZSTD);
 
         PARQUET_VERSION_MAP.put("v1", TParquetVersion.PARQUET_1_0);
         PARQUET_VERSION_MAP.put("latest", TParquetVersion.PARQUET_2_LATEST);
@@ -137,6 +145,7 @@ public class OutFileClause {
     public static final String PROP_DELETE_EXISTING_FILES = 
"delete_existing_files";
     public static final String PROP_FILE_SUFFIX = "file_suffix";
     public static final String PROP_WITH_BOM = "with_bom";
+    public static final String COMPRESS_TYPE = "compress_type";
 
     private static final String PARQUET_PROP_PREFIX = "parquet.";
     private static final String SCHEMA = "schema";
@@ -170,8 +179,8 @@ public class OutFileClause {
     private boolean isAnalyzed = false;
     private String headerType = "";
 
-    private static final String PARQUET_COMPRESSION = "compression";
-    private TParquetCompressionType parquetCompressionType = 
TParquetCompressionType.UNCOMPRESSED;
+    private TParquetCompressionType parquetCompressionType = 
TParquetCompressionType.SNAPPY;
+    private TFileCompressType orcCompressionType = TFileCompressType.ZLIB;
     private static final String PARQUET_DISABLE_DICTIONARY = 
"disable_dictionary";
     private boolean parquetDisableDictionary = false;
     private static final String PARQUET_VERSION = "version";
@@ -664,19 +673,11 @@ public class OutFileClause {
         return fullPath.replace(filePath, "");
     }
 
-    void setParquetCompressionType(String propertyValue) {
-        if (PARQUET_COMPRESSION_TYPE_MAP.containsKey(propertyValue)) {
-            this.parquetCompressionType = 
PARQUET_COMPRESSION_TYPE_MAP.get(propertyValue);
-        } else {
-            LOG.warn("not set parquet compression type or is invalid, set 
default to UNCOMPRESSED type.");
-        }
-    }
-
     void setParquetVersion(String propertyValue) {
         if (PARQUET_VERSION_MAP.containsKey(propertyValue)) {
             this.parquetVersion = PARQUET_VERSION_MAP.get(propertyValue);
         } else {
-            LOG.warn("not set parquet version type or is invalid, set default 
to PARQUET_1.0 version.");
+            LOG.debug("not set parquet version type or is invalid, set default 
to PARQUET_1.0 version.");
         }
     }
 
@@ -692,15 +693,25 @@ public class OutFileClause {
      * currently only supports: compression, disable_dictionary, version
      */
     private void getParquetProperties(Set<String> processedPropKeys) throws 
AnalysisException {
+        // save compress type
+        if (properties.containsKey(COMPRESS_TYPE)) {
+            if 
(PARQUET_COMPRESSION_TYPE_MAP.containsKey(properties.get(COMPRESS_TYPE).toLowerCase()))
 {
+                this.parquetCompressionType = PARQUET_COMPRESSION_TYPE_MAP.get(
+                        properties.get(COMPRESS_TYPE).toLowerCase());
+                processedPropKeys.add(COMPRESS_TYPE);
+            } else {
+                throw new AnalysisException("parquet compression type [" + 
properties.get(COMPRESS_TYPE)
+                        + "] is invalid, please choose one among SNAPPY, GZIP, 
BROTLI, ZSTD, LZ4, LZO, BZ2 or PLAIN");
+            }
+        }
+
         // save all parquet prefix property
         Iterator<Map.Entry<String, String>> iter = 
properties.entrySet().iterator();
         while (iter.hasNext()) {
             Map.Entry<String, String> entry = iter.next();
             if (entry.getKey().startsWith(PARQUET_PROP_PREFIX)) {
                 processedPropKeys.add(entry.getKey());
-                if 
(entry.getKey().substring(PARQUET_PROP_PREFIX.length()).equals(PARQUET_COMPRESSION))
 {
-                    setParquetCompressionType(entry.getValue());
-                } else if 
(entry.getKey().substring(PARQUET_PROP_PREFIX.length()).equals(PARQUET_DISABLE_DICTIONARY))
 {
+                if 
(entry.getKey().substring(PARQUET_PROP_PREFIX.length()).equals(PARQUET_DISABLE_DICTIONARY))
 {
                     this.parquetDisableDictionary = 
Boolean.valueOf(entry.getValue());
                 } else if 
(entry.getKey().substring(PARQUET_PROP_PREFIX.length()).equals(PARQUET_VERSION))
 {
                     setParquetVersion(entry.getValue());
@@ -744,6 +755,18 @@ public class OutFileClause {
     }
 
     private void getOrcProperties(Set<String> processedPropKeys) throws 
AnalysisException {
+        // Resolve the ORC compression type from the `compress_type` property.
+        // An unrecognized value is rejected with an AnalysisException.
+        if (properties.containsKey(COMPRESS_TYPE)) {
+            if 
(ORC_COMPRESSION_TYPE_MAP.containsKey(properties.get(COMPRESS_TYPE).toLowerCase()))
 {
+                this.orcCompressionType = 
ORC_COMPRESSION_TYPE_MAP.get(properties.get(COMPRESS_TYPE).toLowerCase());
+                processedPropKeys.add(COMPRESS_TYPE);
+            } else {
+                throw new AnalysisException("orc compression type [" + 
properties.get(COMPRESS_TYPE) + "] is invalid,"
+                        + " please choose one among ZLIB, SNAPPY, ZSTD or 
PLAIN");
+            }
+        }
+
         // check schema. if schema is not set, Doris will gen schema by select 
items
         String schema = properties.get(SCHEMA);
         if (schema == null) {
@@ -846,6 +869,7 @@ public class OutFileClause {
         }
         if (isOrcFormat()) {
             sinkOptions.setOrcSchema(serializeOrcSchema());
+            sinkOptions.setOrcCompressionType(orcCompressionType);
         }
         return sinkOptions;
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java 
b/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java
index b072ed2f2fd..a0de8c0fad6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java
@@ -62,6 +62,7 @@ import org.apache.doris.nereids.glue.LogicalPlanAdapter;
 import org.apache.doris.nereids.trees.expressions.Expression;
 import org.apache.doris.nereids.trees.expressions.NamedExpression;
 import org.apache.doris.nereids.trees.expressions.StatementScopeIdGenerator;
+import org.apache.doris.nereids.trees.plans.commands.ExportCommand;
 import org.apache.doris.nereids.trees.plans.logical.LogicalCheckPolicy;
 import org.apache.doris.nereids.trees.plans.logical.LogicalFileSink;
 import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
@@ -174,6 +175,8 @@ public class ExportJob implements Writable {
     private String withBom;
     @SerializedName("dataConsistency")
     private String dataConsistency;
+    @SerializedName("compressType")
+    private String compressType;
 
     private TableRef tableRef;
 
@@ -621,6 +624,12 @@ public class ExportJob implements Writable {
         if (format.equals("csv") || format.equals("csv_with_names") || 
format.equals("csv_with_names_and_types")) {
             outfileProperties.put(OutFileClause.PROP_COLUMN_SEPARATOR, 
columnSeparator);
             outfileProperties.put(OutFileClause.PROP_LINE_DELIMITER, 
lineDelimiter);
+        } else {
+            // orc / parquet
+            // compressType == null means outfile will use default compression 
type
+            if (compressType != null) {
+                outfileProperties.put(ExportCommand.COMPRESS_TYPE, 
compressType);
+            }
         }
         if (!maxFileSize.isEmpty()) {
             outfileProperties.put(OutFileClause.PROP_MAX_FILE_SIZE, 
maxFileSize);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ExportCommand.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ExportCommand.java
index f89145ec45c..263bf43e355 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ExportCommand.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ExportCommand.java
@@ -74,6 +74,7 @@ public class ExportCommand extends Command implements 
ForwardWithSync {
     public static final String PARALLELISM = "parallelism";
     public static final String LABEL = "label";
     public static final String DATA_CONSISTENCY = "data_consistency";
+    public static final String COMPRESS_TYPE = "compress_type";
     private static final String DEFAULT_COLUMN_SEPARATOR = "\t";
     private static final String DEFAULT_LINE_DELIMITER = "\n";
     private static final String DEFAULT_PARALLELISM = "1";
@@ -91,6 +92,7 @@ public class ExportCommand extends Command implements 
ForwardWithSync {
             .add(PropertyAnalyzer.PROPERTIES_TIMEOUT)
             .add("format")
             .add(OutFileClause.PROP_WITH_BOM)
+            .add(COMPRESS_TYPE)
             .build();
 
     private final List<String> nameParts;
@@ -337,9 +339,13 @@ public class ExportCommand extends Command implements 
ForwardWithSync {
         } catch (NumberFormatException e) {
             throw new UserException("The value of timeout is invalid!");
         }
-
         exportJob.setTimeoutSecond(timeoutSecond);
 
+        // set compress_type
+        if (fileProperties.containsKey(COMPRESS_TYPE)) {
+            exportJob.setCompressType(fileProperties.get(COMPRESS_TYPE));
+        }
+
         // exportJob generate outfile sql
         
exportJob.generateOutfileLogicalPlans(RelationUtil.getQualifierName(ctx, 
this.nameParts));
         return exportJob;
diff --git a/gensrc/thrift/DataSinks.thrift b/gensrc/thrift/DataSinks.thrift
index e613ef788b1..25be947ba34 100644
--- a/gensrc/thrift/DataSinks.thrift
+++ b/gensrc/thrift/DataSinks.thrift
@@ -131,6 +131,8 @@ struct TResultFileSinkOptions {
     16: optional bool delete_existing_files;
     17: optional string file_suffix;
     18: optional bool with_bom;
+
+    19: optional PlanNodes.TFileCompressType orc_compression_type;
 }
 
 struct TMemoryScratchSink {
diff --git a/regression-test/data/export_p0/test_parquet_orc_compression.out 
b/regression-test/data/export_p0/test_parquet_orc_compression.out
new file mode 100644
index 00000000000..7d258b47e3b
--- /dev/null
+++ b/regression-test/data/export_p0/test_parquet_orc_compression.out
@@ -0,0 +1,248 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !select_export1 --
+1      2017-10-01
+10     2017-10-01
+11     2017-10-01
+2      2017-10-01
+3      2017-10-01
+4      2017-10-01
+5      2017-10-01
+6      2017-10-01
+7      2017-10-01
+8      2017-10-01
+9      2017-10-01
+
+-- !select_load1 --
+1      2017-10-01
+10     2017-10-01
+11     2017-10-01
+2      2017-10-01
+3      2017-10-01
+4      2017-10-01
+5      2017-10-01
+6      2017-10-01
+7      2017-10-01
+8      2017-10-01
+9      2017-10-01
+
+-- !select_load1 --
+1      2017-10-01
+10     2017-10-01
+11     2017-10-01
+2      2017-10-01
+3      2017-10-01
+4      2017-10-01
+5      2017-10-01
+6      2017-10-01
+7      2017-10-01
+8      2017-10-01
+9      2017-10-01
+
+-- !select_load1 --
+1      2017-10-01
+10     2017-10-01
+11     2017-10-01
+2      2017-10-01
+3      2017-10-01
+4      2017-10-01
+5      2017-10-01
+6      2017-10-01
+7      2017-10-01
+8      2017-10-01
+9      2017-10-01
+
+-- !select_load1 --
+1      2017-10-01
+10     2017-10-01
+11     2017-10-01
+2      2017-10-01
+3      2017-10-01
+4      2017-10-01
+5      2017-10-01
+6      2017-10-01
+7      2017-10-01
+8      2017-10-01
+9      2017-10-01
+
+-- !select_load1 --
+1      2017-10-01
+10     2017-10-01
+11     2017-10-01
+2      2017-10-01
+3      2017-10-01
+4      2017-10-01
+5      2017-10-01
+6      2017-10-01
+7      2017-10-01
+8      2017-10-01
+9      2017-10-01
+
+-- !select_load1 --
+1      2017-10-01
+10     2017-10-01
+11     2017-10-01
+2      2017-10-01
+3      2017-10-01
+4      2017-10-01
+5      2017-10-01
+6      2017-10-01
+7      2017-10-01
+8      2017-10-01
+9      2017-10-01
+
+-- !select_load1 --
+1      2017-10-01
+10     2017-10-01
+11     2017-10-01
+2      2017-10-01
+3      2017-10-01
+4      2017-10-01
+5      2017-10-01
+6      2017-10-01
+7      2017-10-01
+8      2017-10-01
+9      2017-10-01
+
+-- !select_load1 --
+1      2017-10-01
+10     2017-10-01
+11     2017-10-01
+2      2017-10-01
+3      2017-10-01
+4      2017-10-01
+5      2017-10-01
+6      2017-10-01
+7      2017-10-01
+8      2017-10-01
+9      2017-10-01
+
+-- !select_load1 --
+1      2017-10-01
+10     2017-10-01
+11     2017-10-01
+2      2017-10-01
+3      2017-10-01
+4      2017-10-01
+5      2017-10-01
+6      2017-10-01
+7      2017-10-01
+8      2017-10-01
+9      2017-10-01
+
+-- !select_load1 --
+1      2017-10-01
+10     2017-10-01
+11     2017-10-01
+2      2017-10-01
+3      2017-10-01
+4      2017-10-01
+5      2017-10-01
+6      2017-10-01
+7      2017-10-01
+8      2017-10-01
+9      2017-10-01
+
+-- !select_load1 --
+1      2017-10-01
+10     2017-10-01
+11     2017-10-01
+2      2017-10-01
+3      2017-10-01
+4      2017-10-01
+5      2017-10-01
+6      2017-10-01
+7      2017-10-01
+8      2017-10-01
+9      2017-10-01
+
+-- !select_load1 --
+1      2017-10-01
+10     2017-10-01
+11     2017-10-01
+2      2017-10-01
+3      2017-10-01
+4      2017-10-01
+5      2017-10-01
+6      2017-10-01
+7      2017-10-01
+8      2017-10-01
+9      2017-10-01
+
+-- !select_load1 --
+1      2017-10-01
+10     2017-10-01
+11     2017-10-01
+2      2017-10-01
+3      2017-10-01
+4      2017-10-01
+5      2017-10-01
+6      2017-10-01
+7      2017-10-01
+8      2017-10-01
+9      2017-10-01
+
+-- !select_load1 --
+1      2017-10-01
+10     2017-10-01
+11     2017-10-01
+2      2017-10-01
+3      2017-10-01
+4      2017-10-01
+5      2017-10-01
+6      2017-10-01
+7      2017-10-01
+8      2017-10-01
+9      2017-10-01
+
+-- !select_load1 --
+1      2017-10-01
+10     2017-10-01
+11     2017-10-01
+2      2017-10-01
+3      2017-10-01
+4      2017-10-01
+5      2017-10-01
+6      2017-10-01
+7      2017-10-01
+8      2017-10-01
+9      2017-10-01
+
+-- !select_load1 --
+1      2017-10-01
+10     2017-10-01
+11     2017-10-01
+2      2017-10-01
+3      2017-10-01
+4      2017-10-01
+5      2017-10-01
+6      2017-10-01
+7      2017-10-01
+8      2017-10-01
+9      2017-10-01
+
+-- !select_load1 --
+1      2017-10-01
+10     2017-10-01
+11     2017-10-01
+2      2017-10-01
+3      2017-10-01
+4      2017-10-01
+5      2017-10-01
+6      2017-10-01
+7      2017-10-01
+8      2017-10-01
+9      2017-10-01
+
+-- !select_load1 --
+1      2017-10-01
+10     2017-10-01
+11     2017-10-01
+2      2017-10-01
+3      2017-10-01
+4      2017-10-01
+5      2017-10-01
+6      2017-10-01
+7      2017-10-01
+8      2017-10-01
+9      2017-10-01
+
diff --git 
a/regression-test/suites/export_p0/test_parquet_orc_compression.groovy 
b/regression-test/suites/export_p0/test_parquet_orc_compression.groovy
new file mode 100644
index 00000000000..c3a8f3f8bbb
--- /dev/null
+++ b/regression-test/suites/export_p0/test_parquet_orc_compression.groovy
@@ -0,0 +1,177 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_parquet_orc_compression", "p0") {
+    // Enable the Nereids planner and disable fallback to the original planner.
+    sql """ set enable_nereids_planner=true """
+    sql """ set enable_fallback_to_original_planner=false """
+
+    // S3 credentials, endpoint and region come from helper calls not defined in this file; the bucket name from the suite config.
+    String ak = getS3AK()
+    String sk = getS3SK()
+    String s3_endpoint = getS3Endpoint()
+    String region = getS3Region()
+    String bucket = context.config.otherConfigs.get("s3BucketName");
+
+    // Name of the table under test and the S3 key prefix shared by every file written below.
+    def table_export_name = "test_parquet_orc_compression"
+    def outfile_path_prefix = 
"""${bucket}/export/test_parquet_orc_compression/exp_"""
+    // (Re)create the source table.
+    sql """ DROP TABLE IF EXISTS ${table_export_name} """
+    sql """
+        CREATE TABLE IF NOT EXISTS ${table_export_name} (
+            `user_id` INT NOT NULL COMMENT "用户id",
+            `date` DATE NOT NULL COMMENT "数据灌入日期时间"
+            )
+            DISTRIBUTED BY HASH(user_id)
+            PROPERTIES("replication_num" = "1");
+        """
+    StringBuilder sb = new StringBuilder()  // accumulates the VALUES list for the INSERT below
+    int i = 1
+    for (; i < 11; i ++) {  // rows 1..10, each followed by a comma
+        sb.append("""
+            (${i}, '2017-10-01'),
+        """)
+    }  // i == 11 here; the last row is appended without a trailing comma
+    sb.append("""
+            (${i}, '2017-10-01')
+        """)
+    sql """ INSERT INTO ${table_export_name} VALUES
+            ${sb.toString()}
+        """
+    def insert_res = sql "show last insert;"
+    logger.info("insert result: " + insert_res.toString())
+    order_qt_select_export1 """ SELECT * FROM ${table_export_name} t ORDER BY 
user_id; """
+
+    // Poll SHOW EXPORT for the given label until the job ends: returns the first output URL on FINISHED, throws on CANCELLED.
+    def waiting_export = { export_label ->
+        while (true) {  // NOTE(review): no retry limit or timeout - loops until FINISHED or CANCELLED is seen
+            def res = sql """ show export where label = "${export_label}";"""
+            logger.info("export state: " + res[0][2])
+            if (res[0][2] == "FINISHED") {
+                def json = parseJson(res[0][11])  // column 11 is parsed as JSON; its fileNumber and url fields are read below
+                assert json instanceof List
+                assertEquals("1", json.fileNumber[0][0])  // exactly one output file is expected
+                log.info("outfile_path: ${json.url[0][0]}")
+                return json.url[0][0];
+            } else if (res[0][2] == "CANCELLED") {  // column 10 is reported as the failure reason
+                throw new IllegalStateException("""export failed: 
${res[0][10]}""")
+            } else {
+                sleep(5000)  // any other state: wait 5000 (presumably ms) and poll again
+            }
+        }
+    }
+
+    // EXPORT path: export the table to S3 as file_format with compress_type; if tvf_read, wait for the job and read the file back via the s3() TVF.
+    def export_compression = { file_format, compression_type, tvf_read ->
+        def uuid = UUID.randomUUID().toString()
+        def outFilePath = """${outfile_path_prefix}_${uuid}"""
+        def label = "label_${uuid}"
+        try {
+            // Submit the EXPORT job; the unique label is what waiting_export polls on.
+            sql """
+                EXPORT TABLE ${table_export_name} TO "s3://${outFilePath}/"
+                PROPERTIES(
+                    "label" = "${label}",
+                    "format" = "${file_format}",
+                    "compress_type" = "${compression_type}"
+                )
+                WITH S3(
+                    "s3.endpoint" = "${s3_endpoint}",
+                    "s3.region" = "${region}",
+                    "s3.secret_key"="${sk}",
+                    "s3.access_key" = "${ak}"
+                );
+            """
+            // NOTE(review): when tvf_read is false the job is submitted but never waited on.
+            if (tvf_read) {
+                def outfile_url = waiting_export.call(label)
+                // substring() skips the first 5 + bucket.length() chars (presumably "s3://<bucket>") and drops the last char before "0.<format>" is appended
+                order_qt_select_load1 """ select * from s3(
+                                            "uri" = 
"http://${bucket}.${s3_endpoint}${outfile_url.substring(5 + bucket.length(), 
outfile_url.length() - 1)}0.${file_format}",
+                                            "s3.access_key"= "${ak}",
+                                            "s3.secret_key" = "${sk}",
+                                            "format" = "${file_format}",
+                                            "region" = "${region}"
+                                        ) ORDER BY user_id;
+                                        """
+            }
+        } finally {  // NOTE(review): empty finally block - no cleanup happens here
+        }
+    }
+
+    // OUTFILE path: SELECT ... INTO OUTFILE to S3 as file_format with compress_type; if tvf_read, read the file back via the S3 TVF.
+    def outfile_compression = { file_format, compression_type, tvf_read ->
+        def uuid = UUID.randomUUID().toString()
+        def outFilePath = """${outfile_path_prefix}_${uuid}"""
+
+        def res = sql """
+            SELECT * FROM ${table_export_name} t ORDER BY user_id
+            INTO OUTFILE "s3://${outFilePath}"
+            FORMAT AS ${file_format}
+            PROPERTIES (
+                "compress_type" = "${compression_type}",
+                "s3.endpoint" = "${s3_endpoint}",
+                "s3.region" = "${region}",
+                "s3.secret_key"="${sk}",
+                "s3.access_key" = "${ak}"
+            );
+        """
+        // Column 3 of the first result row is taken as the output URL.
+        if (tvf_read) {
+            def outfile_url = res[0][3]
+            order_qt_select_load1 """ SELECT * FROM S3 (
+                    "uri" = 
"http://${bucket}.${s3_endpoint}${outfile_url.substring(5 + bucket.length(), 
outfile_url.length() - 1)}0.${file_format}",
+                    "ACCESS_KEY"= "${ak}",
+                    "SECRET_KEY" = "${sk}",
+                    "format" = "${file_format}",
+                    "region" = "${region}"
+                );
+                """
+        }
+    }
+
+    // 1. EXPORT: arguments are (file_format, compress_type, read back via TVF?)
+    // 1.1 parquet
+    export_compression("parquet", "snappy", true)
+    export_compression("parquet", "GZIP", true)
+    // The parquet reader does not support BROTLI compression yet, so the read-back is skipped.
+    export_compression("parquet", "BROTLI", false)
+    export_compression("parquet", "ZSTD", true)
+    export_compression("parquet", "LZ4", true)
+    export_compression("parquet", "plain", true)
+    // 1.2 orc
+    export_compression("orc", "PLAIN", true)
+    export_compression("orc", "SNAPPY", true)
+    export_compression("orc", "ZLIB", true)
+    export_compression("orc", "ZSTD", true)
+
+    // 2. SELECT ... INTO OUTFILE: same format/compression matrix as above
+    // 2.1 parquet
+    outfile_compression("parquet", "snappy", true)
+    outfile_compression("parquet", "GZIP", true)
+    // The parquet reader does not support BROTLI compression yet, so the read-back is skipped.
+    outfile_compression("parquet", "BROTLI", false)
+    outfile_compression("parquet", "ZSTD", true)
+    outfile_compression("parquet", "LZ4", true)
+    outfile_compression("parquet", "plain", true)
+    // 2.2 orc
+    outfile_compression("orc", "PLAIN", true)
+    outfile_compression("orc", "SNAPPY", true)
+    outfile_compression("orc", "ZLIB", true)
+    outfile_compression("orc", "ZSTD", true)
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org


Reply via email to