This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new d80b7b9689 [feature-wip](new-scan) support more load situation (#12953) d80b7b9689 is described below commit d80b7b9689da6da4042ed25333a02386c9238f9b Author: Mingyu Chen <morningman....@gmail.com> AuthorDate: Tue Sep 27 21:48:32 2022 +0800 [feature-wip](new-scan) support more load situation (#12953) --- be/src/exec/arrow/arrow_reader.cpp | 15 +- be/src/exec/arrow/arrow_reader.h | 2 + be/src/exec/arrow/orc_reader.cpp | 25 +- be/src/exec/arrow/orc_reader.h | 5 + be/src/vec/CMakeLists.txt | 1 - be/src/vec/columns/column_const.h | 2 +- be/src/vec/exec/file_hdfs_scanner.cpp | 98 ------- be/src/vec/exec/file_hdfs_scanner.h | 57 ---- be/src/vec/exec/file_scan_node.cpp | 10 +- be/src/vec/exec/format/generic_reader.h | 4 + be/src/vec/exec/format/parquet/vparquet_reader.cpp | 57 ++-- be/src/vec/exec/format/parquet/vparquet_reader.h | 16 +- be/src/vec/exec/scan/vfile_scanner.cpp | 300 +++++++++++++++++---- be/src/vec/exec/scan/vfile_scanner.h | 61 +++-- be/src/vec/exec/scan/vscan_node.h | 1 + be/src/vec/exec/scan/vscanner.h | 4 - be/src/vec/exprs/vexpr_context.cpp | 2 +- be/src/vec/exprs/vliteral.cpp | 3 +- be/src/vec/utils/arrow_column_to_doris_column.cpp | 56 ++++ be/src/vec/utils/arrow_column_to_doris_column.h | 4 +- be/test/vec/exec/parquet/parquet_reader_test.cpp | 120 +-------- .../planner/external/ExternalFileScanNode.java | 74 ++++- .../doris/planner/external/FileScanProviderIf.java | 3 + .../doris/planner/external/HiveScanProvider.java | 6 + .../doris/planner/external/LoadScanProvider.java | 18 +- gensrc/thrift/PlanNodes.thrift | 11 +- 26 files changed, 535 insertions(+), 420 deletions(-) diff --git a/be/src/exec/arrow/arrow_reader.cpp b/be/src/exec/arrow/arrow_reader.cpp index d26efd32aa..72d4960a43 100644 --- a/be/src/exec/arrow/arrow_reader.cpp +++ b/be/src/exec/arrow/arrow_reader.cpp @@ -79,10 +79,7 @@ Status ArrowReaderWrap::column_indices() { if (iter != _map_column.end()) { _include_column_ids.emplace_back(iter->second); } else { - std::stringstream str_error; - str_error << "Invalid Column Name:" << slot_desc->col_name(); - LOG(WARNING) << str_error.str(); - return Status::InvalidArgument(str_error.str()); + _missing_cols.push_back(slot_desc->col_name()); } } return Status::OK(); @@ -103,10 +100,13 @@ int ArrowReaderWrap::get_column_index(std::string column_name) { Status ArrowReaderWrap::get_next_block(vectorized::Block* block, bool* eof) { size_t rows = 0; + bool tmp_eof = false; do { if (_batch == nullptr || _arrow_batch_cur_idx >= _batch->num_rows()) { - RETURN_IF_ERROR(next_batch(&_batch, eof)); - if (*eof) { + RETURN_IF_ERROR(next_batch(&_batch, &tmp_eof)); + // We need to make sure the eof is set to true iff block is empty. 
+ if (tmp_eof) { + *eof = (rows == 0); return Status::OK(); } } @@ -128,7 +128,7 @@ Status ArrowReaderWrap::get_next_block(vectorized::Block* block, bool* eof) { } rows += num_elements; _arrow_batch_cur_idx += num_elements; - } while (!(*eof) && rows < _state->batch_size()); + } while (!tmp_eof && rows < _state->batch_size()); return Status::OK(); } @@ -138,7 +138,6 @@ Status ArrowReaderWrap::next_batch(std::shared_ptr<arrow::RecordBatch>* batch, b if (_batch_eof) { _include_column_ids.clear(); *eof = true; - _batch_eof = false; return Status::OK(); } _queue_reader_cond.wait_for(lock, std::chrono::seconds(1)); diff --git a/be/src/exec/arrow/arrow_reader.h b/be/src/exec/arrow/arrow_reader.h index 35703e4bbd..2d83a1be01 100644 --- a/be/src/exec/arrow/arrow_reader.h +++ b/be/src/exec/arrow/arrow_reader.h @@ -137,6 +137,8 @@ protected: // The following fields are only valid when using "get_block()" interface. std::shared_ptr<arrow::RecordBatch> _batch; size_t _arrow_batch_cur_idx = 0; + // Save col names which need to be read but does not exist in file + std::vector<std::string> _missing_cols; }; } // namespace doris diff --git a/be/src/exec/arrow/orc_reader.cpp b/be/src/exec/arrow/orc_reader.cpp index 65a67909ba..8f46a9bf21 100644 --- a/be/src/exec/arrow/orc_reader.cpp +++ b/be/src/exec/arrow/orc_reader.cpp @@ -26,6 +26,7 @@ #include "runtime/runtime_state.h" #include "runtime/tuple.h" #include "util/string_util.h" +#include "vec/utils/arrow_column_to_doris_column.h" namespace doris { @@ -67,12 +68,11 @@ Status ORCReaderWrap::init_reader(const TupleDescriptor* tuple_desc, LOG(WARNING) << "failed to read schema, errmsg=" << maybe_schema.status(); return Status::InternalError("Failed to create orc file reader"); } - std::shared_ptr<arrow::Schema> schema = maybe_schema.ValueOrDie(); - for (size_t i = 0; i < schema->num_fields(); ++i) { + _schema = maybe_schema.ValueOrDie(); + for (size_t i = 0; i < _schema->num_fields(); ++i) { std::string schemaName = - _case_sensitive ? schema->field(i)->name() : to_lower(schema->field(i)->name()); + _case_sensitive ? _schema->field(i)->name() : to_lower(_schema->field(i)->name()); // orc index started from 1. - _map_column.emplace(schemaName, i + 1); } RETURN_IF_ERROR(column_indices()); @@ -82,6 +82,23 @@ Status ORCReaderWrap::init_reader(const TupleDescriptor* tuple_desc, return Status::OK(); } +Status ORCReaderWrap::get_columns(std::unordered_map<std::string, TypeDescriptor>* name_to_type, + std::unordered_set<std::string>* missing_cols) { + for (size_t i = 0; i < _schema->num_fields(); ++i) { + std::string schema_name = + _case_sensitive ? _schema->field(i)->name() : to_lower(_schema->field(i)->name()); + TypeDescriptor type; + RETURN_IF_ERROR( + vectorized::arrow_type_to_doris_type(_schema->field(i)->type()->id(), &type)); + name_to_type->emplace(schema_name, type); + } + + for (auto& col : _missing_cols) { + missing_cols->insert(col); + } + return Status::OK(); +} + Status ORCReaderWrap::_seek_start_stripe() { // If file was from Hms table, _range_start_offset is started from 3(magic word). // And if file was from load, _range_start_offset is always set to zero. 
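A note on the get_columns() contract introduced above: a reader now reports both the column types it can actually supply and the requested columns the file lacks, instead of failing on the first missing column. The sketch below is editorial and not part of the commit; collect_file_schema is a hypothetical helper, assuming the GenericReader interface shown later in this diff.

    // Minimal sketch (not from this commit): consuming the new
    // GenericReader::get_columns() contract.
    Status collect_file_schema(GenericReader* reader) {
        std::unordered_map<std::string, TypeDescriptor> name_to_type;
        std::unordered_set<std::string> missing_cols;
        // A reader fills name_to_type with the columns it can supply from the
        // file, and missing_cols with requested columns absent from the file.
        // The base class returns Status::NotSupported, which callers tolerate.
        RETURN_IF_ERROR(reader->get_columns(&name_to_type, &missing_cols));
        // Missing columns must later be filled with a default value or NULL
        // (see VFileScanner::_fill_missing_columns below).
        return Status::OK();
    }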
diff --git a/be/src/exec/arrow/orc_reader.h b/be/src/exec/arrow/orc_reader.h index a6455e8400..2d394ccf7d 100644 --- a/be/src/exec/arrow/orc_reader.h +++ b/be/src/exec/arrow/orc_reader.h @@ -27,6 +27,7 @@ #include "common/status.h" #include "exec/arrow/arrow_reader.h" + namespace doris { // Reader of ORC file @@ -41,6 +42,9 @@ public: const std::vector<ExprContext*>& conjunct_ctxs, const std::string& timezone) override; + Status get_columns(std::unordered_map<std::string, TypeDescriptor>* name_to_type, + std::unordered_set<std::string>* missing_cols) override; + private: Status _next_stripe_reader(bool* eof); Status _seek_start_stripe(); @@ -50,6 +54,7 @@ private: private: // orc file reader object std::unique_ptr<arrow::adapters::orc::ORCFileReader> _reader; + std::shared_ptr<arrow::Schema> _schema; bool _cur_file_eof; // is read over? int64_t _range_start_offset; int64_t _range_size; diff --git a/be/src/vec/CMakeLists.txt b/be/src/vec/CMakeLists.txt index 16eabe1e45..9632fa13d3 100644 --- a/be/src/vec/CMakeLists.txt +++ b/be/src/vec/CMakeLists.txt @@ -231,7 +231,6 @@ set(VEC_FILES exec/file_scanner.cpp exec/file_scan_node.cpp exec/file_text_scanner.cpp - exec/file_hdfs_scanner.cpp exec/format/parquet/vparquet_column_chunk_reader.cpp exec/format/parquet/vparquet_group_reader.cpp exec/format/parquet/vparquet_page_index.cpp diff --git a/be/src/vec/columns/column_const.h b/be/src/vec/columns/column_const.h index f001150bee..75561ae3e8 100644 --- a/be/src/vec/columns/column_const.h +++ b/be/src/vec/columns/column_const.h @@ -184,7 +184,7 @@ public: return false; } - // bool is_nullable() const override { return is_column_nullable(*data); } + // bool is_nullable() const override { return is_column_nullable(*data); } bool only_null() const override { return data->is_null_at(0); } bool is_numeric() const override { return data->is_numeric(); } bool is_fixed_and_contiguous() const override { return data->is_fixed_and_contiguous(); } diff --git a/be/src/vec/exec/file_hdfs_scanner.cpp b/be/src/vec/exec/file_hdfs_scanner.cpp deleted file mode 100644 index ec891730c8..0000000000 --- a/be/src/vec/exec/file_hdfs_scanner.cpp +++ /dev/null @@ -1,98 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
- -#include "file_hdfs_scanner.h" - -#include "io/file_factory.h" - -namespace doris::vectorized { - -ParquetFileHdfsScanner::ParquetFileHdfsScanner(RuntimeState* state, RuntimeProfile* profile, - const TFileScanRangeParams& params, - const std::vector<TFileRangeDesc>& ranges, - const std::vector<TExpr>& pre_filter_texprs, - ScannerCounter* counter) - : HdfsFileScanner(state, profile, params, ranges, pre_filter_texprs, counter) {} - -ParquetFileHdfsScanner::~ParquetFileHdfsScanner() { - ParquetFileHdfsScanner::close(); -} - -Status ParquetFileHdfsScanner::open() { - RETURN_IF_ERROR(FileScanner::open()); - if (_ranges.empty()) { - return Status::OK(); - } - RETURN_IF_ERROR(_get_next_reader()); - return Status::OK(); -} - -void ParquetFileHdfsScanner::_init_profiles(RuntimeProfile* profile) {} - -Status ParquetFileHdfsScanner::get_next(vectorized::Block* block, bool* eof) { - if (_scanner_eof) { - *eof = true; - return Status::OK(); - } - RETURN_IF_ERROR(init_block(block)); - bool range_eof = false; - RETURN_IF_ERROR(_reader->get_next_block(block, &range_eof)); - if (block->rows() > 0) { - _fill_columns_from_path(block, block->rows()); - } - if (range_eof) { - RETURN_IF_ERROR(_get_next_reader()); - *eof = _scanner_eof; - } - return Status::OK(); -} - -Status ParquetFileHdfsScanner::_get_next_reader() { - if (_next_range >= _ranges.size()) { - _scanner_eof = true; - return Status::OK(); - } - const TFileRangeDesc& range = _ranges[_next_range++]; - std::unique_ptr<FileReader> file_reader; - RETURN_IF_ERROR(FileFactory::create_file_reader(_state->exec_env(), _profile, _params, range, - file_reader)); - auto tuple_desc = _state->desc_tbl().get_tuple_descriptor(_tupleId); - if (tuple_desc->slots().empty()) { - return Status::EndOfFile("No Parquet column need load"); - } - std::vector<std::string> column_names; - for (int i = 0; i < _file_slot_descs.size(); i++) { - column_names.push_back(_file_slot_descs[i]->col_name()); - } - _reader.reset(new ParquetReader(_profile, _params, range, column_names, - _state->query_options().batch_size, - const_cast<cctz::time_zone*>(&_state->timezone_obj()))); - Status status = _reader->init_reader(_conjunct_ctxs); - if (!status.ok()) { - if (status.is_end_of_file()) { - return _get_next_reader(); - } - return status; - } - return Status::OK(); -} - -void ParquetFileHdfsScanner::close() { - FileScanner::close(); -} - -} // namespace doris::vectorized diff --git a/be/src/vec/exec/file_hdfs_scanner.h b/be/src/vec/exec/file_hdfs_scanner.h deleted file mode 100644 index b9883b88b5..0000000000 --- a/be/src/vec/exec/file_hdfs_scanner.h +++ /dev/null @@ -1,57 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
- -#pragma once - -#include "common/status.h" -#include "file_scanner.h" -#include "vec/core/block.h" -#include "vec/exec/format/parquet/vparquet_reader.h" - -namespace doris::vectorized { - -class HdfsFileScanner : public FileScanner { -public: - HdfsFileScanner(RuntimeState* state, RuntimeProfile* profile, - const TFileScanRangeParams& params, const std::vector<TFileRangeDesc>& ranges, - const std::vector<TExpr>& pre_filter_texprs, ScannerCounter* counter) - : FileScanner(state, profile, params, ranges, pre_filter_texprs, counter) {}; -}; - -class ParquetFileHdfsScanner : public HdfsFileScanner { -public: - ParquetFileHdfsScanner(RuntimeState* state, RuntimeProfile* profile, - const TFileScanRangeParams& params, - const std::vector<TFileRangeDesc>& ranges, - const std::vector<TExpr>& pre_filter_texprs, ScannerCounter* counter); - ~ParquetFileHdfsScanner(); - Status open() override; - - Status get_next(vectorized::Block* block, bool* eof) override; - void close() override; - -protected: - void _init_profiles(RuntimeProfile* profile) override; - -private: - Status _get_next_reader(); - -private: - std::shared_ptr<ParquetReader> _reader; -}; - -} // namespace doris::vectorized \ No newline at end of file diff --git a/be/src/vec/exec/file_scan_node.cpp b/be/src/vec/exec/file_scan_node.cpp index dc164f8927..3a3f9634e9 100644 --- a/be/src/vec/exec/file_scan_node.cpp +++ b/be/src/vec/exec/file_scan_node.cpp @@ -30,7 +30,6 @@ #include "util/thread.h" #include "util/types.h" #include "vec/exec/file_arrow_scanner.h" -#include "vec/exec/file_hdfs_scanner.h" #include "vec/exec/file_text_scanner.h" #include "vec/exprs/vcompound_pred.h" #include "vec/exprs/vexpr.h" @@ -459,13 +458,8 @@ std::unique_ptr<FileScanner> FileScanNode::create_scanner(const TFileScanRange& FileScanner* scan = nullptr; switch (scan_range.params.format_type) { case TFileFormatType::FORMAT_PARQUET: - if (config::parquet_reader_using_internal) { - scan = new ParquetFileHdfsScanner(_runtime_state, runtime_profile(), scan_range.params, - scan_range.ranges, _pre_filter_texprs, counter); - } else { - scan = new VFileParquetScanner(_runtime_state, runtime_profile(), scan_range.params, - scan_range.ranges, _pre_filter_texprs, counter); - } + scan = new VFileParquetScanner(_runtime_state, runtime_profile(), scan_range.params, + scan_range.ranges, _pre_filter_texprs, counter); break; case TFileFormatType::FORMAT_ORC: scan = new VFileORCScanner(_runtime_state, runtime_profile(), scan_range.params, diff --git a/be/src/vec/exec/format/generic_reader.h b/be/src/vec/exec/format/generic_reader.h index a98d678fde..d838f4dac1 100644 --- a/be/src/vec/exec/format/generic_reader.h +++ b/be/src/vec/exec/format/generic_reader.h @@ -33,6 +33,10 @@ public: std::unordered_map<std::string, TypeDescriptor> map; return map; } + virtual Status get_columns(std::unordered_map<std::string, TypeDescriptor>* name_to_type, + std::unordered_set<std::string>* missing_cols) { + return Status::NotSupported("get_columns is not implemented"); + } virtual ~GenericReader() {} }; diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_reader.cpp index 6d5718d181..5f595fec75 100644 --- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp @@ -23,11 +23,12 @@ #include "parquet_thrift_util.h" namespace doris::vectorized { -ParquetReader::ParquetReader(RuntimeProfile* profile, const TFileScanRangeParams& params, - const TFileRangeDesc& range, 
+ParquetReader::ParquetReader(RuntimeProfile* profile, FileReader* file_reader, + const TFileScanRangeParams& params, const TFileRangeDesc& range, const std::vector<std::string>& column_names, size_t batch_size, cctz::time_zone* ctz) : _profile(profile), + _file_reader(file_reader), _scan_params(params), _scan_range(range), _batch_size(batch_size), @@ -47,14 +48,15 @@ ParquetReader::ParquetReader(RuntimeProfile* profile, const TFileScanRangeParams ParquetReader::~ParquetReader() { close(); - if (_group_file_reader != _file_reader.get()) { - delete _group_file_reader; - _group_file_reader = nullptr; - } } void ParquetReader::close() { if (!_closed) { + if (_file_reader != nullptr) { + _file_reader->close(); + delete _file_reader; + } + if (_profile != nullptr) { COUNTER_UPDATE(_filtered_row_groups, _statistics.filtered_row_groups); COUNTER_UPDATE(_to_read_row_groups, _statistics.read_row_groups); @@ -68,26 +70,8 @@ void ParquetReader::close() { } Status ParquetReader::init_reader(std::vector<ExprContext*>& conjunct_ctxs) { - if (_file_reader == nullptr) { - RETURN_IF_ERROR(FileFactory::create_file_reader(_profile, _scan_params, _scan_range, - _file_reader, 2048)); - // RowGroupReader has its own underlying buffer, so we should return file reader directly - // If RowGroupReaders use the same file reader with ParquetReader, the file position will change - // when ParquetReader try to read ColumnIndex meta, which causes performance cost - std::unique_ptr<FileReader> group_file_reader; - RETURN_IF_ERROR(FileFactory::create_file_reader(_profile, _scan_params, _scan_range, - group_file_reader, 0)); - _group_file_reader = group_file_reader.release(); - RETURN_IF_ERROR(_group_file_reader->open()); - } else { - // test only - _group_file_reader = _file_reader.get(); - } - RETURN_IF_ERROR(_file_reader->open()); - if (_file_reader->size() == 0) { - return Status::EndOfFile("Empty Parquet File"); - } - RETURN_IF_ERROR(parse_thrift_footer(_file_reader.get(), _file_metadata)); + CHECK(_file_reader != nullptr); + RETURN_IF_ERROR(parse_thrift_footer(_file_reader, _file_metadata)); _t_metadata = &_file_metadata->to_thrift(); _total_groups = _t_metadata->row_groups.size(); if (_total_groups == 0) { @@ -109,6 +93,8 @@ Status ParquetReader::_init_read_columns() { auto iter = _map_column.find(file_col_name); if (iter != _map_column.end()) { _include_column_ids.emplace_back(iter->second); + } else { + _missing_cols.push_back(file_col_name); } } // The same order as physical columns @@ -133,6 +119,21 @@ std::unordered_map<std::string, TypeDescriptor> ParquetReader::get_name_to_type( return map; } +Status ParquetReader::get_columns(std::unordered_map<std::string, TypeDescriptor>* name_to_type, + std::unordered_set<std::string>* missing_cols) { + auto schema_desc = _file_metadata->schema(); + std::unordered_set<std::string> column_names; + schema_desc.get_column_names(&column_names); + for (auto name : column_names) { + auto field = schema_desc.get_column(name); + name_to_type->emplace(name, field->type); + } + for (auto& col : _missing_cols) { + missing_cols->insert(col); + } + return Status::OK(); +} + Status ParquetReader::get_next_block(Block* block, bool* eof) { int32_t num_of_readers = _row_group_readers.size(); DCHECK(num_of_readers <= _read_row_groups.size()); @@ -166,8 +167,8 @@ Status ParquetReader::_init_row_group_readers(const std::vector<ExprContext*>& c for (auto row_group_id : _read_row_groups) { auto& row_group = _t_metadata->row_groups[row_group_id]; std::shared_ptr<RowGroupReader> 
row_group_reader; - row_group_reader.reset(new RowGroupReader(_group_file_reader, _read_columns, row_group_id, - row_group, _ctz)); + row_group_reader.reset( + new RowGroupReader(_file_reader, _read_columns, row_group_id, row_group, _ctz)); std::vector<RowRange> candidate_row_ranges; RETURN_IF_ERROR(_process_page_index(row_group, candidate_row_ranges)); if (candidate_row_ranges.empty()) { diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.h b/be/src/vec/exec/format/parquet/vparquet_reader.h index c91bc08059..73848ccd48 100644 --- a/be/src/vec/exec/format/parquet/vparquet_reader.h +++ b/be/src/vec/exec/format/parquet/vparquet_reader.h @@ -70,13 +70,14 @@ private: class ParquetReader : public GenericReader { public: - ParquetReader(RuntimeProfile* profile, const TFileScanRangeParams& params, - const TFileRangeDesc& range, const std::vector<std::string>& column_names, - size_t batch_size, cctz::time_zone* ctz); + ParquetReader(RuntimeProfile* profile, FileReader* file_reader, + const TFileScanRangeParams& params, const TFileRangeDesc& range, + const std::vector<std::string>& column_names, size_t batch_size, + cctz::time_zone* ctz); virtual ~ParquetReader(); // for test - void set_file_reader(FileReader* file_reader) { _file_reader.reset(file_reader); } + void set_file_reader(FileReader* file_reader) { _file_reader = file_reader; } Status init_reader(std::vector<ExprContext*>& conjunct_ctxs); @@ -87,6 +88,8 @@ public: int64_t size() const { return _file_reader->size(); } std::unordered_map<std::string, TypeDescriptor> get_name_to_type() override; + Status get_columns(std::unordered_map<std::string, TypeDescriptor>* name_to_type, + std::unordered_set<std::string>* missing_cols) override; ParquetStatistics& statistics() { return _statistics; } @@ -120,10 +123,10 @@ private: private: RuntimeProfile* _profile; + // file reader is passed from file scanner, and owned by this parquet reader. 
+ FileReader* _file_reader = nullptr; const TFileScanRangeParams& _scan_params; const TFileRangeDesc& _scan_range; - std::unique_ptr<FileReader> _file_reader = nullptr; - FileReader* _group_file_reader = nullptr; std::shared_ptr<FileMetaData> _file_metadata; const tparquet::FileMetaData* _t_metadata; @@ -144,6 +147,7 @@ private: std::unordered_map<int, tparquet::OffsetIndex> _col_offsets; const std::vector<std::string> _column_names; + std::vector<std::string> _missing_cols; ParquetStatistics _statistics; bool _closed = false; diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp index 985676eb48..ffc44775e5 100644 --- a/be/src/vec/exec/scan/vfile_scanner.cpp +++ b/be/src/vec/exec/scan/vfile_scanner.cpp @@ -18,11 +18,13 @@ #include "vec/exec/scan/vfile_scanner.h" #include <fmt/format.h> +#include <thrift/protocol/TDebugProtocol.h> #include <vec/data_types/data_type_factory.hpp> #include "common/logging.h" #include "common/utils.h" +#include "exec/arrow/orc_reader.h" #include "exec/text_converter.hpp" #include "exprs/expr_context.h" #include "runtime/descriptors.h" @@ -49,6 +51,17 @@ VFileScanner::VFileScanner(RuntimeState* state, NewFileScanNode* parent, int64_t Status VFileScanner::prepare(VExprContext** vconjunct_ctx_ptr) { SCOPED_CONSUME_MEM_TRACKER(_mem_tracker); + _get_block_timer = ADD_TIMER(_parent->_scanner_profile, "FileScannerGetBlockTime"); + _cast_to_input_block_timer = + ADD_TIMER(_parent->_scanner_profile, "FileScannerCastInputBlockTime"); + _fill_path_columns_timer = + ADD_TIMER(_parent->_scanner_profile, "FileScannerFillPathColumnTime"); + _fill_missing_columns_timer = + ADD_TIMER(_parent->_scanner_profile, "FileScannerFillMissingColumnTime"); + _pre_filter_timer = ADD_TIMER(_parent->_scanner_profile, "FileScannerPreFilterTimer"); + _convert_to_output_block_timer = + ADD_TIMER(_parent->_scanner_profile, "FileScannerConvertOuputBlockTime"); + if (vconjunct_ctx_ptr != nullptr) { // Copy vconjunct_ctx_ptr from scan node to this scanner's _vconjunct_ctx. 
RETURN_IF_ERROR((*vconjunct_ctx_ptr)->clone(_state, &_vconjunct_ctx)); @@ -64,12 +77,15 @@ Status VFileScanner::prepare(VExprContext** vconjunct_ctx_ptr) { _pre_conjunct_ctx_ptr.reset(new doris::vectorized::VExprContext*); RETURN_IF_ERROR(doris::vectorized::VExpr::create_expr_tree( _state->obj_pool(), _params.pre_filter_exprs, _pre_conjunct_ctx_ptr.get())); - RETURN_IF_ERROR((*_pre_conjunct_ctx_ptr)->prepare(_state, *_src_row_desc)); RETURN_IF_ERROR((*_pre_conjunct_ctx_ptr)->open(_state)); } } + _default_val_row_desc.reset(new RowDescriptor(_state->desc_tbl(), + std::vector<TupleId>({_real_tuple_desc->id()}), + std::vector<bool>({false}))); + return Status::OK(); } @@ -79,6 +95,25 @@ Status VFileScanner::open(RuntimeState* state) { return Status::OK(); } +// For query: +// [exist cols] [non-exist cols] [col from path] input output +// A B C D E +// _init_src_block x x x x x - x +// get_next_block x x x - - - x +// _cast_to_input_block - - - - - - - +// _fill_columns_from_path - - - - x - x +// _fill_missing_columns - - - x - - x +// _convert_to_output_block - - - - - - - +// +// For load: +// [exist cols] [non-exist cols] [col from path] input output +// A B C D E +// _init_src_block x x x x x x - +// get_next_block x x x - - x - +// _cast_to_input_block x x x - - x - +// _fill_columns_from_path - - - - x x - +// _fill_missing_columns - - - x - x - +// _convert_to_output_block - - - - - - x Status VFileScanner::_get_block_impl(RuntimeState* state, Block* block, bool* eof) { do { if (_cur_reader == nullptr || _cur_reader_eof) { @@ -93,14 +128,20 @@ Status VFileScanner::_get_block_impl(RuntimeState* state, Block* block, bool* eo // Init src block for load job based on the data file schema (e.g. parquet) // For query job, simply set _src_block_ptr to block. RETURN_IF_ERROR(_init_src_block(block)); - // Read next block. - RETURN_IF_ERROR(_cur_reader->get_next_block(_src_block_ptr, &_cur_reader_eof)); + { + SCOPED_TIMER(_get_block_timer); + // Read next block. + // Some columns in the block may not be filled (they do not exist in the file) + RETURN_IF_ERROR(_cur_reader->get_next_block(_src_block_ptr, &_cur_reader_eof)); + } if (_src_block_ptr->rows() > 0) { - // Convert the src block columns type to string in place. + // Convert the src block columns type to string in-place. RETURN_IF_ERROR(_cast_to_input_block(block)); // Fill rows in src block with partition columns from path. (e.g. Hive partition columns) RETURN_IF_ERROR(_fill_columns_from_path()); + // Fill columns that do not exist in the file with null or default values + RETURN_IF_ERROR(_fill_missing_columns()); // Apply _pre_conjunct_ctx_ptr to filter src block. RETURN_IF_ERROR(_pre_filter_src_block()); // Convert src block to output block (dest block), string to dest data type and apply filters.
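The two matrices above encode which stage touches which class of columns, for query and for load jobs: A/B/C exist in the file, D is requested but absent from the file, and E comes from the partition path. Restated as a condensed editorial sketch of one pass through _get_block_impl (this mirrors the code in this diff, it is not new behavior):

    // One pass of the per-block pipeline (condensed from the code above):
    RETURN_IF_ERROR(_init_src_block(block));              // create columns A..E
    RETURN_IF_ERROR(_cur_reader->get_next_block(
            _src_block_ptr, &_cur_reader_eof));           // reader fills A, B, C
    if (_src_block_ptr->rows() > 0) {
        RETURN_IF_ERROR(_cast_to_input_block(block));     // load only: cast A, B, C
        RETURN_IF_ERROR(_fill_columns_from_path());       // fill E from the path
        RETURN_IF_ERROR(_fill_missing_columns());         // fill D with default/NULL
        RETURN_IF_ERROR(_pre_filter_src_block());         // load only: pre-filter
        RETURN_IF_ERROR(_convert_to_output_block(block)); // load only: src -> dest
    }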
@@ -125,13 +166,29 @@ Status VFileScanner::_init_src_block(Block* block) { return Status::OK(); } - _src_block.clear(); + // if (_src_block_init) { + // _src_block.clear_column_data(); + // _src_block_ptr = &_src_block; + // return Status::OK(); + // } - std::unordered_map<std::string, TypeDescriptor> name_to_type = _cur_reader->get_name_to_type(); + _src_block.clear(); size_t idx = 0; + // slots in _input_tuple_desc contain all slots described in the load statement, e.g.: + // -H "columns: k1, k2, tmp1, k3 = tmp1 + 1" + // _input_tuple_desc will contain: k1, k2, tmp1 + // and some of them are from the file, such as k1 and k2, while some may not exist in the file, such as tmp1 + // _input_tuple_desc also contains columns from path for (auto& slot : _input_tuple_desc->slots()) { - DataTypePtr data_type = - DataTypeFactory::instance().create_data_type(name_to_type[slot->col_name()], true); + DataTypePtr data_type; + auto it = _name_to_col_type.find(slot->col_name()); + if (it == _name_to_col_type.end()) { + // not exist in the file, use the type from _input_tuple_desc + data_type = + DataTypeFactory::instance().create_data_type(slot->type(), slot->is_nullable()); + } else { + data_type = DataTypeFactory::instance().create_data_type(it->second, true); + } if (data_type == nullptr) { return Status::NotSupported(fmt::format("Not support arrow type:{}", slot->col_name())); } @@ -141,18 +198,20 @@ Status VFileScanner::_init_src_block(Block* block) { _src_block_name_to_idx.emplace(slot->col_name(), idx++); } _src_block_ptr = &_src_block; + _src_block_init = true; return Status::OK(); } Status VFileScanner::_cast_to_input_block(Block* block) { - if (_src_block_ptr == block) { + if (!_is_load) { return Status::OK(); } + SCOPED_TIMER(_cast_to_input_block_timer); // cast primitive type(PT0) to primitive type(PT1) size_t idx = 0; - for (size_t i = 0; i < _file_slot_descs.size(); ++i) { - SlotDescriptor* slot_desc = _file_slot_descs[i]; - if (slot_desc == nullptr) { + for (auto& slot_desc : _input_tuple_desc->slots()) { + if (_name_to_col_type.find(slot_desc->col_name()) == _name_to_col_type.end()) { + // skip columns which do not exist in the file continue; } auto& arg = _src_block_ptr->get_by_name(slot_desc->col_name()); @@ -177,6 +236,7 @@ Status VFileScanner::_fill_columns_from_path() { size_t rows = _src_block_ptr->rows(); const TFileRangeDesc& range = _ranges.at(_next_range - 1); if (range.__isset.columns_from_path && !_partition_slot_descs.empty()) { + SCOPED_TIMER(_fill_path_columns_timer); for (const auto& slot_desc : _partition_slot_descs) { if (slot_desc == nullptr) continue; auto it = _partition_slot_index_map.find(slot_desc->id()); @@ -200,11 +260,82 @@ return Status::OK(); } +Status VFileScanner::_fill_missing_columns() { + if (_missing_cols.empty()) { + return Status::OK(); + } + + SCOPED_TIMER(_fill_missing_columns_timer); + int rows = _src_block_ptr->rows(); + for (auto slot_desc : _real_tuple_desc->slots()) { + if (!slot_desc->is_materialized()) { + continue; + } + if (_missing_cols.find(slot_desc->col_name()) == _missing_cols.end()) { + continue; + } + + auto it = _col_default_value_ctx.find(slot_desc->col_name()); + if (it == _col_default_value_ctx.end()) { + return Status::InternalError("failed to find default value expr for slot: {}", + slot_desc->col_name()); + } + if (it->second == nullptr) { + // no default value expr, fill with null + auto nullable_column = reinterpret_cast<vectorized::ColumnNullable*>(
(*std::move(_src_block_ptr->get_by_name(slot_desc->col_name()).column)) + .mutate() + .get()); + nullable_column->insert_many_defaults(rows); + } else { + // fill with default value + auto* ctx = it->second; + auto origin_column_num = _src_block_ptr->columns(); + int result_column_id = -1; + // PT1 => dest primitive type + RETURN_IF_ERROR(ctx->execute(_src_block_ptr, &result_column_id)); + bool is_origin_column = result_column_id < origin_column_num; + if (!is_origin_column) { + auto result_column_ptr = _src_block_ptr->get_by_position(result_column_id).column; + // result_column_ptr maybe a ColumnConst, convert it to a normal column + result_column_ptr = result_column_ptr->convert_to_full_column_if_const(); + auto origin_column_type = _src_block_ptr->get_by_name(slot_desc->col_name()).type; + bool is_nullable = origin_column_type->is_nullable(); + _src_block_ptr->replace_by_position( + _src_block_ptr->get_position_by_name(slot_desc->col_name()), + is_nullable ? make_nullable(result_column_ptr) : result_column_ptr); + _src_block_ptr->erase(result_column_id); + } + } + } + return Status::OK(); +} + +Status VFileScanner::_pre_filter_src_block() { + if (!_is_load) { + return Status::OK(); + } + if (_pre_conjunct_ctx_ptr) { + SCOPED_TIMER(_pre_filter_timer); + auto origin_column_num = _src_block_ptr->columns(); + auto old_rows = _src_block_ptr->rows(); + RETURN_IF_ERROR(vectorized::VExprContext::filter_block(_pre_conjunct_ctx_ptr, + _src_block_ptr, origin_column_num)); + _counter.num_rows_unselected += old_rows - _src_block.rows(); + } + return Status::OK(); +} + Status VFileScanner::_convert_to_output_block(Block* block) { - if (_src_block_ptr == block) { + if (!_is_load) { return Status::OK(); } + SCOPED_TIMER(_convert_to_output_block_timer); + // The block is passed from scanner context's free blocks, + // which is initialized by src columns. + // But for load job, the block should be filled with dest columns. + // So need to clear it first. block->clear(); int ctx_idx = 0; @@ -217,7 +348,6 @@ Status VFileScanner::_convert_to_output_block(Block* block) { if (!slot_desc->is_materialized()) { continue; } - int dest_index = ctx_idx++; auto* ctx = _dest_vexpr_ctx[dest_index]; @@ -229,13 +359,15 @@ Status VFileScanner::_convert_to_output_block(Block* block) { is_origin_column && _src_block_mem_reuse ? 
_src_block.get_by_position(result_column_id).column->clone_resized(rows) : _src_block.get_by_position(result_column_id).column; + // column_ptr maybe a ColumnConst, convert it to a normal column + column_ptr = column_ptr->convert_to_full_column_if_const(); DCHECK(column_ptr != nullptr); // because of src_slot_desc is always be nullable, so the column_ptr after do dest_expr // is likely to be nullable if (LIKELY(column_ptr->is_nullable())) { - auto nullable_column = + const ColumnNullable* nullable_column = reinterpret_cast<const vectorized::ColumnNullable*>(column_ptr.get()); for (int i = 0; i < rows; ++i) { if (filter_map[i] && nullable_column->is_null_at(i)) { @@ -280,10 +412,10 @@ Status VFileScanner::_convert_to_output_block(Block* block) { } } if (!slot_desc->is_nullable()) { - column_ptr = nullable_column->get_nested_column_ptr(); + column_ptr = remove_nullable(column_ptr); } } else if (slot_desc->is_nullable()) { - column_ptr = vectorized::make_nullable(column_ptr); + column_ptr = make_nullable(column_ptr); } block->insert(dest_index, vectorized::ColumnWithTypeAndName(std::move(column_ptr), slot_desc->get_data_type_ptr(), @@ -308,52 +440,61 @@ Status VFileScanner::_convert_to_output_block(Block* block) { return Status::OK(); } -Status VFileScanner::_pre_filter_src_block() { - if (_pre_conjunct_ctx_ptr) { - auto origin_column_num = _src_block_ptr->columns(); - auto old_rows = _src_block_ptr->rows(); - RETURN_IF_ERROR(vectorized::VExprContext::filter_block(_pre_conjunct_ctx_ptr, - _src_block_ptr, origin_column_num)); - _counter.num_rows_unselected += old_rows - _src_block.rows(); - } - return Status::OK(); -} - Status VFileScanner::_get_next_reader() { - if (_cur_reader != nullptr) { - delete _cur_reader; - _cur_reader = nullptr; - } while (true) { + _cur_reader.reset(nullptr); + _src_block_init = false; if (_next_range >= _ranges.size()) { _scanner_eof = true; return Status::OK(); } const TFileRangeDesc& range = _ranges[_next_range++]; - std::vector<std::string> column_names; + + // 1. create file reader + std::unique_ptr<FileReader> file_reader; + RETURN_IF_ERROR(FileFactory::create_file_reader(_state->exec_env(), _profile, _params, + range, file_reader)); + RETURN_IF_ERROR(file_reader->open()); + if (file_reader->size() == 0) { + file_reader->close(); + continue; + } + + // 2. 
create reader for specific format + // TODO: add csv, json, avro + Status init_status; switch (_params.format_type) { case TFileFormatType::FORMAT_PARQUET: { - for (int i = 0; i < _file_slot_descs.size(); i++) { - column_names.push_back(_file_slot_descs[i]->col_name()); - } - _cur_reader = new ParquetReader(_profile, _params, range, column_names, - _state->query_options().batch_size, - const_cast<cctz::time_zone*>(&_state->timezone_obj())); - Status status = ((ParquetReader*)_cur_reader)->init_reader(_conjunct_ctxs); - if (status.ok()) { - _cur_reader_eof = false; - return status; - } else if (status.is_end_of_file()) { - continue; - } else { - return status; - } + _cur_reader.reset( + new ParquetReader(_profile, file_reader.release(), _params, range, + _file_col_names, _state->query_options().batch_size, + const_cast<cctz::time_zone*>(&_state->timezone_obj()))); + init_status = ((ParquetReader*)(_cur_reader.get()))->init_reader(_conjunct_ctxs); + break; + } + case TFileFormatType::FORMAT_ORC: { + _cur_reader.reset(new ORCReaderWrap(_state, _file_slot_descs, file_reader.release(), + _num_of_columns_from_file, range.start_offset, + range.size, false)); + init_status = + ((ORCReaderWrap*)(_cur_reader.get())) + ->init_reader(_real_tuple_desc, _conjunct_ctxs, _state->timezone()); + break; } default: - std::stringstream error_msg; - error_msg << "Not supported file format " << _params.format_type; - return Status::InternalError(error_msg.str()); + return Status::InternalError("Not supported file format: {}", _params.format_type); } + + if (init_status.is_end_of_file()) { + continue; + } else if (!init_status.ok()) { + return Status::InternalError("failed to init reader for file {}, err: {}", range.path, + init_status.get_error_msg()); + } + + _cur_reader->get_columns(&_name_to_col_type, &_missing_cols); + _cur_reader_eof = false; + break; } return Status::OK(); } @@ -382,6 +523,8 @@ Status VFileScanner::_init_expr_ctxes() { _file_slot_descs.emplace_back(it->second); auto iti = full_src_index_map.find(slot_id); _file_slot_index_map.emplace(slot_id, iti->second); + _file_slot_name_map.emplace(it->second->col_name(), iti->second); + _file_col_names.push_back(it->second->col_name()); } else { _partition_slot_descs.emplace_back(it->second); auto iti = full_src_index_map.find(slot_id); @@ -390,7 +533,9 @@ Status VFileScanner::_init_expr_ctxes() { } if (_is_load) { + // follow desc expr map and src default value expr map is only for load task. 
bool has_slot_id_map = _params.__isset.dest_sid_to_src_sid_without_trans; + int idx = 0; for (auto slot_desc : _output_tuple_desc->slots()) { if (!slot_desc->is_materialized()) { continue; @@ -402,11 +547,15 @@ Status VFileScanner::_init_expr_ctxes() { } vectorized::VExprContext* ctx = nullptr; - RETURN_IF_ERROR( - vectorized::VExpr::create_expr_tree(_state->obj_pool(), it->second, &ctx)); - RETURN_IF_ERROR(ctx->prepare(_state, *_src_row_desc)); - RETURN_IF_ERROR(ctx->open(_state)); + if (!it->second.nodes.empty()) { + RETURN_IF_ERROR( + vectorized::VExpr::create_expr_tree(_state->obj_pool(), it->second, &ctx)); + RETURN_IF_ERROR(ctx->prepare(_state, *_src_row_desc)); + RETURN_IF_ERROR(ctx->open(_state)); + } _dest_vexpr_ctx.emplace_back(ctx); + _dest_slot_name_to_idx[slot_desc->col_name()] = idx++; + if (has_slot_id_map) { auto it1 = _params.dest_sid_to_src_sid_without_trans.find(slot_desc->id()); if (it1 == std::end(_params.dest_sid_to_src_sid_without_trans)) { @@ -423,8 +572,49 @@ Status VFileScanner::_init_expr_ctxes() { } } } + + for (auto slot_desc : _real_tuple_desc->slots()) { + if (!slot_desc->is_materialized()) { + continue; + } + vectorized::VExprContext* ctx = nullptr; + auto it = _params.default_value_of_src_slot.find(slot_desc->id()); + // if does not exist or is empty, the default value will be null + if (it != std::end(_params.default_value_of_src_slot) && !it->second.nodes.empty()) { + RETURN_IF_ERROR( + vectorized::VExpr::create_expr_tree(_state->obj_pool(), it->second, &ctx)); + RETURN_IF_ERROR(ctx->prepare(_state, *_default_val_row_desc)); + RETURN_IF_ERROR(ctx->open(_state)); + } + _col_default_value_ctx.emplace(slot_desc->col_name(), ctx); + } } return Status::OK(); } +Status VFileScanner::close(RuntimeState* state) { + if (_is_closed) { + return Status::OK(); + } + + for (auto ctx : _dest_vexpr_ctx) { + if (ctx != nullptr) { + ctx->close(state); + } + } + + for (auto it : _col_default_value_ctx) { + if (it.second != nullptr) { + it.second->close(state); + } + } + + if (_pre_conjunct_ctx_ptr) { + (*_pre_conjunct_ctx_ptr)->close(state); + } + + RETURN_IF_ERROR(VScanner::close(state)); + return Status::OK(); +} + } // namespace doris::vectorized diff --git a/be/src/vec/exec/scan/vfile_scanner.h b/be/src/vec/exec/scan/vfile_scanner.h index 4bad47795d..6608a8bfd0 100644 --- a/be/src/vec/exec/scan/vfile_scanner.h +++ b/be/src/vec/exec/scan/vfile_scanner.h @@ -46,6 +46,8 @@ public: Status open(RuntimeState* state) override; + Status close(RuntimeState* state) override; + public: Status prepare(VExprContext** vconjunct_ctx_ptr); @@ -65,24 +67,52 @@ protected: const std::vector<TFileRangeDesc>& _ranges; int _next_range; - GenericReader* _cur_reader; + std::unique_ptr<GenericReader> _cur_reader; bool _cur_reader_eof; // File source slot descriptors std::vector<SlotDescriptor*> _file_slot_descs; - // File slot id to index map. + // File slot id to index in _file_slot_descs std::map<SlotId, int> _file_slot_index_map; + // file col name to index in _file_slot_descs + std::map<std::string, int> _file_slot_name_map; + // col names from _file_slot_descs + std::vector<std::string> _file_col_names; // Partition source slot descriptors std::vector<SlotDescriptor*> _partition_slot_descs; - // Partition slot id to index map + // Partition slot id to index in _partition_slot_descs std::map<SlotId, int> _partition_slot_index_map; + // created from param.expr_of_dest_slot + // For query, it saves default value expr of all dest columns, or nullptr for NULL. 
+ // For load, it saves conversion expr/default value of all dest columns. + std::vector<vectorized::VExprContext*> _dest_vexpr_ctx; + // dest slot name to index in _dest_vexpr_ctx; + std::unordered_map<std::string, int> _dest_slot_name_to_idx; + // col name to default value expr + std::unordered_map<std::string, vectorized::VExprContext*> _col_default_value_ctx; + // the map values of dest slot id to src slot desc + // if there is not key of dest slot id in dest_sid_to_src_sid_without_trans, it will be set to nullptr + std::vector<SlotDescriptor*> _src_slot_descs_order_by_dest; + // dest slot desc index to src slot desc index + std::unordered_map<int, int> _dest_slot_to_src_slot_index; + + std::unordered_map<std::string, size_t> _src_block_name_to_idx; + + // Got from GenericReader, mapping the columns existing in the file to their types. + std::unordered_map<std::string, TypeDescriptor> _name_to_col_type; + // Got from GenericReader, saving columns required by the scan but not present in the file. + // These columns will be filled with a default value or null. + std::unordered_set<std::string> _missing_cols; + + // For load task + std::unique_ptr<doris::vectorized::VExprContext*> _pre_conjunct_ctx_ptr; + std::unique_ptr<RowDescriptor> _src_row_desc; + // row desc for default exprs + std::unique_ptr<RowDescriptor> _default_val_row_desc; // Mem pool used to allocate _src_tuple and _src_tuple_row std::unique_ptr<MemPool> _mem_pool; - // Dest tuple descriptor and dest expr context - const TupleDescriptor* _dest_tuple_desc; - // Profile RuntimeProfile* _profile; ScannerCounter _counter; @@ -91,22 +121,20 @@ protected: int _rows = 0; int _num_of_columns_from_file; - std::vector<vectorized::VExprContext*> _dest_vexpr_ctx; - - // the map values of dest slot id to src slot desc - // if there is not key of dest slot id in dest_sid_to_src_sid_without_trans, it will be set to nullptr - std::vector<SlotDescriptor*> _src_slot_descs_order_by_dest; - bool _src_block_mem_reuse = false; bool _strict_mode; + bool _src_block_init = false; Block* _src_block_ptr; Block _src_block; - // dest slot desc index to src slot desc index - std::unordered_map<int, int> _dest_slot_to_src_slot_index; - - std::unordered_map<std::string, size_t> _src_block_name_to_idx; +private: + RuntimeProfile::Counter* _get_block_timer = nullptr; + RuntimeProfile::Counter* _cast_to_input_block_timer = nullptr; + RuntimeProfile::Counter* _fill_path_columns_timer = nullptr; + RuntimeProfile::Counter* _fill_missing_columns_timer = nullptr; + RuntimeProfile::Counter* _pre_filter_timer = nullptr; + RuntimeProfile::Counter* _convert_to_output_block_timer = nullptr; private: Status _init_expr_ctxes(); @@ -114,6 +142,7 @@ private: Status _cast_to_input_block(Block* block); Status _pre_filter_src_block(); Status _convert_to_output_block(Block* block); + Status _fill_missing_columns(); void _reset_counter() { _counter.num_rows_unselected = 0; _counter.num_rows_filtered = 0; diff --git a/be/src/vec/exec/scan/vscan_node.h b/be/src/vec/exec/scan/vscan_node.h index 6117d487bc..fbcf248a3c 100644 --- a/be/src/vec/exec/scan/vscan_node.h +++ b/be/src/vec/exec/scan/vscan_node.h @@ -37,6 +37,7 @@ public: : ExecNode(pool, tnode, descs), _runtime_filter_descs(tnode.runtime_filters) {} friend class VScanner; friend class NewOlapScanner; + friend class VFileScanner; friend class ScannerContext; Status init(const TPlanNode& tnode, RuntimeState* state = nullptr) override; diff --git a/be/src/vec/exec/scan/vscanner.h b/be/src/vec/exec/scan/vscanner.h index aff7a3a4ed..07313428df
100644 --- a/be/src/vec/exec/scan/vscanner.h +++ b/be/src/vec/exec/scan/vscanner.h @@ -137,10 +137,6 @@ protected: // and will be destroyed at the end. std::vector<VExprContext*> _stale_vexpr_ctxs; - // For load scanner - std::unique_ptr<doris::vectorized::VExprContext*> _pre_conjunct_ctx_ptr; - std::unique_ptr<RowDescriptor> _src_row_desc; - // num of rows read from scanner int64_t _num_rows_read = 0; diff --git a/be/src/vec/exprs/vexpr_context.cpp b/be/src/vec/exprs/vexpr_context.cpp index 019a7d802c..ccb1045cb1 100644 --- a/be/src/vec/exprs/vexpr_context.cpp +++ b/be/src/vec/exprs/vexpr_context.cpp @@ -32,7 +32,7 @@ VExprContext::VExprContext(VExpr* expr) _stale(false) {} VExprContext::~VExprContext() { - DCHECK(!_prepared || _closed); + DCHECK(!_prepared || _closed) << get_stack_trace(); for (int i = 0; i < _fn_contexts.size(); ++i) { delete _fn_contexts[i]; diff --git a/be/src/vec/exprs/vliteral.cpp b/be/src/vec/exprs/vliteral.cpp index d87278b347..d99cdbdd97 100644 --- a/be/src/vec/exprs/vliteral.cpp +++ b/be/src/vec/exprs/vliteral.cpp @@ -194,7 +194,8 @@ Status VLiteral::execute(VExprContext* context, vectorized::Block* block, int* r std::string VLiteral::debug_string() const { std::stringstream out; - out << "VLiteral (type = " << _data_type->get_name(); + out << "VLiteral (name = " << _expr_name; + out << ", type = " << _data_type->get_name(); out << ", value = "; if (_column_ptr.get()->size() > 0) { StringRef ref = _column_ptr.get()->get_data_at(0); diff --git a/be/src/vec/utils/arrow_column_to_doris_column.cpp b/be/src/vec/utils/arrow_column_to_doris_column.cpp index eacd1136b5..e71bf970a0 100644 --- a/be/src/vec/utils/arrow_column_to_doris_column.cpp +++ b/be/src/vec/utils/arrow_column_to_doris_column.cpp @@ -408,4 +408,60 @@ Status arrow_column_to_doris_column(const arrow::Array* arrow_column, size_t arr return Status::NotSupported( fmt::format("Not support arrow type:{}", arrow_column->type()->name())); } + +Status arrow_type_to_doris_type(arrow::Type::type type, TypeDescriptor* return_type) { + switch (type) { + case arrow::Type::STRING: + case arrow::Type::BINARY: + case arrow::Type::FIXED_SIZE_BINARY: + return_type->type = TYPE_STRING; + break; + case arrow::Type::INT8: + return_type->type = TYPE_TINYINT; + break; + case arrow::Type::UINT8: + case arrow::Type::INT16: + return_type->type = TYPE_SMALLINT; + break; + case arrow::Type::UINT16: + case arrow::Type::INT32: + return_type->type = TYPE_INT; + break; + case arrow::Type::UINT32: + case arrow::Type::INT64: + return_type->type = TYPE_BIGINT; + break; + case arrow::Type::UINT64: + return_type->type = TYPE_LARGEINT; + break; + case arrow::Type::HALF_FLOAT: + case arrow::Type::FLOAT: + return_type->type = TYPE_FLOAT; + break; + case arrow::Type::DOUBLE: + return_type->type = TYPE_DOUBLE; + break; + case arrow::Type::BOOL: + return_type->type = TYPE_BOOLEAN; + break; + case arrow::Type::DATE32: + return_type->type = TYPE_DATEV2; + break; + case arrow::Type::DATE64: + return_type->type = TYPE_DATETIMEV2; + break; + case arrow::Type::TIMESTAMP: + return_type->type = TYPE_BIGINT; + break; + case arrow::Type::DECIMAL: + return_type->type = TYPE_DECIMALV2; + return_type->precision = 27; + return_type->scale = 9; + break; + default: + return Status::InternalError("unsupport type: {}", type); + } + return Status::OK(); +} + } // namespace doris::vectorized diff --git a/be/src/vec/utils/arrow_column_to_doris_column.h b/be/src/vec/utils/arrow_column_to_doris_column.h index 9d5f077672..13edffadae 100644 --- 
a/be/src/vec/utils/arrow_column_to_doris_column.h +++ b/be/src/vec/utils/arrow_column_to_doris_column.h @@ -24,7 +24,7 @@ #include <memory> #include "common/status.h" -#include "runtime/primitive_type.h" +#include "runtime/types.h" #include "vec/core/column_with_type_and_name.h" // This files contains some utilities to convert Doris internal @@ -42,4 +42,6 @@ Status arrow_column_to_doris_column(const arrow::Array* arrow_column, size_t arr ColumnPtr& doris_column, const DataTypePtr& type, size_t num_elements, const cctz::time_zone& ctz); +Status arrow_type_to_doris_type(arrow::Type::type type, TypeDescriptor* return_type); + } // namespace doris::vectorized diff --git a/be/test/vec/exec/parquet/parquet_reader_test.cpp b/be/test/vec/exec/parquet/parquet_reader_test.cpp index 23bf6b353f..e8d3339b43 100644 --- a/be/test/vec/exec/parquet/parquet_reader_test.cpp +++ b/be/test/vec/exec/parquet/parquet_reader_test.cpp @@ -22,7 +22,6 @@ #include "runtime/runtime_state.h" #include "util/runtime_profile.h" #include "vec/data_types/data_type_factory.hpp" -#include "vec/exec/file_hdfs_scanner.h" #include "vec/exec/format/parquet/vparquet_reader.h" namespace doris { @@ -92,6 +91,7 @@ TEST_F(ParquetReaderTest, normal) { auto slot_descs = desc_tbl->get_tuple_descriptor(0)->slots(); LocalFileReader* reader = new LocalFileReader("./be/test/exec/test_data/parquet_scanner/type-decoder.parquet", 0); + reader->open(); cctz::time_zone ctz; TimezoneUtils::find_cctz_time_zone(TimezoneUtils::default_time_zone, ctz); @@ -106,8 +106,8 @@ TEST_F(ParquetReaderTest, normal) { scan_range.start_offset = 0; scan_range.size = 1000; } - auto p_reader = new ParquetReader(nullptr, scan_params, scan_range, column_names, 992, &ctz); - p_reader->set_file_reader(reader); + auto p_reader = + new ParquetReader(nullptr, reader, scan_params, scan_range, column_names, 992, &ctz); RuntimeState runtime_state((TQueryGlobals())); runtime_state.set_desc_tbl(desc_tbl); runtime_state.init_instance_mem_tracker(); @@ -132,119 +132,5 @@ TEST_F(ParquetReaderTest, normal) { delete p_reader; } -TEST_F(ParquetReaderTest, scanner) { - TDescriptorTable t_desc_table; - TTableDescriptor t_table_desc; - - t_table_desc.id = 0; - t_table_desc.tableType = TTableType::OLAP_TABLE; - t_table_desc.numCols = 7; - t_table_desc.numClusteringCols = 0; - t_desc_table.tableDescriptors.push_back(t_table_desc); - t_desc_table.__isset.tableDescriptors = true; - - // init boolean and numeric slot - std::vector<std::string> numeric_types = {"boolean_col", "tinyint_col", "smallint_col", - "int_col", "bigint_col", "float_col", - "double_col"}; - for (int i = 0; i < numeric_types.size(); i++) { - TSlotDescriptor tslot_desc; - { - tslot_desc.id = i; - tslot_desc.parent = 0; - TTypeDesc type; - { - TTypeNode node; - node.__set_type(TTypeNodeType::SCALAR); - TScalarType scalar_type; - scalar_type.__set_type(TPrimitiveType::type(i + 2)); - node.__set_scalar_type(scalar_type); - type.types.push_back(node); - } - tslot_desc.slotType = type; - tslot_desc.columnPos = 0; - tslot_desc.byteOffset = 0; - tslot_desc.nullIndicatorByte = 1; - tslot_desc.nullIndicatorBit = 1; - tslot_desc.colName = numeric_types[i]; - tslot_desc.slotIdx = 0; - tslot_desc.isMaterialized = true; - t_desc_table.slotDescriptors.push_back(tslot_desc); - } - } - - t_desc_table.__isset.slotDescriptors = true; - { - TTupleDescriptor t_tuple_desc; - t_tuple_desc.id = 0; - t_tuple_desc.byteSize = 16; - t_tuple_desc.numNullBytes = 0; - t_tuple_desc.tableId = 0; - t_tuple_desc.__isset.tableId = true; - 
t_desc_table.tupleDescriptors.push_back(t_tuple_desc); - } - - // set scan range - // std::vector<TScanRangeParams> scan_ranges; - TFileScanRange file_scan_range; - { - // TScanRangeParams scan_range_params; - // TFileScanRange file_scan_range; - TFileScanRangeParams params; - { - params.__set_src_tuple_id(0); - params.__set_num_of_columns_from_file(7); - params.file_type = TFileType::FILE_LOCAL; - params.format_type = TFileFormatType::FORMAT_PARQUET; - std::vector<TFileScanSlotInfo> file_slots; - for (int i = 0; i < numeric_types.size(); i++) { - TFileScanSlotInfo slot_info; - slot_info.slot_id = i; - slot_info.is_file_slot = true; - file_slots.emplace_back(slot_info); - } - params.__set_required_slots(file_slots); - } - file_scan_range.params = params; - TFileRangeDesc range; - { - range.start_offset = 0; - range.size = 1000; - range.path = "./be/test/exec/test_data/parquet_scanner/type-decoder.parquet"; - std::vector<std::string> columns_from_path {"value"}; - range.__set_columns_from_path(columns_from_path); - } - file_scan_range.ranges.push_back(range); - // scan_range_params.scan_range.ext_scan_range.__set_file_scan_range(broker_scan_range); - // scan_ranges.push_back(scan_range_params); - } - - std::vector<TExpr> pre_filter_texprs = std::vector<TExpr>(); - RuntimeState runtime_state((TQueryGlobals())); - runtime_state.init_instance_mem_tracker(); - - DescriptorTbl* desc_tbl; - ObjectPool obj_pool; - DescriptorTbl::create(&obj_pool, t_desc_table, &desc_tbl); - runtime_state.set_desc_tbl(desc_tbl); - ScannerCounter counter; - std::vector<ExprContext*> conjunct_ctxs = std::vector<ExprContext*>(); - auto scan = new ParquetFileHdfsScanner(&runtime_state, runtime_state.runtime_profile(), - file_scan_range.params, file_scan_range.ranges, - pre_filter_texprs, &counter); - scan->reg_conjunct_ctxs(0, conjunct_ctxs); - Status st = scan->open(); - EXPECT_TRUE(st.ok()); - - bool eof = false; - Block* block = new Block(); - scan->get_next(block, &eof); - for (auto& col : block->get_columns_with_type_and_name()) { - ASSERT_EQ(col.column->size(), 10); - } - delete block; - delete scan; -} - } // namespace vectorized } // namespace doris diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java index da9fddc342..7500c73899 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java @@ -35,6 +35,7 @@ import org.apache.doris.catalog.Column; import org.apache.doris.catalog.FunctionSet; import org.apache.doris.catalog.PrimitiveType; import org.apache.doris.catalog.Table; +import org.apache.doris.catalog.TableIf; import org.apache.doris.catalog.external.HMSExternalTable; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Config; @@ -45,6 +46,7 @@ import org.apache.doris.rewrite.ExprRewriter; import org.apache.doris.statistics.StatisticalType; import org.apache.doris.thrift.TBrokerFileStatus; import org.apache.doris.thrift.TExplainLevel; +import org.apache.doris.thrift.TExpr; import org.apache.doris.thrift.TFileScanNode; import org.apache.doris.thrift.TFileScanRangeParams; import org.apache.doris.thrift.TPlanNode; @@ -72,16 +74,23 @@ public class ExternalFileScanNode extends ExternalScanNode { public List<Expr> conjuncts; public TupleDescriptor destTupleDescriptor; - + public Map<String, SlotDescriptor> destSlotDescByName; 
// === Set when init === public TupleDescriptor srcTupleDescriptor; + public Map<String, SlotDescriptor> srcSlotDescByName; public Map<String, Expr> exprMap; - public Map<String, SlotDescriptor> slotDescByName; public String timezone; // === Set when init === public TFileScanRangeParams params; + public void createDestSlotMap() { + Preconditions.checkNotNull(destTupleDescriptor); + destSlotDescByName = Maps.newHashMap(); + for (SlotDescriptor slot : destTupleDescriptor.getSlots()) { + destSlotDescByName.put(slot.getColumn().getName(), slot); + } + } } public enum Type { @@ -169,7 +178,7 @@ public class ExternalFileScanNode extends ExternalScanNode { break; case LOAD: for (FileGroupInfo fileGroupInfo : fileGroupInfos) { - this.scanProviders.add(new LoadScanProvider(fileGroupInfo)); + this.scanProviders.add(new LoadScanProvider(fileGroupInfo, desc)); } break; default: @@ -186,6 +195,7 @@ public class ExternalFileScanNode extends ExternalScanNode { private void initParamCreateContexts(Analyzer analyzer) throws UserException { for (FileScanProviderIf scanProvider : scanProviders) { ParamCreateContext context = scanProvider.createContext(analyzer); + context.createDestSlotMap(); // set where and preceding filter. // FIXME(cmy): we should support set different expr for different file group. initAndSetPrecedingFilter(context.fileGroup.getPrecedingFilterExpr(), context.srcTupleDescriptor, analyzer); @@ -255,20 +265,72 @@ public class ExternalFileScanNode extends ExternalScanNode { contexts.size() + " vs. " + scanProviders.size()); for (int i = 0; i < contexts.size(); ++i) { ParamCreateContext context = contexts.get(i); - finalizeParamsForLoad(context, analyzer); FileScanProviderIf scanProvider = scanProviders.get(i); + setDefaultValueExprs(scanProvider, context); + finalizeParamsForLoad(context, analyzer); createScanRangeLocations(context, scanProvider); this.inputSplitsNum += scanProvider.getInputSplitNum(); this.totalFileSize += scanProvider.getInputFileSize(); } } + protected void setDefaultValueExprs(FileScanProviderIf scanProvider, ParamCreateContext context) + throws UserException { + TableIf tbl = scanProvider.getTargetTable(); + Preconditions.checkNotNull(tbl); + TExpr tExpr = new TExpr(); + tExpr.setNodes(Lists.newArrayList()); + + for (Column column : tbl.getBaseSchema()) { + Expr expr; + if (column.getDefaultValue() != null) { + if (column.getDefaultValueExprDef() != null) { + expr = column.getDefaultValueExpr(); + } else { + expr = new StringLiteral(column.getDefaultValue()); + } + } else { + if (column.isAllowNull()) { + expr = NullLiteral.create(org.apache.doris.catalog.Type.VARCHAR); + } else { + expr = null; + } + } + SlotDescriptor slotDesc = null; + switch (type) { + case LOAD: { + slotDesc = context.srcSlotDescByName.get(column.getName()); + break; + } + case QUERY: { + slotDesc = context.destSlotDescByName.get(column.getName()); + break; + } + default: + Preconditions.checkState(false, type); + } + // if slot desc is null, which means it is an unrelated slot, just skip. + // e.g.: + // (a, b, c) set (x=a, y=b, z=c) + // c does not exist in the file, so z will be filled with null, even if z has a default value. + // and if z is not nullable, the load will fail.
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileScanProviderIf.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileScanProviderIf.java
index 700d8be098..8ae7952169 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileScanProviderIf.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileScanProviderIf.java
@@ -19,6 +19,7 @@ package org.apache.doris.planner.external;
 
 import org.apache.doris.analysis.Analyzer;
 import org.apache.doris.analysis.Expr;
+import org.apache.doris.catalog.TableIf;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.MetaNotFoundException;
 import org.apache.doris.common.UserException;
@@ -56,4 +57,6 @@ public interface FileScanProviderIf {
     int getInputSplitNum();
 
     long getInputFileSize();
+
+    TableIf getTargetTable();
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanProvider.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanProvider.java
index 7965a02711..1df3d639bc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanProvider.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanProvider.java
@@ -24,6 +24,7 @@ import org.apache.doris.analysis.TupleDescriptor;
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.HiveBucketUtil;
 import org.apache.doris.catalog.HiveMetaStoreClientHelper;
+import org.apache.doris.catalog.TableIf;
 import org.apache.doris.catalog.external.HMSExternalTable;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.MetaNotFoundException;
@@ -86,6 +87,11 @@ public class HiveScanProvider implements HMSTableScanProviderIf {
         this.desc = desc;
     }
 
+    @Override
+    public TableIf getTargetTable() {
+        return hmsTable;
+    }
+
     @Override
     public TFileFormatType getFileFormatType() throws DdlException, MetaNotFoundException {
         TFileFormatType type = null;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/LoadScanProvider.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/LoadScanProvider.java
index 33b0db2de7..d202ead466 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/LoadScanProvider.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/LoadScanProvider.java
@@ -22,7 +22,9 @@ import org.apache.doris.analysis.Expr;
 import org.apache.doris.analysis.ImportColumnDesc;
 import org.apache.doris.analysis.IntLiteral;
 import org.apache.doris.analysis.SlotRef;
+import org.apache.doris.analysis.TupleDescriptor;
 import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.TableIf;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.MetaNotFoundException;
@@ -55,10 +57,12 @@ import java.util.Map;
 
 public class LoadScanProvider implements FileScanProviderIf {
 
-    FileGroupInfo fileGroupInfo;
+    private FileGroupInfo fileGroupInfo;
+    private TupleDescriptor destTupleDesc;
 
-    public LoadScanProvider(FileGroupInfo fileGroupInfo) {
+    public LoadScanProvider(FileGroupInfo fileGroupInfo, TupleDescriptor destTupleDesc) {
         this.fileGroupInfo = fileGroupInfo;
+        this.destTupleDesc = destTupleDesc;
     }
 
     @Override
@@ -89,6 +93,7 @@ public class LoadScanProvider implements FileScanProviderIf {
     @Override
     public ParamCreateContext createContext(Analyzer analyzer) throws UserException {
         ParamCreateContext ctx = new ParamCreateContext();
+        ctx.destTupleDescriptor = destTupleDesc;
         ctx.fileGroup = fileGroupInfo.getFileGroup();
         ctx.timezone = analyzer.getTimezone();
 
@@ -169,7 +174,7 @@ public class LoadScanProvider implements FileScanProviderIf {
      */
     private void initColumns(ParamCreateContext context, Analyzer analyzer) throws UserException {
         context.srcTupleDescriptor = analyzer.getDescTbl().createTupleDescriptor();
-        context.slotDescByName = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER);
+        context.srcSlotDescByName = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER);
         context.exprMap = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER);
 
         // for load job, column exprs is got from file group
@@ -190,7 +195,7 @@ public class LoadScanProvider implements FileScanProviderIf {
         }
         List<Integer> srcSlotIds = Lists.newArrayList();
         Load.initColumns(fileGroupInfo.getTargetTable(), columnDescs, context.fileGroup.getColumnToHadoopFunction(),
-                context.exprMap, analyzer, context.srcTupleDescriptor, context.slotDescByName, srcSlotIds,
+                context.exprMap, analyzer, context.srcTupleDescriptor, context.srcSlotDescByName, srcSlotIds,
                 formatType(context.fileGroup.getFileFormat(), ""), null, VectorizedUtil.isVectorized());
 
         int columnCountFromPath = 0;
@@ -247,4 +252,9 @@ public class LoadScanProvider implements FileScanProviderIf {
             return TFileFormatType.FORMAT_CSV_PLAIN;
         }
     }
+
+    @Override
+    public TableIf getTargetTable() {
+        return fileGroupInfo.getTargetTable();
+    }
 }
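The new getTargetTable() hook is what keeps setDefaultValueExprs() provider-agnostic: every provider hands back the table whose base schema supplies the defaults. A hypothetical provider outside this patch would implement it the same way (FileScanProviderIf's other methods are omitted, so the implements clause is left commented out to keep the sketch self-contained):

    import org.apache.doris.catalog.TableIf;

    // Hypothetical provider; only the method added by this commit is shown.
    public class MyScanProvider /* implements FileScanProviderIf */ {
        private final TableIf targetTable;

        public MyScanProvider(TableIf targetTable) {
            this.targetTable = targetTable;
        }

        public TableIf getTargetTable() {
            return targetTable;
        }
    }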
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index 4135099c8b..0a4529572b 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -261,17 +261,18 @@ struct TFileScanRangeParams {
     // The convert expr map for load job
     // desc slot id -> expr
     9: optional map<Types.TSlotId, Exprs.TExpr> expr_of_dest_slot
+    10: optional map<Types.TSlotId, Exprs.TExpr> default_value_of_src_slot
     // This is the mapping of dest slot id and src slot id in load expr
     // It excludes the slot id which has the transform expr
-    10: optional map<Types.TSlotId, Types.TSlotId> dest_sid_to_src_sid_without_trans
+    11: optional map<Types.TSlotId, Types.TSlotId> dest_sid_to_src_sid_without_trans
     // strictMode is a boolean
     // if strict mode is true, the incorrect data (the result of cast is null) will not be loaded
-    11: optional bool strict_mode
+    12: optional bool strict_mode
 
-    12: optional list<Types.TNetworkAddress> broker_addresses
-    13: optional TFileAttributes file_attributes
-    14: optional Exprs.TExpr pre_filter_exprs
+    13: optional list<Types.TNetworkAddress> broker_addresses
+    14: optional TFileAttributes file_attributes
+    15: optional Exprs.TExpr pre_filter_exprs
 }
 
 struct TFileRangeDesc {
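One compatibility note on the PlanNodes.thrift hunk: inserting default_value_of_src_slot as field 10 renumbers every later field (strict_mode moves from 11 to 12, and so on). Thrift identifies fields on the wire by ID, not by name, so renumbering is safe only as long as FE and BE are built from compatible revisions and no serialized TFileScanRangeParams crosses an upgrade boundary. The generated Java accessors are named after the field and are unaffected; a small sketch, assuming the standard Thrift Java codegen:

    import org.apache.doris.thrift.TExpr;
    import org.apache.doris.thrift.TFileScanRangeParams;

    public class ThriftFieldSketch {
        public static void main(String[] args) {
            TFileScanRangeParams params = new TFileScanRangeParams();
            // Call sites key the new map by src slot id (0 here is a made-up id);
            // renumbering the thrift field IDs changes only the wire encoding.
            params.putToDefaultValueOfSrcSlot(0, new TExpr());
            System.out.println(params.isSetDefaultValueOfSrcSlot()); // prints: true
        }
    }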