This is an automated email from the ASF dual-hosted git repository.

gavinchou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new fb8868e497e [Feat](compaction) sparse wide table compaction optimization (#59967)
fb8868e497e is described below

commit fb8868e497e2c19a40e626396a9d428ad41d343d
Author: Jimmy <[email protected]>
AuthorDate: Tue Jan 27 11:44:45 2026 +0800

    [Feat](compaction) sparse wide table compaction optimization (#59967)
    
    This commit optimizes compaction for sparse wide tables with:
    
    1. Column batch replacement interface:
       - replace_column_data_range(): batch copy using memcpy for
         fixed-width types
       - support_replace_column_data_range(): runtime type check
    
    2. Iterator batch processing:
       - RowBatch: represents continuous rows from the same segment
       - unique_key_next_batch(): returns multiple batches instead of
         single rows
    
    3. Reader sparse optimization:
       - Pre-fill NULL for nullable columns
       - SIMD detection for all-NULL/all-non-NULL batches
       - Run-length processing for mixed NULL batches
    
    4. Writer SIMD optimization:
       - Use simd::count_zero_num for fast NULL counting
       - Fast path for all-NULL and all-non-NULL cases
    
    5. Add batch Put operation for RLE encoder:
       - Put(value, run_length): write same value multiple times efficiently
       - Fast path when already in repeated run mode
       - Reduces loop iterations from N to 1 for repeated values (see the sketch below)
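    
    Illustrative only: a minimal sketch of the batch Put in (5), assuming
    the existing RleEncoder<bool>(faststring*, bit_width) constructor and a
    Flush() to finalize the encoding:
    
        faststring buf;
        RleEncoder<bool> encoder(&buf, /*bit_width=*/1);
        // before: N separate calls to encoder.Put(true)
        // after: one call emits the whole repeated run
        encoder.Put(true, /*run_length=*/1024);
        encoder.Flush();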
---
 be/src/common/config.cpp                          |    8 +
 be/src/common/config.h                            |    9 +
 be/src/olap/base_tablet.h                         |    5 +
 be/src/olap/iterators.h                           |    1 +
 be/src/olap/merger.cpp                            |   50 +-
 be/src/olap/merger.h                              |    3 +-
 be/src/olap/rowset/segment_v2/column_writer.cpp   |  118 +++
 be/src/olap/rowset/segment_v2/column_writer.h     |    7 +
 be/src/olap/tablet_reader.h                       |    4 +
 be/src/vec/columns/column.h                       |   17 +
 be/src/vec/columns/column_decimal.h               |   12 +
 be/src/vec/columns/column_nullable.h              |   26 +
 be/src/vec/columns/column_vector.h                |   12 +
 be/src/vec/olap/vertical_block_reader.cpp         |  165 ++-
 be/src/vec/olap/vertical_block_reader.h           |   18 +
 be/src/vec/olap/vertical_merge_iterator.cpp       |  154 +++
 be/src/vec/olap/vertical_merge_iterator.h         |   35 +
 be/test/vec/olap/vertical_compaction_test.cpp     |  145 +++
 be/test/vec/olap/vertical_merge_iterator_test.cpp | 1130 +++++++++++++++++++++
 19 files changed, 1914 insertions(+), 5 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 67dc6cfde02..3e49c887940 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -440,6 +440,14 @@ DEFINE_mInt32(vertical_compaction_num_columns_per_group, "5");
 DEFINE_Int32(vertical_compaction_max_row_source_memory_mb, "1024");
 // In vertical compaction, max dest segment file size
 DEFINE_mInt64(vertical_compaction_max_segment_size, "1073741824");
+// Density threshold for sparse column compaction optimization
+// density = (total_cells - null_cells) / total_cells, smaller means more sparse
+// When density <= threshold, enable sparse optimization
+// 0 = disable optimization, 1 = always enable
+// Default 0.05 means enable sparse optimization when density <= 5%
+DEFINE_mDouble(sparse_column_compaction_threshold_percent, "0.05");
+// Enable RLE batch Put optimization for compaction
+DEFINE_mBool(enable_rle_batch_put_optimization, "true");
 
 // If enabled, segments will be flushed column by column
 DEFINE_mBool(enable_vertical_segment_writer, "true");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index e12c1879310..f08a260e20f 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -492,6 +492,15 @@ DECLARE_mInt32(vertical_compaction_num_columns_per_group);
 DECLARE_Int32(vertical_compaction_max_row_source_memory_mb);
 // In vertical compaction, max dest segment file size
 DECLARE_mInt64(vertical_compaction_max_segment_size);
+// Density threshold for sparse column compaction optimization
+// density = (total_cells - null_cells) / total_cells
+// smaller density means more sparse
+// When density <= threshold, enable sparse optimization
+// 0 = disable optimization, 1 = always enable
+// Default 0.05 means enable sparse optimization when density <= 5%
+DECLARE_mDouble(sparse_column_compaction_threshold_percent);
+// Enable RLE batch Put optimization for compaction
+DECLARE_mBool(enable_rle_batch_put_optimization);
 
 // If enabled, segments will be flushed column by column
 DECLARE_mBool(enable_vertical_segment_writer);
diff --git a/be/src/olap/base_tablet.h b/be/src/olap/base_tablet.h
index acd31ec2ff7..48b937903f6 100644
--- a/be/src/olap/base_tablet.h
+++ b/be/src/olap/base_tablet.h
@@ -400,6 +400,11 @@ public:
     std::mutex sample_info_lock;
     std::vector<CompactionSampleInfo> sample_infos;
     Status last_compaction_status = Status::OK();
+
+    // Density ratio for sparse optimization (non_null_cells / total_cells)
+    // Value range: [0.0, 1.0], smaller value means more sparse
+    // Default 1.0 means no history data; sparse optimization is not enabled initially
+    std::atomic<double> compaction_density {1.0};
 };
 
 struct CaptureRowsetOps {
diff --git a/be/src/olap/iterators.h b/be/src/olap/iterators.h
index 8fc880b7190..efbb0a2e00b 100644
--- a/be/src/olap/iterators.h
+++ b/be/src/olap/iterators.h
@@ -160,6 +160,7 @@ struct CompactionSampleInfo {
     int64_t bytes = 0;
     int64_t rows = 0;
     int64_t group_data_size = 0;
+    int64_t null_count = 0; // Number of NULL cells in this column group
 };
 
 struct BlockWithSameBit {
diff --git a/be/src/olap/merger.cpp b/be/src/olap/merger.cpp
index f98f909348f..4f0b1ce3b71 100644
--- a/be/src/olap/merger.cpp
+++ b/be/src/olap/merger.cpp
@@ -247,7 +247,7 @@ Status Merger::vertical_compact_one_group(
         const std::vector<RowsetReaderSharedPtr>& src_rowset_readers,
         RowsetWriter* dst_rowset_writer, uint32_t max_rows_per_segment, Statistics* stats_output,
         std::vector<uint32_t> key_group_cluster_key_idxes, int64_t batch_size,
-        CompactionSampleInfo* sample_info) {
+        CompactionSampleInfo* sample_info, bool enable_sparse_optimization) {
     // build tablet reader
     VLOG_NOTICE << "vertical compact one group, max_rows_per_segment=" << max_rows_per_segment;
     vectorized::VerticalBlockReader reader(row_source_buf);
@@ -256,6 +256,7 @@ Status Merger::vertical_compact_one_group(
     reader_params.key_group_cluster_key_idxes = key_group_cluster_key_idxes;
     reader_params.tablet = tablet;
     reader_params.reader_type = reader_type;
+    reader_params.enable_sparse_optimization = enable_sparse_optimization;
 
     TabletReadSource read_source;
     read_source.rs_splits.reserve(src_rowset_readers.size());
@@ -477,6 +478,31 @@ Status Merger::vertical_merge_rowsets(BaseTabletSPtr tablet, ReaderType reader_t
     std::vector<uint32_t> key_group_cluster_key_idxes;
     vertical_split_columns(tablet_schema, &column_groups, &key_group_cluster_key_idxes);
 
+    // Calculate total rows for density calculation after compaction
+    int64_t total_rows = 0;
+    for (const auto& rs_reader : src_rowset_readers) {
+        total_rows += rs_reader->rowset()->rowset_meta()->num_rows();
+    }
+
+    // Use historical density for sparse wide table optimization
+    // density = (total_cells - null_cells) / total_cells, smaller means more sparse
+    // When density <= threshold, enable sparse optimization
+    // threshold = 0 means disable, 1 means always enable
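+    // Worked example: 1,000 rows x 100 columns = 100,000 cells; with 97,000
+    // NULL cells, density = 3,000 / 100,000 = 0.03 <= 0.05, so the sparse
+    // path is enabled under the default threshold.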
+    bool enable_sparse_optimization = false;
+    if (config::sparse_column_compaction_threshold_percent > 0 &&
+        tablet->keys_type() == KeysType::UNIQUE_KEYS) {
+        double density = tablet->compaction_density.load();
+        enable_sparse_optimization = density <= config::sparse_column_compaction_threshold_percent;
+
+        LOG(INFO) << "Vertical compaction sparse optimization check: tablet_id="
+                  << tablet->tablet_id() << ", density=" << density
+                  << ", threshold=" << config::sparse_column_compaction_threshold_percent
+                  << ", total_rows=" << total_rows
+                  << ", num_columns=" << tablet_schema.num_columns()
+                  << ", total_cells=" << total_rows * tablet_schema.num_columns()
+                  << ", enable_sparse_optimization=" << enable_sparse_optimization;
+    }
+
     vectorized::RowSourcesBuffer row_sources_buf(
             tablet->tablet_id(), dst_rowset_writer->context().tablet_path, reader_type);
     Merger::Statistics total_stats;
@@ -501,7 +527,7 @@ Status Merger::vertical_merge_rowsets(BaseTabletSPtr tablet, ReaderType reader_t
         Status st = vertical_compact_one_group(
                 tablet, reader_type, tablet_schema, is_key, column_groups[i], &row_sources_buf,
                 src_rowset_readers, dst_rowset_writer, max_rows_per_segment, group_stats_ptr,
-                key_group_cluster_key_idxes, batch_size, &sample_info);
+                key_group_cluster_key_idxes, batch_size, &sample_info, enable_sparse_optimization);
         {
             std::unique_lock<std::mutex> lock(tablet->sample_info_lock);
             tablet->sample_infos[i] = sample_info;
@@ -526,6 +552,26 @@ Status Merger::vertical_merge_rowsets(BaseTabletSPtr tablet, ReaderType reader_t
         RETURN_IF_ERROR(row_sources_buf.seek_to_begin());
     }
 
+    // Calculate and store density for the next compaction's sparse optimization threshold
+    // density = (total_cells - total_null_count) / total_cells
+    // Smaller density means more sparse
+    {
+        std::unique_lock<std::mutex> lock(tablet->sample_info_lock);
+        int64_t total_null_count = 0;
+        for (const auto& info : tablet->sample_infos) {
+            total_null_count += info.null_count;
+        }
+        int64_t total_cells = total_rows * tablet_schema.num_columns();
+        if (total_cells > 0) {
+            double density = static_cast<double>(total_cells - total_null_count) /
+                             static_cast<double>(total_cells);
+            tablet->compaction_density.store(density);
+            LOG(INFO) << "Vertical compaction density update: tablet_id=" << tablet->tablet_id()
+                      << ", total_cells=" << total_cells
+                      << ", total_null_count=" << total_null_count << ", density=" << density;
+        }
+    }
+
     // finish compact, build output rowset
     VLOG_NOTICE << "finish compact groups";
     RETURN_IF_ERROR(dst_rowset_writer->final_flush());
diff --git a/be/src/olap/merger.h b/be/src/olap/merger.h
index 13a3d386424..733c2de11f0 100644
--- a/be/src/olap/merger.h
+++ b/be/src/olap/merger.h
@@ -81,7 +81,8 @@ public:
             const std::vector<RowsetReaderSharedPtr>& src_rowset_readers,
             RowsetWriter* dst_rowset_writer, uint32_t max_rows_per_segment,
             Statistics* stats_output, std::vector<uint32_t> key_group_cluster_key_idxes,
-            int64_t batch_size, CompactionSampleInfo* sample_info);
+            int64_t batch_size, CompactionSampleInfo* sample_info,
+            bool enable_sparse_optimization = false);
 
     // for segcompaction
     static Status vertical_compact_one_group(int64_t tablet_id, ReaderType reader_type,
diff --git a/be/src/olap/rowset/segment_v2/column_writer.cpp b/be/src/olap/rowset/segment_v2/column_writer.cpp
index dc2fe2cca56..fab61cf1530 100644
--- a/be/src/olap/rowset/segment_v2/column_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/column_writer.cpp
@@ -20,6 +20,7 @@
 #include <gen_cpp/segment_v2.pb.h>
 
 #include <algorithm>
+#include <cstring>
 #include <filesystem>
 #include <memory>
 
@@ -44,6 +45,7 @@
 #include "util/debug_points.h"
 #include "util/faststring.h"
 #include "util/rle_encoding.h"
+#include "util/simd/bits.h"
 #include "vec/core/types.h"
 #include "vec/data_types/data_type_agg_state.h"
 #include "vec/data_types/data_type_factory.hpp"
@@ -60,6 +62,27 @@ public:
               _bitmap_buf(BitmapSize(reserve_bits)),
               _rle_encoder(&_bitmap_buf, 1) {}
 
+    void reserve_for_write(size_t num_rows, size_t non_null_count) {
+        if (num_rows == 0) {
+            return;
+        }
+        if (non_null_count == 0 || (non_null_count == num_rows && !_has_null)) {
+            if (_bitmap_buf.capacity() < kSmallReserveBytes) {
+                _bitmap_buf.reserve(kSmallReserveBytes);
+            }
+            return;
+        }
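+        // Two reservation estimates; reserve the smaller:
+        // (1) run-based: at most min(num_rows, 2 * non_null_count + 1) runs,
+        //     each costing roughly kBytesPerRun bytes of RLE output;
+        // (2) raw bitmap: BitmapSize(num_rows) plus literal-run overhead of
+        //     about one extra byte per 63 bitmap bytes.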
+        size_t raw_bytes = BitmapSize(num_rows);
+        size_t run_est = std::min(num_rows, non_null_count * 2 + 1);
+        size_t run_bytes_est = run_est * kBytesPerRun + kReserveSlackBytes;
+        size_t raw_overhead = raw_bytes / 63 + 1;
+        size_t raw_est = raw_bytes + raw_overhead + kReserveSlackBytes;
+        size_t reserve_bytes = std::min(raw_est, run_bytes_est);
+        if (_bitmap_buf.capacity() < reserve_bytes) {
+            _bitmap_buf.reserve(reserve_bytes);
+        }
+    }
+
     void add_run(bool value, size_t run) {
         _has_null |= value;
         _rle_encoder.Put(value, run);
@@ -82,6 +105,10 @@ public:
     uint64_t size() { return _bitmap_buf.size(); }
 
 private:
+    static constexpr size_t kSmallReserveBytes = 64;
+    static constexpr size_t kReserveSlackBytes = 16;
+    static constexpr size_t kBytesPerRun = 6;
+
     bool _has_null;
     faststring _bitmap_buf;
     RleEncoder<bool> _rle_encoder;
@@ -372,6 +399,25 @@ Status ColumnWriter::append_nullable(const uint8_t* is_null_bits, const void* da
 
 Status ColumnWriter::append_nullable(const uint8_t* null_map, const uint8_t** ptr,
                                      size_t num_rows) {
+    // Fast path: use SIMD to detect all-NULL or all-non-NULL columns
+    if (config::enable_rle_batch_put_optimization) {
+        size_t non_null_count =
+                simd::count_zero_num(reinterpret_cast<const int8_t*>(null_map), num_rows);
+
+        if (non_null_count == 0) {
+            // All NULL: skip run-length iteration, directly append all nulls
+            RETURN_IF_ERROR(append_nulls(num_rows));
+            *ptr += get_field()->size() * num_rows;
+            return Status::OK();
+        }
+
+        if (non_null_count == num_rows) {
+            // All non-NULL: skip run-length iteration, directly append all data
+            return append_data(ptr, num_rows);
+        }
+    }
+
+    // Mixed case or sparse optimization disabled: use run-length processing
     size_t offset = 0;
     auto next_run_step = [&]() {
         size_t step = 1;
@@ -594,6 +640,78 @@ Status ScalarColumnWriter::append_data_in_current_page(const uint8_t** data, siz
     return Status::OK();
 }
 
+Status ScalarColumnWriter::append_nullable(const uint8_t* null_map, const uint8_t** ptr,
+                                           size_t num_rows) {
+    // When optimization is disabled, use base class implementation
+    if (!config::enable_rle_batch_put_optimization) {
+        return ColumnWriter::append_nullable(null_map, ptr, num_rows);
+    }
+
+    if (UNLIKELY(num_rows == 0)) {
+        return Status::OK();
+    }
+
+    // Build run-length encoded null runs using memchr for fast boundary detection
+    _null_run_buffer.clear();
+    if (_null_run_buffer.capacity() < num_rows) {
+        _null_run_buffer.reserve(std::min(num_rows, size_t(256)));
+    }
+
+    size_t non_null_count = 0;
+    size_t offset = 0;
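+    // The memchr scan below assumes null_map bytes are strictly 0 or 1
+    // (the NullMap invariant).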
+    while (offset < num_rows) {
+        bool is_null = null_map[offset] != 0;
+        size_t remaining = num_rows - offset;
+        const uint8_t* run_end =
+                static_cast<const uint8_t*>(memchr(null_map + offset, is_null ? 0 : 1, remaining));
+        size_t run_length = run_end != nullptr ? (run_end - (null_map + offset)) : remaining;
+        _null_run_buffer.push_back(NullRun {is_null, static_cast<uint32_t>(run_length)});
+        if (!is_null) {
+            non_null_count += run_length;
+        }
+        offset += run_length;
+    }
+
+    // Pre-allocate buffer based on estimated size
+    if (_null_bitmap_builder != nullptr) {
+        size_t current_rows = _next_rowid - _first_rowid;
+        size_t expected_rows = current_rows + num_rows;
+        size_t est_non_null = non_null_count;
+        if (num_rows > 0 && expected_rows > num_rows) {
+            est_non_null = (non_null_count * expected_rows) / num_rows;
+        }
+        _null_bitmap_builder->reserve_for_write(expected_rows, est_non_null);
+    }
+
+    if (non_null_count == 0) {
+        // All NULL: skip data writing, only update null bitmap and indexes
+        RETURN_IF_ERROR(append_nulls(num_rows));
+        *ptr += get_field()->size() * num_rows;
+        return Status::OK();
+    }
+
+    if (non_null_count == num_rows) {
+        // All non-NULL: use normal append_data which handles both data and null bitmap
+        return append_data(ptr, num_rows);
+    }
+
+    // Process by runs
+    for (const auto& run : _null_run_buffer) {
+        size_t run_length = run.len;
+        if (run.is_null) {
+            RETURN_IF_ERROR(append_nulls(run_length));
+            *ptr += get_field()->size() * run_length;
+        } else {
+            // TODO:
+            //  1. `*ptr += get_field()->size() * step;` should be done in this function, not append_data;
+            //  2. support array vectorized load and ptr offset add
+            RETURN_IF_ERROR(append_data(ptr, run_length));
+        }
+    }
+
+    return Status::OK();
+}
+
 uint64_t ScalarColumnWriter::estimate_buffer_size() {
     uint64_t size = _data_size;
     size += _page_builder->size();
diff --git a/be/src/olap/rowset/segment_v2/column_writer.h b/be/src/olap/rowset/segment_v2/column_writer.h
index 0ff22fbe14b..e7d09c07d58 100644
--- a/be/src/olap/rowset/segment_v2/column_writer.h
+++ b/be/src/olap/rowset/segment_v2/column_writer.h
@@ -250,6 +250,7 @@ public:
         _new_page_callback = flush_page_callback;
     }
     Status append_data(const uint8_t** ptr, size_t num_rows) override;
+    Status append_nullable(const uint8_t* null_map, const uint8_t** ptr, size_t num_rows) override;
 
     // used for appending non-null data. When the page is full, fewer rows than num_rows may be appended.
     Status append_data_in_current_page(const uint8_t** ptr, size_t* num_written);
@@ -265,6 +266,12 @@ private:
     Status _internal_append_data_in_current_page(const uint8_t* ptr, size_t* num_written);
 
 private:
+    struct NullRun {
+        bool is_null;
+        uint32_t len;
+    };
+
+    std::vector<NullRun> _null_run_buffer;
     std::unique_ptr<PageBuilder> _page_builder;
 
     std::unique_ptr<NullBitmapBuilder> _null_bitmap_builder;
diff --git a/be/src/olap/tablet_reader.h b/be/src/olap/tablet_reader.h
index 7e8c3630f51..71efa484fa1 100644
--- a/be/src/olap/tablet_reader.h
+++ b/be/src/olap/tablet_reader.h
@@ -185,6 +185,10 @@ public:
         bool is_key_column_group = false;
         std::vector<uint32_t> key_group_cluster_key_idxes;
 
+        // For sparse column compaction optimization
+        // When true, use optimized path for sparse wide tables
+        bool enable_sparse_optimization = false;
+
         bool is_segcompaction = false;
 
         std::vector<RowwiseIteratorUPtr>* segment_iters_ptr = nullptr;
diff --git a/be/src/vec/columns/column.h b/be/src/vec/columns/column.h
index e162ee49516..846cf902018 100644
--- a/be/src/vec/columns/column.h
+++ b/be/src/vec/columns/column.h
@@ -687,6 +687,23 @@ public:
     // only used in agg value replace for column which is not variable length, eg. BlockReader::_copy_value_data
     // usage: self_column.replace_column_data(other_column, other_column's row index, self_column's row index)
     virtual void replace_column_data(const IColumn&, size_t row, size_t self_row = 0) = 0;
+
+    // Batch version of replace_column_data for replacing a continuous range of data
+    // Used in sparse column compaction optimization for better performance
+    // Default implementation calls replace_column_data in a loop
+    // Subclasses (e.g., ColumnVector, ColumnDecimal) can override with optimized memcpy
+    virtual void replace_column_data_range(const IColumn& src, size_t src_start, size_t count,
+                                           size_t self_start) {
+        for (size_t i = 0; i < count; ++i) {
+            replace_column_data(src, src_start + i, self_start + i);
+        }
+    }
+    // Whether this column type supports efficient in-place range replacement.
+    // Returns true for fixed-width types (ColumnVector, ColumnDecimal) that can use memcpy.
+    // Returns false for variable-length types (ColumnString, ColumnArray, etc.) that require
+    // more complex handling. Used by sparse column compaction to choose the right code path.
+    virtual bool support_replace_column_data_range() const { return false; }
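+    // Illustrative only: callers are expected to guard the batch path, e.g.
+    //   if (dst->support_replace_column_data_range()) {
+    //       dst->replace_column_data_range(src, src_start, count, dst_start);
+    //   } else {
+    //       // fall back to row-by-row replace_column_data
+    //   }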
+
     // replace data to default value if null, used to avoid null data output decimal check failure
     // usage: nested_column.replace_column_null_data(nested_null_map.data())
     // only works on column_vector and column_decimal; other column types are no-ops when calling this method
diff --git a/be/src/vec/columns/column_decimal.h b/be/src/vec/columns/column_decimal.h
index 7bca9f221b0..15feb3a9a82 100644
--- a/be/src/vec/columns/column_decimal.h
+++ b/be/src/vec/columns/column_decimal.h
@@ -221,6 +221,18 @@ public:
         data[self_row] = assert_cast<const Self&, TypeCheckOnRelease::DISABLE>(rhs).data[row];
     }
 
+    // Optimized batch version using memcpy for continuous range
+    void replace_column_data_range(const IColumn& src, size_t src_start, size_t count,
+                                   size_t self_start) override {
+        DCHECK(size() >= self_start + count);
+        const auto& src_col = assert_cast<const Self&, TypeCheckOnRelease::DISABLE>(src);
+        DCHECK(src_col.size() >= src_start + count);
+        memcpy(data.data() + self_start, src_col.data.data() + src_start,
+               count * sizeof(value_type));
+    }
+
+    bool support_replace_column_data_range() const override { return true; }
+
     void replace_column_null_data(const uint8_t* __restrict null_map) override;
 
     bool support_replace_column_null_data() const override { return true; }
diff --git a/be/src/vec/columns/column_nullable.h b/be/src/vec/columns/column_nullable.h
index 2b532b47d4a..e601147b72c 100644
--- a/be/src/vec/columns/column_nullable.h
+++ b/be/src/vec/columns/column_nullable.h
@@ -309,6 +309,32 @@ public:
         }
     }
 
+    // Batch replace a continuous range of data (for sparse compaction optimization)
+    // NOTE: This function is designed for the "all non-NULL" scenario where the caller
+    // has already verified that all source values are non-NULL using a SIMD count.
+    // For mixed NULL/non-NULL cases, use replace_column_data in a loop.
+    void replace_column_data_range(const IColumn& rhs, size_t src_start, size_t count,
+                                   size_t self_start) override {
+        DCHECK(size() >= self_start + count);
+        const auto& nullable_rhs =
+                assert_cast<const ColumnNullable&, TypeCheckOnRelease::DISABLE>(rhs);
+        DCHECK(nullable_rhs.size() >= src_start + count);
+
+        // Copy the null_map using memcpy for efficiency
+        memcpy(get_null_map_data().data() + self_start,
+               nullable_rhs.get_null_map_data().data() + src_start, count);
+
+        // Batch copy nested column data using the optimized replace_column_data_range,
+        // which leverages memcpy in ColumnVector/ColumnDecimal for maximum performance
+        _nested_column->replace_column_data_range(*nullable_rhs._nested_column, src_start, count,
+                                                  self_start);
+    }
+
+    // Delegate to nested column - only fixed-width types support efficient replacement
+    bool support_replace_column_data_range() const override {
+        return _nested_column->support_replace_column_data_range();
+    }
+
     void replace_float_special_values() override { _nested_column->replace_float_special_values(); }
 
     MutableColumnPtr convert_to_predicate_column_if_dictionary() override {
diff --git a/be/src/vec/columns/column_vector.h b/be/src/vec/columns/column_vector.h
index e3da600eeee..9ad04a07117 100644
--- a/be/src/vec/columns/column_vector.h
+++ b/be/src/vec/columns/column_vector.h
@@ -356,6 +356,18 @@ public:
         data[self_row] = assert_cast<const Self&, TypeCheckOnRelease::DISABLE>(rhs).data[row];
     }
 
+    // Optimized batch version using memcpy for continuous range
+    void replace_column_data_range(const IColumn& src, size_t src_start, size_t count,
+                                   size_t self_start) override {
+        DCHECK(size() >= self_start + count);
+        const auto& src_col = assert_cast<const Self&, TypeCheckOnRelease::DISABLE>(src);
+        DCHECK(src_col.size() >= src_start + count);
+        memcpy(data.data() + self_start, src_col.data.data() + src_start,
+               count * sizeof(value_type));
+    }
+
+    bool support_replace_column_data_range() const override { return true; }
+
     void replace_column_null_data(const uint8_t* __restrict null_map) override;
 
     bool support_replace_column_null_data() const override { return true; }
diff --git a/be/src/vec/olap/vertical_block_reader.cpp b/be/src/vec/olap/vertical_block_reader.cpp
index 24bdf4e87a6..2a6161d465c 100644
--- a/be/src/vec/olap/vertical_block_reader.cpp
+++ b/be/src/vec/olap/vertical_block_reader.cpp
@@ -29,10 +29,13 @@
 #include "olap/iterators.h"
 #include "olap/olap_common.h"
 #include "olap/olap_define.h"
+#include "olap/rowset/beta_rowset.h"
 #include "olap/rowset/rowset.h"
 #include "olap/rowset/rowset_reader.h"
 #include "olap/rowset/rowset_reader_context.h"
+#include "olap/rowset/segment_v2/segment.h"
 #include "olap/tablet_schema.h"
+#include "util/simd/bits.h"
 #include "vec/aggregate_functions/aggregate_function_reader.h"
 #include "vec/columns/column_nullable.h"
 #include "vec/columns/column_vector.h"
@@ -257,6 +260,13 @@ Status VerticalBlockReader::init(const ReaderParams& read_params,
         DCHECK(false) << "No next row function for type:" << tablet()->keys_type();
         break;
     }
+
+    // Use sparse optimization flag from ReaderParams (calculated in merger.cpp based on the density threshold)
+    _enable_sparse_optimization = read_params.enable_sparse_optimization;
+
+    // Save sample_info pointer for null count tracking
+    _sample_info = sample_info;
+
     return Status::OK();
 }
 
@@ -537,9 +547,56 @@ Status VerticalBlockReader::_unique_key_next_block(Block* block, bool* eof) {
         _eof = *eof;
         return Status::OK();
     }
-    int target_block_row = 0;
+
+    // Value column processing - use batch optimization
     auto target_columns = block->mutate_columns();
-    size_t column_count = block->columns();
+    const size_t column_count = block->columns();
+
+    // Try to use batch optimization for value column compaction
+    // Only use batch optimization when sparse optimization is enabled
+    {
+        auto* mask_iter = dynamic_cast<VerticalMaskMergeIterator*>(_vcollect_iter.get());
+        if (mask_iter != nullptr && _enable_sparse_optimization) {
+            // Step 1: Batch fetch row information
+            std::vector<RowBatch> batches;
+            size_t actual_rows = 0;
+            RETURN_IF_ERROR(mask_iter->unique_key_next_batch(&batches, _reader_context.batch_size,
+                                                             &actual_rows));
+            if (actual_rows == 0) {
+                *eof = true;
+                _eof = true;
+                return Status::OK();
+            }
+
+            // Step 2: Prepare columns - pre-fill NULL for fixed-width, reserve for others
+            std::vector<ColumnNullable*> nullable_dst_cols;
+            std::vector<bool> supports_replace;
+            _prepare_sparse_columns(target_columns, actual_rows, nullable_dst_cols,
+                                    supports_replace);
+
+            // Step 3: Process each batch
+            size_t dst_offset =
+                    target_columns.empty() ? 0 : target_columns[0]->size() - actual_rows;
+            for (const auto& batch : batches) {
+                Block* src_block = batch.block.get();
+                DCHECK(src_block != nullptr);
+                DCHECK(src_block->columns() == column_count);
+
+                for (size_t col_idx = 0; col_idx < column_count; ++col_idx) {
+                    const auto& src_col = src_block->get_by_position(col_idx).column;
+                    _process_sparse_column(nullable_dst_cols[col_idx], supports_replace[col_idx],
+                                           target_columns[col_idx], src_col, batch, dst_offset);
+                }
+                dst_offset += batch.count;
+            }
+
+            block->set_columns(std::move(target_columns));
+            return Status::OK();
+        }
+    }
+
+    // Fallback: original row-by-row processing
+    int target_block_row = 0;
     do {
         Status res = _vcollect_iter->unique_key_next_row(&_next_row);
         if (UNLIKELY(!res.ok())) {
@@ -565,5 +622,109 @@ Status VerticalBlockReader::_unique_key_next_block(Block* block, bool* eof) {
     return Status::OK();
 }
 
+// Prepare columns for sparse optimization: pre-fill NULL for fixed-width types, reserve for others
+void VerticalBlockReader::_prepare_sparse_columns(MutableColumns& columns, size_t actual_rows,
+                                                  std::vector<ColumnNullable*>& nullable_dst_cols,
+                                                  std::vector<bool>& supports_replace) {
+    size_t column_count = columns.size();
+    nullable_dst_cols.resize(column_count, nullptr);
+    supports_replace.resize(column_count, false);
+
+    for (size_t col_idx = 0; col_idx < column_count; ++col_idx) {
+        auto& col = columns[col_idx];
+        if (col->is_nullable()) {
+            auto* nullable_col =
+                    assert_cast<ColumnNullable*, TypeCheckOnRelease::DISABLE>(col.get());
+            nullable_dst_cols[col_idx] = nullable_col;
+            supports_replace[col_idx] = nullable_col->support_replace_column_data_range();
+
+            if (supports_replace[col_idx]) {
+                // Fixed-width types: pre-fill with NULL for sparse optimization
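+                // In the null map, 1 marks NULL; nested values stay default here
+                // and non-NULL positions are overwritten later via replace_column_data_range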
+                size_t new_size = nullable_col->size() + actual_rows;
+                nullable_col->get_null_map_column().get_data().resize_fill(new_size, 1);
+                nullable_col->get_nested_column().resize(new_size);
+            } else {
+                // Variable-length types: just reserve space
+                col->reserve(col->size() + actual_rows);
+            }
+        } else {
+            // Non-Nullable column, reserve space
+            col->reserve(col->size() + actual_rows);
+        }
+    }
+}
+
+// Copy non-NULL runs from source to destination using run-length encoding
+void VerticalBlockReader::_copy_non_null_runs(ColumnNullable* nullable_dst,
+                                              const ColumnNullable* nullable_src,
+                                              const uint8_t* null_map, const RowBatch& batch,
+                                              size_t dst_offset) {
+    size_t i = 0;
+    while (i < batch.count) {
+        // Skip NULL values (null_map == 1)
+        while (i < batch.count && null_map[batch.start_row + i] != 0) {
+            i++;
+        }
+        if (i >= batch.count) break;
+
+        // Found start of non-NULL run
+        size_t run_start = i;
+        while (i < batch.count && null_map[batch.start_row + i] == 0) {
+            i++;
+        }
+        size_t run_length = i - run_start;
+
+        // Batch copy this non-NULL run
+        nullable_dst->replace_column_data_range(*nullable_src, batch.start_row + run_start,
+                                                run_length, dst_offset + run_start);
+    }
+}
+
+// Process a single column for one batch in sparse optimization
+void VerticalBlockReader::_process_sparse_column(ColumnNullable* nullable_dst,
+                                                 bool supports_replace,
+                                                 MutableColumnPtr& target_col,
+                                                 const ColumnPtr& src_col, const RowBatch& batch,
+                                                 size_t dst_offset) {
+    if (nullable_dst != nullptr && supports_replace) {
+        // Sparse optimization path for fixed-width types
+        const auto* nullable_src = static_cast<const ColumnNullable*>(src_col.get());
+        const auto& null_map = nullable_src->get_null_map_data();
+
+        size_t non_null_count = simd::count_zero_num(
+                reinterpret_cast<const int8_t*>(null_map.data() + batch.start_row),
+                static_cast<size_t>(batch.count));
+
+        if (_sample_info != nullptr) {
+            _sample_info->null_count += (batch.count - non_null_count);
+        }
+
+        if (non_null_count == 0) {
+            // All NULL, skip (already pre-filled)
+        } else if (non_null_count == batch.count) {
+            // All non-NULL, use batch copy
+            nullable_dst->replace_column_data_range(*nullable_src, batch.start_row, batch.count,
+                                                    dst_offset);
+        } else {
+            // Mixed case: copy non-NULL runs
+            _copy_non_null_runs(nullable_dst, nullable_src, null_map.data(), batch, dst_offset);
+        }
+        }
+    } else if (nullable_dst != nullptr) {
+        // Variable-length nullable types
+        if (_sample_info != nullptr) {
+            const auto* nullable_src = static_cast<const ColumnNullable*>(src_col.get());
+            const auto& null_map = nullable_src->get_null_map_data();
+            size_t non_null_count = simd::count_zero_num(
+                    reinterpret_cast<const int8_t*>(null_map.data() + batch.start_row),
+                    static_cast<size_t>(batch.count));
+            _sample_info->null_count += (batch.count - non_null_count);
+        }
+        target_col->insert_range_from(*src_col, batch.start_row, batch.count);
+    } else {
+        // Non-Nullable column
+        target_col->insert_range_from(*src_col, batch.start_row, batch.count);
+    }
+}
+
 #include "common/compile_check_end.h"
 } // namespace doris::vectorized
diff --git a/be/src/vec/olap/vertical_block_reader.h b/be/src/vec/olap/vertical_block_reader.h
index 2043db4b00a..ca48e01cf99 100644
--- a/be/src/vec/olap/vertical_block_reader.h
+++ b/be/src/vec/olap/vertical_block_reader.h
@@ -44,6 +44,7 @@ struct RowsetId;
 
 namespace vectorized {
 class RowSourcesBuffer;
+struct RowBatch;
 
 class VerticalBlockReader final : public TabletReader {
 public:
@@ -93,6 +94,16 @@ private:
     size_t _copy_agg_data();
     void _update_agg_value(MutableColumns& columns, int begin, int end, bool is_close = true);
 
+    // Helper functions for sparse column optimization
+    void _prepare_sparse_columns(MutableColumns& columns, size_t actual_rows,
+                                 std::vector<ColumnNullable*>& nullable_dst_cols,
+                                 std::vector<bool>& supports_replace);
+    void _process_sparse_column(ColumnNullable* nullable_dst, bool supports_replace,
+                                MutableColumnPtr& target_col, const ColumnPtr& src_col,
+                                const RowBatch& batch, size_t dst_offset);
+    void _copy_non_null_runs(ColumnNullable* nullable_dst, const ColumnNullable* nullable_src,
+                             const uint8_t* null_map, const RowBatch& batch, size_t dst_offset);
+
 private:
     size_t _id;
     std::shared_ptr<RowwiseIterator> _vcollect_iter;
@@ -125,6 +136,13 @@ private:
     phmap::flat_hash_map<const Block*, std::vector<std::pair<int, int>>> _temp_ref_map;
 
     std::vector<RowLocation> _block_row_locations;
+
+    // For sparse column compaction optimization
+    // Set from reader_params.enable_sparse_optimization (calculated in Merger::vertical_merge_rowsets)
+    bool _enable_sparse_optimization = false;
+
+    // For tracking the NULL cell count during compaction (used for the sparse optimization threshold)
+    CompactionSampleInfo* _sample_info = nullptr;
 };
 
 } // namespace vectorized
diff --git a/be/src/vec/olap/vertical_merge_iterator.cpp b/be/src/vec/olap/vertical_merge_iterator.cpp
index 27030bb10e2..064b2ab6e14 100644
--- a/be/src/vec/olap/vertical_merge_iterator.cpp
+++ b/be/src/vec/olap/vertical_merge_iterator.cpp
@@ -160,6 +160,22 @@ size_t RowSourcesBuffer::same_source_count(uint16_t source, size_t limit) {
     return result;
 }
 
+size_t RowSourcesBuffer::same_source_continuous_non_agg_count(uint16_t source, size_t limit) {
+    size_t result = 0;
+    int64_t start = _buf_idx;
+    int64_t end = _buffer.size();
+    while (result < limit && start < end) {
+        RowSource rs(_buffer[start]);
+        // Stop if different source or agg_flag=true
+        if (rs.get_source_num() != source || rs.agg_flag()) {
+            break;
+        }
+        ++result;
+        ++start;
+    }
+    return result;
+}
+
 Status RowSourcesBuffer::_create_buffer_file() {
     if (_fd != -1) {
         return Status::OK();
@@ -377,6 +393,41 @@ Status VerticalMergeIteratorContext::advance() {
     return Status::OK();
 }
 
+Status VerticalMergeIteratorContext::advance_by(size_t n) {
+    if (n == 0) {
+        return Status::OK();
+    }
+    _is_same = false;
+
+    while (n > 0) {
+        // Calculate how many rows we can advance within the current block
+        // _index_in_block points to the current row, so remaining = total - current - 1
+        int64_t remaining_in_block = static_cast<int64_t>(_block->rows()) - _index_in_block - 1;
+
+        if (static_cast<int64_t>(n) <= remaining_in_block) {
+            // All n rows are within current block
+            _index_in_block += n;
+            return Status::OK();
+        }
+
+        // Need to cross the block boundary
+        // Consume all remaining rows in the current block (including advancing past the current row)
+        n -= remaining_in_block + 1;
+
+        // Load next block
+        RETURN_IF_ERROR(_load_next_block());
+        if (!_valid) {
+            if (n > 0) {
+                return Status::InternalError("advance_by exceeded available data");
+            }
+            break;
+        }
+        // After _load_next_block(), _index_in_block is set to -1
+        // Next iteration will correctly calculate remaining_in_block
+    }
+    return Status::OK();
+}
+
 Status VerticalMergeIteratorContext::_load_next_block() {
     do {
         if (_block != nullptr) {
@@ -743,6 +794,109 @@ Status VerticalMaskMergeIterator::unique_key_next_row(vectorized::IteratorRowRef
     return st;
 }
 
+Status VerticalMaskMergeIterator::unique_key_next_batch(std::vector<RowBatch>* batches,
+                                                        size_t max_rows, size_t* actual_rows) {
+    DCHECK(_row_sources_buf);
+    batches->clear();
+    *actual_rows = 0;
+
+    std::shared_ptr<Block> current_block;
+    uint32_t batch_start = 0;
+    uint32_t batch_count = 0;
+
+    while (*actual_rows < max_rows) {
+        // Check if RowSourceBuffer has remaining data
+        auto st = _row_sources_buf->has_remaining();
+        if (!st.ok()) {
+            if (st.is<END_OF_FILE>()) {
+                break;
+            }
+            return st;
+        }
+
+        auto row_source = _row_sources_buf->current();
+        uint16_t order = row_source.get_source_num();
+        auto& ctx = _origin_iter_ctx[order];
+
+        // Initialize context
+        RETURN_IF_ERROR(ctx->init(_opts, _sample_info));
+        if (!ctx->valid()) {
+            return Status::InternalError("VerticalMergeIteratorContext not 
valid");
+        }
+
+        // Handle is_first_row and position to current row
+        // Match the logic of unique_key_next_row:
+        // - Only skip advance when is_first_row=true AND agg_flag=false
+        // - When is_first_row=true AND agg_flag=true, we must advance to skip this row
+        bool is_first = ctx->is_first_row();
+        if (is_first && !row_source.agg_flag()) {
+            // First row in block with non-agg flag, don't advance
+            ctx->set_is_first_row(false);
+        } else {
+            // All other cases: advance to position at current row
+            // Keep is_first_row=true when skipping leading agg rows.
+            RETURN_IF_ERROR(ctx->advance());
+        }
+
+        // If current row has agg_flag=true, skip it (single row)
+        if (row_source.agg_flag()) {
+            _row_sources_buf->advance();
+            _filtered_rows++;
+            continue;
+        }
+
+        // Current row is non-agg, count continuous same-source non-agg rows
+        // Limited by the remaining capacity and the current block's remaining rows
+        size_t limit = std::min(max_rows - *actual_rows, ctx->remain_rows());
+        size_t run_count = _row_sources_buf->same_source_continuous_non_agg_count(order, limit);
+
+        // run_count >= 1 (current row is non-agg)
+        uint32_t start_row = ctx->current_row_pos();
+
+        // Save the block pointer before advance_by (the block won't change due to the limit)
+        auto block = ctx->block_ptr();
+
+        // Advance remaining rows in this run (first row already positioned)
+        if (run_count > 1) {
+            RETURN_IF_ERROR(ctx->advance_by(run_count - 1));
+        }
+
+        _row_sources_buf->advance(run_count);
+
+        // Try to merge into current batch or create new batch
+        if (current_block == block && start_row == batch_start + batch_count) {
+            // Continuous with previous batch, merge
+            batch_count += static_cast<uint32_t>(run_count);
+        } else {
+            // Save previous batch if exists
+            if (current_block != nullptr && batch_count > 0) {
+                batches->emplace_back(current_block, batch_start, batch_count);
+            }
+            // Start new batch
+            current_block = std::move(block);
+            batch_start = start_row;
+            batch_count = static_cast<uint32_t>(run_count);
+        }
+
+        *actual_rows += run_count;
+    }
+
+    // Save the last batch
+    if (current_block != nullptr && batch_count > 0) {
+        batches->emplace_back(current_block, batch_start, batch_count);
+    }
+
+    // Check if we've reached the end
+    if (*actual_rows == 0) {
+        auto st = _row_sources_buf->has_remaining();
+        if (st.is<END_OF_FILE>()) {
+            RETURN_IF_ERROR(check_all_iter_finished());
+        }
+    }
+
+    return Status::OK();
+}
+
 Status VerticalMaskMergeIterator::next_batch(Block* block) {
     DCHECK(_row_sources_buf);
     size_t rows = 0;
diff --git a/be/src/vec/olap/vertical_merge_iterator.h b/be/src/vec/olap/vertical_merge_iterator.h
index 92ab78e531d..45865d8c7c1 100644
--- a/be/src/vec/olap/vertical_merge_iterator.h
+++ b/be/src/vec/olap/vertical_merge_iterator.h
@@ -121,6 +121,13 @@ public:
     // return continuous agg_flag=true count from index
     size_t continuous_agg_count(uint64_t index);
 
+    // Count continuous rows from current position that:
+    // 1. Have the same source number
+    // 2. Have agg_flag=false (non-aggregated rows)
+    // Stops at first agg_flag=true row or different source
+    // Used for batch optimization in unique_key_next_batch
+    size_t same_source_continuous_non_agg_count(uint16_t source, size_t limit);
+
 private:
     Status _create_buffer_file();
     Status _serialize();
@@ -168,6 +175,10 @@ public:
 
     Status advance();
 
+    // Advance by n rows at once (batch optimization)
+    // More efficient than calling advance() n times when the n rows are in the same block
+    Status advance_by(size_t n);
+
     // Return if it has remaining data in this context.
     // Only when this function return true, current_row()
     // will return a valid row
@@ -213,6 +224,14 @@ public:
         }
     }
 
+    // Get current row position in block (for batch optimization)
+    uint32_t current_row_pos() const { return _index_in_block; }
+
+    // Get block pointer (for batch optimization)
+    Block* block() const { return _block.get(); }
+
+    const std::shared_ptr<Block>& block_ptr() const { return _block; }
+
 private:
     // Load next block into _block
     Status _load_next_block();
@@ -366,6 +385,17 @@ private:
     std::vector<RowLocation> _block_row_locations;
 };
 
+// --------------- RowBatch for batch optimization ------------- //
+// Batch of continuous rows from the same segment context
+struct RowBatch {
+    std::shared_ptr<Block> block; // source block snapshot
+    uint32_t start_row;           // start row position in source block
+    uint32_t count;               // number of rows
+
+    RowBatch(std::shared_ptr<Block> b, uint32_t start, uint32_t cnt)
+            : block(std::move(b)), start_row(start), count(cnt) {}
+};
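+
+// Illustrative only: a consumer typically drains batches like
+//   std::vector<RowBatch> batches;
+//   size_t actual_rows = 0;
+//   RETURN_IF_ERROR(iter->unique_key_next_batch(&batches, max_rows, &actual_rows));
+//   for (const auto& b : batches) {
+//       dst->insert_range_from(*b.block->get_by_position(i).column, b.start_row, b.count);
+//   }
+// where iter, max_rows, dst and i are placeholders for caller state.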
+
 // --------------- VerticalMaskMergeIterator ------------- //
 class VerticalMaskMergeIterator : public RowwiseIterator {
 public:
@@ -390,6 +420,11 @@ public:
 
     Status unique_key_next_row(IteratorRowRef* ref) override;
 
+    // Batch version of unique_key_next_row for performance optimization
+    // Returns batches of continuous rows from the same segment
+    Status unique_key_next_batch(std::vector<RowBatch>* batches, size_t max_rows,
+                                 size_t* actual_rows);
+
     uint64_t merged_rows() const override { return _filtered_rows; }
 
 private:
diff --git a/be/test/vec/olap/vertical_compaction_test.cpp b/be/test/vec/olap/vertical_compaction_test.cpp
index 63f52fa9357..1876bc26371 100644
--- a/be/test/vec/olap/vertical_compaction_test.cpp
+++ b/be/test/vec/olap/vertical_compaction_test.cpp
@@ -1049,5 +1049,150 @@ TEST_F(VerticalCompactionTest, TestAggKeyVerticalMerge) {
     }
 }
 
+// Test to cover _sample_info->null_count logic in vertical_block_reader.cpp
+// This test creates a UNIQUE_KEYS table with nullable columns and sparse data
+TEST_F(VerticalCompactionTest, TestUniqueKeyVerticalMergeWithNullableSparseColumn) {
+    // Save the original threshold and set it to 1 to always enable sparse optimization
+    double original_threshold = config::sparse_column_compaction_threshold_percent;
+    config::sparse_column_compaction_threshold_percent = 1.0;
+
+    auto num_input_rowset = 2;
+    auto num_segments = 1;
+    auto rows_per_segment = 100;
+
+    // Create schema with nullable column (c2 is nullable)
+    TabletSchemaSPtr tablet_schema = std::make_shared<TabletSchema>();
+    TabletSchemaPB tablet_schema_pb;
+    tablet_schema_pb.set_keys_type(UNIQUE_KEYS);
+    tablet_schema_pb.set_num_short_key_columns(1);
+    tablet_schema_pb.set_num_rows_per_row_block(1024);
+    tablet_schema_pb.set_compress_kind(COMPRESS_NONE);
+    tablet_schema_pb.set_next_column_unique_id(4);
+
+    ColumnPB* column_1 = tablet_schema_pb.add_column();
+    column_1->set_unique_id(1);
+    column_1->set_name("c1");
+    column_1->set_type("INT");
+    column_1->set_is_key(true);
+    column_1->set_length(4);
+    column_1->set_index_length(4);
+    column_1->set_is_nullable(false);
+    column_1->set_is_bf_column(false);
+
+    // c2 is nullable - this is key for testing _sample_info->null_count
+    ColumnPB* column_2 = tablet_schema_pb.add_column();
+    column_2->set_unique_id(2);
+    column_2->set_name("c2");
+    column_2->set_type("INT");
+    column_2->set_length(4);
+    column_2->set_index_length(4);
+    column_2->set_is_key(false);
+    column_2->set_is_nullable(true); // nullable column
+    column_2->set_is_bf_column(false);
+
+    // DELETE_SIGN column required for unique keys
+    ColumnPB* column_3 = tablet_schema_pb.add_column();
+    column_3->set_unique_id(3);
+    column_3->set_name(DELETE_SIGN);
+    column_3->set_type("TINYINT");
+    column_3->set_length(1);
+    column_3->set_index_length(1);
+    column_3->set_is_nullable(false);
+    column_3->set_is_key(false);
+    column_3->set_is_bf_column(false);
+
+    tablet_schema->init_from_pb(tablet_schema_pb);
+
+    // Create input rowsets with NULL values in c2
+    std::vector<RowsetSharedPtr> input_rowsets;
+    for (auto i = 0; i < num_input_rowset; i++) {
+        RowsetWriterContext rowset_writer_context;
+        static int64_t inc_id = 2000;
+        RowsetId rowset_id;
+        rowset_id.init(inc_id++);
+        rowset_writer_context.rowset_id = rowset_id;
+        rowset_writer_context.tablet_id = 12345;
+        rowset_writer_context.tablet_schema_hash = 1111;
+        rowset_writer_context.partition_id = 10;
+        rowset_writer_context.rowset_type = BETA_ROWSET;
+        rowset_writer_context.tablet_path = absolute_dir + "/tablet_path";
+        rowset_writer_context.rowset_state = VISIBLE;
+        rowset_writer_context.tablet_schema = tablet_schema;
+        rowset_writer_context.version = Version(i * 10, i * 10);
+        rowset_writer_context.segments_overlap = NONOVERLAPPING;
+
+        auto res = RowsetFactory::create_rowset_writer(*engine_ref, rowset_writer_context, true);
+        ASSERT_TRUE(res.has_value()) << res.error();
+        auto rowset_writer = std::move(res).value();
+
+        // Create block with nullable c2 column
+        vectorized::Block block = tablet_schema->create_block();
+        auto columns = block.mutate_columns();
+
+        for (int rid = 0; rid < rows_per_segment; ++rid) {
+            int32_t c1 = i * rows_per_segment + rid;
+            columns[0]->insert_data((const char*)&c1, sizeof(c1));
+
+            // Insert NULL for most rows (sparse pattern: 90% NULL)
+            if (rid % 10 == 0) {
+                // non-NULL value
+                int32_t c2 = c1 * 10;
+                columns[1]->insert_data((const char*)&c2, sizeof(c2));
+            } else {
+                // NULL value
+                columns[1]->insert_default();
+            }
+
+            uint8_t delete_sign = 0;
+            columns[2]->insert_data((const char*)&delete_sign, sizeof(delete_sign));
+        }
+
+        auto s = rowset_writer->add_block(&block);
+        ASSERT_TRUE(s.ok()) << s;
+        s = rowset_writer->flush();
+        ASSERT_TRUE(s.ok()) << s;
+
+        RowsetSharedPtr rowset;
+        ASSERT_EQ(Status::OK(), rowset_writer->build(rowset));
+        input_rowsets.push_back(rowset);
+    }
+
+    // Create input rowset readers
+    std::vector<RowsetReaderSharedPtr> input_rs_readers;
+    for (auto& rowset : input_rowsets) {
+        RowsetReaderSharedPtr rs_reader;
+        ASSERT_TRUE(rowset->create_reader(&rs_reader).ok());
+        input_rs_readers.push_back(std::move(rs_reader));
+    }
+
+    // Create output rowset writer
+    auto writer_context = create_rowset_writer_context(tablet_schema, NONOVERLAPPING, 3456,
+                                                       {0, input_rowsets.back()->end_version()});
+    auto res = RowsetFactory::create_rowset_writer(*engine_ref, writer_context, true);
+    ASSERT_TRUE(res.has_value()) << res.error();
+    auto output_rs_writer = std::move(res).value();
+
+    // Create tablet and run vertical merge
+    TabletSharedPtr tablet = create_tablet(*tablet_schema, false);
+    Merger::Statistics stats;
+    RowIdConversion rowid_conversion;
+    stats.rowid_conversion = &rowid_conversion;
+
+    // This will trigger the _sample_info->null_count logic in vertical_block_reader.cpp
+    auto s = Merger::vertical_merge_rowsets(tablet, ReaderType::READER_BASE_COMPACTION,
+                                            *tablet_schema, input_rs_readers,
+                                            output_rs_writer.get(), 10000, num_segments, &stats);
+    ASSERT_TRUE(s.ok()) << s;
+
+    RowsetSharedPtr out_rowset;
+    ASSERT_EQ(Status::OK(), output_rs_writer->build(out_rowset));
+
+    // Verify output
+    EXPECT_EQ(out_rowset->rowset_meta()->num_rows(), num_input_rowset * rows_per_segment);
+
+    // Restore original threshold
+    config::sparse_column_compaction_threshold_percent = original_threshold;
+}
+
 } // namespace vectorized
 } // namespace doris
diff --git a/be/test/vec/olap/vertical_merge_iterator_test.cpp b/be/test/vec/olap/vertical_merge_iterator_test.cpp
new file mode 100644
index 00000000000..b610f646916
--- /dev/null
+++ b/be/test/vec/olap/vertical_merge_iterator_test.cpp
@@ -0,0 +1,1130 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/olap/vertical_merge_iterator.h"
+
+#include <gtest/gtest.h>
+
+#include <cstdint>
+#include <random>
+
+#include "common/config.h"
+#include "util/faststring.h"
+#include "util/rle_encoding.h"
+#include "util/simd/bits.h"
+#include "vec/columns/column_decimal.h"
+#include "vec/columns/column_nullable.h"
+#include "vec/columns/column_vector.h"
+#include "vec/common/assert_cast.h"
+#include "vec/core/block.h"
+
+namespace doris::vectorized {
+
+class SparseColumnOptimizationTest : public ::testing::Test {
+protected:
+    void SetUp() override {
+        // Enable sparse column optimization for tests (use the max threshold to always enable)
+        original_threshold_percent_ = config::sparse_column_compaction_threshold_percent;
+        config::sparse_column_compaction_threshold_percent = 1.0;
+    }
+
+    void TearDown() override {
+        // Reset to original value
+        config::sparse_column_compaction_threshold_percent = original_threshold_percent_;
+    }
+
+    // Helper function to create a nullable column with specific NULL pattern
+    static ColumnNullable::MutablePtr create_nullable_column(const std::vector<Int64>& values,
+                                                             const std::vector<bool>& null_flags) {
+        EXPECT_EQ(values.size(), null_flags.size());
+
+        auto nested_column = ColumnInt64::create();
+        auto null_map = ColumnUInt8::create();
+
+        for (size_t i = 0; i < values.size(); ++i) {
+            nested_column->insert_value(values[i]);
+            null_map->insert_value(null_flags[i] ? 1 : 0);
+        }
+
+        return ColumnNullable::create(std::move(nested_column), 
std::move(null_map));
+    }
+
+    // Helper to count non-NULL values using SIMD
+    static size_t count_non_null(const ColumnNullable* col, size_t start, size_t count) {
+        const auto& null_map = col->get_null_map_data();
+        return simd::count_zero_num(reinterpret_cast<const int8_t*>(null_map.data() + start),
+                                    count);
+    }
+
+    // Helper to check if all values in range are NULL
+    static bool is_all_null(const ColumnNullable* col, size_t start, size_t count) {
+        return count_non_null(col, start, count) == 0;
+    }
+
+    // Helper to check if all values in range are non-NULL
+    static bool is_all_non_null(const ColumnNullable* col, size_t start, size_t count) {
+        return count_non_null(col, start, count) == count;
+    }
+
+    // Simulate copy_rows logic for nullable columns with sparse optimization
+    static void copy_rows_with_optimization(const ColumnNullable* src, size_t start, size_t count,
+                                            IColumn* dst_col) {
+        auto* dst_mut = dst_col->assume_mutable().get();
+
+        const size_t non_null_count = count_non_null(src, start, count);
+
+        if (non_null_count == 0) {
+            // Case 1: All NULL - batch fill with defaults
+            dst_mut->insert_many_defaults(count);
+        } else if (non_null_count == count || non_null_count > count / 2) {
+            // Case 2: All non-NULL or non-NULL ratio > 50% - direct copy
+            dst_mut->insert_range_from(*src, start, count);
+        } else {
+            // Case 3: Sparse mixed (non-NULL < 50%) - fill NULL first, then replace
+            const size_t dst_start = dst_mut->size();
+            dst_mut->insert_many_defaults(count);
+
+            auto* nullable_dst = assert_cast<ColumnNullable*>(dst_mut);
+            const auto& null_map = src->get_null_map_data();
+
+            for (size_t row = 0; row < count; row++) {
+                if (null_map[start + row] == 0) { // 0 means non-NULL
+                    nullable_dst->replace_column_data(*src, start + row, dst_start + row);
+                }
+            }
+        }
+    }
+
+    // Original copy_rows logic (direct copy)
+    static void copy_rows_original(const IColumn* src, size_t start, size_t count,
+                                   IColumn* dst_col) {
+        dst_col->assume_mutable()->insert_range_from(*src, start, count);
+    }
+
+    // Helper to compare two nullable columns
+    static bool columns_equal(const ColumnNullable* col1, const ColumnNullable* col2) {
+        if (col1->size() != col2->size()) {
+            return false;
+        }
+
+        const auto& null_map1 = col1->get_null_map_data();
+        const auto& null_map2 = col2->get_null_map_data();
+        const auto& nested1 = col1->get_nested_column();
+        const auto& nested2 = col2->get_nested_column();
+
+        for (size_t i = 0; i < col1->size(); ++i) {
+            if (null_map1[i] != null_map2[i]) {
+                return false;
+            }
+            // Only compare nested data for non-NULL rows
+            if (null_map1[i] == 0) {
+                if (nested1.compare_at(i, i, nested2, 1) != 0) {
+                    return false;
+                }
+            }
+        }
+        return true;
+    }
+
+    double original_threshold_percent_ = 0.0;
+};
+
+TEST_F(SparseColumnOptimizationTest, AllNullColumn) {
+    // Test Case 1: All NULL column
+    std::vector<Int64> values = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
+    std::vector<bool> null_flags = {true, true, true, true, true, true, true, true, true, true};
+
+    auto src = create_nullable_column(values, null_flags);
+    const auto* nullable_src = assert_cast<const ColumnNullable*>(src.get());
+
+    // Create destination columns
+    auto dst_optimized = ColumnNullable::create(ColumnInt64::create(), ColumnUInt8::create());
+    auto dst_original = ColumnNullable::create(ColumnInt64::create(), ColumnUInt8::create());
+
+    // Copy with optimization
+    copy_rows_with_optimization(nullable_src, 0, 10, dst_optimized.get());
+    // Copy with original method
+    copy_rows_original(src.get(), 0, 10, dst_original.get());
+
+    // Verify results are equal
+    EXPECT_TRUE(columns_equal(assert_cast<const ColumnNullable*>(dst_optimized.get()),
+                              assert_cast<const ColumnNullable*>(dst_original.get())));
+
+    // Verify all are NULL
+    const auto* result = assert_cast<const ColumnNullable*>(dst_optimized.get());
+    EXPECT_EQ(result->size(), 10);
+    EXPECT_TRUE(is_all_null(result, 0, 10));
+}
+
+TEST_F(SparseColumnOptimizationTest, AllNonNullColumn) {
+    // Test Case 2: All non-NULL column
+    std::vector<Int64> values = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
+    std::vector<bool> null_flags = {false, false, false, false, false,
+                                    false, false, false, false, false};
+
+    auto src = create_nullable_column(values, null_flags);
+    const auto* nullable_src = assert_cast<const ColumnNullable*>(src.get());
+
+    auto dst_optimized = ColumnNullable::create(ColumnInt64::create(), ColumnUInt8::create());
+    auto dst_original = ColumnNullable::create(ColumnInt64::create(), ColumnUInt8::create());
+
+    copy_rows_with_optimization(nullable_src, 0, 10, dst_optimized.get());
+    copy_rows_original(src.get(), 0, 10, dst_original.get());
+
+    EXPECT_TRUE(columns_equal(assert_cast<const ColumnNullable*>(dst_optimized.get()),
+                              assert_cast<const ColumnNullable*>(dst_original.get())));
+
+    // Verify all are non-NULL
+    const auto* result = assert_cast<const ColumnNullable*>(dst_optimized.get());
+    EXPECT_EQ(result->size(), 10);
+    EXPECT_TRUE(is_all_non_null(result, 0, 10));
+
+    // Verify values
+    const auto& nested = assert_cast<const ColumnInt64&>(result->get_nested_column());
+    for (size_t i = 0; i < 10; ++i) {
+        EXPECT_EQ(nested.get_element(i), static_cast<Int64>(i + 1));
+    }
+}
+
+TEST_F(SparseColumnOptimizationTest, SparseMixedColumn) {
+    // Test Case 3: Sparse mixed column (< 50% non-NULL)
+    // 20% non-NULL rate
+    std::vector<Int64> values = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
+    std::vector<bool> null_flags = {false, true, true, true, true, false, true, true, true, true};
+
+    auto src = create_nullable_column(values, null_flags);
+    const auto* nullable_src = assert_cast<const ColumnNullable*>(src.get());
+
+    auto dst_optimized = ColumnNullable::create(ColumnInt64::create(), ColumnUInt8::create());
+    auto dst_original = ColumnNullable::create(ColumnInt64::create(), ColumnUInt8::create());
+
+    copy_rows_with_optimization(nullable_src, 0, 10, dst_optimized.get());
+    copy_rows_original(src.get(), 0, 10, dst_original.get());
+
+    EXPECT_TRUE(columns_equal(assert_cast<const ColumnNullable*>(dst_optimized.get()),
+                              assert_cast<const ColumnNullable*>(dst_original.get())));
+
+    // Verify count
+    const auto* result = assert_cast<const ColumnNullable*>(dst_optimized.get());
+    EXPECT_EQ(result->size(), 10);
+    EXPECT_EQ(count_non_null(result, 0, 10), 2);
+
+    // Verify specific values
+    EXPECT_FALSE(result->is_null_at(0));
+    EXPECT_TRUE(result->is_null_at(1));
+    EXPECT_FALSE(result->is_null_at(5));
+
+    const auto& nested = assert_cast<const ColumnInt64&>(result->get_nested_column());
+    EXPECT_EQ(nested.get_element(0), 1);
+    EXPECT_EQ(nested.get_element(5), 6);
+}
+
+TEST_F(SparseColumnOptimizationTest, DenseMixedColumn) {
+    // Test Case: Dense mixed column (> 50% non-NULL, should use direct copy)
+    // 80% non-NULL rate
+    std::vector<Int64> values = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
+    std::vector<bool> null_flags = {false, false, false, false, true,
+                                    false, false, false, false, true};
+
+    auto src = create_nullable_column(values, null_flags);
+    const auto* nullable_src = assert_cast<const ColumnNullable*>(src.get());
+
+    auto dst_optimized = ColumnNullable::create(ColumnInt64::create(), ColumnUInt8::create());
+    auto dst_original = ColumnNullable::create(ColumnInt64::create(), ColumnUInt8::create());
+
+    copy_rows_with_optimization(nullable_src, 0, 10, dst_optimized.get());
+    copy_rows_original(src.get(), 0, 10, dst_original.get());
+
+    EXPECT_TRUE(columns_equal(assert_cast<const ColumnNullable*>(dst_optimized.get()),
+                              assert_cast<const ColumnNullable*>(dst_original.get())));
+
+    const auto* result = assert_cast<const ColumnNullable*>(dst_optimized.get());
+    EXPECT_EQ(count_non_null(result, 0, 10), 8);
+}
+
+TEST_F(SparseColumnOptimizationTest, PartialRangeCopy) {
+    // Test partial range copy
+    std::vector<Int64> values = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
+    std::vector<bool> null_flags = {false, true, true, true, true, true, true, true, true, false};
+
+    auto src = create_nullable_column(values, null_flags);
+    const auto* nullable_src = assert_cast<const ColumnNullable*>(src.get());
+
+    // Copy middle range (indices 2-7, all NULL)
+    auto dst_optimized = ColumnNullable::create(ColumnInt64::create(), ColumnUInt8::create());
+    auto dst_original = ColumnNullable::create(ColumnInt64::create(), ColumnUInt8::create());
+
+    copy_rows_with_optimization(nullable_src, 2, 6, dst_optimized.get());
+    copy_rows_original(src.get(), 2, 6, dst_original.get());
+
+    EXPECT_TRUE(columns_equal(assert_cast<const ColumnNullable*>(dst_optimized.get()),
+                              assert_cast<const ColumnNullable*>(dst_original.get())));
+
+    const auto* result = assert_cast<const ColumnNullable*>(dst_optimized.get());
+    EXPECT_EQ(result->size(), 6);
+    EXPECT_TRUE(is_all_null(result, 0, 6));
+}
+
+TEST_F(SparseColumnOptimizationTest, LargeSparseCopy) {
+    // Test with large sparse column (5% non-NULL rate, typical for sparse wide tables)
+    constexpr size_t num_rows = 1024;
+    std::vector<Int64> values(num_rows);
+    std::vector<bool> null_flags(num_rows);
+
+    for (size_t i = 0; i < num_rows; ++i) {
+        values[i] = static_cast<Int64>(i);
+        // Every 20th row is non-NULL (5% non-NULL rate)
+        null_flags[i] = (i % 20 != 0);
+    }
+
+    auto src = create_nullable_column(values, null_flags);
+    const auto* nullable_src = assert_cast<const ColumnNullable*>(src.get());
+
+    auto dst_optimized = ColumnNullable::create(ColumnInt64::create(), ColumnUInt8::create());
+    auto dst_original = ColumnNullable::create(ColumnInt64::create(), ColumnUInt8::create());
+
+    copy_rows_with_optimization(nullable_src, 0, num_rows, dst_optimized.get());
+    copy_rows_original(src.get(), 0, num_rows, dst_original.get());
+
+    EXPECT_TRUE(columns_equal(assert_cast<const ColumnNullable*>(dst_optimized.get()),
+                              assert_cast<const ColumnNullable*>(dst_original.get())));
+
+    const auto* result = assert_cast<const ColumnNullable*>(dst_optimized.get());
+    EXPECT_EQ(result->size(), num_rows);
+    // 1024 / 20 = 51.2 -> 52 non-NULL rows (0, 20, 40, ..., 1000, 1020)
+    EXPECT_EQ(count_non_null(result, 0, num_rows), 52);
+}
+
+TEST_F(SparseColumnOptimizationTest, MultipleCopies) {
+    // Test multiple sequential copies to the same destination
+    std::vector<Int64> values = {1, 2, 3, 4, 5};
+    std::vector<bool> null_flags = {false, true, true, true, false};
+
+    auto src = create_nullable_column(values, null_flags);
+    const auto* nullable_src = assert_cast<const ColumnNullable*>(src.get());
+
+    auto dst_optimized = ColumnNullable::create(ColumnInt64::create(), ColumnUInt8::create());
+    auto dst_original = ColumnNullable::create(ColumnInt64::create(), ColumnUInt8::create());
+
+    // Copy the same source multiple times
+    for (int i = 0; i < 3; ++i) {
+        copy_rows_with_optimization(nullable_src, 0, 5, dst_optimized.get());
+        copy_rows_original(src.get(), 0, 5, dst_original.get());
+    }
+
+    EXPECT_TRUE(columns_equal(assert_cast<const ColumnNullable*>(dst_optimized.get()),
+                              assert_cast<const ColumnNullable*>(dst_original.get())));
+
+    const auto* result = assert_cast<const ColumnNullable*>(dst_optimized.get());
+    EXPECT_EQ(result->size(), 15);
+    EXPECT_EQ(count_non_null(result, 0, 15), 6); // 2 non-NULL per copy * 3 copies
+}
+
+TEST_F(SparseColumnOptimizationTest, DisabledOptimization) {
+    // Test with optimization disabled (threshold = 0 means disabled)
+    config::sparse_column_compaction_threshold_percent = 0.0;
+
+    std::vector<Int64> values = {1, 2, 3, 4, 5};
+    std::vector<bool> null_flags = {true, true, true, true, true};
+
+    auto src = create_nullable_column(values, null_flags);
+
+    auto dst = ColumnNullable::create(ColumnInt64::create(), ColumnUInt8::create());
+
+    // When disabled, should still work correctly via direct copy path
+    copy_rows_original(src.get(), 0, 5, dst.get());
+
+    const auto* result = assert_cast<const ColumnNullable*>(dst.get());
+    EXPECT_EQ(result->size(), 5);
+    EXPECT_TRUE(is_all_null(result, 0, 5));
+}
+
+TEST_F(SparseColumnOptimizationTest, ReplaceColumnDataRange) {
+    // Test replace_column_data_range functionality
+    std::vector<Int64> src_values = {1, 2, 3, 4, 5};
+    std::vector<bool> src_null_flags = {false, true, false, true, false};
+
+    auto src = create_nullable_column(src_values, src_null_flags);
+    const auto* nullable_src = assert_cast<const ColumnNullable*>(src.get());
+
+    // Create destination with pre-filled NULLs
+    auto dst = ColumnNullable::create(ColumnInt64::create(), ColumnUInt8::create());
+    auto* nullable_dst = assert_cast<ColumnNullable*>(dst.get());
+
+    // Pre-fill with NULLs
+    nullable_dst->get_null_map_column().get_data().resize_fill(5, 1);
+    nullable_dst->get_nested_column().resize(5);
+
+    // Replace with source data
+    nullable_dst->replace_column_data_range(*nullable_src, 0, 5, 0);
+
+    // Verify results
+    EXPECT_EQ(nullable_dst->size(), 5);
+    const auto& null_map = nullable_dst->get_null_map_data();
+    EXPECT_EQ(null_map[0], 0); // non-NULL
+    EXPECT_EQ(null_map[1], 1); // NULL
+    EXPECT_EQ(null_map[2], 0); // non-NULL
+    EXPECT_EQ(null_map[3], 1); // NULL
+    EXPECT_EQ(null_map[4], 0); // non-NULL
+
+    // Verify values for non-NULL positions
+    const auto& nested = assert_cast<const ColumnInt64&>(nullable_dst->get_nested_column());
+    EXPECT_EQ(nested.get_element(0), 1);
+    EXPECT_EQ(nested.get_element(2), 3);
+    EXPECT_EQ(nested.get_element(4), 5);
+}
+
+TEST_F(SparseColumnOptimizationTest, CountZeroNumSIMD) {
+    // Test SIMD count_zero_num function
+    std::vector<int8_t> data(128);
+
+    // All zeros
+    std::fill(data.begin(), data.end(), 0);
+    EXPECT_EQ(simd::count_zero_num(data.data(), static_cast<size_t>(data.size())), 128);
+
+    // All ones
+    std::fill(data.begin(), data.end(), 1);
+    EXPECT_EQ(simd::count_zero_num(data.data(), static_cast<size_t>(data.size())), 0);
+
+    // Mixed: every 4th is zero
+    for (size_t i = 0; i < data.size(); ++i) {
+        data[i] = (i % 4 == 0) ? 0 : 1;
+    }
+    EXPECT_EQ(simd::count_zero_num(data.data(), static_cast<size_t>(data.size())), 32);
+
+    // Small sizes
+    EXPECT_EQ(simd::count_zero_num(data.data(), static_cast<size_t>(1)), 1);   // single zero
+    EXPECT_EQ(simd::count_zero_num(data.data() + 1, static_cast<size_t>(1)), 0); // single one
+}
+
+// ==================== ColumnVector replace_column_data_range tests ====================
+
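+// Per the commit description, replace_column_data_range() batch-copies
+// fixed-width types with memcpy and is guarded at runtime by
+// support_replace_column_data_range(); the tests below check the result of
+// the copy, not which internal path was taken.
+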
+TEST_F(SparseColumnOptimizationTest, ColumnVectorReplaceDataRange) {
+    // Test ColumnVector::replace_column_data_range with memcpy optimization
+    auto src = ColumnInt64::create();
+    for (Int64 i = 1; i <= 10; ++i) {
+        src->insert_value(i * 100);
+    }
+
+    // Create destination column with pre-allocated space
+    auto dst = ColumnInt64::create();
+    for (int i = 0; i < 10; ++i) {
+        dst->insert_value(0); // Pre-fill with zeros
+    }
+
+    // Replace range [2, 6) from src to dst at position 3
+    dst->replace_column_data_range(*src, 2, 4, 3);
+
+    // Verify: dst[3..6] should be src[2..5] = {300, 400, 500, 600}
+    const auto& dst_data = dst->get_data();
+    EXPECT_EQ(dst_data[0], 0);   // unchanged
+    EXPECT_EQ(dst_data[1], 0);   // unchanged
+    EXPECT_EQ(dst_data[2], 0);   // unchanged
+    EXPECT_EQ(dst_data[3], 300); // src[2]
+    EXPECT_EQ(dst_data[4], 400); // src[3]
+    EXPECT_EQ(dst_data[5], 500); // src[4]
+    EXPECT_EQ(dst_data[6], 600); // src[5]
+    EXPECT_EQ(dst_data[7], 0);   // unchanged
+}
+
+TEST_F(SparseColumnOptimizationTest, ColumnVectorReplaceDataRangeFullCopy) {
+    // Test full range copy
+    constexpr size_t num_rows = 1024;
+    auto src = ColumnInt64::create();
+    for (size_t i = 0; i < num_rows; ++i) {
+        src->insert_value(static_cast<Int64>(i));
+    }
+
+    auto dst = ColumnInt64::create();
+    for (size_t i = 0; i < num_rows; ++i) {
+        dst->insert_value(-1);
+    }
+
+    // Replace entire range
+    dst->replace_column_data_range(*src, 0, num_rows, 0);
+
+    // Verify all values copied correctly
+    const auto& dst_data = dst->get_data();
+    for (size_t i = 0; i < num_rows; ++i) {
+        EXPECT_EQ(dst_data[i], static_cast<Int64>(i));
+    }
+}
+
+TEST_F(SparseColumnOptimizationTest, ColumnVectorReplaceDataRangeSingleElement) {
+    // Test single element replacement
+    auto src = ColumnInt64::create();
+    src->insert_value(42);
+    src->insert_value(99);
+
+    auto dst = ColumnInt64::create();
+    for (int i = 0; i < 5; ++i) {
+        dst->insert_value(0);
+    }
+
+    dst->replace_column_data_range(*src, 1, 1, 2);
+
+    EXPECT_EQ(dst->get_data()[2], 99);
+    EXPECT_EQ(dst->get_data()[0], 0); // unchanged
+    EXPECT_EQ(dst->get_data()[4], 0); // unchanged
+}
+
+// ==================== ColumnDecimal replace_column_data_range tests ====================
+
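+// Decimal128V3 is a fixed-width (16-byte) value type, so it should qualify
+// for the same memcpy-based replace path as the integer columns above.
+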
+TEST_F(SparseColumnOptimizationTest, ColumnDecimalReplaceDataRange) {
+    // Test ColumnDecimal::replace_column_data_range with memcpy optimization
+    // Use ColumnDecimal128V3 which is ColumnDecimal<TYPE_DECIMAL128I>
+    auto src = ColumnDecimal128V3::create(0, 2); // scale = 2
+    for (int i = 1; i <= 10; ++i) {
+        src->insert_value(Decimal128V3(i * 100));
+    }
+
+    auto dst = ColumnDecimal128V3::create(0, 2);
+    for (int i = 0; i < 10; ++i) {
+        dst->insert_value(Decimal128V3(0));
+    }
+
+    // Replace range
+    dst->replace_column_data_range(*src, 2, 4, 3);
+
+    const auto& dst_data = dst->get_data();
+    EXPECT_EQ(dst_data[3], Decimal128V3(300));
+    EXPECT_EQ(dst_data[4], Decimal128V3(400));
+    EXPECT_EQ(dst_data[5], Decimal128V3(500));
+    EXPECT_EQ(dst_data[6], Decimal128V3(600));
+    EXPECT_EQ(dst_data[0], Decimal128V3(0)); // unchanged
+}
+
+// ==================== RowBatch structure tests ====================
+TEST_F(SparseColumnOptimizationTest, RowBatchConstruction) {
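+// Per the commit description, a RowBatch describes a run of continuous rows
+// from one segment: a shared_ptr to the source block plus (start_row, count),
+// which lets unique_key_next_batch() return several runs instead of single
+// rows.
+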
+    // Test RowBatch construction and member access
+    auto block = std::make_shared<Block>();
+
+    // Add a column to the block
+    auto col = ColumnInt64::create();
+    col->insert_value(1);
+    col->insert_value(2);
+    col->insert_value(3);
+    block->insert({std::move(col), std::make_shared<DataTypeInt64>(), "test_col"});
+
+    // Create RowBatch
+    RowBatch batch(block, 1, 2); // start_row=1, count=2
+
+    EXPECT_EQ(batch.block.get(), block.get());
+    EXPECT_EQ(batch.start_row, 1);
+    EXPECT_EQ(batch.count, 2);
+
+    // Verify block content accessible through batch
+    const auto& batch_col = batch.block->get_by_position(0).column;
+    EXPECT_EQ(batch_col->size(), 3);
+}
+
+TEST_F(SparseColumnOptimizationTest, RowBatchSharedPtrLifetime) {
+    // Test that RowBatch keeps block alive via shared_ptr
+    RowBatch batch(nullptr, 0, 0);
+
+    {
+        auto block = std::make_shared<Block>();
+        auto col = ColumnInt64::create();
+        col->insert_value(42);
+        block->insert({std::move(col), std::make_shared<DataTypeInt64>(), "col"});
+
+        batch = RowBatch(block, 0, 1);
+        // block goes out of scope here, but batch keeps it alive
+    }
+
+    // Block should still be accessible
+    EXPECT_NE(batch.block, nullptr);
+    EXPECT_EQ(batch.block->columns(), 1);
+    EXPECT_EQ(batch.block->get_by_position(0).column->size(), 1);
+}
+
+TEST_F(SparseColumnOptimizationTest, RowBatchVector) {
+    // Test vector of RowBatches (simulating batches from unique_key_next_batch)
+    std::vector<RowBatch> batches;
+
+    // Create multiple blocks and batches
+    for (int i = 0; i < 3; ++i) {
+        auto block = std::make_shared<Block>();
+        auto col = ColumnInt64::create();
+        for (int j = 0; j < 10; ++j) {
+            col->insert_value(i * 10 + j);
+        }
+        block->insert({std::move(col), std::make_shared<DataTypeInt64>(), "col"});
+
+        batches.emplace_back(block, i * 2, 5); // Different start positions
+    }
+
+    EXPECT_EQ(batches.size(), 3);
+
+    // Verify each batch
+    for (size_t i = 0; i < batches.size(); ++i) {
+        EXPECT_EQ(batches[i].start_row, i * 2);
+        EXPECT_EQ(batches[i].count, 5);
+        EXPECT_NE(batches[i].block, nullptr);
+    }
+}
+
+// ==================== All-non-NULL batch optimization tests ====================
+
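+// Per the commit description, the reader counts NULLs per batch with
+// simd::count_zero_num: all-NULL batches keep the pre-filled defaults,
+// all-non-NULL batches take the batch-replace fast path, and only mixed
+// batches fall back to run-length processing.
+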
+TEST_F(SparseColumnOptimizationTest, AllNonNullBatchOptimization) {
+    // Test the complete flow for all-non-NULL scenario
+    // This simulates what happens in vertical_block_reader when non_null_count == batch.count
+
+    // Create source nullable column (all non-NULL)
+    std::vector<Int64> values = {10, 20, 30, 40, 50};
+    std::vector<bool> null_flags = {false, false, false, false, false};
+    auto src = create_nullable_column(values, null_flags);
+    const auto* nullable_src = assert_cast<const ColumnNullable*>(src.get());
+
+    // Create destination with pre-filled NULLs (simulating sparse optimization)
+    auto dst = ColumnNullable::create(ColumnInt64::create(), ColumnUInt8::create());
+    auto* nullable_dst = assert_cast<ColumnNullable*>(dst.get());
+
+    // Pre-fill with NULLs
+    nullable_dst->get_null_map_column().get_data().resize_fill(5, 1); // all NULL
+    nullable_dst->get_nested_column().resize(5);
+
+    // Check non-NULL count using SIMD
+    const auto& null_map = nullable_src->get_null_map_data();
+    size_t non_null_count =
+            simd::count_zero_num(reinterpret_cast<const int8_t*>(null_map.data()), 5);
+
+    EXPECT_EQ(non_null_count, 5); // All non-NULL
+
+    // Since all non-NULL, use batch replace (triggers memcpy optimization)
+    nullable_dst->replace_column_data_range(*nullable_src, 0, 5, 0);
+
+    // Verify results
+    EXPECT_EQ(nullable_dst->size(), 5);
+    for (size_t i = 0; i < 5; ++i) {
+        EXPECT_FALSE(nullable_dst->is_null_at(i));
+    }
+
+    const auto& nested = assert_cast<const ColumnInt64&>(nullable_dst->get_nested_column());
+    EXPECT_EQ(nested.get_element(0), 10);
+    EXPECT_EQ(nested.get_element(1), 20);
+    EXPECT_EQ(nested.get_element(2), 30);
+    EXPECT_EQ(nested.get_element(3), 40);
+    EXPECT_EQ(nested.get_element(4), 50);
+}
+
+TEST_F(SparseColumnOptimizationTest, BatchReplaceWithOffset) {
+    // Test batch replace at non-zero offset (simulating multiple batches)
+    std::vector<Int64> values1 = {1, 2, 3};
+    std::vector<bool> null_flags1 = {false, false, false};
+    auto src1 = create_nullable_column(values1, null_flags1);
+
+    std::vector<Int64> values2 = {4, 5};
+    std::vector<bool> null_flags2 = {false, false};
+    auto src2 = create_nullable_column(values2, null_flags2);
+
+    // Create destination pre-filled with NULLs
+    auto dst = ColumnNullable::create(ColumnInt64::create(), ColumnUInt8::create());
+    auto* nullable_dst = assert_cast<ColumnNullable*>(dst.get());
+
+    nullable_dst->get_null_map_column().get_data().resize_fill(5, 1);
+    nullable_dst->get_nested_column().resize(5);
+
+    // Batch 1: replace at offset 0, count 3
+    nullable_dst->replace_column_data_range(*src1, 0, 3, 0);
+
+    // Batch 2: replace at offset 3, count 2
+    nullable_dst->replace_column_data_range(*src2, 0, 2, 3);
+
+    // Verify
+    const auto& nested = assert_cast<const ColumnInt64&>(nullable_dst->get_nested_column());
+    EXPECT_EQ(nested.get_element(0), 1);
+    EXPECT_EQ(nested.get_element(1), 2);
+    EXPECT_EQ(nested.get_element(2), 3);
+    EXPECT_EQ(nested.get_element(3), 4);
+    EXPECT_EQ(nested.get_element(4), 5);
+
+    // All should be non-NULL
+    for (size_t i = 0; i < 5; ++i) {
+        EXPECT_FALSE(nullable_dst->is_null_at(i));
+    }
+}
+
+TEST_F(SparseColumnOptimizationTest, MixedBatchProcessing) {
+    // Test mixed scenario: some batches all-NULL, some all-non-NULL, some mixed
+    auto dst = ColumnNullable::create(ColumnInt64::create(), ColumnUInt8::create());
+    auto* nullable_dst = assert_cast<ColumnNullable*>(dst.get());
+
+    // Pre-fill with 15 NULLs
+    nullable_dst->get_null_map_column().get_data().resize_fill(15, 1);
+    nullable_dst->get_nested_column().resize(15);
+
+    // Batch 1 (offset 0-4): All NULL - skip (already pre-filled)
+    // Nothing to do
+
+    // Batch 2 (offset 5-9): All non-NULL - use batch replace
+    std::vector<Int64> values2 = {50, 51, 52, 53, 54};
+    std::vector<bool> null_flags2 = {false, false, false, false, false};
+    auto src2 = create_nullable_column(values2, null_flags2);
+    nullable_dst->replace_column_data_range(*src2, 0, 5, 5);
+
+    // Batch 3 (offset 10-14): Mixed - use per-row replace
+    std::vector<Int64> values3 = {100, 101, 102, 103, 104};
+    std::vector<bool> null_flags3 = {false, true, false, true, false};
+    auto src3 = create_nullable_column(values3, null_flags3);
+    const auto* nullable_src3 = assert_cast<const ColumnNullable*>(src3.get());
+    const auto& null_map3 = nullable_src3->get_null_map_data();
+
+    for (size_t i = 0; i < 5; ++i) {
+        if (null_map3[i] == 0) { // non-NULL
+            nullable_dst->replace_column_data(*src3, i, 10 + i);
+        }
+    }
+
+    // Verify results
+    // Batch 1: All NULL
+    for (size_t i = 0; i < 5; ++i) {
+        EXPECT_TRUE(nullable_dst->is_null_at(i)) << "Position " << i << " should be NULL";
+    }
+
+    // Batch 2: All non-NULL
+    const auto& nested = assert_cast<const ColumnInt64&>(nullable_dst->get_nested_column());
+    for (size_t i = 5; i < 10; ++i) {
+        EXPECT_FALSE(nullable_dst->is_null_at(i)) << "Position " << i << " should be non-NULL";
+        EXPECT_EQ(nested.get_element(i), 50 + (i - 5));
+    }
+
+    // Batch 3: Mixed pattern
+    EXPECT_FALSE(nullable_dst->is_null_at(10));
+    EXPECT_EQ(nested.get_element(10), 100);
+    EXPECT_TRUE(nullable_dst->is_null_at(11));
+    EXPECT_FALSE(nullable_dst->is_null_at(12));
+    EXPECT_EQ(nested.get_element(12), 102);
+    EXPECT_TRUE(nullable_dst->is_null_at(13));
+    EXPECT_FALSE(nullable_dst->is_null_at(14));
+    EXPECT_EQ(nested.get_element(14), 104);
+}
+
+TEST_F(SparseColumnOptimizationTest, LargeBatchMemcpyPerformance) {
+    // Test large batch to verify memcpy optimization works correctly
+    constexpr size_t num_rows = 4096; // Typical batch size
+
+    // Create source with all non-NULL values
+    auto src_nested = ColumnInt64::create();
+    auto src_null_map = ColumnUInt8::create();
+    for (size_t i = 0; i < num_rows; ++i) {
+        src_nested->insert_value(static_cast<Int64>(i * 2));
+        src_null_map->insert_value(0); // all non-NULL
+    }
+    auto src = ColumnNullable::create(std::move(src_nested), std::move(src_null_map));
+
+    // Create destination pre-filled with NULLs
+    auto dst = ColumnNullable::create(ColumnInt64::create(), ColumnUInt8::create());
+    auto* nullable_dst = assert_cast<ColumnNullable*>(dst.get());
+    nullable_dst->get_null_map_column().get_data().resize_fill(num_rows, 1);
+    nullable_dst->get_nested_column().resize(num_rows);
+
+    // Batch replace (should use memcpy)
+    nullable_dst->replace_column_data_range(*src, 0, num_rows, 0);
+
+    // Verify correctness
+    const auto& nested = assert_cast<const ColumnInt64&>(nullable_dst->get_nested_column());
+    for (size_t i = 0; i < num_rows; ++i) {
+        EXPECT_FALSE(nullable_dst->is_null_at(i));
+        EXPECT_EQ(nested.get_element(i), static_cast<Int64>(i * 2));
+    }
+}
+
+// ==================== RLE Batch Put Optimization Tests ====================
+
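+// Per the commit description, Put(value, run_length) writes the same value
+// run_length times in one call, with a fast path when the encoder is already
+// in a repeated run; the tests below verify decode-side correctness only.
+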
+TEST_F(SparseColumnOptimizationTest, RleBatchPutLargeRunLength) {
+    // Test RLE batch Put with large run_length (typical for sparse compaction)
+    // This tests the fast path optimization in RleEncoder::Put()
+    constexpr int bit_width = 1; // For boolean/null bitmap
+    faststring buffer;
+
+    RleEncoder<bool> encoder(&buffer, bit_width);
+
+    // Simulate putting 4096 NULLs (value=true) in one call
+    // This is the typical pattern when compacting a sparse column that's all NULL
+    encoder.Put(true, 4096);
+    encoder.Flush();
+
+    int encoded_len = encoder.len();
+    EXPECT_GT(encoded_len, 0);
+
+    // Decode and verify
+    RleDecoder<bool> decoder(buffer.data(), encoded_len, bit_width);
+    for (int i = 0; i < 4096; ++i) {
+        bool value;
+        EXPECT_TRUE(decoder.Get(&value));
+        EXPECT_TRUE(value) << "Position " << i << " should be true";
+    }
+}
+
+TEST_F(SparseColumnOptimizationTest, RleBatchPutMultipleCalls) {
+    // Test multiple batch Put calls with same value (should accumulate efficiently)
+    constexpr int bit_width = 1;
+    faststring buffer;
+
+    RleEncoder<bool> encoder(&buffer, bit_width);
+
+    // Multiple calls with same value - should use fast path after first 8
+    encoder.Put(true, 100);
+    encoder.Put(true, 200);
+    encoder.Put(true, 300);
+    encoder.Flush();
+
+    int encoded_len = encoder.len();
+    EXPECT_GT(encoded_len, 0);
+
+    // Decode and verify total count
+    RleDecoder<bool> decoder(buffer.data(), encoded_len, bit_width);
+    int count = 0;
+    bool value;
+    while (decoder.Get(&value)) {
+        EXPECT_TRUE(value);
+        count++;
+    }
+    EXPECT_EQ(count, 600);
+}
+
+TEST_F(SparseColumnOptimizationTest, RleBatchPutMixedValues) {
+    // Test mixed pattern: long run of same value, then different value
+    constexpr int bit_width = 1;
+    faststring buffer;
+
+    RleEncoder<bool> encoder(&buffer, bit_width);
+
+    // 1000 trues, then 500 falses, then 1000 trues
+    encoder.Put(true, 1000);
+    encoder.Put(false, 500);
+    encoder.Put(true, 1000);
+    encoder.Flush();
+
+    int encoded_len = encoder.len();
+    EXPECT_GT(encoded_len, 0);
+
+    // Decode and verify
+    RleDecoder<bool> decoder(buffer.data(), encoded_len, bit_width);
+    int true_count = 0, false_count = 0;
+    bool value;
+    while (decoder.Get(&value)) {
+        if (value) {
+            true_count++;
+        } else {
+            false_count++;
+        }
+    }
+    EXPECT_EQ(true_count, 2000);
+    EXPECT_EQ(false_count, 500);
+}
+
+TEST_F(SparseColumnOptimizationTest, RleBatchPutSmallRunLength) {
+    // Test with small run_length (should still work correctly)
+    constexpr int bit_width = 1;
+    faststring buffer;
+
+    RleEncoder<bool> encoder(&buffer, bit_width);
+
+    // Small batches
+    encoder.Put(true, 3);
+    encoder.Put(false, 2);
+    encoder.Put(true, 1);
+    encoder.Put(false, 4);
+    encoder.Flush();
+
+    int encoded_len = encoder.len();
+    EXPECT_GT(encoded_len, 0);
+
+    // Decode and verify pattern
+    RleDecoder<bool> decoder(buffer.data(), encoded_len, bit_width);
+    bool expected[] = {true, true, true, false, false, true, false, false, false, false};
+    for (int i = 0; i < 10; ++i) {
+        bool value;
+        EXPECT_TRUE(decoder.Get(&value));
+        EXPECT_EQ(value, expected[i]) << "Mismatch at position " << i;
+    }
+}
+
+TEST_F(SparseColumnOptimizationTest, RleBatchPutIntValues) {
+    // Test batch Put with integer values (used in RLE encoding of data)
+    constexpr int bit_width = 8;
+    faststring buffer;
+
+    RleEncoder<uint8_t> encoder(&buffer, bit_width);
+
+    // Put repeated integer values
+    encoder.Put(42, 100);
+    encoder.Put(0, 200); // Common case: repeated zeros
+    encoder.Put(255, 50);
+    encoder.Flush();
+
+    int encoded_len = encoder.len();
+    EXPECT_GT(encoded_len, 0);
+
+    // Decode and verify
+    RleDecoder<uint8_t> decoder(buffer.data(), encoded_len, bit_width);
+    uint8_t value;
+
+    for (int i = 0; i < 100; ++i) {
+        EXPECT_TRUE(decoder.Get(&value));
+        EXPECT_EQ(value, 42);
+    }
+    for (int i = 0; i < 200; ++i) {
+        EXPECT_TRUE(decoder.Get(&value));
+        EXPECT_EQ(value, 0);
+    }
+    for (int i = 0; i < 50; ++i) {
+        EXPECT_TRUE(decoder.Get(&value));
+        EXPECT_EQ(value, 255);
+    }
+}
+
+// ==================== Additional Column Type Tests ====================
+
+TEST_F(SparseColumnOptimizationTest, ColumnInt32ReplaceDataRange) {
+    // Test ColumnInt32 replace_column_data_range
+    auto src = ColumnInt32::create();
+    for (Int32 i = 1; i <= 10; ++i) {
+        src->insert_value(i * 10);
+    }
+
+    auto dst = ColumnInt32::create();
+    for (int i = 0; i < 10; ++i) {
+        dst->insert_value(0);
+    }
+
+    dst->replace_column_data_range(*src, 2, 4, 3);
+
+    const auto& dst_data = dst->get_data();
+    EXPECT_EQ(dst_data[0], 0);
+    EXPECT_EQ(dst_data[3], 30); // src[2]
+    EXPECT_EQ(dst_data[4], 40); // src[3]
+    EXPECT_EQ(dst_data[5], 50); // src[4]
+    EXPECT_EQ(dst_data[6], 60); // src[5]
+    EXPECT_EQ(dst_data[7], 0);
+}
+
+TEST_F(SparseColumnOptimizationTest, ColumnFloat64ReplaceDataRange) {
+    // Test ColumnFloat64 replace_column_data_range
+    auto src = ColumnFloat64::create();
+    for (int i = 1; i <= 10; ++i) {
+        src->insert_value(i * 1.5);
+    }
+
+    auto dst = ColumnFloat64::create();
+    for (int i = 0; i < 10; ++i) {
+        dst->insert_value(0.0);
+    }
+
+    dst->replace_column_data_range(*src, 0, 5, 2);
+
+    const auto& dst_data = dst->get_data();
+    EXPECT_DOUBLE_EQ(dst_data[0], 0.0);
+    EXPECT_DOUBLE_EQ(dst_data[1], 0.0);
+    EXPECT_DOUBLE_EQ(dst_data[2], 1.5); // src[0]
+    EXPECT_DOUBLE_EQ(dst_data[3], 3.0); // src[1]
+    EXPECT_DOUBLE_EQ(dst_data[4], 4.5); // src[2]
+    EXPECT_DOUBLE_EQ(dst_data[5], 6.0); // src[3]
+    EXPECT_DOUBLE_EQ(dst_data[6], 7.5); // src[4]
+    EXPECT_DOUBLE_EQ(dst_data[7], 0.0);
+}
+
+// ==================== SIMD count_zero_num Edge Cases ====================
+
+TEST_F(SparseColumnOptimizationTest, CountZeroNumEmpty) {
+    // Test empty input
+    std::vector<int8_t> data;
+    EXPECT_EQ(simd::count_zero_num(data.data(), 0), 0);
+}
+
+TEST_F(SparseColumnOptimizationTest, CountZeroNumUnalignedSizes) {
+    // Test various unaligned sizes (not multiples of SIMD width)
+    std::vector<int8_t> data(100, 0); // All zeros
+
+    // Test sizes that are not multiples of 16 (typical SIMD width)
+    EXPECT_EQ(simd::count_zero_num(data.data(), 1), 1);
+    EXPECT_EQ(simd::count_zero_num(data.data(), 7), 7);
+    EXPECT_EQ(simd::count_zero_num(data.data(), 15), 15);
+    EXPECT_EQ(simd::count_zero_num(data.data(), 17), 17);
+    EXPECT_EQ(simd::count_zero_num(data.data(), 31), 31);
+    EXPECT_EQ(simd::count_zero_num(data.data(), 33), 33);
+    EXPECT_EQ(simd::count_zero_num(data.data(), 63), 63);
+    EXPECT_EQ(simd::count_zero_num(data.data(), 65), 65);
+}
+
+TEST_F(SparseColumnOptimizationTest, CountZeroNumLargeArray) {
+    // Test large array (typical batch size)
+    constexpr size_t size = 4096;
+    std::vector<int8_t> data(size);
+
+    // All zeros
+    std::fill(data.begin(), data.end(), 0);
+    EXPECT_EQ(simd::count_zero_num(data.data(), size), size);
+
+    // All ones
+    std::fill(data.begin(), data.end(), 1);
+    EXPECT_EQ(simd::count_zero_num(data.data(), size), 0);
+
+    // 10% zeros (sparse pattern)
+    for (size_t i = 0; i < size; ++i) {
+        data[i] = (i % 10 == 0) ? 0 : 1;
+    }
+    EXPECT_EQ(simd::count_zero_num(data.data(), size), 410); // 4096/10 = 409.6 -> 410
+}
+
+TEST_F(SparseColumnOptimizationTest, CountZeroNumRandomPattern) {
+    // Test random pattern
+    constexpr size_t size = 1000;
+    std::vector<int8_t> data(size);
+
+    std::mt19937 gen(42); // Fixed seed for reproducibility
+    std::uniform_int_distribution<> dis(0, 1);
+
+    size_t expected_zeros = 0;
+    for (size_t i = 0; i < size; ++i) {
+        data[i] = dis(gen);
+        if (data[i] == 0) expected_zeros++;
+    }
+
+    EXPECT_EQ(simd::count_zero_num(data.data(), size), expected_zeros);
+}
+
+// ==================== Threshold Configuration Tests ====================
+
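+// The config defines density = (total_cells - null_cells) / total_cells; the
+// sparse path is enabled when density <= threshold, so 0 disables the
+// optimization and 1 always enables it.
+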
+TEST_F(SparseColumnOptimizationTest, ThresholdZeroDisablesOptimization) {
+    // Test that threshold = 0 means optimization disabled
+    config::sparse_column_compaction_threshold_percent = 0.0;
+
+    // With threshold = 0, optimization should be disabled
+    // This affects the decision in Merger::vertical_merge_rowsets
+    double density = 0.2;
+    bool use_optimization = density <= config::sparse_column_compaction_threshold_percent;
+    EXPECT_FALSE(use_optimization);
+    EXPECT_DOUBLE_EQ(config::sparse_column_compaction_threshold_percent, 0.0);
+}
+
+TEST_F(SparseColumnOptimizationTest, ThresholdMaxAlwaysEnabled) {
+    // Test that threshold = 1 means always enabled
+    config::sparse_column_compaction_threshold_percent = 1.0;
+
+    // With max threshold, any density in [0, 1] will be <= threshold
+    double density = 1.0;
+    EXPECT_LE(density, config::sparse_column_compaction_threshold_percent);
+}
+
+TEST_F(SparseColumnOptimizationTest, ThresholdBasedDecision) {
+    // Test threshold-based decision logic
+    // density <= threshold means sparse (enable optimization)
+
+    config::sparse_column_compaction_threshold_percent = 0.1;
+
+    // Case 1: density = 0.05 <= 0.1, should enable
+    double density1 = 0.05;
+    bool use_optimization1 = (density1 <= config::sparse_column_compaction_threshold_percent);
+    EXPECT_TRUE(use_optimization1);
+
+    // Case 2: density = 0.1 <= 0.1, should enable (boundary)
+    double density2 = 0.1;
+    bool use_optimization2 = (density2 <= config::sparse_column_compaction_threshold_percent);
+    EXPECT_TRUE(use_optimization2);
+
+    // Case 3: density = 0.11 > 0.1, should disable
+    double density3 = 0.11;
+    bool use_optimization3 = (density3 <= config::sparse_column_compaction_threshold_percent);
+    EXPECT_FALSE(use_optimization3);
+
+    // Case 4: density = 0.9 > 0.1, should disable
+    double density4 = 0.9;
+    bool use_optimization4 = (density4 <= config::sparse_column_compaction_threshold_percent);
+    EXPECT_FALSE(use_optimization4);
+}
+
+TEST_F(SparseColumnOptimizationTest, AvgRowBytesCalculation) {
+    // Test average row bytes calculation: data_disk_size / total_rows
+    // Simulating the calculation done in Merger::vertical_merge_rowsets
+
+    // Case 1: Small tablet with sparse data
+    int64_t data_disk_size1 = 1000; // 1KB
+    int64_t total_rows1 = 100;
+    int64_t avg_row_bytes1 = data_disk_size1 / total_rows1;
+    EXPECT_EQ(avg_row_bytes1, 10); // 10 bytes per row - very sparse
+
+    // Case 2: Medium tablet
+    int64_t data_disk_size2 = 100000; // 100KB
+    int64_t total_rows2 = 1000;
+    int64_t avg_row_bytes2 = data_disk_size2 / total_rows2;
+    EXPECT_EQ(avg_row_bytes2, 100); // 100 bytes per row - boundary
+
+    // Case 3: Dense tablet
+    int64_t data_disk_size3 = 10000000; // 10MB
+    int64_t total_rows3 = 10000;
+    int64_t avg_row_bytes3 = data_disk_size3 / total_rows3;
+    EXPECT_EQ(avg_row_bytes3, 1000); // 1000 bytes per row - dense
+}
+
+// ==================== Boundary and Edge Cases ====================
+
+TEST_F(SparseColumnOptimizationTest, EmptyColumnReplace) {
+    // Test replace_column_data_range with zero count
+    auto src = ColumnInt64::create();
+    src->insert_value(100);
+
+    auto dst = ColumnInt64::create();
+    dst->insert_value(0);
+
+    // Replace 0 elements (should be no-op)
+    dst->replace_column_data_range(*src, 0, 0, 0);
+
+    EXPECT_EQ(dst->get_data()[0], 0); // Unchanged
+}
+
+TEST_F(SparseColumnOptimizationTest, SingleElementColumn) {
+    // Test with single element columns
+    auto src = ColumnInt64::create();
+    src->insert_value(42);
+
+    auto dst = ColumnInt64::create();
+    dst->insert_value(0);
+
+    dst->replace_column_data_range(*src, 0, 1, 0);
+
+    EXPECT_EQ(dst->get_data()[0], 42);
+}
+
+TEST_F(SparseColumnOptimizationTest, NullableColumnAllNullReplace) {
+    // Test replacing all-NULL range
+    std::vector<Int64> src_values = {0, 0, 0, 0, 0};
+    std::vector<bool> src_null_flags = {true, true, true, true, true};
+    auto src = create_nullable_column(src_values, src_null_flags);
+
+    auto dst = ColumnNullable::create(ColumnInt64::create(), ColumnUInt8::create());
+    auto* nullable_dst = assert_cast<ColumnNullable*>(dst.get());
+
+    // Pre-fill with non-NULL values
+    for (int i = 0; i < 5; ++i) {
+        nullable_dst->get_null_map_column().get_data().push_back(0); // non-NULL
+        assert_cast<ColumnInt64&>(nullable_dst->get_nested_column()).insert_value(i * 10);
+    }
+
+    // Replace with all-NULL source
+    nullable_dst->replace_column_data_range(*src, 0, 5, 0);
+
+    // All should be NULL now
+    for (size_t i = 0; i < 5; ++i) {
+        EXPECT_TRUE(nullable_dst->is_null_at(i));
+    }
+}
+
+} // namespace doris::vectorized


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

