This is an automated email from the ASF dual-hosted git repository. panxiaolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 27eed937b3 [pipelineX](es scan) Support ES scan operator (#24824) 27eed937b3 is described below commit 27eed937b37f19e50f329a01e59759c620d4f056 Author: Gabriel <gabrielleeb...@gmail.com> AuthorDate: Sun Sep 24 00:32:38 2023 +0800 [pipelineX](es scan) Support ES scan operator (#24824) Support ES scan operator --- be/src/pipeline/exec/es_scan_operator.cpp | 159 +++++++++++++++++++++ be/src/pipeline/exec/es_scan_operator.h | 83 +++++++++++ be/src/pipeline/exec/scan_operator.cpp | 3 + be/src/pipeline/pipeline_x/operator.cpp | 2 + .../pipeline_x/pipeline_x_fragment_context.cpp | 17 +-- be/src/vec/exec/runtime_filter_consumer.h | 3 +- be/src/vec/exec/scan/new_es_scanner.cpp | 18 +++ be/src/vec/exec/scan/new_es_scanner.h | 5 + .../external_table_p0/es/test_es_query.groovy | 1 - .../es/test_es_query_nereids.groovy | 1 - .../es/test_es_query_no_http_url.groovy | 1 - 11 files changed, 278 insertions(+), 15 deletions(-) diff --git a/be/src/pipeline/exec/es_scan_operator.cpp b/be/src/pipeline/exec/es_scan_operator.cpp new file mode 100644 index 0000000000..e4b1c4956b --- /dev/null +++ b/be/src/pipeline/exec/es_scan_operator.cpp @@ -0,0 +1,159 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "pipeline/exec/es_scan_operator.h" + +#include "exec/es/es_scan_reader.h" +#include "exec/es/es_scroll_query.h" +#include "vec/exec/scan/new_es_scanner.h" + +namespace doris::pipeline { + +// Prefer to the local host +static std::string get_host_and_port(const std::vector<doris::TNetworkAddress>& es_hosts) { + std::string host_port; + std::string localhost = doris::BackendOptions::get_localhost(); + + doris::TNetworkAddress host = es_hosts[0]; + for (auto& es_host : es_hosts) { + if (es_host.hostname == localhost) { + host = es_host; + break; + } + } + + host_port = host.hostname; + host_port += ":"; + host_port += std::to_string(host.port); + return host_port; +} + +Status EsScanLocalState::_init_profile() { + RETURN_IF_ERROR(Base::_init_profile()); + _es_profile.reset(new RuntimeProfile("EsIterator")); + Base::_scanner_profile->add_child(_es_profile.get(), true, nullptr); + + _rows_read_counter = ADD_COUNTER(_es_profile, "RowsRead", TUnit::UNIT); + _read_timer = ADD_TIMER(_es_profile, "TotalRawReadTime(*)"); + _materialize_timer = ADD_TIMER(_es_profile, "MaterializeTupleTime(*)"); + return Status::OK(); +} + +Status EsScanLocalState::_process_conjuncts() { + RETURN_IF_ERROR(Base::_process_conjuncts()); + if (Base::_eos_dependency->read_blocked_by() == nullptr) { + return Status::OK(); + } + + CHECK(Base::_parent->cast<EsScanOperatorX>()._properties.find(ESScanReader::KEY_QUERY_DSL) != + Base::_parent->cast<EsScanOperatorX>()._properties.end()); + return Status::OK(); +} + +Status EsScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>* scanners) { + if (_scan_ranges.empty()) { + Base::_eos_dependency->set_ready_for_read(); + return Status::OK(); + } + + auto& p = Base::_parent->cast<EsScanOperatorX>(); + for (auto& es_scan_range : _scan_ranges) { + // Collect the information from scan range to properties + std::map<std::string, std::string> properties(p._properties); + properties[ESScanReader::KEY_INDEX] = es_scan_range->index; + if (es_scan_range->__isset.type) { + properties[ESScanReader::KEY_TYPE] = es_scan_range->type; + } + properties[ESScanReader::KEY_SHARD] = std::to_string(es_scan_range->shard_id); + properties[ESScanReader::KEY_BATCH_SIZE] = + std::to_string(vectorized::RuntimeFilterConsumer::_state->batch_size()); + properties[ESScanReader::KEY_HOST_PORT] = get_host_and_port(es_scan_range->es_hosts); + // push down limit to Elasticsearch + // if predicate in _conjunct_ctxs can not be processed by Elasticsearch, we can not push down limit operator to Elasticsearch + if (p.limit() != -1 && + p.limit() <= vectorized::RuntimeFilterConsumer::_state->batch_size()) { + properties[ESScanReader::KEY_TERMINATE_AFTER] = std::to_string(p.limit()); + } + + bool doc_value_mode = false; + properties[ESScanReader::KEY_QUERY] = ESScrollQueryBuilder::build( + properties, p._column_names, p._docvalue_context, &doc_value_mode); + + std::shared_ptr<vectorized::NewEsScanner> scanner = vectorized::NewEsScanner::create_shared( + vectorized::RuntimeFilterConsumer::_state, this, p._limit_per_scanner, p._tuple_id, + properties, p._docvalue_context, doc_value_mode, + vectorized::RuntimeFilterConsumer::_state->runtime_profile()); + + RETURN_IF_ERROR( + scanner->prepare(vectorized::RuntimeFilterConsumer::_state, Base::_conjuncts)); + scanners->push_back(scanner); + } + + return Status::OK(); +} + +void EsScanLocalState::set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) { + for (auto& es_scan_range : scan_ranges) { + DCHECK(es_scan_range.scan_range.__isset.es_scan_range); + _scan_ranges.emplace_back(new TEsScanRange(es_scan_range.scan_range.es_scan_range)); + } +} + +EsScanOperatorX::EsScanOperatorX(ObjectPool* pool, const TPlanNode& tnode, + const DescriptorTbl& descs) + : ScanOperatorX<EsScanLocalState>(pool, tnode, descs), + _tuple_id(tnode.es_scan_node.tuple_id), + _tuple_desc(nullptr) { + ScanOperatorX<EsScanLocalState>::_output_tuple_id = tnode.es_scan_node.tuple_id; +} + +Status EsScanOperatorX::init(const TPlanNode& tnode, RuntimeState* state) { + RETURN_IF_ERROR(ScanOperatorX<EsScanLocalState>::init(tnode, state)); + + // use TEsScanNode + _properties = tnode.es_scan_node.properties; + + if (tnode.es_scan_node.__isset.docvalue_context) { + _docvalue_context = tnode.es_scan_node.docvalue_context; + } + + if (tnode.es_scan_node.__isset.fields_context) { + _fields_context = tnode.es_scan_node.fields_context; + } + return Status::OK(); +} + +Status EsScanOperatorX::prepare(RuntimeState* state) { + RETURN_IF_ERROR(ScanOperatorX<EsScanLocalState>::prepare(state)); + + _tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id); + if (_tuple_desc == nullptr) { + return Status::InternalError("Failed to get tuple descriptor, _tuple_id=i{}", _tuple_id); + } + + // set up column name vector for ESScrollQueryBuilder + for (auto slot_desc : _tuple_desc->slots()) { + if (!slot_desc->is_materialized()) { + continue; + } + _column_names.push_back(slot_desc->col_name()); + } + + return Status::OK(); +} + +} // namespace doris::pipeline diff --git a/be/src/pipeline/exec/es_scan_operator.h b/be/src/pipeline/exec/es_scan_operator.h new file mode 100644 index 0000000000..96f53c02cd --- /dev/null +++ b/be/src/pipeline/exec/es_scan_operator.h @@ -0,0 +1,83 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <stdint.h> + +#include <string> + +#include "common/status.h" +#include "operator.h" +#include "pipeline/exec/scan_operator.h" +#include "pipeline/pipeline_x/operator.h" +#include "vec/exec/scan/vscan_node.h" + +namespace doris { +class ExecNode; + +namespace vectorized { +class NewOlapScanner; +} +} // namespace doris + +namespace doris::pipeline { + +class EsScanOperatorX; +class EsScanLocalState final : public ScanLocalState<EsScanLocalState> { +public: + using Parent = EsScanOperatorX; + using Base = ScanLocalState<EsScanLocalState>; + ENABLE_FACTORY_CREATOR(EsScanLocalState); + EsScanLocalState(RuntimeState* state, OperatorXBase* parent) : Base(state, parent) {} + +private: + friend class vectorized::NewOlapScanner; + + void set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) override; + Status _init_profile() override; + Status _process_conjuncts() override; + Status _init_scanners(std::list<vectorized::VScannerSPtr>* scanners) override; + + std::vector<std::unique_ptr<TEsScanRange>> _scan_ranges; + std::unique_ptr<RuntimeProfile> _es_profile; + RuntimeProfile::Counter* _rows_read_counter; + RuntimeProfile::Counter* _read_timer; + RuntimeProfile::Counter* _materialize_timer; +}; + +class EsScanOperatorX final : public ScanOperatorX<EsScanLocalState> { +public: + EsScanOperatorX(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs); + + Status init(const TPlanNode& tnode, RuntimeState* state) override; + Status prepare(RuntimeState* state) override; + +private: + friend class EsScanLocalState; + + TupleId _tuple_id; + TupleDescriptor* _tuple_desc; + + std::map<std::string, std::string> _properties; + std::map<std::string, std::string> _fields_context; + std::map<std::string, std::string> _docvalue_context; + + std::vector<std::string> _column_names; +}; + +} // namespace doris::pipeline diff --git a/be/src/pipeline/exec/scan_operator.cpp b/be/src/pipeline/exec/scan_operator.cpp index b372e2dab8..d67228848f 100644 --- a/be/src/pipeline/exec/scan_operator.cpp +++ b/be/src/pipeline/exec/scan_operator.cpp @@ -21,6 +21,7 @@ #include <memory> +#include "pipeline/exec/es_scan_operator.h" #include "pipeline/exec/olap_scan_operator.h" #include "pipeline/exec/operator.h" #include "vec/exec/runtime_filter_consumer.h" @@ -1432,5 +1433,7 @@ Status ScanOperatorX<LocalStateType>::get_block(RuntimeState* state, vectorized: template class ScanOperatorX<OlapScanLocalState>; template class ScanLocalState<OlapScanLocalState>; +template class ScanOperatorX<EsScanLocalState>; +template class ScanLocalState<EsScanLocalState>; } // namespace doris::pipeline diff --git a/be/src/pipeline/pipeline_x/operator.cpp b/be/src/pipeline/pipeline_x/operator.cpp index 8d228623c5..2937312976 100644 --- a/be/src/pipeline/pipeline_x/operator.cpp +++ b/be/src/pipeline/pipeline_x/operator.cpp @@ -28,6 +28,7 @@ #include "pipeline/exec/datagen_operator.h" #include "pipeline/exec/distinct_streaming_aggregation_sink_operator.h" #include "pipeline/exec/empty_set_operator.h" +#include "pipeline/exec/es_scan_operator.h" #include "pipeline/exec/exchange_sink_operator.h" #include "pipeline/exec/exchange_source_operator.h" #include "pipeline/exec/hashjoin_build_sink.h" @@ -393,6 +394,7 @@ DECLARE_OPERATOR_X(PartitionSortSinkLocalState) #define DECLARE_OPERATOR_X(LOCAL_STATE) template class OperatorX<LOCAL_STATE>; DECLARE_OPERATOR_X(HashJoinProbeLocalState) DECLARE_OPERATOR_X(OlapScanLocalState) +DECLARE_OPERATOR_X(EsScanLocalState) DECLARE_OPERATOR_X(AnalyticLocalState) DECLARE_OPERATOR_X(SortLocalState) DECLARE_OPERATOR_X(AggLocalState) diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp index 1f7289aced..c2becd1683 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp @@ -51,6 +51,7 @@ #include "pipeline/exec/distinct_streaming_aggregation_sink_operator.h" #include "pipeline/exec/distinct_streaming_aggregation_source_operator.h" #include "pipeline/exec/empty_set_operator.h" +#include "pipeline/exec/es_scan_operator.h" #include "pipeline/exec/exchange_sink_operator.h" #include "pipeline/exec/exchange_source_operator.h" #include "pipeline/exec/hashjoin_build_sink.h" @@ -87,16 +88,6 @@ #include "util/telemetry/telemetry.h" #include "util/uid_util.h" #include "vec/common/assert_cast.h" -#include "vec/exec/join/vhash_join_node.h" -#include "vec/exec/scan/new_es_scan_node.h" -#include "vec/exec/scan/new_file_scan_node.h" -#include "vec/exec/scan/new_odbc_scan_node.h" -#include "vec/exec/scan/new_olap_scan_node.h" -#include "vec/exec/scan/vmeta_scan_node.h" -#include "vec/exec/scan/vscan_node.h" -#include "vec/exec/vaggregation_node.h" -#include "vec/exec/vexchange_node.h" -#include "vec/exec/vunion_node.h" #include "vec/runtime/vdata_stream_mgr.h" namespace doris::pipeline { @@ -568,6 +559,12 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN RETURN_IF_ERROR(cur_pipe->add_operator(op)); break; } + case TPlanNodeType::ES_SCAN_NODE: + case TPlanNodeType::ES_HTTP_SCAN_NODE: { + op.reset(new EsScanOperatorX(pool, tnode, descs)); + RETURN_IF_ERROR(cur_pipe->add_operator(op)); + break; + } case TPlanNodeType::EXCHANGE_NODE: { int num_senders = find_with_default(request.per_exch_num_senders, tnode.node_id, 0); DCHECK_GT(num_senders, 0); diff --git a/be/src/vec/exec/runtime_filter_consumer.h b/be/src/vec/exec/runtime_filter_consumer.h index a6527fae62..ed7a097901 100644 --- a/be/src/vec/exec/runtime_filter_consumer.h +++ b/be/src/vec/exec/runtime_filter_consumer.h @@ -63,10 +63,9 @@ protected: std::vector<bool> _runtime_filter_ready_flag; doris::Mutex _rf_locks; phmap::flat_hash_set<VExprSPtr> _rf_vexpr_set; - -private: RuntimeState* _state; +private: int32_t _filter_id; std::vector<TRuntimeFilterDesc> _runtime_filter_descs; diff --git a/be/src/vec/exec/scan/new_es_scanner.cpp b/be/src/vec/exec/scan/new_es_scanner.cpp index 0bd492b79c..867bbb67cc 100644 --- a/be/src/vec/exec/scan/new_es_scanner.cpp +++ b/be/src/vec/exec/scan/new_es_scanner.cpp @@ -58,6 +58,24 @@ NewEsScanner::NewEsScanner(RuntimeState* state, NewEsScanNode* parent, int64_t l _docvalue_context(docvalue_context), _doc_value_mode(doc_value_mode) {} +NewEsScanner::NewEsScanner(RuntimeState* state, pipeline::ScanLocalStateBase* local_state, + int64_t limit, TupleId tuple_id, + const std::map<std::string, std::string>& properties, + const std::map<std::string, std::string>& docvalue_context, + bool doc_value_mode, RuntimeProfile* profile) + : VScanner(state, local_state, limit, profile), + _is_init(false), + _es_eof(false), + _properties(properties), + _line_eof(false), + _batch_eof(false), + _tuple_id(tuple_id), + _tuple_desc(nullptr), + _es_reader(nullptr), + _es_scroll_parser(nullptr), + _docvalue_context(docvalue_context), + _doc_value_mode(doc_value_mode) {} + Status NewEsScanner::prepare(RuntimeState* state, const VExprContextSPtrs& conjuncts) { VLOG_CRITICAL << NEW_SCANNER_TYPE << "::prepare"; RETURN_IF_ERROR(VScanner::prepare(_state, conjuncts)); diff --git a/be/src/vec/exec/scan/new_es_scanner.h b/be/src/vec/exec/scan/new_es_scanner.h index 90b61344de..10ee1c438d 100644 --- a/be/src/vec/exec/scan/new_es_scanner.h +++ b/be/src/vec/exec/scan/new_es_scanner.h @@ -56,6 +56,11 @@ public: const std::map<std::string, std::string>& docvalue_context, bool doc_value_mode, RuntimeProfile* profile); + NewEsScanner(RuntimeState* state, pipeline::ScanLocalStateBase* local_state, int64_t limit, + TupleId tuple_id, const std::map<std::string, std::string>& properties, + const std::map<std::string, std::string>& docvalue_context, bool doc_value_mode, + RuntimeProfile* profile); + Status open(RuntimeState* state) override; Status close(RuntimeState* state) override; diff --git a/regression-test/suites/external_table_p0/es/test_es_query.groovy b/regression-test/suites/external_table_p0/es/test_es_query.groovy index a4e5d9705d..03b38cfc5f 100644 --- a/regression-test/suites/external_table_p0/es/test_es_query.groovy +++ b/regression-test/suites/external_table_p0/es/test_es_query.groovy @@ -16,7 +16,6 @@ // under the License. suite("test_es_query", "p0,external,es,external_docker,external_docker_es") { - String enabled = context.config.otherConfigs.get("enableEsTest") if (enabled != null && enabled.equalsIgnoreCase("true")) { String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") diff --git a/regression-test/suites/external_table_p0/es/test_es_query_nereids.groovy b/regression-test/suites/external_table_p0/es/test_es_query_nereids.groovy index a292701200..70f7678d1d 100644 --- a/regression-test/suites/external_table_p0/es/test_es_query_nereids.groovy +++ b/regression-test/suites/external_table_p0/es/test_es_query_nereids.groovy @@ -16,7 +16,6 @@ // under the License. suite("test_es_query_nereids", "p0,external,es,external_docker,external_docker_es") { - String enabled = context.config.otherConfigs.get("enableEsTest") if (enabled != null && enabled.equalsIgnoreCase("true")) { String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") diff --git a/regression-test/suites/external_table_p0/es/test_es_query_no_http_url.groovy b/regression-test/suites/external_table_p0/es/test_es_query_no_http_url.groovy index 7aad2f4fba..3f592e10c6 100644 --- a/regression-test/suites/external_table_p0/es/test_es_query_no_http_url.groovy +++ b/regression-test/suites/external_table_p0/es/test_es_query_no_http_url.groovy @@ -16,7 +16,6 @@ // under the License. suite("test_es_query_no_http_url", "p0,external,es,external_docker,external_docker_es") { - String enabled = context.config.otherConfigs.get("enableEsTest") if (enabled != null && enabled.equalsIgnoreCase("true")) { String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org