This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch 2.1-tmp in repository https://gitbox.apache.org/repos/asf/doris.git
commit b0b5f84e40eccee2dc5938d26d8478b64374f052 Author: 超威老仲 <elves...@qq.com> AuthorDate: Mon Apr 1 18:45:45 2024 +0800 [feature](load) support compressed JSON format data for broker load (#30809) --- be/src/exec/decompressor.cpp | 97 +++++++++++++++++++-- be/src/exec/decompressor.h | 11 ++- be/src/util/block_compression.cpp | 2 +- be/src/vec/exec/format/csv/csv_reader.cpp | 65 +------------- be/src/vec/exec/format/json/new_json_reader.cpp | 20 ++++- be/src/vec/exec/format/json/new_json_reader.h | 3 + .../Load/BROKER-LOAD.md | 2 +- .../Load/BROKER-LOAD.md | 2 +- .../org/apache/doris/datasource/FileGroupInfo.java | 4 +- .../load_p0/broker_load/test_compress_type.out | 2 +- .../stream_load/basic_data_by_line.json.bz2 | Bin 0 -> 3161 bytes .../load_p0/stream_load/basic_data_by_line.json.gz | Bin 0 -> 3519 bytes .../stream_load/basic_data_by_line.json.lz4 | Bin 0 -> 5520 bytes .../load_p0/broker_load/test_compress_type.groovy | 24 ++++- 14 files changed, 149 insertions(+), 83 deletions(-) diff --git a/be/src/exec/decompressor.cpp b/be/src/exec/decompressor.cpp index c0dd7554942..c9f16f10e7a 100644 --- a/be/src/exec/decompressor.cpp +++ b/be/src/exec/decompressor.cpp @@ -19,39 +19,42 @@ #include <strings.h> +#include <memory> #include <ostream> #include "common/logging.h" +#include "common/status.h" #include "gutil/endian.h" #include "gutil/strings/substitute.h" namespace doris { -Status Decompressor::create_decompressor(CompressType type, Decompressor** decompressor) { +Status Decompressor::create_decompressor(CompressType type, + std::unique_ptr<Decompressor>* decompressor) { switch (type) { case CompressType::UNCOMPRESSED: - *decompressor = nullptr; + decompressor->reset(nullptr); break; case CompressType::GZIP: - *decompressor = new GzipDecompressor(false); + decompressor->reset(new GzipDecompressor(false)); break; case CompressType::DEFLATE: - *decompressor = new GzipDecompressor(true); + decompressor->reset(new GzipDecompressor(true)); break; case CompressType::BZIP2: - *decompressor = new Bzip2Decompressor(); + decompressor->reset(new Bzip2Decompressor()); break; case CompressType::LZ4FRAME: - *decompressor = new Lz4FrameDecompressor(); + decompressor->reset(new Lz4FrameDecompressor()); break; case CompressType::LZ4BLOCK: - *decompressor = new Lz4BlockDecompressor(); + decompressor->reset(new Lz4BlockDecompressor()); break; case CompressType::SNAPPYBLOCK: - *decompressor = new SnappyBlockDecompressor(); + decompressor->reset(new SnappyBlockDecompressor()); break; case CompressType::LZOP: - *decompressor = new LzopDecompressor(); + decompressor->reset(new LzopDecompressor()); break; default: return Status::InternalError("Unknown compress type: {}", type); @@ -65,6 +68,82 @@ Status Decompressor::create_decompressor(CompressType type, Decompressor** decom return st; } +Status Decompressor::create_decompressor(TFileCompressType::type type, + std::unique_ptr<Decompressor>* decompressor) { + CompressType compress_type; + switch (type) { + case TFileCompressType::PLAIN: + case TFileCompressType::UNKNOWN: + compress_type = CompressType::UNCOMPRESSED; + break; + case TFileCompressType::GZ: + compress_type = CompressType::GZIP; + break; + case TFileCompressType::LZO: + case TFileCompressType::LZOP: + compress_type = CompressType::LZOP; + break; + case TFileCompressType::BZ2: + compress_type = CompressType::BZIP2; + break; + case TFileCompressType::LZ4FRAME: + compress_type = CompressType::LZ4FRAME; + break; + case TFileCompressType::LZ4BLOCK: + compress_type = CompressType::LZ4BLOCK; + break; + case TFileCompressType::DEFLATE: + compress_type = CompressType::DEFLATE; + break; + case TFileCompressType::SNAPPYBLOCK: + compress_type = CompressType::SNAPPYBLOCK; + break; + default: + return Status::InternalError<false>("unknown compress type: {}", type); + } + RETURN_IF_ERROR(Decompressor::create_decompressor(compress_type, decompressor)); + + return Status::OK(); +} + +Status Decompressor::create_decompressor(TFileFormatType::type type, + std::unique_ptr<Decompressor>* decompressor) { + CompressType compress_type; + switch (type) { + case TFileFormatType::FORMAT_PROTO: + [[fallthrough]]; + case TFileFormatType::FORMAT_CSV_PLAIN: + compress_type = CompressType::UNCOMPRESSED; + break; + case TFileFormatType::FORMAT_CSV_GZ: + compress_type = CompressType::GZIP; + break; + case TFileFormatType::FORMAT_CSV_BZ2: + compress_type = CompressType::BZIP2; + break; + case TFileFormatType::FORMAT_CSV_LZ4FRAME: + compress_type = CompressType::LZ4FRAME; + break; + case TFileFormatType::FORMAT_CSV_LZ4BLOCK: + compress_type = CompressType::LZ4BLOCK; + break; + case TFileFormatType::FORMAT_CSV_LZOP: + compress_type = CompressType::LZOP; + break; + case TFileFormatType::FORMAT_CSV_DEFLATE: + compress_type = CompressType::DEFLATE; + break; + case TFileFormatType::FORMAT_CSV_SNAPPYBLOCK: + compress_type = CompressType::SNAPPYBLOCK; + break; + default: + return Status::InternalError<false>("unknown compress type: {}", type); + } + RETURN_IF_ERROR(Decompressor::create_decompressor(compress_type, decompressor)); + + return Status::OK(); +} + uint32_t Decompressor::_read_int32(uint8_t* buf) { return (buf[0] << 24) | (buf[1] << 16) | (buf[2] << 8) | buf[3]; } diff --git a/be/src/exec/decompressor.h b/be/src/exec/decompressor.h index 45653fd8b04..4ca7fbdf6d7 100644 --- a/be/src/exec/decompressor.h +++ b/be/src/exec/decompressor.h @@ -26,9 +26,11 @@ #include <stdint.h> #include <zlib.h> +#include <memory> #include <string> #include "common/status.h" +#include "gen_cpp/PlanNodes_types.h" namespace doris { @@ -57,7 +59,14 @@ public: size_t* more_output_bytes) = 0; public: - static Status create_decompressor(CompressType type, Decompressor** decompressor); + static Status create_decompressor(CompressType type, + std::unique_ptr<Decompressor>* decompressor); + + static Status create_decompressor(TFileCompressType::type type, + std::unique_ptr<Decompressor>* decompressor); + + static Status create_decompressor(TFileFormatType::type type, + std::unique_ptr<Decompressor>* decompressor); virtual std::string debug_info(); diff --git a/be/src/util/block_compression.cpp b/be/src/util/block_compression.cpp index 4370d33e76f..7dcd907bee8 100644 --- a/be/src/util/block_compression.cpp +++ b/be/src/util/block_compression.cpp @@ -228,7 +228,7 @@ public: } private: - Decompressor* _decompressor; + std::unique_ptr<Decompressor> _decompressor; }; // Used for LZ4 frame format, decompress speed is two times faster than LZ4. class Lz4fBlockCompression : public BlockCompressionCodec { diff --git a/be/src/vec/exec/format/csv/csv_reader.cpp b/be/src/vec/exec/format/csv/csv_reader.cpp index 86986f8eea5..c42a465fc63 100644 --- a/be/src/vec/exec/format/csv/csv_reader.cpp +++ b/be/src/vec/exec/format/csv/csv_reader.cpp @@ -571,72 +571,11 @@ Status CsvReader::get_parsed_schema(std::vector<std::string>* col_names, } Status CsvReader::_create_decompressor() { - CompressType compress_type; if (_file_compress_type != TFileCompressType::UNKNOWN) { - switch (_file_compress_type) { - case TFileCompressType::PLAIN: - compress_type = CompressType::UNCOMPRESSED; - break; - case TFileCompressType::GZ: - compress_type = CompressType::GZIP; - break; - case TFileCompressType::LZO: - case TFileCompressType::LZOP: - compress_type = CompressType::LZOP; - break; - case TFileCompressType::BZ2: - compress_type = CompressType::BZIP2; - break; - case TFileCompressType::LZ4FRAME: - compress_type = CompressType::LZ4FRAME; - break; - case TFileCompressType::LZ4BLOCK: - compress_type = CompressType::LZ4BLOCK; - break; - case TFileCompressType::DEFLATE: - compress_type = CompressType::DEFLATE; - break; - case TFileCompressType::SNAPPYBLOCK: - compress_type = CompressType::SNAPPYBLOCK; - break; - default: - return Status::InternalError<false>("unknown compress type: {}", _file_compress_type); - } + RETURN_IF_ERROR(Decompressor::create_decompressor(_file_compress_type, &_decompressor)); } else { - switch (_file_format_type) { - case TFileFormatType::FORMAT_PROTO: - [[fallthrough]]; - case TFileFormatType::FORMAT_CSV_PLAIN: - compress_type = CompressType::UNCOMPRESSED; - break; - case TFileFormatType::FORMAT_CSV_GZ: - compress_type = CompressType::GZIP; - break; - case TFileFormatType::FORMAT_CSV_BZ2: - compress_type = CompressType::BZIP2; - break; - case TFileFormatType::FORMAT_CSV_LZ4FRAME: - compress_type = CompressType::LZ4FRAME; - break; - case TFileFormatType::FORMAT_CSV_LZ4BLOCK: - compress_type = CompressType::LZ4BLOCK; - break; - case TFileFormatType::FORMAT_CSV_LZOP: - compress_type = CompressType::LZOP; - break; - case TFileFormatType::FORMAT_CSV_DEFLATE: - compress_type = CompressType::DEFLATE; - break; - case TFileFormatType::FORMAT_CSV_SNAPPYBLOCK: - compress_type = CompressType::SNAPPYBLOCK; - break; - default: - return Status::InternalError<false>("unknown format type: {}", _file_format_type); - } + RETURN_IF_ERROR(Decompressor::create_decompressor(_file_format_type, &_decompressor)); } - Decompressor* decompressor; - RETURN_IF_ERROR(Decompressor::create_decompressor(compress_type, &decompressor)); - _decompressor.reset(decompressor); return Status::OK(); } diff --git a/be/src/vec/exec/format/json/new_json_reader.cpp b/be/src/vec/exec/format/json/new_json_reader.cpp index 06f1cf85e8a..5747fb609ea 100644 --- a/be/src/vec/exec/format/json/new_json_reader.cpp +++ b/be/src/vec/exec/format/json/new_json_reader.cpp @@ -87,6 +87,7 @@ NewJsonReader::NewJsonReader(RuntimeState* state, RuntimeProfile* profile, Scann _file_reader(nullptr), _line_reader(nullptr), _reader_eof(false), + _decompressor(nullptr), _skip_first_line(false), _next_row(0), _total_rows(0), @@ -99,6 +100,12 @@ NewJsonReader::NewJsonReader(RuntimeState* state, RuntimeProfile* profile, Scann _bytes_read_counter = ADD_COUNTER(_profile, "BytesRead", TUnit::BYTES); _read_timer = ADD_TIMER(_profile, "ReadTime"); _file_read_timer = ADD_TIMER(_profile, "FileReadTime"); + if (_range.__isset.compress_type) { + // for compatibility + _file_compress_type = _range.compress_type; + } else { + _file_compress_type = _params.compress_type; + } _init_system_properties(); _init_file_description(); } @@ -115,6 +122,7 @@ NewJsonReader::NewJsonReader(RuntimeProfile* profile, const TFileScanRangeParams _file_slot_descs(file_slot_descs), _line_reader(nullptr), _reader_eof(false), + _decompressor(nullptr), _skip_first_line(false), _next_row(0), _total_rows(0), @@ -122,6 +130,12 @@ NewJsonReader::NewJsonReader(RuntimeProfile* profile, const TFileScanRangeParams _parse_allocator(_parse_buffer, sizeof(_parse_buffer)), _origin_json_doc(&_value_allocator, sizeof(_parse_buffer), &_parse_allocator), _io_ctx(io_ctx) { + if (_range.__isset.compress_type) { + // for compatibility + _file_compress_type = _range.compress_type; + } else { + _file_compress_type = _params.compress_type; + } _init_system_properties(); _init_file_description(); } @@ -155,6 +169,10 @@ Status NewJsonReader::init_reader( // generate _col_default_value_map RETURN_IF_ERROR(_get_column_default_value(_file_slot_descs, col_default_value_ctx)); + // create decompressor. + // _decompressor may be nullptr if this is not a compressed file + RETURN_IF_ERROR(Decompressor::create_decompressor(_file_compress_type, &_decompressor)); + #ifdef __AVX2__ if (config::enable_simdjson_reader) { RETURN_IF_ERROR(_simdjson_init_reader()); @@ -402,7 +420,7 @@ Status NewJsonReader::_open_line_reader() { _skip_first_line = false; } _line_reader = NewPlainTextLineReader::create_unique( - _profile, _file_reader, nullptr, + _profile, _file_reader, _decompressor.get(), std::make_shared<PlainTextLineReaderCtx>(_line_delimiter, _line_delimiter_length), size, _current_offset); return Status::OK(); diff --git a/be/src/vec/exec/format/json/new_json_reader.h b/be/src/vec/exec/format/json/new_json_reader.h index dac33908e75..e84605ff025 100644 --- a/be/src/vec/exec/format/json/new_json_reader.h +++ b/be/src/vec/exec/format/json/new_json_reader.h @@ -33,6 +33,7 @@ #include <vector> #include "common/status.h" +#include "exec/decompressor.h" #include "exec/line_reader.h" #include "exprs/json_functions.h" #include "io/file_factory.h" @@ -212,6 +213,8 @@ private: io::FileReaderSPtr _file_reader; std::unique_ptr<LineReader> _line_reader; bool _reader_eof; + std::unique_ptr<Decompressor> _decompressor; + TFileCompressType::type _file_compress_type; // When we fetch range doesn't start from 0 will always skip the first line bool _skip_first_line; diff --git a/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/BROKER-LOAD.md b/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/BROKER-LOAD.md index 6bb4eac3e23..63a58a99574 100644 --- a/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/BROKER-LOAD.md +++ b/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/BROKER-LOAD.md @@ -107,7 +107,7 @@ WITH BROKER broker_name Specifies the file type, CSV, PARQUET and ORC formats are supported. Default is CSV. - `COMPRESS_TYPE AS` - Specifies the file compress type, GZ/LZO/BZ2/LZ4FRAME/DEFLATE/LZOP + Specifies the file compress type, GZ/LZO/BZ2/LZ4FRAME/DEFLATE/LZOP. Only valid in CSV or JSON format. - `column list` diff --git a/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/BROKER-LOAD.md b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/BROKER-LOAD.md index 7f13fca55ab..f2211e4967b 100644 --- a/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/BROKER-LOAD.md +++ b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/BROKER-LOAD.md @@ -107,7 +107,7 @@ WITH BROKER broker_name 指定文件类型,支持 CSV、PARQUET 和 ORC 格式。默认为 CSV。 - `COMPRESS_TYPE AS` - 指定文件压缩类型, 支持GZ/BZ2/LZ4FRAME。 + 指定文件压缩类型, 支持 GZ/BZ2/LZ4FRAME。仅在 CSV 或 JSON 格式下生效。 - `column list` diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileGroupInfo.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileGroupInfo.java index eb7f862d7eb..2a2f9f8e563 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileGroupInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileGroupInfo.java @@ -217,8 +217,8 @@ public class FileGroupInfo { if (tmpBytes > bytesPerInstance && jobType != JobType.STREAM_LOAD) { // Now only support split plain text if (compressType == TFileCompressType.PLAIN - && (formatType == TFileFormatType.FORMAT_CSV_PLAIN && fileStatus.isSplitable) - || formatType == TFileFormatType.FORMAT_JSON) { + && ((formatType == TFileFormatType.FORMAT_CSV_PLAIN && fileStatus.isSplitable) + || formatType == TFileFormatType.FORMAT_JSON)) { long rangeBytes = bytesPerInstance - curInstanceBytes; TFileRangeDesc rangeDesc = createFileRangeDesc(curFileOffset, fileStatus, rangeBytes, columnsFromPath); diff --git a/regression-test/data/load_p0/broker_load/test_compress_type.out b/regression-test/data/load_p0/broker_load/test_compress_type.out index e66359296c0..b11397e97dd 100644 --- a/regression-test/data/load_p0/broker_load/test_compress_type.out +++ b/regression-test/data/load_p0/broker_load/test_compress_type.out @@ -1,4 +1,4 @@ -- This file is automatically generated. You should know what you did if you want to edit this -- !sql -- -240 +360 diff --git a/regression-test/data/load_p0/stream_load/basic_data_by_line.json.bz2 b/regression-test/data/load_p0/stream_load/basic_data_by_line.json.bz2 new file mode 100644 index 00000000000..6944b68a469 Binary files /dev/null and b/regression-test/data/load_p0/stream_load/basic_data_by_line.json.bz2 differ diff --git a/regression-test/data/load_p0/stream_load/basic_data_by_line.json.gz b/regression-test/data/load_p0/stream_load/basic_data_by_line.json.gz new file mode 100644 index 00000000000..d3c9ea71ff0 Binary files /dev/null and b/regression-test/data/load_p0/stream_load/basic_data_by_line.json.gz differ diff --git a/regression-test/data/load_p0/stream_load/basic_data_by_line.json.lz4 b/regression-test/data/load_p0/stream_load/basic_data_by_line.json.lz4 new file mode 100644 index 00000000000..736c858f803 Binary files /dev/null and b/regression-test/data/load_p0/stream_load/basic_data_by_line.json.lz4 differ diff --git a/regression-test/suites/load_p0/broker_load/test_compress_type.groovy b/regression-test/suites/load_p0/broker_load/test_compress_type.groovy index 50d4e34cd99..a241ec4db2f 100644 --- a/regression-test/suites/load_p0/broker_load/test_compress_type.groovy +++ b/regression-test/suites/load_p0/broker_load/test_compress_type.groovy @@ -32,6 +32,12 @@ suite("test_compress_type", "load_p0") { "", "", "", + "COMPRESS_TYPE AS \"GZ\"", + "COMPRESS_TYPE AS \"BZ2\"", + "COMPRESS_TYPE AS \"LZ4FRAME\"", + "", + "", + "", ] def fileFormat = [ @@ -46,7 +52,13 @@ suite("test_compress_type", "load_p0") { "FORMAT AS \"CSV\"", "", "", - "" + "", + "FORMAT AS \"JSON\"", + "FORMAT AS \"JSON\"", + "FORMAT AS \"JSON\"", + "FORMAT AS \"JSON\"", + "FORMAT AS \"JSON\"", + "FORMAT AS \"JSON\"", ] def paths = [ @@ -61,7 +73,13 @@ suite("test_compress_type", "load_p0") { "s3://doris-build-1308700295/regression/load/data/basic_data.csv.lz4", "s3://doris-build-1308700295/regression/load/data/basic_data.csv.gz", "s3://doris-build-1308700295/regression/load/data/basic_data.csv.bz2", - "s3://doris-build-1308700295/regression/load/data/basic_data.csv.lz4" + "s3://doris-build-1308700295/regression/load/data/basic_data.csv.lz4", + "s3://doris-build-1308700295/regression/load/data/basic_data_by_line.json.gz", + "s3://doris-build-1308700295/regression/load/data/basic_data_by_line.json.bz2", + "s3://doris-build-1308700295/regression/load/data/basic_data_by_line.json.lz4", + "s3://doris-build-1308700295/regression/load/data/basic_data_by_line.json.gz", + "s3://doris-build-1308700295/regression/load/data/basic_data_by_line.json.bz2", + "s3://doris-build-1308700295/regression/load/data/basic_data_by_line.json.lz4", ] def labels = [] @@ -126,4 +144,4 @@ suite("test_compress_type", "load_p0") { } qt_sql """ select count(*) from ${tableName} """ -} \ No newline at end of file +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org