This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new d935829e80a [fix](schema-change) Fix wrong input column for cast validity check (#38894) d935829e80a is described below commit d935829e80a0949ffa74699f4674b7385a5b77ab Author: Siyang Tang <82279870+tangsiyang2...@users.noreply.github.com> AuthorDate: Wed Aug 7 17:07:09 2024 +0800 [fix](schema-change) Fix wrong input column for cast validity check (#38894) ## Proposed changes 1. Use the column index of the ref block, instead of the new block, to identify the ref column. 2. Rename some variables to clarify their meanings. 3. Clarify some log messages. 4. Add a minimal case to verify the change. --- be/src/olap/column_mapping.h | 4 +- be/src/olap/schema_change.cpp | 147 ++++++++++++--------- be/src/olap/schema_change.h | 2 +- .../test_move_column_with_cast.groovy | 50 +++---- 4 files changed, 111 insertions(+), 92 deletions(-) diff --git a/be/src/olap/column_mapping.h b/be/src/olap/column_mapping.h index 047af1e9d11..bf3a6118d76 100644 --- a/be/src/olap/column_mapping.h +++ b/be/src/olap/column_mapping.h @@ -30,11 +30,11 @@ struct ColumnMapping { ColumnMapping() = default; virtual ~ColumnMapping() = default; - bool has_reference() const { return expr != nullptr || ref_column >= 0; } + bool has_reference() const { return expr != nullptr || ref_column_idx >= 0; } // <0: use default value // >=0: use origin column - int32_t ref_column = -1; + int32_t ref_column_idx = -1; // normally for default value.
stores values for filters WrapperField* default_value = nullptr; std::shared_ptr<TExpr> expr; diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp index 3079568547d..1771bfb7c67 100644 --- a/be/src/olap/schema_change.cpp +++ b/be/src/olap/schema_change.cpp @@ -17,6 +17,10 @@ #include "olap/schema_change.h" +#include <gen_cpp/olap_file.pb.h> +#include <glog/logging.h> +#include <thrift/protocol/TDebugProtocol.h> + #include <algorithm> #include <exception> #include <map> @@ -285,52 +289,63 @@ Status BlockChanger::change_block(vectorized::Block* ref_block, vectorized::VExprContext::filter_block(ctx.get(), ref_block, ref_block->columns())); } - const int row_size = ref_block->rows(); - const int column_size = new_block->columns(); + const int row_num = ref_block->rows(); + const int new_schema_cols_num = new_block->columns(); - // swap ref_block[key] and new_block[value] + // will be used for swaping ref_block[entry.first] and new_block[entry.second] std::list<std::pair<int, int>> swap_idx_list; - for (int idx = 0; idx < column_size; idx++) { - // just for MV, schema change should not run into this branch - if (_schema_mapping[idx].expr != nullptr) { + for (int idx = 0; idx < new_schema_cols_num; idx++) { + auto expr = _schema_mapping[idx].expr; + if (expr != nullptr) { vectorized::VExprContextSPtr ctx; - RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(*_schema_mapping[idx].expr, ctx)); + RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(*expr, ctx)); RETURN_IF_ERROR(ctx->prepare(state.get(), row_desc)); RETURN_IF_ERROR(ctx->open(state.get())); - int result_column_id = -1; - RETURN_IF_ERROR(ctx->execute(ref_block, &result_column_id)); - if (ref_block->get_by_position(result_column_id).column == nullptr) { + int result_tmp_column_idx = -1; + RETURN_IF_ERROR(ctx->execute(ref_block, &result_tmp_column_idx)); + auto& result_tmp_column_def = ref_block->get_by_position(result_tmp_column_idx); + if (result_tmp_column_def.column == nullptr) { return 
Status::Error<ErrorCode::INTERNAL_ERROR>( - "{} result column is nullptr", - ref_block->get_by_position(result_column_id).name); + "result column={} is nullptr, input expr={}", result_tmp_column_def.name, + apache::thrift::ThriftDebugString(*expr)); } - ref_block->replace_by_position_if_const(result_column_id); + ref_block->replace_by_position_if_const(result_tmp_column_idx); - if (ref_block->get_by_position(result_column_id).column->size() != row_size) { + if (result_tmp_column_def.column->size() != row_num) { return Status::Error<ErrorCode::INTERNAL_ERROR>( - "{} size invalid, expect={}, real={}", new_block->get_by_position(idx).name, - row_size, ref_block->get_by_position(result_column_id).column->size()); + "result size invalid, expect={}, real={}; input expr={}", row_num, + result_tmp_column_def.column->size(), + apache::thrift::ThriftDebugString(*expr)); + } + + if (_type == SCHEMA_CHANGE) { + // danger casts (expected to be rejected by upstream caller) may cause data to be null and result in data loss in schema change + // for rollup, this check is unecessary, and ref columns are not set in this case, it works on exprs + + // column_idx in base schema + int32_t ref_column_idx = _schema_mapping[idx].ref_column_idx; + DCHECK_GE(ref_column_idx, 0); + auto& ref_column_def = ref_block->get_by_position(ref_column_idx); + RETURN_IF_ERROR( + _check_cast_valid(ref_column_def.column, result_tmp_column_def.column)); } - RETURN_IF_ERROR(_check_cast_valid(ref_block->get_by_position(idx).column, - ref_block->get_by_position(result_column_id).column, - _type)); - swap_idx_list.emplace_back(result_column_id, idx); - } else if (_schema_mapping[idx].ref_column < 0) { + swap_idx_list.emplace_back(result_tmp_column_idx, idx); + } else if (_schema_mapping[idx].ref_column_idx < 0) { // new column, write default value auto* value = _schema_mapping[idx].default_value; auto column = new_block->get_by_position(idx).column->assume_mutable(); if (value->is_null()) { 
DCHECK(column->is_nullable()); - column->insert_many_defaults(row_size); + column->insert_many_defaults(row_num); } else { auto type_info = get_type_info(_schema_mapping[idx].new_column); DefaultValueColumnIterator::insert_default_data(type_info.get(), value->size(), - value->ptr(), column, row_size); + value->ptr(), column, row_num); } } else { // same type, just swap column - swap_idx_list.emplace_back(_schema_mapping[idx].ref_column, idx); + swap_idx_list.emplace_back(_schema_mapping[idx].ref_column_idx, idx); } } @@ -368,78 +383,90 @@ Status BlockChanger::change_block(vectorized::Block* ref_block, return Status::OK(); } -// This check is for MV to prevent schema-change from causing data loss -Status BlockChanger::_check_cast_valid(vectorized::ColumnPtr ref_column, - vectorized::ColumnPtr new_column, AlterTabletType type) { - if (ref_column->size() != new_column->size()) { +// This check can prevent schema-change from causing data loss after type cast +Status BlockChanger::_check_cast_valid(vectorized::ColumnPtr input_column, + vectorized::ColumnPtr output_column) { + if (input_column->size() != output_column->size()) { return Status::InternalError( - "column size is changed, ref_column_size={}, new_column_size={}", - ref_column->size(), new_column->size()); - } - if (type == ROLLUP) { - return Status::OK(); + "column size is changed, input_column_size={}, output_column_size={}; " + "input_column={}", + input_column->size(), output_column->size(), input_column->get_name()); } - if (ref_column->is_nullable() != new_column->is_nullable()) { - if (ref_column->is_nullable()) { + DCHECK_EQ(input_column->size(), output_column->size()) + << "length check should have done before calling this function!"; + + if (input_column->is_nullable() != output_column->is_nullable()) { + if (input_column->is_nullable()) { const auto* ref_null_map = - vectorized::check_and_get_column<vectorized::ColumnNullable>(ref_column) + 
vectorized::check_and_get_column<vectorized::ColumnNullable>(input_column) ->get_null_map_column() .get_data() .data(); bool is_changed = false; - for (size_t i = 0; i < ref_column->size(); i++) { + for (size_t i = 0; i < input_column->size(); i++) { is_changed |= ref_null_map[i]; } if (is_changed) { - return Status::DataQualityError("Null data is changed to not nullable"); + return Status::DataQualityError( + "some null data is changed to not null, intput_column={}", + input_column->get_name()); } } else { const auto& null_map_column = - vectorized::check_and_get_column<vectorized::ColumnNullable>(new_column) + vectorized::check_and_get_column<vectorized::ColumnNullable>(output_column) ->get_null_map_column(); const auto& nested_column = - vectorized::check_and_get_column<vectorized::ColumnNullable>(new_column) + vectorized::check_and_get_column<vectorized::ColumnNullable>(output_column) ->get_nested_column(); const auto* new_null_map = null_map_column.get_data().data(); - if (null_map_column.size() != new_column->size() || - nested_column.size() != new_column->size()) { - DCHECK(false); + if (null_map_column.size() != output_column->size()) { return Status::InternalError( - "null_map_column size is changed, null_map_column_size={}, " - "new_column_size={}", - null_map_column.size(), new_column->size()); + "null_map_column size mismatch output_column_size, " + "null_map_column_size={}, output_column_size={}; input_column={}", + null_map_column.size(), output_column->size(), input_column->get_name()); + } + + if (nested_column.size() != output_column->size()) { + return Status::InternalError( + "nested_column size is changed, nested_column_size={}, " + "ouput_column_size={}; input_column={}", + nested_column.size(), output_column->size(), input_column->get_name()); } bool is_changed = false; - for (size_t i = 0; i < ref_column->size(); i++) { + for (size_t i = 0; i < input_column->size(); i++) { is_changed |= new_null_map[i]; } if (is_changed) { - return 
Status::DataQualityError("Some data is changed to null"); + return Status::DataQualityError( + "some not null data is changed to null, intput_column={}", + input_column->get_name()); } } } - if (ref_column->is_nullable() && new_column->is_nullable()) { + if (input_column->is_nullable() && output_column->is_nullable()) { const auto* ref_null_map = - vectorized::check_and_get_column<vectorized::ColumnNullable>(ref_column) + vectorized::check_and_get_column<vectorized::ColumnNullable>(input_column) ->get_null_map_column() .get_data() .data(); const auto* new_null_map = - vectorized::check_and_get_column<vectorized::ColumnNullable>(new_column) + vectorized::check_and_get_column<vectorized::ColumnNullable>(output_column) ->get_null_map_column() .get_data() .data(); bool is_changed = false; - for (size_t i = 0; i < ref_column->size(); i++) { + for (size_t i = 0; i < input_column->size(); i++) { is_changed |= (ref_null_map[i] != new_null_map[i]); } if (is_changed) { - return Status::DataQualityError("is_null of data is changed!"); + return Status::DataQualityError( + "null map is changed after calculation, input_column={}", + input_column->get_name()); } } return Status::OK(); @@ -1203,6 +1230,8 @@ Status SchemaChangeJob::parse_request(const SchemaChangeParams& sc_params, ColumnMapping* column_mapping = changer->get_mutable_column_mapping(i); column_mapping->new_column = &new_column; + column_mapping->ref_column_idx = base_tablet_schema->field_index(new_column.name()); + if (materialized_function_map.find(column_name_lower) != materialized_function_map.end()) { auto mv_param = materialized_function_map.find(column_name_lower)->second; column_mapping->expr = mv_param.expr; @@ -1211,9 +1240,7 @@ Status SchemaChangeJob::parse_request(const SchemaChangeParams& sc_params, } } - int32_t column_index = base_tablet_schema->field_index(new_column.name()); - if (column_index >= 0) { - column_mapping->ref_column = column_index; + if (column_mapping->ref_column_idx >= 0) { continue; 
} @@ -1236,7 +1263,7 @@ Status SchemaChangeJob::parse_request(const SchemaChangeParams& sc_params, return Status::InternalError("failed due to operate on shadow column"); } // Newly added column go here - column_mapping->ref_column = -1; + column_mapping->ref_column_idx = -1; if (i < base_tablet_schema->num_short_key_columns()) { *sc_directly = true; @@ -1265,7 +1292,7 @@ Status SchemaChangeJob::parse_request(const SchemaChangeParams& sc_params, continue; } - if (column_mapping->ref_column != i - num_default_value) { + if (column_mapping->ref_column_idx != i - num_default_value) { *sc_sorting = true; return Status::OK(); } @@ -1332,9 +1359,9 @@ Status SchemaChangeJob::parse_request(const SchemaChangeParams& sc_params, if (column_mapping->expr != nullptr) { *sc_directly = true; return Status::OK(); - } else if (column_mapping->ref_column >= 0) { + } else if (column_mapping->ref_column_idx >= 0) { const auto& column_new = new_tablet_schema->column(i); - const auto& column_old = base_tablet_schema->column(column_mapping->ref_column); + const auto& column_old = base_tablet_schema->column(column_mapping->ref_column_idx); // index changed if (column_new.is_bf_column() != column_old.is_bf_column() || column_new.has_bitmap_index() != column_old.has_bitmap_index() || diff --git a/be/src/olap/schema_change.h b/be/src/olap/schema_change.h index 64ab0c724d0..c29cb49a7aa 100644 --- a/be/src/olap/schema_change.h +++ b/be/src/olap/schema_change.h @@ -87,7 +87,7 @@ public: private: static Status _check_cast_valid(vectorized::ColumnPtr ref_column, - vectorized::ColumnPtr new_column, AlterTabletType type); + vectorized::ColumnPtr new_column); // @brief column-mapping specification of new schema SchemaMapping _schema_mapping; diff --git a/be/src/olap/column_mapping.h b/regression-test/suites/schema_change_p0/test_move_column_with_cast.groovy similarity index 52% copy from be/src/olap/column_mapping.h copy to regression-test/suites/schema_change_p0/test_move_column_with_cast.groovy 
index 047af1e9d11..e89542b6285 100644 --- a/be/src/olap/column_mapping.h +++ b/regression-test/suites/schema_change_p0/test_move_column_with_cast.groovy @@ -15,32 +15,24 @@ // specific language governing permissions and limitations // under the License. -#pragma once - -#include <gen_cpp/Exprs_types.h> - -#include <memory> - -#include "olap/tablet_schema.h" -namespace doris { - -class WrapperField; - -struct ColumnMapping { - ColumnMapping() = default; - virtual ~ColumnMapping() = default; - - bool has_reference() const { return expr != nullptr || ref_column >= 0; } - - // <0: use default value - // >=0: use origin column - int32_t ref_column = -1; - // normally for default value. stores values for filters - WrapperField* default_value = nullptr; - std::shared_ptr<TExpr> expr; - const TabletColumn* new_column = nullptr; -}; - -using SchemaMapping = std::vector<ColumnMapping>; - -} // namespace doris +suite("test_move_column_with_cast") { + def tableName = "test_move_column_with_cast" + sql """ DROP TABLE IF EXISTS ${tableName} """ + sql """ + CREATE TABLE IF NOT EXISTS ${tableName} ( + k BIGINT, + v SMALLINT NOT NULL + ) DUPLICATE KEY(`k`) + DISTRIBUTED BY HASH(k) BUCKETS 4 + properties("replication_num" = "1"); + """ + + sql """ INSERT INTO ${tableName} VALUES(1, 1); """ + sql """ ALTER TABLE ${tableName} ADD COLUMN t2 DATETIME DEFAULT NULL; """ + sql """ ALTER TABLE ${tableName} MODIFY COLUMN v BIGINT AFTER t2; """ + + waitForSchemaChangeDone { + sql """SHOW ALTER TABLE COLUMN WHERE IndexName='${tableName}' ORDER BY createtime DESC LIMIT 1""" + time 600 + } +} \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org