This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new c409fa0f58 [Feature](Compaction)Support full compaction (#21177)
c409fa0f58 is described below

commit c409fa0f58fa0b68fafe34c4796fc45abe78979c
Author: abmdocrt <yukang.lian2...@gmail.com>
AuthorDate: Sun Jul 16 13:21:15 2023 +0800

    [Feature](Compaction)Support full compaction (#21177)
---
 be/src/common/status.h                             |   3 +
 be/src/http/action/compaction_action.cpp           |  17 +-
 be/src/http/action/compaction_action.h             |   1 +
 be/src/io/io_common.h                              |   3 +-
 be/src/olap/compaction.cpp                         |   4 +
 be/src/olap/full_compaction.cpp                    | 215 +++++++++++++++++++++
 be/src/olap/full_compaction.h                      |  62 ++++++
 be/src/olap/olap_common.h                          |   2 +-
 be/src/olap/reader.cpp                             |   2 +
 be/src/olap/rowset/rowset_meta.h                   |   2 +-
 be/src/olap/single_replica_compaction.cpp          |   2 +
 be/src/olap/tablet.cpp                             |  18 ++
 be/src/olap/tablet.h                               |  17 ++
 .../data/compaction/test_full_compaction.out       |  41 ++++
 .../plugins/plugin_curl_requester.groovy           |   6 +
 .../suites/compaction/test_full_compaction.groovy  | 177 +++++++++++++++++
 16 files changed, 568 insertions(+), 4 deletions(-)

diff --git a/be/src/common/status.h b/be/src/common/status.h
index eda6a34d55..d64b70615c 100644
--- a/be/src/common/status.h
+++ b/be/src/common/status.h
@@ -229,6 +229,8 @@ E(CUMULATIVE_INVALID_NEED_MERGED_VERSIONS, -2004);
 E(CUMULATIVE_ERROR_DELETE_ACTION, -2005);
 E(CUMULATIVE_MISS_VERSION, -2006);
 E(CUMULATIVE_CLONE_OCCURRED, -2007);
+E(FULL_NO_SUITABLE_VERSION, -2008);
+E(FULL_MISS_VERSION, -2009);
 E(META_INVALID_ARGUMENT, -3000);
 E(META_OPEN_DB_ERROR, -3001);
 E(META_KEY_NOT_FOUND, -3002);
@@ -285,6 +287,7 @@ inline bool capture_stacktrace(int code) {
         && code != ErrorCode::PUSH_TRANSACTION_ALREADY_EXIST
         && code != ErrorCode::BE_NO_SUITABLE_VERSION
         && code != ErrorCode::CUMULATIVE_NO_SUITABLE_VERSION
+        && code != ErrorCode::FULL_NO_SUITABLE_VERSION
         && code != ErrorCode::PUBLISH_VERSION_NOT_CONTINUOUS
         && code != ErrorCode::ROWSET_RENAME_FILE_FAILED
         && code != ErrorCode::SEGCOMPACTION_INIT_READER
diff --git a/be/src/http/action/compaction_action.cpp 
b/be/src/http/action/compaction_action.cpp
index 48dbe78ab4..81f6ad4d30 100644
--- a/be/src/http/action/compaction_action.cpp
+++ b/be/src/http/action/compaction_action.cpp
@@ -37,6 +37,7 @@
 #include "olap/base_compaction.h"
 #include "olap/cumulative_compaction.h"
 #include "olap/cumulative_compaction_policy.h"
+#include "olap/full_compaction.h"
 #include "olap/olap_define.h"
 #include "olap/storage_engine.h"
 #include "olap/tablet_manager.h"
@@ -89,7 +90,8 @@ Status CompactionAction::_handle_run_compaction(HttpRequest* 
req, std::string* j
     // check compaction_type equals 'base' or 'cumulative'
     std::string compaction_type = req->param(PARAM_COMPACTION_TYPE);
     if (compaction_type != PARAM_COMPACTION_BASE &&
-        compaction_type != PARAM_COMPACTION_CUMULATIVE) {
+        compaction_type != PARAM_COMPACTION_CUMULATIVE &&
+        compaction_type != PARAM_COMPACTION_FULL) {
         return Status::NotSupported("The compaction type '{}' is not 
supported", compaction_type);
     }
 
@@ -229,6 +231,19 @@ Status 
CompactionAction::_execute_compaction_callback(TabletSharedPtr tablet,
                              << ", table=" << tablet->full_name();
             }
         }
+    } else if (compaction_type == PARAM_COMPACTION_FULL) {
+        FullCompaction full_compaction(tablet);
+        res = full_compaction.compact();
+        if (!res) {
+            if (res.is<FULL_NO_SUITABLE_VERSION>()) {
+                // Ignore this error code.
+                VLOG_NOTICE << "failed to init full compaction due to no 
suitable version,"
+                            << "tablet=" << tablet->full_name();
+            } else {
+                LOG(WARNING) << "failed to do full compaction. res=" << res
+                             << ", table=" << tablet->full_name();
+            }
+        }
     }
 
     timer.stop();
diff --git a/be/src/http/action/compaction_action.h 
b/be/src/http/action/compaction_action.h
index 1feb7989e7..9f52c625f1 100644
--- a/be/src/http/action/compaction_action.h
+++ b/be/src/http/action/compaction_action.h
@@ -39,6 +39,7 @@ enum class CompactionActionType {
 const std::string PARAM_COMPACTION_TYPE = "compact_type";
 const std::string PARAM_COMPACTION_BASE = "base";
 const std::string PARAM_COMPACTION_CUMULATIVE = "cumulative";
+const std::string PARAM_COMPACTION_FULL = "full";
 
 /// This action is used for viewing the compaction status.
 /// See compaction-action.md for details.
diff --git a/be/src/io/io_common.h b/be/src/io/io_common.h
index 18391f2a32..8849330746 100644
--- a/be/src/io/io_common.h
+++ b/be/src/io/io_common.h
@@ -29,7 +29,8 @@ enum class ReaderType {
     READER_CHECKSUM = 4,
     READER_COLD_DATA_COMPACTION = 5,
     READER_SEGMENT_COMPACTION = 6,
-    UNKNOWN = 7
+    READER_FULL_COMPACTION = 7,
+    UNKNOWN = 8
 };
 
 namespace io {
diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp
index e4c0b59d8c..d67248919f 100644
--- a/be/src/olap/compaction.cpp
+++ b/be/src/olap/compaction.cpp
@@ -301,6 +301,8 @@ Status Compaction::do_compaction_impl(int64_t permits) {
             _tablet->set_last_cumu_compaction_success_time(now);
         } else if (compaction_type() == ReaderType::READER_BASE_COMPACTION) {
             _tablet->set_last_base_compaction_success_time(now);
+        } else if (compaction_type() == ReaderType::READER_FULL_COMPACTION) {
+            _tablet->set_last_full_compaction_success_time(now);
         }
         auto cumu_policy = _tablet->cumulative_compaction_policy();
         LOG(INFO) << "succeed to do ordered data " << compaction_name()
@@ -451,6 +453,8 @@ Status Compaction::do_compaction_impl(int64_t permits) {
         _tablet->set_last_cumu_compaction_success_time(now);
     } else if (compaction_type() == ReaderType::READER_BASE_COMPACTION) {
         _tablet->set_last_base_compaction_success_time(now);
+    } else if (compaction_type() == ReaderType::READER_FULL_COMPACTION) {
+        _tablet->set_last_full_compaction_success_time(now);
     }
 
     int64_t current_max_version;
diff --git a/be/src/olap/full_compaction.cpp b/be/src/olap/full_compaction.cpp
new file mode 100644
index 0000000000..e2acd1cdb0
--- /dev/null
+++ b/be/src/olap/full_compaction.cpp
@@ -0,0 +1,215 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "olap/full_compaction.h"
+
+#include <glog/logging.h>
+#include <time.h>
+
+#include <memory>
+#include <mutex>
+#include <ostream>
+#include <shared_mutex>
+
+#include "common/config.h"
+#include "common/status.h"
+#include "olap/cumulative_compaction_policy.h"
+#include "olap/olap_common.h"
+#include "olap/olap_define.h"
+#include "olap/rowset/beta_rowset.h"
+#include "olap/rowset/rowset.h"
+#include "olap/schema_change.h"
+#include "olap/tablet_meta.h"
+#include "runtime/thread_context.h"
+#include "util/thread.h"
+#include "util/trace.h"
+
+namespace doris {
+using namespace ErrorCode;
+
+// Creates a full compaction task for `tablet`. The string handed to the Compaction base
+// class is "FullCompaction:" followed by the tablet id.
+FullCompaction::FullCompaction(const TabletSharedPtr& tablet)
+        : Compaction(tablet, "FullCompaction:" + 
std::to_string(tablet->tablet_id())) {}
+
+// Nothing to release here; members and the Compaction base clean up after themselves.
+FullCompaction::~FullCompaction() = default;
+
+// Checks that the tablet is usable and selects the input rowsets for a full compaction.
+//
+// The tablet's full, base and cumulative compaction mutexes are acquired (in that order)
+// and held until the function returns.
+//
+// Returns INVALID_ARGUMENT when the tablet did not initialize successfully, otherwise any
+// error reported by pick_rowsets_to_compact(), otherwise OK.
+Status FullCompaction::prepare_compact() {
+    if (!_tablet->init_succeeded()) {
+        return Status::Error<INVALID_ARGUMENT>("Full compaction init failed");
+    }
+
+    // Same acquisition order as execute_compact_impl().
+    std::unique_lock<std::mutex> full_guard(_tablet->get_full_compaction_lock());
+    std::unique_lock<std::mutex> base_guard(_tablet->get_base_compaction_lock());
+    std::unique_lock<std::mutex> cumu_guard(_tablet->get_cumulative_compaction_lock());
+
+    RETURN_IF_ERROR(pick_rowsets_to_compact());
+
+    // Clear the clone flag now; execute_compact_impl() re-checks it before compacting.
+    _tablet->set_clone_occurred(false);
+    return Status::OK();
+}
+
+// Executes the full compaction over the rowsets chosen by prepare_compact().
+//
+// Holds the tablet's full, base and cumulative compaction mutexes (same order as
+// prepare_compact()) for the whole run.
+//
+// Returns BE_CLONE_OCCURRED if the tablet's clone flag was raised since the rowsets were
+// picked, any error from do_compaction(), or OK once the cumulative point is updated.
+Status FullCompaction::execute_compact_impl() {
+    std::unique_lock full_lock(_tablet->get_full_compaction_lock());
+    std::unique_lock base_lock(_tablet->get_base_compaction_lock());
+    std::unique_lock cumu_lock(_tablet->get_cumulative_compaction_lock());
+
+    // Clone task may happen after compaction task is submitted to thread pool, and rowsets
+    // picked for compaction may change. In this case, current compaction task should not be
+    // executed.
+    if (_tablet->get_clone_occurred()) {
+        _tablet->set_clone_occurred(false);
+        return Status::Error<BE_CLONE_OCCURRED>("get_clone_occurred failed");
+    }
+
+    SCOPED_ATTACH_TASK(_mem_tracker);
+
+    // 1. do full compaction, merge rowsets
+    int64_t permits = get_compaction_permits();
+    RETURN_IF_ERROR(do_compaction(permits));
+
+    // 2. set state to success
+    _state = CompactionState::SUCCESS;
+
+    // 3. set cumulative point, based on the version of the last input rowset.
+    // _input_rowsets is non-empty here when it was filled by pick_rowsets_to_compact(),
+    // which rejects inputs with fewer than two rowsets.
+    Version last_version = _input_rowsets.back()->version();
+    _tablet->cumulative_compaction_policy()->update_cumulative_point(
+            _tablet.get(), _input_rowsets, _output_rowset, last_version);
+    VLOG_CRITICAL << "after full compaction, current cumulative point is "
+                  << _tablet->cumulative_layer_point() << ", tablet=" << _tablet->full_name();
+
+    return Status::OK();
+}
+
+// Fills _input_rowsets with the tablet's full-compaction candidates and validates them.
+//
+// Returns any error from check_version_continuity() or _check_all_version(), and
+// FULL_NO_SUITABLE_VERSION when there is nothing worth compacting.
+Status FullCompaction::pick_rowsets_to_compact() {
+    _input_rowsets = _tablet->pick_candidate_rowsets_to_full_compaction();
+    RETURN_IF_ERROR(check_version_continuity(_input_rowsets));
+    RETURN_IF_ERROR(_check_all_version(_input_rowsets));
+
+    const size_t picked = _input_rowsets.size();
+    const bool too_few = picked <= 1;
+    // The tablet is with rowset: [0-1], [2-y] and [0-1] has no data; in this situation
+    // there is no need to do full compaction.
+    const bool initial_plus_one = picked == 2 && _input_rowsets[0]->end_version() == 1;
+    if (too_few || initial_plus_one) {
+        return Status::Error<FULL_NO_SUITABLE_VERSION>("There is no suitable version");
+    }
+
+    return Status::OK();
+}
+
+// Publishes the compaction result: swaps _input_rowsets for _output_rowset in the tablet
+// and saves the tablet meta. `stats` is not used by this override.
+//
+// For unique-key tablets with merge-on-write enabled, the delete bitmap is brought up to
+// date against the output rowset before the swap.
+Status FullCompaction::modify_rowsets(const Merger::Statistics* stats) {
+    const bool merge_on_write = _tablet->keys_type() == KeysType::UNIQUE_KEYS &&
+                                _tablet->enable_unique_key_merge_on_write();
+    if (merge_on_write) {
+        RETURN_IF_ERROR(
+                _full_compaction_update_delete_bitmap(_output_rowset, _output_rs_writer.get()));
+    }
+
+    std::vector<RowsetSharedPtr> rowsets_to_add {_output_rowset};
+    RETURN_IF_ERROR(_tablet->modify_rowsets(rowsets_to_add, _input_rowsets, true));
+    _tablet->save_meta();
+    return Status::OK();
+}
+
+// Verifies that `rowsets` spans every version of the tablet: the first rowset must start at
+// version 0 and the last rowset's version must equal the tablet's current max version.
+//
+// Returns FULL_MISS_VERSION when `rowsets` is empty or does not span the whole range.
+Status FullCompaction::_check_all_version(const std::vector<RowsetSharedPtr>& rowsets) {
+    if (rowsets.empty()) {
+        return Status::Error<FULL_MISS_VERSION>("There is no input rowset when do full compaction");
+    }
+    const RowsetSharedPtr& last_rowset = rowsets.back();
+    const RowsetSharedPtr& first_rowset = rowsets.front();
+    // Read the tablet's max version once so the error text reports the value compared.
+    const auto tablet_max_version = _tablet->max_version();
+    if (last_rowset->version() != tablet_max_version || first_rowset->version().first != 0) {
+        return Status::Error<FULL_MISS_VERSION>(
+                "Full compaction rowsets' versions not equal to all exist rowsets' versions. "
+                "full compaction rowsets max version={}-{}"
+                ", current rowsets max version={}-{}"
+                ", full compaction rowsets min version={}-{}, current rowsets min version=0-1",
+                last_rowset->start_version(), last_rowset->end_version(),
+                tablet_max_version.first, tablet_max_version.second,
+                first_rowset->start_version(), first_rowset->end_version());
+    }
+    return Status::OK();
+}
+
+// Updates the tablet's delete bitmap for every rowset newer than the compaction output
+// `rowset`, by calling _full_compaction_calc_delete_bitmap() for each of them.
+//
+// Works in two passes:
+//   1. Without the header lock: rowsets with versions in (rowset end version, max_version],
+//      where max_version is the tablet's max version read at the start of this pass.
+//   2. Holding the rowset-update lock and the header lock: rowsets in the tablet's rowset
+//      map whose start version is greater than that max_version.
+//
+// Returns OK without doing anything when the tablet is TABLET_NOTREADY and being converted
+// by a schema change; otherwise returns the first error hit, or OK.
+Status FullCompaction::_full_compaction_update_delete_bitmap(const RowsetSharedPtr& rowset,
+                                                             RowsetWriter* rowset_writer) {
+    std::vector<RowsetSharedPtr> tmp_rowsets {};
+
+    // tablet is under alter process. The delete bitmap will be calculated after conversion.
+    if (_tablet->tablet_state() == TABLET_NOTREADY &&
+        SchemaChangeHandler::tablet_in_converting(_tablet->tablet_id())) {
+        LOG(INFO) << "tablet is under alter process, update delete bitmap later, tablet_id="
+                  << _tablet->tablet_id();
+        return Status::OK();
+    }
+
+    int64_t max_version = _tablet->max_version().second;
+    DCHECK(max_version >= rowset->version().second);
+    if (max_version > rowset->version().second) {
+        // Propagate a capture failure instead of silently continuing with an empty or
+        // partial tmp_rowsets, which would leave the delete bitmap incomplete.
+        // NOTE(review): relies on capture_consistent_rowsets() returning Status - confirm.
+        RETURN_IF_ERROR(_tablet->capture_consistent_rowsets(
+                {rowset->version().second + 1, max_version}, &tmp_rowsets));
+    }
+
+    for (const auto& it : tmp_rowsets) {
+        const int64_t cur_version = it->rowset_meta()->start_version();
+        RETURN_IF_ERROR(
+                _full_compaction_calc_delete_bitmap(it, rowset, cur_version, rowset_writer));
+    }
+
+    // Second pass under both locks: pick up rowsets whose start version is above the
+    // max_version snapshot taken before the first pass.
+    std::lock_guard rowset_update_lock(_tablet->get_rowset_update_lock());
+    std::lock_guard header_lock(_tablet->get_header_lock());
+    for (const auto& it : _tablet->rowset_map()) {
+        const int64_t cur_version = it.first.first;
+        const RowsetSharedPtr& published_rowset = it.second;
+        if (cur_version > max_version) {
+            RETURN_IF_ERROR(_full_compaction_calc_delete_bitmap(published_rowset, rowset,
+                                                                cur_version, rowset_writer));
+        }
+    }
+
+    return Status::OK();
+}
+
+// Computes the delete bitmap of `published_rowset` against the compaction output `rowset`
+// and merges the result into the tablet meta's delete bitmap under version `cur_version`.
+//
+// published_rowset: rowset whose segments are loaded and checked.
+// rowset:           the full compaction output, passed as the only "specified" rowset.
+// cur_version:      version recorded in the merged bitmap keys.
+// rowset_writer:    forwarded unchanged to Tablet::calc_delete_bitmap().
+//
+// Returns the first error from loading segments or calculating the bitmap, otherwise OK.
+Status FullCompaction::_full_compaction_calc_delete_bitmap(const RowsetSharedPtr& published_rowset,
+                                                           const RowsetSharedPtr& rowset,
+                                                           const int64_t& cur_version,
+                                                           RowsetWriter* rowset_writer) {
+    std::vector<segment_v2::SegmentSharedPtr> segments;
+    // Downcast within the Rowset hierarchy; static_cast is the appropriate named cast.
+    // NOTE(review): assumes every rowset reaching this point is a BetaRowset - confirm.
+    auto beta_rowset = static_cast<BetaRowset*>(published_rowset.get());
+    RETURN_IF_ERROR(beta_rowset->load_segments(&segments));
+    DeleteBitmapPtr delete_bitmap =
+            std::make_shared<DeleteBitmap>(_tablet->tablet_meta()->tablet_id());
+    std::vector<RowsetSharedPtr> specified_rowsets(1, rowset);
+
+    OlapStopWatch watch;
+    RETURN_IF_ERROR(_tablet->calc_delete_bitmap(published_rowset, segments, specified_rowsets,
+                                                delete_bitmap, cur_version, rowset_writer));
+    // Seed with size_t: std::accumulate accumulates in the type of its init argument, so a
+    // plain `0` would sum in int and truncate large row counts.
+    size_t total_rows = std::accumulate(
+            segments.begin(), segments.end(), size_t {0},
+            [](size_t sum, const segment_v2::SegmentSharedPtr& s) { return sum + s->num_rows(); });
+    VLOG_DEBUG << "[Full compaction] construct delete bitmap tablet: " << _tablet->tablet_id()
+               << ", published rowset version: [" << published_rowset->version().first << "-"
+               << published_rowset->version().second << "]"
+               << ", full compaction rowset version: [" << rowset->version().first << "-"
+               << rowset->version().second << "]"
+               << ", cost: " << watch.get_elapse_time_us() << "(us), total rows: " << total_rows;
+
+    // Re-key every computed entry with cur_version before merging it into the tablet meta.
+    for (const auto& [k, v] : delete_bitmap->delete_bitmap) {
+        _tablet->tablet_meta()->delete_bitmap().merge({std::get<0>(k), std::get<1>(k), cur_version},
+                                                      v);
+    }
+
+    return Status::OK();
+}
+
+} // namespace doris
diff --git a/be/src/olap/full_compaction.h b/be/src/olap/full_compaction.h
new file mode 100644
index 0000000000..bce9ac745b
--- /dev/null
+++ b/be/src/olap/full_compaction.h
@@ -0,0 +1,62 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <butil/macros.h>
+
+#include <string>
+#include <vector>
+
+#include "common/status.h"
+#include "io/io_common.h"
+#include "olap/compaction.h"
+#include "olap/rowset/rowset.h"
+#include "olap/tablet.h"
+
+namespace doris {
+
+// Compaction task covering every version of a tablet, from version 0 up to the tablet's
+// current max version, producing one output rowset that replaces all input rowsets.
+//
+// prepare_compact() and execute_compact_impl() each hold the tablet's full, base and
+// cumulative compaction mutexes while they run.
+class FullCompaction : public Compaction {
+public:
+    // explicit: a tablet pointer should never convert silently into a compaction task.
+    explicit FullCompaction(const TabletSharedPtr& tablet);
+    ~FullCompaction() override;
+
+    // Validates the tablet and picks the input rowsets.
+    Status prepare_compact() override;
+    // Runs the compaction and updates the tablet's cumulative point.
+    Status execute_compact_impl() override;
+    // Replaces the input rowsets with the output rowset and saves the tablet meta.
+    // `stats` is unused by this implementation.
+    Status modify_rowsets(const Merger::Statistics* stats = nullptr) override;
+
+    // Returns a copy of the rowsets currently selected as compaction input.
+    std::vector<RowsetSharedPtr> get_input_rowsets() { return _input_rowsets; }
+
+protected:
+    // Fills _input_rowsets; fails with FULL_NO_SUITABLE_VERSION when nothing needs merging.
+    Status pick_rowsets_to_compact() override;
+    std::string compaction_name() const override { return "full compaction"; }
+
+    ReaderType compaction_type() const override { return ReaderType::READER_FULL_COMPACTION; }
+
+private:
+    // Checks that `rowsets` spans versions 0 through the tablet's max version.
+    Status _check_all_version(const std::vector<RowsetSharedPtr>& rowsets);
+    // Updates the delete bitmap for all rowsets newer than the output `rowset`.
+    Status _full_compaction_update_delete_bitmap(const RowsetSharedPtr& rowset,
+                                                 RowsetWriter* rowset_writer);
+    // Computes the delete bitmap of one `published_rowset` against `rowset` and merges it
+    // into the tablet meta under `cur_version`.
+    Status _full_compaction_calc_delete_bitmap(const RowsetSharedPtr& published_rowset,
+                                               const RowsetSharedPtr& rowset,
+                                               const int64_t& cur_version,
+                                               RowsetWriter* rowset_writer);
+
+    DISALLOW_COPY_AND_ASSIGN(FullCompaction);
+};
+
+} // namespace doris
diff --git a/be/src/olap/olap_common.h b/be/src/olap/olap_common.h
index 86f59af14c..c9030d1317 100644
--- a/be/src/olap/olap_common.h
+++ b/be/src/olap/olap_common.h
@@ -48,7 +48,7 @@ using uint128_t = unsigned __int128;
 
 using TabletUid = UniqueId;
 
-enum CompactionType { BASE_COMPACTION = 1, CUMULATIVE_COMPACTION = 2 };
+enum CompactionType { BASE_COMPACTION = 1, CUMULATIVE_COMPACTION = 2, 
FULL_COMPACTION = 3 };
 
 struct DataDirInfo {
     std::string path;
diff --git a/be/src/olap/reader.cpp b/be/src/olap/reader.cpp
index 730fdf5145..468c6289d5 100644
--- a/be/src/olap/reader.cpp
+++ b/be/src/olap/reader.cpp
@@ -324,6 +324,7 @@ Status TabletReader::_init_return_columns(const 
ReaderParams& read_params) {
     } else if ((read_params.reader_type == 
ReaderType::READER_CUMULATIVE_COMPACTION ||
                 read_params.reader_type == 
ReaderType::READER_SEGMENT_COMPACTION ||
                 read_params.reader_type == ReaderType::READER_BASE_COMPACTION 
||
+                read_params.reader_type == ReaderType::READER_FULL_COMPACTION 
||
                 read_params.reader_type == 
ReaderType::READER_COLD_DATA_COMPACTION ||
                 read_params.reader_type == ReaderType::READER_ALTER_TABLE) &&
                !read_params.return_columns.empty()) {
@@ -613,6 +614,7 @@ Status TabletReader::_init_delete_condition(const 
ReaderParams& read_params) {
     // QUERY will filter the row in query layer to keep right result use where 
clause.
     // CUMULATIVE_COMPACTION will lost the filter_delete info of base rowset
     if (read_params.reader_type == ReaderType::READER_BASE_COMPACTION ||
+        read_params.reader_type == ReaderType::READER_FULL_COMPACTION ||
         read_params.reader_type == ReaderType::READER_COLD_DATA_COMPACTION ||
         read_params.reader_type == ReaderType::READER_CHECKSUM) {
         _filter_delete = true;
diff --git a/be/src/olap/rowset/rowset_meta.h b/be/src/olap/rowset/rowset_meta.h
index 68bb1e7075..22c15d790d 100644
--- a/be/src/olap/rowset/rowset_meta.h
+++ b/be/src/olap/rowset/rowset_meta.h
@@ -293,7 +293,7 @@ public:
     // `segments_overlap()` only return the value of "segments_overlap" field 
in rowset meta,
     // but "segments_overlap" may be UNKNOWN.
     //
-    // Returns true iff all of the following conditions are met
+    // Returns true if all of the following conditions are met
     // 1. the rowset contains more than one segment
     // 2. the rowset's start version == end version (non-singleton rowset was 
generated by compaction process
     //    which always produces non-overlapped segments)
diff --git a/be/src/olap/single_replica_compaction.cpp 
b/be/src/olap/single_replica_compaction.cpp
index 8f927022f1..c7671aee36 100644
--- a/be/src/olap/single_replica_compaction.cpp
+++ b/be/src/olap/single_replica_compaction.cpp
@@ -164,6 +164,8 @@ Status 
SingleReplicaCompaction::_do_single_replica_compaction_impl() {
         _tablet->set_last_cumu_compaction_success_time(UnixMillis());
     } else if (compaction_type() == ReaderType::READER_BASE_COMPACTION) {
         _tablet->set_last_base_compaction_success_time(UnixMillis());
+    } else if (compaction_type() == ReaderType::READER_FULL_COMPACTION) {
+        _tablet->set_last_full_compaction_success_time(UnixMillis());
     }
 
     int64_t current_max_version;
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index efaaf2fffe..9ad6096bfe 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -244,8 +244,10 @@ Tablet::Tablet(TabletMetaSharedPtr tablet_meta, DataDir* 
data_dir,
           _is_bad(false),
           _last_cumu_compaction_failure_millis(0),
           _last_base_compaction_failure_millis(0),
+          _last_full_compaction_failure_millis(0),
           _last_cumu_compaction_success_millis(0),
           _last_base_compaction_success_millis(0),
+          _last_full_compaction_success_millis(0),
           _cumulative_point(K_INVALID_CUMULATIVE_POINT),
           _newly_created_rowset_num(0),
           _last_checkpoint_time(0),
@@ -1313,6 +1315,10 @@ std::vector<RowsetSharedPtr> 
Tablet::pick_candidate_rowsets_to_base_compaction()
     return candidate_rowsets;
 }
 
+// Candidate rowsets for a full compaction. This forwards to
+// pick_candidate_rowsets_to_single_replica_compaction() and returns its result unchanged.
+std::vector<RowsetSharedPtr> 
Tablet::pick_candidate_rowsets_to_full_compaction() {
+    return pick_candidate_rowsets_to_single_replica_compaction();
+}
+
 std::vector<RowsetSharedPtr> 
Tablet::pick_candidate_rowsets_to_build_inverted_index(
         const std::set<int32_t>& alter_index_uids, bool is_drop_op) {
     std::vector<RowsetSharedPtr> candidate_rowsets;
@@ -1393,6 +1399,10 @@ void Tablet::get_compaction_status(std::string* 
json_result) {
     format_str = 
ToStringFromUnixMillis(_last_base_compaction_failure_millis.load());
     base_value.SetString(format_str.c_str(), format_str.length(), 
root.GetAllocator());
     root.AddMember("last base failure time", base_value, root.GetAllocator());
+    rapidjson::Value full_value; // receives the formatted last-full-failure timestamp
+    format_str = ToStringFromUnixMillis(_last_full_compaction_failure_millis.load());
+    full_value.SetString(format_str.c_str(), format_str.length(), root.GetAllocator());
+    root.AddMember("last full failure time", full_value, root.GetAllocator());
     rapidjson::Value cumu_success_value;
     format_str = 
ToStringFromUnixMillis(_last_cumu_compaction_success_millis.load());
     cumu_success_value.SetString(format_str.c_str(), format_str.length(), 
root.GetAllocator());
@@ -1401,6 +1411,10 @@ void Tablet::get_compaction_status(std::string* 
json_result) {
     format_str = 
ToStringFromUnixMillis(_last_base_compaction_success_millis.load());
     base_success_value.SetString(format_str.c_str(), format_str.length(), 
root.GetAllocator());
     root.AddMember("last base success time", base_success_value, 
root.GetAllocator());
+    rapidjson::Value full_success_value;
+    format_str = 
ToStringFromUnixMillis(_last_full_compaction_success_millis.load());
+    full_success_value.SetString(format_str.c_str(), format_str.length(), 
root.GetAllocator());
+    root.AddMember("last full success time", full_success_value, 
root.GetAllocator());
 
     // print all rowsets' version as an array
     rapidjson::Document versions_arr;
@@ -1761,6 +1775,8 @@ void 
Tablet::execute_single_replica_compaction(CompactionType compaction_type) {
             set_last_cumu_compaction_failure_time(UnixMillis());
         } else if (compaction_type == CompactionType::BASE_COMPACTION) {
             set_last_base_compaction_failure_time(UnixMillis());
+        } else if (compaction_type == CompactionType::FULL_COMPACTION) {
+            set_last_full_compaction_failure_time(UnixMillis());
         }
         LOG(WARNING) << "failed to do single replica compaction. res=" << res
                      << ", tablet=" << full_name();
@@ -1770,6 +1786,8 @@ void 
Tablet::execute_single_replica_compaction(CompactionType compaction_type) {
         set_last_cumu_compaction_failure_time(0);
     } else if (compaction_type == CompactionType::BASE_COMPACTION) {
         set_last_base_compaction_failure_time(0);
+    } else if (compaction_type == CompactionType::FULL_COMPACTION) {
+        set_last_full_compaction_failure_time(0);
     }
 }
 
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index 8b764326ed..9852c38a02 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -199,6 +199,7 @@ public:
     std::mutex& get_push_lock() { return _ingest_lock; }
     std::mutex& get_base_compaction_lock() { return _base_compaction_lock; }
     std::mutex& get_cumulative_compaction_lock() { return 
_cumulative_compaction_lock; }
+    // Mutex that FullCompaction holds during its prepare and execute steps.
+    std::mutex& get_full_compaction_lock() { return _full_compaction_lock; }
 
     std::shared_mutex& get_migration_lock() { return _migration_lock; }
 
@@ -234,6 +235,11 @@ public:
         _last_base_compaction_failure_millis = millis;
     }
 
+    // Value last stored by set_last_full_compaction_failure_time().
+    int64_t last_full_compaction_failure_time() {
+        return _last_full_compaction_failure_millis.load();
+    }
+    // Records the time of the last full compaction failure, in milliseconds.
+    void set_last_full_compaction_failure_time(int64_t millis) {
+        _last_full_compaction_failure_millis.store(millis);
+    }
+
     int64_t last_cumu_compaction_success_time() { return 
_last_cumu_compaction_success_millis; }
     void set_last_cumu_compaction_success_time(int64_t millis) {
         _last_cumu_compaction_success_millis = millis;
@@ -244,6 +250,11 @@ public:
         _last_base_compaction_success_millis = millis;
     }
 
+    // Value last stored by set_last_full_compaction_success_time().
+    int64_t last_full_compaction_success_time() {
+        return _last_full_compaction_success_millis.load();
+    }
+    // Records the time of the last successful full compaction, in milliseconds.
+    void set_last_full_compaction_success_time(int64_t millis) {
+        _last_full_compaction_success_millis.store(millis);
+    }
+
     void delete_all_files();
 
     void check_tablet_path_exists();
@@ -255,6 +266,7 @@ public:
 
     std::vector<RowsetSharedPtr> 
pick_candidate_rowsets_to_cumulative_compaction();
     std::vector<RowsetSharedPtr> pick_candidate_rowsets_to_base_compaction();
+    std::vector<RowsetSharedPtr> pick_candidate_rowsets_to_full_compaction();
     std::vector<RowsetSharedPtr> 
pick_candidate_rowsets_to_build_inverted_index(
             const std::set<int32_t>& alter_index_uids, bool is_drop_op);
 
@@ -591,6 +603,7 @@ private:
     std::mutex _ingest_lock;
     std::mutex _base_compaction_lock;
     std::mutex _cumulative_compaction_lock;
+    std::mutex _full_compaction_lock;
     std::mutex _schema_change_lock;
     std::shared_mutex _migration_lock;
     std::mutex _build_inverted_index_lock;
@@ -618,10 +631,14 @@ private:
     std::atomic<int64_t> _last_cumu_compaction_failure_millis;
     // timestamp of last base compaction failure
     std::atomic<int64_t> _last_base_compaction_failure_millis;
+    // timestamp of last full compaction failure
+    std::atomic<int64_t> _last_full_compaction_failure_millis;
     // timestamp of last cumu compaction success
     std::atomic<int64_t> _last_cumu_compaction_success_millis;
     // timestamp of last base compaction success
     std::atomic<int64_t> _last_base_compaction_success_millis;
+    // timestamp of last full compaction success
+    std::atomic<int64_t> _last_full_compaction_success_millis;
     std::atomic<int64_t> _cumulative_point;
     std::atomic<int64_t> _cumulative_promotion_size;
     std::atomic<int32_t> _newly_created_rowset_num;
diff --git a/regression-test/data/compaction/test_full_compaction.out 
b/regression-test/data/compaction/test_full_compaction.out
new file mode 100644
index 0000000000..b5ed2dffc1
--- /dev/null
+++ b/regression-test/data/compaction/test_full_compaction.out
@@ -0,0 +1,41 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !1 --
+1      1
+2      2
+
+-- !2 --
+1      10
+2      20
+
+-- !3 --
+1      100
+2      200
+
+-- !4 --
+1      100
+2      200
+3      300
+
+-- !5 --
+1      100
+2      200
+3      100
+
+-- !6 --
+1      100
+2      200
+
+-- !skip_delete --
+1      1
+1      10
+1      100
+2      2
+2      20
+2      200
+3      300
+3      100
+
+-- !select_final --
+1      100
+2      200
+
diff --git a/regression-test/plugins/plugin_curl_requester.groovy 
b/regression-test/plugins/plugin_curl_requester.groovy
index 2e777cb8d7..a268339e63 100644
--- a/regression-test/plugins/plugin_curl_requester.groovy
+++ b/regression-test/plugins/plugin_curl_requester.groovy
@@ -61,3 +61,9 @@ Suite.metaClass.be_run_cumulative_compaction = { String ip, 
String port, String
 }
 
 logger.info("Added 'be_run_cumulative_compaction' function to Suite")
+
+// Asks the backend at ip:port to run a full compaction on `tablet_id` by POSTing to
+// /api/compaction/run with compact_type=full; returns whatever curl() returns.
+Suite.metaClass.be_run_full_compaction = { String ip, String port, String tablet_id  /* param */->
+    return curl("POST", String.format("http://%s:%s/api/compaction/run?tablet_id=%s&compact_type=full", ip, port, tablet_id))
+}
+
+logger.info("Added 'be_run_full_compaction' function to Suite")
diff --git a/regression-test/suites/compaction/test_full_compaction.groovy 
b/regression-test/suites/compaction/test_full_compaction.groovy
new file mode 100644
index 0000000000..41813fde3b
--- /dev/null
+++ b/regression-test/suites/compaction/test_full_compaction.groovy
@@ -0,0 +1,177 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.codehaus.groovy.runtime.IOGroovyMethods
+
+suite("test_full_compaction") { // Triggers a full compaction on a merge-on-write unique-key table and checks that all rowsets collapse into one
+    def tableName = "test_full_compaction"
+
+    try {
+        String backend_id;
+
+        def backendId_to_backendIP = [:]
+        def backendId_to_backendHttpPort = [:]
+        getBackendIpHttpPort(backendId_to_backendIP, 
backendId_to_backendHttpPort);
+
+        backend_id = backendId_to_backendIP.keySet()[0]
+        def (code, out, err) = 
show_be_config(backendId_to_backendIP.get(backend_id), 
backendId_to_backendHttpPort.get(backend_id))
+        logger.info("Show config: code=" + code + ", out=" + out + ", err=" + 
err)
+        assertEquals(code, 0)
+        def configList = parseJson(out.trim())
+        assert configList instanceof List
+
+        boolean disableAutoCompaction = true // stays true unless the BE config list has a "disable_auto_compaction" entry, whose element [2] then overrides it
+        for (Object ele in (List) configList) {
+            assert ele instanceof List<String>
+            if (((List<String>) ele)[0] == "disable_auto_compaction") {
+                disableAutoCompaction = Boolean.parseBoolean(((List<String>) 
ele)[2])
+            }
+        }
+
+        sql """ DROP TABLE IF EXISTS ${tableName} """
+        sql """
+            CREATE TABLE ${tableName} (
+            `user_id` INT NOT NULL, `value` INT NOT NULL)
+            UNIQUE KEY(`user_id`) 
+            DISTRIBUTED BY HASH(`user_id`) 
+            BUCKETS 1 
+            PROPERTIES ("replication_allocation" = "tag.location.default: 1",
+            "disable_auto_compaction" = "true",
+            "enable_unique_key_merge_on_write" = "true");"""
+
+        // version1 (1,1)(2,2)
+        sql """ INSERT INTO ${tableName} VALUES
+            (1,1),(2,2)
+            """
+        qt_1 """select * from ${tableName} order by user_id"""
+
+
+        // version2 (1,10)(2,20)
+        sql """ INSERT INTO ${tableName} VALUES
+            (1,10),(2,20)
+            """
+        qt_2 """select * from ${tableName} order by user_id"""
+
+
+        // version3 (1,100)(2,200)
+        sql """ INSERT INTO ${tableName} VALUES
+            (1,100),(2,200)
+            """
+        qt_3 """select * from ${tableName} order by user_id"""
+
+
+        // version4 (1,100)(2,200)(3,300)
+        sql """ INSERT INTO ${tableName} VALUES
+            (3,300)
+            """
+        qt_4 """select * from ${tableName} order by user_id"""
+
+
+        // version5 (1,100)(2,200)(3,100)
+        sql """update ${tableName} set value = 100 where user_id = 3"""
+        qt_5 """select * from ${tableName} order by user_id"""
+
+
+        // version6 (1,100)(2,200)
+        sql """delete from ${tableName} where user_id = 3"""
+        qt_6 """select * from ${tableName} order by user_id"""
+
+        sql "SET skip_delete_predicate = true"
+        sql "SET skip_delete_sign = true"
+        sql "SET skip_delete_bitmap = true"
+        // show all hidden data: with the three skip_delete_* session variables on, every row version ever written is returned
+        // (1,1)(1,10)(1,100)(2,2)(2,20)(2,200)(3,300)(3,100)
+        qt_skip_delete """select * from ${tableName} order by user_id"""
+
+        
//TabletId,ReplicaId,BackendId,SchemaHash,Version,LstSuccessVersion,LstFailedVersion,LstFailedTime,LocalDataSize,RemoteDataSize,RowCount,State,LstConsistencyCheckTime,CheckVersion,VersionCount,PathHash,MetaUrl,CompactionStatus
+        String[][] tablets = sql """ show tablets from ${tableName}; """
+
+        // before full compaction, there are 7 rowsets (presumably the initial rowset plus the 6 writes above -- TODO confirm).
+        int rowsetCount = 0
+        for (String[] tablet in tablets) {
+            String tablet_id = tablet[0]
+            def compactionStatusUrlIndex = 18 // NOTE(review): the column list above names only 18 columns (indices 0-17); confirm that index 18 is the CompactionStatus URL
+            (code, out, err) = curl("GET", tablet[compactionStatusUrlIndex])
+            logger.info("Show tablets status: code=" + code + ", out=" + out + 
", err=" + err)
+            assertEquals(code, 0)
+            def tabletJson = parseJson(out.trim())
+            assert tabletJson.rowsets instanceof List
+            rowsetCount +=((List<String>) tabletJson.rowsets).size()
+        }
+        assert (rowsetCount == 7)
+
+        // trigger full compactions for all tablets in ${tableName}
+        for (String[] tablet in tablets) {
+            String tablet_id = tablet[0]
+            backend_id = tablet[2]
+            times = 1 // attempt counter for the retry loop below; NOTE(review): assigned without def or a type -- confirm this is intended
+
+            do{ // re-send the request every 2 seconds until the reply status is "success", giving up after 10 attempts
+                (code, out, err) = 
be_run_full_compaction(backendId_to_backendIP.get(backend_id), 
backendId_to_backendHttpPort.get(backend_id), tablet_id)
+                logger.info("Run compaction: code=" + code + ", out=" + out + 
", err=" + err)
+                ++times
+                sleep(2000)
+            } while (parseJson(out.trim()).status.toLowerCase()!="success" && 
times<=10)
+
+            def compactJson = parseJson(out.trim())
+            if (compactJson.status.toLowerCase() == "fail") { // a "fail" reply is tolerated only when auto compaction is enabled on the BE
+                assertEquals(disableAutoCompaction, false)
+                logger.info("Compaction was done automatically!")
+            }
+            if (disableAutoCompaction) {
+                assertEquals("success", compactJson.status.toLowerCase())
+            }
+        }
+
+        // wait for full compaction done: poll each tablet's compaction status once per second until run_status is false
+        for (String[] tablet in tablets) {
+            boolean running = true
+            do {
+                Thread.sleep(1000)
+                String tablet_id = tablet[0]
+                backend_id = tablet[2]
+                (code, out, err) = 
be_get_compaction_status(backendId_to_backendIP.get(backend_id), 
backendId_to_backendHttpPort.get(backend_id), tablet_id)
+                logger.info("Get compaction status: code=" + code + ", out=" + 
out + ", err=" + err)
+                assertEquals(code, 0)
+                def compactionStatus = parseJson(out.trim())
+                assertEquals("success", compactionStatus.status.toLowerCase())
+                running = compactionStatus.run_status
+            } while (running)
+        }
+
+        // after full compaction, there is only 1 rowset.
+        
+        rowsetCount = 0
+        for (String[] tablet in tablets) {
+            String tablet_id = tablet[0]
+            def compactionStatusUrlIndex = 18 // same column index as in the pre-compaction count above
+            (code, out, err) = curl("GET", tablet[compactionStatusUrlIndex])
+            logger.info("Show tablets status: code=" + code + ", out=" + out + 
", err=" + err)
+            assertEquals(code, 0)
+            def tabletJson = parseJson(out.trim())
+            assert tabletJson.rowsets instanceof List
+            rowsetCount +=((List<String>) tabletJson.rowsets).size()
+        }
+        assert (rowsetCount == 1)
+
+        // make sure all hidden data has been deleted (the skip_delete_* session variables set earlier are still on)
+        // (1,100)(2,200)
+        qt_select_final """select * from ${tableName} order by user_id"""
+    } finally { // runs whether or not the steps above succeeded
+        try_sql("DROP TABLE IF EXISTS ${tableName}")
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org


Reply via email to