This is an automated email from the ASF dual-hosted git repository. jakevin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 6da36e10773 [feature](merge-cloud) Refactor write path code by abstract base class (#26537) 6da36e10773 is described below commit 6da36e107739905c1f2665028c5788f1a7313902 Author: plat1ko <platonekos...@gmail.com> AuthorDate: Fri Dec 8 14:50:36 2023 +0800 [feature](merge-cloud) Refactor write path code by abstract base class (#26537) Refactor the write path code using abstract base classes. Whether to use `StorageEngine` or `CloudStorageEngine` will be determined at compile time instead of at runtime via `config::cloud_mode`, to avoid unexpected null pointer or undefined behavior issues caused by merging code. Classes that depend on `StorageEngine` but are shared with cloud mode need to have an abstract base class. Common code should be extracted into the base class, while the code that depends on `StorageEngine` should be implemented in a `StorageEngine` mix-in class of the base class. --- be/src/cloud/config.cpp | 2 +- be/src/cloud/config.h | 2 +- be/src/olap/delta_writer.cpp | 101 +++++----- be/src/olap/delta_writer.h | 71 ++++--- be/src/olap/rowset/beta_rowset_writer.cpp | 215 ++++++++++++--------- be/src/olap/rowset/beta_rowset_writer.h | 118 ++++++----- be/src/olap/rowset/beta_rowset_writer_v2.h | 4 - be/src/olap/rowset/rowset_factory.cpp | 5 +- be/src/olap/rowset/rowset_writer.h | 4 - be/src/olap/rowset/segcompaction.h | 4 +- be/src/olap/rowset/segment_v2/segment_writer.cpp | 14 +- .../rowset/segment_v2/vertical_segment_writer.cpp | 10 +- be/src/olap/rowset/vertical_beta_rowset_writer.cpp | 3 + be/src/olap/rowset/vertical_beta_rowset_writer.h | 7 +- be/src/olap/rowset_builder.cpp | 151 +++++++-------- be/src/olap/rowset_builder.h | 53 +++-- be/src/runtime/exec_env.cpp | 2 +- be/src/runtime/exec_env.h | 6 + be/src/runtime/load_channel.cpp | 33 +++- be/src/runtime/load_channel.h | 41 +--- be/src/runtime/load_stream.h | 12 +- be/src/runtime/load_stream_mgr.h | 8 +- be/src/runtime/load_stream_writer.cpp 
| 20 +- be/src/runtime/load_stream_writer.h | 22 +-- be/src/runtime/tablets_channel.cpp | 202 ++++++++++--------- be/src/runtime/tablets_channel.h | 97 +++++----- be/src/vec/olap/block_reader.cpp | 13 +- be/src/vec/olap/vertical_block_reader.cpp | 11 +- be/test/olap/delta_writer_test.cpp | 42 ++-- .../olap/engine_storage_migration_task_test.cpp | 9 +- be/test/olap/memtable_memory_limiter_test.cpp | 7 +- be/test/olap/tablet_cooldown_test.cpp | 7 +- 32 files changed, 680 insertions(+), 616 deletions(-) diff --git a/be/src/cloud/config.cpp b/be/src/cloud/config.cpp index e7e1510b2ba..4d9da1e9cfc 100644 --- a/be/src/cloud/config.cpp +++ b/be/src/cloud/config.cpp @@ -20,7 +20,7 @@ namespace doris { namespace config { -DEFINE_Bool(cloud_mode, "false"); +// TODO } // namespace config } // namespace doris diff --git a/be/src/cloud/config.h b/be/src/cloud/config.h index da7fea826cc..21a3b6052f5 100644 --- a/be/src/cloud/config.h +++ b/be/src/cloud/config.h @@ -22,7 +22,7 @@ namespace doris { namespace config { -DECLARE_Bool(cloud_mode); +// TODO } // namespace config } // namespace doris diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp index 310ffacad9a..dc5973bf04d 100644 --- a/be/src/olap/delta_writer.cpp +++ b/be/src/olap/delta_writer.cpp @@ -28,7 +28,7 @@ #include <string> #include <utility> -#include "cloud/config.h" +// IWYU pragma: no_include <opentelemetry/common/threadlocal.h> #include "common/compiler_util.h" // IWYU pragma: keep #include "common/config.h" #include "common/logging.h" @@ -41,6 +41,7 @@ #include "olap/rowset/beta_rowset_writer.h" #include "olap/rowset/rowset_meta.h" #include "olap/rowset/segment_v2/inverted_index_desc.h" +#include "olap/rowset_builder.h" #include "olap/schema_change.h" #include "olap/storage_engine.h" #include "olap/tablet_manager.h" @@ -57,25 +58,29 @@ namespace doris { using namespace ErrorCode; -Status DeltaWriter::open(WriteRequest* req, DeltaWriter** writer, RuntimeProfile* profile, - const UniqueId& 
load_id) { - *writer = new DeltaWriter(req, StorageEngine::instance(), profile, load_id); - return Status::OK(); +BaseDeltaWriter::BaseDeltaWriter(WriteRequest* req, RuntimeProfile* profile, + const UniqueId& load_id) + : _req(*req), _memtable_writer(new MemTableWriter(*req)) { + _init_profile(profile); } -DeltaWriter::DeltaWriter(WriteRequest* req, StorageEngine* storage_engine, RuntimeProfile* profile, +DeltaWriter::DeltaWriter(StorageEngine& engine, WriteRequest* req, RuntimeProfile* profile, const UniqueId& load_id) - : _req(*req), _rowset_builder(*req, profile), _memtable_writer(new MemTableWriter(*req)) { - _init_profile(profile); + : BaseDeltaWriter(req, profile, load_id), _engine(engine) { + _rowset_builder = std::make_unique<RowsetBuilder>(_engine, *req, profile); } -void DeltaWriter::_init_profile(RuntimeProfile* profile) { +void BaseDeltaWriter::_init_profile(RuntimeProfile* profile) { _profile = profile->create_child(fmt::format("DeltaWriter {}", _req.tablet_id), true, true); _close_wait_timer = ADD_TIMER(_profile, "CloseWaitTime"); +} + +void DeltaWriter::_init_profile(RuntimeProfile* profile) { + BaseDeltaWriter::_init_profile(profile); _commit_txn_timer = ADD_TIMER(_profile, "CommitTxnTime"); } -DeltaWriter::~DeltaWriter() { +BaseDeltaWriter::~BaseDeltaWriter() { if (!_is_init) { return; } @@ -83,33 +88,35 @@ DeltaWriter::~DeltaWriter() { // cancel and wait all memtables in flush queue to be finished static_cast<void>(_memtable_writer->cancel()); - if (_rowset_builder.tablet() != nullptr) { + if (_rowset_builder->tablet() != nullptr) { const FlushStatistic& stat = _memtable_writer->get_flush_token_stats(); - _rowset_builder.tablet()->flush_bytes->increment(stat.flush_size_bytes); - _rowset_builder.tablet()->flush_finish_count->increment(stat.flush_finish_count); + _rowset_builder->tablet()->flush_bytes->increment(stat.flush_size_bytes); + _rowset_builder->tablet()->flush_finish_count->increment(stat.flush_finish_count); } } -Status 
DeltaWriter::init() { +DeltaWriter::~DeltaWriter() = default; + +Status BaseDeltaWriter::init() { if (_is_init) { return Status::OK(); } - RETURN_IF_ERROR(_rowset_builder.init()); - RETURN_IF_ERROR( - _memtable_writer->init(_rowset_builder.rowset_writer(), _rowset_builder.tablet_schema(), - _rowset_builder.get_partial_update_info(), - _rowset_builder.tablet()->enable_unique_key_merge_on_write())); + RETURN_IF_ERROR(_rowset_builder->init()); + RETURN_IF_ERROR(_memtable_writer->init( + _rowset_builder->rowset_writer(), _rowset_builder->tablet_schema(), + _rowset_builder->get_partial_update_info(), + _rowset_builder->tablet()->enable_unique_key_merge_on_write())); ExecEnv::GetInstance()->memtable_memory_limiter()->register_writer(_memtable_writer); _is_init = true; return Status::OK(); } -Status DeltaWriter::append(const vectorized::Block* block) { +Status BaseDeltaWriter::append(const vectorized::Block* block) { return write(block, {}, true); } -Status DeltaWriter::write(const vectorized::Block* block, const std::vector<uint32_t>& row_idxs, - bool is_append) { +Status BaseDeltaWriter::write(const vectorized::Block* block, const std::vector<uint32_t>& row_idxs, + bool is_append) { if (UNLIKELY(row_idxs.empty() && !is_append)) { return Status::OK(); } @@ -121,11 +128,11 @@ Status DeltaWriter::write(const vectorized::Block* block, const std::vector<uint } return _memtable_writer->write(block, row_idxs, is_append); } -Status DeltaWriter::wait_flush() { +Status BaseDeltaWriter::wait_flush() { return _memtable_writer->wait_flush(); } -Status DeltaWriter::close() { +Status BaseDeltaWriter::close() { _lock_watch.start(); std::lock_guard<std::mutex> l(_lock); _lock_watch.stop(); @@ -140,37 +147,31 @@ Status DeltaWriter::close() { return _memtable_writer->close(); } -Status DeltaWriter::build_rowset() { +Status BaseDeltaWriter::build_rowset() { std::lock_guard<std::mutex> l(_lock); DCHECK(_is_init) << "delta writer is supposed be to initialized before build_rowset() being 
called"; SCOPED_TIMER(_close_wait_timer); RETURN_IF_ERROR(_memtable_writer->close_wait(_profile)); - return _rowset_builder.build_rowset(); + return _rowset_builder->build_rowset(); } -Status DeltaWriter::submit_calc_delete_bitmap_task() { - return _rowset_builder.submit_calc_delete_bitmap_task(); +Status BaseDeltaWriter::submit_calc_delete_bitmap_task() { + return _rowset_builder->submit_calc_delete_bitmap_task(); } -Status DeltaWriter::wait_calc_delete_bitmap() { - return _rowset_builder.wait_calc_delete_bitmap(); +Status BaseDeltaWriter::wait_calc_delete_bitmap() { + return _rowset_builder->wait_calc_delete_bitmap(); } -Status DeltaWriter::commit_txn(const PSlaveTabletNodes& slave_tablet_nodes, - const bool write_single_replica) { - if (config::cloud_mode) { - return Status::OK(); - } +Status DeltaWriter::commit_txn(const PSlaveTabletNodes& slave_tablet_nodes) { std::lock_guard<std::mutex> l(_lock); SCOPED_TIMER(_commit_txn_timer); - RETURN_IF_ERROR(_rowset_builder.commit_txn()); + RETURN_IF_ERROR(_rowset_builder->commit_txn()); - if (write_single_replica) { - for (auto node_info : slave_tablet_nodes.slave_nodes()) { - _request_slave_tablet_pull_rowset(node_info); - } + for (auto&& node_info : slave_tablet_nodes.slave_nodes()) { + _request_slave_tablet_pull_rowset(node_info); } return Status::OK(); } @@ -191,11 +192,11 @@ void DeltaWriter::add_finished_slave_replicas( success_slave_tablet_node_ids->insert({_req.tablet_id, _success_slave_node_ids}); } -Status DeltaWriter::cancel() { +Status BaseDeltaWriter::cancel() { return cancel_with_status(Status::Cancelled("already cancelled")); } -Status DeltaWriter::cancel_with_status(const Status& st) { +Status BaseDeltaWriter::cancel_with_status(const Status& st) { std::lock_guard<std::mutex> l(_lock); if (_is_cancelled) { return Status::OK(); @@ -205,14 +206,11 @@ Status DeltaWriter::cancel_with_status(const Status& st) { return Status::OK(); } -int64_t DeltaWriter::mem_consumption(MemType mem) { +int64_t 
BaseDeltaWriter::mem_consumption(MemType mem) { return _memtable_writer->mem_consumption(mem); } void DeltaWriter::_request_slave_tablet_pull_rowset(PNodeInfo node_info) { - if (config::cloud_mode) [[unlikely]] { - return; - } std::shared_ptr<PBackendService_Stub> stub = ExecEnv::GetInstance()->brpc_internal_client_cache()->get_client( node_info.host(), node_info.async_internal_port()); @@ -224,15 +222,14 @@ void DeltaWriter::_request_slave_tablet_pull_rowset(PNodeInfo node_info) { return; } - StorageEngine::instance()->txn_manager()->add_txn_tablet_delta_writer(_req.txn_id, - _req.tablet_id, this); + _engine.txn_manager()->add_txn_tablet_delta_writer(_req.txn_id, _req.tablet_id, this); { std::lock_guard<std::shared_mutex> lock(_slave_node_lock); _unfinished_slave_node.insert(node_info.id()); } std::vector<std::pair<int64_t, std::string>> indices_ids; - auto cur_rowset = _rowset_builder.rowset(); + auto cur_rowset = _rowset_builder->rowset(); auto tablet_schema = cur_rowset->rowset_meta()->tablet_schema(); if (!tablet_schema->skip_write_index_on_load()) { for (auto& column : tablet_schema->columns()) { @@ -247,7 +244,7 @@ void DeltaWriter::_request_slave_tablet_pull_rowset(PNodeInfo node_info) { *(request->mutable_rowset_meta()) = cur_rowset->rowset_meta()->get_rowset_pb(); request->set_host(BackendOptions::get_localhost()); request->set_http_port(config::webserver_port); - string tablet_path = _rowset_builder.tablet()->tablet_path(); + string tablet_path = _rowset_builder->tablet()->tablet_path(); request->set_rowset_path(tablet_path); request->set_token(ExecEnv::GetInstance()->token()); request->set_brpc_port(config::brpc_port); @@ -314,8 +311,8 @@ void DeltaWriter::finish_slave_tablet_pull_rowset(int64_t node_id, bool is_succe _unfinished_slave_node.erase(node_id); } -int64_t DeltaWriter::num_rows_filtered() const { - auto rowset_writer = _rowset_builder.rowset_writer(); +int64_t BaseDeltaWriter::num_rows_filtered() const { + auto rowset_writer = 
_rowset_builder->rowset_writer(); return rowset_writer == nullptr ? 0 : rowset_writer->num_rows_filtered(); } diff --git a/be/src/olap/delta_writer.h b/be/src/olap/delta_writer.h index d7e351a168e..c782c5ef3b6 100644 --- a/be/src/olap/delta_writer.h +++ b/be/src/olap/delta_writer.h @@ -20,7 +20,6 @@ #include <gen_cpp/Types_types.h> #include <gen_cpp/internal_service.pb.h> #include <gen_cpp/types.pb.h> -#include <stdint.h> #include <atomic> #include <memory> @@ -34,7 +33,6 @@ #include "olap/memtable_writer.h" #include "olap/olap_common.h" #include "olap/rowset/rowset.h" -#include "olap/rowset_builder.h" #include "olap/tablet.h" #include "olap/tablet_meta.h" #include "olap/tablet_schema.h" @@ -56,14 +54,15 @@ namespace vectorized { class Block; } // namespace vectorized +class BaseRowsetBuilder; + // Writer for a particular (load, index, tablet). // This class is NOT thread-safe, external synchronization is required. -class DeltaWriter { +class BaseDeltaWriter { public: - static Status open(WriteRequest* req, DeltaWriter** writer, RuntimeProfile* profile, - const UniqueId& load_id = TUniqueId()); + BaseDeltaWriter(WriteRequest* req, RuntimeProfile* profile, const UniqueId& load_id); - ~DeltaWriter(); + virtual ~BaseDeltaWriter(); Status init(); @@ -79,15 +78,6 @@ public: Status build_rowset(); Status submit_calc_delete_bitmap_task(); Status wait_calc_delete_bitmap(); - Status commit_txn(const PSlaveTabletNodes& slave_tablet_nodes, const bool write_single_replica); - - bool check_slave_replicas_done(google::protobuf::Map<int64_t, PSuccessSlaveTabletNodeIds>* - success_slave_tablet_node_ids); - - void add_finished_slave_replicas(google::protobuf::Map<int64_t, PSuccessSlaveTabletNodeIds>* - success_slave_tablet_node_ids); - - void finish_slave_tablet_pull_rowset(int64_t node_id, bool is_succeed); // abandon current memtable and wait for all pending-flushing memtables to be destructed. // mem_consumption() should be 0 after this function returns. 
@@ -109,34 +99,55 @@ public: int64_t num_rows_filtered() const; - // For UT - DeleteBitmapPtr get_delete_bitmap() { return _rowset_builder.get_delete_bitmap(); } - -private: - DeltaWriter(WriteRequest* req, StorageEngine* storage_engine, RuntimeProfile* profile, - const UniqueId& load_id); - - void _request_slave_tablet_pull_rowset(PNodeInfo node_info); - - void _init_profile(RuntimeProfile* profile); +protected: + virtual void _init_profile(RuntimeProfile* profile); bool _is_init = false; bool _is_cancelled = false; WriteRequest _req; - RowsetBuilder _rowset_builder; + std::unique_ptr<BaseRowsetBuilder> _rowset_builder; std::shared_ptr<MemTableWriter> _memtable_writer; std::mutex _lock; - std::unordered_set<int64_t> _unfinished_slave_node; - PSuccessSlaveTabletNodeIds _success_slave_node_ids; - std::shared_mutex _slave_node_lock; + // total rows num written by DeltaWriter + std::atomic<int64_t> _total_received_rows = 0; RuntimeProfile* _profile = nullptr; RuntimeProfile::Counter* _close_wait_timer = nullptr; - RuntimeProfile::Counter* _commit_txn_timer = nullptr; MonotonicStopWatch _lock_watch; }; +// `StorageEngine` mixin for `BaseDeltaWriter` +class DeltaWriter final : public BaseDeltaWriter { +public: + DeltaWriter(StorageEngine& engine, WriteRequest* req, RuntimeProfile* profile, + const UniqueId& load_id); + + ~DeltaWriter() override; + + Status commit_txn(const PSlaveTabletNodes& slave_tablet_nodes); + + bool check_slave_replicas_done(google::protobuf::Map<int64_t, PSuccessSlaveTabletNodeIds>* + success_slave_tablet_node_ids); + + void add_finished_slave_replicas(google::protobuf::Map<int64_t, PSuccessSlaveTabletNodeIds>* + success_slave_tablet_node_ids); + + void finish_slave_tablet_pull_rowset(int64_t node_id, bool is_succeed); + +private: + void _init_profile(RuntimeProfile* profile) override; + + void _request_slave_tablet_pull_rowset(PNodeInfo node_info); + + StorageEngine& _engine; + std::unordered_set<int64_t> _unfinished_slave_node; + 
PSuccessSlaveTabletNodeIds _success_slave_node_ids; + std::shared_mutex _slave_node_lock; + + RuntimeProfile::Counter* _commit_txn_timer = nullptr; +}; + } // namespace doris diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp b/be/src/olap/rowset/beta_rowset_writer.cpp index ab9e97a3d3b..6850ce0f43c 100644 --- a/be/src/olap/rowset/beta_rowset_writer.cpp +++ b/be/src/olap/rowset/beta_rowset_writer.cpp @@ -25,10 +25,11 @@ #include <ctime> // time #include <filesystem> +#include <memory> #include <sstream> #include <utility> -#include "cloud/config.h" +// IWYU pragma: no_include <opentelemetry/common/threadlocal.h> #include "common/compiler_util.h" // IWYU pragma: keep #include "common/config.h" #include "common/logging.h" @@ -40,6 +41,7 @@ #include "olap/rowset/beta_rowset.h" #include "olap/rowset/rowset_factory.h" #include "olap/rowset/rowset_writer.h" +#include "olap/rowset/segcompaction.h" #include "olap/rowset/segment_v2/inverted_index_cache.h" #include "olap/rowset/segment_v2/inverted_index_desc.h" #include "olap/rowset/segment_v2/segment.h" @@ -59,27 +61,53 @@ namespace doris { using namespace ErrorCode; -BetaRowsetWriter::BetaRowsetWriter() +namespace { + +bool is_segment_overlapping(const std::vector<KeyBoundsPB>& segments_encoded_key_bounds) { + std::string_view last; + for (auto&& segment_encode_key : segments_encoded_key_bounds) { + auto&& cur_min = segment_encode_key.min_key(); + auto&& cur_max = segment_encode_key.max_key(); + if (cur_min <= last) { + return true; + } + last = cur_max; + } + return false; +} + +void build_rowset_meta_with_spec_field(RowsetMeta& rowset_meta, + const RowsetMeta& spec_rowset_meta) { + rowset_meta.set_num_rows(spec_rowset_meta.num_rows()); + rowset_meta.set_total_disk_size(spec_rowset_meta.total_disk_size()); + rowset_meta.set_data_disk_size(spec_rowset_meta.total_disk_size()); + rowset_meta.set_index_disk_size(spec_rowset_meta.index_disk_size()); + // TODO write zonemap to meta + 
rowset_meta.set_empty(spec_rowset_meta.num_rows() == 0); + rowset_meta.set_creation_time(time(nullptr)); + rowset_meta.set_num_segments(spec_rowset_meta.num_segments()); + rowset_meta.set_segments_overlap(spec_rowset_meta.segments_overlap()); + rowset_meta.set_rowset_state(spec_rowset_meta.rowset_state()); + + std::vector<KeyBoundsPB> segments_key_bounds; + spec_rowset_meta.get_segments_key_bounds(&segments_key_bounds); + rowset_meta.set_segments_key_bounds(segments_key_bounds); +} + +} // namespace + +BaseBetaRowsetWriter::BaseBetaRowsetWriter() : _rowset_meta(nullptr), _num_segment(0), _segment_start_id(0), - _segcompacted_point(0), - _num_segcompacted(0), _num_rows_written(0), _total_data_size(0), - _total_index_size(0), - _segcompaction_worker(this), - _is_doing_segcompaction(false) { - _segcompaction_status.store(OK); -} + _total_index_size(0) {} -BetaRowsetWriter::~BetaRowsetWriter() { - /* Note that segcompaction is async and in parallel with load job. So we should handle carefully - * when the job is cancelled. Although it is meaningless to continue segcompaction when the job - * is cancelled, the objects involved in the job should be preserved during segcompaction to - * avoid crashs for memory issues. */ - WARN_IF_ERROR(wait_flying_segcompaction(), "segment compaction failed"); +BetaRowsetWriter::BetaRowsetWriter(StorageEngine& engine) + : _engine(engine), _segcompaction_worker(std::make_unique<SegcompactionWorker>(this)) {} +BaseBetaRowsetWriter::~BaseBetaRowsetWriter() { // TODO(lingbin): Should wrapper exception logic, no need to know file ops directly. if (!_already_built) { // abnormal exit, remove all files generated WARN_IF_ERROR(_segment_creator.close(), @@ -100,7 +128,15 @@ BetaRowsetWriter::~BetaRowsetWriter() { } } -Status BetaRowsetWriter::init(const RowsetWriterContext& rowset_writer_context) { +BetaRowsetWriter::~BetaRowsetWriter() { + /* Note that segcompaction is async and in parallel with load job. 
So we should handle carefully + * when the job is cancelled. Although it is meaningless to continue segcompaction when the job + * is cancelled, the objects involved in the job should be preserved during segcompaction to + * avoid crashs for memory issues. */ + WARN_IF_ERROR(_wait_flying_segcompaction(), "segment compaction failed"); +} + +Status BaseBetaRowsetWriter::init(const RowsetWriterContext& rowset_writer_context) { _context = rowset_writer_context; _rowset_meta.reset(new RowsetMeta); _rowset_meta->set_fs(_context.fs); @@ -121,21 +157,17 @@ Status BetaRowsetWriter::init(const RowsetWriterContext& rowset_writer_context) } _rowset_meta->set_tablet_uid(_context.tablet_uid); _rowset_meta->set_tablet_schema(_context.tablet_schema); - _context.segment_collector = std::make_shared<SegmentCollectorT<BetaRowsetWriter>>(this); - _context.file_writer_creator = std::make_shared<FileWriterCreatorT<BetaRowsetWriter>>(this); + _context.segment_collector = std::make_shared<SegmentCollectorT<BaseBetaRowsetWriter>>(this); + _context.file_writer_creator = std::make_shared<FileWriterCreatorT<BaseBetaRowsetWriter>>(this); RETURN_IF_ERROR(_segment_creator.init(_context)); return Status::OK(); } -Status BetaRowsetWriter::add_block(const vectorized::Block* block) { +Status BaseBetaRowsetWriter::add_block(const vectorized::Block* block) { return _segment_creator.add_block(block); } Status BetaRowsetWriter::_generate_delete_bitmap(int32_t segment_id) { - if (config::cloud_mode) { - // TODO(plat1ko) - return Status::NotSupported("_generate_delete_bitmap"); - } SCOPED_RAW_TIMER(&_delete_bitmap_ns); if (!_context.tablet->enable_unique_key_merge_on_write() || (_context.partial_update_info && _context.partial_update_info->is_partial_update)) { @@ -374,12 +406,11 @@ Status BetaRowsetWriter::_segcompaction_if_necessary() { } else if ((_num_segment - _segcompacted_point) >= config::segcompaction_batch_size) { SegCompactionCandidatesSharedPtr segments; status = 
_find_longest_consecutive_small_segment(segments); - if (LIKELY(status.ok()) && (segments->size() > 0)) { + if (LIKELY(status.ok()) && (!segments->empty())) { LOG(INFO) << "submit segcompaction task, tablet_id:" << _context.tablet_id << " rowset_id:" << _context.rowset_id << " segment num:" << _num_segment << ", segcompacted_point:" << _segcompacted_point; - status = StorageEngine::instance()->submit_seg_compaction_task(&_segcompaction_worker, - segments); + status = _engine.submit_seg_compaction_task(_segcompaction_worker.get(), segments); if (status.ok()) { return status; } @@ -415,7 +446,7 @@ Status BetaRowsetWriter::_segcompaction_rename_last_segments() { return Status::OK(); } -Status BetaRowsetWriter::add_rowset(RowsetSharedPtr rowset) { +Status BaseBetaRowsetWriter::add_rowset(RowsetSharedPtr rowset) { assert(rowset->rowset_meta()->rowset_type() == BETA_ROWSET); RETURN_IF_ERROR(rowset->link_files_to(_context.rowset_dir, _context.rowset_id)); _num_rows_written += rowset->num_rows(); @@ -438,17 +469,17 @@ Status BetaRowsetWriter::add_rowset(RowsetSharedPtr rowset) { return Status::OK(); } -Status BetaRowsetWriter::add_rowset_for_linked_schema_change(RowsetSharedPtr rowset) { +Status BaseBetaRowsetWriter::add_rowset_for_linked_schema_change(RowsetSharedPtr rowset) { // TODO use schema_mapping to transfer zonemap return add_rowset(rowset); } -Status BetaRowsetWriter::flush() { +Status BaseBetaRowsetWriter::flush() { return _segment_creator.flush(); } -Status BetaRowsetWriter::flush_memtable(vectorized::Block* block, int32_t segment_id, - int64_t* flush_size) { +Status BaseBetaRowsetWriter::flush_memtable(vectorized::Block* block, int32_t segment_id, + int64_t* flush_size) { if (block->rows() == 0) { return Status::OK(); } @@ -460,11 +491,11 @@ Status BetaRowsetWriter::flush_memtable(vectorized::Block* block, int32_t segmen return Status::OK(); } -Status BetaRowsetWriter::flush_single_block(const vectorized::Block* block) { +Status 
BaseBetaRowsetWriter::flush_single_block(const vectorized::Block* block) { return _segment_creator.flush_single_block(block); } -Status BetaRowsetWriter::wait_flying_segcompaction() { +Status BetaRowsetWriter::_wait_flying_segcompaction() { std::unique_lock<std::mutex> l(_is_doing_segcompaction_lock); uint64_t begin_wait = GetCurrentTimeMicros(); while (_is_doing_segcompaction) { @@ -481,12 +512,12 @@ Status BetaRowsetWriter::wait_flying_segcompaction() { return Status::OK(); } -RowsetSharedPtr BetaRowsetWriter::manual_build(const RowsetMetaSharedPtr& spec_rowset_meta) { +RowsetSharedPtr BaseBetaRowsetWriter::manual_build(const RowsetMetaSharedPtr& spec_rowset_meta) { if (_rowset_meta->newest_write_timestamp() == -1) { _rowset_meta->set_newest_write_timestamp(UnixSeconds()); } - _build_rowset_meta_with_spec_field(_rowset_meta, spec_rowset_meta); + build_rowset_meta_with_spec_field(*_rowset_meta, *spec_rowset_meta); RowsetSharedPtr rowset; auto status = RowsetFactory::create_rowset(_context.tablet_schema, _context.rowset_dir, _rowset_meta, &rowset); @@ -498,7 +529,7 @@ RowsetSharedPtr BetaRowsetWriter::manual_build(const RowsetMetaSharedPtr& spec_r return rowset; } -Status BetaRowsetWriter::build(RowsetSharedPtr& rowset) { +Status BaseBetaRowsetWriter::_close_file_writers() { // TODO(lingbin): move to more better place, or in a CreateBlockBatch? 
for (auto& file_writer : _file_writers) { RETURN_NOT_OK_STATUS_WITH_WARN( @@ -506,20 +537,31 @@ Status BetaRowsetWriter::build(RowsetSharedPtr& rowset) { fmt::format("failed to close file writer, path={}", file_writer->path().string())); } RETURN_NOT_OK_STATUS_WITH_WARN(_segment_creator.close(), - "failed to close segment creator when build new rowset") + "failed to close segment creator when build new rowset"); + return Status::OK(); +} + +Status BetaRowsetWriter::_close_file_writers() { + RETURN_IF_ERROR(BaseBetaRowsetWriter::_close_file_writers()); // if _segment_start_id is not zero, that means it's a transient rowset writer for // MoW partial update, don't need to do segment compaction. if (_segment_start_id == 0) { - _segcompaction_worker.cancel(); - RETURN_NOT_OK_STATUS_WITH_WARN(wait_flying_segcompaction(), + _segcompaction_worker->cancel(); + RETURN_NOT_OK_STATUS_WITH_WARN(_wait_flying_segcompaction(), "segcompaction failed when build new rowset"); RETURN_NOT_OK_STATUS_WITH_WARN(_segcompaction_rename_last_segments(), "rename last segments failed when build new rowset"); - if (_segcompaction_worker.get_file_writer()) { - RETURN_NOT_OK_STATUS_WITH_WARN(_segcompaction_worker.get_file_writer()->close(), + if (_segcompaction_worker->get_file_writer()) { + RETURN_NOT_OK_STATUS_WITH_WARN(_segcompaction_worker->get_file_writer()->close(), "close segment compaction worker failed"); } } + return Status::OK(); +} + +Status BaseBetaRowsetWriter::build(RowsetSharedPtr& rowset) { + RETURN_IF_ERROR(_close_file_writers()); + RETURN_NOT_OK_STATUS_WITH_WARN(_check_segment_number_limit(), "too many segments when build new rowset"); _build_rowset_meta(_rowset_meta); @@ -541,25 +583,19 @@ Status BetaRowsetWriter::build(RowsetSharedPtr& rowset) { return Status::OK(); } -bool BetaRowsetWriter::_is_segment_overlapping( - const std::vector<KeyBoundsPB>& segments_encoded_key_bounds) { - std::string last; - for (auto segment_encode_key : segments_encoded_key_bounds) { - auto cur_min 
= segment_encode_key.min_key(); - auto cur_max = segment_encode_key.max_key(); - if (cur_min <= last) { - return true; - } - last = cur_max; - } - return false; +int64_t BaseBetaRowsetWriter::_num_seg() const { + return _num_segment; +} + +int64_t BetaRowsetWriter::_num_seg() const { + return _is_segcompacted() ? _num_segcompacted : _num_segment; } // update tablet schema when meet variant columns, before commit_txn // Eg. rowset schema: A(int), B(float), C(int), D(int) // _tabelt->tablet_schema: A(bigint), B(double) // => update_schema: A(bigint), B(double), C(int), D(int) -void BetaRowsetWriter::update_rowset_schema(TabletSchemaSPtr flush_schema) { +void BaseBetaRowsetWriter::update_rowset_schema(TabletSchemaSPtr flush_schema) { std::lock_guard<std::mutex> lock(*(_context.schema_lock)); TabletSchemaSPtr update_schema; static_cast<void>(vectorized::schema_util::get_least_common_schema( @@ -573,26 +609,7 @@ void BetaRowsetWriter::update_rowset_schema(TabletSchemaSPtr flush_schema) { VLOG_DEBUG << "dump rs schema: " << _context.tablet_schema->dump_structure(); } -void BetaRowsetWriter::_build_rowset_meta_with_spec_field( - RowsetMetaSharedPtr rowset_meta, const RowsetMetaSharedPtr& spec_rowset_meta) { - rowset_meta->set_num_rows(spec_rowset_meta->num_rows()); - rowset_meta->set_total_disk_size(spec_rowset_meta->total_disk_size()); - rowset_meta->set_data_disk_size(spec_rowset_meta->total_disk_size()); - rowset_meta->set_index_disk_size(spec_rowset_meta->index_disk_size()); - // TODO write zonemap to meta - rowset_meta->set_empty(spec_rowset_meta->num_rows() == 0); - rowset_meta->set_creation_time(time(nullptr)); - rowset_meta->set_num_segments(spec_rowset_meta->num_segments()); - rowset_meta->set_segments_overlap(spec_rowset_meta->segments_overlap()); - rowset_meta->set_rowset_state(spec_rowset_meta->rowset_state()); - - std::vector<KeyBoundsPB> segments_key_bounds; - spec_rowset_meta->get_segments_key_bounds(&segments_key_bounds); - 
rowset_meta->set_segments_key_bounds(segments_key_bounds); -} - -void BetaRowsetWriter::_build_rowset_meta(std::shared_ptr<RowsetMeta> rowset_meta) { - int64_t num_seg = _is_segcompacted() ? _num_segcompacted : _num_segment; +void BaseBetaRowsetWriter::_build_rowset_meta(std::shared_ptr<RowsetMeta> rowset_meta) { int64_t num_rows_written = 0; int64_t total_data_size = 0; int64_t total_index_size = 0; @@ -613,11 +630,11 @@ void BetaRowsetWriter::_build_rowset_meta(std::shared_ptr<RowsetMeta> rowset_met // segment key bounds are empty in old version(before version 1.2.x). So we should not modify // the overlap property when key bounds are empty. if (!segments_encoded_key_bounds.empty() && - !_is_segment_overlapping(segments_encoded_key_bounds)) { + !is_segment_overlapping(segments_encoded_key_bounds)) { rowset_meta->set_segments_overlap(NONOVERLAPPING); } - rowset_meta->set_num_segments(num_seg); + rowset_meta->set_num_segments(_num_seg()); // TODO(zhangzhengyu): key_bounds.size() should equal num_seg, but currently not always rowset_meta->set_num_rows(num_rows_written + _num_rows_written); rowset_meta->set_total_disk_size(total_data_size + _total_data_size); @@ -635,7 +652,7 @@ void BetaRowsetWriter::_build_rowset_meta(std::shared_ptr<RowsetMeta> rowset_met } } -RowsetSharedPtr BetaRowsetWriter::_build_tmp() { +RowsetSharedPtr BaseBetaRowsetWriter::_build_tmp() { std::shared_ptr<RowsetMeta> rowset_meta_ = std::make_shared<RowsetMeta>(); *rowset_meta_ = *_rowset_meta; _build_rowset_meta(rowset_meta_); @@ -650,7 +667,7 @@ RowsetSharedPtr BetaRowsetWriter::_build_tmp() { return rowset; } -Status BetaRowsetWriter::_create_file_writer(std::string path, io::FileWriterPtr& file_writer) { +Status BaseBetaRowsetWriter::_create_file_writer(std::string path, io::FileWriterPtr& file_writer) { auto fs = _rowset_meta->fs(); if (!fs) { return Status::Error<INIT_FAILED>("get fs failed"); @@ -673,7 +690,8 @@ Status BetaRowsetWriter::_create_file_writer(std::string path, 
io::FileWriterPtr return Status::OK(); } -Status BetaRowsetWriter::create_file_writer(uint32_t segment_id, io::FileWriterPtr& file_writer) { +Status BaseBetaRowsetWriter::create_file_writer(uint32_t segment_id, + io::FileWriterPtr& file_writer) { std::string path; path = BetaRowset::segment_file_path(_context.rowset_dir, _context.rowset_id, segment_id); return _create_file_writer(path, file_writer); @@ -693,15 +711,28 @@ Status BetaRowsetWriter::_create_segment_writer_for_segcompaction( writer_options.write_type = _context.write_type; writer_options.write_type = DataWriteType::TYPE_COMPACTION; - writer->reset(new segment_v2::SegmentWriter(file_writer.get(), _num_segcompacted, - _context.tablet_schema, _context.tablet, - _context.data_dir, _context.max_rows_per_segment, - writer_options, _context.mow_context)); - if (_segcompaction_worker.get_file_writer() != nullptr) { - RETURN_IF_ERROR(_segcompaction_worker.get_file_writer()->close()); + *writer = std::make_unique<segment_v2::SegmentWriter>( + file_writer.get(), _num_segcompacted, _context.tablet_schema, _context.tablet, + _context.data_dir, _context.max_rows_per_segment, writer_options, _context.mow_context); + if (_segcompaction_worker->get_file_writer() != nullptr) { + RETURN_IF_ERROR(_segcompaction_worker->get_file_writer()->close()); } - _segcompaction_worker.get_file_writer().reset(file_writer.release()); + _segcompaction_worker->get_file_writer().reset(file_writer.release()); + + return Status::OK(); +} +Status BaseBetaRowsetWriter::_check_segment_number_limit() { + size_t total_segment_num = _num_segment + 1; + DBUG_EXECUTE_IF("BetaRowsetWriter._check_segment_number_limit_too_many_segments", + { total_segment_num = dp->param("segnum", 1024); }); + if (UNLIKELY(total_segment_num > config::max_segment_num_per_rowset)) { + return Status::Error<TOO_MANY_SEGMENTS>( + "too many segments in rowset. 
tablet_id:{}, rowset_id:{}, max:{}, " + "_num_segment:{}, ", + _context.tablet_id, _context.rowset_id.to_string(), + config::max_segment_num_per_rowset, _num_segment); + } return Status::OK(); } @@ -720,8 +751,8 @@ Status BetaRowsetWriter::_check_segment_number_limit() { return Status::OK(); } -Status BetaRowsetWriter::add_segment(uint32_t segment_id, const SegmentStatistics& segstat, - TabletSchemaSPtr flush_schema) { +Status BaseBetaRowsetWriter::add_segment(uint32_t segment_id, const SegmentStatistics& segstat, + TabletSchemaSPtr flush_schema) { uint32_t segid_offset = segment_id - _segment_start_id; { std::lock_guard<std::mutex> lock(_segid_statistics_map_mutex); @@ -750,10 +781,16 @@ Status BetaRowsetWriter::add_segment(uint32_t segment_id, const SegmentStatistic if (_context.mow_context != nullptr) { RETURN_IF_ERROR(_generate_delete_bitmap(segment_id)); } - RETURN_IF_ERROR(_segcompaction_if_necessary()); return Status::OK(); } +Status BetaRowsetWriter::add_segment(uint32_t segment_id, const SegmentStatistics& segstat, + TabletSchemaSPtr flush_schema) { + RETURN_IF_ERROR( + BaseBetaRowsetWriter::add_segment(segment_id, segstat, std::move(flush_schema))); + return _segcompaction_if_necessary(); +} + Status BetaRowsetWriter::flush_segment_writer_for_segcompaction( std::unique_ptr<segment_v2::SegmentWriter>* writer, uint64_t index_size, KeyBoundsPB& key_bounds) { diff --git a/be/src/olap/rowset/beta_rowset_writer.h b/be/src/olap/rowset/beta_rowset_writer.h index 68932c5ef77..347994e243e 100644 --- a/be/src/olap/rowset/beta_rowset_writer.h +++ b/be/src/olap/rowset/beta_rowset_writer.h @@ -19,8 +19,6 @@ #include <fmt/format.h> #include <gen_cpp/olap_file.pb.h> -#include <stddef.h> -#include <stdint.h> #include <algorithm> #include <atomic> @@ -43,7 +41,6 @@ #include "olap/rowset/rowset_writer.h" #include "olap/rowset/rowset_writer_context.h" #include "olap/rowset/segment_creator.h" -#include "segcompaction.h" #include "segment_v2/segment.h" #include 
"util/spinlock.h" @@ -59,21 +56,19 @@ class SegmentWriter; using SegCompactionCandidates = std::vector<segment_v2::SegmentSharedPtr>; using SegCompactionCandidatesSharedPtr = std::shared_ptr<SegCompactionCandidates>; -class BetaRowsetWriter : public RowsetWriter { - friend class SegcompactionWorker; - +class BaseBetaRowsetWriter : public RowsetWriter { public: - BetaRowsetWriter(); + BaseBetaRowsetWriter(); - ~BetaRowsetWriter() override; + ~BaseBetaRowsetWriter() override; Status init(const RowsetWriterContext& rowset_writer_context) override; Status add_block(const vectorized::Block* block) override; + // Declare these interface in `BaseBetaRowsetWriter` // add rowset by create hard link Status add_rowset(RowsetSharedPtr rowset) override; - Status add_rowset_for_linked_schema_change(RowsetSharedPtr rowset) override; Status create_file_writer(uint32_t segment_id, io::FileWriterPtr& writer) override; @@ -114,14 +109,6 @@ public: int32_t allocate_segment_id() override { return _segment_creator.allocate_segment_id(); }; - Status flush_segment_writer_for_segcompaction( - std::unique_ptr<segment_v2::SegmentWriter>* writer, uint64_t index_size, - KeyBoundsPB& key_bounds); - - bool is_doing_segcompaction() const override { return _is_doing_segcompaction; } - - Status wait_flying_segcompaction() override; - void set_segment_start_id(int32_t start_id) override { _segment_creator.set_segment_start_id(start_id); _segment_start_id = start_id; @@ -142,36 +129,20 @@ public: const RowsetWriterContext& context() const override { return _context; } private: - Status _create_file_writer(std::string path, io::FileWriterPtr& file_writer); - Status _check_segment_number_limit(); - Status _generate_delete_bitmap(int32_t segment_id); + virtual Status _generate_delete_bitmap(int32_t segment_id) = 0; void _build_rowset_meta(std::shared_ptr<RowsetMeta> rowset_meta); - // segment compaction - Status _create_segment_writer_for_segcompaction( - std::unique_ptr<segment_v2::SegmentWriter>* 
writer, int64_t begin, int64_t end); - Status _segcompaction_if_necessary(); - Status _segcompaction_rename_last_segments(); - Status _load_noncompacted_segment(segment_v2::SegmentSharedPtr& segment, int32_t segment_id); - Status _find_longest_consecutive_small_segment(SegCompactionCandidatesSharedPtr& segments); - bool _is_segcompacted() { return (_num_segcompacted > 0) ? true : false; } - - bool _check_and_set_is_doing_segcompaction(); - - void _build_rowset_meta_with_spec_field(RowsetMetaSharedPtr rowset_meta, - const RowsetMetaSharedPtr& spec_rowset_meta); - bool _is_segment_overlapping(const std::vector<KeyBoundsPB>& segments_encoded_key_bounds); - void _clear_statistics_for_deleting_segments_unsafe(uint64_t begin, uint64_t end); - Status _rename_compacted_segments(int64_t begin, int64_t end); - Status _rename_compacted_segment_plain(uint64_t seg_id); - Status _rename_compacted_indices(int64_t begin, int64_t end, uint64_t seg_id); - void update_rowset_schema(TabletSchemaSPtr flush_schema); // build a tmp rowset for load segment to calc delete_bitmap // for this segment +protected: + Status _create_file_writer(std::string path, io::FileWriterPtr& file_writer); + virtual Status _close_file_writers(); + virtual Status _check_segment_number_limit(); + virtual int64_t _num_seg() const; + // build a tmp rowset for load segment to calc delete_bitmap for this segment RowsetSharedPtr _build_tmp(); -protected: RowsetWriterContext _context; std::shared_ptr<RowsetMeta> _rowset_meta; @@ -179,9 +150,6 @@ protected: roaring::Roaring _segment_set; // bitmap set to record flushed segment id std::mutex _segment_set_mutex; // mutex for _segment_set int32_t _segment_start_id; // basic write start from 0, partial update may be different - std::atomic<int32_t> _segcompacted_point; // segemnts before this point have - // already been segment compacted - std::atomic<int32_t> _num_segcompacted; // index for segment compaction mutable SpinLock _lock; // protect following vectors. 
// record rows number of every segment already written, using for rowid @@ -204,15 +172,6 @@ protected: bool _already_built = false; SegmentCreator _segment_creator; - SegcompactionWorker _segcompaction_worker; - - // ensure only one inflight segcompaction task for each rowset - std::atomic<bool> _is_doing_segcompaction; - // enforce compare-and-swap on _is_doing_segcompaction - std::mutex _is_doing_segcompaction_lock; - std::condition_variable _segcompacting_cond; - - std::atomic<int> _segcompaction_status; fmt::memory_buffer vlog_buffer; @@ -222,4 +181,59 @@ protected: int64_t _segment_writer_ns = 0; }; +class SegcompactionWorker; + +// `StorageEngine` mixin for `BaseBetaRowsetWriter` +class BetaRowsetWriter : public BaseBetaRowsetWriter { +public: + BetaRowsetWriter(StorageEngine& engine); + + ~BetaRowsetWriter() override; + + Status add_segment(uint32_t segment_id, const SegmentStatistics& segstat, + TabletSchemaSPtr flush_schema) override; + + Status flush_segment_writer_for_segcompaction( + std::unique_ptr<segment_v2::SegmentWriter>* writer, uint64_t index_size, + KeyBoundsPB& key_bounds); + +private: + Status _generate_delete_bitmap(int32_t segment_id) override; + + // segment compaction + friend class SegcompactionWorker; + Status _close_file_writers() override; + Status _check_segment_number_limit() override; + int64_t _num_seg() const override; + Status _wait_flying_segcompaction(); + Status _create_segment_writer_for_segcompaction( + std::unique_ptr<segment_v2::SegmentWriter>* writer, int64_t begin, int64_t end); + Status _segcompaction_if_necessary(); + Status _segcompaction_rename_last_segments(); + Status _load_noncompacted_segment(segment_v2::SegmentSharedPtr& segment, int32_t segment_id); + Status _find_longest_consecutive_small_segment(SegCompactionCandidatesSharedPtr& segments); + bool _is_segcompacted() const { return _num_segcompacted > 0; } + bool _check_and_set_is_doing_segcompaction(); + Status _rename_compacted_segments(int64_t begin, 
int64_t end); + Status _rename_compacted_segment_plain(uint64_t seg_id); + Status _rename_compacted_indices(int64_t begin, int64_t end, uint64_t seg_id); + void _clear_statistics_for_deleting_segments_unsafe(uint64_t begin, uint64_t end); + + StorageEngine& _engine; + + std::atomic<int32_t> _segcompacted_point {0}; // segemnts before this point have + // already been segment compacted + std::atomic<int32_t> _num_segcompacted {0}; // index for segment compaction + + std::unique_ptr<SegcompactionWorker> _segcompaction_worker; + + // ensure only one inflight segcompaction task for each rowset + std::atomic<bool> _is_doing_segcompaction {false}; + // enforce compare-and-swap on _is_doing_segcompaction + std::mutex _is_doing_segcompaction_lock; + std::condition_variable _segcompacting_cond; + + std::atomic<int> _segcompaction_status {ErrorCode::OK}; +}; + } // namespace doris diff --git a/be/src/olap/rowset/beta_rowset_writer_v2.h b/be/src/olap/rowset/beta_rowset_writer_v2.h index 4a99acdaba6..bdcd8a47a98 100644 --- a/be/src/olap/rowset/beta_rowset_writer_v2.h +++ b/be/src/olap/rowset/beta_rowset_writer_v2.h @@ -127,10 +127,6 @@ public: int32_t allocate_segment_id() override { return _segment_creator.allocate_segment_id(); }; - bool is_doing_segcompaction() const override { return false; } - - Status wait_flying_segcompaction() override { return Status::OK(); } - int64_t delete_bitmap_ns() override { return _delete_bitmap_ns; } int64_t segment_writer_ns() override { return _segment_writer_ns; } diff --git a/be/src/olap/rowset/rowset_factory.cpp b/be/src/olap/rowset/rowset_factory.cpp index 8447154483c..9758a09e7ea 100644 --- a/be/src/olap/rowset/rowset_factory.cpp +++ b/be/src/olap/rowset/rowset_factory.cpp @@ -27,6 +27,7 @@ #include "olap/rowset/rowset_writer.h" #include "olap/rowset/rowset_writer_context.h" #include "olap/rowset/vertical_beta_rowset_writer.h" +#include "olap/storage_engine.h" namespace doris { using namespace ErrorCode; @@ -51,10 +52,10 @@ Status 
RowsetFactory::create_rowset_writer(const RowsetWriterContext& context, b } if (context.rowset_type == BETA_ROWSET) { if (is_vertical) { - output->reset(new VerticalBetaRowsetWriter); + output->reset(new VerticalBetaRowsetWriter(*StorageEngine::instance())); return (*output)->init(context); } - output->reset(new BetaRowsetWriter); + output->reset(new BetaRowsetWriter(*StorageEngine::instance())); return (*output)->init(context); } return Status::Error<ROWSET_TYPE_NOT_FOUND>("invalid rowset_type"); diff --git a/be/src/olap/rowset/rowset_writer.h b/be/src/olap/rowset/rowset_writer.h index 542528b1acb..d7ec494f0d6 100644 --- a/be/src/olap/rowset/rowset_writer.h +++ b/be/src/olap/rowset/rowset_writer.h @@ -146,10 +146,6 @@ public: virtual int32_t allocate_segment_id() = 0; - virtual bool is_doing_segcompaction() const = 0; - - virtual Status wait_flying_segcompaction() = 0; - virtual void set_segment_start_id(int num_segment) { LOG(FATAL) << "not supported!"; } virtual int64_t delete_bitmap_ns() { return 0; } diff --git a/be/src/olap/rowset/segcompaction.h b/be/src/olap/rowset/segcompaction.h index a0f81e59c77..e2b5812ad8c 100644 --- a/be/src/olap/rowset/segcompaction.h +++ b/be/src/olap/rowset/segcompaction.h @@ -16,7 +16,6 @@ // under the License. 
#pragma once -#include <stdint.h> #include <memory> #include <vector> @@ -48,7 +47,7 @@ class SegcompactionWorker { friend class BetaRowsetWriter; public: - SegcompactionWorker(BetaRowsetWriter* writer); + explicit SegcompactionWorker(BetaRowsetWriter* writer); void compact_segments(SegCompactionCandidatesSharedPtr segments); @@ -75,6 +74,7 @@ private: private: //TODO(zhengyu): current impl depends heavily on the access to feilds of BetaRowsetWriter + // Currently cloud storage engine doesn't need segcompaction BetaRowsetWriter* _writer = nullptr; io::FileWriterPtr _file_writer; std::atomic<bool> _cancelled = false; diff --git a/be/src/olap/rowset/segment_v2/segment_writer.cpp b/be/src/olap/rowset/segment_v2/segment_writer.cpp index ac70ef8c1f4..2b8375da598 100644 --- a/be/src/olap/rowset/segment_v2/segment_writer.cpp +++ b/be/src/olap/rowset/segment_v2/segment_writer.cpp @@ -26,7 +26,7 @@ #include <unordered_map> #include <utility> -#include "cloud/config.h" +// IWYU pragma: no_include <opentelemetry/common/threadlocal.h> #include "common/compiler_util.h" // IWYU pragma: keep #include "common/config.h" #include "common/logging.h" // LOG @@ -44,8 +44,10 @@ #include "olap/rowset/segment_v2/page_pointer.h" #include "olap/segment_loader.h" #include "olap/short_key_index.h" +#include "olap/storage_engine.h" #include "olap/tablet_schema.h" #include "olap/utils.h" +#include "runtime/exec_env.h" #include "runtime/memory/mem_tracker.h" #include "service/point_query_executor.h" #include "util/coding.h" @@ -336,11 +338,12 @@ void SegmentWriter::_serialize_block_to_row_column(vectorized::Block& block) { // 3. 
set columns to data convertor and then write all columns Status SegmentWriter::append_block_with_partial_content(const vectorized::Block* block, size_t row_pos, size_t num_rows) { - if (config::cloud_mode) { - // TODO(plat1ko) + if constexpr (!std::is_same_v<ExecEnv::Engine, StorageEngine>) { + // TODO(plat1ko): cloud mode return Status::NotSupported("append_block_with_partial_content"); } - auto tablet = static_cast<Tablet*>(_tablet.get()); + + auto* tablet = static_cast<Tablet*>(_tablet.get()); if (block->columns() <= _tablet_schema->num_key_columns() || block->columns() >= _tablet_schema->num_columns()) { return Status::InternalError( @@ -559,7 +562,8 @@ Status SegmentWriter::fill_missing_columns(vectorized::MutableColumns& mutable_f const std::vector<bool>& use_default_or_null_flag, bool has_default_or_nullable, const size_t& segment_start_pos) { - if (config::cloud_mode) [[unlikely]] { + if constexpr (!std::is_same_v<ExecEnv::Engine, StorageEngine>) { + // TODO(plat1ko): cloud mode return Status::NotSupported("fill_missing_columns"); } auto tablet = static_cast<Tablet*>(_tablet.get()); diff --git a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp index 91f890b7f96..842eecf3e10 100644 --- a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp +++ b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp @@ -26,7 +26,6 @@ #include <unordered_map> #include <utility> -#include "cloud/config.h" #include "common/compiler_util.h" // IWYU pragma: keep #include "common/config.h" #include "common/logging.h" // LOG @@ -45,6 +44,7 @@ #include "olap/short_key_index.h" #include "olap/tablet_schema.h" #include "olap/utils.h" +#include "runtime/exec_env.h" #include "runtime/memory/mem_tracker.h" #include "service/point_query_executor.h" #include "util/coding.h" @@ -283,10 +283,11 @@ void VerticalSegmentWriter::_serialize_block_to_row_column(vectorized::Block& bl // 2.3 fill block // 3. 
set columns to data convertor and then write all columns Status VerticalSegmentWriter::_append_block_with_partial_content(RowsInBlock& data) { - if (config::cloud_mode) { - // TODO(plat1ko) + if constexpr (!std::is_same_v<ExecEnv::Engine, StorageEngine>) { + // TODO(plat1ko): CloudStorageEngine return Status::NotSupported("append_block_with_partial_content"); } + DCHECK(_tablet_schema->keys_type() == UNIQUE_KEYS && _opts.enable_unique_key_merge_on_write); DCHECK(_opts.rowset_ctx->partial_update_info != nullptr); @@ -495,7 +496,8 @@ Status VerticalSegmentWriter::_fill_missing_columns( vectorized::MutableColumns& mutable_full_columns, const std::vector<bool>& use_default_or_null_flag, bool has_default_or_nullable, const size_t& segment_start_pos) { - if (config::cloud_mode) [[unlikely]] { + if constexpr (!std::is_same_v<ExecEnv::Engine, StorageEngine>) { + // TODO(plat1ko): CloudStorageEngine return Status::NotSupported("fill_missing_columns"); } auto tablet = static_cast<Tablet*>(_tablet.get()); diff --git a/be/src/olap/rowset/vertical_beta_rowset_writer.cpp b/be/src/olap/rowset/vertical_beta_rowset_writer.cpp index cd9fee5fb4e..35f878dd17c 100644 --- a/be/src/olap/rowset/vertical_beta_rowset_writer.cpp +++ b/be/src/olap/rowset/vertical_beta_rowset_writer.cpp @@ -41,6 +41,9 @@ namespace doris { using namespace ErrorCode; +VerticalBetaRowsetWriter::VerticalBetaRowsetWriter(StorageEngine& engine) + : BetaRowsetWriter(engine) {} + VerticalBetaRowsetWriter::~VerticalBetaRowsetWriter() { if (!_already_built) { const auto& fs = _rowset_meta->fs(); diff --git a/be/src/olap/rowset/vertical_beta_rowset_writer.h b/be/src/olap/rowset/vertical_beta_rowset_writer.h index 8251ad0a07e..a1477dc2a71 100644 --- a/be/src/olap/rowset/vertical_beta_rowset_writer.h +++ b/be/src/olap/rowset/vertical_beta_rowset_writer.h @@ -33,10 +33,11 @@ class Block; } // namespace vectorized // for vertical compaction -class VerticalBetaRowsetWriter : public BetaRowsetWriter { +// TODO(plat1ko): 
Inherited from template type `T`, `T` is `BetaRowsetWriter` or `CloudBetaRowsetWriter` +class VerticalBetaRowsetWriter final : public BetaRowsetWriter { public: - VerticalBetaRowsetWriter() : BetaRowsetWriter() {} - ~VerticalBetaRowsetWriter(); + VerticalBetaRowsetWriter(StorageEngine& engine); + ~VerticalBetaRowsetWriter() override; Status add_columns(const vectorized::Block* block, const std::vector<uint32_t>& col_ids, bool is_key, uint32_t max_rows_per_segment) override; diff --git a/be/src/olap/rowset_builder.cpp b/be/src/olap/rowset_builder.cpp index 21fbed78022..4675d668f41 100644 --- a/be/src/olap/rowset_builder.cpp +++ b/be/src/olap/rowset_builder.cpp @@ -26,7 +26,7 @@ #include <string> #include <utility> -#include "cloud/config.h" +// IWYU pragma: no_include <opentelemetry/common/threadlocal.h> #include "common/compiler_util.h" // IWYU pragma: keep #include "common/config.h" #include "common/status.h" @@ -60,12 +60,16 @@ namespace doris { using namespace ErrorCode; -RowsetBuilder::RowsetBuilder(const WriteRequest& req, RuntimeProfile* profile) +BaseRowsetBuilder::BaseRowsetBuilder(const WriteRequest& req, RuntimeProfile* profile) : _req(req), _tablet_schema(std::make_shared<TabletSchema>()) { _init_profile(profile); } -void RowsetBuilder::_init_profile(RuntimeProfile* profile) { +RowsetBuilder::RowsetBuilder(StorageEngine& engine, const WriteRequest& req, + RuntimeProfile* profile) + : BaseRowsetBuilder(req, profile), _engine(engine) {} + +void BaseRowsetBuilder::_init_profile(RuntimeProfile* profile) { _profile = profile->create_child(fmt::format("RowsetBuilder {}", _req.tablet_id), true, true); _build_rowset_timer = ADD_TIMER(_profile, "BuildRowsetTime"); _submit_delete_bitmap_timer = ADD_TIMER(_profile, "DeleteBitmapSubmitTime"); @@ -73,11 +77,7 @@ void RowsetBuilder::_init_profile(RuntimeProfile* profile) { _commit_txn_timer = ADD_TIMER(_profile, "CommitTxnTime"); } -RowsetBuilder::~RowsetBuilder() { - if (_is_init && !_is_committed) { - 
_garbage_collection(); - } - +BaseRowsetBuilder::~BaseRowsetBuilder() { if (!_is_init) { return; } @@ -87,34 +87,39 @@ RowsetBuilder::~RowsetBuilder() { } } -void RowsetBuilder::_garbage_collection() { - if (config::cloud_mode) { - return; +RowsetBuilder::~RowsetBuilder() { + if (_is_init && !_is_committed) { + _garbage_collection(); } +} + +Tablet* RowsetBuilder::tablet() { + return static_cast<Tablet*>(_tablet.get()); +} + +TabletSharedPtr RowsetBuilder::tablet_sptr() { + return std::static_pointer_cast<Tablet>(_tablet); +} + +void RowsetBuilder::_garbage_collection() { Status rollback_status; - TxnManager* txn_mgr = StorageEngine::instance()->txn_manager(); - auto tablet = static_cast<Tablet*>(_tablet.get()); - if (tablet != nullptr) { - rollback_status = txn_mgr->rollback_txn(_req.partition_id, *tablet, _req.txn_id); + TxnManager* txn_mgr = _engine.txn_manager(); + if (tablet() != nullptr) { + rollback_status = txn_mgr->rollback_txn(_req.partition_id, *tablet(), _req.txn_id); } // has to check rollback status, because the rowset maybe committed in this thread and // published in another thread, then rollback will fail. // when rollback failed should not delete rowset if (rollback_status.ok()) { - StorageEngine::instance()->add_unused_rowset(_rowset); + _engine.add_unused_rowset(_rowset); } } Status RowsetBuilder::init_mow_context(std::shared_ptr<MowContext>& mow_context) { - if (config::cloud_mode) { - // TODO(plat1ko) - return Status::NotSupported("init_mow_context"); - } - auto tablet = static_cast<Tablet*>(_tablet.get()); - std::lock_guard<std::shared_mutex> lck(tablet->get_header_lock()); - int64_t cur_max_version = tablet->max_version_unlocked().second; + std::lock_guard<std::shared_mutex> lck(tablet()->get_header_lock()); + int64_t cur_max_version = tablet()->max_version_unlocked().second; // tablet is under alter process. The delete bitmap will be calculated after conversion. 
- if (tablet->tablet_state() == TABLET_NOTREADY) { + if (tablet()->tablet_state() == TABLET_NOTREADY) { // Disable 'partial_update' when the tablet is undergoing a 'schema changing process' if (_req.table_schema_param->is_partial_update()) { return Status::InternalError( @@ -123,9 +128,9 @@ Status RowsetBuilder::init_mow_context(std::shared_ptr<MowContext>& mow_context) } _rowset_ids.clear(); } else { - RETURN_IF_ERROR(tablet->all_rs_id(cur_max_version, &_rowset_ids)); + RETURN_IF_ERROR(tablet()->all_rs_id(cur_max_version, &_rowset_ids)); } - _delete_bitmap = std::make_shared<DeleteBitmap>(tablet->tablet_id()); + _delete_bitmap = std::make_shared<DeleteBitmap>(tablet()->tablet_id()); mow_context = std::make_shared<MowContext>(cur_max_version, _req.txn_id, _rowset_ids, _delete_bitmap); return Status::OK(); @@ -136,41 +141,31 @@ Status RowsetBuilder::check_tablet_version_count() { MemInfo::is_exceed_soft_mem_limit(GB_EXCHANGE_BYTE)) { return Status::OK(); } - if (config::cloud_mode) { - // TODO(plat1ko) - return Status::OK(); - } - auto tablet = std::static_pointer_cast<Tablet>(_tablet); //trigger compaction - auto st = StorageEngine::instance()->submit_compaction_task( - tablet, CompactionType::CUMULATIVE_COMPACTION, true); + auto st = _engine.submit_compaction_task(tablet_sptr(), CompactionType::CUMULATIVE_COMPACTION, + true); if (!st.ok()) [[unlikely]] { LOG(WARNING) << "failed to trigger compaction, tablet_id=" << _tablet->tablet_id() << " : " << st; } - int version_count = tablet->version_count(); + int version_count = tablet()->version_count(); if (version_count > config::max_tablet_version_num) { return Status::Error<TOO_MANY_VERSION>( "failed to init rowset builder. 
version count: {}, exceed limit: {}, " "tablet: {}", - version_count, config::max_tablet_version_num, tablet->tablet_id()); + version_count, config::max_tablet_version_num, _tablet->tablet_id()); } return Status::OK(); } Status RowsetBuilder::prepare_txn() { - if (config::cloud_mode) { - // TODO(plat1ko) - return Status::OK(); - } - auto tablet = static_cast<Tablet*>(_tablet.get()); - std::shared_lock base_migration_lock(tablet->get_migration_lock(), std::try_to_lock); + std::shared_lock base_migration_lock(tablet()->get_migration_lock(), std::try_to_lock); if (!base_migration_lock.owns_lock()) { return Status::Error<TRY_LOCK_FAILED>("try migration lock failed"); } - std::lock_guard<std::mutex> push_lock(tablet->get_push_lock()); - return StorageEngine::instance()->txn_manager()->prepare_txn(_req.partition_id, *tablet, - _req.txn_id, _req.load_id); + std::lock_guard<std::mutex> push_lock(tablet()->get_push_lock()); + return _engine.txn_manager()->prepare_txn(_req.partition_id, *tablet(), _req.txn_id, + _req.load_id); } Status RowsetBuilder::init() { @@ -205,18 +200,13 @@ Status RowsetBuilder::init() { _rowset_writer = DORIS_TRY(_tablet->create_rowset_writer(context, false)); _pending_rs_guard = StorageEngine::instance()->pending_local_rowsets().add(context.rowset_id); - if (config::cloud_mode) { - // TODO(plat1ko) - } else { - _calc_delete_bitmap_token = - StorageEngine::instance()->calc_delete_bitmap_executor()->create_token(); - } + _calc_delete_bitmap_token = _engine.calc_delete_bitmap_executor()->create_token(); _is_init = true; return Status::OK(); } -Status RowsetBuilder::build_rowset() { +Status BaseRowsetBuilder::build_rowset() { std::lock_guard<std::mutex> l(_lock); DCHECK(_is_init) << "rowset builder is supposed be to initialized before " "build_rowset() being called"; @@ -231,27 +221,22 @@ Status RowsetBuilder::submit_calc_delete_bitmap_task() { if (!_tablet->enable_unique_key_merge_on_write()) { return Status::OK(); } - if (config::cloud_mode) { - // 
TODO(plat1ko) - return Status::OK(); - } - auto tablet = static_cast<Tablet*>(_tablet.get()); std::lock_guard<std::mutex> l(_lock); SCOPED_TIMER(_submit_delete_bitmap_timer); // tablet is under alter process. The delete bitmap will be calculated after conversion. - if (tablet->tablet_state() == TABLET_NOTREADY) { + if (tablet()->tablet_state() == TABLET_NOTREADY) { LOG(INFO) << "tablet is under alter process, delete bitmap will be calculated later, " "tablet_id: " - << tablet->tablet_id() << " txn_id: " << _req.txn_id; + << tablet()->tablet_id() << " txn_id: " << _req.txn_id; return Status::OK(); } - auto beta_rowset = reinterpret_cast<BetaRowset*>(_rowset.get()); + auto* beta_rowset = reinterpret_cast<BetaRowset*>(_rowset.get()); std::vector<segment_v2::SegmentSharedPtr> segments; RETURN_IF_ERROR(beta_rowset->load_segments(&segments)); if (segments.size() > 1) { // calculate delete bitmap between segments RETURN_IF_ERROR( - tablet->calc_delete_bitmap_between_segments(_rowset, segments, _delete_bitmap)); + tablet()->calc_delete_bitmap_between_segments(_rowset, segments, _delete_bitmap)); } // For partial update, we need to fill in the entire row of data, during the calculation @@ -261,14 +246,14 @@ Status RowsetBuilder::submit_calc_delete_bitmap_task() { return Status::OK(); } - LOG(INFO) << "submit calc delete bitmap task to executor, tablet_id: " << tablet->tablet_id() + LOG(INFO) << "submit calc delete bitmap task to executor, tablet_id: " << tablet()->tablet_id() << ", txn_id: " << _req.txn_id; - return tablet->commit_phase_update_delete_bitmap(_rowset, _rowset_ids, _delete_bitmap, segments, - _req.txn_id, _calc_delete_bitmap_token.get(), - nullptr); + return tablet()->commit_phase_update_delete_bitmap(_rowset, _rowset_ids, _delete_bitmap, + segments, _req.txn_id, + _calc_delete_bitmap_token.get(), nullptr); } -Status RowsetBuilder::wait_calc_delete_bitmap() { +Status BaseRowsetBuilder::wait_calc_delete_bitmap() { if 
(!_tablet->enable_unique_key_merge_on_write() || _partial_update_info->is_partial_update) { return Status::OK(); } @@ -281,15 +266,10 @@ Status RowsetBuilder::wait_calc_delete_bitmap() { } Status RowsetBuilder::commit_txn() { - if (config::cloud_mode) { - // TODO(plat1ko) - return Status::OK(); - } - auto tablet = static_cast<Tablet*>(_tablet.get()); - if (tablet->enable_unique_key_merge_on_write() && + if (tablet()->enable_unique_key_merge_on_write() && config::enable_merge_on_write_correctness_check && _rowset->num_rows() != 0 && - tablet->tablet_state() != TABLET_NOTREADY) { - auto st = tablet->check_delete_bitmap_correctness( + tablet()->tablet_state() != TABLET_NOTREADY) { + auto st = tablet()->check_delete_bitmap_correctness( _delete_bitmap, _rowset->end_version() - 1, _req.txn_id, _rowset_ids); if (!st.ok()) { LOG(WARNING) << fmt::format( @@ -300,21 +280,20 @@ Status RowsetBuilder::commit_txn() { return st; } } - auto storage_engine = StorageEngine::instance(); std::lock_guard<std::mutex> l(_lock); SCOPED_TIMER(_commit_txn_timer); - if (_tablet->tablet_schema()->num_variant_columns() > 0) { + if (tablet()->tablet_schema()->num_variant_columns() > 0) { // update tablet schema when meet variant columns, before commit_txn // Eg. 
rowset schema: A(int), B(float), C(int), D(int) // _tabelt->tablet_schema: A(bigint), B(double) // => update_schema: A(bigint), B(double), C(int), D(int) const RowsetWriterContext& rw_ctx = _rowset_writer->context(); - RETURN_IF_ERROR(_tablet->update_by_least_common_schema(rw_ctx.tablet_schema)); + RETURN_IF_ERROR(tablet()->update_by_least_common_schema(rw_ctx.tablet_schema)); } // Transfer ownership of `PendingRowsetGuard` to `TxnManager` - Status res = storage_engine->txn_manager()->commit_txn(_req.partition_id, *tablet, _req.txn_id, - _req.load_id, _rowset, - std::move(_pending_rs_guard), false); + Status res = _engine.txn_manager()->commit_txn(_req.partition_id, *tablet(), _req.txn_id, + _req.load_id, _rowset, + std::move(_pending_rs_guard), false); if (!res && !res.is<PUSH_TRANSACTION_ALREADY_EXIST>()) { LOG(WARNING) << "Failed to commit txn: " << _req.txn_id @@ -322,8 +301,8 @@ Status RowsetBuilder::commit_txn() { return res; } if (_tablet->enable_unique_key_merge_on_write()) { - storage_engine->txn_manager()->set_txn_related_delete_bitmap( - _req.partition_id, _req.txn_id, tablet->tablet_id(), tablet->tablet_uid(), true, + _engine.txn_manager()->set_txn_related_delete_bitmap( + _req.partition_id, _req.txn_id, tablet()->tablet_id(), tablet()->tablet_uid(), true, _delete_bitmap, _rowset_ids, _partial_update_info); } @@ -331,7 +310,7 @@ Status RowsetBuilder::commit_txn() { return Status::OK(); } -Status RowsetBuilder::cancel() { +Status BaseRowsetBuilder::cancel() { std::lock_guard<std::mutex> l(_lock); if (_is_cancelled) { return Status::OK(); @@ -343,9 +322,9 @@ Status RowsetBuilder::cancel() { return Status::OK(); } -void RowsetBuilder::_build_current_tablet_schema(int64_t index_id, - const OlapTableSchemaParam* table_schema_param, - const TabletSchema& ori_tablet_schema) { +void BaseRowsetBuilder::_build_current_tablet_schema(int64_t index_id, + const OlapTableSchemaParam* table_schema_param, + const TabletSchema& ori_tablet_schema) { 
_tablet_schema->copy_from(ori_tablet_schema); // find the right index id int i = 0; @@ -356,7 +335,7 @@ void RowsetBuilder::_build_current_tablet_schema(int64_t index_id, } } - if (indexes.size() > 0 && indexes[i]->columns.size() != 0 && + if (!indexes.empty() && !indexes[i]->columns.empty() && indexes[i]->columns[0]->unique_id() >= 0) { _tablet_schema->build_current_tablet_schema(index_id, table_schema_param->version(), indexes[i], ori_tablet_schema); diff --git a/be/src/olap/rowset_builder.h b/be/src/olap/rowset_builder.h index ea849f80e61..8f254074c37 100644 --- a/be/src/olap/rowset_builder.h +++ b/be/src/olap/rowset_builder.h @@ -17,8 +17,6 @@ #pragma once -#include <stdint.h> - #include <atomic> #include <memory> #include <mutex> @@ -53,21 +51,21 @@ class Block; // Writer for a particular (load, index, tablet). // This class is NOT thread-safe, external synchronization is required. -class RowsetBuilder { +class BaseRowsetBuilder { public: - RowsetBuilder(const WriteRequest& req, RuntimeProfile* profile); + BaseRowsetBuilder(const WriteRequest& req, RuntimeProfile* profile); - ~RowsetBuilder(); + virtual ~BaseRowsetBuilder(); - Status init(); + virtual Status init() = 0; Status build_rowset(); - Status submit_calc_delete_bitmap_task(); + virtual Status submit_calc_delete_bitmap_task() = 0; Status wait_calc_delete_bitmap(); - Status commit_txn(); + virtual Status commit_txn() = 0; Status cancel(); @@ -86,21 +84,13 @@ public: return _partial_update_info; } -private: - void _garbage_collection(); - +protected: void _build_current_tablet_schema(int64_t index_id, const OlapTableSchemaParam* table_schema_param, const TabletSchema& ori_tablet_schema); void _init_profile(RuntimeProfile* profile); - Status init_mow_context(std::shared_ptr<MowContext>& mow_context); - - Status check_tablet_version_count(); - - Status prepare_txn(); - bool _is_init = false; bool _is_cancelled = false; bool _is_committed = false; @@ -127,4 +117,33 @@ private: RuntimeProfile::Counter* 
_commit_txn_timer = nullptr; }; +// `StorageEngine` mixin for `BaseRowsetBuilder` +class RowsetBuilder final : public BaseRowsetBuilder { +public: + RowsetBuilder(StorageEngine& engine, const WriteRequest& req, RuntimeProfile* profile); + + ~RowsetBuilder() override; + + Status init() override; + + Status commit_txn() override; + + Status submit_calc_delete_bitmap_task() override; + +private: + Status check_tablet_version_count(); + + Status prepare_txn(); + + void _garbage_collection(); + + Status init_mow_context(std::shared_ptr<MowContext>& mow_context); + + // Cast `BaseTablet` to `Tablet` + Tablet* tablet(); + TabletSharedPtr tablet_sptr(); + + StorageEngine& _engine; +}; + } // namespace doris diff --git a/be/src/runtime/exec_env.cpp b/be/src/runtime/exec_env.cpp index 6ff06bc0189..7115dd76f13 100644 --- a/be/src/runtime/exec_env.cpp +++ b/be/src/runtime/exec_env.cpp @@ -42,9 +42,9 @@ ExecEnv::~ExecEnv() { destroy(); } +// TODO(plat1ko): template <class Engine> Result<BaseTabletSPtr> ExecEnv::get_tablet(int64_t tablet_id) { BaseTabletSPtr tablet; - // TODO(plat1ko): config::cloud_mode std::string err; tablet = StorageEngine::instance()->tablet_manager()->get_tablet(tablet_id, true, &err); if (tablet == nullptr) { diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h index c405953d6c1..94ba0720fb8 100644 --- a/be/src/runtime/exec_env.h +++ b/be/src/runtime/exec_env.h @@ -112,6 +112,12 @@ inline bool k_doris_exit = false; // once to properly initialise service state. class ExecEnv { public: +#ifdef CLOUD_MODE + using Engine = CloudStorageEngine; // TODO(plat1ko) +#else + using Engine = StorageEngine; +#endif + // Empty destructor because the compiler-generated one requires full // declarations for classes in scoped_ptrs. 
~ExecEnv(); diff --git a/be/src/runtime/load_channel.cpp b/be/src/runtime/load_channel.cpp index 7c6549d692f..0dc0ac344b3 100644 --- a/be/src/runtime/load_channel.cpp +++ b/be/src/runtime/load_channel.cpp @@ -21,6 +21,7 @@ #include <glog/logging.h> #include "bvar/bvar.h" +#include "olap/storage_engine.h" #include "runtime/memory/mem_tracker.h" #include "runtime/tablets_channel.h" @@ -68,7 +69,7 @@ void LoadChannel::_init_profile() { Status LoadChannel::open(const PTabletWriterOpenRequest& params) { int64_t index_id = params.index_id(); - std::shared_ptr<TabletsChannel> channel; + std::shared_ptr<BaseTabletsChannel> channel; { std::lock_guard<std::mutex> l(_lock); auto it = _tablets_channels.find(index_id); @@ -77,8 +78,9 @@ Status LoadChannel::open(const PTabletWriterOpenRequest& params) { } else { // create a new tablets channel TabletsChannelKey key(params.id(), index_id); - channel = std::make_shared<TabletsChannel>(key, _load_id, _is_high_priority, - _self_profile); + // TODO(plat1ko): CloudTabletsChannel + channel = std::make_shared<TabletsChannel>(*StorageEngine::instance(), key, _load_id, + _is_high_priority, _self_profile); { std::lock_guard<SpinLock> l(_tablets_channels_lock); _tablets_channels.insert({index_id, channel}); @@ -98,7 +100,7 @@ Status LoadChannel::open(const PTabletWriterOpenRequest& params) { return Status::OK(); } -Status LoadChannel::_get_tablets_channel(std::shared_ptr<TabletsChannel>& channel, +Status LoadChannel::_get_tablets_channel(std::shared_ptr<BaseTabletsChannel>& channel, bool& is_finished, const int64_t index_id) { std::lock_guard<std::mutex> l(_lock); auto it = _tablets_channels.find(index_id); @@ -124,7 +126,7 @@ Status LoadChannel::add_batch(const PTabletWriterAddBlockRequest& request, COUNTER_UPDATE(_add_batch_times, 1); int64_t index_id = request.index_id(); // 1. 
get tablets channel - std::shared_ptr<TabletsChannel> channel; + std::shared_ptr<BaseTabletsChannel> channel; bool is_finished = false; Status st = _get_tablets_channel(channel, is_finished, index_id); if (!st.ok() || is_finished) { @@ -139,7 +141,7 @@ Status LoadChannel::add_batch(const PTabletWriterAddBlockRequest& request, // 3. handle eos if (request.has_eos() && request.eos()) { - st = _handle_eos(channel, request, response); + st = _handle_eos(channel.get(), request, response); _report_profile(response); if (!st.ok()) { return st; @@ -151,6 +153,25 @@ Status LoadChannel::add_batch(const PTabletWriterAddBlockRequest& request, return st; } +Status LoadChannel::_handle_eos(BaseTabletsChannel* channel, + const PTabletWriterAddBlockRequest& request, + PTabletWriterAddBlockResult* response) { + _self_profile->add_info_string("EosHost", fmt::format("{}", request.backend_id())); + bool finished = false; + auto index_id = request.index_id(); + + RETURN_IF_ERROR(channel->close(this, request, response, &finished)); + if (finished) { + std::lock_guard<std::mutex> l(_lock); + { + std::lock_guard<SpinLock> l(_tablets_channels_lock); + _tablets_channels.erase(index_id); + } + _finished_channel_ids.emplace(index_id); + } + return Status::OK(); +} + void LoadChannel::_report_profile(PTabletWriterAddBlockResult* response) { if (!_enable_profile) { return; diff --git a/be/src/runtime/load_channel.h b/be/src/runtime/load_channel.h index ea0fa0f2486..bdeedbd9eae 100644 --- a/be/src/runtime/load_channel.h +++ b/be/src/runtime/load_channel.h @@ -17,10 +17,6 @@ #pragma once -#include <gen_cpp/internal_service.pb.h> -#include <stdint.h> -#include <time.h> - #include <algorithm> #include <atomic> #include <functional> @@ -38,17 +34,18 @@ #include "olap/memtable_memory_limiter.h" #include "runtime/exec_env.h" #include "runtime/memory/mem_tracker.h" -#include "runtime/tablets_channel.h" #include "util/runtime_profile.h" #include "util/spinlock.h" #include "util/thrift_util.h" #include 
"util/uid_util.h" -//#include <gen_cpp/internal_service.pb.h> namespace doris { class PTabletWriterOpenRequest; +class PTabletWriterAddBlockRequest; +class PTabletWriterAddBlockResult; class OpenPartitionRequest; +class BaseTabletsChannel; // A LoadChannel manages tablets channels for all indexes // corresponding to a certain load job @@ -82,31 +79,11 @@ public: RuntimeProfile::Counter* get_handle_mem_limit_timer() { return _handle_mem_limit_timer; } protected: - Status _get_tablets_channel(std::shared_ptr<TabletsChannel>& channel, bool& is_finished, - const int64_t index_id); - - Status _handle_eos(std::shared_ptr<TabletsChannel>& channel, - const PTabletWriterAddBlockRequest& request, - PTabletWriterAddBlockResult* response) { - _self_profile->add_info_string("EosHost", fmt::format("{}", request.backend_id())); - bool finished = false; - auto index_id = request.index_id(); - - RETURN_IF_ERROR(channel->close( - this, request.sender_id(), request.backend_id(), &finished, request.partition_ids(), - response->mutable_tablet_vec(), response->mutable_tablet_errors(), - request.slave_tablet_nodes(), response->mutable_success_slave_tablet_node_ids(), - request.write_single_replica())); - if (finished) { - std::lock_guard<std::mutex> l(_lock); - { - std::lock_guard<SpinLock> l(_tablets_channels_lock); - _tablets_channels.erase(index_id); - } - _finished_channel_ids.emplace(index_id); - } - return Status::OK(); - } + Status _get_tablets_channel(std::shared_ptr<BaseTabletsChannel>& channel, bool& is_finished, + int64_t index_id); + + Status _handle_eos(BaseTabletsChannel* channel, const PTabletWriterAddBlockRequest& request, + PTabletWriterAddBlockResult* response); void _init_profile(); // thread safety @@ -129,7 +106,7 @@ private: // lock protect the tablets channel map std::mutex _lock; // index id -> tablets channel - std::unordered_map<int64_t, std::shared_ptr<TabletsChannel>> _tablets_channels; + std::unordered_map<int64_t, std::shared_ptr<BaseTabletsChannel>> 
_tablets_channels; SpinLock _tablets_channels_lock; // This is to save finished channels id, to handle the retry request. std::unordered_set<int64_t> _finished_channel_ids; diff --git a/be/src/runtime/load_stream.h b/be/src/runtime/load_stream.h index cc06f2a2f7e..a4d359dc0ea 100644 --- a/be/src/runtime/load_stream.h +++ b/be/src/runtime/load_stream.h @@ -17,9 +17,8 @@ #pragma once +#include <bthread/mutex.h> #include <gen_cpp/internal_service.pb.h> -#include <runtime/load_stream_writer.h> -#include <stdint.h> #include <condition_variable> #include <memory> @@ -31,10 +30,15 @@ #include "butil/iobuf.h" #include "common/compiler_util.h" // IWYU pragma: keep #include "common/status.h" +#include "runtime/load_stream_writer.h" #include "util/runtime_profile.h" namespace doris { +class LoadStreamMgr; +class ThreadPoolToken; +class OlapTableSchemaParam; + // origin_segid(index) -> new_segid(value in vector) using SegIdMapping = std::vector<uint32_t>; class TabletStream { @@ -47,7 +51,7 @@ public: Status append_data(const PStreamHeader& header, butil::IOBuf* data); Status add_segment(const PStreamHeader& header, butil::IOBuf* data); Status close(); - int64_t id() { return _id; } + int64_t id() const { return _id; } friend std::ostream& operator<<(std::ostream& ostr, const TabletStream& tablet_stream); @@ -103,7 +107,7 @@ using StreamId = brpc::StreamId; class LoadStream : public brpc::StreamInputHandler { public: LoadStream(PUniqueId load_id, LoadStreamMgr* load_stream_mgr, bool enable_profile); - ~LoadStream(); + ~LoadStream() override; Status init(const POpenLoadStreamRequest* request); diff --git a/be/src/runtime/load_stream_mgr.h b/be/src/runtime/load_stream_mgr.h index 1228061bb40..466a23c8c5c 100644 --- a/be/src/runtime/load_stream_mgr.h +++ b/be/src/runtime/load_stream_mgr.h @@ -17,10 +17,6 @@ #pragma once -#include <gen_cpp/internal_service.pb.h> -#include <runtime/load_stream.h> -#include <stdint.h> - #include <condition_variable> #include <memory> #include 
<mutex> @@ -29,9 +25,13 @@ #include "common/compiler_util.h" // IWYU pragma: keep #include "common/status.h" +#include "runtime/load_stream.h" +#include "util/threadpool.h" namespace doris { +class POpenStreamSinkRequest; + class LoadStreamMgr { public: LoadStreamMgr(uint32_t segment_file_writer_thread_num, FifoThreadPool* heavy_work_pool, diff --git a/be/src/runtime/load_stream_writer.cpp b/be/src/runtime/load_stream_writer.cpp index 52948429e32..427ace47d00 100644 --- a/be/src/runtime/load_stream_writer.cpp +++ b/be/src/runtime/load_stream_writer.cpp @@ -48,6 +48,7 @@ #include "olap/rowset/rowset_writer_context.h" #include "olap/rowset/segment_v2/inverted_index_desc.h" #include "olap/rowset/segment_v2/segment.h" +#include "olap/rowset_builder.h" #include "olap/schema.h" #include "olap/schema_change.h" #include "olap/storage_engine.h" @@ -67,14 +68,17 @@ namespace doris { using namespace ErrorCode; -LoadStreamWriter::LoadStreamWriter(WriteRequest* req, RuntimeProfile* profile) - : _req(*req), _rowset_builder(*req, profile), _rowset_writer(nullptr) {} +LoadStreamWriter::LoadStreamWriter(WriteRequest* context, RuntimeProfile* profile) + : _req(*context), _rowset_writer(nullptr) { + _rowset_builder = + std::make_unique<RowsetBuilder>(*StorageEngine::instance(), *context, profile); +} LoadStreamWriter::~LoadStreamWriter() = default; Status LoadStreamWriter::init() { - RETURN_IF_ERROR(_rowset_builder.init()); - _rowset_writer = _rowset_builder.rowset_writer(); + RETURN_IF_ERROR(_rowset_builder->init()); + _rowset_writer = _rowset_builder->rowset_writer(); _is_init = true; return Status::OK(); } @@ -157,10 +161,10 @@ Status LoadStreamWriter::close() { } } - RETURN_IF_ERROR(_rowset_builder.build_rowset()); - RETURN_IF_ERROR(_rowset_builder.submit_calc_delete_bitmap_task()); - RETURN_IF_ERROR(_rowset_builder.wait_calc_delete_bitmap()); - RETURN_IF_ERROR(_rowset_builder.commit_txn()); + RETURN_IF_ERROR(_rowset_builder->build_rowset()); + 
RETURN_IF_ERROR(_rowset_builder->submit_calc_delete_bitmap_task()); + RETURN_IF_ERROR(_rowset_builder->wait_calc_delete_bitmap()); + RETURN_IF_ERROR(_rowset_builder->commit_txn()); return Status::OK(); } diff --git a/be/src/runtime/load_stream_writer.h b/be/src/runtime/load_stream_writer.h index 37514377a3d..e038ceeb89b 100644 --- a/be/src/runtime/load_stream_writer.h +++ b/be/src/runtime/load_stream_writer.h @@ -17,11 +17,6 @@ #pragma once -#include <gen_cpp/Types_types.h> -#include <gen_cpp/internal_service.pb.h> -#include <gen_cpp/types.pb.h> -#include <stdint.h> - #include <atomic> #include <memory> #include <mutex> @@ -32,16 +27,12 @@ #include "brpc/stream.h" #include "butil/iobuf.h" #include "common/status.h" +#include "io/fs/file_reader_writer_fwd.h" #include "olap/delta_writer_context.h" #include "olap/memtable.h" #include "olap/olap_common.h" -#include "olap/rowset/beta_rowset_writer.h" -#include "olap/rowset/rowset.h" -#include "olap/rowset/rowset_writer.h" -#include "olap/rowset_builder.h" -#include "olap/tablet.h" -#include "olap/tablet_meta.h" -#include "olap/tablet_schema.h" +#include "olap/rowset/rowset_fwd.h" +#include "olap/tablet_fwd.h" #include "util/spinlock.h" #include "util/uid_util.h" @@ -53,6 +44,9 @@ class SlotDescriptor; class OlapTableSchemaParam; class RowsetWriter; class RuntimeProfile; +struct SegmentStatistics; +using SegmentStatisticsSharedPtr = std::shared_ptr<SegmentStatistics>; +class BaseRowsetBuilder; namespace vectorized { class Block; @@ -76,13 +70,13 @@ public: // wait for all memtables to be flushed. 
Status close(); - int64_t tablet_id() { return _req.tablet_id; } + int64_t tablet_id() const { return _req.tablet_id; } private: bool _is_init = false; bool _is_canceled = false; WriteRequest _req; - RowsetBuilder _rowset_builder; + std::unique_ptr<BaseRowsetBuilder> _rowset_builder; std::shared_ptr<RowsetWriter> _rowset_writer; std::mutex _lock; diff --git a/be/src/runtime/tablets_channel.cpp b/be/src/runtime/tablets_channel.cpp index 5508bc3005a..d0d742e9152 100644 --- a/be/src/runtime/tablets_channel.cpp +++ b/be/src/runtime/tablets_channel.cpp @@ -50,10 +50,10 @@ class SlotDescriptor; DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(tablet_writer_count, MetricUnit::NOUNIT); -std::atomic<uint64_t> TabletsChannel::_s_tablet_writer_count; +std::atomic<uint64_t> BaseTabletsChannel::_s_tablet_writer_count; -TabletsChannel::TabletsChannel(const TabletsChannelKey& key, const UniqueId& load_id, - bool is_high_priority, RuntimeProfile* profile) +BaseTabletsChannel::BaseTabletsChannel(const TabletsChannelKey& key, const UniqueId& load_id, + bool is_high_priority, RuntimeProfile* profile) : _key(key), _state(kInitialized), _load_id(load_id), @@ -66,21 +66,41 @@ TabletsChannel::TabletsChannel(const TabletsChannelKey& key, const UniqueId& loa }); } -TabletsChannel::~TabletsChannel() { +TabletsChannel::TabletsChannel(StorageEngine& engine, const TabletsChannelKey& key, + const UniqueId& load_id, bool is_high_priority, + RuntimeProfile* profile) + : BaseTabletsChannel(key, load_id, is_high_priority, profile), _engine(engine) {} + +BaseTabletsChannel::~BaseTabletsChannel() { _s_tablet_writer_count -= _tablet_writers.size(); - for (auto& it : _tablet_writers) { - delete it.second; +} + +TabletsChannel::~TabletsChannel() = default; + +Status BaseTabletsChannel::_get_current_seq(int64_t& cur_seq, + const PTabletWriterAddBlockRequest& request) { + std::lock_guard<std::mutex> l(_lock); + if (_state != kOpened) { + return _state == kFinished ? 
_close_status + : Status::InternalError("TabletsChannel {} state: {}", + _key.to_string(), _state); } - delete _schema; + cur_seq = _next_seqs[request.sender_id()]; + // check packet + if (request.packet_seq() > cur_seq) { + LOG(WARNING) << "lost data packet, expect_seq=" << cur_seq + << ", recept_seq=" << request.packet_seq(); + return Status::InternalError("lost data packet"); + } + return Status::OK(); } -void TabletsChannel::_init_profile(RuntimeProfile* profile) { +void BaseTabletsChannel::_init_profile(RuntimeProfile* profile) { _profile = profile->create_child(fmt::format("TabletsChannel {}", _key.to_string()), true, true); _add_batch_number_counter = ADD_COUNTER(_profile, "NumberBatchAdded", TUnit::UNIT); auto* memory_usage = _profile->create_child("PeakMemoryUsage", true, true); - _slave_replica_timer = ADD_TIMER(_profile, "SlaveReplicaTime"); _add_batch_timer = ADD_TIMER(_profile, "AddBatchTime"); _write_block_timer = ADD_TIMER(_profile, "WriteBlockTime"); _incremental_open_timer = ADD_TIMER(_profile, "IncrementalOpenTabletTime"); @@ -95,7 +115,12 @@ void TabletsChannel::_init_profile(RuntimeProfile* profile) { memory_usage->AddHighWaterMarkCounter("MaxTabletFlush", TUnit::BYTES); } -Status TabletsChannel::open(const PTabletWriterOpenRequest& request) { +void TabletsChannel::_init_profile(RuntimeProfile* profile) { + BaseTabletsChannel::_init_profile(profile); + _slave_replica_timer = ADD_TIMER(_profile, "SlaveReplicaTime"); +} + +Status BaseTabletsChannel::open(const PTabletWriterOpenRequest& request) { std::lock_guard<std::mutex> l(_lock); if (_state == kOpened) { // Normal case, already open by other sender @@ -105,7 +130,7 @@ Status TabletsChannel::open(const PTabletWriterOpenRequest& request) { << ", timeout(s): " << request.load_channel_timeout_s(); _txn_id = request.txn_id(); _index_id = request.index_id(); - _schema = new OlapTableSchemaParam(); + _schema = std::make_unique<OlapTableSchemaParam>(); RETURN_IF_ERROR(_schema->init(request.schema())); 
_tuple_desc = _schema->tuple_desc(); @@ -119,7 +144,7 @@ Status TabletsChannel::open(const PTabletWriterOpenRequest& request) { return Status::OK(); } -Status TabletsChannel::incremental_open(const PTabletWriterOpenRequest& params) { +Status BaseTabletsChannel::incremental_open(const PTabletWriterOpenRequest& params) { SCOPED_TIMER(_incremental_open_timer); if (_state == kInitialized) { // haven't opened return open(params); @@ -159,22 +184,15 @@ Status TabletsChannel::incremental_open(const PTabletWriterOpenRequest& params) wrequest.tuple_desc = _tuple_desc; wrequest.slots = index_slots; wrequest.is_high_priority = _is_high_priority; - wrequest.table_schema_param = _schema; + wrequest.table_schema_param = _schema.get(); - DeltaWriter* writer = nullptr; - auto st = DeltaWriter::open(&wrequest, &writer, _profile, _load_id); - if (!st.ok()) { - auto err_msg = fmt::format( - "open delta writer failed, tablet_id={}" - ", txn_id={}, partition_id={}, err={}", - tablet.tablet_id(), _txn_id, tablet.partition_id(), st.to_string()); - LOG(WARNING) << err_msg; - return Status::InternalError(err_msg); - } + // TODO(plat1ko): CloudDeltaWriter + auto delta_writer = std::make_unique<DeltaWriter>(*StorageEngine::instance(), &wrequest, + _profile, _load_id); ss << "[" << tablet.tablet_id() << "]"; { std::lock_guard<SpinLock> l(_tablet_writers_lock); - _tablet_writers.emplace(tablet.tablet_id(), writer); + _tablet_writers.emplace(tablet.tablet_id(), std::move(delta_writer)); } } @@ -185,14 +203,12 @@ Status TabletsChannel::incremental_open(const PTabletWriterOpenRequest& params) return Status::OK(); } -Status TabletsChannel::close( - LoadChannel* parent, int sender_id, int64_t backend_id, bool* finished, - const google::protobuf::RepeatedField<int64_t>& partition_ids, - google::protobuf::RepeatedPtrField<PTabletInfo>* tablet_vec, - google::protobuf::RepeatedPtrField<PTabletError>* tablet_errors, - const google::protobuf::Map<int64_t, PSlaveTabletNodes>& slave_tablet_nodes, - 
google::protobuf::Map<int64_t, PSuccessSlaveTabletNodeIds>* success_slave_tablet_node_ids, - const bool write_single_replica) { +Status TabletsChannel::close(LoadChannel* parent, const PTabletWriterAddBlockRequest& req, + PTabletWriterAddBlockResult* res, bool* finished) { + int sender_id = req.sender_id(); + int64_t backend_id = req.backend_id(); + const auto& partition_ids = req.partition_ids(); + auto* tablet_errors = res->mutable_tablet_errors(); std::lock_guard<std::mutex> l(_lock); if (_state == kFinished) { return _close_status; @@ -215,17 +231,17 @@ Status TabletsChannel::close( // All senders are closed // 1. close all delta writers std::set<DeltaWriter*> need_wait_writers; - for (auto& it : _tablet_writers) { - if (_partition_ids.count(it.second->partition_id()) > 0) { - auto st = it.second->close(); + for (auto&& [tablet_id, writer] : _tablet_writers) { + if (_partition_ids.contains(writer->partition_id())) { + auto st = writer->close(); if (!st.ok()) { auto err_msg = fmt::format( "close tablet writer failed, tablet_id={}, " "transaction_id={}, err={}", - it.first, _txn_id, st.to_string()); + tablet_id, _txn_id, st.to_string()); LOG(WARNING) << err_msg; PTabletError* tablet_error = tablet_errors->Add(); - tablet_error->set_tablet_id(it.first); + tablet_error->set_tablet_id(tablet_id); tablet_error->set_msg(st.to_string()); // just skip this tablet(writer) and continue to close others continue; @@ -233,30 +249,30 @@ Status TabletsChannel::close( // tablet writer in `_broken_tablets` should not call `build_rowset` and // `commit_txn` method, after that, the publish-version task will success, // which can cause the replica inconsistency. 
- if (_is_broken_tablet(it.second->tablet_id())) { + if (_is_broken_tablet(writer->tablet_id())) { LOG(WARNING) << "SHOULD NOT HAPPEN, tablet writer is broken but not cancelled" - << ", tablet_id=" << it.first << ", transaction_id=" << _txn_id; + << ", tablet_id=" << tablet_id << ", transaction_id=" << _txn_id; continue; } - need_wait_writers.insert(it.second); + need_wait_writers.insert(static_cast<DeltaWriter*>(writer.get())); } else { - auto st = it.second->cancel(); + auto st = writer->cancel(); if (!st.ok()) { - LOG(WARNING) << "cancel tablet writer failed, tablet_id=" << it.first + LOG(WARNING) << "cancel tablet writer failed, tablet_id=" << tablet_id << ", transaction_id=" << _txn_id; // just skip this tablet(writer) and continue to close others continue; } - VLOG_PROGRESS << "cancel tablet writer successfully, tablet_id=" << it.first + VLOG_PROGRESS << "cancel tablet writer successfully, tablet_id=" << tablet_id << ", transaction_id=" << _txn_id; } } - _write_single_replica = write_single_replica; + _write_single_replica = req.write_single_replica(); // 2. wait all writer finished flush. - for (auto writer : need_wait_writers) { - static_cast<void>(writer->wait_flush()); + for (auto* writer : need_wait_writers) { + RETURN_IF_ERROR((writer->wait_flush())); } // 3. build rowset @@ -289,23 +305,23 @@ Status TabletsChannel::close( } // 5. commit all writers - for (auto writer : need_wait_writers) { + + for (auto* writer : need_wait_writers) { PSlaveTabletNodes slave_nodes; - if (write_single_replica) { - slave_nodes = slave_tablet_nodes.at(writer->tablet_id()); - } + // close may return failed, but no need to handle it here. // tablet_vec will only contains success tablet, and then let FE judge it. 
- _commit_txn(writer, tablet_vec, tablet_errors, slave_nodes, write_single_replica); + _commit_txn(writer, req, res); } - if (write_single_replica) { + if (_write_single_replica) { + auto* success_slave_tablet_node_ids = res->mutable_success_slave_tablet_node_ids(); // The operation waiting for all slave replicas to complete must end before the timeout, // so that there is enough time to collect completed replica. Otherwise, the task may // timeout and fail even though most of the replicas are completed. Here we set 0.9 // times the timeout as the maximum waiting time. SCOPED_TIMER(_slave_replica_timer); - while (need_wait_writers.size() > 0 && + while (!need_wait_writers.empty() && (time(nullptr) - parent->last_updated_time()) < (parent->timeout() * 0.9)) { std::set<DeltaWriter*>::iterator it; for (it = need_wait_writers.begin(); it != need_wait_writers.end();) { @@ -318,22 +334,22 @@ Status TabletsChannel::close( } std::this_thread::sleep_for(std::chrono::milliseconds(100)); } - for (auto writer : need_wait_writers) { + for (auto* writer : need_wait_writers) { writer->add_finished_slave_replicas(success_slave_tablet_node_ids); } - StorageEngine::instance()->txn_manager()->clear_txn_tablet_delta_writer(_txn_id); + _engine.txn_manager()->clear_txn_tablet_delta_writer(_txn_id); } } return Status::OK(); } -void TabletsChannel::_commit_txn(DeltaWriter* writer, - google::protobuf::RepeatedPtrField<PTabletInfo>* tablet_vec, - google::protobuf::RepeatedPtrField<PTabletError>* tablet_errors, - PSlaveTabletNodes slave_tablet_nodes, - const bool write_single_replica) { - Status st = writer->commit_txn(slave_tablet_nodes, write_single_replica); - if (st.ok()) { +void TabletsChannel::_commit_txn(DeltaWriter* writer, const PTabletWriterAddBlockRequest& req, + PTabletWriterAddBlockResult* res) { + Status st = writer->commit_txn(_write_single_replica + ? 
req.slave_tablet_nodes().at(writer->tablet_id()) + : PSlaveTabletNodes {}); + if (st.ok()) [[likely]] { + auto* tablet_vec = res->mutable_tablet_vec(); PTabletInfo* tablet_info = tablet_vec->Add(); tablet_info->set_tablet_id(writer->tablet_id()); // unused required field. @@ -341,11 +357,11 @@ void TabletsChannel::_commit_txn(DeltaWriter* writer, tablet_info->set_received_rows(writer->total_received_rows()); tablet_info->set_num_rows_filtered(writer->num_rows_filtered()); } else { - _add_error_tablet(tablet_errors, writer->tablet_id(), st); + _add_error_tablet(res->mutable_tablet_errors(), writer->tablet_id(), st); } } -void TabletsChannel::_add_error_tablet( +void BaseTabletsChannel::_add_error_tablet( google::protobuf::RepeatedPtrField<PTabletError>* tablet_errors, int64_t tablet_id, Status error) const { PTabletError* tablet_error = tablet_errors->Add(); @@ -355,7 +371,7 @@ void TabletsChannel::_add_error_tablet( << "err msg " << error; } -void TabletsChannel::refresh_profile() { +void BaseTabletsChannel::refresh_profile() { int64_t write_mem_usage = 0; int64_t flush_mem_usage = 0; int64_t max_tablet_mem_usage = 0; @@ -363,10 +379,10 @@ void TabletsChannel::refresh_profile() { int64_t max_tablet_flush_mem_usage = 0; { std::lock_guard<SpinLock> l(_tablet_writers_lock); - for (auto& it : _tablet_writers) { - int64_t write_mem = it.second->mem_consumption(MemType::WRITE); + for (auto&& [tablet_id, writer] : _tablet_writers) { + int64_t write_mem = writer->mem_consumption(MemType::WRITE); write_mem_usage += write_mem; - int64_t flush_mem = it.second->mem_consumption(MemType::FLUSH); + int64_t flush_mem = writer->mem_consumption(MemType::FLUSH); flush_mem_usage += flush_mem; if (write_mem > max_tablet_write_mem_usage) { max_tablet_write_mem_usage = write_mem; @@ -387,7 +403,7 @@ void TabletsChannel::refresh_profile() { COUNTER_SET(_max_tablet_flush_memory_usage_counter, max_tablet_flush_mem_usage); } -Status TabletsChannel::_open_all_writers(const 
PTabletWriterOpenRequest& request) { +Status BaseTabletsChannel::_open_all_writers(const PTabletWriterOpenRequest& request) { std::vector<SlotDescriptor*>* index_slots = nullptr; int32_t schema_hash = 0; for (auto& index : _schema->indexes()) { @@ -428,24 +444,17 @@ Status TabletsChannel::_open_all_writers(const PTabletWriterOpenRequest& request .load_id = request.id(), .tuple_desc = _tuple_desc, .slots = index_slots, - .table_schema_param = _schema, + .table_schema_param = _schema.get(), .is_high_priority = _is_high_priority, .write_file_cache = request.write_file_cache(), }; - DeltaWriter* writer = nullptr; - auto st = DeltaWriter::open(&wrequest, &writer, _profile, _load_id); - if (!st.ok()) { - auto err_msg = fmt::format( - "open delta writer failed, tablet_id={}" - ", txn_id={}, partition_id={}, err={}", - tablet.tablet_id(), _txn_id, tablet.partition_id(), st.to_string()); - LOG(WARNING) << err_msg; - return Status::InternalError(err_msg); - } + // TODO(plat1ko): CloudDeltaWriter + auto writer = std::make_unique<DeltaWriter>(*StorageEngine::instance(), &wrequest, _profile, + _load_id); { std::lock_guard<SpinLock> l(_tablet_writers_lock); - _tablet_writers.emplace(tablet.tablet_id(), writer); + _tablet_writers.emplace(tablet.tablet_id(), std::move(writer)); } } _s_tablet_writer_count += _tablet_writers.size(); @@ -453,7 +462,7 @@ Status TabletsChannel::_open_all_writers(const PTabletWriterOpenRequest& request return Status::OK(); } -Status TabletsChannel::cancel() { +Status BaseTabletsChannel::cancel() { std::lock_guard<std::mutex> l(_lock); if (_state == kFinished) { return _close_status; @@ -462,8 +471,14 @@ Status TabletsChannel::cancel() { static_cast<void>(it.second->cancel()); } _state = kFinished; + + return Status::OK(); +} + +Status TabletsChannel::cancel() { + RETURN_IF_ERROR(BaseTabletsChannel::cancel()); if (_write_single_replica) { - StorageEngine::instance()->txn_manager()->clear_txn_tablet_delta_writer(_txn_id); + 
_engine.txn_manager()->clear_txn_tablet_delta_writer(_txn_id); } return Status::OK(); } @@ -479,8 +494,8 @@ std::ostream& operator<<(std::ostream& os, const TabletsChannelKey& key) { return os; } -Status TabletsChannel::add_batch(const PTabletWriterAddBlockRequest& request, - PTabletWriterAddBlockResult* response) { +Status BaseTabletsChannel::add_batch(const PTabletWriterAddBlockRequest& request, + PTabletWriterAddBlockResult* response) { SCOPED_TIMER(_add_batch_timer); int64_t cur_seq = 0; _add_batch_number_counter->update(1); @@ -523,14 +538,14 @@ Status TabletsChannel::add_batch(const PTabletWriterAddBlockRequest& request, << ", tablet_ids_size: " << request.tablet_ids_size(); auto write_tablet_data = [&](uint32_t tablet_id, - std::function<Status(DeltaWriter * writer)> write_func) { + std::function<Status(BaseDeltaWriter * writer)> write_func) { google::protobuf::RepeatedPtrField<PTabletError>* tablet_errors = response->mutable_tablet_errors(); auto tablet_writer_it = _tablet_writers.find(tablet_id); if (tablet_writer_it == _tablet_writers.end()) { return Status::InternalError("unknown tablet to append data, tablet={}", tablet_id); } - Status st = write_func(tablet_writer_it->second); + Status st = write_func(tablet_writer_it->second.get()); if (!st.ok()) { auto err_msg = fmt::format("tablet writer write failed, tablet_id={}, txn_id={}, err={}", @@ -549,15 +564,16 @@ Status TabletsChannel::add_batch(const PTabletWriterAddBlockRequest& request, if (request.is_single_tablet_block()) { SCOPED_TIMER(_write_block_timer); - RETURN_IF_ERROR(write_tablet_data(request.tablet_ids(0), [&](DeltaWriter* writer) { + RETURN_IF_ERROR(write_tablet_data(request.tablet_ids(0), [&](BaseDeltaWriter* writer) { return writer->append(&send_data); })); } else { SCOPED_TIMER(_write_block_timer); for (const auto& tablet_to_rowidxs_it : tablet_to_rowidxs) { - RETURN_IF_ERROR(write_tablet_data(tablet_to_rowidxs_it.first, [&](DeltaWriter* writer) { - return writer->write(&send_data, 
tablet_to_rowidxs_it.second); - })); + RETURN_IF_ERROR( + write_tablet_data(tablet_to_rowidxs_it.first, [&](BaseDeltaWriter* writer) { + return writer->write(&send_data, tablet_to_rowidxs_it.second); + })); } } @@ -568,12 +584,12 @@ Status TabletsChannel::add_batch(const PTabletWriterAddBlockRequest& request, return Status::OK(); } -void TabletsChannel::_add_broken_tablet(int64_t tablet_id) { +void BaseTabletsChannel::_add_broken_tablet(int64_t tablet_id) { std::unique_lock<std::shared_mutex> wlock(_broken_tablets_lock); _broken_tablets.insert(tablet_id); } -bool TabletsChannel::_is_broken_tablet(int64_t tablet_id) { +bool BaseTabletsChannel::_is_broken_tablet(int64_t tablet_id) { std::shared_lock<std::shared_mutex> rlock(_broken_tablets_lock); return _broken_tablets.find(tablet_id) != _broken_tablets.end(); } diff --git a/be/src/runtime/tablets_channel.h b/be/src/runtime/tablets_channel.h index b8e3de0584b..2f9ec9d51a9 100644 --- a/be/src/runtime/tablets_channel.h +++ b/be/src/runtime/tablets_channel.h @@ -17,7 +17,6 @@ #pragma once -#include <gen_cpp/internal_service.pb.h> #include <glog/logging.h> #include <atomic> @@ -38,16 +37,14 @@ #include "util/spinlock.h" #include "util/uid_util.h" -namespace google { -namespace protobuf { +namespace google::protobuf { template <typename Element> class RepeatedField; template <typename Key, typename T> class Map; template <typename T> class RepeatedPtrField; -} // namespace protobuf -} // namespace google +} // namespace google::protobuf namespace doris { class PSlaveTabletNodes; @@ -55,9 +52,12 @@ class PSuccessSlaveTabletNodeIds; class PTabletError; class PTabletInfo; class PTabletWriterOpenRequest; +class PTabletWriterAddBlockRequest; +class PTabletWriterAddBlockResult; class PUniqueId; class TupleDescriptor; class OpenPartitionRequest; +class StorageEngine; struct TabletsChannelKey { UniqueId id; @@ -65,7 +65,7 @@ struct TabletsChannelKey { TabletsChannelKey(const PUniqueId& pid, int64_t index_id_) : id(pid), 
index_id(index_id_) {} - ~TabletsChannelKey() noexcept {} + ~TabletsChannelKey() noexcept = default; bool operator==(const TabletsChannelKey& rhs) const noexcept { return index_id == rhs.index_id && id == rhs.id; @@ -76,18 +76,18 @@ struct TabletsChannelKey { std::ostream& operator<<(std::ostream& os, const TabletsChannelKey& key); -class DeltaWriter; +class BaseDeltaWriter; class MemTableWriter; class OlapTableSchemaParam; class LoadChannel; // Write channel for a particular (load, index). -class TabletsChannel { +class BaseTabletsChannel { public: - TabletsChannel(const TabletsChannelKey& key, const UniqueId& load_id, bool is_high_priority, - RuntimeProfile* profile); + BaseTabletsChannel(const TabletsChannelKey& key, const UniqueId& load_id, bool is_high_priority, + RuntimeProfile* profile); - ~TabletsChannel(); + virtual ~BaseTabletsChannel(); Status open(const PTabletWriterOpenRequest& request); // open + open writers @@ -101,38 +101,25 @@ public: // If all senders are closed, close this channel, set '*finished' to true, update 'tablet_vec' // to include all tablets written in this channel. 
// no-op when this channel has been closed or cancelled - Status - close(LoadChannel* parent, int sender_id, int64_t backend_id, bool* finished, - const google::protobuf::RepeatedField<int64_t>& partition_ids, - google::protobuf::RepeatedPtrField<PTabletInfo>* tablet_vec, - google::protobuf::RepeatedPtrField<PTabletError>* tablet_error, - const google::protobuf::Map<int64_t, PSlaveTabletNodes>& slave_tablet_nodes, - google::protobuf::Map<int64_t, PSuccessSlaveTabletNodeIds>* success_slave_tablet_node_ids, - const bool write_single_replica); + virtual Status close(LoadChannel* parent, const PTabletWriterAddBlockRequest& req, + PTabletWriterAddBlockResult* res, bool* finished) = 0; // no-op when this channel has been closed or cancelled - Status cancel(); + virtual Status cancel(); void refresh_profile(); -private: - template <typename Request> - Status _get_current_seq(int64_t& cur_seq, const Request& request); +protected: + Status _get_current_seq(int64_t& cur_seq, const PTabletWriterAddBlockRequest& request); // open all writer Status _open_all_writers(const PTabletWriterOpenRequest& request); - // deal with DeltaWriter commit_txn(), add tablet to list for return. 
- void _commit_txn(DeltaWriter* writer, - google::protobuf::RepeatedPtrField<PTabletInfo>* tablet_vec, - google::protobuf::RepeatedPtrField<PTabletError>* tablet_errors, - PSlaveTabletNodes slave_tablet_nodes, const bool write_single_replica); - void _add_broken_tablet(int64_t tablet_id); void _add_error_tablet(google::protobuf::RepeatedPtrField<PTabletError>* tablet_errors, int64_t tablet_id, Status error) const; bool _is_broken_tablet(int64_t tablet_id); - void _init_profile(RuntimeProfile* profile); + virtual void _init_profile(RuntimeProfile* profile); // id of this load channel TabletsChannelKey _key; @@ -154,7 +141,7 @@ private: // initialized in open function int64_t _txn_id = -1; int64_t _index_id = -1; - OlapTableSchemaParam* _schema = nullptr; + std::unique_ptr<OlapTableSchemaParam> _schema; TupleDescriptor* _tuple_desc = nullptr; @@ -167,7 +154,7 @@ private: Status _close_status; // tablet_id -> TabletChannel - std::unordered_map<int64_t, DeltaWriter*> _tablet_writers; + std::unordered_map<int64_t, std::unique_ptr<BaseDeltaWriter>> _tablet_writers; // broken tablet ids. // If a tablet write fails, it's id will be added to this set. // So that following batch will not handle this tablet anymore. 
@@ -183,8 +170,6 @@ private: bool _is_high_priority = false; - bool _write_single_replica = false; - RuntimeProfile* _profile = nullptr; RuntimeProfile::Counter* _add_batch_number_counter = nullptr; RuntimeProfile::HighWaterMarkCounter* _memory_usage_counter = nullptr; @@ -193,28 +178,36 @@ private: RuntimeProfile::HighWaterMarkCounter* _max_tablet_memory_usage_counter = nullptr; RuntimeProfile::HighWaterMarkCounter* _max_tablet_write_memory_usage_counter = nullptr; RuntimeProfile::HighWaterMarkCounter* _max_tablet_flush_memory_usage_counter = nullptr; - RuntimeProfile::Counter* _slave_replica_timer = nullptr; RuntimeProfile::Counter* _add_batch_timer = nullptr; RuntimeProfile::Counter* _write_block_timer = nullptr; RuntimeProfile::Counter* _incremental_open_timer = nullptr; }; -template <typename Request> -Status TabletsChannel::_get_current_seq(int64_t& cur_seq, const Request& request) { - std::lock_guard<std::mutex> l(_lock); - if (_state != kOpened) { - return _state == kFinished ? _close_status - : Status::InternalError("TabletsChannel {} state: {}", - _key.to_string(), _state); - } - cur_seq = _next_seqs[request.sender_id()]; - // check packet - if (request.packet_seq() > cur_seq) { - LOG(WARNING) << "lost data packet, expect_seq=" << cur_seq - << ", recept_seq=" << request.packet_seq(); - return Status::InternalError("lost data packet"); - } - return Status::OK(); -} +class DeltaWriter; + +// `StorageEngine` mixin for `BaseTabletsChannel` +class TabletsChannel final : public BaseTabletsChannel { +public: + TabletsChannel(StorageEngine& engine, const TabletsChannelKey& key, const UniqueId& load_id, + bool is_high_priority, RuntimeProfile* profile); + + ~TabletsChannel() override; + + Status close(LoadChannel* parent, const PTabletWriterAddBlockRequest& req, + PTabletWriterAddBlockResult* res, bool* finished) override; + + Status cancel() override; + +private: + void _init_profile(RuntimeProfile* profile) override; + + // deal with DeltaWriter commit_txn(), 
add tablet to list for return. + void _commit_txn(DeltaWriter* writer, const PTabletWriterAddBlockRequest& req, + PTabletWriterAddBlockResult* res); + + StorageEngine& _engine; + bool _write_single_replica = false; + RuntimeProfile::Counter* _slave_replica_timer = nullptr; +}; } // namespace doris diff --git a/be/src/vec/olap/block_reader.cpp b/be/src/vec/olap/block_reader.cpp index 8602af091a0..9c15302af6c 100644 --- a/be/src/vec/olap/block_reader.cpp +++ b/be/src/vec/olap/block_reader.cpp @@ -27,7 +27,7 @@ #include <ostream> #include <string> -#include "cloud/config.h" +// IWYU pragma: no_include <opentelemetry/common/threadlocal.h> #include "common/compiler_util.h" // IWYU pragma: keep #include "common/status.h" #include "exprs/function_filter.h" @@ -62,8 +62,10 @@ BlockReader::~BlockReader() { Status BlockReader::next_block_with_aggregation(Block* block, bool* eof) { auto res = (this->*_next_block_func)(block, eof); - if (!res.ok() && !res.is<ErrorCode::END_OF_FILE>() && !config::cloud_mode) [[unlikely]] { - static_cast<Tablet*>(_tablet.get())->report_error(res); + if constexpr (std::is_same_v<ExecEnv::Engine, StorageEngine>) { + if (!res.ok()) [[unlikely]] { + static_cast<Tablet*>(_tablet.get())->report_error(res); + } } return res; } @@ -230,11 +232,10 @@ Status BlockReader::init(const ReaderParams& read_params) { } auto status = _init_collect_iter(read_params); - if (!status.ok()) { - if (!status.is<ErrorCode::END_OF_FILE>() && !config::cloud_mode) [[unlikely]] { + if (!status.ok()) [[unlikely]] { + if constexpr (std::is_same_v<ExecEnv::Engine, StorageEngine>) { static_cast<Tablet*>(_tablet.get())->report_error(status); } - return status; } diff --git a/be/src/vec/olap/vertical_block_reader.cpp b/be/src/vec/olap/vertical_block_reader.cpp index 52d35283b50..aa16c91cbd2 100644 --- a/be/src/vec/olap/vertical_block_reader.cpp +++ b/be/src/vec/olap/vertical_block_reader.cpp @@ -24,7 +24,6 @@ #include <boost/iterator/iterator_facade.hpp> #include <ostream> 
-#include "cloud/config.h" #include "olap/olap_common.h" #include "olap/olap_define.h" #include "olap/rowset/rowset.h" @@ -53,8 +52,10 @@ VerticalBlockReader::~VerticalBlockReader() { Status VerticalBlockReader::next_block_with_aggregation(Block* block, bool* eof) { auto res = (this->*_next_block_func)(block, eof); - if (!res.ok() && !res.is<ErrorCode::END_OF_FILE>() && !config::cloud_mode) [[unlikely]] { - static_cast<Tablet*>(_tablet.get())->report_error(res); + if constexpr (std::is_same_v<ExecEnv::Engine, StorageEngine>) { + if (!res.ok()) [[unlikely]] { + static_cast<Tablet*>(_tablet.get())->report_error(res); + } } return res; } @@ -212,8 +213,8 @@ Status VerticalBlockReader::init(const ReaderParams& read_params) { RETURN_IF_ERROR(TabletReader::init(read_params)); auto status = _init_collect_iter(read_params); - if (!status.ok()) { - if (!status.is<ErrorCode::END_OF_FILE>() && !config::cloud_mode) [[unlikely]] { + if (!status.ok()) [[unlikely]] { + if constexpr (std::is_same_v<ExecEnv::Engine, StorageEngine>) { static_cast<Tablet*>(_tablet.get())->report_error(status); } return status; diff --git a/be/test/olap/delta_writer_test.cpp b/be/test/olap/delta_writer_test.cpp index 7db9e129e58..f2274a9a76a 100644 --- a/be/test/olap/delta_writer_test.cpp +++ b/be/test/olap/delta_writer_test.cpp @@ -43,6 +43,7 @@ #include "olap/options.h" #include "olap/rowset/beta_rowset.h" #include "olap/rowset/segment_v2/segment.h" +#include "olap/rowset_builder.h" #include "olap/schema.h" #include "olap/storage_engine.h" #include "olap/tablet.h" @@ -500,19 +501,18 @@ TEST_F(TestDeltaWriter, open) { write_req.slots = &(tuple_desc->slots()); write_req.is_high_priority = true; write_req.table_schema_param = ¶m; - DeltaWriter* delta_writer = nullptr; // test vec delta writer profile = std::make_unique<RuntimeProfile>("LoadChannels"); - static_cast<void>(DeltaWriter::open(&write_req, &delta_writer, profile.get(), TUniqueId())); + auto delta_writer = + 
std::make_unique<DeltaWriter>(*k_engine, &write_req, profile.get(), TUniqueId {}); EXPECT_NE(delta_writer, nullptr); res = delta_writer->close(); EXPECT_EQ(Status::OK(), res); res = delta_writer->build_rowset(); EXPECT_EQ(Status::OK(), res); - res = delta_writer->commit_txn(PSlaveTabletNodes(), false); + res = delta_writer->commit_txn(PSlaveTabletNodes()); EXPECT_EQ(Status::OK(), res); - SAFE_DELETE(delta_writer); res = k_engine->tablet_manager()->drop_tablet(request.tablet_id, request.replica_id, false); EXPECT_EQ(Status::OK(), res); @@ -547,10 +547,9 @@ TEST_F(TestDeltaWriter, vec_write) { write_req.slots = &(tuple_desc->slots()); write_req.is_high_priority = false; write_req.table_schema_param = ¶m; - DeltaWriter* delta_writer = nullptr; profile = std::make_unique<RuntimeProfile>("LoadChannels"); - static_cast<void>(DeltaWriter::open(&write_req, &delta_writer, profile.get(), TUniqueId())); - ASSERT_NE(delta_writer, nullptr); + auto delta_writer = + std::make_unique<DeltaWriter>(*k_engine, &write_req, profile.get(), TUniqueId {}); vectorized::Block block; for (const auto& slot_desc : tuple_desc->slots()) { @@ -647,7 +646,7 @@ TEST_F(TestDeltaWriter, vec_write) { ASSERT_TRUE(res.ok()); res = delta_writer->wait_calc_delete_bitmap(); ASSERT_TRUE(res.ok()); - res = delta_writer->commit_txn(PSlaveTabletNodes(), false); + res = delta_writer->commit_txn(PSlaveTabletNodes()); ASSERT_TRUE(res.ok()); // publish version success @@ -680,7 +679,6 @@ TEST_F(TestDeltaWriter, vec_write) { res = k_engine->tablet_manager()->drop_tablet(request.tablet_id, request.replica_id, false); ASSERT_TRUE(res.ok()); - delete delta_writer; } TEST_F(TestDeltaWriter, vec_sequence_col) { @@ -712,10 +710,9 @@ TEST_F(TestDeltaWriter, vec_sequence_col) { write_req.slots = &(tuple_desc->slots()); write_req.is_high_priority = false; write_req.table_schema_param = ¶m; - DeltaWriter* delta_writer = nullptr; profile = std::make_unique<RuntimeProfile>("LoadChannels"); - 
static_cast<void>(DeltaWriter::open(&write_req, &delta_writer, profile.get(), TUniqueId())); - ASSERT_NE(delta_writer, nullptr); + auto delta_writer = + std::make_unique<DeltaWriter>(*k_engine, &write_req, profile.get(), TUniqueId {}); vectorized::Block block; for (const auto& slot_desc : tuple_desc->slots()) { @@ -742,7 +739,7 @@ TEST_F(TestDeltaWriter, vec_sequence_col) { ASSERT_TRUE(res.ok()); res = delta_writer->wait_calc_delete_bitmap(); ASSERT_TRUE(res.ok()); - res = delta_writer->commit_txn(PSlaveTabletNodes(), false); + res = delta_writer->commit_txn(PSlaveTabletNodes()); ASSERT_TRUE(res.ok()); // publish version success @@ -798,7 +795,6 @@ TEST_F(TestDeltaWriter, vec_sequence_col) { res = k_engine->tablet_manager()->drop_tablet(request.tablet_id, request.replica_id, false); ASSERT_TRUE(res.ok()); - delete delta_writer; } TEST_F(TestDeltaWriter, vec_sequence_col_concurrent_write) { @@ -829,16 +825,14 @@ TEST_F(TestDeltaWriter, vec_sequence_col_concurrent_write) { write_req.slots = &(tuple_desc->slots()); write_req.is_high_priority = false; write_req.table_schema_param = ¶m; - DeltaWriter* delta_writer1 = nullptr; - DeltaWriter* delta_writer2 = nullptr; std::unique_ptr<RuntimeProfile> profile1; profile1 = std::make_unique<RuntimeProfile>("LoadChannels1"); std::unique_ptr<RuntimeProfile> profile2; profile2 = std::make_unique<RuntimeProfile>("LoadChannels2"); - static_cast<void>(DeltaWriter::open(&write_req, &delta_writer1, profile1.get(), TUniqueId())); - static_cast<void>(DeltaWriter::open(&write_req, &delta_writer2, profile2.get(), TUniqueId())); - ASSERT_NE(delta_writer1, nullptr); - ASSERT_NE(delta_writer2, nullptr); + auto delta_writer1 = + std::make_unique<DeltaWriter>(*k_engine, &write_req, profile1.get(), TUniqueId {}); + auto delta_writer2 = + std::make_unique<DeltaWriter>(*k_engine, &write_req, profile2.get(), TUniqueId {}); // write data in delta writer 1 { @@ -867,7 +861,7 @@ TEST_F(TestDeltaWriter, vec_sequence_col_concurrent_write) { 
ASSERT_TRUE(res.ok()); res = delta_writer1->wait_calc_delete_bitmap(); ASSERT_TRUE(res.ok()); - res = delta_writer1->commit_txn(PSlaveTabletNodes(), false); + res = delta_writer1->commit_txn(PSlaveTabletNodes()); ASSERT_TRUE(res.ok()); } // write data in delta writer 2 @@ -943,12 +937,12 @@ TEST_F(TestDeltaWriter, vec_sequence_col_concurrent_write) { // verify that delete bitmap calculated correctly // since the delete bitmap not published, versions are 0 - auto delete_bitmap = delta_writer2->get_delete_bitmap(); + auto delete_bitmap = delta_writer2->_rowset_builder->get_delete_bitmap(); ASSERT_TRUE(delete_bitmap->contains({rowset1->rowset_id(), 0, 0}, 0)); // We can't get the rowset id of rowset2 now, will check the delete bitmap // contains row 0 of rowset2 at L929. - res = delta_writer2->commit_txn(PSlaveTabletNodes(), false); + res = delta_writer2->commit_txn(PSlaveTabletNodes()); ASSERT_TRUE(res.ok()); Version version; @@ -1043,7 +1037,5 @@ TEST_F(TestDeltaWriter, vec_sequence_col_concurrent_write) { res = k_engine->tablet_manager()->drop_tablet(request.tablet_id, request.replica_id, false); ASSERT_TRUE(res.ok()); - delete delta_writer1; - delete delta_writer2; } } // namespace doris diff --git a/be/test/olap/engine_storage_migration_task_test.cpp b/be/test/olap/engine_storage_migration_task_test.cpp index 5107aafb552..87b30ae4da3 100644 --- a/be/test/olap/engine_storage_migration_task_test.cpp +++ b/be/test/olap/engine_storage_migration_task_test.cpp @@ -196,17 +196,15 @@ TEST_F(TestEngineStorageMigrationTask, write_and_migration) { write_req.is_high_priority = false; write_req.table_schema_param = ¶m; - DeltaWriter* delta_writer = nullptr; - profile = std::make_unique<RuntimeProfile>("LoadChannels"); - static_cast<void>(DeltaWriter::open(&write_req, &delta_writer, profile.get())); - EXPECT_NE(delta_writer, nullptr); + auto delta_writer = + std::make_unique<DeltaWriter>(*k_engine, &write_req, profile.get(), TUniqueId {}); res = delta_writer->close(); 
EXPECT_EQ(Status::OK(), res); res = delta_writer->build_rowset(); EXPECT_EQ(Status::OK(), res); - res = delta_writer->commit_txn(PSlaveTabletNodes(), false); + res = delta_writer->commit_txn(PSlaveTabletNodes()); EXPECT_EQ(Status::OK(), res); // publish version success @@ -276,7 +274,6 @@ TEST_F(TestEngineStorageMigrationTask, write_and_migration) { res = k_engine->tablet_manager()->drop_tablet(request.tablet_id, request.replica_id, false); EXPECT_EQ(Status::OK(), res); - delete delta_writer; } } // namespace doris diff --git a/be/test/olap/memtable_memory_limiter_test.cpp b/be/test/olap/memtable_memory_limiter_test.cpp index 9771fb324c6..11e791551b1 100644 --- a/be/test/olap/memtable_memory_limiter_test.cpp +++ b/be/test/olap/memtable_memory_limiter_test.cpp @@ -140,9 +140,9 @@ TEST_F(MemTableMemoryLimiterTest, handle_memtable_flush_test) { write_req.slots = &(tuple_desc->slots()); write_req.is_high_priority = false; write_req.table_schema_param = ¶m; - DeltaWriter* delta_writer = nullptr; profile = std::make_unique<RuntimeProfile>("MemTableMemoryLimiterTest"); - static_cast<void>(DeltaWriter::open(&write_req, &delta_writer, profile.get(), TUniqueId())); + auto delta_writer = + std::make_unique<DeltaWriter>(*_engine, &write_req, profile.get(), TUniqueId {}); ASSERT_NE(delta_writer, nullptr); auto mem_limiter = ExecEnv::GetInstance()->memtable_memory_limiter(); @@ -174,10 +174,9 @@ TEST_F(MemTableMemoryLimiterTest, handle_memtable_flush_test) { EXPECT_EQ(Status::OK(), res); res = delta_writer->build_rowset(); EXPECT_EQ(Status::OK(), res); - res = delta_writer->commit_txn(PSlaveTabletNodes(), false); + res = delta_writer->commit_txn(PSlaveTabletNodes()); EXPECT_EQ(Status::OK(), res); res = _engine->tablet_manager()->drop_tablet(request.tablet_id, request.replica_id, false); EXPECT_EQ(Status::OK(), res); - delete delta_writer; } } // namespace doris diff --git a/be/test/olap/tablet_cooldown_test.cpp b/be/test/olap/tablet_cooldown_test.cpp index 
124380220f8..e19ba3bd32c 100644 --- a/be/test/olap/tablet_cooldown_test.cpp +++ b/be/test/olap/tablet_cooldown_test.cpp @@ -361,9 +361,9 @@ void createTablet(TabletSharedPtr* tablet, int64_t replica_id, int32_t schema_ha write_req.is_high_priority = false; write_req.table_schema_param = ¶m; - DeltaWriter* delta_writer = nullptr; profile = std::make_unique<RuntimeProfile>("LoadChannels"); - static_cast<void>(DeltaWriter::open(&write_req, &delta_writer, profile.get())); + auto delta_writer = + std::make_unique<DeltaWriter>(*k_engine, &write_req, profile.get(), TUniqueId {}); ASSERT_NE(delta_writer, nullptr); vectorized::Block block; @@ -397,9 +397,8 @@ void createTablet(TabletSharedPtr* tablet, int64_t replica_id, int32_t schema_ha ASSERT_EQ(Status::OK(), st); st = delta_writer->build_rowset(); ASSERT_EQ(Status::OK(), st); - st = delta_writer->commit_txn(PSlaveTabletNodes(), false); + st = delta_writer->commit_txn(PSlaveTabletNodes()); ASSERT_EQ(Status::OK(), st); - delete delta_writer; // publish version success *tablet = k_engine->tablet_manager()->get_tablet(write_req.tablet_id, write_req.schema_hash); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org