This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push: new 8fa677b59c [Refactor][Bug-Fix][Load Vec] Refactor code of basescanner and vjson/vparquet/vbroker scanner (#9666) 8fa677b59c is described below commit 8fa677b59cc1567c0c81ef2fdbd25c1d319fe8db Author: HappenLee <happen...@hotmail.com> AuthorDate: Fri May 20 11:43:03 2022 +0800 [Refactor][Bug-Fix][Load Vec] Refactor code of basescanner and vjson/vparquet/vbroker scanner (#9666) * [Refactor][Bug-Fix][Load Vec] Refactor code of basescanner and vjson/vparquet/vbroker scanner 1. fix bug of vjson scanner not supporting `range_from_file_path` 2. fix core dump in vjson/vbroker scanner when the nullability of src and dest slots differs 3. fix bug of vparquet filter_block when the reference count of a column is not 1 4. refactor the code to simplify it This only changes the vectorized load, not the original row-based load. Co-authored-by: lihaopeng <lihaop...@baidu.com> --- be/src/exec/base_scanner.cpp | 184 +++++++++++++++++---- be/src/exec/base_scanner.h | 30 ++-- be/src/exec/broker_scanner.cpp | 5 +- be/src/exec/broker_scanner.h | 4 - be/src/exec/json_scanner.cpp | 5 +- be/src/exec/json_scanner.h | 4 - be/src/exec/orc_scanner.cpp | 5 +- be/src/exec/orc_scanner.h | 4 - be/src/exec/parquet_scanner.cpp | 15 +- be/src/exec/parquet_scanner.h | 5 - be/src/vec/core/block.cpp | 18 +- be/src/vec/core/block.h | 3 + be/src/vec/data_types/data_type_nullable.cpp | 2 +- be/src/vec/exec/vbroker_scanner.cpp | 125 +------------- be/src/vec/exec/vbroker_scanner.h | 5 - be/src/vec/exec/vjson_scanner.cpp | 12 +- be/src/vec/exec/vparquet_scanner.cpp | 122 ++------------ be/src/vec/exec/vparquet_scanner.h | 6 +- be/test/vec/exec/vbroker_scanner_test.cpp | 37 +++-- be/test/vec/exec/vjson_scanner_test.cpp | 29 ++-- .../apache/doris/load/loadv2/LoadLoadingTask.java | 2 + .../doris/load/loadv2/LoadingTaskPlanner.java | 4 + 22 files changed, 268 insertions(+), 358 deletions(-) diff --git a/be/src/exec/base_scanner.cpp b/be/src/exec/base_scanner.cpp index 
c4e1b5c056..e06b52de2f 100644 --- a/be/src/exec/base_scanner.cpp +++ b/be/src/exec/base_scanner.cpp @@ -28,14 +28,20 @@ #include "runtime/raw_value.h" #include "runtime/runtime_state.h" #include "runtime/tuple.h" +#include "vec/data_types/data_type_factory.hpp" namespace doris { BaseScanner::BaseScanner(RuntimeState* state, RuntimeProfile* profile, const TBrokerScanRangeParams& params, + const std::vector<TBrokerRangeDesc>& ranges, + const std::vector<TNetworkAddress>& broker_addresses, const std::vector<TExpr>& pre_filter_texprs, ScannerCounter* counter) : _state(state), _params(params), + _ranges(ranges), + _broker_addresses(broker_addresses), + _next_range(0), _counter(counter), _src_tuple(nullptr), _src_tuple_row(nullptr), @@ -71,6 +77,22 @@ Status BaseScanner::open() { _rows_read_counter = ADD_COUNTER(_profile, "RowsRead", TUnit::UNIT); _read_timer = ADD_TIMER(_profile, "TotalRawReadTime(*)"); _materialize_timer = ADD_TIMER(_profile, "MaterializeTupleTime(*)"); + + DCHECK(!_ranges.empty()); + const auto& range = _ranges[0]; + _num_of_columns_from_file = range.__isset.num_of_columns_from_file + ? 
implicit_cast<int>(range.num_of_columns_from_file) + : implicit_cast<int>(_src_slot_descs.size()); + + // check consistency + if (range.__isset.num_of_columns_from_file) { + int size = range.columns_from_path.size(); + for (const auto& r : _ranges) { + if (r.columns_from_path.size() != size) { + return Status::InternalError("ranges have different number of columns."); + } + } + } return Status::OK(); } @@ -272,59 +294,135 @@ Status BaseScanner::_fill_dest_tuple(Tuple* dest_tuple, MemPool* mem_pool) { } void* slot = dest_tuple->get_slot(slot_desc->tuple_offset()); RawValue::write(value, slot, slot_desc->type(), mem_pool); - continue; } _success = true; return Status::OK(); } -Status BaseScanner::filter_block(vectorized::Block* temp_block, size_t slot_num) { +Status BaseScanner::_filter_src_block() { + auto origin_column_num = _src_block.columns(); // filter block if (!_vpre_filter_ctxs.empty()) { for (auto _vpre_filter_ctx : _vpre_filter_ctxs) { - auto old_rows = temp_block->rows(); - RETURN_IF_ERROR( - vectorized::VExprContext::filter_block(_vpre_filter_ctx, temp_block, slot_num)); - _counter->num_rows_unselected += old_rows - temp_block->rows(); + auto old_rows = _src_block.rows(); + RETURN_IF_ERROR(vectorized::VExprContext::filter_block(_vpre_filter_ctx, &_src_block, + origin_column_num)); + _counter->num_rows_unselected += old_rows - _src_block.rows(); } } return Status::OK(); } -Status BaseScanner::execute_exprs(vectorized::Block* output_block, vectorized::Block* temp_block) { +Status BaseScanner::_materialize_dest_block(vectorized::Block* dest_block) { // Do vectorized expr here - Status status; - if (!_dest_vexpr_ctx.empty()) { - *output_block = vectorized::VExprContext::get_output_block_after_execute_exprs( - _dest_vexpr_ctx, *temp_block, status); - if (UNLIKELY(output_block->rows() == 0)) { - return status; + int ctx_idx = 0; + size_t rows = _src_block.rows(); + auto filter_column = vectorized::ColumnUInt8::create(rows, 1); + auto& filter_map = 
filter_column->get_data(); + + for (auto slot_desc : _dest_tuple_desc->slots()) { + if (!slot_desc->is_materialized()) { + continue; + } + int dest_index = ctx_idx++; + + auto* ctx = _dest_vexpr_ctx[dest_index]; + int result_column_id = -1; + // PT1 => dest primitive type + RETURN_IF_ERROR(ctx->execute(&_src_block, &result_column_id)); + auto column_ptr = _src_block.get_by_position(result_column_id).column; + + // because of src_slot_desc is always be nullable, so the column_ptr after do dest_expr + // is likely to be nullable + if (LIKELY(column_ptr->is_nullable())) { + auto nullable_column = + reinterpret_cast<const vectorized::ColumnNullable*>(column_ptr.get()); + for (int i = 0; i < rows; ++i) { + if (filter_map[i] && nullable_column->is_null_at(i)) { + if (_strict_mode && (_src_slot_descs_order_by_dest[dest_index]) && + !_src_block.get_by_position(dest_index).column->is_null_at(i)) { + RETURN_IF_ERROR(_state->append_error_msg_to_file( + [&]() -> std::string { + return _src_block.dump_one_line(i, _num_of_columns_from_file); + }, + [&]() -> std::string { + auto raw_value = + _src_block.get_by_position(ctx_idx).column->get_data_at( + i); + std::string raw_string = raw_value.to_string(); + fmt::memory_buffer error_msg; + fmt::format_to(error_msg, + "column({}) value is incorrect while strict " + "mode is {}, " + "src value is {}", + slot_desc->col_name(), _strict_mode, raw_string); + return fmt::to_string(error_msg); + }, + &_scanner_eof)); + filter_map[i] = false; + } else if (!slot_desc->is_nullable()) { + RETURN_IF_ERROR(_state->append_error_msg_to_file( + [&]() -> std::string { + return _src_block.dump_one_line(i, _num_of_columns_from_file); + }, + [&]() -> std::string { + fmt::memory_buffer error_msg; + fmt::format_to(error_msg, + "column({}) values is null while columns is not " + "nullable", + slot_desc->col_name()); + return fmt::to_string(error_msg); + }, + &_scanner_eof)); + filter_map[i] = false; + } + } + } + if (!slot_desc->is_nullable()) column_ptr = 
nullable_column->get_nested_column_ptr(); + } else if (slot_desc->is_nullable()) { + column_ptr = vectorized::make_nullable(column_ptr); } + dest_block->insert(vectorized::ColumnWithTypeAndName( + std::move(column_ptr), slot_desc->get_data_type_ptr(), slot_desc->col_name())); } + // after do the dest block insert operation, clear _src_block to remove the reference of origin column + _src_block.clear(); + + size_t dest_size = dest_block->columns(); + // do filter + dest_block->insert(vectorized::ColumnWithTypeAndName( + std::move(filter_column), std::make_shared<vectorized::DataTypeUInt8>(), + "filter column")); + RETURN_IF_ERROR(vectorized::Block::filter_block(dest_block, dest_size, dest_size)); + _counter->num_rows_filtered += rows - dest_block->rows(); + return Status::OK(); } -Status BaseScanner::fill_dest_block(vectorized::Block* dest_block, - std::vector<vectorized::MutableColumnPtr>& columns) { - if (columns.empty() || columns[0]->size() == 0) { - return Status::OK(); - } - - std::unique_ptr<vectorized::Block> temp_block(new vectorized::Block()); - auto n_columns = 0; - for (const auto slot_desc : _src_slot_descs) { - temp_block->insert(vectorized::ColumnWithTypeAndName(std::move(columns[n_columns++]), - slot_desc->get_data_type_ptr(), - slot_desc->col_name())); +// TODO: opt the reuse of src_block or dest_block column. 
some case we have to +// shallow copy the column of src_block to dest block +Status BaseScanner::_init_src_block() { + DCHECK(_src_block.columns() == 0); + for (auto i = 0; i < _num_of_columns_from_file; ++i) { + SlotDescriptor* slot_desc = _src_slot_descs[i]; + if (slot_desc == nullptr) { + continue; + } + auto data_type = slot_desc->get_data_type_ptr(); + _src_block.insert(vectorized::ColumnWithTypeAndName( + data_type->create_column(), slot_desc->get_data_type_ptr(), slot_desc->col_name())); } - RETURN_IF_ERROR(BaseScanner::filter_block(temp_block.get(), _dest_tuple_desc->slots().size())); + return Status::OK(); +} - if (_dest_vexpr_ctx.empty()) { - *dest_block = *temp_block; - } else { - RETURN_IF_ERROR(BaseScanner::execute_exprs(dest_block, temp_block.get())); +Status BaseScanner::_fill_dest_block(vectorized::Block* dest_block, bool* eof) { + *eof = _scanner_eof; + _fill_columns_from_path(); + if (LIKELY(_src_block.rows() > 0)) { + RETURN_IF_ERROR(BaseScanner::_filter_src_block()); + RETURN_IF_ERROR(BaseScanner::_materialize_dest_block(dest_block)); } return Status::OK(); @@ -337,7 +435,7 @@ void BaseScanner::fill_slots_of_columns_from_path( auto slot_desc = _src_slot_descs.at(i + start); _src_tuple->set_not_null(slot_desc->null_indicator_offset()); void* slot = _src_tuple->get_slot(slot_desc->tuple_offset()); - StringValue* str_slot = reinterpret_cast<StringValue*>(slot); + auto* str_slot = reinterpret_cast<StringValue*>(slot); const std::string& column_from_path = columns_from_path[i]; str_slot->ptr = const_cast<char*>(column_from_path.c_str()); str_slot->len = column_from_path.size(); @@ -360,4 +458,28 @@ void BaseScanner::close() { } } +void BaseScanner::_fill_columns_from_path() { + const TBrokerRangeDesc& range = _ranges.at(_next_range - 1); + if (range.__isset.num_of_columns_from_file) { + size_t start = range.num_of_columns_from_file; + size_t rows = _src_block.rows(); + + for (size_t i = 0; i < range.columns_from_path.size(); ++i) { + auto slot_desc = 
_src_slot_descs.at(i + start); + if (slot_desc == nullptr) continue; + auto is_nullable = slot_desc->is_nullable(); + auto data_type = vectorized::DataTypeFactory::instance().create_data_type(TYPE_VARCHAR, + is_nullable); + auto data_column = data_type->create_column(); + const std::string& column_from_path = range.columns_from_path[i]; + for (size_t j = 0; j < rows; ++j) { + data_column->insert_data(const_cast<char*>(column_from_path.c_str()), + column_from_path.size()); + } + _src_block.insert(vectorized::ColumnWithTypeAndName(std::move(data_column), data_type, + slot_desc->col_name())); + } + } +} + } // namespace doris diff --git a/be/src/exec/base_scanner.h b/be/src/exec/base_scanner.h index 2fccc62db8..1c2ce211b5 100644 --- a/be/src/exec/base_scanner.h +++ b/be/src/exec/base_scanner.h @@ -53,7 +53,10 @@ struct ScannerCounter { class BaseScanner { public: BaseScanner(RuntimeState* state, RuntimeProfile* profile, const TBrokerScanRangeParams& params, + const std::vector<TBrokerRangeDesc>& ranges, + const std::vector<TNetworkAddress>& broker_addresses, const std::vector<TExpr>& pre_filter_texprs, ScannerCounter* counter); + virtual ~BaseScanner() { Expr::close(_dest_expr_ctx, _state); if (_state->enable_vectorized_exec()) { @@ -77,21 +80,22 @@ public: virtual void close() = 0; Status fill_dest_tuple(Tuple* dest_tuple, MemPool* mem_pool, bool* fill_tuple); - Status fill_dest_block(vectorized::Block* dest_block, - std::vector<vectorized::MutableColumnPtr>& columns); - void fill_slots_of_columns_from_path(int start, const std::vector<std::string>& columns_from_path); void free_expr_local_allocations(); - Status filter_block(vectorized::Block* temp_block, size_t slot_num); - - Status execute_exprs(vectorized::Block* output_block, vectorized::Block* temp_block); - protected: + Status _fill_dest_block(vectorized::Block* dest_block, bool* eof); + virtual Status _init_src_block(); + RuntimeState* _state; const TBrokerScanRangeParams& _params; + + //const 
TBrokerScanRangeParams& _params; + const std::vector<TBrokerRangeDesc>& _ranges; + const std::vector<TNetworkAddress>& _broker_addresses; + int _next_range; // used for process stat ScannerCounter* _counter; @@ -109,9 +113,6 @@ protected: // Dest tuple descriptor and dest expr context const TupleDescriptor* _dest_tuple_desc; std::vector<ExprContext*> _dest_expr_ctx; - // for vectorized - std::vector<vectorized::VExprContext*> _dest_vexpr_ctx; - std::vector<vectorized::VExprContext*> _vpre_filter_ctxs; // the map values of dest slot id to src slot desc // if there is not key of dest slot id in dest_sid_to_src_sid_without_trans, it will be set to nullptr std::vector<SlotDescriptor*> _src_slot_descs_order_by_dest; @@ -135,7 +136,16 @@ protected: bool _success = false; bool _scanner_eof = false; + // for vectorized load + std::vector<vectorized::VExprContext*> _dest_vexpr_ctx; + std::vector<vectorized::VExprContext*> _vpre_filter_ctxs; + vectorized::Block _src_block; + int _num_of_columns_from_file; + private: + Status _filter_src_block(); + void _fill_columns_from_path(); + Status _materialize_dest_block(vectorized::Block* output_block); Status _fill_dest_tuple(Tuple* dest_tuple, MemPool* mem_pool); }; diff --git a/be/src/exec/broker_scanner.cpp b/be/src/exec/broker_scanner.cpp index d9453fecf0..2c21fd3d54 100644 --- a/be/src/exec/broker_scanner.cpp +++ b/be/src/exec/broker_scanner.cpp @@ -48,13 +48,10 @@ BrokerScanner::BrokerScanner(RuntimeState* state, RuntimeProfile* profile, const std::vector<TBrokerRangeDesc>& ranges, const std::vector<TNetworkAddress>& broker_addresses, const std::vector<TExpr>& pre_filter_texprs, ScannerCounter* counter) - : BaseScanner(state, profile, params, pre_filter_texprs, counter), - _ranges(ranges), - _broker_addresses(broker_addresses), + : BaseScanner(state, profile, params, ranges, broker_addresses, pre_filter_texprs, counter), _cur_file_reader(nullptr), _cur_line_reader(nullptr), _cur_decompressor(nullptr), - _next_range(0), 
_cur_line_reader_eof(false), _skip_lines(0) { if (params.__isset.column_separator_length && params.column_separator_length > 1) { diff --git a/be/src/exec/broker_scanner.h b/be/src/exec/broker_scanner.h index 2b6ac2d302..f10ce68518 100644 --- a/be/src/exec/broker_scanner.h +++ b/be/src/exec/broker_scanner.h @@ -100,9 +100,6 @@ private: Status _convert_one_row(const Slice& line, Tuple* tuple, MemPool* tuple_pool, bool* fill_tuple); protected: - const std::vector<TBrokerRangeDesc>& _ranges; - const std::vector<TNetworkAddress>& _broker_addresses; - std::string _value_separator; std::string _line_delimiter; TFileFormatType::type _file_format_type; @@ -113,7 +110,6 @@ protected: FileReader* _cur_file_reader; LineReader* _cur_line_reader; Decompressor* _cur_decompressor; - int _next_range; bool _cur_line_reader_eof; // When we fetch range start from 0, header_type="csv_with_names" skip first line diff --git a/be/src/exec/json_scanner.cpp b/be/src/exec/json_scanner.cpp index a23ce44b03..0be3d4c089 100644 --- a/be/src/exec/json_scanner.cpp +++ b/be/src/exec/json_scanner.cpp @@ -40,13 +40,10 @@ JsonScanner::JsonScanner(RuntimeState* state, RuntimeProfile* profile, const std::vector<TBrokerRangeDesc>& ranges, const std::vector<TNetworkAddress>& broker_addresses, const std::vector<TExpr>& pre_filter_texprs, ScannerCounter* counter) - : BaseScanner(state, profile, params, pre_filter_texprs, counter), - _ranges(ranges), - _broker_addresses(broker_addresses), + : BaseScanner(state, profile, params, ranges, broker_addresses, pre_filter_texprs, counter), _cur_file_reader(nullptr), _cur_line_reader(nullptr), _cur_json_reader(nullptr), - _next_range(0), _cur_reader_eof(false), _read_json_by_line(false) { if (params.__isset.line_delimiter_length && params.line_delimiter_length > 1) { diff --git a/be/src/exec/json_scanner.h b/be/src/exec/json_scanner.h index 276b2dd077..ab2f479e60 100644 --- a/be/src/exec/json_scanner.h +++ b/be/src/exec/json_scanner.h @@ -78,9 +78,6 @@ protected: 
bool& num_as_string, bool& fuzzy_parse); protected: - const std::vector<TBrokerRangeDesc>& _ranges; - const std::vector<TNetworkAddress>& _broker_addresses; - std::string _jsonpath; std::string _jsonpath_file; @@ -91,7 +88,6 @@ protected: FileReader* _cur_file_reader; LineReader* _cur_line_reader; JsonReader* _cur_json_reader; - int _next_range; bool _cur_reader_eof; bool _read_json_by_line; diff --git a/be/src/exec/orc_scanner.cpp b/be/src/exec/orc_scanner.cpp index fdda223c1a..138eb729ef 100644 --- a/be/src/exec/orc_scanner.cpp +++ b/be/src/exec/orc_scanner.cpp @@ -120,11 +120,8 @@ ORCScanner::ORCScanner(RuntimeState* state, RuntimeProfile* profile, const std::vector<TBrokerRangeDesc>& ranges, const std::vector<TNetworkAddress>& broker_addresses, const std::vector<TExpr>& pre_filter_texprs, ScannerCounter* counter) - : BaseScanner(state, profile, params, pre_filter_texprs, counter), - _ranges(ranges), - _broker_addresses(broker_addresses), + : BaseScanner(state, profile, params, ranges, broker_addresses, pre_filter_texprs, counter), // _splittable(params.splittable), - _next_range(0), _cur_file_eof(true), _total_groups(0), _current_group(0), diff --git a/be/src/exec/orc_scanner.h b/be/src/exec/orc_scanner.h index c13e331fcd..7ee4ab0b61 100644 --- a/be/src/exec/orc_scanner.h +++ b/be/src/exec/orc_scanner.h @@ -47,11 +47,7 @@ private: Status open_next_reader(); private: - const std::vector<TBrokerRangeDesc>& _ranges; - const std::vector<TNetworkAddress>& _broker_addresses; - // Reader - int _next_range; bool _cur_file_eof; // orc file reader object diff --git a/be/src/exec/parquet_scanner.cpp b/be/src/exec/parquet_scanner.cpp index 3295dc4bc7..c6cb02e8c2 100644 --- a/be/src/exec/parquet_scanner.cpp +++ b/be/src/exec/parquet_scanner.cpp @@ -25,9 +25,6 @@ #include "exec/parquet_reader.h" #include "exec/s3_reader.h" #include "exec/text_converter.h" -#include "exec/text_converter.hpp" -#include "exprs/expr.h" -#include "runtime/descriptors.h" #include 
"runtime/exec_env.h" #include "runtime/raw_value.h" #include "runtime/stream_load/load_stream_mgr.h" @@ -41,12 +38,9 @@ ParquetScanner::ParquetScanner(RuntimeState* state, RuntimeProfile* profile, const std::vector<TBrokerRangeDesc>& ranges, const std::vector<TNetworkAddress>& broker_addresses, const std::vector<TExpr>& pre_filter_texprs, ScannerCounter* counter) - : BaseScanner(state, profile, params, pre_filter_texprs, counter), - _ranges(ranges), - _broker_addresses(broker_addresses), + : BaseScanner(state, profile, params, ranges, broker_addresses, pre_filter_texprs, counter), // _splittable(params.splittable), _cur_file_reader(nullptr), - _next_range(0), _cur_file_eof(false) {} ParquetScanner::~ParquetScanner() { @@ -83,11 +77,8 @@ Status ParquetScanner::get_next(Tuple* tuple, MemPool* tuple_pool, bool* eof, bo RETURN_IF_ERROR(fill_dest_tuple(tuple, tuple_pool, fill_tuple)); break; // break always } - if (_scanner_eof) { - *eof = true; - } else { - *eof = false; - } + + *eof = _scanner_eof; return Status::OK(); } diff --git a/be/src/exec/parquet_scanner.h b/be/src/exec/parquet_scanner.h index 535d3fe6c5..d66802dd95 100644 --- a/be/src/exec/parquet_scanner.h +++ b/be/src/exec/parquet_scanner.h @@ -74,13 +74,8 @@ protected: Status open_next_reader(); protected: - //const TBrokerScanRangeParams& _params; - const std::vector<TBrokerRangeDesc>& _ranges; - const std::vector<TNetworkAddress>& _broker_addresses; - // Reader ParquetReaderWrap* _cur_file_reader; - int _next_range; bool _cur_file_eof; // is read over? 
// used to hold current StreamLoadPipe diff --git a/be/src/vec/core/block.cpp b/be/src/vec/core/block.cpp index 8d5880531a..aa482fcfbf 100644 --- a/be/src/vec/core/block.cpp +++ b/be/src/vec/core/block.cpp @@ -353,8 +353,8 @@ std::string Block::dump_names() const { std::string Block::dump_data(size_t begin, size_t row_limit) const { std::vector<std::string> headers; std::vector<size_t> headers_size; - for (auto it = data.begin(); it != data.end(); ++it) { - std::string s = fmt::format("{}({})", it->name, it->type->get_name()); + for (const auto& it : data) { + std::string s = fmt::format("{}({})", it.name, it.type->get_name()); headers_size.push_back(s.size() > 15 ? s.size() : 15); headers.emplace_back(s); } @@ -402,6 +402,20 @@ std::string Block::dump_data(size_t begin, size_t row_limit) const { return out.str(); } +std::string Block::dump_one_line(size_t row, int column_end) const { + assert(column_end < columns()); + fmt::memory_buffer line; + for (int i = 0; i < column_end; ++i) { + if (LIKELY(i != 0)) { + // TODO: need more effective function of to string. now the impl is slow + fmt::format_to(line, " {}", data[i].to_string(row)); + } else { + fmt::format_to(line, "{}", data[i].to_string(row)); + } + } + return fmt::to_string(line); +} + std::string Block::dump_structure() const { // WriteBufferFromOwnString out; std::stringstream out; diff --git a/be/src/vec/core/block.h b/be/src/vec/core/block.h index 375ef6906f..729f531291 100644 --- a/be/src/vec/core/block.h +++ b/be/src/vec/core/block.h @@ -242,6 +242,9 @@ public: /** Get block data in string. 
*/ std::string dump_data(size_t begin = 0, size_t row_limit = 100) const; + /** Get one line data from block, only use in load data */ + std::string dump_one_line(size_t row, int column_end) const; + static Status filter_block(Block* block, int filter_conlumn_id, int column_to_keep); static void erase_useless_column(Block* block, int column_to_keep) { diff --git a/be/src/vec/data_types/data_type_nullable.cpp b/be/src/vec/data_types/data_type_nullable.cpp index 5c3730b860..9e63fa5e24 100644 --- a/be/src/vec/data_types/data_type_nullable.cpp +++ b/be/src/vec/data_types/data_type_nullable.cpp @@ -47,7 +47,7 @@ std::string DataTypeNullable::to_string(const IColumn& column, size_t row_num) c assert_cast<const ColumnNullable&>(*column.convert_to_full_column_if_const().get()); if (col.is_null_at(row_num)) { - return "\\N"; + return "NULL"; } else { return nested_data_type->to_string(col.get_nested_column(), row_num); } diff --git a/be/src/vec/exec/vbroker_scanner.cpp b/be/src/vec/exec/vbroker_scanner.cpp index dea6d55da5..3006dba788 100644 --- a/be/src/vec/exec/vbroker_scanner.cpp +++ b/be/src/vec/exec/vbroker_scanner.cpp @@ -44,18 +44,14 @@ VBrokerScanner::VBrokerScanner(RuntimeState* state, RuntimeProfile* profile, _text_converter.reset(new (std::nothrow) TextConverter('\\')); } -VBrokerScanner::~VBrokerScanner() {} +VBrokerScanner::~VBrokerScanner() = default; Status VBrokerScanner::get_next(Block* output_block, bool* eof) { SCOPED_TIMER(_read_timer); + RETURN_IF_ERROR(_init_src_block()); const int batch_size = _state->batch_size(); - // Get batch lines - int slot_num = _src_slot_descs.size(); - std::vector<vectorized::MutableColumnPtr> columns(slot_num); - for (int i = 0; i < slot_num; i++) { - columns[i] = _src_slot_descs[i]->get_empty_mutable_column(); - } + auto columns = _src_block.mutate_columns(); while (columns[0]->size() < batch_size && !_scanner_eof) { if (_cur_line_reader == nullptr || _cur_line_reader_eof) { @@ -85,51 +81,8 @@ Status 
VBrokerScanner::get_next(Block* output_block, bool* eof) { } } } - if (_scanner_eof) { - *eof = true; - } else { - *eof = false; - } - return _fill_dest_block(output_block, columns); -} - -Status VBrokerScanner::_fill_dest_block(Block* dest_block, std::vector<MutableColumnPtr>& columns) { - if (columns.empty() || columns[0]->size() == 0) { - return Status::OK(); - } - - std::unique_ptr<vectorized::Block> tmp_block(new vectorized::Block()); - auto n_columns = 0; - for (const auto slot_desc : _src_slot_descs) { - tmp_block->insert(ColumnWithTypeAndName(std::move(columns[n_columns++]), - slot_desc->get_data_type_ptr(), - slot_desc->col_name())); - } - auto old_rows = tmp_block->rows(); - // filter - if (!_vpre_filter_ctxs.empty()) { - for (auto vexpr_ctx : _vpre_filter_ctxs) { - RETURN_IF_ERROR(VExprContext::filter_block(vexpr_ctx, tmp_block.get(), - _dest_tuple_desc->slots().size())); - _counter->num_rows_unselected += old_rows - tmp_block->rows(); - old_rows = tmp_block->rows(); - } - } - - Status status; - // expr - if (!_dest_vexpr_ctx.empty()) { - *dest_block = vectorized::VExprContext::get_output_block_after_execute_exprs( - _dest_vexpr_ctx, *tmp_block, status); - if (UNLIKELY(dest_block->rows() == 0)) { - _success = false; - return status; - } - } else { - *dest_block = *tmp_block; - } - return status; + return _fill_dest_block(output_block, eof); } Status VBrokerScanner::_fill_dest_columns(const Slice& line, @@ -151,57 +104,10 @@ Status VBrokerScanner::_fill_dest_columns(const Slice& line, const Slice& value = _split_values[i]; if (is_null(value)) { - // If _strict_mode is false, _src_slot_descs_order_by_dest size could be zero - if (_strict_mode && (_src_slot_descs_order_by_dest[dest_index] != nullptr) && - !_src_tuple->is_null( - _src_slot_descs_order_by_dest[dest_index]->null_indicator_offset())) { - RETURN_IF_ERROR(_state->append_error_msg_to_file( - [&]() -> std::string { - return _src_tuple_row->to_string(*(_row_desc.get())); - }, - [&]() -> std::string 
{ - // Type of the slot is must be Varchar in _src_tuple. - StringValue* raw_value = _src_tuple->get_string_slot( - _src_slot_descs_order_by_dest[dest_index]->tuple_offset()); - std::string raw_string; - if (raw_value != nullptr) { //is not null then get raw value - raw_string = raw_value->to_string(); - } - fmt::memory_buffer error_msg; - fmt::format_to(error_msg, - "column({}) value is incorrect while strict mode is {}, " - "src value is {}", - src_slot_desc->col_name(), _strict_mode, raw_string); - return error_msg.data(); - }, - &_scanner_eof)); - _counter->num_rows_filtered++; - _success = false; - return Status::OK(); - } - - if (!src_slot_desc->is_nullable()) { - RETURN_IF_ERROR(_state->append_error_msg_to_file( - [&]() -> std::string { - return _src_tuple_row->to_string(*(_row_desc.get())); - }, - [&]() -> std::string { - fmt::memory_buffer error_msg; - fmt::format_to( - error_msg, - "column({}) values is null while columns is not nullable", - src_slot_desc->col_name()); - return error_msg.data(); - }, - &_scanner_eof)); - _counter->num_rows_filtered++; - _success = false; - return Status::OK(); - } // nullable auto* nullable_column = reinterpret_cast<vectorized::ColumnNullable*>(columns[dest_index].get()); - nullable_column->insert_data(nullptr, 0); + nullable_column->insert_default(); continue; } @@ -209,27 +115,6 @@ Status VBrokerScanner::_fill_dest_columns(const Slice& line, &columns[dest_index], _state)); } - const TBrokerRangeDesc& range = _ranges.at(_next_range - 1); - if (range.__isset.num_of_columns_from_file) { - RETURN_IF_ERROR(_fill_columns_from_path(range.num_of_columns_from_file, - range.columns_from_path, columns)); - } - - return Status::OK(); -} - -Status VBrokerScanner::_fill_columns_from_path(int start, - const std::vector<std::string>& columns_from_path, - std::vector<MutableColumnPtr>& columns) { - // values of columns from path can not be null - for (int i = 0; i < columns_from_path.size(); ++i) { - int dest_index = i + start; - auto 
slot_desc = _src_slot_descs.at(dest_index); - const std::string& column_from_path = columns_from_path[i]; - RETURN_IF_ERROR(_write_text_column(const_cast<char*>(column_from_path.c_str()), - column_from_path.size(), slot_desc, &columns[dest_index], - _state)); - } return Status::OK(); } diff --git a/be/src/vec/exec/vbroker_scanner.h b/be/src/vec/exec/vbroker_scanner.h index 57c4004728..11d8b494fa 100644 --- a/be/src/vec/exec/vbroker_scanner.h +++ b/be/src/vec/exec/vbroker_scanner.h @@ -42,11 +42,6 @@ private: Status _write_text_column(char* value, int length, SlotDescriptor* slot, MutableColumnPtr* column_ptr, RuntimeState* state); - Status _fill_dest_block(Block* block, std::vector<MutableColumnPtr>& columns); - Status _fill_dest_columns(const Slice& line, std::vector<MutableColumnPtr>& columns); - - Status _fill_columns_from_path(int start, const std::vector<std::string>& columns_from_path, - std::vector<MutableColumnPtr>& columns); }; } // namespace doris::vectorized diff --git a/be/src/vec/exec/vjson_scanner.cpp b/be/src/vec/exec/vjson_scanner.cpp index b7645fecf3..e456e6dfc8 100644 --- a/be/src/vec/exec/vjson_scanner.cpp +++ b/be/src/vec/exec/vjson_scanner.cpp @@ -46,14 +46,10 @@ VJsonScanner::VJsonScanner(RuntimeState* state, RuntimeProfile* profile, Status VJsonScanner::get_next(vectorized::Block* output_block, bool* eof) { SCOPED_TIMER(_read_timer); + RETURN_IF_ERROR(_init_src_block()); const int batch_size = _state->batch_size(); - size_t slot_num = _src_slot_descs.size(); - std::vector<vectorized::MutableColumnPtr> columns(slot_num); - auto string_type = make_nullable(std::make_shared<DataTypeString>()); - for (int i = 0; i < slot_num; i++) { - columns[i] = string_type->create_column(); - } + auto columns = _src_block.mutate_columns(); // Get one line while (columns[0]->size() < batch_size && !_scanner_eof) { if (_cur_file_reader == nullptr || _cur_reader_eof) { @@ -83,10 +79,8 @@ Status VJsonScanner::get_next(vectorized::Block* output_block, bool* eof) { 
COUNTER_UPDATE(_rows_read_counter, columns[0]->size()); SCOPED_TIMER(_materialize_timer); - RETURN_IF_ERROR(BaseScanner::fill_dest_block(output_block, columns)); - *eof = _scanner_eof; - return Status::OK(); + return _fill_dest_block(output_block, eof); } Status VJsonScanner::open_next_reader() { diff --git a/be/src/vec/exec/vparquet_scanner.cpp b/be/src/vec/exec/vparquet_scanner.cpp index 6a891850a7..037bc15028 100644 --- a/be/src/vec/exec/vparquet_scanner.cpp +++ b/be/src/vec/exec/vparquet_scanner.cpp @@ -36,29 +36,15 @@ VParquetScanner::VParquetScanner(RuntimeState* state, RuntimeProfile* profile, : ParquetScanner(state, profile, params, ranges, broker_addresses, pre_filter_texprs, counter), _batch(nullptr), - _arrow_batch_cur_idx(0), - _num_of_columns_from_file(0) {} -VParquetScanner::~VParquetScanner() {} + _arrow_batch_cur_idx(0) {} + +VParquetScanner::~VParquetScanner() = default; Status VParquetScanner::open() { RETURN_IF_ERROR(ParquetScanner::open()); if (_ranges.empty()) { return Status::OK(); } - auto range = _ranges[0]; - _num_of_columns_from_file = range.__isset.num_of_columns_from_file - ? 
implicit_cast<int>(range.num_of_columns_from_file) - : implicit_cast<int>(_src_slot_descs.size()); - - // check consistency - if (range.__isset.num_of_columns_from_file) { - int size = range.columns_from_path.size(); - for (const auto& r : _ranges) { - if (r.columns_from_path.size() != size) { - return Status::InternalError("ranges have different number of columns."); - } - } - } return Status::OK(); } @@ -99,9 +85,9 @@ Status VParquetScanner::_init_arrow_batch_if_necessary() { return status; } -Status VParquetScanner::_init_src_block(Block* block) { +Status VParquetScanner::_init_src_block() { size_t batch_pos = 0; - block->clear(); + _src_block.clear(); for (auto i = 0; i < _num_of_columns_from_file; ++i) { SlotDescriptor* slot_desc = _src_slot_descs[i]; if (slot_desc == nullptr) { @@ -118,7 +104,7 @@ Status VParquetScanner::_init_src_block(Block* block) { fmt::format("Not support arrow type:{}", array->type()->name())); } MutableColumnPtr data_column = data_type->create_column(); - block->insert( + _src_block.insert( ColumnWithTypeAndName(std::move(data_column), data_type, slot_desc->col_name())); } return Status::OK(); @@ -150,15 +136,15 @@ Status VParquetScanner::get_next(vectorized::Block* block, bool* eof) { return Status::OK(); } } - Block src_block; - RETURN_IF_ERROR(_init_src_block(&src_block)); + + RETURN_IF_ERROR(_init_src_block()); // convert arrow batch to block until reach the batch_size while (!_scanner_eof) { // cast arrow type to PT0 and append it to src block // for example: arrow::Type::INT16 => TYPE_SMALLINT - RETURN_IF_ERROR(_append_batch_to_src_block(&src_block)); + RETURN_IF_ERROR(_append_batch_to_src_block(&_src_block)); // finalize the src block if full - if (src_block.rows() >= _state->batch_size()) { + if (_src_block.rows() >= _state->batch_size()) { break; } auto status = _next_arrow_batch(); @@ -173,94 +159,14 @@ Status VParquetScanner::get_next(vectorized::Block* block, bool* eof) { _cur_file_eof = true; break; } - 
COUNTER_UPDATE(_rows_read_counter, src_block.rows()); + COUNTER_UPDATE(_rows_read_counter, _src_block.rows()); SCOPED_TIMER(_materialize_timer); // cast PT0 => PT1 // for example: TYPE_SMALLINT => TYPE_VARCHAR - RETURN_IF_ERROR(_cast_src_block(&src_block)); - // range of current file - _fill_columns_from_path(&src_block); - RETURN_IF_ERROR(_eval_conjunts(&src_block)); - // materialize, src block => dest columns - RETURN_IF_ERROR(_materialize_block(&src_block, block)); - *eof = _scanner_eof; - return Status::OK(); -} - -// eval conjuncts, for example: t1 > 1 -Status VParquetScanner::_eval_conjunts(Block* block) { - for (auto& vctx : _vpre_filter_ctxs) { - size_t orig_rows = block->rows(); - RETURN_IF_ERROR(VExprContext::filter_block(vctx, block, block->columns())); - _counter->num_rows_unselected += orig_rows - block->rows(); - } - return Status::OK(); -} + RETURN_IF_ERROR(_cast_src_block(&_src_block)); -void VParquetScanner::_fill_columns_from_path(Block* block) { - const TBrokerRangeDesc& range = _ranges.at(_next_range - 1); - if (range.__isset.num_of_columns_from_file) { - size_t start = range.num_of_columns_from_file; - size_t rows = block->rows(); - for (size_t i = 0; i < range.columns_from_path.size(); ++i) { - auto slot_desc = _src_slot_descs.at(i + start); - if (slot_desc == nullptr) continue; - auto is_nullable = slot_desc->is_nullable(); - DataTypePtr data_type = - DataTypeFactory::instance().create_data_type(TYPE_VARCHAR, is_nullable); - MutableColumnPtr data_column = data_type->create_column(); - const std::string& column_from_path = range.columns_from_path[i]; - for (size_t i = 0; i < rows; ++i) { - data_column->insert_data(const_cast<char*>(column_from_path.c_str()), - column_from_path.size()); - } - block->insert(ColumnWithTypeAndName(std::move(data_column), data_type, - slot_desc->col_name())); - } - } -} - -Status VParquetScanner::_materialize_block(Block* block, Block* dest_block) { - int ctx_idx = 0; - size_t orig_rows = block->rows(); - auto 
filter_column = ColumnUInt8::create(orig_rows, 1); - for (auto slot_desc : _dest_tuple_desc->slots()) { - if (!slot_desc->is_materialized()) { - continue; - } - int dest_index = ctx_idx++; - - VExprContext* ctx = _dest_vexpr_ctx[dest_index]; - int result_column_id = 0; - // PT1 => dest primitive type - RETURN_IF_ERROR(ctx->execute(block, &result_column_id)); - ColumnPtr& ptr = block->safe_get_by_position(result_column_id).column; - if (!slot_desc->is_nullable()) { - if (auto* nullable_column = check_and_get_column<ColumnNullable>(*ptr)) { - if (nullable_column->has_null()) { - // fill filter if src has null value and dest column is not nullable - IColumn::Filter& filter = assert_cast<ColumnUInt8&>(*filter_column).get_data(); - const ColumnPtr& null_column_ptr = nullable_column->get_null_map_column_ptr(); - const auto& column_data = - assert_cast<const ColumnUInt8&>(*null_column_ptr).get_data(); - for (size_t i = 0; i < null_column_ptr->size(); ++i) { - filter[i] &= !column_data[i]; - } - } - ptr = nullable_column->get_nested_column_ptr(); - } - } - dest_block->insert(vectorized::ColumnWithTypeAndName( - std::move(ptr), slot_desc->get_data_type_ptr(), slot_desc->col_name())); - } - size_t dest_size = dest_block->columns(); - // do filter - dest_block->insert(vectorized::ColumnWithTypeAndName( - std::move(filter_column), std::make_shared<vectorized::DataTypeUInt8>(), - "filter column")); - RETURN_IF_ERROR(Block::filter_block(dest_block, dest_size, dest_size)); - _counter->num_rows_filtered += orig_rows - dest_block->rows(); - return Status::OK(); + // materialize, src block => dest columns + return _fill_dest_block(block, eof); } // arrow type ==arrow_column_to_doris_column==> primitive type(PT0) ==cast_src_block==> diff --git a/be/src/vec/exec/vparquet_scanner.h b/be/src/vec/exec/vparquet_scanner.h index 31749248d5..72ac280989 100644 --- a/be/src/vec/exec/vparquet_scanner.h +++ b/be/src/vec/exec/vparquet_scanner.h @@ -54,17 +54,13 @@ public: private: Status 
_next_arrow_batch(); Status _init_arrow_batch_if_necessary(); - Status _init_src_block(Block* block); + Status _init_src_block() override; Status _append_batch_to_src_block(Block* block); Status _cast_src_block(Block* block); - Status _eval_conjunts(Block* block); - Status _materialize_block(Block* block, Block* dest_block); - void _fill_columns_from_path(Block* block); private: std::shared_ptr<arrow::RecordBatch> _batch; size_t _arrow_batch_cur_idx; - int _num_of_columns_from_file; }; } // namespace doris::vectorized diff --git a/be/test/vec/exec/vbroker_scanner_test.cpp b/be/test/vec/exec/vbroker_scanner_test.cpp index 1a39c1f8e0..713aefc4a7 100644 --- a/be/test/vec/exec/vbroker_scanner_test.cpp +++ b/be/test/vec/exec/vbroker_scanner_test.cpp @@ -41,6 +41,13 @@ public: init(); _profile = _runtime_state.runtime_profile(); _runtime_state._instance_mem_tracker.reset(new MemTracker()); + + TUniqueId unique_id; + TQueryOptions query_options; + query_options.__set_enable_vectorized_engine(true); + TQueryGlobals query_globals; + + _runtime_state.init(unique_id, query_options, query_globals, nullptr); } void init(); @@ -370,17 +377,17 @@ TEST_F(VBrokerScannerTest, normal) { auto columns = block->get_columns(); ASSERT_EQ(columns.size(), 3); - ASSERT_EQ(columns[0]->get_data_at(0).to_string(), "1"); - ASSERT_EQ(columns[0]->get_data_at(1).to_string(), "4"); - ASSERT_EQ(columns[0]->get_data_at(2).to_string(), "8"); + ASSERT_EQ(columns[0]->get_int(0), 1); + ASSERT_EQ(columns[0]->get_int(1), 4); + ASSERT_EQ(columns[0]->get_int(2), 8); - ASSERT_EQ(columns[1]->get_data_at(0).to_string(), "2"); - ASSERT_EQ(columns[1]->get_data_at(1).to_string(), "5"); - ASSERT_EQ(columns[1]->get_data_at(2).to_string(), "9"); + ASSERT_EQ(columns[1]->get_int(0), 2); + ASSERT_EQ(columns[1]->get_int(1), 5); + ASSERT_EQ(columns[1]->get_int(2), 9); - ASSERT_EQ(columns[2]->get_data_at(0).to_string(), "3"); - ASSERT_EQ(columns[2]->get_data_at(1).to_string(), "6"); - 
ASSERT_EQ(columns[2]->get_data_at(2).to_string(), "10"); + ASSERT_EQ(columns[2]->get_int(0), 3); + ASSERT_EQ(columns[2]->get_int(1), 6); + ASSERT_EQ(columns[2]->get_int(2), 10); } TEST_F(VBrokerScannerTest, normal2) { @@ -413,14 +420,14 @@ TEST_F(VBrokerScannerTest, normal2) { auto columns = block->get_columns(); ASSERT_EQ(columns.size(), 3); - ASSERT_EQ(columns[0]->get_data_at(0).to_string(), "1"); - ASSERT_EQ(columns[0]->get_data_at(1).to_string(), "3"); + ASSERT_EQ(columns[0]->get_int(0), 1); + ASSERT_EQ(columns[0]->get_int(1), 3); - ASSERT_EQ(columns[1]->get_data_at(0).to_string(), "2"); - ASSERT_EQ(columns[1]->get_data_at(1).to_string(), "4"); + ASSERT_EQ(columns[1]->get_int(0), 2); + ASSERT_EQ(columns[1]->get_int(1), 4); - ASSERT_EQ(columns[2]->get_data_at(0).to_string(), "3"); - ASSERT_EQ(columns[2]->get_data_at(1).to_string(), "5"); + ASSERT_EQ(columns[2]->get_int(0), 3); + ASSERT_EQ(columns[2]->get_int(1), 5); } TEST_F(VBrokerScannerTest, normal5) { diff --git a/be/test/vec/exec/vjson_scanner_test.cpp b/be/test/vec/exec/vjson_scanner_test.cpp index c96772a011..393ff80f16 100644 --- a/be/test/vec/exec/vjson_scanner_test.cpp +++ b/be/test/vec/exec/vjson_scanner_test.cpp @@ -47,7 +47,13 @@ public: VJsonScannerTest() : _runtime_state(TQueryGlobals()) { init(); _runtime_state._instance_mem_tracker.reset(new MemTracker()); - _runtime_state._exec_env = ExecEnv::GetInstance(); + + TUniqueId unique_id; + TQueryOptions query_options; + query_options.__set_enable_vectorized_engine(true); + TQueryGlobals query_globals; + + _runtime_state.init(unique_id, query_options, query_globals, nullptr); } void init(); static void SetUpTestCase() { @@ -391,7 +397,7 @@ void VJsonScannerTest::create_expr_info() { TTypeNode node; node.__set_type(TTypeNodeType::SCALAR); TScalarType scalar_type; - scalar_type.__set_type(TPrimitiveType::BIGINT); + scalar_type.__set_type(TPrimitiveType::DOUBLE); node.__set_scalar_type(scalar_type); int_type.types.push_back(node); } @@ -553,6 +559,7 @@ 
TEST_F(VJsonScannerTest, simple_array_json) { range.format_type = TFileFormatType::FORMAT_JSON; range.strip_outer_array = true; range.__isset.strip_outer_array = true; + range.__set_num_as_string(true); range.splittable = true; range.path = "./be/test/exec/test_data/json_scanner/test_simple2.json"; range.file_type = TFileType::FILE_LOCAL; @@ -583,9 +590,9 @@ TEST_F(VJsonScannerTest, simple_array_json) { ASSERT_EQ(columns[3].to_string(0), "8.950000"); ASSERT_EQ(columns[3].to_string(1), "12.990000"); ASSERT_EQ(columns[4].to_string(0), "1234"); - ASSERT_EQ(columns[4].to_string(1), "1180591620717411303424.000000"); - ASSERT_EQ(columns[5].to_string(0), "1234.123400"); - ASSERT_EQ(columns[5].to_string(1), "10000000000000.001953"); + ASSERT_EQ(columns[4].to_string(1), "1180591620717411303424"); + ASSERT_EQ(columns[5].to_string(0), "1234.123400000"); + ASSERT_EQ(columns[5].to_string(1), "9999999999999.999999000"); block.clear(); status = scan_node.get_next(&_runtime_state, &block, &eof); @@ -753,12 +760,12 @@ TEST_F(VJsonScannerTest, use_jsonpaths_mismatch) { auto columns = block.get_columns_with_type_and_name(); ASSERT_EQ(columns.size(), 6); - ASSERT_EQ(columns[0].to_string(0), "\\N"); - ASSERT_EQ(columns[0].to_string(1), "\\N"); - ASSERT_EQ(columns[1].to_string(0), "\\N"); - ASSERT_EQ(columns[1].to_string(1), "\\N"); - ASSERT_EQ(columns[2].to_string(0), "\\N"); - ASSERT_EQ(columns[2].to_string(1), "\\N"); + ASSERT_EQ(columns[0].to_string(0), "NULL"); + ASSERT_EQ(columns[0].to_string(1), "NULL"); + ASSERT_EQ(columns[1].to_string(0), "NULL"); + ASSERT_EQ(columns[1].to_string(1), "NULL"); + ASSERT_EQ(columns[2].to_string(0), "NULL"); + ASSERT_EQ(columns[2].to_string(1), "NULL"); block.clear(); scan_node.close(&_runtime_state); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java index 303f080976..ae8aafa9cf 100644 --- 
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java @@ -21,6 +21,7 @@ import org.apache.doris.analysis.BrokerDesc; import org.apache.doris.analysis.UserIdentity; import org.apache.doris.catalog.Database; import org.apache.doris.catalog.OlapTable; +import org.apache.doris.common.Config; import org.apache.doris.common.LoadException; import org.apache.doris.common.Status; import org.apache.doris.common.UserException; @@ -130,6 +131,7 @@ public class LoadLoadingTask extends LoadTask { planner.getFragments(), planner.getScanNodes(), planner.getTimezone(), loadZeroTolerance); curCoordinator.setQueryType(TQueryType.LOAD); curCoordinator.setExecMemoryLimit(execMemLimit); + curCoordinator.setExecVecEngine(Config.enable_vectorized_load); /* * For broker load job, user only need to set mem limit by 'exec_mem_limit' property. * And the variable 'load_mem_limit' does not make any effect. diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java index d74b17f490..78fa7a23b3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java @@ -26,6 +26,7 @@ import org.apache.doris.analysis.UserIdentity; import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.Column; import org.apache.doris.catalog.OlapTable; +import org.apache.doris.common.Config; import org.apache.doris.common.LoadException; import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.common.UserException; @@ -125,6 +126,9 @@ public class LoadingTaskPlanner { scanNode.setLoadInfo(loadJobId, txnId, table, brokerDesc, fileGroups, strictMode, loadParallelism); scanNode.init(analyzer); scanNode.finalize(analyzer); + if (Config.enable_vectorized_load) { + 
scanNode.convertToVectoriezd(); + } scanNodes.add(scanNode); descTable.computeStatAndMemLayout(); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org