This is an automated email from the ASF dual-hosted git repository.

panxiaolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 27eed937b3 [pipelineX](es scan) Support ES scan operator (#24824)
27eed937b3 is described below

commit 27eed937b37f19e50f329a01e59759c620d4f056
Author: Gabriel <gabrielleeb...@gmail.com>
AuthorDate: Sun Sep 24 00:32:38 2023 +0800

    [pipelineX](es scan) Support ES scan operator (#24824)
    
    Support ES scan operator
---
 be/src/pipeline/exec/es_scan_operator.cpp          | 159 +++++++++++++++++++++
 be/src/pipeline/exec/es_scan_operator.h            |  83 +++++++++++
 be/src/pipeline/exec/scan_operator.cpp             |   3 +
 be/src/pipeline/pipeline_x/operator.cpp            |   2 +
 .../pipeline_x/pipeline_x_fragment_context.cpp     |  17 +--
 be/src/vec/exec/runtime_filter_consumer.h          |   3 +-
 be/src/vec/exec/scan/new_es_scanner.cpp            |  18 +++
 be/src/vec/exec/scan/new_es_scanner.h              |   5 +
 .../external_table_p0/es/test_es_query.groovy      |   1 -
 .../es/test_es_query_nereids.groovy                |   1 -
 .../es/test_es_query_no_http_url.groovy            |   1 -
 11 files changed, 278 insertions(+), 15 deletions(-)

diff --git a/be/src/pipeline/exec/es_scan_operator.cpp 
b/be/src/pipeline/exec/es_scan_operator.cpp
new file mode 100644
index 0000000000..e4b1c4956b
--- /dev/null
+++ b/be/src/pipeline/exec/es_scan_operator.cpp
@@ -0,0 +1,159 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "pipeline/exec/es_scan_operator.h"
+
+#include "exec/es/es_scan_reader.h"
+#include "exec/es/es_scroll_query.h"
+#include "vec/exec/scan/new_es_scanner.h"
+
+namespace doris::pipeline {
+
+// Builds the "host:port" endpoint string for one ES scan range.
+//
+// The entry whose hostname equals BackendOptions::get_localhost() is preferred;
+// when none matches, the first entry of `es_hosts` is used.
+// Returns an empty string when `es_hosts` is empty, instead of indexing into an
+// empty vector (undefined behavior).
+static std::string get_host_and_port(const std::vector<doris::TNetworkAddress>& es_hosts) {
+    if (es_hosts.empty()) {
+        return {};
+    }
+
+    const auto& localhost = doris::BackendOptions::get_localhost();
+
+    // Track the chosen entry by pointer so no TNetworkAddress is copied.
+    const doris::TNetworkAddress* chosen = &es_hosts.front();
+    for (const auto& es_host : es_hosts) {
+        if (es_host.hostname == localhost) {
+            chosen = &es_host;
+            break;
+        }
+    }
+
+    std::string host_port = chosen->hostname;
+    host_port += ":";
+    host_port += std::to_string(chosen->port);
+    return host_port;
+}
+
+// Runs the base profile setup, then attaches an "EsIterator" child profile to the
+// scanner profile and creates the ES row counter and timers inside it.
+Status EsScanLocalState::_init_profile() {
+    RETURN_IF_ERROR(Base::_init_profile());
+
+    _es_profile = std::make_unique<RuntimeProfile>("EsIterator");
+    RuntimeProfile* es_profile = _es_profile.get();
+    Base::_scanner_profile->add_child(es_profile, true, nullptr);
+
+    _rows_read_counter = ADD_COUNTER(es_profile, "RowsRead", TUnit::UNIT);
+    _read_timer = ADD_TIMER(es_profile, "TotalRawReadTime(*)");
+    _materialize_timer = ADD_TIMER(es_profile, "MaterializeTupleTime(*)");
+    return Status::OK();
+}
+
+// Runs the base conjunct processing and then verifies that the operator's
+// properties carry the query DSL entry (ESScanReader::KEY_QUERY_DSL).
+//
+// Returns the base-class error if any, Status::InternalError when the query DSL
+// property is missing, and Status::OK() otherwise. A missing property is
+// reported as an error instead of aborting the whole process via CHECK.
+Status EsScanLocalState::_process_conjuncts() {
+    RETURN_IF_ERROR(Base::_process_conjuncts());
+    // Nothing left to validate when the EOS dependency reports no read blocker.
+    if (Base::_eos_dependency->read_blocked_by() == nullptr) {
+        return Status::OK();
+    }
+
+    // Resolve the parent operator's property map once instead of per lookup.
+    const auto& properties = Base::_parent->cast<EsScanOperatorX>()._properties;
+    if (properties.find(ESScanReader::KEY_QUERY_DSL) == properties.end()) {
+        return Status::InternalError("ES scan properties do not contain the query DSL");
+    }
+    return Status::OK();
+}
+
+// Builds one NewEsScanner per ES scan range held by this local state and appends each
+// prepared scanner to `scanners`. When no scan range was assigned, the EOS dependency
+// is marked ready for read and no scanner is created.
+Status EsScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>* scanners) {
+    if (_scan_ranges.empty()) {
+        Base::_eos_dependency->set_ready_for_read();
+        return Status::OK();
+    }
+
+    auto& op = Base::_parent->cast<EsScanOperatorX>();
+    auto* runtime_state = vectorized::RuntimeFilterConsumer::_state;
+    for (auto& range : _scan_ranges) {
+        // Per-range settings are layered on top of a copy of the operator-level properties.
+        std::map<std::string, std::string> scan_props(op._properties);
+        scan_props[ESScanReader::KEY_INDEX] = range->index;
+        if (range->__isset.type) {
+            scan_props[ESScanReader::KEY_TYPE] = range->type;
+        }
+        scan_props[ESScanReader::KEY_SHARD] = std::to_string(range->shard_id);
+        scan_props[ESScanReader::KEY_BATCH_SIZE] = std::to_string(runtime_state->batch_size());
+        scan_props[ESScanReader::KEY_HOST_PORT] = get_host_and_port(range->es_hosts);
+
+        // Push the limit down to Elasticsearch only when a limit is set (!= -1) and it
+        // does not exceed the batch size.
+        // NOTE(review): the earlier comment also mentioned predicates Elasticsearch cannot
+        // process, but no such condition is checked here -- confirm whether one is needed.
+        const bool limit_fits_in_batch =
+                op.limit() != -1 && op.limit() <= runtime_state->batch_size();
+        if (limit_fits_in_batch) {
+            scan_props[ESScanReader::KEY_TERMINATE_AFTER] = std::to_string(op.limit());
+        }
+
+        // build() receives a pointer to `doc_value_mode`; the flag is then handed to the
+        // scanner together with the generated query.
+        bool doc_value_mode = false;
+        auto query = ESScrollQueryBuilder::build(scan_props, op._column_names,
+                                                 op._docvalue_context, &doc_value_mode);
+        scan_props[ESScanReader::KEY_QUERY] = std::move(query);
+
+        auto scanner = vectorized::NewEsScanner::create_shared(
+                runtime_state, this, op._limit_per_scanner, op._tuple_id, scan_props,
+                op._docvalue_context, doc_value_mode, runtime_state->runtime_profile());
+
+        RETURN_IF_ERROR(scanner->prepare(runtime_state, Base::_conjuncts));
+        scanners->push_back(scanner);
+    }
+
+    return Status::OK();
+}
+
+// Stores an owned copy of the ES scan range carried by every entry of `scan_ranges`.
+//
+// Each entry is expected to have `scan_range.es_scan_range` set (DCHECK-ed).
+// Copies are created with std::make_unique so that no raw `new` result can leak
+// if growing the vector throws.
+void EsScanLocalState::set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) {
+    _scan_ranges.reserve(_scan_ranges.size() + scan_ranges.size());
+    for (const auto& params : scan_ranges) {
+        DCHECK(params.scan_range.__isset.es_scan_range);
+        _scan_ranges.emplace_back(std::make_unique<TEsScanRange>(params.scan_range.es_scan_range));
+    }
+}
+
+// Constructs the operator from its plan node. The tuple id of the ES scan node is
+// kept in `_tuple_id` and also used as the output tuple id; the tuple descriptor is
+// left null here and looked up in prepare().
+EsScanOperatorX::EsScanOperatorX(ObjectPool* pool, const TPlanNode& tnode,
+                                 const DescriptorTbl& descs)
+        : ScanOperatorX<EsScanLocalState>(pool, tnode, descs),
+          _tuple_id(tnode.es_scan_node.tuple_id),
+          _tuple_desc(nullptr) {
+    _output_tuple_id = _tuple_id;
+}
+
+// Initializes the base scan operator, then copies the ES settings out of the
+// plan node: the properties always, the docvalue and fields contexts only when
+// the plan node has them set.
+Status EsScanOperatorX::init(const TPlanNode& tnode, RuntimeState* state) {
+    RETURN_IF_ERROR(ScanOperatorX<EsScanLocalState>::init(tnode, state));
+
+    const auto& es_node = tnode.es_scan_node;
+    _properties = es_node.properties;
+
+    if (es_node.__isset.docvalue_context) {
+        _docvalue_context = es_node.docvalue_context;
+    }
+    if (es_node.__isset.fields_context) {
+        _fields_context = es_node.fields_context;
+    }
+
+    return Status::OK();
+}
+
+// Prepares the base scan operator, resolves the tuple descriptor for `_tuple_id`
+// and records the column name of every materialized slot in `_column_names`
+// (consumed by ESScrollQueryBuilder when scanners are created).
+//
+// Returns Status::InternalError when no tuple descriptor exists for `_tuple_id`.
+Status EsScanOperatorX::prepare(RuntimeState* state) {
+    RETURN_IF_ERROR(ScanOperatorX<EsScanLocalState>::prepare(state));
+
+    _tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id);
+    if (_tuple_desc == nullptr) {
+        // The format string previously contained a stray 'i' before the placeholder.
+        return Status::InternalError("Failed to get tuple descriptor, _tuple_id={}", _tuple_id);
+    }
+
+    // set up column name vector for ESScrollQueryBuilder
+    for (const auto* slot_desc : _tuple_desc->slots()) {
+        if (!slot_desc->is_materialized()) {
+            continue;
+        }
+        _column_names.push_back(slot_desc->col_name());
+    }
+
+    return Status::OK();
+}
+
+} // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/es_scan_operator.h 
b/be/src/pipeline/exec/es_scan_operator.h
new file mode 100644
index 0000000000..96f53c02cd
--- /dev/null
+++ b/be/src/pipeline/exec/es_scan_operator.h
@@ -0,0 +1,83 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <stdint.h>
+
+#include <string>
+
+#include "common/status.h"
+#include "operator.h"
+#include "pipeline/exec/scan_operator.h"
+#include "pipeline/pipeline_x/operator.h"
+#include "vec/exec/scan/vscan_node.h"
+
+namespace doris {
+class ExecNode;
+
+namespace vectorized {
+class NewOlapScanner;
+}
+} // namespace doris
+
+namespace doris::pipeline {
+
+class EsScanOperatorX;
+// Local state of EsScanOperatorX: owns the ES scan ranges handed over through
+// set_scan_ranges() and the ES-specific child profile with its counters.
+class EsScanLocalState final : public ScanLocalState<EsScanLocalState> {
+public:
+    using Parent = EsScanOperatorX;
+    using Base = ScanLocalState<EsScanLocalState>;
+    ENABLE_FACTORY_CREATOR(EsScanLocalState);
+    EsScanLocalState(RuntimeState* state, OperatorXBase* parent) : Base(state, parent) {}
+
+private:
+    // NOTE(review): this befriends the OLAP scanner although this state creates
+    // NewEsScanner instances -- looks like a copy-paste; confirm the intended friend.
+    friend class vectorized::NewOlapScanner;
+
+    // Copies the ES scan range of every entry into _scan_ranges.
+    void set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) override;
+    // Adds the "EsIterator" child profile and its counters.
+    Status _init_profile() override;
+    Status _process_conjuncts() override;
+    // Creates one scanner per entry of _scan_ranges.
+    Status _init_scanners(std::list<vectorized::VScannerSPtr>* scanners) override;
+
+    std::vector<std::unique_ptr<TEsScanRange>> _scan_ranges;
+    std::unique_ptr<RuntimeProfile> _es_profile;
+    // Assigned in _init_profile(); null until then so they are never read uninitialized.
+    RuntimeProfile::Counter* _rows_read_counter = nullptr;
+    RuntimeProfile::Counter* _read_timer = nullptr;
+    RuntimeProfile::Counter* _materialize_timer = nullptr;
+};
+
+// Pipeline-X scan operator for Elasticsearch. Holds the settings read from the ES
+// scan plan node; EsScanLocalState reads them when it creates scanners.
+class EsScanOperatorX final : public ScanOperatorX<EsScanLocalState> {
+    // The local state reads the private members below directly.
+    friend class EsScanLocalState;
+
+public:
+    EsScanOperatorX(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs);
+
+    // Copies properties and the optional docvalue/fields contexts from the plan node.
+    Status init(const TPlanNode& tnode, RuntimeState* state) override;
+    // Resolves the tuple descriptor and collects the materialized column names.
+    Status prepare(RuntimeState* state) override;
+
+private:
+    // Tuple id taken from the ES scan plan node.
+    TupleId _tuple_id;
+    // Looked up from the descriptor table in prepare(); null before that.
+    TupleDescriptor* _tuple_desc;
+
+    std::map<std::string, std::string> _properties;
+    std::map<std::string, std::string> _fields_context;
+    std::map<std::string, std::string> _docvalue_context;
+
+    // Column names of the materialized slots, filled in prepare().
+    std::vector<std::string> _column_names;
+};
+
+} // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/scan_operator.cpp 
b/be/src/pipeline/exec/scan_operator.cpp
index b372e2dab8..d67228848f 100644
--- a/be/src/pipeline/exec/scan_operator.cpp
+++ b/be/src/pipeline/exec/scan_operator.cpp
@@ -21,6 +21,7 @@
 
 #include <memory>
 
+#include "pipeline/exec/es_scan_operator.h"
 #include "pipeline/exec/olap_scan_operator.h"
 #include "pipeline/exec/operator.h"
 #include "vec/exec/runtime_filter_consumer.h"
@@ -1432,5 +1433,7 @@ Status 
ScanOperatorX<LocalStateType>::get_block(RuntimeState* state, vectorized:
 
 template class ScanOperatorX<OlapScanLocalState>;
 template class ScanLocalState<OlapScanLocalState>;
+template class ScanOperatorX<EsScanLocalState>;
+template class ScanLocalState<EsScanLocalState>;
 
 } // namespace doris::pipeline
diff --git a/be/src/pipeline/pipeline_x/operator.cpp 
b/be/src/pipeline/pipeline_x/operator.cpp
index 8d228623c5..2937312976 100644
--- a/be/src/pipeline/pipeline_x/operator.cpp
+++ b/be/src/pipeline/pipeline_x/operator.cpp
@@ -28,6 +28,7 @@
 #include "pipeline/exec/datagen_operator.h"
 #include "pipeline/exec/distinct_streaming_aggregation_sink_operator.h"
 #include "pipeline/exec/empty_set_operator.h"
+#include "pipeline/exec/es_scan_operator.h"
 #include "pipeline/exec/exchange_sink_operator.h"
 #include "pipeline/exec/exchange_source_operator.h"
 #include "pipeline/exec/hashjoin_build_sink.h"
@@ -393,6 +394,7 @@ DECLARE_OPERATOR_X(PartitionSortSinkLocalState)
 #define DECLARE_OPERATOR_X(LOCAL_STATE) template class OperatorX<LOCAL_STATE>;
 DECLARE_OPERATOR_X(HashJoinProbeLocalState)
 DECLARE_OPERATOR_X(OlapScanLocalState)
+DECLARE_OPERATOR_X(EsScanLocalState)
 DECLARE_OPERATOR_X(AnalyticLocalState)
 DECLARE_OPERATOR_X(SortLocalState)
 DECLARE_OPERATOR_X(AggLocalState)
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp 
b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index 1f7289aced..c2becd1683 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -51,6 +51,7 @@
 #include "pipeline/exec/distinct_streaming_aggregation_sink_operator.h"
 #include "pipeline/exec/distinct_streaming_aggregation_source_operator.h"
 #include "pipeline/exec/empty_set_operator.h"
+#include "pipeline/exec/es_scan_operator.h"
 #include "pipeline/exec/exchange_sink_operator.h"
 #include "pipeline/exec/exchange_source_operator.h"
 #include "pipeline/exec/hashjoin_build_sink.h"
@@ -87,16 +88,6 @@
 #include "util/telemetry/telemetry.h"
 #include "util/uid_util.h"
 #include "vec/common/assert_cast.h"
-#include "vec/exec/join/vhash_join_node.h"
-#include "vec/exec/scan/new_es_scan_node.h"
-#include "vec/exec/scan/new_file_scan_node.h"
-#include "vec/exec/scan/new_odbc_scan_node.h"
-#include "vec/exec/scan/new_olap_scan_node.h"
-#include "vec/exec/scan/vmeta_scan_node.h"
-#include "vec/exec/scan/vscan_node.h"
-#include "vec/exec/vaggregation_node.h"
-#include "vec/exec/vexchange_node.h"
-#include "vec/exec/vunion_node.h"
 #include "vec/runtime/vdata_stream_mgr.h"
 
 namespace doris::pipeline {
@@ -568,6 +559,12 @@ Status 
PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
         RETURN_IF_ERROR(cur_pipe->add_operator(op));
         break;
     }
+    case TPlanNodeType::ES_SCAN_NODE:
+    case TPlanNodeType::ES_HTTP_SCAN_NODE: {
+        op.reset(new EsScanOperatorX(pool, tnode, descs));
+        RETURN_IF_ERROR(cur_pipe->add_operator(op));
+        break;
+    }
     case TPlanNodeType::EXCHANGE_NODE: {
         int num_senders = find_with_default(request.per_exch_num_senders, 
tnode.node_id, 0);
         DCHECK_GT(num_senders, 0);
diff --git a/be/src/vec/exec/runtime_filter_consumer.h 
b/be/src/vec/exec/runtime_filter_consumer.h
index a6527fae62..ed7a097901 100644
--- a/be/src/vec/exec/runtime_filter_consumer.h
+++ b/be/src/vec/exec/runtime_filter_consumer.h
@@ -63,10 +63,9 @@ protected:
     std::vector<bool> _runtime_filter_ready_flag;
     doris::Mutex _rf_locks;
     phmap::flat_hash_set<VExprSPtr> _rf_vexpr_set;
-
-private:
     RuntimeState* _state;
 
+private:
     int32_t _filter_id;
 
     std::vector<TRuntimeFilterDesc> _runtime_filter_descs;
diff --git a/be/src/vec/exec/scan/new_es_scanner.cpp 
b/be/src/vec/exec/scan/new_es_scanner.cpp
index 0bd492b79c..867bbb67cc 100644
--- a/be/src/vec/exec/scan/new_es_scanner.cpp
+++ b/be/src/vec/exec/scan/new_es_scanner.cpp
@@ -58,6 +58,24 @@ NewEsScanner::NewEsScanner(RuntimeState* state, 
NewEsScanNode* parent, int64_t l
           _docvalue_context(docvalue_context),
           _doc_value_mode(doc_value_mode) {}
 
+// Constructor overload that takes a pipeline::ScanLocalStateBase* instead of a
+// NewEsScanNode* parent; state, local_state, limit and profile are forwarded to
+// VScanner. All status flags start false, and the tuple descriptor, ES reader and
+// scroll parser start null.
+NewEsScanner::NewEsScanner(RuntimeState* state, pipeline::ScanLocalStateBase* 
local_state,
+                           int64_t limit, TupleId tuple_id,
+                           const std::map<std::string, std::string>& 
properties,
+                           const std::map<std::string, std::string>& 
docvalue_context,
+                           bool doc_value_mode, RuntimeProfile* profile)
+        : VScanner(state, local_state, limit, profile),
+          _is_init(false),
+          _es_eof(false),
+          _properties(properties),
+          _line_eof(false),
+          _batch_eof(false),
+          _tuple_id(tuple_id),
+          _tuple_desc(nullptr),
+          _es_reader(nullptr),
+          _es_scroll_parser(nullptr),
+          _docvalue_context(docvalue_context),
+          _doc_value_mode(doc_value_mode) {}
+
 Status NewEsScanner::prepare(RuntimeState* state, const VExprContextSPtrs& 
conjuncts) {
     VLOG_CRITICAL << NEW_SCANNER_TYPE << "::prepare";
     RETURN_IF_ERROR(VScanner::prepare(_state, conjuncts));
diff --git a/be/src/vec/exec/scan/new_es_scanner.h 
b/be/src/vec/exec/scan/new_es_scanner.h
index 90b61344de..10ee1c438d 100644
--- a/be/src/vec/exec/scan/new_es_scanner.h
+++ b/be/src/vec/exec/scan/new_es_scanner.h
@@ -56,6 +56,11 @@ public:
                  const std::map<std::string, std::string>& docvalue_context, 
bool doc_value_mode,
                  RuntimeProfile* profile);
 
+    NewEsScanner(RuntimeState* state, pipeline::ScanLocalStateBase* 
local_state, int64_t limit,
+                 TupleId tuple_id, const std::map<std::string, std::string>& 
properties,
+                 const std::map<std::string, std::string>& docvalue_context, 
bool doc_value_mode,
+                 RuntimeProfile* profile);
+
     Status open(RuntimeState* state) override;
     Status close(RuntimeState* state) override;
 
diff --git a/regression-test/suites/external_table_p0/es/test_es_query.groovy 
b/regression-test/suites/external_table_p0/es/test_es_query.groovy
index a4e5d9705d..03b38cfc5f 100644
--- a/regression-test/suites/external_table_p0/es/test_es_query.groovy
+++ b/regression-test/suites/external_table_p0/es/test_es_query.groovy
@@ -16,7 +16,6 @@
 // under the License.
 
 suite("test_es_query", "p0,external,es,external_docker,external_docker_es") {
-
     String enabled = context.config.otherConfigs.get("enableEsTest")
     if (enabled != null && enabled.equalsIgnoreCase("true")) {
         String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
diff --git 
a/regression-test/suites/external_table_p0/es/test_es_query_nereids.groovy 
b/regression-test/suites/external_table_p0/es/test_es_query_nereids.groovy
index a292701200..70f7678d1d 100644
--- a/regression-test/suites/external_table_p0/es/test_es_query_nereids.groovy
+++ b/regression-test/suites/external_table_p0/es/test_es_query_nereids.groovy
@@ -16,7 +16,6 @@
 // under the License.
 
 suite("test_es_query_nereids", 
"p0,external,es,external_docker,external_docker_es") {
-
     String enabled = context.config.otherConfigs.get("enableEsTest")
     if (enabled != null && enabled.equalsIgnoreCase("true")) {
         String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
diff --git 
a/regression-test/suites/external_table_p0/es/test_es_query_no_http_url.groovy 
b/regression-test/suites/external_table_p0/es/test_es_query_no_http_url.groovy
index 7aad2f4fba..3f592e10c6 100644
--- 
a/regression-test/suites/external_table_p0/es/test_es_query_no_http_url.groovy
+++ 
b/regression-test/suites/external_table_p0/es/test_es_query_no_http_url.groovy
@@ -16,7 +16,6 @@
 // under the License.
 
 suite("test_es_query_no_http_url", 
"p0,external,es,external_docker,external_docker_es") {
-
     String enabled = context.config.otherConfigs.get("enableEsTest")
     if (enabled != null && enabled.equalsIgnoreCase("true")) {
         String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to