This is an automated email from the ASF dual-hosted git repository. zhangchen pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push: new 5e28fde7db [cherry-pick](merge-on-write) support concurrent delete bitmap calc while close_wait (#21488) (#22267) 5e28fde7db is described below commit 5e28fde7db94b1a3b708a08f96b455ccbfa11ae8 Author: zhannngchen <48427519+zhannngc...@users.noreply.github.com> AuthorDate: Thu Jul 27 10:00:51 2023 +0800 [cherry-pick](merge-on-write) support concurrent delete bitmap calc while close_wait (#21488) (#22267) cherry-pick #21488 --- be/src/olap/calc_delete_bitmap_executor.cpp | 91 ++++++ be/src/olap/calc_delete_bitmap_executor.h | 91 ++++++ be/src/olap/delta_writer.cpp | 107 ++++--- be/src/olap/delta_writer.h | 11 +- be/src/olap/full_compaction.cpp | 3 +- be/src/olap/memtable.cpp | 2 +- be/src/olap/olap_server.cpp | 5 - be/src/olap/storage_engine.cpp | 7 +- be/src/olap/storage_engine.h | 9 +- be/src/olap/tablet.cpp | 75 +++-- be/src/olap/tablet.h | 5 +- be/src/runtime/tablets_channel.cpp | 62 +++- be/src/runtime/tablets_channel.h | 8 +- be/test/olap/delta_writer_test.cpp | 338 ++++++++++++++++++--- .../olap/engine_storage_migration_task_test.cpp | 4 +- be/test/olap/tablet_cooldown_test.cpp | 4 +- 16 files changed, 664 insertions(+), 158 deletions(-) diff --git a/be/src/olap/calc_delete_bitmap_executor.cpp b/be/src/olap/calc_delete_bitmap_executor.cpp new file mode 100644 index 0000000000..284c03c985 --- /dev/null +++ b/be/src/olap/calc_delete_bitmap_executor.cpp @@ -0,0 +1,91 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "olap/calc_delete_bitmap_executor.h" + +#include <gen_cpp/olap_file.pb.h> + +#include <ostream> + +#include "common/config.h" +#include "common/logging.h" +#include "olap/memtable.h" +#include "olap/tablet.h" +#include "util/time.h" + +namespace doris { +using namespace ErrorCode; + +Status CalcDeleteBitmapToken::submit(TabletSharedPtr tablet, RowsetSharedPtr cur_rowset, + const segment_v2::SegmentSharedPtr& cur_segment, + const std::vector<RowsetSharedPtr>& target_rowsets, + int64_t end_version, RowsetWriter* rowset_writer) { + { + std::shared_lock rlock(_lock); + RETURN_IF_ERROR(_status); + } + + DeleteBitmapPtr bitmap = std::make_shared<DeleteBitmap>(tablet->tablet_id()); + { + std::lock_guard wlock(_lock); + _delete_bitmaps.push_back(bitmap); + } + return _thread_token->submit_func([=, this]() { + auto st = tablet->calc_segment_delete_bitmap(cur_rowset, cur_segment, target_rowsets, + bitmap, end_version, rowset_writer); + if (!st.ok()) { + LOG(WARNING) << "failed to calc segment delete bitmap, tablet_id: " + << tablet->tablet_id() << " rowset: " << cur_rowset->rowset_id() + << " seg_id: " << cur_segment->id() << " version: " << end_version; + std::lock_guard wlock(_lock); + if (_status.ok()) { + _status = st; + } + } + }); +} + +Status CalcDeleteBitmapToken::wait() { + _thread_token->wait(); + // all tasks complete here, don't need lock; + return _status; +} + +Status CalcDeleteBitmapToken::get_delete_bitmap(DeleteBitmapPtr res_bitmap) { + std::lock_guard wlock(_lock); + RETURN_IF_ERROR(_status); + + for (auto bitmap : 
_delete_bitmaps) { + res_bitmap->merge(*bitmap); + } + _delete_bitmaps.clear(); + return Status::OK(); +} + +void CalcDeleteBitmapExecutor::init() { + ThreadPoolBuilder("TabletCalcDeleteBitmapThreadPool") + .set_min_threads(1) + .set_max_threads(config::calc_delete_bitmap_max_thread) + .build(&_thread_pool); +} + +std::unique_ptr<CalcDeleteBitmapToken> CalcDeleteBitmapExecutor::create_token() { + return std::make_unique<CalcDeleteBitmapToken>( + _thread_pool->new_token(ThreadPool::ExecutionMode::CONCURRENT)); +} + +} // namespace doris diff --git a/be/src/olap/calc_delete_bitmap_executor.h b/be/src/olap/calc_delete_bitmap_executor.h new file mode 100644 index 0000000000..d2c392a04d --- /dev/null +++ b/be/src/olap/calc_delete_bitmap_executor.h @@ -0,0 +1,91 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#pragma once + +#include <stdint.h> + +#include <atomic> +#include <iosfwd> +#include <memory> +#include <utility> +#include <vector> + +#include "common/status.h" +#include "olap/rowset/rowset.h" +#include "olap/rowset/rowset_writer.h" +#include "olap/rowset/segment_v2/segment.h" +#include "util/threadpool.h" + +namespace doris { + +class DataDir; +class Tablet; +enum RowsetTypePB : int; +using TabletSharedPtr = std::shared_ptr<Tablet>; + +// A thin wrapper of ThreadPoolToken to submit calc delete bitmap task. +// Usage: +// 1. create a token +// 2. submit delete bitmap calculate tasks +// 3. wait all tasks complete +// 4. call `get_delete_bitmap()` to get the result of all tasks +class CalcDeleteBitmapToken { +public: + explicit CalcDeleteBitmapToken(std::unique_ptr<ThreadPoolToken> thread_token) + : _thread_token(std::move(thread_token)), _status(Status::OK()) {} + + Status submit(TabletSharedPtr tablet, RowsetSharedPtr cur_rowset, + const segment_v2::SegmentSharedPtr& cur_segment, + const std::vector<RowsetSharedPtr>& target_rowsets, int64_t end_version, + RowsetWriter* rowset_writer); + + // wait all tasks in token to be completed. + Status wait(); + + void cancel() { _thread_token->shutdown(); } + + Status get_delete_bitmap(DeleteBitmapPtr res_bitmap); + +private: + std::unique_ptr<ThreadPoolToken> _thread_token; + + std::shared_mutex _lock; + std::vector<DeleteBitmapPtr> _delete_bitmaps; + + // Records the current status of the calc delete bitmap job. + // Note: Once its value is set to Failed, it cannot return to SUCCESS. + Status _status; +}; + +// CalcDeleteBitmapExecutor is responsible for calc delete bitmap concurrently. +// It encapsulate a ThreadPool to handle all tasks. 
+class CalcDeleteBitmapExecutor { +public: + CalcDeleteBitmapExecutor() {} + ~CalcDeleteBitmapExecutor() { _thread_pool->shutdown(); } + + // init should be called after storage engine is opened, + void init(); + + std::unique_ptr<CalcDeleteBitmapToken> create_token(); + +private: + std::unique_ptr<ThreadPool> _thread_pool; +}; + +} // namespace doris diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp index 197ecb417e..dd5b3c74f6 100644 --- a/be/src/olap/delta_writer.cpp +++ b/be/src/olap/delta_writer.cpp @@ -125,6 +125,10 @@ DeltaWriter::~DeltaWriter() { } } + if (_calc_delete_bitmap_token != nullptr) { + _calc_delete_bitmap_token->cancel(); + } + if (_tablet != nullptr) { _tablet->data_dir()->remove_pending_ids(ROWSET_ID_PREFIX + _rowset_writer->rowset_id().to_string()); @@ -225,6 +229,7 @@ Status DeltaWriter::init() { bool should_serial = false; RETURN_IF_ERROR(_storage_engine->memtable_flush_executor()->create_flush_token( &_flush_token, _rowset_writer->type(), should_serial, _req.is_high_priority)); + _calc_delete_bitmap_token = _storage_engine->calc_delete_bitmap_executor()->create_token(); _is_init = true; return Status::OK(); @@ -411,12 +416,10 @@ Status DeltaWriter::close() { } } -Status DeltaWriter::close_wait(const PSlaveTabletNodes& slave_tablet_nodes, - const bool write_single_replica) { - SCOPED_TIMER(_close_wait_timer); +Status DeltaWriter::build_rowset() { std::lock_guard<std::mutex> l(_lock); DCHECK(_is_init) - << "delta writer is supposed be to initialized before close_wait() being called"; + << "delta writer is supposed be to initialized before build_rowset() being called"; if (_is_cancelled) { return _cancel_status; @@ -447,43 +450,68 @@ Status DeltaWriter::close_wait(const PSlaveTabletNodes& slave_tablet_nodes, if (_cur_rowset == nullptr) { return Status::Error<MEM_ALLOC_FAILED>("fail to build rowset"); } + return Status::OK(); +} - if (_tablet->enable_unique_key_merge_on_write()) { - // tablet is under alter process. 
The delete bitmap will be calculated after conversion. - if (_tablet->tablet_state() == TABLET_NOTREADY && - SchemaChangeHandler::tablet_in_converting(_tablet->tablet_id())) { - LOG(INFO) << "tablet is under alter process, delete bitmap will be calculated later, " - "tablet_id: " - << _tablet->tablet_id() << " txn_id: " << _req.txn_id; - } else { - auto beta_rowset = reinterpret_cast<BetaRowset*>(_cur_rowset.get()); - std::vector<segment_v2::SegmentSharedPtr> segments; - RETURN_IF_ERROR(beta_rowset->load_segments(&segments)); - if (segments.size() > 1) { - // calculate delete bitmap between segments - RETURN_IF_ERROR(_tablet->calc_delete_bitmap_between_segments(_cur_rowset, segments, - _delete_bitmap)); - } +Status DeltaWriter::submit_calc_delete_bitmap_task() { + if (!_tablet->enable_unique_key_merge_on_write()) { + return Status::OK(); + } - // commit_phase_update_delete_bitmap() may generate new segments, we need to create a new - // transient rowset writer to write the new segments, then merge it back the original - // rowset. - std::unique_ptr<RowsetWriter> rowset_writer; - _tablet->create_transient_rowset_writer(_cur_rowset, &rowset_writer); - RETURN_IF_ERROR(_tablet->commit_phase_update_delete_bitmap( - _cur_rowset, _rowset_ids, _delete_bitmap, segments, _req.txn_id, - rowset_writer.get())); - if (_cur_rowset->tablet_schema()->is_partial_update()) { - // build rowset writer and merge transient rowset - RETURN_IF_ERROR(rowset_writer->flush()); - RowsetSharedPtr transient_rowset = rowset_writer->build(); - _cur_rowset->merge_rowset_meta(transient_rowset->rowset_meta()); - - // erase segment cache cause we will add a segment to rowset - SegmentLoader::instance()->erase_segment(_cur_rowset->rowset_id()); - } - } + std::lock_guard<std::mutex> l(_lock); + // tablet is under alter process. The delete bitmap will be calculated after conversion. 
+ if (_tablet->tablet_state() == TABLET_NOTREADY && + SchemaChangeHandler::tablet_in_converting(_tablet->tablet_id())) { + LOG(INFO) << "tablet is under alter process, delete bitmap will be calculated later, " + "tablet_id: " + << _tablet->tablet_id() << " txn_id: " << _req.txn_id; + return Status::OK(); } + auto beta_rowset = reinterpret_cast<BetaRowset*>(_cur_rowset.get()); + std::vector<segment_v2::SegmentSharedPtr> segments; + RETURN_IF_ERROR(beta_rowset->load_segments(&segments)); + // tablet is under alter process. The delete bitmap will be calculated after conversion. + if (_tablet->tablet_state() == TABLET_NOTREADY && + SchemaChangeHandler::tablet_in_converting(_tablet->tablet_id())) { + return Status::OK(); + } + if (segments.size() > 1) { + // calculate delete bitmap between segments + RETURN_IF_ERROR(_tablet->calc_delete_bitmap_between_segments(_cur_rowset, segments, + _delete_bitmap)); + } + + // For partial update, we need to fill in the entire row of data, during the calculation + // of the delete bitmap. This operation is resource-intensive, and we need to minimize + // the number of times it occurs. Therefore, we skip this operation here. 
+ if (_cur_rowset->tablet_schema()->is_partial_update()) { + return Status::OK(); + } + + LOG(INFO) << "submit calc delete bitmap task to executor, tablet_id: " << _tablet->tablet_id() + << ", txn_id: " << _req.txn_id; + return _tablet->commit_phase_update_delete_bitmap(_cur_rowset, _rowset_ids, _delete_bitmap, + segments, _req.txn_id, + _calc_delete_bitmap_token.get(), nullptr); +} + +Status DeltaWriter::wait_calc_delete_bitmap() { + if (!_tablet->enable_unique_key_merge_on_write() || + _cur_rowset->tablet_schema()->is_partial_update()) { + return Status::OK(); + } + std::lock_guard<std::mutex> l(_lock); + RETURN_IF_ERROR(_calc_delete_bitmap_token->wait()); + RETURN_IF_ERROR(_calc_delete_bitmap_token->get_delete_bitmap(_delete_bitmap)); + LOG(INFO) << "Got result of calc delete bitmap task from executor, tablet_id: " + << _tablet->tablet_id() << ", txn_id: " << _req.txn_id; + return Status::OK(); +} + +Status DeltaWriter::commit_txn(const PSlaveTabletNodes& slave_tablet_nodes, + const bool write_single_replica) { + std::lock_guard<std::mutex> l(_lock); + SCOPED_TIMER(_close_wait_timer); Status res = _storage_engine->txn_manager()->commit_txn(_req.partition_id, _tablet, _req.txn_id, _req.load_id, _cur_rowset, false); @@ -550,6 +578,9 @@ Status DeltaWriter::cancel_with_status(const Status& st) { // cancel and wait all memtables in flush queue to be finished _flush_token->cancel(); } + if (_calc_delete_bitmap_token != nullptr) { + _calc_delete_bitmap_token->cancel(); + } _is_cancelled = true; _cancel_status = st; return Status::OK(); diff --git a/be/src/olap/delta_writer.h b/be/src/olap/delta_writer.h index daf091bf8e..aeef1c6866 100644 --- a/be/src/olap/delta_writer.h +++ b/be/src/olap/delta_writer.h @@ -89,11 +89,14 @@ public: Status append(const vectorized::Block* block); - // flush the last memtable to flush queue, must call it before close_wait() + // flush the last memtable to flush queue, must call it before build_rowset() Status close(); // wait for all 
memtables to be flushed. // mem_consumption() should be 0 after this function returns. - Status close_wait(const PSlaveTabletNodes& slave_tablet_nodes, const bool write_single_replica); + Status build_rowset(); + Status submit_calc_delete_bitmap_task(); + Status wait_calc_delete_bitmap(); + Status commit_txn(const PSlaveTabletNodes& slave_tablet_nodes, const bool write_single_replica); bool check_slave_replicas_done(google::protobuf::Map<int64_t, PSuccessSlaveTabletNodeIds>* success_slave_tablet_node_ids); @@ -131,6 +134,9 @@ public: int64_t num_rows_filtered() const; + // For UT + DeleteBitmapPtr get_delete_bitmap() { return _delete_bitmap; } + private: DeltaWriter(WriteRequest* req, StorageEngine* storage_engine, RuntimeProfile* profile, const UniqueId& load_id); @@ -183,6 +189,7 @@ private: std::shared_mutex _slave_node_lock; DeleteBitmapPtr _delete_bitmap = nullptr; + std::unique_ptr<CalcDeleteBitmapToken> _calc_delete_bitmap_token; // current rowset_ids, used to do diff in publish_version RowsetIdUnorderedSet _rowset_ids; // current max version, used to calculate delete bitmap diff --git a/be/src/olap/full_compaction.cpp b/be/src/olap/full_compaction.cpp index e2acd1cdb0..cdec9c09ed 100644 --- a/be/src/olap/full_compaction.cpp +++ b/be/src/olap/full_compaction.cpp @@ -193,7 +193,8 @@ Status FullCompaction::_full_compaction_calc_delete_bitmap(const RowsetSharedPtr OlapStopWatch watch; RETURN_IF_ERROR(_tablet->calc_delete_bitmap(published_rowset, segments, specified_rowsets, - delete_bitmap, cur_version, rowset_writer)); + delete_bitmap, cur_version, nullptr, + rowset_writer)); size_t total_rows = std::accumulate( segments.begin(), segments.end(), 0, [](size_t sum, const segment_v2::SegmentSharedPtr& s) { return sum += s->num_rows(); }); diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp index 9716f265d3..78860a1f65 100644 --- a/be/src/olap/memtable.cpp +++ b/be/src/olap/memtable.cpp @@ -480,7 +480,7 @@ Status 
MemTable::_generate_delete_bitmap(int32_t segment_id) { OlapStopWatch watch; RETURN_IF_ERROR(_tablet->calc_delete_bitmap(rowset, segments, specified_rowsets, _mow_context->delete_bitmap, - _mow_context->max_version)); + _mow_context->max_version, nullptr)); size_t total_rows = std::accumulate( segments.begin(), segments.end(), 0, [](size_t sum, const segment_v2::SegmentSharedPtr& s) { return sum += s->num_rows(); }); diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp index c1af281465..67c41c6f7e 100644 --- a/be/src/olap/olap_server.cpp +++ b/be/src/olap/olap_server.cpp @@ -243,11 +243,6 @@ Status StorageEngine::start_bg_threads() { .set_max_threads(config::tablet_publish_txn_max_thread) .build(&_tablet_publish_txn_thread_pool); - ThreadPoolBuilder("TabletCalcDeleteBitmapThreadPool") - .set_min_threads(1) - .set_max_threads(config::calc_delete_bitmap_max_thread) - .build(&_calc_delete_bitmap_thread_pool); - RETURN_IF_ERROR(Thread::create( "StorageEngine", "aync_publish_version_thread", [this]() { this->_async_publish_callback(); }, &_async_publish_thread)); diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp index 745637a296..340f02b65b 100644 --- a/be/src/olap/storage_engine.cpp +++ b/be/src/olap/storage_engine.cpp @@ -126,6 +126,7 @@ StorageEngine::StorageEngine(const EngineOptions& options) _txn_manager(new TxnManager(config::txn_map_shard_size, config::txn_shard_size)), _rowset_id_generator(new UniqueRowsetIdGenerator(options.backend_uid)), _memtable_flush_executor(nullptr), + _calc_delete_bitmap_executor(nullptr), _default_rowset_type(BETA_ROWSET), _heartbeat_flags(nullptr), _stream_load_recorder(nullptr) { @@ -155,9 +156,6 @@ StorageEngine::~StorageEngine() { if (_tablet_meta_checkpoint_thread_pool) { _tablet_meta_checkpoint_thread_pool->shutdown(); } - if (_calc_delete_bitmap_thread_pool) { - _calc_delete_bitmap_thread_pool->shutdown(); - } if (_cold_data_compaction_thread_pool) { 
_cold_data_compaction_thread_pool->shutdown(); } @@ -199,6 +197,9 @@ Status StorageEngine::_open() { _memtable_flush_executor.reset(new MemTableFlushExecutor()); _memtable_flush_executor->init(dirs); + _calc_delete_bitmap_executor.reset(new CalcDeleteBitmapExecutor()); + _calc_delete_bitmap_executor->init(); + _parse_default_rowset_type(); return Status::OK(); diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h index a40e1792d9..7af2fde0fb 100644 --- a/be/src/olap/storage_engine.h +++ b/be/src/olap/storage_engine.h @@ -37,6 +37,7 @@ #include "common/status.h" #include "gutil/ref_counted.h" +#include "olap/calc_delete_bitmap_executor.h" #include "olap/compaction_permit_limiter.h" #include "olap/olap_common.h" #include "olap/options.h" @@ -143,6 +144,9 @@ public: TabletManager* tablet_manager() { return _tablet_manager.get(); } TxnManager* txn_manager() { return _txn_manager.get(); } MemTableFlushExecutor* memtable_flush_executor() { return _memtable_flush_executor.get(); } + CalcDeleteBitmapExecutor* calc_delete_bitmap_executor() { + return _calc_delete_bitmap_executor.get(); + } bool check_rowset_id_in_unused_rowsets(const RowsetId& rowset_id); @@ -210,9 +214,6 @@ public: std::unique_ptr<ThreadPool>& tablet_publish_txn_thread_pool() { return _tablet_publish_txn_thread_pool; } - std::unique_ptr<ThreadPool>& calc_delete_bitmap_thread_pool() { - return _calc_delete_bitmap_thread_pool; - } bool stopped() { return _stopped; } ThreadPool* get_bg_multiget_threadpool() { return _bg_multi_get_thread_pool.get(); } @@ -423,6 +424,7 @@ private: std::unique_ptr<RowsetIdGenerator> _rowset_id_generator; std::unique_ptr<MemTableFlushExecutor> _memtable_flush_executor; + std::unique_ptr<CalcDeleteBitmapExecutor> _calc_delete_bitmap_executor; // Used to control the migration from segment_v1 to segment_v2, can be deleted in futrue. 
// Type of new loaded data @@ -437,7 +439,6 @@ private: std::unique_ptr<ThreadPool> _cold_data_compaction_thread_pool; std::unique_ptr<ThreadPool> _tablet_publish_txn_thread_pool; - std::unique_ptr<ThreadPool> _calc_delete_bitmap_thread_pool; std::unique_ptr<ThreadPool> _tablet_meta_checkpoint_thread_pool; std::unique_ptr<ThreadPool> _bg_multi_get_thread_pool; diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp index 1cd2e0427f..19c8a8467e 100644 --- a/be/src/olap/tablet.cpp +++ b/be/src/olap/tablet.cpp @@ -2992,11 +2992,15 @@ Status Tablet::calc_segment_delete_bitmap(RowsetSharedPtr rowset, return Status::OK(); } +// if user pass a token, then all calculation works will submit to a threadpool, +// user can get all delete bitmaps from that token. +// if `token` is nullptr, the calculation will run in local, and user can get the result +// delete bitmap from `delete_bitmap` directly. Status Tablet::calc_delete_bitmap(RowsetSharedPtr rowset, const std::vector<segment_v2::SegmentSharedPtr>& segments, const std::vector<RowsetSharedPtr>& specified_rowsets, DeleteBitmapPtr delete_bitmap, int64_t end_version, - RowsetWriter* rowset_writer) { + CalcDeleteBitmapToken* token, RowsetWriter* rowset_writer) { auto rowset_id = rowset->rowset_id(); if (specified_rowsets.empty() || segments.empty()) { LOG(INFO) << "skip to construct delete bitmap tablet: " << tablet_id() @@ -3005,37 +3009,31 @@ Status Tablet::calc_delete_bitmap(RowsetSharedPtr rowset, } OlapStopWatch watch; + doris::TabletSharedPtr tablet_ptr = + StorageEngine::instance()->tablet_manager()->get_tablet(tablet_id()); + if (tablet_ptr == nullptr) { + return Status::InternalError("Can't find tablet id: {}, maybe already dropped.", + tablet_id()); + } std::vector<DeleteBitmapPtr> seg_delete_bitmaps; - std::unique_ptr<ThreadPoolToken> token = - StorageEngine::instance()->calc_delete_bitmap_thread_pool()->new_token( - ThreadPool::ExecutionMode::CONCURRENT); - std::atomic<int> calc_status {ErrorCode::OK}; - for 
(size_t i = 1; i < segments.size(); i++) { + for (size_t i = 0; i < segments.size(); i++) { auto& seg = segments[i]; - DeleteBitmapPtr seg_delete_bitmap = std::make_shared<DeleteBitmap>(tablet_id()); - seg_delete_bitmaps.push_back(seg_delete_bitmap); - RETURN_IF_ERROR(token->submit_func([=, &calc_status, this]() { - auto st = calc_segment_delete_bitmap(rowset, seg, specified_rowsets, seg_delete_bitmap, - end_version, rowset_writer); - if (!st.ok()) { - LOG(WARNING) << "failed to calc segment delete bitmap, tablet_id: " << tablet_id() - << " rowset: " << rowset_id << " seg_id: " << seg->id() - << " version: " << end_version; - calc_status.store(st.code()); - } - })); + if (token != nullptr) { + RETURN_IF_ERROR(token->submit(tablet_ptr, rowset, seg, specified_rowsets, end_version, + rowset_writer)); + } else { + DeleteBitmapPtr seg_delete_bitmap = std::make_shared<DeleteBitmap>(tablet_id()); + seg_delete_bitmaps.push_back(seg_delete_bitmap); + RETURN_IF_ERROR(calc_segment_delete_bitmap(rowset, segments[i], specified_rowsets, + seg_delete_bitmap, end_version, + rowset_writer)); + } } - // this thread calc delete bitmap of segment 0 - RETURN_IF_ERROR(calc_segment_delete_bitmap(rowset, segments[0], specified_rowsets, - delete_bitmap, end_version, rowset_writer)); - token->wait(); - auto code = calc_status.load(); - if (code != ErrorCode::OK) { - return Status::Error(code, "Tablet::calc_delete_bitmap meet error"); - } - for (auto seg_delete_bitmap : seg_delete_bitmaps) { - delete_bitmap->merge(*seg_delete_bitmap); + if (token == nullptr) { + for (auto seg_delete_bitmap : seg_delete_bitmaps) { + delete_bitmap->merge(*seg_delete_bitmap); + } } return Status::OK(); } @@ -3212,8 +3210,11 @@ Status Tablet::update_delete_bitmap_without_lock(const RowsetSharedPtr& rowset) std::vector<RowsetSharedPtr> specified_rowsets = get_rowset_by_ids(&cur_rowset_ids); OlapStopWatch watch; + auto token = StorageEngine::instance()->calc_delete_bitmap_executor()->create_token(); 
RETURN_IF_ERROR(calc_delete_bitmap(rowset, segments, specified_rowsets, delete_bitmap, - cur_version - 1)); + cur_version - 1, token.get())); + token->wait(); + token->get_delete_bitmap(delete_bitmap); size_t total_rows = std::accumulate( segments.begin(), segments.end(), 0, [](size_t sum, const segment_v2::SegmentSharedPtr& s) { return sum += s->num_rows(); }); @@ -3233,7 +3234,7 @@ Status Tablet::update_delete_bitmap_without_lock(const RowsetSharedPtr& rowset) Status Tablet::commit_phase_update_delete_bitmap( const RowsetSharedPtr& rowset, RowsetIdUnorderedSet& pre_rowset_ids, DeleteBitmapPtr delete_bitmap, const std::vector<segment_v2::SegmentSharedPtr>& segments, - int64_t txn_id, RowsetWriter* rowset_writer) { + int64_t txn_id, CalcDeleteBitmapToken* token, RowsetWriter* rowset_writer) { SCOPED_BVAR_LATENCY(g_tablet_commit_phase_update_delete_bitmap_latency); RowsetIdUnorderedSet cur_rowset_ids; RowsetIdUnorderedSet rowset_ids_to_add; @@ -3247,19 +3248,14 @@ Status Tablet::commit_phase_update_delete_bitmap( cur_rowset_ids = all_rs_id(cur_version); _rowset_ids_difference(cur_rowset_ids, pre_rowset_ids, &rowset_ids_to_add, &rowset_ids_to_del); - if (!rowset_ids_to_add.empty() || !rowset_ids_to_del.empty()) { - LOG(INFO) << "rowset_ids_to_add: " << rowset_ids_to_add.size() - << ", rowset_ids_to_del: " << rowset_ids_to_del.size(); - } specified_rowsets = get_rowset_by_ids(&rowset_ids_to_add); } for (const auto& to_del : rowset_ids_to_del) { delete_bitmap->remove({to_del, 0, 0}, {to_del, UINT32_MAX, INT64_MAX}); } - OlapStopWatch watch; RETURN_IF_ERROR(calc_delete_bitmap(rowset, segments, specified_rowsets, delete_bitmap, - cur_version, rowset_writer)); + cur_version, token, rowset_writer)); size_t total_rows = std::accumulate( segments.begin(), segments.end(), 0, [](size_t sum, const segment_v2::SegmentSharedPtr& s) { return sum += s->num_rows(); }); @@ -3267,7 +3263,7 @@ Status Tablet::commit_phase_update_delete_bitmap( << ", rowset_ids to add: " << 
rowset_ids_to_add.size() << ", rowset_ids to del: " << rowset_ids_to_del.size() << ", cur max_version: " << cur_version << ", transaction_id: " << txn_id - << ", cost: " << watch.get_elapse_time_us() << "(us), total rows: " << total_rows; + << ", total rows: " << total_rows; pre_rowset_ids = cur_rowset_ids; return Status::OK(); } @@ -3309,8 +3305,11 @@ Status Tablet::update_delete_bitmap(const RowsetSharedPtr& rowset, } OlapStopWatch watch; + auto token = StorageEngine::instance()->calc_delete_bitmap_executor()->create_token(); RETURN_IF_ERROR(calc_delete_bitmap(rowset, segments, specified_rowsets, delete_bitmap, - cur_version - 1, rowset_writer)); + cur_version - 1, token.get(), rowset_writer)); + token->wait(); + token->get_delete_bitmap(delete_bitmap); size_t total_rows = std::accumulate( segments.begin(), segments.end(), 0, [](size_t sum, const segment_v2::SegmentSharedPtr& s) { return sum += s->num_rows(); }); diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h index adc06de339..3efd2c89ce 100644 --- a/be/src/olap/tablet.h +++ b/be/src/olap/tablet.h @@ -70,6 +70,7 @@ class RowIdConversion; class TTabletInfo; class TabletMetaPB; class TupleDescriptor; +class CalcDeleteBitmapToken; enum CompressKind : int; namespace io { @@ -446,7 +447,7 @@ public: const std::vector<segment_v2::SegmentSharedPtr>& segments, const std::vector<RowsetSharedPtr>& specified_rowsets, DeleteBitmapPtr delete_bitmap, int64_t version, - RowsetWriter* rowset_writer = nullptr); + CalcDeleteBitmapToken* token, RowsetWriter* rowset_writer = nullptr); std::vector<RowsetSharedPtr> get_rowset_by_ids( const RowsetIdUnorderedSet* specified_rowset_ids); @@ -479,7 +480,7 @@ public: const RowsetSharedPtr& rowset, RowsetIdUnorderedSet& pre_rowset_ids, DeleteBitmapPtr delete_bitmap, const std::vector<segment_v2::SegmentSharedPtr>& segments, int64_t txn_id, - RowsetWriter* rowset_writer = nullptr); + CalcDeleteBitmapToken* token, RowsetWriter* rowset_writer = nullptr); Status 
update_delete_bitmap(const RowsetSharedPtr& rowset, const RowsetIdUnorderedSet& pre_rowset_ids, diff --git a/be/src/runtime/tablets_channel.cpp b/be/src/runtime/tablets_channel.cpp index 3862fd533e..714aff369e 100644 --- a/be/src/runtime/tablets_channel.cpp +++ b/be/src/runtime/tablets_channel.cpp @@ -166,9 +166,9 @@ Status TabletsChannel::close( // just skip this tablet(writer) and continue to close others continue; } - // to make sure tablet writer in `_broken_tablets` won't call `close_wait` method. - // `close_wait` might create the rowset and commit txn directly, and the subsequent - // publish version task will success, which can cause the replica inconsistency. + // tablet writer in `_broken_tablets` should not call `build_rowset` and + // `commit_txn` method, after that, the publish-version task will success, + // which can cause the replica inconsistency. if (_is_broken_tablet(it.second->tablet_id())) { LOG(WARNING) << "SHOULD NOT HAPPEN, tablet writer is broken but not cancelled" << ", tablet_id=" << it.first << ", transaction_id=" << _txn_id; @@ -190,7 +190,39 @@ Status TabletsChannel::close( _write_single_replica = write_single_replica; - // 2. wait delta writers and build the tablet vector + // 2. wait all writer finished flush. + for (auto writer : need_wait_writers) { + writer->wait_flush(); + } + + // 3. build rowset + for (auto it = need_wait_writers.begin(); it != need_wait_writers.end(); it++) { + Status st = (*it)->build_rowset(); + if (!st.ok()) { + _add_error_tablet(tablet_errors, (*it)->tablet_id(), st); + it = need_wait_writers.erase(it); + continue; + } + // 3.1 calculate delete bitmap for Unique Key MoW tables + st = (*it)->submit_calc_delete_bitmap_task(); + if (!st.ok()) { + _add_error_tablet(tablet_errors, (*it)->tablet_id(), st); + it = need_wait_writers.erase(it); + continue; + } + } + + // 4. 
wait for delete bitmap calculation complete if necessary + for (auto it = need_wait_writers.begin(); it != need_wait_writers.end(); it++) { + Status st = (*it)->wait_calc_delete_bitmap(); + if (!st.ok()) { + _add_error_tablet(tablet_errors, (*it)->tablet_id(), st); + it = need_wait_writers.erase(it); + continue; + } + } + + // 5. commit all writers for (auto writer : need_wait_writers) { PSlaveTabletNodes slave_nodes; if (write_single_replica) { @@ -198,7 +230,7 @@ Status TabletsChannel::close( } // close may return failed, but no need to handle it here. // tablet_vec will only contains success tablet, and then let FE judge it. - _close_wait(writer, tablet_vec, tablet_errors, slave_nodes, write_single_replica); + _commit_txn(writer, tablet_vec, tablet_errors, slave_nodes, write_single_replica); } if (write_single_replica) { @@ -229,12 +261,12 @@ Status TabletsChannel::close( return Status::OK(); } -void TabletsChannel::_close_wait(DeltaWriter* writer, +void TabletsChannel::_commit_txn(DeltaWriter* writer, google::protobuf::RepeatedPtrField<PTabletInfo>* tablet_vec, google::protobuf::RepeatedPtrField<PTabletError>* tablet_errors, PSlaveTabletNodes slave_tablet_nodes, const bool write_single_replica) { - Status st = writer->close_wait(slave_tablet_nodes, write_single_replica); + Status st = writer->commit_txn(slave_tablet_nodes, write_single_replica); if (st.ok()) { PTabletInfo* tablet_info = tablet_vec->Add(); tablet_info->set_tablet_id(writer->tablet_id()); @@ -242,14 +274,20 @@ void TabletsChannel::_close_wait(DeltaWriter* writer, tablet_info->set_received_rows(writer->total_received_rows()); tablet_info->set_num_rows_filtered(writer->num_rows_filtered()); } else { - PTabletError* tablet_error = tablet_errors->Add(); - tablet_error->set_tablet_id(writer->tablet_id()); - tablet_error->set_msg(st.to_string()); - VLOG_PROGRESS << "close wait failed tablet " << writer->tablet_id() << " transaction_id " - << _txn_id << "err msg " << st; + 
_add_error_tablet(tablet_errors, writer->tablet_id(), st); } } +void TabletsChannel::_add_error_tablet( + google::protobuf::RepeatedPtrField<PTabletError>* tablet_errors, int64_t tablet_id, + Status error) { + PTabletError* tablet_error = tablet_errors->Add(); + tablet_error->set_tablet_id(tablet_id); + tablet_error->set_msg(error.to_string()); + VLOG_PROGRESS << "close wait failed tablet " << tablet_id << " transaction_id " << _txn_id + << "err msg " << error; +} + int64_t TabletsChannel::mem_consumption() { int64_t mem_usage = 0; { diff --git a/be/src/runtime/tablets_channel.h b/be/src/runtime/tablets_channel.h index 9a06653c74..e7c05a1998 100644 --- a/be/src/runtime/tablets_channel.h +++ b/be/src/runtime/tablets_channel.h @@ -126,14 +126,16 @@ private: // open all writer Status _open_all_writers(const PTabletWriterOpenRequest& request); - // deal with DeltaWriter close_wait(), add tablet to list for return. - void _close_wait(DeltaWriter* writer, + // deal with DeltaWriter commit_txn(), add tablet to list for return. 
+ void _commit_txn(DeltaWriter* writer, google::protobuf::RepeatedPtrField<PTabletInfo>* tablet_vec, - google::protobuf::RepeatedPtrField<PTabletError>* tablet_error, + google::protobuf::RepeatedPtrField<PTabletError>* tablet_errors, PSlaveTabletNodes slave_tablet_nodes, const bool write_single_replica); void _build_partition_tablets_relation(const PTabletWriterOpenRequest& request); void _add_broken_tablet(int64_t tablet_id); + void _add_error_tablet(google::protobuf::RepeatedPtrField<PTabletError>* tablet_errors, + int64_t tablet_id, Status error); bool _is_broken_tablet(int64_t tablet_id); void _init_profile(RuntimeProfile* profile); diff --git a/be/test/olap/delta_writer_test.cpp b/be/test/olap/delta_writer_test.cpp index f1d5f49619..9fd998ebd0 100644 --- a/be/test/olap/delta_writer_test.cpp +++ b/be/test/olap/delta_writer_test.cpp @@ -261,7 +261,8 @@ static void create_tablet_request(int64_t tablet_id, int32_t schema_hash, } static void create_tablet_request_with_sequence_col(int64_t tablet_id, int32_t schema_hash, - TCreateTabletReq* request) { + TCreateTabletReq* request, + bool enable_mow = false) { request->tablet_id = tablet_id; request->__set_version(1); request->tablet_schema.schema_hash = schema_hash; @@ -270,6 +271,7 @@ static void create_tablet_request_with_sequence_col(int64_t tablet_id, int32_t s request->tablet_schema.storage_type = TStorageType::COLUMN; request->tablet_schema.__set_sequence_col_idx(4); request->__set_storage_format(TStorageFormat::V2); + request->__set_enable_unique_key_merge_on_write(enable_mow); TColumn k1; k1.column_name = "k1"; @@ -434,6 +436,28 @@ static TDescriptorTable create_descriptor_tablet_with_sequence_col() { return dtb.desc_tbl(); } +static void generate_data(vectorized::Block* block, int8_t k1, int16_t k2, int32_t seq) { + auto columns = block->mutate_columns(); + int8_t c1 = k1; + columns[0]->insert_data((const char*)&c1, sizeof(c1)); + + int16_t c2 = k2; + columns[1]->insert_data((const char*)&c2, sizeof(c2)); + 
+ vectorized::VecDateTimeValue c3; + c3.from_date_str("2020-07-16 19:39:43", 19); + int64_t c3_int = c3.to_int64(); + columns[2]->insert_data((const char*)&c3_int, sizeof(c3)); + + doris::vectorized::DateV2Value<doris::vectorized::DateV2ValueType> c4; + c4.set_time(2022, 6, 6, 0, 0, 0, 0); + uint32_t c4_int = c4.to_date_int_val(); + columns[3]->insert_data((const char*)&c4_int, sizeof(c4)); + + int32_t c5 = seq; + columns[4]->insert_data((const char*)&c5, sizeof(c2)); +} + class TestDeltaWriter : public ::testing::Test { public: TestDeltaWriter() {} @@ -473,7 +497,9 @@ TEST_F(TestDeltaWriter, open) { EXPECT_NE(delta_writer, nullptr); res = delta_writer->close(); EXPECT_EQ(Status::OK(), res); - res = delta_writer->close_wait(PSlaveTabletNodes(), false); + res = delta_writer->build_rowset(); + EXPECT_EQ(Status::OK(), res); + res = delta_writer->commit_txn(PSlaveTabletNodes(), false); EXPECT_EQ(Status::OK(), res); SAFE_DELETE(delta_writer); @@ -593,7 +619,15 @@ TEST_F(TestDeltaWriter, vec_write) { res = delta_writer->close(); ASSERT_TRUE(res.ok()); - res = delta_writer->close_wait(PSlaveTabletNodes(), false); + res = delta_writer->wait_flush(); + ASSERT_TRUE(res.ok()); + res = delta_writer->build_rowset(); + ASSERT_TRUE(res.ok()); + res = delta_writer->submit_calc_delete_bitmap_task(); + ASSERT_TRUE(res.ok()); + res = delta_writer->wait_calc_delete_bitmap(); + ASSERT_TRUE(res.ok()); + res = delta_writer->commit_txn(PSlaveTabletNodes(), false); ASSERT_TRUE(res.ok()); // publish version success @@ -661,54 +695,25 @@ TEST_F(TestDeltaWriter, vec_sequence_col) { slot_desc->col_name())); } - auto columns = block.mutate_columns(); - { - int8_t c1 = 123; - columns[0]->insert_data((const char*)&c1, sizeof(c1)); - - int16_t c2 = 456; - columns[1]->insert_data((const char*)&c2, sizeof(c2)); - - vectorized::VecDateTimeValue c3; - c3.from_date_str("2020-07-16 19:39:43", 19); - int64_t c3_int = c3.to_int64(); - columns[2]->insert_data((const char*)&c3_int, sizeof(c3)); - - 
doris::vectorized::DateV2Value<doris::vectorized::DateV2ValueType> c4; - c4.set_time(2022, 6, 6, 0, 0, 0, 0); - uint32_t c4_int = c4.to_date_int_val(); - columns[3]->insert_data((const char*)&c4_int, sizeof(c4)); - - int32_t c5 = 100; - columns[4]->insert_data((const char*)&c5, sizeof(c2)); - res = delta_writer->write(&block, {0}); - ASSERT_TRUE(res.ok()); - } - { - int8_t c1 = 123; - columns[0]->insert_data((const char*)&c1, sizeof(c1)); - - int16_t c2 = 456; - columns[1]->insert_data((const char*)&c2, sizeof(c2)); - - vectorized::VecDateTimeValue c3; - c3.from_date_str("2020-07-31 19:39:43", 19); - int64_t c3_int = c3.to_int64(); - columns[2]->insert_data((const char*)&c3_int, sizeof(c3)); + generate_data(&block, 123, 456, 100); + res = delta_writer->write(&block, {0}); + ASSERT_TRUE(res.ok()); - doris::vectorized::DateV2Value<doris::vectorized::DateV2ValueType> c4; - c4.set_time(2022, 7, 6, 0, 0, 0, 0); - uint32_t c4_int = c4.to_date_int_val(); - columns[3]->insert_data((const char*)&c4_int, sizeof(c4)); + generate_data(&block, 123, 456, 90); + res = delta_writer->write(&block, {1}); + ASSERT_TRUE(res.ok()); - int32_t c5 = 90; - columns[4]->insert_data((const char*)&c5, sizeof(c2)); - res = delta_writer->write(&block, {1}); - ASSERT_TRUE(res.ok()); - } res = delta_writer->close(); ASSERT_TRUE(res.ok()); - res = delta_writer->close_wait(PSlaveTabletNodes(), false); + res = delta_writer->wait_flush(); + ASSERT_TRUE(res.ok()); + res = delta_writer->build_rowset(); + ASSERT_TRUE(res.ok()); + res = delta_writer->submit_calc_delete_bitmap_task(); + ASSERT_TRUE(res.ok()); + res = delta_writer->wait_calc_delete_bitmap(); + ASSERT_TRUE(res.ok()); + res = delta_writer->commit_txn(PSlaveTabletNodes(), false); ASSERT_TRUE(res.ok()); // publish version success @@ -767,4 +772,243 @@ TEST_F(TestDeltaWriter, vec_sequence_col) { delete delta_writer; } +TEST_F(TestDeltaWriter, vec_sequence_col_concurrent_write) { + TCreateTabletReq request; + sleep(20); + 
create_tablet_request_with_sequence_col(10005, 270068377, &request, true); + Status res = k_engine->create_tablet(request); + ASSERT_TRUE(res.ok()); + + TDescriptorTable tdesc_tbl = create_descriptor_tablet_with_sequence_col(); + ObjectPool obj_pool; + DescriptorTbl* desc_tbl = nullptr; + DescriptorTbl::create(&obj_pool, tdesc_tbl, &desc_tbl); + TupleDescriptor* tuple_desc = desc_tbl->get_tuple_descriptor(0); + OlapTableSchemaParam param; + + PUniqueId load_id; + load_id.set_hi(0); + load_id.set_lo(0); + WriteRequest write_req = { + 10005, 270068377, WriteType::LOAD, 20003, 30003, load_id, tuple_desc, &(tuple_desc->slots()), + false, &param}; + DeltaWriter* delta_writer1 = nullptr; + DeltaWriter* delta_writer2 = nullptr; + std::unique_ptr<RuntimeProfile> profile1; + profile1 = std::make_unique<RuntimeProfile>("LoadChannels1"); + std::unique_ptr<RuntimeProfile> profile2; + profile2 = std::make_unique<RuntimeProfile>("LoadChannels2"); + DeltaWriter::open(&write_req, &delta_writer1, profile1.get(), TUniqueId()); + DeltaWriter::open(&write_req, &delta_writer2, profile2.get(), TUniqueId()); + ASSERT_NE(delta_writer1, nullptr); + ASSERT_NE(delta_writer2, nullptr); + + // write data in delta writer 1 + { + vectorized::Block block; + for (const auto& slot_desc : tuple_desc->slots()) { + block.insert(vectorized::ColumnWithTypeAndName(slot_desc->get_empty_mutable_column(), + slot_desc->get_data_type_ptr(), + slot_desc->col_name())); + } + + generate_data(&block, 10, 123, 100); + res = delta_writer1->write(&block, {0}); + ASSERT_TRUE(res.ok()); + + generate_data(&block, 20, 123, 100); + res = delta_writer1->write(&block, {1}); + ASSERT_TRUE(res.ok()); + + res = delta_writer1->close(); + ASSERT_TRUE(res.ok()); + res = delta_writer1->wait_flush(); + ASSERT_TRUE(res.ok()); + res = delta_writer1->build_rowset(); + ASSERT_TRUE(res.ok()); + res = delta_writer1->submit_calc_delete_bitmap_task(); + ASSERT_TRUE(res.ok()); + res = delta_writer1->wait_calc_delete_bitmap(); + 
ASSERT_TRUE(res.ok()); + res = delta_writer1->commit_txn(PSlaveTabletNodes(), false); + ASSERT_TRUE(res.ok()); + } + // write data in delta writer 2 + { + vectorized::Block block; + for (const auto& slot_desc : tuple_desc->slots()) { + block.insert(vectorized::ColumnWithTypeAndName(slot_desc->get_empty_mutable_column(), + slot_desc->get_data_type_ptr(), + slot_desc->col_name())); + } + + generate_data(&block, 10, 123, 110); + res = delta_writer2->write(&block, {0}); + ASSERT_TRUE(res.ok()); + + generate_data(&block, 20, 123, 90); + res = delta_writer2->write(&block, {1}); + ASSERT_TRUE(res.ok()); + + res = delta_writer2->close(); + ASSERT_TRUE(res.ok()); + res = delta_writer2->wait_flush(); + ASSERT_TRUE(res.ok()); + } + TabletSharedPtr tablet = k_engine->tablet_manager()->get_tablet(write_req.tablet_id); + std::cout << "before publish, tablet row nums:" << tablet->num_rows() << std::endl; + OlapMeta* meta = tablet->data_dir()->get_meta(); + RowsetSharedPtr rowset1 = nullptr; + RowsetSharedPtr rowset2 = nullptr; + + // publish version on delta writer 1 success + { + Version version; + version.first = tablet->rowset_with_max_version()->end_version() + 1; + version.second = tablet->rowset_with_max_version()->end_version() + 1; + std::cout << "start to add rowset version:" << version.first << "-" << version.second + << std::endl; + std::map<TabletInfo, RowsetSharedPtr> tablet_related_rs; + StorageEngine::instance()->txn_manager()->get_txn_related_tablets( + write_req.txn_id, write_req.partition_id, &tablet_related_rs); + ASSERT_EQ(1, tablet_related_rs.size()); + + std::cout << "start to publish txn" << std::endl; + rowset1 = tablet_related_rs.begin()->second; + TabletPublishStatistics pstats; + res = k_engine->txn_manager()->publish_txn(meta, write_req.partition_id, write_req.txn_id, + write_req.tablet_id, write_req.schema_hash, + tablet_related_rs.begin()->first.tablet_uid, + version, &pstats); + ASSERT_TRUE(res.ok()); + std::cout << "start to add inc rowset:" << 
rowset1->rowset_id() + << ", num rows:" << rowset1->num_rows() + << ", version:" << rowset1->version().first << "-" << rowset1->version().second + << std::endl; + res = tablet->add_inc_rowset(rowset1); + ASSERT_TRUE(res.ok()); + ASSERT_EQ(2, tablet->num_rows()); + std::vector<segment_v2::SegmentSharedPtr> segments; + res = ((BetaRowset*)rowset1.get())->load_segments(&segments); + ASSERT_TRUE(res.ok()); + ASSERT_EQ(1, rowset1->num_segments()); + ASSERT_EQ(1, segments.size()); + } + + // commit delta writer2, then publish it. + { + // commit, calc delete bitmap should happen here + res = delta_writer2->build_rowset(); + ASSERT_TRUE(res.ok()); + res = delta_writer2->submit_calc_delete_bitmap_task(); + ASSERT_TRUE(res.ok()); + res = delta_writer2->wait_calc_delete_bitmap(); + ASSERT_TRUE(res.ok()); + + // verify that delete bitmap calculated correctly + // since the delete bitmap not published, versions are 0 + auto delete_bitmap = delta_writer2->get_delete_bitmap(); + ASSERT_TRUE(delete_bitmap->contains({rowset1->rowset_id(), 0, 0}, 0)); + // We can't get the rowset id of rowset2 now, will check the delete bitmap + // contains row 0 of rowset2 at L929. 
+ + res = delta_writer2->commit_txn(PSlaveTabletNodes(), false); + ASSERT_TRUE(res.ok()); + + Version version; + version.first = tablet->rowset_with_max_version()->end_version() + 1; + version.second = tablet->rowset_with_max_version()->end_version() + 1; + std::cout << "start to add rowset version:" << version.first << "-" << version.second + << std::endl; + std::map<TabletInfo, RowsetSharedPtr> tablet_related_rs; + StorageEngine::instance()->txn_manager()->get_txn_related_tablets( + write_req.txn_id, write_req.partition_id, &tablet_related_rs); + ASSERT_EQ(1, tablet_related_rs.size()); + + std::cout << "start to publish txn" << std::endl; + rowset2 = tablet_related_rs.begin()->second; + ASSERT_TRUE(delete_bitmap->contains({rowset2->rowset_id(), 0, 0}, 1)); + + TabletPublishStatistics pstats; + res = k_engine->txn_manager()->publish_txn(meta, write_req.partition_id, write_req.txn_id, + write_req.tablet_id, write_req.schema_hash, + tablet_related_rs.begin()->first.tablet_uid, + version, &pstats); + ASSERT_TRUE(res.ok()); + std::cout << "start to add inc rowset:" << rowset2->rowset_id() + << ", num rows:" << rowset2->num_rows() + << ", version:" << rowset2->version().first << "-" << rowset2->version().second + << std::endl; + res = tablet->add_inc_rowset(rowset2); + ASSERT_TRUE(res.ok()); + ASSERT_EQ(4, tablet->num_rows()); + std::vector<segment_v2::SegmentSharedPtr> segments; + res = ((BetaRowset*)rowset2.get())->load_segments(&segments); + ASSERT_TRUE(res.ok()); + ASSERT_EQ(1, rowset2->num_segments()); + ASSERT_EQ(1, segments.size()); + } + + auto cur_version = tablet->rowset_with_max_version()->end_version(); + // read data from rowset 1, verify the data correct + { + OlapReaderStatistics stats; + StorageReadOptions opts; + opts.stats = &stats; + opts.tablet_schema = rowset1->tablet_schema(); + opts.delete_bitmap.emplace(0, tablet->tablet_meta()->delete_bitmap().get_agg( + {rowset1->rowset_id(), 0, cur_version})); + std::unique_ptr<RowwiseIterator> iter; + 
std::shared_ptr<Schema> schema = std::make_shared<Schema>(rowset1->tablet_schema()); + std::vector<segment_v2::SegmentSharedPtr> segments; + ((BetaRowset*)rowset1.get())->load_segments(&segments); + auto s = segments[0]->new_iterator(schema, opts, &iter); + ASSERT_TRUE(s.ok()); + auto read_block = rowset1->tablet_schema()->create_block(); + res = iter->next_batch(&read_block); + ASSERT_TRUE(res.ok()); + // key of (10, 123) is deleted + ASSERT_EQ(1, read_block.rows()); + auto k1 = read_block.get_by_position(0).column->get_int(0); + ASSERT_EQ(20, k1); + auto k2 = read_block.get_by_position(1).column->get_int(0); + ASSERT_EQ(123, k2); + // get the value from sequence column + auto seq_v = read_block.get_by_position(4).column->get_int(0); + ASSERT_EQ(100, seq_v); + } + + // read data from rowset 2, verify the data correct + { + OlapReaderStatistics stats; + StorageReadOptions opts; + opts.stats = &stats; + opts.tablet_schema = rowset2->tablet_schema(); + opts.delete_bitmap.emplace(0, tablet->tablet_meta()->delete_bitmap().get_agg( + {rowset2->rowset_id(), 0, cur_version})); + std::unique_ptr<RowwiseIterator> iter; + std::shared_ptr<Schema> schema = std::make_shared<Schema>(rowset2->tablet_schema()); + std::vector<segment_v2::SegmentSharedPtr> segments; + ((BetaRowset*)rowset2.get())->load_segments(&segments); + auto s = segments[0]->new_iterator(schema, opts, &iter); + ASSERT_TRUE(s.ok()); + auto read_block = rowset2->tablet_schema()->create_block(); + res = iter->next_batch(&read_block); + ASSERT_TRUE(res.ok()); + // key of (20, 123) is deleted, because it's seq value is low + ASSERT_EQ(1, read_block.rows()); + auto k1 = read_block.get_by_position(0).column->get_int(0); + ASSERT_EQ(10, k1); + auto k2 = read_block.get_by_position(1).column->get_int(0); + ASSERT_EQ(123, k2); + // get the value from sequence column + auto seq_v = read_block.get_by_position(4).column->get_int(0); + ASSERT_EQ(110, seq_v); + } + + res = 
k_engine->tablet_manager()->drop_tablet(request.tablet_id, request.replica_id, false); + ASSERT_TRUE(res.ok()); + delete delta_writer1; + delete delta_writer2; +} } // namespace doris diff --git a/be/test/olap/engine_storage_migration_task_test.cpp b/be/test/olap/engine_storage_migration_task_test.cpp index 79fbf26203..5e2312cb0d 100644 --- a/be/test/olap/engine_storage_migration_task_test.cpp +++ b/be/test/olap/engine_storage_migration_task_test.cpp @@ -193,7 +193,9 @@ TEST_F(TestEngineStorageMigrationTask, write_and_migration) { res = delta_writer->close(); EXPECT_EQ(Status::OK(), res); - res = delta_writer->close_wait(PSlaveTabletNodes(), false); + res = delta_writer->build_rowset(); + EXPECT_EQ(Status::OK(), res); + res = delta_writer->commit_txn(PSlaveTabletNodes(), false); EXPECT_EQ(Status::OK(), res); // publish version success diff --git a/be/test/olap/tablet_cooldown_test.cpp b/be/test/olap/tablet_cooldown_test.cpp index 431342c4f2..509425cb15 100644 --- a/be/test/olap/tablet_cooldown_test.cpp +++ b/be/test/olap/tablet_cooldown_test.cpp @@ -393,7 +393,9 @@ void createTablet(StorageEngine* engine, TabletSharedPtr* tablet, int64_t replic st = delta_writer->close(); ASSERT_EQ(Status::OK(), st); - st = delta_writer->close_wait(PSlaveTabletNodes(), false); + st = delta_writer->build_rowset(); + ASSERT_EQ(Status::OK(), st); + st = delta_writer->commit_txn(PSlaveTabletNodes(), false); ASSERT_EQ(Status::OK(), st); delete delta_writer; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org