This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch new_scan_node
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 42c98c1b2436c9c6b571cd5e70fb9a5461717c1a
Author: morningman <morning...@163.com>
AuthorDate: Thu Aug 4 14:52:20 2022 +0800

    1
---
 be/src/vec/exec/scan/vscan_node.cpp | 156 ++++++++++++++++++++++++++++++++++++
 be/src/vec/exec/scan/vscan_node.h   | 131 ++++++++++++++++++++++++++++++
 be/src/vec/exec/scan/vscanner.h     |  40 +++++++++
 3 files changed, 327 insertions(+)

diff --git a/be/src/vec/exec/scan/vscan_node.cpp 
b/be/src/vec/exec/scan/vscan_node.cpp
new file mode 100644
index 0000000000..44a8470e2b
--- /dev/null
+++ b/be/src/vec/exec/scan/vscan_node.cpp
@@ -0,0 +1,156 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/exec/scan/vscan_node.h"
+#include "vec/exec/scan/vscanner.h"
+
+namespace doris::vectorized {
+
+Status VScanNode::init(const TPlanNode &tnode, RuntimeState *state) {
+    RETURN_IF_ERROR(ExecNode::init(tnode, state));
+    _direct_conjunct_size = _conjunct_ctxs.size();
+    _runtime_state = state;
+
+    const TQueryOptions& query_options = state->query_options();
+    if (query_options.__isset.max_scan_key_num) {
+        _max_scan_key_num = query_options.max_scan_key_num;
+    } else {
+        _max_scan_key_num = config::doris_max_scan_key_num;
+    }
+    if (query_options.__isset.max_pushdown_conditions_per_column) {
+        _max_pushdown_conditions_per_column = 
query_options.max_pushdown_conditions_per_column;
+    } else {
+        _max_pushdown_conditions_per_column = 
config::max_pushdown_conditions_per_column;
+    }
+
+    _max_scanner_queue_size_bytes = query_options.mem_limit / 20;
+
+    RETURN_IF_ERROR(_register_runtime_filter());
+
+    return Status::OK();
+}
+
+Status VScanNode::prepare(RuntimeState* state) {
+    RETURN_IF_ERROR(ExecNode::prepare(state));
+    SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
+
+    RETURN_IF_ERROR(_init_profile());
+    _tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id);
+    DCHECK(_tuple_desc != nullptr);
+
+    // init profile for runtime filter
+    for (size_t i = 0; i < _runtime_filter_descs.size(); ++i) {
+        IRuntimeFilter* runtime_filter = nullptr;
+        
state->runtime_filter_mgr()->get_consume_filter(_runtime_filter_descs[i].filter_id,
+                                                        &runtime_filter);
+        DCHECK(runtime_filter != nullptr);
+        runtime_filter->init_profile(_runtime_profile.get());
+    }
+    return Status::OK();
+}
+
+Status VScanNode::open(RuntimeState* state) {
+    START_AND_SCOPE_SPAN(state->get_tracer(), span, "VScanNode::open");
+    SCOPED_TIMER(_runtime_profile->total_time_counter());
+    RETURN_IF_CANCELLED(state);
+    RETURN_IF_ERROR(ExecNode::open(state));
+    SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
+
+    RETURN_IF_ERROR(_acquire_runtime_filter());
+
+    RETURN_IF_ERROR(_process_conjuncts());
+    RETURN_IF_ERROR(_init_scanners());
+    RETURN_IF_ERROR(_start_scanners());
+    return Status::OK();
+}
+
+Status VScanNode::get_next(RuntimeState* state, vectorized::Block* block, 
bool* eos) {
+    return Status::OK();
+}
+
+Status VScanNode::_init_scanners() {
+
+    return Status::OK();
+}
+
+Status VScanNode::_start_scanners() {
+    return Status::OK();
+}
+
+Status VScanNode::_register_runtime_filter() {
+    int filter_size = _runtime_filter_descs.size();
+    _runtime_filter_ctxs.resize(filter_size);
+    _runtime_filter_ready_flag.resize(filter_size);
+    for (int i = 0; i < filter_size; ++i) {
+        IRuntimeFilter* runtime_filter = nullptr;
+        const auto& filter_desc = _runtime_filter_descs[i];
+        RETURN_IF_ERROR(_runtime_state->runtime_filter_mgr()->regist_filter(
+                RuntimeFilterRole::CONSUMER, filter_desc, 
_runtime_state->query_options(), id()));
+        
RETURN_IF_ERROR(_runtime_state->runtime_filter_mgr()->get_consume_filter(filter_desc.filter_id,
+                                                                        
&runtime_filter));
+        _runtime_filter_ctxs[i].runtime_filter = runtime_filter;
+        _runtime_filter_ready_flag[i] = false;
+        _rf_locks.push_back(std::make_unique<std::mutex>());
+    }
+    return Status::OK();
+}
+
+Status VScanNode::_acquire_runtime_filter() {
+    _runtime_filter_ctxs.resize(_runtime_filter_descs.size());
+    for (size_t i = 0; i < _runtime_filter_descs.size(); ++i) {
+        auto& filter_desc = _runtime_filter_descs[i];
+        IRuntimeFilter* runtime_filter = nullptr;
+        
_runtime_state->runtime_filter_mgr()->get_consume_filter(filter_desc.filter_id, 
&runtime_filter);
+        DCHECK(runtime_filter != nullptr);
+        if (runtime_filter == nullptr) {
+            continue;
+        }
+        bool ready = runtime_filter->is_ready();
+        if (!ready) {
+            ready = runtime_filter->await();
+        }
+        if (ready) {
+            std::list<ExprContext*> expr_context;
+            RETURN_IF_ERROR(runtime_filter->get_push_expr_ctxs(&expr_context));
+            _runtime_filter_ctxs[i].apply_mark = true;
+            _runtime_filter_ctxs[i].runtimefilter = runtime_filter;
+            for (auto ctx : expr_context) {
+                ctx->prepare(_runtime_state, row_desc());
+                ctx->open(_runtime_state);
+                int index = _conjunct_ctxs.size();
+                _conjunct_ctxs.push_back(ctx);
+                _conjunctid_to_runtime_filter_ctxs[index] = 
&_runtime_filter_ctxs[i];
+            }
+        }
+    }
+
+    return Status::OK();
+}
+
+Status VScanNode::_process_conjuncts() {
+    return Status::OK();
+}
+
+Status VScanNode::_init_scanners() {
+    return Status::OK();
+}
+
+Status VScanNode::_start_scanners() {
+    return Status::OK();
+}
+
+}
diff --git a/be/src/vec/exec/scan/vscan_node.h 
b/be/src/vec/exec/scan/vscan_node.h
new file mode 100644
index 0000000000..38f62d9840
--- /dev/null
+++ b/be/src/vec/exec/scan/vscan_node.h
@@ -0,0 +1,131 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "exec/exec_node.h"
+#include "exprs/runtime_filter.h"
+
+namespace doris::vectorized {
+
+class VScanner;
+class VScanNode : public ExecNode {
+public:
+    Status init(const TPlanNode &tnode, RuntimeState *state = nullptr) 
override;
+
+    Status prepare(RuntimeState *state) override;
+
+    Status open(RuntimeState *state) override;
+
+    Status get_next(RuntimeState *state, RowBatch *row_batch, bool *eos) 
override {
+        return Status::NotSupported("Not implement");
+    }
+
+    Status get_next(RuntimeState *state, vectorized::Block *block, bool *eos) 
override;
+
+    Status close(RuntimeState *state) override;
+
+    enum PushdownType {
+        REJECT, // the predicate can not pushdown to the data source
+        FULL,   // the predicate can be pushdown to data source, and data 
source can fully handle it.
+        PARTIAL // the predicate can be pushdown to data source, but it still 
need to be kept in conjuncts.
+    };
+
+protected:
+
+    // Different data sources register different profiles by implementing this 
method
+    virtual Status _init_profile() {
+        return Status::OK();
+    }
+
+    // Process predicates, extract the predicates in the conjuncts that can be 
pushed down
+    // to the data source, and convert them into common expressions structure 
ColumnPredicate.
+    // There are currently 3 types of predicates that can be pushed down to 
data sources:
+    //
+    // 1. Simple predicate, with column on left and constant on right, such as 
"a=1", "b in (1,2,3)" etc.
+    // 2. Bloom Filter, predicate condition generated by runtime filter
+    // 3. Function Filter, some data sources can accept function conditions, 
such as "a like 'abc%'"
+    //
+    // Predicates that can be fully processed by the data source will be 
removed from conjuncts
+    Status _process_conjuncts() {
+        return Status::OK();
+    }
+
+    // Create a set of scanners.
+    // The number of scanners is related to the implementation of the data 
source, predicate conditions, and scheduling strategy.
+    // So this method needs to be implemented separately by the subclass of 
ScanNode.
+    // Finally, a set of scanners that have been prepared are returned.
+    Status _init_scanners();
+
+    // Submit the scanner to the thread pool and start execution
+    Status _start_scanners();
+
+    // Different data sources can implement this method to determine whether a 
predicate
+    // can be processed by the data source
+    virtual PushdownType _is_conjuncts_acceptable(ExprContext *ctx) {
+        return PushdownType::REJECT
+    }
+
+protected:
+
+    // conjuncts from sql directly
+    // the conjuncts vector may contain direct conjunct and runtime filter 
conjunct.
+    int _direct_conjunct_size;
+    int _max_scan_key_num;
+    int _max_pushdown_conditions_per_column;
+    int64_t _max_scanner_queue_size_bytes;
+
+    // For runtime filters
+    struct RuntimeFilterContext {
+        RuntimeFilterContext() : apply_mark(false), runtime_filter(nullptr) {}
+
+        bool apply_mark;
+        IRuntimeFilter *runtime_filter;
+    };
+
+    std::vector<TRuntimeFilterDesc> _runtime_filter_descs;
+    std::vector<RuntimeFilterContext> _runtime_filter_ctxs;
+    // set to true if the runtime filter is ready.
+    std::vector<bool> _runtime_filter_ready_flag;
+    std::vector<std::unique_ptr<std::mutex>> _rf_locks;
+    std::map<int, RuntimeFilterContext *> _conjunct_id_to_runtime_filter_ctxs;
+
+    const TupleDescriptor *_tuple_desc;
+    RuntimeState *_runtime_state;
+
+    std::list<VScanner *> _scanners;
+    Status _process_status;
+    std::atomic<bool> _scan_finished;
+    std::mutex _queue_lock;
+    std::condition_variable _queue_reader_cond;
+    std::condition_variable _queue_writer_cond;
+
+private:
+    Status _register_runtime_filter();
+
+    Status _acquire_runtime_filter();
+
+    bool update_status(const Status& new_status) {
+        if (_process_status.ok()) {
+            _process_status = new_status;
+            return true;
+        }
+        return false;
+    }
+};
+
+} // namespace doris::vectorized
diff --git a/be/src/vec/exec/scan/vscanner.h b/be/src/vec/exec/scan/vscanner.h
new file mode 100644
index 0000000000..e1bbfe6141
--- /dev/null
+++ b/be/src/vec/exec/scan/vscanner.h
@@ -0,0 +1,40 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "common/status.h"
+#include "runtime/runtime_state.h"
+
+namespace doris::vectorized {
+
+class Block;
+class VScanner {
+public:
+    Status prepare(RuntimeState* state) { return Status::OK(); }
+
+    Status open(RuntimeState* state) { return Status::OK(); }
+
+    Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos) 
{
+        *eos = true;
+        return Status::OK();
+    }
+
+    Status close(RuntimeState* state) { return Status::OK(); }
+};
+
+} // namespace doris::vectorized


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to