This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new d0568b80787 branch-3.0: [Fix](partial update) abort partial update on shadow index's tablet when the including columns miss key columns on new schema #46347 (#46587)
d0568b80787 is described below

commit d0568b8078728b3eb47f789d8eef108c9c9faada
Author: bobhan1 <bao...@selectdb.com>
AuthorDate: Wed Jan 8 22:36:30 2025 +0800

    branch-3.0: [Fix](partial update) abort partial update on shadow index's tablet when the including columns miss key columns on new schema #46347 (#46587)
    
    pick https://github.com/apache/doris/pull/46347
---
 be/src/cloud/cloud_rowset_builder.cpp              |  4 +-
 be/src/olap/delta_writer_v2.cpp                    | 23 +++----
 be/src/olap/delta_writer_v2.h                      |  6 +-
 be/src/olap/partial_update_info.cpp                | 28 ++++++--
 be/src/olap/partial_update_info.h                  |  9 +--
 .../rowset/segment_v2/vertical_segment_writer.cpp  |  8 +++
 be/src/olap/rowset_builder.cpp                     | 18 +++---
 be/src/olap/rowset_builder.h                       |  6 +-
 be/src/olap/schema_change.cpp                      |  1 +
 .../org/apache/doris/alter/SchemaChangeJobV2.java  |  1 +
 .../partial_update/test_add_key_partial_update.out | 17 +++++
 .../test_add_key_partial_update.groovy             | 74 ++++++++++++++++++++++
 12 files changed, 159 insertions(+), 36 deletions(-)

diff --git a/be/src/cloud/cloud_rowset_builder.cpp b/be/src/cloud/cloud_rowset_builder.cpp
index 2e6764b33aa..9466dd10628 100644
--- a/be/src/cloud/cloud_rowset_builder.cpp
+++ b/be/src/cloud/cloud_rowset_builder.cpp
@@ -51,8 +51,8 @@ Status CloudRowsetBuilder::init() {
             duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
 
     // build tablet schema in request level
-    _build_current_tablet_schema(_req.index_id, _req.table_schema_param.get(),
-                                 *_tablet->tablet_schema());
+    RETURN_IF_ERROR(_build_current_tablet_schema(_req.index_id, _req.table_schema_param.get(),
+                                                 *_tablet->tablet_schema()));
 
     RowsetWriterContext context;
     context.txn_id = _req.txn_id;
diff --git a/be/src/olap/delta_writer_v2.cpp b/be/src/olap/delta_writer_v2.cpp
index 73d2fb1d974..5a420f6c387 100644
--- a/be/src/olap/delta_writer_v2.cpp
+++ b/be/src/olap/delta_writer_v2.cpp
@@ -102,8 +102,8 @@ Status DeltaWriterV2::init() {
     if (_streams.size() == 0 || _streams[0]->tablet_schema(_req.index_id) == nullptr) {
         return Status::InternalError("failed to find tablet schema for {}", _req.index_id);
     }
-    _build_current_tablet_schema(_req.index_id, _req.table_schema_param.get(),
-                                 *_streams[0]->tablet_schema(_req.index_id));
+    RETURN_IF_ERROR(_build_current_tablet_schema(_req.index_id, _req.table_schema_param.get(),
+                                                 *_streams[0]->tablet_schema(_req.index_id)));
     RowsetWriterContext context;
     context.txn_id = _req.txn_id;
     context.load_id = _req.load_id;
@@ -211,9 +211,9 @@ Status DeltaWriterV2::cancel_with_status(const Status& st) {
     return Status::OK();
 }
 
-void DeltaWriterV2::_build_current_tablet_schema(int64_t index_id,
-                                                 const OlapTableSchemaParam* table_schema_param,
-                                                 const TabletSchema& ori_tablet_schema) {
+Status DeltaWriterV2::_build_current_tablet_schema(int64_t index_id,
+                                                   const OlapTableSchemaParam* table_schema_param,
+                                                   const TabletSchema& ori_tablet_schema) {
     _tablet_schema->copy_from(ori_tablet_schema);
     // find the right index id
     int i = 0;
@@ -237,12 +237,13 @@ void DeltaWriterV2::_build_current_tablet_schema(int64_t index_id,
     }
     // set partial update columns info
     _partial_update_info = std::make_shared<PartialUpdateInfo>();
-    _partial_update_info->init(*_tablet_schema, table_schema_param->is_partial_update(),
-                               table_schema_param->partial_update_input_columns(),
-                               table_schema_param->is_strict_mode(),
-                               table_schema_param->timestamp_ms(),
-                               table_schema_param->nano_seconds(), table_schema_param->timezone(),
-                               table_schema_param->auto_increment_coulumn());
+    RETURN_IF_ERROR(_partial_update_info->init(
+            _req.tablet_id, _req.txn_id, *_tablet_schema, table_schema_param->is_partial_update(),
+            table_schema_param->partial_update_input_columns(),
+            table_schema_param->is_strict_mode(), table_schema_param->timestamp_ms(),
+            table_schema_param->nano_seconds(), table_schema_param->timezone(),
+            table_schema_param->auto_increment_coulumn()));
+    return Status::OK();
 }
 
 } // namespace doris
diff --git a/be/src/olap/delta_writer_v2.h b/be/src/olap/delta_writer_v2.h
index beeb3d3ecd3..2afc51c5d5e 100644
--- a/be/src/olap/delta_writer_v2.h
+++ b/be/src/olap/delta_writer_v2.h
@@ -86,9 +86,9 @@ public:
     Status cancel_with_status(const Status& st);
 
 private:
-    void _build_current_tablet_schema(int64_t index_id,
-                                      const OlapTableSchemaParam* table_schema_param,
-                                      const TabletSchema& ori_tablet_schema);
+    Status _build_current_tablet_schema(int64_t index_id,
+                                        const OlapTableSchemaParam* table_schema_param,
+                                        const TabletSchema& ori_tablet_schema);
 
     void _update_profile(RuntimeProfile* profile);
 
diff --git a/be/src/olap/partial_update_info.cpp b/be/src/olap/partial_update_info.cpp
index a1ae19a6aa7..5ab08b012f1 100644
--- a/be/src/olap/partial_update_info.cpp
+++ b/be/src/olap/partial_update_info.cpp
@@ -20,6 +20,7 @@
 #include <gen_cpp/olap_file.pb.h>
 
 #include "common/consts.h"
+#include "common/logging.h"
 #include "olap/base_tablet.h"
 #include "olap/olap_common.h"
 #include "olap/rowset/rowset.h"
@@ -32,11 +33,11 @@
 
 namespace doris {
 
-void PartialUpdateInfo::init(const TabletSchema& tablet_schema, bool partial_update,
-                             const std::set<string>& partial_update_cols, bool is_strict_mode,
-                             int64_t timestamp_ms, int32_t nano_seconds,
-                             const std::string& timezone, const std::string& auto_increment_column,
-                             int64_t cur_max_version) {
+Status PartialUpdateInfo::init(int64_t tablet_id, int64_t txn_id, const TabletSchema& tablet_schema,
+                               bool partial_update, const std::set<string>& partial_update_cols,
+                               bool is_strict_mode, int64_t timestamp_ms, int32_t nano_seconds,
+                               const std::string& timezone,
+                               const std::string& auto_increment_column, int64_t cur_max_version) {
     is_partial_update = partial_update;
     partial_update_input_columns = partial_update_cols;
     max_version_in_flush_phase = cur_max_version;
@@ -45,6 +46,22 @@ void PartialUpdateInfo::init(const TabletSchema& tablet_schema, bool partial_upd
     this->timezone = timezone;
     missing_cids.clear();
     update_cids.clear();
+
+    if (is_partial_update) {
+        // partial_update_cols should include all key columns
+        for (std::size_t i {0}; i < tablet_schema.num_key_columns(); i++) {
+            const auto key_col = tablet_schema.column(i);
+            if (!partial_update_cols.contains(key_col.name())) {
+                auto msg = fmt::format(
+                        "Unable to do partial update on shadow index's tablet, tablet_id={}, "
+                        "txn_id={}. Missing key column {}.",
+                        tablet_id, txn_id, key_col.name());
+                LOG_WARNING(msg);
+                return Status::Aborted<false>(msg);
+            }
+        }
+    }
+
     for (auto i = 0; i < tablet_schema.num_columns(); ++i) {
         auto tablet_column = tablet_schema.column(i);
         if (!partial_update_input_columns.contains(tablet_column.name())) {
@@ -64,6 +81,7 @@ void PartialUpdateInfo::init(const TabletSchema& tablet_schema, bool partial_upd
     is_input_columns_contains_auto_inc_column =
             is_partial_update && partial_update_input_columns.contains(auto_increment_column);
     _generate_default_values_for_missing_cids(tablet_schema);
+    return Status::OK();
 }
 
 void PartialUpdateInfo::to_pb(PartialUpdateInfoPB* partial_update_info_pb) const {
diff --git a/be/src/olap/partial_update_info.h b/be/src/olap/partial_update_info.h
index 88fac1a3e9c..05372154bd2 100644
--- a/be/src/olap/partial_update_info.h
+++ b/be/src/olap/partial_update_info.h
@@ -37,10 +37,11 @@ struct RowsetWriterContext;
 struct RowsetId;
 
 struct PartialUpdateInfo {
-    void init(const TabletSchema& tablet_schema, bool partial_update,
-              const std::set<std::string>& partial_update_cols, bool is_strict_mode,
-              int64_t timestamp_ms, int32_t nano_seconds, const std::string& timezone,
-              const std::string& auto_increment_column, int64_t cur_max_version = -1);
+    Status init(int64_t tablet_id, int64_t txn_id, const TabletSchema& tablet_schema,
+                bool partial_update, const std::set<std::string>& partial_update_cols,
+                bool is_strict_mode, int64_t timestamp_ms, int32_t nano_seconds,
+                const std::string& timezone, const std::string& auto_increment_column,
+                int64_t cur_max_version = -1);
     void to_pb(PartialUpdateInfoPB* partial_update_info) const;
     void from_pb(PartialUpdateInfoPB* partial_update_info);
     Status handle_non_strict_mode_not_found_error(const TabletSchema& tablet_schema);
diff --git a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
index 7970cb27aef..6339c7db4bf 100644
--- a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
@@ -59,6 +59,7 @@
 #include "service/point_query_executor.h"
 #include "util/coding.h"
 #include "util/crc32c.h"
+#include "util/debug_points.h"
 #include "util/faststring.h"
 #include "util/key_util.h"
 #include "vec/columns/column_nullable.h"
@@ -459,6 +460,8 @@ Status VerticalSegmentWriter::_partial_update_preconditions_check(size_t row_pos
 // 3. set columns to data convertor and then write all columns
 Status VerticalSegmentWriter::_append_block_with_partial_content(RowsInBlock& data,
                                                                  vectorized::Block& full_block) {
+    DBUG_EXECUTE_IF("_append_block_with_partial_content.block", DBUG_BLOCK);
+
     RETURN_IF_ERROR(_partial_update_preconditions_check(data.row_pos));
 
     // create full block and fill with input columns
@@ -941,6 +944,11 @@ void VerticalSegmentWriter::_encode_rowid(const uint32_t rowid, string* encoded_
 std::string VerticalSegmentWriter::_full_encode_keys(
         const std::vector<vectorized::IOlapColumnDataAccessor*>& key_columns, size_t pos) {
     assert(_key_index_size.size() == _num_sort_key_columns);
+    if (!(key_columns.size() == _num_sort_key_columns &&
+          _key_coders.size() == _num_sort_key_columns)) {
+        LOG_INFO("key_columns.size()={}, _key_coders.size()={}, _num_sort_key_columns={}, ",
+                 key_columns.size(), _key_coders.size(), _num_sort_key_columns);
+    }
     assert(key_columns.size() == _num_sort_key_columns &&
            _key_coders.size() == _num_sort_key_columns);
     return _full_encode_keys(_key_coders, key_columns, pos);
diff --git a/be/src/olap/rowset_builder.cpp b/be/src/olap/rowset_builder.cpp
index 057cc35fb13..63ffe4f68a1 100644
--- a/be/src/olap/rowset_builder.cpp
+++ b/be/src/olap/rowset_builder.cpp
@@ -221,8 +221,8 @@ Status RowsetBuilder::init() {
         };
     })
     // build tablet schema in request level
-    _build_current_tablet_schema(_req.index_id, _req.table_schema_param.get(),
-                                 *_tablet->tablet_schema());
+    RETURN_IF_ERROR(_build_current_tablet_schema(_req.index_id, _req.table_schema_param.get(),
+                                                 *_tablet->tablet_schema()));
     RowsetWriterContext context;
     context.txn_id = _req.txn_id;
     context.load_id = _req.load_id;
@@ -385,9 +385,9 @@ Status BaseRowsetBuilder::cancel() {
     return Status::OK();
 }
 
-void BaseRowsetBuilder::_build_current_tablet_schema(int64_t index_id,
-                                                     const OlapTableSchemaParam* table_schema_param,
-                                                     const TabletSchema& ori_tablet_schema) {
+Status BaseRowsetBuilder::_build_current_tablet_schema(
+        int64_t index_id, const OlapTableSchemaParam* table_schema_param,
+        const TabletSchema& ori_tablet_schema) {
     // find the right index id
     int i = 0;
     auto indexes = table_schema_param->indexes();
@@ -427,12 +427,14 @@ void BaseRowsetBuilder::_build_current_tablet_schema(int64_t index_id,
     }
     // set partial update columns info
     _partial_update_info = std::make_shared<PartialUpdateInfo>();
-    _partial_update_info->init(
-            *_tablet_schema, table_schema_param->is_partial_update(),
+    RETURN_IF_ERROR(_partial_update_info->init(
+            tablet()->tablet_id(), _req.txn_id, *_tablet_schema,
+            table_schema_param->is_partial_update(),
             table_schema_param->partial_update_input_columns(),
             table_schema_param->is_strict_mode(), table_schema_param->timestamp_ms(),
             table_schema_param->nano_seconds(), table_schema_param->timezone(),
-            table_schema_param->auto_increment_coulumn(), _max_version_in_flush_phase);
+            table_schema_param->auto_increment_coulumn(), _max_version_in_flush_phase));
+    return Status::OK();
 }
 
 } // namespace doris
diff --git a/be/src/olap/rowset_builder.h b/be/src/olap/rowset_builder.h
index 7fd57803736..24ae04b5fee 100644
--- a/be/src/olap/rowset_builder.h
+++ b/be/src/olap/rowset_builder.h
@@ -85,9 +85,9 @@ public:
     Status init_mow_context(std::shared_ptr<MowContext>& mow_context);
 
 protected:
-    void _build_current_tablet_schema(int64_t index_id,
-                                      const OlapTableSchemaParam* table_schema_param,
-                                      const TabletSchema& ori_tablet_schema);
+    Status _build_current_tablet_schema(int64_t index_id,
+                                        const OlapTableSchemaParam* table_schema_param,
+                                        const TabletSchema& ori_tablet_schema);
 
     virtual void _init_profile(RuntimeProfile* profile);
 
diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp
index ae449532182..47e464318d7 100644
--- a/be/src/olap/schema_change.cpp
+++ b/be/src/olap/schema_change.cpp
@@ -751,6 +751,7 @@ Status SchemaChangeJob::process_alter_tablet(const TAlterTabletReqV2& request) {
 
     Status res = _do_process_alter_tablet(request);
     LOG(INFO) << "finished alter tablet process, res=" << res;
+    DBUG_EXECUTE_IF("SchemaChangeJob::process_alter_tablet.leave.sleep", { sleep(5); });
     return res;
 }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
index 425ee6eb278..1f1e706a90d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
@@ -601,6 +601,7 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
         Env.getCurrentEnv().getGroupCommitManager().blockTable(tableId);
         Env.getCurrentEnv().getGroupCommitManager().waitWalFinished(tableId);
         Env.getCurrentEnv().getGroupCommitManager().unblockTable(tableId);
+
         /*
          * all tasks are finished. check the integrity.
          * we just check whether all new replicas are healthy.
diff --git a/regression-test/data/fault_injection_p0/partial_update/test_add_key_partial_update.out b/regression-test/data/fault_injection_p0/partial_update/test_add_key_partial_update.out
new file mode 100644
index 00000000000..87d44dac59e
--- /dev/null
+++ b/regression-test/data/fault_injection_p0/partial_update/test_add_key_partial_update.out
@@ -0,0 +1,17 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !sql --
+1      1       1
+2      2       2
+3      3       3
+4      4       4
+5      5       5
+6      6       6
+
+-- !sql --
+1      \N      1       1       2       0
+2      \N      2       2       2       0
+3      \N      3       3       2       0
+4      \N      4       4       5       0
+5      \N      5       5       5       0
+6      \N      6       6       5       0
+
diff --git a/regression-test/suites/fault_injection_p0/partial_update/test_add_key_partial_update.groovy b/regression-test/suites/fault_injection_p0/partial_update/test_add_key_partial_update.groovy
new file mode 100644
index 00000000000..61ba9d60ea8
--- /dev/null
+++ b/regression-test/suites/fault_injection_p0/partial_update/test_add_key_partial_update.groovy
@@ -0,0 +1,74 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.junit.Assert
+import java.util.concurrent.TimeUnit
+import org.awaitility.Awaitility
+
+suite("test_add_key_partial_update", "nonConcurrent") {
+
+    def table1 = "test_add_key_partial_update"
+    sql "DROP TABLE IF EXISTS ${table1} FORCE;"
+    sql """ CREATE TABLE IF NOT EXISTS ${table1} (
+            `k1` int NOT NULL,
+            `c1` int,
+            `c2` int,
+        )UNIQUE KEY(k1)
+        DISTRIBUTED BY HASH(k1) BUCKETS 1
+        PROPERTIES (
+            "enable_mow_light_delete" = "false",
+            "disable_auto_compaction" = "true",
+            "replication_num" = "1"); """
+
+    sql "insert into ${table1} values(1,1,1),(2,2,2),(3,3,3);"
+    sql "insert into ${table1} values(4,4,4),(5,5,5),(6,6,6);"
+    sql "insert into ${table1} values(4,4,4),(5,5,5),(6,6,6);"
+    sql "insert into ${table1} values(4,4,4),(5,5,5),(6,6,6);"
+    sql "sync;"
+    order_qt_sql "select * from ${table1};"
+
+    try {
+        GetDebugPoint().clearDebugPointsForAllFEs()
+        GetDebugPoint().clearDebugPointsForAllBEs()
+
+        // block the schema change process before it change the shadow index to base index
+        GetDebugPoint().enableDebugPointForAllBEs("SchemaChangeJob::process_alter_tablet.leave.sleep")
+
+        sql "alter table ${table1} ADD COLUMN k2 int key;"
+
+        Thread.sleep(1000)
+        test {
+            sql "delete from ${table1} where k1<=3;"
+            exception "Unable to do partial update on shadow index's tablet"
+        }
+
+        waitForSchemaChangeDone {
+            sql """ SHOW ALTER TABLE COLUMN WHERE TableName='${table1}' ORDER BY createtime DESC LIMIT 1 """
+            time 1000
+        }
+
+        sql "set skip_delete_sign=true;"
+        sql "sync;"
+        qt_sql "select k1,k2,c1,c2,__DORIS_VERSION_COL__,__DORIS_DELETE_SIGN__ from ${table1} order by k1,k2,__DORIS_VERSION_COL__;"
+    } catch(Exception e) {
+        logger.info(e.getMessage())
+        throw e
+    } finally {
+        GetDebugPoint().clearDebugPointsForAllFEs()
+        GetDebugPoint().clearDebugPointsForAllBEs()
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to