This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 0603ec1d9d7 [enhancement](compaction) optimizing memory usage for compaction (#37099) (#37486)
0603ec1d9d7 is described below

commit 0603ec1d9d746c748b40732b0cd627550dc94d79
Author: Luwei <814383...@qq.com>
AuthorDate: Sun Aug 4 10:49:18 2024 +0800

    [enhancement](compaction) optimizing memory usage for compaction (#37099) (#37486)
---
 be/src/common/config.cpp                           |   6 +
 be/src/common/config.h                             |   6 +
 be/src/olap/base_compaction.cpp                    |  10 ++
 be/src/olap/base_tablet.h                          |   5 +
 be/src/olap/compaction.cpp                         |  14 ++-
 be/src/olap/compaction.h                           |   1 +
 be/src/olap/cumulative_compaction.cpp              |  15 ++-
 be/src/olap/iterators.h                            |  15 ++-
 be/src/olap/merger.cpp                             |  67 +++++++++-
 be/src/olap/merger.h                               |   6 +-
 be/src/olap/rowset/rowset_meta.h                   |  15 +++
 be/src/olap/rowset/segcompaction.cpp               |   2 +-
 be/src/olap/tablet_reader.h                        |   2 +
 be/src/vec/olap/vertical_block_reader.cpp          |  23 +++-
 be/src/vec/olap/vertical_block_reader.h            |   3 +-
 be/src/vec/olap/vertical_merge_iterator.cpp        |  29 +++--
 be/src/vec/olap/vertical_merge_iterator.h          |  25 +++-
 be/test/olap/base_compaction_test.cpp              |  84 +++++++++++++
 be/test/olap/rowid_conversion_test.cpp             |   6 +-
 be/test/vec/olap/vertical_compaction_test.cpp      |  18 ++-
 .../compaction_width_array_column.groovy           | 137 +++++++++++++++++++++
 21 files changed, 450 insertions(+), 39 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 44ad6f8be6d..e5ab5c20373 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -378,6 +378,7 @@ DEFINE_mInt32(max_single_replica_compaction_threads, "-1");
 
 DEFINE_Bool(enable_base_compaction_idle_sched, "true");
 DEFINE_mInt64(base_compaction_min_rowset_num, "5");
+DEFINE_mInt64(base_compaction_max_compaction_score, "20");
 DEFINE_mDouble(base_compaction_min_data_ratio, "0.3");
 DEFINE_mInt64(base_compaction_dup_key_max_file_size_mbytes, "1024");
 
@@ -408,6 +409,7 @@ DEFINE_mInt64(compaction_min_size_mbytes, "64");
 // cumulative compaction policy: min and max delta file's number
 DEFINE_mInt64(cumulative_compaction_min_deltas, "5");
 DEFINE_mInt64(cumulative_compaction_max_deltas, "1000");
+DEFINE_mInt32(cumulative_compaction_max_deltas_factor, "10");
 
 // This config can be set to limit thread number in  multiget thread pool.
 DEFINE_mInt32(multi_get_max_threads, "10");
@@ -1256,6 +1258,10 @@ DEFINE_Int64(min_row_group_size, "134217728");
 // The time out milliseconds for remote fetch schema RPC, default 60s
 DEFINE_mInt64(fetch_remote_schema_rpc_timeout_ms, "60000");
 
+DEFINE_mInt64(compaction_memory_bytes_limit, "1073741824");
+
+DEFINE_mInt64(compaction_batch_size, "-1");
+
 // If set to false, the parquet reader will not use page index to filter data.
 // This is only for debug purpose, in case sometimes the page index
 // filter wrong data.
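
Taken together, the two new knobs bound compaction by memory rather than by
row count alone: compaction_memory_bytes_limit is the total byte budget a
merge may hold in blocks at once, and compaction_batch_size forces a fixed
per-block row count when set to anything other than -1. A minimal sketch of
the implied arithmetic (standalone; `ways` and `avg_row_bytes` are
illustrative inputs, not Doris symbols):

    #include <algorithm>
    #include <cstdint>

    // Rough shape of the auto-tuning in merger.cpp further down: split the
    // global budget across merge ways, convert bytes to rows, then clamp.
    int64_t sketch_batch_size(int64_t ways, int64_t avg_row_bytes) {
        const int64_t memory_limit = 1073741824; // compaction_memory_bytes_limit
        int64_t per_way = memory_limit / std::max<int64_t>(ways, 1);
        return std::clamp<int64_t>(per_way / std::max<int64_t>(avg_row_bytes, 1),
                                   32, 4096 - 32);
    }
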
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 2514b4f2fa8..d1f91ab693d 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -434,6 +434,7 @@ DECLARE_mInt32(max_single_replica_compaction_threads);
 
 DECLARE_Bool(enable_base_compaction_idle_sched);
 DECLARE_mInt64(base_compaction_min_rowset_num);
+DECLARE_mInt64(base_compaction_max_compaction_score);
 DECLARE_mDouble(base_compaction_min_data_ratio);
 DECLARE_mInt64(base_compaction_dup_key_max_file_size_mbytes);
 
@@ -464,6 +465,7 @@ DECLARE_mInt64(compaction_min_size_mbytes);
 // cumulative compaction policy: min and max delta file's number
 DECLARE_mInt64(cumulative_compaction_min_deltas);
 DECLARE_mInt64(cumulative_compaction_max_deltas);
+DECLARE_mInt32(cumulative_compaction_max_deltas_factor);
 
 // This config can be set to limit thread number in  multiget thread pool.
 DECLARE_mInt32(multi_get_max_threads);
@@ -1346,6 +1348,10 @@ DECLARE_mInt64(fetch_remote_schema_rpc_timeout_ms);
 // The minimum row group size when exporting Parquet files.
 DECLARE_Int64(min_row_group_size);
 
+DECLARE_mInt64(compaction_memory_bytes_limit);
+
+DECLARE_mInt64(compaction_batch_size);
+
 DECLARE_mBool(enable_parquet_page_index);
 
 // Wheather to ignore not found file in external teble(eg, hive)
diff --git a/be/src/olap/base_compaction.cpp b/be/src/olap/base_compaction.cpp
index 474909cbf45..a9455d45381 100644
--- a/be/src/olap/base_compaction.cpp
+++ b/be/src/olap/base_compaction.cpp
@@ -154,6 +154,16 @@ Status BaseCompaction::pick_rowsets_to_compact() {
                 "situation, no need to do base compaction.");
     }
 
+    int score = 0;
+    int rowset_cnt = 0;
+    while (rowset_cnt < _input_rowsets.size()) {
+        score += _input_rowsets[rowset_cnt++]->rowset_meta()->get_compaction_score();
+        if (score > config::base_compaction_max_compaction_score) {
+            break;
+        }
+    }
+    _input_rowsets.resize(rowset_cnt);
+
     // 1. cumulative rowset must reach base_compaction_num_cumulative_deltas threshold
     if (_input_rowsets.size() > config::base_compaction_min_rowset_num) {
         VLOG_NOTICE << "satisfy the base compaction policy. tablet=" << _tablet->tablet_id()
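
The loop above caps base compaction input by accumulated compaction score
rather than rowset count alone; the rowset that first pushes the total past
base_compaction_max_compaction_score is still kept, because the check runs
after the increment. A standalone sketch of the same truncation rule:

    #include <cstdint>
    #include <vector>

    // Returns how many leading rowsets to keep: accumulate scores and stop
    // after the first rowset that pushes the sum past the cap.
    size_t cap_input_by_score(const std::vector<int>& scores, int64_t cap) {
        int64_t score = 0;
        size_t cnt = 0;
        while (cnt < scores.size()) {
            score += scores[cnt++];
            if (score > cap) break;
        }
        return cnt; // _input_rowsets.resize(cnt) above
    }
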
diff --git a/be/src/olap/base_tablet.h b/be/src/olap/base_tablet.h
index 768c69624fa..4338986efe6 100644
--- a/be/src/olap/base_tablet.h
+++ b/be/src/olap/base_tablet.h
@@ -22,6 +22,7 @@
 #include <string>
 
 #include "common/status.h"
+#include "olap/iterators.h"
 #include "olap/olap_common.h"
 #include "olap/tablet_fwd.h"
 #include "olap/tablet_meta.h"
@@ -104,6 +105,10 @@ public:
     IntCounter* flush_finish_count = nullptr;
     std::atomic<int64_t> published_count = 0;
 
+    std::mutex sample_info_lock;
+    std::vector<CompactionSampleInfo> sample_infos;
+    Status last_compaction_status = Status::OK();
+
     std::atomic<int64_t> read_block_count = 0;
     std::atomic<int64_t> write_count = 0;
     std::atomic<int64_t> compaction_count = 0;
diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp
index 171b68f30b6..849db757ac0 100644
--- a/be/src/olap/compaction.cpp
+++ b/be/src/olap/compaction.cpp
@@ -316,6 +316,15 @@ bool Compaction::handle_ordered_data_compaction() {
     return st.ok();
 }
 
+int64_t Compaction::merge_way_num() {
+    int64_t way_num = 0;
+    for (auto&& rowset : _input_rowsets) {
+        way_num += rowset->rowset_meta()->get_merge_way_num();
+    }
+
+    return way_num;
+}
+
 Status Compaction::do_compaction_impl(int64_t permits) {
     OlapStopWatch watch;
 
@@ -363,6 +372,7 @@ Status Compaction::do_compaction_impl(int64_t permits) {
                                                _tablet->enable_unique_key_merge_on_write())) {
         stats.rowid_conversion = &_rowid_conversion;
     }
+    int64_t way_num = merge_way_num();
 
     Status res;
     {
@@ -370,13 +380,15 @@ Status Compaction::do_compaction_impl(int64_t permits) {
         if (vertical_compaction) {
             res = Merger::vertical_merge_rowsets(_tablet, compaction_type(), _cur_tablet_schema,
                                                  _input_rs_readers, _output_rs_writer.get(),
-                                                 get_avg_segment_rows(), &stats);
+                                                 get_avg_segment_rows(), way_num, &stats);
         } else {
             res = Merger::vmerge_rowsets(_tablet, compaction_type(), _cur_tablet_schema,
                                          _input_rs_readers, _output_rs_writer.get(), &stats);
         }
     }
 
+    _tablet->last_compaction_status = res;
+
     if (!res.ok()) {
         LOG(WARNING) << "fail to do " << compaction_name() << ". res=" << res
                      << ", tablet=" << _tablet->tablet_id()
diff --git a/be/src/olap/compaction.h b/be/src/olap/compaction.h
index 5b1580f209d..5aa3e260194 100644
--- a/be/src/olap/compaction.h
+++ b/be/src/olap/compaction.h
@@ -105,6 +105,7 @@ protected:
 private:
     bool _check_if_includes_input_rowsets(const RowsetIdUnorderedSet& commit_rowset_ids_set) const;
     void _load_segment_to_cache();
+    int64_t merge_way_num();
 
 protected:
     // the root tracker for this compaction
diff --git a/be/src/olap/cumulative_compaction.cpp b/be/src/olap/cumulative_compaction.cpp
index 42748012cab..f461de3a5e9 100644
--- a/be/src/olap/cumulative_compaction.cpp
+++ b/be/src/olap/cumulative_compaction.cpp
@@ -116,11 +116,20 @@ Status CumulativeCompaction::pick_rowsets_to_compact() {
                      << ", tablet=" << _tablet->tablet_id();
     }
 
+    int64_t max_score = config::cumulative_compaction_max_deltas;
+    auto process_memory_usage = doris::GlobalMemoryArbitrator::process_memory_usage();
+    bool memory_usage_high = process_memory_usage > MemInfo::soft_mem_limit() * 0.8;
+    if (_tablet->last_compaction_status.is<ErrorCode::MEM_LIMIT_EXCEEDED>() || memory_usage_high) {
+        max_score = std::max(config::cumulative_compaction_max_deltas /
+                                     config::cumulative_compaction_max_deltas_factor,
+                             config::cumulative_compaction_min_deltas + 1);
+    }
+
     size_t compaction_score = 0;
     _tablet->cumulative_compaction_policy()->pick_input_rowsets(
-            _tablet.get(), candidate_rowsets, config::cumulative_compaction_max_deltas,
-            config::cumulative_compaction_min_deltas, &_input_rowsets, &_last_delete_version,
-            &compaction_score, allow_delete_in_cumu_compaction());
+            _tablet.get(), candidate_rowsets, max_score, config::cumulative_compaction_min_deltas,
+            &_input_rowsets, &_last_delete_version, &compaction_score,
+            allow_delete_in_cumu_compaction());
 
     // Cumulative compaction will process with at least 1 rowset.
     // So when there is no rowset being chosen, we should return Status::Error<CUMULATIVE_NO_SUITABLE_VERSION>():
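
With the defaults added above (cumulative_compaction_max_deltas = 1000,
cumulative_compaction_max_deltas_factor = 10, cumulative_compaction_min_deltas
= 5), memory pressure shrinks the pick budget to max(1000 / 10, 5 + 1) = 100
deltas; taking the max against min_deltas + 1 keeps the ceiling strictly above
the floor so a pick can still succeed.
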
diff --git a/be/src/olap/iterators.h b/be/src/olap/iterators.h
index deb14ff554f..5d752a2bf73 100644
--- a/be/src/olap/iterators.h
+++ b/be/src/olap/iterators.h
@@ -17,6 +17,7 @@
 
 #pragma once
 
+#include <cstddef>
 #include <memory>
 
 #include "common/status.h"
@@ -122,6 +123,12 @@ public:
     size_t topn_limit = 0;
 };
 
+struct CompactionSampleInfo {
+    int64_t bytes = 0;
+    int64_t rows = 0;
+    int64_t group_data_size;
+};
+
 class RowwiseIterator;
 using RowwiseIteratorUPtr = std::unique_ptr<RowwiseIterator>;
 class RowwiseIterator {
@@ -134,7 +141,13 @@ public:
     // Input options may contain scan range in which this scan.
     // Return Status::OK() if init successfully,
     // Return other error otherwise
-    virtual Status init(const StorageReadOptions& opts) = 0;
+    virtual Status init(const StorageReadOptions& opts) {
+        return Status::NotSupported("to be implemented");
+    }
+
+    virtual Status init(const StorageReadOptions& opts, CompactionSampleInfo* sample_info) {
+        return Status::NotSupported("to be implemented");
+    }
 
     // If there is any valid data, this function will load data
     // into input batch with Status::OK() returned
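
The default-returning base implementations let existing iterators compile
unchanged while compaction-aware ones opt in to sampling. A self-contained
sketch of the intended pattern (the struct names mirror the diff, everything
else is illustrative stand-in code, not the Doris classes):

    #include <cstddef>
    #include <cstdint>
    #include <iostream>

    struct StorageReadOptions { size_t block_row_max = 4096; };
    struct CompactionSampleInfo { int64_t bytes = 0; int64_t rows = 0; int64_t group_data_size = 0; };

    struct SampleAwareIterator {
        // Legacy callers keep the one-argument form...
        bool init(const StorageReadOptions& opts) { return init(opts, nullptr); }
        // ...compaction passes a sink that accumulates per-block stats.
        bool init(const StorageReadOptions& opts, CompactionSampleInfo* sample_info) {
            if (sample_info != nullptr) {
                sample_info->bytes += 8192; // bytes of the first loaded block (fake value)
                sample_info->rows += 64;    // rows of the first loaded block (fake value)
            }
            return true;
        }
    };

    int main() {
        CompactionSampleInfo info;
        SampleAwareIterator it;
        it.init(StorageReadOptions{}, &info);
        std::cout << info.bytes / info.rows << " bytes per row sampled\n"; // 128
    }
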
diff --git a/be/src/olap/merger.cpp b/be/src/olap/merger.cpp
index b73c5bda645..37f1c2116d2 100644
--- a/be/src/olap/merger.cpp
+++ b/be/src/olap/merger.cpp
@@ -24,6 +24,7 @@
 #include <algorithm>
 #include <iterator>
 #include <memory>
+#include <mutex>
 #include <numeric>
 #include <ostream>
 #include <shared_mutex>
@@ -33,6 +34,8 @@
 
 #include "common/config.h"
 #include "common/logging.h"
+#include "common/status.h"
+#include "olap/iterators.h"
 #include "olap/olap_common.h"
 #include "olap/olap_define.h"
 #include "olap/rowid_conversion.h"
@@ -42,6 +45,7 @@
 #include "olap/rowset/segment_v2/segment_writer.h"
 #include "olap/storage_engine.h"
 #include "olap/tablet.h"
+#include "olap/tablet_fwd.h"
 #include "olap/tablet_reader.h"
 #include "olap/utils.h"
 #include "util/slice.h"
@@ -212,7 +216,8 @@ Status Merger::vertical_compact_one_group(
         const std::vector<uint32_t>& column_group, vectorized::RowSourcesBuffer* row_source_buf,
         const std::vector<RowsetReaderSharedPtr>& src_rowset_readers,
         RowsetWriter* dst_rowset_writer, int64_t max_rows_per_segment, Statistics* stats_output,
-        std::vector<uint32_t> key_group_cluster_key_idxes) {
+        std::vector<uint32_t> key_group_cluster_key_idxes, int64_t batch_size,
+        CompactionSampleInfo* sample_info) {
     // build tablet reader
     VLOG_NOTICE << "vertical compact one group, max_rows_per_segment=" << max_rows_per_segment;
     vectorized::VerticalBlockReader reader(row_source_buf);
@@ -250,7 +255,8 @@ Status Merger::vertical_compact_one_group(
 
     reader_params.return_columns = column_group;
     reader_params.origin_return_columns = &reader_params.return_columns;
-    RETURN_IF_ERROR(reader.init(reader_params));
+    reader_params.batch_size = batch_size;
+    RETURN_IF_ERROR(reader.init(reader_params, sample_info));
 
     if (reader_params.record_rowids) {
         stats_output->rowid_conversion->set_dst_rowset_id(dst_rowset_writer->rowset_id());
@@ -356,6 +362,55 @@ Status Merger::vertical_compact_one_group(TabletSharedPtr tablet, ReaderType rea
     return Status::OK();
 }
 
+int64_t estimate_batch_size(int group_index, BaseTabletSPtr tablet, int64_t way_cnt) {
+    std::unique_lock<std::mutex> lock(tablet->sample_info_lock);
+    CompactionSampleInfo info = tablet->sample_infos[group_index];
+    if (way_cnt <= 0) {
+        LOG(INFO) << "estimate batch size for vertical compaction, tablet id: "
+                  << tablet->tablet_id() << " way cnt: " << way_cnt;
+        return 4096 - 32;
+    }
+    int64_t block_mem_limit = config::compaction_memory_bytes_limit / way_cnt;
+    if (tablet->last_compaction_status.is<ErrorCode::MEM_LIMIT_EXCEEDED>()) {
+        block_mem_limit /= 4;
+    }
+
+    int64_t group_data_size = 0;
+    if (info.group_data_size > 0 && info.bytes > 0 && info.rows > 0) {
+        float smoothing_factor = 0.5;
+        group_data_size = int64_t(info.group_data_size * (1 - smoothing_factor) +
+                                  info.bytes / info.rows * smoothing_factor);
+        tablet->sample_infos[group_index].group_data_size = group_data_size;
+    } else if (info.group_data_size > 0 && (info.bytes <= 0 || info.rows <= 0)) {
+        group_data_size = info.group_data_size;
+    } else if (info.group_data_size <= 0 && info.bytes > 0 && info.rows > 0) {
+        group_data_size = info.bytes / info.rows;
+        tablet->sample_infos[group_index].group_data_size = group_data_size;
+    } else {
+        LOG(INFO) << "estimate batch size for vertical compaction, tablet id: "
+                  << tablet->tablet_id() << " group data size: " << 
info.group_data_size
+                  << " row num: " << info.rows << " consume bytes: " << 
info.bytes;
+        return 1024 - 32;
+    }
+
+    if (group_data_size <= 0) {
+        LOG(WARNING) << "estimate batch size for vertical compaction, tablet 
id: "
+                     << tablet->tablet_id() << " unexpected group data size: " 
<< group_data_size;
+        return 4096 - 32;
+    }
+
+    tablet->sample_infos[group_index].bytes = 0;
+    tablet->sample_infos[group_index].rows = 0;
+
+    int64_t batch_size = block_mem_limit / group_data_size;
+    int64_t res = std::max(std::min(batch_size, int64_t(4096 - 32)), 32L);
+    LOG(INFO) << "estimate batch size for vertical compaction, tablet id: " << 
tablet->tablet_id()
+              << " group data size: " << info.group_data_size << " row num: " 
<< info.rows
+              << " consume bytes: " << info.bytes << " way cnt: " << way_cnt
+              << " batch size: " << res;
+    return res;
+}
+
 // steps to do vertical merge:
 // 1. split columns into column groups
 // 2. compact groups one by one, generate a row_source_buf when compact key group
@@ -365,7 +420,7 @@ Status Merger::vertical_merge_rowsets(TabletSharedPtr tablet, ReaderType reader_
                                       TabletSchemaSPtr tablet_schema,
                                       const std::vector<RowsetReaderSharedPtr>& src_rowset_readers,
                                       RowsetWriter* dst_rowset_writer, int64_t max_rows_per_segment,
-                                      Statistics* stats_output) {
+                                      int64_t merge_way_num, Statistics* stats_output) {
     LOG(INFO) << "Start to do vertical compaction, tablet_id: " << tablet->tablet_id();
     std::vector<std::vector<uint32_t>> column_groups;
     vertical_split_columns(tablet_schema, &column_groups);
@@ -376,14 +431,18 @@ Status Merger::vertical_merge_rowsets(TabletSharedPtr tablet, ReaderType reader_
 
     vectorized::RowSourcesBuffer row_sources_buf(tablet->tablet_id(), tablet->tablet_path(),
                                                  reader_type);
+    tablet->sample_infos.resize(column_groups.size(), {0, 0, 0});
     // compact group one by one
     for (auto i = 0; i < column_groups.size(); ++i) {
         VLOG_NOTICE << "row source size: " << row_sources_buf.total_size();
         bool is_key = (i == 0);
+        int64_t batch_size = config::compaction_batch_size != -1
+                                     ? config::compaction_batch_size
+                                     : estimate_batch_size(i, tablet, merge_way_num);
         RETURN_IF_ERROR(vertical_compact_one_group(
                 tablet, reader_type, tablet_schema, is_key, column_groups[i], &row_sources_buf,
                 src_rowset_readers, dst_rowset_writer, max_rows_per_segment, stats_output,
-                key_group_cluster_key_idxes));
+                key_group_cluster_key_idxes, batch_size, &(tablet->sample_infos[i])));
         if (is_key) {
             RETURN_IF_ERROR(row_sources_buf.flush());
         }
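
estimate_batch_size above blends the previously recorded per-row size of a
column group with the fresh bytes/rows sample at equal weight (smoothing_factor
= 0.5), then converts the per-way byte budget into a row count; after a
MEM_LIMIT_EXCEEDED failure the budget is further divided by 4. A self-contained
walk-through with assumed numbers:

    #include <algorithm>
    #include <cstdint>
    #include <iostream>

    int main() {
        const int64_t memory_limit = 1073741824; // compaction_memory_bytes_limit, 1 GiB
        const int64_t way_cnt = 64;              // merge ways summed over input rowsets
        const int64_t row_bytes = 512;           // smoothed bytes per row of this group

        int64_t block_mem_limit = memory_limit / way_cnt; // 16 MiB per merge way
        int64_t batch_size = block_mem_limit / row_bytes; // 32768 rows, too many
        // Same clamp as merger.cpp: between 32 and 4096 - 32 rows per block.
        int64_t res = std::max(std::min(batch_size, int64_t(4096 - 32)), int64_t(32));
        std::cout << res << "\n"; // prints 4064
    }
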
diff --git a/be/src/olap/merger.h b/be/src/olap/merger.h
index ab948f55ed9..49ca1e5227f 100644
--- a/be/src/olap/merger.h
+++ b/be/src/olap/merger.h
@@ -23,6 +23,7 @@
 
 #include "common/status.h"
 #include "io/io_common.h"
+#include "olap/iterators.h"
 #include "olap/rowset/rowset_reader.h"
 #include "olap/tablet.h"
 #include "olap/tablet_schema.h"
@@ -62,7 +63,7 @@ public:
     static Status vertical_merge_rowsets(
             TabletSharedPtr tablet, ReaderType reader_type, TabletSchemaSPtr tablet_schema,
             const std::vector<RowsetReaderSharedPtr>& src_rowset_readers,
-            RowsetWriter* dst_rowset_writer, int64_t max_rows_per_segment,
+            RowsetWriter* dst_rowset_writer, int64_t max_rows_per_segment, int64_t merge_way_num,
             Statistics* stats_output);
 
 public:
@@ -75,7 +76,8 @@ public:
             vectorized::RowSourcesBuffer* row_source_buf,
             const std::vector<RowsetReaderSharedPtr>& src_rowset_readers,
             RowsetWriter* dst_rowset_writer, int64_t max_rows_per_segment, Statistics* stats_output,
-            std::vector<uint32_t> key_group_cluster_key_idxes);
+            std::vector<uint32_t> key_group_cluster_key_idxes, int64_t batch_size,
+            CompactionSampleInfo* sample_info);
 
     // for segcompaction
     static Status vertical_compact_one_group(TabletSharedPtr tablet, ReaderType reader_type,
diff --git a/be/src/olap/rowset/rowset_meta.h b/be/src/olap/rowset/rowset_meta.h
index 5284deb461b..99221789b81 100644
--- a/be/src/olap/rowset/rowset_meta.h
+++ b/be/src/olap/rowset/rowset_meta.h
@@ -262,6 +262,21 @@ public:
         return score;
     }
 
+    uint32_t get_merge_way_num() const {
+        uint32_t way_num = 0;
+        if (!is_segments_overlapping()) {
+            if (num_segments() == 0) {
+                way_num = 0;
+            } else {
+                way_num = 1;
+            }
+        } else {
+            way_num = num_segments();
+            CHECK(way_num > 0);
+        }
+        return way_num;
+    }
+
     void get_segments_key_bounds(std::vector<KeyBoundsPB>* segments_key_bounds) const {
         for (const KeyBoundsPB& key_range : _rowset_meta_pb.segments_key_bounds()) {
             segments_key_bounds->push_back(key_range);
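
get_merge_way_num captures how many sorted runs a rowset contributes to a
merge: a non-overlapping rowset reads as a single run regardless of segment
count, an overlapping one as one run per segment. A standalone sketch of how
Compaction::merge_way_num sums this over its inputs (RowsetShape is a
stand-in, not a Doris type):

    #include <cstdint>
    #include <vector>

    struct RowsetShape { bool overlapping; uint32_t num_segments; };

    int64_t total_merge_ways(const std::vector<RowsetShape>& inputs) {
        int64_t ways = 0;
        for (const auto& rs : inputs) {
            if (!rs.overlapping) {
                ways += rs.num_segments == 0 ? 0 : 1; // one sorted run per rowset
            } else {
                ways += rs.num_segments;              // one run per segment
            }
        }
        return ways;
    }
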
diff --git a/be/src/olap/rowset/segcompaction.cpp b/be/src/olap/rowset/segcompaction.cpp
index 8fee04ccb80..9f7f0ec91f4 100644
--- a/be/src/olap/rowset/segcompaction.cpp
+++ b/be/src/olap/rowset/segcompaction.cpp
@@ -102,7 +102,7 @@ Status SegcompactionWorker::_get_segcompaction_reader(
     reader_params.tablet = tablet;
     reader_params.return_columns = return_columns;
     reader_params.is_key_column_group = is_key;
-    return (*reader)->init(reader_params);
+    return (*reader)->init(reader_params, nullptr);
 }
 
 std::unique_ptr<segment_v2::SegmentWriter> SegcompactionWorker::_create_segcompaction_writer(
diff --git a/be/src/olap/tablet_reader.h b/be/src/olap/tablet_reader.h
index 3bf83ec296c..942c61f8207 100644
--- a/be/src/olap/tablet_reader.h
+++ b/be/src/olap/tablet_reader.h
@@ -184,6 +184,8 @@ public:
         void check_validation() const;
 
         std::string to_string() const;
+
+        int64_t batch_size = -1;
     };
 
     TabletReader() = default;
diff --git a/be/src/vec/olap/vertical_block_reader.cpp b/be/src/vec/olap/vertical_block_reader.cpp
index c472e678abd..58a2332d5a8 100644
--- a/be/src/vec/olap/vertical_block_reader.cpp
+++ b/be/src/vec/olap/vertical_block_reader.cpp
@@ -24,6 +24,8 @@
 #include <boost/iterator/iterator_facade.hpp>
 #include <ostream>
 
+#include "olap/compaction.h"
+#include "olap/iterators.h"
 #include "olap/olap_common.h"
 #include "olap/olap_define.h"
 #include "olap/rowset/rowset.h"
@@ -107,7 +109,8 @@ Status VerticalBlockReader::_get_segment_iterators(const ReaderParams& read_para
     return Status::OK();
 }
 
-Status VerticalBlockReader::_init_collect_iter(const ReaderParams& read_params) {
+Status VerticalBlockReader::_init_collect_iter(const ReaderParams& read_params,
+                                               CompactionSampleInfo* sample_info) {
     std::vector<bool> iterator_init_flag;
     std::vector<RowsetId> rowset_ids;
     std::vector<RowwiseIteratorUPtr>* segment_iters_ptr = read_params.segment_iters_ptr;
@@ -156,7 +159,10 @@ Status VerticalBlockReader::_init_collect_iter(const ReaderParams& read_params)
     // init collect iterator
     StorageReadOptions opts;
     opts.record_rowids = read_params.record_rowids;
-    RETURN_IF_ERROR(_vcollect_iter->init(opts));
+    if (read_params.batch_size > 0) {
+        opts.block_row_max = read_params.batch_size;
+    }
+    RETURN_IF_ERROR(_vcollect_iter->init(opts, sample_info));
 
     // In agg keys value columns compact, get first row for _init_agg_state
     if (!read_params.is_key_column_group && read_params.tablet->keys_type() == KeysType::AGG_KEYS) {
@@ -203,11 +209,20 @@ void VerticalBlockReader::_init_agg_state(const ReaderParams& read_params) {
 }
 
 Status VerticalBlockReader::init(const ReaderParams& read_params) {
+    return init(read_params, nullptr);
+}
+
+Status VerticalBlockReader::init(const ReaderParams& read_params,
+                                 CompactionSampleInfo* sample_info) {
     StorageReadOptions opts;
-    _reader_context.batch_size = opts.block_row_max;
+    if (read_params.batch_size > 0) {
+        _reader_context.batch_size = read_params.batch_size;
+    } else {
+        _reader_context.batch_size = opts.block_row_max;
+    }
     RETURN_IF_ERROR(TabletReader::init(read_params));
 
-    auto status = _init_collect_iter(read_params);
+    auto status = _init_collect_iter(read_params, sample_info);
     if (!status.ok()) [[unlikely]] {
         if constexpr (std::is_same_v<ExecEnv::Engine, StorageEngine>) {
             static_cast<Tablet*>(_tablet.get())->report_error(status);
diff --git a/be/src/vec/olap/vertical_block_reader.h b/be/src/vec/olap/vertical_block_reader.h
index 77a01587b58..2043db4b00a 100644
--- a/be/src/vec/olap/vertical_block_reader.h
+++ b/be/src/vec/olap/vertical_block_reader.h
@@ -56,6 +56,7 @@ public:
 
     // Initialize VerticalBlockReader with tablet, data version and fetch range.
     Status init(const ReaderParams& read_params) override;
+    Status init(const ReaderParams& read_params, CompactionSampleInfo* sample_info);
 
     Status next_block_with_aggregation(Block* block, bool* eof) override;
 
@@ -79,7 +80,7 @@ private:
     // to minimize the comparison time in merge heap.
     Status _unique_key_next_block(Block* block, bool* eof);
 
-    Status _init_collect_iter(const ReaderParams& read_params);
+    Status _init_collect_iter(const ReaderParams& read_params, CompactionSampleInfo* sample_info);
 
     Status _get_segment_iterators(const ReaderParams& read_params,
                                   std::vector<RowwiseIteratorUPtr>* segment_iters,
diff --git a/be/src/vec/olap/vertical_merge_iterator.cpp b/be/src/vec/olap/vertical_merge_iterator.cpp
index 49916048b5c..95bf9d41c79 100644
--- a/be/src/vec/olap/vertical_merge_iterator.cpp
+++ b/be/src/vec/olap/vertical_merge_iterator.cpp
@@ -21,12 +21,14 @@
 #include <gen_cpp/olap_file.pb.h>
 #include <stdlib.h>
 
+#include <cstddef>
 #include <ostream>
 
 #include "common/compiler_util.h" // IWYU pragma: keep
 #include "common/config.h"
 #include "common/logging.h"
 #include "olap/field.h"
+#include "olap/iterators.h"
 #include "olap/olap_common.h"
 #include "vec/columns/column.h"
 #include "vec/common/string_ref.h"
@@ -327,13 +329,18 @@ Status VerticalMergeIteratorContext::copy_rows(Block* block, bool advanced) {
     return Status::OK();
 }
 
-Status VerticalMergeIteratorContext::init(const StorageReadOptions& opts) {
+Status VerticalMergeIteratorContext::init(const StorageReadOptions& opts,
+                                          CompactionSampleInfo* sample_info) {
     if (LIKELY(_inited)) {
         return Status::OK();
     }
     _block_row_max = opts.block_row_max;
     _record_rowids = opts.record_rowids;
     RETURN_IF_ERROR(_load_next_block());
+    if (sample_info != nullptr) {
+        sample_info->bytes += bytes();
+        sample_info->rows += rows();
+    }
     if (valid()) {
         RETURN_IF_ERROR(advance());
     }
@@ -492,7 +499,8 @@ Status VerticalHeapMergeIterator::next_batch(Block* block) {
     return Status::EndOfFile("no more data in segment");
 }
 
-Status VerticalHeapMergeIterator::init(const StorageReadOptions& opts) {
+Status VerticalHeapMergeIterator::init(const StorageReadOptions& opts,
+                                       CompactionSampleInfo* sample_info) {
     DCHECK(_origin_iters.size() == _iterator_init_flags.size());
     _record_rowids = opts.record_rowids;
     if (_origin_iters.empty()) {
@@ -520,7 +528,7 @@ Status VerticalHeapMergeIterator::init(const StorageReadOptions& opts) {
     for (size_t i = 0; i < num_iters; ++i) {
         if (_iterator_init_flags[i] || pre_iter_invalid) {
             auto& ctx = _ori_iter_ctx[i];
-            RETURN_IF_ERROR(ctx->init(opts));
+            RETURN_IF_ERROR(ctx->init(opts, sample_info));
             if (!ctx->valid()) {
                 pre_iter_invalid = true;
                 continue;
@@ -593,7 +601,8 @@ Status VerticalFifoMergeIterator::next_batch(Block* block) {
     return Status::EndOfFile("no more data in segment");
 }
 
-Status VerticalFifoMergeIterator::init(const StorageReadOptions& opts) {
+Status VerticalFifoMergeIterator::init(const StorageReadOptions& opts,
+                                       CompactionSampleInfo* sample_info) {
     DCHECK(_origin_iters.size() == _iterator_init_flags.size());
     DCHECK(_keys_type == KeysType::DUP_KEYS);
     _record_rowids = opts.record_rowids;
@@ -613,7 +622,7 @@ Status VerticalFifoMergeIterator::init(const StorageReadOptions& opts) {
         std::unique_ptr<VerticalMergeIteratorContext> ctx(
                 new VerticalMergeIteratorContext(std::move(iter), _rowset_ids[seg_order],
                                                  _ori_return_cols, seg_order, _seq_col_idx));
-        RETURN_IF_ERROR(ctx->init(opts));
+        RETURN_IF_ERROR(ctx->init(opts, sample_info));
         if (!ctx->valid()) {
             ++seg_order;
             continue;
@@ -654,7 +663,7 @@ Status VerticalMaskMergeIterator::next_row(vectorized::IteratorRowRef* ref) {
     uint16_t order = row_source.get_source_num();
     auto& ctx = _origin_iter_ctx[order];
     // init ctx and this ctx must be valid
-    RETURN_IF_ERROR(ctx->init(_opts));
+    RETURN_IF_ERROR(ctx->init(_opts, _sample_info));
     DCHECK(ctx->valid());
 
     if (UNLIKELY(ctx->is_first_row())) {
@@ -688,7 +697,7 @@ Status VerticalMaskMergeIterator::unique_key_next_row(vectorized::IteratorRowRef
         auto row_source = _row_sources_buf->current();
         uint16_t order = row_source.get_source_num();
         auto& ctx = _origin_iter_ctx[order];
-        RETURN_IF_ERROR(ctx->init(_opts));
+        RETURN_IF_ERROR(ctx->init(_opts, _sample_info));
         DCHECK(ctx->valid());
         if (!ctx->valid()) {
             LOG(INFO) << "VerticalMergeIteratorContext not valid";
@@ -727,7 +736,7 @@ Status VerticalMaskMergeIterator::next_batch(Block* block) {
         uint16_t order = _row_sources_buf->current().get_source_num();
         DCHECK(order < _origin_iter_ctx.size());
         auto& ctx = _origin_iter_ctx[order];
-        RETURN_IF_ERROR(ctx->init(_opts));
+        RETURN_IF_ERROR(ctx->init(_opts, _sample_info));
         DCHECK(ctx->valid());
         if (!ctx->valid()) {
             LOG(INFO) << "VerticalMergeIteratorContext not valid";
@@ -750,7 +759,8 @@ Status VerticalMaskMergeIterator::next_batch(Block* block) {
     return st;
 }
 
-Status VerticalMaskMergeIterator::init(const StorageReadOptions& opts) {
+Status VerticalMaskMergeIterator::init(const StorageReadOptions& opts,
+                                       CompactionSampleInfo* sample_info) {
     if (_origin_iters.empty()) {
         return Status::OK();
     }
@@ -765,6 +775,7 @@ Status VerticalMaskMergeIterator::init(const StorageReadOptions& opts) {
     }
     _origin_iters.clear();
 
+    _sample_info = sample_info;
     _block_row_max = opts.block_row_max;
     return Status::OK();
 }
diff --git a/be/src/vec/olap/vertical_merge_iterator.h b/be/src/vec/olap/vertical_merge_iterator.h
index f46a0446cf2..3751aa92c78 100644
--- a/be/src/vec/olap/vertical_merge_iterator.h
+++ b/be/src/vec/olap/vertical_merge_iterator.h
@@ -164,7 +164,7 @@ public:
 
     ~VerticalMergeIteratorContext() = default;
     Status block_reset(const std::shared_ptr<Block>& block);
-    Status init(const StorageReadOptions& opts);
+    Status init(const StorageReadOptions& opts, CompactionSampleInfo* sample_info = nullptr);
     bool compare(const VerticalMergeIteratorContext& rhs) const;
     Status copy_rows(Block* block, bool advanced = true);
     Status copy_rows(Block* block, size_t count);
@@ -200,6 +200,22 @@ public:
         return _block_row_locations[_index_in_block];
     }
 
+    size_t bytes() {
+        if (_block) {
+            return _block->bytes();
+        } else {
+            return 0;
+        }
+    }
+
+    size_t rows() {
+        if (_block) {
+            return _block->rows();
+        } else {
+            return 0;
+        }
+    }
+
 private:
     // Load next block into _block
     Status _load_next_block();
@@ -255,7 +271,7 @@ public:
     VerticalHeapMergeIterator(const VerticalHeapMergeIterator&) = delete;
     VerticalHeapMergeIterator& operator=(const VerticalHeapMergeIterator&) = delete;
 
-    Status init(const StorageReadOptions& opts) override;
+    Status init(const StorageReadOptions& opts, CompactionSampleInfo* sample_info) override;
     Status next_batch(Block* block) override;
     const Schema& schema() const override { return *_schema; }
     uint64_t merged_rows() const override { return _merged_rows; }
@@ -321,7 +337,7 @@ public:
     VerticalFifoMergeIterator(const VerticalFifoMergeIterator&) = delete;
     VerticalFifoMergeIterator& operator=(const VerticalFifoMergeIterator&) = delete;
 
-    Status init(const StorageReadOptions& opts) override;
+    Status init(const StorageReadOptions& opts, CompactionSampleInfo* sample_info) override;
     Status next_batch(Block* block) override;
     const Schema& schema() const override { return *_schema; }
     uint64_t merged_rows() const override { return _merged_rows; }
@@ -367,7 +383,7 @@ public:
     VerticalMaskMergeIterator(const VerticalMaskMergeIterator&) = delete;
     VerticalMaskMergeIterator& operator=(const VerticalMaskMergeIterator&) = delete;
 
-    Status init(const StorageReadOptions& opts) override;
+    Status init(const StorageReadOptions& opts, CompactionSampleInfo* sample_info) override;
 
     Status next_batch(Block* block) override;
 
@@ -396,6 +412,7 @@ private:
     size_t _filtered_rows = 0;
     RowSourcesBuffer* _row_sources_buf;
     StorageReadOptions _opts;
+    CompactionSampleInfo* _sample_info = nullptr;
 };
 
 // segment merge iterator
diff --git a/be/test/olap/base_compaction_test.cpp b/be/test/olap/base_compaction_test.cpp
new file mode 100644
index 00000000000..ff53e842787
--- /dev/null
+++ b/be/test/olap/base_compaction_test.cpp
@@ -0,0 +1,84 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "olap/base_compaction.h"
+
+#include <gen_cpp/AgentService_types.h>
+#include <gen_cpp/olap_file.pb.h>
+#include <gtest/gtest-message.h>
+#include <gtest/gtest-test-part.h>
+
+#include "gtest/gtest.h"
+#include "gtest/gtest_pred_impl.h"
+#include "olap/cumulative_compaction.h"
+#include "olap/cumulative_compaction_policy.h"
+#include "olap/olap_common.h"
+#include "olap/rowset/rowset_factory.h"
+#include "olap/rowset/rowset_meta.h"
+#include "olap/storage_engine.h"
+#include "olap/tablet.h"
+#include "olap/tablet_meta.h"
+#include "util/uid_util.h"
+
+namespace doris {
+
+class TestBaseCompaction : public testing::Test {};
+
+static RowsetSharedPtr create_rowset(Version version, int num_segments, bool overlapping,
+                                     int data_size) {
+    auto rs_meta = std::make_shared<RowsetMeta>();
+    rs_meta->set_rowset_type(BETA_ROWSET); // important
+    rs_meta->_rowset_meta_pb.set_start_version(version.first);
+    rs_meta->_rowset_meta_pb.set_end_version(version.second);
+    rs_meta->set_num_segments(num_segments);
+    rs_meta->set_segments_overlap(overlapping ? OVERLAPPING : NONOVERLAPPING);
+    rs_meta->set_total_disk_size(data_size);
+    RowsetSharedPtr rowset;
+    Status st = RowsetFactory::create_rowset(nullptr, "", std::move(rs_meta), 
&rowset);
+    if (!st.ok()) {
+        return nullptr;
+    }
+    return rowset;
+}
+
+TEST_F(TestBaseCompaction, filter_input_rowset) {
+    StorageEngine engine({});
+    TabletMetaSharedPtr tablet_meta;
+    tablet_meta.reset(new TabletMeta(1, 2, 15673, 15674, 4, 5, TTabletSchema(), 6, {{7, 8}},
+                                     UniqueId(9, 10), TTabletType::TABLET_TYPE_DISK,
+                                     TCompressionType::LZ4F));
+    TabletSharedPtr tablet(new Tablet(engine, tablet_meta, nullptr, CUMULATIVE_SIZE_BASED_POLICY));
+    tablet->_cumulative_point = 25;
+    BaseCompaction compaction(tablet);
+    //std::vector<RowsetSharedPtr> rowsets;
+
+    RowsetSharedPtr init_rs = create_rowset({0, 1}, 1, false, 0);
+    tablet->_rs_version_map.emplace(init_rs->version(), init_rs);
+    for (int i = 2; i < 30; ++i) {
+        RowsetSharedPtr rs = create_rowset({i, i}, 1, false, 1024);
+        tablet->_rs_version_map.emplace(rs->version(), rs);
+    }
+    Status st = compaction.pick_rowsets_to_compact();
+    EXPECT_TRUE(st.ok());
+    EXPECT_EQ(compaction._input_rowsets.front()->start_version(), 0);
+    EXPECT_EQ(compaction._input_rowsets.front()->end_version(), 1);
+
+    EXPECT_EQ(compaction._input_rowsets.back()->start_version(), 21);
+    EXPECT_EQ(compaction._input_rowsets.back()->end_version(), 21);
+}
+
+} // namespace doris
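
The expected versions follow from the new score cap: every single-segment,
non-overlapping rowset scores 1, so the picker keeps the base rowset [0-1]
plus versions 2 through 21 (21 inputs in total), the last being the rowset
whose score first pushed the sum past base_compaction_max_compaction_score =
20.
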
diff --git a/be/test/olap/rowid_conversion_test.cpp b/be/test/olap/rowid_conversion_test.cpp
index d28e9f7dfe9..658b104493f 100644
--- a/be/test/olap/rowid_conversion_test.cpp
+++ b/be/test/olap/rowid_conversion_test.cpp
@@ -353,9 +353,9 @@ protected:
         RowIdConversion rowid_conversion;
         stats.rowid_conversion = &rowid_conversion;
         if (is_vertical_merger) {
-            s = Merger::vertical_merge_rowsets(tablet, ReaderType::READER_BASE_COMPACTION,
-                                               tablet_schema, input_rs_readers,
-                                               output_rs_writer.get(), 10000000, &stats);
+            s = Merger::vertical_merge_rowsets(
+                    tablet, ReaderType::READER_BASE_COMPACTION, tablet_schema, input_rs_readers,
+                    output_rs_writer.get(), 10000000, num_segments, &stats);
         } else {
             s = Merger::vmerge_rowsets(tablet, ReaderType::READER_BASE_COMPACTION, tablet_schema,
                                        input_rs_readers, output_rs_writer.get(), &stats);
diff --git a/be/test/vec/olap/vertical_compaction_test.cpp b/be/test/vec/olap/vertical_compaction_test.cpp
index 1eb023a01ac..56bf40546bd 100644
--- a/be/test/vec/olap/vertical_compaction_test.cpp
+++ b/be/test/vec/olap/vertical_compaction_test.cpp
@@ -491,7 +491,8 @@ TEST_F(VerticalCompactionTest, TestDupKeyVerticalMerge) {
     RowIdConversion rowid_conversion;
     stats.rowid_conversion = &rowid_conversion;
     s = Merger::vertical_merge_rowsets(tablet, ReaderType::READER_BASE_COMPACTION, tablet_schema,
-                                       input_rs_readers, output_rs_writer.get(), 100, &stats);
+                                       input_rs_readers, output_rs_writer.get(), 100, num_segments,
+                                       &stats);
     ASSERT_TRUE(s.ok()) << s;
     RowsetSharedPtr out_rowset;
     EXPECT_EQ(Status::OK(), output_rs_writer->build(out_rowset));
@@ -598,7 +599,8 @@ TEST_F(VerticalCompactionTest, TestDupWithoutKeyVerticalMerge) {
     RowIdConversion rowid_conversion;
     stats.rowid_conversion = &rowid_conversion;
     s = Merger::vertical_merge_rowsets(tablet, ReaderType::READER_BASE_COMPACTION, tablet_schema,
-                                       input_rs_readers, output_rs_writer.get(), 100, &stats);
+                                       input_rs_readers, output_rs_writer.get(), 100, num_segments,
+                                       &stats);
     ASSERT_TRUE(s.ok()) << s;
     RowsetSharedPtr out_rowset;
     EXPECT_EQ(Status::OK(), output_rs_writer->build(out_rowset));
@@ -705,7 +707,8 @@ TEST_F(VerticalCompactionTest, TestUniqueKeyVerticalMerge) {
     RowIdConversion rowid_conversion;
     stats.rowid_conversion = &rowid_conversion;
     s = Merger::vertical_merge_rowsets(tablet, ReaderType::READER_BASE_COMPACTION, tablet_schema,
-                                       input_rs_readers, output_rs_writer.get(), 10000, &stats);
+                                       input_rs_readers, output_rs_writer.get(), 10000,
+                                       num_segments, &stats);
     EXPECT_TRUE(s.ok());
     RowsetSharedPtr out_rowset;
     EXPECT_EQ(Status::OK(), output_rs_writer->build(out_rowset));
@@ -814,7 +817,8 @@ TEST_F(VerticalCompactionTest, TestDupKeyVerticalMergeWithDelete) {
     RowIdConversion rowid_conversion;
     stats.rowid_conversion = &rowid_conversion;
     st = Merger::vertical_merge_rowsets(tablet, ReaderType::READER_BASE_COMPACTION, tablet_schema,
-                                        input_rs_readers, output_rs_writer.get(), 100, &stats);
+                                        input_rs_readers, output_rs_writer.get(), 100, num_segments,
+                                        &stats);
     ASSERT_TRUE(st.ok()) << st;
     RowsetSharedPtr out_rowset;
     EXPECT_EQ(Status::OK(), output_rs_writer->build(out_rowset));
@@ -916,7 +920,8 @@ TEST_F(VerticalCompactionTest, TestDupWithoutKeyVerticalMergeWithDelete) {
     RowIdConversion rowid_conversion;
     stats.rowid_conversion = &rowid_conversion;
     st = Merger::vertical_merge_rowsets(tablet, ReaderType::READER_BASE_COMPACTION, tablet_schema,
-                                        input_rs_readers, output_rs_writer.get(), 100, &stats);
+                                        input_rs_readers, output_rs_writer.get(), 100, num_segments,
+                                        &stats);
     ASSERT_TRUE(st.ok()) << st;
     RowsetSharedPtr out_rowset;
     EXPECT_EQ(Status::OK(), output_rs_writer->build(out_rowset));
@@ -1008,7 +1013,8 @@ TEST_F(VerticalCompactionTest, TestAggKeyVerticalMerge) {
     RowIdConversion rowid_conversion;
     stats.rowid_conversion = &rowid_conversion;
     s = Merger::vertical_merge_rowsets(tablet, ReaderType::READER_BASE_COMPACTION, tablet_schema,
-                                       input_rs_readers, output_rs_writer.get(), 100, &stats);
+                                       input_rs_readers, output_rs_writer.get(), 100, num_segments,
+                                       &stats);
     EXPECT_TRUE(s.ok());
     RowsetSharedPtr out_rowset;
     EXPECT_EQ(Status::OK(), output_rs_writer->build(out_rowset));
diff --git a/regression-test/suites/compaction/compaction_width_array_column.groovy b/regression-test/suites/compaction/compaction_width_array_column.groovy
new file mode 100644
index 00000000000..4e3fed354c7
--- /dev/null
+++ b/regression-test/suites/compaction/compaction_width_array_column.groovy
@@ -0,0 +1,137 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite('compaction_width_array_column', "p2") {
+    String backend_id;
+    def backendId_to_backendIP = [:]
+    def backendId_to_backendHttpPort = [:]
+    getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort);
+
+    backend_id = backendId_to_backendIP.keySet()[0]
+    def (code, out, err) = show_be_config(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id))
+
+    logger.info("Show config: code=" + code + ", out=" + out + ", err=" + err)
+    assertEquals(code, 0)
+    def configList = parseJson(out.trim())
+    assert configList instanceof List
+
+    def s3BucketName = getS3BucketName()
+    def random = new Random();
+
+    def s3WithProperties = """WITH S3 (
+        |"AWS_ACCESS_KEY" = "${getS3AK()}",
+        |"AWS_SECRET_KEY" = "${getS3SK()}",
+        |"AWS_ENDPOINT" = "${getS3Endpoint()}",
+        |"AWS_REGION" = "${getS3Region()}")
+        |PROPERTIES(
+        |"exec_mem_limit" = "8589934592",
+        |"load_parallelism" = "3")""".stripMargin()
+
+    // set fe configuration
+    sql "ADMIN SET FRONTEND CONFIG ('max_bytes_per_broker_scanner' = 
'161061273600')"
+
+    def tableName = "column_witdh_array"
+
+    def table_create_task = { table_name ->
+        // drop table if exists
+        sql """drop table if exists ${table_name}"""
+        // create table
+        def create_table = new File("""${context.file.parent}/ddl/${table_name}.sql""").text
+        create_table = create_table.replaceAll("\\\$\\{table\\_name\\}", table_name)
+        sql create_table
+    }
+
+    def table_load_task = { table_name ->
+        uniqueID = Math.abs(UUID.randomUUID().hashCode()).toString()
+        loadLabel = table_name + "_" + uniqueID
+        //loadLabel = table_name + '_load_5'
+        loadSql = new File("""${context.file.parent}/ddl/${table_name}_load.sql""").text.replaceAll("\\\$\\{s3BucketName\\}", s3BucketName)
+        loadSql = loadSql.replaceAll("\\\$\\{loadLabel\\}", loadLabel)
+        loadSql = loadSql.replaceAll("\\\$\\{table\\_name\\}", table_name)
+        nowloadSql = loadSql + s3WithProperties
+        try_sql nowloadSql
+
+        while (true) {
+            def stateResult = sql "show load where Label = '${loadLabel}'"
+            logger.info("load result is ${stateResult}")
+            def loadState = stateResult[stateResult.size() - 1][2].toString()
+            if ("CANCELLED".equalsIgnoreCase(loadState)) {
+                throw new IllegalStateException("load ${loadLabel} failed.")
+            } else if ("FINISHED".equalsIgnoreCase(loadState)) {
+                break
+            }
+            sleep(5000)
+        }
+    }
+
+    table_create_task(tableName)
+    table_load_task(tableName)
+
+    def tablets = sql_return_maparray """ show tablets from ${tableName}; """
+
+    boolean isOverLap = true
+    int tryCnt = 0;
+    while (isOverLap && tryCnt < 3) {
+        isOverLap = false
+
+        for (def tablet in tablets) {
+            String tablet_id = tablet.TabletId
+            backend_id = tablet.BackendId
+            (code, out, err) = be_run_cumulative_compaction(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id)
+            logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err)
+            assertEquals(code, 0)
+            def compactJson = parseJson(out.trim())
+            assertEquals("success", compactJson.status.toLowerCase())
+        }
+
+        // wait for all compactions done
+        for (def tablet in tablets) {
+            boolean running = true
+            do {
+                Thread.sleep(1000)
+                String tablet_id = tablet.TabletId
+                backend_id = tablet.BackendId
+                (code, out, err) = be_get_compaction_status(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id)
+                logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err)
+                assertEquals(code, 0)
+                def compactionStatus = parseJson(out.trim())
+                assertEquals("success", compactionStatus.status.toLowerCase())
+                running = compactionStatus.run_status
+            } while (running)
+        }
+
+        for (def tablet in tablets) {
+            String tablet_id = tablet.TabletId
+            (code, out, err) = curl("GET", tablet.CompactionStatus)
+            logger.info("Show tablets status: code=" + code + ", out=" + out + 
", err=" + err)
+            assertEquals(code, 0)
+            def tabletJson = parseJson(out.trim())
+            assert tabletJson.rowsets instanceof List
+            for (String rowset in (List<String>) tabletJson.rowsets) {
+                logger.info("rowset info" + rowset)
+                String overLappingStr = rowset.split(" ")[3]
+                if (overLappingStr == "OVERLAPPING") {
+                    isOverLap = true;
+                }
+                logger.info("is over lap " + isOverLap + " " + overLappingStr)
+            }
+        }
+        tryCnt++;
+    }
+
+    assertFalse(isOverLap);
+}

