This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new c409fa0f58 [Feature](Compaction)Support full compaction (#21177) c409fa0f58 is described below commit c409fa0f58fa0b68fafe34c4796fc45abe78979c Author: abmdocrt <yukang.lian2...@gmail.com> AuthorDate: Sun Jul 16 13:21:15 2023 +0800 [Feature](Compaction)Support full compaction (#21177) --- be/src/common/status.h | 3 + be/src/http/action/compaction_action.cpp | 17 +- be/src/http/action/compaction_action.h | 1 + be/src/io/io_common.h | 3 +- be/src/olap/compaction.cpp | 4 + be/src/olap/full_compaction.cpp | 215 +++++++++++++++++++++ be/src/olap/full_compaction.h | 62 ++++++ be/src/olap/olap_common.h | 2 +- be/src/olap/reader.cpp | 2 + be/src/olap/rowset/rowset_meta.h | 2 +- be/src/olap/single_replica_compaction.cpp | 2 + be/src/olap/tablet.cpp | 18 ++ be/src/olap/tablet.h | 17 ++ .../data/compaction/test_full_compaction.out | 41 ++++ .../plugins/plugin_curl_requester.groovy | 6 + .../suites/compaction/test_full_compaction.groovy | 177 +++++++++++++++++ 16 files changed, 568 insertions(+), 4 deletions(-) diff --git a/be/src/common/status.h b/be/src/common/status.h index eda6a34d55..d64b70615c 100644 --- a/be/src/common/status.h +++ b/be/src/common/status.h @@ -229,6 +229,8 @@ E(CUMULATIVE_INVALID_NEED_MERGED_VERSIONS, -2004); E(CUMULATIVE_ERROR_DELETE_ACTION, -2005); E(CUMULATIVE_MISS_VERSION, -2006); E(CUMULATIVE_CLONE_OCCURRED, -2007); +E(FULL_NO_SUITABLE_VERSION, -2008); +E(FULL_MISS_VERSION, -2009); E(META_INVALID_ARGUMENT, -3000); E(META_OPEN_DB_ERROR, -3001); E(META_KEY_NOT_FOUND, -3002); @@ -285,6 +287,7 @@ inline bool capture_stacktrace(int code) { && code != ErrorCode::PUSH_TRANSACTION_ALREADY_EXIST && code != ErrorCode::BE_NO_SUITABLE_VERSION && code != ErrorCode::CUMULATIVE_NO_SUITABLE_VERSION + && code != ErrorCode::FULL_NO_SUITABLE_VERSION && code != ErrorCode::PUBLISH_VERSION_NOT_CONTINUOUS && code != ErrorCode::ROWSET_RENAME_FILE_FAILED && code != ErrorCode::SEGCOMPACTION_INIT_READER
diff --git a/be/src/http/action/compaction_action.cpp b/be/src/http/action/compaction_action.cpp index 48dbe78ab4..81f6ad4d30 100644 --- a/be/src/http/action/compaction_action.cpp +++ b/be/src/http/action/compaction_action.cpp @@ -37,6 +37,7 @@ #include "olap/base_compaction.h" #include "olap/cumulative_compaction.h" #include "olap/cumulative_compaction_policy.h" +#include "olap/full_compaction.h" #include "olap/olap_define.h" #include "olap/storage_engine.h" #include "olap/tablet_manager.h" @@ -89,7 +90,8 @@ Status CompactionAction::_handle_run_compaction(HttpRequest* req, std::string* j // check compaction_type equals 'base' or 'cumulative' std::string compaction_type = req->param(PARAM_COMPACTION_TYPE); if (compaction_type != PARAM_COMPACTION_BASE && - compaction_type != PARAM_COMPACTION_CUMULATIVE) { + compaction_type != PARAM_COMPACTION_CUMULATIVE && + compaction_type != PARAM_COMPACTION_FULL) { return Status::NotSupported("The compaction type '{}' is not supported", compaction_type); } @@ -229,6 +231,19 @@ Status CompactionAction::_execute_compaction_callback(TabletSharedPtr tablet, << ", table=" << tablet->full_name(); } } + } else if (compaction_type == PARAM_COMPACTION_FULL) { + FullCompaction full_compaction(tablet); + res = full_compaction.compact(); + if (!res) { + if (res.is<FULL_NO_SUITABLE_VERSION>()) { + // Ignore this error code. + VLOG_NOTICE << "failed to init full compaction due to no suitable version" + << ", tablet=" << tablet->full_name(); + } else { + LOG(WARNING) << "failed to do full compaction. "
res=" << res + << ", table=" << tablet->full_name(); + } + } } timer.stop(); diff --git a/be/src/http/action/compaction_action.h b/be/src/http/action/compaction_action.h index 1feb7989e7..9f52c625f1 100644 --- a/be/src/http/action/compaction_action.h +++ b/be/src/http/action/compaction_action.h @@ -39,6 +39,7 @@ enum class CompactionActionType { const std::string PARAM_COMPACTION_TYPE = "compact_type"; const std::string PARAM_COMPACTION_BASE = "base"; const std::string PARAM_COMPACTION_CUMULATIVE = "cumulative"; +const std::string PARAM_COMPACTION_FULL = "full"; /// This action is used for viewing the compaction status. /// See compaction-action.md for details. diff --git a/be/src/io/io_common.h b/be/src/io/io_common.h index 18391f2a32..8849330746 100644 --- a/be/src/io/io_common.h +++ b/be/src/io/io_common.h @@ -29,7 +29,8 @@ enum class ReaderType { READER_CHECKSUM = 4, READER_COLD_DATA_COMPACTION = 5, READER_SEGMENT_COMPACTION = 6, - UNKNOWN = 7 + READER_FULL_COMPACTION = 7, + UNKNOWN = 8 }; namespace io { diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp index e4c0b59d8c..d67248919f 100644 --- a/be/src/olap/compaction.cpp +++ b/be/src/olap/compaction.cpp @@ -301,6 +301,8 @@ Status Compaction::do_compaction_impl(int64_t permits) { _tablet->set_last_cumu_compaction_success_time(now); } else if (compaction_type() == ReaderType::READER_BASE_COMPACTION) { _tablet->set_last_base_compaction_success_time(now); + } else if (compaction_type() == ReaderType::READER_FULL_COMPACTION) { + _tablet->set_last_full_compaction_success_time(now); } auto cumu_policy = _tablet->cumulative_compaction_policy(); LOG(INFO) << "succeed to do ordered data " << compaction_name() @@ -451,6 +453,8 @@ Status Compaction::do_compaction_impl(int64_t permits) { _tablet->set_last_cumu_compaction_success_time(now); } else if (compaction_type() == ReaderType::READER_BASE_COMPACTION) { _tablet->set_last_base_compaction_success_time(now); + } else if (compaction_type() ==
ReaderType::READER_FULL_COMPACTION) { + _tablet->set_last_full_compaction_success_time(now); } int64_t current_max_version; diff --git a/be/src/olap/full_compaction.cpp b/be/src/olap/full_compaction.cpp new file mode 100644 index 0000000000..e2acd1cdb0 --- /dev/null +++ b/be/src/olap/full_compaction.cpp @@ -0,0 +1,215 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License.
+ +#include "olap/full_compaction.h" + +#include <glog/logging.h> +#include <time.h> + +#include <memory> +#include <mutex> +#include <ostream> +#include <shared_mutex> + +#include "common/config.h" +#include "common/status.h" +#include "olap/cumulative_compaction_policy.h" +#include "olap/olap_common.h" +#include "olap/olap_define.h" +#include "olap/rowset/beta_rowset.h" +#include "olap/rowset/rowset.h" +#include "olap/schema_change.h" +#include "olap/tablet_meta.h" +#include "runtime/thread_context.h" +#include "util/thread.h" +#include "util/trace.h" + +namespace doris { +using namespace ErrorCode; + +FullCompaction::FullCompaction(const TabletSharedPtr& tablet) + : Compaction(tablet, "FullCompaction:" + std::to_string(tablet->tablet_id())) {} + +FullCompaction::~FullCompaction() = default; + +Status FullCompaction::prepare_compact() { + if (!_tablet->init_succeeded()) { + return Status::Error<INVALID_ARGUMENT>("Full compaction init failed"); + } + + std::unique_lock full_lock(_tablet->get_full_compaction_lock()); + std::unique_lock base_lock(_tablet->get_base_compaction_lock()); + std::unique_lock cumu_lock(_tablet->get_cumulative_compaction_lock()); + + // 1. pick rowsets to compact + RETURN_IF_ERROR(pick_rowsets_to_compact()); + _tablet->set_clone_occurred(false); + + return Status::OK(); +} + +Status FullCompaction::execute_compact_impl() { + std::unique_lock full_lock(_tablet->get_full_compaction_lock()); + std::unique_lock base_lock(_tablet->get_base_compaction_lock()); + std::unique_lock cumu_lock(_tablet->get_cumulative_compaction_lock()); + + // Clone task may happen after compaction task is submitted to thread pool, and rowsets picked + // for compaction may change. In this case, current compaction task should not be executed. + if (_tablet->get_clone_occurred()) { + _tablet->set_clone_occurred(false); + return Status::Error<BE_CLONE_OCCURRED>("get_clone_occurred failed"); + } + + SCOPED_ATTACH_TASK(_mem_tracker); + + // 2.
do full compaction, merge rowsets + int64_t permits = get_compaction_permits(); + RETURN_IF_ERROR(do_compaction(permits)); + + // 3. set state to success + _state = CompactionState::SUCCESS; + + // 4. set cumulative point + Version last_version = _input_rowsets.back()->version(); + _tablet->cumulative_compaction_policy()->update_cumulative_point(_tablet.get(), _input_rowsets, + _output_rowset, last_version); + VLOG_CRITICAL << "after full compaction, current cumulative point is " + << _tablet->cumulative_layer_point() << ", tablet=" << _tablet->full_name(); + + return Status::OK(); +} + +Status FullCompaction::pick_rowsets_to_compact() { + _input_rowsets = _tablet->pick_candidate_rowsets_to_full_compaction(); + RETURN_IF_ERROR(check_version_continuity(_input_rowsets)); + RETURN_IF_ERROR(_check_all_version(_input_rowsets)); + if (_input_rowsets.size() <= 1) { + return Status::Error<FULL_NO_SUITABLE_VERSION>("There is no suitable version"); + } + + if (_input_rowsets.size() == 2 && _input_rowsets[0]->end_version() == 1) { + // the tablet is with rowset: [0-1], [2-y] + // and [0-1] has no data. in this situation, no need to do full compaction.
+ return Status::Error<FULL_NO_SUITABLE_VERSION>("There is no suitable version"); + } + + return Status::OK(); +} + +Status FullCompaction::modify_rowsets(const Merger::Statistics* stats) { + if (_tablet->keys_type() == KeysType::UNIQUE_KEYS && + _tablet->enable_unique_key_merge_on_write()) { + RETURN_IF_ERROR( + _full_compaction_update_delete_bitmap(_output_rowset, _output_rs_writer.get())); + } + std::vector<RowsetSharedPtr> output_rowsets(1, _output_rowset); + RETURN_IF_ERROR(_tablet->modify_rowsets(output_rowsets, _input_rowsets, true)); + _tablet->save_meta(); + return Status::OK(); +} + +Status FullCompaction::_check_all_version(const std::vector<RowsetSharedPtr>& rowsets) { + if (rowsets.empty()) { + return Status::Error<FULL_MISS_VERSION>("There is no input rowset when do full compaction"); + } + const RowsetSharedPtr& last_rowset = rowsets.back(); + const RowsetSharedPtr& first_rowset = rowsets.front(); + if (last_rowset->version() != _tablet->max_version() || first_rowset->version().first != 0) { + return Status::Error<FULL_MISS_VERSION>( + "Full compaction rowsets' versions not equal to all exist rowsets' versions. " + "full compaction rowsets max version={}-{}" + ", current rowsets max version={}-{}, " + "full compaction rowsets min version={}-{}, current rowsets min version=0-1", + last_rowset->start_version(), last_rowset->end_version(), + _tablet->max_version().first, _tablet->max_version().second, + first_rowset->start_version(), first_rowset->end_version()); + } + return Status::OK(); +} + +Status FullCompaction::_full_compaction_update_delete_bitmap(const RowsetSharedPtr& rowset, + RowsetWriter* rowset_writer) { + std::vector<RowsetSharedPtr> tmp_rowsets {}; + + // tablet is under alter process.
+ if (_tablet->tablet_state() == TABLET_NOTREADY && + SchemaChangeHandler::tablet_in_converting(_tablet->tablet_id())) { + LOG(INFO) << "tablet is under alter process, update delete bitmap later, tablet_id=" + << _tablet->tablet_id(); + return Status::OK(); + } + + int64_t max_version = _tablet->max_version().second; + DCHECK(max_version >= rowset->version().second); + if (max_version > rowset->version().second) { + RETURN_IF_ERROR(_tablet->capture_consistent_rowsets( + {rowset->version().second + 1, max_version}, &tmp_rowsets)); + } + + for (const auto& it : tmp_rowsets) { + const int64_t& cur_version = it->rowset_meta()->start_version(); + RETURN_IF_ERROR( + _full_compaction_calc_delete_bitmap(it, rowset, cur_version, rowset_writer)); + } + + std::lock_guard rowset_update_lock(_tablet->get_rowset_update_lock()); + std::lock_guard header_lock(_tablet->get_header_lock()); + for (const auto& it : _tablet->rowset_map()) { + const int64_t& cur_version = it.first.first; + const RowsetSharedPtr& published_rowset = it.second; + if (cur_version > max_version) { + RETURN_IF_ERROR(_full_compaction_calc_delete_bitmap(published_rowset, rowset, + cur_version, rowset_writer)); + } + } + + return Status::OK(); +} + +Status FullCompaction::_full_compaction_calc_delete_bitmap(const RowsetSharedPtr& published_rowset, + const RowsetSharedPtr& rowset, + const int64_t& cur_version, + RowsetWriter* rowset_writer) { + std::vector<segment_v2::SegmentSharedPtr> segments; + auto beta_rowset = static_cast<BetaRowset*>(published_rowset.get()); + RETURN_IF_ERROR(beta_rowset->load_segments(&segments)); + DeleteBitmapPtr delete_bitmap = + std::make_shared<DeleteBitmap>(_tablet->tablet_meta()->tablet_id()); + std::vector<RowsetSharedPtr> specified_rowsets(1, rowset); + + OlapStopWatch watch; + RETURN_IF_ERROR(_tablet->calc_delete_bitmap(published_rowset, segments, specified_rowsets, + delete_bitmap, cur_version, rowset_writer)); + size_t total_rows = std::accumulate( + segments.begin(), segments.end(), size_t {0},
+ [](size_t sum, const segment_v2::SegmentSharedPtr& s) { return sum + s->num_rows(); }); + VLOG_DEBUG << "[Full compaction] construct delete bitmap tablet: " << _tablet->tablet_id() + << ", published rowset version: [" << published_rowset->version().first << "-" + << published_rowset->version().second << "]" + << ", full compaction rowset version: [" << rowset->version().first << "-" + << rowset->version().second << "]" + << ", cost: " << watch.get_elapse_time_us() << "(us), total rows: " << total_rows; + + for (const auto& [k, v] : delete_bitmap->delete_bitmap) { + _tablet->tablet_meta()->delete_bitmap().merge({std::get<0>(k), std::get<1>(k), cur_version}, + v); + } + + return Status::OK(); +} + +} // namespace doris diff --git a/be/src/olap/full_compaction.h b/be/src/olap/full_compaction.h new file mode 100644 index 0000000000..bce9ac745b --- /dev/null +++ b/be/src/olap/full_compaction.h @@ -0,0 +1,62 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License.
+ +#pragma once + +#include <butil/macros.h> + +#include <string> +#include <vector> + +#include "common/status.h" +#include "io/io_common.h" +#include "olap/compaction.h" +#include "olap/rowset/rowset.h" +#include "olap/tablet.h" + +namespace doris { + +class FullCompaction : public Compaction { +public: + FullCompaction(const TabletSharedPtr& tablet); + ~FullCompaction() override; + + Status prepare_compact() override; + Status execute_compact_impl() override; + Status modify_rowsets(const Merger::Statistics* stats = nullptr) override; + + std::vector<RowsetSharedPtr> get_input_rowsets() { return _input_rowsets; } + +protected: + Status pick_rowsets_to_compact() override; + std::string compaction_name() const override { return "full compaction"; } + + ReaderType compaction_type() const override { return ReaderType::READER_FULL_COMPACTION; } + +private: + Status _check_all_version(const std::vector<RowsetSharedPtr>& rowsets); + Status _full_compaction_update_delete_bitmap(const RowsetSharedPtr& rowset, + RowsetWriter* rowset_writer); + Status _full_compaction_calc_delete_bitmap(const RowsetSharedPtr& published_rowset, + const RowsetSharedPtr& rowset, + const int64_t& cur_version, + RowsetWriter* rowset_writer); + + DISALLOW_COPY_AND_ASSIGN(FullCompaction); +}; + +} // namespace doris diff --git a/be/src/olap/olap_common.h b/be/src/olap/olap_common.h index 86f59af14c..c9030d1317 100644 --- a/be/src/olap/olap_common.h +++ b/be/src/olap/olap_common.h @@ -48,7 +48,7 @@ using uint128_t = unsigned __int128; using TabletUid = UniqueId; -enum CompactionType { BASE_COMPACTION = 1, CUMULATIVE_COMPACTION = 2 }; +enum CompactionType { BASE_COMPACTION = 1, CUMULATIVE_COMPACTION = 2, FULL_COMPACTION = 3 }; struct DataDirInfo { std::string path; diff --git a/be/src/olap/reader.cpp b/be/src/olap/reader.cpp index 730fdf5145..468c6289d5 100644 --- a/be/src/olap/reader.cpp +++ b/be/src/olap/reader.cpp @@ -324,6 +324,7 @@ Status TabletReader::_init_return_columns(const ReaderParams&
read_params) { } else if ((read_params.reader_type == ReaderType::READER_CUMULATIVE_COMPACTION || read_params.reader_type == ReaderType::READER_SEGMENT_COMPACTION || read_params.reader_type == ReaderType::READER_BASE_COMPACTION || + read_params.reader_type == ReaderType::READER_FULL_COMPACTION || read_params.reader_type == ReaderType::READER_COLD_DATA_COMPACTION || read_params.reader_type == ReaderType::READER_ALTER_TABLE) && !read_params.return_columns.empty()) { @@ -613,6 +614,7 @@ Status TabletReader::_init_delete_condition(const ReaderParams& read_params) { // QUERY will filter the row in query layer to keep right result use where clause. // CUMULATIVE_COMPACTION will lost the filter_delete info of base rowset if (read_params.reader_type == ReaderType::READER_BASE_COMPACTION || + read_params.reader_type == ReaderType::READER_FULL_COMPACTION || read_params.reader_type == ReaderType::READER_COLD_DATA_COMPACTION || read_params.reader_type == ReaderType::READER_CHECKSUM) { _filter_delete = true; diff --git a/be/src/olap/rowset/rowset_meta.h b/be/src/olap/rowset/rowset_meta.h index 68bb1e7075..22c15d790d 100644 --- a/be/src/olap/rowset/rowset_meta.h +++ b/be/src/olap/rowset/rowset_meta.h @@ -293,7 +293,7 @@ public: // `segments_overlap()` only return the value of "segments_overlap" field in rowset meta, // but "segments_overlap" may be UNKNOWN. // - // Returns true iff all of the following conditions are met + // Returns true if and only if all of the following conditions are met // 1. the rowset contains more than one segment // 2.
the rowset's start version == end version (non-singleton rowset was generated by compaction process // which always produces non-overlapped segments) diff --git a/be/src/olap/single_replica_compaction.cpp b/be/src/olap/single_replica_compaction.cpp index 8f927022f1..c7671aee36 100644 --- a/be/src/olap/single_replica_compaction.cpp +++ b/be/src/olap/single_replica_compaction.cpp @@ -164,6 +164,8 @@ Status SingleReplicaCompaction::_do_single_replica_compaction_impl() { _tablet->set_last_cumu_compaction_success_time(UnixMillis()); } else if (compaction_type() == ReaderType::READER_BASE_COMPACTION) { _tablet->set_last_base_compaction_success_time(UnixMillis()); + } else if (compaction_type() == ReaderType::READER_FULL_COMPACTION) { + _tablet->set_last_full_compaction_success_time(UnixMillis()); } int64_t current_max_version; diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp index efaaf2fffe..9ad6096bfe 100644 --- a/be/src/olap/tablet.cpp +++ b/be/src/olap/tablet.cpp @@ -244,8 +244,10 @@ Tablet::Tablet(TabletMetaSharedPtr tablet_meta, DataDir* data_dir, _is_bad(false), _last_cumu_compaction_failure_millis(0), _last_base_compaction_failure_millis(0), + _last_full_compaction_failure_millis(0), _last_cumu_compaction_success_millis(0), _last_base_compaction_success_millis(0), + _last_full_compaction_success_millis(0), _cumulative_point(K_INVALID_CUMULATIVE_POINT), _newly_created_rowset_num(0), _last_checkpoint_time(0), @@ -1313,6 +1315,10 @@ std::vector<RowsetSharedPtr> Tablet::pick_candidate_rowsets_to_base_compaction() return candidate_rowsets; } +std::vector<RowsetSharedPtr> Tablet::pick_candidate_rowsets_to_full_compaction() { + return pick_candidate_rowsets_to_single_replica_compaction(); +} + std::vector<RowsetSharedPtr> Tablet::pick_candidate_rowsets_to_build_inverted_index( const std::set<int32_t>& alter_index_uids, bool is_drop_op) { std::vector<RowsetSharedPtr> candidate_rowsets; @@ -1393,6 +1399,10 @@ void Tablet::get_compaction_status(std::string*
json_result) { format_str = ToStringFromUnixMillis(_last_base_compaction_failure_millis.load()); base_value.SetString(format_str.c_str(), format_str.length(), root.GetAllocator()); root.AddMember("last base failure time", base_value, root.GetAllocator()); + rapidjson::Value full_value; + format_str = ToStringFromUnixMillis(_last_full_compaction_failure_millis.load()); + full_value.SetString(format_str.c_str(), format_str.length(), root.GetAllocator()); + root.AddMember("last full failure time", full_value, root.GetAllocator()); rapidjson::Value cumu_success_value; format_str = ToStringFromUnixMillis(_last_cumu_compaction_success_millis.load()); cumu_success_value.SetString(format_str.c_str(), format_str.length(), root.GetAllocator()); @@ -1401,6 +1411,10 @@ void Tablet::get_compaction_status(std::string* json_result) { format_str = ToStringFromUnixMillis(_last_base_compaction_success_millis.load()); base_success_value.SetString(format_str.c_str(), format_str.length(), root.GetAllocator()); root.AddMember("last base success time", base_success_value, root.GetAllocator()); + rapidjson::Value full_success_value; + format_str = ToStringFromUnixMillis(_last_full_compaction_success_millis.load()); + full_success_value.SetString(format_str.c_str(), format_str.length(), root.GetAllocator()); + root.AddMember("last full success time", full_success_value, root.GetAllocator()); // print all rowsets' version as an array rapidjson::Document versions_arr; @@ -1761,6 +1775,8 @@ void Tablet::execute_single_replica_compaction(CompactionType compaction_type) { set_last_cumu_compaction_failure_time(UnixMillis()); } else if (compaction_type == CompactionType::BASE_COMPACTION) { set_last_base_compaction_failure_time(UnixMillis()); + } else if (compaction_type == CompactionType::FULL_COMPACTION) { + set_last_full_compaction_failure_time(UnixMillis()); } LOG(WARNING) << "failed to do single replica compaction.
res=" << res << ", tablet=" << full_name(); @@ -1770,6 +1786,8 @@ void Tablet::execute_single_replica_compaction(CompactionType compaction_type) { set_last_cumu_compaction_failure_time(0); } else if (compaction_type == CompactionType::BASE_COMPACTION) { set_last_base_compaction_failure_time(0); + } else if (compaction_type == CompactionType::FULL_COMPACTION) { + set_last_full_compaction_failure_time(0); } } diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h index 8b764326ed..9852c38a02 100644 --- a/be/src/olap/tablet.h +++ b/be/src/olap/tablet.h @@ -199,6 +199,7 @@ public: std::mutex& get_push_lock() { return _ingest_lock; } std::mutex& get_base_compaction_lock() { return _base_compaction_lock; } std::mutex& get_cumulative_compaction_lock() { return _cumulative_compaction_lock; } + std::mutex& get_full_compaction_lock() { return _full_compaction_lock; } std::shared_mutex& get_migration_lock() { return _migration_lock; } @@ -234,6 +235,11 @@ public: _last_base_compaction_failure_millis = millis; } + int64_t last_full_compaction_failure_time() { return _last_full_compaction_failure_millis; } + void set_last_full_compaction_failure_time(int64_t millis) { + _last_full_compaction_failure_millis = millis; + } + int64_t last_cumu_compaction_success_time() { return _last_cumu_compaction_success_millis; } void set_last_cumu_compaction_success_time(int64_t millis) { _last_cumu_compaction_success_millis = millis; @@ -244,6 +250,11 @@ public: _last_base_compaction_success_millis = millis; } + int64_t last_full_compaction_success_time() { return _last_full_compaction_success_millis; } + void set_last_full_compaction_success_time(int64_t millis) { + _last_full_compaction_success_millis = millis; + } + void delete_all_files(); void check_tablet_path_exists(); @@ -255,6 +266,7 @@ public: std::vector<RowsetSharedPtr> pick_candidate_rowsets_to_cumulative_compaction(); std::vector<RowsetSharedPtr> pick_candidate_rowsets_to_base_compaction(); + std::vector<RowsetSharedPtr>
pick_candidate_rowsets_to_full_compaction(); std::vector<RowsetSharedPtr> pick_candidate_rowsets_to_build_inverted_index( const std::set<int32_t>& alter_index_uids, bool is_drop_op); @@ -591,6 +603,7 @@ private: std::mutex _ingest_lock; std::mutex _base_compaction_lock; std::mutex _cumulative_compaction_lock; + std::mutex _full_compaction_lock; std::mutex _schema_change_lock; std::shared_mutex _migration_lock; std::mutex _build_inverted_index_lock; @@ -618,10 +631,14 @@ private: std::atomic<int64_t> _last_cumu_compaction_failure_millis; // timestamp of last base compaction failure std::atomic<int64_t> _last_base_compaction_failure_millis; + // timestamp of last full compaction failure + std::atomic<int64_t> _last_full_compaction_failure_millis; // timestamp of last cumu compaction success std::atomic<int64_t> _last_cumu_compaction_success_millis; // timestamp of last base compaction success std::atomic<int64_t> _last_base_compaction_success_millis; + // timestamp of last full compaction success + std::atomic<int64_t> _last_full_compaction_success_millis; std::atomic<int64_t> _cumulative_point; std::atomic<int64_t> _cumulative_promotion_size; std::atomic<int32_t> _newly_created_rowset_num; diff --git a/regression-test/data/compaction/test_full_compaction.out b/regression-test/data/compaction/test_full_compaction.out new file mode 100644 index 0000000000..b5ed2dffc1 --- /dev/null +++ b/regression-test/data/compaction/test_full_compaction.out @@ -0,0 +1,41 @@ +-- This file is automatically generated.
You should know what you did if you want to edit this +-- !1 -- +1 1 +2 2 + +-- !2 -- +1 10 +2 20 + +-- !3 -- +1 100 +2 200 + +-- !4 -- +1 100 +2 200 +3 300 + +-- !5 -- +1 100 +2 200 +3 100 + +-- !6 -- +1 100 +2 200 + +-- !skip_delete -- +1 1 +1 10 +1 100 +2 2 +2 20 +2 200 +3 300 +3 100 + +-- !select_final -- +1 100 +2 200 + diff --git a/regression-test/plugins/plugin_curl_requester.groovy b/regression-test/plugins/plugin_curl_requester.groovy index 2e777cb8d7..a268339e63 100644 --- a/regression-test/plugins/plugin_curl_requester.groovy +++ b/regression-test/plugins/plugin_curl_requester.groovy @@ -61,3 +61,9 @@ Suite.metaClass.be_run_cumulative_compaction = { String ip, String port, String } logger.info("Added 'be_run_cumulative_compaction' function to Suite") + +Suite.metaClass.be_run_full_compaction = { String ip, String port, String tablet_id /* param */-> + return curl("POST", String.format("http://%s:%s/api/compaction/run?tablet_id=%s&compact_type=full", ip, port, tablet_id)) +} + +logger.info("Added 'be_run_full_compaction' function to Suite") diff --git a/regression-test/suites/compaction/test_full_compaction.groovy b/regression-test/suites/compaction/test_full_compaction.groovy new file mode 100644 index 0000000000..41813fde3b --- /dev/null +++ b/regression-test/suites/compaction/test_full_compaction.groovy @@ -0,0 +1,177 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License.
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import org.codehaus.groovy.runtime.IOGroovyMethods + +suite("test_full_compaction") { + def tableName = "test_full_compaction" + + try { + String backend_id; + + def backendId_to_backendIP = [:] + def backendId_to_backendHttpPort = [:] + getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort); + + backend_id = backendId_to_backendIP.keySet()[0] + def (code, out, err) = show_be_config(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id)) + logger.info("Show config: code=" + code + ", out=" + out + ", err=" + err) + assertEquals(code, 0) + def configList = parseJson(out.trim()) + assert configList instanceof List + + boolean disableAutoCompaction = true + for (Object ele in (List) configList) { + assert ele instanceof List<String> + if (((List<String>) ele)[0] == "disable_auto_compaction") { + disableAutoCompaction = Boolean.parseBoolean(((List<String>) ele)[2]) + } + } + + sql """ DROP TABLE IF EXISTS ${tableName} """ + sql """ + CREATE TABLE ${tableName} ( + `user_id` INT NOT NULL, `value` INT NOT NULL) + UNIQUE KEY(`user_id`) + DISTRIBUTED BY HASH(`user_id`) + BUCKETS 1 + PROPERTIES ("replication_allocation" = "tag.location.default: 1", + "disable_auto_compaction" = "true", + "enable_unique_key_merge_on_write" = "true");""" + + // version1 (1,1)(2,2) + sql """ INSERT INTO ${tableName} VALUES + (1,1),(2,2) + """ + qt_1 """select * from ${tableName} order by user_id""" + + + // version2 (1,10)(2,20) + sql """ INSERT INTO ${tableName} VALUES + (1,10),(2,20) + """ + qt_2 """select * from
${tableName} order by user_id""" + + + // version3 (1,100)(2,200) + sql """ INSERT INTO ${tableName} VALUES + (1,100),(2,200) + """ + qt_3 """select * from ${tableName} order by user_id""" + + + // version4 (1,100)(2,200)(3,300) + sql """ INSERT INTO ${tableName} VALUES + (3,300) + """ + qt_4 """select * from ${tableName} order by user_id""" + + + // version5 (1,100)(2,200)(3,100) + sql """update ${tableName} set value = 100 where user_id = 3""" + qt_5 """select * from ${tableName} order by user_id""" + + + // version6 (1,100)(2,200) + sql """delete from ${tableName} where user_id = 3""" + qt_6 """select * from ${tableName} order by user_id""" + + sql "SET skip_delete_predicate = true" + sql "SET skip_delete_sign = true" + sql "SET skip_delete_bitmap = true" + // show all hidden data + // (1,1)(1,10)(1,100)(2,2)(2,20)(2,200)(3,300)(3,100) + qt_skip_delete """select * from ${tableName} order by user_id""" + + //TabletId,ReplicaId,BackendId,SchemaHash,Version,LstSuccessVersion,LstFailedVersion,LstFailedTime,LocalDataSize,RemoteDataSize,RowCount,State,LstConsistencyCheckTime,CheckVersion,VersionCount,PathHash,MetaUrl,CompactionStatus + String[][] tablets = sql """ show tablets from ${tableName}; """ + + // before full compaction, there are 7 rowsets.
+ int rowsetCount = 0 + for (String[] tablet in tablets) { + String tablet_id = tablet[0] + def compactionStatusUrlIndex = 18 + (code, out, err) = curl("GET", tablet[compactionStatusUrlIndex]) + logger.info("Show tablets status: code=" + code + ", out=" + out + ", err=" + err) + assertEquals(code, 0) + def tabletJson = parseJson(out.trim()) + assert tabletJson.rowsets instanceof List + rowsetCount +=((List<String>) tabletJson.rowsets).size() + } + assert (rowsetCount == 7) + + // trigger full compactions for all tablets in ${tableName} + for (String[] tablet in tablets) { + String tablet_id = tablet[0] + backend_id = tablet[2] + def times = 1 + + do{ + (code, out, err) = be_run_full_compaction(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id) + logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err) + ++times + sleep(2000) + } while (parseJson(out.trim()).status.toLowerCase()!="success" && times<=10) + + def compactJson = parseJson(out.trim()) + if (compactJson.status.toLowerCase() == "fail") { + assertEquals(disableAutoCompaction, false) + logger.info("Compaction was done automatically!") + } + if (disableAutoCompaction) { + assertEquals("success", compactJson.status.toLowerCase()) + } + } + + // wait for full compaction done + for (String[] tablet in tablets) { + boolean running = true + do { + Thread.sleep(1000) + String tablet_id = tablet[0] + backend_id = tablet[2] + (code, out, err) = be_get_compaction_status(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id) + logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err) + assertEquals(code, 0) + def compactionStatus = parseJson(out.trim()) + assertEquals("success", compactionStatus.status.toLowerCase()) + running = compactionStatus.run_status + } while (running) + } + + // after full compaction, there is only 1 rowset.
+ + rowsetCount = 0 + for (String[] tablet in tablets) { + String tablet_id = tablet[0] + def compactionStatusUrlIndex = 18 + (code, out, err) = curl("GET", tablet[compactionStatusUrlIndex]) + logger.info("Show tablets status: code=" + code + ", out=" + out + ", err=" + err) + assertEquals(code, 0) + def tabletJson = parseJson(out.trim()) + assert tabletJson.rowsets instanceof List + rowsetCount +=((List<String>) tabletJson.rowsets).size() + } + assert (rowsetCount == 1) + + // make sure all hidden data has been deleted + // (1,100)(2,200) + qt_select_final """select * from ${tableName} order by user_id""" + } finally { + try_sql("DROP TABLE IF EXISTS ${tableName}") + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org