This is an automated email from the ASF dual-hosted git repository.

gavinchou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 99901814d8b [enhancement](compaction) optimizing memory usage for compaction (#36492)
99901814d8b is described below

commit 99901814d8b90887f54b1768b98b4f0b78fab376
Author: Luwei <814383...@qq.com>
AuthorDate: Thu Jun 27 21:41:20 2024 +0800

    [enhancement](compaction) optimizing memory usage for compaction (#36492)
    
    1 Limit the number of rowsets participating in base compaction.
    2 Estimate the appropriate batch size by sampling and analyzing the
    memory consumption of blocks.
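    
    Point 2 amounts to: sample the bytes and rows of the blocks loaded from
    each merge way, smooth the observed per-row size, and divide the per-way
    memory budget by it. Below is a minimal standalone sketch of that
    calculation; SampleInfo and estimate_rows_per_batch are illustrative
    names, and the committed implementation is estimate_batch_size in
    be/src/olap/merger.cpp further down. (Point 1 is sketched just after the
    file list below.)
    
        #include <algorithm>
        #include <cstdint>
    
        struct SampleInfo {
            int64_t bytes = 0;         // bytes consumed by the sampled blocks
            int64_t rows = 0;          // rows in the sampled blocks
            int64_t avg_row_bytes = 0; // smoothed per-row size from earlier rounds
        };
    
        // Rows per batch = (memory budget / merge ways) / smoothed bytes-per-row,
        // clamped to [32, 4096 - 32]; fall back to the default on missing samples.
        inline int64_t estimate_rows_per_batch(const SampleInfo& info,
                                               int64_t mem_limit_bytes,
                                               int64_t merge_way_cnt) {
            if (merge_way_cnt <= 0 || info.rows <= 0 || info.bytes <= 0) {
                return 4096 - 32; // no usable sample yet
            }
            int64_t per_way_limit = mem_limit_bytes / merge_way_cnt;
            const double kSmoothing = 0.5; // assumed constant, mirrors the diff
            int64_t row_bytes = info.avg_row_bytes > 0
                    ? int64_t(info.avg_row_bytes * (1 - kSmoothing) +
                              double(info.bytes) / info.rows * kSmoothing)
                    : info.bytes / info.rows;
            if (row_bytes <= 0) {
                return 4096 - 32; // guard against degenerate samples
            }
            return std::clamp<int64_t>(per_way_limit / row_bytes, 32, 4096 - 32);
        }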
---
 be/src/cloud/cloud_base_compaction.cpp             |  10 ++
 be/src/cloud/cloud_cumulative_compaction.cpp       |  13 +-
 be/src/common/config.cpp                           |   6 +
 be/src/common/config.h                             |   6 +
 be/src/olap/base_compaction.cpp                    |  10 ++
 be/src/olap/base_tablet.h                          |   5 +
 be/src/olap/compaction.cpp                         |  15 ++-
 be/src/olap/compaction.h                           |   2 +
 be/src/olap/cumulative_compaction.cpp              |  15 ++-
 be/src/olap/iterators.h                            |  15 ++-
 be/src/olap/merger.cpp                             |  67 +++++++++-
 be/src/olap/merger.h                               |   6 +-
 be/src/olap/rowset/rowset_meta.h                   |  15 +++
 be/src/olap/rowset/segcompaction.cpp               |   2 +-
 be/src/olap/tablet_reader.h                        |   2 +
 be/src/vec/olap/vertical_block_reader.cpp          |  18 ++-
 be/src/vec/olap/vertical_block_reader.h            |   3 +-
 be/src/vec/olap/vertical_merge_iterator.cpp        |  29 +++--
 be/src/vec/olap/vertical_merge_iterator.h          |  25 +++-
 be/test/olap/base_compaction_test.cpp              |  84 +++++++++++++
 be/test/olap/rowid_conversion_test.cpp             |   6 +-
 be/test/vec/olap/vertical_compaction_test.cpp      |  14 ++-
 .../compaction_width_array_column.groovy           | 137 +++++++++++++++++++++
 23 files changed, 463 insertions(+), 42 deletions(-)
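
Point 1 is the small loop added to pick_rowsets_to_compact in
cloud_base_compaction.cpp and base_compaction.cpp below: keep only a leading
prefix of the candidate rowsets whose cumulative compaction score just crosses
the new base_compaction_max_compaction_score config. As a standalone sketch
(cap_by_compaction_score is an illustrative name; the committed code is
inlined in the two pickers):

    #include <cstddef>
    #include <cstdint>
    #include <vector>

    // Keep a prefix of the version-ordered input rowsets whose cumulative
    // compaction score just exceeds the cap; drop the rest.
    template <typename RowsetPtr>
    void cap_by_compaction_score(std::vector<RowsetPtr>& input_rowsets,
                                 int64_t max_compaction_score) {
        int64_t score = 0;
        std::size_t cnt = 0;
        while (cnt < input_rowsets.size()) {
            score += input_rowsets[cnt++]->rowset_meta()->get_compaction_score();
            if (score > max_compaction_score) {
                break; // include the rowset that crossed the cap, then stop
            }
        }
        input_rowsets.resize(cnt); // everything past the capped prefix is dropped
    }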

diff --git a/be/src/cloud/cloud_base_compaction.cpp b/be/src/cloud/cloud_base_compaction.cpp
index d4a86743a48..4ceab8eb6e3 100644
--- a/be/src/cloud/cloud_base_compaction.cpp
+++ b/be/src/cloud/cloud_base_compaction.cpp
@@ -163,6 +163,16 @@ Status CloudBaseCompaction::pick_rowsets_to_compact() {
         return Status::Error<BE_NO_SUITABLE_VERSION>("no suitable versions for compaction");
     }
 
+    int score = 0;
+    int rowset_cnt = 0;
+    while (rowset_cnt < _input_rowsets.size()) {
+        score += _input_rowsets[rowset_cnt++]->rowset_meta()->get_compaction_score();
+        if (score > config::base_compaction_max_compaction_score) {
+            break;
+        }
+    }
+    _input_rowsets.resize(rowset_cnt);
+
     // 1. cumulative rowset must reach base_compaction_min_rowset_num threshold
     if (_input_rowsets.size() > config::base_compaction_min_rowset_num) {
         VLOG_NOTICE << "satisfy the base compaction policy. tablet=" << _tablet->tablet_id()
diff --git a/be/src/cloud/cloud_cumulative_compaction.cpp b/be/src/cloud/cloud_cumulative_compaction.cpp
index de318f979a5..2a26b1b294b 100644
--- a/be/src/cloud/cloud_cumulative_compaction.cpp
+++ b/be/src/cloud/cloud_cumulative_compaction.cpp
@@ -354,11 +354,20 @@ Status CloudCumulativeCompaction::pick_rowsets_to_compact() {
         return st;
     }
 
+    int64_t max_score = config::cumulative_compaction_max_deltas;
+    auto process_memory_usage = doris::GlobalMemoryArbitrator::process_memory_usage();
+    bool memory_usage_high = process_memory_usage > MemInfo::soft_mem_limit() * 0.8;
+    if (cloud_tablet()->last_compaction_status.is<ErrorCode::MEM_LIMIT_EXCEEDED>() ||
+        memory_usage_high) {
+        max_score = std::max(config::cumulative_compaction_max_deltas /
+                                     config::cumulative_compaction_max_deltas_factor,
+                             config::cumulative_compaction_min_deltas + 1);
+    }
+
     size_t compaction_score = 0;
     auto compaction_policy = cloud_tablet()->tablet_meta()->compaction_policy();
     _engine.cumu_compaction_policy(compaction_policy)
-            ->pick_input_rowsets(cloud_tablet(), candidate_rowsets,
-                                 config::cumulative_compaction_max_deltas,
+            ->pick_input_rowsets(cloud_tablet(), candidate_rowsets, max_score,
                                 config::cumulative_compaction_min_deltas, &_input_rowsets,
                                  &_last_delete_version, &compaction_score);
 
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index a08bb43db56..cd354ab28ed 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -385,6 +385,7 @@ DEFINE_mInt32(max_single_replica_compaction_threads, "-1");
 
 DEFINE_Bool(enable_base_compaction_idle_sched, "true");
 DEFINE_mInt64(base_compaction_min_rowset_num, "5");
+DEFINE_mInt64(base_compaction_max_compaction_score, "20");
 DEFINE_mDouble(base_compaction_min_data_ratio, "0.3");
 DEFINE_mInt64(base_compaction_dup_key_max_file_size_mbytes, "1024");
 
@@ -415,6 +416,7 @@ DEFINE_mInt64(compaction_min_size_mbytes, "64");
 // cumulative compaction policy: min and max delta file's number
 DEFINE_mInt64(cumulative_compaction_min_deltas, "5");
 DEFINE_mInt64(cumulative_compaction_max_deltas, "1000");
+DEFINE_mInt32(cumulative_compaction_max_deltas_factor, "10");
 
 // This config can be set to limit thread number in  multiget thread pool.
 DEFINE_mInt32(multi_get_max_threads, "10");
@@ -1313,6 +1315,10 @@ DEFINE_Bool(enable_file_logger, "true");
 // The minimum row group size when exporting Parquet files. default 128MB
 DEFINE_Int64(min_row_group_size, "134217728");
 
+DEFINE_mInt64(compaction_memory_bytes_limit, "1073741824");
+
+DEFINE_mInt64(compaction_batch_size, "-1");
+
 // clang-format off
 #ifdef BE_TEST
 // test s3
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 6942e316dcd..f137820ca44 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -438,6 +438,7 @@ DECLARE_mInt32(max_single_replica_compaction_threads);
 
 DECLARE_Bool(enable_base_compaction_idle_sched);
 DECLARE_mInt64(base_compaction_min_rowset_num);
+DECLARE_mInt64(base_compaction_max_compaction_score);
 DECLARE_mDouble(base_compaction_min_data_ratio);
 DECLARE_mInt64(base_compaction_dup_key_max_file_size_mbytes);
 
@@ -468,6 +469,7 @@ DECLARE_mInt64(compaction_min_size_mbytes);
 // cumulative compaction policy: min and max delta file's number
 DECLARE_mInt64(cumulative_compaction_min_deltas);
 DECLARE_mInt64(cumulative_compaction_max_deltas);
+DECLARE_mInt32(cumulative_compaction_max_deltas_factor);
 
 // This config can be set to limit thread number in  multiget thread pool.
 DECLARE_mInt32(multi_get_max_threads);
@@ -1399,6 +1401,10 @@ DECLARE_Bool(enable_file_logger);
 // The minimum row group size when exporting Parquet files.
 DECLARE_Int64(min_row_group_size);
 
+DECLARE_mInt64(compaction_memory_bytes_limit);
+
+DECLARE_mInt64(compaction_batch_size);
+
 #ifdef BE_TEST
 // test s3
 DECLARE_String(test_s3_resource);
diff --git a/be/src/olap/base_compaction.cpp b/be/src/olap/base_compaction.cpp
index 436180c78ca..8be29383c1e 100644
--- a/be/src/olap/base_compaction.cpp
+++ b/be/src/olap/base_compaction.cpp
@@ -151,6 +151,16 @@ Status BaseCompaction::pick_rowsets_to_compact() {
                 "situation, no need to do base compaction.");
     }
 
+    int score = 0;
+    int rowset_cnt = 0;
+    while (rowset_cnt < _input_rowsets.size()) {
+        score += _input_rowsets[rowset_cnt++]->rowset_meta()->get_compaction_score();
+        if (score > config::base_compaction_max_compaction_score) {
+            break;
+        }
+    }
+    _input_rowsets.resize(rowset_cnt);
+
     // 1. cumulative rowset must reach base_compaction_num_cumulative_deltas threshold
     if (_input_rowsets.size() > config::base_compaction_min_rowset_num) {
         VLOG_NOTICE << "satisfy the base compaction policy. tablet=" << _tablet->tablet_id()
diff --git a/be/src/olap/base_tablet.h b/be/src/olap/base_tablet.h
index dc5f488e044..4852a6cba9b 100644
--- a/be/src/olap/base_tablet.h
+++ b/be/src/olap/base_tablet.h
@@ -22,6 +22,7 @@
 #include <string>
 
 #include "common/status.h"
+#include "olap/iterators.h"
 #include "olap/partial_update_info.h"
 #include "olap/rowset/segment_v2/segment.h"
 #include "olap/tablet_fwd.h"
@@ -299,6 +300,10 @@ public:
     std::atomic<int64_t> read_block_count = 0;
     std::atomic<int64_t> write_count = 0;
     std::atomic<int64_t> compaction_count = 0;
+
+    std::mutex sample_info_lock;
+    std::vector<CompactionSampleInfo> sample_infos;
+    Status last_compaction_status = Status::OK();
 };
 
 } /* namespace doris */
diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp
index 7b00ec1c09d..e53c93404e7 100644
--- a/be/src/olap/compaction.cpp
+++ b/be/src/olap/compaction.cpp
@@ -149,6 +149,15 @@ void Compaction::init_profile(const std::string& label) {
     _merge_rowsets_latency_timer = ADD_TIMER(_profile, "merge_rowsets_latency");
 }
 
+int64_t Compaction::merge_way_num() {
+    int64_t way_num = 0;
+    for (auto&& rowset : _input_rowsets) {
+        way_num += rowset->rowset_meta()->get_merge_way_num();
+    }
+
+    return way_num;
+}
+
 Status Compaction::merge_input_rowsets() {
     std::vector<RowsetReaderSharedPtr> input_rs_readers;
     input_rs_readers.reserve(_input_rowsets.size());
@@ -170,19 +179,23 @@ Status Compaction::merge_input_rowsets() {
         _stats.rowid_conversion = &_rowid_conversion;
     }
 
+    int64_t way_num = merge_way_num();
+
     Status res;
     {
         SCOPED_TIMER(_merge_rowsets_latency_timer);
         if (_is_vertical) {
             res = Merger::vertical_merge_rowsets(_tablet, compaction_type(), *_cur_tablet_schema,
                                                  input_rs_readers, _output_rs_writer.get(),
-                                                 get_avg_segment_rows(), &_stats);
+                                                 get_avg_segment_rows(), way_num, &_stats);
         } else {
             res = Merger::vmerge_rowsets(_tablet, compaction_type(), *_cur_tablet_schema,
                                          input_rs_readers, _output_rs_writer.get(), &_stats);
         }
     }
 
+    _tablet->last_compaction_status = res;
+
     if (!res.ok()) {
         LOG(WARNING) << "fail to do " << compaction_name() << ". res=" << res
                      << ", tablet=" << _tablet->tablet_id()
diff --git a/be/src/olap/compaction.h b/be/src/olap/compaction.h
index 9ec1297c69c..8e0c1099a20 100644
--- a/be/src/olap/compaction.h
+++ b/be/src/olap/compaction.h
@@ -81,6 +81,8 @@ protected:
 
     void _load_segment_to_cache();
 
+    int64_t merge_way_num();
+
     // the root tracker for this compaction
     std::shared_ptr<MemTrackerLimiter> _mem_tracker;
 
diff --git a/be/src/olap/cumulative_compaction.cpp b/be/src/olap/cumulative_compaction.cpp
index 1e0f338da23..2c7e654787a 100644
--- a/be/src/olap/cumulative_compaction.cpp
+++ b/be/src/olap/cumulative_compaction.cpp
@@ -134,11 +134,20 @@ Status CumulativeCompaction::pick_rowsets_to_compact() {
                      << ", tablet=" << _tablet->tablet_id();
     }
 
+    int64_t max_score = config::cumulative_compaction_max_deltas;
+    auto process_memory_usage = doris::GlobalMemoryArbitrator::process_memory_usage();
+    bool memory_usage_high = process_memory_usage > MemInfo::soft_mem_limit() * 0.8;
+    if (tablet()->last_compaction_status.is<ErrorCode::MEM_LIMIT_EXCEEDED>() || memory_usage_high) {
+        max_score = std::max(config::cumulative_compaction_max_deltas /
+                                     config::cumulative_compaction_max_deltas_factor,
+                             config::cumulative_compaction_min_deltas + 1);
+    }
+
     size_t compaction_score = 0;
     tablet()->cumulative_compaction_policy()->pick_input_rowsets(
-            tablet(), candidate_rowsets, config::cumulative_compaction_max_deltas,
-            config::cumulative_compaction_min_deltas, &_input_rowsets, &_last_delete_version,
-            &compaction_score, _allow_delete_in_cumu_compaction);
+            tablet(), candidate_rowsets, max_score, config::cumulative_compaction_min_deltas,
+            &_input_rowsets, &_last_delete_version, &compaction_score,
+            _allow_delete_in_cumu_compaction);
 
     // Cumulative compaction will process with at least 1 rowset.
     // So when there is no rowset being chosen, we should return Status::Error<CUMULATIVE_NO_SUITABLE_VERSION>():
diff --git a/be/src/olap/iterators.h b/be/src/olap/iterators.h
index 330aa9e3475..cbf8f1eca65 100644
--- a/be/src/olap/iterators.h
+++ b/be/src/olap/iterators.h
@@ -17,6 +17,7 @@
 
 #pragma once
 
+#include <cstddef>
 #include <memory>
 
 #include "common/status.h"
@@ -121,6 +122,12 @@ public:
     size_t topn_limit = 0;
 };
 
+struct CompactionSampleInfo {
+    int64_t bytes = 0;
+    int64_t rows = 0;
+    int64_t group_data_size;
+};
+
 class RowwiseIterator;
 using RowwiseIteratorUPtr = std::unique_ptr<RowwiseIterator>;
 class RowwiseIterator {
@@ -133,7 +140,13 @@ public:
     // Input options may contain scan range in which this scan.
     // Return Status::OK() if init successfully,
     // Return other error otherwise
-    virtual Status init(const StorageReadOptions& opts) = 0;
+    virtual Status init(const StorageReadOptions& opts) {
+        return Status::NotSupported("to be implemented");
+    }
+
+    virtual Status init(const StorageReadOptions& opts, CompactionSampleInfo* sample_info) {
+        return Status::NotSupported("to be implemented");
+    }
 
     // If there is any valid data, this function will load data
     // into input batch with Status::OK() returned
diff --git a/be/src/olap/merger.cpp b/be/src/olap/merger.cpp
index cecbeb163dd..4c620d30252 100644
--- a/be/src/olap/merger.cpp
+++ b/be/src/olap/merger.cpp
@@ -24,6 +24,7 @@
 #include <algorithm>
 #include <iterator>
 #include <memory>
+#include <mutex>
 #include <numeric>
 #include <ostream>
 #include <shared_mutex>
@@ -33,7 +34,9 @@
 
 #include "common/config.h"
 #include "common/logging.h"
+#include "common/status.h"
 #include "olap/base_tablet.h"
+#include "olap/iterators.h"
 #include "olap/olap_common.h"
 #include "olap/olap_define.h"
 #include "olap/rowid_conversion.h"
@@ -43,6 +46,7 @@
 #include "olap/rowset/segment_v2/segment_writer.h"
 #include "olap/storage_engine.h"
 #include "olap/tablet.h"
+#include "olap/tablet_fwd.h"
 #include "olap/tablet_reader.h"
 #include "olap/utils.h"
 #include "util/slice.h"
@@ -241,7 +245,8 @@ Status Merger::vertical_compact_one_group(
         vectorized::RowSourcesBuffer* row_source_buf,
         const std::vector<RowsetReaderSharedPtr>& src_rowset_readers,
         RowsetWriter* dst_rowset_writer, int64_t max_rows_per_segment, Statistics* stats_output,
-        std::vector<uint32_t> key_group_cluster_key_idxes) {
+        std::vector<uint32_t> key_group_cluster_key_idxes, int64_t batch_size,
+        CompactionSampleInfo* sample_info) {
     // build tablet reader
     VLOG_NOTICE << "vertical compact one group, max_rows_per_segment=" << 
max_rows_per_segment;
     vectorized::VerticalBlockReader reader(row_source_buf);
@@ -279,7 +284,8 @@ Status Merger::vertical_compact_one_group(
 
     reader_params.return_columns = column_group;
     reader_params.origin_return_columns = &reader_params.return_columns;
-    RETURN_IF_ERROR(reader.init(reader_params));
+    reader_params.batch_size = batch_size;
+    RETURN_IF_ERROR(reader.init(reader_params, sample_info));
 
     if (reader_params.record_rowids) {
         stats_output->rowid_conversion->set_dst_rowset_id(dst_rowset_writer->rowset_id());
@@ -385,6 +391,55 @@ Status Merger::vertical_compact_one_group(int64_t tablet_id, ReaderType reader_t
     return Status::OK();
 }
 
+int64_t estimate_batch_size(int group_index, BaseTabletSPtr tablet, int64_t way_cnt) {
+    std::unique_lock<std::mutex> lock(tablet->sample_info_lock);
+    CompactionSampleInfo info = tablet->sample_infos[group_index];
+    if (way_cnt <= 0) {
+        LOG(INFO) << "estimate batch size for vertical compaction, tablet id: "
+                  << tablet->tablet_id() << " way cnt: " << way_cnt;
+        return 4096 - 32;
+    }
+    int64_t block_mem_limit = config::compaction_memory_bytes_limit / way_cnt;
+    if (tablet->last_compaction_status.is<ErrorCode::MEM_LIMIT_EXCEEDED>()) {
+        block_mem_limit /= 4;
+    }
+
+    int64_t group_data_size = 0;
+    if (info.group_data_size > 0 && info.bytes > 0 && info.rows > 0) {
+        float smoothing_factor = 0.5;
+        group_data_size = int64_t(info.group_data_size * (1 - smoothing_factor) +
+                                  info.bytes / info.rows * smoothing_factor);
+        tablet->sample_infos[group_index].group_data_size = group_data_size;
+    } else if (info.group_data_size > 0 && (info.bytes <= 0 || info.rows <= 0)) {
+        group_data_size = info.group_data_size;
+    } else if (info.group_data_size <= 0 && info.bytes > 0 && info.rows > 0) {
+        group_data_size = info.bytes / info.rows;
+        tablet->sample_infos[group_index].group_data_size = group_data_size;
+    } else {
+        LOG(INFO) << "estimate batch size for vertical compaction, tablet id: "
+                  << tablet->tablet_id() << " group data size: " << info.group_data_size
+                  << " row num: " << info.rows << " consume bytes: " << info.bytes;
+        return 1024 - 32;
+    }
+
+    if (group_data_size <= 0) {
+        LOG(WARNING) << "estimate batch size for vertical compaction, tablet id: "
+                     << tablet->tablet_id() << " unexpected group data size: " << group_data_size;
+        return 4096 - 32;
+    }
+
+    tablet->sample_infos[group_index].bytes = 0;
+    tablet->sample_infos[group_index].rows = 0;
+
+    int64_t batch_size = block_mem_limit / group_data_size;
+    int64_t res = std::max(std::min(batch_size, int64_t(4096 - 32)), 32L);
+    LOG(INFO) << "estimate batch size for vertical compaction, tablet id: " << tablet->tablet_id()
+              << " group data size: " << info.group_data_size << " row num: " << info.rows
+              << " consume bytes: " << info.bytes << " way cnt: " << way_cnt
+              << " batch size: " << res;
+    return res;
+}
+
 // steps to do vertical merge:
 // 1. split columns into column groups
 // 2. compact groups one by one, generate a row_source_buf when compact key group
@@ -394,7 +449,7 @@ Status Merger::vertical_merge_rowsets(BaseTabletSPtr tablet, ReaderType reader_t
                                       const TabletSchema& tablet_schema,
                                       const std::vector<RowsetReaderSharedPtr>& src_rowset_readers,
                                       RowsetWriter* dst_rowset_writer, int64_t max_rows_per_segment,
-                                      Statistics* stats_output) {
+                                      int64_t merge_way_num, Statistics* stats_output) {
     LOG(INFO) << "Start to do vertical compaction, tablet_id: " << 
tablet->tablet_id();
     std::vector<std::vector<uint32_t>> column_groups;
     vertical_split_columns(tablet_schema, &column_groups);
@@ -405,14 +460,18 @@ Status Merger::vertical_merge_rowsets(BaseTabletSPtr tablet, ReaderType reader_t
 
     vectorized::RowSourcesBuffer row_sources_buf(
             tablet->tablet_id(), dst_rowset_writer->context().tablet_path, reader_type);
+    tablet->sample_infos.resize(column_groups.size(), {0, 0, 0});
     // compact group one by one
     for (auto i = 0; i < column_groups.size(); ++i) {
         VLOG_NOTICE << "row source size: " << row_sources_buf.total_size();
         bool is_key = (i == 0);
+        int64_t batch_size = config::compaction_batch_size != -1
+                                     ? config::compaction_batch_size
+                                     : estimate_batch_size(i, tablet, merge_way_num);
         RETURN_IF_ERROR(vertical_compact_one_group(
                 tablet, reader_type, tablet_schema, is_key, column_groups[i], &row_sources_buf,
                 src_rowset_readers, dst_rowset_writer, max_rows_per_segment, stats_output,
-                key_group_cluster_key_idxes));
+                key_group_cluster_key_idxes, batch_size, &(tablet->sample_infos[i])));
         if (is_key) {
             RETURN_IF_ERROR(row_sources_buf.flush());
         }
diff --git a/be/src/olap/merger.h b/be/src/olap/merger.h
index 5749f518136..7513c90fbd1 100644
--- a/be/src/olap/merger.h
+++ b/be/src/olap/merger.h
@@ -21,6 +21,7 @@
 
 #include "common/status.h"
 #include "io/io_common.h"
+#include "olap/iterators.h"
 #include "olap/rowset/rowset_fwd.h"
 #include "olap/tablet_fwd.h"
 
@@ -59,7 +60,7 @@ public:
     static Status vertical_merge_rowsets(
             BaseTabletSPtr tablet, ReaderType reader_type, const TabletSchema& tablet_schema,
             const std::vector<RowsetReaderSharedPtr>& src_rowset_readers,
-            RowsetWriter* dst_rowset_writer, int64_t max_rows_per_segment,
+            RowsetWriter* dst_rowset_writer, int64_t max_rows_per_segment, int64_t merge_way_num,
             Statistics* stats_output);
 
     // for vertical compaction
@@ -71,7 +72,8 @@ public:
             vectorized::RowSourcesBuffer* row_source_buf,
             const std::vector<RowsetReaderSharedPtr>& src_rowset_readers,
             RowsetWriter* dst_rowset_writer, int64_t max_rows_per_segment, Statistics* stats_output,
-            std::vector<uint32_t> key_group_cluster_key_idxes);
+            std::vector<uint32_t> key_group_cluster_key_idxes, int64_t batch_size,
+            CompactionSampleInfo* sample_info);
 
     // for segcompaction
     static Status vertical_compact_one_group(int64_t tablet_id, ReaderType reader_type,
diff --git a/be/src/olap/rowset/rowset_meta.h b/be/src/olap/rowset/rowset_meta.h
index 90b2ce48a0a..aa20b5b1ef1 100644
--- a/be/src/olap/rowset/rowset_meta.h
+++ b/be/src/olap/rowset/rowset_meta.h
@@ -269,6 +269,21 @@ public:
         return score;
     }
 
+    uint32_t get_merge_way_num() const {
+        uint32_t way_num = 0;
+        if (!is_segments_overlapping()) {
+            if (num_segments() == 0) {
+                way_num = 0;
+            } else {
+                way_num = 1;
+            }
+        } else {
+            way_num = num_segments();
+            CHECK(way_num > 0);
+        }
+        return way_num;
+    }
+
     void get_segments_key_bounds(std::vector<KeyBoundsPB>* segments_key_bounds) const {
         for (const KeyBoundsPB& key_range : _rowset_meta_pb.segments_key_bounds()) {
             segments_key_bounds->push_back(key_range);
diff --git a/be/src/olap/rowset/segcompaction.cpp b/be/src/olap/rowset/segcompaction.cpp
index 22a7049aa8f..95f2a945134 100644
--- a/be/src/olap/rowset/segcompaction.cpp
+++ b/be/src/olap/rowset/segcompaction.cpp
@@ -101,7 +101,7 @@ Status SegcompactionWorker::_get_segcompaction_reader(
     reader_params.tablet = tablet;
     reader_params.return_columns = return_columns;
     reader_params.is_key_column_group = is_key;
-    return (*reader)->init(reader_params);
+    return (*reader)->init(reader_params, nullptr);
 }
 
 std::unique_ptr<segment_v2::SegmentWriter> SegcompactionWorker::_create_segcompaction_writer(
diff --git a/be/src/olap/tablet_reader.h b/be/src/olap/tablet_reader.h
index a3cd3bd4a49..c257ba007f5 100644
--- a/be/src/olap/tablet_reader.h
+++ b/be/src/olap/tablet_reader.h
@@ -183,6 +183,8 @@ public:
         void check_validation() const;
 
         std::string to_string() const;
+
+        int64_t batch_size = -1;
     };
 
     TabletReader() = default;
diff --git a/be/src/vec/olap/vertical_block_reader.cpp b/be/src/vec/olap/vertical_block_reader.cpp
index c4dda20f40f..872836c91cd 100644
--- a/be/src/vec/olap/vertical_block_reader.cpp
+++ b/be/src/vec/olap/vertical_block_reader.cpp
@@ -25,6 +25,8 @@
 #include <ostream>
 
 #include "cloud/config.h"
+#include "olap/compaction.h"
+#include "olap/iterators.h"
 #include "olap/olap_common.h"
 #include "olap/olap_define.h"
 #include "olap/rowset/rowset.h"
@@ -108,7 +110,8 @@ Status VerticalBlockReader::_get_segment_iterators(const ReaderParams& read_para
     return Status::OK();
 }
 
-Status VerticalBlockReader::_init_collect_iter(const ReaderParams& read_params) {
+Status VerticalBlockReader::_init_collect_iter(const ReaderParams& read_params,
+                                               CompactionSampleInfo* sample_info) {
     std::vector<bool> iterator_init_flag;
     std::vector<RowsetId> rowset_ids;
     std::vector<RowwiseIteratorUPtr>* segment_iters_ptr = read_params.segment_iters_ptr;
@@ -157,7 +160,8 @@ Status VerticalBlockReader::_init_collect_iter(const ReaderParams& read_params)
     // init collect iterator
     StorageReadOptions opts;
     opts.record_rowids = read_params.record_rowids;
-    RETURN_IF_ERROR(_vcollect_iter->init(opts));
+    opts.block_row_max = read_params.batch_size;
+    RETURN_IF_ERROR(_vcollect_iter->init(opts, sample_info));
 
     // In agg keys value columns compact, get first row for _init_agg_state
     if (!read_params.is_key_column_group && read_params.tablet->keys_type() == KeysType::AGG_KEYS) {
@@ -204,13 +208,17 @@ void VerticalBlockReader::_init_agg_state(const ReaderParams& read_params) {
 }
 
 Status VerticalBlockReader::init(const ReaderParams& read_params) {
+    return init(read_params, nullptr);
+}
+
+Status VerticalBlockReader::init(const ReaderParams& read_params,
+                                 CompactionSampleInfo* sample_info) {
     StorageReadOptions opts;
-    _reader_context.batch_size = opts.block_row_max;
+    _reader_context.batch_size = read_params.batch_size;
     RETURN_IF_ERROR(TabletReader::init(read_params));
 
     _arena = std::make_unique<Arena>();
-
-    auto status = _init_collect_iter(read_params);
+    auto status = _init_collect_iter(read_params, sample_info);
     if (!status.ok()) [[unlikely]] {
         if (!config::is_cloud_mode()) {
             static_cast<Tablet*>(_tablet.get())->report_error(status);
diff --git a/be/src/vec/olap/vertical_block_reader.h b/be/src/vec/olap/vertical_block_reader.h
index 81ef8d79100..e1e8cfa1239 100644
--- a/be/src/vec/olap/vertical_block_reader.h
+++ b/be/src/vec/olap/vertical_block_reader.h
@@ -56,6 +56,7 @@ public:
 
     // Initialize VerticalBlockReader with tablet, data version and fetch range.
     Status init(const ReaderParams& read_params) override;
+    Status init(const ReaderParams& read_params, CompactionSampleInfo* sample_info);
 
     Status next_block_with_aggregation(Block* block, bool* eof) override;
 
@@ -79,7 +80,7 @@ private:
     // to minimize the comparison time in merge heap.
     Status _unique_key_next_block(Block* block, bool* eof);
 
-    Status _init_collect_iter(const ReaderParams& read_params);
+    Status _init_collect_iter(const ReaderParams& read_params, CompactionSampleInfo* sample_info);
 
     Status _get_segment_iterators(const ReaderParams& read_params,
                                   std::vector<RowwiseIteratorUPtr>* segment_iters,
diff --git a/be/src/vec/olap/vertical_merge_iterator.cpp b/be/src/vec/olap/vertical_merge_iterator.cpp
index 3323492ee90..81cfc756d63 100644
--- a/be/src/vec/olap/vertical_merge_iterator.cpp
+++ b/be/src/vec/olap/vertical_merge_iterator.cpp
@@ -21,6 +21,7 @@
 #include <gen_cpp/olap_file.pb.h>
 #include <stdlib.h>
 
+#include <cstddef>
 #include <ostream>
 
 #include "cloud/config.h"
@@ -29,6 +30,7 @@
 #include "common/logging.h"
 #include "io/cache/block_file_cache_factory.h"
 #include "olap/field.h"
+#include "olap/iterators.h"
 #include "olap/olap_common.h"
 #include "vec/columns/column.h"
 #include "vec/common/string_ref.h"
@@ -340,13 +342,18 @@ Status VerticalMergeIteratorContext::copy_rows(Block* block, bool advanced) {
     return Status::OK();
 }
 
-Status VerticalMergeIteratorContext::init(const StorageReadOptions& opts) {
+Status VerticalMergeIteratorContext::init(const StorageReadOptions& opts,
+                                          CompactionSampleInfo* sample_info) {
     if (LIKELY(_inited)) {
         return Status::OK();
     }
     _block_row_max = opts.block_row_max;
     _record_rowids = opts.record_rowids;
     RETURN_IF_ERROR(_load_next_block());
+    if (sample_info != nullptr) {
+        sample_info->bytes += bytes();
+        sample_info->rows += rows();
+    }
     if (valid()) {
         RETURN_IF_ERROR(advance());
     }
@@ -505,7 +512,8 @@ Status VerticalHeapMergeIterator::next_batch(Block* block) {
     return Status::EndOfFile("no more data in segment");
 }
 
-Status VerticalHeapMergeIterator::init(const StorageReadOptions& opts) {
+Status VerticalHeapMergeIterator::init(const StorageReadOptions& opts,
+                                       CompactionSampleInfo* sample_info) {
     DCHECK(_origin_iters.size() == _iterator_init_flags.size());
     _record_rowids = opts.record_rowids;
     if (_origin_iters.empty()) {
@@ -533,7 +541,7 @@ Status VerticalHeapMergeIterator::init(const StorageReadOptions& opts) {
     for (size_t i = 0; i < num_iters; ++i) {
         if (_iterator_init_flags[i] || pre_iter_invalid) {
             auto& ctx = _ori_iter_ctx[i];
-            RETURN_IF_ERROR(ctx->init(opts));
+            RETURN_IF_ERROR(ctx->init(opts, sample_info));
             if (!ctx->valid()) {
                 pre_iter_invalid = true;
                 continue;
@@ -606,7 +614,8 @@ Status VerticalFifoMergeIterator::next_batch(Block* block) {
     return Status::EndOfFile("no more data in segment");
 }
 
-Status VerticalFifoMergeIterator::init(const StorageReadOptions& opts) {
+Status VerticalFifoMergeIterator::init(const StorageReadOptions& opts,
+                                       CompactionSampleInfo* sample_info) {
     DCHECK(_origin_iters.size() == _iterator_init_flags.size());
     DCHECK(_keys_type == KeysType::DUP_KEYS);
     _record_rowids = opts.record_rowids;
@@ -626,7 +635,7 @@ Status VerticalFifoMergeIterator::init(const StorageReadOptions& opts) {
         std::unique_ptr<VerticalMergeIteratorContext> ctx(
                 new VerticalMergeIteratorContext(std::move(iter), _rowset_ids[seg_order],
                                                  _ori_return_cols, seg_order, _seq_col_idx));
-        RETURN_IF_ERROR(ctx->init(opts));
+        RETURN_IF_ERROR(ctx->init(opts, sample_info));
         if (!ctx->valid()) {
             ++seg_order;
             continue;
@@ -667,7 +676,7 @@ Status VerticalMaskMergeIterator::next_row(vectorized::IteratorRowRef* ref) {
     uint16_t order = row_source.get_source_num();
     auto& ctx = _origin_iter_ctx[order];
     // init ctx and this ctx must be valid
-    RETURN_IF_ERROR(ctx->init(_opts));
+    RETURN_IF_ERROR(ctx->init(_opts, _sample_info));
     DCHECK(ctx->valid());
 
     if (UNLIKELY(ctx->is_first_row())) {
@@ -701,7 +710,7 @@ Status VerticalMaskMergeIterator::unique_key_next_row(vectorized::IteratorRowRef
         auto row_source = _row_sources_buf->current();
         uint16_t order = row_source.get_source_num();
         auto& ctx = _origin_iter_ctx[order];
-        RETURN_IF_ERROR(ctx->init(_opts));
+        RETURN_IF_ERROR(ctx->init(_opts, _sample_info));
         DCHECK(ctx->valid());
         if (!ctx->valid()) {
             LOG(INFO) << "VerticalMergeIteratorContext not valid";
@@ -740,7 +749,7 @@ Status VerticalMaskMergeIterator::next_batch(Block* block) {
         uint16_t order = _row_sources_buf->current().get_source_num();
         DCHECK(order < _origin_iter_ctx.size());
         auto& ctx = _origin_iter_ctx[order];
-        RETURN_IF_ERROR(ctx->init(_opts));
+        RETURN_IF_ERROR(ctx->init(_opts, _sample_info));
         DCHECK(ctx->valid());
         if (!ctx->valid()) {
             LOG(INFO) << "VerticalMergeIteratorContext not valid";
@@ -763,7 +772,8 @@ Status VerticalMaskMergeIterator::next_batch(Block* block) {
     return st;
 }
 
-Status VerticalMaskMergeIterator::init(const StorageReadOptions& opts) {
+Status VerticalMaskMergeIterator::init(const StorageReadOptions& opts,
+                                       CompactionSampleInfo* sample_info) {
     if (_origin_iters.empty()) {
         return Status::OK();
     }
@@ -778,6 +788,7 @@ Status VerticalMaskMergeIterator::init(const StorageReadOptions& opts) {
     }
     _origin_iters.clear();
 
+    _sample_info = sample_info;
     _block_row_max = opts.block_row_max;
     return Status::OK();
 }
diff --git a/be/src/vec/olap/vertical_merge_iterator.h b/be/src/vec/olap/vertical_merge_iterator.h
index f46a0446cf2..3751aa92c78 100644
--- a/be/src/vec/olap/vertical_merge_iterator.h
+++ b/be/src/vec/olap/vertical_merge_iterator.h
@@ -164,7 +164,7 @@ public:
 
     ~VerticalMergeIteratorContext() = default;
     Status block_reset(const std::shared_ptr<Block>& block);
-    Status init(const StorageReadOptions& opts);
+    Status init(const StorageReadOptions& opts, CompactionSampleInfo* sample_info = nullptr);
     bool compare(const VerticalMergeIteratorContext& rhs) const;
     Status copy_rows(Block* block, bool advanced = true);
     Status copy_rows(Block* block, size_t count);
@@ -200,6 +200,22 @@ public:
         return _block_row_locations[_index_in_block];
     }
 
+    size_t bytes() {
+        if (_block) {
+            return _block->bytes();
+        } else {
+            return 0;
+        }
+    }
+
+    size_t rows() {
+        if (_block) {
+            return _block->rows();
+        } else {
+            return 0;
+        }
+    }
+
 private:
     // Load next block into _block
     Status _load_next_block();
@@ -255,7 +271,7 @@ public:
     VerticalHeapMergeIterator(const VerticalHeapMergeIterator&) = delete;
     VerticalHeapMergeIterator& operator=(const VerticalHeapMergeIterator&) = 
delete;
 
-    Status init(const StorageReadOptions& opts) override;
+    Status init(const StorageReadOptions& opts, CompactionSampleInfo* sample_info) override;
     Status next_batch(Block* block) override;
     const Schema& schema() const override { return *_schema; }
     uint64_t merged_rows() const override { return _merged_rows; }
@@ -321,7 +337,7 @@ public:
     VerticalFifoMergeIterator(const VerticalFifoMergeIterator&) = delete;
     VerticalFifoMergeIterator& operator=(const VerticalFifoMergeIterator&) = 
delete;
 
-    Status init(const StorageReadOptions& opts) override;
+    Status init(const StorageReadOptions& opts, CompactionSampleInfo* sample_info) override;
     Status next_batch(Block* block) override;
     const Schema& schema() const override { return *_schema; }
     uint64_t merged_rows() const override { return _merged_rows; }
@@ -367,7 +383,7 @@ public:
     VerticalMaskMergeIterator(const VerticalMaskMergeIterator&) = delete;
     VerticalMaskMergeIterator& operator=(const VerticalMaskMergeIterator&) = 
delete;
 
-    Status init(const StorageReadOptions& opts) override;
+    Status init(const StorageReadOptions& opts, CompactionSampleInfo* sample_info) override;
 
     Status next_batch(Block* block) override;
 
@@ -396,6 +412,7 @@ private:
     size_t _filtered_rows = 0;
     RowSourcesBuffer* _row_sources_buf;
     StorageReadOptions _opts;
+    CompactionSampleInfo* _sample_info = nullptr;
 };
 
 // segment merge iterator
diff --git a/be/test/olap/base_compaction_test.cpp b/be/test/olap/base_compaction_test.cpp
new file mode 100644
index 00000000000..7d9abe54ed2
--- /dev/null
+++ b/be/test/olap/base_compaction_test.cpp
@@ -0,0 +1,84 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "olap/base_compaction.h"
+
+#include <gen_cpp/AgentService_types.h>
+#include <gen_cpp/olap_file.pb.h>
+#include <gtest/gtest-message.h>
+#include <gtest/gtest-test-part.h>
+
+#include "gtest/gtest.h"
+#include "gtest/gtest_pred_impl.h"
+#include "olap/cumulative_compaction.h"
+#include "olap/cumulative_compaction_policy.h"
+#include "olap/olap_common.h"
+#include "olap/rowset/rowset_factory.h"
+#include "olap/rowset/rowset_meta.h"
+#include "olap/storage_engine.h"
+#include "olap/tablet.h"
+#include "olap/tablet_meta.h"
+#include "util/uid_util.h"
+
+namespace doris {
+
+class TestBaseCompaction : public testing::Test {};
+
+static RowsetSharedPtr create_rowset(Version version, int num_segments, bool overlapping,
+                                     int data_size) {
+    auto rs_meta = std::make_shared<RowsetMeta>();
+    rs_meta->set_rowset_type(BETA_ROWSET); // important
+    rs_meta->_rowset_meta_pb.set_start_version(version.first);
+    rs_meta->_rowset_meta_pb.set_end_version(version.second);
+    rs_meta->set_num_segments(num_segments);
+    rs_meta->set_segments_overlap(overlapping ? OVERLAPPING : NONOVERLAPPING);
+    rs_meta->set_total_disk_size(data_size);
+    RowsetSharedPtr rowset;
+    Status st = RowsetFactory::create_rowset(nullptr, "", std::move(rs_meta), &rowset);
+    if (!st.ok()) {
+        return nullptr;
+    }
+    return rowset;
+}
+
+TEST_F(TestBaseCompaction, filter_input_rowset) {
+    StorageEngine engine({});
+    TabletMetaSharedPtr tablet_meta;
+    tablet_meta.reset(new TabletMeta(1, 2, 15673, 15674, 4, 5, TTabletSchema(), 6, {{7, 8}},
+                                     UniqueId(9, 10), TTabletType::TABLET_TYPE_DISK,
+                                     TCompressionType::LZ4F));
+    TabletSharedPtr tablet(new Tablet(engine, tablet_meta, nullptr, CUMULATIVE_SIZE_BASED_POLICY));
+    tablet->_cumulative_point = 25;
+    BaseCompaction compaction(engine, tablet);
+    //std::vector<RowsetSharedPtr> rowsets;
+
+    RowsetSharedPtr init_rs = create_rowset({0, 1}, 1, false, 0);
+    tablet->_rs_version_map.emplace(init_rs->version(), init_rs);
+    for (int i = 2; i < 30; ++i) {
+        RowsetSharedPtr rs = create_rowset({i, i}, 1, false, 1024);
+        tablet->_rs_version_map.emplace(rs->version(), rs);
+    }
+    Status st = compaction.pick_rowsets_to_compact();
+    EXPECT_TRUE(st.ok());
+    EXPECT_EQ(compaction._input_rowsets.front()->start_version(), 0);
+    EXPECT_EQ(compaction._input_rowsets.front()->end_version(), 1);
+
+    EXPECT_EQ(compaction._input_rowsets.back()->start_version(), 21);
+    EXPECT_EQ(compaction._input_rowsets.back()->end_version(), 21);
+}
+
+} // namespace doris
diff --git a/be/test/olap/rowid_conversion_test.cpp b/be/test/olap/rowid_conversion_test.cpp
index 7c56710f2e8..5ae80398afb 100644
--- a/be/test/olap/rowid_conversion_test.cpp
+++ b/be/test/olap/rowid_conversion_test.cpp
@@ -348,9 +348,9 @@ protected:
         stats.rowid_conversion = &rowid_conversion;
         Status s;
         if (is_vertical_merger) {
-            s = Merger::vertical_merge_rowsets(tablet, ReaderType::READER_BASE_COMPACTION,
-                                               *tablet_schema, input_rs_readers,
-                                               output_rs_writer.get(), 10000000, &stats);
+            s = Merger::vertical_merge_rowsets(
+                    tablet, ReaderType::READER_BASE_COMPACTION, *tablet_schema, input_rs_readers,
+                    output_rs_writer.get(), 10000000, num_segments, &stats);
         } else {
             s = Merger::vmerge_rowsets(tablet, ReaderType::READER_BASE_COMPACTION, *tablet_schema,
                                        input_rs_readers, output_rs_writer.get(), &stats);
diff --git a/be/test/vec/olap/vertical_compaction_test.cpp b/be/test/vec/olap/vertical_compaction_test.cpp
index 3afd748e14d..4c4409a7506 100644
--- a/be/test/vec/olap/vertical_compaction_test.cpp
+++ b/be/test/vec/olap/vertical_compaction_test.cpp
@@ -490,7 +490,7 @@ TEST_F(VerticalCompactionTest, TestDupKeyVerticalMerge) {
     stats.rowid_conversion = &rowid_conversion;
     auto s = Merger::vertical_merge_rowsets(tablet, ReaderType::READER_BASE_COMPACTION,
                                             *tablet_schema, input_rs_readers,
-                                            output_rs_writer.get(), 100, &stats);
+                                            output_rs_writer.get(), 100, num_segments, &stats);
     ASSERT_TRUE(s.ok()) << s;
     RowsetSharedPtr out_rowset;
     EXPECT_EQ(Status::OK(), output_rs_writer->build(out_rowset));
@@ -598,7 +598,7 @@ TEST_F(VerticalCompactionTest, TestDupWithoutKeyVerticalMerge) {
     stats.rowid_conversion = &rowid_conversion;
     auto s = Merger::vertical_merge_rowsets(tablet, ReaderType::READER_BASE_COMPACTION,
                                             *tablet_schema, input_rs_readers,
-                                            output_rs_writer.get(), 100, &stats);
+                                            output_rs_writer.get(), 100, num_segments, &stats);
     ASSERT_TRUE(s.ok()) << s;
     RowsetSharedPtr out_rowset;
     EXPECT_EQ(Status::OK(), output_rs_writer->build(out_rowset));
@@ -706,7 +706,7 @@ TEST_F(VerticalCompactionTest, TestUniqueKeyVerticalMerge) {
     stats.rowid_conversion = &rowid_conversion;
     auto s = Merger::vertical_merge_rowsets(tablet, ReaderType::READER_BASE_COMPACTION,
                                             *tablet_schema, input_rs_readers,
-                                            output_rs_writer.get(), 10000, &stats);
+                                            output_rs_writer.get(), 10000, num_segments, &stats);
     EXPECT_TRUE(s.ok());
     RowsetSharedPtr out_rowset;
     EXPECT_EQ(Status::OK(), output_rs_writer->build(out_rowset));
@@ -815,7 +815,8 @@ TEST_F(VerticalCompactionTest, TestDupKeyVerticalMergeWithDelete) {
     RowIdConversion rowid_conversion;
     stats.rowid_conversion = &rowid_conversion;
     st = Merger::vertical_merge_rowsets(tablet, ReaderType::READER_BASE_COMPACTION, *tablet_schema,
-                                        input_rs_readers, output_rs_writer.get(), 100, &stats);
+                                        input_rs_readers, output_rs_writer.get(), 100, num_segments,
+                                        &stats);
     ASSERT_TRUE(st.ok()) << st;
     RowsetSharedPtr out_rowset;
     EXPECT_EQ(Status::OK(), output_rs_writer->build(out_rowset));
@@ -917,7 +918,8 @@ TEST_F(VerticalCompactionTest, TestDupWithoutKeyVerticalMergeWithDelete) {
     RowIdConversion rowid_conversion;
     stats.rowid_conversion = &rowid_conversion;
     st = Merger::vertical_merge_rowsets(tablet, ReaderType::READER_BASE_COMPACTION, *tablet_schema,
-                                        input_rs_readers, output_rs_writer.get(), 100, &stats);
+                                        input_rs_readers, output_rs_writer.get(), 100, num_segments,
+                                        &stats);
     ASSERT_TRUE(st.ok()) << st;
     RowsetSharedPtr out_rowset;
     EXPECT_EQ(Status::OK(), output_rs_writer->build(out_rowset));
@@ -1010,7 +1012,7 @@ TEST_F(VerticalCompactionTest, TestAggKeyVerticalMerge) {
     stats.rowid_conversion = &rowid_conversion;
     auto s = Merger::vertical_merge_rowsets(tablet, ReaderType::READER_BASE_COMPACTION,
                                             *tablet_schema, input_rs_readers,
-                                            output_rs_writer.get(), 100, &stats);
+                                            output_rs_writer.get(), 100, num_segments, &stats);
     EXPECT_TRUE(s.ok());
     RowsetSharedPtr out_rowset;
     EXPECT_EQ(Status::OK(), output_rs_writer->build(out_rowset));
diff --git a/regression-test/suites/compaction/compaction_width_array_column.groovy b/regression-test/suites/compaction/compaction_width_array_column.groovy
new file mode 100644
index 00000000000..4e3fed354c7
--- /dev/null
+++ b/regression-test/suites/compaction/compaction_width_array_column.groovy
@@ -0,0 +1,137 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite('compaction_width_array_column', "p2") {
+    String backend_id;
+    def backendId_to_backendIP = [:]
+    def backendId_to_backendHttpPort = [:]
+    getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort);
+
+    backend_id = backendId_to_backendIP.keySet()[0]
+    def (code, out, err) = show_be_config(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id))
+
+    logger.info("Show config: code=" + code + ", out=" + out + ", err=" + err)
+    assertEquals(code, 0)
+    def configList = parseJson(out.trim())
+    assert configList instanceof List
+
+    def s3BucketName = getS3BucketName()
+    def random = new Random();
+
+    def s3WithProperties = """WITH S3 (
+        |"AWS_ACCESS_KEY" = "${getS3AK()}",
+        |"AWS_SECRET_KEY" = "${getS3SK()}",
+        |"AWS_ENDPOINT" = "${getS3Endpoint()}",
+        |"AWS_REGION" = "${getS3Region()}")
+        |PROPERTIES(
+        |"exec_mem_limit" = "8589934592",
+        |"load_parallelism" = "3")""".stripMargin()
+
+    // set fe configuration
+    sql "ADMIN SET FRONTEND CONFIG ('max_bytes_per_broker_scanner' = '161061273600')"
+
+    def tableName = "column_witdh_array"
+
+    def table_create_task = { table_name ->
+        // drop table if exists
+        sql """drop table if exists ${table_name}"""
+        // create table
+        def create_table = new File("""${context.file.parent}/ddl/${table_name}.sql""").text
+        create_table = create_table.replaceAll("\\\$\\{table\\_name\\}", table_name)
+        sql create_table
+    }
+
+    def table_load_task = { table_name ->
+        uniqueID = Math.abs(UUID.randomUUID().hashCode()).toString()
+        loadLabel = table_name + "_" + uniqueID
+        //loadLabel = table_name + '_load_5'
+        loadSql = new File("""${context.file.parent}/ddl/${table_name}_load.sql""").text.replaceAll("\\\$\\{s3BucketName\\}", s3BucketName)
+        loadSql = loadSql.replaceAll("\\\$\\{loadLabel\\}", loadLabel)
+        loadSql = loadSql.replaceAll("\\\$\\{table\\_name\\}", table_name)
+        nowloadSql = loadSql + s3WithProperties
+        try_sql nowloadSql
+
+        while (true) {
+            def stateResult = sql "show load where Label = '${loadLabel}'"
+            logger.info("load result is ${stateResult}")
+            def loadState = stateResult[stateResult.size() - 1][2].toString()
+            if ("CANCELLED".equalsIgnoreCase(loadState)) {
+                throw new IllegalStateException("load ${loadLabel} failed.")
+            } else if ("FINISHED".equalsIgnoreCase(loadState)) {
+                break
+            }
+            sleep(5000)
+        }
+    }
+
+    table_create_task(tableName)
+    table_load_task(tableName)
+
+    def tablets = sql_return_maparray """ show tablets from ${tableName}; """
+
+    boolean isOverLap = true
+    int tryCnt = 0;
+    while (isOverLap && tryCnt < 3) {
+        isOverLap = false
+
+        for (def tablet in tablets) {
+            String tablet_id = tablet.TabletId
+            backend_id = tablet.BackendId
+            (code, out, err) = be_run_cumulative_compaction(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id)
+            logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err)
+            assertEquals(code, 0)
+            def compactJson = parseJson(out.trim())
+            assertEquals("success", compactJson.status.toLowerCase())
+        }
+
+        // wait for all compactions done
+        for (def tablet in tablets) {
+            boolean running = true
+            do {
+                Thread.sleep(1000)
+                String tablet_id = tablet.TabletId
+                backend_id = tablet.BackendId
+                (code, out, err) = be_get_compaction_status(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id)
+                logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err)
+                assertEquals(code, 0)
+                def compactionStatus = parseJson(out.trim())
+                assertEquals("success", compactionStatus.status.toLowerCase())
+                running = compactionStatus.run_status
+            } while (running)
+        }
+
+        for (def tablet in tablets) {
+            String tablet_id = tablet.TabletId
+            (code, out, err) = curl("GET", tablet.CompactionStatus)
+            logger.info("Show tablets status: code=" + code + ", out=" + out + ", err=" + err)
+            assertEquals(code, 0)
+            def tabletJson = parseJson(out.trim())
+            assert tabletJson.rowsets instanceof List
+            for (String rowset in (List<String>) tabletJson.rowsets) {
+                logger.info("rowset info" + rowset)
+                String overLappingStr = rowset.split(" ")[3]
+                if (overLappingStr == "OVERLAPPING") {
+                    isOverLap = true;
+                }
+                logger.info("is over lap " + isOverLap + " " + overLappingStr)
+            }
+        }
+        tryCnt++;
+    }
+
+    assertFalse(isOverLap);
+}

