This is an automated email from the ASF dual-hosted git repository. lihaopeng pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push: new eec1dfde3a [feature] (vec) instead of converting line to src tuple for stream load in vectorized. (#9314) eec1dfde3a is described below commit eec1dfde3a6b93eccc28fb1c0b58233987a2b856 Author: xiepengcheng01 <100340096+xiepengchen...@users.noreply.github.com> AuthorDate: Mon May 9 11:24:07 2022 +0800 [feature] (vec) instead of converting line to src tuple for stream load in vectorized. (#9314) Co-authored-by: xiepengcheng01 <xiepengchen...@xafj-palo-rpm64.xafj.baidu.com> --- be/src/exec/base_scanner.cpp | 41 ++++- be/src/exec/base_scanner.h | 15 +- be/src/exec/broker_scanner.cpp | 18 +- be/src/exec/broker_scanner.h | 6 +- be/src/vec/exec/vbroker_scan_node.cpp | 188 ++++++++++---------- be/src/vec/exec/vbroker_scan_node.h | 1 + be/src/vec/exec/vbroker_scanner.cpp | 263 ++++++++++++---------------- be/src/vec/exec/vbroker_scanner.h | 17 +- be/test/exec/multi_bytes_separator_test.cpp | 9 +- be/test/vec/exec/vbroker_scan_node_test.cpp | 72 ++++---- be/test/vec/exec/vbroker_scanner_test.cpp | 66 +++---- 11 files changed, 350 insertions(+), 346 deletions(-) diff --git a/be/src/exec/base_scanner.cpp b/be/src/exec/base_scanner.cpp index ca5b08831f..8621cf75f8 100644 --- a/be/src/exec/base_scanner.cpp +++ b/be/src/exec/base_scanner.cpp @@ -107,10 +107,18 @@ Status BaseScanner::init_expr_ctxes() { // preceding filter expr should be initialized by using `_row_desc`, which is the source row descriptor if (!_pre_filter_texprs.empty()) { - RETURN_IF_ERROR( - Expr::create_expr_trees(_state->obj_pool(), _pre_filter_texprs, &_pre_filter_ctxs)); - RETURN_IF_ERROR(Expr::prepare(_pre_filter_ctxs, _state, *_row_desc, _mem_tracker)); - RETURN_IF_ERROR(Expr::open(_pre_filter_ctxs, _state)); + if (_state->enable_vectorized_exec()) { + RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees( + _state->obj_pool(), _pre_filter_texprs, &_vpre_filter_ctxs)); + RETURN_IF_ERROR(vectorized::VExpr::prepare(_vpre_filter_ctxs, _state, *_row_desc, + _mem_tracker)); + RETURN_IF_ERROR(vectorized::VExpr::open(_vpre_filter_ctxs, _state)); + } else { + RETURN_IF_ERROR(Expr::create_expr_trees(_state->obj_pool(), _pre_filter_texprs, + &_pre_filter_ctxs)); + RETURN_IF_ERROR(Expr::prepare(_pre_filter_ctxs, _state, *_row_desc, _mem_tracker)); + RETURN_IF_ERROR(Expr::open(_pre_filter_ctxs, _state)); + } } // Construct dest slots information @@ -133,11 +141,22 @@ Status BaseScanner::init_expr_ctxes() { << ", name=" << slot_desc->col_name(); return Status::InternalError(ss.str()); } - ExprContext* ctx = nullptr; - RETURN_IF_ERROR(Expr::create_expr_tree(_state->obj_pool(), it->second, &ctx)); - RETURN_IF_ERROR(ctx->prepare(_state, *_row_desc.get(), _mem_tracker)); - RETURN_IF_ERROR(ctx->open(_state)); - _dest_expr_ctx.emplace_back(ctx); + + if (_state->enable_vectorized_exec()) { + vectorized::VExprContext* ctx = nullptr; + RETURN_IF_ERROR( + vectorized::VExpr::create_expr_tree(_state->obj_pool(), it->second, &ctx)); + RETURN_IF_ERROR(ctx->prepare(_state, *_row_desc.get(), _mem_tracker)); + RETURN_IF_ERROR(ctx->open(_state)); + _dest_vexpr_ctx.emplace_back(ctx); + } else { + ExprContext* ctx = nullptr; + RETURN_IF_ERROR(Expr::create_expr_tree(_state->obj_pool(), it->second, &ctx)); + RETURN_IF_ERROR(ctx->prepare(_state, *_row_desc.get(), _mem_tracker)); + RETURN_IF_ERROR(ctx->open(_state)); + _dest_expr_ctx.emplace_back(ctx); + } + if (has_slot_id_map) { auto it = _params.dest_sid_to_src_sid_without_trans.find(slot_desc->id()); if (it == std::end(_params.dest_sid_to_src_sid_without_trans)) { @@ -284,6 +303,10 @@ void BaseScanner::close() { if (!_pre_filter_ctxs.empty()) { Expr::close(_pre_filter_ctxs, _state); } + + if (_state->enable_vectorized_exec() && !_vpre_filter_ctxs.empty()) { + vectorized::VExpr::close(_vpre_filter_ctxs, _state); + } } } // namespace doris diff --git a/be/src/exec/base_scanner.h b/be/src/exec/base_scanner.h index 13285ab6aa..d98e39e663 100644 --- a/be/src/exec/base_scanner.h +++ b/be/src/exec/base_scanner.h @@ -20,6 +20,7 @@ #include "common/status.h" #include "exprs/expr.h" +#include "vec/exprs/vexpr.h" #include "runtime/tuple.h" #include "util/runtime_profile.h" @@ -52,7 +53,12 @@ class BaseScanner { public: BaseScanner(RuntimeState* state, RuntimeProfile* profile, const TBrokerScanRangeParams& params, const std::vector<TExpr>& pre_filter_texprs, ScannerCounter* counter); - virtual ~BaseScanner() { Expr::close(_dest_expr_ctx, _state); }; + virtual ~BaseScanner() { + Expr::close(_dest_expr_ctx, _state); + if (_state->enable_vectorized_exec()) { + vectorized::VExpr::close(_dest_vexpr_ctx, _state); + } + }; virtual Status init_expr_ctxes(); // Open this scanner, will initialize information need to @@ -62,8 +68,8 @@ public: virtual Status get_next(Tuple* tuple, MemPool* tuple_pool, bool* eof, bool* fill_tuple) = 0; // Get next block - virtual Status get_next(std::vector<vectorized::MutableColumnPtr>& columns, bool* eof) { - return Status::NotSupported("Not Implemented get next"); + virtual Status get_next(vectorized::Block* block, bool* eof) { + return Status::NotSupported("Not Implemented get block"); } // Close this scanner @@ -95,6 +101,9 @@ protected: // Dest tuple descriptor and dest expr context const TupleDescriptor* _dest_tuple_desc; std::vector<ExprContext*> _dest_expr_ctx; + // for vectorized + std::vector<vectorized::VExprContext*> _dest_vexpr_ctx; + std::vector<vectorized::VExprContext*> _vpre_filter_ctxs; // the map values of dest slot id to src slot desc // if there is not key of dest slot id in dest_sid_to_src_sid_without_trans, it will be set to nullptr std::vector<SlotDescriptor*> _src_slot_descs_order_by_dest; diff --git a/be/src/exec/broker_scanner.cpp b/be/src/exec/broker_scanner.cpp index cda19d8611..d35e8428a4 100644 --- a/be/src/exec/broker_scanner.cpp +++ b/be/src/exec/broker_scanner.cpp @@ -468,8 +468,7 @@ Status BrokerScanner::_convert_one_row(const Slice& line, Tuple* tuple, MemPool* return fill_dest_tuple(tuple, tuple_pool, fill_tuple); } -// Convert one row to this tuple -Status BrokerScanner::_line_to_src_tuple(const Slice& line) { +Status BrokerScanner::_line_split_to_values(const Slice& line) { bool is_proto_format = _file_format_type == TFileFormatType::FORMAT_PROTO; if (!is_proto_format && !validate_utf8(line.data, line.size)) { RETURN_IF_ERROR(_state->append_error_msg_to_file( @@ -546,6 +545,17 @@ Status BrokerScanner::_line_to_src_tuple(const Slice& line) { } } + _success = true; + return Status::OK(); +} + +// Convert one row to this tuple +Status BrokerScanner::_line_to_src_tuple(const Slice& line) { + RETURN_IF_ERROR(_line_split_to_values(line)); + if (!_success) { + return Status::OK(); + } + for (int i = 0; i < _split_values.size(); ++i) { auto slot_desc = _src_slot_descs[i]; const Slice& value = _split_values[i]; @@ -560,11 +570,11 @@ Status BrokerScanner::_line_to_src_tuple(const Slice& line) { str_slot->len = value.size; } + const TBrokerRangeDesc& range = _ranges.at(_next_range - 1); if (range.__isset.num_of_columns_from_file) { - fill_slots_of_columns_from_path(range.num_of_columns_from_file, columns_from_path); + fill_slots_of_columns_from_path(range.num_of_columns_from_file, range.columns_from_path); } - _success = true; return Status::OK(); } diff --git a/be/src/exec/broker_scanner.h b/be/src/exec/broker_scanner.h index 344d3789ae..2b6ac2d302 100644 --- a/be/src/exec/broker_scanner.h +++ b/be/src/exec/broker_scanner.h @@ -65,8 +65,8 @@ public: virtual Status get_next(Tuple* tuple, MemPool* tuple_pool, bool* eof, bool* fill_tuple) override; - Status get_next(std::vector<vectorized::MutableColumnPtr>& columns, bool* eof) override { - return Status::NotSupported("Not Implemented get columns"); + Status get_next(vectorized::Block* block, bool* eof) override { + return Status::NotSupported("Not Implemented get block"); } // Close this scanner @@ -78,6 +78,8 @@ protected: Status _line_to_src_tuple(const Slice& line); + Status _line_split_to_values(const Slice& line); + private: Status open_file_reader(); Status create_decompressor(TFileFormatType::type type); diff --git a/be/src/vec/exec/vbroker_scan_node.cpp b/be/src/vec/exec/vbroker_scan_node.cpp index d12f3e8d85..09338a1959 100644 --- a/be/src/vec/exec/vbroker_scan_node.cpp +++ b/be/src/vec/exec/vbroker_scan_node.cpp @@ -61,43 +61,69 @@ Status VBrokerScanNode::get_next(RuntimeState* state, vectorized::Block* block, return Status::OK(); } - std::shared_ptr<vectorized::Block> scanner_block; - { - std::unique_lock<std::mutex> l(_batch_queue_lock); - while (_process_status.ok() && !_runtime_state->is_cancelled() && - _num_running_scanners > 0 && _block_queue.empty()) { - SCOPED_TIMER(_wait_scanner_timer); - _queue_reader_cond.wait_for(l, std::chrono::seconds(1)); - } - if (!_process_status.ok()) { - // Some scanner process failed. - return _process_status; + const int batch_size = _runtime_state->batch_size(); + while (true) { + std::shared_ptr<vectorized::Block> scanner_block; + { + std::unique_lock<std::mutex> l(_batch_queue_lock); + while (_process_status.ok() && !_runtime_state->is_cancelled() && + _num_running_scanners > 0 && _block_queue.empty()) { + SCOPED_TIMER(_wait_scanner_timer); + _queue_reader_cond.wait_for(l, std::chrono::seconds(1)); + } + if (!_process_status.ok()) { + // Some scanner process failed. + return _process_status; + } + if (_runtime_state->is_cancelled()) { + if (update_status(Status::Cancelled("Cancelled"))) { + _queue_writer_cond.notify_all(); + } + return _process_status; + } + if (!_block_queue.empty()) { + scanner_block = _block_queue.front(); + _block_queue.pop_front(); + } } - if (_runtime_state->is_cancelled()) { - if (update_status(Status::Cancelled("Cancelled"))) { - _queue_writer_cond.notify_all(); + + // All scanner has been finished, and all cached batch has been read + if (!scanner_block) { + if (_mutable_block && !_mutable_block->empty()) { + *block = _mutable_block->to_block(); + reached_limit(block, eos); + LOG_IF(INFO, *eos) << "VBrokerScanNode ReachedLimit."; } - return _process_status; + _scan_finished.store(true); + *eos = true; + return Status::OK(); } - if (!_block_queue.empty()) { - scanner_block = _block_queue.front(); - _block_queue.pop_front(); + // notify one scanner + _queue_writer_cond.notify_one(); + + if (UNLIKELY(!_mutable_block)) { + _mutable_block.reset(new MutableBlock(scanner_block->clone_empty())); } - } - // All scanner has been finished, and all cached batch has been read - if (scanner_block == nullptr) { - _scan_finished.store(true); - *eos = true; - return Status::OK(); + if (_mutable_block->rows() + scanner_block->rows() < batch_size) { + // merge scanner_block into _mutable_block + _mutable_block->add_rows(scanner_block.get(), 0, scanner_block->rows()); + continue; + } else { + if (_mutable_block->empty()) { + // directly use scanner_block + *block = *scanner_block; + } else { + // copy _mutable_block firstly, then merge scanner_block into _mutable_block for next. + *block = _mutable_block->to_block(); + _mutable_block->set_muatable_columns(scanner_block->clone_empty_columns()); + _mutable_block->add_rows(scanner_block.get(), 0, scanner_block->rows()); + } + break; + } } - // notify one scanner - _queue_writer_cond.notify_one(); - - reached_limit(scanner_block.get(), eos); - *block = *scanner_block; - + reached_limit(block, eos); if (*eos) { _scan_finished.store(true); _queue_writer_cond.notify_all(); @@ -120,75 +146,53 @@ Status VBrokerScanNode::scanner_scan(const TBrokerScanRange& scan_range, Scanner std::unique_ptr<BaseScanner> scanner = create_scanner(scan_range, counter); RETURN_IF_ERROR(scanner->open()); bool scanner_eof = false; - - const int batch_size = _runtime_state->batch_size(); - size_t slot_num = _tuple_desc->slots().size(); - while (!scanner_eof) { - std::shared_ptr<vectorized::Block> block(new vectorized::Block()); - std::vector<vectorized::MutableColumnPtr> columns(slot_num); - for (int i = 0; i < slot_num; i++) { - columns[i] = _tuple_desc->slots()[i]->get_empty_mutable_column(); + RETURN_IF_CANCELLED(_runtime_state); + // If we have finished all works + if (_scan_finished.load() || !_process_status.ok()) { + return Status::OK(); } - while (columns[0]->size() < batch_size && !scanner_eof) { - RETURN_IF_CANCELLED(_runtime_state); - // If we have finished all works - if (_scan_finished.load()) { - return Status::OK(); - } - - RETURN_IF_ERROR(scanner->get_next(columns, &scanner_eof)); - if (scanner_eof) { - break; - } + std::shared_ptr<vectorized::Block> block(new vectorized::Block()); + RETURN_IF_ERROR(scanner->get_next(block.get(), &scanner_eof)); + if (block->rows() == 0) { + continue; + } + auto old_rows = block->rows(); + RETURN_IF_ERROR(VExprContext::filter_block(_vconjunct_ctx_ptr, block.get(), + _tuple_desc->slots().size())); + counter->num_rows_unselected += old_rows - block->rows(); + if (block->rows() == 0) { + continue; } - if (!columns[0]->empty()) { - auto n_columns = 0; - for (const auto slot_desc : _tuple_desc->slots()) { - block->insert(ColumnWithTypeAndName(std::move(columns[n_columns++]), - slot_desc->get_data_type_ptr(), - slot_desc->col_name())); - } - - auto old_rows = block->rows(); - - RETURN_IF_ERROR(VExprContext::filter_block(_vconjunct_ctx_ptr, block.get(), - _tuple_desc->slots().size())); - - counter->num_rows_unselected += old_rows - block->rows(); - - std::unique_lock<std::mutex> l(_batch_queue_lock); - while (_process_status.ok() && !_scan_finished.load() && - !_runtime_state->is_cancelled() && - // stop pushing more batch if - // 1. too many batches in queue, or - // 2. at least one batch in queue and memory exceed limit. - (_block_queue.size() >= _max_buffered_batches || - (mem_tracker()->any_limit_exceeded() && !_block_queue.empty()))) { - _queue_writer_cond.wait_for(l, std::chrono::seconds(1)); - } - // Process already set failed, so we just return OK - if (!_process_status.ok()) { - return Status::OK(); - } - // Scan already finished, just return - if (_scan_finished.load()) { - return Status::OK(); - } - // Runtime state is canceled, just return cancel - if (_runtime_state->is_cancelled()) { - return Status::Cancelled("Cancelled"); - } - // Queue size Must be smaller than _max_buffered_batches - _block_queue.push_back(block); - - // Notify reader to - _queue_reader_cond.notify_one(); + std::unique_lock<std::mutex> l(_batch_queue_lock); + while (_process_status.ok() && !_scan_finished.load() && !_runtime_state->is_cancelled() && + // stop pushing more batch if + // 1. too many batches in queue, or + // 2. at least one batch in queue and memory exceed limit. + (_block_queue.size() >= _max_buffered_batches || + (mem_tracker()->any_limit_exceeded() && !_block_queue.empty()))) { + _queue_writer_cond.wait_for(l, std::chrono::seconds(1)); } - } + // Process already set failed, so we just return OK + if (!_process_status.ok()) { + return Status::OK(); + } + // Scan already finished, just return + if (_scan_finished.load()) { + return Status::OK(); + } + // Runtime state is canceled, just return cancel + if (_runtime_state->is_cancelled()) { + return Status::Cancelled("Cancelled"); + } + // Queue size Must be smaller than _max_buffered_batches + _block_queue.push_back(block); + // Notify reader to + _queue_reader_cond.notify_one(); + } return Status::OK(); } diff --git a/be/src/vec/exec/vbroker_scan_node.h b/be/src/vec/exec/vbroker_scan_node.h index 9a3b2fe362..4ccebed5fb 100644 --- a/be/src/vec/exec/vbroker_scan_node.h +++ b/be/src/vec/exec/vbroker_scan_node.h @@ -51,6 +51,7 @@ private: Status scanner_scan(const TBrokerScanRange& scan_range, ScannerCounter* counter); std::deque<std::shared_ptr<vectorized::Block>> _block_queue; + std::unique_ptr<MutableBlock> _mutable_block; }; } // namespace vectorized } // namespace doris diff --git a/be/src/vec/exec/vbroker_scanner.cpp b/be/src/vec/exec/vbroker_scanner.cpp index 28e2f24c22..39302099a6 100644 --- a/be/src/vec/exec/vbroker_scanner.cpp +++ b/be/src/vec/exec/vbroker_scanner.cpp @@ -21,25 +21,41 @@ #include <iostream> #include <sstream> +#include "exec/text_converter.h" #include "exec/exec_node.h" #include "exprs/expr_context.h" #include "exec/plain_text_line_reader.h" +#include "util/utf8_check.h" namespace doris::vectorized { + +bool is_null(const Slice& slice) { + return slice.size == 2 && slice.data[0] == '\\' && slice.data[1] == 'N'; +} + VBrokerScanner::VBrokerScanner(RuntimeState* state, RuntimeProfile* profile, const TBrokerScanRangeParams& params, const std::vector<TBrokerRangeDesc>& ranges, const std::vector<TNetworkAddress>& broker_addresses, const std::vector<TExpr>& pre_filter_texprs, ScannerCounter* counter) : BrokerScanner(state, profile, params, ranges, broker_addresses, pre_filter_texprs, - counter) {} + counter) { + _text_converter.reset(new (std::nothrow) TextConverter('\\')); +} -Status VBrokerScanner::get_next(std::vector<MutableColumnPtr>& columns, bool* eof) { +VBrokerScanner::~VBrokerScanner() {} + +Status VBrokerScanner::get_next(Block* output_block, bool* eof) { SCOPED_TIMER(_read_timer); const int batch_size = _state->batch_size(); + // Get batch lines + int slot_num = _src_slot_descs.size(); + std::vector<vectorized::MutableColumnPtr> columns(slot_num); + for (int i = 0; i < slot_num; i++) { + columns[i] = _src_slot_descs[i]->get_empty_mutable_column(); + } - // Get one line while (columns[0]->size() < batch_size && !_scanner_eof) { if (_cur_line_reader == nullptr || _cur_line_reader_eof) { RETURN_IF_ERROR(open_next_reader()); @@ -62,7 +78,7 @@ Status VBrokerScanner::get_next(std::vector<MutableColumnPtr>& columns, bool* eo { COUNTER_UPDATE(_rows_read_counter, 1); SCOPED_TIMER(_materialize_timer); - RETURN_IF_ERROR(_convert_one_row(Slice(ptr, size), columns)); + RETURN_IF_ERROR(_fill_dest_columns(Slice(ptr, size), columns)); if (_success) { free_expr_local_allocations(); } @@ -73,53 +89,67 @@ Status VBrokerScanner::get_next(std::vector<MutableColumnPtr>& columns, bool* eo } else { *eof = false; } - return Status::OK(); + return _fill_dest_block(output_block, columns); } -Status VBrokerScanner::_convert_one_row(const Slice& line, std::vector<MutableColumnPtr>& columns) { - RETURN_IF_ERROR(_line_to_src_tuple(line)); - if (!_success) { - // If not success, which means we met an invalid row, return. +Status VBrokerScanner::_fill_dest_block(Block* dest_block, std::vector<MutableColumnPtr>& columns) { + if (columns.empty() || columns[0]->size() == 0) { return Status::OK(); } - return _fill_dest_columns(columns); + std::unique_ptr<vectorized::Block> tmp_block(new vectorized::Block()); + auto n_columns = 0; + for (const auto slot_desc : _src_slot_descs) { + tmp_block->insert(ColumnWithTypeAndName(std::move(columns[n_columns++]), + slot_desc->get_data_type_ptr(), + slot_desc->col_name())); + } + auto old_rows = tmp_block->rows(); + // filter + if (!_vpre_filter_ctxs.empty()) { + for (auto vexpr_ctx : _vpre_filter_ctxs) { + RETURN_IF_ERROR(VExprContext::filter_block(vexpr_ctx, tmp_block.get(), + _dest_tuple_desc->slots().size())); + _counter->num_rows_unselected += old_rows - tmp_block->rows(); + old_rows = tmp_block->rows(); + } + } + + Status status; + // expr + if (!_dest_vexpr_ctx.empty()) { + *dest_block = vectorized::VExprContext::get_output_block_after_execute_exprs( + _dest_vexpr_ctx, *tmp_block, status); + if (UNLIKELY(dest_block->rows() == 0)) { + _success = false; + return status; + } + } else { + *dest_block = *tmp_block; + } + + return status; } -Status VBrokerScanner::_fill_dest_columns(std::vector<MutableColumnPtr>& columns) { - // filter src tuple by preceding filter first - if (!ExecNode::eval_conjuncts(&_pre_filter_ctxs[0], _pre_filter_ctxs.size(), _src_tuple_row)) { - _counter->num_rows_unselected++; - _success = false; +Status VBrokerScanner::_fill_dest_columns(const Slice& line, + std::vector<MutableColumnPtr>& columns) { + RETURN_IF_ERROR(_line_split_to_values(line)); + if (!_success) { + // If not success, which means we met an invalid row, return. return Status::OK(); } - // convert and fill dest tuple - int ctx_idx = 0; - for (auto slot_desc : _dest_tuple_desc->slots()) { - if (!slot_desc->is_materialized()) { + + int idx = 0; + for (int i = 0; i < _split_values.size(); ++i) { + int dest_index = idx++; + + auto src_slot_desc = _src_slot_descs[i]; + if (!src_slot_desc->is_materialized()) { continue; } - int dest_index = ctx_idx++; - auto* column_ptr = columns[dest_index].get(); - - ExprContext* ctx = _dest_expr_ctx[dest_index]; - void* value = ctx->get_value(_src_tuple_row); - if (value == nullptr) { - // Only when the expr return value is null, we will check the error message. - std::string expr_error = ctx->get_error_msg(); - if (!expr_error.empty()) { - RETURN_IF_ERROR(_state->append_error_msg_to_file( - [&]() -> std::string { - return _src_tuple_row->to_string(*(_row_desc.get())); - }, - [&]() -> std::string { return expr_error; }, &_scanner_eof)); - _counter->num_rows_filtered++; - // The ctx is reused, so must clear the error state and message. - ctx->clear_error_msg(); - _success = false; - return Status::OK(); - } + const Slice& value = _split_values[i]; + if (is_null(value)) { // If _strict_mode is false, _src_slot_descs_order_by_dest size could be zero if (_strict_mode && (_src_slot_descs_order_by_dest[dest_index] != nullptr) && !_src_tuple->is_null( @@ -140,7 +170,7 @@ Status VBrokerScanner::_fill_dest_columns(std::vector<MutableColumnPtr>& columns fmt::format_to(error_msg, "column({}) value is incorrect while strict mode is {}, " "src value is {}", - slot_desc->col_name(), _strict_mode, raw_string); + src_slot_desc->col_name(), _strict_mode, raw_string); return error_msg.data(); }, &_scanner_eof)); @@ -148,7 +178,8 @@ Status VBrokerScanner::_fill_dest_columns(std::vector<MutableColumnPtr>& columns _success = false; return Status::OK(); } - if (!slot_desc->is_nullable()) { + + if (!src_slot_desc->is_nullable()) { RETURN_IF_ERROR(_state->append_error_msg_to_file( [&]() -> std::string { return _src_tuple_row->to_string(*(_row_desc.get())); @@ -158,7 +189,7 @@ Status VBrokerScanner::_fill_dest_columns(std::vector<MutableColumnPtr>& columns fmt::format_to( error_msg, "column({}) values is null while columns is not nullable", - slot_desc->col_name()); + src_slot_desc->col_name()); return error_msg.data(); }, &_scanner_eof)); @@ -166,124 +197,50 @@ Status VBrokerScanner::_fill_dest_columns(std::vector<MutableColumnPtr>& columns _success = false; return Status::OK(); } - auto* nullable_column = reinterpret_cast<vectorized::ColumnNullable*>(column_ptr); + // nullable + auto* nullable_column = + reinterpret_cast<vectorized::ColumnNullable*>(columns[dest_index].get()); nullable_column->insert_data(nullptr, 0); continue; } - if (slot_desc->is_nullable()) { - auto* nullable_column = reinterpret_cast<vectorized::ColumnNullable*>(column_ptr); - nullable_column->get_null_map_data().push_back(0); - column_ptr = &nullable_column->get_nested_column(); - } - char* value_ptr = (char*)value; - switch (slot_desc->type().type) { - case TYPE_BOOLEAN: { - assert_cast<ColumnVector<UInt8>*>(column_ptr)->insert_data(value_ptr, 0); - break; - } - case TYPE_TINYINT: { - assert_cast<ColumnVector<Int8>*>(column_ptr)->insert_data(value_ptr, 0); - break; - } - case TYPE_SMALLINT: { - assert_cast<ColumnVector<Int16>*>(column_ptr)->insert_data(value_ptr, 0); - break; - } - case TYPE_INT: { - assert_cast<ColumnVector<Int32>*>(column_ptr)->insert_data(value_ptr, 0); - break; - } - case TYPE_BIGINT: { - assert_cast<ColumnVector<Int64>*>(column_ptr)->insert_data(value_ptr, 0); - break; - } - case TYPE_LARGEINT: { - assert_cast<ColumnVector<Int128>*>(column_ptr)->insert_data(value_ptr, 0); - break; - } - case TYPE_FLOAT: { - assert_cast<ColumnVector<Float32>*>(column_ptr)->insert_data(value_ptr, 0); - break; - } - case TYPE_DOUBLE: { - assert_cast<ColumnVector<Float64>*>(column_ptr)->insert_data(value_ptr, 0); - break; - } - case TYPE_CHAR: { - Slice* slice = reinterpret_cast<Slice*>(value_ptr); - assert_cast<ColumnString*>(column_ptr) - ->insert_data(slice->data, strnlen(slice->data, slice->size)); - break; - } - case TYPE_VARCHAR: - case TYPE_STRING: { - Slice* slice = reinterpret_cast<Slice*>(value_ptr); - assert_cast<ColumnString*>(column_ptr)->insert_data(slice->data, slice->size); - break; - } - case TYPE_OBJECT: { - Slice* slice = reinterpret_cast<Slice*>(value_ptr); - // insert_default() - auto* target_column = assert_cast<ColumnBitmap*>(column_ptr); - target_column->insert_default(); - BitmapValue* pvalue = nullptr; - int pos = target_column->size() - 1; - pvalue = &target_column->get_element(pos); + RETURN_IF_ERROR(_write_text_column(value.data, value.size, src_slot_desc, + &columns[dest_index], _state)); + } - if (slice->size != 0) { - BitmapValue value; - value.deserialize(slice->data); - *pvalue = std::move(value); - } else { - *pvalue = std::move(*reinterpret_cast<BitmapValue*>(slice->data)); - } - break; - } - case TYPE_HLL: { - Slice* slice = reinterpret_cast<Slice*>(value_ptr); - auto* target_column = assert_cast<ColumnHLL*>(column_ptr); + const TBrokerRangeDesc& range = _ranges.at(_next_range - 1); + if (range.__isset.num_of_columns_from_file) { + RETURN_IF_ERROR(_fill_columns_from_path(range.num_of_columns_from_file, + range.columns_from_path, columns)); + } - target_column->insert_default(); - HyperLogLog* pvalue = nullptr; - int pos = target_column->size() - 1; - pvalue = &target_column->get_element(pos); - if (slice->size != 0) { - HyperLogLog value; - value.deserialize(*slice); - *pvalue = std::move(value); - } else { - *pvalue = std::move(*reinterpret_cast<HyperLogLog*>(slice->data)); - } - break; - } - case TYPE_DECIMALV2: { - assert_cast<ColumnDecimal<Decimal128>*>(column_ptr) - ->insert_data(reinterpret_cast<char*>(value_ptr), 0); - break; - } - case TYPE_DATETIME: { - DateTimeValue value = *reinterpret_cast<DateTimeValue*>(value_ptr); - VecDateTimeValue date; - date.convert_dt_to_vec_dt(&value); - assert_cast<ColumnVector<Int64>*>(column_ptr) - ->insert_data(reinterpret_cast<char*>(&date), 0); - break; - } - case TYPE_DATE: { - DateTimeValue value = *reinterpret_cast<DateTimeValue*>(value_ptr); - VecDateTimeValue date; - date.convert_dt_to_vec_dt(&value); - assert_cast<ColumnVector<Int64>*>(column_ptr) - ->insert_data(reinterpret_cast<char*>(&date), 0); - break; - } - default: { - break; - } - } + return Status::OK(); +} + +Status VBrokerScanner::_fill_columns_from_path(int start, + const std::vector<std::string>& columns_from_path, + std::vector<MutableColumnPtr>& columns) { + // values of columns from path can not be null + for (int i = 0; i < columns_from_path.size(); ++i) { + int dest_index = i + start; + auto slot_desc = _src_slot_descs.at(dest_index); + const std::string& column_from_path = columns_from_path[i]; + RETURN_IF_ERROR(_write_text_column(const_cast<char*>(column_from_path.c_str()), + column_from_path.size(), slot_desc, &columns[dest_index], + _state)); + } + return Status::OK(); +} + +Status VBrokerScanner::_write_text_column(char* value, int value_length, SlotDescriptor* slot, + vectorized::MutableColumnPtr* column_ptr, + RuntimeState* state) { + if (!_text_converter->write_column(slot, column_ptr, value, value_length, true, false)) { + std::stringstream ss; + ss << "Fail to convert text value:'" << value << "' to " << slot->type() << " on column:`" + << slot->col_name() + "`"; + return Status::InternalError(ss.str()); } - _success = true; return Status::OK(); } } // namespace doris::vectorized diff --git a/be/src/vec/exec/vbroker_scanner.h b/be/src/vec/exec/vbroker_scanner.h index 469e38d4e8..57c4004728 100644 --- a/be/src/vec/exec/vbroker_scanner.h +++ b/be/src/vec/exec/vbroker_scanner.h @@ -27,17 +27,26 @@ public: const std::vector<TBrokerRangeDesc>& ranges, const std::vector<TNetworkAddress>& broker_addresses, const std::vector<TExpr>& pre_filter_texprs, ScannerCounter* counter); - ~VBrokerScanner() override = default; + ~VBrokerScanner(); virtual Status get_next(doris::Tuple* tuple, MemPool* tuple_pool, bool* eof, bool* fill_tuple) override { return Status::NotSupported("Not Implemented get next"); } - virtual Status get_next(std::vector<MutableColumnPtr>& columns, bool* eof) override; + Status get_next(Block* block, bool* eof) override; private: - Status _convert_one_row(const Slice& line, std::vector<MutableColumnPtr>& columns); - Status _fill_dest_columns(std::vector<MutableColumnPtr>& columns); + std::unique_ptr<TextConverter> _text_converter; + + Status _write_text_column(char* value, int length, SlotDescriptor* slot, + MutableColumnPtr* column_ptr, RuntimeState* state); + + Status _fill_dest_block(Block* block, std::vector<MutableColumnPtr>& columns); + + Status _fill_dest_columns(const Slice& line, std::vector<MutableColumnPtr>& columns); + + Status _fill_columns_from_path(int start, const std::vector<std::string>& columns_from_path, + std::vector<MutableColumnPtr>& columns); }; } // namespace doris::vectorized diff --git a/be/test/exec/multi_bytes_separator_test.cpp b/be/test/exec/multi_bytes_separator_test.cpp index 9a99c354bb..3712f6b141 100644 --- a/be/test/exec/multi_bytes_separator_test.cpp +++ b/be/test/exec/multi_bytes_separator_test.cpp @@ -37,7 +37,10 @@ namespace doris { class MultiBytesSeparatorTest : public testing::Test { public: - MultiBytesSeparatorTest() {} + MultiBytesSeparatorTest() : _runtime_state(TQueryGlobals()) {} + +private: + RuntimeState _runtime_state; protected: virtual void SetUp() {} @@ -56,8 +59,8 @@ TEST_F(MultiBytesSeparatorTest, normal) { const std::vector<TBrokerRangeDesc> ranges; const std::vector<TNetworkAddress> broker_addresses; const std::vector<TExpr> pre_filter_texprs; - BrokerScanner scanner(nullptr, nullptr, params, ranges, broker_addresses, pre_filter_texprs, - nullptr); + BrokerScanner scanner(&_runtime_state, nullptr, params, ranges, broker_addresses, + pre_filter_texprs, nullptr); #define private public diff --git a/be/test/vec/exec/vbroker_scan_node_test.cpp b/be/test/vec/exec/vbroker_scan_node_test.cpp index c0c091d9d6..a0d11d3ac9 100644 --- a/be/test/vec/exec/vbroker_scan_node_test.cpp +++ b/be/test/vec/exec/vbroker_scan_node_test.cpp @@ -46,6 +46,7 @@ public: VBrokerScanNodeTest() : _runtime_state(TQueryGlobals()) { init(); _runtime_state._instance_mem_tracker.reset(new MemTracker()); + _runtime_state._query_options.enable_vectorized_engine = true; } void init(); static void SetUpTestCase() { @@ -277,7 +278,7 @@ void VBrokerScanNodeTest::init_desc_table() { type.types.push_back(node); } slot_desc.slotType = type; - slot_desc.columnPos = 1; + slot_desc.columnPos = 2; slot_desc.byteOffset = 32; slot_desc.nullIndicatorByte = 0; slot_desc.nullIndicatorBit = -1; @@ -304,7 +305,7 @@ void VBrokerScanNodeTest::init_desc_table() { type.types.push_back(node); } slot_desc.slotType = type; - slot_desc.columnPos = 1; + slot_desc.columnPos = 3; slot_desc.byteOffset = 48; slot_desc.nullIndicatorByte = 0; slot_desc.nullIndicatorBit = -1; @@ -466,37 +467,31 @@ TEST_F(VBrokerScanNodeTest, normal) { doris::vectorized::Block block; bool eos = false; status = scan_node.get_next(&_runtime_state, &block, &eos); - ASSERT_EQ(3, block.rows()); + ASSERT_EQ(4, block.rows()); ASSERT_EQ(4, block.columns()); - ASSERT_FALSE(eos); - - auto columns = block.get_columns(); - ASSERT_EQ(columns[0]->get_int(0), 1); - ASSERT_EQ(columns[0]->get_int(1), 4); - ASSERT_EQ(columns[0]->get_int(2), 8); - - ASSERT_EQ(columns[1]->get_int(0), 2); - ASSERT_EQ(columns[1]->get_int(1), 5); - ASSERT_EQ(columns[1]->get_int(2), 9); + ASSERT_TRUE(eos); - ASSERT_EQ(columns[2]->get_int(0), 3); - ASSERT_EQ(columns[2]->get_int(1), 6); - ASSERT_EQ(columns[2]->get_int(2), 10); + auto columns = block.get_columns_with_type_and_name(); + ASSERT_EQ(columns.size(), 4); + ASSERT_EQ(columns[0].to_string(0), "1"); + ASSERT_EQ(columns[0].to_string(1), "4"); + ASSERT_EQ(columns[0].to_string(2), "8"); + ASSERT_EQ(columns[0].to_string(3), "4"); - ASSERT_EQ(columns[3]->get_int(0), 1); - ASSERT_EQ(columns[3]->get_int(1), 1); - ASSERT_EQ(columns[3]->get_int(2), 1); + ASSERT_EQ(columns[1].to_string(0), "2"); + ASSERT_EQ(columns[1].to_string(1), "5"); + ASSERT_EQ(columns[1].to_string(2), "9"); + ASSERT_EQ(columns[1].to_string(3), "5"); - block.clear(); - status = scan_node.get_next(&_runtime_state, &block, &eos); - ASSERT_EQ(1, block.rows()); - ASSERT_FALSE(eos); + ASSERT_EQ(columns[2].to_string(0), "3"); + ASSERT_EQ(columns[2].to_string(1), "6"); + ASSERT_EQ(columns[2].to_string(2), "10"); + ASSERT_EQ(columns[2].to_string(3), "6"); - columns = block.get_columns(); - ASSERT_EQ(columns[0]->get_int(0), 4); - ASSERT_EQ(columns[1]->get_int(0), 5); - ASSERT_EQ(columns[2]->get_int(0), 6); - ASSERT_EQ(columns[3]->get_int(0), 2); + ASSERT_EQ(columns[3].to_string(0), "1"); + ASSERT_EQ(columns[3].to_string(1), "1"); + ASSERT_EQ(columns[3].to_string(2), "1"); + ASSERT_EQ(columns[3].to_string(3), "2"); block.clear(); status = scan_node.get_next(&_runtime_state, &block, &eos); @@ -610,20 +605,21 @@ TEST_F(VBrokerScanNodeTest, where_binary_pre) { ASSERT_EQ(2, block.rows()); ASSERT_EQ(4, block.columns()); - auto columns = block.get_columns(); - ASSERT_EQ(columns[0]->get_int(0), 1); - ASSERT_EQ(columns[0]->get_int(1), 4); + auto columns = block.get_columns_with_type_and_name(); + ASSERT_EQ(columns.size(), 4); + ASSERT_EQ(columns[0].to_string(0), "1"); + ASSERT_EQ(columns[0].to_string(1), "4"); - ASSERT_EQ(columns[1]->get_int(0), 2); - ASSERT_EQ(columns[1]->get_int(1), 5); + ASSERT_EQ(columns[1].to_string(0), "2"); + ASSERT_EQ(columns[1].to_string(1), "5"); - ASSERT_EQ(columns[2]->get_int(0), 3); - ASSERT_EQ(columns[2]->get_int(1), 6); + ASSERT_EQ(columns[2].to_string(0), "3"); + ASSERT_EQ(columns[2].to_string(1), "6"); - ASSERT_EQ(columns[3]->get_int(0), 1); - ASSERT_EQ(columns[3]->get_int(1), 1); + ASSERT_EQ(columns[3].to_string(0), "1"); + ASSERT_EQ(columns[3].to_string(1), "1"); - ASSERT_FALSE(eos); + ASSERT_TRUE(eos); block.clear(); status = scan_node.get_next(&_runtime_state, &block, &eos); diff --git a/be/test/vec/exec/vbroker_scanner_test.cpp b/be/test/vec/exec/vbroker_scanner_test.cpp index 2d80401cd8..5ead638bef 100644 --- a/be/test/vec/exec/vbroker_scanner_test.cpp +++ b/be/test/vec/exec/vbroker_scanner_test.cpp @@ -362,28 +362,25 @@ TEST_F(VBrokerScannerTest, normal) { auto st = scanner.open(); ASSERT_TRUE(st.ok()); - int slot_count = 3; - auto tuple_desc = _desc_tbl->get_tuple_descriptor(_dst_tuple_id); - std::vector<vectorized::MutableColumnPtr> columns(slot_count); - for (int i = 0; i < slot_count; i++) { - columns[i] = tuple_desc->slots()[i]->get_empty_mutable_column(); - } + std::unique_ptr<vectorized::Block> block(new vectorized::Block()); bool eof = false; - st = scanner.get_next(columns, &eof); + st = scanner.get_next(block.get(), &eof); ASSERT_TRUE(st.ok()); ASSERT_TRUE(eof); + auto columns = block->get_columns(); + ASSERT_EQ(columns.size(), 3); - ASSERT_EQ(columns[0]->get_int(0), 1); - ASSERT_EQ(columns[0]->get_int(1), 4); - ASSERT_EQ(columns[0]->get_int(2), 8); + ASSERT_EQ(columns[0]->get_data_at(0).to_string(), "1"); + ASSERT_EQ(columns[0]->get_data_at(1).to_string(), "4"); + ASSERT_EQ(columns[0]->get_data_at(2).to_string(), "8"); - ASSERT_EQ(columns[1]->get_int(0), 2); - ASSERT_EQ(columns[1]->get_int(1), 5); - ASSERT_EQ(columns[1]->get_int(2), 9); + ASSERT_EQ(columns[1]->get_data_at(0).to_string(), "2"); + ASSERT_EQ(columns[1]->get_data_at(1).to_string(), "5"); + ASSERT_EQ(columns[1]->get_data_at(2).to_string(), "9"); - ASSERT_EQ(columns[2]->get_int(0), 3); - ASSERT_EQ(columns[2]->get_int(1), 6); - ASSERT_EQ(columns[2]->get_int(2), 10); + ASSERT_EQ(columns[2]->get_data_at(0).to_string(), "3"); + ASSERT_EQ(columns[2]->get_data_at(1).to_string(), "6"); + ASSERT_EQ(columns[2]->get_data_at(2).to_string(), "10"); } TEST_F(VBrokerScannerTest, normal2) { @@ -408,26 +405,22 @@ TEST_F(VBrokerScannerTest, normal2) { auto st = scanner.open(); ASSERT_TRUE(st.ok()); - int slot_count = 3; - auto tuple_desc = _desc_tbl->get_tuple_descriptor(_dst_tuple_id); - std::vector<vectorized::MutableColumnPtr> columns(slot_count); - for (int i = 0; i < slot_count; i++) { - columns[i] = tuple_desc->slots()[i]->get_empty_mutable_column(); - } - + std::unique_ptr<vectorized::Block> block(new vectorized::Block()); bool eof = false; - st = scanner.get_next(columns, &eof); + st = scanner.get_next(block.get(), &eof); ASSERT_TRUE(st.ok()); ASSERT_TRUE(eof); + auto columns = block->get_columns(); + ASSERT_EQ(columns.size(), 3); - ASSERT_EQ(columns[0]->get_int(0), 1); - ASSERT_EQ(columns[0]->get_int(1), 3); + ASSERT_EQ(columns[0]->get_data_at(0).to_string(), "1"); + ASSERT_EQ(columns[0]->get_data_at(1).to_string(), "3"); - ASSERT_EQ(columns[1]->get_int(0), 2); - ASSERT_EQ(columns[1]->get_int(1), 4); + ASSERT_EQ(columns[1]->get_data_at(0).to_string(), "2"); + ASSERT_EQ(columns[1]->get_data_at(1).to_string(), "4"); - ASSERT_EQ(columns[2]->get_int(0), 3); - ASSERT_EQ(columns[2]->get_int(1), 5); + ASSERT_EQ(columns[2]->get_data_at(0).to_string(), "3"); + ASSERT_EQ(columns[2]->get_data_at(1).to_string(), "5"); } TEST_F(VBrokerScannerTest, normal5) { @@ -446,18 +439,15 @@ TEST_F(VBrokerScannerTest, normal5) { auto st = scanner.open(); ASSERT_TRUE(st.ok()); - int slot_count = 3; - auto tuple_desc = _desc_tbl->get_tuple_descriptor(_dst_tuple_id); - std::vector<vectorized::MutableColumnPtr> columns(slot_count); - for (int i = 0; i < slot_count; i++) { - columns[i] = tuple_desc->slots()[i]->get_empty_mutable_column(); - } + std::unique_ptr<vectorized::Block> block(new vectorized::Block()); bool eof = false; // end of file - st = scanner.get_next(columns, &eof); + st = scanner.get_next(block.get(), &eof); ASSERT_TRUE(st.ok()); ASSERT_TRUE(eof); - ASSERT_EQ(columns[0]->size(), 0); + auto columns = block->get_columns(); + ASSERT_EQ(columns.size(), 0); } + } // namespace vectorized } // namespace doris --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org