xiaokang commented on code in PR #16335: URL: https://github.com/apache/doris/pull/16335#discussion_r1097090026
########## be/src/exec/base_scanner.cpp: ########## @@ -250,6 +265,28 @@ Status BaseScanner::_materialize_dest_block(vectorized::Block* dest_block) { std::move(column_ptr), slot_desc->get_data_type_ptr(), slot_desc->col_name())); } + // handle dynamic generated columns Review Comment: Do the new scanners still use BaseScanner? ########## be/src/vec/common/schema_util.cpp: ########## @@ -0,0 +1,481 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License.
+ +#include <vec/columns/column_array.h> +#include <vec/columns/column_object.h> +#include <vec/common/schema_util.h> +#include <vec/core/field.h> +#include <vec/data_types/data_type_array.h> +#include <vec/data_types/data_type_object.h> +#include <vec/functions/simple_function_factory.h> +#include <vec/json/parse2column.h> + +#include <vec/data_types/data_type_factory.hpp> + +#include "gen_cpp/FrontendService.h" +#include "gen_cpp/HeartbeatService_types.h" +#include "olap/rowset/rowset_writer_context.h" +#include "runtime/client_cache.h" +#include "runtime/exec_env.h" +#include "util/thrift_rpc_helper.h" + +namespace doris::vectorized::schema_util { + +size_t get_number_of_dimensions(const IDataType& type) { + if (const auto* type_array = typeid_cast<const DataTypeArray*>(&type)) { + return type_array->get_number_of_dimensions(); + } + return 0; +} +size_t get_number_of_dimensions(const IColumn& column) { + if (const auto* column_array = check_and_get_column<ColumnArray>(column)) { + return column_array->get_number_of_dimensions(); + } + return 0; +} + +DataTypePtr get_base_type_of_array(const DataTypePtr& type) { + /// Get raw pointers to avoid extra copying of type pointers. + const DataTypeArray* last_array = nullptr; + const auto* current_type = type.get(); + while (const auto* type_array = typeid_cast<const DataTypeArray*>(current_type)) { + current_type = type_array->get_nested_type().get(); + last_array = type_array; + } + return last_array ? 
last_array->get_nested_type() : type; +} + +Array create_empty_array_field(size_t num_dimensions) { + DCHECK(num_dimensions > 0); + Array array; + Array* current_array = &array; + for (size_t i = 1; i < num_dimensions; ++i) { + current_array->push_back(Array()); + current_array = ¤t_array->back().get<Array&>(); + } + return array; +} + +FieldType get_field_type(const IDataType* data_type) { + switch (data_type->get_type_id()) { + case TypeIndex::UInt8: + return FieldType::OLAP_FIELD_TYPE_UNSIGNED_TINYINT; + case TypeIndex::UInt16: + return FieldType::OLAP_FIELD_TYPE_UNSIGNED_SMALLINT; + case TypeIndex::UInt32: + return FieldType::OLAP_FIELD_TYPE_UNSIGNED_INT; + case TypeIndex::UInt64: + return FieldType::OLAP_FIELD_TYPE_UNSIGNED_BIGINT; + case TypeIndex::Int8: + return FieldType::OLAP_FIELD_TYPE_TINYINT; + case TypeIndex::Int16: + return FieldType::OLAP_FIELD_TYPE_SMALLINT; + case TypeIndex::Int32: + return FieldType::OLAP_FIELD_TYPE_INT; + case TypeIndex::Int64: + return FieldType::OLAP_FIELD_TYPE_BIGINT; + case TypeIndex::Float32: + return FieldType::OLAP_FIELD_TYPE_FLOAT; + case TypeIndex::Float64: + return FieldType::OLAP_FIELD_TYPE_DOUBLE; + case TypeIndex::Decimal32: + return FieldType::OLAP_FIELD_TYPE_DECIMAL; + case TypeIndex::Array: + return FieldType::OLAP_FIELD_TYPE_ARRAY; + case TypeIndex::String: + return FieldType::OLAP_FIELD_TYPE_STRING; + case TypeIndex::Date: + return FieldType::OLAP_FIELD_TYPE_DATE; + case TypeIndex::DateTime: + return FieldType::OLAP_FIELD_TYPE_DATETIME; + case TypeIndex::Tuple: + return FieldType::OLAP_FIELD_TYPE_STRUCT; + // TODO add more types + default: + LOG(FATAL) << "unknow type"; + return FieldType::OLAP_FIELD_TYPE_UNKNOWN; + } +} + +Status parse_object_column(ColumnObject& dest, const IColumn& src, bool need_finalize, + const int* row_begin, const int* row_end) { + assert(src.is_column_string()); + const ColumnString* parsing_column {nullptr}; + if (!src.is_nullable()) { + parsing_column = reinterpret_cast<const 
ColumnString*>(src.get_ptr().get()); + } else { + auto nullable_column = reinterpret_cast<const ColumnNullable*>(src.get_ptr().get()); + parsing_column = reinterpret_cast<const ColumnString*>( + nullable_column->get_nested_column().get_ptr().get()); + } + std::vector<StringRef> jsons; + if (row_begin != nullptr) { + assert(row_end); + for (auto x = row_begin; x != row_end; ++x) { + StringRef ref = parsing_column->get_data_at(*x); + jsons.push_back(ref); + } + } else { + for (size_t i = 0; i < parsing_column->size(); ++i) { + StringRef ref = parsing_column->get_data_at(i); + jsons.push_back(ref); + } + } + // batch parse + RETURN_IF_ERROR(parse_json_to_variant(dest, jsons)); + + if (need_finalize) { + dest.finalize(); + } + return Status::OK(); +} + +Status parse_object_column(Block& block, size_t position) { + // parse variant column and rewrite column + auto col = block.get_by_position(position).column; + const std::string& col_name = block.get_by_position(position).name; + if (!col->is_column_string()) { + return Status::InvalidArgument("only ColumnString can be parsed to ColumnObject"); + } + vectorized::DataTypePtr type( + std::make_shared<vectorized::DataTypeObject>("", col->is_nullable())); + auto column_object = type->create_column(); + RETURN_IF_ERROR( + parse_object_column(assert_cast<ColumnObject&>(column_object->assume_mutable_ref()), + *col, true /*need finalize*/, nullptr, nullptr)); + // replace by object + block.safe_get_by_position(position).column = column_object->get_ptr(); + block.safe_get_by_position(position).type = type; + block.safe_get_by_position(position).name = col_name; + return Status::OK(); +} + +void flatten_object(Block& block, size_t pos, bool replace_if_duplicated) { + auto column_object_ptr = + assert_cast<ColumnObject*>(block.get_by_position(pos).column->assume_mutable().get()); + if (column_object_ptr->empty()) { + block.erase(pos); + return; + } + size_t num_rows = column_object_ptr->size(); + assert(block.rows() <= num_rows); 
+ assert(column_object_ptr->is_finalized()); + Columns subcolumns; + DataTypes types; + Names names; + for (auto& subcolumn : column_object_ptr->get_subcolumns()) { + subcolumns.push_back(subcolumn->data.get_finalized_column().get_ptr()); + types.push_back(subcolumn->data.get_least_common_type()); + names.push_back(subcolumn->path.get_path()); + } + block.erase(pos); + for (size_t i = 0; i < subcolumns.size(); ++i) { + // block may already contains this column, eg. key columns, we should ignore + // or replcace the same column from object subcolumn + if (block.has(names[i])) { + if (replace_if_duplicated) { + auto& column_type_name = block.get_by_name(names[i]); + column_type_name.column = subcolumns[i]; + column_type_name.type = types[i]; + } + continue; + } + block.insert(ColumnWithTypeAndName {subcolumns[i], types[i], names[i]}); + } + + // fill default value + for (auto& [column, _1, _2] : block.get_columns_with_type_and_name()) { + if (column->size() < num_rows) { + column->assume_mutable()->insert_many_defaults(num_rows - column->size()); + } + } +} + +Status flatten_object(Block& block, bool replace_if_duplicated) { + auto object_pos = + std::find_if(block.begin(), block.end(), [](const ColumnWithTypeAndName& column) { + return column.type->get_type_id() == TypeIndex::VARIANT; + }); + if (object_pos != block.end()) { + flatten_object(block, object_pos - block.begin(), replace_if_duplicated); + } + return Status::OK(); +} + +bool is_conversion_required_between_integers(const IDataType& lhs, const IDataType& rhs) { + WhichDataType which_lhs(lhs); + WhichDataType which_rhs(rhs); + bool is_native_int = which_lhs.is_native_int() && which_rhs.is_native_int(); + bool is_native_uint = which_lhs.is_native_uint() && which_rhs.is_native_uint(); + return (is_native_int || is_native_uint) && + lhs.get_size_of_value_in_memory() <= rhs.get_size_of_value_in_memory(); +} + +bool is_conversion_required_between_integers(FieldType lhs, FieldType rhs) { + // We only support 
signed integers for semi-structure data at present + // TODO add unsigned integers + if (lhs == OLAP_FIELD_TYPE_BIGINT) { + return !(rhs == OLAP_FIELD_TYPE_TINYINT || rhs == OLAP_FIELD_TYPE_SMALLINT || + rhs == OLAP_FIELD_TYPE_INT || rhs == OLAP_FIELD_TYPE_BIGINT); + } + if (lhs == OLAP_FIELD_TYPE_INT) { + return !(rhs == OLAP_FIELD_TYPE_TINYINT || rhs == OLAP_FIELD_TYPE_SMALLINT || + rhs == OLAP_FIELD_TYPE_INT); + } + if (lhs == OLAP_FIELD_TYPE_SMALLINT) { + return !(rhs == OLAP_FIELD_TYPE_TINYINT || rhs == OLAP_FIELD_TYPE_SMALLINT); + } + if (lhs == OLAP_FIELD_TYPE_TINYINT) { + return !(rhs == OLAP_FIELD_TYPE_TINYINT); + } + return true; +} + +Status cast_column(const ColumnWithTypeAndName& arg, const DataTypePtr& type, ColumnPtr* result) { + ColumnsWithTypeAndName arguments {arg, + {type->create_column_const_with_default_value(1), type, ""}}; + auto function = SimpleFunctionFactory::instance().get_function("CAST", arguments, type); + Block tmp_block {arguments}; + // the 0 position is input argument, the 1 position is to type argument, the 2 position is result argument + vectorized::ColumnNumbers argnum; + argnum.emplace_back(0); + argnum.emplace_back(1); + size_t result_column = tmp_block.columns(); + tmp_block.insert({nullptr, type, arg.name}); + RETURN_IF_ERROR( + function->execute(nullptr, tmp_block, argnum, result_column, arg.column->size())); + *result = std::move(tmp_block.get_by_position(result_column).column); + return Status::OK(); +} + +static void get_column_def(const vectorized::DataTypePtr& data_type, const std::string& name, + TColumnDef* column) { + if (!name.empty()) { + column->columnDesc.__set_columnName(name); + } + if (data_type->is_nullable()) { + const auto& real_type = static_cast<const DataTypeNullable&>(*data_type); + column->columnDesc.__set_isAllowNull(true); + get_column_def(real_type.get_nested_type(), "", column); + return; + } + column->columnDesc.__set_columnType(to_thrift(get_primitive_type(data_type->get_type_id()))); + if 
(data_type->get_type_id() == TypeIndex::Array) { + TColumnDef child; + column->columnDesc.__set_children({}); + get_column_def(assert_cast<const DataTypeArray*>(data_type.get())->get_nested_type(), "", + &child); + column->columnDesc.columnLength = + TabletColumn::get_field_length_by_type(column->columnDesc.columnType, 0); + column->columnDesc.children.push_back(child.columnDesc); + return; + } + if (data_type->get_type_id() == TypeIndex::Tuple) { + // TODO + // auto tuple_type = assert_cast<const DataTypeTuple*>(data_type.get()); + // DCHECK_EQ(tuple_type->get_elements().size(), tuple_type->get_element_names().size()); + // for (size_t i = 0; i < tuple_type->get_elements().size(); ++i) { + // TColumnDef child; + // get_column_def(tuple_type->get_element(i), tuple_type->get_element_names()[i], &child); + // column->columnDesc.children.push_back(child.columnDesc); + // } + // return; + } + if (data_type->get_type_id() == TypeIndex::String) { + return; + } + if (WhichDataType(*data_type).is_simple()) { + column->columnDesc.__set_columnLength(data_type->get_size_of_value_in_memory()); + return; + } +} + +// send an empty add columns rpc, the rpc response will fill with base schema info +// maybe we could seperate this rpc from add columns rpc +Status send_fetch_full_base_schema_view_rpc(FullBaseSchemaView* schema_view) { Review Comment: The name is misleading, since the function not only sends the RPC but also fetches the result.
########## be/src/vec/sink/vtablet_sink.cpp: ########## @@ -225,8 +225,6 @@ Status VNodeChannel::init(RuntimeState* state) { _rpc_timeout_ms = state->query_options().query_timeout * 1000; _timeout_watch.start(); - _cur_mutable_block.reset(new vectorized::MutableBlock({_tuple_desc})); Review Comment: Why delete it? ########## be/src/runtime/types.h: ########## @@ -201,6 +202,8 @@ struct TypeDescriptor { bool is_bitmap_type() const { return type == TYPE_OBJECT; } + bool is_variant() const { return type == TYPE_VARIANT; } Review Comment: Suggest renaming to is_variant_type, consistent with is_bitmap_type. ########## be/src/vec/core/block.cpp: ########## @@ -863,6 +876,10 @@ void MutableBlock::add_row(const Block* block, int row) { } void MutableBlock::add_rows(const Block* block, const int* row_begin, const int* row_end) { + if (_type == BlockType::DYNAMIC) { Review Comment: It may be more efficient to add UNLIKELY, since most tables do not use a dynamic schema. ########## be/src/vec/core/field.h: ########## @@ -86,6 +86,14 @@ using FieldVector = std::vector<Field>; DEFINE_FIELD_VECTOR(Array); DEFINE_FIELD_VECTOR(Tuple); +using FieldMap = std::map<String, Field, std::less<String>>; Review Comment: Have you considered defining FieldMap inside the DEFINE_FIELD_VECTOR scope? ########## be/src/vec/io/var_int.h: ########## @@ -19,6 +19,7 @@ #include <iostream> +#include "vec/common/string_buffer.hpp" Review Comment: Is this include unused? ########## be/src/vec/functions/if.cpp: ########## @@ -122,7 +122,10 @@ class FunctionIf : public IFunction { } DataTypePtr get_return_type_impl(const DataTypes& arguments) const override { - return get_least_supertype({arguments[1], arguments[2]}); + DataTypePtr type = nullptr; + get_least_supertype(DataTypes {arguments[1], arguments[2]}, &type); Review Comment: Why not keep the return value? ########## be/src/vec/data_types/get_least_supertype.h: ########## @@ -30,6 +33,13 @@ namespace doris::vectorized { * Examples: least common supertype for UInt8, Int8 - Int16.
* Examples: there is no least common supertype for Array(UInt8), Int8. */ -DataTypePtr get_least_supertype(const DataTypes& types); Review Comment: Should keep the original version for backward compatibility. ########## be/src/olap/rowset/rowset_meta.h: ########## @@ -367,7 +367,6 @@ class RowsetMeta { int64_t newest_write_timestamp() const { return _rowset_meta_pb.newest_write_timestamp(); } void set_tablet_schema(const TabletSchemaSPtr& tablet_schema) { - DCHECK(_schema == nullptr); Review Comment: Why delete the DCHECK? ########## be/src/olap/memtable.cpp: ########## @@ -148,7 +148,11 @@ int MemTable::RowInBlockComparator::operator()(const RowInBlock* left, void MemTable::insert(const vectorized::Block* input_block, const std::vector<int>& row_idxs) { SCOPED_CONSUME_MEM_TRACKER(_insert_mem_tracker_use_hook.get()); - auto target_block = input_block->copy_block(_column_offset); + vectorized::Block target_block = *input_block; + // maybe rollup tablet, dynamic table's tablet need full columns + if (!_tablet_schema->is_dynamic_schema()) { Review Comment: Add a comment explaining why copy_block is not used for a dynamic schema. ########## be/src/vec/core/field.h: ########## @@ -86,6 +86,14 @@ using FieldVector = std::vector<Field>; DEFINE_FIELD_VECTOR(Array); DEFINE_FIELD_VECTOR(Tuple); +using FieldMap = std::map<String, Field, std::less<String>>; +#define DEFINE_FIELD_MAP(X) \ + struct X : public FieldMap { \ + using FieldMap::FieldMap; \ + } +DEFINE_FIELD_MAP(Object); Review Comment: Use the consistent name Variant for the variant type. ########## be/src/runtime/primitive_type.cpp: ########## @@ -526,6 +526,39 @@ TTypeDesc gen_type_desc(const TPrimitiveType::type val, const std::string& name) return type_desc; } +PrimitiveType get_primitive_type(vectorized::TypeIndex v_type) { + switch (v_type) { + case vectorized::TypeIndex::Int8: + return PrimitiveType::TYPE_TINYINT; + case vectorized::TypeIndex::Int16: + return PrimitiveType::TYPE_SMALLINT; + case vectorized::TypeIndex::Int32: + return
PrimitiveType::TYPE_INT; + case vectorized::TypeIndex::Int64: + return PrimitiveType::TYPE_BIGINT; + case vectorized::TypeIndex::Float32: + return PrimitiveType::TYPE_FLOAT; + case vectorized::TypeIndex::Float64: + return PrimitiveType::TYPE_DOUBLE; + case vectorized::TypeIndex::Decimal32: + return PrimitiveType::TYPE_DECIMALV2; + case vectorized::TypeIndex::Array: + return PrimitiveType::TYPE_ARRAY; + case vectorized::TypeIndex::String: + return PrimitiveType::TYPE_STRING; + case vectorized::TypeIndex::Date: + return PrimitiveType::TYPE_DATE; + case vectorized::TypeIndex::DateTime: + return PrimitiveType::TYPE_DATETIME; + case vectorized::TypeIndex::Tuple: + return PrimitiveType::TYPE_STRUCT; + // TODO add vectorized::more types Review Comment: add DECIMAL128I, JSONB, MAP ... ########## be/src/vec/sink/vtablet_sink.cpp: ########## @@ -428,7 +426,16 @@ Status VNodeChannel::add_block(vectorized::Block* block, std::this_thread::sleep_for(std::chrono::milliseconds(10)); } - block->append_block_by_selector(_cur_mutable_block->mutable_columns(), *(payload.first)); + if (UNLIKELY(!_cur_mutable_block)) { + _cur_mutable_block.reset(new vectorized::MutableBlock(block->clone_empty())); Review Comment: use block->clone_empty() instead of _tuple_desc to keep new schema? 
########## be/src/vec/core/field.h: ########## @@ -308,6 +316,7 @@ class Field { AggregateFunctionState = 22, JSONB = 23, Decimal128I = 24, + Object = 25, Review Comment: use consistent name Variant ########## be/src/vec/core/field.h: ########## @@ -346,6 +355,8 @@ class Field { return "AggregateFunctionState"; case FixedLengthObject: return "FixedLengthObject"; + case Object: + return "Object"; Review Comment: use consistent name Variant ########## be/src/vec/core/block.cpp: ########## @@ -678,11 +682,14 @@ Block Block::copy_block(const std::vector<int>& column_offset) const { return columns_with_type_and_name; } -void Block::append_block_by_selector(MutableColumns& columns, - const IColumn::Selector& selector) const { - DCHECK(data.size() == columns.size()); +void Block::append_block_by_selector(MutableBlock* dst, const IColumn::Selector& selector) const { + if (dst->get_block_type() == BlockType::DYNAMIC) { Review Comment: It may be more efficient to add UNLIKELY since most tables are without dynamic schema. ########## be/src/vec/columns/columns_common.h: ########## @@ -51,4 +51,45 @@ void filter_arrays_impl_only_data(const PaddedPODArray<T>& src_elems, PaddedPODArray<T>& res_elems, const IColumn::Filter& filt, ssize_t result_size_hint); +namespace detail { +template <typename T> +const PaddedPODArray<T>* get_indexes_data(const IColumn& indexes); +} + +/// Check limit <= indexes->size() and call column.index_impl(const PaddedPodArray<Type> & indexes, UInt64 limit). 
+template <typename Column> +ColumnPtr select_index_impl(const Column& column, const IColumn& indexes, size_t limit) { + if (limit == 0) { + limit = indexes.size(); + } + + if (indexes.size() < limit) { + LOG(FATAL) << "Size of indexes is less than required."; + } + + if (auto* data_uint8 = detail::get_indexes_data<UInt8>(indexes)) { + return column.template index_impl<UInt8>(*data_uint8, limit); + } else if (auto* data_uint16 = detail::get_indexes_data<UInt16>(indexes)) { + return column.template index_impl<UInt16>(*data_uint16, limit); + } else if (auto* data_uint32 = detail::get_indexes_data<UInt32>(indexes)) { + return column.template index_impl<UInt32>(*data_uint32, limit); + } else if (auto* data_uint64 = detail::get_indexes_data<UInt64>(indexes)) { Review Comment: Is the method suitable for Int8/16/32, Int/UInt128? ########## be/src/vec/core/block.cpp: ########## @@ -872,6 +889,10 @@ void MutableBlock::add_rows(const Block* block, const int* row_begin, const int* } void MutableBlock::add_rows(const Block* block, size_t row_begin, size_t length) { + if (_type == BlockType::DYNAMIC) { Review Comment: It may be more efficient to add UNLIKELY since most tables are without dynamic schema. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org