This is an automated email from the ASF dual-hosted git repository. zhangstar333 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 7aa36cb4438 [improve](serde) support complex type in write/read pb serde (#33124) 7aa36cb4438 is described below commit 7aa36cb44380e36b630d61b4e1e7c040e29c96ce Author: zhangstar333 <87313068+zhangstar...@users.noreply.github.com> AuthorDate: Tue Apr 9 14:21:10 2024 +0800 [improve](serde) support complex type in write/read pb serde (#33124) support complex type and ip/jsonb in DataTypeSerDe::write_column_to_pb/read_column_from_pb function --- be/src/vec/columns/column_array.cpp | 3 +- be/src/vec/columns/column_map.cpp | 6 +- .../vec/data_types/serde/data_type_array_serde.cpp | 30 ++ .../vec/data_types/serde/data_type_array_serde.h | 8 +- .../vec/data_types/serde/data_type_decimal_serde.h | 7 +- .../vec/data_types/serde/data_type_ipv4_serde.cpp | 20 + be/src/vec/data_types/serde/data_type_ipv4_serde.h | 3 + .../vec/data_types/serde/data_type_ipv6_serde.cpp | 24 + be/src/vec/data_types/serde/data_type_ipv6_serde.h | 3 + .../vec/data_types/serde/data_type_jsonb_serde.cpp | 28 ++ .../vec/data_types/serde/data_type_jsonb_serde.h | 3 + .../vec/data_types/serde/data_type_map_serde.cpp | 37 ++ be/src/vec/data_types/serde/data_type_map_serde.h | 8 +- .../data_types/serde/data_type_nullable_serde.cpp | 16 +- .../vec/data_types/serde/data_type_number_serde.h | 35 +- .../vec/data_types/serde/data_type_string_serde.h | 8 +- .../data_types/serde/data_type_struct_serde.cpp | 71 ++- .../vec/data_types/serde/data_type_struct_serde.h | 20 +- .../data_types/serde/data_type_serde_pb_test.cpp | 492 ++++++++++++++++++++- gensrc/proto/internal_service.proto | 4 +- gensrc/proto/types.proto | 2 + 21 files changed, 738 insertions(+), 90 deletions(-) diff --git a/be/src/vec/columns/column_array.cpp b/be/src/vec/columns/column_array.cpp index 7251f1e1798..442ffd44422 100644 --- a/be/src/vec/columns/column_array.cpp +++ b/be/src/vec/columns/column_array.cpp @@ -100,7 +100,8 @@ ColumnArray::ColumnArray(MutableColumnPtr&& nested_column, MutableColumnPtr&& of /// This will also prevent possible overflow in offset. if (data->size() != last_offset) { - LOG(FATAL) << "offsets_column has data inconsistent with nested_column"; + LOG(FATAL) << "offsets_column has data inconsistent with nested_column " << data->size() + << " " << last_offset; } } diff --git a/be/src/vec/columns/column_map.cpp b/be/src/vec/columns/column_map.cpp index bec58bfda7b..9303d628933 100644 --- a/be/src/vec/columns/column_map.cpp +++ b/be/src/vec/columns/column_map.cpp @@ -58,10 +58,12 @@ ColumnMap::ColumnMap(MutableColumnPtr&& keys, MutableColumnPtr&& values, Mutable /// This will also prevent possible overflow in offset. if (keys_column->size() != last_offset) { - LOG(FATAL) << "offsets_column has data inconsistent with key_column"; + LOG(FATAL) << "offsets_column has data inconsistent with key_column " + << keys_column->size() << " " << last_offset; } if (values_column->size() != last_offset) { - LOG(FATAL) << "offsets_column has data inconsistent with value_column"; + LOG(FATAL) << "offsets_column has data inconsistent with value_column " + << values_column->size() << " " << last_offset; } } } diff --git a/be/src/vec/data_types/serde/data_type_array_serde.cpp b/be/src/vec/data_types/serde/data_type_array_serde.cpp index 38fe92d4683..dfe39b2c8a4 100644 --- a/be/src/vec/data_types/serde/data_type_array_serde.cpp +++ b/be/src/vec/data_types/serde/data_type_array_serde.cpp @@ -392,5 +392,35 @@ Status DataTypeArraySerDe::write_column_to_orc(const std::string& timezone, cons return Status::OK(); } +Status DataTypeArraySerDe::write_column_to_pb(const IColumn& column, PValues& result, int start, + int end) const { + const auto& array_col = assert_cast<const ColumnArray&>(column); + auto* ptype = result.mutable_type(); + ptype->set_id(PGenericType::LIST); + const IColumn& nested_column = array_col.get_data(); + const auto& offsets = array_col.get_offsets(); + auto* child_element = result.add_child_element(); + for (size_t row_id = start; row_id < end; row_id++) { + size_t offset = offsets[row_id - 1]; + size_t next_offset = offsets[row_id]; + result.add_child_offset(next_offset); + RETURN_IF_ERROR(nested_serde->write_column_to_pb(nested_column, *child_element, offset, + next_offset)); + } + return Status::OK(); +} + +Status DataTypeArraySerDe::read_column_from_pb(IColumn& column, const PValues& arg) const { + auto& array_column = assert_cast<ColumnArray&>(column); + auto& offsets = array_column.get_offsets(); + IColumn& nested_column = array_column.get_data(); + for (int i = 0; i < arg.child_offset_size(); ++i) { + offsets.emplace_back(arg.child_offset(i)); + } + for (int i = 0; i < arg.child_element_size(); ++i) { + RETURN_IF_ERROR(nested_serde->read_column_from_pb(nested_column, arg.child_element(i))); + } + return Status::OK(); +} } // namespace vectorized } // namespace doris diff --git a/be/src/vec/data_types/serde/data_type_array_serde.h b/be/src/vec/data_types/serde/data_type_array_serde.h index 24b57888c0a..d7d709727d4 100644 --- a/be/src/vec/data_types/serde/data_type_array_serde.h +++ b/be/src/vec/data_types/serde/data_type_array_serde.h @@ -64,13 +64,9 @@ public: int hive_text_complex_type_delimiter_level = 1) const override; Status write_column_to_pb(const IColumn& column, PValues& result, int start, - int end) const override { - return Status::NotSupported("write_column_to_pb with type " + column.get_name()); - } + int end) const override; - Status read_column_from_pb(IColumn& column, const PValues& arg) const override { - return Status::NotSupported("read_column_from_pb with type " + column.get_name()); - } + Status read_column_from_pb(IColumn& column, const PValues& arg) const override; void write_one_cell_to_jsonb(const IColumn& column, JsonbWriter& result, Arena* mem_pool, int32_t col_id, int row_num) const override; diff --git a/be/src/vec/data_types/serde/data_type_decimal_serde.h b/be/src/vec/data_types/serde/data_type_decimal_serde.h index f1aab124cff..6209838fb4b 100644 --- a/be/src/vec/data_types/serde/data_type_decimal_serde.h +++ b/be/src/vec/data_types/serde/data_type_decimal_serde.h @@ -127,7 +127,7 @@ Status DataTypeDecimalSerDe<T>::write_column_to_pb(const IColumn& column, PValue int start, int end) const { int row_count = end - start; const auto* col = check_and_get_column<ColumnDecimal<T>>(column); - auto ptype = result.mutable_type(); + auto* ptype = result.mutable_type(); if constexpr (std::is_same_v<T, Decimal<Int128>>) { ptype->set_id(PGenericType::DECIMAL128); } else if constexpr (std::is_same_v<T, Decimal128V3>) { @@ -154,10 +154,11 @@ template <typename T> Status DataTypeDecimalSerDe<T>::read_column_from_pb(IColumn& column, const PValues& arg) const { if constexpr (std::is_same_v<T, Decimal<Int128>> || std::is_same_v<T, Decimal128V3> || std::is_same_v<T, Decimal256> || std::is_same_v<T, Decimal<Int32>>) { - column.resize(arg.bytes_value_size()); + auto old_column_size = column.size(); + column.resize(old_column_size + arg.bytes_value_size()); auto& data = reinterpret_cast<ColumnDecimal<T>&>(column).get_data(); for (int i = 0; i < arg.bytes_value_size(); ++i) { - data[i] = *(T*)(arg.bytes_value(i).c_str()); + data[old_column_size + i] = *(T*)(arg.bytes_value(i).c_str()); } return Status::OK(); } diff --git a/be/src/vec/data_types/serde/data_type_ipv4_serde.cpp b/be/src/vec/data_types/serde/data_type_ipv4_serde.cpp index dc9559ec1d3..55ad8544b85 100644 --- a/be/src/vec/data_types/serde/data_type_ipv4_serde.cpp +++ b/be/src/vec/data_types/serde/data_type_ipv4_serde.cpp @@ -88,5 +88,25 @@ Status DataTypeIPv4SerDe::deserialize_one_cell_from_json(IColumn& column, Slice& return Status::OK(); } +Status DataTypeIPv4SerDe::write_column_to_pb(const IColumn& column, PValues& result, int start, + int end) const { + const auto& column_data = assert_cast<const ColumnIPv4&>(column).get_data(); + auto* ptype = result.mutable_type(); + ptype->set_id(PGenericType::IPV4); + auto* values = result.mutable_uint32_value(); + values->Reserve(end - start); + values->Add(column_data.begin() + start, column_data.begin() + end); + return Status::OK(); +} + +Status DataTypeIPv4SerDe::read_column_from_pb(IColumn& column, const PValues& arg) const { + auto& col_data = static_cast<ColumnIPv4&>(column).get_data(); + auto old_column_size = column.size(); + column.resize(old_column_size + arg.uint32_value_size()); + for (int i = 0; i < arg.uint32_value_size(); ++i) { + col_data[old_column_size + i] = arg.uint32_value(i); + } + return Status::OK(); +} } // namespace vectorized } // namespace doris \ No newline at end of file diff --git a/be/src/vec/data_types/serde/data_type_ipv4_serde.h b/be/src/vec/data_types/serde/data_type_ipv4_serde.h index 654b7d9532c..20f45e2bea3 100644 --- a/be/src/vec/data_types/serde/data_type_ipv4_serde.h +++ b/be/src/vec/data_types/serde/data_type_ipv4_serde.h @@ -50,6 +50,9 @@ public: FormatOptions& options) const override; Status deserialize_one_cell_from_json(IColumn& column, Slice& slice, const FormatOptions& options) const override; + Status write_column_to_pb(const IColumn& column, PValues& result, int start, + int end) const override; + Status read_column_from_pb(IColumn& column, const PValues& arg) const override; private: template <bool is_binary_format> diff --git a/be/src/vec/data_types/serde/data_type_ipv6_serde.cpp b/be/src/vec/data_types/serde/data_type_ipv6_serde.cpp index 7c9d0f42582..a3a6d837b00 100644 --- a/be/src/vec/data_types/serde/data_type_ipv6_serde.cpp +++ b/be/src/vec/data_types/serde/data_type_ipv6_serde.cpp @@ -20,6 +20,7 @@ #include <arrow/builder.h> #include "vec/columns/column_const.h" +#include "vec/core/types.h" #include "vec/io/io_helper.h" namespace doris { @@ -88,5 +89,28 @@ Status DataTypeIPv6SerDe::deserialize_one_cell_from_json(IColumn& column, Slice& return Status::OK(); } +Status DataTypeIPv6SerDe::write_column_to_pb(const IColumn& column, PValues& result, int start, + int end) const { + const auto& column_data = assert_cast<const ColumnIPv6&>(column); + result.mutable_bytes_value()->Reserve(end - start); + auto* ptype = result.mutable_type(); + ptype->set_id(PGenericType::IPV6); + for (int i = start; i < end; ++i) { + const auto& val = column_data.get_data_at(i); + result.add_bytes_value(val.data, val.size); + } + return Status::OK(); +} + +Status DataTypeIPv6SerDe::read_column_from_pb(IColumn& column, const PValues& arg) const { + auto& col_data = static_cast<ColumnIPv6&>(column).get_data(); + auto old_column_size = column.size(); + col_data.resize(old_column_size + arg.bytes_value_size()); + for (int i = 0; i < arg.bytes_value_size(); ++i) { + col_data[old_column_size + i] = *(IPv6*)(arg.bytes_value(i).c_str()); + } + return Status::OK(); +} + } // namespace vectorized } // namespace doris \ No newline at end of file diff --git a/be/src/vec/data_types/serde/data_type_ipv6_serde.h b/be/src/vec/data_types/serde/data_type_ipv6_serde.h index e48039281c1..308f4639b7b 100644 --- a/be/src/vec/data_types/serde/data_type_ipv6_serde.h +++ b/be/src/vec/data_types/serde/data_type_ipv6_serde.h @@ -53,6 +53,9 @@ public: FormatOptions& options) const override; Status deserialize_one_cell_from_json(IColumn& column, Slice& slice, const FormatOptions& options) const override; + Status write_column_to_pb(const IColumn& column, PValues& result, int start, + int end) const override; + Status read_column_from_pb(IColumn& column, const PValues& arg) const override; private: template <bool is_binary_format> diff --git a/be/src/vec/data_types/serde/data_type_jsonb_serde.cpp b/be/src/vec/data_types/serde/data_type_jsonb_serde.cpp index b7c2e29e0bf..f632b5d83e8 100644 --- a/be/src/vec/data_types/serde/data_type_jsonb_serde.cpp +++ b/be/src/vec/data_types/serde/data_type_jsonb_serde.cpp @@ -239,6 +239,34 @@ Status DataTypeJsonbSerDe::read_one_cell_from_json(IColumn& column, parser.getWriter().getOutput()->getSize()); return Status::OK(); } +Status DataTypeJsonbSerDe::write_column_to_pb(const IColumn& column, PValues& result, int start, + int end) const { + const auto& string_column = assert_cast<const ColumnString&>(column); + result.mutable_string_value()->Reserve(end - start); + auto* ptype = result.mutable_type(); + ptype->set_id(PGenericType::JSONB); + for (size_t row_num = start; row_num < end; ++row_num) { + const auto& string_ref = string_column.get_data_at(row_num); + if (string_ref.size > 0) { + result.add_string_value( + JsonbToJson::jsonb_to_json_string(string_ref.data, string_ref.size)); + } else { + result.add_string_value(NULL_IN_CSV_FOR_ORDINARY_TYPE); + } + } + return Status::OK(); +} +Status DataTypeJsonbSerDe::read_column_from_pb(IColumn& column, const PValues& arg) const { + auto& column_string = assert_cast<ColumnString&>(column); + column_string.reserve(column_string.size() + arg.string_value_size()); + JsonBinaryValue value; + for (int i = 0; i < arg.string_value_size(); ++i) { + RETURN_IF_ERROR( + value.from_json_string(arg.string_value(i).c_str(), arg.string_value(i).size())); + column_string.insert_data(value.value(), value.size()); + } + return Status::OK(); +} } // namespace vectorized } // namespace doris diff --git a/be/src/vec/data_types/serde/data_type_jsonb_serde.h b/be/src/vec/data_types/serde/data_type_jsonb_serde.h index ec4436d149e..d0eaabc81e2 100644 --- a/be/src/vec/data_types/serde/data_type_jsonb_serde.h +++ b/be/src/vec/data_types/serde/data_type_jsonb_serde.h @@ -65,6 +65,9 @@ public: rapidjson::Document::AllocatorType& allocator, int row_num) const override; Status read_one_cell_from_json(IColumn& column, const rapidjson::Value& result) const override; + Status write_column_to_pb(const IColumn& column, PValues& result, int start, + int end) const override; + Status read_column_from_pb(IColumn& column, const PValues& arg) const override; private: template <bool is_binary_format> diff --git a/be/src/vec/data_types/serde/data_type_map_serde.cpp b/be/src/vec/data_types/serde/data_type_map_serde.cpp index 563a6663755..893aa57c1f4 100644 --- a/be/src/vec/data_types/serde/data_type_map_serde.cpp +++ b/be/src/vec/data_types/serde/data_type_map_serde.cpp @@ -510,5 +510,42 @@ Status DataTypeMapSerDe::write_column_to_orc(const std::string& timezone, const return Status::OK(); } +Status DataTypeMapSerDe::write_column_to_pb(const IColumn& column, PValues& result, int start, + int end) const { + const auto& map_column = assert_cast<const ColumnMap&>(column); + auto* ptype = result.mutable_type(); + ptype->set_id(PGenericType::MAP); + const ColumnArray::Offsets64& offsets = map_column.get_offsets(); + const IColumn& nested_keys_column = map_column.get_keys(); + const IColumn& nested_values_column = map_column.get_values(); + auto* key_child_element = result.add_child_element(); + auto* value_child_element = result.add_child_element(); + for (size_t row_id = start; row_id < end; row_id++) { + size_t offset = offsets[row_id - 1]; + size_t next_offset = offsets[row_id]; + result.add_child_offset(next_offset); + RETURN_IF_ERROR(key_serde->write_column_to_pb(nested_keys_column, *key_child_element, + offset, next_offset)); + RETURN_IF_ERROR(value_serde->write_column_to_pb(nested_values_column, *value_child_element, + offset, next_offset)); + } + return Status::OK(); +} + +Status DataTypeMapSerDe::read_column_from_pb(IColumn& column, const PValues& arg) const { + auto& map_column = assert_cast<ColumnMap&>(column); + auto& offsets = map_column.get_offsets(); + auto& key_column = map_column.get_keys(); + auto& value_column = map_column.get_values(); + for (int i = 0; i < arg.child_offset_size(); ++i) { + offsets.emplace_back(arg.child_offset(i)); + } + for (int i = 0; i < arg.child_element_size(); i = i + 2) { + RETURN_IF_ERROR(key_serde->read_column_from_pb(key_column, arg.child_element(i))); + RETURN_IF_ERROR(value_serde->read_column_from_pb(value_column, arg.child_element(i + 1))); + } + return Status::OK(); +} + } // namespace vectorized } // namespace doris diff --git a/be/src/vec/data_types/serde/data_type_map_serde.h b/be/src/vec/data_types/serde/data_type_map_serde.h index 264a22d853f..c5c767b622e 100644 --- a/be/src/vec/data_types/serde/data_type_map_serde.h +++ b/be/src/vec/data_types/serde/data_type_map_serde.h @@ -65,12 +65,8 @@ public: int hive_text_complex_type_delimiter_level = 1) const override; Status write_column_to_pb(const IColumn& column, PValues& result, int start, - int end) const override { - return Status::NotSupported("write_column_to_pb with type " + column.get_name()); - } - Status read_column_from_pb(IColumn& column, const PValues& arg) const override { - return Status::NotSupported("read_column_from_pb with type " + column.get_name()); - } + int end) const override; + Status read_column_from_pb(IColumn& column, const PValues& arg) const override; void write_one_cell_to_jsonb(const IColumn& column, JsonbWriter& result, Arena* mem_pool, int32_t col_id, int row_num) const override; diff --git a/be/src/vec/data_types/serde/data_type_nullable_serde.cpp b/be/src/vec/data_types/serde/data_type_nullable_serde.cpp index 55bdbe1fccc..fa8f9580f79 100644 --- a/be/src/vec/data_types/serde/data_type_nullable_serde.cpp +++ b/be/src/vec/data_types/serde/data_type_nullable_serde.cpp @@ -198,14 +198,14 @@ Status DataTypeNullableSerDe::deserialize_one_cell_from_json(IColumn& column, Sl Status DataTypeNullableSerDe::write_column_to_pb(const IColumn& column, PValues& result, int start, int end) const { int row_count = end - start; - auto& nullable_col = assert_cast<const ColumnNullable&>(column); - auto& null_col = nullable_col.get_null_map_column(); + const auto& nullable_col = assert_cast<const ColumnNullable&>(column); + const auto& null_col = nullable_col.get_null_map_column(); if (nullable_col.has_null(row_count)) { result.set_has_null(true); auto* null_map = result.mutable_null_map(); null_map->Reserve(row_count); const auto* col = check_and_get_column<ColumnUInt8>(null_col); - auto& data = col->get_data(); + const auto& data = col->get_data(); null_map->Add(data.begin() + start, data.begin() + end); } return nested_serde->write_column_to_pb(nullable_col.get_nested_column(), result, start, end); @@ -216,17 +216,19 @@ Status DataTypeNullableSerDe::read_column_from_pb(IColumn& column, const PValues auto& col = reinterpret_cast<ColumnNullable&>(column); auto& null_map_data = col.get_null_map_data(); auto& nested = col.get_nested_column(); + auto old_size = nested.size(); if (Status st = nested_serde->read_column_from_pb(nested, arg); !st.ok()) { return st; } - null_map_data.resize(nested.size()); + auto new_size = nested.size(); + null_map_data.resize(new_size); if (arg.has_null()) { for (int i = 0; i < arg.null_map_size(); ++i) { - null_map_data[i] = arg.null_map(i); + null_map_data[old_size + i] = arg.null_map(i); } } else { - for (int i = 0; i < nested.size(); ++i) { - null_map_data[i] = false; + for (int i = 0; i < new_size - old_size; ++i) { + null_map_data[old_size + i] = false; } } return Status::OK(); diff --git a/be/src/vec/data_types/serde/data_type_number_serde.h b/be/src/vec/data_types/serde/data_type_number_serde.h index 28e2ade1664..32cbfd5069e 100644 --- a/be/src/vec/data_types/serde/data_type_number_serde.h +++ b/be/src/vec/data_types/serde/data_type_number_serde.h @@ -107,49 +107,50 @@ private: template <typename T> Status DataTypeNumberSerDe<T>::read_column_from_pb(IColumn& column, const PValues& arg) const { + auto old_column_size = column.size(); if constexpr (std::is_same_v<T, UInt8> || std::is_same_v<T, UInt16> || std::is_same_v<T, UInt32>) { - column.resize(arg.uint32_value_size()); - auto& data = reinterpret_cast<ColumnType&>(column).get_data(); + column.resize(old_column_size + arg.uint32_value_size()); + auto& data = assert_cast<ColumnType&>(column).get_data(); for (int i = 0; i < arg.uint32_value_size(); ++i) { - data[i] = arg.uint32_value(i); + data[old_column_size + i] = arg.uint32_value(i); } } else if constexpr (std::is_same_v<T, Int8> || std::is_same_v<T, Int16> || std::is_same_v<T, Int32>) { - column.resize(arg.int32_value_size()); + column.resize(old_column_size + arg.int32_value_size()); auto& data = reinterpret_cast<ColumnType&>(column).get_data(); for (int i = 0; i < arg.int32_value_size(); ++i) { - data[i] = arg.int32_value(i); + data[old_column_size + i] = arg.int32_value(i); } } else if constexpr (std::is_same_v<T, UInt64>) { - column.resize(arg.uint64_value_size()); + column.resize(old_column_size + arg.uint64_value_size()); auto& data = reinterpret_cast<ColumnType&>(column).get_data(); for (int i = 0; i < arg.uint64_value_size(); ++i) { - data[i] = arg.uint64_value(i); + data[old_column_size + i] = arg.uint64_value(i); } } else if constexpr (std::is_same_v<T, Int64>) { - column.resize(arg.int64_value_size()); + column.resize(old_column_size + arg.int64_value_size()); auto& data = reinterpret_cast<ColumnType&>(column).get_data(); for (int i = 0; i < arg.int64_value_size(); ++i) { - data[i] = arg.int64_value(i); + data[old_column_size + i] = arg.int64_value(i); } } else if constexpr (std::is_same_v<T, float>) { - column.resize(arg.float_value_size()); + column.resize(old_column_size + arg.float_value_size()); auto& data = reinterpret_cast<ColumnType&>(column).get_data(); for (int i = 0; i < arg.float_value_size(); ++i) { - data[i] = arg.float_value(i); + data[old_column_size + i] = arg.float_value(i); } } else if constexpr (std::is_same_v<T, double>) { - column.resize(arg.double_value_size()); + column.resize(old_column_size + arg.double_value_size()); auto& data = reinterpret_cast<ColumnType&>(column).get_data(); - for (int i = 0; i < arg.float_value_size(); ++i) { - data[i] = arg.double_value(i); + for (int i = 0; i < arg.double_value_size(); ++i) { + data[old_column_size + i] = arg.double_value(i); } } else if constexpr (std::is_same_v<T, Int128>) { - column.resize(arg.bytes_value_size()); + column.resize(old_column_size + arg.bytes_value_size()); auto& data = reinterpret_cast<ColumnType&>(column).get_data(); for (int i = 0; i < arg.bytes_value_size(); ++i) { - data[i] = *(int128_t*)(arg.bytes_value(i).c_str()); + data[old_column_size + i] = *(int128_t*)(arg.bytes_value(i).c_str()); } } else { return Status::NotSupported("unknown ColumnType for reading from pb"); @@ -161,7 +162,7 @@ template <typename T> Status DataTypeNumberSerDe<T>::write_column_to_pb(const IColumn& column, PValues& result, int start, int end) const { int row_count = end - start; - auto ptype = result.mutable_type(); + auto* ptype = result.mutable_type(); const auto* col = check_and_get_column<ColumnVector<T>>(column); if constexpr (std::is_same_v<T, Int128>) { ptype->set_id(PGenericType::INT128); diff --git a/be/src/vec/data_types/serde/data_type_string_serde.h b/be/src/vec/data_types/serde/data_type_string_serde.h index 590c8e3808b..c6cef1babd1 100644 --- a/be/src/vec/data_types/serde/data_type_string_serde.h +++ b/be/src/vec/data_types/serde/data_type_string_serde.h @@ -123,7 +123,7 @@ public: Status write_column_to_pb(const IColumn& column, PValues& result, int start, int end) const override { - result.mutable_bytes_value()->Reserve(end - start); + result.mutable_string_value()->Reserve(end - start); auto* ptype = result.mutable_type(); ptype->set_id(PGenericType::STRING); for (size_t row_num = start; row_num < end; ++row_num) { @@ -133,10 +133,10 @@ public: return Status::OK(); } Status read_column_from_pb(IColumn& column, const PValues& arg) const override { - column.reserve(arg.string_value_size()); + auto& column_dest = assert_cast<ColumnType&>(column); + column_dest.reserve(column_dest.size() + arg.string_value_size()); for (int i = 0; i < arg.string_value_size(); ++i) { - assert_cast<ColumnType&>(column).insert_data(arg.string_value(i).c_str(), - arg.string_value(i).size()); + column_dest.insert_data(arg.string_value(i).c_str(), arg.string_value(i).size()); } return Status::OK(); } diff --git a/be/src/vec/data_types/serde/data_type_struct_serde.cpp b/be/src/vec/data_types/serde/data_type_struct_serde.cpp index 8c8e80c0c53..a574d9553a7 100644 --- a/be/src/vec/data_types/serde/data_type_struct_serde.cpp +++ b/be/src/vec/data_types/serde/data_type_struct_serde.cpp @@ -31,9 +31,9 @@ namespace vectorized { class Arena; std::optional<size_t> DataTypeStructSerDe::try_get_position_by_name(const String& name) const { - size_t size = elemSerDeSPtrs.size(); + size_t size = elem_serdes_ptrs.size(); for (size_t i = 0; i < size; ++i) { - if (elemNames[i] == name) { + if (elem_names[i] == name) { return std::optional<size_t>(i); } } @@ -60,10 +60,10 @@ Status DataTypeStructSerDe::serialize_one_cell_to_json(const IColumn& column, in bw.write(','); bw.write(' '); } - std::string col_name = "\"" + elemNames[i] + "\": "; + std::string col_name = "\"" + elem_names[i] + "\": "; bw.write(col_name.c_str(), col_name.length()); - RETURN_IF_ERROR(elemSerDeSPtrs[i]->serialize_one_cell_to_json(struct_column.get_column(i), - row_num, bw, options)); + RETURN_IF_ERROR(elem_serdes_ptrs[i]->serialize_one_cell_to_json(struct_column.get_column(i), + row_num, bw, options)); } bw.write('}'); return Status::OK(); @@ -109,7 +109,7 @@ Status DataTypeStructSerDe::deserialize_one_cell_from_json(IColumn& column, Slic int idx = 0; char quote_char = 0; - auto elem_size = elemSerDeSPtrs.size(); + auto elem_size = elem_serdes_ptrs.size(); int field_pos = 0; for (; idx < slice_size; ++idx) { @@ -137,7 +137,7 @@ Status DataTypeStructSerDe::deserialize_one_cell_from_json(IColumn& column, Slic next.trim_prefix(); next.trim_quote(); // check field_name - if (elemNames[field_pos] != next) { + if (elem_names[field_pos] != next) { // we should do column revert if error for (size_t j = 0; j < field_pos; j++) { struct_column.get_column(j).pop_back(1); @@ -166,7 +166,7 @@ Status DataTypeStructSerDe::deserialize_one_cell_from_json(IColumn& column, Slic "Actual struct field number is more than schema field number {}.", field_pos, elem_size); } - if (Status st = elemSerDeSPtrs[field_pos]->deserialize_one_cell_from_json( + if (Status st = elem_serdes_ptrs[field_pos]->deserialize_one_cell_from_json( struct_column.get_column(field_pos), next, options); st != Status::OK()) { // we should do column revert if error @@ -196,7 +196,7 @@ Status DataTypeStructSerDe::deserialize_one_cell_from_json(IColumn& column, Slic "Actual struct field number is more than schema field number {}.", field_pos, elem_size); } - if (Status st = elemSerDeSPtrs[field_pos]->deserialize_one_cell_from_json( + if (Status st = elem_serdes_ptrs[field_pos]->deserialize_one_cell_from_json( struct_column.get_column(field_pos), next, options); st != Status::OK()) { // we should do column revert if error @@ -255,7 +255,7 @@ Status DataTypeStructSerDe::deserialize_one_cell_from_hive_text( } auto& struct_column = static_cast<ColumnStruct&>(column); for (size_t loc = 0; loc < struct_column.get_columns().size(); loc++) { - Status st = elemSerDeSPtrs[loc]->deserialize_one_cell_from_hive_text( + Status st = elem_serdes_ptrs[loc]->deserialize_one_cell_from_hive_text( struct_column.get_column(loc), slices[loc], options, hive_text_complex_type_delimiter_level + 1); if (st != Status::OK()) { @@ -287,7 +287,7 @@ void DataTypeStructSerDe::serialize_one_cell_to_hive_text( if (i != 0) { bw.write(collection_delimiter); } - elemSerDeSPtrs[i]->serialize_one_cell_to_hive_text( + elem_serdes_ptrs[i]->serialize_one_cell_to_hive_text( struct_column.get_column(i), row_num, bw, options, hive_text_complex_type_delimiter_level + 1); } @@ -312,8 +312,8 @@ void DataTypeStructSerDe::write_column_to_arrow(const IColumn& column, const Nul checkArrowStatus(builder.Append(), struct_column.get_name(), builder.type()->name()); for (size_t ei = 0; ei < struct_column.tuple_size(); ++ei) { auto elem_builder = builder.field_builder(ei); - elemSerDeSPtrs[ei]->write_column_to_arrow(struct_column.get_column(ei), nullptr, - elem_builder, r, r + 1, ctz); + elem_serdes_ptrs[ei]->write_column_to_arrow(struct_column.get_column(ei), nullptr, + elem_builder, r, r + 1, ctz); } } } @@ -325,8 +325,8 @@ void DataTypeStructSerDe::read_column_from_arrow(IColumn& column, const arrow::A auto concrete_struct = dynamic_cast<const arrow::StructArray*>(arrow_array); DCHECK_EQ(struct_column.tuple_size(), concrete_struct->num_fields()); for (size_t i = 0; i < struct_column.tuple_size(); ++i) { - elemSerDeSPtrs[i]->read_column_from_arrow(struct_column.get_column(i), - concrete_struct->field(i).get(), start, end, ctz); + elem_serdes_ptrs[i]->read_column_from_arrow( + struct_column.get_column(i), concrete_struct->field(i).get(), start, end, ctz); } } @@ -341,14 +341,14 @@ Status DataTypeStructSerDe::_write_column_to_mysql(const IColumn& column, return Status::InternalError("pack mysql buffer failed."); } bool begin = true; - for (size_t j = 0; j < elemSerDeSPtrs.size(); ++j) { + for (size_t j = 0; j < elem_serdes_ptrs.size(); ++j) { if (!begin) { if (0 != result.push_string(", ", 2)) { return Status::InternalError("pack mysql buffer failed."); } } - std::string col_name = "\"" + elemNames[j] + "\": "; + std::string col_name = "\"" + elem_names[j] + "\": "; if (0 != result.push_string(col_name.c_str(), col_name.length())) { return Status::InternalError("pack mysql buffer failed."); } @@ -363,14 +363,14 @@ Status DataTypeStructSerDe::_write_column_to_mysql(const IColumn& column, if (0 != result.push_string("\"", 1)) { return Status::InternalError("pack mysql buffer failed."); } - RETURN_IF_ERROR(elemSerDeSPtrs[j]->write_column_to_mysql(col.get_column(j), result, - col_index, false)); + RETURN_IF_ERROR(elem_serdes_ptrs[j]->write_column_to_mysql( + col.get_column(j), result, col_index, false)); if (0 != result.push_string("\"", 1)) { return Status::InternalError("pack mysql buffer failed."); } } else { - RETURN_IF_ERROR(elemSerDeSPtrs[j]->write_column_to_mysql(col.get_column(j), result, - col_index, false)); + RETURN_IF_ERROR(elem_serdes_ptrs[j]->write_column_to_mysql( + col.get_column(j), result, col_index, false)); } } begin = false; @@ -403,7 +403,7 @@ Status DataTypeStructSerDe::write_column_to_orc(const std::string& timezone, con const ColumnStruct& struct_col = assert_cast<const ColumnStruct&>(column); for (size_t row_id = start; row_id < end; row_id++) { for (int i = 0; i < struct_col.tuple_size(); ++i) { - RETURN_IF_ERROR(elemSerDeSPtrs[i]->write_column_to_orc( + RETURN_IF_ERROR(elem_serdes_ptrs[i]->write_column_to_orc( timezone, struct_col.get_column(i), nullptr, cur_batch->fields[i], row_id, row_id + 1, buffer_list)); } @@ -413,5 +413,32 @@ Status DataTypeStructSerDe::write_column_to_orc(const std::string& timezone, con return Status::OK(); } +Status DataTypeStructSerDe::write_column_to_pb(const IColumn& column, PValues& result, int start, + int end) const { + const auto& struct_col = assert_cast<const ColumnStruct&>(column); + auto* ptype = result.mutable_type(); + ptype->set_id(PGenericType::STRUCT); + auto tuple_size = struct_col.tuple_size(); + PValues* child_elements[tuple_size]; + for (int i = 0; i < tuple_size; ++i) { + child_elements[i] = result.add_child_element(); + } + for (int i = 0; i < tuple_size; ++i) { + RETURN_IF_ERROR(elem_serdes_ptrs[i]->write_column_to_pb(struct_col.get_column(i), + *child_elements[i], start, end)); + } + return Status::OK(); +} + +Status DataTypeStructSerDe::read_column_from_pb(IColumn& column, const PValues& arg) const { + auto& struct_column = assert_cast<ColumnStruct&>(column); + DCHECK_EQ(struct_column.tuple_size(), arg.child_element_size()); + for (size_t i = 0; i < struct_column.tuple_size(); ++i) { + RETURN_IF_ERROR(elem_serdes_ptrs[i]->read_column_from_pb(struct_column.get_column(i), + arg.child_element(i))); + } + return Status::OK(); +} + } // namespace vectorized } // namespace doris diff --git a/be/src/vec/data_types/serde/data_type_struct_serde.h b/be/src/vec/data_types/serde/data_type_struct_serde.h index b09def9719f..6f89575469e 100644 --- a/be/src/vec/data_types/serde/data_type_struct_serde.h +++ b/be/src/vec/data_types/serde/data_type_struct_serde.h @@ -108,9 +108,11 @@ public: return true; } - DataTypeStructSerDe(const DataTypeSerDeSPtrs& _elemSerDeSPtrs, const Strings names, + DataTypeStructSerDe(const DataTypeSerDeSPtrs& _elem_serdes_ptrs, const Strings names, int nesting_level = 1) - : DataTypeSerDe(nesting_level), elemSerDeSPtrs(_elemSerDeSPtrs), elemNames(names) {} + : DataTypeSerDe(nesting_level), + elem_serdes_ptrs(_elem_serdes_ptrs), + elem_names(names) {} Status serialize_one_cell_to_json(const IColumn& column, int row_num, BufferWritable& bw, FormatOptions& options) const override; @@ -137,12 +139,8 @@ public: int hive_text_complex_type_delimiter_level = 1) const override; Status write_column_to_pb(const IColumn& column, PValues& result, int start, - int end) const override { - return Status::NotSupported("write_column_to_pb with type " + column.get_name()); - } - Status read_column_from_pb(IColumn& column, const PValues& arg) const override { - return Status::NotSupported("read_column_from_pb with type " + column.get_name()); - } + int end) const override; + Status read_column_from_pb(IColumn& column, const PValues& arg) const override; void write_one_cell_to_jsonb(const IColumn& column, JsonbWriter& result, Arena* mem_pool, int32_t col_id, int row_num) const override; @@ -166,7 +164,7 @@ public: void set_return_object_as_string(bool value) override { DataTypeSerDe::set_return_object_as_string(value); - for (auto& serde : elemSerDeSPtrs) { + for (auto& serde : elem_serdes_ptrs) { serde->set_return_object_as_string(value); } } @@ -182,8 +180,8 @@ private: Status _write_column_to_mysql(const IColumn& column, MysqlRowBuffer<is_binary_format>& result, int row_idx, bool col_const) const; - DataTypeSerDeSPtrs elemSerDeSPtrs; - Strings elemNames; + DataTypeSerDeSPtrs elem_serdes_ptrs; + Strings elem_names; }; } // namespace vectorized } // namespace doris diff --git a/be/test/vec/data_types/serde/data_type_serde_pb_test.cpp b/be/test/vec/data_types/serde/data_type_serde_pb_test.cpp index beda6c88a9c..a11b25291e6 100644 --- a/be/test/vec/data_types/serde/data_type_serde_pb_test.cpp +++ b/be/test/vec/data_types/serde/data_type_serde_pb_test.cpp @@ -19,6 +19,7 @@ #include <gen_cpp/types.pb.h> #include <gtest/gtest-message.h> #include <gtest/gtest-test-part.h> +#include <gtest/gtest.h> #include <math.h> #include <stdlib.h> #include <time.h> @@ -28,53 +29,284 @@ #include <string> #include <vector> +#include "common/status.h" #include "gtest/gtest_pred_impl.h" #include "olap/hll.h" #include "util/bitmap_value.h" #include "util/quantile_state.h" #include "vec/columns/column.h" +#include "vec/columns/column_array.h" #include "vec/columns/column_complex.h" #include "vec/columns/column_decimal.h" +#include "vec/columns/column_map.h" #include "vec/columns/column_nullable.h" #include "vec/columns/column_string.h" +#include "vec/columns/column_struct.h" #include "vec/columns/column_vector.h" +#include "vec/columns/columns_number.h" +#include "vec/core/block.h" #include "vec/core/types.h" #include "vec/data_types/data_type.h" +#include "vec/data_types/data_type_array.h" #include "vec/data_types/data_type_bitmap.h" #include "vec/data_types/data_type_decimal.h" #include "vec/data_types/data_type_hll.h" #include "vec/data_types/data_type_ipv4.h" #include "vec/data_types/data_type_ipv6.h" +#include "vec/data_types/data_type_map.h" #include "vec/data_types/data_type_nullable.h" #include "vec/data_types/data_type_number.h" #include "vec/data_types/data_type_quantilestate.h" #include "vec/data_types/data_type_string.h" +#include "vec/data_types/data_type_struct.h" +#include "vec/data_types/data_type_time_v2.h" #include "vec/data_types/serde/data_type_serde.h" namespace doris::vectorized { inline void column_to_pb(const DataTypePtr data_type, const IColumn& col, PValues* result) { const DataTypeSerDeSPtr serde = data_type->get_serde(); - static_cast<void>(serde->write_column_to_pb(col, *result, 0, col.size())); + Status st = serde->write_column_to_pb(col, *result, 0, col.size()); + if (!st.ok()) { + std::cerr << "column_to_pb error, maybe not impl it: " << st.msg() << " " + << data_type->get_name() << std::endl; + } } -inline void pb_to_column(const DataTypePtr data_type, PValues& result, IColumn& col) { +inline bool pb_to_column(const DataTypePtr data_type, PValues& result, IColumn& col) { auto serde = data_type->get_serde(); - static_cast<void>(serde->read_column_from_pb(col, result)); + Status st = serde->read_column_from_pb(col, result); + if (!st.ok()) { + std::cerr << "pb_to_column error, maybe not impl it: " << st.msg() << " " + << data_type->get_name() << std::endl; + return false; + } + return true; } -inline void check_pb_col(const DataTypePtr data_type, const IColumn& col) { +inline void check_pb_col(const DataTypePtr data_type, const IColumn& input_column) { PValues pv = PValues(); - column_to_pb(data_type, col, &pv); + column_to_pb(data_type, input_column, &pv); int s1 = pv.bytes_value_size(); + auto except_column = data_type->create_column(); + bool success_deserialized = pb_to_column(data_type, pv, *except_column); - auto col1 = data_type->create_column(); - pb_to_column(data_type, pv, *col1); PValues as_pv = PValues(); - column_to_pb(data_type, *col1, &as_pv); - + column_to_pb(data_type, *except_column, &as_pv); int s2 = as_pv.bytes_value_size(); EXPECT_EQ(s1, s2); + + Block block_out, block_input; + block_input.insert({input_column.get_ptr(), data_type, ""}); + std::string input_str = block_input.dump_data(); + block_out.insert({std::move(except_column), data_type, ""}); + std::string output_str = block_out.dump_data(); + //input column data should same as output column data of deserialize + if (success_deserialized) { + EXPECT_EQ(input_str, output_str); + } else { + EXPECT_TRUE(false); + } +} +TEST(DataTypeSerDePbTest, DataTypeScalaSerDeTest2) { + std::cout << "==== double === " << std::endl; + // double + { + auto vec = vectorized::ColumnVector<Float64>::create(); + auto& data = vec->get_data(); + for (int i = 0; i < 10; ++i) { + data.push_back(i); + } + vectorized::DataTypePtr data_type(std::make_shared<vectorized::DataTypeFloat64>()); + check_pb_col(data_type, *vec.get()); + } +} + +TEST(DataTypeSerDePbTest, DataTypeScalaSerDeTest3) { + std::cout << "==== nullable_int32 === " << std::endl; + // nullable_int + { + auto vec = vectorized::ColumnVector<Int32>::create(); + auto null_map = vectorized::ColumnVector<UInt8>::create(); + auto& data = vec->get_data(); + auto& null_map_data = null_map->get_data(); + + for (int i = 0; i < 10; ++i) { + data.push_back(i); + null_map_data.push_back(i % 2); + } + auto nullable_column = + vectorized::ColumnNullable::create(std::move(vec), std::move(null_map)); + vectorized::DataTypePtr data_type = std::make_shared<vectorized::DataTypeInt32>(); + vectorized::DataTypePtr nullable_type = make_nullable(data_type); + + check_pb_col(nullable_type, *nullable_column.get()); + } +} + +TEST(DataTypeSerDePbTest, DataTypeScalaSerDeTest4) { + std::cout << "==== array<int32> === " << std::endl; + // array<int32> + { + auto vec = vectorized::ColumnVector<Int32>::create(); + auto null_map = vectorized::ColumnVector<UInt8>::create(); + auto& data = vec->get_data(); + auto& null_map_data = null_map->get_data(); + int rows = 10; + for (int i = 0; i < rows; ++i) { + data.push_back(i); + null_map_data.push_back(i % 2); + } + auto nullable_column = + vectorized::ColumnNullable::create(std::move(vec), std::move(null_map)); + auto offset_column = vectorized::ColumnArray::ColumnOffsets::create(); + offset_column->get_data().push_back(3); + offset_column->get_data().push_back(rows); + /* + +-------------------------------+ + | [0, NULL, 2] | + |[NULL, 4, NULL, 6, NULL, 8, NULL]| + +-------------------------------+ + */ + auto array_column = vectorized::ColumnArray::create(std::move(nullable_column), + std::move(offset_column)); + + vectorized::DataTypePtr data_type = std::make_shared<vectorized::DataTypeInt32>(); + vectorized::DataTypePtr nullable_type = make_nullable(data_type); + vectorized::DataTypePtr array_type = + std::make_shared<vectorized::DataTypeArray>(nullable_type); + check_pb_col(array_type, *array_column.get()); + } +} + +TEST(DataTypeSerDePbTest, DataTypeScalaSerDeTest5) { + std::cout << "==== array<array<int32>> === " << std::endl; + // array<array<int32>> + { + auto vec = vectorized::ColumnVector<Int32>::create(); + auto null_map = vectorized::ColumnVector<UInt8>::create(); + auto& data = vec->get_data(); + auto& null_map_data = null_map->get_data(); + int rows = 10; + for (int i = 0; i < rows; ++i) { + data.push_back(i); + null_map_data.push_back(i % 2); + } + auto nullable_column = + vectorized::ColumnNullable::create(std::move(vec), std::move(null_map)); + auto offset_column = vectorized::ColumnArray::ColumnOffsets::create(); + offset_column->get_data().push_back(rows); + //[0,1,2,3,.....9] + auto array_column = vectorized::ColumnArray::create(std::move(nullable_column), + std::move(offset_column)); + vectorized::DataTypePtr data_type = std::make_shared<vectorized::DataTypeInt32>(); + vectorized::DataTypePtr nullable_type = make_nullable(data_type); + vectorized::DataTypePtr array_type = + std::make_shared<vectorized::DataTypeArray>(nullable_type); + + auto out_offset_column = vectorized::ColumnArray::ColumnOffsets::create(); + out_offset_column->get_data().push_back(1); + vectorized::DataTypePtr out_array_type = + std::make_shared<vectorized::DataTypeArray>(make_nullable(array_type)); + //[[0,1,2,3,.....9]] + auto out_array_column = vectorized::ColumnArray::create( + (make_nullable(std::move(array_column))), std::move(out_offset_column)); + check_pb_col(out_array_type, *out_array_column.get()); + } +} + +TEST(DataTypeSerDePbTest, DataTypeScalaSerDeTest6) { + std::cout << "==== array<array<int32>> === " << std::endl; + // array<array<int32>> + { + auto vec = vectorized::ColumnVector<Int32>::create(); + auto null_map = vectorized::ColumnVector<UInt8>::create(); + auto& data = vec->get_data(); + auto& null_map_data = null_map->get_data(); + int rows = 10; + for (int i = 0; i < rows; ++i) { + data.push_back(i); + null_map_data.push_back(i % 2); + } + auto nullable_column = + vectorized::ColumnNullable::create(std::move(vec), std::move(null_map)); + auto offset_column = vectorized::ColumnArray::ColumnOffsets::create(); + offset_column->get_data().push_back(4); + offset_column->get_data().push_back(rows); + /* + +-------------------------------+ + | [0, NULL, 2, NULL] | + |[ 4, NULL, 6, NULL, 8, NULL] | + +-------------------------------+ + */ + auto array_column = vectorized::ColumnArray::create(std::move(nullable_column), + std::move(offset_column)); + vectorized::DataTypePtr data_type = std::make_shared<vectorized::DataTypeInt32>(); + vectorized::DataTypePtr nullable_type = make_nullable(data_type); + vectorized::DataTypePtr array_type = + std::make_shared<vectorized::DataTypeArray>(nullable_type); + auto out_offset_column = vectorized::ColumnArray::ColumnOffsets::create(); + //[[0, NULL, 2, NULL], [4, NULL, 6, NULL, 8, NULL]] + out_offset_column->get_data().push_back(2); + vectorized::DataTypePtr out_array_type = + std::make_shared<vectorized::DataTypeArray>(make_nullable(array_type)); + auto null_array_column = make_nullable(std::move(array_column)); + + auto out_array_column = + vectorized::ColumnArray::create(null_array_column, std::move(out_offset_column)); + check_pb_col(out_array_type, *out_array_column.get()); + } +} + +TEST(DataTypeSerDePbTest, DataTypeScalaSerDeTest7) { + std::cout << "==== array<array<int32>> === " << std::endl; + // array<array<int32>> + { + auto vec = vectorized::ColumnVector<Int32>::create(); + auto null_map = vectorized::ColumnVector<UInt8>::create(); + auto& data = vec->get_data(); + auto& null_map_data = null_map->get_data(); + int rows = 10; + for (int i = 0; i < rows; ++i) { + data.push_back(i); + null_map_data.push_back(i % 2); + } + auto nullable_column = + vectorized::ColumnNullable::create(std::move(vec), std::move(null_map)); + auto offset_column = vectorized::ColumnArray::ColumnOffsets::create(); + offset_column->get_data().push_back(4); + offset_column->get_data().push_back(rows); + /* + +-------------------------------+ + | [0, NULL, 2, NULL] | + |[ 4, NULL, 6, NULL, 8, NULL] | + +-------------------------------+ + */ + auto array_column = vectorized::ColumnArray::create(std::move(nullable_column), + std::move(offset_column)); + vectorized::DataTypePtr data_type = std::make_shared<vectorized::DataTypeInt32>(); + vectorized::DataTypePtr nullable_type = make_nullable(data_type); + vectorized::DataTypePtr array_type = + std::make_shared<vectorized::DataTypeArray>(nullable_type); + auto out_offset_column = vectorized::ColumnArray::ColumnOffsets::create(); + /* asd Array(Nullable(Array(Nullable(Int32)))) Array(size = 2, UInt64(size = 2), Nullable(size = 2, Array(size = 2, UInt64(size = 2), Nullable(size = 10, Int32(size = 10), UInt8(size = 10))), UInt8(size = 2))) + +--------------------------------------------+ + |asd(Array(Nullable(Array(Nullable(Int32)))))| + +--------------------------------------------+ + | [[0, NULL, 2, NULL]]| + | [[4, NULL, 6, NULL, 8, NULL]]| + +--------------------------------------------+ + */ + out_offset_column->get_data().push_back(1); + out_offset_column->get_data().push_back(2); + vectorized::DataTypePtr out_array_type = + std::make_shared<vectorized::DataTypeArray>(make_nullable(array_type)); + auto null_array_column = make_nullable(std::move(array_column)); + + auto out_array_column = + vectorized::ColumnArray::create(null_array_column, std::move(out_offset_column)); + check_pb_col(out_array_type, *out_array_column.get()); + } } inline void serialize_and_deserialize_pb_test() { @@ -229,4 +461,244 @@ TEST(DataTypeSerDePbTest, DataTypeScalaSerDeTest) { serialize_and_deserialize_pb_test(); } -} // namespace doris::vectorized +TEST(DataTypeSerDePbTest, DataTypeScalaSerDeTestMap) { + std::cout << "==== map<string, string> === " << std::endl; + DataTypePtr s = std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>()); + DataTypePtr d = std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>()); + DataTypePtr m = std::make_shared<DataTypeMap>(s, d); + Array k1, k2, v1, v2; + k1.push_back("null"); + k1.push_back("doris"); + k1.push_back("clever amory"); + v1.push_back("ss"); + v1.push_back(Null()); + v1.push_back("NULL"); + k2.push_back("hello amory"); + k2.push_back("NULL"); + k2.push_back("cute amory"); + k2.push_back("doris"); + v2.push_back("s"); + v2.push_back("0"); + v2.push_back("sf"); + v2.push_back(Null()); + Map m1, m2; + m1.push_back(k1); + m1.push_back(v1); + m2.push_back(k2); + m2.push_back(v2); + MutableColumnPtr map_column = m->create_column(); + map_column->reserve(2); + map_column->insert(m1); + map_column->insert(m2); + /* + +-----------------------------------------+ + |(Map(Nullable(String), Nullable(String))) | + +-----------------------------------------+ + |{"null":"ss", "doris":null, "clever amory":"NULL"} | + |{"hello amory":"s", "NULL":"0", "cute amory":"sf", "doris":null}| + +-----------------------------------------+ + */ + + vectorized::ColumnWithTypeAndName type_and_name(map_column->get_ptr(), m, ""); + Block block; + block.insert(type_and_name); + check_pb_col(m, *map_column.get()); +} + +TEST(DataTypeSerDePbTest, DataTypeScalaSerDeTestMap2) { + std::cout << "==== map<string,map<string, string>> === " << std::endl; + DataTypePtr s = std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>()); + DataTypePtr d = std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>()); + DataTypePtr m = std::make_shared<DataTypeMap>(s, d); + Array k1, k2, v1, v2; + k1.push_back("null"); + k1.push_back("doris"); + k1.push_back("clever amory"); + v1.push_back("ss"); + v1.push_back(Null()); + v1.push_back("NULL"); + k2.push_back("hello amory"); + k2.push_back("NULL"); + k2.push_back("cute amory"); + k2.push_back("doris"); + v2.push_back("s"); + v2.push_back("0"); + v2.push_back("sf"); + v2.push_back(Null()); + Map m1, m2; + m1.push_back(k1); + m1.push_back(v1); + m2.push_back(k2); + m2.push_back(v2); + MutableColumnPtr map_column = m->create_column(); + map_column->reserve(2); + map_column->insert(m1); + map_column->insert(m2); + /* + +-----------------------------------------+ + |(Map(Nullable(String), Nullable(String))) | + +-----------------------------------------+ + |{"null":"ss", "doris":null, "clever amory":"NULL"} | + |{"hello amory":"s", "NULL":"0", "cute amory":"sf", "doris":null}| + +-----------------------------------------+ + */ + + DataTypePtr outer_string_type = + std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>()); + auto outer_string_key_column = outer_string_type->create_column(); + std::string str1 = "nested_complex_map_1"; + std::string str2 = "nested_complex_map_2"; + outer_string_key_column->insert_data(str1.c_str(), str1.size()); + outer_string_key_column->insert_data(str2.c_str(), str2.size()); + + auto outer_offset_column = vectorized::ColumnArray::ColumnOffsets::create(); + outer_offset_column->get_data().push_back(1); + outer_offset_column->get_data().push_back(2); + DataTypePtr outer_type = std::make_shared<DataTypeMap>(s, m); + + /* + outer_map_column: Map(Nullable(String), Map(Nullable(String), Nullable(String))) + +----------------------------------------------------------------+ + |(Map(Nullable(String), Map(Nullable(String), Nullable(String))))| + +----------------------------------------------------------------+ + |{"nested_complex_map_1":{"null":"ss", "doris":null, "clever amory":"NULL"}}| + |{"nested_complex_map_2":{"hello amory":"s", "NULL":"0", "cute amory":"sf", "doris":null}}| + +----------------------------------------------------------------+ + */ + auto outer_map_column = + vectorized::ColumnMap::create(std::move(outer_string_key_column), std::move(map_column), + std::move(outer_offset_column)); + + vectorized::ColumnWithTypeAndName type_and_name_outer(outer_map_column->get_ptr(), outer_type, + ""); + Block block2; + block2.insert(type_and_name_outer); + check_pb_col(outer_type, *outer_map_column.get()); +} + +TEST(DataTypeSerDePbTest, DataTypeScalaSerDeTestStruct) { + std::cout << "==== struct<string, int64, uint8> === " << std::endl; + DataTypePtr s = std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>()); + DataTypePtr d = std::make_shared<DataTypeNullable>(std::make_shared<DataTypeInt128>()); + DataTypePtr m = std::make_shared<DataTypeNullable>(std::make_shared<DataTypeUInt8>()); + DataTypePtr st = std::make_shared<DataTypeStruct>(std::vector<DataTypePtr> {s, d, m}); + Tuple t1, t2; + t1.push_back(String("amory cute")); + t1.push_back(__int128_t(37)); + t1.push_back(true); + t2.push_back("null"); + t2.push_back(__int128_t(26)); + t2.push_back(false); + MutableColumnPtr struct_column = st->create_column(); + struct_column->reserve(2); + struct_column->insert(t1); + struct_column->insert(t2); + /* + +-------------------------------------------------------------------+ + |(Struct(1:Nullable(String), 2:Nullable(Int128), 3:Nullable(UInt8)))| + +-------------------------------------------------------------------+ + | {amory cute, 37, 1}| + | {null, 26, 0}| + +-------------------------------------------------------------------+ + */ + vectorized::ColumnWithTypeAndName type_and_name(struct_column->get_ptr(), st, ""); + Block block; + block.insert(type_and_name); + check_pb_col(st, *struct_column.get()); +} + +TEST(DataTypeSerDePbTest, DataTypeScalaSerDeTestStruct2) { + std::cout << "==== struct<string,struct<string, int64, uint8>> === " << std::endl; + DataTypePtr s = std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>()); + DataTypePtr d = std::make_shared<DataTypeNullable>(std::make_shared<DataTypeInt64>()); + DataTypePtr m = std::make_shared<DataTypeNullable>(std::make_shared<DataTypeUInt8>()); + DataTypePtr st = std::make_shared<DataTypeStruct>(std::vector<DataTypePtr> {s, d, m}); + Tuple t1, t2; + t1.push_back(String("amory cute")); + t1.push_back(37); + t1.push_back(true); + t2.push_back("null"); + t2.push_back(26); + t2.push_back(false); + MutableColumnPtr struct_column = st->create_column(); + struct_column->reserve(2); + struct_column->insert(t1); + struct_column->insert(t2); + + DataTypePtr string_type = + std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>()); + DataTypePtr outer_struct = + std::make_shared<DataTypeStruct>(std::vector<DataTypePtr> {string_type, st}); + auto outer_string_key_column = string_type->create_column(); + std::string str1 = "nested_complex_struct_1"; + std::string str2 = "nested_complex_struct_2"; + outer_string_key_column->insert_data(str1.c_str(), str1.size()); + outer_string_key_column->insert_data(str2.c_str(), str2.size()); + + std::vector<ColumnPtr> vector_columns; + vector_columns.emplace_back(outer_string_key_column->get_ptr()); + vector_columns.emplace_back(struct_column->get_ptr()); + auto outer_struct_column = ColumnStruct::create(vector_columns); + /* + +-------------------------------------------------------------------------------------------------+ + |(Struct(1:Nullable(String), 2:Struct(1:Nullable(String), 2:Nullable(Int128), 3:Nullable(UInt8))))| + +-------------------------------------------------------------------------------------------------+ + | {nested_complex_struct_1, {amory cute, 37, 1}}| + | {nested_complex_struct_2, {null, 26, 0}}| + +-------------------------------------------------------------------------------------------------+ + */ + vectorized::ColumnWithTypeAndName type_and_name(outer_struct_column->get_ptr(), outer_struct, + ""); + Block block; + block.insert(type_and_name); + check_pb_col(outer_struct, *outer_struct_column.get()); +} + +TEST(DataTypeSerDePbTest, DataTypeScalaSerDeTestDateTime) { + std::cout << "==== datetime === " << std::endl; + // datetime + { + auto vec = vectorized::ColumnDateTimeV2::create(); + auto& data = vec->get_data(); + for (int i = 0; i < 10; ++i) { + uint16_t year = 2022; + uint8_t month = 5; + uint8_t day = 24; + uint8_t hour = 12; + uint8_t minute = i; + uint8_t second = 0; + uint32_t microsecond = 123000; + auto value = ((uint64_t)(((uint64_t)year << 46) | ((uint64_t)month << 42) | + ((uint64_t)day << 37) | ((uint64_t)hour << 32) | + ((uint64_t)minute << 26) | ((uint64_t)second << 20) | + (uint64_t)microsecond)); + DateV2Value<DateTimeV2ValueType> datetime_v2; + datetime_v2.from_datetime(value); + auto datetime_val = binary_cast<DateV2Value<DateTimeV2ValueType>, UInt64>(datetime_v2); + data.push_back(datetime_val); + } + vectorized::DataTypePtr data_type(std::make_shared<vectorized::DataTypeDateTimeV2>(6)); + vectorized::ColumnWithTypeAndName type_and_name(vec->get_ptr(), data_type, ""); + Block block; + block.insert(type_and_name); + check_pb_col(data_type, *vec.get()); + } +} + +TEST(DataTypeSerDePbTest, DataTypeScalaSerDeTestLargeInt) { + std::cout << "==== LargeInt === " << std::endl; + // LargeInt + { + auto vec = vectorized::ColumnVector<Int128>::create(); + auto& data = vec->get_data(); + for (int i = 0; i < 10; ++i) { + data.push_back(500000000000 + i); + } + vectorized::DataTypePtr data_type(std::make_shared<vectorized::DataTypeInt128>()); + vectorized::ColumnWithTypeAndName type_and_name(vec->get_ptr(), data_type, ""); + Block block; + block.insert(type_and_name); + check_pb_col(data_type, *vec.get()); + } +} +} // namespace doris::vectorized \ No newline at end of file diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto index faa34b12153..b6579323aae 100644 --- a/gensrc/proto/internal_service.proto +++ b/gensrc/proto/internal_service.proto @@ -602,8 +602,10 @@ message PPublishFilterResponse { message PExprResult { required PScalarType type = 1; - required string content = 2; + required string content = 2; //maybe need remove after next version of 2.1 required bool success = 3; + optional PTypeDesc type_desc = 4; + optional PValues result_content = 5; }; message PExprResultMap { diff --git a/gensrc/proto/types.proto b/gensrc/proto/types.proto index 41bb3373f23..012434dc3bc 100644 --- a/gensrc/proto/types.proto +++ b/gensrc/proto/types.proto @@ -188,6 +188,8 @@ message PValues { repeated string string_value = 11; repeated bytes bytes_value = 12; repeated PDateTime datetime_value = 13; + repeated PValues child_element = 14; + repeated int64 child_offset = 15; } // this mesage may not used for now --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org