This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
     new 692176ec07 [feature-wip](parquet-reader) pre read page data in advance to avoid frequent seek (#12898)
692176ec07 is described below

commit 692176ec07b386bac3ef1ca5626a0b11dfd53d15
Author: Ashin Gau <ashin...@users.noreply.github.com>
AuthorDate: Sun Sep 25 21:21:06 2022 +0800

    [feature-wip](parquet-reader) pre read page data in advance to avoid frequent seek (#12898)

    1. Fix the bug of file position in `HdfsFileReader`
    2. Reserve enough buffer for `ColumnChunkReader` to read large contiguous memory
---
 be/src/common/config.h                             |  7 ++-
 be/src/io/buffered_reader.cpp                      | 51 ++++++++++------------
 be/src/io/buffered_reader.h                        | 11 +++--
 be/src/io/hdfs_file_reader.cpp                     |  8 +---
 .../parquet/vparquet_column_chunk_reader.cpp       |  5 ++-
 .../exec/format/parquet/vparquet_column_reader.cpp | 17 +++++---
 .../exec/format/parquet/vparquet_column_reader.h   |  7 +--
 .../exec/format/parquet/vparquet_group_reader.cpp  |  5 ++-
 .../exec/format/parquet/vparquet_page_reader.cpp   | 11 +++--
 be/src/vec/exec/format/parquet/vparquet_reader.cpp | 38 +++++++++++-----
 be/src/vec/exec/format/parquet/vparquet_reader.h   |  1 +
 be/test/vec/exec/parquet/parquet_thrift_test.cpp   |  2 +-
 12 files changed, 93 insertions(+), 70 deletions(-)

diff --git a/be/src/common/config.h b/be/src/common/config.h
index ad02df2efb..5b5eccdf0d 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -823,7 +823,12 @@ CONF_Int32(object_pool_buffer_size, "100");
 // ParquetReaderWrap prefetch buffer size
 CONF_Int32(parquet_reader_max_buffer_size, "50");
 CONF_Bool(parquet_predicate_push_down, "true");
-CONF_Int32(parquet_header_max_size, "8388608");
+// Max size of parquet page header in bytes
+CONF_mInt32(parquet_header_max_size_mb, "1");
+// Max buffer size for parquet row group
+CONF_mInt32(parquet_rowgroup_max_buffer_mb, "128");
+// Max buffer size for parquet chunk column
+CONF_mInt32(parquet_column_max_buffer_mb, "8");
 CONF_Bool(parquet_reader_using_internal, "false");
 
 // When the rows number reached this limit, will check the filter rate the of bloomfilter
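The new knobs are denominated in megabytes rather than raw bytes (the old byte-valued `parquet_header_max_size` defaulted to 8388608, i.e. 8 MB; the new header cap is 1 MB), and later hunks convert them with a 20-bit shift, e.g. `config::parquet_rowgroup_max_buffer_mb << 20`. A quick sanity check of that conversion; `mb_to_bytes` is a hypothetical helper written for this note, not part of the patch:

    #include <cstddef>
    #include <cstdint>

    // Hypothetical helper mirroring the `value << 20` conversion used in this patch.
    constexpr size_t mb_to_bytes(int32_t mb) {
        return static_cast<size_t>(mb) << 20; // 1 MB == 2^20 bytes
    }

    static_assert(mb_to_bytes(1) == 1048576, "parquet_header_max_size_mb = 1");
    static_assert(mb_to_bytes(128) == 134217728, "parquet_rowgroup_max_buffer_mb = 128");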
diff --git a/be/src/io/buffered_reader.cpp b/be/src/io/buffered_reader.cpp
index ca40979321..c98b365235 100644
--- a/be/src/io/buffered_reader.cpp
+++ b/be/src/io/buffered_reader.cpp
@@ -186,40 +186,31 @@ bool BufferedReader::closed() {
 }
 
 BufferedFileStreamReader::BufferedFileStreamReader(FileReader* file, uint64_t offset,
-                                                   uint64_t length)
-        : _file(file), _file_start_offset(offset), _file_end_offset(offset + length) {}
-
-Status BufferedFileStreamReader::seek(uint64_t position) {
-    if (_file_position != position) {
-        RETURN_IF_ERROR(_file->seek(position));
-        _file_position = position;
-    }
-    return Status::OK();
-}
+                                                   uint64_t length, size_t max_buf_size)
+        : _file(file),
+          _file_start_offset(offset),
+          _file_end_offset(offset + length),
+          _max_buf_size(max_buf_size) {}
 
 Status BufferedFileStreamReader::read_bytes(const uint8_t** buf, uint64_t offset,
-                                            size_t* bytes_to_read) {
-    if (offset < _file_start_offset) {
+                                            const size_t bytes_to_read) {
+    if (offset < _file_start_offset || offset >= _file_end_offset) {
         return Status::IOError("Out-of-bounds Access");
     }
-    if (offset >= _file_end_offset) {
-        *bytes_to_read = 0;
-        return Status::OK();
-    }
-    int64_t end_offset = offset + *bytes_to_read;
+    int64_t end_offset = offset + bytes_to_read;
     if (_buf_start_offset <= offset && _buf_end_offset >= end_offset) {
         *buf = _buf.get() + offset - _buf_start_offset;
         return Status::OK();
     }
-    if (_buf_size < *bytes_to_read) {
-        size_t new_size = BitUtil::next_power_of_two(*bytes_to_read);
-        std::unique_ptr<uint8_t[]> new_buf(new uint8_t[new_size]);
+    size_t buf_size = std::max(_max_buf_size, bytes_to_read);
+    if (_buf_size < buf_size) {
+        std::unique_ptr<uint8_t[]> new_buf(new uint8_t[buf_size]);
         if (offset >= _buf_start_offset && offset < _buf_end_offset) {
             memcpy(new_buf.get(), _buf.get() + offset - _buf_start_offset,
                    _buf_end_offset - offset);
         }
         _buf = std::move(new_buf);
-        _buf_size = new_size;
+        _buf_size = buf_size;
     } else if (offset > _buf_start_offset && offset < _buf_end_offset) {
         memmove(_buf.get(), _buf.get() + offset - _buf_start_offset, _buf_end_offset - offset);
     }
@@ -227,19 +218,25 @@ Status BufferedFileStreamReader::read_bytes(const uint8_t** buf, uint64_t offset
         _buf_end_offset = offset;
     }
    _buf_start_offset = offset;
-    int64_t to_read = end_offset - _buf_end_offset;
-    RETURN_IF_ERROR(seek(_buf_end_offset));
-    bool eof = false;
     int64_t buf_remaining = _buf_end_offset - _buf_start_offset;
-    RETURN_IF_ERROR(_file->read(_buf.get() + buf_remaining, to_read, &to_read, &eof));
-    *bytes_to_read = buf_remaining + to_read;
+    int64_t to_read = std::min(_buf_size - buf_remaining, _file_end_offset - _buf_end_offset);
+    int64_t has_read = 0;
+    while (has_read < to_read) {
+        int64_t loop_read = 0;
+        RETURN_IF_ERROR(_file->readat(_buf_end_offset + has_read, to_read - has_read, &loop_read,
+                                      _buf.get() + buf_remaining + has_read));
+        has_read += loop_read;
+    }
+    if (has_read != to_read) {
+        return Status::Corruption("Try to read {} bytes, but received {} bytes", to_read, has_read);
+    }
     _buf_end_offset += to_read;
     *buf = _buf.get();
     return Status::OK();
 }
 
 Status BufferedFileStreamReader::read_bytes(Slice& slice, uint64_t offset) {
-    return read_bytes((const uint8_t**)&slice.data, offset, &slice.size);
+    return read_bytes((const uint8_t**)&slice.data, offset, slice.size);
 }
 
 } // namespace doris
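The rewritten `read_bytes()` above is the heart of this change: rather than seeking and reading exactly the bytes requested, it fills the buffer up to `_max_buf_size` (or the end of the column chunk) with positioned `readat()` calls, so the pages that follow are served from memory without touching the file again. A simplified, self-contained sketch of the same pattern — `PositionedFile` is a hypothetical interface standing in for Doris's `FileReader`, and the overlap-preserving memcpy/memmove paths are omitted:

    #include <algorithm>
    #include <cstddef>
    #include <cstdint>
    #include <memory>

    // Hypothetical positioned-read interface, standing in for FileReader::readat().
    struct PositionedFile {
        virtual ~PositionedFile() = default;
        // Reads up to n bytes at pos into out; returns bytes read, <= 0 on error/EOF.
        virtual int64_t readat(int64_t pos, int64_t n, uint8_t* out) = 0;
    };

    class PrereadBuffer {
    public:
        PrereadBuffer(PositionedFile* file, uint64_t start, uint64_t end, size_t max_buf)
                : _file(file), _start(start), _end(end), _max_buf(max_buf) {}

        // Returns a pointer to [offset, offset + n) inside the internal buffer,
        // or nullptr on out-of-bounds access or a short read.
        const uint8_t* read(uint64_t offset, size_t n) {
            if (offset < _start || offset + n > _end) {
                return nullptr; // out of bounds, as in the patched bounds check
            }
            if (offset >= _buf_start && offset + n <= _buf_end) {
                return _buf.get() + (offset - _buf_start); // buffer hit: no I/O at all
            }
            // Miss: allocate up to the cap and fill forward in one readat() loop,
            // pre-reading the pages that follow this request.
            size_t want = std::max(_max_buf, n);
            _buf.reset(new uint8_t[want]);
            int64_t to_read = std::min<int64_t>(want, _end - offset);
            int64_t done = 0;
            while (done < to_read) {
                int64_t got = _file->readat(offset + done, to_read - done, _buf.get() + done);
                if (got <= 0) return nullptr; // short read
                done += got;
            }
            _buf_start = offset;
            _buf_end = offset + done;
            return _buf.get();
        }

    private:
        PositionedFile* _file;
        uint64_t _start;
        uint64_t _end;
        size_t _max_buf;
        std::unique_ptr<uint8_t[]> _buf;
        uint64_t _buf_start = 0;
        uint64_t _buf_end = 0;
    };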
diff --git a/be/src/io/buffered_reader.h b/be/src/io/buffered_reader.h
index 2cfcaaa413..97ec01cc7d 100644
--- a/be/src/io/buffered_reader.h
+++ b/be/src/io/buffered_reader.h
@@ -93,7 +93,7 @@ public:
      * @param offset start offset ot read in stream
      * @param bytes_to_read bytes to read
      */
-    virtual Status read_bytes(const uint8_t** buf, uint64_t offset, size_t* bytes_to_read) = 0;
+    virtual Status read_bytes(const uint8_t** buf, uint64_t offset, const size_t bytes_to_read) = 0;
     /**
     * Save the data address to slice.data, and the slice.size is the bytes to read.
     */
@@ -103,10 +103,11 @@ public:
 
 class BufferedFileStreamReader : public BufferedStreamReader {
 public:
-    BufferedFileStreamReader(FileReader* file, uint64_t offset, uint64_t length);
+    BufferedFileStreamReader(FileReader* file, uint64_t offset, uint64_t length,
+                             size_t max_buf_size);
     ~BufferedFileStreamReader() override = default;
 
-    Status read_bytes(const uint8_t** buf, uint64_t offset, size_t* bytes_to_read) override;
+    Status read_bytes(const uint8_t** buf, uint64_t offset, const size_t bytes_to_read) override;
     Status read_bytes(Slice& slice, uint64_t offset) override;
 
 private:
@@ -115,12 +116,10 @@ private:
     uint64_t _file_start_offset;
     uint64_t _file_end_offset;
 
-    int64_t _file_position = -1;
     uint64_t _buf_start_offset = 0;
     uint64_t _buf_end_offset = 0;
     size_t _buf_size = 0;
-
-    Status seek(uint64_t position);
+    size_t _max_buf_size;
 };
 
 } // namespace doris
diff --git a/be/src/io/hdfs_file_reader.cpp b/be/src/io/hdfs_file_reader.cpp
index 0112e880aa..37b2d73bba 100644
--- a/be/src/io/hdfs_file_reader.cpp
+++ b/be/src/io/hdfs_file_reader.cpp
@@ -141,12 +141,7 @@ Status HdfsFileReader::read(uint8_t* buf, int64_t buf_len, int64_t* bytes_read,
 
 Status HdfsFileReader::readat(int64_t position, int64_t nbytes, int64_t* bytes_read, void* out) {
     if (position != _current_offset) {
-        int ret = hdfsSeek(_hdfs_fs, _hdfs_file, position);
-        if (ret != 0) { // check fseek return value
-            return Status::InternalError("hdfsSeek failed.(BE: {}) namenode:{}, path:{}, err: {}",
-                                         BackendOptions::get_localhost(), _namenode, _path,
-                                         hdfsGetLastError());
-        }
+        seek(position);
     }
 
     *bytes_read = hdfsRead(_hdfs_fs, _hdfs_file, out, nbytes);
@@ -191,6 +186,7 @@ Status HdfsFileReader::seek(int64_t position) {
         return Status::InternalError("Seek to offset failed. (BE: {}) offset={}, err: {}",
                                      BackendOptions::get_localhost(), position, hdfsGetLastError());
     }
+    _current_offset = position;
     return Status::OK();
 }
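The `HdfsFileReader` hunk is the file-position bug called out in the commit message: `readat()` skips the seek when `position == _current_offset`, but `seek()` never recorded the new position, so the comparison could stay permanently out of sync and every positioned read paid an `hdfsSeek` round trip. A minimal sketch of the corrected bookkeeping, using a hypothetical `File` type rather than the real libhdfs wrapper:

    #include <cstdint>

    // Sketch of the stateful-cursor pattern fixed in HdfsFileReader.
    class File {
    public:
        void read_at(int64_t position, int64_t nbytes) {
            if (position != _current_offset) {
                seek(position); // only pay a seek when the cursor is actually elsewhere
            }
            // ... read nbytes from the current cursor ...
            _current_offset = position + nbytes; // cursor advances with the read
        }

    private:
        void seek(int64_t position) {
            // ... underlying seek call ...
            _current_offset = position; // the missing bookkeeping: remember where we moved
        }

        int64_t _current_offset = 0;
    };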
diff --git a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp
index 0e7d1cd4ec..193200f28d 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp
@@ -40,8 +40,9 @@ Status ColumnChunkReader::init() {
     if (_metadata.__isset.dictionary_page_offset) {
         // seek to the directory page
         _page_reader->seek_to_page(_metadata.dictionary_page_offset);
-        RETURN_IF_ERROR(_page_reader->next_page_header());
-        RETURN_IF_ERROR(_decode_dict_page());
+        // Parse dictionary data when reading
+        // RETURN_IF_ERROR(_page_reader->next_page_header());
+        // RETURN_IF_ERROR(_decode_dict_page());
     } else {
         // seek to the first data page
         _page_reader->seek_to_page(_metadata.data_page_offset);
diff --git a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
index 4c54d45aaa..3074705ffa 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
@@ -31,7 +31,8 @@ Status ParquetColumnReader::create(FileReader* file, FieldSchema* field,
                                    const ParquetReadColumn& column,
                                    const tparquet::RowGroup& row_group,
                                    std::vector<RowRange>& row_ranges, cctz::time_zone* ctz,
-                                   std::unique_ptr<ParquetColumnReader>& reader) {
+                                   std::unique_ptr<ParquetColumnReader>& reader,
+                                   size_t max_buf_size) {
     if (field->type.type == TYPE_MAP || field->type.type == TYPE_STRUCT) {
         return Status::Corruption("not supported type");
     }
@@ -39,13 +40,13 @@ Status ParquetColumnReader::create(FileReader* file, FieldSchema* field,
         tparquet::ColumnChunk chunk = row_group.columns[field->children[0].physical_column_index];
         ArrayColumnReader* array_reader = new ArrayColumnReader(ctz);
         array_reader->init_column_metadata(chunk);
-        RETURN_IF_ERROR(array_reader->init(file, field, &chunk, row_ranges));
+        RETURN_IF_ERROR(array_reader->init(file, field, &chunk, row_ranges, max_buf_size));
         reader.reset(array_reader);
     } else {
         tparquet::ColumnChunk chunk = row_group.columns[field->physical_column_index];
         ScalarColumnReader* scalar_reader = new ScalarColumnReader(ctz);
         scalar_reader->init_column_metadata(chunk);
-        RETURN_IF_ERROR(scalar_reader->init(file, field, &chunk, row_ranges));
+        RETURN_IF_ERROR(scalar_reader->init(file, field, &chunk, row_ranges, max_buf_size));
         reader.reset(scalar_reader);
     }
     return Status::OK();
@@ -84,9 +85,10 @@ void ParquetColumnReader::_generate_read_ranges(int64_t start_index, int64_t end
 }
 
 Status ScalarColumnReader::init(FileReader* file, FieldSchema* field, tparquet::ColumnChunk* chunk,
-                                std::vector<RowRange>& row_ranges) {
+                                std::vector<RowRange>& row_ranges, size_t max_buf_size) {
     _stream_reader =
-            new BufferedFileStreamReader(file, _metadata->start_offset(), _metadata->size());
+            new BufferedFileStreamReader(file, _metadata->start_offset(), _metadata->size(),
+                                         std::min((size_t)_metadata->size(), max_buf_size));
     _row_ranges = row_ranges;
     _chunk_reader.reset(new ColumnChunkReader(_stream_reader, chunk, field, _ctz));
     RETURN_IF_ERROR(_chunk_reader->init());
@@ -210,9 +212,10 @@ void ArrayColumnReader::_reserve_def_levels_buf(size_t size) {
 }
 
 Status ArrayColumnReader::init(FileReader* file, FieldSchema* field, tparquet::ColumnChunk* chunk,
-                               std::vector<RowRange>& row_ranges) {
+                               std::vector<RowRange>& row_ranges, size_t max_buf_size) {
     _stream_reader =
-            new BufferedFileStreamReader(file, _metadata->start_offset(), _metadata->size(),
+            new BufferedFileStreamReader(file, _metadata->start_offset(), _metadata->size(),
+                                         std::min((size_t)_metadata->size(), max_buf_size));
     _row_ranges = row_ranges;
     _chunk_reader.reset(new ColumnChunkReader(_stream_reader, chunk, &field->children[0], _ctz));
     RETURN_IF_ERROR(_chunk_reader->init());
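Both `init()` overloads clamp the stream buffer to `std::min((size_t)_metadata->size(), max_buf_size)`, so a column chunk smaller than the per-column budget never allocates more than it can use. A tiny standalone illustration with hypothetical numbers:

    #include <algorithm>
    #include <cstddef>
    #include <cstdio>

    int main() {
        const size_t max_buf_size = 8u << 20; // hypothetical 8 MB per-column budget
        const size_t chunk_sizes[] = {512u << 10, 64u << 20}; // a 512 KB and a 64 MB chunk
        for (size_t chunk : chunk_sizes) {
            // Same clamp as in ScalarColumnReader::init / ArrayColumnReader::init:
            size_t buf = std::min(chunk, max_buf_size);
            std::printf("chunk=%zu bytes -> buffer=%zu bytes\n", chunk, buf);
        }
        return 0;
    }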
diff --git a/be/src/vec/exec/format/parquet/vparquet_column_reader.h b/be/src/vec/exec/format/parquet/vparquet_column_reader.h
index 644da79abb..b245ff6aa4 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_column_reader.h
@@ -60,7 +60,8 @@ public:
                                     size_t* read_rows, bool* eof) = 0;
     static Status create(FileReader* file, FieldSchema* field, const ParquetReadColumn& column,
                          const tparquet::RowGroup& row_group, std::vector<RowRange>& row_ranges,
-                         cctz::time_zone* ctz, std::unique_ptr<ParquetColumnReader>& reader);
+                         cctz::time_zone* ctz, std::unique_ptr<ParquetColumnReader>& reader,
+                         size_t max_buf_size);
     void init_column_metadata(const tparquet::ColumnChunk& chunk);
     void add_offset_index(tparquet::OffsetIndex* offset_index) { _offset_index = offset_index; }
     virtual void close() = 0;
@@ -84,7 +85,7 @@ public:
     ScalarColumnReader(cctz::time_zone* ctz) : ParquetColumnReader(ctz) {};
     ~ScalarColumnReader() override { close(); };
     Status init(FileReader* file, FieldSchema* field, tparquet::ColumnChunk* chunk,
-                std::vector<RowRange>& row_ranges);
+                std::vector<RowRange>& row_ranges, size_t max_buf_size);
     Status read_column_data(ColumnPtr& doris_column, DataTypePtr& type, size_t batch_size,
                             size_t* read_rows, bool* eof) override;
     Status _skip_values(size_t num_values);
@@ -97,7 +98,7 @@ public:
     ArrayColumnReader(cctz::time_zone* ctz) : ParquetColumnReader(ctz) {};
     ~ArrayColumnReader() override { close(); };
     Status init(FileReader* file, FieldSchema* field, tparquet::ColumnChunk* chunk,
-                std::vector<RowRange>& row_ranges);
+                std::vector<RowRange>& row_ranges, size_t max_buf_size);
     Status read_column_data(ColumnPtr& doris_column, DataTypePtr& type, size_t batch_size,
                             size_t* read_rows, bool* eof) override;
     void close() override;
diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
index ae4505bdd7..0168a97a43 100644
--- a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
@@ -46,11 +46,14 @@ Status RowGroupReader::init(const FieldDescriptor& schema, std::vector<RowRange>
 Status RowGroupReader::_init_column_readers(
         const FieldDescriptor& schema, std::vector<RowRange>& row_ranges,
         std::unordered_map<int, tparquet::OffsetIndex>& col_offsets) {
+    const size_t MAX_GROUP_BUF_SIZE = config::parquet_rowgroup_max_buffer_mb << 20;
+    const size_t MAX_COLUMN_BUF_SIZE = config::parquet_column_max_buffer_mb << 20;
+    size_t max_buf_size = std::min(MAX_COLUMN_BUF_SIZE, MAX_GROUP_BUF_SIZE / _read_columns.size());
     for (auto& read_col : _read_columns) {
         auto field = const_cast<FieldSchema*>(schema.get_column(read_col._file_slot_name));
         std::unique_ptr<ParquetColumnReader> reader;
         RETURN_IF_ERROR(ParquetColumnReader::create(_file_reader, field, read_col, _row_group_meta,
-                                                    row_ranges, _ctz, reader));
+                                                    row_ranges, _ctz, reader, max_buf_size));
         auto col_iter = col_offsets.find(read_col._parquet_col_id);
         if (col_iter != col_offsets.end()) {
             tparquet::OffsetIndex oi = col_iter->second;
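`_init_column_readers` splits the row-group budget evenly across the selected columns and caps each share at the per-column limit, so with the new defaults (128 MB per row group, 8 MB per column) each of N columns gets `min(8 MB, 128 MB / N)`. A quick check of the arithmetic:

    #include <algorithm>
    #include <cstddef>
    #include <cstdio>

    int main() {
        const size_t max_group = 128ull << 20; // parquet_rowgroup_max_buffer_mb
        const size_t max_column = 8ull << 20;  // parquet_column_max_buffer_mb
        for (size_t num_columns : {4, 16, 64}) {
            size_t per_column = std::min(max_column, max_group / num_columns);
            // 4 or 16 columns -> 8 MB each (column cap wins); 64 columns -> 2 MB each
            std::printf("%zu columns -> %zu MB per column\n", num_columns, per_column >> 20);
        }
        return 0;
    }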
diff --git a/be/src/vec/exec/format/parquet/vparquet_page_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_page_reader.cpp
index 80c6ff228b..baa88036c2 100644
--- a/be/src/vec/exec/format/parquet/vparquet_page_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_page_reader.cpp
@@ -23,7 +23,7 @@
 
 namespace doris::vectorized {
 
-static constexpr size_t initPageHeaderSize = 128;
+static constexpr size_t INIT_PAGE_HEADER_SIZE = 128;
 
 PageReader::PageReader(BufferedStreamReader* reader, uint64_t offset, uint64_t length)
         : _reader(reader), _start_offset(offset), _end_offset(offset + length) {}
@@ -41,19 +41,19 @@ Status PageReader::next_page_header() {
 
     const uint8_t* page_header_buf = nullptr;
     size_t max_size = _end_offset - _offset;
-    size_t header_size = std::min(initPageHeaderSize, max_size);
+    size_t header_size = std::min(INIT_PAGE_HEADER_SIZE, max_size);
+    const size_t MAX_PAGE_HEADER_SIZE = config::parquet_header_max_size_mb << 20;
     uint32_t real_header_size = 0;
     while (true) {
         header_size = std::min(header_size, max_size);
-        RETURN_IF_ERROR(_reader->read_bytes(&page_header_buf, _offset, &header_size));
+        RETURN_IF_ERROR(_reader->read_bytes(&page_header_buf, _offset, header_size));
         real_header_size = header_size;
         auto st =
                 deserialize_thrift_msg(page_header_buf, &real_header_size, true, &_cur_page_header);
         if (st.ok()) {
             break;
         }
-        if (_offset + header_size >= _end_offset ||
-            real_header_size > config::parquet_header_max_size) {
+        if (_offset + header_size >= _end_offset || real_header_size > MAX_PAGE_HEADER_SIZE) {
            return Status::IOError("Failed to deserialize parquet page header");
         }
         header_size <<= 2;
@@ -80,7 +80,6 @@ Status PageReader::get_page_data(Slice& slice) {
     }
     slice.size = _cur_page_header.compressed_page_size;
     RETURN_IF_ERROR(_reader->read_bytes(slice, _offset));
-    DCHECK_EQ(slice.size, _cur_page_header.compressed_page_size);
     _offset += slice.size;
     _state = INITIALIZED;
     return Status::OK();
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
index b7372a7495..6d5718d181 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
@@ -17,6 +17,8 @@
 
 #include "vparquet_reader.h"
 
+#include <algorithm>
+
 #include "io/file_factory.h"
 #include "parquet_thrift_util.h"
 
@@ -45,6 +47,10 @@ ParquetReader::ParquetReader(RuntimeProfile* profile, const TFileScanRangeParams
 
 ParquetReader::~ParquetReader() {
     close();
+    if (_group_file_reader != _file_reader.get()) {
+        delete _group_file_reader;
+        _group_file_reader = nullptr;
+    }
 }
 
 void ParquetReader::close() {
@@ -63,9 +69,19 @@ void ParquetReader::close() {
 
 Status ParquetReader::init_reader(std::vector<ExprContext*>& conjunct_ctxs) {
     if (_file_reader == nullptr) {
-        RETURN_IF_ERROR(FileFactory::create_file_reader(
-                _profile, _scan_params, _scan_range, _file_reader,
-                config::remote_storage_read_buffer_mb * 1024 * 1024));
+        RETURN_IF_ERROR(FileFactory::create_file_reader(_profile, _scan_params, _scan_range,
+                                                        _file_reader, 2048));
+        // RowGroupReader has its own underlying buffer, so we should return file reader directly
+        // If RowGroupReaders use the same file reader with ParquetReader, the file position will change
+        // when ParquetReader try to read ColumnIndex meta, which causes performance cost
+        std::unique_ptr<FileReader> group_file_reader;
+        RETURN_IF_ERROR(FileFactory::create_file_reader(_profile, _scan_params, _scan_range,
+                                                        group_file_reader, 0));
+        _group_file_reader = group_file_reader.release();
+        RETURN_IF_ERROR(_group_file_reader->open());
+    } else {
+        // test only
+        _group_file_reader = _file_reader.get();
     }
     RETURN_IF_ERROR(_file_reader->open());
     if (_file_reader->size() == 0) {
@@ -90,16 +106,18 @@ Status ParquetReader::init_reader(std::vector<ExprContext*>& conjunct_ctxs) {
 
 Status ParquetReader::_init_read_columns() {
     _include_column_ids.clear();
     for (auto& file_col_name : _column_names) {
-        // Get the Column Reader for the boolean column
         auto iter = _map_column.find(file_col_name);
-        auto parquet_col_id = iter->second;
         if (iter != _map_column.end()) {
-            _include_column_ids.emplace_back(parquet_col_id);
-            _read_columns.emplace_back(parquet_col_id, file_col_name);
-        } else {
-            continue;
+            _include_column_ids.emplace_back(iter->second);
         }
     }
+    // The same order as physical columns
+    std::sort(_include_column_ids.begin(), _include_column_ids.end());
+    _read_columns.clear();
+    for (int& parquet_col_id : _include_column_ids) {
+        _read_columns.emplace_back(parquet_col_id,
+                                   _file_metadata->schema().get_column(parquet_col_id)->name);
+    }
     return Status::OK();
 }
@@ -148,7 +166,7 @@ Status ParquetReader::_init_row_group_readers(const std::vector<ExprContext*>& c
     for (auto row_group_id : _read_row_groups) {
         auto& row_group = _t_metadata->row_groups[row_group_id];
         std::shared_ptr<RowGroupReader> row_group_reader;
-        row_group_reader.reset(new RowGroupReader(_file_reader.get(), _read_columns, row_group_id,
+        row_group_reader.reset(new RowGroupReader(_group_file_reader, _read_columns, row_group_id,
                                                   row_group, _ctz));
         std::vector<RowRange> candidate_row_ranges;
        RETURN_IF_ERROR(_process_page_index(row_group, candidate_row_ranges));
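Worth noting in `PageReader::next_page_header()` above: the reader guesses a 128-byte header, and each failed Thrift decode quadruples the window (`header_size <<= 2`) until the header parses or the loop hits the chunk end or the configured cap. A minimal sketch of that grow-and-retry shape; the `try_parse` callback is a hypothetical stand-in for `deserialize_thrift_msg`:

    #include <algorithm>
    #include <cstddef>
    #include <functional>

    // Sketch of the grow-and-retry parse loop in PageReader::next_page_header().
    bool parse_with_growing_window(const std::function<bool(size_t)>& try_parse,
                                   size_t remaining, size_t max_header_size) {
        size_t header_size = std::min<size_t>(128, remaining); // INIT_PAGE_HEADER_SIZE
        while (true) {
            header_size = std::min(header_size, remaining);
            if (try_parse(header_size)) {
                return true; // thrift header decoded successfully
            }
            if (header_size >= remaining || header_size > max_header_size) {
                return false; // ran past the chunk or the configured cap
            }
            header_size <<= 2; // quadruple the window and retry, as in the patch
        }
    }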
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.h b/be/src/vec/exec/format/parquet/vparquet_reader.h
index 098aa154e8..c91bc08059 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.h
@@ -123,6 +123,7 @@ private:
     const TFileScanRangeParams& _scan_params;
     const TFileRangeDesc& _scan_range;
     std::unique_ptr<FileReader> _file_reader = nullptr;
+    FileReader* _group_file_reader = nullptr;
     std::shared_ptr<FileMetaData> _file_metadata;
     const tparquet::FileMetaData* _t_metadata;
diff --git a/be/test/vec/exec/parquet/parquet_thrift_test.cpp b/be/test/vec/exec/parquet/parquet_thrift_test.cpp
index 8785c297b1..4272954214 100644
--- a/be/test/vec/exec/parquet/parquet_thrift_test.cpp
+++ b/be/test/vec/exec/parquet/parquet_thrift_test.cpp
@@ -153,7 +153,7 @@ static Status get_column_values(FileReader* file_reader, tparquet::ColumnChunk*
                     ? chunk_meta.dictionary_page_offset
                     : chunk_meta.data_page_offset;
     size_t chunk_size = chunk_meta.total_compressed_size;
-    BufferedFileStreamReader stream_reader(file_reader, start_offset, chunk_size);
+    BufferedFileStreamReader stream_reader(file_reader, start_offset, chunk_size, 1024);
     cctz::time_zone ctz;
     TimezoneUtils::find_cctz_time_zone(TimezoneUtils::default_time_zone, ctz);

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org