This is an automated email from the ASF dual-hosted git repository. kxiao pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push: new 589d11547c [feature](load) add segment bytes limit in segcompaction (#22526) (#22638) 589d11547c is described below commit 589d11547c88d164956551ca5768929bc37f288c Author: Kaijie Chen <c...@apache.org> AuthorDate: Fri Aug 11 17:47:52 2023 +0800 [feature](load) add segment bytes limit in segcompaction (#22526) (#22638) --- be/src/common/config.cpp | 21 +++-- be/src/common/config.h | 21 +++-- be/src/olap/olap_server.cpp | 4 +- be/src/olap/rowset/beta_rowset_writer.cpp | 104 ++++++++++----------- be/src/olap/rowset/beta_rowset_writer.h | 5 +- be/test/olap/segcompaction_test.cpp | 26 +++--- docs/en/docs/admin-manual/config/be-config.md | 32 ++++++- docs/en/docs/advanced/best-practice/compaction.md | 2 +- docs/zh-CN/docs/admin-manual/config/be-config.md | 40 +++++++- .../docs/advanced/best-practice/compaction.md | 2 +- 10 files changed, 165 insertions(+), 92 deletions(-) diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 8e33a42e4e..6666ee9ef0 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -911,14 +911,23 @@ DEFINE_Bool(hide_webserver_config_page, "false"); DEFINE_Bool(enable_segcompaction, "true"); -// Trigger segcompaction if the num of segments in a rowset exceeds this threshold. -DEFINE_Int32(segcompaction_threshold_segment_num, "10"); +// Max number of segments allowed in a single segcompaction task. +DEFINE_Int32(segcompaction_batch_size, "10"); -// The segment whose row number above the threshold will be compacted during segcompaction -DEFINE_Int32(segcompaction_small_threshold, "1048576"); +// Max row count allowed in a single source segment, bigger segments will be skipped. +DEFINE_Int32(segcompaction_candidate_max_rows, "1048576"); -// This config can be set to limit thread number in segcompaction thread pool. -DEFINE_mInt32(segcompaction_max_threads, "10"); +// Max file size allowed in a single source segment, bigger segments will be skipped. +DEFINE_Int64(segcompaction_candidate_max_bytes, "104857600"); + +// Max total row count allowed in a single segcompaction task. +DEFINE_Int32(segcompaction_task_max_rows, "1572864"); + +// Max total file size allowed in a single segcompaction task. +DEFINE_Int64(segcompaction_task_max_bytes, "157286400"); + +// Global segcompaction thread pool size. +DEFINE_mInt32(segcompaction_num_threads, "5"); // enable java udf and jdbc scannode DEFINE_Bool(enable_java_support, "true"); diff --git a/be/src/common/config.h b/be/src/common/config.h index 7316870cd6..4f4c703308 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -953,14 +953,23 @@ DECLARE_Bool(hide_webserver_config_page); DECLARE_Bool(enable_segcompaction); -// Trigger segcompaction if the num of segments in a rowset exceeds this threshold. -DECLARE_Int32(segcompaction_threshold_segment_num); +// Max number of segments allowed in a single segcompaction task. +DECLARE_Int32(segcompaction_batch_size); -// The segment whose row number above the threshold will be compacted during segcompaction -DECLARE_Int32(segcompaction_small_threshold); +// Max row count allowed in a single source segment, bigger segments will be skipped. +DECLARE_Int32(segcompaction_candidate_max_rows); -// This config can be set to limit thread number in segcompaction thread pool. -DECLARE_mInt32(segcompaction_max_threads); +// Max file size allowed in a single source segment, bigger segments will be skipped. +DECLARE_Int64(segcompaction_candidate_max_bytes); + +// Max total row count allowed in a single segcompaction task. +DECLARE_Int32(segcompaction_task_max_rows); + +// Max total file size allowed in a single segcompaction task. +DECLARE_Int64(segcompaction_task_max_bytes); + +// Global segcompaction thread pool size. +DECLARE_mInt32(segcompaction_num_threads); // enable java udf and jdbc scannode DECLARE_Bool(enable_java_support); diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp index 166e86ce1b..65abb5f2ac 100644 --- a/be/src/olap/olap_server.cpp +++ b/be/src/olap/olap_server.cpp @@ -133,8 +133,8 @@ Status StorageEngine::start_bg_threads() { if (config::enable_segcompaction) { ThreadPoolBuilder("SegCompactionTaskThreadPool") - .set_min_threads(config::segcompaction_max_threads) - .set_max_threads(config::segcompaction_max_threads) + .set_min_threads(config::segcompaction_num_threads) + .set_max_threads(config::segcompaction_num_threads) .build(&_seg_compaction_thread_pool); } ThreadPoolBuilder("ColdDataCompactionTaskThreadPool") diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp b/be/src/olap/rowset/beta_rowset_writer.cpp index 85791cd01f..5c66182381 100644 --- a/be/src/olap/rowset/beta_rowset_writer.cpp +++ b/be/src/olap/rowset/beta_rowset_writer.cpp @@ -34,6 +34,7 @@ #include "common/logging.h" #include "gutil/integral_types.h" #include "gutil/strings/substitute.h" +#include "io/fs/file_reader.h" #include "io/fs/file_reader_options.h" #include "io/fs/file_system.h" #include "io/fs/file_writer.h" @@ -142,27 +143,22 @@ Status BetaRowsetWriter::add_block(const vectorized::Block* block) { return _add_block(block, &_segment_writer); } -Status BetaRowsetWriter::_load_noncompacted_segments( - std::vector<segment_v2::SegmentSharedPtr>* segments, size_t num) { +Status BetaRowsetWriter::_load_noncompacted_segment(segment_v2::SegmentSharedPtr& segment, + int32_t segment_id) { auto fs = _rowset_meta->fs(); if (!fs) { return Status::Error<INIT_FAILED>( - "BetaRowsetWriter::_load_noncompacted_segments _rowset_meta->fs get failed"); - } - for (int seg_id = _segcompacted_point; seg_id < num; ++seg_id) { - auto seg_path = - BetaRowset::segment_file_path(_context.rowset_dir, _context.rowset_id, seg_id); - std::shared_ptr<segment_v2::Segment> segment; - auto type = config::enable_file_cache ? config::file_cache_type : ""; - io::FileReaderOptions reader_options(io::cache_type_from_string(type), - io::SegmentCachePathPolicy()); - auto s = segment_v2::Segment::open(fs, seg_path, seg_id, rowset_id(), - _context.tablet_schema, reader_options, &segment); - if (!s.ok()) { - LOG(WARNING) << "failed to open segment. " << seg_path << ":" << s.to_string(); - return s; - } - segments->push_back(std::move(segment)); + "BetaRowsetWriter::_load_noncompacted_segment _rowset_meta->fs get failed"); + } + auto path = BetaRowset::segment_file_path(_context.rowset_dir, _context.rowset_id, segment_id); + auto type = config::enable_file_cache ? config::file_cache_type : ""; + io::FileReaderOptions reader_options(io::cache_type_from_string(type), + io::SegmentCachePathPolicy()); + auto s = segment_v2::Segment::open(fs, path, segment_id, rowset_id(), _context.tablet_schema, + reader_options, &segment); + if (!s.ok()) { + LOG(WARNING) << "failed to open segment. " << path << ":" << s; + return s; } return Status::OK(); } @@ -172,43 +168,46 @@ Status BetaRowsetWriter::_load_noncompacted_segments( * 2. if the consecutive smalls end up with a big, compact the smalls, except * single small * 3. if the consecutive smalls end up with small, compact the smalls if the - * length is beyond (config::segcompaction_threshold_segment_num / 2) + * length is beyond (config::segcompaction_batch_size / 2) */ Status BetaRowsetWriter::_find_longest_consecutive_small_segment( - SegCompactionCandidatesSharedPtr segments) { - std::vector<segment_v2::SegmentSharedPtr> all_segments; - // subtract one to skip last (maybe active) segment - RETURN_IF_ERROR(_load_noncompacted_segments(&all_segments, _num_segment - 1)); - - if (VLOG_DEBUG_IS_ON) { - vlog_buffer.clear(); - for (auto& segment : all_segments) { - fmt::format_to(vlog_buffer, "[id:{} num_rows:{}]", segment->id(), segment->num_rows()); - } - VLOG_DEBUG << "all noncompacted segments num:" << all_segments.size() - << " list of segments:" << fmt::to_string(vlog_buffer); - } - - bool is_terminated_by_big = false; - bool let_big_terminate = false; - size_t small_threshold = config::segcompaction_small_threshold; - for (int64_t i = 0; i < all_segments.size(); ++i) { - segment_v2::SegmentSharedPtr seg = all_segments[i]; - if (seg->num_rows() > small_threshold) { - if (let_big_terminate) { - is_terminated_by_big = true; - break; - } else { + SegCompactionCandidatesSharedPtr& segments) { + segments = std::make_shared<SegCompactionCandidates>(); + // skip last (maybe active) segment + int32_t last_segment = _num_segment - 1; + size_t task_bytes = 0; + uint32_t task_rows = 0; + int32_t segid; + for (segid = _segcompacted_point; + segid < last_segment && segments->size() < config::segcompaction_batch_size; segid++) { + segment_v2::SegmentSharedPtr segment; + RETURN_IF_ERROR(_load_noncompacted_segment(segment, segid)); + const auto segment_rows = segment->num_rows(); + const auto segment_bytes = segment->file_reader()->size(); + bool is_large_segment = segment_rows > config::segcompaction_candidate_max_rows || + segment_bytes > config::segcompaction_candidate_max_bytes; + if (is_large_segment) { + if (segid == _segcompacted_point) { + // skip large segments at the front RETURN_IF_ERROR(_rename_compacted_segment_plain(_segcompacted_point++)); + continue; + } else { + // stop because we need consecutive segments + break; } - } else { - let_big_terminate = true; // break if find a big after small - segments->push_back(seg); } + bool is_task_full = task_rows + segment_rows > config::segcompaction_task_max_rows || + task_bytes + segment_bytes > config::segcompaction_task_max_bytes; + if (is_task_full) { + break; + } + segments->push_back(segment); + task_rows += segment->num_rows(); + task_bytes += segment->file_reader()->size(); } size_t s = segments->size(); - if (!is_terminated_by_big && s <= (config::segcompaction_threshold_segment_num / 2)) { - // start with big segments and end with small, better to do it in next + if (segid == last_segment && s <= (config::segcompaction_batch_size / 2)) { + // we didn't collect enough segments, better to do it in next // round to compact more at once segments->clear(); return Status::OK(); @@ -343,9 +342,8 @@ Status BetaRowsetWriter::_segcompaction_if_necessary() { if (_segcompaction_status.load() != OK) { status = Status::Error<SEGCOMPACTION_FAILED>( "BetaRowsetWriter::_segcompaction_if_necessary meet invalid state"); - } else if ((_num_segment - _segcompacted_point) >= - config::segcompaction_threshold_segment_num) { - SegCompactionCandidatesSharedPtr segments = std::make_shared<SegCompactionCandidates>(); + } else if ((_num_segment - _segcompacted_point) >= config::segcompaction_batch_size) { + SegCompactionCandidatesSharedPtr segments; status = _find_longest_consecutive_small_segment(segments); if (LIKELY(status.ok()) && (segments->size() > 0)) { LOG(INFO) << "submit segcompaction task, tablet_id:" << _context.tablet_id @@ -381,9 +379,7 @@ Status BetaRowsetWriter::_segcompaction_rename_last_segments() { // currently we only rename remaining segments to reduce wait time // so that transaction can be committed ASAP VLOG_DEBUG << "segcompaction last few segments"; - SegCompactionCandidates segments; - RETURN_IF_ERROR(_load_noncompacted_segments(&segments, _num_segment)); - for (int i = 0; i < segments.size(); ++i) { + for (int32_t segid = _segcompacted_point; segid < _num_segment; segid++) { RETURN_IF_ERROR(_rename_compacted_segment_plain(_segcompacted_point++)); } return Status::OK(); diff --git a/be/src/olap/rowset/beta_rowset_writer.h b/be/src/olap/rowset/beta_rowset_writer.h index 29c67218c1..af4dc2ca50 100644 --- a/be/src/olap/rowset/beta_rowset_writer.h +++ b/be/src/olap/rowset/beta_rowset_writer.h @@ -144,9 +144,8 @@ private: void _build_rowset_meta(std::shared_ptr<RowsetMeta> rowset_meta); Status _segcompaction_if_necessary(); Status _segcompaction_rename_last_segments(); - Status _load_noncompacted_segments(std::vector<segment_v2::SegmentSharedPtr>* segments, - size_t num); - Status _find_longest_consecutive_small_segment(SegCompactionCandidatesSharedPtr segments); + Status _load_noncompacted_segment(segment_v2::SegmentSharedPtr& segment, int32_t segment_id); + Status _find_longest_consecutive_small_segment(SegCompactionCandidatesSharedPtr& segments); bool _is_segcompacted() { return (_num_segcompacted > 0) ? true : false; } bool _check_and_set_is_doing_segcompaction(); diff --git a/be/test/olap/segcompaction_test.cpp b/be/test/olap/segcompaction_test.cpp index 6a4476f423..2a894b9197 100644 --- a/be/test/olap/segcompaction_test.cpp +++ b/be/test/olap/segcompaction_test.cpp @@ -231,9 +231,9 @@ TEST_F(SegCompactionTest, SegCompactionThenRead) { RowsetSharedPtr rowset; const int num_segments = 15; const uint32_t rows_per_segment = 4096; - config::segcompaction_small_threshold = 6000; // set threshold above - // rows_per_segment - config::segcompaction_threshold_segment_num = 10; + config::segcompaction_candidate_max_rows = 6000; // set threshold above + // rows_per_segment + config::segcompaction_batch_size = 10; std::vector<uint32_t> segment_num_rows; { // write `num_segments * rows_per_segment` rows to rowset RowsetWriterContext writer_context; @@ -340,8 +340,8 @@ TEST_F(SegCompactionTest, SegCompactionInterleaveWithBig_ooooOOoOooooooooO) { create_tablet_schema(tablet_schema, DUP_KEYS); RowsetSharedPtr rowset; - config::segcompaction_small_threshold = 6000; // set threshold above - // rows_per_segment + config::segcompaction_candidate_max_rows = 6000; // set threshold above + // rows_per_segment std::vector<uint32_t> segment_num_rows; { // write `num_segments * rows_per_segment` rows to rowset RowsetWriterContext writer_context; @@ -484,8 +484,8 @@ TEST_F(SegCompactionTest, SegCompactionInterleaveWithBig_OoOoO) { create_tablet_schema(tablet_schema, DUP_KEYS); RowsetSharedPtr rowset; - config::segcompaction_small_threshold = 6000; // set threshold above - config::segcompaction_threshold_segment_num = 5; + config::segcompaction_candidate_max_rows = 6000; // set threshold above + config::segcompaction_batch_size = 5; std::vector<uint32_t> segment_num_rows; { // write `num_segments * rows_per_segment` rows to rowset RowsetWriterContext writer_context; @@ -607,9 +607,9 @@ TEST_F(SegCompactionTest, SegCompactionThenReadUniqueTableSmall) { create_tablet_schema(tablet_schema, UNIQUE_KEYS); RowsetSharedPtr rowset; - config::segcompaction_small_threshold = 6000; // set threshold above - // rows_per_segment - config::segcompaction_threshold_segment_num = 3; + config::segcompaction_candidate_max_rows = 6000; // set threshold above + // rows_per_segment + config::segcompaction_batch_size = 3; std::vector<uint32_t> segment_num_rows; { // write `num_segments * rows_per_segment` rows to rowset RowsetWriterContext writer_context; @@ -841,9 +841,9 @@ TEST_F(SegCompactionTest, SegCompactionThenReadAggTableSmall) { create_tablet_schema(tablet_schema, AGG_KEYS); RowsetSharedPtr rowset; - config::segcompaction_small_threshold = 6000; // set threshold above - // rows_per_segment - config::segcompaction_threshold_segment_num = 3; + config::segcompaction_candidate_max_rows = 6000; // set threshold above + // rows_per_segment + config::segcompaction_batch_size = 3; std::vector<uint32_t> segment_num_rows; { // write `num_segments * rows_per_segment` rows to rowset RowsetWriterContext writer_context; diff --git a/docs/en/docs/admin-manual/config/be-config.md b/docs/en/docs/admin-manual/config/be-config.md index 9399d35371..601577f252 100644 --- a/docs/en/docs/admin-manual/config/be-config.md +++ b/docs/en/docs/admin-manual/config/be-config.md @@ -622,18 +622,42 @@ BaseCompaction:546859: * Description: Enable to use segment compaction during loading to avoid -238 error * Default value: true -#### `segcompaction_threshold_segment_num` +#### `segcompaction_batch_size` * Type: int32 -* Description: Trigger segcompaction if the num of segments in a rowset exceeds this threshold +* Description: Max number of segments allowed in a single segcompaction task. * Default value: 10 -#### `segcompaction_small_threshold` +#### `segcompaction_candidate_max_rows` * Type: int32 -* Description: The segment whose row number above the threshold will be compacted during segcompaction +* Description: Max row count allowed in a single source segment, bigger segments will be skipped. * Default value: 1048576 +#### `segcompaction_candidate_max_bytes` + +* Type: int64 +* Description: Max file size allowed in a single source segment, bigger segments will be skipped. +* Default value: 104857600 + +#### `segcompaction_task_max_rows` + +* Type: int32 +* Description: Max total row count allowed in a single segcompaction task. +* Default value: 1572864 + +#### `segcompaction_task_max_bytes` + +* Type: int64 +* Description: Max total file size allowed in a single segcompaction task. +* Default value: 157286400 + +#### `segcompaction_num_threads` + +* Type: int32 +* Description: Global segcompaction thread pool size. +* Default value: 5 + #### `disable_compaction_trace_log` * Type: bool diff --git a/docs/en/docs/advanced/best-practice/compaction.md b/docs/en/docs/advanced/best-practice/compaction.md index 5963a1bdfb..5168b38dab 100644 --- a/docs/en/docs/advanced/best-practice/compaction.md +++ b/docs/en/docs/advanced/best-practice/compaction.md @@ -59,7 +59,7 @@ The following features are provided by segment compaction: BE configuration: - `enable_segcompaction=true` turn it on. -- `segcompaction_threshold_segment_num` is used to configure the interval for merging. The default value 10 means that every 10 segment files will trigger a segment compaction. It is recommended to set between 10 - 30. The larger value will increase the memory usage of segment compaction. +- `segcompaction_batch_size` is used to configure the interval for merging. The default value 10 means that every 10 segment files will trigger a segment compaction. It is recommended to set between 10 - 30. The larger value will increase the memory usage of segment compaction. Situations where segment compaction is recommended: diff --git a/docs/zh-CN/docs/admin-manual/config/be-config.md b/docs/zh-CN/docs/admin-manual/config/be-config.md index 8e41243e52..291c711470 100644 --- a/docs/zh-CN/docs/admin-manual/config/be-config.md +++ b/docs/zh-CN/docs/admin-manual/config/be-config.md @@ -636,18 +636,54 @@ BaseCompaction:546859: * 描述:在导入时进行 segment compaction 来减少 segment 数量, 以避免出现写入时的 -238 错误 * 默认值:true -#### `segcompaction_threshold_segment_num` +#### `segcompaction_batch_size` * 类型:int32 * 描述:当 segment 数量超过此阈值时触发 segment compaction * 默认值:10 -#### `segcompaction_small_threshold` +#### `segcompaction_candidate_max_rows` * 类型:int32 * 描述:当 segment 的行数超过此大小时则会在 segment compaction 时被 compact,否则跳过 * 默认值:1048576 +#### `segcompaction_batch_size` + +* 类型: int32 +* 描述: 单个 segment compaction 任务中的最大原始 segment 数量。 +* 默认值: 10 + +#### `segcompaction_candidate_max_rows` + +* 类型: int32 +* 描述: segment compaction 任务中允许的单个原始 segment 行数,过大的 segment 将被跳过。 +* 默认值: 1048576 + +#### `segcompaction_candidate_max_bytes` + +* 类型: int64 +* 描述: segment compaction 任务中允许的单个原始 segment 大小(字节),过大的 segment 将被跳过。 +* 默认值: 104857600 + +#### `segcompaction_task_max_rows` + +* 类型: int32 +* 描述: 单个 segment compaction 任务中允许的原始 segment 总行数。 +* 默认值: 1572864 + +#### `segcompaction_task_max_bytes` + +* 类型: int64 +* 描述: 单个 segment compaction 任务中允许的原始 segment 总大小(字节)。 +* 默认值: 157286400 + +#### `segcompaction_num_threads` + +* 类型: int32 +* 描述: segment compaction 线程池大小。 +* 默认值: 5 + #### `disable_compaction_trace_log` * 类型: bool diff --git a/docs/zh-CN/docs/advanced/best-practice/compaction.md b/docs/zh-CN/docs/advanced/best-practice/compaction.md index 342397c740..5b8562db50 100644 --- a/docs/zh-CN/docs/advanced/best-practice/compaction.md +++ b/docs/zh-CN/docs/advanced/best-practice/compaction.md @@ -57,7 +57,7 @@ Segment compaction 有以下特点: 开启和配置方法(BE 配置): - `enable_segcompaction = true` 可以使能该功能 -- `segcompaction_threshold_segment_num` 用于配置合并的间隔。默认 10 表示每生成 10 个 segment 文件将会进行一次 segment compaction。一般设置为 10 - 30,过大的值会增加 segment compaction 的内存用量。 +- `segcompaction_batch_size` 用于配置合并的间隔。默认 10 表示每生成 10 个 segment 文件将会进行一次 segment compaction。一般设置为 10 - 30,过大的值会增加 segment compaction 的内存用量。 如有以下场景或问题,建议开启此功能: - 导入大量数据触发 OLAP_ERR_TOO_MANY_SEGMENTS (errcode -238) 错误导致导入失败。此时建议打开 segment compaction 的功能,在导入过程中对 segment 进行合并控制最终的数量。 --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org