This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new d0568b80787 branch-3.0: [Fix](partial update) abort partial update on shadow index's tablet when the including columns miss key columns on new schema #46347 (#46587) d0568b80787 is described below commit d0568b8078728b3eb47f789d8eef108c9c9faada Author: bobhan1 <bao...@selectdb.com> AuthorDate: Wed Jan 8 22:36:30 2025 +0800 branch-3.0: [Fix](partial update) abort partial update on shadow index's tablet when the including columns miss key columns on new schema #46347 (#46587) pick https://github.com/apache/doris/pull/46347 --- be/src/cloud/cloud_rowset_builder.cpp | 4 +- be/src/olap/delta_writer_v2.cpp | 23 +++---- be/src/olap/delta_writer_v2.h | 6 +- be/src/olap/partial_update_info.cpp | 28 ++++++-- be/src/olap/partial_update_info.h | 9 +-- .../rowset/segment_v2/vertical_segment_writer.cpp | 8 +++ be/src/olap/rowset_builder.cpp | 18 +++--- be/src/olap/rowset_builder.h | 6 +- be/src/olap/schema_change.cpp | 1 + .../org/apache/doris/alter/SchemaChangeJobV2.java | 1 + .../partial_update/test_add_key_partial_update.out | 17 +++++ .../test_add_key_partial_update.groovy | 74 ++++++++++++++++++++++ 12 files changed, 159 insertions(+), 36 deletions(-) diff --git a/be/src/cloud/cloud_rowset_builder.cpp b/be/src/cloud/cloud_rowset_builder.cpp index 2e6764b33aa..9466dd10628 100644 --- a/be/src/cloud/cloud_rowset_builder.cpp +++ b/be/src/cloud/cloud_rowset_builder.cpp @@ -51,8 +51,8 @@ Status CloudRowsetBuilder::init() { duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count(); // build tablet schema in request level - _build_current_tablet_schema(_req.index_id, _req.table_schema_param.get(), - *_tablet->tablet_schema()); + RETURN_IF_ERROR(_build_current_tablet_schema(_req.index_id, _req.table_schema_param.get(), + *_tablet->tablet_schema())); RowsetWriterContext context; context.txn_id = _req.txn_id; diff --git a/be/src/olap/delta_writer_v2.cpp b/be/src/olap/delta_writer_v2.cpp 
index 73d2fb1d974..5a420f6c387 100644 --- a/be/src/olap/delta_writer_v2.cpp +++ b/be/src/olap/delta_writer_v2.cpp @@ -102,8 +102,8 @@ Status DeltaWriterV2::init() { if (_streams.size() == 0 || _streams[0]->tablet_schema(_req.index_id) == nullptr) { return Status::InternalError("failed to find tablet schema for {}", _req.index_id); } - _build_current_tablet_schema(_req.index_id, _req.table_schema_param.get(), - *_streams[0]->tablet_schema(_req.index_id)); + RETURN_IF_ERROR(_build_current_tablet_schema(_req.index_id, _req.table_schema_param.get(), + *_streams[0]->tablet_schema(_req.index_id))); RowsetWriterContext context; context.txn_id = _req.txn_id; context.load_id = _req.load_id; @@ -211,9 +211,9 @@ Status DeltaWriterV2::cancel_with_status(const Status& st) { return Status::OK(); } -void DeltaWriterV2::_build_current_tablet_schema(int64_t index_id, - const OlapTableSchemaParam* table_schema_param, - const TabletSchema& ori_tablet_schema) { +Status DeltaWriterV2::_build_current_tablet_schema(int64_t index_id, + const OlapTableSchemaParam* table_schema_param, + const TabletSchema& ori_tablet_schema) { _tablet_schema->copy_from(ori_tablet_schema); // find the right index id int i = 0; @@ -237,12 +237,13 @@ void DeltaWriterV2::_build_current_tablet_schema(int64_t index_id, } // set partial update columns info _partial_update_info = std::make_shared<PartialUpdateInfo>(); - _partial_update_info->init(*_tablet_schema, table_schema_param->is_partial_update(), - table_schema_param->partial_update_input_columns(), - table_schema_param->is_strict_mode(), - table_schema_param->timestamp_ms(), - table_schema_param->nano_seconds(), table_schema_param->timezone(), - table_schema_param->auto_increment_coulumn()); + RETURN_IF_ERROR(_partial_update_info->init( + _req.tablet_id, _req.txn_id, *_tablet_schema, table_schema_param->is_partial_update(), + table_schema_param->partial_update_input_columns(), + table_schema_param->is_strict_mode(), table_schema_param->timestamp_ms(), + 
table_schema_param->nano_seconds(), table_schema_param->timezone(), + table_schema_param->auto_increment_coulumn())); + return Status::OK(); } } // namespace doris diff --git a/be/src/olap/delta_writer_v2.h b/be/src/olap/delta_writer_v2.h index beeb3d3ecd3..2afc51c5d5e 100644 --- a/be/src/olap/delta_writer_v2.h +++ b/be/src/olap/delta_writer_v2.h @@ -86,9 +86,9 @@ public: Status cancel_with_status(const Status& st); private: - void _build_current_tablet_schema(int64_t index_id, - const OlapTableSchemaParam* table_schema_param, - const TabletSchema& ori_tablet_schema); + Status _build_current_tablet_schema(int64_t index_id, + const OlapTableSchemaParam* table_schema_param, + const TabletSchema& ori_tablet_schema); void _update_profile(RuntimeProfile* profile); diff --git a/be/src/olap/partial_update_info.cpp b/be/src/olap/partial_update_info.cpp index a1ae19a6aa7..5ab08b012f1 100644 --- a/be/src/olap/partial_update_info.cpp +++ b/be/src/olap/partial_update_info.cpp @@ -20,6 +20,7 @@ #include <gen_cpp/olap_file.pb.h> #include "common/consts.h" +#include "common/logging.h" #include "olap/base_tablet.h" #include "olap/olap_common.h" #include "olap/rowset/rowset.h" @@ -32,11 +33,11 @@ namespace doris { -void PartialUpdateInfo::init(const TabletSchema& tablet_schema, bool partial_update, - const std::set<string>& partial_update_cols, bool is_strict_mode, - int64_t timestamp_ms, int32_t nano_seconds, - const std::string& timezone, const std::string& auto_increment_column, - int64_t cur_max_version) { +Status PartialUpdateInfo::init(int64_t tablet_id, int64_t txn_id, const TabletSchema& tablet_schema, + bool partial_update, const std::set<string>& partial_update_cols, + bool is_strict_mode, int64_t timestamp_ms, int32_t nano_seconds, + const std::string& timezone, + const std::string& auto_increment_column, int64_t cur_max_version) { is_partial_update = partial_update; partial_update_input_columns = partial_update_cols; max_version_in_flush_phase = cur_max_version; @@ 
-45,6 +46,22 @@ void PartialUpdateInfo::init(const TabletSchema& tablet_schema, bool partial_upd this->timezone = timezone; missing_cids.clear(); update_cids.clear(); + + if (is_partial_update) { + // partial_update_cols should include all key columns + for (std::size_t i {0}; i < tablet_schema.num_key_columns(); i++) { + const auto key_col = tablet_schema.column(i); + if (!partial_update_cols.contains(key_col.name())) { + auto msg = fmt::format( + "Unable to do partial update on shadow index's tablet, tablet_id={}, " + "txn_id={}. Missing key column {}.", + tablet_id, txn_id, key_col.name()); + LOG_WARNING(msg); + return Status::Aborted<false>(msg); + } + } + } + for (auto i = 0; i < tablet_schema.num_columns(); ++i) { auto tablet_column = tablet_schema.column(i); if (!partial_update_input_columns.contains(tablet_column.name())) { @@ -64,6 +81,7 @@ void PartialUpdateInfo::init(const TabletSchema& tablet_schema, bool partial_upd is_input_columns_contains_auto_inc_column = is_partial_update && partial_update_input_columns.contains(auto_increment_column); _generate_default_values_for_missing_cids(tablet_schema); + return Status::OK(); } void PartialUpdateInfo::to_pb(PartialUpdateInfoPB* partial_update_info_pb) const { diff --git a/be/src/olap/partial_update_info.h b/be/src/olap/partial_update_info.h index 88fac1a3e9c..05372154bd2 100644 --- a/be/src/olap/partial_update_info.h +++ b/be/src/olap/partial_update_info.h @@ -37,10 +37,11 @@ struct RowsetWriterContext; struct RowsetId; struct PartialUpdateInfo { - void init(const TabletSchema& tablet_schema, bool partial_update, - const std::set<std::string>& partial_update_cols, bool is_strict_mode, - int64_t timestamp_ms, int32_t nano_seconds, const std::string& timezone, - const std::string& auto_increment_column, int64_t cur_max_version = -1); + Status init(int64_t tablet_id, int64_t txn_id, const TabletSchema& tablet_schema, + bool partial_update, const std::set<std::string>& partial_update_cols, + bool is_strict_mode, 
int64_t timestamp_ms, int32_t nano_seconds, + const std::string& timezone, const std::string& auto_increment_column, + int64_t cur_max_version = -1); void to_pb(PartialUpdateInfoPB* partial_update_info) const; void from_pb(PartialUpdateInfoPB* partial_update_info); Status handle_non_strict_mode_not_found_error(const TabletSchema& tablet_schema); diff --git a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp index 7970cb27aef..6339c7db4bf 100644 --- a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp +++ b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp @@ -59,6 +59,7 @@ #include "service/point_query_executor.h" #include "util/coding.h" #include "util/crc32c.h" +#include "util/debug_points.h" #include "util/faststring.h" #include "util/key_util.h" #include "vec/columns/column_nullable.h" @@ -459,6 +460,8 @@ Status VerticalSegmentWriter::_partial_update_preconditions_check(size_t row_pos // 3. set columns to data convertor and then write all columns Status VerticalSegmentWriter::_append_block_with_partial_content(RowsInBlock& data, vectorized::Block& full_block) { + DBUG_EXECUTE_IF("_append_block_with_partial_content.block", DBUG_BLOCK); + RETURN_IF_ERROR(_partial_update_preconditions_check(data.row_pos)); // create full block and fill with input columns @@ -941,6 +944,11 @@ void VerticalSegmentWriter::_encode_rowid(const uint32_t rowid, string* encoded_ std::string VerticalSegmentWriter::_full_encode_keys( const std::vector<vectorized::IOlapColumnDataAccessor*>& key_columns, size_t pos) { assert(_key_index_size.size() == _num_sort_key_columns); + if (!(key_columns.size() == _num_sort_key_columns && + _key_coders.size() == _num_sort_key_columns)) { + LOG_INFO("key_columns.size()={}, _key_coders.size()={}, _num_sort_key_columns={}, ", + key_columns.size(), _key_coders.size(), _num_sort_key_columns); + } assert(key_columns.size() == _num_sort_key_columns && _key_coders.size() == 
_num_sort_key_columns); return _full_encode_keys(_key_coders, key_columns, pos); diff --git a/be/src/olap/rowset_builder.cpp b/be/src/olap/rowset_builder.cpp index 057cc35fb13..63ffe4f68a1 100644 --- a/be/src/olap/rowset_builder.cpp +++ b/be/src/olap/rowset_builder.cpp @@ -221,8 +221,8 @@ Status RowsetBuilder::init() { }; }) // build tablet schema in request level - _build_current_tablet_schema(_req.index_id, _req.table_schema_param.get(), - *_tablet->tablet_schema()); + RETURN_IF_ERROR(_build_current_tablet_schema(_req.index_id, _req.table_schema_param.get(), + *_tablet->tablet_schema())); RowsetWriterContext context; context.txn_id = _req.txn_id; context.load_id = _req.load_id; @@ -385,9 +385,9 @@ Status BaseRowsetBuilder::cancel() { return Status::OK(); } -void BaseRowsetBuilder::_build_current_tablet_schema(int64_t index_id, - const OlapTableSchemaParam* table_schema_param, - const TabletSchema& ori_tablet_schema) { +Status BaseRowsetBuilder::_build_current_tablet_schema( + int64_t index_id, const OlapTableSchemaParam* table_schema_param, + const TabletSchema& ori_tablet_schema) { // find the right index id int i = 0; auto indexes = table_schema_param->indexes(); @@ -427,12 +427,14 @@ void BaseRowsetBuilder::_build_current_tablet_schema(int64_t index_id, } // set partial update columns info _partial_update_info = std::make_shared<PartialUpdateInfo>(); - _partial_update_info->init( - *_tablet_schema, table_schema_param->is_partial_update(), + RETURN_IF_ERROR(_partial_update_info->init( + tablet()->tablet_id(), _req.txn_id, *_tablet_schema, + table_schema_param->is_partial_update(), table_schema_param->partial_update_input_columns(), table_schema_param->is_strict_mode(), table_schema_param->timestamp_ms(), table_schema_param->nano_seconds(), table_schema_param->timezone(), - table_schema_param->auto_increment_coulumn(), _max_version_in_flush_phase); + table_schema_param->auto_increment_coulumn(), _max_version_in_flush_phase)); + return Status::OK(); } } // 
namespace doris diff --git a/be/src/olap/rowset_builder.h b/be/src/olap/rowset_builder.h index 7fd57803736..24ae04b5fee 100644 --- a/be/src/olap/rowset_builder.h +++ b/be/src/olap/rowset_builder.h @@ -85,9 +85,9 @@ public: Status init_mow_context(std::shared_ptr<MowContext>& mow_context); protected: - void _build_current_tablet_schema(int64_t index_id, - const OlapTableSchemaParam* table_schema_param, - const TabletSchema& ori_tablet_schema); + Status _build_current_tablet_schema(int64_t index_id, + const OlapTableSchemaParam* table_schema_param, + const TabletSchema& ori_tablet_schema); virtual void _init_profile(RuntimeProfile* profile); diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp index ae449532182..47e464318d7 100644 --- a/be/src/olap/schema_change.cpp +++ b/be/src/olap/schema_change.cpp @@ -751,6 +751,7 @@ Status SchemaChangeJob::process_alter_tablet(const TAlterTabletReqV2& request) { Status res = _do_process_alter_tablet(request); LOG(INFO) << "finished alter tablet process, res=" << res; + DBUG_EXECUTE_IF("SchemaChangeJob::process_alter_tablet.leave.sleep", { sleep(5); }); return res; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java index 425ee6eb278..1f1e706a90d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java @@ -601,6 +601,7 @@ public class SchemaChangeJobV2 extends AlterJobV2 { Env.getCurrentEnv().getGroupCommitManager().blockTable(tableId); Env.getCurrentEnv().getGroupCommitManager().waitWalFinished(tableId); Env.getCurrentEnv().getGroupCommitManager().unblockTable(tableId); + /* * all tasks are finished. check the integrity. * we just check whether all new replicas are healthy. 
diff --git a/regression-test/data/fault_injection_p0/partial_update/test_add_key_partial_update.out b/regression-test/data/fault_injection_p0/partial_update/test_add_key_partial_update.out new file mode 100644 index 00000000000..87d44dac59e --- /dev/null +++ b/regression-test/data/fault_injection_p0/partial_update/test_add_key_partial_update.out @@ -0,0 +1,17 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !sql -- +1 1 1 +2 2 2 +3 3 3 +4 4 4 +5 5 5 +6 6 6 + +-- !sql -- +1 \N 1 1 2 0 +2 \N 2 2 2 0 +3 \N 3 3 2 0 +4 \N 4 4 5 0 +5 \N 5 5 5 0 +6 \N 6 6 5 0 + diff --git a/regression-test/suites/fault_injection_p0/partial_update/test_add_key_partial_update.groovy b/regression-test/suites/fault_injection_p0/partial_update/test_add_key_partial_update.groovy new file mode 100644 index 00000000000..61ba9d60ea8 --- /dev/null +++ b/regression-test/suites/fault_injection_p0/partial_update/test_add_key_partial_update.groovy @@ -0,0 +1,74 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +import org.junit.Assert +import java.util.concurrent.TimeUnit +import org.awaitility.Awaitility + +suite("test_add_key_partial_update", "nonConcurrent") { + + def table1 = "test_add_key_partial_update" + sql "DROP TABLE IF EXISTS ${table1} FORCE;" + sql """ CREATE TABLE IF NOT EXISTS ${table1} ( + `k1` int NOT NULL, + `c1` int, + `c2` int, + )UNIQUE KEY(k1) + DISTRIBUTED BY HASH(k1) BUCKETS 1 + PROPERTIES ( + "enable_mow_light_delete" = "false", + "disable_auto_compaction" = "true", + "replication_num" = "1"); """ + + sql "insert into ${table1} values(1,1,1),(2,2,2),(3,3,3);" + sql "insert into ${table1} values(4,4,4),(5,5,5),(6,6,6);" + sql "insert into ${table1} values(4,4,4),(5,5,5),(6,6,6);" + sql "insert into ${table1} values(4,4,4),(5,5,5),(6,6,6);" + sql "sync;" + order_qt_sql "select * from ${table1};" + + try { + GetDebugPoint().clearDebugPointsForAllFEs() + GetDebugPoint().clearDebugPointsForAllBEs() + + // delay the schema change process (the BE debug point sleeps 5s) before it changes the shadow index to the base index + GetDebugPoint().enableDebugPointForAllBEs("SchemaChangeJob::process_alter_tablet.leave.sleep") + + sql "alter table ${table1} ADD COLUMN k2 int key;" + + Thread.sleep(1000) + test { + sql "delete from ${table1} where k1<=3;" + exception "Unable to do partial update on shadow index's tablet" + } + + waitForSchemaChangeDone { + sql """ SHOW ALTER TABLE COLUMN WHERE TableName='${table1}' ORDER BY createtime DESC LIMIT 1 """ + time 1000 + } + + sql "set skip_delete_sign=true;" + sql "sync;" + qt_sql "select k1,k2,c1,c2,__DORIS_VERSION_COL__,__DORIS_DELETE_SIGN__ from ${table1} order by k1,k2,__DORIS_VERSION_COL__;" + } catch(Exception e) { + logger.info(e.getMessage()) + throw e + } finally { + GetDebugPoint().clearDebugPointsForAllFEs() + GetDebugPoint().clearDebugPointsForAllBEs() + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: 
commits-h...@doris.apache.org