This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch 2.1-tmp
in repository https://gitbox.apache.org/repos/asf/doris.git

commit b0b5f84e40eccee2dc5938d26d8478b64374f052
Author: 超威老仲 <elves...@qq.com>
AuthorDate: Mon Apr 1 18:45:45 2024 +0800

    [feature](load) support compressed JSON format data for broker load (#30809)
---
 be/src/exec/decompressor.cpp                       |  97 +++++++++++++++++++--
 be/src/exec/decompressor.h                         |  11 ++-
 be/src/util/block_compression.cpp                  |   2 +-
 be/src/vec/exec/format/csv/csv_reader.cpp          |  65 +-------------
 be/src/vec/exec/format/json/new_json_reader.cpp    |  20 ++++-
 be/src/vec/exec/format/json/new_json_reader.h      |   3 +
 .../Load/BROKER-LOAD.md                            |   2 +-
 .../Load/BROKER-LOAD.md                            |   2 +-
 .../org/apache/doris/datasource/FileGroupInfo.java |   4 +-
 .../load_p0/broker_load/test_compress_type.out     |   2 +-
 .../stream_load/basic_data_by_line.json.bz2        | Bin 0 -> 3161 bytes
 .../load_p0/stream_load/basic_data_by_line.json.gz | Bin 0 -> 3519 bytes
 .../stream_load/basic_data_by_line.json.lz4        | Bin 0 -> 5520 bytes
 .../load_p0/broker_load/test_compress_type.groovy  |  24 ++++-
 14 files changed, 149 insertions(+), 83 deletions(-)

diff --git a/be/src/exec/decompressor.cpp b/be/src/exec/decompressor.cpp
index c0dd7554942..c9f16f10e7a 100644
--- a/be/src/exec/decompressor.cpp
+++ b/be/src/exec/decompressor.cpp
@@ -19,39 +19,42 @@
 
 #include <strings.h>
 
+#include <memory>
 #include <ostream>
 
 #include "common/logging.h"
+#include "common/status.h"
 #include "gutil/endian.h"
 #include "gutil/strings/substitute.h"
 
 namespace doris {
 
-Status Decompressor::create_decompressor(CompressType type, Decompressor** 
decompressor) {
+Status Decompressor::create_decompressor(CompressType type,
+                                         std::unique_ptr<Decompressor>* 
decompressor) {
     switch (type) {
     case CompressType::UNCOMPRESSED:
-        *decompressor = nullptr;
+        decompressor->reset(nullptr);
         break;
     case CompressType::GZIP:
-        *decompressor = new GzipDecompressor(false);
+        decompressor->reset(new GzipDecompressor(false));
         break;
     case CompressType::DEFLATE:
-        *decompressor = new GzipDecompressor(true);
+        decompressor->reset(new GzipDecompressor(true));
         break;
     case CompressType::BZIP2:
-        *decompressor = new Bzip2Decompressor();
+        decompressor->reset(new Bzip2Decompressor());
         break;
     case CompressType::LZ4FRAME:
-        *decompressor = new Lz4FrameDecompressor();
+        decompressor->reset(new Lz4FrameDecompressor());
         break;
     case CompressType::LZ4BLOCK:
-        *decompressor = new Lz4BlockDecompressor();
+        decompressor->reset(new Lz4BlockDecompressor());
         break;
     case CompressType::SNAPPYBLOCK:
-        *decompressor = new SnappyBlockDecompressor();
+        decompressor->reset(new SnappyBlockDecompressor());
         break;
     case CompressType::LZOP:
-        *decompressor = new LzopDecompressor();
+        decompressor->reset(new LzopDecompressor());
         break;
     default:
         return Status::InternalError("Unknown compress type: {}", type);
@@ -65,6 +68,82 @@ Status Decompressor::create_decompressor(CompressType type, 
Decompressor** decom
     return st;
 }
 
+Status Decompressor::create_decompressor(TFileCompressType::type type,
+                                         std::unique_ptr<Decompressor>* 
decompressor) {
+    CompressType compress_type;
+    switch (type) {
+    case TFileCompressType::PLAIN:
+    case TFileCompressType::UNKNOWN:
+        compress_type = CompressType::UNCOMPRESSED;
+        break;
+    case TFileCompressType::GZ:
+        compress_type = CompressType::GZIP;
+        break;
+    case TFileCompressType::LZO:
+    case TFileCompressType::LZOP:
+        compress_type = CompressType::LZOP;
+        break;
+    case TFileCompressType::BZ2:
+        compress_type = CompressType::BZIP2;
+        break;
+    case TFileCompressType::LZ4FRAME:
+        compress_type = CompressType::LZ4FRAME;
+        break;
+    case TFileCompressType::LZ4BLOCK:
+        compress_type = CompressType::LZ4BLOCK;
+        break;
+    case TFileCompressType::DEFLATE:
+        compress_type = CompressType::DEFLATE;
+        break;
+    case TFileCompressType::SNAPPYBLOCK:
+        compress_type = CompressType::SNAPPYBLOCK;
+        break;
+    default:
+        return Status::InternalError<false>("unknown compress type: {}", type);
+    }
+    RETURN_IF_ERROR(Decompressor::create_decompressor(compress_type, 
decompressor));
+
+    return Status::OK();
+}
+
+Status Decompressor::create_decompressor(TFileFormatType::type type,
+                                         std::unique_ptr<Decompressor>* 
decompressor) {
+    CompressType compress_type;
+    switch (type) {
+    case TFileFormatType::FORMAT_PROTO:
+        [[fallthrough]];
+    case TFileFormatType::FORMAT_CSV_PLAIN:
+        compress_type = CompressType::UNCOMPRESSED;
+        break;
+    case TFileFormatType::FORMAT_CSV_GZ:
+        compress_type = CompressType::GZIP;
+        break;
+    case TFileFormatType::FORMAT_CSV_BZ2:
+        compress_type = CompressType::BZIP2;
+        break;
+    case TFileFormatType::FORMAT_CSV_LZ4FRAME:
+        compress_type = CompressType::LZ4FRAME;
+        break;
+    case TFileFormatType::FORMAT_CSV_LZ4BLOCK:
+        compress_type = CompressType::LZ4BLOCK;
+        break;
+    case TFileFormatType::FORMAT_CSV_LZOP:
+        compress_type = CompressType::LZOP;
+        break;
+    case TFileFormatType::FORMAT_CSV_DEFLATE:
+        compress_type = CompressType::DEFLATE;
+        break;
+    case TFileFormatType::FORMAT_CSV_SNAPPYBLOCK:
+        compress_type = CompressType::SNAPPYBLOCK;
+        break;
+    default:
+        return Status::InternalError<false>("unknown format type: {}", type);
+    }
+    RETURN_IF_ERROR(Decompressor::create_decompressor(compress_type, 
decompressor));
+
+    return Status::OK();
+}
+
 uint32_t Decompressor::_read_int32(uint8_t* buf) {
     return (buf[0] << 24) | (buf[1] << 16) | (buf[2] << 8) | buf[3];
 }
diff --git a/be/src/exec/decompressor.h b/be/src/exec/decompressor.h
index 45653fd8b04..4ca7fbdf6d7 100644
--- a/be/src/exec/decompressor.h
+++ b/be/src/exec/decompressor.h
@@ -26,9 +26,11 @@
 #include <stdint.h>
 #include <zlib.h>
 
+#include <memory>
 #include <string>
 
 #include "common/status.h"
+#include "gen_cpp/PlanNodes_types.h"
 
 namespace doris {
 
@@ -57,7 +59,14 @@ public:
                               size_t* more_output_bytes) = 0;
 
 public:
-    static Status create_decompressor(CompressType type, Decompressor** 
decompressor);
+    static Status create_decompressor(CompressType type,
+                                      std::unique_ptr<Decompressor>* 
decompressor);
+
+    static Status create_decompressor(TFileCompressType::type type,
+                                      std::unique_ptr<Decompressor>* 
decompressor);
+
+    static Status create_decompressor(TFileFormatType::type type,
+                                      std::unique_ptr<Decompressor>* 
decompressor);
 
     virtual std::string debug_info();
 
diff --git a/be/src/util/block_compression.cpp 
b/be/src/util/block_compression.cpp
index 4370d33e76f..7dcd907bee8 100644
--- a/be/src/util/block_compression.cpp
+++ b/be/src/util/block_compression.cpp
@@ -228,7 +228,7 @@ public:
     }
 
 private:
-    Decompressor* _decompressor;
+    std::unique_ptr<Decompressor> _decompressor;
 };
 // Used for LZ4 frame format, decompress speed is two times faster than LZ4.
 class Lz4fBlockCompression : public BlockCompressionCodec {
diff --git a/be/src/vec/exec/format/csv/csv_reader.cpp 
b/be/src/vec/exec/format/csv/csv_reader.cpp
index 86986f8eea5..c42a465fc63 100644
--- a/be/src/vec/exec/format/csv/csv_reader.cpp
+++ b/be/src/vec/exec/format/csv/csv_reader.cpp
@@ -571,72 +571,11 @@ Status 
CsvReader::get_parsed_schema(std::vector<std::string>* col_names,
 }
 
 Status CsvReader::_create_decompressor() {
-    CompressType compress_type;
     if (_file_compress_type != TFileCompressType::UNKNOWN) {
-        switch (_file_compress_type) {
-        case TFileCompressType::PLAIN:
-            compress_type = CompressType::UNCOMPRESSED;
-            break;
-        case TFileCompressType::GZ:
-            compress_type = CompressType::GZIP;
-            break;
-        case TFileCompressType::LZO:
-        case TFileCompressType::LZOP:
-            compress_type = CompressType::LZOP;
-            break;
-        case TFileCompressType::BZ2:
-            compress_type = CompressType::BZIP2;
-            break;
-        case TFileCompressType::LZ4FRAME:
-            compress_type = CompressType::LZ4FRAME;
-            break;
-        case TFileCompressType::LZ4BLOCK:
-            compress_type = CompressType::LZ4BLOCK;
-            break;
-        case TFileCompressType::DEFLATE:
-            compress_type = CompressType::DEFLATE;
-            break;
-        case TFileCompressType::SNAPPYBLOCK:
-            compress_type = CompressType::SNAPPYBLOCK;
-            break;
-        default:
-            return Status::InternalError<false>("unknown compress type: {}", 
_file_compress_type);
-        }
+        RETURN_IF_ERROR(Decompressor::create_decompressor(_file_compress_type, 
&_decompressor));
     } else {
-        switch (_file_format_type) {
-        case TFileFormatType::FORMAT_PROTO:
-            [[fallthrough]];
-        case TFileFormatType::FORMAT_CSV_PLAIN:
-            compress_type = CompressType::UNCOMPRESSED;
-            break;
-        case TFileFormatType::FORMAT_CSV_GZ:
-            compress_type = CompressType::GZIP;
-            break;
-        case TFileFormatType::FORMAT_CSV_BZ2:
-            compress_type = CompressType::BZIP2;
-            break;
-        case TFileFormatType::FORMAT_CSV_LZ4FRAME:
-            compress_type = CompressType::LZ4FRAME;
-            break;
-        case TFileFormatType::FORMAT_CSV_LZ4BLOCK:
-            compress_type = CompressType::LZ4BLOCK;
-            break;
-        case TFileFormatType::FORMAT_CSV_LZOP:
-            compress_type = CompressType::LZOP;
-            break;
-        case TFileFormatType::FORMAT_CSV_DEFLATE:
-            compress_type = CompressType::DEFLATE;
-            break;
-        case TFileFormatType::FORMAT_CSV_SNAPPYBLOCK:
-            compress_type = CompressType::SNAPPYBLOCK;
-            break;
-        default:
-            return Status::InternalError<false>("unknown format type: {}", 
_file_format_type);
-        }
+        RETURN_IF_ERROR(Decompressor::create_decompressor(_file_format_type, 
&_decompressor));
     }
-    Decompressor* decompressor;
-    RETURN_IF_ERROR(Decompressor::create_decompressor(compress_type, 
&decompressor));
-    _decompressor.reset(decompressor);
 
     return Status::OK();
 }
diff --git a/be/src/vec/exec/format/json/new_json_reader.cpp 
b/be/src/vec/exec/format/json/new_json_reader.cpp
index 06f1cf85e8a..5747fb609ea 100644
--- a/be/src/vec/exec/format/json/new_json_reader.cpp
+++ b/be/src/vec/exec/format/json/new_json_reader.cpp
@@ -87,6 +87,7 @@ NewJsonReader::NewJsonReader(RuntimeState* state, 
RuntimeProfile* profile, Scann
           _file_reader(nullptr),
           _line_reader(nullptr),
           _reader_eof(false),
+          _decompressor(nullptr),
           _skip_first_line(false),
           _next_row(0),
           _total_rows(0),
@@ -99,6 +100,12 @@ NewJsonReader::NewJsonReader(RuntimeState* state, 
RuntimeProfile* profile, Scann
     _bytes_read_counter = ADD_COUNTER(_profile, "BytesRead", TUnit::BYTES);
     _read_timer = ADD_TIMER(_profile, "ReadTime");
     _file_read_timer = ADD_TIMER(_profile, "FileReadTime");
+    if (_range.__isset.compress_type) {
+        // for compatibility
+        _file_compress_type = _range.compress_type;
+    } else {
+        _file_compress_type = _params.compress_type;
+    }
     _init_system_properties();
     _init_file_description();
 }
@@ -115,6 +122,7 @@ NewJsonReader::NewJsonReader(RuntimeProfile* profile, const 
TFileScanRangeParams
           _file_slot_descs(file_slot_descs),
           _line_reader(nullptr),
           _reader_eof(false),
+          _decompressor(nullptr),
           _skip_first_line(false),
           _next_row(0),
           _total_rows(0),
@@ -122,6 +130,12 @@ NewJsonReader::NewJsonReader(RuntimeProfile* profile, 
const TFileScanRangeParams
           _parse_allocator(_parse_buffer, sizeof(_parse_buffer)),
           _origin_json_doc(&_value_allocator, sizeof(_parse_buffer), 
&_parse_allocator),
           _io_ctx(io_ctx) {
+    if (_range.__isset.compress_type) {
+        // for compatibility
+        _file_compress_type = _range.compress_type;
+    } else {
+        _file_compress_type = _params.compress_type;
+    }
     _init_system_properties();
     _init_file_description();
 }
@@ -155,6 +169,10 @@ Status NewJsonReader::init_reader(
     // generate _col_default_value_map
     RETURN_IF_ERROR(_get_column_default_value(_file_slot_descs, 
col_default_value_ctx));
 
+    // create decompressor.
+    // _decompressor may be nullptr if this is not a compressed file
+    RETURN_IF_ERROR(Decompressor::create_decompressor(_file_compress_type, 
&_decompressor));
+
 #ifdef __AVX2__
     if (config::enable_simdjson_reader) {
         RETURN_IF_ERROR(_simdjson_init_reader());
@@ -402,7 +420,7 @@ Status NewJsonReader::_open_line_reader() {
         _skip_first_line = false;
     }
     _line_reader = NewPlainTextLineReader::create_unique(
-            _profile, _file_reader, nullptr,
+            _profile, _file_reader, _decompressor.get(),
             std::make_shared<PlainTextLineReaderCtx>(_line_delimiter, 
_line_delimiter_length), size,
             _current_offset);
     return Status::OK();
diff --git a/be/src/vec/exec/format/json/new_json_reader.h 
b/be/src/vec/exec/format/json/new_json_reader.h
index dac33908e75..e84605ff025 100644
--- a/be/src/vec/exec/format/json/new_json_reader.h
+++ b/be/src/vec/exec/format/json/new_json_reader.h
@@ -33,6 +33,7 @@
 #include <vector>
 
 #include "common/status.h"
+#include "exec/decompressor.h"
 #include "exec/line_reader.h"
 #include "exprs/json_functions.h"
 #include "io/file_factory.h"
@@ -212,6 +213,8 @@ private:
     io::FileReaderSPtr _file_reader;
     std::unique_ptr<LineReader> _line_reader;
     bool _reader_eof;
+    std::unique_ptr<Decompressor> _decompressor;
+    TFileCompressType::type _file_compress_type;
 
     // When we fetch range doesn't start from 0 will always skip the first line
     bool _skip_first_line;
diff --git 
a/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/BROKER-LOAD.md
 
b/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/BROKER-LOAD.md
index 6bb4eac3e23..63a58a99574 100644
--- 
a/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/BROKER-LOAD.md
+++ 
b/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/BROKER-LOAD.md
@@ -107,7 +107,7 @@ WITH BROKER broker_name
     Specifies the file type, CSV, PARQUET and ORC formats are supported. 
Default is CSV.
 
   - `COMPRESS_TYPE AS`
-    Specifies the file compress type, GZ/LZO/BZ2/LZ4FRAME/DEFLATE/LZOP
+    Specifies the file compress type, GZ/LZO/BZ2/LZ4FRAME/DEFLATE/LZOP. Only 
valid for CSV or JSON formats.
 
   - `column list`
 
diff --git 
a/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/BROKER-LOAD.md
 
b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/BROKER-LOAD.md
index 7f13fca55ab..f2211e4967b 100644
--- 
a/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/BROKER-LOAD.md
+++ 
b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/BROKER-LOAD.md
@@ -107,7 +107,7 @@ WITH BROKER broker_name
     指定文件类型,支持 CSV、PARQUET 和 ORC 格式。默认为 CSV。
 
   - `COMPRESS_TYPE AS`
-    指定文件压缩类型, 支持GZ/BZ2/LZ4FRAME。
+    指定文件压缩类型, 支持 GZ/BZ2/LZ4FRAME。仅在 CSV 或 JSON 格式下生效。
 
   - `column list`
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileGroupInfo.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileGroupInfo.java
index eb7f862d7eb..2a2f9f8e563 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileGroupInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileGroupInfo.java
@@ -217,8 +217,8 @@ public class FileGroupInfo {
             if (tmpBytes > bytesPerInstance && jobType != JobType.STREAM_LOAD) 
{
                 // Now only support split plain text
                 if (compressType == TFileCompressType.PLAIN
-                        && (formatType == TFileFormatType.FORMAT_CSV_PLAIN && 
fileStatus.isSplitable)
-                        || formatType == TFileFormatType.FORMAT_JSON) {
+                        && ((formatType == TFileFormatType.FORMAT_CSV_PLAIN && 
fileStatus.isSplitable)
+                        || formatType == TFileFormatType.FORMAT_JSON)) {
                     long rangeBytes = bytesPerInstance - curInstanceBytes;
                     TFileRangeDesc rangeDesc = 
createFileRangeDesc(curFileOffset, fileStatus, rangeBytes,
                             columnsFromPath);
diff --git a/regression-test/data/load_p0/broker_load/test_compress_type.out 
b/regression-test/data/load_p0/broker_load/test_compress_type.out
index e66359296c0..b11397e97dd 100644
--- a/regression-test/data/load_p0/broker_load/test_compress_type.out
+++ b/regression-test/data/load_p0/broker_load/test_compress_type.out
@@ -1,4 +1,4 @@
 -- This file is automatically generated. You should know what you did if you 
want to edit this
 -- !sql --
-240
+360
 
diff --git 
a/regression-test/data/load_p0/stream_load/basic_data_by_line.json.bz2 
b/regression-test/data/load_p0/stream_load/basic_data_by_line.json.bz2
new file mode 100644
index 00000000000..6944b68a469
Binary files /dev/null and 
b/regression-test/data/load_p0/stream_load/basic_data_by_line.json.bz2 differ
diff --git 
a/regression-test/data/load_p0/stream_load/basic_data_by_line.json.gz 
b/regression-test/data/load_p0/stream_load/basic_data_by_line.json.gz
new file mode 100644
index 00000000000..d3c9ea71ff0
Binary files /dev/null and 
b/regression-test/data/load_p0/stream_load/basic_data_by_line.json.gz differ
diff --git 
a/regression-test/data/load_p0/stream_load/basic_data_by_line.json.lz4 
b/regression-test/data/load_p0/stream_load/basic_data_by_line.json.lz4
new file mode 100644
index 00000000000..736c858f803
Binary files /dev/null and 
b/regression-test/data/load_p0/stream_load/basic_data_by_line.json.lz4 differ
diff --git 
a/regression-test/suites/load_p0/broker_load/test_compress_type.groovy 
b/regression-test/suites/load_p0/broker_load/test_compress_type.groovy
index 50d4e34cd99..a241ec4db2f 100644
--- a/regression-test/suites/load_p0/broker_load/test_compress_type.groovy
+++ b/regression-test/suites/load_p0/broker_load/test_compress_type.groovy
@@ -32,6 +32,12 @@ suite("test_compress_type", "load_p0") {
             "",
             "",
             "",
+            "COMPRESS_TYPE AS \"GZ\"",
+            "COMPRESS_TYPE AS \"BZ2\"",
+            "COMPRESS_TYPE AS \"LZ4FRAME\"",
+            "",
+            "",
+            "",
     ]
 
     def fileFormat = [
@@ -46,7 +52,13 @@ suite("test_compress_type", "load_p0") {
             "FORMAT AS \"CSV\"",
             "",
             "",
-            ""
+            "",
+            "FORMAT AS \"JSON\"",
+            "FORMAT AS \"JSON\"",
+            "FORMAT AS \"JSON\"",
+            "FORMAT AS \"JSON\"",
+            "FORMAT AS \"JSON\"",
+            "FORMAT AS \"JSON\"",
     ]
 
     def paths = [
@@ -61,7 +73,13 @@ suite("test_compress_type", "load_p0") {
             
"s3://doris-build-1308700295/regression/load/data/basic_data.csv.lz4",
             
"s3://doris-build-1308700295/regression/load/data/basic_data.csv.gz",
             
"s3://doris-build-1308700295/regression/load/data/basic_data.csv.bz2",
-            
"s3://doris-build-1308700295/regression/load/data/basic_data.csv.lz4"
+            
"s3://doris-build-1308700295/regression/load/data/basic_data.csv.lz4",
+            
"s3://doris-build-1308700295/regression/load/data/basic_data_by_line.json.gz",
+            
"s3://doris-build-1308700295/regression/load/data/basic_data_by_line.json.bz2",
+            
"s3://doris-build-1308700295/regression/load/data/basic_data_by_line.json.lz4",
+            
"s3://doris-build-1308700295/regression/load/data/basic_data_by_line.json.gz",
+            
"s3://doris-build-1308700295/regression/load/data/basic_data_by_line.json.bz2",
+            
"s3://doris-build-1308700295/regression/load/data/basic_data_by_line.json.lz4",
     ]
     def labels = []
 
@@ -126,4 +144,4 @@ suite("test_compress_type", "load_p0") {
     }
 
     qt_sql """ select count(*) from ${tableName} """
-}
\ No newline at end of file
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to