This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new e9a4cbcdf9 [Refact](type system) refact column with arrow serde (#19091) e9a4cbcdf9 is described below commit e9a4cbcdf9442e3577e54e5742d9c26335453551 Author: amory <wangqian...@selectdb.com> AuthorDate: Thu May 4 15:28:46 2023 +0800 [Refact](type system) refact column with arrow serde (#19091) * refact arrow serde * add date serde * update arrow and fix nullable and date type --- be/src/util/arrow/block_convertor.cpp | 6 +- be/src/vec/CMakeLists.txt | 3 + be/src/vec/columns/column.h | 1 + be/src/vec/data_types/data_type_date.h | 3 + be/src/vec/data_types/data_type_date_time.h | 3 + be/src/vec/data_types/data_type_time_v2.h | 6 +- .../vec/data_types/serde/data_type_array_serde.cpp | 40 +++ .../vec/data_types/serde/data_type_array_serde.h | 6 + .../vec/data_types/serde/data_type_bitmap_serde.h | 9 + .../data_types/serde/data_type_date64_serde.cpp | 111 +++++++ ...pe_array_serde.cpp => data_type_date64_serde.h} | 41 +-- .../serde/data_type_datetimev2_serde.cpp | 49 +++ ..._array_serde.h => data_type_datetimev2_serde.h} | 42 ++- .../data_types/serde/data_type_datev2_serde.cpp | 68 +++++ ...pe_array_serde.cpp => data_type_datev2_serde.h} | 41 +-- .../data_types/serde/data_type_decimal_serde.cpp | 133 ++++++++- .../vec/data_types/serde/data_type_decimal_serde.h | 6 + .../serde/data_type_fixedlengthobject_serde.h | 9 + .../vec/data_types/serde/data_type_hll_serde.cpp | 21 ++ be/src/vec/data_types/serde/data_type_hll_serde.h | 7 + .../vec/data_types/serde/data_type_map_serde.cpp | 12 + be/src/vec/data_types/serde/data_type_map_serde.h | 5 + .../data_types/serde/data_type_nullable_serde.cpp | 37 +++ .../data_types/serde/data_type_nullable_serde.h | 5 + .../data_types/serde/data_type_number_serde.cpp | 110 ++++++- .../vec/data_types/serde/data_type_number_serde.h | 12 + .../vec/data_types/serde/data_type_object_serde.h | 10 + .../serde/data_type_quantilestate_serde.h | 9 + 
be/src/vec/data_types/serde/data_type_serde.h | 24 ++ .../data_types/serde/data_type_string_serde.cpp | 60 ++++ .../vec/data_types/serde/data_type_string_serde.h | 6 + .../data_types/serde/data_type_struct_serde.cpp | 12 + .../vec/data_types/serde/data_type_struct_serde.h | 6 + be/src/vec/utils/arrow_column_to_doris_column.cpp | 329 +-------------------- be/test/CMakeLists.txt | 3 +- .../serde/data_type_serde_arrow_test.cpp | 321 ++++++++++++++++++++ .../data_types/serde/data_type_serde_pb_test.cpp | 200 +++++++++++++ 37 files changed, 1375 insertions(+), 391 deletions(-) diff --git a/be/src/util/arrow/block_convertor.cpp b/be/src/util/arrow/block_convertor.cpp index 2b426ca2f2..89a1e09a73 100644 --- a/be/src/util/arrow/block_convertor.cpp +++ b/be/src/util/arrow/block_convertor.cpp @@ -389,10 +389,8 @@ Status FromBlockConverter::convert(std::shared_ptr<arrow::RecordBatch>* out) { return to_status(arrow_st); } _cur_builder = builder.get(); - arrow_st = arrow::VisitTypeInline(*_schema->field(idx)->type(), this); - if (!arrow_st.ok()) { - return to_status(arrow_st); - } + _cur_type->get_serde()->write_column_to_arrow(*_cur_col, nullptr, _cur_builder, _cur_start, + _cur_start + _cur_rows); arrow_st = _cur_builder->Finish(&_arrays[_cur_field_idx]); if (!arrow_st.ok()) { return to_status(arrow_st); diff --git a/be/src/vec/CMakeLists.txt b/be/src/vec/CMakeLists.txt index 6c82623816..086a301456 100644 --- a/be/src/vec/CMakeLists.txt +++ b/be/src/vec/CMakeLists.txt @@ -87,6 +87,9 @@ set(VEC_FILES data_types/serde/data_type_array_serde.cpp data_types/serde/data_type_struct_serde.cpp data_types/serde/data_type_number_serde.cpp + data_types/serde/data_type_datev2_serde.cpp + data_types/serde/data_type_datetimev2_serde.cpp + data_types/serde/data_type_date64_serde.cpp data_types/serde/data_type_string_serde.cpp data_types/serde/data_type_decimal_serde.cpp data_types/serde/data_type_object_serde.cpp diff --git a/be/src/vec/columns/column.h b/be/src/vec/columns/column.h index 
e30c35ba31..8bf83b533c 100644 --- a/be/src/vec/columns/column.h +++ b/be/src/vec/columns/column.h @@ -148,6 +148,7 @@ public: virtual void set_rowset_segment_id(std::pair<RowsetId, uint32_t> rowset_segment_id) {} virtual std::pair<RowsetId, uint32_t> get_rowset_segment_id() const { return {}; } + // todo(Amory) from column to get data type is not correct ,column is memory data,can not to assume memory data belong to which data type virtual TypeIndex get_data_type() const { LOG(FATAL) << "Cannot get_data_type() column " << get_name(); __builtin_unreachable(); diff --git a/be/src/vec/data_types/data_type_date.h b/be/src/vec/data_types/data_type_date.h index 6648932d5f..54e45e63d3 100644 --- a/be/src/vec/data_types/data_type_date.h +++ b/be/src/vec/data_types/data_type_date.h @@ -32,6 +32,7 @@ #include "vec/core/types.h" #include "vec/data_types/data_type.h" #include "vec/data_types/data_type_number_base.h" +#include "vec/data_types/serde/data_type_date64_serde.h" namespace doris { namespace vectorized { @@ -64,6 +65,8 @@ public: static void cast_to_date(Int64& x); MutableColumnPtr create_column() const override; + + DataTypeSerDeSPtr get_serde() const override { return std::make_shared<DataTypeDate64SerDe>(); } }; } // namespace doris::vectorized diff --git a/be/src/vec/data_types/data_type_date_time.h b/be/src/vec/data_types/data_type_date_time.h index c4056f3612..98aa4b26be 100644 --- a/be/src/vec/data_types/data_type_date_time.h +++ b/be/src/vec/data_types/data_type_date_time.h @@ -32,6 +32,7 @@ #include "vec/core/types.h" #include "vec/data_types/data_type.h" #include "vec/data_types/data_type_number_base.h" +#include "vec/data_types/serde/data_type_date64_serde.h" namespace doris { namespace vectorized { @@ -84,6 +85,8 @@ public: std::string to_string(const IColumn& column, size_t row_num) const override; + DataTypeSerDeSPtr get_serde() const override { return std::make_shared<DataTypeDate64SerDe>(); } + void to_string(const IColumn& column, size_t row_num, 
BufferWritable& ostr) const override; Status from_string(ReadBuffer& rb, IColumn* column) const override; diff --git a/be/src/vec/data_types/data_type_time_v2.h b/be/src/vec/data_types/data_type_time_v2.h index 8872eeafc0..336048b39a 100644 --- a/be/src/vec/data_types/data_type_time_v2.h +++ b/be/src/vec/data_types/data_type_time_v2.h @@ -34,6 +34,8 @@ #include "vec/core/types.h" #include "vec/data_types/data_type.h" #include "vec/data_types/data_type_number_base.h" +#include "vec/data_types/serde/data_type_datetimev2_serde.h" +#include "vec/data_types/serde/data_type_datev2_serde.h" #include "vec/data_types/serde/data_type_number_serde.h" #include "vec/data_types/serde/data_type_serde.h" @@ -66,6 +68,8 @@ public: bool can_be_used_as_version() const override { return true; } bool can_be_inside_nullable() const override { return true; } + DataTypeSerDeSPtr get_serde() const override { return std::make_shared<DataTypeDateV2SerDe>(); } + bool equals(const IDataType& rhs) const override; std::string to_string(const IColumn& column, size_t row_num) const override; void to_string(const IColumn& column, size_t row_num, BufferWritable& ostr) const override; @@ -113,7 +117,7 @@ public: void to_string(const IColumn& column, size_t row_num, BufferWritable& ostr) const override; Status from_string(ReadBuffer& rb, IColumn* column) const override; DataTypeSerDeSPtr get_serde() const override { - return std::make_shared<DataTypeNumberSerDe<UInt64>>(); + return std::make_shared<DataTypeDateTimeV2SerDe>(); }; MutableColumnPtr create_column() const override; diff --git a/be/src/vec/data_types/serde/data_type_array_serde.cpp b/be/src/vec/data_types/serde/data_type_array_serde.cpp index 8ae9c61858..15c1a4f683 100644 --- a/be/src/vec/data_types/serde/data_type_array_serde.cpp +++ b/be/src/vec/data_types/serde/data_type_array_serde.cpp @@ -17,8 +17,13 @@ #include "data_type_array_serde.h" +#include <arrow/array/builder_nested.h> + +#include "gutil/casts.h" #include 
"util/jsonb_document.h" #include "vec/columns/column.h" +#include "vec/columns/column_array.h" +#include "vec/common/assert_cast.h" #include "vec/common/string_ref.h" namespace doris { @@ -43,5 +48,40 @@ void DataTypeArraySerDe::read_one_cell_from_jsonb(IColumn& column, const JsonbVa column.deserialize_and_insert_from_arena(blob->getBlob()); } +void DataTypeArraySerDe::write_column_to_arrow(const IColumn& column, const UInt8* null_map, + arrow::ArrayBuilder* array_builder, int start, + int end) const { + auto& array_column = static_cast<const ColumnArray&>(column); + auto& offsets = array_column.get_offsets(); + auto& nested_data = array_column.get_data(); + auto& builder = assert_cast<arrow::ListBuilder&>(*array_builder); + auto nested_builder = builder.value_builder(); + for (size_t array_idx = start; array_idx < end; ++array_idx) { + checkArrowStatus(builder.Append(), column.get_name(), array_builder->type()->name()); + nested_serde->write_column_to_arrow(nested_data, null_map, nested_builder, + offsets[array_idx - 1], offsets[array_idx]); + } +} + +void DataTypeArraySerDe::read_column_from_arrow(IColumn& column, const arrow::Array* arrow_array, + int start, int end, + const cctz::time_zone& ctz) const { + auto& column_array = static_cast<ColumnArray&>(column); + auto& offsets_data = column_array.get_offsets(); + auto concrete_array = down_cast<const arrow::ListArray*>(arrow_array); + auto arrow_offsets_array = concrete_array->offsets(); + auto arrow_offsets = down_cast<arrow::Int32Array*>(arrow_offsets_array.get()); + auto prev_size = offsets_data.back(); + auto arrow_nested_start_offset = arrow_offsets->Value(start); + auto arrow_nested_end_offset = arrow_offsets->Value(end); + for (int64_t i = start + 1; i < end + 1; ++i) { + // convert to doris offset, start from offsets.back() + offsets_data.emplace_back(prev_size + arrow_offsets->Value(i) - arrow_nested_start_offset); + } + return nested_serde->read_column_from_arrow( + column_array.get_data(), 
concrete_array->values().get(), arrow_nested_start_offset, + arrow_nested_end_offset, ctz); +} + } // namespace vectorized } // namespace doris \ No newline at end of file diff --git a/be/src/vec/data_types/serde/data_type_array_serde.h b/be/src/vec/data_types/serde/data_type_array_serde.h index cf28e33728..e8d08bbf10 100644 --- a/be/src/vec/data_types/serde/data_type_array_serde.h +++ b/be/src/vec/data_types/serde/data_type_array_serde.h @@ -51,6 +51,12 @@ public: void read_one_cell_from_jsonb(IColumn& column, const JsonbValue* arg) const override; + void write_column_to_arrow(const IColumn& column, const UInt8* null_bytemap, + arrow::ArrayBuilder* array_builder, int start, + int end) const override; + void read_column_from_arrow(IColumn& column, const arrow::Array* arrow_array, int start, + int end, const cctz::time_zone& ctz) const override; + private: DataTypeSerDeSPtr nested_serde; }; diff --git a/be/src/vec/data_types/serde/data_type_bitmap_serde.h b/be/src/vec/data_types/serde/data_type_bitmap_serde.h index 23a2a689d9..dc3dd8f096 100644 --- a/be/src/vec/data_types/serde/data_type_bitmap_serde.h +++ b/be/src/vec/data_types/serde/data_type_bitmap_serde.h @@ -41,6 +41,15 @@ public: int32_t col_id, int row_num) const override; void read_one_cell_from_jsonb(IColumn& column, const JsonbValue* arg) const override; + void write_column_to_arrow(const IColumn& column, const UInt8* null_bytemap, + arrow::ArrayBuilder* array_builder, int start, + int end) const override { + LOG(FATAL) << "Not support write bitmap column to arrow"; + } + void read_column_from_arrow(IColumn& column, const arrow::Array* arrow_array, int start, + int end, const cctz::time_zone& ctz) const override { + LOG(FATAL) << "Not support read bitmap column from arrow"; + } }; } // namespace vectorized } // namespace doris diff --git a/be/src/vec/data_types/serde/data_type_date64_serde.cpp b/be/src/vec/data_types/serde/data_type_date64_serde.cpp new file mode 100644 index 0000000000..1f6a3766f5 --- 
/dev/null +++ b/be/src/vec/data_types/serde/data_type_date64_serde.cpp @@ -0,0 +1,111 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "data_type_date64_serde.h" + +#include <arrow/builder.h> + +#include <type_traits> + +#include "gutil/casts.h" + +namespace doris { +namespace vectorized { + +void DataTypeDate64SerDe::write_column_to_arrow(const IColumn& column, const UInt8* null_map, + arrow::ArrayBuilder* array_builder, int start, + int end) const { + auto& col_data = static_cast<const ColumnVector<Int64>&>(column).get_data(); + auto& string_builder = assert_cast<arrow::StringBuilder&>(*array_builder); + for (size_t i = start; i < end; ++i) { + char buf[64]; + const vectorized::VecDateTimeValue* time_val = + (const vectorized::VecDateTimeValue*)(&col_data[i]); + int len = time_val->to_buffer(buf); + if (null_map && null_map[i]) { + checkArrowStatus(string_builder.AppendNull(), column.get_name(), + array_builder->type()->name()); + } else { + checkArrowStatus(string_builder.Append(buf, len), column.get_name(), + array_builder->type()->name()); + } + } +} + +static int64_t time_unit_divisor(arrow::TimeUnit::type unit) { + // Doris only supports seconds + switch (unit) { + case 
arrow::TimeUnit::type::SECOND: { + return 1L; + } + case arrow::TimeUnit::type::MILLI: { + return 1000L; + } + case arrow::TimeUnit::type::MICRO: { + return 1000000L; + } + case arrow::TimeUnit::type::NANO: { + return 1000000000L; + } + default: + return 0L; + } +} + +void DataTypeDate64SerDe::read_column_from_arrow(IColumn& column, const arrow::Array* arrow_array, + int start, int end, + const cctz::time_zone& ctz) const { + auto& col_data = static_cast<ColumnVector<Int64>&>(column).get_data(); + int64_t divisor = 1; + int64_t multiplier = 1; + if (arrow_array->type()->id() == arrow::Type::DATE64) { + auto concrete_array = down_cast<const arrow::Date64Array*>(arrow_array); + divisor = 1000; //ms => secs + for (size_t value_i = start; value_i < end; ++value_i) { + VecDateTimeValue v; + v.from_unixtime( + static_cast<Int64>(concrete_array->Value(value_i)) / divisor * multiplier, ctz); + col_data.emplace_back(binary_cast<VecDateTimeValue, Int64>(v)); + } + } else if (arrow_array->type()->id() == arrow::Type::TIMESTAMP) { + auto concrete_array = down_cast<const arrow::TimestampArray*>(arrow_array); + const auto type = std::static_pointer_cast<arrow::TimestampType>(arrow_array->type()); + divisor = time_unit_divisor(type->unit()); + if (divisor == 0L) { + LOG(FATAL) << "Invalid Time Type:" << type->name(); + } + for (size_t value_i = start; value_i < end; ++value_i) { + VecDateTimeValue v; + v.from_unixtime( + static_cast<Int64>(concrete_array->Value(value_i)) / divisor * multiplier, ctz); + col_data.emplace_back(binary_cast<VecDateTimeValue, Int64>(v)); + } + } else if (arrow_array->type()->id() == arrow::Type::DATE32) { + auto concrete_array = down_cast<const arrow::Date32Array*>(arrow_array); + multiplier = 24 * 60 * 60; // day => secs + for (size_t value_i = start; value_i < end; ++value_i) { + // std::cout << "serde : " << concrete_array->Value(value_i) << std::endl; + VecDateTimeValue v; + v.from_unixtime( + static_cast<Int64>(concrete_array->Value(value_i)) / 
divisor * multiplier, ctz); + v.cast_to_date(); + col_data.emplace_back(binary_cast<VecDateTimeValue, Int64>(v)); + } + } +} +} // namespace vectorized +} // namespace doris \ No newline at end of file diff --git a/be/src/vec/data_types/serde/data_type_array_serde.cpp b/be/src/vec/data_types/serde/data_type_date64_serde.h similarity index 54% copy from be/src/vec/data_types/serde/data_type_array_serde.cpp copy to be/src/vec/data_types/serde/data_type_date64_serde.h index 8ae9c61858..80a0a1518c 100644 --- a/be/src/vec/data_types/serde/data_type_array_serde.cpp +++ b/be/src/vec/data_types/serde/data_type_date64_serde.h @@ -15,33 +15,38 @@ // specific language governing permissions and limitations // under the License. -#include "data_type_array_serde.h" +#pragma once +#include <gen_cpp/types.pb.h> +#include <glog/logging.h> +#include <stddef.h> +#include <stdint.h> + +#include <ostream> +#include <string> + +#include "common/status.h" +#include "data_type_number_serde.h" +#include "olap/olap_common.h" #include "util/jsonb_document.h" +#include "util/jsonb_writer.h" #include "vec/columns/column.h" +#include "vec/columns/column_vector.h" #include "vec/common/string_ref.h" +#include "vec/core/types.h" namespace doris { +class JsonbOutStream; namespace vectorized { class Arena; -void DataTypeArraySerDe::write_one_cell_to_jsonb(const IColumn& column, JsonbWriter& result, - Arena* mem_pool, int32_t col_id, - int row_num) const { - result.writeKey(col_id); - const char* begin = nullptr; - // maybe serialize_value_into_arena should move to here later. 
- StringRef value = column.serialize_value_into_arena(row_num, *mem_pool, begin); - result.writeStartBinary(); - result.writeBinary(value.data, value.size); - result.writeEndBinary(); -} - -void DataTypeArraySerDe::read_one_cell_from_jsonb(IColumn& column, const JsonbValue* arg) const { - auto blob = static_cast<const JsonbBlobVal*>(arg); - column.deserialize_and_insert_from_arena(blob->getBlob()); -} - +class DataTypeDate64SerDe : public DataTypeNumberSerDe<Int64> { + void write_column_to_arrow(const IColumn& column, const UInt8* null_bytemap, + arrow::ArrayBuilder* array_builder, int start, + int end) const override; + void read_column_from_arrow(IColumn& column, const arrow::Array* arrow_array, int start, + int end, const cctz::time_zone& ctz) const override; +}; } // namespace vectorized } // namespace doris \ No newline at end of file diff --git a/be/src/vec/data_types/serde/data_type_datetimev2_serde.cpp b/be/src/vec/data_types/serde/data_type_datetimev2_serde.cpp new file mode 100644 index 0000000000..a8476197dd --- /dev/null +++ b/be/src/vec/data_types/serde/data_type_datetimev2_serde.cpp @@ -0,0 +1,49 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "data_type_datetimev2_serde.h" + +#include <arrow/builder.h> + +#include <type_traits> + +#include "gutil/casts.h" + +namespace doris { +namespace vectorized { + +void DataTypeDateTimeV2SerDe::write_column_to_arrow(const IColumn& column, const UInt8* null_map, + arrow::ArrayBuilder* array_builder, int start, + int end) const { + auto& col_data = static_cast<const ColumnVector<UInt64>&>(column).get_data(); + auto& string_builder = assert_cast<arrow::StringBuilder&>(*array_builder); + for (size_t i = start; i < end; ++i) { + char buf[64]; + const vectorized::DateV2Value<vectorized::DateTimeV2ValueType>* time_val = + (const vectorized::DateV2Value<vectorized::DateTimeV2ValueType>*)(col_data[i]); + int len = time_val->to_buffer(buf); + if (null_map && null_map[i]) { + checkArrowStatus(string_builder.AppendNull(), column.get_name(), + array_builder->type()->name()); + } else { + checkArrowStatus(string_builder.Append(buf, len), column.get_name(), + array_builder->type()->name()); + } + } +} +} // namespace vectorized +} // namespace doris \ No newline at end of file diff --git a/be/src/vec/data_types/serde/data_type_array_serde.h b/be/src/vec/data_types/serde/data_type_datetimev2_serde.h similarity index 53% copy from be/src/vec/data_types/serde/data_type_array_serde.h copy to be/src/vec/data_types/serde/data_type_datetimev2_serde.h index cf28e33728..7302a1d4d5 100644 --- a/be/src/vec/data_types/serde/data_type_array_serde.h +++ b/be/src/vec/data_types/serde/data_type_datetimev2_serde.h @@ -17,42 +17,38 @@ #pragma once +#include <gen_cpp/types.pb.h> #include <glog/logging.h> +#include <stddef.h> #include <stdint.h> #include <ostream> +#include <string> #include "common/status.h" -#include "data_type_serde.h" +#include "data_type_number_serde.h" +#include "olap/olap_common.h" +#include "util/jsonb_document.h" #include "util/jsonb_writer.h" +#include "vec/columns/column.h" +#include "vec/columns/column_vector.h" +#include "vec/common/string_ref.h" +#include 
"vec/core/types.h" namespace doris { -class PValues; -class JsonbValue; +class JsonbOutStream; namespace vectorized { -class IColumn; class Arena; -class DataTypeArraySerDe : public DataTypeSerDe { -public: - DataTypeArraySerDe(const DataTypeSerDeSPtr& _nested_serde) : nested_serde(_nested_serde) {} - - Status write_column_to_pb(const IColumn& column, PValues& result, int start, - int end) const override { - LOG(FATAL) << "Not support write array column to pb"; - } - Status read_column_from_pb(IColumn& column, const PValues& arg) const override { - LOG(FATAL) << "Not support read from pb to array"; +class DataTypeDateTimeV2SerDe : public DataTypeNumberSerDe<UInt64> { + void write_column_to_arrow(const IColumn& column, const UInt8* null_map, + arrow::ArrayBuilder* array_builder, int start, + int end) const override; + void read_column_from_arrow(IColumn& column, const arrow::Array* arrow_array, int start, + int end, const cctz::time_zone& ctz) const override { + LOG(FATAL) << "not support read arrow array to uint64 column"; } - - void write_one_cell_to_jsonb(const IColumn& column, JsonbWriter& result, Arena* mem_pool, - int32_t col_id, int row_num) const override; - - void read_one_cell_from_jsonb(IColumn& column, const JsonbValue* arg) const override; - -private: - DataTypeSerDeSPtr nested_serde; }; } // namespace vectorized -} // namespace doris +} // namespace doris \ No newline at end of file diff --git a/be/src/vec/data_types/serde/data_type_datev2_serde.cpp b/be/src/vec/data_types/serde/data_type_datev2_serde.cpp new file mode 100644 index 0000000000..4095c8d872 --- /dev/null +++ b/be/src/vec/data_types/serde/data_type_datev2_serde.cpp @@ -0,0 +1,68 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "data_type_datev2_serde.h" + +#include <arrow/builder.h> + +#include <type_traits> + +#include "gutil/casts.h" + +namespace doris { +namespace vectorized { + +void DataTypeDateV2SerDe::write_column_to_arrow(const IColumn& column, const UInt8* null_map, + arrow::ArrayBuilder* array_builder, int start, + int end) const { + auto& col_data = static_cast<const ColumnVector<UInt32>&>(column).get_data(); + auto& string_builder = assert_cast<arrow::StringBuilder&>(*array_builder); + for (size_t i = start; i < end; ++i) { + char buf[64]; + const vectorized::DateV2Value<vectorized::DateV2ValueType>* time_val = + (const vectorized::DateV2Value<vectorized::DateV2ValueType>*)(&col_data[i]); + int len = time_val->to_buffer(buf); + if (null_map && null_map[i]) { + checkArrowStatus(string_builder.AppendNull(), column.get_name(), + array_builder->type()->name()); + } else { + checkArrowStatus(string_builder.Append(buf, len), column.get_name(), + array_builder->type()->name()); + } + } +} + +void DataTypeDateV2SerDe::read_column_from_arrow(IColumn& column, const arrow::Array* arrow_array, + int start, int end, + const cctz::time_zone& ctz) const { + std::cout << "column : " << column.get_name() << " data" << getTypeName(column.get_data_type()) + << " array " << arrow_array->type_id() << std::endl; + auto& col_data = static_cast<ColumnVector<UInt32>&>(column).get_data(); + auto concrete_array 
= down_cast<const arrow::Date64Array*>(arrow_array); + int64_t divisor = 1; + int64_t multiplier = 1; + + multiplier = 24 * 60 * 60; // day => secs + for (size_t value_i = start; value_i < end; ++value_i) { + DateV2Value<DateV2ValueType> v; + v.from_unixtime(static_cast<Int64>(concrete_array->Value(value_i)) / divisor * multiplier, + ctz); + col_data.emplace_back(binary_cast<DateV2Value<DateV2ValueType>, UInt32>(v)); + } +} +} // namespace vectorized +} // namespace doris \ No newline at end of file diff --git a/be/src/vec/data_types/serde/data_type_array_serde.cpp b/be/src/vec/data_types/serde/data_type_datev2_serde.h similarity index 54% copy from be/src/vec/data_types/serde/data_type_array_serde.cpp copy to be/src/vec/data_types/serde/data_type_datev2_serde.h index 8ae9c61858..587f2be0c2 100644 --- a/be/src/vec/data_types/serde/data_type_array_serde.cpp +++ b/be/src/vec/data_types/serde/data_type_datev2_serde.h @@ -15,33 +15,38 @@ // specific language governing permissions and limitations // under the License. -#include "data_type_array_serde.h" +#pragma once +#include <gen_cpp/types.pb.h> +#include <glog/logging.h> +#include <stddef.h> +#include <stdint.h> + +#include <ostream> +#include <string> + +#include "common/status.h" +#include "data_type_number_serde.h" +#include "olap/olap_common.h" #include "util/jsonb_document.h" +#include "util/jsonb_writer.h" #include "vec/columns/column.h" +#include "vec/columns/column_vector.h" #include "vec/common/string_ref.h" +#include "vec/core/types.h" namespace doris { +class JsonbOutStream; namespace vectorized { class Arena; -void DataTypeArraySerDe::write_one_cell_to_jsonb(const IColumn& column, JsonbWriter& result, - Arena* mem_pool, int32_t col_id, - int row_num) const { - result.writeKey(col_id); - const char* begin = nullptr; - // maybe serialize_value_into_arena should move to here later. 
- StringRef value = column.serialize_value_into_arena(row_num, *mem_pool, begin); - result.writeStartBinary(); - result.writeBinary(value.data, value.size); - result.writeEndBinary(); -} - -void DataTypeArraySerDe::read_one_cell_from_jsonb(IColumn& column, const JsonbValue* arg) const { - auto blob = static_cast<const JsonbBlobVal*>(arg); - column.deserialize_and_insert_from_arena(blob->getBlob()); -} - +class DataTypeDateV2SerDe : public DataTypeNumberSerDe<UInt32> { + void write_column_to_arrow(const IColumn& column, const UInt8* null_bytemap, + arrow::ArrayBuilder* array_builder, int start, + int end) const override; + void read_column_from_arrow(IColumn& column, const arrow::Array* arrow_array, int start, + int end, const cctz::time_zone& ctz) const override; +}; } // namespace vectorized } // namespace doris \ No newline at end of file diff --git a/be/src/vec/data_types/serde/data_type_decimal_serde.cpp b/be/src/vec/data_types/serde/data_type_decimal_serde.cpp index 6e6fa5f4d3..893e10769a 100644 --- a/be/src/vec/data_types/serde/data_type_decimal_serde.cpp +++ b/be/src/vec/data_types/serde/data_type_decimal_serde.cpp @@ -16,7 +16,138 @@ // under the License. 
#include "data_type_decimal_serde.h" + +#include <arrow/array/array_base.h> +#include <arrow/array/array_decimal.h> +#include <arrow/builder.h> +#include <arrow/util/decimal.h> + +#include "arrow/type.h" +#include "gutil/casts.h" +#include "vec/columns/column_decimal.h" +#include "vec/common/arithmetic_overflow.h" + namespace doris { -namespace vectorized {} // namespace vectorized +namespace vectorized { + +template <typename T> +void DataTypeDecimalSerDe<T>::write_column_to_arrow(const IColumn& column, const UInt8* null_map, + arrow::ArrayBuilder* array_builder, int start, + int end) const { + auto& col = reinterpret_cast<const ColumnDecimal<T>&>(column); + auto& builder = reinterpret_cast<arrow::Decimal128Builder&>(*array_builder); + if constexpr (std::is_same_v<T, Decimal<Int128>>) { + std::shared_ptr<arrow::DataType> s_decimal_ptr = + std::make_shared<arrow::Decimal128Type>(27, 9); + for (size_t i = start; i < end; ++i) { + if (null_map && null_map[i]) { + checkArrowStatus(builder.AppendNull(), column.get_name(), + array_builder->type()->name()); + continue; + } + const auto& data_ref = col.get_data_at(i); + const PackedInt128* p_value = reinterpret_cast<const PackedInt128*>(data_ref.data); + int64_t high = (p_value->value) >> 64; + uint64 low = p_value->value; + arrow::Decimal128 value(high, low); + checkArrowStatus(builder.Append(value), column.get_name(), + array_builder->type()->name()); + } + } else if constexpr (std::is_same_v<T, Decimal<Int128I>>) { + std::shared_ptr<arrow::DataType> s_decimal_ptr = + std::make_shared<arrow::Decimal128Type>(38, col.get_scale()); + for (size_t i = start; i < end; ++i) { + if (null_map && null_map[i]) { + checkArrowStatus(builder.AppendNull(), column.get_name(), + array_builder->type()->name()); + continue; + } + const auto& data_ref = col.get_data_at(i); + const PackedInt128* p_value = reinterpret_cast<const PackedInt128*>(data_ref.data); + int64_t high = (p_value->value) >> 64; + uint64 low = p_value->value; + 
arrow::Decimal128 value(high, low); + checkArrowStatus(builder.Append(value), column.get_name(), + array_builder->type()->name()); + } + } else if constexpr (std::is_same_v<T, Decimal<Int32>>) { + std::shared_ptr<arrow::DataType> s_decimal_ptr = + std::make_shared<arrow::Decimal128Type>(8, col.get_scale()); + for (size_t i = start; i < end; ++i) { + if (null_map && null_map[i]) { + checkArrowStatus(builder.AppendNull(), column.get_name(), + array_builder->type()->name()); + continue; + } + const auto& data_ref = col.get_data_at(i); + const int32_t* p_value = reinterpret_cast<const int32_t*>(data_ref.data); + int64_t high = *p_value > 0 ? 0 : 1UL << 63; + arrow::Decimal128 value(high, *p_value > 0 ? *p_value : -*p_value); + checkArrowStatus(builder.Append(value), column.get_name(), + array_builder->type()->name()); + } + } else if constexpr (std::is_same_v<T, Decimal<Int64>>) { + std::shared_ptr<arrow::DataType> s_decimal_ptr = + std::make_shared<arrow::Decimal128Type>(18, col.get_scale()); + for (size_t i = start; i < end; ++i) { + if (null_map && null_map[i]) { + checkArrowStatus(builder.AppendNull(), column.get_name(), + array_builder->type()->name()); + continue; + } + const auto& data_ref = col.get_data_at(i); + const int64_t* p_value = reinterpret_cast<const int64_t*>(data_ref.data); + int64_t high = *p_value > 0 ? 0 : 1UL << 63; + arrow::Decimal128 value(high, *p_value > 0 ? 
*p_value : -*p_value); + checkArrowStatus(builder.Append(value), column.get_name(), + array_builder->type()->name()); + } + } else { + LOG(FATAL) << "Not support write " << column.get_name() << " to arrow"; + } +} + +template <typename T> +void DataTypeDecimalSerDe<T>::read_column_from_arrow(IColumn& column, + const arrow::Array* arrow_array, int start, + int end, const cctz::time_zone& ctz) const { + if constexpr (std::is_same_v<T, Decimal<Int128>>) { + auto& column_data = static_cast<ColumnDecimal<vectorized::Decimal128>&>(column).get_data(); + auto concrete_array = down_cast<const arrow::DecimalArray*>(arrow_array); + const auto* arrow_decimal_type = + static_cast<const arrow::DecimalType*>(arrow_array->type().get()); + // TODO check precision + const auto scale = arrow_decimal_type->scale(); + for (size_t value_i = start; value_i < end; ++value_i) { + auto value = *reinterpret_cast<const vectorized::Decimal128*>( + concrete_array->Value(value_i)); + // convert scale to 9; + if (9 > scale) { + using MaxNativeType = typename Decimal128::NativeType; + MaxNativeType converted_value = common::exp10_i128(9 - scale); + if (common::mul_overflow(static_cast<MaxNativeType>(value), converted_value, + converted_value)) { + VLOG_DEBUG << "Decimal convert overflow"; + value = converted_value < 0 + ? 
std::numeric_limits<typename Decimal128 ::NativeType>::min() + : std::numeric_limits<typename Decimal128 ::NativeType>::max(); + } else { + value = converted_value; + } + } else if (9 < scale) { + value = value / common::exp10_i128(scale - 9); + } + column_data.emplace_back(value); + } + } else { + LOG(FATAL) << "Not support read " << column.get_name() << " from arrow"; + } +} + +template class DataTypeDecimalSerDe<Decimal32>; +template class DataTypeDecimalSerDe<Decimal64>; +template class DataTypeDecimalSerDe<Decimal128>; +template class DataTypeDecimalSerDe<Decimal128I>; +} // namespace vectorized } // namespace doris \ No newline at end of file diff --git a/be/src/vec/data_types/serde/data_type_decimal_serde.h b/be/src/vec/data_types/serde/data_type_decimal_serde.h index 79293e9dd8..0e9685d936 100644 --- a/be/src/vec/data_types/serde/data_type_decimal_serde.h +++ b/be/src/vec/data_types/serde/data_type_decimal_serde.h @@ -54,6 +54,12 @@ public: int32_t col_id, int row_num) const override; void read_one_cell_from_jsonb(IColumn& column, const JsonbValue* arg) const override; + + void write_column_to_arrow(const IColumn& column, const UInt8* null_map, + arrow::ArrayBuilder* array_builder, int start, + int end) const override; + void read_column_from_arrow(IColumn& column, const arrow::Array* arrow_array, int start, + int end, const cctz::time_zone& ctz) const override; }; template <typename T> diff --git a/be/src/vec/data_types/serde/data_type_fixedlengthobject_serde.h b/be/src/vec/data_types/serde/data_type_fixedlengthobject_serde.h index 7d5db6c041..af02dc9824 100644 --- a/be/src/vec/data_types/serde/data_type_fixedlengthobject_serde.h +++ b/be/src/vec/data_types/serde/data_type_fixedlengthobject_serde.h @@ -51,6 +51,15 @@ public: void read_one_cell_from_jsonb(IColumn& column, const JsonbValue* arg) const override { LOG(FATAL) << "Not support read from jsonb to FixedLengthObject"; } + void write_column_to_arrow(const IColumn& column, const UInt8* null_map, + 
arrow::ArrayBuilder* array_builder, int start, + int end) const override { + LOG(FATAL) << "Not support write FixedLengthObject column to arrow"; + } + void read_column_from_arrow(IColumn& column, const arrow::Array* arrow_array, int start, + int end, const cctz::time_zone& ctz) const override { + LOG(FATAL) << "Not support read FixedLengthObject column from arrow"; + } }; } // namespace vectorized } // namespace doris diff --git a/be/src/vec/data_types/serde/data_type_hll_serde.cpp b/be/src/vec/data_types/serde/data_type_hll_serde.cpp index b7e1afb3a7..a3fc40aebc 100644 --- a/be/src/vec/data_types/serde/data_type_hll_serde.cpp +++ b/be/src/vec/data_types/serde/data_type_hll_serde.cpp @@ -23,6 +23,7 @@ #include <string> +#include "arrow/array/builder_binary.h" #include "olap/hll.h" #include "util/jsonb_document.h" #include "util/slice.h" @@ -79,5 +80,25 @@ void DataTypeHLLSerDe::read_one_cell_from_jsonb(IColumn& column, const JsonbValu col.insert_value(hyper_log_log); } +void DataTypeHLLSerDe::write_column_to_arrow(const IColumn& column, const UInt8* null_map, + arrow::ArrayBuilder* array_builder, int start, + int end) const { + const auto& col = assert_cast<const ColumnHLL&>(column); + auto& builder = assert_cast<arrow::StringBuilder&>(*array_builder); + for (size_t string_i = start; string_i < end; ++string_i) { + if (null_map && null_map[string_i]) { + checkArrowStatus(builder.AppendNull(), column.get_name(), + array_builder->type()->name()); + } else { + auto& hll_value = const_cast<HyperLogLog&>(col.get_element(string_i)); + std::string memory_buffer(hll_value.max_serialized_size(), '0'); + hll_value.serialize((uint8_t*)memory_buffer.data()); + checkArrowStatus( + builder.Append(memory_buffer.data(), static_cast<int>(memory_buffer.size())), + column.get_name(), array_builder->type()->name()); + } + } +} + } // namespace vectorized } // namespace doris \ No newline at end of file diff --git a/be/src/vec/data_types/serde/data_type_hll_serde.h 
b/be/src/vec/data_types/serde/data_type_hll_serde.h index facd6aaa72..9a47f8fbd7 100644 --- a/be/src/vec/data_types/serde/data_type_hll_serde.h +++ b/be/src/vec/data_types/serde/data_type_hll_serde.h @@ -41,6 +41,13 @@ public: int32_t col_id, int row_num) const override; void read_one_cell_from_jsonb(IColumn& column, const JsonbValue* arg) const override; + void write_column_to_arrow(const IColumn& column, const UInt8* null_map, + arrow::ArrayBuilder* array_builder, int start, + int end) const override; + void read_column_from_arrow(IColumn& column, const arrow::Array* arrow_array, int start, + int end, const cctz::time_zone& ctz) const override { + LOG(FATAL) << "Not support read hll column from arrow"; + } }; } // namespace vectorized } // namespace doris diff --git a/be/src/vec/data_types/serde/data_type_map_serde.cpp b/be/src/vec/data_types/serde/data_type_map_serde.cpp index 9f430e3198..87dd4e5615 100644 --- a/be/src/vec/data_types/serde/data_type_map_serde.cpp +++ b/be/src/vec/data_types/serde/data_type_map_serde.cpp @@ -40,5 +40,17 @@ void DataTypeMapSerDe::write_one_cell_to_jsonb(const IColumn& column, JsonbWrite result.writeBinary(value.data, value.size); result.writeEndBinary(); } + +void DataTypeMapSerDe::write_column_to_arrow(const IColumn& column, const UInt8* null_map, + arrow::ArrayBuilder* array_builder, int start, + int end) const { + LOG(FATAL) << "Not support write " << column.get_name() << " to arrow"; +} + +void DataTypeMapSerDe::read_column_from_arrow(IColumn& column, const arrow::Array* arrow_array, + int start, int end, + const cctz::time_zone& ctz) const { + LOG(FATAL) << "Not support read " << column.get_name() << " from arrow"; +} } // namespace vectorized } // namespace doris diff --git a/be/src/vec/data_types/serde/data_type_map_serde.h b/be/src/vec/data_types/serde/data_type_map_serde.h index dfbe55cf19..4708e36afd 100644 --- a/be/src/vec/data_types/serde/data_type_map_serde.h +++ b/be/src/vec/data_types/serde/data_type_map_serde.h @@ 
-50,6 +50,11 @@ public: int32_t col_id, int row_num) const override; void read_one_cell_from_jsonb(IColumn& column, const JsonbValue* arg) const override; + void write_column_to_arrow(const IColumn& column, const UInt8* null_map, + arrow::ArrayBuilder* array_builder, int start, + int end) const override; + void read_column_from_arrow(IColumn& column, const arrow::Array* arrow_array, int start, + int end, const cctz::time_zone& ctz) const override; private: DataTypeSerDeSPtr key_serde; diff --git a/be/src/vec/data_types/serde/data_type_nullable_serde.cpp b/be/src/vec/data_types/serde/data_type_nullable_serde.cpp index 8072c785c9..0c7e72bfda 100644 --- a/be/src/vec/data_types/serde/data_type_nullable_serde.cpp +++ b/be/src/vec/data_types/serde/data_type_nullable_serde.cpp @@ -17,6 +17,7 @@ #include "data_type_nullable_serde.h" +#include <arrow/array/array_base.h> #include <gen_cpp/types.pb.h> #include <algorithm> @@ -94,5 +95,41 @@ void DataTypeNullableSerDe::read_one_cell_from_jsonb(IColumn& column, const Json auto& null_map_data = col.get_null_map_data(); null_map_data.push_back(0); } + +/**nullable serialize to arrow + 1/ convert the null_map from doris to arrow null byte map + 2/ pass the arrow null byteamp to nested column , and call AppendValues +**/ +void DataTypeNullableSerDe::write_column_to_arrow(const IColumn& column, const UInt8* null_map, + arrow::ArrayBuilder* array_builder, int start, + int end) const { + const auto& column_nullable = assert_cast<const ColumnNullable&>(column); + const PaddedPODArray<UInt8>& bytemap = column_nullable.get_null_map_data(); + PaddedPODArray<UInt8> res; + if (column_nullable.has_null()) { + res.reserve(end - start); + for (size_t i = start; i < end; ++i) { + res.emplace_back( + !(bytemap)[i]); //Invert values since Arrow interprets 1 as a non-null value + } + } + const UInt8* arrow_null_bytemap_raw_ptr = res.empty() ? 
nullptr : res.data(); + nested_serde->write_column_to_arrow(column_nullable.get_nested_column(), + arrow_null_bytemap_raw_ptr, array_builder, start, end); +} + +void DataTypeNullableSerDe::read_column_from_arrow(IColumn& column, const arrow::Array* arrow_array, + int start, int end, + const cctz::time_zone& ctz) const { + auto& col = reinterpret_cast<ColumnNullable&>(column); + NullMap& map_data = col.get_null_map_data(); + for (size_t i = start; i < end; ++i) { + auto is_null = arrow_array->IsNull(i); + map_data.emplace_back(is_null); + } + return nested_serde->read_column_from_arrow(col.get_nested_column(), arrow_array, start, end, + ctz); +} + } // namespace vectorized } // namespace doris diff --git a/be/src/vec/data_types/serde/data_type_nullable_serde.h b/be/src/vec/data_types/serde/data_type_nullable_serde.h index 3975675715..7631ed90ef 100644 --- a/be/src/vec/data_types/serde/data_type_nullable_serde.h +++ b/be/src/vec/data_types/serde/data_type_nullable_serde.h @@ -43,6 +43,11 @@ public: int32_t col_id, int row_num) const override; void read_one_cell_from_jsonb(IColumn& column, const JsonbValue* arg) const override; + void write_column_to_arrow(const IColumn& column, const UInt8* null_map, + arrow::ArrayBuilder* array_builder, int start, + int end) const override; + void read_column_from_arrow(IColumn& column, const arrow::Array* arrow_array, int start, + int end, const cctz::time_zone& ctz) const override; private: DataTypeSerDeSPtr nested_serde; diff --git a/be/src/vec/data_types/serde/data_type_number_serde.cpp b/be/src/vec/data_types/serde/data_type_number_serde.cpp index f6667e8930..d8736d30a8 100644 --- a/be/src/vec/data_types/serde/data_type_number_serde.cpp +++ b/be/src/vec/data_types/serde/data_type_number_serde.cpp @@ -17,6 +17,114 @@ #include "data_type_number_serde.h" +#include <arrow/builder.h> + +#include <type_traits> + +#include "gutil/casts.h" + namespace doris { -namespace vectorized {} // namespace vectorized +namespace vectorized { + 
+// Type map的基本结构 +template <typename Key, typename Value, typename... Rest> +struct TypeMap { + using KeyType = Key; + using ValueType = Value; + using Next = TypeMap<Rest...>; +}; + +// Type map的末端 +template <> +struct TypeMap<void, void> {}; + +// TypeMapLookup 前向声明 +template <typename Key, typename Map> +struct TypeMapLookup; + +// Type map查找:找到匹配的键时的情况 +template <typename Key, typename Value, typename... Rest> +struct TypeMapLookup<Key, TypeMap<Key, Value, Rest...>> { + using ValueType = Value; +}; + +// Type map查找:递归查找 +template <typename Key, typename K, typename V, typename... Rest> +struct TypeMapLookup<Key, TypeMap<K, V, Rest...>> { + using ValueType = typename TypeMapLookup<Key, TypeMap<Rest...>>::ValueType; +}; + +using DORIS_NUMERIC_ARROW_BUILDER = + TypeMap<UInt8, arrow::BooleanBuilder, Int8, arrow::Int8Builder, UInt16, + arrow::UInt16Builder, Int16, arrow::Int16Builder, UInt32, arrow::UInt32Builder, + Int32, arrow::Int32Builder, UInt64, arrow::UInt64Builder, Int64, + arrow::Int64Builder, UInt128, arrow::FixedSizeBinaryBuilder, Int128, + arrow::FixedSizeBinaryBuilder, Float32, arrow::FloatBuilder, Float64, + arrow::DoubleBuilder, void, + void // 添加这一行来表示TypeMap的末端 + >; + +template <typename T> +void DataTypeNumberSerDe<T>::write_column_to_arrow(const IColumn& column, const UInt8* null_map, + arrow::ArrayBuilder* array_builder, int start, + int end) const { + auto& col_data = assert_cast<const ColumnType&>(column).get_data(); + using ARROW_BUILDER_TYPE = typename TypeMapLookup<T, DORIS_NUMERIC_ARROW_BUILDER>::ValueType; + if constexpr (std::is_same_v<T, UInt8>) { + ARROW_BUILDER_TYPE& builder = assert_cast<ARROW_BUILDER_TYPE&>(*array_builder); + checkArrowStatus( + builder.AppendValues(reinterpret_cast<const uint8_t*>(col_data.data() + start), + end - start, reinterpret_cast<const uint8_t*>(null_map)), + column.get_name(), array_builder->type()->name()); + } else if constexpr (std::is_same_v<T, Int128> || std::is_same_v<T, UInt128>) { + 
ARROW_BUILDER_TYPE& builder = assert_cast<ARROW_BUILDER_TYPE&>(*array_builder); + size_t fixed_length = sizeof(typename ColumnType::value_type); + const uint8_t* data_start = + reinterpret_cast<const uint8_t*>(col_data.data()) + start * fixed_length; + checkArrowStatus(builder.AppendValues(data_start, end - start, + reinterpret_cast<const uint8_t*>(null_map)), + column.get_name(), array_builder->type()->name()); + } else { + ARROW_BUILDER_TYPE& builder = assert_cast<ARROW_BUILDER_TYPE&>(*array_builder); + checkArrowStatus(builder.AppendValues(col_data.data() + start, end - start, + reinterpret_cast<const uint8_t*>(null_map)), + column.get_name(), array_builder->type()->name()); + } +} + +template <typename T> +void DataTypeNumberSerDe<T>::read_column_from_arrow(IColumn& column, + const arrow::Array* arrow_array, int start, + int end, const cctz::time_zone& ctz) const { + int row_count = end - start; + auto& col_data = static_cast<ColumnVector<T>&>(column).get_data(); + + // now uint8 for bool + if constexpr (std::is_same_v<T, UInt8>) { + auto concrete_array = down_cast<const arrow::BooleanArray*>(arrow_array); + for (size_t bool_i = 0; bool_i != static_cast<size_t>(concrete_array->length()); ++bool_i) { + col_data.emplace_back(concrete_array->Value(bool_i)); + } + return; + } + /// buffers[0] is a null bitmap and buffers[1] are actual values + std::shared_ptr<arrow::Buffer> buffer = arrow_array->data()->buffers[1]; + const auto* raw_data = reinterpret_cast<const T*>(buffer->data()) + start; + col_data.insert(raw_data, raw_data + row_count); +} + +/// Explicit template instantiations - to avoid code bloat in headers. 
+template class DataTypeNumberSerDe<UInt8>; +template class DataTypeNumberSerDe<UInt16>; +template class DataTypeNumberSerDe<UInt32>; +template class DataTypeNumberSerDe<UInt64>; +template class DataTypeNumberSerDe<UInt128>; +template class DataTypeNumberSerDe<Int8>; +template class DataTypeNumberSerDe<Int16>; +template class DataTypeNumberSerDe<Int32>; +template class DataTypeNumberSerDe<Int64>; +template class DataTypeNumberSerDe<Int128>; +template class DataTypeNumberSerDe<Float32>; +template class DataTypeNumberSerDe<Float64>; +} // namespace vectorized } // namespace doris \ No newline at end of file diff --git a/be/src/vec/data_types/serde/data_type_number_serde.h b/be/src/vec/data_types/serde/data_type_number_serde.h index 62632b946d..90566510ca 100644 --- a/be/src/vec/data_types/serde/data_type_number_serde.h +++ b/be/src/vec/data_types/serde/data_type_number_serde.h @@ -41,6 +41,12 @@ class JsonbOutStream; namespace vectorized { class Arena; +// special data type using, maybe has various serde actions, so use specific date serde +// DataTypeDateV2 => T:UInt32 +// DataTypeDateTimeV2 => T:UInt64 +// DataTypeTime => T:Float64 +// DataTypeDate => T:Int64 +// DataTypeDateTime => T:Int64 template <typename T> class DataTypeNumberSerDe : public DataTypeSerDe { static_assert(IsNumber<T>); @@ -55,6 +61,12 @@ public: int32_t col_id, int row_num) const override; void read_one_cell_from_jsonb(IColumn& column, const JsonbValue* arg) const override; + + void write_column_to_arrow(const IColumn& column, const UInt8* null_map, + arrow::ArrayBuilder* array_builder, int start, + int end) const override; + void read_column_from_arrow(IColumn& column, const arrow::Array* arrow_array, int start, + int end, const cctz::time_zone& ctz) const override; }; template <typename T> diff --git a/be/src/vec/data_types/serde/data_type_object_serde.h b/be/src/vec/data_types/serde/data_type_object_serde.h index c72deb59ea..d8c93b9afe 100644 --- 
a/be/src/vec/data_types/serde/data_type_object_serde.h +++ b/be/src/vec/data_types/serde/data_type_object_serde.h @@ -51,6 +51,16 @@ public: void read_one_cell_from_jsonb(IColumn& column, const JsonbValue* arg) const override { LOG(FATAL) << "Not support write json object to column"; } + + void write_column_to_arrow(const IColumn& column, const UInt8* null_map, + arrow::ArrayBuilder* array_builder, int start, + int end) const override { + LOG(FATAL) << "Not support write object column to arrow"; + } + void read_column_from_arrow(IColumn& column, const arrow::Array* arrow_array, int start, + int end, const cctz::time_zone& ctz) const override { + LOG(FATAL) << "Not support read object column from arrow"; + } }; } // namespace vectorized } // namespace doris diff --git a/be/src/vec/data_types/serde/data_type_quantilestate_serde.h b/be/src/vec/data_types/serde/data_type_quantilestate_serde.h index bf5912446b..dc2031c5ef 100644 --- a/be/src/vec/data_types/serde/data_type_quantilestate_serde.h +++ b/be/src/vec/data_types/serde/data_type_quantilestate_serde.h @@ -47,6 +47,15 @@ public: int32_t col_id, int row_num) const override; void read_one_cell_from_jsonb(IColumn& column, const JsonbValue* arg) const override; + void write_column_to_arrow(const IColumn& column, const UInt8* null_map, + arrow::ArrayBuilder* array_builder, int start, + int end) const override { + LOG(FATAL) << "Not support write " << column.get_name() << " to arrow"; + } + void read_column_from_arrow(IColumn& column, const arrow::Array* arrow_array, int start, + int end, const cctz::time_zone& ctz) const override { + LOG(FATAL) << "Not support read " << column.get_name() << " from arrow"; + } }; template <typename T> diff --git a/be/src/vec/data_types/serde/data_type_serde.h b/be/src/vec/data_types/serde/data_type_serde.h index 5908c0836e..2d196fcb9e 100644 --- a/be/src/vec/data_types/serde/data_type_serde.h +++ b/be/src/vec/data_types/serde/data_type_serde.h @@ -22,8 +22,19 @@ #include <memory> 
#include <vector> +#include "arrow/status.h" #include "common/status.h" #include "util/jsonb_writer.h" +#include "vec/common/pod_array_fwd.h" +#include "vec/core/types.h" + +namespace arrow { +class ArrayBuilder; +class Array; +} // namespace arrow +namespace cctz { +class time_zone; +} // namespace cctz namespace doris { class PValues; @@ -70,8 +81,21 @@ public: // JSON serializer and deserializer // Arrow serializer and deserializer + virtual void write_column_to_arrow(const IColumn& column, const UInt8* null_map, + arrow::ArrayBuilder* array_builder, int start, + int end) const = 0; + virtual void read_column_from_arrow(IColumn& column, const arrow::Array* arrow_array, int start, + int end, const cctz::time_zone& ctz) const = 0; }; +inline void checkArrowStatus(const arrow::Status& status, const std::string& column, + const std::string& format_name) { + if (!status.ok()) { + LOG(FATAL) << "arrow serde with arrow: " << format_name << " with column : " << column + << " with error msg: " << status.ToString(); + } +} + using DataTypeSerDeSPtr = std::shared_ptr<DataTypeSerDe>; using DataTypeSerDeSPtrs = std::vector<DataTypeSerDeSPtr>; diff --git a/be/src/vec/data_types/serde/data_type_string_serde.cpp b/be/src/vec/data_types/serde/data_type_string_serde.cpp index 081b9b1748..ba230da987 100644 --- a/be/src/vec/data_types/serde/data_type_string_serde.cpp +++ b/be/src/vec/data_types/serde/data_type_string_serde.cpp @@ -21,7 +21,10 @@ #include <gen_cpp/types.pb.h> #include <stddef.h> +#include "arrow/array/builder_binary.h" +#include "gutil/casts.h" #include "util/jsonb_document.h" +#include "util/jsonb_utils.h" #include "vec/columns/column.h" #include "vec/columns/column_string.h" #include "vec/common/string_ref.h" @@ -63,5 +66,62 @@ void DataTypeStringSerDe::read_one_cell_from_jsonb(IColumn& column, const JsonbV auto blob = static_cast<const JsonbBlobVal*>(arg); col.insert_data(blob->getBlob(), blob->getBlobLen()); } + +void 
DataTypeStringSerDe::write_column_to_arrow(const IColumn& column, const UInt8* null_map, + arrow::ArrayBuilder* array_builder, int start, + int end) const { + const auto& string_column = assert_cast<const ColumnString&>(column); + auto& builder = assert_cast<arrow::StringBuilder&>(*array_builder); + for (size_t string_i = start; string_i < end; ++string_i) { + if (null_map && null_map[string_i]) { + checkArrowStatus(builder.AppendNull(), column.get_name(), + array_builder->type()->name()); + continue; + } + std::string_view string_ref = string_column.get_data_at(string_i).to_string_view(); + if (column.get_data_type() == TypeIndex::JSONB) { + std::string json_string = + JsonbToJson::jsonb_to_json_string(string_ref.data(), string_ref.size()); + checkArrowStatus(builder.Append(json_string.data(), json_string.size()), + column.get_name(), array_builder->type()->name()); + } else { + checkArrowStatus(builder.Append(string_ref.data(), string_ref.size()), + column.get_name(), array_builder->type()->name()); + } + } +} + +void DataTypeStringSerDe::read_column_from_arrow(IColumn& column, const arrow::Array* arrow_array, + int start, int end, + const cctz::time_zone& ctz) const { + auto& column_chars_t = assert_cast<ColumnString&>(column).get_chars(); + auto& column_offsets = assert_cast<ColumnString&>(column).get_offsets(); + if (arrow_array->type_id() == arrow::Type::STRING || + arrow_array->type_id() == arrow::Type::BINARY) { + auto concrete_array = down_cast<const arrow::BinaryArray*>(arrow_array); + std::shared_ptr<arrow::Buffer> buffer = concrete_array->value_data(); + + for (size_t offset_i = start; offset_i < end; ++offset_i) { + if (!concrete_array->IsNull(offset_i)) { + const auto* raw_data = buffer->data() + concrete_array->value_offset(offset_i); + column_chars_t.insert(raw_data, raw_data + concrete_array->value_length(offset_i)); + } + column_offsets.emplace_back(column_chars_t.size()); + } + } else if (arrow_array->type_id() == arrow::Type::FIXED_SIZE_BINARY) 
{ + auto concrete_array = down_cast<const arrow::FixedSizeBinaryArray*>(arrow_array); + uint32_t width = concrete_array->byte_width(); + const auto* array_data = concrete_array->GetValue(start); + + for (size_t offset_i = 0; offset_i < end - start; ++offset_i) { + if (!concrete_array->IsNull(offset_i)) { + const auto* raw_data = array_data + (offset_i * width); + column_chars_t.insert(raw_data, raw_data + width); + } + column_offsets.emplace_back(column_chars_t.size()); + } + } +} + } // namespace vectorized } // namespace doris \ No newline at end of file diff --git a/be/src/vec/data_types/serde/data_type_string_serde.h b/be/src/vec/data_types/serde/data_type_string_serde.h index 9893c5721e..5fe7d00db8 100644 --- a/be/src/vec/data_types/serde/data_type_string_serde.h +++ b/be/src/vec/data_types/serde/data_type_string_serde.h @@ -40,6 +40,12 @@ public: int32_t col_id, int row_num) const override; void read_one_cell_from_jsonb(IColumn& column, const JsonbValue* arg) const override; + + void write_column_to_arrow(const IColumn& column, const UInt8* null_map, + arrow::ArrayBuilder* array_builder, int start, + int end) const override; + void read_column_from_arrow(IColumn& column, const arrow::Array* arrow_array, int start, + int end, const cctz::time_zone& ctz) const override; }; } // namespace vectorized } // namespace doris diff --git a/be/src/vec/data_types/serde/data_type_struct_serde.cpp b/be/src/vec/data_types/serde/data_type_struct_serde.cpp index 2af87dd484..be21680009 100644 --- a/be/src/vec/data_types/serde/data_type_struct_serde.cpp +++ b/be/src/vec/data_types/serde/data_type_struct_serde.cpp @@ -42,5 +42,17 @@ void DataTypeStructSerDe::read_one_cell_from_jsonb(IColumn& column, const JsonbV auto blob = static_cast<const JsonbBlobVal*>(arg); column.deserialize_and_insert_from_arena(blob->getBlob()); } + +void DataTypeStructSerDe::write_column_to_arrow(const IColumn& column, const UInt8* null_map, + arrow::ArrayBuilder* array_builder, int start, + int end) 
const { + LOG(FATAL) << "Not support write " << column.get_name() << " to arrow"; +} + +void DataTypeStructSerDe::read_column_from_arrow(IColumn& column, const arrow::Array* arrow_array, + int start, int end, + const cctz::time_zone& ctz) const { + LOG(FATAL) << "Not support read " << column.get_name() << " from arrow"; +} } // namespace vectorized } // namespace doris \ No newline at end of file diff --git a/be/src/vec/data_types/serde/data_type_struct_serde.h b/be/src/vec/data_types/serde/data_type_struct_serde.h index e6abe47b7d..836d5bdbdd 100644 --- a/be/src/vec/data_types/serde/data_type_struct_serde.h +++ b/be/src/vec/data_types/serde/data_type_struct_serde.h @@ -51,6 +51,12 @@ public: void read_one_cell_from_jsonb(IColumn& column, const JsonbValue* arg) const override; + void write_column_to_arrow(const IColumn& column, const UInt8* null_map, + arrow::ArrayBuilder* array_builder, int start, + int end) const override; + void read_column_from_arrow(IColumn& column, const arrow::Array* arrow_array, int start, + int end, const cctz::time_zone& ctz) const override; + private: DataTypeSerDeSPtrs elemSerDeSPtrs; }; diff --git a/be/src/vec/utils/arrow_column_to_doris_column.cpp b/be/src/vec/utils/arrow_column_to_doris_column.cpp index af94664f1b..5de57156dd 100644 --- a/be/src/vec/utils/arrow_column_to_doris_column.cpp +++ b/be/src/vec/utils/arrow_column_to_doris_column.cpp @@ -71,19 +71,6 @@ M(::arrow::Type::DATE64, TYPE_DATETIME) \ M(::arrow::Type::DECIMAL, TYPE_DECIMALV2) -#define FOR_ARROW_NUMERIC_TYPES(M) \ - M(arrow::Type::UINT8, UInt8) \ - M(arrow::Type::INT8, Int8) \ - M(arrow::Type::INT16, Int16) \ - M(arrow::Type::UINT16, UInt16) \ - M(arrow::Type::INT32, Int32) \ - M(arrow::Type::UINT32, UInt32) \ - M(arrow::Type::UINT64, UInt64) \ - M(arrow::Type::INT64, Int64) \ - M(arrow::Type::HALF_FLOAT, Float32) \ - M(arrow::Type::FLOAT, Float32) \ - M(arrow::Type::DOUBLE, Float64) - namespace doris::vectorized { PrimitiveType 
arrow_type_to_primitive_type(::arrow::Type::type type) { @@ -100,254 +87,7 @@ PrimitiveType arrow_type_to_primitive_type(::arrow::Type::type type) { return INVALID_TYPE; } -static size_t fill_nullable_column(const arrow::Array* array, size_t array_idx, - vectorized::ColumnNullable* nullable_column, - size_t num_elements) { - size_t null_elements_count = 0; - NullMap& map_data = nullable_column->get_null_map_data(); - for (size_t i = 0; i < num_elements; ++i) { - auto is_null = array->IsNull(array_idx + i); - map_data.emplace_back(is_null); - null_elements_count += is_null; - } - return null_elements_count; -} - -/// Inserts chars and offsets right into internal column data to reduce an overhead. -/// Internal offsets are shifted by one to the right in comparison with Arrow ones. So the last offset should map to the end of all chars. -/// Also internal strings are null terminated. -static Status convert_column_with_string_data(const arrow::Array* array, size_t array_idx, - MutableColumnPtr& data_column, size_t num_elements) { - auto& column_chars_t = assert_cast<ColumnString&>(*data_column).get_chars(); - auto& column_offsets = assert_cast<ColumnString&>(*data_column).get_offsets(); - - auto concrete_array = down_cast<const arrow::BinaryArray*>(array); - std::shared_ptr<arrow::Buffer> buffer = concrete_array->value_data(); - - for (size_t offset_i = array_idx; offset_i < array_idx + num_elements; ++offset_i) { - if (!concrete_array->IsNull(offset_i) && buffer) { - const auto* raw_data = buffer->data() + concrete_array->value_offset(offset_i); - column_chars_t.insert(raw_data, raw_data + concrete_array->value_length(offset_i)); - } - - column_offsets.emplace_back(column_chars_t.size()); - } - return Status::OK(); -} - -static Status convert_column_with_fixed_size_data(const arrow::Array* array, size_t array_idx, - MutableColumnPtr& data_column, - size_t num_elements) { - auto& column_chars_t = assert_cast<ColumnString&>(*data_column).get_chars(); - auto& 
column_offsets = assert_cast<ColumnString&>(*data_column).get_offsets(); - - auto concrete_array = down_cast<const arrow::FixedSizeBinaryArray*>(array); - uint32_t width = concrete_array->byte_width(); - const auto* array_data = concrete_array->GetValue(array_idx); - - for (size_t offset_i = 0; offset_i < num_elements; ++offset_i) { - if (!concrete_array->IsNull(offset_i)) { - const auto* raw_data = array_data + (offset_i * width); - column_chars_t.insert(raw_data, raw_data + width); - } - column_offsets.emplace_back(column_chars_t.size()); - } - return Status::OK(); -} - -/// Inserts numeric data right into internal column data to reduce an overhead -template <typename NumericType, typename VectorType = ColumnVector<NumericType>> -Status convert_column_with_numeric_data(const arrow::Array* array, size_t array_idx, - MutableColumnPtr& data_column, size_t num_elements) { - auto& column_data = static_cast<VectorType&>(*data_column).get_data(); - /// buffers[0] is a null bitmap and buffers[1] are actual values - std::shared_ptr<arrow::Buffer> buffer = array->data()->buffers[1]; - const auto* raw_data = reinterpret_cast<const NumericType*>(buffer->data()) + array_idx; - column_data.insert(raw_data, raw_data + num_elements); - return Status::OK(); -} - -static Status convert_column_with_boolean_data(const arrow::Array* array, size_t array_idx, - MutableColumnPtr& data_column, size_t num_elements) { - auto& column_data = static_cast<ColumnVector<UInt8>&>(*data_column).get_data(); - auto concrete_array = down_cast<const arrow::BooleanArray*>(array); - for (size_t bool_i = array_idx; bool_i < array_idx + num_elements; ++bool_i) { - column_data.emplace_back(concrete_array->Value(bool_i)); - } - return Status::OK(); -} - -static int64_t time_unit_divisor(arrow::TimeUnit::type unit) { - // Doris only supports seconds - switch (unit) { - case arrow::TimeUnit::type::SECOND: { - return 1L; - } - case arrow::TimeUnit::type::MILLI: { - return 1000L; - } - case 
arrow::TimeUnit::type::MICRO: { - return 1000000L; - } - case arrow::TimeUnit::type::NANO: { - return 1000000000L; - } - default: - return 0L; - } -} - -template <typename ArrowType> -Status convert_column_with_timestamp_data(const arrow::Array* array, size_t array_idx, - MutableColumnPtr& data_column, size_t num_elements, - const cctz::time_zone& ctz) { - auto& column_data = static_cast<ColumnVector<Int64>&>(*data_column).get_data(); - auto concrete_array = down_cast<const ArrowType*>(array); - int64_t divisor = 1; - int64_t multiplier = 1; - if constexpr (std::is_same_v<ArrowType, arrow::TimestampArray>) { - const auto type = std::static_pointer_cast<arrow::TimestampType>(array->type()); - divisor = time_unit_divisor(type->unit()); - if (divisor == 0L) { - return Status::InternalError(fmt::format("Invalid Time Type:{}", type->name())); - } - } else if constexpr (std::is_same_v<ArrowType, arrow::Date32Array>) { - multiplier = 24 * 60 * 60; // day => secs - } else if constexpr (std::is_same_v<ArrowType, arrow::Date64Array>) { - divisor = 1000; //ms => secs - } - - for (size_t value_i = array_idx; value_i < array_idx + num_elements; ++value_i) { - VecDateTimeValue v; - v.from_unixtime(static_cast<Int64>(concrete_array->Value(value_i)) / divisor * multiplier, - ctz); - if constexpr (std::is_same_v<ArrowType, arrow::Date32Array>) { - v.cast_to_date(); - } - column_data.emplace_back(binary_cast<VecDateTimeValue, Int64>(v)); - } - return Status::OK(); -} - -template <typename ArrowType> -Status convert_column_with_date_v2_data(const arrow::Array* array, size_t array_idx, - MutableColumnPtr& data_column, size_t num_elements, - const cctz::time_zone& ctz) { - auto& column_data = static_cast<ColumnVector<UInt32>&>(*data_column).get_data(); - auto concrete_array = down_cast<const ArrowType*>(array); - int64_t divisor = 1; - int64_t multiplier = 1; - if constexpr (std::is_same_v<ArrowType, arrow::TimestampArray>) { - const auto type = 
std::static_pointer_cast<arrow::TimestampType>(array->type()); - divisor = time_unit_divisor(type->unit()); - if (divisor == 0L) { - return Status::InternalError(fmt::format("Invalid Time Type:{}", type->name())); - } - } else if constexpr (std::is_same_v<ArrowType, arrow::Date32Array>) { - multiplier = 24 * 60 * 60; // day => secs - } else if constexpr (std::is_same_v<ArrowType, arrow::Date64Array>) { - divisor = 1000; //ms => secs - } - - for (size_t value_i = array_idx; value_i < array_idx + num_elements; ++value_i) { - DateV2Value<DateV2ValueType> v; - v.from_unixtime(static_cast<Int64>(concrete_array->Value(value_i)) / divisor * multiplier, - ctz); - column_data.emplace_back(binary_cast<DateV2Value<DateV2ValueType>, UInt32>(v)); - } - return Status::OK(); -} - -template <typename ArrowType> -Status convert_column_with_datetime_v2_data(const arrow::Array* array, size_t array_idx, - MutableColumnPtr& data_column, size_t num_elements, - const cctz::time_zone& ctz) { - auto& column_data = static_cast<ColumnVector<UInt64>&>(*data_column).get_data(); - auto concrete_array = down_cast<const ArrowType*>(array); - int64_t divisor = 1; - int64_t multiplier = 1; - if constexpr (std::is_same_v<ArrowType, arrow::TimestampArray>) { - const auto type = std::static_pointer_cast<arrow::TimestampType>(array->type()); - divisor = time_unit_divisor(type->unit()); - if (divisor == 0L) { - return Status::InternalError(fmt::format("Invalid Time Type:{}", type->name())); - } - } else if constexpr (std::is_same_v<ArrowType, arrow::Date32Array>) { - multiplier = 24 * 60 * 60; // day => secs - } else if constexpr (std::is_same_v<ArrowType, arrow::Date64Array>) { - divisor = 1000; //ms => secs - } - - for (size_t value_i = array_idx; value_i < array_idx + num_elements; ++value_i) { - DateV2Value<DateTimeV2ValueType> v; - v.from_unixtime(static_cast<Int64>(concrete_array->Value(value_i)) / divisor * multiplier, - ctz); - 
column_data.emplace_back(binary_cast<DateV2Value<DateTimeV2ValueType>, UInt64>(v)); - } - return Status::OK(); -} - -static Status convert_column_with_decimal_data(const arrow::Array* array, size_t array_idx, - MutableColumnPtr& data_column, size_t num_elements) { - auto& column_data = - static_cast<ColumnDecimal<vectorized::Decimal128>&>(*data_column).get_data(); - auto concrete_array = down_cast<const arrow::DecimalArray*>(array); - const auto* arrow_decimal_type = static_cast<arrow::DecimalType*>(array->type().get()); - // TODO check precision - //size_t precision = arrow_decimal_type->precision(); - const auto scale = arrow_decimal_type->scale(); - - for (size_t value_i = array_idx; value_i < array_idx + num_elements; ++value_i) { - auto value = - *reinterpret_cast<const vectorized::Decimal128*>(concrete_array->Value(value_i)); - // convert scale to 9 - if (scale != 9) { - value = convert_decimals<vectorized::DataTypeDecimal<vectorized::Decimal128>, - vectorized::DataTypeDecimal<vectorized::Decimal128>>(value, - scale, 9); - } - column_data.emplace_back(value); - } - return Status::OK(); -} - -static Status convert_offset_from_list_column(const arrow::Array* array, size_t array_idx, - MutableColumnPtr& data_column, size_t num_elements, - size_t* start_idx_for_data, size_t* num_for_data) { - auto& offsets_data = static_cast<ColumnArray&>(*data_column).get_offsets(); - auto concrete_array = down_cast<const arrow::ListArray*>(array); - auto arrow_offsets_array = concrete_array->offsets(); - auto arrow_offsets = down_cast<arrow::Int32Array*>(arrow_offsets_array.get()); - auto prev_size = offsets_data.back(); - for (int64_t i = array_idx + 1; i < array_idx + num_elements + 1; ++i) { - // convert to doris offset, start from offsets.back() - offsets_data.emplace_back(prev_size + arrow_offsets->Value(i) - - arrow_offsets->Value(array_idx)); - } - *start_idx_for_data = arrow_offsets->Value(array_idx); - *num_for_data = offsets_data.back() - prev_size; - - return 
Status::OK(); -} - -static Status convert_column_with_list_data(const arrow::Array* array, size_t array_idx, - MutableColumnPtr& data_column, size_t num_elements, - const cctz::time_zone& ctz, - const DataTypePtr& nested_type) { - size_t start_idx_of_data = 0; - size_t num_of_data = 0; - // get start idx and num of values from arrow offsets - RETURN_IF_ERROR(convert_offset_from_list_column(array, array_idx, data_column, num_elements, - &start_idx_of_data, &num_of_data)); - auto& data_column_ptr = static_cast<ColumnArray&>(*data_column).get_data_ptr(); - auto concrete_array = down_cast<const arrow::ListArray*>(array); - std::shared_ptr<arrow::Array> arrow_data = concrete_array->values(); - - return arrow_column_to_doris_column(arrow_data.get(), start_idx_of_data, data_column_ptr, - nested_type, num_of_data, ctz); -} - -// For convenient unit test. Not use this in formal code. +//// For convenient unit test. Not use this in formal code. Status arrow_column_to_doris_column(const arrow::Array* arrow_column, size_t arrow_batch_cur_idx, ColumnPtr& doris_column, const DataTypePtr& type, size_t num_elements, const std::string& timezone) { @@ -360,69 +100,10 @@ Status arrow_column_to_doris_column(const arrow::Array* arrow_column, size_t arr Status arrow_column_to_doris_column(const arrow::Array* arrow_column, size_t arrow_batch_cur_idx, ColumnPtr& doris_column, const DataTypePtr& type, size_t num_elements, const cctz::time_zone& ctz) { - // src column always be nullable for simplify converting - CHECK(doris_column->is_nullable()); - MutableColumnPtr data_column = nullptr; - auto* nullable_column = reinterpret_cast<vectorized::ColumnNullable*>( - (*std::move(doris_column)).mutate().get()); - fill_nullable_column(arrow_column, arrow_batch_cur_idx, nullable_column, num_elements); - data_column = nullable_column->get_nested_column_ptr(); - WhichDataType which_type(remove_nullable(type)); - // process data - switch (arrow_column->type()->id()) { - case arrow::Type::STRING: - 
case arrow::Type::BINARY: - return convert_column_with_string_data(arrow_column, arrow_batch_cur_idx, data_column, - num_elements); - case arrow::Type::FIXED_SIZE_BINARY: - return convert_column_with_fixed_size_data(arrow_column, arrow_batch_cur_idx, data_column, - num_elements); -#define DISPATCH(ARROW_NUMERIC_TYPE, CPP_NUMERIC_TYPE) \ - case ARROW_NUMERIC_TYPE: \ - return convert_column_with_numeric_data<CPP_NUMERIC_TYPE>( \ - arrow_column, arrow_batch_cur_idx, data_column, num_elements); - FOR_ARROW_NUMERIC_TYPES(DISPATCH) -#undef DISPATCH - case arrow::Type::BOOL: - return convert_column_with_boolean_data(arrow_column, arrow_batch_cur_idx, data_column, - num_elements); - case arrow::Type::DATE32: - if (which_type.is_date_v2()) { - return convert_column_with_date_v2_data<arrow::Date32Array>( - arrow_column, arrow_batch_cur_idx, data_column, num_elements, ctz); - } else { - return convert_column_with_timestamp_data<arrow::Date32Array>( - arrow_column, arrow_batch_cur_idx, data_column, num_elements, ctz); - } - case arrow::Type::DATE64: - if (which_type.is_date_v2_or_datetime_v2()) { - return convert_column_with_datetime_v2_data<arrow::Date64Array>( - arrow_column, arrow_batch_cur_idx, data_column, num_elements, ctz); - } else { - return convert_column_with_timestamp_data<arrow::Date64Array>( - arrow_column, arrow_batch_cur_idx, data_column, num_elements, ctz); - } - case arrow::Type::TIMESTAMP: - if (which_type.is_date_v2_or_datetime_v2()) { - return convert_column_with_datetime_v2_data<arrow::TimestampArray>( - arrow_column, arrow_batch_cur_idx, data_column, num_elements, ctz); - } else { - return convert_column_with_timestamp_data<arrow::TimestampArray>( - arrow_column, arrow_batch_cur_idx, data_column, num_elements, ctz); - } - case arrow::Type::DECIMAL: - return convert_column_with_decimal_data(arrow_column, arrow_batch_cur_idx, data_column, - num_elements); - case arrow::Type::LIST: - CHECK(type->have_subtypes()); - return convert_column_with_list_data( - 
arrow_column, arrow_batch_cur_idx, data_column, num_elements, ctz, - (reinterpret_cast<const DataTypeArray*>(type.get()))->get_nested_type()); - default: - break; - } - return Status::NotSupported( - fmt::format("Not support arrow type:{}", arrow_column->type()->name())); + type->get_serde()->read_column_from_arrow(doris_column->assume_mutable_ref(), arrow_column, + arrow_batch_cur_idx, + arrow_batch_cur_idx + num_elements, ctz); + return Status::OK(); } } // namespace doris::vectorized diff --git a/be/test/CMakeLists.txt b/be/test/CMakeLists.txt index d543c48570..50cc5ad6c4 100644 --- a/be/test/CMakeLists.txt +++ b/be/test/CMakeLists.txt @@ -228,7 +228,8 @@ set(VEC_TEST_FILES vec/columns/column_decimal_test.cpp vec/columns/column_fixed_length_object_test.cpp vec/data_types/complex_type_test.cpp - vec/data_types/serde/data_type_serde_test.cpp + vec/data_types/serde/data_type_serde_pb_test.cpp + vec/data_types/serde/data_type_serde_arrow_test.cpp vec/core/block_test.cpp vec/core/block_spill_test.cpp vec/core/column_array_test.cpp diff --git a/be/test/vec/data_types/serde/data_type_serde_arrow_test.cpp b/be/test/vec/data_types/serde/data_type_serde_arrow_test.cpp new file mode 100644 index 0000000000..e7917e7cea --- /dev/null +++ b/be/test/vec/data_types/serde/data_type_serde_arrow_test.cpp @@ -0,0 +1,321 @@ + +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <arrow/array/builder_base.h>
+#include <arrow/array/builder_binary.h>
+#include <arrow/array/builder_decimal.h>
+#include <arrow/array/builder_nested.h>
+#include <arrow/array/builder_primitive.h>
+#include <arrow/record_batch.h>
+#include <arrow/status.h>
+#include <arrow/type.h>
+#include <arrow/util/decimal.h>
+#include <arrow/visit_type_inline.h>
+#include <arrow/visitor.h>
+#include <gen_cpp/Descriptors_types.h>
+#include <gen_cpp/types.pb.h>
+#include <gtest/gtest-message.h>
+#include <gtest/gtest-test-part.h>
+#include <math.h>
+#include <stdint.h>
+#include <stdlib.h>
+#include <time.h>
+
+#include <iostream>
+#include <memory>
+#include <string>
+#include <tuple>
+#include <utility>
+#include <vector>
+
+#include "gtest/gtest_pred_impl.h"
+#include "olap/hll.h"
+// NOTE(review): a .cpp file is included here, which pulls its definitions into this
+// translation unit - confirm this is intentional and does not clash at link time.
+#include "runtime/descriptors.cpp"
+#include "runtime/descriptors.h"
+#include "util/arrow/block_convertor.h"
+#include "util/arrow/row_batch.h"
+#include "util/bitmap_value.h"
+#include "util/quantile_state.h"
+#include "vec/columns/column.h"
+#include "vec/columns/column_complex.h"
+#include "vec/columns/column_decimal.h"
+#include "vec/columns/column_nullable.h"
+#include "vec/columns/column_string.h"
+#include "vec/columns/column_vector.h"
+#include "vec/core/block.h"
+#include "vec/core/field.h"
+#include "vec/core/types.h"
+#include "vec/data_types/data_type.h"
+#include "vec/data_types/data_type_bitmap.h"
+#include "vec/data_types/data_type_date.h"
+#include "vec/data_types/data_type_date_time.h"
+#include "vec/data_types/data_type_decimal.h"
+#include "vec/data_types/data_type_hll.h"
+#include "vec/data_types/data_type_nullable.h"
+#include "vec/data_types/data_type_number.h"
+#include "vec/data_types/data_type_quantilestate.h"
+#include "vec/data_types/data_type_string.h"
+#include "vec/data_types/data_type_time_v2.h"
+#include "vec/runtime/vdatetime_value.h"
+#include "vec/utils/arrow_column_to_doris_column.h"
+namespace doris::vectorized {
+
+// Round-trip check for the arrow serde path:
+//   1. build a 7-row Block holding one column per entry of `cols`,
+//   2. convert it to an arrow RecordBatch using the schema derived from the row descriptor,
+//   3. convert every arrow column back into an empty clone of the block,
+//   4. expect both blocks to dump the same data.
+// Every conversion step is asserted, so a failure stops here instead of surfacing later as a
+// null dereference or a misleading data mismatch.
+void serialize_and_deserialize_arrow_test() {
+    vectorized::Block block;
+    // (column name, field type, unique id, primitive type, nullable)
+    std::vector<std::tuple<std::string, FieldType, int, PrimitiveType, bool>> cols {
+            {"k1", FieldType::OLAP_FIELD_TYPE_INT, 1, TYPE_INT, false},
+            {"k7", FieldType::OLAP_FIELD_TYPE_INT, 7, TYPE_INT, true},
+            {"k2", FieldType::OLAP_FIELD_TYPE_STRING, 2, TYPE_STRING, false},
+            {"k3", FieldType::OLAP_FIELD_TYPE_DECIMAL128I, 3, TYPE_DECIMAL128I, false},
+            {"k11", FieldType::OLAP_FIELD_TYPE_DATETIME, 11, TYPE_DATETIME, false},
+            {"k4", FieldType::OLAP_FIELD_TYPE_BOOL, 4, TYPE_BOOLEAN, false}};
+    int row_num = 7;
+    // make desc and generate block
+    TupleDescriptor tuple_desc(PTupleDescriptor(), true);
+    for (const auto& t : cols) {
+        TSlotDescriptor tslot;
+        std::string col_name = std::get<0>(t);
+        tslot.__set_colName(col_name);
+        TypeDescriptor type_desc(std::get<3>(t));
+        bool is_nullable(std::get<4>(t));
+        // The HLL, DATEV2 and DATE cases below are not reached with the current `cols`; they
+        // only take effect once such a column is added to the list.
+        switch (std::get<3>(t)) {
+        case TYPE_BOOLEAN:
+            tslot.__set_slotType(type_desc.to_thrift());
+            {
+                auto vec = vectorized::ColumnVector<UInt8>::create();
+                auto& data = vec->get_data();
+                for (int i = 0; i < row_num; ++i) {
+                    data.push_back(i % 2);
+                }
+                vectorized::DataTypePtr data_type(std::make_shared<vectorized::DataTypeUInt8>());
+                vectorized::ColumnWithTypeAndName type_and_name(vec->get_ptr(), data_type,
+                                                                col_name);
+                block.insert(std::move(type_and_name));
+            }
+            break;
+        case TYPE_INT:
+            tslot.__set_slotType(type_desc.to_thrift());
+            if (is_nullable) {
+                {
+                    auto column_vector_int32 = vectorized::ColumnVector<Int32>::create();
+                    auto column_nullable_vector =
+                            vectorized::make_nullable(std::move(column_vector_int32));
+                    auto mutable_nullable_vector = std::move(*column_nullable_vector).mutate();
+                    for (int i = 0; i < row_num; i++) {
+                        mutable_nullable_vector->insert(int32(i));
+                    }
+                    auto data_type = vectorized::make_nullable(
+                            std::make_shared<vectorized::DataTypeInt32>());
+                    vectorized::ColumnWithTypeAndName type_and_name(
+                            mutable_nullable_vector->get_ptr(), data_type, col_name);
+                    block.insert(type_and_name);
+                }
+            } else {
+                auto vec = vectorized::ColumnVector<Int32>::create();
+                auto& data = vec->get_data();
+                for (int i = 0; i < row_num; ++i) {
+                    data.push_back(i);
+                }
+                vectorized::DataTypePtr data_type(std::make_shared<vectorized::DataTypeInt32>());
+                vectorized::ColumnWithTypeAndName type_and_name(vec->get_ptr(), data_type,
+                                                                col_name);
+                block.insert(std::move(type_and_name));
+            }
+            break;
+        case TYPE_DECIMAL128I:
+            type_desc.precision = 27;
+            type_desc.scale = 9;
+            tslot.__set_slotType(type_desc.to_thrift());
+            {
+                vectorized::DataTypePtr decimal_data_type(
+                        doris::vectorized::create_decimal(27, 9, true));
+                auto decimal_column = decimal_data_type->create_column();
+                auto& data = ((vectorized::ColumnDecimal<vectorized::Decimal<vectorized::Int128>>*)
+                                      decimal_column.get())
+                                     ->get_data();
+                for (int i = 0; i < row_num; ++i) {
+                    // i * 10^9 + i * 10^8, i.e. the decimal i.1 at scale 9; computed in
+                    // integer arithmetic instead of going through floating-point pow().
+                    __int128_t value = static_cast<__int128_t>(i) * 1100000000;
+                    data.push_back(value);
+                }
+                vectorized::ColumnWithTypeAndName type_and_name(decimal_column->get_ptr(),
+                                                                decimal_data_type, col_name);
+                block.insert(type_and_name);
+            }
+            break;
+        case TYPE_STRING:
+            tslot.__set_slotType(type_desc.to_thrift());
+            {
+                auto strcol = vectorized::ColumnString::create();
+                for (int i = 0; i < row_num; ++i) {
+                    std::string is = std::to_string(i);
+                    strcol->insert_data(is.c_str(), is.size());
+                }
+                vectorized::DataTypePtr data_type(std::make_shared<vectorized::DataTypeString>());
+                vectorized::ColumnWithTypeAndName type_and_name(strcol->get_ptr(), data_type,
+                                                                col_name);
+                block.insert(type_and_name);
+            }
+            break;
+        case TYPE_HLL:
+            tslot.__set_slotType(type_desc.to_thrift());
+            {
+                vectorized::DataTypePtr hll_data_type(std::make_shared<vectorized::DataTypeHLL>());
+                auto hll_column = hll_data_type->create_column();
+                std::vector<HyperLogLog>& container =
+                        ((vectorized::ColumnHLL*)hll_column.get())->get_data();
+                for (int i = 0; i < row_num; ++i) {
+                    HyperLogLog hll;
+                    hll.update(i);
+                    container.push_back(hll);
+                }
+                vectorized::ColumnWithTypeAndName type_and_name(hll_column->get_ptr(),
+                                                                hll_data_type, col_name);
+
+                block.insert(type_and_name);
+            }
+            break;
+        case TYPE_DATEV2:
+            tslot.__set_slotType(type_desc.to_thrift());
+            {
+                auto column_vector_date_v2 = vectorized::ColumnVector<vectorized::UInt32>::create();
+                auto& date_v2_data = column_vector_date_v2->get_data();
+                for (int i = 0; i < row_num; ++i) {
+                    vectorized::DateV2Value<doris::vectorized::DateV2ValueType> value;
+                    value.from_date((uint32_t)((2022 << 9) | (6 << 5) | 6));
+                    date_v2_data.push_back(*reinterpret_cast<vectorized::UInt32*>(&value));
+                }
+                vectorized::DataTypePtr date_v2_type(
+                        std::make_shared<vectorized::DataTypeDateV2>());
+                vectorized::ColumnWithTypeAndName test_date_v2(column_vector_date_v2->get_ptr(),
+                                                               date_v2_type, col_name);
+                block.insert(test_date_v2);
+            }
+            break;
+        case TYPE_DATE: // int64
+            tslot.__set_slotType(type_desc.to_thrift());
+            {
+                auto column_vector_date = vectorized::ColumnVector<vectorized::Int64>::create();
+                auto& date_data = column_vector_date->get_data();
+                for (int i = 0; i < row_num; ++i) {
+                    vectorized::VecDateTimeValue value;
+                    value.from_date_int64(20210501);
+                    date_data.push_back(*reinterpret_cast<vectorized::Int64*>(&value));
+                }
+                vectorized::DataTypePtr date_type(std::make_shared<vectorized::DataTypeDate>());
+                vectorized::ColumnWithTypeAndName test_date(column_vector_date->get_ptr(),
+                                                            date_type, col_name);
+                block.insert(test_date);
+            }
+            break;
+        case TYPE_DATETIME: // int64
+            tslot.__set_slotType(type_desc.to_thrift());
+            {
+                auto column_vector_datetime = vectorized::ColumnVector<vectorized::Int64>::create();
+                auto& datetime_data = column_vector_datetime->get_data();
+                for (int i = 0; i < row_num; ++i) {
+                    vectorized::VecDateTimeValue value;
+                    value.from_date_int64(20210501080910);
+                    datetime_data.push_back(*reinterpret_cast<vectorized::Int64*>(&value));
+                }
+                vectorized::DataTypePtr datetime_type(
+                        std::make_shared<vectorized::DataTypeDateTime>());
+                vectorized::ColumnWithTypeAndName test_datetime(column_vector_datetime->get_ptr(),
+                                                                datetime_type, col_name);
+                block.insert(test_datetime);
+            }
+            break;
+        default:
+            break;
+        }
+
+        tslot.__set_col_unique_id(std::get<2>(t));
+        // NOTE(review): presumably tuple_desc takes ownership of the slot (it is constructed
+        // with `true` as second argument) - confirm, otherwise this allocation leaks.
+        SlotDescriptor* slot = new SlotDescriptor(tslot);
+        tuple_desc.add_slot(slot);
+    }
+
+    RowDescriptor row_desc(&tuple_desc, true);
+    // arrow schema
+    std::shared_ptr<arrow::Schema> _arrow_schema;
+    // ASSERT rather than EXPECT: the schema is dereferenced right below.
+    ASSERT_EQ(convert_to_arrow_schema(row_desc, &_arrow_schema), Status::OK());
+    ASSERT_TRUE(_arrow_schema != nullptr);
+
+    // serialize
+    std::shared_ptr<arrow::RecordBatch> result;
+    std::cout << "block structure: " << block.dump_structure() << std::endl;
+    std::cout << "_arrow_schema: " << _arrow_schema->ToString(true) << std::endl;
+
+    ASSERT_EQ(convert_to_arrow_batch(block, _arrow_schema, arrow::default_memory_pool(), &result),
+              Status::OK());
+    ASSERT_TRUE(result != nullptr);
+    Block new_block = block.clone_empty();
+    // deserialize
+    for (const auto& t : cols) {
+        std::string real_column_name = std::get<0>(t);
+        auto* array = result->GetColumnByName(real_column_name).get();
+        ASSERT_TRUE(array != nullptr) << "missing arrow column " << real_column_name;
+        auto& column_with_type_and_name = new_block.get_by_name(real_column_name);
+        if (std::get<3>(t) == PrimitiveType::TYPE_DATE ||
+            std::get<3>(t) == PrimitiveType::TYPE_DATETIME) {
+            {
+                // DATE/DATETIME are read back through a temporary string column and re-parsed
+                // into the Int64 representation of the target column.
+                auto strcol = vectorized::ColumnString::create();
+                vectorized::DataTypePtr data_type(std::make_shared<vectorized::DataTypeString>());
+                vectorized::ColumnWithTypeAndName type_and_name(strcol->get_ptr(), data_type,
+                                                                real_column_name);
+                ASSERT_EQ(arrow_column_to_doris_column(array, 0, type_and_name.column,
+                                                       type_and_name.type, block.rows(), "UTC"),
+                          Status::OK());
+                {
+                    auto& col = column_with_type_and_name.column.get()->assume_mutable_ref();
+                    auto& date_data = static_cast<ColumnVector<Int64>&>(col).get_data();
+                    for (size_t i = 0; i < strcol->size(); ++i) {
+                        StringRef str = strcol->get_data_at(i);
+                        vectorized::VecDateTimeValue value;
+                        value.from_date_str(str.data, str.size);
+                        date_data.push_back(*reinterpret_cast<vectorized::Int64*>(&value));
+                    }
+                }
+            }
+            continue;
+        } else if (std::get<3>(t) == PrimitiveType::TYPE_DATEV2) {
+            // Same detour as above, parsed into the UInt32 representation of DATEV2.
+            auto strcol = vectorized::ColumnString::create();
+            vectorized::DataTypePtr data_type(std::make_shared<vectorized::DataTypeString>());
+            vectorized::ColumnWithTypeAndName type_and_name(strcol->get_ptr(), data_type,
+                                                            real_column_name);
+            ASSERT_EQ(arrow_column_to_doris_column(array, 0, type_and_name.column,
+                                                   type_and_name.type, block.rows(), "UTC"),
+                      Status::OK());
+            {
+                auto& col = column_with_type_and_name.column.get()->assume_mutable_ref();
+                auto& date_data = static_cast<ColumnVector<UInt32>&>(col).get_data();
+                for (size_t i = 0; i < strcol->size(); ++i) {
+                    StringRef str = strcol->get_data_at(i);
+                    DateV2Value<DateV2ValueType> value;
+                    value.from_date_str(str.data, str.size);
+                    date_data.push_back(*reinterpret_cast<vectorized::UInt32*>(&value));
+                }
+            }
+            continue;
+        }
+        ASSERT_EQ(arrow_column_to_doris_column(array, 0, column_with_type_and_name.column,
+                                               column_with_type_and_name.type, block.rows(),
+                                               "UTC"),
+                  Status::OK());
+    }
+
+    std::cout << block.dump_data() << std::endl;
+    std::cout << new_block.dump_data() << std::endl;
+    EXPECT_EQ(block.dump_data(), new_block.dump_data());
+}
+
+TEST(DataTypeSerDeArrowTest, DataTypeScalaSerDeTest) {
+    serialize_and_deserialize_arrow_test();
+}
+
+} // namespace doris::vectorized
diff --git a/be/test/vec/data_types/serde/data_type_serde_pb_test.cpp b/be/test/vec/data_types/serde/data_type_serde_pb_test.cpp
new file mode 100644
index 0000000000..7bed95be8d
--- /dev/null
+++ b/be/test/vec/data_types/serde/data_type_serde_pb_test.cpp
@@ -0,0 +1,200 @@
+
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more
contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gen_cpp/types.pb.h>
+#include <gtest/gtest-message.h>
+#include <gtest/gtest-test-part.h>
+#include <math.h>
+#include <stdlib.h>
+#include <time.h>
+
+#include <iostream>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "gtest/gtest_pred_impl.h"
+#include "olap/hll.h"
+#include "util/bitmap_value.h"
+#include "util/quantile_state.h"
+#include "vec/columns/column.h"
+#include "vec/columns/column_complex.h"
+#include "vec/columns/column_decimal.h"
+#include "vec/columns/column_nullable.h"
+#include "vec/columns/column_string.h"
+#include "vec/columns/column_vector.h"
+#include "vec/core/types.h"
+#include "vec/data_types/data_type.h"
+#include "vec/data_types/data_type_bitmap.h"
+#include "vec/data_types/data_type_decimal.h"
+#include "vec/data_types/data_type_hll.h"
+#include "vec/data_types/data_type_nullable.h"
+#include "vec/data_types/data_type_number.h"
+#include "vec/data_types/data_type_quantilestate.h"
+#include "vec/data_types/data_type_string.h"
+#include "vec/data_types/serde/data_type_serde.h"
+
+namespace doris::vectorized {
+
+// Serializes all rows of `col` into `result` using the serde of `data_type`.
+// NOTE(review): the return value of write_column_to_pb is not inspected - if it reports
+// errors, they should be asserted here.
+void column_to_pb(const DataTypePtr& data_type, const IColumn& col, PValues* result) {
+    const DataTypeSerDeSPtr serde = data_type->get_serde();
+    serde->write_column_to_pb(col, *result, 0, col.size());
+}
+
+// Appends the values stored in `result` to `col` using the serde of `data_type`.
+// NOTE(review): same remark as above for the return value of read_column_from_pb.
+void pb_to_column(const DataTypePtr& data_type, PValues& result, IColumn& col) {
+    auto serde = data_type->get_serde();
+    serde->read_column_from_pb(col, result);
+}
+
+// Round-trip check: column -> PValues -> fresh column -> PValues. The two protobuf messages
+// are compared through their DebugString() text, so the check holds for column types that
+// cannot be compared directly.
+void check_pb_col(const DataTypePtr& data_type, const IColumn& col) {
+    PValues pv = PValues();
+    column_to_pb(data_type, col, &pv);
+    std::string s1 = pv.DebugString();
+
+    auto col1 = data_type->create_column();
+    pb_to_column(data_type, pv, *col1);
+    PValues as_pv = PValues();
+    column_to_pb(data_type, *col1, &as_pv);
+
+    std::string s2 = as_pv.DebugString();
+    EXPECT_EQ(s1, s2);
+}
+
+// Runs the protobuf round-trip check over one column of every covered type.
+void serialize_and_deserialize_pb_test() {
+    // int
+    {
+        auto vec = vectorized::ColumnVector<Int32>::create();
+        auto& data = vec->get_data();
+        for (int i = 0; i < 1024; ++i) {
+            data.push_back(i);
+        }
+        vectorized::DataTypePtr data_type(std::make_shared<vectorized::DataTypeInt32>());
+        check_pb_col(data_type, *vec.get());
+    }
+    // string
+    {
+        auto strcol = vectorized::ColumnString::create();
+        for (int i = 0; i < 1024; ++i) {
+            std::string is = std::to_string(i);
+            strcol->insert_data(is.c_str(), is.size());
+        }
+        vectorized::DataTypePtr data_type(std::make_shared<vectorized::DataTypeString>());
+        check_pb_col(data_type, *strcol.get());
+    }
+    // decimal
+    {
+        vectorized::DataTypePtr decimal_data_type(doris::vectorized::create_decimal(27, 9, true));
+        auto decimal_column = decimal_data_type->create_column();
+        auto& data = ((vectorized::ColumnDecimal<vectorized::Decimal<vectorized::Int128>>*)
+                              decimal_column.get())
+                             ->get_data();
+        for (int i = 0; i < 1024; ++i) {
+            // i * 10^9 + i * 10^8, computed in integer arithmetic instead of going through
+            // floating-point pow().
+            __int128_t value = static_cast<__int128_t>(i) * 1100000000;
+            data.push_back(value);
+        }
+        check_pb_col(decimal_data_type, *decimal_column.get());
+    }
+    // bitmap
+    {
+        vectorized::DataTypePtr bitmap_data_type(std::make_shared<vectorized::DataTypeBitMap>());
+        auto bitmap_column = bitmap_data_type->create_column();
+        std::vector<BitmapValue>& container =
+                ((vectorized::ColumnBitmap*)bitmap_column.get())->get_data();
+        for (int i = 0; i < 4; ++i) {
+            BitmapValue bv;
+            for (int j = 0; j <= i; ++j) {
+                bv.add(j);
+            }
+            container.push_back(bv);
+        }
+        check_pb_col(bitmap_data_type, *bitmap_column.get());
+    }
+    // hll
+    {
+        vectorized::DataTypePtr hll_data_type(std::make_shared<vectorized::DataTypeHLL>());
+        auto hll_column = hll_data_type->create_column();
+        std::vector<HyperLogLog>& container =
+                ((vectorized::ColumnHLL*)hll_column.get())->get_data();
+        for (int i = 0; i < 4; ++i) {
+            HyperLogLog hll;
+            hll.update(i);
+            container.push_back(hll);
+        }
+        check_pb_col(hll_data_type, *hll_column.get());
+    }
+    // quantilestate
+    {
+        vectorized::DataTypePtr quantile_data_type(
+                std::make_shared<vectorized::DataTypeQuantileStateDouble>());
+        auto quantile_column = quantile_data_type->create_column();
+        std::vector<QuantileStateDouble>& container =
+                ((vectorized::ColumnQuantileStateDouble*)quantile_column.get())->get_data();
+        const long max_rand = 1000000L;
+        double lower_bound = 0;
+        double upper_bound = 100;
+        // The data is still seeded from the clock, but the seed is printed so that a failing
+        // run can be reproduced.
+        const auto seed = static_cast<unsigned int>(time(nullptr));
+        std::cout << "quantile_state seed: " << seed << std::endl;
+        srandom(seed);
+        for (int i = 0; i < 1024; ++i) {
+            QuantileStateDouble q;
+            double random_double =
+                    lower_bound + (upper_bound - lower_bound) * (random() % max_rand) / max_rand;
+            q.add_value(random_double);
+            container.push_back(q);
+        }
+        check_pb_col(quantile_data_type, *quantile_column.get());
+    }
+    // nullable string
+    {
+        vectorized::DataTypePtr string_data_type(std::make_shared<vectorized::DataTypeString>());
+        vectorized::DataTypePtr nullable_data_type(
+                std::make_shared<vectorized::DataTypeNullable>(string_data_type));
+        auto nullable_column = nullable_data_type->create_column();
+        ((vectorized::ColumnNullable*)nullable_column.get())->insert_null_elements(1024);
+        check_pb_col(nullable_data_type, *nullable_column.get());
+    }
+    // nullable decimal
+    {
+        vectorized::DataTypePtr decimal_data_type(doris::vectorized::create_decimal(27, 9, true));
+        vectorized::DataTypePtr nullable_data_type(
+                std::make_shared<vectorized::DataTypeNullable>(decimal_data_type));
+        auto nullable_column = nullable_data_type->create_column();
+        ((vectorized::ColumnNullable*)nullable_column.get())->insert_null_elements(1024);
+        check_pb_col(nullable_data_type, *nullable_column.get());
+    }
+    // int with 1024 batch size
+    {
+        auto vec = vectorized::ColumnVector<Int32>::create();
+        auto& data = vec->get_data();
+        for (int i = 0; i < 1024; ++i) {
+            data.push_back(i);
+        }
+        std::cout << vec->size() << std::endl;
+        vectorized::DataTypePtr data_type(std::make_shared<vectorized::DataTypeInt32>());
+        vectorized::DataTypePtr nullable_data_type(
+                std::make_shared<vectorized::DataTypeNullable>(data_type));
+        auto nullable_column = nullable_data_type->create_column();
+        ((vectorized::ColumnNullable*)nullable_column.get())
+                ->insert_range_from_not_nullable(*vec, 0, 1024);
+        check_pb_col(nullable_data_type, *nullable_column.get());
+    }
+}
+
+TEST(DataTypeSerDePbTest, DataTypeScalaSerDeTest) {
+    serialize_and_deserialize_pb_test();
+}
+
+} // namespace doris::vectorized
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org