HappenLee commented on code in PR #10187: URL: https://github.com/apache/doris/pull/10187#discussion_r906946575
########## be/src/olap/schema_change.cpp: ########## @@ -665,6 +812,112 @@ Status RowBlockChanger::change_row_block(const RowBlock* ref_block, int32_t data #undef TYPE_REINTERPRET_CAST #undef ASSIGN_DEFAULT_VALUE +Status RowBlockChanger::change_block(vectorized::Block* ref_block, + vectorized::Block* new_block) const { + if (new_block->columns() != _schema_mapping.size()) { + LOG(WARNING) << "block does not match with schema mapping rules. " + << "block_schema_size=" << new_block->columns() + << ", mapping_schema_size=" << _schema_mapping.size(); + return Status::OLAPInternalError(OLAP_ERR_NOT_INITED); + } + + // material-view or rollup task will fail now + if (_desc_tbl.get_row_tuples().size() != ref_block->columns()) { + return Status::NotSupported( + "_desc_tbl.get_row_tuples().size() != ref_block->columns(), maybe because rollup " + "not supported now. "); + } + + std::vector<bool> nullable_tuples; + for (int i = 0; i < ref_block->columns(); i++) { + nullable_tuples.emplace_back(ref_block->get_by_position(i).column->is_nullable()); + } + + ObjectPool pool; + RuntimeState* state = pool.add(new RuntimeState()); + state->set_desc_tbl(&_desc_tbl); + RowDescriptor row_desc = RowDescriptor::create_default(_desc_tbl, nullable_tuples); + + const int row_size = ref_block->rows(); + const int column_size = new_block->columns(); + + // swap ref_block[key] and new_block[value] + std::map<int, int> swap_idx_map; + + for (int idx = 0; idx < column_size; idx++) { + int ref_idx = _schema_mapping[idx].ref_column; + + if (ref_idx < 0) { + // new column, write default value + auto value = _schema_mapping[idx].default_value; + auto column = new_block->get_by_position(idx).column->assume_mutable(); + if (value->is_null()) { + DCHECK(column->is_nullable()); + column->insert_many_defaults(row_size); + } else { + auto type_info = get_type_info(_schema_mapping[idx].new_column); + DefaultValueColumnIterator::insert_default_data(type_info.get(), value->size(), + value->ptr(), column, row_size); + } + continue; + } + + if (!_schema_mapping[idx].materialized_function.empty()) { + return Status::NotSupported("Materialized function not supported now. "); + } + + if (_schema_mapping[idx].expr != nullptr) { + // calculate special materialized function, to_bitmap/hll_hash/count_field or cast expr + vectorized::VExprContext* ctx = nullptr; + RETURN_IF_ERROR( + vectorized::VExpr::create_expr_tree(&pool, *_schema_mapping[idx].expr, &ctx)); + + RETURN_IF_ERROR(ctx->prepare(state, row_desc)); + RETURN_IF_ERROR(ctx->open(state)); + + int result_column_id = -1; + RETURN_IF_ERROR(ctx->execute(ref_block, &result_column_id)); + DCHECK(ref_block->get_by_position(result_column_id).column->size() == row_size) + << new_block->get_by_position(idx).name << " size invalid" + << ", expect=" << row_size + << ", real=" << ref_block->get_by_position(result_column_id).column->size(); + + if (_schema_mapping[idx].expr->nodes[0].node_type == TExprNodeType::CAST_EXPR) { + RETURN_IF_ERROR( + _check_cast_valid(ref_block->get_by_position(ref_idx).column, + ref_block->get_by_position(result_column_id).column)); + } + swap_idx_map[result_column_id] = idx; + + ctx->close(state); + continue; + } + + // same type, just swap column + swap_idx_map[ref_idx] = idx; + } + + for (auto it : swap_idx_map) { + new_block->get_by_position(it.second).column.swap( + ref_block->get_by_position(it.first).column); + } + + return Status::OK(); +} + +Status RowBlockChanger::_check_cast_valid(vectorized::ColumnPtr ref_column, + vectorized::ColumnPtr new_column) const { + // TODO: rethink this check + // This check is to prevent schema-change from causing data loss, + // But it is possible to generate null data in material-view or rollup. + for (size_t i = 0; i < ref_column->size(); i++) { Review Comment: SIMD the code ########## be/src/olap/schema_change.cpp: ########## @@ -665,6 +812,112 @@ Status RowBlockChanger::change_row_block(const RowBlock* ref_block, int32_t data #undef TYPE_REINTERPRET_CAST #undef ASSIGN_DEFAULT_VALUE +Status RowBlockChanger::change_block(vectorized::Block* ref_block, + vectorized::Block* new_block) const { + if (new_block->columns() != _schema_mapping.size()) { + LOG(WARNING) << "block does not match with schema mapping rules. " + << "block_schema_size=" << new_block->columns() + << ", mapping_schema_size=" << _schema_mapping.size(); + return Status::OLAPInternalError(OLAP_ERR_NOT_INITED); + } + + // material-view or rollup task will fail now + if (_desc_tbl.get_row_tuples().size() != ref_block->columns()) { + return Status::NotSupported( + "_desc_tbl.get_row_tuples().size() != ref_block->columns(), maybe because rollup " + "not supported now. "); + } + + std::vector<bool> nullable_tuples; + for (int i = 0; i < ref_block->columns(); i++) { + nullable_tuples.emplace_back(ref_block->get_by_position(i).column->is_nullable()); + } + + ObjectPool pool; + RuntimeState* state = pool.add(new RuntimeState()); + state->set_desc_tbl(&_desc_tbl); + RowDescriptor row_desc = RowDescriptor::create_default(_desc_tbl, nullable_tuples); + + const int row_size = ref_block->rows(); + const int column_size = new_block->columns(); + + // swap ref_block[key] and new_block[value] + std::map<int, int> swap_idx_map; + + for (int idx = 0; idx < column_size; idx++) { Review Comment: change the for loop logic -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org