This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 0e0f8090f7f [refactor](text_convert)Use serde to replace text_convert. (#25543) 0e0f8090f7f is described below commit 0e0f8090f7f3875c5a85ad74f260524e3bcfd4a1 Author: daidai <2017501...@qq.com> AuthorDate: Tue Oct 24 09:52:43 2023 +0800 [refactor](text_convert)Use serde to replace text_convert. (#25543) Remove text_convert and use serde to replace it. --- be/src/exec/text_converter.cpp | 426 --------------------- be/src/exec/text_converter.h | 87 ----- be/src/vec/exec/format/csv/csv_reader.cpp | 1 - be/src/vec/exec/format/csv/csv_reader.h | 1 - be/src/vec/exec/format/orc/vorc_reader.cpp | 20 +- be/src/vec/exec/format/orc/vorc_reader.h | 2 - .../exec/format/parquet/vparquet_group_reader.cpp | 20 +- .../exec/format/parquet/vparquet_group_reader.h | 2 - be/src/vec/exec/scan/new_odbc_scanner.cpp | 16 +- be/src/vec/exec/scan/new_odbc_scanner.h | 4 +- be/src/vec/exec/scan/vfile_scanner.cpp | 21 +- be/src/vec/exec/scan/vfile_scanner.h | 2 - be/src/vec/exec/vmysql_scan_node.cpp | 32 +- be/src/vec/exec/vmysql_scan_node.h | 9 +- 14 files changed, 81 insertions(+), 562 deletions(-) diff --git a/be/src/exec/text_converter.cpp b/be/src/exec/text_converter.cpp deleted file mode 100644 index 55455619324..00000000000 --- a/be/src/exec/text_converter.cpp +++ /dev/null @@ -1,426 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include "text_converter.h" - -#include <glog/logging.h> -#include <sql.h> -#include <stdint.h> - -#include <algorithm> -#include <ostream> - -// IWYU pragma: no_include <opentelemetry/common/threadlocal.h> -#include "common/compiler_util.h" // IWYU pragma: keep -#include "olap/hll.h" -#include "runtime/decimalv2_value.h" -#include "runtime/define_primitive_type.h" -#include "runtime/descriptors.h" -#include "runtime/types.h" -#include "util/slice.h" -#include "util/string_parser.hpp" -#include "vec/columns/column_array.h" -#include "vec/columns/column_complex.h" -#include "vec/columns/column_map.h" -#include "vec/columns/column_nullable.h" -#include "vec/columns/column_string.h" -#include "vec/columns/column_struct.h" -#include "vec/columns/column_vector.h" -#include "vec/core/types.h" -#include "vec/runtime/vdatetime_value.h" - -namespace doris { - -TextConverter::TextConverter(char escape_char, char collection_delimiter, char map_kv_delimiter) - : _escape_char(escape_char), - _collection_delimiter(collection_delimiter), - _map_kv_delimiter(map_kv_delimiter) {} - -void TextConverter::write_string_column(const SlotDescriptor* slot_desc, - vectorized::MutableColumnPtr* column_ptr, const char* data, - size_t len, bool need_escape) { - DCHECK(column_ptr->get()->is_nullable()); - auto* nullable_column = reinterpret_cast<vectorized::ColumnNullable*>(column_ptr->get()); - if (need_escape) { - unescape_string_on_spot(data, &len); - } - if ((len == 2 && data[0] == '\\' && data[1] == 'N') || len == SQL_NULL_DATA) { - nullable_column->get_null_map_data().push_back(1); - reinterpret_cast<vectorized::ColumnString&>(nullable_column->get_nested_column()) - .insert_default(); - } else { - nullable_column->get_null_map_data().push_back(0); - reinterpret_cast<vectorized::ColumnString&>(nullable_column->get_nested_column()) - .insert_data(data, len); - } -} - -bool TextConverter::_write_data(const TypeDescriptor& type_desc, - vectorized::IColumn* nullable_col_ptr, const char* data, size_t len, - bool copy_string, bool need_escape, size_t rows, - char array_delimiter) { - vectorized::IColumn* col_ptr = nullable_col_ptr; - // \N means it's NULL - std::string col_type_name = col_ptr->get_name(); - bool is_null_able = typeid(*nullable_col_ptr) == typeid(vectorized::ColumnNullable); - if (is_null_able) { - auto* nullable_column = reinterpret_cast<vectorized::ColumnNullable*>(nullable_col_ptr); - if ((len == 2 && data[0] == '\\' && data[1] == 'N') || len == SQL_NULL_DATA) { - nullable_column->insert_many_defaults(rows); - return true; - } else { - auto& null_map = nullable_column->get_null_map_data(); - null_map.resize_fill(null_map.size() + rows, 0); - col_ptr = &nullable_column->get_nested_column(); - } - } - - StringParser::ParseResult parse_result = StringParser::PARSE_SUCCESS; - size_t origin_size = col_ptr->size(); - // Parse the raw-text data. Translate the text string to internal format. - switch (type_desc.type) { - case TYPE_HLL: { - HyperLogLog hyper_log_log(Slice(data, len)); - auto& hyper_data = reinterpret_cast<vectorized::ColumnHLL*>(col_ptr)->get_data(); - for (size_t i = 0; i < rows; ++i) { - hyper_data.emplace_back(hyper_log_log); - } - break; - } - case TYPE_STRING: - case TYPE_VARCHAR: - case TYPE_CHAR: { - if (need_escape) { - unescape_string_on_spot(data, &len); - } - reinterpret_cast<vectorized::ColumnString*>(col_ptr)->insert_many_data(data, len, rows); - break; - } - - case TYPE_BOOLEAN: { - bool num = StringParser::string_to_bool(data, len, &parse_result); - if (parse_result != StringParser::PARSE_SUCCESS) { - break; - } - reinterpret_cast<vectorized::ColumnVector<vectorized::UInt8>*>(col_ptr) - ->get_data() - .resize_fill(origin_size + rows, (uint8_t)num); - break; - } - case TYPE_TINYINT: { - int8_t num = StringParser::string_to_int<int8_t>(data, len, &parse_result); - if (parse_result != StringParser::PARSE_SUCCESS) { - break; - } - reinterpret_cast<vectorized::ColumnVector<vectorized::Int8>*>(col_ptr) - ->get_data() - .resize_fill(origin_size + rows, num); - break; - } - case TYPE_SMALLINT: { - int16_t num = StringParser::string_to_int<int16_t>(data, len, &parse_result); - if (parse_result != StringParser::PARSE_SUCCESS) { - break; - } - reinterpret_cast<vectorized::ColumnVector<vectorized::Int16>*>(col_ptr) - ->get_data() - .resize_fill(origin_size + rows, num); - break; - } - case TYPE_INT: { - int32_t num = StringParser::string_to_int<int32_t>(data, len, &parse_result); - if (parse_result != StringParser::PARSE_SUCCESS) { - break; - } - reinterpret_cast<vectorized::ColumnVector<vectorized::Int32>*>(col_ptr) - ->get_data() - .resize_fill(origin_size + rows, num); - break; - } - case TYPE_BIGINT: { - int64_t num = StringParser::string_to_int<int64_t>(data, len, &parse_result); - if (parse_result != StringParser::PARSE_SUCCESS) { - break; - } - reinterpret_cast<vectorized::ColumnVector<vectorized::Int64>*>(col_ptr) - ->get_data() - .resize_fill(origin_size + rows, num); - break; - } - case TYPE_LARGEINT: { - __int128 num = StringParser::string_to_int<__int128>(data, len, &parse_result); - if (parse_result != StringParser::PARSE_SUCCESS) { - break; - } - reinterpret_cast<vectorized::ColumnVector<vectorized::Int128>*>(col_ptr) - ->get_data() - .resize_fill(origin_size + rows, num); - break; - } - - case TYPE_FLOAT: { - float num = StringParser::string_to_float<float>(data, len, &parse_result); - if (parse_result != StringParser::PARSE_SUCCESS) { - break; - } - reinterpret_cast<vectorized::ColumnVector<vectorized::Float32>*>(col_ptr) - ->get_data() - .resize_fill(origin_size + rows, num); - break; - } - case TYPE_DOUBLE: { - double num = StringParser::string_to_float<double>(data, len, &parse_result); - if (parse_result != StringParser::PARSE_SUCCESS) { - break; - } - reinterpret_cast<vectorized::ColumnVector<vectorized::Float64>*>(col_ptr) - ->get_data() - .resize_fill(origin_size + rows, num); - break; - } - case TYPE_DATE: { - VecDateTimeValue ts_slot; - if (!ts_slot.from_date_str(data, len)) { - parse_result = StringParser::PARSE_FAILURE; - break; - } - ts_slot.cast_to_date(); - reinterpret_cast<vectorized::ColumnVector<vectorized::Int64>*>(col_ptr) - ->get_data() - .resize_fill(origin_size + rows, *reinterpret_cast<int64_t*>(&ts_slot)); - break; - } - case TYPE_DATEV2: { - DateV2Value<DateV2ValueType> ts_slot; - if (!ts_slot.from_date_str(data, len)) { - parse_result = StringParser::PARSE_FAILURE; - break; - } - uint32_t int_val = ts_slot.to_date_int_val(); - reinterpret_cast<vectorized::ColumnVector<vectorized::UInt32>*>(col_ptr) - ->get_data() - .resize_fill(origin_size + rows, int_val); - break; - } - case TYPE_DATETIME: { - VecDateTimeValue ts_slot; - if (!ts_slot.from_date_str(data, len)) { - parse_result = StringParser::PARSE_FAILURE; - break; - } - ts_slot.to_datetime(); - reinterpret_cast<vectorized::ColumnVector<vectorized::Int64>*>(col_ptr) - ->get_data() - .resize_fill(origin_size + rows, *reinterpret_cast<int64_t*>(&ts_slot)); - break; - } - case TYPE_DATETIMEV2: { - DateV2Value<DateTimeV2ValueType> ts_slot; - if (!ts_slot.from_date_str(data, len)) { - parse_result = StringParser::PARSE_FAILURE; - break; - } - uint64_t int_val = ts_slot.to_date_int_val(); - reinterpret_cast<vectorized::ColumnVector<vectorized::UInt64>*>(col_ptr) - ->get_data() - .resize_fill(origin_size + rows, int_val); - break; - } - case TYPE_DECIMALV2: { - DecimalV2Value decimal_slot; - if (decimal_slot.parse_from_str(data, len)) { - parse_result = StringParser::PARSE_FAILURE; - break; - } - reinterpret_cast<vectorized::ColumnVector<vectorized::Int128>*>(col_ptr) - ->get_data() - .resize_fill(origin_size + rows, decimal_slot.value()); - break; - } - case TYPE_DECIMAL32: { - StringParser::ParseResult result = StringParser::PARSE_SUCCESS; - int32_t value = StringParser::string_to_decimal<TYPE_DECIMAL32>( - data, len, type_desc.precision, type_desc.scale, &result); - if (result != StringParser::PARSE_SUCCESS) { - parse_result = StringParser::PARSE_FAILURE; - break; - } - reinterpret_cast<vectorized::ColumnVector<vectorized::Int32>*>(col_ptr) - ->get_data() - .resize_fill(origin_size + rows, value); - break; - } - case TYPE_DECIMAL64: { - StringParser::ParseResult result = StringParser::PARSE_SUCCESS; - int64_t value = StringParser::string_to_decimal<TYPE_DECIMAL64>( - data, len, type_desc.precision, type_desc.scale, &result); - if (result != StringParser::PARSE_SUCCESS) { - parse_result = StringParser::PARSE_FAILURE; - break; - } - reinterpret_cast<vectorized::ColumnVector<vectorized::Int64>*>(col_ptr) - ->get_data() - .resize_fill(origin_size + rows, value); - break; - } - case TYPE_DECIMAL128I: { - StringParser::ParseResult result = StringParser::PARSE_SUCCESS; - vectorized::Int128 value = StringParser::string_to_decimal<TYPE_DECIMAL128I>( - data, len, type_desc.precision, type_desc.scale, &result); - if (result != StringParser::PARSE_SUCCESS) { - parse_result = StringParser::PARSE_FAILURE; - break; - } - reinterpret_cast<vectorized::ColumnVector<vectorized::Int128>*>(col_ptr) - ->get_data() - .resize_fill(origin_size + rows, value); - break; - } - case TYPE_ARRAY: { - auto col = reinterpret_cast<vectorized::ColumnArray*>(col_ptr); - - std::vector<std::pair<size_t, size_t>> ranges; - for (size_t i = 0, from = 0; i <= len; i++) { - if (i < len && data[i] != array_delimiter && data[i] != _collection_delimiter) { - continue; - } - ranges.push_back({from, i - from}); - from = i + 1; - } - - auto sub_type = type_desc.children[0]; - for (int i = 0; i < rows; i++) { - for (auto range : ranges) { - _write_data(sub_type, &col->get_data(), data + range.first, range.second, - copy_string, need_escape, 1, array_delimiter + 1); - } - col->get_offsets().push_back(col->get_offsets().back() + ranges.size()); - } - - break; - } - case TYPE_MAP: { - auto col = reinterpret_cast<vectorized::ColumnMap*>(col_ptr); - - std::vector<std::array<size_t, 3>> ranges; - for (size_t i = 0, from = 0, kv = 0; i <= len; i++) { - /* - * In hive , when you special map key and value delimiter as ':' - * for map<int,timestamp> column , the query result is correct , but - * for map<timestamp, int> column and map<timestamp,timestamp> column , the query result is incorrect, - * because this field have many '_map_kv_delimiter'. - * - * So i use 'kv <= from' in order to get _map_kv_delimiter that appears first. - * */ - if (i < len && data[i] == _map_kv_delimiter && kv <= from) { - kv = i; - continue; - } - if ((i == len || data[i] == _collection_delimiter) && i >= kv + 1) { - ranges.push_back({from, kv, i - 1}); - from = i + 1; - kv = from; - } - } - - auto key_type = type_desc.children[0]; - auto value_type = type_desc.children[1]; - - for (int i = 0; i < rows; i++) { - for (auto range : ranges) { - _write_data(key_type, &col->get_keys(), data + range[0], range[1] - range[0], - copy_string, need_escape, 1, array_delimiter + 1); - - _write_data(value_type, &col->get_values(), data + range[1] + 1, - range[2] - range[1], copy_string, need_escape, 1, array_delimiter + 1); - } - - col->get_offsets().push_back(col->get_offsets().back() + ranges.size()); - } - - break; - } - case TYPE_STRUCT: { - auto col = reinterpret_cast<vectorized::ColumnStruct*>(col_ptr); - - std::vector<std::pair<size_t, size_t>> ranges; - for (size_t i = 0, from = 0; i <= len; i++) { - if (i == len || data[i] == _collection_delimiter) { - ranges.push_back({from, i - from}); - from = i + 1; - } - } - for (int i = 0; i < rows; i++) { - for (size_t loc = 0; loc < col->get_columns().size(); loc++) { - _write_data(type_desc.children[loc], &col->get_column(loc), - data + ranges[loc].first, ranges[loc].second, copy_string, need_escape, - rows, array_delimiter + 1); - } - } - break; - } - default: - DCHECK(false) << "bad slot type: " << type_desc; - break; - } - - if (UNLIKELY(parse_result == StringParser::PARSE_FAILURE)) { - if (is_null_able) { - auto* nullable_column = reinterpret_cast<vectorized::ColumnNullable*>(nullable_col_ptr); - size_t size = nullable_column->get_null_map_data().size(); - doris::vectorized::NullMap& null_map_data = nullable_column->get_null_map_data(); - for (int i = 1; i <= rows; ++i) { - null_map_data[size - i] = 1; - } - nullable_column->get_nested_column().insert_many_defaults(rows); - } - return false; - } - return true; -} - -bool TextConverter::write_vec_column(const SlotDescriptor* slot_desc, - vectorized::IColumn* nullable_col_ptr, const char* data, - size_t len, bool copy_string, bool need_escape, size_t rows) { - return _write_data(slot_desc->type(), nullable_col_ptr, data, len, copy_string, need_escape, - rows, '\2'); -} - -void TextConverter::unescape_string_on_spot(const char* src, size_t* len) { - const char* start = src; - char* dest_ptr = const_cast<char*>(src); - const char* end = src + *len; - bool escape_next_char = false; - - while (src < end) { - if (*src == _escape_char) { - escape_next_char = !escape_next_char; - } else { - escape_next_char = false; - } - - if (escape_next_char) { - ++src; - } else { - *dest_ptr++ = *src++; - } - } - - *len = dest_ptr - start; -} - -} // namespace doris diff --git a/be/src/exec/text_converter.h b/be/src/exec/text_converter.h deleted file mode 100644 index 8a60c745231..00000000000 --- a/be/src/exec/text_converter.h +++ /dev/null @@ -1,87 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#pragma once - -#include <cstddef> - -#include "vec/columns/column.h" - -namespace doris { - -class SlotDescriptor; - -// Helper class for dealing with text data, e.g., converting text data to -// numeric types, etc. -class TextConverter { -public: - static constexpr char NULL_STR[3] = {'\\', 'N', '\0'}; - - TextConverter(char escape_char, char collection_delimiter = '\2', char map_kv_delimiter = '\3'); - - inline void write_string_column(const SlotDescriptor* slot_desc, - vectorized::MutableColumnPtr* column_ptr, const char* data, - size_t len) { - return write_string_column(slot_desc, column_ptr, data, len, false); - } - - void write_string_column(const SlotDescriptor* slot_desc, - vectorized::MutableColumnPtr* column_ptr, const char* data, size_t len, - bool need_escape); - - inline bool write_column(const SlotDescriptor* slot_desc, - vectorized::MutableColumnPtr* column_ptr, const char* data, size_t len, - bool copy_string, bool need_escape) { - vectorized::IColumn* nullable_col_ptr = column_ptr->get(); - return write_vec_column(slot_desc, nullable_col_ptr, data, len, copy_string, need_escape); - } - - inline bool write_vec_column(const SlotDescriptor* slot_desc, - vectorized::IColumn* nullable_col_ptr, const char* data, - size_t len, bool copy_string, bool need_escape) { - return write_vec_column(slot_desc, nullable_col_ptr, data, len, copy_string, need_escape, - 1); - } - - /// Write consecutive rows of the same data. - bool write_vec_column(const SlotDescriptor* slot_desc, vectorized::IColumn* nullable_col_ptr, - const char* data, size_t len, bool copy_string, bool need_escape, - size_t rows); - void unescape_string_on_spot(const char* src, size_t* len); - - void set_collection_delimiter(char collection_delimiter) { - _collection_delimiter = collection_delimiter; - } - void set_map_kv_delimiter(char mapkv_delimiter) { _map_kv_delimiter = mapkv_delimiter; } - - inline void set_escape_char(const char escape) { this->_escape_char = escape; } - -private: - bool _write_data(const TypeDescriptor& type_desc, vectorized::IColumn* nullable_col_ptr, - const char* data, size_t len, bool copy_string, bool need_escape, size_t rows, - char array_delimiter); - - char _escape_char; - - //struct,array and map delimiter - char _collection_delimiter; - - //map key and value delimiter - char _map_kv_delimiter; -}; - -} // namespace doris diff --git a/be/src/vec/exec/format/csv/csv_reader.cpp b/be/src/vec/exec/format/csv/csv_reader.cpp index 0839d1ed026..5cb29ed4e77 100644 --- a/be/src/vec/exec/format/csv/csv_reader.cpp +++ b/be/src/vec/exec/format/csv/csv_reader.cpp @@ -36,7 +36,6 @@ #include "common/status.h" #include "exec/decompressor.h" #include "exec/line_reader.h" -#include "exec/text_converter.h" #include "io/file_factory.h" #include "io/fs/broker_file_reader.h" #include "io/fs/buffered_reader.h" diff --git a/be/src/vec/exec/format/csv/csv_reader.h b/be/src/vec/exec/format/csv/csv_reader.h index d06ae471bf5..f22922c48c1 100644 --- a/be/src/vec/exec/format/csv/csv_reader.h +++ b/be/src/vec/exec/format/csv/csv_reader.h @@ -40,7 +40,6 @@ namespace doris { class LineReader; -class TextConverter; class Decompressor; class SlotDescriptor; class RuntimeProfile; diff --git a/be/src/vec/exec/format/orc/vorc_reader.cpp b/be/src/vec/exec/format/orc/vorc_reader.cpp index 212540c0d8e..ca64f527653 100644 --- a/be/src/vec/exec/format/orc/vorc_reader.cpp +++ b/be/src/vec/exec/format/orc/vorc_reader.cpp @@ -267,7 +267,6 @@ Status OrcReader::init_reader( not_single_slot_filter_conjuncts->end()); } _slot_id_to_filter_conjuncts = slot_id_to_filter_conjuncts; - _text_converter.reset(new TextConverter('\\')); _obj_pool = std::make_shared<ObjectPool>(); { SCOPED_RAW_TIMER(&_statistics.create_reader_time); @@ -836,15 +835,30 @@ Status OrcReader::_fill_partition_columns( Block* block, size_t rows, const std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>& partition_columns) { + DataTypeSerDe::FormatOptions _text_formatOptions; for (auto& kv : partition_columns) { auto doris_column = block->get_by_name(kv.first).column; IColumn* col_ptr = const_cast<IColumn*>(doris_column.get()); auto& [value, slot_desc] = kv.second; - if (!_text_converter->write_vec_column(slot_desc, col_ptr, const_cast<char*>(value.c_str()), - value.size(), true, false, rows)) { + auto _text_serde = slot_desc->get_data_type_ptr()->get_serde(); + Slice slice(value.data(), value.size()); + vector<Slice> slices(rows); + for (int i = 0; i < rows; i++) { + slices[i] = {value.data(), value.size()}; + } + int num_deserialized = 0; + if (_text_serde->deserialize_column_from_json_vector(*col_ptr, slices, &num_deserialized, + _text_formatOptions) != Status::OK()) { return Status::InternalError("Failed to fill partition column: {}={}", slot_desc->col_name(), value); } + if (num_deserialized != rows) { + return Status::InternalError( + "Failed to fill partition column: {}={} ." + "Number of rows expected to be written : {}, number of rows actually written : " + "{}", + slot_desc->col_name(), value, num_deserialized, rows); + } } return Status::OK(); } diff --git a/be/src/vec/exec/format/orc/vorc_reader.h b/be/src/vec/exec/format/orc/vorc_reader.h index feb1b700626..801becf0688 100644 --- a/be/src/vec/exec/format/orc/vorc_reader.h +++ b/be/src/vec/exec/format/orc/vorc_reader.h @@ -33,7 +33,6 @@ #include "common/config.h" #include "common/status.h" #include "exec/olap_common.h" -#include "exec/text_converter.h" #include "io/file_factory.h" #include "io/fs/file_reader.h" #include "io/fs/file_reader_writer_fwd.h" @@ -539,7 +538,6 @@ private: bool _is_acid = false; std::unique_ptr<IColumn::Filter> _filter = nullptr; LazyReadContext _lazy_read_ctx; - std::unique_ptr<TextConverter> _text_converter = nullptr; const TransactionalHiveReader::AcidRowIDSet* _delete_rows = nullptr; std::unique_ptr<IColumn::Filter> _delete_rows_filter_ptr = nullptr; diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp index 9cde2f1d70b..14d6e00dbd9 100644 --- a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp @@ -116,7 +116,6 @@ Status RowGroupReader::init( not_single_slot_filter_conjuncts->end()); } _slot_id_to_filter_conjuncts = slot_id_to_filter_conjuncts; - _text_converter.reset(new TextConverter('\\')); _merge_read_ranges(row_ranges); if (_read_columns.empty()) { // Query task that only select columns in path. @@ -615,15 +614,30 @@ Status RowGroupReader::_fill_partition_columns( Block* block, size_t rows, const std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>& partition_columns) { + DataTypeSerDe::FormatOptions _text_formatOptions; for (auto& kv : partition_columns) { auto doris_column = block->get_by_name(kv.first).column; IColumn* col_ptr = const_cast<IColumn*>(doris_column.get()); auto& [value, slot_desc] = kv.second; - if (!_text_converter->write_vec_column(slot_desc, col_ptr, const_cast<char*>(value.c_str()), - value.size(), true, false, rows)) { + auto _text_serde = slot_desc->get_data_type_ptr()->get_serde(); + Slice slice(value.data(), value.size()); + vector<Slice> slices(rows); + for (int i = 0; i < rows; i++) { + slices[i] = {value.data(), value.size()}; + } + int num_deserialized = 0; + if (_text_serde->deserialize_column_from_json_vector(*col_ptr, slices, &num_deserialized, + _text_formatOptions) != Status::OK()) { return Status::InternalError("Failed to fill partition column: {}={}", slot_desc->col_name(), value); } + if (num_deserialized != rows) { + return Status::InternalError( + "Failed to fill partition column: {}={} ." + "Number of rows expected to be written : {}, number of rows actually written : " + "{}", + slot_desc->col_name(), value, num_deserialized, rows); + } } return Status::OK(); } diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.h b/be/src/vec/exec/format/parquet/vparquet_group_reader.h index 1461e16b803..aeb1404b12c 100644 --- a/be/src/vec/exec/format/parquet/vparquet_group_reader.h +++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.h @@ -27,7 +27,6 @@ #include <utility> #include <vector> -#include "exec/text_converter.h" #include "io/fs/file_reader_writer_fwd.h" #include "vec/columns/column.h" #include "vec/common/allocator.h" @@ -205,7 +204,6 @@ private: int64_t _lazy_read_filtered_rows = 0; // If continuous batches are skipped, we can cache them to skip a whole page size_t _cached_filtered_rows = 0; - std::unique_ptr<TextConverter> _text_converter = nullptr; std::unique_ptr<IColumn::Filter> _pos_delete_filter_ptr = nullptr; int64_t _total_read_rows = 0; const TupleDescriptor* _tuple_descriptor; diff --git a/be/src/vec/exec/scan/new_odbc_scanner.cpp b/be/src/vec/exec/scan/new_odbc_scanner.cpp index 2b371bbea7c..55c52edfcba 100644 --- a/be/src/vec/exec/scan/new_odbc_scanner.cpp +++ b/be/src/vec/exec/scan/new_odbc_scanner.cpp @@ -89,12 +89,14 @@ Status NewOdbcScanner::prepare(RuntimeState* state, const VExprContextSPtrs& con return Status::InternalError("new a odbc scanner failed."); } - _text_converter.reset(new (std::nothrow) TextConverter('\\')); + _text_serdes = create_data_type_serdes(_tuple_desc->slots()); - if (_text_converter == nullptr) { - return Status::InternalError("new a text convertor failed."); + for (int i = 0; i < _tuple_desc->slots().size(); ++i) { + auto& slot_desc = _tuple_desc->slots()[i]; + if (slot_desc->is_materialized() && _text_serdes[i].get() == nullptr) { + return Status::InternalError("new a {} serde failed.", slot_desc->type().type); + } } - _is_init = true; return Status::OK(); @@ -199,8 +201,10 @@ Status NewOdbcScanner::_get_block_impl(RuntimeState* state, Block* block, bool* "table={}, column={}, buffer_length", _table_name, slot_desc->col_name(), column_data.buffer_length); } else { - if (!_text_converter->write_column(slot_desc, &columns[column_index], - value_data, value_len, true, false)) { + Slice slice(value_data, value_len); + if (_text_serdes[column_index]->deserialize_one_cell_from_json( + *columns[column_index].get(), slice, _text_formatOptions) != + Status::OK()) { std::stringstream ss; ss << "Fail to convert odbc value:'" << value_data << "' to " << slot_desc->type() << " on column:`" << slot_desc->col_name() + "`"; diff --git a/be/src/vec/exec/scan/new_odbc_scanner.h b/be/src/vec/exec/scan/new_odbc_scanner.h index e75b6465a05..9305f8361d1 100644 --- a/be/src/vec/exec/scan/new_odbc_scanner.h +++ b/be/src/vec/exec/scan/new_odbc_scanner.h @@ -26,7 +26,6 @@ #include "common/global_types.h" #include "common/status.h" #include "exec/odbc_connector.h" -#include "exec/text_converter.h" #include "vec/exec/scan/vscanner.h" namespace doris { @@ -82,6 +81,7 @@ private: std::unique_ptr<ODBCConnector> _odbc_connector; ODBCConnectorParam _odbc_param; // Helper class for converting text to other types; - std::unique_ptr<TextConverter> _text_converter; + DataTypeSerDeSPtrs _text_serdes; + DataTypeSerDe::FormatOptions _text_formatOptions; }; } // namespace doris::vectorized diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp index 44fa7dc240b..1da53c114e9 100644 --- a/be/src/vec/exec/scan/vfile_scanner.cpp +++ b/be/src/vec/exec/scan/vfile_scanner.cpp @@ -446,15 +446,30 @@ Status VFileScanner::_cast_to_input_block(Block* block) { } Status VFileScanner::_fill_columns_from_path(size_t rows) { + DataTypeSerDe::FormatOptions _text_formatOptions; for (auto& kv : *_partition_columns) { auto doris_column = _src_block_ptr->get_by_name(kv.first).column; IColumn* col_ptr = const_cast<IColumn*>(doris_column.get()); auto& [value, slot_desc] = kv.second; - if (!_text_converter->write_vec_column(slot_desc, col_ptr, const_cast<char*>(value.c_str()), - value.size(), true, false, rows)) { + auto _text_serde = slot_desc->get_data_type_ptr()->get_serde(); + Slice slice(value.data(), value.size()); + vector<Slice> slices(rows); + for (int i = 0; i < rows; i++) { + slices[i] = {value.data(), value.size()}; + } + int num_deserialized = 0; + if (_text_serde->deserialize_column_from_json_vector(*col_ptr, slices, &num_deserialized, + _text_formatOptions) != Status::OK()) { return Status::InternalError("Failed to fill partition column: {}={}", slot_desc->col_name(), value); } + if (num_deserialized != rows) { + return Status::InternalError( + "Failed to fill partition column: {}={} ." + "Number of rows expected to be written : {}, number of rows actually written : " + "{}", + slot_desc->col_name(), value, num_deserialized, rows); + } } return Status::OK(); } @@ -920,7 +935,7 @@ Status VFileScanner::_generate_fill_columns() { const char* data = column_from_path.c_str(); size_t size = column_from_path.size(); if (size == 4 && memcmp(data, "null", 4) == 0) { - data = TextConverter::NULL_STR; + data = const_cast<char*>("\\N"); } _partition_columns->emplace(slot_desc->col_name(), std::make_tuple(data, slot_desc)); diff --git a/be/src/vec/exec/scan/vfile_scanner.h b/be/src/vec/exec/scan/vfile_scanner.h index d5b1edb9e50..b7ceefe775c 100644 --- a/be/src/vec/exec/scan/vfile_scanner.h +++ b/be/src/vec/exec/scan/vfile_scanner.h @@ -30,7 +30,6 @@ #include "common/global_types.h" #include "common/status.h" #include "exec/olap_common.h" -#include "exec/text_converter.h" #include "io/io_common.h" #include "pipeline/exec/file_scan_operator.h" #include "runtime/descriptors.h" @@ -93,7 +92,6 @@ protected: Status _cast_src_block(Block* block) { return Status::OK(); } protected: - std::unique_ptr<TextConverter> _text_converter; const TFileScanRangeParams* _params; const std::vector<TFileRangeDesc>& _ranges; int _next_range; diff --git a/be/src/vec/exec/vmysql_scan_node.cpp b/be/src/vec/exec/vmysql_scan_node.cpp index 1d620141b9a..d744a54470c 100644 --- a/be/src/vec/exec/vmysql_scan_node.cpp +++ b/be/src/vec/exec/vmysql_scan_node.cpp @@ -19,7 +19,6 @@ #include <gen_cpp/PlanNodes_types.h> -#include "exec/text_converter.h" #include "runtime/runtime_state.h" #include "util/runtime_profile.h" #include "util/types.h" @@ -77,10 +76,13 @@ Status VMysqlScanNode::prepare(RuntimeState* state) { return Status::InternalError("new a mysql scanner failed."); } - _text_converter.reset(new (std::nothrow) TextConverter('\\')); + _text_serdes = create_data_type_serdes(_tuple_desc->slots()); - if (_text_converter == nullptr) { - return Status::InternalError("new a text convertor failed."); + for (int i = 0; i < _slot_num; ++i) { + auto& slot_desc = _tuple_desc->slots()[i]; + if (slot_desc->is_materialized() && _text_serdes[i].get() == nullptr) { + return Status::InternalError("new a {} serde failed.", slot_desc->type().type); + } } _is_init = true; @@ -177,8 +179,14 @@ Status VMysqlScanNode::get_next(RuntimeState* state, vectorized::Block* block, b slot_desc->col_name()); } } else { - RETURN_IF_ERROR( - write_text_column(data[j], length[j], slot_desc, &columns[i], state)); + Slice slice(data[j], length[j]); + if (_text_serdes[i]->deserialize_one_cell_from_hive_text( + *columns[i].get(), slice, _text_formatOptions) != Status::OK()) { + std::stringstream ss; + ss << "Fail to convert mysql value:'" << data[j] << "' to " + << slot_desc->type() << " on column:`" << slot_desc->col_name() + "`"; + return Status::InternalError(ss.str()); + } } j++; } @@ -200,18 +208,6 @@ Status VMysqlScanNode::get_next(RuntimeState* state, vectorized::Block* block, b return Status::OK(); } -Status VMysqlScanNode::write_text_column(char* value, int value_length, SlotDescriptor* slot, - vectorized::MutableColumnPtr* column_ptr, - RuntimeState* state) { - if (!_text_converter->write_column(slot, column_ptr, value, value_length, true, false)) { - std::stringstream ss; - ss << "Fail to convert mysql value:'" << value << "' to " << slot->type() << " on column:`" - << slot->col_name() + "`"; - return Status::InternalError(ss.str()); - } - return Status::OK(); -} - Status VMysqlScanNode::close(RuntimeState* state) { if (is_closed()) { return Status::OK(); diff --git a/be/src/vec/exec/vmysql_scan_node.h b/be/src/vec/exec/vmysql_scan_node.h index cdcfd205e29..742b94eaf50 100644 --- a/be/src/vec/exec/vmysql_scan_node.h +++ b/be/src/vec/exec/vmysql_scan_node.h @@ -20,12 +20,10 @@ #include <memory> #include "exec/scan_node.h" -#include "exec/text_converter.h" #include "runtime/descriptors.h" #include "vec/exec/scan/mysql_scanner.h" namespace doris { -class TextConverter; class TupleDescriptor; class RuntimeState; class Status; @@ -37,7 +35,7 @@ public: VMysqlScanNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs); ~VMysqlScanNode() override = default; - // initialize mysql_scanner, and create text_converter. + // initialize mysql_scanner, and create text_serde. Status prepare(RuntimeState* state) override; // Start MySQL scan using mysql_scanner. @@ -54,8 +52,6 @@ public: Status set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) override; private: - Status write_text_column(char* value, int value_length, SlotDescriptor* slot, - vectorized::MutableColumnPtr* column_ptr, RuntimeState* state); // Write debug string of this into out. void debug_string(int indentation_level, std::stringstream* out) const override; @@ -79,7 +75,8 @@ private: // Jni helper for scanning an HBase table. std::unique_ptr<MysqlScanner> _mysql_scanner; // Helper class for converting text to other types; - std::unique_ptr<TextConverter> _text_converter; + DataTypeSerDeSPtrs _text_serdes; + DataTypeSerDe::FormatOptions _text_formatOptions; }; } // namespace vectorized } // namespace doris \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org