This is an automated email from the ASF dual-hosted git repository. zhangchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 1514f78b87e [refactor](partial-update) Split partial update infos from tablet schema (#25147) 1514f78b87e is described below commit 1514f78b87e3a3f09e2c8417d203b3968b8f5638 Author: bobhan1 <bh2444151...@outlook.com> AuthorDate: Tue Oct 17 14:21:40 2023 +0800 [refactor](partial-update) Split partial update infos from tablet schema (#25147) --- be/src/olap/compaction.cpp | 2 +- be/src/olap/delta_writer.cpp | 1 + be/src/olap/delta_writer_v2.cpp | 10 ++-- be/src/olap/delta_writer_v2.h | 3 ++ be/src/olap/memtable.cpp | 20 ++++--- be/src/olap/memtable.h | 6 ++- be/src/olap/memtable_writer.cpp | 10 ++-- be/src/olap/memtable_writer.h | 4 ++ be/src/olap/partial_update_info.h | 54 +++++++++++++++++++ be/src/olap/rowset/beta_rowset_writer.cpp | 2 +- be/src/olap/rowset/beta_rowset_writer.h | 8 +++ be/src/olap/rowset/beta_rowset_writer_v2.h | 8 +++ be/src/olap/rowset/rowset_writer.h | 4 ++ be/src/olap/rowset/rowset_writer_context.h | 5 ++ be/src/olap/rowset/segment_v2/segment_writer.cpp | 20 ++++--- be/src/olap/rowset_builder.cpp | 16 +++--- be/src/olap/rowset_builder.h | 7 +++ be/src/olap/tablet.cpp | 24 +++++---- be/src/olap/tablet.h | 7 ++- be/src/olap/tablet_schema.cpp | 68 +----------------------- be/src/olap/tablet_schema.h | 24 +-------- be/src/olap/txn_manager.cpp | 18 ++++--- be/src/olap/txn_manager.h | 5 +- be/src/service/backend_service.cpp | 2 +- gensrc/proto/descriptors.proto | 6 +-- gensrc/proto/olap_file.proto | 4 +- 26 files changed, 188 insertions(+), 150 deletions(-) diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp index 8fb2df2a49f..8383dac052f 100644 --- a/be/src/olap/compaction.cpp +++ b/be/src/olap/compaction.cpp @@ -685,7 +685,7 @@ Status Compaction::modify_rowsets(const Merger::Statistics* stats) { it.rowset_ids.insert(_output_rowset->rowset_id()); StorageEngine::instance()->txn_manager()->set_txn_related_delete_bitmap( it.partition_id, it.transaction_id, _tablet->tablet_id(), - _tablet->tablet_uid(), true, it.delete_bitmap, it.rowset_ids); + _tablet->tablet_uid(), true, it.delete_bitmap, it.rowset_ids, nullptr); } } diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp index 2f2ff7cc938..2acb4069910 100644 --- a/be/src/olap/delta_writer.cpp +++ b/be/src/olap/delta_writer.cpp @@ -99,6 +99,7 @@ Status DeltaWriter::init() { RETURN_IF_ERROR(_rowset_builder.init()); RETURN_IF_ERROR( _memtable_writer->init(_rowset_builder.rowset_writer(), _rowset_builder.tablet_schema(), + _rowset_builder.get_partial_update_info(), _rowset_builder.tablet()->enable_unique_key_merge_on_write())); ExecEnv::GetInstance()->memtable_memory_limiter()->register_writer(_memtable_writer); _is_init = true; diff --git a/be/src/olap/delta_writer_v2.cpp b/be/src/olap/delta_writer_v2.cpp index 039ad714004..ef3ff23f9d8 100644 --- a/be/src/olap/delta_writer_v2.cpp +++ b/be/src/olap/delta_writer_v2.cpp @@ -50,6 +50,7 @@ #include "olap/schema_change.h" #include "olap/storage_engine.h" #include "olap/tablet_manager.h" +#include "olap/tablet_schema.h" #include "runtime/exec_env.h" #include "service/backend_options.h" #include "util/brpc_client_cache.h" @@ -121,10 +122,11 @@ Status DeltaWriterV2::init() { context.rowset_type = RowsetTypePB::BETA_ROWSET; context.rowset_id = StorageEngine::instance()->next_rowset_id(); context.data_dir = nullptr; + context.partial_update_info = _partial_update_info; _rowset_writer = std::make_shared<BetaRowsetWriterV2>(_streams); RETURN_IF_ERROR(_rowset_writer->init(context)); - RETURN_IF_ERROR(_memtable_writer->init(_rowset_writer, _tablet_schema, + RETURN_IF_ERROR(_memtable_writer->init(_rowset_writer, _tablet_schema, _partial_update_info, _streams[0]->enable_unique_mow(_req.index_id))); ExecEnv::GetInstance()->memtable_memory_limiter()->register_writer(_memtable_writer); _is_init = true; @@ -221,8 +223,10 @@ void DeltaWriterV2::_build_current_tablet_schema(int64_t index_id, _tablet_schema->set_table_id(table_schema_param->table_id()); // set partial update columns info - _tablet_schema->set_partial_update_info(table_schema_param->is_partial_update(), - table_schema_param->partial_update_input_columns()); + _partial_update_info = std::make_shared<PartialUpdateInfo>(); + _partial_update_info->init(*_tablet_schema, table_schema_param->is_partial_update(), + table_schema_param->partial_update_input_columns(), + table_schema_param->is_strict_mode()); } } // namespace doris diff --git a/be/src/olap/delta_writer_v2.h b/be/src/olap/delta_writer_v2.h index 0f8d21a19b1..b2b1f5f1c19 100644 --- a/be/src/olap/delta_writer_v2.h +++ b/be/src/olap/delta_writer_v2.h @@ -34,6 +34,7 @@ #include "olap/delta_writer_context.h" #include "olap/memtable_writer.h" #include "olap/olap_common.h" +#include "olap/partial_update_info.h" #include "olap/rowset/rowset.h" #include "olap/tablet.h" #include "olap/tablet_meta.h" @@ -126,6 +127,8 @@ private: MonotonicStopWatch _lock_watch; std::vector<std::shared_ptr<LoadStreamStub>> _streams; + + std::shared_ptr<PartialUpdateInfo> _partial_update_info; }; } // namespace doris diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp index 47c16424478..d163abd26a7 100644 --- a/be/src/olap/memtable.cpp +++ b/be/src/olap/memtable.cpp @@ -49,7 +49,7 @@ using namespace ErrorCode; MemTable::MemTable(int64_t tablet_id, const TabletSchema* tablet_schema, const std::vector<SlotDescriptor*>* slot_descs, TupleDescriptor* tuple_desc, - bool enable_unique_key_mow, + bool enable_unique_key_mow, PartialUpdateInfo* partial_update_info, const std::shared_ptr<MemTracker>& insert_mem_tracker, const std::shared_ptr<MemTracker>& flush_mem_tracker) : _tablet_id(tablet_id), @@ -77,8 +77,11 @@ MemTable::MemTable(int64_t tablet_id, const TabletSchema* tablet_schema, // TODO: Support ZOrderComparator in the future _init_columns_offset_by_slot_descs(slot_descs, tuple_desc); _num_columns = _tablet_schema->num_columns(); - if (_tablet_schema->is_partial_update()) { - _num_columns = _tablet_schema->partial_input_column_size(); + if (partial_update_info != nullptr) { + _is_partial_update = partial_update_info->is_partial_update; + if (_is_partial_update) { + _num_columns = partial_update_info->partial_update_input_columns.size(); + } } } void MemTable::_init_columns_offset_by_slot_descs(const std::vector<SlotDescriptor*>* slot_descs, @@ -178,7 +181,7 @@ void MemTable::insert(const vectorized::Block* input_block, const std::vector<in _init_agg_functions(&target_block); } if (_tablet_schema->has_sequence_col()) { - if (_tablet_schema->is_partial_update()) { + if (_is_partial_update) { // for unique key partial update, sequence column index in block // may be different with the index in `_tablet_schema` for (size_t i = 0; i < cloneBlock.columns(); i++) { @@ -417,8 +420,8 @@ void MemTable::shrink_memtable_by_agg() { bool MemTable::need_flush() const { auto max_size = config::write_buffer_size; - if (_tablet_schema->is_partial_update()) { - auto update_columns_size = _tablet_schema->partial_input_column_size(); + if (_is_partial_update) { + auto update_columns_size = _num_columns; max_size = max_size * update_columns_size / _tablet_schema->num_columns(); max_size = max_size > 1048576 ? max_size : 1048576; } @@ -428,11 +431,6 @@ bool MemTable::need_flush() const { bool MemTable::need_agg() const { if (_keys_type == KeysType::AGG_KEYS) { auto max_size = config::write_buffer_size_for_agg; - if (_tablet_schema->is_partial_update()) { - auto update_columns_size = _tablet_schema->partial_input_column_size(); - max_size = max_size * update_columns_size / _tablet_schema->num_columns(); - max_size = max_size > 1048576 ? max_size : 1048576; - } return memory_usage() >= max_size; } return false; diff --git a/be/src/olap/memtable.h b/be/src/olap/memtable.h index ed9226c4a0c..b98e7411e3b 100644 --- a/be/src/olap/memtable.h +++ b/be/src/olap/memtable.h @@ -31,6 +31,8 @@ #include "common/status.h" #include "gutil/integral_types.h" #include "olap/olap_common.h" +#include "olap/partial_update_info.h" +#include "olap/tablet_schema.h" #include "runtime/memory/mem_tracker.h" #include "vec/aggregate_functions/aggregate_function.h" #include "vec/common/arena.h" @@ -167,7 +169,8 @@ class MemTable { public: MemTable(int64_t tablet_id, const TabletSchema* tablet_schema, const std::vector<SlotDescriptor*>* slot_descs, TupleDescriptor* tuple_desc, - bool enable_unique_key_mow, const std::shared_ptr<MemTracker>& insert_mem_tracker, + bool enable_unique_key_mow, PartialUpdateInfo* partial_update_info, + const std::shared_ptr<MemTracker>& insert_mem_tracker, const std::shared_ptr<MemTracker>& flush_mem_tracker); ~MemTable(); @@ -202,6 +205,7 @@ private: private: int64_t _tablet_id; bool _enable_unique_key_mow = false; + bool _is_partial_update = false; const KeysType _keys_type; const TabletSchema* _tablet_schema; diff --git a/be/src/olap/memtable_writer.cpp b/be/src/olap/memtable_writer.cpp index 4013f7fda99..2ef704f075a 100644 --- a/be/src/olap/memtable_writer.cpp +++ b/be/src/olap/memtable_writer.cpp @@ -39,6 +39,7 @@ #include "olap/rowset/rowset_writer.h" #include "olap/schema_change.h" #include "olap/storage_engine.h" +#include "olap/tablet_schema.h" #include "runtime/exec_env.h" #include "runtime/memory/mem_tracker.h" #include "service/backend_options.h" @@ -63,10 +64,13 @@ MemTableWriter::~MemTableWriter() { } Status MemTableWriter::init(std::shared_ptr<RowsetWriter> rowset_writer, - TabletSchemaSPtr tablet_schema, bool unique_key_mow) { + TabletSchemaSPtr tablet_schema, + std::shared_ptr<PartialUpdateInfo> partial_update_info, + bool unique_key_mow) { _rowset_writer = rowset_writer; _tablet_schema = tablet_schema; _unique_key_mow = unique_key_mow; + _partial_update_info = partial_update_info; _reset_mem_table(); @@ -195,8 +199,8 @@ void MemTableWriter::_reset_mem_table() { _mem_table_flush_trackers.push_back(mem_table_flush_tracker); } _mem_table.reset(new MemTable(_req.tablet_id, _tablet_schema.get(), _req.slots, _req.tuple_desc, - _unique_key_mow, mem_table_insert_tracker, - mem_table_flush_tracker)); + _unique_key_mow, _partial_update_info.get(), + mem_table_insert_tracker, mem_table_flush_tracker)); _segment_num++; } diff --git a/be/src/olap/memtable_writer.h b/be/src/olap/memtable_writer.h index b374f10bded..3491f72abd5 100644 --- a/be/src/olap/memtable_writer.h +++ b/be/src/olap/memtable_writer.h @@ -33,6 +33,7 @@ #include "olap/delta_writer_context.h" #include "olap/memtable.h" #include "olap/olap_common.h" +#include "olap/partial_update_info.h" #include "olap/rowset/rowset.h" #include "olap/tablet.h" #include "olap/tablet_meta.h" @@ -67,6 +68,7 @@ public: ~MemTableWriter(); Status init(std::shared_ptr<RowsetWriter> rowset_writer, TabletSchemaSPtr tablet_schema, + std::shared_ptr<PartialUpdateInfo> partial_update_info, bool unique_key_mow = false); Status write(const vectorized::Block* block, const std::vector<int>& row_idxs, @@ -141,6 +143,8 @@ private: int64_t _segment_num = 0; MonotonicStopWatch _lock_watch; + + std::shared_ptr<PartialUpdateInfo> _partial_update_info; }; } // namespace doris diff --git a/be/src/olap/partial_update_info.h b/be/src/olap/partial_update_info.h new file mode 100644 index 00000000000..cdea698b20d --- /dev/null +++ b/be/src/olap/partial_update_info.h @@ -0,0 +1,54 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "olap/tablet_schema.h" + +namespace doris { + +struct PartialUpdateInfo { + void init(const TabletSchema& tablet_schema, bool partial_update, + const std::set<string>& partial_update_cols, bool is_strict_mode) { + is_partial_update = partial_update; + partial_update_input_columns = partial_update_cols; + missing_cids.clear(); + update_cids.clear(); + for (auto i = 0; i < tablet_schema.num_columns(); ++i) { + auto tablet_column = tablet_schema.column(i); + if (!partial_update_input_columns.contains(tablet_column.name())) { + missing_cids.emplace_back(i); + if (!tablet_column.has_default_value() && !tablet_column.is_nullable()) { + can_insert_new_rows_in_partial_update = false; + } + } else { + update_cids.emplace_back(i); + } + } + this->is_strict_mode = is_strict_mode; + } + + bool is_partial_update {false}; + std::set<std::string> partial_update_input_columns; + std::vector<uint32_t> missing_cids; + std::vector<uint32_t> update_cids; + // if key not exist in old rowset, use default value or null value for the unmentioned cols + // to generate a new row, only available in non-strict mode + bool can_insert_new_rows_in_partial_update {true}; + bool is_strict_mode {false}; +}; +} // namespace doris diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp b/be/src/olap/rowset/beta_rowset_writer.cpp index b8c578ff80d..9262d603e28 100644 --- a/be/src/olap/rowset/beta_rowset_writer.cpp +++ b/be/src/olap/rowset/beta_rowset_writer.cpp @@ -135,7 +135,7 @@ Status BetaRowsetWriter::add_block(const vectorized::Block* block) { Status BetaRowsetWriter::_generate_delete_bitmap(int32_t segment_id) { SCOPED_RAW_TIMER(&_delete_bitmap_ns); if (!_context.tablet->enable_unique_key_merge_on_write() || - _context.tablet_schema->is_partial_update()) { + (_context.partial_update_info && _context.partial_update_info->is_partial_update)) { return Status::OK(); } auto rowset = _build_tmp(); diff --git a/be/src/olap/rowset/beta_rowset_writer.h b/be/src/olap/rowset/beta_rowset_writer.h index 3691a661403..859ab84c52f 100644 --- a/be/src/olap/rowset/beta_rowset_writer.h +++ b/be/src/olap/rowset/beta_rowset_writer.h @@ -131,6 +131,14 @@ public: int64_t segment_writer_ns() override { return _segment_writer_ns; } + std::shared_ptr<PartialUpdateInfo> get_partial_update_info() override { + return _context.partial_update_info; + } + + bool is_partial_update() override { + return _context.partial_update_info && _context.partial_update_info->is_partial_update; + } + private: Status _create_file_writer(std::string path, io::FileWriterPtr& file_writer); Status _check_segment_number_limit(); diff --git a/be/src/olap/rowset/beta_rowset_writer_v2.h b/be/src/olap/rowset/beta_rowset_writer_v2.h index 1279a564ff3..c845425bb10 100644 --- a/be/src/olap/rowset/beta_rowset_writer_v2.h +++ b/be/src/olap/rowset/beta_rowset_writer_v2.h @@ -134,6 +134,14 @@ public: int64_t segment_writer_ns() override { return _segment_writer_ns; } + std::shared_ptr<PartialUpdateInfo> get_partial_update_info() override { + return _context.partial_update_info; + } + + bool is_partial_update() override { + return _context.partial_update_info && _context.partial_update_info->is_partial_update; + } + private: RowsetWriterContext _context; diff --git a/be/src/olap/rowset/rowset_writer.h b/be/src/olap/rowset/rowset_writer.h index d32c813233b..869abea483e 100644 --- a/be/src/olap/rowset/rowset_writer.h +++ b/be/src/olap/rowset/rowset_writer.h @@ -151,6 +151,10 @@ public: virtual int64_t segment_writer_ns() { return 0; } + virtual std::shared_ptr<PartialUpdateInfo> get_partial_update_info() = 0; + + virtual bool is_partial_update() = 0; + private: DISALLOW_COPY_AND_ASSIGN(RowsetWriter); }; diff --git a/be/src/olap/rowset/rowset_writer_context.h b/be/src/olap/rowset/rowset_writer_context.h index 985efa0809e..6c4c7a04e0c 100644 --- a/be/src/olap/rowset/rowset_writer_context.h +++ b/be/src/olap/rowset/rowset_writer_context.h @@ -21,6 +21,7 @@ #include "io/fs/file_system.h" #include "olap/olap_define.h" +#include "olap/partial_update_info.h" #include "olap/tablet.h" #include "olap/tablet_schema.h" @@ -105,6 +106,10 @@ struct RowsetWriterContext { // segcompaction for this RowsetWriter, disable it for some transient writers bool enable_segcompaction = true; + + std::shared_ptr<PartialUpdateInfo> partial_update_info; + + bool is_transient_rowset_writer = false; }; } // namespace doris diff --git a/be/src/olap/rowset/segment_v2/segment_writer.cpp b/be/src/olap/rowset/segment_v2/segment_writer.cpp index 9508557d0ef..0a6e72ad662 100644 --- a/be/src/olap/rowset/segment_v2/segment_writer.cpp +++ b/be/src/olap/rowset/segment_v2/segment_writer.cpp @@ -333,9 +333,10 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block* } DCHECK(_tablet_schema->keys_type() == UNIQUE_KEYS && _opts.enable_unique_key_merge_on_write); + DCHECK(_opts.rowset_ctx->partial_update_info); // find missing column cids - std::vector<uint32_t> missing_cids = _tablet_schema->get_missing_cids(); - std::vector<uint32_t> including_cids = _tablet_schema->get_update_cids(); + std::vector<uint32_t> missing_cids = _opts.rowset_ctx->partial_update_info->missing_cids; + std::vector<uint32_t> including_cids = _opts.rowset_ctx->partial_update_info->update_cids; // create full block and fill with input columns auto full_block = _tablet_schema->create_block(); @@ -421,7 +422,7 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block* auto st = _tablet->lookup_row_key(key, have_input_seq_column, specified_rowsets, &loc, _mow_context->max_version, segment_caches, &rowset); if (st.is<KEY_NOT_FOUND>()) { - if (_tablet_schema->is_strict_mode()) { + if (_opts.rowset_ctx->partial_update_info->is_strict_mode) { ++num_rows_filtered; // delete the invalid newly inserted row _mow_context->delete_bitmap->add({_opts.rowset_ctx->rowset_id, _segment_id, @@ -429,7 +430,7 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block* segment_pos); } - if (!_tablet_schema->can_insert_new_rows_in_partial_update()) { + if (!_opts.rowset_ctx->partial_update_info->can_insert_new_rows_in_partial_update) { return Status::InternalError( "the unmentioned columns should have default value or be nullable for " "newly inserted rows in non-strict mode partial update"); @@ -492,7 +493,7 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block* } // convert missing columns and send to column writer - auto cids_missing = _tablet_schema->get_missing_cids(); + auto cids_missing = _opts.rowset_ctx->partial_update_info->missing_cids; _olap_data_convertor->set_source_content_with_specifid_columns(&full_block, row_pos, num_rows, cids_missing); for (auto cid : cids_missing) { @@ -545,8 +546,8 @@ Status SegmentWriter::fill_missing_columns(vectorized::MutableColumns& mutable_f bool has_default_or_nullable, const size_t& segment_start_pos) { // create old value columns - auto old_value_block = _tablet_schema->create_missing_columns_block(); - std::vector<uint32_t> cids_missing = _tablet_schema->get_missing_cids(); + std::vector<uint32_t> cids_missing = _opts.rowset_ctx->partial_update_info->missing_cids; + auto old_value_block = _tablet_schema->create_block_by_cids(cids_missing); CHECK(cids_missing.size() == old_value_block.columns()); auto mutable_old_columns = old_value_block.mutate_columns(); bool has_row_column = _tablet_schema->store_row_column(); @@ -652,7 +653,10 @@ Status SegmentWriter::fill_missing_columns(vectorized::MutableColumns& mutable_f Status SegmentWriter::append_block(const vectorized::Block* block, size_t row_pos, size_t num_rows) { - if (_tablet_schema->is_partial_update() && _opts.write_type == DataWriteType::TYPE_DIRECT) { + if (_opts.rowset_ctx->partial_update_info && + _opts.rowset_ctx->partial_update_info->is_partial_update && + _opts.write_type == DataWriteType::TYPE_DIRECT && + !_opts.rowset_ctx->is_transient_rowset_writer) { RETURN_IF_ERROR(append_block_with_partial_content(block, row_pos, num_rows)); return Status::OK(); } diff --git a/be/src/olap/rowset_builder.cpp b/be/src/olap/rowset_builder.cpp index 0173643b8c5..1e1facab69a 100644 --- a/be/src/olap/rowset_builder.cpp +++ b/be/src/olap/rowset_builder.cpp @@ -43,6 +43,7 @@ #include "olap/storage_engine.h" #include "olap/tablet_manager.h" #include "olap/tablet_meta.h" +#include "olap/tablet_schema.h" #include "olap/txn_manager.h" #include "util/brpc_client_cache.h" #include "util/mem_info.h" @@ -172,6 +173,7 @@ Status RowsetBuilder::init() { context.write_type = DataWriteType::TYPE_DIRECT; context.mow_context = mow_context; context.write_file_cache = _req.write_file_cache; + context.partial_update_info = _partial_update_info; std::unique_ptr<RowsetWriter> rowset_writer; RETURN_IF_ERROR(_tablet->create_rowset_writer(context, &rowset_writer)); _rowset_writer = std::move(rowset_writer); @@ -223,7 +225,7 @@ Status RowsetBuilder::submit_calc_delete_bitmap_task() { // For partial update, we need to fill in the entire row of data, during the calculation // of the delete bitmap. This operation is resource-intensive, and we need to minimize // the number of times it occurs. Therefore, we skip this operation here. - if (_rowset->tablet_schema()->is_partial_update()) { + if (_partial_update_info->is_partial_update) { return Status::OK(); } @@ -235,8 +237,7 @@ Status RowsetBuilder::submit_calc_delete_bitmap_task() { } Status RowsetBuilder::wait_calc_delete_bitmap() { - if (!_tablet->enable_unique_key_merge_on_write() || - _rowset->tablet_schema()->is_partial_update()) { + if (!_tablet->enable_unique_key_merge_on_write() || _partial_update_info->is_partial_update) { return Status::OK(); } std::lock_guard<std::mutex> l(_lock); @@ -278,7 +279,7 @@ Status RowsetBuilder::commit_txn() { if (_tablet->enable_unique_key_merge_on_write()) { _storage_engine->txn_manager()->set_txn_related_delete_bitmap( _req.partition_id, _req.txn_id, _tablet->tablet_id(), _tablet->tablet_uid(), true, - _delete_bitmap, _rowset_ids); + _delete_bitmap, _rowset_ids, _partial_update_info); } _is_committed = true; @@ -321,9 +322,10 @@ void RowsetBuilder::_build_current_tablet_schema(int64_t index_id, _tablet_schema->set_table_id(table_schema_param->table_id()); // set partial update columns info - _tablet_schema->set_partial_update_info(table_schema_param->is_partial_update(), - table_schema_param->partial_update_input_columns()); - _tablet_schema->set_is_strict_mode(table_schema_param->is_strict_mode()); + _partial_update_info = std::make_shared<PartialUpdateInfo>(); + _partial_update_info->init(*_tablet_schema, table_schema_param->is_partial_update(), + table_schema_param->partial_update_input_columns(), + table_schema_param->is_strict_mode()); } } // namespace doris diff --git a/be/src/olap/rowset_builder.h b/be/src/olap/rowset_builder.h index 8bb94c20905..b3193f2eafe 100644 --- a/be/src/olap/rowset_builder.h +++ b/be/src/olap/rowset_builder.h @@ -32,6 +32,7 @@ #include "common/status.h" #include "olap/delta_writer_context.h" #include "olap/olap_common.h" +#include "olap/partial_update_info.h" #include "olap/rowset/rowset.h" #include "olap/tablet.h" #include "olap/tablet_meta.h" @@ -86,6 +87,10 @@ public: // For UT DeleteBitmapPtr get_delete_bitmap() { return _delete_bitmap; } + std::shared_ptr<PartialUpdateInfo> get_partial_update_info() const { + return _partial_update_info; + } + private: void _garbage_collection(); @@ -113,6 +118,8 @@ private: // current rowset_ids, used to do diff in publish_version RowsetIdUnorderedSet _rowset_ids; + std::shared_ptr<PartialUpdateInfo> _partial_update_info; + RuntimeProfile* _profile = nullptr; RuntimeProfile::Counter* _build_rowset_timer = nullptr; RuntimeProfile::Counter* _submit_delete_bitmap_timer = nullptr; diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp index 8d11e39731e..12ef819d6fb 100644 --- a/be/src/olap/tablet.cpp +++ b/be/src/olap/tablet.cpp @@ -2006,14 +2006,14 @@ Status Tablet::create_rowset_writer(RowsetWriterContext& context, // create a rowset writer with rowset_id and seg_id // after writer, merge this transient rowset with original rowset -Status Tablet::create_transient_rowset_writer(RowsetSharedPtr rowset_ptr, - std::unique_ptr<RowsetWriter>* rowset_writer) { +Status Tablet::create_transient_rowset_writer( + RowsetSharedPtr rowset_ptr, std::unique_ptr<RowsetWriter>* rowset_writer, + std::shared_ptr<PartialUpdateInfo> partial_update_info) { RowsetWriterContext context; context.rowset_state = PREPARED; context.segments_overlap = OVERLAPPING; context.tablet_schema = std::make_shared<TabletSchema>(); context.tablet_schema->copy_from(*(rowset_ptr->tablet_schema())); - context.tablet_schema->set_partial_update_info(false, std::set<std::string>()); context.newest_write_timestamp = UnixSeconds(); context.tablet_id = table_id(); context.enable_segcompaction = false; @@ -2021,6 +2021,8 @@ Status Tablet::create_transient_rowset_writer(RowsetSharedPtr rowset_ptr, // get the shared_ptr from tablet_manager. context.tablet = StorageEngine::instance()->tablet_manager()->get_tablet(tablet_id()); context.write_type = DataWriteType::TYPE_DIRECT; + context.partial_update_info = partial_update_info; + context.is_transient_rowset_writer = true; RETURN_IF_ERROR( create_transient_rowset_writer(context, rowset_ptr->rowset_id(), rowset_writer)); (*rowset_writer)->set_segment_start_id(rowset_ptr->num_segments()); @@ -2927,7 +2929,7 @@ Status Tablet::calc_segment_delete_bitmap(RowsetSharedPtr rowset, auto rowset_id = rowset->rowset_id(); Version dummy_version(end_version + 1, end_version + 1); auto rowset_schema = rowset->tablet_schema(); - bool is_partial_update = rowset_schema->is_partial_update(); + bool is_partial_update = rowset_writer && rowset_writer->is_partial_update(); // use for partial update PartialUpdateReadPlan read_plan_ori; PartialUpdateReadPlan read_plan_update; @@ -3048,8 +3050,11 @@ Status Tablet::calc_segment_delete_bitmap(RowsetSharedPtr rowset, } if (pos > 0) { + auto partial_update_info = rowset_writer->get_partial_update_info(); + DCHECK(partial_update_info); RETURN_IF_ERROR(generate_new_block_for_partial_update( - rowset_schema, read_plan_ori, read_plan_update, rsid_to_rowset, &block)); + rowset_schema, partial_update_info->missing_cids, partial_update_info->update_cids, + read_plan_ori, read_plan_update, rsid_to_rowset, &block)); sort_block(block, ordered_block); RETURN_IF_ERROR(rowset_writer->flush_single_block(&ordered_block)); } @@ -3123,7 +3128,8 @@ std::vector<RowsetSharedPtr> Tablet::get_rowset_by_ids( } Status Tablet::generate_new_block_for_partial_update( - TabletSchemaSPtr rowset_schema, const PartialUpdateReadPlan& read_plan_ori, + TabletSchemaSPtr rowset_schema, const std::vector<uint32>& missing_cids, + const std::vector<uint32>& update_cids, const PartialUpdateReadPlan& read_plan_ori, const PartialUpdateReadPlan& read_plan_update, const std::map<RowsetId, RowsetSharedPtr>& rsid_to_rowset, vectorized::Block* output_block) { @@ -3134,10 +3140,8 @@ Status Tablet::generate_new_block_for_partial_update( // 4. mark current keys deleted CHECK(output_block); auto full_mutable_columns = output_block->mutate_columns(); - auto old_block = rowset_schema->create_missing_columns_block(); - auto missing_cids = rowset_schema->get_missing_cids(); - auto update_block = rowset_schema->create_update_columns_block(); - auto update_cids = rowset_schema->get_update_cids(); + auto old_block = rowset_schema->create_block_by_cids(missing_cids); + auto update_block = rowset_schema->create_block_by_cids(update_cids); std::map<uint32_t, uint32_t> read_index_old; RETURN_IF_ERROR(read_columns_by_plan(rowset_schema, missing_cids, read_plan_ori, rsid_to_rowset, diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h index c7e7dcc6e31..576dcce555f 100644 --- a/be/src/olap/tablet.h +++ b/be/src/olap/tablet.h @@ -44,6 +44,7 @@ #include "olap/binlog_config.h" #include "olap/data_dir.h" #include "olap/olap_common.h" +#include "olap/partial_update_info.h" #include "olap/rowset/rowset.h" #include "olap/rowset/rowset_meta.h" #include "olap/rowset/rowset_reader.h" @@ -351,7 +352,8 @@ public: std::unique_ptr<RowsetWriter>* rowset_writer); Status create_transient_rowset_writer(RowsetSharedPtr rowset_ptr, - std::unique_ptr<RowsetWriter>* rowset_writer); + std::unique_ptr<RowsetWriter>* rowset_writer, + std::shared_ptr<PartialUpdateInfo> partial_update_info); Status create_transient_rowset_writer(RowsetWriterContext& context, const RowsetId& rowset_id, std::unique_ptr<RowsetWriter>* rowset_writer); @@ -477,7 +479,8 @@ public: void prepare_to_read(const RowLocation& row_location, size_t pos, PartialUpdateReadPlan* read_plan); Status generate_new_block_for_partial_update( - TabletSchemaSPtr rowset_schema, const PartialUpdateReadPlan& read_plan_ori, + TabletSchemaSPtr rowset_schema, const std::vector<uint32>& missing_cids, + const std::vector<uint32>& update_cids, const PartialUpdateReadPlan& read_plan_ori, const PartialUpdateReadPlan& read_plan_update, const std::map<RowsetId, RowsetSharedPtr>& rsid_to_rowset, vectorized::Block* output_block); diff --git a/be/src/olap/tablet_schema.cpp b/be/src/olap/tablet_schema.cpp index 9cbc9ab608f..20260f2f4ff 100644 --- a/be/src/olap/tablet_schema.cpp +++ b/be/src/olap/tablet_schema.cpp @@ -710,9 +710,6 @@ void TabletSchema::init_from_pb(const TabletSchemaPB& schema) { _indexes.clear(); _field_name_to_index.clear(); _field_id_to_index.clear(); - _partial_update_input_columns.clear(); - _missing_cids.clear(); - _update_cids.clear(); for (auto& column_pb : schema.column()) { TabletColumn column; column.init_from_pb(column_pb); @@ -758,23 +755,6 @@ void TabletSchema::init_from_pb(const TabletSchemaPB& schema) { _sort_col_num = schema.sort_col_num(); _compression_type = schema.compression_type(); _schema_version = schema.schema_version(); - _is_partial_update = schema.is_partial_update(); - for (auto& col_name : schema.partial_update_input_columns()) { - _partial_update_input_columns.emplace(col_name); - } - if (_is_partial_update) { - for (auto i = 0; i < _cols.size(); ++i) { - if (_partial_update_input_columns.count(_cols[i].name()) == 0) { - _missing_cids.emplace_back(i); - auto tablet_column = column(i); - if (!tablet_column.has_default_value() && !tablet_column.is_nullable()) { - _can_insert_new_rows_in_partial_update = false; - } - } else { - _update_cids.emplace_back(i); - } - } - } } void TabletSchema::copy_from(const TabletSchema& tablet_schema) { @@ -916,10 +896,6 @@ void TabletSchema::to_schema_pb(TabletSchemaPB* tablet_schema_pb) const { tablet_schema_pb->set_schema_version(_schema_version); tablet_schema_pb->set_compression_type(_compression_type); tablet_schema_pb->set_version_col_idx(_version_col_idx); - tablet_schema_pb->set_is_partial_update(_is_partial_update); - for (auto& col : _partial_update_input_columns) { - *tablet_schema_pb->add_partial_update_input_columns() = col; - } } size_t TabletSchema::row_size() const { @@ -1103,19 +1079,9 @@ vectorized::Block TabletSchema::create_block(bool ignore_dropped_col) const { return block; } -vectorized::Block TabletSchema::create_missing_columns_block() { - vectorized::Block block; - for (const auto& cid : _missing_cids) { - auto col = _cols[cid]; - auto data_type = vectorized::DataTypeFactory::instance().create_data_type(col); - block.insert({data_type->create_column(), data_type, col.name()}); - } - return block; -} - -vectorized::Block TabletSchema::create_update_columns_block() { +vectorized::Block TabletSchema::create_block_by_cids(const std::vector<uint32_t>& cids) { vectorized::Block block; - for (const auto& cid : _update_cids) { + for (const auto& cid : cids) { auto col = _cols[cid]; auto data_type = vectorized::DataTypeFactory::instance().create_data_type(col); block.insert({data_type->create_column(), data_type, col.name()}); @@ -1123,36 +1089,6 @@ vectorized::Block TabletSchema::create_update_columns_block() { return block; } -void TabletSchema::set_partial_update_info(bool is_partial_update, - const std::set<string>& partial_update_input_columns) { - _is_partial_update = is_partial_update; - _partial_update_input_columns = partial_update_input_columns; - _missing_cids.clear(); - _update_cids.clear(); - for (auto i = 0; i < _cols.size(); ++i) { - if (_partial_update_input_columns.count(_cols[i].name()) == 0) { - _missing_cids.emplace_back(i); - auto tablet_column = column(i); - if (!tablet_column.has_default_value() && !tablet_column.is_nullable()) { - _can_insert_new_rows_in_partial_update = false; - } - } else { - _update_cids.emplace_back(i); - } - } -} - -bool TabletSchema::is_column_missing(size_t cid) const { - DCHECK(cid < _cols.size()); - if (!_is_partial_update) { - return false; - } - if (_partial_update_input_columns.count(_cols[cid].name()) == 0) { - return true; - } - return false; -} - bool operator==(const TabletColumn& a, const TabletColumn& b) { if (a._unique_id != b._unique_id) return false; if (a._col_name != b._col_name) return false; diff --git a/be/src/olap/tablet_schema.h b/be/src/olap/tablet_schema.h index 5e6ffa3ac77..42fabc6c2f6 100644 --- a/be/src/olap/tablet_schema.h +++ b/be/src/olap/tablet_schema.h @@ -351,20 +351,7 @@ public: return str; } - vectorized::Block create_missing_columns_block(); - vectorized::Block create_update_columns_block(); - void set_partial_update_info(bool is_partial_update, - const std::set<string>& partial_update_input_columns); - bool is_partial_update() const { return _is_partial_update; } - size_t partial_input_column_size() const { return _partial_update_input_columns.size(); } - bool is_column_missing(size_t cid) const; - bool can_insert_new_rows_in_partial_update() const { - return _can_insert_new_rows_in_partial_update; - } - void set_is_strict_mode(bool is_strict_mode) { _is_strict_mode = is_strict_mode; } - bool is_strict_mode() const { return _is_strict_mode; } - std::vector<uint32_t> get_missing_cids() const { return _missing_cids; } - std::vector<uint32_t> get_update_cids() const { return _update_cids; } + vectorized::Block create_block_by_cids(const std::vector<uint32_t>& cids); private: friend bool operator==(const TabletSchema& a, const TabletSchema& b); @@ -402,15 +389,6 @@ private: int64_t _mem_size = 0; bool _store_row_column = false; bool _skip_write_index_on_load = false; - - bool _is_partial_update; - std::set<std::string> _partial_update_input_columns; - std::vector<uint32_t> _missing_cids; - std::vector<uint32_t> _update_cids; - // if key not exist in old rowset, use default value or null value for the unmentioned cols - // to generate a new row, only available in non-strict mode - bool _can_insert_new_rows_in_partial_update = true; - bool _is_strict_mode = false; }; bool operator==(const TabletSchema& a, const TabletSchema& b); diff --git a/be/src/olap/txn_manager.cpp b/be/src/olap/txn_manager.cpp index 6d14e620d96..e5d7ceffa91 100644 --- a/be/src/olap/txn_manager.cpp +++ b/be/src/olap/txn_manager.cpp @@ -35,6 +35,7 @@ #include "common/logging.h" #include "olap/data_dir.h" #include "olap/delta_writer.h" +#include "olap/olap_common.h" #include "olap/rowset/rowset_meta.h" #include "olap/rowset/rowset_meta_manager.h" #include "olap/schema_change.h" @@ -175,11 +176,11 @@ Status TxnManager::delete_txn(TPartitionId partition_id, const TabletSharedPtr& tablet->tablet_id(), tablet->tablet_uid()); } -void TxnManager::set_txn_related_delete_bitmap(TPartitionId partition_id, - TTransactionId transaction_id, TTabletId tablet_id, - TabletUid tablet_uid, bool unique_key_merge_on_write, - DeleteBitmapPtr delete_bitmap, - const RowsetIdUnorderedSet& rowset_ids) { +void TxnManager::set_txn_related_delete_bitmap( + TPartitionId partition_id, TTransactionId transaction_id, TTabletId tablet_id, + TabletUid tablet_uid, bool unique_key_merge_on_write, DeleteBitmapPtr delete_bitmap, + const RowsetIdUnorderedSet& rowset_ids, + std::shared_ptr<PartialUpdateInfo> partial_update_info) { pair<int64_t, int64_t> key(partition_id, transaction_id); TabletInfo tablet_info(tablet_id, tablet_uid); @@ -205,6 +206,7 @@ void TxnManager::set_txn_related_delete_bitmap(TPartitionId partition_id, load_info.unique_key_merge_on_write = unique_key_merge_on_write; load_info.delete_bitmap = delete_bitmap; load_info.rowset_ids = rowset_ids; + load_info.partial_update_info = partial_update_info; } } @@ -363,7 +365,8 @@ Status TxnManager::publish_txn(OlapMeta* meta, TPartitionId partition_id, // update delete_bitmap if (tablet_txn_info.unique_key_merge_on_write) { std::unique_ptr<RowsetWriter> rowset_writer; - static_cast<void>(tablet->create_transient_rowset_writer(rowset, &rowset_writer)); + static_cast<void>(tablet->create_transient_rowset_writer( + rowset, &rowset_writer, tablet_txn_info.partial_update_info)); int64_t t2 = MonotonicMicros(); RETURN_IF_ERROR(tablet->update_delete_bitmap(rowset, tablet_txn_info.rowset_ids, @@ -371,7 +374,8 @@ Status TxnManager::publish_txn(OlapMeta* meta, TPartitionId partition_id, rowset_writer.get())); int64_t t3 = MonotonicMicros(); stats->calc_delete_bitmap_time_us = t3 - t2; - if (rowset->tablet_schema()->is_partial_update()) { + if (tablet_txn_info.partial_update_info && + tablet_txn_info.partial_update_info->is_partial_update) { // build rowset writer and merge transient rowset RETURN_IF_ERROR(rowset_writer->flush()); RowsetSharedPtr transient_rowset; diff --git a/be/src/olap/txn_manager.h b/be/src/olap/txn_manager.h index c311fed8799..a7dcc852d9b 100644 --- a/be/src/olap/txn_manager.h +++ b/be/src/olap/txn_manager.h @@ -36,6 +36,7 @@ #include "common/status.h" #include "olap/olap_common.h" +#include "olap/partial_update_info.h" #include "olap/rowset/rowset.h" #include "olap/rowset/rowset_meta.h" #include "olap/rowset/segment_v2/segment.h" @@ -59,6 +60,7 @@ struct TabletTxnInfo { RowsetIdUnorderedSet rowset_ids; int64_t creation_time; bool ingest {false}; + std::shared_ptr<PartialUpdateInfo> partial_update_info; TabletTxnInfo(PUniqueId load_id, RowsetSharedPtr rowset) : load_id(load_id), rowset(rowset), creation_time(UnixSeconds()) {} @@ -185,7 +187,8 @@ public: TTabletId tablet_id, TabletUid tablet_uid, bool unique_key_merge_on_write, DeleteBitmapPtr delete_bitmap, - const RowsetIdUnorderedSet& rowset_ids); + const RowsetIdUnorderedSet& rowset_ids, + std::shared_ptr<PartialUpdateInfo> partial_update_info); void get_all_commit_tablet_txn_info_by_tablet( const TabletSharedPtr& tablet, CommitTabletTxnInfoVec* commit_tablet_txn_info_vec); diff --git a/be/src/service/backend_service.cpp b/be/src/service/backend_service.cpp index 69c9e6608e7..436187d779b 100644 --- a/be/src/service/backend_service.cpp +++ b/be/src/service/backend_service.cpp @@ -717,7 +717,7 @@ void BackendService::ingest_binlog(TIngestBinlogResult& result, if (local_tablet->enable_unique_key_merge_on_write()) { StorageEngine::instance()->txn_manager()->set_txn_related_delete_bitmap( partition_id, txn_id, local_tablet_id, local_tablet->tablet_uid(), true, - delete_bitmap, pre_rowset_ids); + delete_bitmap, pre_rowset_ids, nullptr); } tstatus.__set_status_code(TStatusCode::OK); diff --git a/gensrc/proto/descriptors.proto b/gensrc/proto/descriptors.proto index abebf8fde5e..99101767644 100644 --- a/gensrc/proto/descriptors.proto +++ b/gensrc/proto/descriptors.proto @@ -64,8 +64,8 @@ message POlapTableSchemaParam { repeated PSlotDescriptor slot_descs = 4; required PTupleDescriptor tuple_desc = 5; repeated POlapTableIndexSchema indexes = 6; - optional bool partial_update = 7; - repeated string partial_update_input_columns = 8; - optional bool is_strict_mode = 9 [default = false]; + optional bool partial_update = 7; // deprecated + repeated string partial_update_input_columns = 8; // deprecated + optional bool is_strict_mode = 9 [default = false]; // deprecated }; diff --git a/gensrc/proto/olap_file.proto b/gensrc/proto/olap_file.proto index fe5f76f6b4f..4c49e31a7f4 100644 --- a/gensrc/proto/olap_file.proto +++ b/gensrc/proto/olap_file.proto @@ -250,8 +250,8 @@ message TabletSchemaPB { optional int32 version_col_idx = 17 [default = -1]; optional bool store_row_column = 18 [default=false]; // store tuplerow oriented column optional bool is_dynamic_schema = 19 [default=false]; // deprecated - optional bool is_partial_update = 20 [default=false]; - repeated string partial_update_input_columns = 21; + optional bool is_partial_update = 20 [default=false]; // deprecated + repeated string partial_update_input_columns = 21; // deprecated optional bool enable_single_replica_compaction = 22 [default=false]; optional bool skip_write_index_on_load = 23 [default=false]; } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org