This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
commit c631f4f8a8820efbf634edcfa45595fe2fcffe73 Author: Ashin Gau <ashin...@users.noreply.github.com> AuthorDate: Mon Apr 22 11:34:13 2024 +0800 [fix](schema change) resolve the use count check of source logical column (#33932) Fix an error like: ``` 8# google::LogMessageFatal::~LogMessageFatal() in /mnt/hdd01/ci/master-deploy/be/lib/doris_be 9# doris::vectorized::Block::clear_column_data(int) in /mnt/hdd01/ci/master-deploy/be/lib/doris_be 10# doris::vectorized::ParquetReader::get_next_block(doris::vectorized::Block*, unsigned long*, bool*) at /home/zcp/repo_center/doris_master/doris/be/src/vec/exec/format/parquet/vparquet_reader.cpp:514 11# doris::vectorized::VFileScanner::_get_block_impl(doris::RuntimeState*, doris::vectorized::Block*, bool*) at /home/zcp/repo_center/doris_master/doris/be/src/vec/exec/scan/vfile_scanner.cpp:333 12# doris::vectorized::VScanner::get_block(doris::RuntimeState*, doris::vectorized::Block*, bool*) at /home/zcp/repo_center/doris_master/doris/be/src/vec/exec/scan/vscanner.cpp:132 13# doris::vectorized::VScanner::get_block_after_projects(doris::RuntimeState*, doris::vectorized::Block*, bool*) at /home/zcp/repo_center/doris_master/doris/be/src/vec/exec/scan/vscanner.cpp:99 ``` Cause: when the logical converter is consistent, the source logical column is the same column as the destination logical column. Previously, the extra reference to that column was reset only after the conversion had completed; if an EOF occurred, the reader returned early and the reference was never reset, even though EOF is not a true error. The previous reset code was: 
``` if (_logical_converter->is_consistent()) { // If logical converter is consistent, _src_logical_column is the final destination column, // other components will check the use count _src_logical_column.reset(); } ``` --- .../exec/format/parquet/parquet_column_convert.cpp | 13 ---- .../exec/format/parquet/parquet_column_convert.h | 74 ++++++++++++++-------- .../exec/format/parquet/vparquet_column_reader.cpp | 4 +- be/test/vec/exec/parquet/parquet_thrift_test.cpp | 3 +- 4 files changed, 49 insertions(+), 45 deletions(-) diff --git a/be/src/vec/exec/format/parquet/parquet_column_convert.cpp b/be/src/vec/exec/format/parquet/parquet_column_convert.cpp index 9afc394843e..57f1f54b7b9 100644 --- a/be/src/vec/exec/format/parquet/parquet_column_convert.cpp +++ b/be/src/vec/exec/format/parquet/parquet_column_convert.cpp @@ -108,19 +108,6 @@ ColumnPtr PhysicalToLogicalConverter::get_physical_column(tparquet::Type::type s } // remove the old cached data _cached_src_physical_column->assume_mutable()->clear(); - if (is_consistent()) { - if (dst_logical_type->is_nullable()) { - auto doris_nullable_column = const_cast<ColumnNullable*>( - static_cast<const ColumnNullable*>(dst_logical_column.get())); - _src_logical_column = ColumnNullable::create( - _cached_src_physical_column, doris_nullable_column->get_null_map_column_ptr()); - } else { - _src_logical_column = _cached_src_physical_column; - } - } else { - _src_logical_column = _logical_converter->get_column(src_logical_type, dst_logical_column, - dst_logical_type); - } if (dst_logical_type->is_nullable()) { // In order to share null map between parquet converted src column and dst column to avoid copying. 
It is very tricky that will diff --git a/be/src/vec/exec/format/parquet/parquet_column_convert.h b/be/src/vec/exec/format/parquet/parquet_column_convert.h index ede6e426488..bc6bc232327 100644 --- a/be/src/vec/exec/format/parquet/parquet_column_convert.h +++ b/be/src/vec/exec/format/parquet/parquet_column_convert.h @@ -160,7 +160,6 @@ class PhysicalToLogicalConverter { protected: ColumnPtr _cached_src_physical_column = nullptr; DataTypePtr _cached_src_physical_type = nullptr; - ColumnPtr _src_logical_column = nullptr; std::unique_ptr<converter::ColumnTypeConverter> _logical_converter = nullptr; std::string _error_msg; @@ -179,18 +178,37 @@ public: PhysicalToLogicalConverter() = default; virtual ~PhysicalToLogicalConverter() = default; - virtual Status physical_convert(ColumnPtr& src_physical_col) { return Status::OK(); } + virtual Status physical_convert(ColumnPtr& src_physical_col, ColumnPtr& src_logical_column) { + return Status::OK(); + } - Status convert(ColumnPtr& src_physical_col, MutableColumnPtr& dst_logical_col) { - // convert physical values and save in _src_logical_column - RETURN_IF_ERROR(physical_convert(src_physical_col)); - RETURN_IF_ERROR(_logical_converter->convert(_src_logical_column, dst_logical_col)); - if (_logical_converter->is_consistent()) { - // If logical converter is consistent, _src_logical_column is the final destination column, - // other components will check the use count - _src_logical_column.reset(); + Status convert(ColumnPtr& src_physical_col, TypeDescriptor src_logical_type, + const DataTypePtr& dst_logical_type, ColumnPtr& dst_logical_col, + bool is_dict_filter) { + if (is_dict_filter) { + src_logical_type = TypeDescriptor(PrimitiveType::TYPE_INT); } - return Status::OK(); + if (is_consistent() && _logical_converter->is_consistent()) { + return Status::OK(); + } + ColumnPtr src_logical_column; + if (is_consistent()) { + if (dst_logical_type->is_nullable()) { + auto doris_nullable_column = const_cast<ColumnNullable*>( + 
static_cast<const ColumnNullable*>(dst_logical_col.get())); + src_logical_column = + ColumnNullable::create(_cached_src_physical_column, + doris_nullable_column->get_null_map_column_ptr()); + } else { + src_logical_column = _cached_src_physical_column; + } + } else { + src_logical_column = _logical_converter->get_column(src_logical_type, dst_logical_col, + dst_logical_type); + } + RETURN_IF_ERROR(physical_convert(src_physical_col, src_logical_column)); + auto converted_column = dst_logical_col->assume_mutable(); + return _logical_converter->convert(src_logical_column, converted_column); } virtual ColumnPtr get_physical_column(tparquet::Type::type src_physical_type, @@ -227,7 +245,7 @@ public: bool support() override { return false; } - Status physical_convert(ColumnPtr& src_physical_col) override { + Status physical_convert(ColumnPtr& src_physical_col, ColumnPtr& src_logical_column) override { return Status::InternalError("Unsupported physical to logical type: {}", _error_msg); } }; @@ -235,11 +253,11 @@ public: // for tinyint, smallint template <PrimitiveType IntPrimitiveType> class LittleIntPhysicalConverter : public PhysicalToLogicalConverter { - Status physical_convert(ColumnPtr& src_physical_col) override { + Status physical_convert(ColumnPtr& src_physical_col, ColumnPtr& src_logical_column) override { using DstCppType = typename PrimitiveTypeTraits<IntPrimitiveType>::CppType; using DstColumnType = typename PrimitiveTypeTraits<IntPrimitiveType>::ColumnType; ColumnPtr from_col = remove_nullable(src_physical_col); - MutableColumnPtr to_col = remove_nullable(_src_logical_column)->assume_mutable(); + MutableColumnPtr to_col = remove_nullable(src_logical_column)->assume_mutable(); size_t rows = from_col->size(); // always comes from tparquet::Type::INT32 @@ -262,9 +280,9 @@ private: public: FixedSizeBinaryConverter(int type_length) : _type_length(type_length) {} - Status physical_convert(ColumnPtr& src_physical_col) override { + Status physical_convert(ColumnPtr& 
src_physical_col, ColumnPtr& src_logical_column) override { ColumnPtr from_col = remove_nullable(src_physical_col); - MutableColumnPtr to_col = remove_nullable(_src_logical_column)->assume_mutable(); + MutableColumnPtr to_col = remove_nullable(src_logical_column)->assume_mutable(); auto* src_data = static_cast<const ColumnUInt8*>(from_col.get()); size_t length = src_data->size(); @@ -294,9 +312,9 @@ class FixedSizeToDecimal : public PhysicalToLogicalConverter { public: FixedSizeToDecimal(int32_t type_length) : _type_length(type_length) {} - Status physical_convert(ColumnPtr& src_physical_col) override { + Status physical_convert(ColumnPtr& src_physical_col, ColumnPtr& src_logical_column) override { ColumnPtr src_col = remove_nullable(src_physical_col); - MutableColumnPtr dst_col = remove_nullable(_src_logical_column)->assume_mutable(); + MutableColumnPtr dst_col = remove_nullable(src_logical_column)->assume_mutable(); #define M(FixedTypeLength, ValueCopyType) \ case FixedTypeLength: \ @@ -372,10 +390,10 @@ private: template <typename DecimalType, DecimalScaleParams::ScaleType ScaleType> class StringToDecimal : public PhysicalToLogicalConverter { - Status physical_convert(ColumnPtr& src_physical_col) override { + Status physical_convert(ColumnPtr& src_physical_col, ColumnPtr& src_logical_column) override { using ValueCopyType = DecimalType::NativeType; ColumnPtr src_col = remove_nullable(src_physical_col); - MutableColumnPtr dst_col = remove_nullable(_src_logical_column)->assume_mutable(); + MutableColumnPtr dst_col = remove_nullable(src_logical_column)->assume_mutable(); size_t rows = src_col->size(); DecimalScaleParams& scale_params = _convert_params->decimal_scale; @@ -413,10 +431,10 @@ class StringToDecimal : public PhysicalToLogicalConverter { template <typename NumberType, typename DecimalType, DecimalScaleParams::ScaleType ScaleType> class NumberToDecimal : public PhysicalToLogicalConverter { - Status physical_convert(ColumnPtr& src_physical_col) override { + 
Status physical_convert(ColumnPtr& src_physical_col, ColumnPtr& src_logical_column) override { using ValueCopyType = DecimalType::NativeType; ColumnPtr src_col = remove_nullable(src_physical_col); - MutableColumnPtr dst_col = remove_nullable(_src_logical_column)->assume_mutable(); + MutableColumnPtr dst_col = remove_nullable(src_logical_column)->assume_mutable(); size_t rows = src_col->size(); auto* src_data = @@ -441,9 +459,9 @@ class NumberToDecimal : public PhysicalToLogicalConverter { }; class Int32ToDate : public PhysicalToLogicalConverter { - Status physical_convert(ColumnPtr& src_physical_col) override { + Status physical_convert(ColumnPtr& src_physical_col, ColumnPtr& src_logical_column) override { ColumnPtr src_col = remove_nullable(src_physical_col); - MutableColumnPtr dst_col = remove_nullable(_src_logical_column)->assume_mutable(); + MutableColumnPtr dst_col = remove_nullable(src_logical_column)->assume_mutable(); size_t rows = src_col->size(); size_t start_idx = dst_col->size(); @@ -463,9 +481,9 @@ class Int32ToDate : public PhysicalToLogicalConverter { }; struct Int64ToTimestamp : public PhysicalToLogicalConverter { - Status physical_convert(ColumnPtr& src_physical_col) override { + Status physical_convert(ColumnPtr& src_physical_col, ColumnPtr& src_logical_column) override { ColumnPtr src_col = remove_nullable(src_physical_col); - MutableColumnPtr dst_col = remove_nullable(_src_logical_column)->assume_mutable(); + MutableColumnPtr dst_col = remove_nullable(src_logical_column)->assume_mutable(); size_t rows = src_col->size(); size_t start_idx = dst_col->size(); @@ -487,9 +505,9 @@ struct Int64ToTimestamp : public PhysicalToLogicalConverter { }; struct Int96toTimestamp : public PhysicalToLogicalConverter { - Status physical_convert(ColumnPtr& src_physical_col) override { + Status physical_convert(ColumnPtr& src_physical_col, ColumnPtr& src_logical_column) override { ColumnPtr src_col = remove_nullable(src_physical_col); - MutableColumnPtr dst_col = 
remove_nullable(_src_logical_column)->assume_mutable(); + MutableColumnPtr dst_col = remove_nullable(src_logical_column)->assume_mutable(); size_t rows = src_col->size() / sizeof(ParquetInt96); auto& src_data = static_cast<const ColumnVector<Int8>*>(src_col.get())->get_data(); diff --git a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp index 71b103d7fdb..2a3782ab449 100644 --- a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp @@ -565,8 +565,8 @@ Status ScalarColumnReader::read_column_data(ColumnPtr& doris_column, DataTypePtr } } while (false); - auto converted_column = doris_column->assume_mutable(); - return _converter->convert(resolved_column, converted_column); + return _converter->convert(resolved_column, _field_schema->type, type, doris_column, + is_dict_filter); } Status ArrayColumnReader::init(std::unique_ptr<ParquetColumnReader> element_reader, diff --git a/be/test/vec/exec/parquet/parquet_thrift_test.cpp b/be/test/vec/exec/parquet/parquet_thrift_test.cpp index 0ad5f83c1de..4dfbd6a380f 100644 --- a/be/test/vec/exec/parquet/parquet_thrift_test.cpp +++ b/be/test/vec/exec/parquet/parquet_thrift_test.cpp @@ -268,8 +268,7 @@ static Status get_column_values(io::FileReaderSPtr file_reader, tparquet::Column chunk_reader.decode_values(data_column, resolved_type, run_length_map, false)); } } - auto converted_column = doris_column->assume_mutable(); - return _converter->convert(src_column, converted_column); + return _converter->convert(src_column, field_schema->type, data_type, doris_column, false); } // Only the unit test depend on this, but it is wrong, should not use TTupleDesc to create tuple desc, not --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org