This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch dev-1.0.1 in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
commit 6585cca5c9b176f3c4d82549efce9df07c62bebd Author: Pxl <952130...@qq.com> AuthorDate: Thu Mar 17 10:04:24 2022 +0800 [feature](vectorized) support lateral view (#8448) --- be/src/exec/exec_node.cpp | 13 +- be/src/exec/table_function_node.cpp | 69 ++++--- be/src/exec/table_function_node.h | 10 +- be/src/exprs/table_function/explode_split.cpp | 18 +- be/src/exprs/table_function/explode_split.h | 5 +- be/src/exprs/table_function/table_function.h | 46 ++++- .../table_function/table_function_factory.cpp | 57 ++++-- .../exprs/table_function/table_function_factory.h | 12 +- be/src/vec/CMakeLists.txt | 2 + be/src/vec/exec/vrepeat_node.cpp | 36 ++-- be/src/vec/exec/vrepeat_node.h | 2 +- be/src/vec/exec/vtable_function_node.cpp | 224 +++++++++++++++++++++ .../vtable_function_node.h} | 29 ++- be/src/vec/exprs/table_function/vexplode_split.cpp | 99 +++++++++ .../exprs/table_function/vexplode_split.h} | 28 ++- be/src/vec/exprs/vexpr.h | 2 + be/src/vec/functions/function_fake.cpp | 1 + be/src/vec/functions/function_fake.h | 8 + 18 files changed, 554 insertions(+), 107 deletions(-) diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp index 06ded1d..5141ddf 100644 --- a/be/src/exec/exec_node.cpp +++ b/be/src/exec/exec_node.cpp @@ -77,8 +77,10 @@ #include "vec/exec/vschema_scan_node.h" #include "vec/exec/vselect_node.h" #include "vec/exec/vsort_node.h" +#include "vec/exec/vtable_function_node.h" #include "vec/exec/vunion_node.h" #include "vec/exprs/vexpr.h" + namespace doris { const std::string ExecNode::ROW_THROUGHPUT_COUNTER = "RowsReturnedRate"; @@ -107,7 +109,9 @@ bool ExecNode::RowBatchQueue::AddBatchWithTimeout(RowBatch* batch, int64_t timeo RowBatch* ExecNode::RowBatchQueue::GetBatch() { RowBatch* result = nullptr; - if (blocking_get(&result)) return result; + if (blocking_get(&result)) { + return result; + } return nullptr; } @@ -387,6 +391,7 @@ Status ExecNode::create_node(RuntimeState* state, ObjectPool* pool, const TPlanN case 
TPlanNodeType::ANALYTIC_EVAL_NODE: case TPlanNodeType::SELECT_NODE: case TPlanNodeType::REPEAT_NODE: + case TPlanNodeType::TABLE_FUNCTION_NODE: break; default: { const auto& i = _TPlanNodeType_VALUES_TO_NAMES.find(tnode.node_type); @@ -570,7 +575,11 @@ Status ExecNode::create_node(RuntimeState* state, ObjectPool* pool, const TPlanN return Status::OK(); case TPlanNodeType::TABLE_FUNCTION_NODE: - *node = pool->add(new TableFunctionNode(pool, tnode, descs)); + if (state->enable_vectorized_exec()) { + *node = pool->add(new vectorized::VTableFunctionNode(pool, tnode, descs)); + } else { + *node = pool->add(new TableFunctionNode(pool, tnode, descs)); + } return Status::OK(); default: diff --git a/be/src/exec/table_function_node.cpp b/be/src/exec/table_function_node.cpp index 8829c79..db26205 100644 --- a/be/src/exec/table_function_node.cpp +++ b/be/src/exec/table_function_node.cpp @@ -19,23 +19,21 @@ #include "exprs/expr.h" #include "exprs/expr_context.h" +#include "exprs/table_function/table_function_factory.h" #include "runtime/descriptors.h" #include "runtime/raw_value.h" #include "runtime/row_batch.h" #include "runtime/runtime_state.h" #include "runtime/tuple_row.h" -#include "exprs/table_function/table_function_factory.h" +#include "vec/exprs/vexpr.h" namespace doris { -TableFunctionNode::TableFunctionNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs) - : ExecNode(pool, tnode, descs) { - -} +TableFunctionNode::TableFunctionNode(ObjectPool* pool, const TPlanNode& tnode, + const DescriptorTbl& descs) + : ExecNode(pool, tnode, descs) {} -TableFunctionNode::~TableFunctionNode() { - -} +TableFunctionNode::~TableFunctionNode() {} Status TableFunctionNode::init(const TPlanNode& tnode, RuntimeState* state) { RETURN_IF_ERROR(ExecNode::init(tnode, state)); @@ -46,14 +44,15 @@ Status TableFunctionNode::init(const TPlanNode& tnode, RuntimeState* state) { _fn_ctxs.push_back(ctx); Expr* root = ctx->root(); - const std::string& tf_name = 
root->fn().name.function_name; - TableFunction* fn; - RETURN_IF_ERROR(TableFunctionFactory::get_fn(tf_name, _pool, &fn)); + const std::string& tf_name = root->fn().name.function_name; + TableFunction* fn = nullptr; + RETURN_IF_ERROR(TableFunctionFactory::get_fn(tf_name, false, _pool, &fn)); fn->set_expr_context(ctx); _fns.push_back(fn); } _fn_num = _fns.size(); _fn_values.resize(_fn_num); + _fn_value_lengths.resize(_fn_num); // Prepare output slot ids RETURN_IF_ERROR(_prepare_output_slot_ids(tnode)); @@ -83,7 +82,7 @@ Status TableFunctionNode::prepare(RuntimeState* state) { RETURN_IF_ERROR(ExecNode::prepare(state)); _num_rows_filtered_counter = ADD_COUNTER(_runtime_profile, "RowsFiltered", TUnit::UNIT); - + RETURN_IF_ERROR(Expr::prepare(_fn_ctxs, state, _row_descriptor, expr_mem_tracker())); for (auto fn : _fns) { RETURN_IF_ERROR(fn->prepare()); @@ -97,6 +96,8 @@ Status TableFunctionNode::open(RuntimeState* state) { RETURN_IF_ERROR(ExecNode::open(state)); RETURN_IF_ERROR(Expr::open(_fn_ctxs, state)); + RETURN_IF_ERROR(vectorized::VExpr::open(_vfn_ctxs, state)); + for (auto fn : _fns) { RETURN_IF_ERROR(fn->open()); } @@ -139,7 +140,7 @@ Status TableFunctionNode::_process_next_child_row() { // -1: all fns are not eos // >0: some of fns are eos int TableFunctionNode::_find_last_fn_eos_idx() { - for (int i = _fn_num - 1; i >=0; --i) { + for (int i = _fn_num - 1; i >= 0; --i) { if (!_fns[i]->eos()) { if (i == _fn_num - 1) { return -1; @@ -193,7 +194,7 @@ Status TableFunctionNode::get_next(RuntimeState* state, RowBatch* row_batch, boo _child_tuple_desc_size = child_rowdesc.tuple_descriptors().size(); for (int i = 0; i < _child_tuple_desc_size; ++i) { _child_slot_sizes.push_back(child_rowdesc.tuple_descriptors()[i]->slots().size()); - } + } } uint8_t* tuple_buffer = nullptr; @@ -205,12 +206,13 @@ Status TableFunctionNode::get_next(RuntimeState* state, RowBatch* row_batch, boo RETURN_IF_ERROR(state->check_query_state("TableFunctionNode, while getting next batch.")); if 
(_cur_child_batch == nullptr) { - _cur_child_batch.reset(new RowBatch(child_rowdesc, state->batch_size(), mem_tracker().get())); + _cur_child_batch.reset( + new RowBatch(child_rowdesc, state->batch_size(), mem_tracker().get())); } if (_child_batch_exhausted) { if (_child_eos) { // current child batch is exhausted, and no more batch from child node - break; + break; } // current child batch is exhausted, get next batch from child RETURN_IF_ERROR(_children[0]->get_next(state, _cur_child_batch.get(), &_child_eos)); @@ -238,7 +240,7 @@ Status TableFunctionNode::get_next(RuntimeState* state, RowBatch* row_batch, boo // some of table functions' results are exhausted if (!_roll_table_functions(idx)) { // continue to process next child row - continue; + continue; } } @@ -251,11 +253,11 @@ Status TableFunctionNode::get_next(RuntimeState* state, RowBatch* row_batch, boo // allocate memory for row batch for the first time if (tuple_buffer == nullptr) { int64_t tuple_buffer_size; - RETURN_IF_ERROR( - row_batch->resize_and_allocate_tuple_buffer(state, &tuple_buffer_size, &tuple_buffer)); + RETURN_IF_ERROR(row_batch->resize_and_allocate_tuple_buffer( + state, &tuple_buffer_size, &tuple_buffer)); tuple_ptr = reinterpret_cast<Tuple*>(tuple_buffer); } - + pre_tuple_ptr = tuple_ptr; // The tuples order in parent row batch should be // child1, child2, tf1, tf2, ... 
@@ -266,22 +268,27 @@ Status TableFunctionNode::get_next(RuntimeState* state, RowBatch* row_batch, boo TupleDescriptor* child_tuple_desc = child_rowdesc.tuple_descriptors()[tuple_idx]; TupleDescriptor* parent_tuple_desc = parent_rowdesc.tuple_descriptors()[tuple_idx]; - Tuple* child_tuple = _cur_child_tuple_row->get_tuple(child_rowdesc.get_tuple_idx(child_tuple_desc->id())); + Tuple* child_tuple = _cur_child_tuple_row->get_tuple( + child_rowdesc.get_tuple_idx(child_tuple_desc->id())); for (int j = 0; j < _child_slot_sizes[i]; ++j) { SlotDescriptor* child_slot_desc = child_tuple_desc->slots()[j]; SlotDescriptor* parent_slot_desc = parent_tuple_desc->slots()[j]; - if (_output_slot_ids[parent_slot_desc->id()] && !child_tuple->is_null(child_slot_desc->null_indicator_offset())) { + if (_output_slot_ids[parent_slot_desc->id()] && + !child_tuple->is_null(child_slot_desc->null_indicator_offset())) { // only write child slot if it is selected and not null. void* dest_slot = tuple_ptr->get_slot(parent_slot_desc->tuple_offset()); - RawValue::write(child_tuple->get_slot(child_slot_desc->tuple_offset()), dest_slot, parent_slot_desc->type(), row_batch->tuple_data_pool()); + RawValue::write(child_tuple->get_slot(child_slot_desc->tuple_offset()), + dest_slot, parent_slot_desc->type(), + row_batch->tuple_data_pool()); tuple_ptr->set_not_null(parent_slot_desc->null_indicator_offset()); } else { tuple_ptr->set_null(parent_slot_desc->null_indicator_offset()); } } parent_tuple_row->set_tuple(tuple_idx, tuple_ptr); - tuple_ptr = reinterpret_cast<Tuple*>(reinterpret_cast<uint8_t*>(tuple_ptr) + parent_tuple_desc->byte_size()); + tuple_ptr = reinterpret_cast<Tuple*>(reinterpret_cast<uint8_t*>(tuple_ptr) + + parent_tuple_desc->byte_size()); } // 2. 
copy function result @@ -290,13 +297,15 @@ Status TableFunctionNode::get_next(RuntimeState* state, RowBatch* row_batch, boo SlotDescriptor* parent_slot_desc = parent_tuple_desc->slots()[0]; void* dest_slot = tuple_ptr->get_slot(parent_slot_desc->tuple_offset()); if (_fn_values[i] != nullptr) { - RawValue::write(_fn_values[i], dest_slot, parent_slot_desc->type(), row_batch->tuple_data_pool()); + RawValue::write(_fn_values[i], dest_slot, parent_slot_desc->type(), + row_batch->tuple_data_pool()); tuple_ptr->set_not_null(parent_slot_desc->null_indicator_offset()); } else { tuple_ptr->set_null(parent_slot_desc->null_indicator_offset()); } parent_tuple_row->set_tuple(tuple_idx, tuple_ptr); - tuple_ptr = reinterpret_cast<Tuple*>(reinterpret_cast<uint8_t*>(tuple_ptr) + parent_tuple_desc->byte_size()); + tuple_ptr = reinterpret_cast<Tuple*>(reinterpret_cast<uint8_t*>(tuple_ptr) + + parent_tuple_desc->byte_size()); } // 3. eval conjuncts @@ -311,7 +320,7 @@ Status TableFunctionNode::get_next(RuntimeState* state, RowBatch* row_batch, boo // Forward after write success. // Because data in `_fn_values` points to the data saved in functions. // And `forward` will change the data in functions. 
- bool tmp; + bool tmp = false; _fns[_fn_num - 1]->forward(&tmp); if (row_batch->at_capacity()) { @@ -321,7 +330,7 @@ Status TableFunctionNode::get_next(RuntimeState* state, RowBatch* row_batch, boo if (row_batch->at_capacity()) { break; - } + } } // end while cur_eos if (reached_limit()) { @@ -344,7 +353,9 @@ Status TableFunctionNode::close(RuntimeState* state) { RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::CLOSE)); Expr::close(_fn_ctxs, state); - COUNTER_SET(_num_rows_filtered_counter, static_cast<int64_t>(_num_rows_filtered)); + if (_num_rows_filtered_counter != nullptr) { + COUNTER_SET(_num_rows_filtered_counter, static_cast<int64_t>(_num_rows_filtered)); + } return ExecNode::close(state); } diff --git a/be/src/exec/table_function_node.h b/be/src/exec/table_function_node.h index 0e59790..fd06760 100644 --- a/be/src/exec/table_function_node.h +++ b/be/src/exec/table_function_node.h @@ -38,8 +38,7 @@ public: virtual Status get_next(RuntimeState* state, RowBatch* row_batch, bool* eos); virtual Status close(RuntimeState* state); -private: - +protected: Status _prepare_output_slot_ids(const TPlanNode& tnode); // return: @@ -48,12 +47,10 @@ private: // >0: some of fns are eos int _find_last_fn_eos_idx(); - Status _process_next_child_row(); + virtual Status _process_next_child_row(); bool _roll_table_functions(int last_eos_idx); -private: - int64_t _cur_child_offset = 0; TupleRow* _cur_child_tuple_row = nullptr; std::shared_ptr<RowBatch> _cur_child_batch; @@ -62,8 +59,11 @@ private: bool _child_batch_exhausted = true; std::vector<ExprContext*> _fn_ctxs; + std::vector<vectorized::VExprContext*> _vfn_ctxs; + std::vector<TableFunction*> _fns; std::vector<void*> _fn_values; + std::vector<int64_t> _fn_value_lengths; int _fn_num = 0; // std::unordered_set<SlotId> _output_slot_ids; diff --git a/be/src/exprs/table_function/explode_split.cpp b/be/src/exprs/table_function/explode_split.cpp index fa576dc..21defd3 100644 --- a/be/src/exprs/table_function/explode_split.cpp 
+++ b/be/src/exprs/table_function/explode_split.cpp @@ -27,8 +27,7 @@ ExplodeSplitTableFunction::ExplodeSplitTableFunction() { _fn_name = "explode_split"; } -ExplodeSplitTableFunction::~ExplodeSplitTableFunction() { -} +ExplodeSplitTableFunction::~ExplodeSplitTableFunction() {} Status ExplodeSplitTableFunction::prepare() { return Status::OK(); @@ -43,13 +42,14 @@ Status ExplodeSplitTableFunction::open() { if (fn_ctx->is_arg_constant(1)) { _is_delimiter_constant = true; StringVal* delimiter = reinterpret_cast<StringVal*>(fn_ctx->get_constant_arg(1)); - _const_delimter = StringPiece((char*) delimiter->ptr, delimiter->len); + _const_delimter = StringPiece((char*)delimiter->ptr, delimiter->len); } return Status::OK(); } Status ExplodeSplitTableFunction::process(TupleRow* tuple_row) { - CHECK(2 == _expr_context->root()->get_num_children()) << _expr_context->root()->get_num_children(); + CHECK(2 == _expr_context->root()->get_num_children()) + << _expr_context->root()->get_num_children(); _is_current_empty = false; _eos = false; @@ -61,12 +61,14 @@ Status ExplodeSplitTableFunction::process(TupleRow* tuple_row) { _cur_offset = 0; } else { if (_is_delimiter_constant) { - _backup = strings::Split(StringPiece((char*) text.ptr, text.len), _const_delimter); + _backup = strings::Split(StringPiece((char*)text.ptr, text.len), _const_delimter); } else { - StringVal delimiter = _expr_context->root()->get_child(1)->get_string_val(_expr_context, tuple_row); - _backup = strings::Split(StringPiece((char*) text.ptr, text.len), StringPiece((char*) delimiter.ptr, delimiter.len)); + StringVal delimiter = + _expr_context->root()->get_child(1)->get_string_val(_expr_context, tuple_row); + _backup = strings::Split(StringPiece((char*)text.ptr, text.len), + StringPiece((char*)delimiter.ptr, delimiter.len)); } - for (const std::string & str : _backup) { + for (const std::string& str : _backup) { _data.emplace_back(str); } _cur_size = _backup.size(); diff --git 
a/be/src/exprs/table_function/explode_split.h b/be/src/exprs/table_function/explode_split.h index ad80671..b8d1e2b 100644 --- a/be/src/exprs/table_function/explode_split.h +++ b/be/src/exprs/table_function/explode_split.h @@ -18,7 +18,6 @@ #pragma once #include "exprs/table_function/table_function.h" - #include "gutil/strings/stringpiece.h" #include "runtime/string_value.h" @@ -38,8 +37,7 @@ public: virtual Status forward(bool* eos) override; -private: - +protected: // The string value splitted from source, and will be referenced by // table function scan node. // the `_backup` saved the real string entity. @@ -50,7 +48,6 @@ private: // if true, the constant delimiter will be saved in `_const_delimter` bool _is_delimiter_constant = false; StringPiece _const_delimter; - }; } // namespace doris diff --git a/be/src/exprs/table_function/table_function.h b/be/src/exprs/table_function/table_function.h index 9671e6b..0aa6951 100644 --- a/be/src/exprs/table_function/table_function.h +++ b/be/src/exprs/table_function/table_function.h @@ -17,14 +17,18 @@ #pragma once +#include <fmt/core.h> +#include <stddef.h> + #include "common/status.h" +#include "vec/core/block.h" +#include "vec/exprs/vexpr_context.h" namespace doris { // TODO: think about how to manager memeory consumption of table functions. // Currently, the memory allocated from table function is from malloc directly. -class TableFunctionState { -}; +class TableFunctionState {}; class ExprContext; class TupleRow; @@ -34,24 +38,54 @@ public: virtual Status prepare() = 0; virtual Status open() = 0; + virtual Status process(TupleRow* tuple_row) = 0; + + // only used for vectorized. + virtual Status process_init(vectorized::Block* block) { + return Status::NotSupported( + fmt::format("vectorized table function {} not supported now.", _fn_name)); + } + + // only used for vectorized. 
+ virtual Status process_row(size_t row_idx) { + return Status::NotSupported( + fmt::format("vectorized table function {} not supported now.", _fn_name)); + } + + // only used for vectorized. + virtual Status process_close() { + return Status::NotSupported( + fmt::format("vectorized table function {} not supported now.", _fn_name)); + } + virtual Status reset() = 0; + virtual Status get_value(void** output) = 0; + + // only used for vectorized. + virtual Status get_value_length(int64_t* length) { + *length = -1; + return Status::OK(); + } + virtual Status close() = 0; - virtual Status forward(bool *eos) = 0; + virtual Status forward(bool* eos) = 0; public: std::string name() const { return _fn_name; } bool eos() const { return _eos; } - void set_expr_context(ExprContext* expr_context) { - _expr_context = expr_context; + void set_expr_context(ExprContext* expr_context) { _expr_context = expr_context; } + void set_vexpr_context(vectorized::VExprContext* vexpr_context) { + _vexpr_context = vexpr_context; } protected: std::string _fn_name; - ExprContext* _expr_context; + ExprContext* _expr_context = nullptr; + vectorized::VExprContext* _vexpr_context = nullptr; // true if there is no more data can be read from this function. bool _eos = false; // true means the function result set from current row is empty(eg, source value is null or empty). 
diff --git a/be/src/exprs/table_function/table_function_factory.cpp b/be/src/exprs/table_function/table_function_factory.cpp index fc6ead5..4cdf267 100644 --- a/be/src/exprs/table_function/table_function_factory.cpp +++ b/be/src/exprs/table_function/table_function_factory.cpp @@ -21,28 +21,49 @@ #include "exprs/table_function/explode_bitmap.h" #include "exprs/table_function/explode_json_array.h" #include "exprs/table_function/explode_split.h" +#include "exprs/table_function/table_function.h" +#include "vec/exprs/table_function/vexplode_split.h" namespace doris { -Status TableFunctionFactory::get_fn(const std::string& fn_name, ObjectPool* pool, TableFunction** fn) { - if (fn_name == "explode_split") { - *fn = pool->add(new ExplodeSplitTableFunction()); - return Status::OK(); - } else if (fn_name == "explode_bitmap") { - *fn = pool->add(new ExplodeBitmapTableFunction()); - return Status::OK(); - } else if (fn_name == "explode_json_array_int") { - *fn = pool->add(new ExplodeJsonArrayTableFunction(ExplodeJsonArrayType::INT)); - return Status::OK(); - } else if (fn_name == "explode_json_array_double") { - *fn = pool->add(new ExplodeJsonArrayTableFunction(ExplodeJsonArrayType::DOUBLE)); - return Status::OK(); - } else if (fn_name == "explode_json_array_string") { - *fn = pool->add(new ExplodeJsonArrayTableFunction(ExplodeJsonArrayType::STRING)); - return Status::OK(); - } else { - return Status::NotSupported("Unknown table function: " + fn_name); +template <typename TableFunctionType> +struct TableFunctionCreator { + TableFunction* operator()() { return new TableFunctionType(); } +}; + +template <> +struct TableFunctionCreator<ExplodeJsonArrayTableFunction> { + ExplodeJsonArrayType type; + TableFunction* operator()() { return new ExplodeJsonArrayTableFunction(type); } +}; + +inline auto ExplodeJsonArrayIntCreator = + TableFunctionCreator<ExplodeJsonArrayTableFunction> {ExplodeJsonArrayType::INT}; +inline auto ExplodeJsonArrayDoubleCreator = + 
TableFunctionCreator<ExplodeJsonArrayTableFunction> {ExplodeJsonArrayType::DOUBLE}; +inline auto ExplodeJsonArrayStringCreator = + TableFunctionCreator<ExplodeJsonArrayTableFunction> {ExplodeJsonArrayType::STRING}; + +//{fn_name,is_vectorized}->table_function_creator +const std::unordered_map<std::pair<std::string, bool>, std::function<TableFunction*()>> + TableFunctionFactory::_function_map { + {{"explode_split", false}, TableFunctionCreator<ExplodeSplitTableFunction>()}, + {{"explode_bitmap", false}, TableFunctionCreator<ExplodeBitmapTableFunction>()}, + {{"explode_json_array_int", false}, ExplodeJsonArrayIntCreator}, + {{"explode_json_array_double", false}, ExplodeJsonArrayDoubleCreator}, + {{"explode_json_array_string", false}, ExplodeJsonArrayStringCreator}, + {{"explode_split", true}, TableFunctionCreator<VExplodeSplitTableFunction>()}}; + +Status TableFunctionFactory::get_fn(const std::string& fn_name, bool is_vectorized, + ObjectPool* pool, TableFunction** fn) { + auto fn_iterator = _function_map.find({fn_name, is_vectorized}); + if (fn_iterator != _function_map.end()) { + *fn = pool->add(fn_iterator->second()); + return Status::OK(); } + + return Status::NotSupported(std::string(is_vectorized ? 
"vectorized " : "") + + "table function " + fn_name + " not support"); } } // namespace doris diff --git a/be/src/exprs/table_function/table_function_factory.h b/be/src/exprs/table_function/table_function_factory.h index ca5bd05..eaa1b86 100644 --- a/be/src/exprs/table_function/table_function_factory.h +++ b/be/src/exprs/table_function/table_function_factory.h @@ -17,10 +17,12 @@ #pragma once -#include "exprs/table_function/table_function_factory.h" -#include "exprs/table_function/explode_split.h" +#include <functional> +#include <unordered_map> #include "common/status.h" +#include "exprs/table_function/explode_split.h" +#include "exprs/table_function/table_function_factory.h" namespace doris { @@ -30,7 +32,11 @@ class TableFunctionFactory { public: TableFunctionFactory() {} ~TableFunctionFactory() {} - static Status get_fn(const std::string& fn_name, ObjectPool* pool, TableFunction** fn); + static Status get_fn(const std::string& fn_name, bool is_vectorized, ObjectPool* pool, + TableFunction** fn); + + const static std::unordered_map<std::pair<std::string, bool>, std::function<TableFunction*()>> + _function_map; }; } // namespace doris diff --git a/be/src/vec/CMakeLists.txt b/be/src/vec/CMakeLists.txt index 6bea9a6..9a58191 100644 --- a/be/src/vec/CMakeLists.txt +++ b/be/src/vec/CMakeLists.txt @@ -92,6 +92,7 @@ set(VEC_FILES exec/vanalytic_eval_node.cpp exec/vassert_num_rows_node.cpp exec/vrepeat_node.cpp + exec/vtable_function_node.cpp exec/join/vhash_join_node.cpp exprs/vectorized_agg_fn.cpp exprs/vectorized_fn_call.cpp @@ -104,6 +105,7 @@ set(VEC_FILES exprs/vcast_expr.cpp exprs/vcase_expr.cpp exprs/vinfo_func.cpp + exprs/table_function/vexplode_split.cpp functions/math.cpp functions/function_bitmap.cpp functions/function_bitmap_variadic.cpp diff --git a/be/src/vec/exec/vrepeat_node.cpp b/be/src/vec/exec/vrepeat_node.cpp index f27b4f5..a6e36a2 100644 --- a/be/src/vec/exec/vrepeat_node.cpp +++ b/be/src/vec/exec/vrepeat_node.cpp @@ -16,6 +16,7 @@ // under the 
License. #include "vec/exec/vrepeat_node.h" + #include "exprs/expr.h" #include "gutil/strings/join.h" #include "runtime/runtime_state.h" @@ -23,7 +24,9 @@ namespace doris::vectorized { VRepeatNode::VRepeatNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs) - : RepeatNode(pool, tnode, descs), _child_block(nullptr), _virtual_tuple_id(tnode.repeat_node.output_tuple_id) {} + : RepeatNode(pool, tnode, descs), + _child_block(nullptr), + _virtual_tuple_id(tnode.repeat_node.output_tuple_id) {} Status VRepeatNode::prepare(RuntimeState* state) { VLOG_CRITICAL << "VRepeatNode::prepare"; @@ -50,9 +53,9 @@ Status VRepeatNode::prepare(RuntimeState* state) { } std::stringstream ss; - ss << "The output slots size " << _output_slots.size() - << " is not equal to the sum of child_slots_size " << _child_slots.size() - << ",virtual_slots_size " << _virtual_tuple_desc->slots().size(); + ss << "The output slots size " << _output_slots.size() + << " is not equal to the sum of child_slots_size " << _child_slots.size() + << ",virtual_slots_size " << _virtual_tuple_desc->slots().size(); if (_output_slots.size() != (_child_slots.size() + _virtual_tuple_desc->slots().size())) { return Status::InternalError(ss.str()); } @@ -102,13 +105,14 @@ Status VRepeatNode::get_repeated_block(Block* child_block, int repeat_id_idx, Bl DCHECK_EQ(_child_slots[i]->col_name(), _output_slots[cur_col]->col_name()); std::set<SlotId>& repeat_ids = _slot_id_set_list[repeat_id_idx]; - bool is_repeat_slot = _all_slot_ids.find(_output_slots[cur_col]->id()) != _all_slot_ids.end(); + bool is_repeat_slot = + _all_slot_ids.find(_output_slots[cur_col]->id()) != _all_slot_ids.end(); bool is_set_null_slot = repeat_ids.find(_output_slots[cur_col]->id()) == repeat_ids.end(); const auto column_size = src_column.column->size(); if (is_repeat_slot) { DCHECK(_output_slots[cur_col]->is_nullable()); - auto* nullable_column = reinterpret_cast<ColumnNullable *>(columns[cur_col].get()); + auto* nullable_column = 
reinterpret_cast<ColumnNullable*>(columns[cur_col].get()); auto& null_map = nullable_column->get_null_map_data(); auto* column_ptr = columns[cur_col].get(); @@ -141,7 +145,7 @@ Status VRepeatNode::get_repeated_block(Block* child_block, int repeat_id_idx, Bl auto* column_ptr = columns[cur_col].get(); DCHECK(!_output_slots[cur_col]->is_nullable()); - auto* col = assert_cast<ColumnVector<Int64> *>(column_ptr); + auto* col = assert_cast<ColumnVector<Int64>*>(column_ptr); for (size_t i = 0; i < child_block->rows(); ++i) { col->insert_value(val); } @@ -154,8 +158,9 @@ Status VRepeatNode::get_repeated_block(Block* child_block, int repeat_id_idx, Bl auto n_columns = 0; if (!mem_reuse) { for (const auto slot_desc : _output_slots) { - output_block->insert( - ColumnWithTypeAndName(std::move(columns[n_columns++]), slot_desc->get_data_type_ptr(), slot_desc->col_name())); + output_block->insert(ColumnWithTypeAndName(std::move(columns[n_columns++]), + slot_desc->get_data_type_ptr(), + slot_desc->col_name())); } } else { columns.clear(); @@ -168,8 +173,9 @@ Status VRepeatNode::get_next(RuntimeState* state, Block* block, bool* eos) { VLOG_CRITICAL << "VRepeatNode::get_next"; SCOPED_TIMER(_runtime_profile->total_time_counter()); - if (state == NULL || block == NULL || eos == NULL) + if (state == nullptr || block == nullptr || eos == nullptr) { return Status::InternalError("input is NULL pointer"); + } RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::GETNEXT)); RETURN_IF_CANCELLED(state); @@ -181,13 +187,9 @@ Status VRepeatNode::get_next(RuntimeState* state, Block* block, bool* eos) { // current child block has finished its repeat, get child's next block if (_child_block->rows() == 0) { - if (_child_eos) { - *eos = true; - return Status::OK(); - } - - while (_child_block->rows() == 0 && ! 
_child_eos) + while (_child_block->rows() == 0 && !_child_eos) { RETURN_IF_ERROR(child(0)->get_next(state, _child_block.get(), &_child_eos)); + } if (_child_eos and _child_block->rows() == 0) { *eos = true; @@ -205,7 +207,7 @@ Status VRepeatNode::get_next(RuntimeState* state, Block* block, bool* eos) { _repeat_id_idx = 0; } - _num_rows_returned += block->rows(); + reached_limit(block, eos); COUNTER_SET(_rows_returned_counter, _num_rows_returned); VLOG_ROW << "VRepeatNode output rows: " << block->rows(); return Status::OK(); diff --git a/be/src/vec/exec/vrepeat_node.h b/be/src/vec/exec/vrepeat_node.h index 7f3f91f..26efa9a 100644 --- a/be/src/vec/exec/vrepeat_node.h +++ b/be/src/vec/exec/vrepeat_node.h @@ -35,7 +35,6 @@ public: virtual Status prepare(RuntimeState* state) override; virtual Status open(RuntimeState* state) override; - using RepeatNode::get_next; virtual Status get_next(RuntimeState* state, Block* block, bool* eos) override; virtual Status close(RuntimeState* state) override; @@ -43,6 +42,7 @@ protected: virtual void debug_string(int indentation_level, std::stringstream* out) const override; private: + using RepeatNode::get_next; Status get_repeated_block(Block* child_block, int repeat_id_idx, Block* output_block); std::unique_ptr<Block> _child_block; diff --git a/be/src/vec/exec/vtable_function_node.cpp b/be/src/vec/exec/vtable_function_node.cpp new file mode 100644 index 0000000..30dd009 --- /dev/null +++ b/be/src/vec/exec/vtable_function_node.cpp @@ -0,0 +1,224 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/exec/vtable_function_node.h"
+
+#include "exprs/expr.h"
+#include "exprs/expr_context.h"
+#include "exprs/table_function/table_function.h"
+#include "exprs/table_function/table_function_factory.h"
+#include "vec/exprs/vexpr.h"
+
+namespace doris::vectorized {
+
+// Forwards all construction arguments to the TableFunctionNode base class.
+VTableFunctionNode::VTableFunctionNode(ObjectPool* pool, const TPlanNode& tnode,
+                                       const DescriptorTbl& descs)
+        : TableFunctionNode(pool, tnode, descs) {}
+
+// Creates one vectorized expression tree and one TableFunction per entry of
+// tnode.table_function_node.fnCallExprList, sizes the per-function value
+// buffers, and prepares the output slot ids.
+// Returns the first non-OK status produced by any of these steps.
+Status VTableFunctionNode::init(const TPlanNode& tnode, RuntimeState* state) {
+    RETURN_IF_ERROR(ExecNode::init(tnode, state));
+
+    for (const TExpr& texpr : tnode.table_function_node.fnCallExprList) {
+        VExprContext* ctx = nullptr;
+        RETURN_IF_ERROR(VExpr::create_expr_tree(_pool, texpr, &ctx));
+        _vfn_ctxs.push_back(ctx);
+
+        // The table function is looked up by the function name of the root expr.
+        VExpr* root = ctx->root();
+        const std::string& tf_name = root->fn().name.function_name;
+        TableFunction* fn = nullptr;
+        // NOTE(review): the `true` argument presumably selects the vectorized
+        // implementation; confirm against TableFunctionFactory::get_fn.
+        RETURN_IF_ERROR(TableFunctionFactory::get_fn(tf_name, true, _pool, &fn));
+        fn->set_vexpr_context(ctx);
+        _fns.push_back(fn);
+    }
+    _fn_num = _fns.size();
+    _fn_values.resize(_fn_num);
+    _fn_value_lengths.resize(_fn_num);
+
+    // Prepare output slot ids
+    RETURN_IF_ERROR(_prepare_output_slot_ids(tnode));
+    return Status::OK();
+}
+
+// Prepares the base node and the vectorized function expressions, caches the
+// slot descriptors of this node's output row and of child(0)'s row, and
+// allocates the (initially empty) child block.
+// NOTE(review): _vfn_ctxs are prepared here, but no VExpr::open() call on them
+// is visible in this file; confirm they are opened elsewhere.
+Status VTableFunctionNode::prepare(RuntimeState* state) {
+    SCOPED_TIMER(_runtime_profile->total_time_counter());
+    RETURN_IF_ERROR(TableFunctionNode::prepare(state));
+    RETURN_IF_ERROR(VExpr::prepare(_vfn_ctxs, state, _row_descriptor, expr_mem_tracker()));
+
+    // get current all output slots
+    for (const auto& tuple_desc : this->row_desc().tuple_descriptors()) {
+        for (const auto& slot_desc : tuple_desc->slots()) {
+            _output_slots.push_back(slot_desc);
+        }
+    }
+
+    // get all input slots
+    for (const auto& child_tuple_desc : child(0)->row_desc().tuple_descriptors()) {
+        for (const auto& child_slot_desc : child_tuple_desc->slots()) {
+            _child_slots.push_back(child_slot_desc);
+        }
+    }
+
+    _child_block.reset(new Block());
+    // -1 means "no child row is currently being expanded".
+    _cur_child_offset = -1;
+
+    return Status::OK();
+}
+
+// Fills `block` with the next batch of expanded rows, applies the limit via
+// reached_limit(), and updates the rows-returned counter.
+Status VTableFunctionNode::get_next(RuntimeState* state, Block* block, bool* eos) {
+    SCOPED_TIMER(_runtime_profile->total_time_counter());
+
+    RETURN_IF_CANCELLED(state);
+
+    RETURN_IF_ERROR(get_expanded_block(state, block, eos));
+
+    reached_limit(block, eos);
+
+    COUNTER_SET(_rows_returned_counter, _num_rows_returned);
+
+    return Status::OK();
+}
+
+// Appends expanded rows to `output_block`. Every output row consists of the
+// columns of the current child row followed by one column per table function.
+// Returns as soon as state->batch_size() rows have been produced, or when the
+// child is exhausted (in which case *eos is set to true). The position inside
+// the current child block is kept in member state, so the next call resumes
+// where this one stopped. Finally the node's conjuncts are applied to the block.
+Status VTableFunctionNode::get_expanded_block(RuntimeState* state, Block* output_block, bool* eos) {
+    DCHECK(_child_block != nullptr);
+
+    size_t column_size = _output_slots.size();
+    bool mem_reuse = output_block->mem_reuse();
+
+    std::vector<vectorized::MutableColumnPtr> columns(column_size);
+    for (size_t i = 0; i < column_size; i++) {
+        if (mem_reuse) {
+            columns[i] = std::move(*output_block->get_by_position(i).column).mutate();
+        } else {
+            columns[i] = _output_slots[i]->get_empty_mutable_column();
+        }
+    }
+
+    // Index of the first table-function output column. Exactly one value is
+    // appended to it per produced row, so its size is the produced row count.
+    const size_t first_fn_column = _child_slots.size();
+    DCHECK(first_fn_column < column_size);
+
+    // Stop once a full batch has been produced. Previously this loop was
+    // `while (true)` and the batch-size check below only left the inner loop,
+    // so the outer loop re-entered immediately and the block kept growing until
+    // the child was exhausted.
+    while (columns[first_fn_column]->size() < state->batch_size()) {
+        RETURN_IF_CANCELLED(state);
+        RETURN_IF_ERROR(state->check_query_state("VTableFunctionNode, while getting next batch."));
+
+        // if child_block is empty, get data from child.
+        if (_child_block->rows() == 0) {
+            while (_child_block->rows() == 0 && !_child_eos) {
+                RETURN_IF_ERROR(child(0)->get_next(state, _child_block.get(), &_child_eos));
+            }
+            if (_child_eos && _child_block->rows() == 0) {
+                *eos = true;
+                break;
+            }
+
+            for (TableFunction* fn : _fns) {
+                RETURN_IF_ERROR(fn->process_init(_child_block.get()));
+            }
+
+            RETURN_IF_ERROR(_process_next_child_row());
+        }
+
+        while (true) {
+            int idx = _find_last_fn_eos_idx();
+            if (idx == 0) {
+                // all table functions' results are exhausted, process next child row.
+                RETURN_IF_ERROR(_process_next_child_row());
+                // _process_next_child_row() resets the offset to -1 once the
+                // child block has been fully consumed and released.
+                if (_cur_child_offset == -1) {
+                    break;
+                }
+            } else if (idx < _fn_num && idx != -1) {
+                // some of table functions' results are exhausted.
+                if (!_roll_table_functions(idx)) {
+                    // continue to process next child row.
+                    continue;
+                }
+            }
+
+            // get slots from every table function.
+            // notice that _fn_values[i] may be null if the table function has empty result set.
+            for (int i = 0; i < _fn_num; i++) {
+                RETURN_IF_ERROR(_fns[i]->get_value(&_fn_values[i]));
+                RETURN_IF_ERROR(_fns[i]->get_value_length(&_fn_value_lengths[i]));
+            }
+
+            // The tuples order in parent row batch should be
+            //      child1, child2, tf1, tf2, ...
+
+            // 1. copy data from child_block.
+            for (size_t i = 0; i < first_fn_column; i++) {
+                auto src_column = _child_block->get_by_position(i).column;
+                columns[i]->insert_from(*src_column, _cur_child_offset);
+            }
+
+            // 2. copy function result
+            for (int i = 0; i < _fn_num; i++) {
+                const size_t output_slot_idx = first_fn_column + i;
+                if (_fn_values[i] == nullptr) {
+                    columns[output_slot_idx]->insert_default();
+                } else {
+                    columns[output_slot_idx]->insert_data(reinterpret_cast<char*>(_fn_values[i]),
+                                                          _fn_value_lengths[i]);
+                }
+            }
+
+            // Advance the last table function; the eos flag it reports is not
+            // needed here because _find_last_fn_eos_idx() re-checks it.
+            bool tmp = false;
+            _fns[_fn_num - 1]->forward(&tmp);
+
+            if (columns[first_fn_column]->size() >= state->batch_size()) {
+                break;
+            }
+        }
+    }
+
+    if (!columns.empty() && !columns[0]->empty()) {
+        size_t n_columns = 0;
+        if (!mem_reuse) {
+            for (const auto* slot_desc : _output_slots) {
+                output_block->insert(ColumnWithTypeAndName(std::move(columns[n_columns++]),
+                                                           slot_desc->get_data_type_ptr(),
+                                                           slot_desc->col_name()));
+            }
+        } else {
+            columns.clear();
+        }
+    }
+
+    // 3. eval conjuncts
+    RETURN_IF_ERROR(
+            VExprContext::filter_block(_vconjunct_ctx_ptr, output_block, output_block->columns()));
+
+    return Status::OK();
+}
+
+// Moves to the next row of the current child block and feeds it to every table
+// function. When the block is exhausted, closes the functions' per-block state,
+// releases the block's memory and resets _cur_child_offset to -1.
+Status VTableFunctionNode::_process_next_child_row() {
+    _cur_child_offset++;
+
+    if (_cur_child_offset >= _child_block->rows()) {
+        // release block use count.
+        for (TableFunction* fn : _fns) {
+            RETURN_IF_ERROR(fn->process_close());
+        }
+
+        release_block_memory(*_child_block.get());
+        _cur_child_offset = -1;
+        return Status::OK();
+    }
+
+    for (TableFunction* fn : _fns) {
+        RETURN_IF_ERROR(fn->process_row(_cur_child_offset));
+    }
+
+    return Status::OK();
+}
+
+} // namespace doris::vectorized
\ No newline at end of file
diff --git a/be/src/vec/functions/function_fake.cpp b/be/src/vec/exec/vtable_function_node.h
similarity index 50%
copy from be/src/vec/functions/function_fake.cpp
copy to be/src/vec/exec/vtable_function_node.h
index 0aa9bf0..1913cd1 100644
--- a/be/src/vec/functions/function_fake.cpp
+++ b/be/src/vec/exec/vtable_function_node.h
@@ -15,12 +15,31 @@
 // specific language governing permissions and limitations
 // under the License.
-#include "vec/functions/function_fake.h"
+#pragma once
+
+#include "exec/table_function_node.h"

 namespace doris::vectorized {

-void register_function_fake(SimpleFunctionFactory& factory) {
-    factory.register_function<FunctionFake<FunctionEsqueryImpl>>();
-}
+// Block-based variant of TableFunctionNode: it overrides init/prepare and the
+// Block overload of get_next, and keeps the child data it expands in a Block.
+class VTableFunctionNode : public TableFunctionNode {
+public:
+    VTableFunctionNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs);
+    ~VTableFunctionNode() override = default;
+
+    // Builds the vectorized function expressions and the table functions.
+    Status init(const TPlanNode& tnode, RuntimeState* state = nullptr) override;
+    // Prepares expressions and caches the input/output slot descriptors.
+    Status prepare(RuntimeState* state) override;
+    // Produces the next block of expanded rows; sets *eos when the child is exhausted.
+    Status get_next(RuntimeState* state, Block* block, bool* eos) override;
+
+private:
+    // Advances to the next row of _child_block, or releases the block when it
+    // has been fully consumed.
+    Status _process_next_child_row() override;
+
+    // Brings the base class's other get_next overloads into this class's scope
+    // (as private names); they would otherwise be hidden by the override above.
+    using TableFunctionNode::get_next;
+
+    // Does the actual expansion work for get_next().
+    Status get_expanded_block(RuntimeState* state, Block* output_block, bool* eos);
+
+    // Block most recently fetched from child(0) that is currently being expanded.
+    std::unique_ptr<Block> _child_block;
+    // Slot descriptors of child(0)'s row, collected in prepare().
+    std::vector<SlotDescriptor*> _child_slots;
+    // Slot descriptors of this node's output row, collected in prepare().
+    std::vector<SlotDescriptor*> _output_slots;
+};

-} // namespace doris::vectorized
+} // namespace doris::vectorized
\ No newline at end of file
diff --git a/be/src/vec/exprs/table_function/vexplode_split.cpp b/be/src/vec/exprs/table_function/vexplode_split.cpp
new file mode 100644
index 0000000..0e2a674
--- /dev/null
+++ b/be/src/vec/exprs/table_function/vexplode_split.cpp
@@ -0,0 +1,99 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.
You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/exprs/table_function/vexplode_split.h"
+
+#include "common/status.h"
+#include "gutil/strings/split.h"
+#include "vec/columns/column.h"
+#include "vec/exprs/vexpr.h"
+
+namespace doris {
+
+VExplodeSplitTableFunction::VExplodeSplitTableFunction() {
+    _fn_name = "vexplode_split";
+}
+
+// Nothing to set up: all per-block state is created in process_init().
+Status VExplodeSplitTableFunction::open() {
+    return Status::OK();
+}
+
+// Evaluates the two argument expressions (text, delimiter) against `block` and
+// keeps references to the resulting columns for the following process_row()
+// calls. Returns the error of the first argument expression that fails.
+Status VExplodeSplitTableFunction::process_init(vectorized::Block* block) {
+    CHECK(_vexpr_context->root()->children().size() == 2)
+            << "VExplodeSplitTableFunction must be have 2 children but have "
+            << _vexpr_context->root()->children().size();
+
+    int text_column_idx = -1;
+    int delimiter_column_idx = -1;
+
+    // The status must be checked: if execute() fails the index stays -1 and
+    // must not be passed to get_by_position() below.
+    RETURN_IF_ERROR(_vexpr_context->root()->children()[0]->execute(_vexpr_context, block,
+                                                                   &text_column_idx));
+    RETURN_IF_ERROR(_vexpr_context->root()->children()[1]->execute(_vexpr_context, block,
+                                                                   &delimiter_column_idx));
+
+    _text_column = block->get_by_position(text_column_idx).column;
+    _delimiter_column = block->get_by_position(delimiter_column_idx).column;
+
+    return Status::OK();
+}
+
+// Splits the text at `row_idx` by the delimiter at `row_idx` and resets the
+// cursor to the first piece. A text whose data pointer is null, or a split that
+// yields no pieces, marks the current row as empty.
+// NOTE(review): the delimiter is not checked for null/empty here; confirm how
+// get_data_at() reports NULL values and how strings::Split treats an empty delimiter.
+Status VExplodeSplitTableFunction::process_row(size_t row_idx) {
+    _is_current_empty = false;
+    _eos = false;
+
+    StringRef text = _text_column->get_data_at(row_idx);
+    StringRef delimiter = _delimiter_column->get_data_at(row_idx);
+
+    if (text.data == nullptr) {
+        _is_current_empty = true;
+        _cur_size = 0;
+        _cur_offset = 0;
+    } else {
+        //TODO: implement non-copy split string reference
+        _backup = strings::Split(StringPiece((char*)text.data, text.size),
+                                 StringPiece((char*)delimiter.data, delimiter.size));
+
+        _cur_size = _backup.size();
+        _cur_offset = 0;
+        _is_current_empty = (_cur_size == 0);
+    }
+    return Status::OK();
+}
+
+// Drops the references to the argument columns taken in process_init().
+Status VExplodeSplitTableFunction::process_close() {
+    _text_column = nullptr;
+    _delimiter_column = nullptr;
+    return Status::OK();
+}
+
+// Sets *output to the bytes of the current piece, or to nullptr when the
+// current row produced no pieces.
+Status VExplodeSplitTableFunction::get_value(void** output) {
+    if (_is_current_empty) {
+        *output = nullptr;
+    } else {
+        *output = _backup[_cur_offset].data();
+    }
+    return Status::OK();
+}
+
+// Sets *length to the byte length of the current piece, or to -1 when the
+// current row produced no pieces.
+Status VExplodeSplitTableFunction::get_value_length(int64_t* length) {
+    if (_is_current_empty) {
+        *length = -1;
+    } else {
+        *length = _backup[_cur_offset].length();
+    }
+    return Status::OK();
+}
+
+} // namespace doris
diff --git a/be/src/exprs/table_function/table_function_factory.h b/be/src/vec/exprs/table_function/vexplode_split.h
similarity index 55%
copy from be/src/exprs/table_function/table_function_factory.h
copy to be/src/vec/exprs/table_function/vexplode_split.h
index ca5bd05..52ebef6 100644
--- a/be/src/exprs/table_function/table_function_factory.h
+++ b/be/src/vec/exprs/table_function/vexplode_split.h
@@ -17,20 +17,30 @@

 #pragma once

-#include "exprs/table_function/table_function_factory.h"
 #include "exprs/table_function/explode_split.h"
-
-#include "common/status.h"
+#include "gutil/strings/stringpiece.h"
+#include "runtime/string_value.h"
+#include "vec/columns/column.h"

 namespace doris {

-class ObjectPool;
-class TableFunction;
-class TableFunctionFactory {
+// Column-based explode_split: reads its text and delimiter arguments from
+// columns of a vectorized::Block instead of from a row.
+class VExplodeSplitTableFunction : public ExplodeSplitTableFunction {
 public:
-    TableFunctionFactory() {}
-    ~TableFunctionFactory() {}
-    static Status get_fn(const std::string& fn_name, ObjectPool* pool, TableFunction** fn);
+    VExplodeSplitTableFunction();
+    virtual ~VExplodeSplitTableFunction() = default;
+
+    Status open() override;
+    // Evaluates the argument expressions once per input block.
+    Status process_init(vectorized::Block* block) override;
+    // Splits the text of one input row; the pieces are read via get_value().
+    Status process_row(size_t row_idx) override;
+    // Releases the columns captured by process_init().
+    Status process_close() override;
+    Status get_value(void** output) override;
+    Status get_value_length(int64_t* length) override;
+
+private:
+    // Keeps the base class's process() visible only as a private name.
+    using ExplodeSplitTableFunction::process;
+
+    // Result columns of the text and delimiter argument expressions for the
+    // block passed to the latest process_init() call.
+    vectorized::ColumnPtr _text_column;
+    vectorized::ColumnPtr _delimiter_column;
 };

 } // namespace doris
diff --git a/be/src/vec/exprs/vexpr.h b/be/src/vec/exprs/vexpr.h
index 6a95244..1a99aee 100644
--- a/be/src/vec/exprs/vexpr.h
+++ b/be/src/vec/exprs/vexpr.h
@@ -113,6 +113,8 @@ public:

     bool is_and_expr() { return _fn.name.function_name == "and"; }

+    const TFunction& fn() const { return _fn; }
+
     /// Returns true if expr doesn't contain slotrefs, i.e., can be evaluated
     /// with get_value(NULL). The default implementation returns true if all of
     /// the children are constant.
diff --git a/be/src/vec/functions/function_fake.cpp b/be/src/vec/functions/function_fake.cpp
index 0aa9bf0..11a186c 100644
--- a/be/src/vec/functions/function_fake.cpp
+++ b/be/src/vec/functions/function_fake.cpp
@@ -21,6 +21,7 @@ namespace doris::vectorized {

 void register_function_fake(SimpleFunctionFactory& factory) {
     factory.register_function<FunctionFake<FunctionEsqueryImpl>>();
+    factory.register_function<FunctionFake<FunctionExplodeSplitImpl>>();
 }

 } // namespace doris::vectorized
diff --git a/be/src/vec/functions/function_fake.h b/be/src/vec/functions/function_fake.h
index 519fb08..8ad4776 100644
--- a/be/src/vec/functions/function_fake.h
+++ b/be/src/vec/functions/function_fake.h
@@ -20,6 +20,7 @@
 #include "common/status.h"
 #include "vec/core/types.h"
 #include "vec/data_types/data_type_number.h"
+#include "vec/data_types/data_type_string.h"
 #include "vec/functions/simple_function_factory.h"
 #include "vec/utils/util.hpp"

@@ -32,6 +33,13 @@ struct FunctionEsqueryImpl {
     }
 };

+// Descriptor used to register "explode_split" through FunctionFake; it only
+// declares the name and a String return type.
+struct FunctionExplodeSplitImpl {
+    static constexpr auto name = "explode_split";
+    static DataTypePtr get_return_type_impl(const DataTypes& arguments) {
+        return std::make_shared<DataTypeString>();
+    }
+};
+
 //FunctionFake is use for some function call expr only work at prepare/open phase,
do not support execute(). template <typename Impl> class FunctionFake : public IFunction { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org