github-actions[bot] commented on code in PR #31215: URL: https://github.com/apache/doris/pull/31215#discussion_r1504307847
########## be/src/cloud/cloud_compaction_action.cpp: ########## @@ -0,0 +1,338 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "cloud/cloud_compaction_action.h" + +// IWYU pragma: no_include <bits/chrono.h> +#include <chrono> // IWYU pragma: keep +#include <exception> +#include <future> +#include <memory> +#include <mutex> +#include <sstream> +#include <string> +#include <thread> +#include <utility> + +#include "cloud/cloud_base_compaction.h" +#include "cloud/cloud_compaction_action.h" +#include "cloud/cloud_cumulative_compaction.h" +#include "cloud/cloud_full_compaction.h" +#include "cloud/cloud_tablet.h" +#include "cloud/cloud_tablet_mgr.h" +#include "common/logging.h" +#include "common/status.h" +#include "gutil/strings/substitute.h" +#include "http/http_channel.h" +#include "http/http_headers.h" +#include "http/http_request.h" +#include "http/http_status.h" +#include "olap/base_compaction.h" +#include "olap/cumulative_compaction.h" +#include "olap/cumulative_compaction_policy.h" +#include "olap/cumulative_compaction_time_series_policy.h" +#include "olap/full_compaction.h" +#include "olap/olap_define.h" +#include "olap/storage_engine.h" +#include "olap/tablet_manager.h" +#include 
"util/doris_metrics.h" +#include "util/stopwatch.hpp" + +namespace doris { +using namespace ErrorCode; + +namespace {} + +const static std::string HEADER_JSON = "application/json"; + +CloudCompactionAction::CloudCompactionAction(CompactionActionType ctype, ExecEnv* exec_env, + CloudStorageEngine& engine, TPrivilegeHier::type hier, + TPrivilegeType::type ptype) + : HttpHandlerWithAuth(exec_env, hier, ptype), _engine(engine), _type(ctype) {} + +/// check param and fetch tablet_id from req +static Status _check_param(HttpRequest* req, uint64_t* tablet_id, uint64_t* table_id) { + // req tablet id and table id, we have to set only one of them. + std::string req_tablet_id = req->param(TABLET_ID_KEY); + std::string req_table_id = req->param(TABLE_ID_KEY); + if (req_tablet_id == "") { + if (req_table_id == "") { + // both tablet id and table id are empty, return error. + return Status::InternalError( + "tablet id and table id can not be empty at the same time!"); + } else { + try { + *table_id = std::stoull(req_table_id); + } catch (const std::exception& e) { + return Status::InternalError("convert tablet_id or table_id failed, {}", e.what()); + } + return Status::OK(); + } + } else { + if (req_table_id == "") { + try { + *tablet_id = std::stoull(req_tablet_id); + } catch (const std::exception& e) { + return Status::InternalError("convert tablet_id or table_id failed, {}", e.what()); + } + return Status::OK(); + } else { + // both tablet id and table id are not empty, return err. 
+ return Status::InternalError("tablet id and table id can not be set at the same time!"); + } + } +} + +// for viewing the compaction status +Status CloudCompactionAction::_handle_show_compaction(HttpRequest* req, std::string* json_result) { Review Comment: warning: method '_handle_show_compaction' can be made static [readability-convert-member-functions-to-static] be/src/cloud/cloud_compaction_action.h:61: ```diff - Status _handle_show_compaction(HttpRequest* req, std::string* json_result); + static Status _handle_show_compaction(HttpRequest* req, std::string* json_result); ``` ########## be/src/cloud/cloud_compaction_action.cpp: ########## @@ -0,0 +1,338 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "cloud/cloud_compaction_action.h" + +// IWYU pragma: no_include <bits/chrono.h> +#include <chrono> // IWYU pragma: keep +#include <exception> +#include <future> +#include <memory> +#include <mutex> +#include <sstream> +#include <string> +#include <thread> +#include <utility> + +#include "cloud/cloud_base_compaction.h" +#include "cloud/cloud_compaction_action.h" +#include "cloud/cloud_cumulative_compaction.h" +#include "cloud/cloud_full_compaction.h" Review Comment: warning: duplicate include [readability-duplicate-include] be/src/cloud/cloud_compaction_action.cpp:32: ```diff - #include "cloud/cloud_base_compaction.h" - #include "cloud/cloud_compaction_action.h" + #include "cloud/cloud_base_compaction.h" ``` ########## be/src/cloud/cloud_compaction_action.cpp: ########## @@ -0,0 +1,338 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "cloud/cloud_compaction_action.h" + +// IWYU pragma: no_include <bits/chrono.h> +#include <chrono> // IWYU pragma: keep +#include <exception> +#include <future> +#include <memory> +#include <mutex> +#include <sstream> +#include <string> +#include <thread> +#include <utility> + +#include "cloud/cloud_base_compaction.h" +#include "cloud/cloud_compaction_action.h" +#include "cloud/cloud_cumulative_compaction.h" +#include "cloud/cloud_full_compaction.h" +#include "cloud/cloud_tablet.h" +#include "cloud/cloud_tablet_mgr.h" +#include "common/logging.h" +#include "common/status.h" +#include "gutil/strings/substitute.h" +#include "http/http_channel.h" +#include "http/http_headers.h" +#include "http/http_request.h" +#include "http/http_status.h" +#include "olap/base_compaction.h" +#include "olap/cumulative_compaction.h" +#include "olap/cumulative_compaction_policy.h" +#include "olap/cumulative_compaction_time_series_policy.h" +#include "olap/full_compaction.h" +#include "olap/olap_define.h" +#include "olap/storage_engine.h" +#include "olap/tablet_manager.h" +#include "util/doris_metrics.h" +#include "util/stopwatch.hpp" + +namespace doris { +using namespace ErrorCode; + +namespace {} + +const static std::string HEADER_JSON = "application/json"; + +CloudCompactionAction::CloudCompactionAction(CompactionActionType ctype, ExecEnv* exec_env, + CloudStorageEngine& engine, TPrivilegeHier::type hier, + TPrivilegeType::type ptype) + : HttpHandlerWithAuth(exec_env, hier, ptype), _engine(engine), _type(ctype) {} + +/// check param and fetch tablet_id from req +static Status _check_param(HttpRequest* req, uint64_t* tablet_id, uint64_t* table_id) { + // req tablet id and table id, we have to set only one of them. + std::string req_tablet_id = req->param(TABLET_ID_KEY); + std::string req_table_id = req->param(TABLE_ID_KEY); + if (req_tablet_id == "") { + if (req_table_id == "") { + // both tablet id and table id are empty, return error. 
+ return Status::InternalError( + "tablet id and table id can not be empty at the same time!"); + } else { + try { + *table_id = std::stoull(req_table_id); + } catch (const std::exception& e) { + return Status::InternalError("convert tablet_id or table_id failed, {}", e.what()); + } + return Status::OK(); + } + } else { + if (req_table_id == "") { Review Comment: warning: the 'empty' method should be used to check for emptiness instead of comparing to an empty object [readability-container-size-empty] ```suggestion if (req_table_id.empty()) { ``` <details> <summary>Additional context</summary> **/usr/include/c++/11/bits/basic_string.h:1022:** method 'basic_string'::empty() defined here ```cpp empty() const _GLIBCXX_NOEXCEPT ^ ``` </details> ########## be/src/cloud/cloud_vertical_rowset_writer.cpp: ########## @@ -0,0 +1,71 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "cloud/cloud_vertical_rowset_writer.h" + +#include <fmt/format.h> +#include <gen_cpp/olap_file.pb.h> + +#include <algorithm> +#include <atomic> +#include <memory> +#include <mutex> +#include <ostream> +#include <string> +#include <utility> + +#include "common/compiler_util.h" // IWYU pragma: keep +#include "common/logging.h" +#include "io/fs/file_system.h" +#include "io/fs/file_writer.h" +#include "olap/rowset/beta_rowset.h" +#include "olap/rowset/rowset_meta.h" +#include "olap/rowset/rowset_writer_context.h" +#include "olap/rowset/vertical_beta_rowset_writer_helper.h" +#include "util/slice.h" +#include "util/spinlock.h" +#include "vec/core/block.h" + +namespace doris { +using namespace ErrorCode; + +CloudVerticalRowsetWriter::CloudVerticalRowsetWriter() : CloudRowsetWriter() { + _helper = std::make_shared<VerticalBetaRowsetWriterHelper>( Review Comment: warning: initializer for base class 'doris::CloudRowsetWriter' is redundant [readability-redundant-member-init] ```suggestion CloudVerticalRowsetWriter::CloudVerticalRowsetWriter() { ``` ########## be/src/olap/rowset/vertical_beta_rowset_writer_helper.h: ########## @@ -0,0 +1,82 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#pragma once + +#include <stddef.h> Review Comment: warning: inclusion of deprecated C++ header 'stddef.h'; consider using 'cstddef' instead [modernize-deprecated-headers] ```suggestion #include <cstddef> ``` ########## be/src/olap/rowset/vertical_beta_rowset_writer_helper.h: ########## @@ -0,0 +1,82 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <stddef.h> +#include <stdint.h> Review Comment: warning: inclusion of deprecated C++ header 'stdint.h'; consider using 'cstdint' instead [modernize-deprecated-headers] ```suggestion #include <cstdint> ``` ########## be/src/cloud/cloud_compaction_action.h: ########## @@ -0,0 +1,79 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <stdint.h> + +#include <string> + +#include "cloud/cloud_storage_engine.h" +#include "common/status.h" +#include "http/action/compaction_action.h" +#include "http/http_handler_with_auth.h" +#include "olap/tablet.h" + +namespace doris { +class HttpRequest; + +class ExecEnv; + +/* +enum class CompactionActionType { + SHOW_INFO = 1, + RUN_COMPACTION = 2, + RUN_COMPACTION_STATUS = 3, +}; +*/ + +/* +const std::string PARAM_COMPACTION_TYPE = "compact_type"; +const std::string PARAM_COMPACTION_BASE = "base"; +const std::string PARAM_COMPACTION_CUMULATIVE = "cumulative"; +const std::string PARAM_COMPACTION_FULL = "full"; +*/ + +/// This action is used for viewing the compaction status. +/// See compaction-action.md for details. 
+class CloudCompactionAction : public HttpHandlerWithAuth { +public: + CloudCompactionAction(CompactionActionType ctype, ExecEnv* exec_env, CloudStorageEngine& engine, + TPrivilegeHier::type hier, TPrivilegeType::type ptype); + + ~CloudCompactionAction() override = default; + + void handle(HttpRequest* req) override; + +private: + Status _handle_show_compaction(HttpRequest* req, std::string* json_result); + + /// execute compaction request to run compaction task + /// param compact_type in req to distinguish the task type, base or cumulative + Status _handle_run_compaction(HttpRequest* req, std::string* json_result); + + /// thread callback function for the tablet to do compaction + Status _execute_compaction_callback(CloudTabletSPtr tablet, const std::string& compaction_type); + + /// fetch compaction running status + Status _handle_run_status_compaction(HttpRequest* req, std::string* json_result); + +private: Review Comment: warning: redundant access specifier has the same accessibility as the previous access specifier [readability-redundant-access-specifiers] ```suggestion ``` <details> <summary>Additional context</summary> **be/src/cloud/cloud_compaction_action.h:60:** previously declared here ```cpp private: ^ ``` </details> ########## be/src/cloud/cloud_compaction_action.cpp: ########## @@ -0,0 +1,338 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "cloud/cloud_compaction_action.h" + +// IWYU pragma: no_include <bits/chrono.h> +#include <chrono> // IWYU pragma: keep +#include <exception> +#include <future> +#include <memory> +#include <mutex> +#include <sstream> +#include <string> +#include <thread> +#include <utility> + +#include "cloud/cloud_base_compaction.h" +#include "cloud/cloud_compaction_action.h" +#include "cloud/cloud_cumulative_compaction.h" +#include "cloud/cloud_full_compaction.h" +#include "cloud/cloud_tablet.h" +#include "cloud/cloud_tablet_mgr.h" +#include "common/logging.h" +#include "common/status.h" +#include "gutil/strings/substitute.h" +#include "http/http_channel.h" +#include "http/http_headers.h" +#include "http/http_request.h" +#include "http/http_status.h" +#include "olap/base_compaction.h" +#include "olap/cumulative_compaction.h" +#include "olap/cumulative_compaction_policy.h" +#include "olap/cumulative_compaction_time_series_policy.h" +#include "olap/full_compaction.h" +#include "olap/olap_define.h" +#include "olap/storage_engine.h" +#include "olap/tablet_manager.h" +#include "util/doris_metrics.h" +#include "util/stopwatch.hpp" + +namespace doris { +using namespace ErrorCode; + +namespace {} + +const static std::string HEADER_JSON = "application/json"; + +CloudCompactionAction::CloudCompactionAction(CompactionActionType ctype, ExecEnv* exec_env, + CloudStorageEngine& engine, TPrivilegeHier::type hier, + TPrivilegeType::type ptype) + : HttpHandlerWithAuth(exec_env, hier, ptype), _engine(engine), _type(ctype) {} + +/// check param and fetch tablet_id from 
req +static Status _check_param(HttpRequest* req, uint64_t* tablet_id, uint64_t* table_id) { + // req tablet id and table id, we have to set only one of them. + std::string req_tablet_id = req->param(TABLET_ID_KEY); + std::string req_table_id = req->param(TABLE_ID_KEY); + if (req_tablet_id == "") { Review Comment: warning: the 'empty' method should be used to check for emptiness instead of comparing to an empty object [readability-container-size-empty] ```suggestion if (req_tablet_id.empty()) { ``` <details> <summary>Additional context</summary> **/usr/include/c++/11/bits/basic_string.h:1022:** method 'basic_string'::empty() defined here ```cpp empty() const _GLIBCXX_NOEXCEPT ^ ``` </details> ########## be/src/cloud/cloud_compaction_action.h: ########## @@ -0,0 +1,79 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#pragma once + +#include <stdint.h> Review Comment: warning: inclusion of deprecated C++ header 'stdint.h'; consider using 'cstdint' instead [modernize-deprecated-headers] ```suggestion #include <cstdint> ``` ########## be/src/olap/rowset/vertical_beta_rowset_writer_helper.h: ########## @@ -0,0 +1,82 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#pragma once + +#include <stddef.h> +#include <stdint.h> + +#include <memory> +#include <vector> + +#include "common/status.h" +#include "olap/rowset/beta_rowset_writer.h" +#include "olap/rowset/segment_v2/segment_writer.h" + +namespace doris { +namespace vectorized { +class Block; +} // namespace vectorized + +class VerticalBetaRowsetWriterHelper { +public: + VerticalBetaRowsetWriterHelper( + std::vector<std::unique_ptr<segment_v2::SegmentWriter>>* segment_writers, + bool already_built, RowsetMetaSharedPtr& rowset_meta, std::atomic<int32_t>* num_segment, + RowsetWriterContext& context, std::atomic<int64_t>* _num_rows_written, + std::vector<KeyBoundsPB>* _segments_encoded_key_bounds, + std::vector<uint32_t>* _segment_num_rows, std::atomic<int64_t>* _total_index_size, + std::vector<io::FileWriterPtr>* _file_writers, std::atomic<int64_t>* _total_data_size, + SpinLock* _lock); + ~VerticalBetaRowsetWriterHelper() = default; + + Status add_columns(const vectorized::Block* block, const std::vector<uint32_t>& col_ids, + bool is_key, uint32_t max_rows_per_segment); + + Status flush_columns(bool is_key); + + Status final_flush(); + + int64_t num_rows() const { return _total_key_group_rows; } + + void destruct_writer(); + +private: + Status _flush_columns(std::unique_ptr<segment_v2::SegmentWriter>* segment_writer, + bool is_key = false); + Status _create_segment_writer(const std::vector<uint32_t>& column_ids, bool is_key, + std::unique_ptr<segment_v2::SegmentWriter>* writer); Review Comment: warning: redundant access specifier has the same accessibility as the previous access specifier [readability-redundant-access-specifiers] ```suggestion ``` <details> <summary>Additional context</summary> **be/src/olap/rowset/vertical_beta_rowset_writer_helper.h:55:** previously declared here ```cpp private: ^ ``` </details> ########## be/src/olap/rowset/vertical_beta_rowset_writer_helper.cpp: ########## @@ -0,0 +1,244 @@ +// Licensed to the Apache Software Foundation (ASF) under one 
+// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "olap/rowset/vertical_beta_rowset_writer_helper.h" + +#include <fmt/format.h> +#include <gen_cpp/olap_file.pb.h> + +#include <algorithm> +#include <atomic> +#include <mutex> +#include <ostream> +#include <string> +#include <utility> + +#include "common/compiler_util.h" // IWYU pragma: keep +#include "common/logging.h" +#include "io/fs/file_system.h" +#include "io/fs/file_writer.h" +#include "olap/rowset/beta_rowset.h" +#include "olap/rowset/rowset_meta.h" +#include "olap/rowset/rowset_writer_context.h" +#include "util/slice.h" +#include "util/spinlock.h" +#include "vec/core/block.h" + +namespace doris { +using namespace ErrorCode; + +VerticalBetaRowsetWriterHelper::VerticalBetaRowsetWriterHelper( + std::vector<std::unique_ptr<segment_v2::SegmentWriter>>* segment_writers, + bool already_built, RowsetMetaSharedPtr& rowset_meta, std::atomic<int32_t>* num_segment, + RowsetWriterContext& context, std::atomic<int64_t>* num_rows_written, + std::vector<KeyBoundsPB>* segments_encoded_key_bounds, + std::vector<uint32_t>* segment_num_rows, std::atomic<int64_t>* total_index_size, + std::vector<io::FileWriterPtr>* file_writers, std::atomic<int64_t>* total_data_size, + SpinLock* lock) + : _segment_writers(segment_writers), + 
_already_built(already_built), + _rowset_meta(rowset_meta), + _num_segment(num_segment), + _context(context), + _num_rows_written(num_rows_written), + _segments_encoded_key_bounds(segments_encoded_key_bounds), + _segment_num_rows(segment_num_rows), + _total_index_size(total_index_size), + _file_writers(file_writers), + _total_data_size(total_data_size), + _lock(lock) {} + +void VerticalBetaRowsetWriterHelper::destruct_writer() { + if (!_already_built) { + const auto& fs = _rowset_meta->fs(); + if (!fs || !_rowset_meta->is_local()) { // Remote fs will delete them asynchronously + return; + } + for (auto& segment_writer : *_segment_writers) { + segment_writer.reset(); + } + for (int i = 0; i < *_num_segment; ++i) { + auto path = BetaRowset::segment_file_path(_context.rowset_dir, _context.rowset_id, i); + // Even if an error is encountered, these files that have not been cleaned up + // will be cleaned up by the GC background. So here we only print the error + // message when we encounter an error. 
+ WARN_IF_ERROR(fs->delete_file(path), fmt::format("Failed to delete file={}", path)); + } + } +} + +Status VerticalBetaRowsetWriterHelper::add_columns(const vectorized::Block* block, + const std::vector<uint32_t>& col_ids, + bool is_key, uint32_t max_rows_per_segment) { + VLOG_NOTICE << "VerticalBetaRowsetWriter::add_columns, columns: " << block->columns(); + size_t num_rows = block->rows(); + if (num_rows == 0) { + return Status::OK(); + } + if (UNLIKELY(max_rows_per_segment > _context.max_rows_per_segment)) { + max_rows_per_segment = _context.max_rows_per_segment; + } + + if (_segment_writers->empty()) { + // it must be key columns + DCHECK(is_key); + std::unique_ptr<segment_v2::SegmentWriter> writer; + RETURN_IF_ERROR(_create_segment_writer(col_ids, is_key, &writer)); + _segment_writers->emplace_back(std::move(writer)); + _cur_writer_idx = 0; + RETURN_IF_ERROR((*_segment_writers)[_cur_writer_idx]->append_block(block, 0, num_rows)); + } else if (is_key) { + if ((*_segment_writers)[_cur_writer_idx]->num_rows_written() > max_rows_per_segment) { + // segment is full, need flush columns and create new segment writer + RETURN_IF_ERROR(_flush_columns(&(*_segment_writers)[_cur_writer_idx], true)); + + std::unique_ptr<segment_v2::SegmentWriter> writer; + RETURN_IF_ERROR(_create_segment_writer(col_ids, is_key, &writer)); + _segment_writers->emplace_back(std::move(writer)); + ++_cur_writer_idx; + } + RETURN_IF_ERROR((*_segment_writers)[_cur_writer_idx]->append_block(block, 0, num_rows)); + } else { + // value columns + uint32_t num_rows_written = (*_segment_writers)[_cur_writer_idx]->num_rows_written(); + VLOG_NOTICE << "num_rows_written: " << num_rows_written + << ", _cur_writer_idx: " << _cur_writer_idx; + uint32_t num_rows_key_group = (*_segment_writers)[_cur_writer_idx]->row_count(); + // init if it's first value column write in current segment + if (_cur_writer_idx == 0 && num_rows_written == 0) { + VLOG_NOTICE << "init first value column segment writer"; + 
RETURN_IF_ERROR((*_segment_writers)[_cur_writer_idx]->init(col_ids, is_key)); + } + // when splitting segment, need to make rows align between key columns and value columns + size_t start_offset = 0; + size_t limit = num_rows; + if (num_rows_written + num_rows >= num_rows_key_group && + _cur_writer_idx < _segment_writers->size() - 1) { + RETURN_IF_ERROR((*_segment_writers)[_cur_writer_idx]->append_block( + block, 0, num_rows_key_group - num_rows_written)); + RETURN_IF_ERROR(_flush_columns(&(*_segment_writers)[_cur_writer_idx])); + start_offset = num_rows_key_group - num_rows_written; + limit = num_rows - start_offset; + ++_cur_writer_idx; + // switch to next writer + RETURN_IF_ERROR((*_segment_writers)[_cur_writer_idx]->init(col_ids, is_key)); + num_rows_written = 0; + num_rows_key_group = (*_segment_writers)[_cur_writer_idx]->row_count(); + } + if (limit > 0) { + RETURN_IF_ERROR( + (*_segment_writers)[_cur_writer_idx]->append_block(block, start_offset, limit)); + DCHECK((*_segment_writers)[_cur_writer_idx]->num_rows_written() <= + (*_segment_writers)[_cur_writer_idx]->row_count()); + } + } + if (is_key) { + *_num_rows_written += num_rows; + } + return Status::OK(); +} + +Status VerticalBetaRowsetWriterHelper::_flush_columns( + std::unique_ptr<segment_v2::SegmentWriter>* segment_writer, bool is_key) { + uint64_t index_size = 0; + VLOG_NOTICE << "flush columns index: " << _cur_writer_idx; + RETURN_IF_ERROR((*segment_writer)->finalize_columns_data()); + RETURN_IF_ERROR((*segment_writer)->finalize_columns_index(&index_size)); + if (is_key) { + _total_key_group_rows += (*segment_writer)->row_count(); + // record segment key bound + KeyBoundsPB key_bounds; + Slice min_key = (*segment_writer)->min_encoded_key(); + Slice max_key = (*segment_writer)->max_encoded_key(); + DCHECK_LE(min_key.compare(max_key), 0); + key_bounds.set_min_key(min_key.to_string()); + key_bounds.set_max_key(max_key.to_string()); + _segments_encoded_key_bounds->emplace_back(key_bounds); + 
_segment_num_rows->resize(_cur_writer_idx + 1); + (*_segment_num_rows)[_cur_writer_idx] = (*_segment_writers)[_cur_writer_idx]->row_count(); + } + *_total_index_size += + static_cast<int64_t>(index_size) + (*segment_writer)->get_inverted_index_file_size(); + return Status::OK(); +} + +Status VerticalBetaRowsetWriterHelper::flush_columns(bool is_key) { + if (_segment_writers->empty()) { + return Status::OK(); + } + + DCHECK(_cur_writer_idx < _segment_writers->size() && (*_segment_writers)[_cur_writer_idx]); + RETURN_IF_ERROR(_flush_columns(&(*_segment_writers)[_cur_writer_idx], is_key)); + _cur_writer_idx = 0; + return Status::OK(); +} + +Status VerticalBetaRowsetWriterHelper::_create_segment_writer( + const std::vector<uint32_t>& column_ids, bool is_key, + std::unique_ptr<segment_v2::SegmentWriter>* writer) { + auto path = BetaRowset::segment_file_path(_context.rowset_dir, _context.rowset_id, + (*_num_segment)++); + auto fs = _rowset_meta->fs(); + if (!fs) { + return Status::Error<INIT_FAILED>("get fs failed"); + } + io::FileWriterPtr file_writer; + io::FileWriterOptions opts; + opts.create_empty_file = false; + Status st = fs->create_file(path, &file_writer, &opts); + if (!st.ok()) { + LOG(WARNING) << "failed to create writable file. 
path=" << path << ", err: " << st; + return st; + } + + DCHECK(file_writer != nullptr); + segment_v2::SegmentWriterOptions writer_options; + writer_options.enable_unique_key_merge_on_write = _context.enable_unique_key_merge_on_write; + writer_options.rowset_ctx = &_context; + writer->reset(new segment_v2::SegmentWriter( + file_writer.get(), *_num_segment, _context.tablet_schema, _context.tablet, + _context.data_dir, _context.max_rows_per_segment, writer_options, nullptr)); + { + std::lock_guard<SpinLock> l(*_lock); + _file_writers->push_back(std::move(file_writer)); + } + + auto s = (*writer)->init(column_ids, is_key); + if (!s.ok()) { + LOG(WARNING) << "failed to init segment writer: " << s.to_string(); + writer->reset(nullptr); + return s; + } + return Status::OK(); +} Review Comment: warning: method 'final_flush' can be made static [readability-convert-member-functions-to-static] be/src/olap/rowset/vertical_beta_rowset_writer_helper.h:49: ```diff - Status final_flush(); + static Status final_flush(); ``` ########## be/src/cloud/cloud_compaction_action.cpp: ########## @@ -0,0 +1,338 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "cloud/cloud_compaction_action.h" + +// IWYU pragma: no_include <bits/chrono.h> +#include <chrono> // IWYU pragma: keep +#include <exception> +#include <future> +#include <memory> +#include <mutex> +#include <sstream> +#include <string> +#include <thread> +#include <utility> + +#include "cloud/cloud_base_compaction.h" +#include "cloud/cloud_compaction_action.h" +#include "cloud/cloud_cumulative_compaction.h" +#include "cloud/cloud_full_compaction.h" +#include "cloud/cloud_tablet.h" +#include "cloud/cloud_tablet_mgr.h" +#include "common/logging.h" +#include "common/status.h" +#include "gutil/strings/substitute.h" +#include "http/http_channel.h" +#include "http/http_headers.h" +#include "http/http_request.h" +#include "http/http_status.h" +#include "olap/base_compaction.h" +#include "olap/cumulative_compaction.h" +#include "olap/cumulative_compaction_policy.h" +#include "olap/cumulative_compaction_time_series_policy.h" +#include "olap/full_compaction.h" +#include "olap/olap_define.h" +#include "olap/storage_engine.h" +#include "olap/tablet_manager.h" +#include "util/doris_metrics.h" +#include "util/stopwatch.hpp" + +namespace doris { +using namespace ErrorCode; + +namespace {} + +const static std::string HEADER_JSON = "application/json"; + +CloudCompactionAction::CloudCompactionAction(CompactionActionType ctype, ExecEnv* exec_env, + CloudStorageEngine& engine, TPrivilegeHier::type hier, + TPrivilegeType::type ptype) + : HttpHandlerWithAuth(exec_env, hier, ptype), _engine(engine), _type(ctype) {} + +/// check param and fetch tablet_id from req +static Status _check_param(HttpRequest* req, uint64_t* tablet_id, uint64_t* table_id) { + // req tablet id and table id, we have to set only one of them. 
+ std::string req_tablet_id = req->param(TABLET_ID_KEY); + std::string req_table_id = req->param(TABLE_ID_KEY); + if (req_tablet_id == "") { + if (req_table_id == "") { Review Comment: warning: the 'empty' method should be used to check for emptiness instead of comparing to an empty object [readability-container-size-empty] ```suggestion if (req_table_id.empty()) { ``` <details> <summary>Additional context</summary> **/usr/include/c++/11/bits/basic_string.h:1022:** method 'basic_string'::empty() defined here ```cpp empty() const _GLIBCXX_NOEXCEPT ^ ``` </details> ########## be/src/cloud/cloud_vertical_rowset_writer.h: ########## @@ -0,0 +1,60 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#pragma once + +#include <stddef.h> Review Comment: warning: inclusion of deprecated C++ header 'stddef.h'; consider using 'cstddef' instead [modernize-deprecated-headers] ```suggestion #include <cstddef> ``` ########## be/src/cloud/cloud_storage_engine.cpp: ########## @@ -190,4 +235,454 @@ void CloudStorageEngine::_sync_tablets_thread_callback() { } } +void CloudStorageEngine::get_cumu_compaction( + int64_t tablet_id, std::vector<std::shared_ptr<CloudCumulativeCompaction>>& res) { + std::lock_guard lock(_compaction_mtx); + if (auto it = _submitted_cumu_compactions.find(tablet_id); + it != _submitted_cumu_compactions.end()) { + res = it->second; + } +} + +void CloudStorageEngine::_adjust_compaction_thread_num() { Review Comment: warning: method '_adjust_compaction_thread_num' can be made static [readability-convert-member-functions-to-static] be/src/cloud/cloud_storage_engine.h:89: ```diff - void _adjust_compaction_thread_num(); + static void _adjust_compaction_thread_num(); ``` ########## be/src/cloud/cloud_vertical_rowset_writer.h: ########## @@ -0,0 +1,60 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#pragma once + +#include <stddef.h> +#include <stdint.h> Review Comment: warning: inclusion of deprecated C++ header 'stdint.h'; consider using 'cstdint' instead [modernize-deprecated-headers] ```suggestion #include <cstdint> ``` ########## be/src/cloud/cloud_vertical_rowset_writer.cpp: ########## @@ -0,0 +1,71 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "cloud/cloud_vertical_rowset_writer.h" + +#include <fmt/format.h> +#include <gen_cpp/olap_file.pb.h> + +#include <algorithm> +#include <atomic> +#include <memory> +#include <mutex> +#include <ostream> +#include <string> +#include <utility> + +#include "common/compiler_util.h" // IWYU pragma: keep +#include "common/logging.h" +#include "io/fs/file_system.h" +#include "io/fs/file_writer.h" +#include "olap/rowset/beta_rowset.h" +#include "olap/rowset/rowset_meta.h" +#include "olap/rowset/rowset_writer_context.h" +#include "olap/rowset/vertical_beta_rowset_writer_helper.h" +#include "util/slice.h" +#include "util/spinlock.h" +#include "vec/core/block.h" + +namespace doris { +using namespace ErrorCode; + +CloudVerticalRowsetWriter::CloudVerticalRowsetWriter() : CloudRowsetWriter() { + _helper = std::make_shared<VerticalBetaRowsetWriterHelper>( + &_segment_writers, _already_built, _rowset_meta, &_num_segment, _context, + &_num_rows_written, &_segments_encoded_key_bounds, &_segment_num_rows, + &_total_index_size, &_file_writers, &_total_data_size, &_lock); +} + +CloudVerticalRowsetWriter::~CloudVerticalRowsetWriter() { + _helper->destruct_writer(); Review Comment: warning: use '= default' to define a trivial destructor [modernize-use-equals-default] ```cpp CloudVerticalRowsetWriter::~CloudVerticalRowsetWriter() { ^ ``` ########## be/src/cloud/cloud_storage_engine.cpp: ########## @@ -190,4 +235,454 @@ } } +void CloudStorageEngine::get_cumu_compaction( + int64_t tablet_id, std::vector<std::shared_ptr<CloudCumulativeCompaction>>& res) { + std::lock_guard lock(_compaction_mtx); + if (auto it = _submitted_cumu_compactions.find(tablet_id); + it != _submitted_cumu_compactions.end()) { + res = it->second; + } +} + +void CloudStorageEngine::_adjust_compaction_thread_num() { + int base_thread_num = get_base_thread_num(); + if (_base_compaction_thread_pool->max_threads() != base_thread_num) { + int old_max_threads = _base_compaction_thread_pool->max_threads(); + Status 
status = _base_compaction_thread_pool->set_max_threads(base_thread_num); + if (status.ok()) { + VLOG_NOTICE << "update base compaction thread pool max_threads from " << old_max_threads + << " to " << base_thread_num; + } + } + if (_base_compaction_thread_pool->min_threads() != base_thread_num) { + int old_min_threads = _base_compaction_thread_pool->min_threads(); + Status status = _base_compaction_thread_pool->set_min_threads(base_thread_num); + if (status.ok()) { + VLOG_NOTICE << "update base compaction thread pool min_threads from " << old_min_threads + << " to " << base_thread_num; + } + } + + int cumu_thread_num = get_cumu_thread_num(); + if (_cumu_compaction_thread_pool->max_threads() != cumu_thread_num) { + int old_max_threads = _cumu_compaction_thread_pool->max_threads(); + Status status = _cumu_compaction_thread_pool->set_max_threads(cumu_thread_num); + if (status.ok()) { + VLOG_NOTICE << "update cumu compaction thread pool max_threads from " << old_max_threads + << " to " << cumu_thread_num; + } + } + if (_cumu_compaction_thread_pool->min_threads() != cumu_thread_num) { + int old_min_threads = _cumu_compaction_thread_pool->min_threads(); + Status status = _cumu_compaction_thread_pool->set_min_threads(cumu_thread_num); + if (status.ok()) { + VLOG_NOTICE << "update cumu compaction thread pool min_threads from " << old_min_threads + << " to " << cumu_thread_num; + } + } +} + +void CloudStorageEngine::_compaction_tasks_producer_callback() { + LOG(INFO) << "try to start compaction producer process!"; + + int round = 0; + CompactionType compaction_type; + + // Used to record the time when the score metric was last updated. + // The update of the score metric is accompanied by the logic of selecting the tablet. + // If there is no slot available, the logic of selecting the tablet will be terminated, + // which causes the score metric update to be terminated. + // In order to avoid this situation, we need to update the score regularly. 
+ int64_t last_cumulative_score_update_time = 0; + int64_t last_base_score_update_time = 0; + static const int64_t check_score_interval_ms = 5000; // 5 secs + + int64_t interval = config::generate_compaction_tasks_interval_ms; + do { + if (!config::disable_auto_compaction) { + _adjust_compaction_thread_num(); + + bool check_score = false; + int64_t cur_time = UnixMillis(); + if (round < config::cumulative_compaction_rounds_for_each_base_compaction_round) { + compaction_type = CompactionType::CUMULATIVE_COMPACTION; + round++; + if (cur_time - last_cumulative_score_update_time >= check_score_interval_ms) { + check_score = true; + last_cumulative_score_update_time = cur_time; + } + } else { + compaction_type = CompactionType::BASE_COMPACTION; + round = 0; + if (cur_time - last_base_score_update_time >= check_score_interval_ms) { + check_score = true; + last_base_score_update_time = cur_time; + } + } + std::unique_ptr<ThreadPool>& thread_pool = + (compaction_type == CompactionType::CUMULATIVE_COMPACTION) + ? _cumu_compaction_thread_pool + : _base_compaction_thread_pool; + VLOG_CRITICAL << "compaction thread pool. type: " + << (compaction_type == CompactionType::CUMULATIVE_COMPACTION ? "CUMU" + : "BASE") + << ", num_threads: " << thread_pool->num_threads() + << ", num_threads_pending_start: " + << thread_pool->num_threads_pending_start() + << ", num_active_threads: " << thread_pool->num_active_threads() + << ", max_threads: " << thread_pool->max_threads() + << ", min_threads: " << thread_pool->min_threads() + << ", num_total_queued_tasks: " << thread_pool->get_queue_size(); + std::vector<CloudTabletSPtr> tablets_compaction = + _generate_cloud_compaction_tasks(compaction_type, check_score); + + /// Regardless of whether the tablet is submitted for compaction or not, + /// we need to call 'reset_compaction' to clean up the base_compaction or cumulative_compaction objects + /// in the tablet, because these two objects store the tablet's own shared_ptr. 
+ /// If it is not cleaned up, the reference count of the tablet will always be greater than 1, + /// thus cannot be collected by the garbage collector. (TabletManager::start_trash_sweep) + for (const auto& tablet : tablets_compaction) { + Status st = submit_compaction_task(tablet, compaction_type); + if (st.ok()) continue; + if ((!st.is<ErrorCode::BE_NO_SUITABLE_VERSION>() && + !st.is<ErrorCode::CUMULATIVE_NO_SUITABLE_VERSION>()) || + VLOG_DEBUG_IS_ON) { + LOG(WARNING) << "failed to submit compaction task for tablet: " + << tablet->tablet_id() << ", err: " << st; + } + } + interval = config::generate_compaction_tasks_interval_ms; + } else { + interval = config::check_auto_compaction_interval_seconds * 1000; + } + } while (!_stop_background_threads_latch.wait_for(std::chrono::milliseconds(interval))); +} + +std::vector<CloudTabletSPtr> CloudStorageEngine::_generate_cloud_compaction_tasks( + CompactionType compaction_type, bool check_score) { + std::vector<std::shared_ptr<CloudTablet>> tablets_compaction; + + int64_t max_compaction_score = 0; + std::unordered_set<int64_t> tablet_preparing_cumu_compaction; + std::unordered_map<int64_t, std::vector<std::shared_ptr<CloudCumulativeCompaction>>> + submitted_cumu_compactions; + std::unordered_map<int64_t, std::shared_ptr<CloudBaseCompaction>> submitted_base_compactions; + std::unordered_map<int64_t, std::shared_ptr<CloudFullCompaction>> submitted_full_compactions; + { + std::lock_guard lock(_compaction_mtx); + tablet_preparing_cumu_compaction = _tablet_preparing_cumu_compaction; + submitted_cumu_compactions = _submitted_cumu_compactions; + submitted_base_compactions = _submitted_base_compactions; + submitted_full_compactions = _submitted_full_compactions; + } + + bool need_pick_tablet = true; + int thread_per_disk = + config::compaction_task_num_per_fast_disk; // all disks are fast in cloud mode + int num_cumu = + std::accumulate(submitted_cumu_compactions.begin(), submitted_cumu_compactions.end(), 0, + [](int a, auto& b) 
{ return a + b.second.size(); }); + int num_base = submitted_base_compactions.size() + submitted_full_compactions.size(); + int n = thread_per_disk - num_cumu - num_base; + if (compaction_type == CompactionType::BASE_COMPACTION) { + // We need to reserve at least one thread for cumulative compaction, + // because base compactions may take too long to complete, which may + // leads to "too many rowsets" error. + int base_n = std::min(config::max_base_compaction_task_num_per_disk, thread_per_disk - 1) - + num_base; + n = std::min(base_n, n); + } + if (n <= 0) { // No threads available + if (!check_score) return tablets_compaction; + need_pick_tablet = false; + n = 0; + } + + // Return true for skipping compaction + std::function<bool(CloudTablet*)> filter_out; + if (compaction_type == CompactionType::BASE_COMPACTION) { + filter_out = [&submitted_base_compactions, &submitted_full_compactions](CloudTablet* t) { + return !!submitted_base_compactions.count(t->tablet_id()) || + !!submitted_full_compactions.count(t->tablet_id()) || + t->tablet_state() != TABLET_RUNNING; + }; + } else if (config::enable_parallel_cumu_compaction) { + filter_out = [&tablet_preparing_cumu_compaction](CloudTablet* t) { + return !!tablet_preparing_cumu_compaction.count(t->tablet_id()) || + t->tablet_state() != TABLET_RUNNING; + }; + } else { + filter_out = [&tablet_preparing_cumu_compaction, + &submitted_cumu_compactions](CloudTablet* t) { + return !!tablet_preparing_cumu_compaction.count(t->tablet_id()) || + !!submitted_cumu_compactions.count(t->tablet_id()) || + t->tablet_state() != TABLET_RUNNING; + }; + } + + // Even if need_pick_tablet is false, we still need to call find_best_tablet_to_compaction(), + // So that we can update the max_compaction_score metric. 
+ do { + std::vector<CloudTabletSPtr> tablets; + auto st = tablet_mgr().get_topn_tablets_to_compact(n, compaction_type, filter_out, &tablets, + &max_compaction_score); + if (!st.ok()) { + LOG(WARNING) << "failed to get tablets to compact, err=" << st; + break; + } + if (!need_pick_tablet) break; + tablets_compaction = std::move(tablets); + } while (false); + + if (max_compaction_score > 0) { + if (compaction_type == CompactionType::BASE_COMPACTION) { + DorisMetrics::instance()->tablet_base_max_compaction_score->set_value( + max_compaction_score); + } else { + DorisMetrics::instance()->tablet_cumulative_max_compaction_score->set_value( + max_compaction_score); + } + } + + return tablets_compaction; +} + +Status CloudStorageEngine::_submit_base_compaction_task(const CloudTabletSPtr& tablet) { + using namespace std::chrono; + { + std::lock_guard lock(_compaction_mtx); + // Take a placeholder for base compaction + auto [_, success] = _submitted_base_compactions.emplace(tablet->tablet_id(), nullptr); + if (!success) { + return Status::AlreadyExist( + "other base compaction or full compaction is submitted, tablet_id={}", + tablet->tablet_id()); + } + } + auto compaction = std::make_shared<CloudBaseCompaction>(*this, tablet); + auto st = compaction->prepare_compact(); + if (!st.ok()) { + long now = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count(); + tablet->set_last_base_compaction_failure_time(now); + std::lock_guard lock(_compaction_mtx); + _submitted_base_compactions.erase(tablet->tablet_id()); + return st; + } + { + std::lock_guard lock(_compaction_mtx); + _submitted_base_compactions[tablet->tablet_id()] = compaction; + } + st = _base_compaction_thread_pool->submit_func([=, this, compaction = std::move(compaction)]() { + auto st = compaction->execute_compact(); + if (!st.ok()) { + // Error log has been output in `execute_compact` + long now = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count(); + 
tablet->set_last_base_compaction_failure_time(now); + } + std::lock_guard lock(_compaction_mtx); + _submitted_base_compactions.erase(tablet->tablet_id()); + }); + if (!st.ok()) { + std::lock_guard lock(_compaction_mtx); + _submitted_base_compactions.erase(tablet->tablet_id()); + return Status::InternalError("failed to submit base compaction, tablet_id={}", + tablet->tablet_id()); + } + return st; +} + +Status CloudStorageEngine::_submit_cumulative_compaction_task(const CloudTabletSPtr& tablet) { + using namespace std::chrono; + { + std::lock_guard lock(_compaction_mtx); + if (!config::enable_parallel_cumu_compaction && + _submitted_cumu_compactions.count(tablet->tablet_id())) { + return Status::AlreadyExist("other cumu compaction is submitted, tablet_id={}", + tablet->tablet_id()); + } + auto [_, success] = _tablet_preparing_cumu_compaction.insert(tablet->tablet_id()); + if (!success) { + return Status::AlreadyExist("other cumu compaction is preparing, tablet_id={}", + tablet->tablet_id()); + } + } + auto compaction = std::make_shared<CloudCumulativeCompaction>(*this, tablet); + auto st = compaction->prepare_compact(); + if (!st.ok()) { + long now = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count(); + if (st.is<ErrorCode::CUMULATIVE_NO_SUITABLE_VERSION>()) { + // Backoff strategy if no suitable version + tablet->last_cumu_no_suitable_version_ms = now; + } + tablet->set_last_cumu_compaction_failure_time(now); + std::lock_guard lock(_compaction_mtx); + _tablet_preparing_cumu_compaction.erase(tablet->tablet_id()); + return st; + } + { + std::lock_guard lock(_compaction_mtx); + _tablet_preparing_cumu_compaction.erase(tablet->tablet_id()); + _submitted_cumu_compactions[tablet->tablet_id()].push_back(compaction); + } + auto erase_submitted_cumu_compaction = [=, this]() { + std::lock_guard lock(_compaction_mtx); + auto it = _submitted_cumu_compactions.find(tablet->tablet_id()); + DCHECK(it != _submitted_cumu_compactions.end()); + auto& 
compactions = it->second; + auto it1 = std::find(compactions.begin(), compactions.end(), compaction); + DCHECK(it1 != compactions.end()); + compactions.erase(it1); + if (compactions.empty()) { // No compactions on this tablet, erase key + _submitted_cumu_compactions.erase(it); + // No cumu compaction on this tablet, reset `last_cumu_no_suitable_version_ms` to enable this tablet to + // enter the compaction scheduling candidate set. The purpose of doing this is to have at least one BE perform + // cumu compaction on tablet which has suitable versions for cumu compaction. + tablet->last_cumu_no_suitable_version_ms = 0; + } + }; + st = _cumu_compaction_thread_pool->submit_func([=, compaction = std::move(compaction)]() { + auto st = compaction->execute_compact(); + if (!st.ok()) { + // Error log has been output in `execute_compact` + long now = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count(); + tablet->set_last_cumu_compaction_failure_time(now); + } + erase_submitted_cumu_compaction(); + }); + if (!st.ok()) { + erase_submitted_cumu_compaction(); + return Status::InternalError("failed to submit cumu compaction, tablet_id={}", + tablet->tablet_id()); + } + return st; +} + +Status CloudStorageEngine::_submit_full_compaction_task(const CloudTabletSPtr& tablet) { + using namespace std::chrono; + { + std::lock_guard lock(_compaction_mtx); + // Take a placeholder for full compaction + auto [_, success] = _submitted_full_compactions.emplace(tablet->tablet_id(), nullptr); + if (!success) { + return Status::AlreadyExist( + "other full compaction or base compaction is submitted, tablet_id={}", + tablet->tablet_id()); + } + } + //auto compaction = std::make_shared<CloudFullCompaction>(tablet); + auto compaction = std::make_shared<CloudFullCompaction>(*this, tablet); + auto st = compaction->prepare_compact(); + if (!st.ok()) { + long now = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count(); + 
tablet->set_last_full_compaction_failure_time(now); + std::lock_guard lock(_compaction_mtx); + _submitted_full_compactions.erase(tablet->tablet_id()); + return st; + } + { + std::lock_guard lock(_compaction_mtx); + _submitted_full_compactions[tablet->tablet_id()] = compaction; + } + st = _base_compaction_thread_pool->submit_func([=, this, compaction = std::move(compaction)]() { + auto st = compaction->execute_compact(); + if (!st.ok()) { + // Error log has been output in `execute_compact` + long now = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count(); + tablet->set_last_full_compaction_failure_time(now); + } + std::lock_guard lock(_compaction_mtx); + _submitted_full_compactions.erase(tablet->tablet_id()); + }); + if (!st.ok()) { + std::lock_guard lock(_compaction_mtx); + _submitted_full_compactions.erase(tablet->tablet_id()); + return Status::InternalError("failed to submit full compaction, tablet_id={}", + tablet->tablet_id()); + } + return st; +} + +Status CloudStorageEngine::submit_compaction_task(const CloudTabletSPtr& tablet, + CompactionType compaction_type) { + DCHECK(compaction_type == CompactionType::CUMULATIVE_COMPACTION || + compaction_type == CompactionType::BASE_COMPACTION || + compaction_type == CompactionType::FULL_COMPACTION); + switch (compaction_type) { + case CompactionType::BASE_COMPACTION: + RETURN_IF_ERROR(_submit_base_compaction_task(tablet)); + return Status::OK(); + case CompactionType::CUMULATIVE_COMPACTION: + RETURN_IF_ERROR(_submit_cumulative_compaction_task(tablet)); + return Status::OK(); + case CompactionType::FULL_COMPACTION: + RETURN_IF_ERROR(_submit_full_compaction_task(tablet)); + return Status::OK(); + default: + return Status::InternalError("unknown compaction type!"); + } +} + +void CloudStorageEngine::_lease_compaction_thread_callback() { + while (!_stop_background_threads_latch.wait_for( + std::chrono::seconds(config::lease_compaction_interval_seconds))) { + 
std::vector<std::shared_ptr<CloudBaseCompaction>> base_compactions; + std::vector<std::shared_ptr<CloudCumulativeCompaction>> cumu_compactions; + { + std::lock_guard lock(_compaction_mtx); + for (auto& [_, base] : _submitted_base_compactions) { + if (base) { // `base` might be a nullptr placeholder + base_compactions.push_back(base); + } + } + for (auto& [_, cumus] : _submitted_cumu_compactions) { + for (auto& cumu : cumus) { + cumu_compactions.push_back(cumu); + } + } + } + // TODO(plat1ko): Support batch lease rpc + for (auto& comp : cumu_compactions) { + comp->do_lease(); + } + for (auto& comp : base_compactions) { + comp->do_lease(); + } + } +} + +Status CloudStorageEngine::get_compaction_status_json(std::string* result) { Review Comment: warning: method 'get_compaction_status_json' can be made static [readability-convert-member-functions-to-static] be/src/cloud/cloud_storage_engine.h:80: ```diff - Status get_compaction_status_json(std::string* result); + static Status get_compaction_status_json(std::string* result); ``` ########## be/src/olap/rowset/vertical_beta_rowset_writer_helper.cpp: ########## @@ -0,0 +1,244 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "olap/rowset/vertical_beta_rowset_writer_helper.h" + +#include <fmt/format.h> +#include <gen_cpp/olap_file.pb.h> + +#include <algorithm> +#include <atomic> +#include <mutex> +#include <ostream> +#include <string> +#include <utility> + +#include "common/compiler_util.h" // IWYU pragma: keep +#include "common/logging.h" +#include "io/fs/file_system.h" +#include "io/fs/file_writer.h" +#include "olap/rowset/beta_rowset.h" +#include "olap/rowset/rowset_meta.h" +#include "olap/rowset/rowset_writer_context.h" +#include "util/slice.h" +#include "util/spinlock.h" +#include "vec/core/block.h" + +namespace doris { +using namespace ErrorCode; + +VerticalBetaRowsetWriterHelper::VerticalBetaRowsetWriterHelper( + std::vector<std::unique_ptr<segment_v2::SegmentWriter>>* segment_writers, + bool already_built, RowsetMetaSharedPtr& rowset_meta, std::atomic<int32_t>* num_segment, + RowsetWriterContext& context, std::atomic<int64_t>* num_rows_written, + std::vector<KeyBoundsPB>* segments_encoded_key_bounds, + std::vector<uint32_t>* segment_num_rows, std::atomic<int64_t>* total_index_size, + std::vector<io::FileWriterPtr>* file_writers, std::atomic<int64_t>* total_data_size, + SpinLock* lock) + : _segment_writers(segment_writers), + _already_built(already_built), + _rowset_meta(rowset_meta), + _num_segment(num_segment), + _context(context), + _num_rows_written(num_rows_written), + _segments_encoded_key_bounds(segments_encoded_key_bounds), + _segment_num_rows(segment_num_rows), + _total_index_size(total_index_size), + _file_writers(file_writers), + _total_data_size(total_data_size), + _lock(lock) {} + +void VerticalBetaRowsetWriterHelper::destruct_writer() { + if (!_already_built) { + const auto& fs = _rowset_meta->fs(); + if (!fs || !_rowset_meta->is_local()) { // Remote fs will delete them asynchronously + return; + } + for (auto& segment_writer : *_segment_writers) { + segment_writer.reset(); + } + for (int i = 0; i < *_num_segment; ++i) { + auto path = 
BetaRowset::segment_file_path(_context.rowset_dir, _context.rowset_id, i); + // Even if an error is encountered, these files that have not been cleaned up + // will be cleaned up by the GC background. So here we only print the error + // message when we encounter an error. + WARN_IF_ERROR(fs->delete_file(path), fmt::format("Failed to delete file={}", path)); + } + } +} Review Comment: warning: function 'add_columns' has cognitive complexity of 79 (threshold 50) [readability-function-cognitive-complexity] ```cpp Status VerticalBetaRowsetWriterHelper::add_columns(const vectorized::Block* block, ^ ``` <details> <summary>Additional context</summary> **be/src/olap/rowset/vertical_beta_rowset_writer_helper.cpp:86:** +1, including nesting penalty of 0, nesting level increased to 1 ```cpp if (num_rows == 0) { ^ ``` **be/src/olap/rowset/vertical_beta_rowset_writer_helper.cpp:89:** +1, including nesting penalty of 0, nesting level increased to 1 ```cpp if (UNLIKELY(max_rows_per_segment > _context.max_rows_per_segment)) { ^ ``` **be/src/olap/rowset/vertical_beta_rowset_writer_helper.cpp:93:** +1, including nesting penalty of 0, nesting level increased to 1 ```cpp if (_segment_writers->empty()) { ^ ``` **be/src/olap/rowset/vertical_beta_rowset_writer_helper.cpp:97:** +2, including nesting penalty of 1, nesting level increased to 2 ```cpp RETURN_IF_ERROR(_create_segment_writer(col_ids, is_key, &writer)); ^ ``` **be/src/common/status.h:541:** expanded from macro 'RETURN_IF_ERROR' ```cpp do { \ ^ ``` **be/src/olap/rowset/vertical_beta_rowset_writer_helper.cpp:97:** +3, including nesting penalty of 2, nesting level increased to 3 ```cpp RETURN_IF_ERROR(_create_segment_writer(col_ids, is_key, &writer)); ^ ``` **be/src/common/status.h:543:** expanded from macro 'RETURN_IF_ERROR' ```cpp if (UNLIKELY(!_status_.ok())) { \ ^ ``` **be/src/olap/rowset/vertical_beta_rowset_writer_helper.cpp:100:** +2, including nesting penalty of 1, nesting level increased to 2 ```cpp 
RETURN_IF_ERROR((*_segment_writers)[_cur_writer_idx]->append_block(block, 0, num_rows)); ^ ``` **be/src/common/status.h:541:** expanded from macro 'RETURN_IF_ERROR' ```cpp do { \ ^ ``` **be/src/olap/rowset/vertical_beta_rowset_writer_helper.cpp:100:** +3, including nesting penalty of 2, nesting level increased to 3 ```cpp RETURN_IF_ERROR((*_segment_writers)[_cur_writer_idx]->append_block(block, 0, num_rows)); ^ ``` **be/src/common/status.h:543:** expanded from macro 'RETURN_IF_ERROR' ```cpp if (UNLIKELY(!_status_.ok())) { \ ^ ``` **be/src/olap/rowset/vertical_beta_rowset_writer_helper.cpp:101:** +1, nesting level increased to 1 ```cpp } else if (is_key) { ^ ``` **be/src/olap/rowset/vertical_beta_rowset_writer_helper.cpp:102:** +2, including nesting penalty of 1, nesting level increased to 2 ```cpp if ((*_segment_writers)[_cur_writer_idx]->num_rows_written() > max_rows_per_segment) { ^ ``` **be/src/olap/rowset/vertical_beta_rowset_writer_helper.cpp:104:** +3, including nesting penalty of 2, nesting level increased to 3 ```cpp RETURN_IF_ERROR(_flush_columns(&(*_segment_writers)[_cur_writer_idx], true)); ^ ``` **be/src/common/status.h:541:** expanded from macro 'RETURN_IF_ERROR' ```cpp do { \ ^ ``` **be/src/olap/rowset/vertical_beta_rowset_writer_helper.cpp:104:** +4, including nesting penalty of 3, nesting level increased to 4 ```cpp RETURN_IF_ERROR(_flush_columns(&(*_segment_writers)[_cur_writer_idx], true)); ^ ``` **be/src/common/status.h:543:** expanded from macro 'RETURN_IF_ERROR' ```cpp if (UNLIKELY(!_status_.ok())) { \ ^ ``` **be/src/olap/rowset/vertical_beta_rowset_writer_helper.cpp:107:** +3, including nesting penalty of 2, nesting level increased to 3 ```cpp RETURN_IF_ERROR(_create_segment_writer(col_ids, is_key, &writer)); ^ ``` **be/src/common/status.h:541:** expanded from macro 'RETURN_IF_ERROR' ```cpp do { \ ^ ``` **be/src/olap/rowset/vertical_beta_rowset_writer_helper.cpp:107:** +4, including nesting penalty of 3, nesting level increased to 4 ```cpp 
RETURN_IF_ERROR(_create_segment_writer(col_ids, is_key, &writer)); ^ ``` **be/src/common/status.h:543:** expanded from macro 'RETURN_IF_ERROR' ```cpp if (UNLIKELY(!_status_.ok())) { \ ^ ``` **be/src/olap/rowset/vertical_beta_rowset_writer_helper.cpp:111:** +2, including nesting penalty of 1, nesting level increased to 2 ```cpp RETURN_IF_ERROR((*_segment_writers)[_cur_writer_idx]->append_block(block, 0, num_rows)); ^ ``` **be/src/common/status.h:541:** expanded from macro 'RETURN_IF_ERROR' ```cpp do { \ ^ ``` **be/src/olap/rowset/vertical_beta_rowset_writer_helper.cpp:111:** +3, including nesting penalty of 2, nesting level increased to 3 ```cpp RETURN_IF_ERROR((*_segment_writers)[_cur_writer_idx]->append_block(block, 0, num_rows)); ^ ``` **be/src/common/status.h:543:** expanded from macro 'RETURN_IF_ERROR' ```cpp if (UNLIKELY(!_status_.ok())) { \ ^ ``` **be/src/olap/rowset/vertical_beta_rowset_writer_helper.cpp:112:** +1, nesting level increased to 1 ```cpp } else { ^ ``` **be/src/olap/rowset/vertical_beta_rowset_writer_helper.cpp:119:** +2, including nesting penalty of 1, nesting level increased to 2 ```cpp if (_cur_writer_idx == 0 && num_rows_written == 0) { ^ ``` **be/src/olap/rowset/vertical_beta_rowset_writer_helper.cpp:119:** +1 ```cpp if (_cur_writer_idx == 0 && num_rows_written == 0) { ^ ``` **be/src/olap/rowset/vertical_beta_rowset_writer_helper.cpp:121:** +3, including nesting penalty of 2, nesting level increased to 3 ```cpp RETURN_IF_ERROR((*_segment_writers)[_cur_writer_idx]->init(col_ids, is_key)); ^ ``` **be/src/common/status.h:541:** expanded from macro 'RETURN_IF_ERROR' ```cpp do { \ ^ ``` **be/src/olap/rowset/vertical_beta_rowset_writer_helper.cpp:121:** +4, including nesting penalty of 3, nesting level increased to 4 ```cpp RETURN_IF_ERROR((*_segment_writers)[_cur_writer_idx]->init(col_ids, is_key)); ^ ``` **be/src/common/status.h:543:** expanded from macro 'RETURN_IF_ERROR' ```cpp if (UNLIKELY(!_status_.ok())) { \ ^ ``` 
**be/src/olap/rowset/vertical_beta_rowset_writer_helper.cpp:126:** +2, including nesting penalty of 1, nesting level increased to 2 ```cpp if (num_rows_written + num_rows >= num_rows_key_group && ^ ``` **be/src/olap/rowset/vertical_beta_rowset_writer_helper.cpp:128:** +3, including nesting penalty of 2, nesting level increased to 3 ```cpp RETURN_IF_ERROR((*_segment_writers)[_cur_writer_idx]->append_block( ^ ``` **be/src/common/status.h:541:** expanded from macro 'RETURN_IF_ERROR' ```cpp do { \ ^ ``` **be/src/olap/rowset/vertical_beta_rowset_writer_helper.cpp:128:** +4, including nesting penalty of 3, nesting level increased to 4 ```cpp RETURN_IF_ERROR((*_segment_writers)[_cur_writer_idx]->append_block( ^ ``` **be/src/common/status.h:543:** expanded from macro 'RETURN_IF_ERROR' ```cpp if (UNLIKELY(!_status_.ok())) { \ ^ ``` **be/src/olap/rowset/vertical_beta_rowset_writer_helper.cpp:130:** +3, including nesting penalty of 2, nesting level increased to 3 ```cpp RETURN_IF_ERROR(_flush_columns(&(*_segment_writers)[_cur_writer_idx])); ^ ``` **be/src/common/status.h:541:** expanded from macro 'RETURN_IF_ERROR' ```cpp do { \ ^ ``` **be/src/olap/rowset/vertical_beta_rowset_writer_helper.cpp:130:** +4, including nesting penalty of 3, nesting level increased to 4 ```cpp RETURN_IF_ERROR(_flush_columns(&(*_segment_writers)[_cur_writer_idx])); ^ ``` **be/src/common/status.h:543:** expanded from macro 'RETURN_IF_ERROR' ```cpp if (UNLIKELY(!_status_.ok())) { \ ^ ``` **be/src/olap/rowset/vertical_beta_rowset_writer_helper.cpp:135:** +3, including nesting penalty of 2, nesting level increased to 3 ```cpp RETURN_IF_ERROR((*_segment_writers)[_cur_writer_idx]->init(col_ids, is_key)); ^ ``` **be/src/common/status.h:541:** expanded from macro 'RETURN_IF_ERROR' ```cpp do { \ ^ ``` **be/src/olap/rowset/vertical_beta_rowset_writer_helper.cpp:135:** +4, including nesting penalty of 3, nesting level increased to 4 ```cpp RETURN_IF_ERROR((*_segment_writers)[_cur_writer_idx]->init(col_ids, 
is_key)); ^ ``` **be/src/common/status.h:543:** expanded from macro 'RETURN_IF_ERROR' ```cpp if (UNLIKELY(!_status_.ok())) { \ ^ ``` **be/src/olap/rowset/vertical_beta_rowset_writer_helper.cpp:139:** +2, including nesting penalty of 1, nesting level increased to 2 ```cpp if (limit > 0) { ^ ``` **be/src/olap/rowset/vertical_beta_rowset_writer_helper.cpp:140:** +3, including nesting penalty of 2, nesting level increased to 3 ```cpp RETURN_IF_ERROR( ^ ``` **be/src/common/status.h:541:** expanded from macro 'RETURN_IF_ERROR' ```cpp do { \ ^ ``` **be/src/olap/rowset/vertical_beta_rowset_writer_helper.cpp:140:** +4, including nesting penalty of 3, nesting level increased to 4 ```cpp RETURN_IF_ERROR( ^ ``` **be/src/common/status.h:543:** expanded from macro 'RETURN_IF_ERROR' ```cpp if (UNLIKELY(!_status_.ok())) { \ ^ ``` **be/src/olap/rowset/vertical_beta_rowset_writer_helper.cpp:146:** +1, including nesting penalty of 0, nesting level increased to 1 ```cpp if (is_key) { ^ ``` </details> -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org