This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 159be51ea6a [bugfix](schema_change) Fix the coredump when doubly write 
during schema change (#22557)
159be51ea6a is described below

commit 159be51ea6aa76ac197fdbe2292a55177e456e72
Author: Lightman <31928846+lchangli...@users.noreply.github.com>
AuthorDate: Thu Oct 19 14:43:18 2023 +0800

    [bugfix](schema_change) Fix the coredump when doubly write during schema 
change (#22557)
---
 be/src/exec/tablet_info.cpp                        | 69 ++++++++-------
 be/src/olap/tablet_schema.cpp                      |  3 +
 be/src/olap/tablet_schema.h                        |  4 +
 be/src/runtime/descriptors.cpp                     |  4 +
 be/src/runtime/descriptors.h                       |  3 +
 be/src/runtime/primitive_type.cpp                  |  6 ++
 .../org/apache/doris/analysis/SlotDescriptor.java  |  5 +-
 .../main/java/org/apache/doris/catalog/Column.java |  2 +-
 gensrc/proto/descriptors.proto                     |  1 +
 gensrc/thrift/Descriptors.thrift                   |  1 +
 .../suites/schema_change/ddl/lineorder_create.sql  | 24 ++++++
 .../suites/schema_change/ddl/lineorder_delete.sql  |  1 +
 .../test_double_write_when_schema_change.groovy    | 99 ++++++++++++++++++++++
 13 files changed, 188 insertions(+), 34 deletions(-)

diff --git a/be/src/exec/tablet_info.cpp b/be/src/exec/tablet_info.cpp
index 3abeecbfe28..05561be2971 100644
--- a/be/src/exec/tablet_info.cpp
+++ b/be/src/exec/tablet_info.cpp
@@ -32,9 +32,11 @@
 #include "common/exception.h"
 #include "common/status.h"
 #include "olap/tablet_schema.h"
+#include "runtime/define_primitive_type.h"
 #include "runtime/descriptors.h"
 #include "runtime/large_int_value.h"
 #include "runtime/memory/mem_tracker.h"
+#include "runtime/primitive_type.h"
 #include "runtime/raw_value.h"
 #include "runtime/types.h"
 #include "util/hash_util.hpp"
@@ -128,30 +130,33 @@ Status OlapTableSchemaParam::init(const 
POlapTableSchemaParam& pschema) {
     for (auto& col : pschema.partial_update_input_columns()) {
         _partial_update_input_columns.insert(col);
     }
-    std::map<std::string, SlotDescriptor*> slots_map;
+    std::unordered_map<std::pair<std::string, std::string>, SlotDescriptor*> 
slots_map;
     _tuple_desc = _obj_pool.add(new TupleDescriptor(pschema.tuple_desc()));
 
     for (auto& p_slot_desc : pschema.slot_descs()) {
         auto slot_desc = _obj_pool.add(new SlotDescriptor(p_slot_desc));
         _tuple_desc->add_slot(slot_desc);
-        slots_map.emplace(slot_desc->col_name(), slot_desc);
+        string data_type;
+        EnumToString(TPrimitiveType, to_thrift(slot_desc->col_type()), 
data_type);
+        slots_map.emplace(std::make_pair(to_lower(slot_desc->col_name()), 
std::move(data_type)),
+                          slot_desc);
     }
 
     for (auto& p_index : pschema.indexes()) {
         auto index = _obj_pool.add(new OlapTableIndexSchema());
         index->index_id = p_index.id();
         index->schema_hash = p_index.schema_hash();
-        for (auto& col : p_index.columns()) {
-            if (_is_partial_update && _partial_update_input_columns.count(col) 
== 0) {
-                continue;
-            }
-            auto it = slots_map.find(col);
-            if (it == std::end(slots_map)) {
-                return Status::InternalError("unknown index column, 
column={}", col);
-            }
-            index->slots.emplace_back(it->second);
-        }
         for (auto& pcolumn_desc : p_index.columns_desc()) {
+            if (!_is_partial_update ||
+                _partial_update_input_columns.count(pcolumn_desc.name()) > 0) {
+                auto it = slots_map.find(
+                        std::make_pair(to_lower(pcolumn_desc.name()), 
pcolumn_desc.type()));
+                if (it == std::end(slots_map)) {
+                    return Status::InternalError("unknown index column, 
column={}, type={}",
+                                                 pcolumn_desc.name(), 
pcolumn_desc.type());
+                }
+                index->slots.emplace_back(it->second);
+            }
             TabletColumn* tc = _obj_pool.add(new TabletColumn());
             tc->init_from_pb(pcolumn_desc);
             index->columns.emplace_back(tc);
@@ -183,41 +188,43 @@ Status OlapTableSchemaParam::init(const 
TOlapTableSchemaParam& tschema) {
     for (auto& tcolumn : tschema.partial_update_input_columns) {
         _partial_update_input_columns.insert(tcolumn);
     }
-    std::map<std::string, SlotDescriptor*> slots_map;
+    std::unordered_map<std::pair<std::string, PrimitiveType>, SlotDescriptor*> 
slots_map;
     _tuple_desc = _obj_pool.add(new TupleDescriptor(tschema.tuple_desc));
     for (auto& t_slot_desc : tschema.slot_descs) {
         auto slot_desc = _obj_pool.add(new SlotDescriptor(t_slot_desc));
         _tuple_desc->add_slot(slot_desc);
-        slots_map.emplace(to_lower(slot_desc->col_name()), slot_desc);
+        slots_map.emplace(std::make_pair(to_lower(slot_desc->col_name()), 
slot_desc->col_type()),
+                          slot_desc);
     }
 
     for (auto& t_index : tschema.indexes) {
+        std::unordered_map<std::string, SlotDescriptor*> index_slots_map;
         auto index = _obj_pool.add(new OlapTableIndexSchema());
         index->index_id = t_index.id;
         index->schema_hash = t_index.schema_hash;
-        for (auto& col : t_index.columns) {
-            if (_is_partial_update && _partial_update_input_columns.count(col) 
== 0) {
-                continue;
-            }
-            auto it = slots_map.find(to_lower(col));
-            if (it == std::end(slots_map)) {
-                return Status::InternalError("unknown index column, 
column={}", col);
-            }
-            index->slots.emplace_back(it->second);
-        }
-        if (t_index.__isset.columns_desc) {
-            for (auto& tcolumn_desc : t_index.columns_desc) {
-                TabletColumn* tc = _obj_pool.add(new TabletColumn());
-                tc->init_from_thrift(tcolumn_desc);
-                index->columns.emplace_back(tc);
+        for (auto& tcolumn_desc : t_index.columns_desc) {
+            auto it = 
slots_map.find(std::make_pair(to_lower(tcolumn_desc.column_name),
+                                                    
thrift_to_type(tcolumn_desc.column_type.type)));
+            if (!_is_partial_update ||
+                _partial_update_input_columns.count(tcolumn_desc.column_name) 
> 0) {
+                if (it == slots_map.end()) {
+                    return Status::InternalError("unknown index column, 
column={}, type={}",
+                                                 tcolumn_desc.column_name,
+                                                 
tcolumn_desc.column_type.type);
+                }
+                index_slots_map.emplace(to_lower(tcolumn_desc.column_name), 
it->second);
+                index->slots.emplace_back(it->second);
             }
+            TabletColumn* tc = _obj_pool.add(new TabletColumn());
+            tc->init_from_thrift(tcolumn_desc);
+            index->columns.emplace_back(tc);
         }
         if (t_index.__isset.indexes_desc) {
             for (auto& tindex_desc : t_index.indexes_desc) {
                 std::vector<int32_t> 
column_unique_ids(tindex_desc.columns.size());
                 for (size_t i = 0; i < tindex_desc.columns.size(); i++) {
-                    auto it = slots_map.find(to_lower(tindex_desc.columns[i]));
-                    if (it != std::end(slots_map)) {
+                    auto it = 
index_slots_map.find(to_lower(tindex_desc.columns[i]));
+                    if (it != index_slots_map.end()) {
                         column_unique_ids[i] = it->second->col_unique_id();
                     }
                 }
diff --git a/be/src/olap/tablet_schema.cpp b/be/src/olap/tablet_schema.cpp
index c10074c47cd..833ad15a15d 100644
--- a/be/src/olap/tablet_schema.cpp
+++ b/be/src/olap/tablet_schema.cpp
@@ -799,6 +799,9 @@ void TabletSchema::build_current_tablet_schema(int64_t 
index_id, int32_t version
     _indexes.clear();
     _field_name_to_index.clear();
     _field_id_to_index.clear();
+    _delete_sign_idx = -1;
+    _sequence_col_idx = -1;
+    _version_col_idx = -1;
 
     for (auto& column : index->columns) {
         if (column->is_key()) {
diff --git a/be/src/olap/tablet_schema.h b/be/src/olap/tablet_schema.h
index 42fabc6c2f6..2fe6ea45581 100644
--- a/be/src/olap/tablet_schema.h
+++ b/be/src/olap/tablet_schema.h
@@ -143,6 +143,10 @@ private:
     int32_t _unique_id = -1;
     std::string _col_name;
     std::string _col_name_lower_case;
+    // _type is derived from TPrimitiveType: it is first converted to a string by 'EnumToString(TPrimitiveType, tcolumn.column_type.type, data_type);' (see TabletMeta::init_column_from_tcolumn),
+    // and then to FieldType by 'TabletColumn::get_field_type_by_string' (see TabletColumn::init_from_pb).
+    // Conversely, the _type field in ColumnPB is a string produced from FieldType by 'get_string_by_field_type'
+    // (see TabletColumn::to_schema_pb).
     FieldType _type;
     bool _is_key = false;
     FieldAggregationMethod _aggregation;
diff --git a/be/src/runtime/descriptors.cpp b/be/src/runtime/descriptors.cpp
index add28582d89..b5c23a2afd0 100644
--- a/be/src/runtime/descriptors.cpp
+++ b/be/src/runtime/descriptors.cpp
@@ -31,6 +31,7 @@
 #include <memory>
 
 #include "common/object_pool.h"
+#include "runtime/primitive_type.h"
 #include "util/string_util.h"
 #include "vec/aggregate_functions/aggregate_function.h"
 #include "vec/data_types/data_type_factory.hpp"
@@ -59,6 +60,7 @@ SlotDescriptor::SlotDescriptor(const TSlotDescriptor& tdesc)
           _col_name(tdesc.colName),
           _col_name_lower_case(to_lower(tdesc.colName)),
           _col_unique_id(tdesc.col_unique_id),
+          _col_type(thrift_to_type(tdesc.primitive_type)),
           _slot_idx(tdesc.slotIdx),
           _field_idx(-1),
           _is_materialized(tdesc.isMaterialized),
@@ -77,6 +79,7 @@ SlotDescriptor::SlotDescriptor(const PSlotDescriptor& pdesc)
           _col_name(pdesc.col_name()),
           _col_name_lower_case(to_lower(pdesc.col_name())),
           _col_unique_id(pdesc.col_unique_id()),
+          _col_type(static_cast<PrimitiveType>(pdesc.col_type())),
           _slot_idx(pdesc.slot_idx()),
           _field_idx(-1),
           _is_materialized(pdesc.is_materialized()),
@@ -99,6 +102,7 @@ void SlotDescriptor::to_protobuf(PSlotDescriptor* pslot) 
const {
     pslot->set_col_unique_id(_col_unique_id);
     pslot->set_is_key(_is_key);
     pslot->set_is_auto_increment(_is_auto_increment);
+    pslot->set_col_type(_col_type);
 }
 
 vectorized::MutableColumnPtr SlotDescriptor::get_empty_mutable_column() const {
diff --git a/be/src/runtime/descriptors.h b/be/src/runtime/descriptors.h
index 5483ecde52d..f77f5fec3b9 100644
--- a/be/src/runtime/descriptors.h
+++ b/be/src/runtime/descriptors.h
@@ -36,6 +36,7 @@
 #include "common/compiler_util.h" // IWYU pragma: keep
 #include "common/global_types.h"
 #include "common/status.h"
+#include "runtime/define_primitive_type.h"
 #include "runtime/types.h"
 #include "vec/data_types/data_type.h"
 
@@ -113,6 +114,7 @@ public:
     bool is_auto_increment() const { return _is_auto_increment; }
 
     const std::string& col_default_value() const { return _col_default_value; }
+    PrimitiveType col_type() const { return _col_type; }
 
 private:
     friend class DescriptorTbl;
@@ -132,6 +134,7 @@ private:
     const std::string _col_name_lower_case;
 
     const int32_t _col_unique_id;
+    const PrimitiveType _col_type;
 
     // the idx of the slot in the tuple descriptor (0-based).
     // this is provided by the FE
diff --git a/be/src/runtime/primitive_type.cpp 
b/be/src/runtime/primitive_type.cpp
index 82a189107d9..56ac616014b 100644
--- a/be/src/runtime/primitive_type.cpp
+++ b/be/src/runtime/primitive_type.cpp
@@ -154,6 +154,7 @@ PrimitiveType thrift_to_type(TPrimitiveType::type ttype) {
 
     case TPrimitiveType::VARIANT:
         return TYPE_VARIANT;
+
     default:
         CHECK(false) << ", meet unknown type " << ttype;
         return INVALID_TYPE;
@@ -259,6 +260,8 @@ TPrimitiveType::type to_thrift(PrimitiveType ptype) {
         return TPrimitiveType::STRUCT;
     case TYPE_LAMBDA_FUNCTION:
         return TPrimitiveType::LAMBDA_FUNCTION;
+    case TYPE_AGG_STATE:
+        return TPrimitiveType::AGG_STATE;
 
     default:
         return TPrimitiveType::INVALID_TYPE;
@@ -365,6 +368,9 @@ std::string type_to_string(PrimitiveType t) {
     case TYPE_LAMBDA_FUNCTION:
         return "LAMBDA_FUNCTION TYPE";
 
+    case TYPE_VARIANT:
+        return "VARIANT";
+
     default:
         return "";
     };
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/SlotDescriptor.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/SlotDescriptor.java
index 42ba32aef41..6384dad8d7b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/SlotDescriptor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/SlotDescriptor.java
@@ -296,13 +296,14 @@ public class SlotDescriptor {
     public TSlotDescriptor toThrift() {
         // Non-nullable slots will have 0 for the byte offset and -1 for the 
bit mask
         TSlotDescriptor tSlotDescriptor = new TSlotDescriptor(id.asInt(), 
parent.getId().asInt(), type.toThrift(), -1,
-                byteOffset, 0, getIsNullable() ? 0 : -1, ((column != null) ? 
column.getName() : ""), slotIdx,
+                byteOffset, 0, getIsNullable() ? 0 : -1, ((column != null) ? 
column.getNonShadowName() : ""), slotIdx,
                 isMaterialized);
         tSlotDescriptor.setNeedMaterialize(needMaterialize);
         tSlotDescriptor.setIsAutoIncrement(isAutoInc);
         if (column != null) {
-            LOG.debug("column name:{}, column unique id:{}", column.getName(), 
column.getUniqueId());
+            LOG.debug("column name:{}, column unique id:{}", 
column.getNonShadowName(), column.getUniqueId());
             tSlotDescriptor.setColUniqueId(column.getUniqueId());
+            tSlotDescriptor.setPrimitiveType(column.getDataType().toThrift());
             tSlotDescriptor.setIsKey(column.isKey());
             tSlotDescriptor.setColDefaultValue(column.getDefaultValue());
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java
index 6de1b5a9d41..d9a0ed51bad 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java
@@ -491,7 +491,7 @@ public class Column implements Writable, 
GsonPostProcessable {
 
     public TColumn toThrift() {
         TColumn tColumn = new TColumn();
-        tColumn.setColumnName(this.name);
+        tColumn.setColumnName(removeNamePrefix(this.name));
 
         TColumnType tColumnType = new TColumnType();
         tColumnType.setType(this.getDataType().toThrift());
diff --git a/gensrc/proto/descriptors.proto b/gensrc/proto/descriptors.proto
index 99101767644..aeab7ace7c2 100644
--- a/gensrc/proto/descriptors.proto
+++ b/gensrc/proto/descriptors.proto
@@ -37,6 +37,7 @@ message PSlotDescriptor {
     optional int32 col_unique_id = 11;
     optional bool is_key = 12;
     optional bool is_auto_increment = 13;
+    optional int32 col_type = 14 [default = 0];
 };
 
 message PTupleDescriptor {
diff --git a/gensrc/thrift/Descriptors.thrift b/gensrc/thrift/Descriptors.thrift
index c5425ecfa3c..21bea8cac59 100644
--- a/gensrc/thrift/Descriptors.thrift
+++ b/gensrc/thrift/Descriptors.thrift
@@ -63,6 +63,7 @@ struct TSlotDescriptor {
   // subcolumn path info list for semi structure column(variant)
   15: optional list<string> column_paths
   16: optional string col_default_value
+  17: optional Types.TPrimitiveType primitive_type = 
Types.TPrimitiveType.INVALID_TYPE
 }
 
 struct TTupleDescriptor {
diff --git a/regression-test/suites/schema_change/ddl/lineorder_create.sql 
b/regression-test/suites/schema_change/ddl/lineorder_create.sql
new file mode 100644
index 00000000000..44226282198
--- /dev/null
+++ b/regression-test/suites/schema_change/ddl/lineorder_create.sql
@@ -0,0 +1,24 @@
+CREATE TABLE IF NOT EXISTS `lineorder` (
+  `lo_orderkey` bigint(20) NOT NULL COMMENT "",
+  `lo_linenumber` bigint(20) NOT NULL COMMENT "",
+  `lo_custkey` int(11) NOT NULL COMMENT "",
+  `lo_partkey` int(11) NOT NULL COMMENT "",
+  `lo_suppkey` int(11) NOT NULL COMMENT "",
+  `lo_orderdate` int(11) NOT NULL COMMENT "",
+  `lo_orderpriority` varchar(16) NOT NULL COMMENT "",
+  `lo_shippriority` int(11) NOT NULL COMMENT "",
+  `lo_quantity` bigint(20) NOT NULL COMMENT "",
+  `lo_extendedprice` bigint(20) NOT NULL COMMENT "",
+  `lo_ordtotalprice` bigint(20) NOT NULL COMMENT "",
+  `lo_discount` bigint(20) NOT NULL COMMENT "",
+  `lo_revenue` bigint(20) NOT NULL COMMENT "",
+  `lo_supplycost` bigint(20) NOT NULL COMMENT "",
+  `lo_tax` bigint(20) NOT NULL COMMENT "",
+  `lo_commitdate` bigint(20) NOT NULL COMMENT "",
+  `lo_shipmode` varchar(11) NOT NULL COMMENT ""
+)
+DUPLICATE KEY (`lo_orderkey`, `lo_linenumber`)
+DISTRIBUTED BY HASH(`lo_orderkey`) BUCKETS 1
+PROPERTIES (
+"replication_num" = "1"
+);
diff --git a/regression-test/suites/schema_change/ddl/lineorder_delete.sql 
b/regression-test/suites/schema_change/ddl/lineorder_delete.sql
new file mode 100644
index 00000000000..2c1c2fa57d9
--- /dev/null
+++ b/regression-test/suites/schema_change/ddl/lineorder_delete.sql
@@ -0,0 +1 @@
+drop table IF EXISTS lineorder;
diff --git 
a/regression-test/suites/schema_change/test_double_write_when_schema_change.groovy
 
b/regression-test/suites/schema_change/test_double_write_when_schema_change.groovy
new file mode 100644
index 00000000000..43a9bc2b349
--- /dev/null
+++ 
b/regression-test/suites/schema_change/test_double_write_when_schema_change.groovy
@@ -0,0 +1,99 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Most of the cases are copied from 
https://github.com/trinodb/trino/tree/master
+// /testing/trino-product-tests/src/main/resources/sql-tests/testcases
+// and modified by Doris.
+
+// Note: To filter out tables from sql files, use the following one-liner command
+// sed -nr 's/.*tables: (.*)$/\1/gp' /path/to/*.sql | sed -nr 's/,/\n/gp' | 
sort | uniq
+suite("double_write_schema_change") {
+
+    // This case tests double writing during schema change: it loads the ssb sf1 lineorder data into a
+    // duplicate key table with one bucket, then modifies the type of a column and keeps inserting rows
+    // while the schema change job is RUNNING, a scenario that previously caused a coredump.
+
+    def tableName = "lineorder"
+    def columns = 
"""lo_orderkey,lo_linenumber,lo_custkey,lo_partkey,lo_suppkey,lo_orderdate,lo_orderpriority,
 
+                    
lo_shippriority,lo_quantity,lo_extendedprice,lo_ordtotalprice,lo_discount, 
+                    
lo_revenue,lo_supplycost,lo_tax,lo_commitdate,lo_shipmode,lo_dummy"""
+
+    sql new File("""${context.file.parent}/ddl/${tableName}_delete.sql""").text
+    sql new File("""${context.file.parent}/ddl/${tableName}_create.sql""").text
+
+    streamLoad {
+        // a default db 'regression_test' is specified in
+        // ${DORIS_HOME}/conf/regression-conf.groovy
+        table tableName
+
+        // default label is UUID:
+        // set 'label' UUID.randomUUID().toString()
+
+        // The default column_separator is specified in the Doris FE config and is usually '\t';
+        // the line below changes it to '|'.
+        set 'column_separator', '|'
+        set 'compress_type', 'GZ'
+        set 'columns', columns
+
+
+        // The input file is fetched from S3 (see getS3Url() below); a local file path or
+        // an HTTP URL such as http://xxx/some.csv can also be used here.
+        file """${getS3Url()}/regression/ssb/sf1/${tableName}.tbl.gz"""
+
+        time 10000 // limit inflight 10s
+
+        // By default the stream load action checks the result: the Success status and NumberTotalRows == NumberLoadedRows.
+
+        // If a check callback is declared, the default check is skipped,
+        // so the callback must check every condition itself.
+        check { result, exception, startTime, endTime ->
+            if (exception != null) {
+                throw exception
+            }
+            log.info("Stream load result: ${result}".toString())
+            def json = parseJson(result)
+            assertEquals("success", json.Status.toLowerCase())
+            assertEquals(json.NumberTotalRows, json.NumberLoadedRows)
+            assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0)
+        }
+    }
+
+    def getJobState = { indexName ->
+         def jobStateResult = sql """  SHOW ALTER TABLE COLUMN WHERE 
IndexName='${indexName}' ORDER BY createtime DESC LIMIT 1 """
+         return jobStateResult[0][9]
+    }
+
+    def insert_sql = """ insert into ${tableName} values(100000000, 1, 1, 1, 
1, 1, "1", 1, 1, 1, 1, 1, 1, 1, 1, 1, "1") """
+
+    sql """ ALTER TABLE ${tableName} modify COLUMN lo_custkey double"""
+    int max_try_time = 3000
+    while (max_try_time--){
+        String result = getJobState(tableName)
+        if (result == "FINISHED") {
+            sleep(3000)
+            break
+        } else {
+            if (result == "RUNNING") {
+                sql insert_sql
+            }
+            sleep(100)
+            if (max_try_time < 1){
+                assertEquals(1,2)
+            }
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to