This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch new_scan_node in repository https://gitbox.apache.org/repos/asf/doris.git
commit 42c98c1b2436c9c6b571cd5e70fb9a5461717c1a Author: morningman <morning...@163.com> AuthorDate: Thu Aug 4 14:52:20 2022 +0800 1 --- be/src/vec/exec/scan/vscan_node.cpp | 156 ++++++++++++++++++++++++++++++++++++ be/src/vec/exec/scan/vscan_node.h | 131 ++++++++++++++++++++++++++++++ be/src/vec/exec/scan/vscanner.h | 40 +++++++++ 3 files changed, 327 insertions(+) diff --git a/be/src/vec/exec/scan/vscan_node.cpp b/be/src/vec/exec/scan/vscan_node.cpp new file mode 100644 index 0000000000..44a8470e2b --- /dev/null +++ b/be/src/vec/exec/scan/vscan_node.cpp @@ -0,0 +1,156 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "vec/exec/scan/vscan_node.h" +#include "vec/exec/scan/vscanner.h" + +namespace doris::vectorized { + +Status VScanNode::init(const TPlanNode &tnode, RuntimeState *state) { + RETURN_IF_ERROR(ExecNode::init(tnode, state)); + _direct_conjunct_size = _conjunct_ctxs.size(); + _runtime_state = state; + + const TQueryOptions& query_options = state->query_options(); + if (query_options.__isset.max_scan_key_num) { + _max_scan_key_num = query_options.max_scan_key_num; + } else { + _max_scan_key_num = config::doris_max_scan_key_num; + } + if (query_options.__isset.max_pushdown_conditions_per_column) { + _max_pushdown_conditions_per_column = query_options.max_pushdown_conditions_per_column; + } else { + _max_pushdown_conditions_per_column = config::max_pushdown_conditions_per_column; + } + + _max_scanner_queue_size_bytes = query_options.mem_limit / 20; + + RETURN_IF_ERROR(_register_runtime_filter()); + + return Status::OK(); +} + +Status VScanNode::prepare(RuntimeState* state) { + RETURN_IF_ERROR(ExecNode::prepare(state)); + SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); + + RETURN_IF_ERROR(_init_profile()); + _tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id); + DCHECK(_tuple_desc != nullptr); + + // init profile for runtime filter + for (size_t i = 0; i < _runtime_filter_descs.size(); ++i) { + IRuntimeFilter* runtime_filter = nullptr; + state->runtime_filter_mgr()->get_consume_filter(_runtime_filter_descs[i].filter_id, + &runtime_filter); + DCHECK(runtime_filter != nullptr); + runtime_filter->init_profile(_runtime_profile.get()); + } + return Status::OK(); +} + +Status VScanNode::open(RuntimeState* state) { + START_AND_SCOPE_SPAN(state->get_tracer(), span, "VScanNode::open"); + SCOPED_TIMER(_runtime_profile->total_time_counter()); + RETURN_IF_CANCELLED(state); + RETURN_IF_ERROR(ExecNode::open(state)); + SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); + + RETURN_IF_ERROR(_acquire_runtime_filter()); + + RETURN_IF_ERROR(_process_conjuncts()); + RETURN_IF_ERROR(_init_scanners()); + RETURN_IF_ERROR(_start_scanners()); + return Status::OK(); +} + +Status VScanNode::get_next(RuntimeState* state, vectorized::Block* block, bool* eos) { + return Status::OK(); +} + +Status VScanNode::_init_scanners() { + + return Status::OK(); +} + +Status VScanNode::_start_scanners() { + return Status::OK(); +} + +Status VScanNode::_register_runtime_filter() { + int filter_size = _runtime_filter_descs.size(); + _runtime_filter_ctxs.resize(filter_size); + _runtime_filter_ready_flag.resize(filter_size); + for (int i = 0; i < filter_size; ++i) { + IRuntimeFilter* runtime_filter = nullptr; + const auto& filter_desc = _runtime_filter_descs[i]; + RETURN_IF_ERROR(_runtime_state->runtime_filter_mgr()->regist_filter( + RuntimeFilterRole::CONSUMER, filter_desc, _runtime_state->query_options(), id())); + RETURN_IF_ERROR(_runtime_state->runtime_filter_mgr()->get_consume_filter(filter_desc.filter_id, + &runtime_filter)); + _runtime_filter_ctxs[i].runtime_filter = runtime_filter; + _runtime_filter_ready_flag[i] = false; + _rf_locks.push_back(std::make_unique<std::mutex>()); + } + return Status::OK(); +} + +Status VScanNode::_acquire_runtime_filter() { + _runtime_filter_ctxs.resize(_runtime_filter_descs.size()); + for (size_t i = 0; i < _runtime_filter_descs.size(); ++i) { + auto& filter_desc = _runtime_filter_descs[i]; + IRuntimeFilter* runtime_filter = nullptr; + _runtime_state->runtime_filter_mgr()->get_consume_filter(filter_desc.filter_id, &runtime_filter); + DCHECK(runtime_filter != nullptr); + if (runtime_filter == nullptr) { + continue; + } + bool ready = runtime_filter->is_ready(); + if (!ready) { + ready = runtime_filter->await(); + } + if (ready) { + std::list<ExprContext*> expr_context; + RETURN_IF_ERROR(runtime_filter->get_push_expr_ctxs(&expr_context)); + _runtime_filter_ctxs[i].apply_mark = true; + _runtime_filter_ctxs[i].runtimefilter = runtime_filter; + for (auto ctx : expr_context) { + ctx->prepare(_runtime_state, row_desc()); + ctx->open(_runtime_state); + int index = _conjunct_ctxs.size(); + _conjunct_ctxs.push_back(ctx); + _conjunctid_to_runtime_filter_ctxs[index] = &_runtime_filter_ctxs[i]; + } + } + } + + return Status::OK(); +} + +Status VScanNode::_process_conjuncts() { + return Status::OK(); +} + +Status VScanNode::_init_scanners() { + return Status::OK(); +} + +Status VScanNode::_start_scanners() { + return Status::OK(); +} + +} diff --git a/be/src/vec/exec/scan/vscan_node.h b/be/src/vec/exec/scan/vscan_node.h new file mode 100644 index 0000000000..38f62d9840 --- /dev/null +++ b/be/src/vec/exec/scan/vscan_node.h @@ -0,0 +1,131 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "exec/exec_node.h" +#include "exprs/runtime_filter.h" + +namespace doris::vectorized { + +class VScanner; +class VScanNode : public ExecNode { +public: + Status init(const TPlanNode &tnode, RuntimeState *state = nullptr) override; + + Status prepare(RuntimeState *state) override; + + Status open(RuntimeState *state) override; + + Status get_next(RuntimeState *state, RowBatch *row_batch, bool *eos) override { + return Status::NotSupported("Not implement"); + } + + Status get_next(RuntimeState *state, vectorized::Block *block, bool *eos) override; + + Status close(RuntimeState *state) override; + + enum PushdownType { + REJECT, // the predicate can not pushdown to the data source + FULL, // the predicate can be pushdown to data source, and data source can fully handle it. + PARTIAL // the predicate can be pushdown to data source, but it still need to be kept in conjuncts. + }; + +protected: + + // Different data sources register different profiles by implementing this method + virtual Status _init_profile() { + return Status::OK(); + } + + // Process predicates, extract the predicates in the conjuncts that can be pushed down + // to the data source, and convert them into common expressions structure ColumnPredicate. + // There are currently 3 types of predicates that can be pushed down to data sources: + // + // 1. Simple predicate, with column on left and constant on right, such as "a=1", "b in (1,2,3)" etc. + // 2. Bloom Filter, predicate condition generated by runtime filter + // 3. Function Filter, some data sources can accept function conditions, such as "a like 'abc%'" + // + // Predicates that can be fully processed by the data source will be removed from conjuncts + Status _process_conjuncts() { + return Status::OK(); + } + + // Create a set of scanners. + // The number of scanners is related to the implementation of the data source, predicate conditions, and scheduling strategy. + // So this method needs to be implemented separately by the subclass of ScanNode. + // Finally, a set of scanners that have been prepared are returned. + Status _init_scanners(); + + // Submit the scanner to the thread pool and start execution + Status _start_scanners(); + + // Different data sources can implement this method to determine whether a predicate + // can be processed by the data source + virtual PushdownType _is_conjuncts_acceptable(ExprContext *ctx) { + return PushdownType::REJECT + } + +protected: + + // conjuncts from sql directly + // the conjuncts vector may contain direct conjunct and runtime filter conjunct. + int _direct_conjunct_size; + int _max_scan_key_num; + int _max_pushdown_conditions_per_column; + int64_t _max_scanner_queue_size_bytes; + + // For runtime filters + struct RuntimeFilterContext { + RuntimeFilterContext() : apply_mark(false), runtime_filter(nullptr) {} + + bool apply_mark; + IRuntimeFilter *runtime_filter; + }; + + std::vector<TRuntimeFilterDesc> _runtime_filter_descs; + std::vector<RuntimeFilterContext> _runtime_filter_ctxs; + // set to true if the runtime filter is ready. + std::vector<bool> _runtime_filter_ready_flag; + std::vector<std::unique_ptr<std::mutex>> _rf_locks; + std::map<int, RuntimeFilterContext *> _conjunct_id_to_runtime_filter_ctxs; + + const TupleDescriptor *_tuple_desc; + RuntimeState *_runtime_state; + + std::list<VScanner *> _scanners; + Status _process_status; + std::atomic<bool> _scan_finished; + std::mutex _queue_lock; + std::condition_variable _queue_reader_cond; + std::condition_variable _queue_writer_cond; + +private: + Status _register_runtime_filter(); + + Status _acquire_runtime_filter(); + + bool update_status(const Status& new_status) { + if (_process_status.ok()) { + _process_status = new_status; + return true; + } + return false; + } +}; + +} // namespace doris::vectorized diff --git a/be/src/vec/exec/scan/vscanner.h b/be/src/vec/exec/scan/vscanner.h new file mode 100644 index 0000000000..e1bbfe6141 --- /dev/null +++ b/be/src/vec/exec/scan/vscanner.h @@ -0,0 +1,40 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "common/status.h" +#include "runtime/runtime_state.h" + +namespace doris::vectorized { + +class Block; +class VScanner { +public: + Status prepare(RuntimeState* state) { return Status::OK(); } + + Status open(RuntimeState* state) { return Status::OK(); } + + Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos) { + *eos = true; + return Status::OK(); + } + + Status close(RuntimeState* state) { return Status::OK(); } +}; + +} // namespace doris::vectorized --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org