This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new 589d11547c [feature](load) add segment bytes limit in segcompaction (#22526) (#22638)
589d11547c is described below

commit 589d11547c88d164956551ca5768929bc37f288c
Author: Kaijie Chen <c...@apache.org>
AuthorDate: Fri Aug 11 17:47:52 2023 +0800

    [feature](load) add segment bytes limit in segcompaction (#22526) (#22638)
---
 be/src/common/config.cpp                           |  21 +++--
 be/src/common/config.h                             |  21 +++--
 be/src/olap/olap_server.cpp                        |   4 +-
 be/src/olap/rowset/beta_rowset_writer.cpp          | 104 ++++++++++-----------
 be/src/olap/rowset/beta_rowset_writer.h            |   5 +-
 be/test/olap/segcompaction_test.cpp                |  26 +++---
 docs/en/docs/admin-manual/config/be-config.md      |  32 ++++++-
 docs/en/docs/advanced/best-practice/compaction.md  |   2 +-
 docs/zh-CN/docs/admin-manual/config/be-config.md   |  40 +++++++-
 .../docs/advanced/best-practice/compaction.md      |   2 +-
 10 files changed, 165 insertions(+), 92 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 8e33a42e4e..6666ee9ef0 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -911,14 +911,23 @@ DEFINE_Bool(hide_webserver_config_page, "false");
 
 DEFINE_Bool(enable_segcompaction, "true");
 
-// Trigger segcompaction if the num of segments in a rowset exceeds this threshold.
-DEFINE_Int32(segcompaction_threshold_segment_num, "10");
+// Max number of segments allowed in a single segcompaction task.
+DEFINE_Int32(segcompaction_batch_size, "10");
 
-// The segment whose row number above the threshold will be compacted during segcompaction
-DEFINE_Int32(segcompaction_small_threshold, "1048576");
+// Max row count allowed in a single source segment, bigger segments will be skipped.
+DEFINE_Int32(segcompaction_candidate_max_rows, "1048576");
 
-// This config can be set to limit thread number in  segcompaction thread pool.
-DEFINE_mInt32(segcompaction_max_threads, "10");
+// Max file size allowed in a single source segment, bigger segments will be skipped.
+DEFINE_Int64(segcompaction_candidate_max_bytes, "104857600");
+
+// Max total row count allowed in a single segcompaction task.
+DEFINE_Int32(segcompaction_task_max_rows, "1572864");
+
+// Max total file size allowed in a single segcompaction task.
+DEFINE_Int64(segcompaction_task_max_bytes, "157286400");
+
+// Global segcompaction thread pool size.
+DEFINE_mInt32(segcompaction_num_threads, "5");
 
 // enable java udf and jdbc scannode
 DEFINE_Bool(enable_java_support, "true");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 7316870cd6..4f4c703308 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -953,14 +953,23 @@ DECLARE_Bool(hide_webserver_config_page);
 
 DECLARE_Bool(enable_segcompaction);
 
-// Trigger segcompaction if the num of segments in a rowset exceeds this threshold.
-DECLARE_Int32(segcompaction_threshold_segment_num);
+// Max number of segments allowed in a single segcompaction task.
+DECLARE_Int32(segcompaction_batch_size);
 
-// The segment whose row number above the threshold will be compacted during segcompaction
-DECLARE_Int32(segcompaction_small_threshold);
+// Max row count allowed in a single source segment, bigger segments will be skipped.
+DECLARE_Int32(segcompaction_candidate_max_rows);
 
-// This config can be set to limit thread number in  segcompaction thread pool.
-DECLARE_mInt32(segcompaction_max_threads);
+// Max file size allowed in a single source segment, bigger segments will be skipped.
+DECLARE_Int64(segcompaction_candidate_max_bytes);
+
+// Max total row count allowed in a single segcompaction task.
+DECLARE_Int32(segcompaction_task_max_rows);
+
+// Max total file size allowed in a single segcompaction task.
+DECLARE_Int64(segcompaction_task_max_bytes);
+
+// Global segcompaction thread pool size.
+DECLARE_mInt32(segcompaction_num_threads);
 
 // enable java udf and jdbc scannode
 DECLARE_Bool(enable_java_support);
diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp
index 166e86ce1b..65abb5f2ac 100644
--- a/be/src/olap/olap_server.cpp
+++ b/be/src/olap/olap_server.cpp
@@ -133,8 +133,8 @@ Status StorageEngine::start_bg_threads() {
 
     if (config::enable_segcompaction) {
         ThreadPoolBuilder("SegCompactionTaskThreadPool")
-                .set_min_threads(config::segcompaction_max_threads)
-                .set_max_threads(config::segcompaction_max_threads)
+                .set_min_threads(config::segcompaction_num_threads)
+                .set_max_threads(config::segcompaction_num_threads)
                 .build(&_seg_compaction_thread_pool);
     }
     ThreadPoolBuilder("ColdDataCompactionTaskThreadPool")
diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp b/be/src/olap/rowset/beta_rowset_writer.cpp
index 85791cd01f..5c66182381 100644
--- a/be/src/olap/rowset/beta_rowset_writer.cpp
+++ b/be/src/olap/rowset/beta_rowset_writer.cpp
@@ -34,6 +34,7 @@
 #include "common/logging.h"
 #include "gutil/integral_types.h"
 #include "gutil/strings/substitute.h"
+#include "io/fs/file_reader.h"
 #include "io/fs/file_reader_options.h"
 #include "io/fs/file_system.h"
 #include "io/fs/file_writer.h"
@@ -142,27 +143,22 @@ Status BetaRowsetWriter::add_block(const vectorized::Block* block) {
     return _add_block(block, &_segment_writer);
 }
 
-Status BetaRowsetWriter::_load_noncompacted_segments(
-        std::vector<segment_v2::SegmentSharedPtr>* segments, size_t num) {
+Status BetaRowsetWriter::_load_noncompacted_segment(segment_v2::SegmentSharedPtr& segment,
+                                                    int32_t segment_id) {
     auto fs = _rowset_meta->fs();
     if (!fs) {
         return Status::Error<INIT_FAILED>(
-                "BetaRowsetWriter::_load_noncompacted_segments _rowset_meta->fs get failed");
-    }
-    for (int seg_id = _segcompacted_point; seg_id < num; ++seg_id) {
-        auto seg_path =
-                BetaRowset::segment_file_path(_context.rowset_dir, _context.rowset_id, seg_id);
-        std::shared_ptr<segment_v2::Segment> segment;
-        auto type = config::enable_file_cache ? config::file_cache_type : "";
-        io::FileReaderOptions reader_options(io::cache_type_from_string(type),
-                                             io::SegmentCachePathPolicy());
-        auto s = segment_v2::Segment::open(fs, seg_path, seg_id, rowset_id(),
-                                           _context.tablet_schema, reader_options, &segment);
-        if (!s.ok()) {
-            LOG(WARNING) << "failed to open segment. " << seg_path << ":" << s.to_string();
-            return s;
-        }
-        segments->push_back(std::move(segment));
+                "BetaRowsetWriter::_load_noncompacted_segment _rowset_meta->fs get failed");
+    }
+    auto path = BetaRowset::segment_file_path(_context.rowset_dir, _context.rowset_id, segment_id);
+    auto type = config::enable_file_cache ? config::file_cache_type : "";
+    io::FileReaderOptions reader_options(io::cache_type_from_string(type),
+                                         io::SegmentCachePathPolicy());
+    auto s = segment_v2::Segment::open(fs, path, segment_id, rowset_id(), _context.tablet_schema,
+                                       reader_options, &segment);
+    if (!s.ok()) {
+        LOG(WARNING) << "failed to open segment. " << path << ":" << s;
+        return s;
     }
     return Status::OK();
 }
@@ -172,43 +168,46 @@ Status BetaRowsetWriter::_load_noncompacted_segments(
  *  2. if the consecutive smalls end up with a big, compact the smalls, except
  *     single small
  *  3. if the consecutive smalls end up with small, compact the smalls if the
- *     length is beyond (config::segcompaction_threshold_segment_num / 2)
+ *     length is beyond (config::segcompaction_batch_size / 2)
  */
 Status BetaRowsetWriter::_find_longest_consecutive_small_segment(
-        SegCompactionCandidatesSharedPtr segments) {
-    std::vector<segment_v2::SegmentSharedPtr> all_segments;
-    // subtract one to skip last (maybe active) segment
-    RETURN_IF_ERROR(_load_noncompacted_segments(&all_segments, _num_segment - 1));
-
-    if (VLOG_DEBUG_IS_ON) {
-        vlog_buffer.clear();
-        for (auto& segment : all_segments) {
-            fmt::format_to(vlog_buffer, "[id:{} num_rows:{}]", segment->id(), segment->num_rows());
-        }
-        VLOG_DEBUG << "all noncompacted segments num:" << all_segments.size()
-                   << " list of segments:" << fmt::to_string(vlog_buffer);
-    }
-
-    bool is_terminated_by_big = false;
-    bool let_big_terminate = false;
-    size_t small_threshold = config::segcompaction_small_threshold;
-    for (int64_t i = 0; i < all_segments.size(); ++i) {
-        segment_v2::SegmentSharedPtr seg = all_segments[i];
-        if (seg->num_rows() > small_threshold) {
-            if (let_big_terminate) {
-                is_terminated_by_big = true;
-                break;
-            } else {
+        SegCompactionCandidatesSharedPtr& segments) {
+    segments = std::make_shared<SegCompactionCandidates>();
+    // skip last (maybe active) segment
+    int32_t last_segment = _num_segment - 1;
+    size_t task_bytes = 0;
+    uint32_t task_rows = 0;
+    int32_t segid;
+    for (segid = _segcompacted_point;
+         segid < last_segment && segments->size() < config::segcompaction_batch_size; segid++) {
+        segment_v2::SegmentSharedPtr segment;
+        RETURN_IF_ERROR(_load_noncompacted_segment(segment, segid));
+        const auto segment_rows = segment->num_rows();
+        const auto segment_bytes = segment->file_reader()->size();
+        bool is_large_segment = segment_rows > config::segcompaction_candidate_max_rows ||
+                                segment_bytes > config::segcompaction_candidate_max_bytes;
+        if (is_large_segment) {
+            if (segid == _segcompacted_point) {
+                // skip large segments at the front
                 RETURN_IF_ERROR(_rename_compacted_segment_plain(_segcompacted_point++));
+                continue;
+            } else {
+                // stop because we need consecutive segments
+                break;
             }
-        } else {
-            let_big_terminate = true; // break if find a big after small
-            segments->push_back(seg);
         }
+        bool is_task_full = task_rows + segment_rows > config::segcompaction_task_max_rows ||
+                            task_bytes + segment_bytes > config::segcompaction_task_max_bytes;
+        if (is_task_full) {
+            break;
+        }
+        segments->push_back(segment);
+        task_rows += segment->num_rows();
+        task_bytes += segment->file_reader()->size();
     }
     size_t s = segments->size();
-    if (!is_terminated_by_big && s <= (config::segcompaction_threshold_segment_num / 2)) {
-        // start with big segments and end with small, better to do it in next
+    if (segid == last_segment && s <= (config::segcompaction_batch_size / 2)) {
+        // we didn't collect enough segments, better to do it in next
         // round to compact more at once
         segments->clear();
         return Status::OK();
@@ -343,9 +342,8 @@ Status BetaRowsetWriter::_segcompaction_if_necessary() {
     if (_segcompaction_status.load() != OK) {
         status = Status::Error<SEGCOMPACTION_FAILED>(
                 "BetaRowsetWriter::_segcompaction_if_necessary meet invalid state");
-    } else if ((_num_segment - _segcompacted_point) >=
-               config::segcompaction_threshold_segment_num) {
-        SegCompactionCandidatesSharedPtr segments = std::make_shared<SegCompactionCandidates>();
+    } else if ((_num_segment - _segcompacted_point) >= config::segcompaction_batch_size) {
+        SegCompactionCandidatesSharedPtr segments;
         status = _find_longest_consecutive_small_segment(segments);
         if (LIKELY(status.ok()) && (segments->size() > 0)) {
             LOG(INFO) << "submit segcompaction task, tablet_id:" << _context.tablet_id
@@ -381,9 +379,7 @@ Status BetaRowsetWriter::_segcompaction_rename_last_segments() {
     // currently we only rename remaining segments to reduce wait time
     // so that transaction can be committed ASAP
     VLOG_DEBUG << "segcompaction last few segments";
-    SegCompactionCandidates segments;
-    RETURN_IF_ERROR(_load_noncompacted_segments(&segments, _num_segment));
-    for (int i = 0; i < segments.size(); ++i) {
+    for (int32_t segid = _segcompacted_point; segid < _num_segment; segid++) {
         RETURN_IF_ERROR(_rename_compacted_segment_plain(_segcompacted_point++));
     }
     return Status::OK();
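
For readers following the new selection logic above, here is a small self-contained sketch of the policy that `_find_longest_consecutive_small_segment` now implements. This is not Doris code: `SegmentInfo`, `Limits`, and `pick_candidates` are hypothetical names, and the numeric limits simply mirror the new config defaults. The policy: large segments at the front of the non-compacted range are skipped (finalized as-is), collection stops at the first large segment after a small one so candidates stay consecutive, and a task is additionally capped by `segcompaction_batch_size` and by the total-row/total-byte limits.

```cpp
// Illustrative sketch only -- simplified from the patch above; names are hypothetical.
#include <cstddef>
#include <cstdint>
#include <vector>

struct SegmentInfo {
    uint32_t rows;
    size_t bytes;
};

struct Limits {
    size_t batch_size = 10;                  // segcompaction_batch_size
    uint32_t candidate_max_rows = 1048576;   // segcompaction_candidate_max_rows
    size_t candidate_max_bytes = 104857600;  // segcompaction_candidate_max_bytes
    uint32_t task_max_rows = 1572864;        // segcompaction_task_max_rows
    size_t task_max_bytes = 157286400;       // segcompaction_task_max_bytes
};

// `segments` stands for the non-compacted segments of the rowset, excluding the
// last (possibly still active) one. Returns the indices chosen for one task.
std::vector<size_t> pick_candidates(const std::vector<SegmentInfo>& segments, const Limits& lim) {
    std::vector<size_t> picked;
    uint32_t task_rows = 0;
    size_t task_bytes = 0;
    size_t i = 0;
    for (; i < segments.size() && picked.size() < lim.batch_size; ++i) {
        const SegmentInfo& seg = segments[i];
        bool is_large = seg.rows > lim.candidate_max_rows || seg.bytes > lim.candidate_max_bytes;
        if (is_large) {
            if (picked.empty()) {
                continue;  // skip large segments at the front (they are kept as-is)
            }
            break;  // candidates must be consecutive, so stop at the first large one
        }
        if (task_rows + seg.rows > lim.task_max_rows ||
            task_bytes + seg.bytes > lim.task_max_bytes) {
            break;  // task is full
        }
        picked.push_back(i);
        task_rows += seg.rows;
        task_bytes += seg.bytes;
    }
    // If the whole range was scanned but only a few candidates were found, defer to
    // the next round to compact more at once (mirrors the batch_size / 2 check).
    if (i == segments.size() && picked.size() <= lim.batch_size / 2) {
        picked.clear();
    }
    return picked;
}
```

With the defaults, one task therefore holds at most 10 consecutive small segments, roughly 1.5 million rows, or about 150 MiB of source data, whichever limit is reached first.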
diff --git a/be/src/olap/rowset/beta_rowset_writer.h b/be/src/olap/rowset/beta_rowset_writer.h
index 29c67218c1..af4dc2ca50 100644
--- a/be/src/olap/rowset/beta_rowset_writer.h
+++ b/be/src/olap/rowset/beta_rowset_writer.h
@@ -144,9 +144,8 @@ private:
     void _build_rowset_meta(std::shared_ptr<RowsetMeta> rowset_meta);
     Status _segcompaction_if_necessary();
     Status _segcompaction_rename_last_segments();
-    Status _load_noncompacted_segments(std::vector<segment_v2::SegmentSharedPtr>* segments,
-                                       size_t num);
-    Status _find_longest_consecutive_small_segment(SegCompactionCandidatesSharedPtr segments);
+    Status _load_noncompacted_segment(segment_v2::SegmentSharedPtr& segment, int32_t segment_id);
+    Status _find_longest_consecutive_small_segment(SegCompactionCandidatesSharedPtr& segments);
     bool _is_segcompacted() { return (_num_segcompacted > 0) ? true : false; }
 
     bool _check_and_set_is_doing_segcompaction();
diff --git a/be/test/olap/segcompaction_test.cpp b/be/test/olap/segcompaction_test.cpp
index 6a4476f423..2a894b9197 100644
--- a/be/test/olap/segcompaction_test.cpp
+++ b/be/test/olap/segcompaction_test.cpp
@@ -231,9 +231,9 @@ TEST_F(SegCompactionTest, SegCompactionThenRead) {
     RowsetSharedPtr rowset;
     const int num_segments = 15;
     const uint32_t rows_per_segment = 4096;
-    config::segcompaction_small_threshold = 6000; // set threshold above
-                                                  // rows_per_segment
-    config::segcompaction_threshold_segment_num = 10;
+    config::segcompaction_candidate_max_rows = 6000; // set threshold above
+                                                     // rows_per_segment
+    config::segcompaction_batch_size = 10;
     std::vector<uint32_t> segment_num_rows;
     { // write `num_segments * rows_per_segment` rows to rowset
         RowsetWriterContext writer_context;
@@ -340,8 +340,8 @@ TEST_F(SegCompactionTest, SegCompactionInterleaveWithBig_ooooOOoOooooooooO) {
     create_tablet_schema(tablet_schema, DUP_KEYS);
 
     RowsetSharedPtr rowset;
-    config::segcompaction_small_threshold = 6000; // set threshold above
-                                                  // rows_per_segment
+    config::segcompaction_candidate_max_rows = 6000; // set threshold above
+                                                     // rows_per_segment
     std::vector<uint32_t> segment_num_rows;
     { // write `num_segments * rows_per_segment` rows to rowset
         RowsetWriterContext writer_context;
@@ -484,8 +484,8 @@ TEST_F(SegCompactionTest, SegCompactionInterleaveWithBig_OoOoO) {
     create_tablet_schema(tablet_schema, DUP_KEYS);
 
     RowsetSharedPtr rowset;
-    config::segcompaction_small_threshold = 6000; // set threshold above
-    config::segcompaction_threshold_segment_num = 5;
+    config::segcompaction_candidate_max_rows = 6000; // set threshold above
+    config::segcompaction_batch_size = 5;
     std::vector<uint32_t> segment_num_rows;
     { // write `num_segments * rows_per_segment` rows to rowset
         RowsetWriterContext writer_context;
@@ -607,9 +607,9 @@ TEST_F(SegCompactionTest, SegCompactionThenReadUniqueTableSmall) {
     create_tablet_schema(tablet_schema, UNIQUE_KEYS);
 
     RowsetSharedPtr rowset;
-    config::segcompaction_small_threshold = 6000; // set threshold above
-                                                  // rows_per_segment
-    config::segcompaction_threshold_segment_num = 3;
+    config::segcompaction_candidate_max_rows = 6000; // set threshold above
+                                                     // rows_per_segment
+    config::segcompaction_batch_size = 3;
     std::vector<uint32_t> segment_num_rows;
     { // write `num_segments * rows_per_segment` rows to rowset
         RowsetWriterContext writer_context;
@@ -841,9 +841,9 @@ TEST_F(SegCompactionTest, SegCompactionThenReadAggTableSmall) {
     create_tablet_schema(tablet_schema, AGG_KEYS);
 
     RowsetSharedPtr rowset;
-    config::segcompaction_small_threshold = 6000; // set threshold above
-                                                  // rows_per_segment
-    config::segcompaction_threshold_segment_num = 3;
+    config::segcompaction_candidate_max_rows = 6000; // set threshold above
+                                                     // rows_per_segment
+    config::segcompaction_batch_size = 3;
     std::vector<uint32_t> segment_num_rows;
     { // write `num_segments * rows_per_segment` rows to rowset
         RowsetWriterContext writer_context;
diff --git a/docs/en/docs/admin-manual/config/be-config.md b/docs/en/docs/admin-manual/config/be-config.md
index 9399d35371..601577f252 100644
--- a/docs/en/docs/admin-manual/config/be-config.md
+++ b/docs/en/docs/admin-manual/config/be-config.md
@@ -622,18 +622,42 @@ BaseCompaction:546859:
 * Description: Enable to use segment compaction during loading to avoid -238 error
 * Default value: true
 
-#### `segcompaction_threshold_segment_num`
+#### `segcompaction_batch_size`
 
 * Type: int32
-* Description: Trigger segcompaction if the num of segments in a rowset exceeds this threshold
+* Description: Max number of segments allowed in a single segcompaction task.
 * Default value: 10
 
-#### `segcompaction_small_threshold`
+#### `segcompaction_candidate_max_rows`
 
 * Type: int32
-* Description: The segment whose row number above the threshold will be compacted during segcompaction
+* Description: Max row count allowed in a single source segment, bigger segments will be skipped.
 * Default value: 1048576
 
+#### `segcompaction_candidate_max_bytes`
+
+* Type: int64
+* Description: Max file size allowed in a single source segment, bigger segments will be skipped.
+* Default value: 104857600
+
+#### `segcompaction_task_max_rows`
+
+* Type: int32
+* Description: Max total row count allowed in a single segcompaction task.
+* Default value: 1572864
+
+#### `segcompaction_task_max_bytes`
+
+* Type: int64
+* Description: Max total file size allowed in a single segcompaction task.
+* Default value: 157286400
+
+#### `segcompaction_num_threads`
+
+* Type: int32
+* Description: Global segcompaction thread pool size.
+* Default value: 5
+
 #### `disable_compaction_trace_log`
 
 * Type: bool
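
Taken together, the documented settings above form a two-level limit: per-candidate caps (`segcompaction_candidate_max_rows`/`_bytes`, defaults 2^20 rows and 100 MiB) and per-task caps (`segcompaction_task_max_rows`/`_bytes`, defaults 1.5x the candidate values). For reference, a minimal be.conf override might look like the sketch below; the `key = value` lines follow the usual BE config-file style, and the values shown are simply the documented defaults:

```
# be.conf -- segment compaction settings from this patch (values are the defaults)
enable_segcompaction = true
# at most this many source segments per segcompaction task
segcompaction_batch_size = 10
# source segments exceeding either limit are skipped
segcompaction_candidate_max_rows = 1048576
segcompaction_candidate_max_bytes = 104857600
# total row/byte caps for one segcompaction task
segcompaction_task_max_rows = 1572864
segcompaction_task_max_bytes = 157286400
# global thread pool size for segcompaction
segcompaction_num_threads = 5
```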
diff --git a/docs/en/docs/advanced/best-practice/compaction.md b/docs/en/docs/advanced/best-practice/compaction.md
index 5963a1bdfb..5168b38dab 100644
--- a/docs/en/docs/advanced/best-practice/compaction.md
+++ b/docs/en/docs/advanced/best-practice/compaction.md
@@ -59,7 +59,7 @@ The following features are provided by segment compaction:
 
 BE configuration:
 - `enable_segcompaction=true` turn it on.
-- `segcompaction_threshold_segment_num` is used to configure the interval for merging. The default value 10 means that every 10 segment files will trigger a segment compaction. It is recommended to set between 10 - 30. The larger value will increase the memory usage of segment compaction.
+- `segcompaction_batch_size` is used to configure the interval for merging. The default value 10 means that every 10 segment files will trigger a segment compaction. It is recommended to set between 10 - 30. The larger value will increase the memory usage of segment compaction.
 
 Situations where segment compaction is recommended:
 
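
As the updated sentence above notes, `segcompaction_batch_size` serves both as the trigger interval and as the per-task segment cap; in `_segcompaction_if_necessary` (earlier in this patch) the trigger is just a difference of two counters. A tiny illustrative restatement, with hypothetical standalone names rather than the actual Doris members:

```cpp
#include <cstdint>

// Sketch of the trigger condition from the patch: consider a segcompaction task
// once the number of segments written but not yet segment-compacted reaches
// segcompaction_batch_size (default 10).
bool should_try_segcompaction(int32_t num_segment, int32_t segcompacted_point,
                              int32_t segcompaction_batch_size) {
    return (num_segment - segcompacted_point) >= segcompaction_batch_size;
}
```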
diff --git a/docs/zh-CN/docs/admin-manual/config/be-config.md b/docs/zh-CN/docs/admin-manual/config/be-config.md
index 8e41243e52..291c711470 100644
--- a/docs/zh-CN/docs/admin-manual/config/be-config.md
+++ b/docs/zh-CN/docs/admin-manual/config/be-config.md
@@ -636,18 +636,54 @@ BaseCompaction:546859:
 * 描述:在导入时进行 segment compaction 来减少 segment 数量, 以避免出现写入时的 -238 错误
 * 默认值:true
 
-#### `segcompaction_threshold_segment_num`
+#### `segcompaction_batch_size`
 
 * 类型:int32
 * 描述:当 segment 数量超过此阈值时触发 segment compaction
 * 默认值:10
 
-#### `segcompaction_small_threshold`
+#### `segcompaction_candidate_max_rows`
 
 * 类型:int32
 * 描述:当 segment 的行数超过此大小时则会在 segment compaction 时被 compact,否则跳过
 * 默认值:1048576
 
+#### `segcompaction_batch_size`
+
+* 类型: int32
+* 描述: 单个 segment compaction 任务中的最大原始 segment 数量。
+* 默认值: 10
+
+#### `segcompaction_candidate_max_rows`
+
+* 类型: int32
+* 描述: segment compaction 任务中允许的单个原始 segment 行数,过大的 segment 将被跳过。
+* 默认值: 1048576
+
+#### `segcompaction_candidate_max_bytes`
+
+* 类型: int64
+* 描述: segment compaction 任务中允许的单个原始 segment 大小(字节),过大的 segment 将被跳过。
+* 默认值: 104857600
+
+#### `segcompaction_task_max_rows`
+
+* 类型: int32
+* 描述: 单个 segment compaction 任务中允许的原始 segment 总行数。
+* 默认值: 1572864
+
+#### `segcompaction_task_max_bytes`
+
+* 类型: int64
+* 描述: 单个 segment compaction 任务中允许的原始 segment 总大小(字节)。
+* 默认值: 157286400
+
+#### `segcompaction_num_threads`
+
+* 类型: int32
+* 描述: segment compaction 线程池大小。
+* 默认值: 5
+
 #### `disable_compaction_trace_log`
 
 * 类型: bool
diff --git a/docs/zh-CN/docs/advanced/best-practice/compaction.md b/docs/zh-CN/docs/advanced/best-practice/compaction.md
index 342397c740..5b8562db50 100644
--- a/docs/zh-CN/docs/advanced/best-practice/compaction.md
+++ b/docs/zh-CN/docs/advanced/best-practice/compaction.md
@@ -57,7 +57,7 @@ Segment compaction 有以下特点:
 
 开启和配置方法(BE 配置):
 - `enable_segcompaction = true` 可以使能该功能
-- `segcompaction_threshold_segment_num` 用于配置合并的间隔。默认 10 表示每生成 10 个 segment 文件将会进行一次 segment compaction。一般设置为 10 - 30,过大的值会增加 segment compaction 的内存用量。
+- `segcompaction_batch_size` 用于配置合并的间隔。默认 10 表示每生成 10 个 segment 文件将会进行一次 segment compaction。一般设置为 10 - 30,过大的值会增加 segment compaction 的内存用量。
 
 如有以下场景或问题,建议开启此功能:
 - 导入大量数据触发 OLAP_ERR_TOO_MANY_SEGMENTS (errcode -238) 错误导致导入失败。此时建议打开 segment compaction 的功能,在导入过程中对 segment 进行合并控制最终的数量。


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org
