This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push: new 3743f19369 [feature] support convert alpha rowset (#9890) 3743f19369 is described below commit 3743f19369654682377c77b9c903ed417847874b Author: jacktengg <18241664+jackte...@users.noreply.github.com> AuthorDate: Sat Jun 4 12:29:03 2022 +0800 [feature] support convert alpha rowset (#9890) Add an alpha-to-beta rowset converter so that alpha rowsets are converted automatically. The alpha rowset code will be removed after 1.1. --- be/src/common/config.h | 6 ++ be/src/olap/CMakeLists.txt | 1 + be/src/olap/convert_rowset.cpp | 140 +++++++++++++++++++++++++++++++++++++++++ be/src/olap/convert_rowset.h | 42 +++++++++++++ be/src/olap/olap_server.cpp | 50 +++++++++++++++ be/src/olap/storage_engine.cpp | 6 ++ be/src/olap/storage_engine.h | 5 ++ be/src/olap/tablet.cpp | 9 +++ be/src/olap/tablet.h | 2 + be/src/olap/tablet_manager.cpp | 14 +++++ be/src/olap/tablet_manager.h | 2 + 11 files changed, 277 insertions(+) diff --git a/be/src/common/config.h b/be/src/common/config.h index 8205ac2537..fe40be22a8 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -294,6 +294,12 @@ CONF_mInt64(min_compaction_failure_interval_sec, "5"); // 5 seconds // This config can be set to limit thread number in compaction thread pool. CONF_mInt32(max_compaction_threads, "10"); +// This config can be set to limit thread number in convert rowset thread pool. +CONF_mInt32(convert_rowset_thread_num, "0"); + +// initial sleep interval in seconds of scan alpha rowset +CONF_mInt32(scan_alpha_rowset_min_interval_sec, "3"); + // Thread count to do tablet meta checkpoint, -1 means use the data directories count. 
CONF_Int32(max_meta_checkpoint_threads, "-1"); diff --git a/be/src/olap/CMakeLists.txt b/be/src/olap/CMakeLists.txt index 657ebe3518..2c02122883 100644 --- a/be/src/olap/CMakeLists.txt +++ b/be/src/olap/CMakeLists.txt @@ -38,6 +38,7 @@ add_library(Olap STATIC compaction_permit_limiter.cpp comparison_predicate.cpp compress.cpp + convert_rowset.cpp cumulative_compaction.cpp cumulative_compaction_policy.cpp delete_handler.cpp diff --git a/be/src/olap/convert_rowset.cpp b/be/src/olap/convert_rowset.cpp new file mode 100644 index 0000000000..c71e66c246 --- /dev/null +++ b/be/src/olap/convert_rowset.cpp @@ -0,0 +1,140 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "olap/convert_rowset.h" + +namespace doris { + +Status ConvertRowset::do_convert() { + if (!_tablet->init_succeeded()) { + return Status::OLAPInternalError(OLAP_ERR_INPUT_PARAMETER_ERROR); + } + std::unique_lock<std::mutex> base_compaction_lock(_tablet->get_base_compaction_lock(), + std::try_to_lock); + std::unique_lock<std::mutex> cumulative_compaction_lock( + _tablet->get_cumulative_compaction_lock(), std::try_to_lock); + if (!base_compaction_lock.owns_lock() || !cumulative_compaction_lock.owns_lock()) { + LOG(INFO) << "The tablet is under compaction. tablet=" << _tablet->full_name(); + return Status::OLAPInternalError(OLAP_ERR_CE_TRY_CE_LOCK_ERROR); + } + + std::vector<RowsetSharedPtr> alpah_rowsets; + _tablet->find_alpha_rowsets(&alpah_rowsets); + + Merger::Statistics stats; + Status res; + const size_t max_convert_row_count = 20000000; + size_t row_count = 0; + for (size_t i = 0; i < alpah_rowsets.size(); ++i) { + Version output_version = + Version(alpah_rowsets[i]->start_version(), alpah_rowsets[i]->end_version()); + + RowsetReaderSharedPtr input_rs_reader; + RETURN_NOT_OK(alpah_rowsets[i]->create_reader(&input_rs_reader)); + + std::unique_ptr<RowsetWriter> output_rs_writer; + RETURN_NOT_OK(_tablet->create_rowset_writer(output_version, VISIBLE, NONOVERLAPPING, + &output_rs_writer)); + res = Merger::merge_rowsets(_tablet, ReaderType::READER_BASE_COMPACTION, {input_rs_reader}, + output_rs_writer.get(), &stats); + + if (!res.ok()) { + LOG(WARNING) << "fail to convert rowset. 
res=" << res + << ", tablet=" << _tablet->full_name(); + return res; + } else { + auto output_rowset = output_rs_writer->build(); + if (output_rowset == nullptr) { + LOG(WARNING) << "rowset writer build failed" + << ", tablet=" << _tablet->full_name(); + return Status::OLAPInternalError(OLAP_ERR_MALLOC_ERROR); + } + + RETURN_NOT_OK(check_correctness(alpah_rowsets[i], output_rowset, stats)); + + row_count += alpah_rowsets[i]->num_rows(); + + RETURN_NOT_OK(_modify_rowsets(alpah_rowsets[i], output_rowset)); + + LOG(INFO) << "succeed to convert rowset" + << ". tablet=" << _tablet->full_name() + << ", output_version=" << output_version + << ", disk=" << _tablet->data_dir()->path(); + + if (row_count >= max_convert_row_count) { + break; + } + } + } + return Status::OK(); +} + +Status ConvertRowset::check_correctness(RowsetSharedPtr input_rowset, RowsetSharedPtr output_rowset, + const Merger::Statistics& stats) { + // 1. check row number + if (input_rowset->num_rows() != + output_rowset->num_rows() + stats.merged_rows + stats.filtered_rows) { + LOG(WARNING) << "row_num does not match between input and output! " + << "input_row_num=" << input_rowset->num_rows() + << ", merged_row_num=" << stats.merged_rows + << ", filtered_row_num=" << stats.filtered_rows + << ", output_row_num=" << output_rowset->num_rows(); + + // ATTN(cmy): We found that the num_rows in some rowset meta may be set to the wrong value, + // but it is not known which version of the code has the problem. So when the convert + // result is inconsistent, we then try to verify by num_rows recorded in segment_groups. + // If the check passes, ignore the error and set the correct value in the output rowset meta + // to fix this problem. 
+ // Only handle alpha rowset because we only find this bug in alpha rowset + int64_t num_rows = _get_input_num_rows_from_seg_grps(input_rowset); + if (num_rows == -1) { + return Status::OLAPInternalError(OLAP_ERR_CHECK_LINES_ERROR); + } + if (num_rows != output_rowset->num_rows() + stats.merged_rows + stats.filtered_rows) { + // If it is still incorrect, it may be another problem + LOG(WARNING) << "row_num got from seg groups does not match between cumulative input " + "and output! " + << "input_row_num=" << num_rows << ", merged_row_num=" << stats.merged_rows + << ", filtered_row_num=" << stats.filtered_rows + << ", output_row_num=" << output_rowset->num_rows(); + + return Status::OLAPInternalError(OLAP_ERR_CHECK_LINES_ERROR); + } + } + return Status::OK(); +} + +int64_t ConvertRowset::_get_input_num_rows_from_seg_grps(RowsetSharedPtr rowset) { + int64_t num_rows = 0; + for (auto& seg_grp : rowset->rowset_meta()->alpha_rowset_extra_meta_pb().segment_groups()) { + num_rows += seg_grp.num_rows(); + } + return num_rows; +} +Status ConvertRowset::_modify_rowsets(RowsetSharedPtr input_rowset, RowsetSharedPtr output_rowset) { + std::vector<RowsetSharedPtr> input_rowsets; + input_rowsets.push_back(input_rowset); + + std::vector<RowsetSharedPtr> output_rowsets; + output_rowsets.push_back(output_rowset); + + std::lock_guard<std::shared_mutex> wrlock(_tablet->get_header_lock()); + RETURN_NOT_OK(_tablet->modify_rowsets(output_rowsets, input_rowsets, true)); + _tablet->save_meta(); + return Status::OK(); +} +} // namespace doris \ No newline at end of file diff --git a/be/src/olap/convert_rowset.h b/be/src/olap/convert_rowset.h new file mode 100644 index 0000000000..a691d38624 --- /dev/null +++ b/be/src/olap/convert_rowset.h @@ -0,0 +1,42 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "olap/merger.h" +#include "olap/storage_engine.h" +#include "olap/tablet.h" + +namespace doris { +class DataDir; +class ConvertRowset { +public: + ConvertRowset(TabletSharedPtr tablet) : _tablet(tablet) {} + Status do_convert(); + +private: + Status check_correctness(RowsetSharedPtr input_rowset, RowsetSharedPtr output_rowset, + const Merger::Statistics& stats); + int64_t _get_input_num_rows_from_seg_grps(RowsetSharedPtr rowset); + Status _modify_rowsets(RowsetSharedPtr input_rowset, RowsetSharedPtr output_rowset); + +private: + TabletSharedPtr _tablet; + + DISALLOW_COPY_AND_ASSIGN(ConvertRowset); +}; +} // namespace doris \ No newline at end of file diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp index f4732cbe04..2ad264d2bc 100644 --- a/be/src/olap/olap_server.cpp +++ b/be/src/olap/olap_server.cpp @@ -28,6 +28,7 @@ #include "agent/cgroups_mgr.h" #include "common/status.h" #include "gutil/strings/substitute.h" +#include "olap/convert_rowset.h" #include "olap/cumulative_compaction.h" #include "olap/olap_common.h" #include "olap/olap_define.h" @@ -72,6 +73,21 @@ Status StorageEngine::start_bg_threads() { .set_max_threads(max_thread_num) .build(&_compaction_thread_pool); + int32_t convert_rowset_thread_num = config::convert_rowset_thread_num; + if (convert_rowset_thread_num > 0) { + ThreadPoolBuilder("ConvertRowsetTaskThreadPool") + 
.set_min_threads(convert_rowset_thread_num) + .set_max_threads(convert_rowset_thread_num) + .build(&_convert_rowset_thread_pool); + + // alpha rowset scan thread + RETURN_IF_ERROR(Thread::create( + "StorageEngine", "alpha_rowset_scan_thread", + [this]() { this->_alpha_rowset_scan_thread_callback(); }, + &_alpha_rowset_scan_thread)); + LOG(INFO) << "alpha rowset scan thread started"; + } + // compaction tasks producer thread RETURN_IF_ERROR(Thread::create( "StorageEngine", "compaction_tasks_producer_thread", @@ -304,6 +320,40 @@ void StorageEngine::_tablet_checkpoint_callback(const std::vector<DataDir*>& dat } while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(interval))); } +void StorageEngine::_alpha_rowset_scan_thread_callback() { + LOG(INFO) << "try to start alpha rowset scan thread!"; + + auto scan_interval_sec = config::scan_alpha_rowset_min_interval_sec; + auto max_convert_task = config::convert_rowset_thread_num * 2; + do { + std::vector<TabletSharedPtr> tablet_have_alpha_rowset; + _tablet_manager->find_tablet_have_alpha_rowset(tablet_have_alpha_rowset); + + std::random_device rd; + std::mt19937 g(rd()); + std::shuffle(tablet_have_alpha_rowset.begin(), tablet_have_alpha_rowset.end(), g); + + for (int i = 0; i < max_convert_task && i < tablet_have_alpha_rowset.size(); ++i) { + auto tablet = tablet_have_alpha_rowset[i]; + auto st = _convert_rowset_thread_pool->submit_func([=]() { + CgroupsMgr::apply_system_cgroup(); + auto convert_rowset = std::make_shared<ConvertRowset>(tablet); + convert_rowset->do_convert(); + }); + if (!st.ok()) { + LOG(WARNING) << "submit convert tablet tasks failed."; + } + } + + if (tablet_have_alpha_rowset.size() == 0) { + scan_interval_sec = std::min(3600, scan_interval_sec * 2); + } else { + _convert_rowset_thread_pool->wait(); + scan_interval_sec = config::scan_alpha_rowset_min_interval_sec; + } + } while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(scan_interval_sec))); +} + void 
StorageEngine::_compaction_tasks_producer_callback() { #ifdef GOOGLE_PROFILER ProfilerRegisterThread(); diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp index 6a1871b8f1..35dc3dd1e1 100644 --- a/be/src/olap/storage_engine.cpp +++ b/be/src/olap/storage_engine.cpp @@ -160,6 +160,9 @@ StorageEngine::~StorageEngine() { if (_compaction_thread_pool) { _compaction_thread_pool->shutdown(); } + if (_convert_rowset_thread_pool) { + _convert_rowset_thread_pool->shutdown(); + } if (_tablet_meta_checkpoint_thread_pool) { _tablet_meta_checkpoint_thread_pool->shutdown(); } @@ -578,6 +581,9 @@ void StorageEngine::stop() { } THREAD_JOIN(_compaction_tasks_producer_thread); + if (_alpha_rowset_scan_thread) { + THREAD_JOIN(_alpha_rowset_scan_thread); + } THREAD_JOIN(_unused_rowset_monitor_thread); THREAD_JOIN(_garbage_sweeper_thread); THREAD_JOIN(_disk_stat_monitor_thread); diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h index b3768cccf3..ce33223c5a 100644 --- a/be/src/olap/storage_engine.h +++ b/be/src/olap/storage_engine.h @@ -253,6 +253,8 @@ private: void _compaction_tasks_producer_callback(); + void _alpha_rowset_scan_thread_callback(); + std::vector<TabletSharedPtr> _generate_compaction_tasks(CompactionType compaction_type, std::vector<DataDir*>& data_dirs, bool check_score); @@ -377,6 +379,9 @@ private: std::unique_ptr<ThreadPool> _compaction_thread_pool; + scoped_refptr<Thread> _alpha_rowset_scan_thread; + std::unique_ptr<ThreadPool> _convert_rowset_thread_pool; + std::unique_ptr<ThreadPool> _tablet_meta_checkpoint_thread_pool; CompactionPermitLimiter _permit_limiter; diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp index a457e3eb42..df7f02a1f9 100644 --- a/be/src/olap/tablet.cpp +++ b/be/src/olap/tablet.cpp @@ -1051,6 +1051,15 @@ void Tablet::pick_candidate_rowsets_to_cumulative_compaction( _cumulative_point, candidate_rowsets); } +void Tablet::find_alpha_rowsets(std::vector<RowsetSharedPtr>* rowsets) const { + 
std::shared_lock rdlock(_meta_lock); + for (auto& it : _rs_version_map) { + if (it.second->rowset_meta()->rowset_type() == RowsetTypePB::ALPHA_ROWSET) { + rowsets->push_back(it.second); + } + } +} + void Tablet::pick_candidate_rowsets_to_base_compaction(vector<RowsetSharedPtr>* candidate_rowsets) { std::shared_lock rdlock(_meta_lock); for (auto& it : _rs_version_map) { diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h index 8f08833c33..a7f1830ef1 100644 --- a/be/src/olap/tablet.h +++ b/be/src/olap/tablet.h @@ -260,6 +260,8 @@ public: return _tablet_meta->all_beta(); } + void find_alpha_rowsets(std::vector<RowsetSharedPtr>* rowsets) const; + Status create_rowset_writer(const Version& version, const RowsetStatePB& rowset_state, const SegmentsOverlapPB& overlap, std::unique_ptr<RowsetWriter>* rowset_writer); diff --git a/be/src/olap/tablet_manager.cpp b/be/src/olap/tablet_manager.cpp index c3a682f283..06008d5c78 100644 --- a/be/src/olap/tablet_manager.cpp +++ b/be/src/olap/tablet_manager.cpp @@ -601,6 +601,20 @@ void TabletManager::get_tablet_stat(TTabletStatResult* result) { result->__set_tablet_stat_list(*local_cache); } +void TabletManager::find_tablet_have_alpha_rowset(std::vector<TabletSharedPtr>& tablets) { + for (const auto& tablets_shard : _tablets_shards) { + std::shared_lock rdlock(tablets_shard.lock); + for (const auto& tablet_map : tablets_shard.tablet_map) { + const TabletSharedPtr& tablet_ptr = tablet_map.second; + if (!tablet_ptr->all_beta() && + tablet_ptr->can_do_compaction(tablet_ptr->data_dir()->path_hash(), + BASE_COMPACTION)) { + tablets.push_back(tablet_ptr); + } + } + } +} + TabletSharedPtr TabletManager::find_best_tablet_to_compaction( CompactionType compaction_type, DataDir* data_dir, const std::unordered_set<TTabletId>& tablet_submitted_compaction, uint32_t* score, diff --git a/be/src/olap/tablet_manager.h b/be/src/olap/tablet_manager.h index 85b4c644ae..1627ab741e 100644 --- a/be/src/olap/tablet_manager.h +++ 
b/be/src/olap/tablet_manager.h @@ -140,6 +140,8 @@ public: void get_all_tablets_storage_format(TCheckStorageFormatResult* result); + void find_tablet_have_alpha_rowset(std::vector<TabletSharedPtr>& tablets); + private: // Add a tablet pointer to StorageEngine // If force, drop the existing tablet add this new one --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org