This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
     new 5641c2dcf7e [fix](Outfile) Fix the data type mapping for complex types in Doris to the ORC and Parquet file formats. (#44041)
5641c2dcf7e is described below

commit 5641c2dcf7e08bda64b5118fca7a1905af89f56b
Author: Tiewei Fang <fangtie...@selectdb.com>
AuthorDate: Thu Nov 28 23:27:10 2024 +0800

    [fix](Outfile) Fix the data type mapping for complex types in Doris to the ORC and Parquet file formats. (#44041)

### What problem does this PR solve?

Problem Summary:

Previously, the behavior when exporting complex data types from Doris was as follows:

|  | orc type | parquet type | csv |
| -- | -- | -- | -- |
| bitmap | string | Not Supported | Not Supported |
| quantile_state | Not Supported | Not Supported | Not Supported |
| hll | string | string | invisible string |
| jsonb | Not Supported | string | string |
| variant | Not Supported | string | string |

In addition, there were some issues when exporting complex data types to the ORC file format.

This PR does two things:
1. Fixes the problems with exporting complex data types from Doris.
2. Supports exporting these complex types to both the ORC and the Parquet file formats:

|  | orc type | parquet type | csv |
| -- | -- | -- | -- |
| bitmap | binary | binary | "NULL" |
| quantile_state | binary | binary | "NULL" |
| hll | binary | binary | "NULL" |
| jsonb | string | string | string |
| variant | string | string | string |

### Release note

[fix](Outfile) Fix the data type mapping for complex types in Doris to the ORC and Parquet file formats.

---
 be/src/util/arrow/row_batch.cpp | 9 +-
 .../data_types/serde/data_type_bitmap_serde.cpp | 59 +++++++++++-
 .../vec/data_types/serde/data_type_bitmap_serde.h | 13 +--
 .../data_types/serde/data_type_date64_serde.cpp | 20 +---
 .../vec/data_types/serde/data_type_hll_serde.cpp | 48 +++++-----
 .../vec/data_types/serde/data_type_ipv6_serde.cpp | 37 +++----
 .../vec/data_types/serde/data_type_jsonb_serde.cpp | 25 ++++-
 .../data_types/serde/data_type_number_serde.cpp | 36 ++-----
 .../data_types/serde/data_type_object_serde.cpp | 33 +++++++
 .../vec/data_types/serde/data_type_object_serde.h | 4 +-
 .../serde/data_type_quantilestate_serde.h | 56 ++++++++++-
 be/src/vec/data_types/serde/data_type_serde.h | 12 +++
 .../org/apache/doris/analysis/OutFileClause.java | 21 ++--
 .../outfile/test_outfile_complex_type.out | 25 +++++
 .../outfile/test_outfile_jsonb_and_variant.out | 25 +++++
 .../outfile/test_outfile_complex_type.groovy | 106 +++++++++++++++++++++
 .../outfile/test_outfile_jsonb_and_variant.groovy | 104 ++++++++++++++++++++
 17 files changed, 501 insertions(+), 132 deletions(-)

diff --git a/be/src/util/arrow/row_batch.cpp b/be/src/util/arrow/row_batch.cpp index dd11d5ae46f..a0cd77aee41 100644 --- a/be/src/util/arrow/row_batch.cpp +++ b/be/src/util/arrow/row_batch.cpp @@ -24,6 +24,7 @@ #include <arrow/result.h> #include <arrow/status.h> #include <arrow/type.h> +#include <arrow/type_fwd.h> #include <glog/logging.h> #include <stdint.h> @@ -84,12 +85,10 @@ Status convert_to_arrow_type(const TypeDescriptor& type, std::shared_ptr<arrow:: case TYPE_LARGEINT: case TYPE_VARCHAR: case TYPE_CHAR: - case TYPE_HLL: case TYPE_DATE: case TYPE_DATETIME: case TYPE_STRING: case TYPE_JSONB: - case TYPE_OBJECT: *result = arrow::utf8(); break; case TYPE_DATEV2: @@ -150,6 +149,12 @@ Status convert_to_arrow_type(const TypeDescriptor& type, std::shared_ptr<arrow:: *result = arrow::utf8(); break; } + case TYPE_QUANTILE_STATE: + case TYPE_OBJECT: + case TYPE_HLL: { + *result = arrow::binary(); + break; + } default: return
Status::InvalidArgument("Unknown primitive type({}) convert to Arrow type", type.type); diff --git a/be/src/vec/data_types/serde/data_type_bitmap_serde.cpp b/be/src/vec/data_types/serde/data_type_bitmap_serde.cpp index a4a2367aded..5d024a41834 100644 --- a/be/src/vec/data_types/serde/data_type_bitmap_serde.cpp +++ b/be/src/vec/data_types/serde/data_type_bitmap_serde.cpp @@ -17,6 +17,7 @@ #include "data_type_bitmap_serde.h" +#include <arrow/array/builder_binary.h> #include <gen_cpp/types.pb.h> #include <string> @@ -27,6 +28,7 @@ #include "vec/columns/column_const.h" #include "vec/common/arena.h" #include "vec/common/assert_cast.h" +#include "vec/data_types/serde/data_type_nullable_serde.h" namespace doris { @@ -34,6 +36,29 @@ namespace vectorized { class IColumn; #include "common/compile_check_begin.h" +Status DataTypeBitMapSerDe::serialize_column_to_json(const IColumn& column, int64_t start_idx, + int64_t end_idx, BufferWritable& bw, + FormatOptions& options) const { + SERIALIZE_COLUMN_TO_JSON(); +} + +Status DataTypeBitMapSerDe::serialize_one_cell_to_json(const IColumn& column, int64_t row_num, + BufferWritable& bw, + FormatOptions& options) const { + /** + * For null values in ordinary types, we use \N to represent them; + * for null values in nested types, we use null to represent them, just like the json format. + */ + if (_nesting_level >= 2) { + bw.write(DataTypeNullableSerDe::NULL_IN_COMPLEX_TYPE.c_str(), + strlen(NULL_IN_COMPLEX_TYPE.c_str())); + } else { + bw.write(DataTypeNullableSerDe::NULL_IN_CSV_FOR_ORDINARY_TYPE.c_str(), + strlen(NULL_IN_CSV_FOR_ORDINARY_TYPE.c_str())); + } + return Status::OK(); +} + Status DataTypeBitMapSerDe::deserialize_column_from_json_vector( IColumn& column, std::vector<Slice>& slices, int* num_deserialized, const FormatOptions& options) const { @@ -96,6 +121,26 @@ void DataTypeBitMapSerDe::write_one_cell_to_jsonb(const IColumn& column, JsonbWr result.writeEndBinary(); } +void DataTypeBitMapSerDe::write_column_to_arrow(const IColumn& column, const NullMap* null_map, + arrow::ArrayBuilder* array_builder, int64_t start, + int64_t end, const cctz::time_zone& ctz) const { + const auto& col = assert_cast<const ColumnBitmap&>(column); + auto& builder = assert_cast<arrow::BinaryBuilder&>(*array_builder); + for (size_t string_i = start; string_i < end; ++string_i) { + if (null_map && (*null_map)[string_i]) { + checkArrowStatus(builder.AppendNull(), column.get_name(), + array_builder->type()->name()); + } else { + auto& bitmap_value = const_cast<BitmapValue&>(col.get_element(string_i)); + std::string memory_buffer(bitmap_value.getSizeInBytes(), '0'); + bitmap_value.write_to(memory_buffer.data()); + checkArrowStatus( + builder.Append(memory_buffer.data(), static_cast<int>(memory_buffer.size())), + column.get_name(), array_builder->type()->name()); + } + } +} + void DataTypeBitMapSerDe::read_one_cell_from_jsonb(IColumn& column, const JsonbValue* arg) const { auto& col = reinterpret_cast<ColumnBitmap&>(column); auto blob = static_cast<const JsonbBlobVal*>(arg); @@ -148,11 +193,19 @@ Status DataTypeBitMapSerDe::write_column_to_orc(const std::string& timezone, con auto& col_data = assert_cast<const ColumnBitmap&>(column); orc::StringVectorBatch* cur_batch = dynamic_cast<orc::StringVectorBatch*>(orc_col_batch); + INIT_MEMORY_FOR_ORC_WRITER() + for (size_t row_id = start; row_id < end; row_id++) { if (cur_batch->notNull[row_id] == 1) { - const auto& ele = col_data.get_data_at(row_id); - cur_batch->data[row_id] = const_cast<char*>(ele.data); - cur_batch->length[row_id] 
= ele.size; + auto bitmap_value = const_cast<BitmapValue&>(col_data.get_element(row_id)); + size_t len = bitmap_value.getSizeInBytes(); + + REALLOC_MEMORY_FOR_ORC_WRITER() + + bitmap_value.write_to(const_cast<char*>(bufferRef.data) + offset); + cur_batch->data[row_id] = const_cast<char*>(bufferRef.data) + offset; + cur_batch->length[row_id] = len; + offset += len; } } diff --git a/be/src/vec/data_types/serde/data_type_bitmap_serde.h b/be/src/vec/data_types/serde/data_type_bitmap_serde.h index a4be5b8ec20..24c2e6f930d 100644 --- a/be/src/vec/data_types/serde/data_type_bitmap_serde.h +++ b/be/src/vec/data_types/serde/data_type_bitmap_serde.h @@ -36,14 +36,10 @@ public: DataTypeBitMapSerDe(int nesting_level = 1) : DataTypeSerDe(nesting_level) {}; Status serialize_one_cell_to_json(const IColumn& column, int64_t row_num, BufferWritable& bw, - FormatOptions& options) const override { - return Status::NotSupported("serialize_one_cell_to_json with type [{}]", column.get_name()); - } + FormatOptions& options) const override; Status serialize_column_to_json(const IColumn& column, int64_t start_idx, int64_t end_idx, - BufferWritable& bw, FormatOptions& options) const override { - return Status::NotSupported("serialize_column_to_json with type [{}]", column.get_name()); - } + BufferWritable& bw, FormatOptions& options) const override; Status deserialize_one_cell_from_json(IColumn& column, Slice& slice, const FormatOptions& options) const override; @@ -63,10 +59,7 @@ public: void write_column_to_arrow(const IColumn& column, const NullMap* null_map, arrow::ArrayBuilder* array_builder, int64_t start, int64_t end, - const cctz::time_zone& ctz) const override { - throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR, - "write_column_to_arrow with type " + column.get_name()); - } + const cctz::time_zone& ctz) const override; void read_column_from_arrow(IColumn& column, const arrow::Array* arrow_array, int start, int end, const cctz::time_zone& ctz) const override { diff --git a/be/src/vec/data_types/serde/data_type_date64_serde.cpp b/be/src/vec/data_types/serde/data_type_date64_serde.cpp index 8e102ec6e3a..c91db85be5b 100644 --- a/be/src/vec/data_types/serde/data_type_date64_serde.cpp +++ b/be/src/vec/data_types/serde/data_type_date64_serde.cpp @@ -289,16 +289,7 @@ Status DataTypeDate64SerDe::write_column_to_orc(const std::string& timezone, con auto& col_data = static_cast<const ColumnVector<Int64>&>(column).get_data(); orc::StringVectorBatch* cur_batch = dynamic_cast<orc::StringVectorBatch*>(orc_col_batch); - char* ptr = (char*)malloc(BUFFER_UNIT_SIZE); - if (!ptr) { - return Status::InternalError( - "malloc memory error when write largeint column data to orc file."); - } - StringRef bufferRef; - bufferRef.data = ptr; - bufferRef.size = BUFFER_UNIT_SIZE; - size_t offset = 0; - const size_t begin_off = offset; + INIT_MEMORY_FOR_ORC_WRITER() for (size_t row_id = start; row_id < end; row_id++) { if (cur_batch->notNull[row_id] == 0) { @@ -310,18 +301,11 @@ Status DataTypeDate64SerDe::write_column_to_orc(const std::string& timezone, con REALLOC_MEMORY_FOR_ORC_WRITER() + cur_batch->data[row_id] = const_cast<char*>(bufferRef.data) + offset; cur_batch->length[row_id] = len; offset += len; } - size_t data_off = 0; - for (size_t row_id = start; row_id < end; row_id++) { - if (cur_batch->notNull[row_id] == 1) { - cur_batch->data[row_id] = const_cast<char*>(bufferRef.data) + begin_off + data_off; - data_off += cur_batch->length[row_id]; - } - } - buffer_list.emplace_back(bufferRef); cur_batch->numElements = end - 
start; return Status::OK(); } diff --git a/be/src/vec/data_types/serde/data_type_hll_serde.cpp b/be/src/vec/data_types/serde/data_type_hll_serde.cpp index c22bb31862e..42260b09260 100644 --- a/be/src/vec/data_types/serde/data_type_hll_serde.cpp +++ b/be/src/vec/data_types/serde/data_type_hll_serde.cpp @@ -21,6 +21,7 @@ #include <stddef.h> #include <stdint.h> +#include <memory> #include <string> #include "arrow/array/builder_binary.h" @@ -48,28 +49,17 @@ Status DataTypeHLLSerDe::serialize_column_to_json(const IColumn& column, int64_t Status DataTypeHLLSerDe::serialize_one_cell_to_json(const IColumn& column, int64_t row_num, BufferWritable& bw, FormatOptions& options) const { - if (!options._output_object_data) { - /** - * For null values in ordinary types, we use \N to represent them; - * for null values in nested types, we use null to represent them, just like the json format. - */ - if (_nesting_level >= 2) { - bw.write(DataTypeNullableSerDe::NULL_IN_COMPLEX_TYPE.c_str(), - strlen(NULL_IN_COMPLEX_TYPE.c_str())); - } else { - bw.write(DataTypeNullableSerDe::NULL_IN_CSV_FOR_ORDINARY_TYPE.c_str(), - strlen(NULL_IN_CSV_FOR_ORDINARY_TYPE.c_str())); - } - return Status::OK(); + /** + * For null values in ordinary types, we use \N to represent them; + * for null values in nested types, we use null to represent them, just like the json format. + */ + if (_nesting_level >= 2) { + bw.write(DataTypeNullableSerDe::NULL_IN_COMPLEX_TYPE.c_str(), + strlen(NULL_IN_COMPLEX_TYPE.c_str())); + } else { + bw.write(DataTypeNullableSerDe::NULL_IN_CSV_FOR_ORDINARY_TYPE.c_str(), + strlen(NULL_IN_CSV_FOR_ORDINARY_TYPE.c_str())); } - auto col_row = check_column_const_set_readability(column, row_num); - ColumnPtr ptr = col_row.first; - row_num = col_row.second; - auto& data = const_cast<HyperLogLog&>(assert_cast<const ColumnHLL&>(*ptr).get_element(row_num)); - std::unique_ptr<char[]> buf = - std::make_unique_for_overwrite<char[]>(data.max_serialized_size()); - size_t size = data.serialize((uint8*)buf.get()); - bw.write(buf.get(), size); return Status::OK(); } @@ -139,7 +129,7 @@ void DataTypeHLLSerDe::write_column_to_arrow(const IColumn& column, const NullMa arrow::ArrayBuilder* array_builder, int64_t start, int64_t end, const cctz::time_zone& ctz) const { const auto& col = assert_cast<const ColumnHLL&>(column); - auto& builder = assert_cast<arrow::StringBuilder&>(*array_builder); + auto& builder = assert_cast<arrow::BinaryBuilder&>(*array_builder); for (size_t string_i = start; string_i < end; ++string_i) { if (null_map && (*null_map)[string_i]) { checkArrowStatus(builder.AppendNull(), column.get_name(), @@ -198,11 +188,19 @@ Status DataTypeHLLSerDe::write_column_to_orc(const std::string& timezone, const auto& col_data = assert_cast<const ColumnHLL&>(column); orc::StringVectorBatch* cur_batch = dynamic_cast<orc::StringVectorBatch*>(orc_col_batch); + INIT_MEMORY_FOR_ORC_WRITER() + for (size_t row_id = start; row_id < end; row_id++) { if (cur_batch->notNull[row_id] == 1) { - const auto& ele = col_data.get_data_at(row_id); - cur_batch->data[row_id] = const_cast<char*>(ele.data); - cur_batch->length[row_id] = ele.size; + auto hll_value = const_cast<HyperLogLog&>(col_data.get_element(row_id)); + size_t len = hll_value.max_serialized_size(); + + REALLOC_MEMORY_FOR_ORC_WRITER() + + hll_value.serialize((uint8_t*)(bufferRef.data) + offset); + cur_batch->data[row_id] = const_cast<char*>(bufferRef.data) + offset; + cur_batch->length[row_id] = len; + offset += len; } } diff --git 
a/be/src/vec/data_types/serde/data_type_ipv6_serde.cpp b/be/src/vec/data_types/serde/data_type_ipv6_serde.cpp index ac4dbc03043..e899de93c90 100644 --- a/be/src/vec/data_types/serde/data_type_ipv6_serde.cpp +++ b/be/src/vec/data_types/serde/data_type_ipv6_serde.cpp @@ -187,38 +187,23 @@ Status DataTypeIPv6SerDe::write_column_to_orc(const std::string& timezone, const std::vector<StringRef>& buffer_list) const { const auto& col_data = assert_cast<const ColumnIPv6&>(column).get_data(); orc::StringVectorBatch* cur_batch = assert_cast<orc::StringVectorBatch*>(orc_col_batch); - char* ptr = (char*)malloc(BUFFER_UNIT_SIZE); - if (!ptr) { - return Status::InternalError( - "malloc memory error when write largeint column data to orc file."); - } - StringRef bufferRef; - bufferRef.data = ptr; - bufferRef.size = BUFFER_UNIT_SIZE; - size_t offset = 0; - const size_t begin_off = offset; - - for (size_t row_id = start; row_id < end; row_id++) { - if (cur_batch->notNull[row_id] == 0) { - continue; - } - std::string ipv6_str = IPv6Value::to_string(col_data[row_id]); - size_t len = ipv6_str.size(); - REALLOC_MEMORY_FOR_ORC_WRITER() + INIT_MEMORY_FOR_ORC_WRITER() - strcpy(const_cast<char*>(bufferRef.data) + offset, ipv6_str.c_str()); - offset += len; - cur_batch->length[row_id] = len; - } - size_t data_off = 0; for (size_t row_id = start; row_id < end; row_id++) { if (cur_batch->notNull[row_id] == 1) { - cur_batch->data[row_id] = const_cast<char*>(bufferRef.data) + begin_off + data_off; - data_off += cur_batch->length[row_id]; + std::string ipv6_str = IPv6Value::to_string(col_data[row_id]); + size_t len = ipv6_str.size(); + + REALLOC_MEMORY_FOR_ORC_WRITER() + + strcpy(const_cast<char*>(bufferRef.data) + offset, ipv6_str.c_str()); + cur_batch->data[row_id] = const_cast<char*>(bufferRef.data) + offset; + cur_batch->length[row_id] = len; + offset += len; } } - buffer_list.emplace_back(bufferRef); + cur_batch->numElements = end - start; return Status::OK(); } diff --git a/be/src/vec/data_types/serde/data_type_jsonb_serde.cpp b/be/src/vec/data_types/serde/data_type_jsonb_serde.cpp index 10218e4164d..adc041f5111 100644 --- a/be/src/vec/data_types/serde/data_type_jsonb_serde.cpp +++ b/be/src/vec/data_types/serde/data_type_jsonb_serde.cpp @@ -23,6 +23,7 @@ #include <cstddef> #include <cstdint> +#include <memory> #include "arrow/array/builder_binary.h" #include "common/exception.h" @@ -142,7 +143,29 @@ Status DataTypeJsonbSerDe::write_column_to_orc(const std::string& timezone, cons orc::ColumnVectorBatch* orc_col_batch, int64_t start, int64_t end, std::vector<StringRef>& buffer_list) const { - return Status::NotSupported("write_column_to_orc with type [{}]", column.get_name()); + auto* cur_batch = dynamic_cast<orc::StringVectorBatch*>(orc_col_batch); + const auto& string_column = assert_cast<const ColumnString&>(column); + + INIT_MEMORY_FOR_ORC_WRITER() + + for (size_t row_id = start; row_id < end; row_id++) { + if (cur_batch->notNull[row_id] == 1) { + std::string_view string_ref = string_column.get_data_at(row_id).to_string_view(); + auto serialized_value = std::make_unique<std::string>( + JsonbToJson::jsonb_to_json_string(string_ref.data(), string_ref.size())); + auto len = serialized_value->size(); + + REALLOC_MEMORY_FOR_ORC_WRITER() + + memcpy(const_cast<char*>(bufferRef.data) + offset, serialized_value->data(), len); + cur_batch->data[row_id] = const_cast<char*>(bufferRef.data) + offset; + cur_batch->length[row_id] = len; + offset += len; + } + } + + cur_batch->numElements = end - start; + return Status::OK(); } 
void convert_jsonb_to_rapidjson(const JsonbValue& val, rapidjson::Value& target, diff --git a/be/src/vec/data_types/serde/data_type_number_serde.cpp b/be/src/vec/data_types/serde/data_type_number_serde.cpp index 9416fc9a8b3..55c7b2c9505 100644 --- a/be/src/vec/data_types/serde/data_type_number_serde.cpp +++ b/be/src/vec/data_types/serde/data_type_number_serde.cpp @@ -342,38 +342,22 @@ Status DataTypeNumberSerDe<T>::write_column_to_orc(const std::string& timezone, if constexpr (std::is_same_v<T, Int128>) { // largeint orc::StringVectorBatch* cur_batch = dynamic_cast<orc::StringVectorBatch*>(orc_col_batch); - char* ptr = (char*)malloc(BUFFER_UNIT_SIZE); - if (!ptr) { - return Status::InternalError( - "malloc memory error when write largeint column data to orc file."); - } - StringRef bufferRef; - bufferRef.data = ptr; - bufferRef.size = BUFFER_UNIT_SIZE; - size_t offset = 0; - const size_t begin_off = offset; + INIT_MEMORY_FOR_ORC_WRITER() for (size_t row_id = start; row_id < end; row_id++) { - if (cur_batch->notNull[row_id] == 0) { - continue; - } - std::string value_str = fmt::format("{}", col_data[row_id]); - size_t len = value_str.size(); + if (cur_batch->notNull[row_id] == 1) { + std::string value_str = fmt::format("{}", col_data[row_id]); + size_t len = value_str.size(); - REALLOC_MEMORY_FOR_ORC_WRITER() + REALLOC_MEMORY_FOR_ORC_WRITER() - strcpy(const_cast<char*>(bufferRef.data) + offset, value_str.c_str()); - offset += len; - cur_batch->length[row_id] = len; - } - size_t data_off = 0; - for (size_t row_id = start; row_id < end; row_id++) { - if (cur_batch->notNull[row_id] == 1) { - cur_batch->data[row_id] = const_cast<char*>(bufferRef.data) + begin_off + data_off; - data_off += cur_batch->length[row_id]; + strcpy(const_cast<char*>(bufferRef.data) + offset, value_str.c_str()); + cur_batch->data[row_id] = const_cast<char*>(bufferRef.data) + offset; + cur_batch->length[row_id] = len; + offset += len; } } - buffer_list.emplace_back(bufferRef); + cur_batch->numElements = end - start; } else if constexpr (std::is_same_v<T, Int8> || std::is_same_v<T, UInt8>) { // tinyint/boolean WRITE_INTEGRAL_COLUMN_TO_ORC(orc::ByteVectorBatch) diff --git a/be/src/vec/data_types/serde/data_type_object_serde.cpp b/be/src/vec/data_types/serde/data_type_object_serde.cpp index f6719437285..fc536d9ef0d 100644 --- a/be/src/vec/data_types/serde/data_type_object_serde.cpp +++ b/be/src/vec/data_types/serde/data_type_object_serde.cpp @@ -20,6 +20,7 @@ #include <rapidjson/stringbuffer.h> #include <cstdint> +#include <string> #include "common/exception.h" #include "common/status.h" @@ -164,6 +165,38 @@ void DataTypeObjectSerDe::write_column_to_arrow(const IColumn& column, const Nul } } +Status DataTypeObjectSerDe::write_column_to_orc(const std::string& timezone, const IColumn& column, + const NullMap* null_map, + orc::ColumnVectorBatch* orc_col_batch, + int64_t start, int64_t end, + std::vector<StringRef>& buffer_list) const { + const auto* var = check_and_get_column<ColumnObject>(column); + orc::StringVectorBatch* cur_batch = dynamic_cast<orc::StringVectorBatch*>(orc_col_batch); + + INIT_MEMORY_FOR_ORC_WRITER() + + for (size_t row_id = start; row_id < end; row_id++) { + if (cur_batch->notNull[row_id] == 1) { + auto serialized_value = std::make_unique<std::string>(); + if (!var->serialize_one_row_to_string(row_id, serialized_value.get())) { + throw doris::Exception(ErrorCode::INTERNAL_ERROR, "Failed to serialize variant {}", + var->dump_structure()); + } + auto len = serialized_value->length(); + + 
REALLOC_MEMORY_FOR_ORC_WRITER() + + memcpy(const_cast<char*>(bufferRef.data) + offset, serialized_value->data(), len); + cur_batch->data[row_id] = const_cast<char*>(bufferRef.data) + offset; + cur_batch->length[row_id] = len; + offset += len; + } + } + + cur_batch->numElements = end - start; + return Status::OK(); +} + } // namespace vectorized } // namespace doris diff --git a/be/src/vec/data_types/serde/data_type_object_serde.h b/be/src/vec/data_types/serde/data_type_object_serde.h index 414755ef0f8..c08d4d0af0d 100644 --- a/be/src/vec/data_types/serde/data_type_object_serde.h +++ b/be/src/vec/data_types/serde/data_type_object_serde.h @@ -89,9 +89,7 @@ public: Status write_column_to_orc(const std::string& timezone, const IColumn& column, const NullMap* null_map, orc::ColumnVectorBatch* orc_col_batch, int64_t start, int64_t end, - std::vector<StringRef>& buffer_list) const override { - return Status::NotSupported("write_column_to_orc with type " + column.get_name()); - } + std::vector<StringRef>& buffer_list) const override; private: template <bool is_binary_format> diff --git a/be/src/vec/data_types/serde/data_type_quantilestate_serde.h b/be/src/vec/data_types/serde/data_type_quantilestate_serde.h index 9608bf1cb78..d3526ba3899 100644 --- a/be/src/vec/data_types/serde/data_type_quantilestate_serde.h +++ b/be/src/vec/data_types/serde/data_type_quantilestate_serde.h @@ -17,6 +17,7 @@ #pragma once +#include <arrow/array/builder_binary.h> #include <gen_cpp/types.pb.h> #include <stddef.h> #include <stdint.h> @@ -32,6 +33,7 @@ #include "vec/columns/column_const.h" #include "vec/common/arena.h" #include "vec/common/string_ref.h" +#include "vec/data_types/serde/data_type_nullable_serde.h" namespace doris { @@ -43,12 +45,23 @@ public: Status serialize_one_cell_to_json(const IColumn& column, int64_t row_num, BufferWritable& bw, FormatOptions& options) const override { - return Status::NotSupported("serialize_one_cell_to_json with type [{}]", column.get_name()); + /** + * For null values in ordinary types, we use \N to represent them; + * for null values in nested types, we use null to represent them, just like the json format. 
+ */ + if (_nesting_level >= 2) { + bw.write(DataTypeNullableSerDe::NULL_IN_COMPLEX_TYPE.c_str(), + strlen(NULL_IN_COMPLEX_TYPE.c_str())); + } else { + bw.write(DataTypeNullableSerDe::NULL_IN_CSV_FOR_ORDINARY_TYPE.c_str(), + strlen(NULL_IN_CSV_FOR_ORDINARY_TYPE.c_str())); + } + return Status::OK(); } Status serialize_column_to_json(const IColumn& column, int64_t start_idx, int64_t end_idx, BufferWritable& bw, FormatOptions& options) const override { - return Status::NotSupported("serialize_column_to_json with type [{}]", column.get_name()); + SERIALIZE_COLUMN_TO_JSON(); } Status deserialize_one_cell_from_json(IColumn& column, Slice& slice, const FormatOptions& options) const override { @@ -102,8 +115,21 @@ public: void write_column_to_arrow(const IColumn& column, const NullMap* null_map, arrow::ArrayBuilder* array_builder, int64_t start, int64_t end, const cctz::time_zone& ctz) const override { - throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR, - "write_column_to_arrow with type " + column.get_name()); + const auto& col = assert_cast<const ColumnQuantileState&>(column); + auto& builder = assert_cast<arrow::BinaryBuilder&>(*array_builder); + for (size_t string_i = start; string_i < end; ++string_i) { + if (null_map && (*null_map)[string_i]) { + checkArrowStatus(builder.AppendNull(), column.get_name(), + array_builder->type()->name()); + } else { + auto& quantile_state_value = const_cast<QuantileState&>(col.get_element(string_i)); + std::string memory_buffer(quantile_state_value.get_serialized_size(), '0'); + quantile_state_value.serialize((uint8_t*)memory_buffer.data()); + checkArrowStatus(builder.Append(memory_buffer.data(), + static_cast<int>(memory_buffer.size())), + column.get_name(), array_builder->type()->name()); + } + } } void read_column_from_arrow(IColumn& column, const arrow::Array* arrow_array, int start, int end, const cctz::time_zone& ctz) const override { @@ -126,7 +152,27 @@ public: const NullMap* null_map, orc::ColumnVectorBatch* orc_col_batch, int64_t start, int64_t end, std::vector<StringRef>& buffer_list) const override { - return Status::NotSupported("write_column_to_orc with type [{}]", column.get_name()); + auto& col_data = assert_cast<const ColumnQuantileState&>(column); + orc::StringVectorBatch* cur_batch = dynamic_cast<orc::StringVectorBatch*>(orc_col_batch); + + INIT_MEMORY_FOR_ORC_WRITER() + + for (size_t row_id = start; row_id < end; row_id++) { + if (cur_batch->notNull[row_id] == 1) { + auto quantilestate_value = const_cast<QuantileState&>(col_data.get_element(row_id)); + size_t len = quantilestate_value.get_serialized_size(); + + REALLOC_MEMORY_FOR_ORC_WRITER() + + quantilestate_value.serialize((uint8_t*)(bufferRef.data) + offset); + cur_batch->data[row_id] = const_cast<char*>(bufferRef.data) + offset; + cur_batch->length[row_id] = len; + offset += len; + } + } + + cur_batch->numElements = end - start; + return Status::OK(); } private: diff --git a/be/src/vec/data_types/serde/data_type_serde.h b/be/src/vec/data_types/serde/data_type_serde.h index bec0fabfcca..1a089bb73fe 100644 --- a/be/src/vec/data_types/serde/data_type_serde.h +++ b/be/src/vec/data_types/serde/data_type_serde.h @@ -77,6 +77,18 @@ struct ColumnVectorBatch; ++*num_deserialized; \ } +#define INIT_MEMORY_FOR_ORC_WRITER() \ + char* ptr = (char*)malloc(BUFFER_UNIT_SIZE); \ + if (!ptr) { \ + return Status::InternalError( \ + "malloc memory error when writing column data to orc file."); \ + } \ + StringRef bufferRef; \ + bufferRef.data = ptr; \ + bufferRef.size = BUFFER_UNIT_SIZE; \ +
size_t offset = 0; \ + buffer_list.emplace_back(bufferRef); + #define REALLOC_MEMORY_FOR_ORC_WRITER() \ while (bufferRef.size - BUFFER_RESERVED_SIZE < offset + len) { \ char* new_ptr = (char*)malloc(bufferRef.size + BUFFER_UNIT_SIZE); \ diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java index ef65b405853..026e4da29b5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java @@ -36,7 +36,6 @@ import org.apache.doris.common.util.PrintableMap; import org.apache.doris.common.util.Util; import org.apache.doris.datasource.property.PropertyConverter; import org.apache.doris.datasource.property.constants.S3Properties; -import org.apache.doris.qe.ConnectContext; import org.apache.doris.thrift.TFileCompressType; import org.apache.doris.thrift.TFileFormatType; import org.apache.doris.thrift.TParquetCompressionType; @@ -302,11 +301,8 @@ public class OutFileClause { break; case HLL: case BITMAP: - if (!(ConnectContext.get() != null && ConnectContext.get() - .getSessionVariable().isReturnObjectDataAsBinary())) { - break; - } - orcType = "string"; + case QUANTILE_STATE: + orcType = "binary"; break; case DATEV2: orcType = "date"; @@ -327,6 +323,8 @@ public class OutFileClause { case DATE: case DATETIME: case IPV6: + case VARIANT: + case JSONB: orcType = "string"; break; case DECIMALV2: @@ -445,6 +443,8 @@ public class OutFileClause { case DATE: case DATETIME: case IPV6: + case VARIANT: + case JSONB: checkOrcType(schema.second, "string", true, resultType.getPrimitiveType().toString()); break; case DECIMAL32: @@ -455,13 +455,8 @@ public class OutFileClause { break; case HLL: case BITMAP: - if (ConnectContext.get() != null && ConnectContext.get() - .getSessionVariable().isReturnObjectDataAsBinary()) { - checkOrcType(schema.second, "string", true, resultType.getPrimitiveType().toString()); - } else { - throw new AnalysisException("Orc format does not support column type: " - + resultType.getPrimitiveType()); - } + case QUANTILE_STATE: + checkOrcType(schema.second, "binary", true, resultType.getPrimitiveType().toString()); break; case STRUCT: checkOrcType(schema.second, "struct", false, resultType.getPrimitiveType().toString()); diff --git a/regression-test/data/export_p0/outfile/test_outfile_complex_type.out b/regression-test/data/export_p0/outfile/test_outfile_complex_type.out new file mode 100644 index 00000000000..cd6f000b6c5 --- /dev/null +++ b/regression-test/data/export_p0/outfile/test_outfile_complex_type.out @@ -0,0 +1,25 @@ +-- This file is automatically generated. 
You should know what you did if you want to edit this +-- !select_load_parquet -- +20220201 0 0000004501000000000000F03F 0101675D86AC33FA8CD6 +20220201 1 0000004501000000000000F0BF 01010B3C52B765A11A2F +20220201 2 00000045010000000000000000 0101DDEA60F9C89AA329 +20220201 3 0000004501000000000000F03F 0101EF81F59130F8B748 +20220201 4 00000045010000000000000040 010114CAA737BD54146E +20220201 5 00000045010000000000000840 0101DCBC5BA258F9602C + +-- !select_load_orc -- +20220201 0 0000004501000000000000F03F 0101675D86AC33FA8CD6 +20220201 1 0000004501000000000000F0BF 01010B3C52B765A11A2F +20220201 2 00000045010000000000000000 0101DDEA60F9C89AA329 +20220201 3 0000004501000000000000F03F 0101EF81F59130F8B748 +20220201 4 00000045010000000000000040 010114CAA737BD54146E +20220201 5 00000045010000000000000840 0101DCBC5BA258F9602C + +-- !select_load_csv -- +20220201 0 \N \N \N +20220201 1 \N \N \N +20220201 2 \N \N \N +20220201 3 \N \N \N +20220201 4 \N \N \N +20220201 5 \N \N \N + diff --git a/regression-test/data/export_p0/outfile/test_outfile_jsonb_and_variant.out b/regression-test/data/export_p0/outfile/test_outfile_jsonb_and_variant.out new file mode 100644 index 00000000000..d2583093964 --- /dev/null +++ b/regression-test/data/export_p0/outfile/test_outfile_jsonb_and_variant.out @@ -0,0 +1,25 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !select_load_parquet -- +20220201 0 {"k1":"100"} {"k1":"100"} +20220201 1 {"k1":"100","k2":"123"} {"k1":"100","k2":"123"} +20220201 2 {"k1":"100","abc":"567"} {"abc":"567","k1":"100"} +20220201 3 {"k1":"100","k3":123} {"k1":"100","k3":123} +20220201 4 {"k1":"100","doris":"nereids"} {"doris":"nereids","k1":"100"} +20220201 5 {"k1":"100","doris":"pipeline"} {"doris":"pipeline","k1":"100"} + +-- !select_load_orc -- +20220201 0 {"k1":"100"} {"k1":"100"} +20220201 1 {"k1":"100","k2":"123"} {"k1":"100","k2":"123"} +20220201 2 {"k1":"100","abc":"567"} {"abc":"567","k1":"100"} +20220201 3 {"k1":"100","k3":123} {"k1":"100","k3":123} +20220201 4 {"k1":"100","doris":"nereids"} {"doris":"nereids","k1":"100"} +20220201 5 {"k1":"100","doris":"pipeline"} {"doris":"pipeline","k1":"100"} + +-- !select_load_csv -- +20220201 0 {"k1":"100"} {"k1":"100"} +20220201 1 {"k1":"100","k2":"123"} {"k1":"100","k2":"123"} +20220201 2 {"k1":"100","abc":"567"} {"abc":"567","k1":"100"} +20220201 3 {"k1":"100","k3":123} {"k1":"100","k3":123} +20220201 4 {"k1":"100","doris":"nereids"} {"doris":"nereids","k1":"100"} +20220201 5 {"k1":"100","doris":"pipeline"} {"doris":"pipeline","k1":"100"} + diff --git a/regression-test/suites/export_p0/outfile/test_outfile_complex_type.groovy b/regression-test/suites/export_p0/outfile/test_outfile_complex_type.groovy new file mode 100644 index 00000000000..49f81732791 --- /dev/null +++ b/regression-test/suites/export_p0/outfile/test_outfile_complex_type.groovy @@ -0,0 +1,106 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License.
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_outfile_complex_type", "p0") { + String ak = getS3AK() + String sk = getS3SK() + String s3_endpoint = getS3Endpoint() + String region = getS3Region() + String bucket = context.config.otherConfigs.get("s3BucketName"); + + def export_table_name = "test_outfile_complex_type_table" + def outFilePath = "${bucket}/outfile/complex_type/exp_" + + def outfile_to_S3 = { format -> + // select ... into outfile ... + def res = sql """ + SELECT * FROM ${export_table_name} t + INTO OUTFILE "s3://${outFilePath}" + FORMAT AS ${format} + PROPERTIES ( + "s3.endpoint" = "${s3_endpoint}", + "s3.region" = "${region}", + "s3.secret_key"="${sk}", + "s3.access_key" = "${ak}" + ); + """ + + return res[0][3] + } + + sql """ DROP TABLE IF EXISTS ${export_table_name} """ + sql """ + CREATE TABLE `${export_table_name}` ( + `dt` int(11) NULL COMMENT "", + `id` int(11) NULL COMMENT "", + `price` quantile_state QUANTILE_UNION NOT NULL COMMENT "", + `hll_t` hll hll_union, + `device_id` bitmap BITMAP_UNION + ) ENGINE=OLAP + AGGREGATE KEY(`dt`, `id`) + DISTRIBUTED BY HASH(`dt`) + PROPERTIES ( + "replication_num" = "1" + ); + """ + + sql """ + INSERT INTO `${export_table_name}` values + (20220201,0, to_quantile_state(1, 2048), hll_hash(1), to_bitmap(243)), + (20220201,1, to_quantile_state(-1, 2048), hll_hash(2), bitmap_from_array([1,2,3,4,5,434543])), + (20220201,2, to_quantile_state(0, 2048), hll_hash(3), to_bitmap(1234566)), + (20220201,3, to_quantile_state(1, 2048), hll_hash(4), to_bitmap(8888888888888)), + (20220201,4, to_quantile_state(2, 2048), hll_hash(5), to_bitmap(98392819412234)), + (20220201,5, to_quantile_state(3, 2048), hll_hash(6), to_bitmap(253234234)); + """ + + // parquet file format + def format = "parquet" + def outfile_url = outfile_to_S3("${format}") + qt_select_load_parquet """ SELECT dt, id, hex(price), hex(hll_t) FROM S3 ( + "uri" = "http://${bucket}.${s3_endpoint}${outfile_url.substring(5 + bucket.length(), outfile_url.length() - 1)}0.${format}", + "ACCESS_KEY"= "${ak}", + "SECRET_KEY" = "${sk}", + "format" = "${format}", + "region" = "${region}" + ); + """ + + // orc file format + format = "orc" + outfile_url = outfile_to_S3("${format}") + qt_select_load_orc """ SELECT dt, id, hex(price), hex(hll_t) FROM S3 ( + "uri" = "http://${bucket}.${s3_endpoint}${outfile_url.substring(5 + bucket.length(), outfile_url.length() - 1)}0.${format}", + "ACCESS_KEY"= "${ak}", + "SECRET_KEY" = "${sk}", + "format" = "${format}", + "region" = "${region}" + ); + """ + + // csv file format + format = "csv" + outfile_url = outfile_to_S3("${format}") + qt_select_load_csv """ SELECT * FROM S3 ( + "uri" = "http://${bucket}.${s3_endpoint}${outfile_url.substring(5 + bucket.length(), outfile_url.length() - 1)}0.${format}", + "ACCESS_KEY"= "${ak}", + "SECRET_KEY" = "${sk}", + "format" = "${format}", + "region" = "${region}" + ); + """ +} \ No newline at end of file diff --git a/regression-test/suites/export_p0/outfile/test_outfile_jsonb_and_variant.groovy b/regression-test/suites/export_p0/outfile/test_outfile_jsonb_and_variant.groovy new file mode 100644 index 00000000000..ed3019436ae --- /dev/null
+++ b/regression-test/suites/export_p0/outfile/test_outfile_jsonb_and_variant.groovy @@ -0,0 +1,104 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_outfile_jsonb_and_variant", "p0") { + String ak = getS3AK() + String sk = getS3SK() + String s3_endpoint = getS3Endpoint() + String region = getS3Region() + String bucket = context.config.otherConfigs.get("s3BucketName"); + + def export_table_name = "test_outfile_jsonb_and_variant_table" + def outFilePath = "${bucket}/outfile/jsonb_and_variant/exp_" + + def outfile_to_S3 = { format -> + // select ... into outfile ... + def res = sql """ + SELECT * FROM ${export_table_name} t + INTO OUTFILE "s3://${outFilePath}" + FORMAT AS ${format} + PROPERTIES ( + "s3.endpoint" = "${s3_endpoint}", + "s3.region" = "${region}", + "s3.secret_key"="${sk}", + "s3.access_key" = "${ak}" + ); + """ + + return res[0][3] + } + + sql """ DROP TABLE IF EXISTS ${export_table_name} """ + sql """ + CREATE TABLE `${export_table_name}` ( + `dt` int(11) NULL COMMENT "", + `id` int(11) NULL COMMENT "", + `json_col` JSON NULL COMMENT "", + `variant_col` variant NULL COMMENT "" + ) ENGINE=OLAP + DISTRIBUTED BY HASH(`dt`) + PROPERTIES ( + "replication_num" = "1" + ); + """ + + sql """ + INSERT INTO `${export_table_name}` values + (20220201,0, '{"k1": "100"}', '{"k1": "100"}'), + (20220201,1, '{"k1": "100", "k2": "123"}', '{"k1": "100", "k2": "123"}'), + (20220201,2, '{"k1": "100", "abc": "567"}', '{"k1": "100", "abc": "567"}'), + (20220201,3, '{"k1": "100", "k3": 123}', '{"k1": "100", "k3": 123}'), + (20220201,4, '{"k1": "100", "doris": "nereids"}', '{"k1": "100", "doris": "nereids"}'), + (20220201,5, '{"k1": "100", "doris": "pipeline"}', '{"k1": "100", "doris": "pipeline"}'); + """ + + // parquet file format + def format = "parquet" + def outfile_url = outfile_to_S3("${format}") + qt_select_load_parquet """ SELECT * FROM S3 ( + "uri" = "http://${bucket}.${s3_endpoint}${outfile_url.substring(5 + bucket.length(), outfile_url.length() - 1)}0.${format}", + "ACCESS_KEY"= "${ak}", + "SECRET_KEY" = "${sk}", + "format" = "${format}", + "region" = "${region}" + ); + """ + + // orc file format + format = "orc" + outfile_url = outfile_to_S3("${format}") + qt_select_load_orc """ SELECT * FROM S3 ( + "uri" = "http://${bucket}.${s3_endpoint}${outfile_url.substring(5 + bucket.length(), outfile_url.length() - 1)}0.${format}", + "ACCESS_KEY"= "${ak}", + "SECRET_KEY" = "${sk}", + "format" = "${format}", + "region" = "${region}" + ); + """ + + // csv file format + format = "csv" + outfile_url = outfile_to_S3("${format}") + qt_select_load_csv """ SELECT * FROM S3 ( + "uri" = "http://${bucket}.${s3_endpoint}${outfile_url.substring(5 + bucket.length(), outfile_url.length() - 1)}0.${format}", + "ACCESS_KEY"= "${ak}", + "SECRET_KEY" = 
"${sk}", + "format" = "${format}", + "region" = "${region}" + ); + """ +} \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org