This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 5423de68dd [refactor](new-scan) remove old file scan node (#13433) 5423de68dd is described below commit 5423de68dd9c7bf480823ea0a0faca84c8fd3f82 Author: Mingyu Chen <morningman....@gmail.com> AuthorDate: Wed Oct 19 14:25:32 2022 +0800 [refactor](new-scan) remove old file scan node (#13433) All these files are not used anymore, can be removed. --- be/src/common/config.h | 6 - be/src/exec/exec_node.cpp | 7 +- be/src/vec/CMakeLists.txt | 7 - be/src/vec/exec/file_arrow_scanner.cpp | 259 ----------- be/src/vec/exec/file_arrow_scanner.h | 118 ----- be/src/vec/exec/file_scan_node.cpp | 508 --------------------- be/src/vec/exec/file_scan_node.h | 148 ------ be/src/vec/exec/file_scanner.cpp | 199 -------- be/src/vec/exec/file_scanner.h | 112 ----- be/src/vec/exec/file_text_scanner.cpp | 294 ------------ be/src/vec/exec/file_text_scanner.h | 72 --- be/src/vec/exec/scan/new_file_arrow_scanner.cpp | 265 ----------- be/src/vec/exec/scan/new_file_arrow_scanner.h | 89 ---- be/src/vec/exec/scan/new_file_scan_node.cpp | 27 +- be/src/vec/exec/scan/new_file_scanner.cpp | 317 ------------- be/src/vec/exec/scan/new_file_scanner.h | 100 ---- be/src/vec/exec/scan/new_file_text_scanner.cpp | 263 ----------- be/src/vec/exec/scan/new_file_text_scanner.h | 66 --- .../apache/doris/planner/StreamLoadPlanner.java | 2 +- .../load_p0/broker_load/test_broker_load.groovy | 28 +- 20 files changed, 18 insertions(+), 2869 deletions(-) diff --git a/be/src/common/config.h b/be/src/common/config.h index 046ad3b234..ecbbd7b64d 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -864,12 +864,6 @@ CONF_mInt64(nodechannel_pending_queue_max_bytes, "67108864"); // so as to avoid occupying the execution thread for a long time. CONF_mInt32(max_fragment_start_wait_time_seconds, "30"); -// Temp config. True to use new file scan node to do load job. Will remove after fully test. -CONF_mBool(enable_new_load_scan_node, "false"); - -// Temp config. True to use new file scanner. Will remove after fully test. -CONF_mBool(enable_new_file_scanner, "false"); - // Hide webserver page for safety. // Hide the be config page for webserver. CONF_Bool(hide_webserver_config_page, "false"); diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp index 39e917ec6b..f04f832328 100644 --- a/be/src/exec/exec_node.cpp +++ b/be/src/exec/exec_node.cpp @@ -59,7 +59,6 @@ #include "util/debug_util.h" #include "util/runtime_profile.h" #include "vec/core/block.h" -#include "vec/exec/file_scan_node.h" #include "vec/exec/join/vhash_join_node.h" #include "vec/exec/scan/new_es_scan_node.h" #include "vec/exec/scan/new_file_scan_node.h" @@ -613,13 +612,11 @@ Status ExecNode::create_node(RuntimeState* state, ObjectPool* pool, const TPlanN return Status::OK(); case TPlanNodeType::FILE_SCAN_NODE: - // *node = pool->add(new vectorized::FileScanNode(pool, tnode, descs)); - if (config::enable_new_scan_node) { + if (state->enable_vectorized_exec()) { *node = pool->add(new vectorized::NewFileScanNode(pool, tnode, descs)); } else { - *node = pool->add(new vectorized::FileScanNode(pool, tnode, descs)); + return Status::InternalError("Not support file scan node in non-vec engine"); } - return Status::OK(); case TPlanNodeType::REPEAT_NODE: diff --git a/be/src/vec/CMakeLists.txt b/be/src/vec/CMakeLists.txt index 58a9701c90..d5739e25b4 100644 --- a/be/src/vec/CMakeLists.txt +++ b/be/src/vec/CMakeLists.txt @@ -230,10 +230,6 @@ set(VEC_FILES runtime/vorc_writer.cpp utils/arrow_column_to_doris_column.cpp runtime/vsorted_run_merger.cpp - exec/file_arrow_scanner.cpp - exec/file_scanner.cpp - exec/file_scan_node.cpp - exec/file_text_scanner.cpp exec/format/parquet/vparquet_column_chunk_reader.cpp exec/format/parquet/vparquet_group_reader.cpp exec/format/parquet/vparquet_page_index.cpp @@ -250,10 +246,7 @@ set(VEC_FILES exec/scan/scanner_scheduler.cpp exec/scan/new_olap_scan_node.cpp exec/scan/new_olap_scanner.cpp - exec/scan/new_file_arrow_scanner.cpp exec/scan/new_file_scan_node.cpp - exec/scan/new_file_scanner.cpp - exec/scan/new_file_text_scanner.cpp exec/scan/vfile_scanner.cpp exec/scan/new_odbc_scanner.cpp exec/scan/new_odbc_scan_node.cpp diff --git a/be/src/vec/exec/file_arrow_scanner.cpp b/be/src/vec/exec/file_arrow_scanner.cpp deleted file mode 100644 index 3a5d6c2f5b..0000000000 --- a/be/src/vec/exec/file_arrow_scanner.cpp +++ /dev/null @@ -1,259 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include "vec/exec/file_arrow_scanner.h" - -#include <memory> - -#include "exec/arrow/parquet_reader.h" -#include "io/buffered_reader.h" -#include "io/file_factory.h" -#include "io/hdfs_reader_writer.h" -#include "runtime/descriptors.h" -#include "vec/utils/arrow_column_to_doris_column.h" - -namespace doris::vectorized { - -FileArrowScanner::FileArrowScanner(RuntimeState* state, RuntimeProfile* profile, - const TFileScanRangeParams& params, - const std::vector<TFileRangeDesc>& ranges, - const std::vector<TExpr>& pre_filter_texprs, - ScannerCounter* counter) - : FileScanner(state, profile, params, ranges, pre_filter_texprs, counter), - _cur_file_reader(nullptr), - _cur_file_eof(false), - _batch(nullptr), - _arrow_batch_cur_idx(0) { - _convert_arrow_block_timer = ADD_TIMER(_profile, "ConvertArrowBlockTimer"); -} - -FileArrowScanner::~FileArrowScanner() { - FileArrowScanner::close(); -} - -Status FileArrowScanner::_open_next_reader() { - // open_file_reader - if (_cur_file_reader != nullptr) { - delete _cur_file_reader; - _cur_file_reader = nullptr; - } - - while (true) { - if (_next_range >= _ranges.size()) { - _scanner_eof = true; - return Status::OK(); - } - const TFileRangeDesc& range = _ranges[_next_range++]; - std::unique_ptr<FileReader> file_reader; - RETURN_IF_ERROR(FileFactory::create_file_reader(_profile, _params, range.path, - range.start_offset, range.file_size, 0, - file_reader)); - RETURN_IF_ERROR(file_reader->open()); - if (file_reader->size() == 0) { - file_reader->close(); - continue; - } - - int32_t num_of_columns_from_file = _file_slot_descs.size(); - - _cur_file_reader = - _new_arrow_reader(_file_slot_descs, file_reader.release(), num_of_columns_from_file, - range.start_offset, range.size); - - auto tuple_desc = _state->desc_tbl().get_tuple_descriptor(_tupleId); - Status status = - _cur_file_reader->init_reader(tuple_desc, _conjunct_ctxs, _state->timezone()); - if (status.is_end_of_file()) { - continue; - } else { - if (!status.ok()) { - std::stringstream ss; - ss << " file: " << range.path << " error:" << status.get_error_msg(); - return Status::InternalError(ss.str()); - } else { - _update_profile(_cur_file_reader->statistics()); - return status; - } - } - } -} - -Status FileArrowScanner::open() { - RETURN_IF_ERROR(FileScanner::open()); - if (_ranges.empty()) { - return Status::OK(); - } - return Status::OK(); -} - -// get next available arrow batch -Status FileArrowScanner::_next_arrow_batch() { - _arrow_batch_cur_idx = 0; - // first, init file reader - if (_cur_file_reader == nullptr || _cur_file_eof) { - RETURN_IF_ERROR(_open_next_reader()); - _cur_file_eof = false; - } - // second, loop until find available arrow batch or EOF - while (!_scanner_eof) { - RETURN_IF_ERROR(_cur_file_reader->next_batch(&_batch, &_cur_file_eof)); - if (_cur_file_eof) { - RETURN_IF_ERROR(_open_next_reader()); - _cur_file_eof = false; - continue; - } - if (_batch->num_rows() == 0) { - continue; - } - return Status::OK(); - } - return Status::EndOfFile("EOF"); -} - -Status FileArrowScanner::_init_arrow_batch_if_necessary() { - // 1. init batch if first time - // 2. reset reader if end of file - Status status = Status::OK(); - if (_scanner_eof) { - return Status::EndOfFile("EOF"); - } - if (_batch == nullptr || _arrow_batch_cur_idx >= _batch->num_rows()) { - return _next_arrow_batch(); - } - return status; -} - -Status FileArrowScanner::get_next(vectorized::Block* block, bool* eof) { - SCOPED_TIMER(_read_timer); - // init arrow batch - { - Status st = _init_arrow_batch_if_necessary(); - if (!st.ok()) { - if (!st.is_end_of_file()) { - return st; - } - *eof = true; - return Status::OK(); - } - } - - RETURN_IF_ERROR(init_block(block)); - // convert arrow batch to block until reach the batch_size - while (!_scanner_eof) { - // cast arrow type to PT0 and append it to block - // for example: arrow::Type::INT16 => TYPE_SMALLINT - RETURN_IF_ERROR(_append_batch_to_block(block)); - // finalize the block if full - if (_rows >= _state->batch_size()) { - break; - } - auto status = _next_arrow_batch(); - // if ok, append the batch to the columns - if (status.ok()) { - continue; - } - // return error if not EOF - if (!status.is_end_of_file()) { - return status; - } - _cur_file_eof = true; - break; - } - - return finalize_block(block, eof); -} - -Status FileArrowScanner::_append_batch_to_block(Block* block) { - SCOPED_TIMER(_convert_arrow_block_timer); - size_t num_elements = std::min<size_t>((_state->batch_size() - block->rows()), - (_batch->num_rows() - _arrow_batch_cur_idx)); - for (auto i = 0; i < _file_slot_descs.size(); ++i) { - SlotDescriptor* slot_desc = _file_slot_descs[i]; - if (slot_desc == nullptr) { - continue; - } - std::string real_column_name = _cur_file_reader->is_case_sensitive() - ? slot_desc->col_name() - : slot_desc->col_name_lower_case(); - auto* array = _batch->GetColumnByName(real_column_name).get(); - auto& column_with_type_and_name = block->get_by_name(slot_desc->col_name()); - RETURN_IF_ERROR(arrow_column_to_doris_column( - array, _arrow_batch_cur_idx, column_with_type_and_name.column, - column_with_type_and_name.type, num_elements, _state->timezone_obj())); - } - _rows += num_elements; - _arrow_batch_cur_idx += num_elements; - return _fill_columns_from_path(block, num_elements); -} - -void VFileParquetScanner::_update_profile(std::shared_ptr<Statistics>& statistics) { - COUNTER_UPDATE(_filtered_row_groups_counter, statistics->filtered_row_groups); - COUNTER_UPDATE(_filtered_rows_counter, statistics->filtered_rows); - COUNTER_UPDATE(_filtered_bytes_counter, statistics->filtered_total_bytes); - COUNTER_UPDATE(_total_rows_counter, statistics->total_rows); - COUNTER_UPDATE(_total_groups_counter, statistics->total_groups); - COUNTER_UPDATE(_total_bytes_counter, statistics->total_bytes); -} - -void FileArrowScanner::close() { - FileScanner::close(); - if (_cur_file_reader != nullptr) { - delete _cur_file_reader; - _cur_file_reader = nullptr; - } -} - -VFileParquetScanner::VFileParquetScanner(RuntimeState* state, RuntimeProfile* profile, - const TFileScanRangeParams& params, - const std::vector<TFileRangeDesc>& ranges, - const std::vector<TExpr>& pre_filter_texprs, - ScannerCounter* counter) - : FileArrowScanner(state, profile, params, ranges, pre_filter_texprs, counter) { - _init_profiles(profile); -} - -ArrowReaderWrap* VFileParquetScanner::_new_arrow_reader( - const std::vector<SlotDescriptor*>& file_slot_descs, FileReader* file_reader, - int32_t num_of_columns_from_file, int64_t range_start_offset, int64_t range_size) { - return new ParquetReaderWrap(_state, file_slot_descs, file_reader, num_of_columns_from_file, - range_start_offset, range_size, false); -} - -void VFileParquetScanner::_init_profiles(RuntimeProfile* profile) { - _filtered_row_groups_counter = ADD_COUNTER(_profile, "ParquetRowGroupsFiltered", TUnit::UNIT); - _filtered_rows_counter = ADD_COUNTER(_profile, "ParquetRowsFiltered", TUnit::UNIT); - _filtered_bytes_counter = ADD_COUNTER(_profile, "ParquetBytesFiltered", TUnit::BYTES); - _total_rows_counter = ADD_COUNTER(_profile, "ParquetRowsTotal", TUnit::UNIT); - _total_groups_counter = ADD_COUNTER(_profile, "ParquetRowGroupsTotal", TUnit::UNIT); - _total_bytes_counter = ADD_COUNTER(_profile, "ParquetBytesTotal", TUnit::BYTES); -} - -VFileORCScanner::VFileORCScanner(RuntimeState* state, RuntimeProfile* profile, - const TFileScanRangeParams& params, - const std::vector<TFileRangeDesc>& ranges, - const std::vector<TExpr>& pre_filter_texprs, - ScannerCounter* counter) - : FileArrowScanner(state, profile, params, ranges, pre_filter_texprs, counter) {} - -ArrowReaderWrap* VFileORCScanner::_new_arrow_reader( - const std::vector<SlotDescriptor*>& file_slot_descs, FileReader* file_reader, - int32_t num_of_columns_from_file, int64_t range_start_offset, int64_t range_size) { - return new ORCReaderWrap(_state, file_slot_descs, file_reader, num_of_columns_from_file, - range_start_offset, range_size, false); -} - -} // namespace doris::vectorized diff --git a/be/src/vec/exec/file_arrow_scanner.h b/be/src/vec/exec/file_arrow_scanner.h deleted file mode 100644 index 113bd54d6e..0000000000 --- a/be/src/vec/exec/file_arrow_scanner.h +++ /dev/null @@ -1,118 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#pragma once - -#include <arrow/array.h> -#include <exec/arrow/arrow_reader.h> -#include <exec/arrow/orc_reader.h> - -#include <map> -#include <memory> -#include <sstream> -#include <string> -#include <unordered_map> -#include <vector> - -#include "common/status.h" -#include "exec/base_scanner.h" -#include "util/runtime_profile.h" -#include "vec/exec/file_scanner.h" - -namespace doris::vectorized { - -// VArrow scanner convert the data read from orc|parquet to doris's columns. -class FileArrowScanner : public FileScanner { -public: - FileArrowScanner(RuntimeState* state, RuntimeProfile* profile, - const TFileScanRangeParams& params, const std::vector<TFileRangeDesc>& ranges, - const std::vector<TExpr>& pre_filter_texprs, ScannerCounter* counter); - - ~FileArrowScanner() override; - - // Open this scanner, will initialize information need to - Status open() override; - - Status get_next(Block* block, bool* eof) override; - - void close() override; - -protected: - virtual ArrowReaderWrap* _new_arrow_reader(const std::vector<SlotDescriptor*>& file_slot_descs, - FileReader* file_reader, - int32_t num_of_columns_from_file, - int64_t range_start_offset, int64_t range_size) = 0; - virtual void _update_profile(std::shared_ptr<Statistics>& statistics) {} - -private: - // Read next buffer from reader - Status _open_next_reader(); - Status _next_arrow_batch(); - Status _init_arrow_batch_if_necessary(); - Status _append_batch_to_block(Block* block); - -private: - // Reader - ArrowReaderWrap* _cur_file_reader; - bool _cur_file_eof; // is read over? - std::shared_ptr<arrow::RecordBatch> _batch; - size_t _arrow_batch_cur_idx; - RuntimeProfile::Counter* _convert_arrow_block_timer; -}; - -class VFileParquetScanner final : public FileArrowScanner { -public: - VFileParquetScanner(RuntimeState* state, RuntimeProfile* profile, - const TFileScanRangeParams& params, - const std::vector<TFileRangeDesc>& ranges, - const std::vector<TExpr>& pre_filter_texprs, ScannerCounter* counter); - - ~VFileParquetScanner() override = default; - -protected: - ArrowReaderWrap* _new_arrow_reader(const std::vector<SlotDescriptor*>& file_slot_descs, - FileReader* file_reader, int32_t num_of_columns_from_file, - int64_t range_start_offset, int64_t range_size) override; - - void _init_profiles(RuntimeProfile* profile) override; - void _update_profile(std::shared_ptr<Statistics>& statistics) override; - -private: - RuntimeProfile::Counter* _filtered_row_groups_counter; - RuntimeProfile::Counter* _filtered_rows_counter; - RuntimeProfile::Counter* _filtered_bytes_counter; - RuntimeProfile::Counter* _total_rows_counter; - RuntimeProfile::Counter* _total_groups_counter; - RuntimeProfile::Counter* _total_bytes_counter; -}; - -class VFileORCScanner final : public FileArrowScanner { -public: - VFileORCScanner(RuntimeState* state, RuntimeProfile* profile, - const TFileScanRangeParams& params, const std::vector<TFileRangeDesc>& ranges, - const std::vector<TExpr>& pre_filter_texprs, ScannerCounter* counter); - - ~VFileORCScanner() override = default; - -protected: - ArrowReaderWrap* _new_arrow_reader(const std::vector<SlotDescriptor*>& file_slot_descs, - FileReader* file_reader, int32_t num_of_columns_from_file, - int64_t range_start_offset, int64_t range_size) override; - void _init_profiles(RuntimeProfile* profile) override {}; -}; - -} // namespace doris::vectorized diff --git a/be/src/vec/exec/file_scan_node.cpp b/be/src/vec/exec/file_scan_node.cpp deleted file mode 100644 index 3a3f9634e9..0000000000 --- a/be/src/vec/exec/file_scan_node.cpp +++ /dev/null @@ -1,508 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include "vec/exec/file_scan_node.h" - -#include "common/config.h" -#include "gen_cpp/PlanNodes_types.h" -#include "runtime/memory/mem_tracker.h" -#include "runtime/runtime_filter_mgr.h" -#include "runtime/runtime_state.h" -#include "runtime/string_value.h" -#include "runtime/tuple.h" -#include "runtime/tuple_row.h" -#include "util/priority_thread_pool.hpp" -#include "util/runtime_profile.h" -#include "util/thread.h" -#include "util/types.h" -#include "vec/exec/file_arrow_scanner.h" -#include "vec/exec/file_text_scanner.h" -#include "vec/exprs/vcompound_pred.h" -#include "vec/exprs/vexpr.h" -#include "vec/exprs/vexpr_context.h" - -namespace doris::vectorized { - -FileScanNode::FileScanNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs) - : ScanNode(pool, tnode, descs), - _tuple_id(tnode.file_scan_node.tuple_id), - _runtime_state(nullptr), - _tuple_desc(nullptr), - _num_running_scanners(0), - _scan_finished(false), - _max_buffered_batches(32), - _wait_scanner_timer(nullptr), - _runtime_filter_descs(tnode.runtime_filters) {} - -Status FileScanNode::init(const TPlanNode& tnode, RuntimeState* state) { - RETURN_IF_ERROR(ScanNode::init(tnode, state)); - - int filter_size = _runtime_filter_descs.size(); - _runtime_filter_ctxs.resize(filter_size); - _runtime_filter_ready_flag.resize(filter_size); - for (int i = 0; i < filter_size; ++i) { - IRuntimeFilter* runtime_filter = nullptr; - const auto& filter_desc = _runtime_filter_descs[i]; - RETURN_IF_ERROR(state->runtime_filter_mgr()->regist_filter( - RuntimeFilterRole::CONSUMER, filter_desc, state->query_options(), id())); - RETURN_IF_ERROR(state->runtime_filter_mgr()->get_consume_filter(filter_desc.filter_id, - &runtime_filter)); - - _runtime_filter_ctxs[i].runtimefilter = runtime_filter; - _runtime_filter_ready_flag[i] = false; - _rf_locks.push_back(std::make_unique<std::mutex>()); - } - - return Status::OK(); -} - -Status FileScanNode::prepare(RuntimeState* state) { - VLOG_QUERY << "FileScanNode prepare"; - RETURN_IF_ERROR(ScanNode::prepare(state)); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); - // get tuple desc - _runtime_state = state; - _tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id); - if (_tuple_desc == nullptr) { - std::stringstream ss; - ss << "Failed to get tuple descriptor, _tuple_id=" << _tuple_id; - return Status::InternalError(ss.str()); - } - - // Initialize slots map - for (auto slot : _tuple_desc->slots()) { - auto pair = _slots_map.emplace(slot->col_name(), slot); - if (!pair.second) { - std::stringstream ss; - ss << "Failed to insert slot, col_name=" << slot->col_name(); - return Status::InternalError(ss.str()); - } - } - - // Profile - _wait_scanner_timer = ADD_TIMER(runtime_profile(), "WaitScannerTime"); - _filter_timer = ADD_TIMER(runtime_profile(), "PredicateFilteredTime"); - _num_rows_filtered = ADD_COUNTER(runtime_profile(), "PredicateFilteredRows", TUnit::UNIT); - _num_scanners = ADD_COUNTER(runtime_profile(), "NumScanners", TUnit::UNIT); - - return Status::OK(); -} - -Status FileScanNode::open(RuntimeState* state) { - SCOPED_TIMER(_runtime_profile->total_time_counter()); - RETURN_IF_ERROR(ExecNode::open(state)); - SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); - RETURN_IF_CANCELLED(state); - - RETURN_IF_ERROR(_acquire_and_build_runtime_filter(state)); - - RETURN_IF_ERROR(start_scanners()); - - return Status::OK(); -} - -Status FileScanNode::_acquire_and_build_runtime_filter(RuntimeState* state) { - // acquire runtime filter - _runtime_filter_ctxs.resize(_runtime_filter_descs.size()); - for (size_t i = 0; i < _runtime_filter_descs.size(); ++i) { - auto& filter_desc = _runtime_filter_descs[i]; - IRuntimeFilter* runtime_filter = nullptr; - state->runtime_filter_mgr()->get_consume_filter(filter_desc.filter_id, &runtime_filter); - DCHECK(runtime_filter != nullptr); - if (runtime_filter == nullptr) { - continue; - } - bool ready = runtime_filter->is_ready(); - if (!ready) { - ready = runtime_filter->await(); - } - if (ready) { - _runtime_filter_ctxs[i].apply_mark = true; - _runtime_filter_ctxs[i].runtimefilter = runtime_filter; - - // TODO: currently, after calling get_push_expr_ctxs(), the func ptr in runtime_filter - // will be released, and it will not be used again for building vexpr. - // - // std::list<ExprContext*> expr_context; - // RETURN_IF_ERROR(runtime_filter->get_push_expr_ctxs(&expr_context)); - // for (auto ctx : expr_context) { - // ctx->prepare(state, row_desc()); - // ctx->open(state); - // int index = _conjunct_ctxs.size(); - // _conjunct_ctxs.push_back(ctx); - // // it's safe to store address from a fix-resized vector - // _conjunctid_to_runtime_filter_ctxs[index] = &_runtime_filter_ctxs[i]; - // } - } - } - - // rebuild vexpr - for (int i = 0; i < _runtime_filter_ctxs.size(); ++i) { - if (!_runtime_filter_ctxs[i].apply_mark) { - continue; - } - IRuntimeFilter* runtime_filter = _runtime_filter_ctxs[i].runtimefilter; - std::vector<VExpr*> vexprs; - runtime_filter->get_prepared_vexprs(&vexprs, _row_descriptor); - if (vexprs.empty()) { - continue; - } - auto last_expr = _vconjunct_ctx_ptr ? (*_vconjunct_ctx_ptr)->root() : vexprs[0]; - for (size_t j = _vconjunct_ctx_ptr ? 0 : 1; j < vexprs.size(); j++) { - TExprNode texpr_node; - texpr_node.__set_type(create_type_desc(PrimitiveType::TYPE_BOOLEAN)); - texpr_node.__set_node_type(TExprNodeType::COMPOUND_PRED); - texpr_node.__set_opcode(TExprOpcode::COMPOUND_AND); - VExpr* new_node = _pool->add(new VcompoundPred(texpr_node)); - new_node->add_child(last_expr); - new_node->add_child(vexprs[j]); - last_expr = new_node; - } - auto new_vconjunct_ctx_ptr = _pool->add(new VExprContext(last_expr)); - auto expr_status = new_vconjunct_ctx_ptr->prepare(state, _row_descriptor); - if (UNLIKELY(!expr_status.OK())) { - LOG(WARNING) << "Something wrong for runtime filters: " << expr_status; - vexprs.clear(); - break; - } - - expr_status = new_vconjunct_ctx_ptr->open(state); - if (UNLIKELY(!expr_status.OK())) { - LOG(WARNING) << "Something wrong for runtime filters: " << expr_status; - vexprs.clear(); - break; - } - if (_vconjunct_ctx_ptr) { - _stale_vexpr_ctxs.push_back(std::move(_vconjunct_ctx_ptr)); - } - _vconjunct_ctx_ptr.reset(new doris::vectorized::VExprContext*); - *(_vconjunct_ctx_ptr.get()) = new_vconjunct_ctx_ptr; - _runtime_filter_ready_flag[i] = true; - } - return Status::OK(); -} - -Status FileScanNode::start_scanners() { - { - std::unique_lock<std::mutex> l(_batch_queue_lock); - _num_running_scanners = _scan_ranges.size(); - } - - _scanners_status.resize(_scan_ranges.size()); - COUNTER_UPDATE(_num_scanners, _scan_ranges.size()); - ThreadPoolToken* thread_token = _runtime_state->get_query_fragments_ctx()->get_token(); - PriorityThreadPool* thread_pool = _runtime_state->exec_env()->scan_thread_pool(); - for (int i = 0; i < _scan_ranges.size(); ++i) { - Status submit_status = Status::OK(); - if (thread_token != nullptr) { - submit_status = thread_token->submit_func(std::bind(&FileScanNode::scanner_worker, this, - i, _scan_ranges.size(), - std::ref(_scanners_status[i]))); - } else { - PriorityThreadPool::WorkFunction task = - std::bind(&FileScanNode::scanner_worker, this, i, _scan_ranges.size(), - std::ref(_scanners_status[i])); - if (!thread_pool->offer(task)) { - submit_status = Status::Cancelled("Failed to submit scan task"); - } - } - if (!submit_status.ok()) { - LOG(WARNING) << "Failed to assign file scanner task to thread pool! " - << submit_status.get_error_msg(); - _scanners_status[i].set_value(submit_status); - for (int j = i + 1; j < _scan_ranges.size(); ++j) { - _scanners_status[j].set_value(Status::Cancelled("Cancelled")); - } - { - std::lock_guard<std::mutex> l(_batch_queue_lock); - update_status(submit_status); - _num_running_scanners -= _scan_ranges.size() - i; - } - _queue_writer_cond.notify_all(); - break; - } - } - return Status::OK(); -} - -Status FileScanNode::get_next(RuntimeState* state, vectorized::Block* block, bool* eos) { - SCOPED_TIMER(_runtime_profile->total_time_counter()); - // check if CANCELLED. - if (state->is_cancelled()) { - std::unique_lock<std::mutex> l(_batch_queue_lock); - if (update_status(Status::Cancelled("Cancelled"))) { - // Notify all scanners - _queue_writer_cond.notify_all(); - } - } - - if (_scan_finished.load()) { - *eos = true; - return Status::OK(); - } - - const int batch_size = _runtime_state->batch_size(); - while (true) { - std::shared_ptr<vectorized::Block> scanner_block; - { - std::unique_lock<std::mutex> l(_batch_queue_lock); - while (_process_status.ok() && !_runtime_state->is_cancelled() && - _num_running_scanners > 0 && _block_queue.empty()) { - SCOPED_TIMER(_wait_scanner_timer); - _queue_reader_cond.wait_for(l, std::chrono::seconds(1)); - } - if (!_process_status.ok()) { - // Some scanner process failed. - return _process_status; - } - if (_runtime_state->is_cancelled()) { - if (update_status(Status::Cancelled("Cancelled"))) { - _queue_writer_cond.notify_all(); - } - return _process_status; - } - if (!_block_queue.empty()) { - scanner_block = _block_queue.front(); - _block_queue.pop_front(); - } - } - - // All scanner has been finished, and all cached batch has been read - if (!scanner_block) { - if (_mutable_block && !_mutable_block->empty()) { - *block = _mutable_block->to_block(); - reached_limit(block, eos); - LOG_IF(INFO, *eos) << "FileScanNode ReachedLimit."; - } - _scan_finished.store(true); - *eos = true; - return Status::OK(); - } - // notify one scanner - _queue_writer_cond.notify_one(); - - if (UNLIKELY(!_mutable_block)) { - _mutable_block.reset(new MutableBlock(scanner_block->clone_empty())); - } - - if (_mutable_block->rows() + scanner_block->rows() < batch_size) { - // merge scanner_block into _mutable_block - _mutable_block->add_rows(scanner_block.get(), 0, scanner_block->rows()); - continue; - } else { - if (_mutable_block->empty()) { - // directly use scanner_block - *block = std::move(*scanner_block); - } else { - // copy _mutable_block firstly, then merge scanner_block into _mutable_block for next. - *block = _mutable_block->to_block(); - _mutable_block->set_muatable_columns(scanner_block->clone_empty_columns()); - _mutable_block->add_rows(scanner_block.get(), 0, scanner_block->rows()); - } - break; - } - } - - reached_limit(block, eos); - if (*eos) { - _scan_finished.store(true); - _queue_writer_cond.notify_all(); - LOG(INFO) << "FileScanNode ReachedLimit."; - } else { - *eos = false; - } - - return Status::OK(); -} - -Status FileScanNode::close(RuntimeState* state) { - if (is_closed()) { - return Status::OK(); - } - SCOPED_TIMER(_runtime_profile->total_time_counter()); - _scan_finished.store(true); - _queue_writer_cond.notify_all(); - _queue_reader_cond.notify_all(); - { - std::unique_lock<std::mutex> l(_batch_queue_lock); - _queue_reader_cond.wait(l, [this] { return _num_running_scanners == 0; }); - } - for (int i = 0; i < _scanners_status.size(); i++) { - std::future<Status> f = _scanners_status[i].get_future(); - RETURN_IF_ERROR(f.get()); - } - // Close - _batch_queue.clear(); - - for (auto& filter_desc : _runtime_filter_descs) { - IRuntimeFilter* runtime_filter = nullptr; - state->runtime_filter_mgr()->get_consume_filter(filter_desc.filter_id, &runtime_filter); - DCHECK(runtime_filter != nullptr); - runtime_filter->consumer_close(); - } - - for (auto& ctx : _stale_vexpr_ctxs) { - (*ctx)->close(state); - } - - return ExecNode::close(state); -} - -Status FileScanNode::scanner_scan(const TFileScanRange& scan_range, ScannerCounter* counter) { - //create scanner object and open - std::unique_ptr<FileScanner> scanner = create_scanner(scan_range, counter); - RETURN_IF_ERROR(scanner->open()); - bool scanner_eof = false; - while (!scanner_eof) { - RETURN_IF_CANCELLED(_runtime_state); - // If we have finished all works - if (_scan_finished.load() || !_process_status.ok()) { - return Status::OK(); - } - - std::shared_ptr<vectorized::Block> block(new vectorized::Block()); - RETURN_IF_ERROR(scanner->get_next(block.get(), &scanner_eof)); - if (block->rows() == 0) { - continue; - } - auto old_rows = block->rows(); - { - SCOPED_TIMER(_filter_timer); - RETURN_IF_ERROR(VExprContext::filter_block(_vconjunct_ctx_ptr, block.get(), - _tuple_desc->slots().size())); - } - counter->num_rows_unselected += old_rows - block->rows(); - if (block->rows() == 0) { - continue; - } - - std::unique_lock<std::mutex> l(_batch_queue_lock); - while (_process_status.ok() && !_scan_finished.load() && !_runtime_state->is_cancelled() && - // stop pushing more batch if - // 1. too many batches in queue, or - // 2. at least one batch in queue and memory exceed limit. - (_block_queue.size() >= _max_buffered_batches || !_block_queue.empty())) { - _queue_writer_cond.wait_for(l, std::chrono::seconds(1)); - } - // Process already set failed, so we just return OK - if (!_process_status.ok()) { - return Status::OK(); - } - // Scan already finished, just return - if (_scan_finished.load()) { - return Status::OK(); - } - // Runtime state is canceled, just return cancel - if (_runtime_state->is_cancelled()) { - return Status::Cancelled("Cancelled"); - } - // Queue size Must be smaller than _max_buffered_batches - _block_queue.push_back(block); - - // Notify reader to process - _queue_reader_cond.notify_one(); - } - return Status::OK(); -} - -void FileScanNode::scanner_worker(int start_idx, int length, std::promise<Status>& p_status) { - Thread::set_self_name("file_scanner"); - Status status = Status::OK(); - ScannerCounter counter; - const TFileScanRange& scan_range = - _scan_ranges[start_idx].scan_range.ext_scan_range.file_scan_range; - status = scanner_scan(scan_range, &counter); - if (!status.ok()) { - LOG(WARNING) << "Scanner[" << start_idx - << "] process failed. status=" << status.get_error_msg(); - } - - // Update stats - _runtime_state->update_num_rows_load_filtered(counter.num_rows_filtered); - _runtime_state->update_num_rows_load_unselected(counter.num_rows_unselected); - COUNTER_UPDATE(_num_rows_filtered, counter.num_rows_unselected); - - // scanner is going to finish - { - std::lock_guard<std::mutex> l(_batch_queue_lock); - if (!status.ok()) { - update_status(status); - } - // This scanner will finish - _num_running_scanners--; - } - _queue_reader_cond.notify_all(); - // If one scanner failed, others don't need scan any more - if (!status.ok()) { - _queue_writer_cond.notify_all(); - } - p_status.set_value(status); -} - -std::unique_ptr<FileScanner> FileScanNode::create_scanner(const TFileScanRange& scan_range, - ScannerCounter* counter) { - FileScanner* scan = nullptr; - switch (scan_range.params.format_type) { - case TFileFormatType::FORMAT_PARQUET: - scan = new VFileParquetScanner(_runtime_state, runtime_profile(), scan_range.params, - scan_range.ranges, _pre_filter_texprs, counter); - break; - case TFileFormatType::FORMAT_ORC: - scan = new VFileORCScanner(_runtime_state, runtime_profile(), scan_range.params, - scan_range.ranges, _pre_filter_texprs, counter); - break; - - default: - scan = new FileTextScanner(_runtime_state, runtime_profile(), scan_range.params, - scan_range.ranges, _pre_filter_texprs, counter); - } - scan->reg_conjunct_ctxs(_tuple_id, _conjunct_ctxs); - std::unique_ptr<FileScanner> scanner(scan); - return scanner; -} - -// This function is called after plan node has been prepared. -Status FileScanNode::set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) { - int max_scanners = config::doris_scanner_thread_pool_thread_num; - if (scan_ranges.size() <= max_scanners) { - _scan_ranges = scan_ranges; - } else { - // There is no need for the number of scanners to exceed the number of threads in thread pool. - _scan_ranges.clear(); - auto range_iter = scan_ranges.begin(); - for (int i = 0; i < max_scanners && range_iter != scan_ranges.end(); ++i, ++range_iter) { - _scan_ranges.push_back(*range_iter); - } - for (int i = 0; range_iter != scan_ranges.end(); ++i, ++range_iter) { - if (i == max_scanners) { - i = 0; - } - auto& ranges = _scan_ranges[i].scan_range.ext_scan_range.file_scan_range.ranges; - auto& merged_ranges = range_iter->scan_range.ext_scan_range.file_scan_range.ranges; - ranges.insert(ranges.end(), merged_ranges.begin(), merged_ranges.end()); - } - _scan_ranges.shrink_to_fit(); - LOG(INFO) << "Merge " << scan_ranges.size() << " scan ranges to " << _scan_ranges.size(); - } - return Status::OK(); -} - -void FileScanNode::debug_string(int ident_level, std::stringstream* out) const { - (*out) << "FileScanNode"; -} - -} // namespace doris::vectorized diff --git a/be/src/vec/exec/file_scan_node.h b/be/src/vec/exec/file_scan_node.h deleted file mode 100644 index 850c74db0c..0000000000 --- a/be/src/vec/exec/file_scan_node.h +++ /dev/null @@ -1,148 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#pragma once - -#include <atomic> -#include <condition_variable> -#include <future> -#include <map> -#include <mutex> -#include <string> -#include <thread> -#include <vector> - -#include "common/status.h" -#include "exec/base_scanner.h" -#include "exec/scan_node.h" -#include "exprs/runtime_filter.h" -#include "gen_cpp/PaloInternalService_types.h" -#include "runtime/descriptors.h" -#include "vec/exec/file_scanner.h" -namespace doris { - -class RuntimeState; -class Status; - -namespace vectorized { -class FileScanNode final : public ScanNode { -public: - FileScanNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs); - ~FileScanNode() override = default; - - // Called after create this scan node - Status init(const TPlanNode& tnode, RuntimeState* state = nullptr) override; - - // Prepare partition infos & set up timer - Status prepare(RuntimeState* state) override; - - // Start file scan using ParquetScanner or OrcScanner. - Status open(RuntimeState* state) override; - - // Fill the next row batch by calling next() on the scanner, - virtual Status get_next(RuntimeState* state, RowBatch* row_batch, bool* eos) override { - return Status::NotSupported("Not Implemented FileScanNode::get_next."); - } - - Status get_next(RuntimeState* state, vectorized::Block* block, bool* eos) override; - - // Close the scanner, and report errors. - Status close(RuntimeState* state) override; - - // No use - Status set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) override; - -private: - // Write debug string of this into out. - void debug_string(int indentation_level, std::stringstream* out) const override; - - // Update process status to one failed status, - // NOTE: Must hold the mutex of this scan node - bool update_status(const Status& new_status) { - if (_process_status.ok()) { - _process_status = new_status; - return true; - } - return false; - } - - std::unique_ptr<FileScanner> create_scanner(const TFileScanRange& scan_range, - ScannerCounter* counter); - - Status start_scanners(); - - void scanner_worker(int start_idx, int length, std::promise<Status>& p_status); - // Scan one range - Status scanner_scan(const TFileScanRange& scan_range, ScannerCounter* counter); - - Status _acquire_and_build_runtime_filter(RuntimeState* state); - - TupleId _tuple_id; - RuntimeState* _runtime_state; - TupleDescriptor* _tuple_desc; - std::map<std::string, SlotDescriptor*> _slots_map; - std::vector<TScanRangeParams> _scan_ranges; - - std::mutex _batch_queue_lock; - std::condition_variable _queue_reader_cond; - std::condition_variable _queue_writer_cond; - std::deque<std::shared_ptr<RowBatch>> _batch_queue; - - int _num_running_scanners; - - std::atomic<bool> _scan_finished; - - Status _process_status; - - std::vector<std::promise<Status>> _scanners_status; - - int _max_buffered_batches; - - // The origin preceding filter exprs. - // These exprs will be converted to expr context - // in XXXScanner. - // Because the row descriptor used for these exprs is `src_row_desc`, - // which is initialized in XXXScanner. - std::vector<TExpr> _pre_filter_texprs; - - RuntimeProfile::Counter* _wait_scanner_timer; - RuntimeProfile::Counter* _num_rows_filtered; - RuntimeProfile::Counter* _filter_timer; - RuntimeProfile::Counter* _num_scanners; - - std::deque<std::shared_ptr<vectorized::Block>> _block_queue; - std::unique_ptr<MutableBlock> _mutable_block; - -protected: - struct RuntimeFilterContext { - RuntimeFilterContext() : apply_mark(false), runtimefilter(nullptr) {} - bool apply_mark; - IRuntimeFilter* runtimefilter; - }; - - const std::vector<TRuntimeFilterDesc>& runtime_filter_descs() const { - return _runtime_filter_descs; - } - std::vector<TRuntimeFilterDesc> _runtime_filter_descs; - std::vector<RuntimeFilterContext> _runtime_filter_ctxs; - std::vector<bool> _runtime_filter_ready_flag; - std::vector<std::unique_ptr<std::mutex>> _rf_locks; - std::map<int, RuntimeFilterContext*> _conjunctid_to_runtime_filter_ctxs; - std::vector<std::unique_ptr<VExprContext*>> _stale_vexpr_ctxs; -}; -} // namespace vectorized -} // namespace doris diff --git a/be/src/vec/exec/file_scanner.cpp b/be/src/vec/exec/file_scanner.cpp deleted file mode 100644 index 04eb03db83..0000000000 --- a/be/src/vec/exec/file_scanner.cpp +++ /dev/null @@ -1,199 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include "file_scanner.h" - -#include <fmt/format.h> - -#include <vec/data_types/data_type_factory.hpp> - -#include "common/logging.h" -#include "common/utils.h" -#include "exec/exec_node.h" -#include "exec/text_converter.hpp" -#include "exprs/expr_context.h" -#include "runtime/descriptors.h" -#include "runtime/raw_value.h" -#include "runtime/runtime_state.h" -#include "runtime/tuple.h" - -namespace doris::vectorized { - -FileScanner::FileScanner(RuntimeState* state, RuntimeProfile* profile, - const TFileScanRangeParams& params, - const std::vector<TFileRangeDesc>& ranges, - const std::vector<TExpr>& pre_filter_texprs, ScannerCounter* counter) - : _state(state), - _params(params), - _ranges(ranges), - _next_range(0), - _counter(counter), - _mem_pool(std::make_unique<MemPool>()), - _pre_filter_texprs(pre_filter_texprs), - _profile(profile), - _rows_read_counter(nullptr), - _read_timer(nullptr), - _scanner_eof(false) { - _text_converter.reset(new (std::nothrow) TextConverter('\\')); -} - -Status FileScanner::open() { - _rows_read_counter = ADD_COUNTER(_profile, "RowsRead", TUnit::UNIT); - _read_timer = ADD_TIMER(_profile, "TotalRawReadTime(*)"); - return _init_expr_ctxes(); -} - -void FileScanner::reg_conjunct_ctxs(const TupleId& tupleId, - const std::vector<ExprContext*>& conjunct_ctxs) { - _conjunct_ctxs = conjunct_ctxs; - _tupleId = tupleId; -} - -Status FileScanner::_init_expr_ctxes() { - const TupleDescriptor* src_tuple_desc = - _state->desc_tbl().get_tuple_descriptor(_params.src_tuple_id); - if (src_tuple_desc == nullptr) { - std::stringstream ss; - ss << "Unknown source tuple descriptor, tuple_id=" << _params.src_tuple_id; - return Status::InternalError(ss.str()); - } - DCHECK(!_ranges.empty()); - - std::map<SlotId, int> _full_src_index_map; - std::map<SlotId, SlotDescriptor*> _full_src_slot_map; - int index = 0; - for (const auto& slot_desc : src_tuple_desc->slots()) { - _full_src_slot_map.emplace(slot_desc->id(), slot_desc); - _full_src_index_map.emplace(slot_desc->id(), index++); - } - - _num_of_columns_from_file = _params.num_of_columns_from_file; - for (const auto& slot_info : _params.required_slots) { - auto slot_id = slot_info.slot_id; - auto it = _full_src_slot_map.find(slot_id); - if (it == std::end(_full_src_slot_map)) { - std::stringstream ss; - ss << "Unknown source slot descriptor, slot_id=" << slot_id; - return Status::InternalError(ss.str()); - } - _required_slot_descs.emplace_back(it->second); - if (slot_info.is_file_slot) { - _file_slot_descs.emplace_back(it->second); - auto iti = _full_src_index_map.find(slot_id); - _file_slot_index_map.emplace(slot_id, iti->second); - } else { - _partition_slot_descs.emplace_back(it->second); - auto iti = _full_src_index_map.find(slot_id); - _partition_slot_index_map.emplace(slot_id, iti->second - _num_of_columns_from_file); - } - } - - _row_desc.reset(new RowDescriptor(_state->desc_tbl(), - std::vector<TupleId>({_params.src_tuple_id}), - std::vector<bool>({false}))); - - // preceding filter expr should be initialized by using `_row_desc`, which is the source row descriptor - if (!_pre_filter_texprs.empty()) { - // for vectorized, preceding filter exprs should be compounded to one passed from fe. - DCHECK(_pre_filter_texprs.size() == 1); - _vpre_filter_ctx_ptr.reset(new doris::vectorized::VExprContext*); - RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree( - _state->obj_pool(), _pre_filter_texprs[0], _vpre_filter_ctx_ptr.get())); - RETURN_IF_ERROR((*_vpre_filter_ctx_ptr)->prepare(_state, *_row_desc)); - RETURN_IF_ERROR((*_vpre_filter_ctx_ptr)->open(_state)); - } - - return Status::OK(); -} - -void FileScanner::close() { - if (_vpre_filter_ctx_ptr) { - (*_vpre_filter_ctx_ptr)->close(_state); - } - if (_rows_read_counter) { - COUNTER_UPDATE(_rows_read_counter, _read_row_counter); - } -} - -Status FileScanner::init_block(vectorized::Block* block) { - (*block).clear(); - _rows = 0; - for (const auto& slot_desc : _required_slot_descs) { - if (slot_desc == nullptr) { - continue; - } - auto is_nullable = slot_desc->is_nullable(); - auto data_type = vectorized::DataTypeFactory::instance().create_data_type(slot_desc->type(), - is_nullable); - if (data_type == nullptr) { - return Status::NotSupported( - fmt::format("Not support type for column:{}", slot_desc->col_name())); - } - MutableColumnPtr data_column = data_type->create_column(); - (*block).insert( - ColumnWithTypeAndName(std::move(data_column), data_type, slot_desc->col_name())); - } - return Status::OK(); -} - -Status FileScanner::_filter_block(vectorized::Block* _block) { - auto origin_column_num = (*_block).columns(); - // filter block - auto old_rows = (*_block).rows(); - RETURN_IF_ERROR(vectorized::VExprContext::filter_block(_vpre_filter_ctx_ptr, _block, - origin_column_num)); - _counter->num_rows_unselected += old_rows - (*_block).rows(); - return Status::OK(); -} - -Status FileScanner::finalize_block(vectorized::Block* _block, bool* eof) { - *eof = _scanner_eof; - _read_row_counter += _block->rows(); - if (LIKELY(_rows > 0)) { - RETURN_IF_ERROR(_filter_block(_block)); - } - - return Status::OK(); -} - -Status FileScanner::_fill_columns_from_path(vectorized::Block* _block, size_t rows) { - const TFileRangeDesc& range = _ranges.at(_next_range - 1); - if (range.__isset.columns_from_path && !_partition_slot_descs.empty()) { - for (const auto& slot_desc : _partition_slot_descs) { - if (slot_desc == nullptr) continue; - auto it = _partition_slot_index_map.find(slot_desc->id()); - if (it == std::end(_partition_slot_index_map)) { - std::stringstream ss; - ss << "Unknown source slot descriptor, slot_id=" << slot_desc->id(); - return Status::InternalError(ss.str()); - } - const std::string& column_from_path = range.columns_from_path[it->second]; - - auto doris_column = _block->get_by_name(slot_desc->col_name()).column; - IColumn* col_ptr = const_cast<IColumn*>(doris_column.get()); - - for (size_t j = 0; j < rows; ++j) { - _text_converter->write_vec_column(slot_desc, col_ptr, - const_cast<char*>(column_from_path.c_str()), - column_from_path.size(), true, false); - } - } - } - return Status::OK(); -} - -} // namespace doris::vectorized diff --git a/be/src/vec/exec/file_scanner.h b/be/src/vec/exec/file_scanner.h deleted file mode 100644 index df4c1d4ef6..0000000000 --- a/be/src/vec/exec/file_scanner.h +++ /dev/null @@ -1,112 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#pragma once - -#include "common/status.h" -#include "exec/base_scanner.h" -#include "exec/text_converter.h" -#include "exprs/expr.h" -#include "util/runtime_profile.h" -#include "vec/exprs/vexpr.h" -#include "vec/exprs/vexpr_context.h" - -namespace doris::vectorized { -class FileScanNode; -class FileScanner { -public: - FileScanner(RuntimeState* state, RuntimeProfile* profile, const TFileScanRangeParams& params, - const std::vector<TFileRangeDesc>& ranges, - const std::vector<TExpr>& pre_filter_texprs, ScannerCounter* counter); - - virtual ~FileScanner() = default; - - virtual void reg_conjunct_ctxs(const TupleId& tupleId, - const std::vector<ExprContext*>& conjunct_ctxs); - - // Open this scanner, will initialize information need to - virtual Status open(); - - // Get next block - virtual Status get_next(vectorized::Block* block, bool* eof) { - return Status::NotSupported("Not Implemented get block"); - } - - // Close this scanner - virtual void close() = 0; - - std::vector<bool>* mutable_runtime_filter_marks() { return &_runtime_filter_marks; } - -protected: - virtual void _init_profiles(RuntimeProfile* profile) = 0; - - Status finalize_block(vectorized::Block* dest_block, bool* eof); - Status _fill_columns_from_path(vectorized::Block* output_block, size_t rows); - Status init_block(vectorized::Block* block); - - std::unique_ptr<TextConverter> _text_converter; - - RuntimeState* _state; - const TFileScanRangeParams& _params; - - const std::vector<TFileRangeDesc>& _ranges; - int _next_range; - // used for process stat - ScannerCounter* _counter; - - // Used for constructing tuple - std::vector<SlotDescriptor*> _required_slot_descs; - std::vector<SlotDescriptor*> _file_slot_descs; - std::map<SlotId, int> _file_slot_index_map; - std::vector<SlotDescriptor*> _partition_slot_descs; - std::map<SlotId, int> _partition_slot_index_map; - - std::unique_ptr<RowDescriptor> _row_desc; - - // Mem pool used to allocate _src_tuple and _src_tuple_row - std::unique_ptr<MemPool> _mem_pool; - - const std::vector<TExpr> _pre_filter_texprs; - - // Profile - RuntimeProfile* _profile; - RuntimeProfile::Counter* _rows_read_counter; - RuntimeProfile::Counter* _read_timer; - - bool _scanner_eof = false; - int _rows = 0; - long _read_row_counter = 0; - - std::unique_ptr<vectorized::VExprContext*> _vpre_filter_ctx_ptr; - int _num_of_columns_from_file; - - // File formats based push down predicate - std::vector<ExprContext*> _conjunct_ctxs; - // slot_ids for parquet predicate push down are in tuple desc - TupleId _tupleId; - - // to record which runtime filters have been used - std::vector<bool> _runtime_filter_marks; - - FileScanNode* _parent; - -private: - Status _init_expr_ctxes(); - Status _filter_block(vectorized::Block* output_block); -}; - -} // namespace doris::vectorized diff --git a/be/src/vec/exec/file_text_scanner.cpp b/be/src/vec/exec/file_text_scanner.cpp deleted file mode 100644 index aee756868e..0000000000 --- a/be/src/vec/exec/file_text_scanner.cpp +++ /dev/null @@ -1,294 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include "vec/exec/file_text_scanner.h" - -#include <fmt/format.h> -#include <gen_cpp/internal_service.pb.h> - -#include <iostream> - -#include "exec/exec_node.h" -#include "exec/plain_text_line_reader.h" -#include "exec/text_converter.h" -#include "exec/text_converter.hpp" -#include "exprs/expr_context.h" -#include "io/buffered_reader.h" -#include "io/file_factory.h" -#include "io/hdfs_reader_writer.h" -#include "util/types.h" -#include "util/utf8_check.h" - -namespace doris::vectorized { - -FileTextScanner::FileTextScanner(RuntimeState* state, RuntimeProfile* profile, - const TFileScanRangeParams& params, - const std::vector<TFileRangeDesc>& ranges, - const std::vector<TExpr>& pre_filter_texprs, - ScannerCounter* counter) - : FileScanner(state, profile, params, ranges, pre_filter_texprs, counter), - _cur_file_reader(nullptr), - _cur_line_reader(nullptr), - _cur_line_reader_eof(false), - _skip_lines(0), - _success(false) - -{ - _init_profiles(profile); - if (params.file_attributes.__isset.text_params) { - auto text_params = params.file_attributes.text_params; - if (text_params.__isset.column_separator) { - _value_separator = text_params.column_separator; - _value_separator_length = _value_separator.length(); - } - if (text_params.__isset.line_delimiter) { - _line_delimiter = text_params.line_delimiter; - _line_delimiter_length = _line_delimiter.length(); - } - } -} - -FileTextScanner::~FileTextScanner() { - close(); -} - -Status FileTextScanner::open() { - RETURN_IF_ERROR(FileScanner::open()); - - if (_ranges.empty()) { - return Status::OK(); - } - _split_values.reserve(sizeof(Slice) * _file_slot_descs.size()); - return Status::OK(); -} - -void FileTextScanner::close() { - FileScanner::close(); - - if (_cur_line_reader != nullptr) { - delete _cur_line_reader; - _cur_line_reader = nullptr; - } -} - -Status FileTextScanner::get_next(Block* block, bool* eof) { - SCOPED_TIMER(_read_timer); - RETURN_IF_ERROR(init_block(block)); - - const int batch_size = _state->batch_size(); - - int current_rows = _rows; - while (_rows < batch_size && !_scanner_eof) { - if (_cur_line_reader == nullptr || _cur_line_reader_eof) { - RETURN_IF_ERROR(_open_next_reader()); - // If there isn't any more reader, break this - if (_scanner_eof) { - continue; - } - } - const uint8_t* ptr = nullptr; - size_t size = 0; - RETURN_IF_ERROR(_cur_line_reader->read_line(&ptr, &size, &_cur_line_reader_eof)); - if (_skip_lines > 0) { - _skip_lines--; - continue; - } - if (size == 0) { - // Read empty row, just continue - continue; - } - { - COUNTER_UPDATE(_rows_read_counter, 1); - RETURN_IF_ERROR(_fill_file_columns(Slice(ptr, size), block)); - } - if (_cur_line_reader_eof) { - RETURN_IF_ERROR(_fill_columns_from_path(block, _rows - current_rows)); - current_rows = _rows; - } - } - - return finalize_block(block, eof); -} - -Status FileTextScanner::_fill_file_columns(const Slice& line, vectorized::Block* _block) { - RETURN_IF_ERROR(_line_split_to_values(line)); - if (!_success) { - // If not success, which means we met an invalid row, return. - return Status::OK(); - } - - for (int i = 0; i < _split_values.size(); ++i) { - auto slot_desc = _file_slot_descs[i]; - const Slice& value = _split_values[i]; - - auto doris_column = _block->get_by_name(slot_desc->col_name()).column; - IColumn* col_ptr = const_cast<IColumn*>(doris_column.get()); - _text_converter->write_vec_column(slot_desc, col_ptr, value.data, value.size, true, false); - } - _rows++; - return Status::OK(); -} - -Status FileTextScanner::_open_next_reader() { - if (_next_range >= _ranges.size()) { - _scanner_eof = true; - return Status::OK(); - } - - RETURN_IF_ERROR(_open_file_reader()); - RETURN_IF_ERROR(_open_line_reader()); - _next_range++; - - return Status::OK(); -} - -Status FileTextScanner::_open_file_reader() { - const TFileRangeDesc& range = _ranges[_next_range]; - RETURN_IF_ERROR(FileFactory::create_file_reader(_profile, _params, range.path, - range.start_offset, range.file_size, 0, - _cur_file_reader)); - return _cur_file_reader->open(); -} - -Status FileTextScanner::_open_line_reader() { - if (_cur_line_reader != nullptr) { - delete _cur_line_reader; - _cur_line_reader = nullptr; - } - - const TFileRangeDesc& range = _ranges[_next_range]; - int64_t size = range.size; - if (range.start_offset != 0) { - if (_params.format_type != TFileFormatType::FORMAT_CSV_PLAIN) { - std::stringstream ss; - ss << "For now we do not support split compressed file"; - return Status::InternalError(ss.str()); - } - size += 1; - // not first range will always skip one line - _skip_lines = 1; - } - - // open line reader - switch (_params.format_type) { - case TFileFormatType::FORMAT_CSV_PLAIN: - _cur_line_reader = new PlainTextLineReader(_profile, _cur_file_reader.get(), nullptr, size, - _line_delimiter, _line_delimiter_length); - break; - default: { - std::stringstream ss; - ss << "Unknown format type, cannot init line reader, type=" << _params.format_type; - return Status::InternalError(ss.str()); - } - } - - _cur_line_reader_eof = false; - - return Status::OK(); -} - -Status FileTextScanner::_line_split_to_values(const Slice& line) { - if (!validate_utf8(line.data, line.size)) { - RETURN_IF_ERROR(_state->append_error_msg_to_file( - []() -> std::string { return "Unable to display"; }, - []() -> std::string { - fmt::memory_buffer error_msg; - fmt::format_to(error_msg, "{}", "Unable to display"); - return fmt::to_string(error_msg); - }, - &_scanner_eof)); - _counter->num_rows_filtered++; - _success = false; - return Status::OK(); - } - - RETURN_IF_ERROR(_split_line(line)); - - _success = true; - return Status::OK(); -} - -Status FileTextScanner::_split_line(const Slice& line) { - _split_values.clear(); - std::vector<Slice> tmp_split_values; - tmp_split_values.reserve(_num_of_columns_from_file); - - const char* value = line.data; - size_t start = 0; // point to the start pos of next col value. - size_t curpos = 0; // point to the start pos of separator matching sequence. - size_t p1 = 0; // point to the current pos of separator matching sequence. - size_t non_space = 0; // point to the last pos of non_space charactor. - - // Separator: AAAA - // - // p1 - // ▼ - // AAAA - // 1000AAAA2000AAAA - // ▲ ▲ - // Start │ - // curpos - - while (curpos < line.size) { - if (*(value + curpos + p1) != _value_separator[p1]) { - // Not match, move forward: - curpos += (p1 == 0 ? 1 : p1); - p1 = 0; - } else { - p1++; - if (p1 == _value_separator_length) { - // Match a separator - non_space = curpos; - // Trim tailing spaces. Be consistent with hive and trino's behavior. - if (_state->trim_tailing_spaces_for_external_table_query()) { - while (non_space > start && *(value + non_space - 1) == ' ') { - non_space--; - } - } - tmp_split_values.emplace_back(value + start, non_space - start); - start = curpos + _value_separator_length; - curpos = start; - p1 = 0; - non_space = 0; - } - } - } - - CHECK(curpos == line.size) << curpos << " vs " << line.size; - non_space = curpos; - if (_state->trim_tailing_spaces_for_external_table_query()) { - while (non_space > start && *(value + non_space - 1) == ' ') { - non_space--; - } - } - - tmp_split_values.emplace_back(value + start, non_space - start); - for (const auto& slot : _file_slot_descs) { - auto it = _file_slot_index_map.find(slot->id()); - if (it == std::end(_file_slot_index_map)) { - std::stringstream ss; - ss << "Unknown _file_slot_index_map, slot_id=" << slot->id(); - return Status::InternalError(ss.str()); - } - int index = it->second; - CHECK(index < _num_of_columns_from_file) << index << " vs " << _num_of_columns_from_file; - _split_values.emplace_back(tmp_split_values[index]); - } - return Status::OK(); -} - -} // namespace doris::vectorized diff --git a/be/src/vec/exec/file_text_scanner.h b/be/src/vec/exec/file_text_scanner.h deleted file mode 100644 index c632a842cb..0000000000 --- a/be/src/vec/exec/file_text_scanner.h +++ /dev/null @@ -1,72 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#pragma once - -#include "common/consts.h" -#include "exec/base_scanner.h" -#include "exec/decompressor.h" -#include "exec/line_reader.h" -#include "exec/plain_binary_line_reader.h" -#include "exec/plain_text_line_reader.h" -#include "gen_cpp/PlanNodes_types.h" -#include "io/file_factory.h" -#include "io/file_reader.h" -#include "vec/exec/file_scanner.h" - -namespace doris::vectorized { -class FileTextScanner final : public FileScanner { -public: - FileTextScanner(RuntimeState* state, RuntimeProfile* profile, - const TFileScanRangeParams& params, const std::vector<TFileRangeDesc>& ranges, - const std::vector<TExpr>& pre_filter_texprs, ScannerCounter* counter); - - ~FileTextScanner() override; - - Status open() override; - - Status get_next(Block* block, bool* eof) override; - void close() override; - -protected: - void _init_profiles(RuntimeProfile* profile) override {} - -private: - Status _fill_file_columns(const Slice& line, vectorized::Block* _block); - Status _open_next_reader(); - Status _open_file_reader(); - Status _open_line_reader(); - Status _line_split_to_values(const Slice& line); - Status _split_line(const Slice& line); - // Reader - std::unique_ptr<FileReader> _cur_file_reader; - LineReader* _cur_line_reader; - bool _cur_line_reader_eof; - - // When we fetch range start from 0, header_type="csv_with_names" skip first line - // When we fetch range start from 0, header_type="csv_with_names_and_types" skip first two line - // When we fetch range doesn't start - int _skip_lines; - std::vector<Slice> _split_values; - std::string _value_separator; - std::string _line_delimiter; - int _value_separator_length; - int _line_delimiter_length; - - bool _success; -}; -} // namespace doris::vectorized diff --git a/be/src/vec/exec/scan/new_file_arrow_scanner.cpp b/be/src/vec/exec/scan/new_file_arrow_scanner.cpp deleted file mode 100644 index 5fa6c88764..0000000000 --- a/be/src/vec/exec/scan/new_file_arrow_scanner.cpp +++ /dev/null @@ -1,265 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include "vec/exec/scan/new_file_arrow_scanner.h" - -#include "exec/arrow/orc_reader.h" -#include "exec/arrow/parquet_reader.h" -#include "io/file_factory.h" -#include "vec/exec/scan/vscan_node.h" -#include "vec/functions/simple_function_factory.h" -#include "vec/utils/arrow_column_to_doris_column.h" - -namespace doris::vectorized { - -NewFileArrowScanner::NewFileArrowScanner(RuntimeState* state, NewFileScanNode* parent, - int64_t limit, const TFileScanRange& scan_range, - RuntimeProfile* profile, - const std::vector<TExpr>& pre_filter_texprs) - : NewFileScanner(state, parent, limit, scan_range, profile, pre_filter_texprs), - _cur_file_reader(nullptr), - _cur_file_eof(false), - _batch(nullptr), - _arrow_batch_cur_idx(0) {} - -Status NewFileArrowScanner::open(RuntimeState* state) { - RETURN_IF_ERROR(NewFileScanner::open(state)); - // SCOPED_TIMER(_parent->_reader_init_timer); - - // _runtime_filter_marks.resize(_parent->runtime_filter_descs().size(), false); - return Status::OK(); -} - -Status NewFileArrowScanner::_get_block_impl(RuntimeState* state, Block* block, bool* eof) { - // init arrow batch - { - Status st = _init_arrow_batch_if_necessary(); - if (!st.ok()) { - if (!st.is_end_of_file()) { - return st; - } - *eof = true; - return Status::OK(); - } - } - - *eof = false; - if (!_is_load) { - RETURN_IF_ERROR(init_block(block)); - } - // convert arrow batch to block until reach the batch_size - while (!_scanner_eof) { - // cast arrow type to PT0 and append it to block - // for example: arrow::Type::INT16 => TYPE_SMALLINT - RETURN_IF_ERROR(_append_batch_to_block(block)); - // finalize the block if full - if (_rows >= _state->batch_size()) { - break; - } - auto status = _next_arrow_batch(); - // if ok, append the batch to the columns - if (status.ok()) { - continue; - } - // return error if not EOF - if (!status.is_end_of_file()) { - return status; - } - _cur_file_eof = true; - break; - } - - if (_scanner_eof && block->rows() == 0) { - *eof = true; - } - // return finalize_block(block, eof); - return Status::OK(); -} - -Status NewFileArrowScanner::_init_arrow_batch_if_necessary() { - // 1. init batch if first time - // 2. reset reader if end of file - Status status = Status::OK(); - if (_scanner_eof) { - return Status::EndOfFile("EOF"); - } - if (_batch == nullptr || _arrow_batch_cur_idx >= _batch->num_rows()) { - return _next_arrow_batch(); - } - return status; -} - -Status NewFileArrowScanner::_append_batch_to_block(Block* block) { - size_t num_elements = std::min<size_t>((_state->batch_size() - block->rows()), - (_batch->num_rows() - _arrow_batch_cur_idx)); - for (auto i = 0; i < _file_slot_descs.size(); ++i) { - SlotDescriptor* slot_desc = _file_slot_descs[i]; - if (slot_desc == nullptr) { - continue; - } - std::string real_column_name = _cur_file_reader->is_case_sensitive() - ? slot_desc->col_name() - : slot_desc->col_name_lower_case(); - auto* array = _batch->GetColumnByName(real_column_name).get(); - auto& column_with_type_and_name = block->get_by_name(slot_desc->col_name()); - RETURN_IF_ERROR(arrow_column_to_doris_column( - array, _arrow_batch_cur_idx, column_with_type_and_name.column, - column_with_type_and_name.type, num_elements, _state->timezone_obj())); - } - _rows += num_elements; - _arrow_batch_cur_idx += num_elements; - return _fill_columns_from_path(block, num_elements); -} - -Status NewFileArrowScanner::_convert_to_output_block(Block* output_block) { - if (!config::enable_new_load_scan_node) { - return Status::OK(); - } - if (_input_block_ptr == output_block) { - return Status::OK(); - } - RETURN_IF_ERROR(_cast_src_block(_input_block_ptr)); - if (LIKELY(_input_block_ptr->rows() > 0)) { - RETURN_IF_ERROR(_materialize_dest_block(output_block)); - } - - return Status::OK(); -} - -// arrow type ==arrow_column_to_doris_column==> primitive type(PT0) ==cast_src_block==> -// primitive type(PT1) ==materialize_block==> dest primitive type -Status NewFileArrowScanner::_cast_src_block(Block* block) { - // cast primitive type(PT0) to primitive type(PT1) - for (size_t i = 0; i < _num_of_columns_from_file; ++i) { - SlotDescriptor* slot_desc = _required_slot_descs[i]; - if (slot_desc == nullptr) { - continue; - } - auto& arg = block->get_by_name(slot_desc->col_name()); - // remove nullable here, let the get_function decide whether nullable - auto return_type = slot_desc->get_data_type_ptr(); - ColumnsWithTypeAndName arguments { - arg, - {DataTypeString().create_column_const( - arg.column->size(), remove_nullable(return_type)->get_family_name()), - std::make_shared<DataTypeString>(), ""}}; - auto func_cast = - SimpleFunctionFactory::instance().get_function("CAST", arguments, return_type); - RETURN_IF_ERROR(func_cast->execute(nullptr, *block, {i}, i, arg.column->size())); - block->get_by_position(i).type = std::move(return_type); - } - return Status::OK(); -} - -Status NewFileArrowScanner::_next_arrow_batch() { - _arrow_batch_cur_idx = 0; - // first, init file reader - if (_cur_file_reader == nullptr || _cur_file_eof) { - RETURN_IF_ERROR(_open_next_reader()); - _cur_file_eof = false; - } - // second, loop until find available arrow batch or EOF - while (!_scanner_eof) { - RETURN_IF_ERROR(_cur_file_reader->next_batch(&_batch, &_cur_file_eof)); - if (_cur_file_eof) { - RETURN_IF_ERROR(_open_next_reader()); - _cur_file_eof = false; - continue; - } - if (_batch->num_rows() == 0) { - continue; - } - return Status::OK(); - } - return Status::EndOfFile("EOF"); -} - -Status NewFileArrowScanner::_open_next_reader() { - // open_file_reader - if (_cur_file_reader != nullptr) { - delete _cur_file_reader; - _cur_file_reader = nullptr; - } - - while (true) { - if (_next_range >= _ranges.size()) { - _scanner_eof = true; - return Status::OK(); - } - const TFileRangeDesc& range = _ranges[_next_range++]; - std::unique_ptr<FileReader> file_reader; - RETURN_IF_ERROR(FileFactory::create_file_reader(_profile, _params, range.path, - range.start_offset, range.file_size, 0, - file_reader)); - RETURN_IF_ERROR(file_reader->open()); - if (file_reader->size() == 0) { - file_reader->close(); - continue; - } - - int32_t num_of_columns_from_file = _file_slot_descs.size(); - - _cur_file_reader = - _new_arrow_reader(_file_slot_descs, file_reader.release(), num_of_columns_from_file, - range.start_offset, range.size); - - auto tuple_desc = _state->desc_tbl().get_tuple_descriptor(_parent->output_tuple_id()); - // TODO _conjunct_ctxs is empty for now. _conjunct_ctxs is not empty. - Status status = - _cur_file_reader->init_reader(tuple_desc, _conjunct_ctxs, _state->timezone()); - if (status.is_end_of_file()) { - continue; - } else { - if (!status.ok()) { - std::stringstream ss; - ss << " file: " << range.path << " error:" << status.get_error_msg(); - return Status::InternalError(ss.str()); - } else { - // _update_profile(_cur_file_reader->statistics()); - return status; - } - } - } -} - -NewFileParquetScanner::NewFileParquetScanner(RuntimeState* state, NewFileScanNode* parent, - int64_t limit, const TFileScanRange& scan_range, - RuntimeProfile* profile, - const std::vector<TExpr>& pre_filter_texprs) - : NewFileArrowScanner(state, parent, limit, scan_range, profile, pre_filter_texprs) { - // _init_profiles(profile); -} - -ArrowReaderWrap* NewFileParquetScanner::_new_arrow_reader( - const std::vector<SlotDescriptor*>& file_slot_descs, FileReader* file_reader, - int32_t num_of_columns_from_file, int64_t range_start_offset, int64_t range_size) { - return new ParquetReaderWrap(_state, file_slot_descs, file_reader, num_of_columns_from_file, - range_start_offset, range_size, false); -} - -NewFileORCScanner::NewFileORCScanner(RuntimeState* state, NewFileScanNode* parent, int64_t limit, - const TFileScanRange& scan_range, RuntimeProfile* profile, - const std::vector<TExpr>& pre_filter_texprs) - : NewFileArrowScanner(state, parent, limit, scan_range, profile, pre_filter_texprs) {} - -ArrowReaderWrap* NewFileORCScanner::_new_arrow_reader( - const std::vector<SlotDescriptor*>& file_slot_descs, FileReader* file_reader, - int32_t num_of_columns_from_file, int64_t range_start_offset, int64_t range_size) { - return new ORCReaderWrap(_state, file_slot_descs, file_reader, num_of_columns_from_file, - range_start_offset, range_size, false); -} -} // namespace doris::vectorized diff --git a/be/src/vec/exec/scan/new_file_arrow_scanner.h b/be/src/vec/exec/scan/new_file_arrow_scanner.h deleted file mode 100644 index 281373a70d..0000000000 --- a/be/src/vec/exec/scan/new_file_arrow_scanner.h +++ /dev/null @@ -1,89 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#pragma once - -#include <exec/arrow/arrow_reader.h> - -#include "exprs/bloomfilter_predicate.h" -#include "exprs/function_filter.h" -#include "vec/exec/scan/new_file_scanner.h" -#include "vec/exec/scan/vscanner.h" - -namespace doris::vectorized { -class NewFileArrowScanner : public NewFileScanner { -public: - NewFileArrowScanner(RuntimeState* state, NewFileScanNode* parent, int64_t limit, - const TFileScanRange& scan_range, RuntimeProfile* profile, - const std::vector<TExpr>& pre_filter_texprs); - Status open(RuntimeState* state) override; - -protected: - Status _get_block_impl(RuntimeState* state, Block* block, bool* eos) override; - virtual ArrowReaderWrap* _new_arrow_reader(const std::vector<SlotDescriptor*>& file_slot_descs, - FileReader* file_reader, - int32_t num_of_columns_from_file, - int64_t range_start_offset, int64_t range_size) = 0; - // Convert input block to output block, if needed. - Status _convert_to_output_block(Block* output_block); - -private: - Status _open_next_reader(); - Status _next_arrow_batch(); - Status _init_arrow_batch_if_necessary(); - Status _append_batch_to_block(Block* block); - Status _cast_src_block(Block* block); - -private: - // Reader - ArrowReaderWrap* _cur_file_reader; - bool _cur_file_eof; // is read over? - std::shared_ptr<arrow::RecordBatch> _batch; - size_t _arrow_batch_cur_idx; -}; - -class NewFileParquetScanner final : public NewFileArrowScanner { -public: - NewFileParquetScanner(RuntimeState* state, NewFileScanNode* parent, int64_t limit, - const TFileScanRange& scan_range, RuntimeProfile* profile, - const std::vector<TExpr>& pre_filter_texprs); - - ~NewFileParquetScanner() override = default; - -protected: - ArrowReaderWrap* _new_arrow_reader(const std::vector<SlotDescriptor*>& file_slot_descs, - FileReader* file_reader, int32_t num_of_columns_from_file, - int64_t range_start_offset, int64_t range_size) override; - - void _init_profiles(RuntimeProfile* profile) override {}; -}; - -class NewFileORCScanner final : public NewFileArrowScanner { -public: - NewFileORCScanner(RuntimeState* state, NewFileScanNode* parent, int64_t limit, - const TFileScanRange& scan_range, RuntimeProfile* profile, - const std::vector<TExpr>& pre_filter_texprs); - - ~NewFileORCScanner() override = default; - -protected: - ArrowReaderWrap* _new_arrow_reader(const std::vector<SlotDescriptor*>& file_slot_descs, - FileReader* file_reader, int32_t num_of_columns_from_file, - int64_t range_start_offset, int64_t range_size) override; - void _init_profiles(RuntimeProfile* profile) override {}; -}; -} // namespace doris::vectorized diff --git a/be/src/vec/exec/scan/new_file_scan_node.cpp b/be/src/vec/exec/scan/new_file_scan_node.cpp index 039082ab1c..49319f1e6b 100644 --- a/be/src/vec/exec/scan/new_file_scan_node.cpp +++ b/be/src/vec/exec/scan/new_file_scan_node.cpp @@ -18,8 +18,6 @@ #include "vec/exec/scan/new_file_scan_node.h" #include "vec/columns/column_const.h" -#include "vec/exec/scan/new_file_arrow_scanner.h" -#include "vec/exec/scan/new_file_text_scanner.h" #include "vec/exec/scan/new_olap_scanner.h" #include "vec/exec/scan/vfile_scanner.h" #include "vec/functions/in.h" @@ -102,28 +100,9 @@ Status NewFileScanNode::_init_scanners(std::list<VScanner*>* scanners) { } VScanner* NewFileScanNode::_create_scanner(const TFileScanRange& scan_range) { - VScanner* scanner = nullptr; - if (config::enable_new_file_scanner) { - scanner = new VFileScanner(_state, this, _limit_per_scanner, scan_range, runtime_profile()); - ((VFileScanner*)scanner)->prepare(_vconjunct_ctx_ptr.get(), &_colname_to_value_range); - } else { - switch (scan_range.params.format_type) { - case TFileFormatType::FORMAT_PARQUET: - scanner = new NewFileParquetScanner(_state, this, _limit_per_scanner, scan_range, - runtime_profile(), std::vector<TExpr>()); - break; - case TFileFormatType::FORMAT_ORC: - scanner = new NewFileORCScanner(_state, this, _limit_per_scanner, scan_range, - runtime_profile(), std::vector<TExpr>()); - break; - - default: - scanner = new NewFileTextScanner(_state, this, _limit_per_scanner, scan_range, - runtime_profile(), std::vector<TExpr>()); - break; - } - ((NewFileScanner*)scanner)->prepare(_vconjunct_ctx_ptr.get()); - } + VScanner* scanner = + new VFileScanner(_state, this, _limit_per_scanner, scan_range, runtime_profile()); + ((VFileScanner*)scanner)->prepare(_vconjunct_ctx_ptr.get(), &_colname_to_value_range); _scanner_pool.add(scanner); // TODO: Can we remove _conjunct_ctxs and use _vconjunct_ctx_ptr instead? scanner->reg_conjunct_ctxs(_conjunct_ctxs); diff --git a/be/src/vec/exec/scan/new_file_scanner.cpp b/be/src/vec/exec/scan/new_file_scanner.cpp deleted file mode 100644 index 6e511fe10f..0000000000 --- a/be/src/vec/exec/scan/new_file_scanner.cpp +++ /dev/null @@ -1,317 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include "vec/exec/scan/new_file_scanner.h" - -#include <fmt/format.h> - -#include <vec/data_types/data_type_factory.hpp> - -#include "common/logging.h" -#include "common/utils.h" -#include "exec/exec_node.h" -#include "exec/text_converter.hpp" -#include "exprs/expr_context.h" -#include "runtime/descriptors.h" -#include "runtime/raw_value.h" -#include "runtime/runtime_state.h" -#include "runtime/tuple.h" -#include "vec/exec/scan/new_file_scan_node.h" - -namespace doris::vectorized { - -NewFileScanner::NewFileScanner(RuntimeState* state, NewFileScanNode* parent, int64_t limit, - const TFileScanRange& scan_range, RuntimeProfile* profile, - const std::vector<TExpr>& pre_filter_texprs) - : VScanner(state, static_cast<VScanNode*>(parent), limit), - _params(scan_range.params), - _ranges(scan_range.ranges), - _next_range(0), - _mem_pool(std::make_unique<MemPool>()), - _profile(profile), - _pre_filter_texprs(pre_filter_texprs), - _strict_mode(false) {} - -Status NewFileScanner::open(RuntimeState* state) { - RETURN_IF_ERROR(VScanner::open(state)); - RETURN_IF_ERROR(_init_expr_ctxes()); - return Status::OK(); -} - -Status NewFileScanner::prepare(VExprContext** vconjunct_ctx_ptr) { - if (vconjunct_ctx_ptr != nullptr) { - // Copy vconjunct_ctx_ptr from scan node to this scanner's _vconjunct_ctx. - RETURN_IF_ERROR((*vconjunct_ctx_ptr)->clone(_state, &_vconjunct_ctx)); - } - - return Status::OK(); -} - -Status NewFileScanner::_init_expr_ctxes() { - if (_input_tuple_desc == nullptr) { - std::stringstream ss; - ss << "Unknown source tuple descriptor, tuple_id=" << _params.src_tuple_id; - return Status::InternalError(ss.str()); - } - DCHECK(!_ranges.empty()); - - std::map<SlotId, int> _full_src_index_map; - std::map<SlotId, SlotDescriptor*> _full_src_slot_map; - int index = 0; - for (const auto& slot_desc : _input_tuple_desc->slots()) { - _full_src_slot_map.emplace(slot_desc->id(), slot_desc); - _full_src_index_map.emplace(slot_desc->id(), index++); - } - - _num_of_columns_from_file = _params.num_of_columns_from_file; - for (const auto& slot_info : _params.required_slots) { - auto slot_id = slot_info.slot_id; - auto it = _full_src_slot_map.find(slot_id); - if (it == std::end(_full_src_slot_map)) { - std::stringstream ss; - ss << "Unknown source slot descriptor, slot_id=" << slot_id; - return Status::InternalError(ss.str()); - } - _required_slot_descs.emplace_back(it->second); - if (slot_info.is_file_slot) { - _file_slot_descs.emplace_back(it->second); - auto iti = _full_src_index_map.find(slot_id); - _file_slot_index_map.emplace(slot_id, iti->second); - } else { - _partition_slot_descs.emplace_back(it->second); - auto iti = _full_src_index_map.find(slot_id); - _partition_slot_index_map.emplace(slot_id, iti->second - _num_of_columns_from_file); - } - } - - _src_tuple = (doris::Tuple*)_mem_pool->allocate(_input_tuple_desc->byte_size()); - _src_tuple_row = (TupleRow*)_mem_pool->allocate(sizeof(Tuple*)); - _src_tuple_row->set_tuple(0, _src_tuple); - _row_desc.reset(new RowDescriptor(_state->desc_tbl(), - std::vector<TupleId>({_params.src_tuple_id}), - std::vector<bool>({false}))); - - // preceding filter expr should be initialized by using `_row_desc`, which is the source row descriptor - if (!_pre_filter_texprs.empty()) { - // for vectorized, preceding filter exprs should be compounded to one passed from fe. - DCHECK(_pre_filter_texprs.size() == 1); - _vpre_filter_ctx_ptr.reset(new doris::vectorized::VExprContext*); - RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree( - _state->obj_pool(), _pre_filter_texprs[0], _vpre_filter_ctx_ptr.get())); - RETURN_IF_ERROR((*_vpre_filter_ctx_ptr)->prepare(_state, *_row_desc)); - RETURN_IF_ERROR((*_vpre_filter_ctx_ptr)->open(_state)); - } - - // Construct dest slots information - if (config::enable_new_load_scan_node) { - if (_output_tuple_desc == nullptr) { - return Status::InternalError("Unknown dest tuple descriptor, tuple_id={}", - _params.dest_tuple_id); - } - - bool has_slot_id_map = _params.__isset.dest_sid_to_src_sid_without_trans; - for (auto slot_desc : _output_tuple_desc->slots()) { - if (!slot_desc->is_materialized()) { - continue; - } - auto it = _params.expr_of_dest_slot.find(slot_desc->id()); - if (it == std::end(_params.expr_of_dest_slot)) { - return Status::InternalError("No expr for dest slot, id={}, name={}", - slot_desc->id(), slot_desc->col_name()); - } - - vectorized::VExprContext* ctx = nullptr; - RETURN_IF_ERROR( - vectorized::VExpr::create_expr_tree(_state->obj_pool(), it->second, &ctx)); - RETURN_IF_ERROR(ctx->prepare(_state, *_row_desc.get())); - RETURN_IF_ERROR(ctx->open(_state)); - _dest_vexpr_ctx.emplace_back(ctx); - if (has_slot_id_map) { - auto it1 = _params.dest_sid_to_src_sid_without_trans.find(slot_desc->id()); - if (it1 == std::end(_params.dest_sid_to_src_sid_without_trans)) { - _src_slot_descs_order_by_dest.emplace_back(nullptr); - } else { - auto _src_slot_it = _full_src_slot_map.find(it1->second); - if (_src_slot_it == std::end(_full_src_slot_map)) { - return Status::InternalError("No src slot {} in src slot descs", - it1->second); - } - _src_slot_descs_order_by_dest.emplace_back(_src_slot_it->second); - } - } - } - } - - return Status::OK(); -} - -Status NewFileScanner::init_block(vectorized::Block* block) { - (*block).clear(); - _rows = 0; - for (const auto& slot_desc : _required_slot_descs) { - if (slot_desc == nullptr) { - continue; - } - auto is_nullable = slot_desc->is_nullable(); - auto data_type = vectorized::DataTypeFactory::instance().create_data_type(slot_desc->type(), - is_nullable); - if (data_type == nullptr) { - return Status::NotSupported( - fmt::format("Not support type for column:{}", slot_desc->col_name())); - } - MutableColumnPtr data_column = data_type->create_column(); - (*block).insert( - ColumnWithTypeAndName(std::move(data_column), data_type, slot_desc->col_name())); - } - return Status::OK(); -} - -Status NewFileScanner::_fill_columns_from_path(vectorized::Block* _block, size_t rows) { - const TFileRangeDesc& range = _ranges.at(_next_range - 1); - if (range.__isset.columns_from_path && !_partition_slot_descs.empty()) { - for (const auto& slot_desc : _partition_slot_descs) { - if (slot_desc == nullptr) continue; - auto it = _partition_slot_index_map.find(slot_desc->id()); - if (it == std::end(_partition_slot_index_map)) { - std::stringstream ss; - ss << "Unknown source slot descriptor, slot_id=" << slot_desc->id(); - return Status::InternalError(ss.str()); - } - const std::string& column_from_path = range.columns_from_path[it->second]; - - auto doris_column = _block->get_by_name(slot_desc->col_name()).column; - IColumn* col_ptr = const_cast<IColumn*>(doris_column.get()); - - for (size_t j = 0; j < rows; ++j) { - _text_converter->write_vec_column(slot_desc, col_ptr, - const_cast<char*>(column_from_path.c_str()), - column_from_path.size(), true, false); - } - } - } - return Status::OK(); -} - -Status NewFileScanner::_filter_input_block(Block* block) { - if (!config::enable_new_load_scan_node) { - return Status::OK(); - } - if (_is_load) { - auto origin_column_num = block->columns(); - RETURN_IF_ERROR(vectorized::VExprContext::filter_block(_vpre_filter_ctx_ptr, block, - origin_column_num)); - } - return Status::OK(); -} - -Status NewFileScanner::_materialize_dest_block(vectorized::Block* dest_block) { - // Do vectorized expr here - int ctx_idx = 0; - size_t rows = _input_block.rows(); - auto filter_column = vectorized::ColumnUInt8::create(rows, 1); - auto& filter_map = filter_column->get_data(); - auto origin_column_num = _input_block.columns(); - - for (auto slot_desc : _output_tuple_desc->slots()) { - if (!slot_desc->is_materialized()) { - continue; - } - int dest_index = ctx_idx++; - - auto* ctx = _dest_vexpr_ctx[dest_index]; - int result_column_id = -1; - // PT1 => dest primitive type - RETURN_IF_ERROR(ctx->execute(_input_block_ptr, &result_column_id)); - bool is_origin_column = result_column_id < origin_column_num; - auto column_ptr = - is_origin_column && _src_block_mem_reuse - ? _input_block.get_by_position(result_column_id).column->clone_resized(rows) - : _input_block.get_by_position(result_column_id).column; - - DCHECK(column_ptr != nullptr); - - // because of src_slot_desc is always be nullable, so the column_ptr after do dest_expr - // is likely to be nullable - if (LIKELY(column_ptr->is_nullable())) { - auto nullable_column = - reinterpret_cast<const vectorized::ColumnNullable*>(column_ptr.get()); - for (int i = 0; i < rows; ++i) { - if (filter_map[i] && nullable_column->is_null_at(i)) { - if (_strict_mode && (_src_slot_descs_order_by_dest[dest_index]) && - !_input_block.get_by_position(dest_index).column->is_null_at(i)) { - RETURN_IF_ERROR(_state->append_error_msg_to_file( - [&]() -> std::string { - return _input_block.dump_one_line(i, _num_of_columns_from_file); - }, - [&]() -> std::string { - auto raw_value = _input_block.get_by_position(ctx_idx) - .column->get_data_at(i); - std::string raw_string = raw_value.to_string(); - fmt::memory_buffer error_msg; - fmt::format_to(error_msg, - "column({}) value is incorrect while strict " - "mode is {}, " - "src value is {}", - slot_desc->col_name(), _strict_mode, raw_string); - return fmt::to_string(error_msg); - }, - &_scanner_eof)); - filter_map[i] = false; - } else if (!slot_desc->is_nullable()) { - RETURN_IF_ERROR(_state->append_error_msg_to_file( - [&]() -> std::string { - return _input_block.dump_one_line(i, _num_of_columns_from_file); - }, - [&]() -> std::string { - fmt::memory_buffer error_msg; - fmt::format_to(error_msg, - "column({}) values is null while columns is not " - "nullable", - slot_desc->col_name()); - return fmt::to_string(error_msg); - }, - &_scanner_eof)); - filter_map[i] = false; - } - } - } - if (!slot_desc->is_nullable()) column_ptr = nullable_column->get_nested_column_ptr(); - } else if (slot_desc->is_nullable()) { - column_ptr = vectorized::make_nullable(column_ptr); - } - dest_block->insert(vectorized::ColumnWithTypeAndName( - std::move(column_ptr), slot_desc->get_data_type_ptr(), slot_desc->col_name())); - } - - // after do the dest block insert operation, clear _src_block to remove the reference of origin column - if (_src_block_mem_reuse) { - _input_block.clear_column_data(origin_column_num); - } else { - _input_block.clear(); - } - - size_t dest_size = dest_block->columns(); - // do filter - dest_block->insert(vectorized::ColumnWithTypeAndName( - std::move(filter_column), std::make_shared<vectorized::DataTypeUInt8>(), - "filter column")); - RETURN_IF_ERROR(vectorized::Block::filter_block(dest_block, dest_size, dest_size)); - - return Status::OK(); -} - -} // namespace doris::vectorized diff --git a/be/src/vec/exec/scan/new_file_scanner.h b/be/src/vec/exec/scan/new_file_scanner.h deleted file mode 100644 index 50423bd3e6..0000000000 --- a/be/src/vec/exec/scan/new_file_scanner.h +++ /dev/null @@ -1,100 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#pragma once - -#include "exec/text_converter.h" -#include "exprs/bloomfilter_predicate.h" -#include "exprs/function_filter.h" -#include "runtime/tuple.h" -#include "vec/exec/scan/vscanner.h" - -namespace doris::vectorized { - -class NewFileScanNode; - -class NewFileScanner : public VScanner { -public: - NewFileScanner(RuntimeState* state, NewFileScanNode* parent, int64_t limit, - const TFileScanRange& scan_range, RuntimeProfile* profile, - const std::vector<TExpr>& pre_filter_texprs); - - Status open(RuntimeState* state) override; - - Status prepare(VExprContext** vconjunct_ctx_ptr); - -protected: - // Use prefilters to filter input block - Status _filter_input_block(Block* block); - Status _materialize_dest_block(vectorized::Block* output_block); - -protected: - virtual void _init_profiles(RuntimeProfile* profile) = 0; - - Status _fill_columns_from_path(vectorized::Block* output_block, size_t rows); - Status init_block(vectorized::Block* block); - - std::unique_ptr<TextConverter> _text_converter; - - const TFileScanRangeParams& _params; - - const std::vector<TFileRangeDesc>& _ranges; - int _next_range; - - // Used for constructing tuple - std::vector<SlotDescriptor*> _required_slot_descs; - // File source slot descriptors - std::vector<SlotDescriptor*> _file_slot_descs; - // File slot id to index map. - std::map<SlotId, int> _file_slot_index_map; - // Partition source slot descriptors - std::vector<SlotDescriptor*> _partition_slot_descs; - // Partition slot id to index map - std::map<SlotId, int> _partition_slot_index_map; - std::unique_ptr<RowDescriptor> _row_desc; - doris::Tuple* _src_tuple; - TupleRow* _src_tuple_row; - - // Mem pool used to allocate _src_tuple and _src_tuple_row - std::unique_ptr<MemPool> _mem_pool; - - // Profile - RuntimeProfile* _profile; - RuntimeProfile::Counter* _rows_read_counter; - RuntimeProfile::Counter* _read_timer; - - bool _scanner_eof = false; - int _rows = 0; - int _num_of_columns_from_file; - - const std::vector<TExpr> _pre_filter_texprs; - - std::vector<vectorized::VExprContext*> _dest_vexpr_ctx; - // to filter src tuple directly. - std::unique_ptr<vectorized::VExprContext*> _vpre_filter_ctx_ptr; - - // the map values of dest slot id to src slot desc - // if there is not key of dest slot id in dest_sid_to_src_sid_without_trans, it will be set to nullptr - std::vector<SlotDescriptor*> _src_slot_descs_order_by_dest; - - bool _src_block_mem_reuse = false; - bool _strict_mode; - -private: - Status _init_expr_ctxes(); -}; -} // namespace doris::vectorized diff --git a/be/src/vec/exec/scan/new_file_text_scanner.cpp b/be/src/vec/exec/scan/new_file_text_scanner.cpp deleted file mode 100644 index 2202dac76b..0000000000 --- a/be/src/vec/exec/scan/new_file_text_scanner.cpp +++ /dev/null @@ -1,263 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include "vec/exec/scan/new_file_text_scanner.h" - -#include "exec/plain_text_line_reader.h" -#include "io/file_factory.h" -#include "util/utf8_check.h" -#include "vec/exec/scan/vscan_node.h" - -namespace doris::vectorized { - -NewFileTextScanner::NewFileTextScanner(RuntimeState* state, NewFileScanNode* parent, int64_t limit, - const TFileScanRange& scan_range, RuntimeProfile* profile, - const std::vector<TExpr>& pre_filter_texprs) - : NewFileScanner(state, parent, limit, scan_range, profile, pre_filter_texprs), - _cur_file_reader(nullptr), - _cur_line_reader(nullptr), - _cur_line_reader_eof(false), - _skip_lines(0), - _success(false) {} - -Status NewFileTextScanner::open(RuntimeState* state) { - RETURN_IF_ERROR(NewFileScanner::open(state)); - if (_ranges.empty()) { - return Status::OK(); - } - _split_values.reserve(sizeof(Slice) * _file_slot_descs.size()); - return Status::OK(); -} - -Status NewFileTextScanner::_get_block_impl(RuntimeState* state, Block* block, bool* eof) { - SCOPED_TIMER(_read_timer); - if (!_is_load) { - RETURN_IF_ERROR(init_block(block)); - } - const int batch_size = state->batch_size(); - *eof = false; - int current_rows = _rows; - while (_rows < batch_size && !_scanner_eof) { - if (_cur_line_reader == nullptr || _cur_line_reader_eof) { - RETURN_IF_ERROR(_open_next_reader()); - // If there isn't any more reader, break this - if (_scanner_eof) { - continue; - } - } - const uint8_t* ptr = nullptr; - size_t size = 0; - RETURN_IF_ERROR(_cur_line_reader->read_line(&ptr, &size, &_cur_line_reader_eof)); - if (_skip_lines > 0) { - _skip_lines--; - continue; - } - if (size == 0) { - // Read empty row, just continue - continue; - } - { - COUNTER_UPDATE(_rows_read_counter, 1); - RETURN_IF_ERROR(_fill_file_columns(Slice(ptr, size), block)); - } - if (_cur_line_reader_eof) { - RETURN_IF_ERROR(_fill_columns_from_path(block, _rows - current_rows)); - current_rows = _rows; - } - } - if (_scanner_eof && block->rows() == 0) { - *eof = true; - } - return Status::OK(); -} - -Status NewFileTextScanner::_fill_file_columns(const Slice& line, vectorized::Block* _block) { - RETURN_IF_ERROR(_line_split_to_values(line)); - if (!_success) { - // If not success, which means we met an invalid row, return. - return Status::OK(); - } - - for (int i = 0; i < _split_values.size(); ++i) { - auto slot_desc = _file_slot_descs[i]; - const Slice& value = _split_values[i]; - - auto doris_column = _block->get_by_name(slot_desc->col_name()).column; - IColumn* col_ptr = const_cast<IColumn*>(doris_column.get()); - _text_converter->write_vec_column(slot_desc, col_ptr, value.data, value.size, true, false); - } - _rows++; - return Status::OK(); -} - -Status NewFileTextScanner::_line_split_to_values(const Slice& line) { - if (!validate_utf8(line.data, line.size)) { - RETURN_IF_ERROR(_state->append_error_msg_to_file( - []() -> std::string { return "Unable to display"; }, - []() -> std::string { - fmt::memory_buffer error_msg; - fmt::format_to(error_msg, "{}", "Unable to display"); - return fmt::to_string(error_msg); - }, - &_scanner_eof)); - _success = false; - return Status::OK(); - } - - RETURN_IF_ERROR(_split_line(line)); - - _success = true; - return Status::OK(); -} - -Status NewFileTextScanner::_open_next_reader() { - if (_next_range >= _ranges.size()) { - _scanner_eof = true; - return Status::OK(); - } - - RETURN_IF_ERROR(_open_file_reader()); - RETURN_IF_ERROR(_open_line_reader()); - _next_range++; - - return Status::OK(); -} - -Status NewFileTextScanner::_open_file_reader() { - const TFileRangeDesc& range = _ranges[_next_range]; - RETURN_IF_ERROR(FileFactory::create_file_reader(_profile, _params, range.path, - range.start_offset, range.file_size, 0, - _cur_file_reader)); - return _cur_file_reader->open(); -} - -Status NewFileTextScanner::_open_line_reader() { - if (_cur_line_reader != nullptr) { - delete _cur_line_reader; - _cur_line_reader = nullptr; - } - - const TFileRangeDesc& range = _ranges[_next_range]; - int64_t size = range.size; - if (range.start_offset != 0) { - if (_params.format_type != TFileFormatType::FORMAT_CSV_PLAIN) { - std::stringstream ss; - ss << "For now we do not support split compressed file"; - return Status::InternalError(ss.str()); - } - size += 1; - // not first range will always skip one line - _skip_lines = 1; - } - - // open line reader - switch (_params.format_type) { - case TFileFormatType::FORMAT_CSV_PLAIN: - _cur_line_reader = new PlainTextLineReader(_profile, _cur_file_reader.get(), nullptr, size, - _line_delimiter, _line_delimiter_length); - break; - default: { - std::stringstream ss; - ss << "Unknown format type, cannot init line reader, type=" << _params.format_type; - return Status::InternalError(ss.str()); - } - } - - _cur_line_reader_eof = false; - - return Status::OK(); -} - -Status NewFileTextScanner::_split_line(const Slice& line) { - _split_values.clear(); - std::vector<Slice> tmp_split_values; - tmp_split_values.reserve(_num_of_columns_from_file); - - const char* value = line.data; - size_t start = 0; // point to the start pos of next col value. - size_t curpos = 0; // point to the start pos of separator matching sequence. - size_t p1 = 0; // point to the current pos of separator matching sequence. - size_t non_space = 0; // point to the last pos of non_space charactor. - - // Separator: AAAA - // - // p1 - // ▼ - // AAAA - // 1000AAAA2000AAAA - // ▲ ▲ - // Start │ - // curpos - - while (curpos < line.size) { - if (*(value + curpos + p1) != _value_separator[p1]) { - // Not match, move forward: - curpos += (p1 == 0 ? 1 : p1); - p1 = 0; - } else { - p1++; - if (p1 == _value_separator_length) { - // Match a separator - non_space = curpos; - // Trim tailing spaces. Be consistent with hive and trino's behavior. - if (_state->trim_tailing_spaces_for_external_table_query()) { - while (non_space > start && *(value + non_space - 1) == ' ') { - non_space--; - } - } - tmp_split_values.emplace_back(value + start, non_space - start); - start = curpos + _value_separator_length; - curpos = start; - p1 = 0; - non_space = 0; - } - } - } - - CHECK(curpos == line.size) << curpos << " vs " << line.size; - non_space = curpos; - if (_state->trim_tailing_spaces_for_external_table_query()) { - while (non_space > start && *(value + non_space - 1) == ' ') { - non_space--; - } - } - - tmp_split_values.emplace_back(value + start, non_space - start); - for (const auto& slot : _file_slot_descs) { - auto it = _file_slot_index_map.find(slot->id()); - if (it == std::end(_file_slot_index_map)) { - std::stringstream ss; - ss << "Unknown _file_slot_index_map, slot_id=" << slot->id(); - return Status::InternalError(ss.str()); - } - int index = it->second; - CHECK(index < _num_of_columns_from_file) << index << " vs " << _num_of_columns_from_file; - _split_values.emplace_back(tmp_split_values[index]); - } - return Status::OK(); -} - -Status NewFileTextScanner::_convert_to_output_block(Block* output_block) { - if (_input_block_ptr == output_block) { - return Status::OK(); - } - if (LIKELY(_input_block_ptr->rows() > 0)) { - RETURN_IF_ERROR(_materialize_dest_block(output_block)); - } - return Status::OK(); -} -} // namespace doris::vectorized diff --git a/be/src/vec/exec/scan/new_file_text_scanner.h b/be/src/vec/exec/scan/new_file_text_scanner.h deleted file mode 100644 index 19cf6094f4..0000000000 --- a/be/src/vec/exec/scan/new_file_text_scanner.h +++ /dev/null @@ -1,66 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#pragma once - -#include <exec/arrow/arrow_reader.h> - -#include "exec/line_reader.h" -#include "exprs/bloomfilter_predicate.h" -#include "exprs/function_filter.h" -#include "vec/exec/scan/new_file_scanner.h" -#include "vec/exec/scan/vscanner.h" - -namespace doris::vectorized { -class NewFileTextScanner : public NewFileScanner { -public: - NewFileTextScanner(RuntimeState* state, NewFileScanNode* parent, int64_t limit, - const TFileScanRange& scan_range, RuntimeProfile* profile, - const std::vector<TExpr>& pre_filter_texprs); - - Status open(RuntimeState* state) override; - -protected: - void _init_profiles(RuntimeProfile* profile) override {} - Status _get_block_impl(RuntimeState* state, Block* block, bool* eos) override; - Status _convert_to_output_block(Block* output_block); - -private: - Status _fill_file_columns(const Slice& line, vectorized::Block* _block); - Status _open_next_reader(); - Status _open_file_reader(); - Status _open_line_reader(); - Status _line_split_to_values(const Slice& line); - Status _split_line(const Slice& line); - // Reader - std::unique_ptr<FileReader> _cur_file_reader; - LineReader* _cur_line_reader; - bool _cur_line_reader_eof; - - // When we fetch range start from 0, header_type="csv_with_names" skip first line - // When we fetch range start from 0, header_type="csv_with_names_and_types" skip first two line - // When we fetch range doesn't start - int _skip_lines; - std::vector<Slice> _split_values; - std::string _value_separator; - std::string _line_delimiter; - int _value_separator_length; - int _line_delimiter_length; - - bool _success; -}; -} // namespace doris::vectorized diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java index 26447413d1..5d68f6edef 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java @@ -172,7 +172,7 @@ public class StreamLoadPlanner { } // create scan node - if (Config.enable_new_load_scan_node) { + if (Config.enable_new_load_scan_node && Config.enable_vectorized_load) { ExternalFileScanNode fileScanNode = new ExternalFileScanNode(new PlanNodeId(0), scanTupleDesc); if (!Util.isCsvFormat(taskInfo.getFormatType())) { throw new AnalysisException( diff --git a/regression-test/suites/load_p0/broker_load/test_broker_load.groovy b/regression-test/suites/load_p0/broker_load/test_broker_load.groovy index fa4d2e0c07..52451a5207 100644 --- a/regression-test/suites/load_p0/broker_load/test_broker_load.groovy +++ b/regression-test/suites/load_p0/broker_load/test_broker_load.groovy @@ -185,22 +185,18 @@ suite("test_broker_load", "p0") { String[][] backends = sql """ show backends; """ assertTrue(backends.size() > 0) for (String[] backend in backends) { - StringBuilder setConfigCommand = new StringBuilder(); - setConfigCommand.append("curl -X POST http://") - setConfigCommand.append(backend[2]) - setConfigCommand.append(":") - setConfigCommand.append(backend[5]) - setConfigCommand.append("/api/update_config?") - String command1 = setConfigCommand.toString() + "enable_new_load_scan_node=$flag" - logger.info(command1) - String command2 = setConfigCommand.toString() + "enable_new_file_scanner=$flag" - logger.info(command2) - def process1 = command1.execute() - int code = process1.waitFor() - assertEquals(code, 0) - def process2 = command2.execute() - code = process1.waitFor() - assertEquals(code, 0) + // No need to set this config anymore, but leave this code sample here + // StringBuilder setConfigCommand = new StringBuilder(); + // setConfigCommand.append("curl -X POST http://") + // setConfigCommand.append(backend[2]) + // setConfigCommand.append(":") + // setConfigCommand.append(backend[5]) + // setConfigCommand.append("/api/update_config?") + // String command1 = setConfigCommand.toString() + "enable_new_load_scan_node=$flag" + // logger.info(command1) + // def process1 = command1.execute() + // int code = process1.waitFor() + // assertEquals(code, 0) } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org