This is an automated email from the ASF dual-hosted git repository. gabriellee pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 3894de49d2 [Enhancement](topn) support two phase read for topn query (#15642) 3894de49d2 is described below commit 3894de49d289aa531cc6f4f7bd161cc55dc48c5b Author: lihangyu <15605149...@163.com> AuthorDate: Thu Jan 19 10:01:33 2023 +0800 [Enhancement](topn) support two phase read for topn query (#15642) This PR optimizes TopN queries like `SELECT * FROM tableX ORDER BY columnA ASC/DESC LIMIT N`. A TopN query is composed of a SortNode and a ScanNode. When the user table is wide (e.g. 100+ columns), the ORDER BY clause usually involves just a few columns, but the ScanNode still needs to scan all data from the storage engine even if the limit is very small. This may lead to a lot of read amplification. So in this PR I divide a TopN query into two phases: 1. In the first phase we just need to read `columnA`'s data from the storage engine along with an extra RowId column called `__DORIS_ROWID_COL__`. The other columns are pruned from the ScanNode. 2. The second phase is placed in the ExchangeNode because it's the central node for the TopN nodes in the cluster. The ExchangeNode will issue an RPC to the other nodes using the RowIds (sorted and limited by the SortNode) read in the first phase, and read the rows one by one from the storage engine. 
After the second phase read, Block will contain all the data needed for the query --- be/src/common/config.h | 1 + be/src/common/consts.h | 1 + be/src/exec/CMakeLists.txt | 1 + be/src/exec/rowid_fetcher.cpp | 131 ++++++++++++++++++++ be/src/{common/consts.h => exec/rowid_fetcher.h} | 33 +++-- be/src/exec/tablet_info.h | 2 + be/src/exprs/runtime_filter.h | 2 +- be/src/olap/iterators.h | 2 + be/src/olap/rowset/beta_rowset_reader.cpp | 2 + be/src/olap/rowset/segment_v2/column_reader.h | 50 ++++++++ be/src/olap/rowset/segment_v2/segment.h | 2 + be/src/olap/rowset/segment_v2/segment_iterator.cpp | 8 ++ be/src/olap/schema.h | 9 ++ be/src/olap/tablet.cpp | 14 +++ be/src/olap/tablet.h | 2 + be/src/olap/tablet_schema.h | 5 + be/src/olap/utils.h | 7 ++ be/src/runtime/descriptors.cpp | 15 ++- be/src/runtime/descriptors.h | 9 ++ be/src/runtime/query_fragments_ctx.h | 1 - be/src/service/internal_service.cpp | 133 +++++++++++++++++++++ be/src/service/internal_service.h | 3 + be/src/vec/common/sort/heap_sorter.cpp | 3 + be/src/vec/common/sort/sorter.cpp | 3 + be/src/vec/common/sort/sorter.h | 7 +- be/src/vec/core/block.cpp | 12 +- be/src/vec/core/block.h | 6 +- be/src/vec/exec/scan/new_olap_scanner.cpp | 20 +++- be/src/vec/exec/scan/scanner_context.cpp | 6 +- be/src/vec/exec/scan/vscanner.cpp | 7 +- be/src/vec/exec/vexchange_node.cpp | 32 ++++- be/src/vec/exec/vexchange_node.h | 9 ++ be/src/vec/exec/vsort_node.cpp | 2 - be/src/vec/exprs/vslot_ref.cpp | 7 +- be/src/vec/utils/util.hpp | 12 +- .../main/java/org/apache/doris/common/Config.java | 3 + .../java/org/apache/doris/analysis/Analyzer.java | 7 ++ .../java/org/apache/doris/analysis/SelectStmt.java | 96 ++++++++++++++- .../org/apache/doris/analysis/SlotDescriptor.java | 20 +++- .../java/org/apache/doris/analysis/SlotRef.java | 13 ++ .../java/org/apache/doris/analysis/SortInfo.java | 18 +++ .../main/java/org/apache/doris/catalog/Column.java | 1 + .../org/apache/doris/planner/ExchangeNode.java | 28 ++++- 
.../org/apache/doris/planner/OlapScanNode.java | 3 + .../org/apache/doris/planner/OriginalPlanner.java | 54 +++++++++ .../java/org/apache/doris/planner/PlanNode.java | 3 - .../java/org/apache/doris/planner/SortNode.java | 4 + .../java/org/apache/doris/qe/SessionVariable.java | 11 ++ gensrc/proto/descriptors.proto | 2 + gensrc/proto/internal_service.proto | 20 ++++ gensrc/thrift/Descriptors.thrift | 4 + gensrc/thrift/Exprs.thrift | 1 + gensrc/thrift/PlanNodes.thrift | 4 + 53 files changed, 807 insertions(+), 44 deletions(-) diff --git a/be/src/common/config.h b/be/src/common/config.h index 06bae98e2d..86bfd1981e 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -178,6 +178,7 @@ CONF_Int32(doris_scanner_thread_pool_thread_num, "48"); CONF_Int32(doris_scanner_thread_pool_queue_size, "102400"); // default thrift client connect timeout(in seconds) CONF_mInt32(thrift_connect_timeout_seconds, "3"); +CONF_mInt32(fetch_rpc_timeout_seconds, "20"); // default thrift client retry interval (in milliseconds) CONF_mInt64(thrift_client_retry_interval_ms, "1000"); // max row count number for single scan range, used in segmentv1 diff --git a/be/src/common/consts.h b/be/src/common/consts.h index 4e2045a1ce..3966a88346 100644 --- a/be/src/common/consts.h +++ b/be/src/common/consts.h @@ -25,6 +25,7 @@ const std::string CSV = "csv"; const std::string CSV_WITH_NAMES = "csv_with_names"; const std::string CSV_WITH_NAMES_AND_TYPES = "csv_with_names_and_types"; const std::string BLOCK_TEMP_COLUMN_PREFIX = "__TEMP__"; +const std::string ROWID_COL = "__DORIS_ROWID_COL__"; constexpr int MAX_DECIMAL32_PRECISION = 9; constexpr int MAX_DECIMAL64_PRECISION = 18; diff --git a/be/src/exec/CMakeLists.txt b/be/src/exec/CMakeLists.txt index 678b865b1c..a36857d414 100644 --- a/be/src/exec/CMakeLists.txt +++ b/be/src/exec/CMakeLists.txt @@ -63,6 +63,7 @@ set(EXEC_FILES odbc_connector.cpp table_connector.cpp schema_scanner.cpp + rowid_fetcher.cpp ) if (WITH_LZO) diff --git 
a/be/src/exec/rowid_fetcher.cpp b/be/src/exec/rowid_fetcher.cpp new file mode 100644 index 0000000000..a9a326c0e3 --- /dev/null +++ b/be/src/exec/rowid_fetcher.cpp @@ -0,0 +1,131 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "exec/rowid_fetcher.h" + +#include "bthread/countdown_event.h" +#include "exec/tablet_info.h" // DorisNodesInfo +#include "gen_cpp/Types_types.h" +#include "gen_cpp/internal_service.pb.h" +#include "runtime/exec_env.h" // ExecEnv +#include "runtime/runtime_state.h" // RuntimeState +#include "util/brpc_client_cache.h" // BrpcClientCache +#include "util/defer_op.h" +#include "vec/core/block.h" // Block + +namespace doris { + +Status RowIDFetcher::init(DorisNodesInfo* nodes_info) { + for (auto [node_id, node_info] : nodes_info->nodes_info()) { + auto client = ExecEnv::GetInstance()->brpc_internal_client_cache()->get_client( + node_info.host, node_info.brpc_port); + if (!client) { + LOG(WARNING) << "Get rpc stub failed, host=" << node_info.host + << ", port=" << node_info.brpc_port; + return Status::InternalError("RowIDFetcher failed to init rpc client"); + } + _stubs.push_back(client); + } + return Status::OK(); +} + +static std::string format_rowid(const GlobalRowLoacation& 
location) { + return fmt::format("{} {} {} {}", location.tablet_id, + location.row_location.rowset_id.to_string(), + location.row_location.segment_id, location.row_location.row_id); +} + +PMultiGetRequest RowIDFetcher::_init_fetch_request(const vectorized::ColumnString& row_ids) { + PMultiGetRequest mget_req; + _tuple_desc->to_protobuf(mget_req.mutable_desc()); + for (auto slot : _tuple_desc->slots()) { + slot->to_protobuf(mget_req.add_slots()); + } + for (size_t i = 0; i < row_ids.size(); ++i) { + PMultiGetRequest::RowId row_id; + StringRef row_id_rep = row_ids.get_data_at(i); + auto location = reinterpret_cast<const GlobalRowLoacation*>(row_id_rep.data); + row_id.set_tablet_id(location->tablet_id); + row_id.set_rowset_id(location->row_location.rowset_id.to_string()); + row_id.set_segment_id(location->row_location.segment_id); + row_id.set_ordinal_id(location->row_location.row_id); + *mget_req.add_rowids() = std::move(row_id); + } + mget_req.set_be_exec_version(_st->be_exec_version()); + return mget_req; +} + +static void fetch_callback(bthread::CountdownEvent* counter) { + Defer __defer([&] { counter->signal(); }); +} + +static Status MergeRPCResults(const std::vector<PMultiGetResponse>& rsps, + const std::vector<brpc::Controller>& cntls, + vectorized::MutableBlock* output_block) { + for (const auto& cntl : cntls) { + if (cntl.Failed()) { + LOG(WARNING) << "Failed to fetch meet rpc error:" << cntl.ErrorText() + << ", host:" << cntl.remote_side(); + return Status::InternalError(cntl.ErrorText()); + } + } + for (const auto& resp : rsps) { + Status st(resp.status()); + if (!st.ok()) { + LOG(WARNING) << "Failed to fetch " << st.to_string(); + return st; + } + vectorized::Block partial_block(resp.block()); + output_block->merge(partial_block); + } + return Status::OK(); +} + +Status RowIDFetcher::fetch(const vectorized::ColumnPtr& row_ids, + vectorized::MutableBlock* res_block) { + CHECK(!_stubs.empty()); + res_block->clear_column_data(); + vectorized::MutableBlock 
mblock({_tuple_desc}, row_ids->size()); + PMultiGetRequest mget_req = _init_fetch_request(assert_cast<const vectorized::ColumnString&>( + *vectorized::remove_nullable(row_ids).get())); + std::vector<PMultiGetResponse> resps(_stubs.size()); + std::vector<brpc::Controller> cntls(_stubs.size()); + bthread::CountdownEvent counter(_stubs.size()); + for (size_t i = 0; i < _stubs.size(); ++i) { + cntls[i].set_timeout_ms(config::fetch_rpc_timeout_seconds * 1000); + auto callback = brpc::NewCallback(fetch_callback, &counter); + _stubs[i]->multiget_data(&cntls[i], &mget_req, &resps[i], callback); + } + counter.wait(); + RETURN_IF_ERROR(MergeRPCResults(resps, cntls, &mblock)); + // final sort by row_ids sequence, since row_ids is already sorted + vectorized::Block tmp = mblock.to_block(); + std::unordered_map<std::string, uint32_t> row_order; + vectorized::ColumnPtr row_id_column = tmp.get_columns().back(); + for (size_t x = 0; x < row_id_column->size(); ++x) { + auto location = + reinterpret_cast<const GlobalRowLoacation*>(row_id_column->get_data_at(x).data); + row_order[format_rowid(*location)] = x; + } + for (size_t x = 0; x < row_ids->size(); ++x) { + auto location = reinterpret_cast<const GlobalRowLoacation*>(row_ids->get_data_at(x).data); + res_block->add_row(&tmp, row_order[format_rowid(*location)]); + } + return Status::OK(); +} + +} // namespace doris diff --git a/be/src/common/consts.h b/be/src/exec/rowid_fetcher.h similarity index 56% copy from be/src/common/consts.h copy to be/src/exec/rowid_fetcher.h index 4e2045a1ce..57101c5033 100644 --- a/be/src/common/consts.h +++ b/be/src/exec/rowid_fetcher.h @@ -17,17 +17,28 @@ #pragma once -#include <string> +#include "gen_cpp/internal_service.pb.h" +#include "vec/core/block.h" namespace doris { -namespace BeConsts { -const std::string CSV = "csv"; -const std::string CSV_WITH_NAMES = "csv_with_names"; -const std::string CSV_WITH_NAMES_AND_TYPES = "csv_with_names_and_types"; -const std::string BLOCK_TEMP_COLUMN_PREFIX = 
"__TEMP__"; - -constexpr int MAX_DECIMAL32_PRECISION = 9; -constexpr int MAX_DECIMAL64_PRECISION = 18; -constexpr int MAX_DECIMAL128_PRECISION = 38; -} // namespace BeConsts + +class DorisNodesInfo; +class RuntimeState; + +// fetch rows by global rowid +// tablet_id/rowset_name/segment_id/ordinal_id +class RowIDFetcher { +public: + RowIDFetcher(TupleDescriptor* desc, RuntimeState* st) : _tuple_desc(desc), _st(st) {} + Status init(DorisNodesInfo* nodes_info); + Status fetch(const vectorized::ColumnPtr& row_ids, vectorized::MutableBlock* block); + +private: + PMultiGetRequest _init_fetch_request(const vectorized::ColumnString& row_ids); + + std::vector<std::shared_ptr<PBackendService_Stub>> _stubs; + TupleDescriptor* _tuple_desc; + RuntimeState* _st; +}; + } // namespace doris diff --git a/be/src/exec/tablet_info.h b/be/src/exec/tablet_info.h index 3e955b9b6b..76d4fa8094 100644 --- a/be/src/exec/tablet_info.h +++ b/be/src/exec/tablet_info.h @@ -249,6 +249,8 @@ public: return nullptr; } + const std::unordered_map<int64_t, NodeInfo>& nodes_info() { return _nodes; } + private: std::unordered_map<int64_t, NodeInfo> _nodes; }; diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h index bc818982b0..9142ce0557 100644 --- a/be/src/exprs/runtime_filter.h +++ b/be/src/exprs/runtime_filter.h @@ -29,7 +29,7 @@ class IOBufAsZeroCopyInputStream; namespace doris { class Predicate; class ObjectPool; -class RuntimeState; +class ExprContext; class RuntimePredicateWrapper; class MemTracker; class TupleRow; diff --git a/be/src/olap/iterators.h b/be/src/olap/iterators.h index d564b2b238..db1580d06d 100644 --- a/be/src/olap/iterators.h +++ b/be/src/olap/iterators.h @@ -119,6 +119,8 @@ public: vectorized::VExpr* remaining_vconjunct_root = nullptr; // runtime state RuntimeState* runtime_state = nullptr; + RowsetId rowset_id; + int32_t tablet_id = 0; }; class RowwiseIterator { diff --git a/be/src/olap/rowset/beta_rowset_reader.cpp 
b/be/src/olap/rowset/beta_rowset_reader.cpp index c56345eacb..6612fff758 100644 --- a/be/src/olap/rowset/beta_rowset_reader.cpp +++ b/be/src/olap/rowset/beta_rowset_reader.cpp @@ -64,6 +64,8 @@ Status BetaRowsetReader::get_segment_iterators(RowsetReaderContext* read_context _read_options.stats = _stats; _read_options.push_down_agg_type_opt = _context->push_down_agg_type_opt; _read_options.remaining_vconjunct_root = _context->remaining_vconjunct_root; + _read_options.rowset_id = _rowset->rowset_id(); + _read_options.tablet_id = _rowset->rowset_meta()->tablet_id(); if (read_context->lower_bound_keys != nullptr) { for (int i = 0; i < read_context->lower_bound_keys->size(); ++i) { _read_options.key_ranges.emplace_back(&read_context->lower_bound_keys->at(i), diff --git a/be/src/olap/rowset/segment_v2/column_reader.h b/be/src/olap/rowset/segment_v2/column_reader.h index 1cbcb085af..2d57c33430 100644 --- a/be/src/olap/rowset/segment_v2/column_reader.h +++ b/be/src/olap/rowset/segment_v2/column_reader.h @@ -417,6 +417,56 @@ private: vectorized::ColumnArray::ColumnOffsets& column_offsets); }; +class RowIdColumnIterator : public ColumnIterator { +public: + RowIdColumnIterator() = delete; + RowIdColumnIterator(int32_t tid, RowsetId rid, int32_t segid) + : _tablet_id(tid), _rowset_id(rid), _segment_id(segid) {} + + Status seek_to_first() override { + _current_rowid = 0; + return Status::OK(); + } + + Status seek_to_ordinal(ordinal_t ord_idx) override { + _current_rowid = ord_idx; + return Status::OK(); + } + + Status next_batch(size_t* n, vectorized::MutableColumnPtr& dst) { + bool has_null; + return next_batch(n, dst, &has_null); + } + + Status next_batch(size_t* n, vectorized::MutableColumnPtr& dst, bool* has_null) override { + for (size_t i = 0; i < *n; ++i) { + rowid_t row_id = _current_rowid + i; + GlobalRowLoacation location(_tablet_id, _rowset_id, _segment_id, row_id); + dst->insert_data(reinterpret_cast<const char*>(&location), sizeof(GlobalRowLoacation)); + } + 
_current_rowid += *n; + return Status::OK(); + } + + Status read_by_rowids(const rowid_t* rowids, const size_t count, + vectorized::MutableColumnPtr& dst) override { + for (size_t i = 0; i < count; ++i) { + rowid_t row_id = rowids[i]; + GlobalRowLoacation location(_tablet_id, _rowset_id, _segment_id, row_id); + dst->insert_data(reinterpret_cast<const char*>(&location), sizeof(GlobalRowLoacation)); + } + return Status::OK(); + } + + ordinal_t get_current_ordinal() const override { return _current_rowid; } + +private: + rowid_t _current_rowid = 0; + int32_t _tablet_id = 0; + RowsetId _rowset_id; + int32_t _segment_id = 0; +}; + // This iterator is used to read default value column class DefaultValueColumnIterator : public ColumnIterator { public: diff --git a/be/src/olap/rowset/segment_v2/segment.h b/be/src/olap/rowset/segment_v2/segment.h index 1ec4943cf2..47a1ebd176 100644 --- a/be/src/olap/rowset/segment_v2/segment.h +++ b/be/src/olap/rowset/segment_v2/segment.h @@ -112,6 +112,8 @@ public: return _footer.primary_key_index_meta().max_key(); }; + io::FileReaderSPtr file_reader() { return _file_reader; } + private: DISALLOW_COPY_AND_ASSIGN(Segment); Segment(uint32_t segment_id, RowsetId rowset_id, TabletSchemaSPtr tablet_schema); diff --git a/be/src/olap/rowset/segment_v2/segment_iterator.cpp b/be/src/olap/rowset/segment_v2/segment_iterator.cpp index 659b49b6fe..7d3bde6c5d 100644 --- a/be/src/olap/rowset/segment_v2/segment_iterator.cpp +++ b/be/src/olap/rowset/segment_v2/segment_iterator.cpp @@ -179,6 +179,9 @@ Status SegmentIterator::init(const StorageReadOptions& opts) { _remaining_vconjunct_root = opts.remaining_vconjunct_root; _column_predicate_info.reset(new ColumnPredicateInfo()); + if (_schema.rowid_col_idx() > 0) { + _opts.record_rowids = true; + } return Status::OK(); } @@ -688,6 +691,11 @@ Status SegmentIterator::_init_return_column_iterators() { } for (auto cid : _schema.column_ids()) { int32_t unique_id = _opts.tablet_schema->column(cid).unique_id(); + if 
(_opts.tablet_schema->column(cid).name() == BeConsts::ROWID_COL) { + _column_iterators[unique_id] = + new RowIdColumnIterator(_opts.tablet_id, _opts.rowset_id, _segment->id()); + continue; + } if (_column_iterators.count(unique_id) < 1) { RETURN_IF_ERROR(_segment->new_column_iterator(_opts.tablet_schema->column(cid), &_column_iterators[unique_id])); diff --git a/be/src/olap/schema.h b/be/src/olap/schema.h index 4b06aed2b2..de639dc164 100644 --- a/be/src/olap/schema.h +++ b/be/src/olap/schema.h @@ -19,6 +19,7 @@ #include <vector> +#include "common/consts.h" #include "olap/aggregate_func.h" #include "olap/field.h" #include "olap/row_cursor_cell.h" @@ -52,6 +53,9 @@ public: if (column.is_key()) { ++num_key_columns; } + if (column.name() == BeConsts::ROWID_COL) { + _rowid_col_idx = cid; + } columns.push_back(column); } _delete_sign_idx = tablet_schema->delete_sign_idx(); @@ -72,6 +76,9 @@ public: if (columns[i].name() == DELETE_SIGN) { _delete_sign_idx = i; } + if (columns[i].name() == BeConsts::ROWID_COL) { + _rowid_col_idx = i; + } _unique_ids[i] = columns[i].unique_id(); } _init(columns, col_ids, num_key_columns); @@ -145,6 +152,7 @@ public: int32_t unique_id(size_t index) const { return _unique_ids[index]; } int32_t delete_sign_idx() const { return _delete_sign_idx; } bool has_sequence_col() const { return _has_sequence_col; } + int32_t rowid_col_idx() const { return _rowid_col_idx; }; private: void _init(const std::vector<TabletColumn>& cols, const std::vector<ColumnId>& col_ids, @@ -169,6 +177,7 @@ private: size_t _schema_size; int32_t _delete_sign_idx = -1; bool _has_sequence_col = false; + int32_t _rowid_col_idx = -1; }; } // namespace doris diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp index f6eda348f3..06d4b71ca1 100644 --- a/be/src/olap/tablet.cpp +++ b/be/src/olap/tablet.cpp @@ -260,6 +260,20 @@ Status Tablet::revise_tablet_meta(const std::vector<RowsetMetaSharedPtr>& rowset return res; } +RowsetSharedPtr Tablet::get_rowset(const RowsetId& 
rowset_id) { + for (auto& version_rowset : _rs_version_map) { + if (version_rowset.second->rowset_id() == rowset_id) { + return version_rowset.second; + } + } + for (auto& stale_version_rowset : _stale_rs_version_map) { + if (stale_version_rowset.second->rowset_id() == rowset_id) { + return stale_version_rowset.second; + } + } + return nullptr; +} + Status Tablet::add_rowset(RowsetSharedPtr rowset) { DCHECK(rowset != nullptr); std::lock_guard<std::shared_mutex> wrlock(_meta_lock); diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h index e10b9628ac..71151a6b84 100644 --- a/be/src/olap/tablet.h +++ b/be/src/olap/tablet.h @@ -344,6 +344,8 @@ public: int64_t start = -1); bool should_skip_compaction(CompactionType compaction_type, int64_t now); + RowsetSharedPtr get_rowset(const RowsetId& rowset_id); + private: Status _init_once_action(); void _print_missed_versions(const std::vector<Version>& missed_versions) const; diff --git a/be/src/olap/tablet_schema.h b/be/src/olap/tablet_schema.h index bbbbfc896f..0ffcb3167d 100644 --- a/be/src/olap/tablet_schema.h +++ b/be/src/olap/tablet_schema.h @@ -64,6 +64,11 @@ public: size_t length() const { return _length; } size_t index_length() const { return _index_length; } void set_index_length(size_t index_length) { _index_length = index_length; } + void set_type(FieldType type) { _type = type; } + void set_is_key(bool is_key) { _is_key = is_key; } + void set_is_nullable(bool is_nullable) { _is_nullable = is_nullable; } + void set_unique_id(int32_t unique_id) { _unique_id = unique_id; } + void set_has_default_value(bool has) { _has_default_value = has; } FieldAggregationMethod aggregation() const { return _aggregation; } vectorized::AggregateFunctionPtr get_aggregate_function(vectorized::DataTypes argument_types, std::string suffix) const; diff --git a/be/src/olap/utils.h b/be/src/olap/utils.h index 964b973569..9c1a20d767 100644 --- a/be/src/olap/utils.h +++ b/be/src/olap/utils.h @@ -310,4 +310,11 @@ struct RowLocation { 
uint32_t row_id; }; +struct GlobalRowLoacation { + GlobalRowLoacation(uint32_t tid, RowsetId rsid, uint32_t sid, uint32_t rid) + : tablet_id(tid), row_location(rsid, sid, rid) {}; + uint32_t tablet_id; + RowLocation row_location; +}; + } // namespace doris diff --git a/be/src/runtime/descriptors.cpp b/be/src/runtime/descriptors.cpp index 9eea456fb2..4272e9aa56 100644 --- a/be/src/runtime/descriptors.cpp +++ b/be/src/runtime/descriptors.cpp @@ -63,7 +63,9 @@ SlotDescriptor::SlotDescriptor(const TSlotDescriptor& tdesc) _slot_idx(tdesc.slotIdx), _slot_size(_type.get_slot_size()), _field_idx(-1), - _is_materialized(tdesc.isMaterialized) {} + _is_materialized(tdesc.isMaterialized), + _is_key(tdesc.is_key), + _need_materialize(tdesc.need_materialize) {} SlotDescriptor::SlotDescriptor(const PSlotDescriptor& pdesc) : _id(pdesc.id()), @@ -74,11 +76,13 @@ SlotDescriptor::SlotDescriptor(const PSlotDescriptor& pdesc) _null_indicator_offset(pdesc.null_indicator_byte(), pdesc.null_indicator_bit()), _col_name(pdesc.col_name()), _col_name_lower_case(to_lower(pdesc.col_name())), - _col_unique_id(-1), + _col_unique_id(pdesc.col_unique_id()), _slot_idx(pdesc.slot_idx()), _slot_size(_type.get_slot_size()), _field_idx(-1), - _is_materialized(pdesc.is_materialized()) {} + _is_materialized(pdesc.is_materialized()), + _is_key(pdesc.is_key()), + _need_materialize(true) {} void SlotDescriptor::to_protobuf(PSlotDescriptor* pslot) const { pslot->set_id(_id); @@ -92,6 +96,8 @@ void SlotDescriptor::to_protobuf(PSlotDescriptor* pslot) const { pslot->set_col_name(_col_name); pslot->set_slot_idx(_slot_idx); pslot->set_is_materialized(_is_materialized); + pslot->set_col_unique_id(_col_unique_id); + pslot->set_is_key(_is_key); } vectorized::MutableColumnPtr SlotDescriptor::get_empty_mutable_column() const { @@ -542,6 +548,9 @@ int RowDescriptor::get_column_id(int slot_id) const { int column_id_counter = 0; for (const auto tuple_desc : _tuple_desc_map) { for (const auto slot : tuple_desc->slots()) { 
+ if (!slot->need_materialize()) { + continue; + } if (slot->id() == slot_id) { return column_id_counter; } diff --git a/be/src/runtime/descriptors.h b/be/src/runtime/descriptors.h index c49eebb6db..86e58a39a9 100644 --- a/be/src/runtime/descriptors.h +++ b/be/src/runtime/descriptors.h @@ -51,6 +51,7 @@ class SchemaScanner; class OlapTableSchemaParam; class PTupleDescriptor; class PSlotDescriptor; +class PInternalServiceImpl; // Location information for null indicator bit for particular slot. // For non-nullable slots, the byte_offset will be 0 and the bit_mask will be 0. @@ -116,11 +117,15 @@ public: int32_t col_unique_id() const { return _col_unique_id; } + bool is_key() const { return _is_key; } + bool need_materialize() const { return _need_materialize; } + private: friend class DescriptorTbl; friend class TupleDescriptor; friend class SchemaScanner; friend class OlapTableSchemaParam; + friend class PInternalServiceImpl; const SlotId _id; const TypeDescriptor _type; @@ -147,6 +152,9 @@ private: const bool _is_materialized; + const bool _is_key; + const bool _need_materialize; + SlotDescriptor(const TSlotDescriptor& tdesc); SlotDescriptor(const PSlotDescriptor& pdesc); }; @@ -342,6 +350,7 @@ private: friend class DescriptorTbl; friend class SchemaScanner; friend class OlapTableSchemaParam; + friend class PInternalServiceImpl; const TupleId _id; TableDescriptor* _table_desc; diff --git a/be/src/runtime/query_fragments_ctx.h b/be/src/runtime/query_fragments_ctx.h index 4d480c9efe..a01c3924c0 100644 --- a/be/src/runtime/query_fragments_ctx.h +++ b/be/src/runtime/query_fragments_ctx.h @@ -165,7 +165,6 @@ private: std::atomic<bool> _is_cancelled {false}; std::shared_ptr<vectorized::SharedHashTableController> _shared_hash_table_controller; - vectorized::RuntimePredicate _runtime_predicate; }; diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index 86c6a0347d..7eafe18bda 100644 --- a/be/src/service/internal_service.cpp +++ 
b/be/src/service/internal_service.cpp @@ -22,10 +22,14 @@ #include <string> #include "common/config.h" +#include "common/consts.h" #include "gen_cpp/BackendService.h" #include "gen_cpp/internal_service.pb.h" #include "http/http_client.h" +#include "olap/rowset/beta_rowset.h" #include "olap/rowset/rowset_factory.h" +#include "olap/rowset/segment_v2/column_reader.h" +#include "olap/segment_loader.h" #include "olap/storage_engine.h" #include "olap/tablet.h" #include "runtime/buffer_control_block.h" @@ -39,6 +43,7 @@ #include "runtime/thread_context.h" #include "service/brpc.h" #include "util/brpc_client_cache.h" +#include "util/defer_op.h" #include "util/md5.h" #include "util/proto_util.h" #include "util/ref_count_closure.h" @@ -48,6 +53,8 @@ #include "util/telemetry/telemetry.h" #include "util/thrift_util.h" #include "util/uid_util.h" +#include "vec/core/block.h" +#include "vec/data_types/data_type_string.h" #include "vec/exec/format/csv/csv_reader.h" #include "vec/exec/format/generic_reader.h" #include "vec/exec/format/json/new_json_reader.h" @@ -949,4 +956,130 @@ void PInternalServiceImpl::response_slave_tablet_pull_rowset( Status::OK().to_protobuf(response->mutable_status()); } +static Status read_by_rowids( + std::pair<size_t, size_t> row_range_idx, const TupleDescriptor& desc, + const google::protobuf::RepeatedPtrField<PMultiGetRequest_RowId>& rowids, + vectorized::Block* sub_block) { + //read from row_range.first to row_range.second + for (size_t i = row_range_idx.first; i < row_range_idx.second; ++i) { + MonotonicStopWatch watch; + watch.start(); + auto row_id = rowids[i]; + TabletSharedPtr tablet = StorageEngine::instance()->tablet_manager()->get_tablet( + row_id.tablet_id(), true /*include deleted*/); + RowsetId rowset_id; + rowset_id.init(row_id.rowset_id()); + if (!tablet) { + continue; + } + BetaRowsetSharedPtr rowset = + std::static_pointer_cast<BetaRowset>(tablet->get_rowset(rowset_id)); + if (!rowset) { + LOG(INFO) << "no such rowset " << rowset_id; + 
continue; + } + const TabletSchemaSPtr tablet_schema = rowset->tablet_schema(); + VLOG_DEBUG << "get tablet schema column_num:" << tablet_schema->num_columns() + << ", version:" << tablet_schema->schema_version() + << ", cost(us):" << watch.elapsed_time() / 1000; + SegmentCacheHandle segment_cache; + RETURN_IF_ERROR(SegmentLoader::instance()->load_segments(rowset, &segment_cache, true)); + // find segment + auto it = std::find_if(segment_cache.get_segments().begin(), + segment_cache.get_segments().end(), + [&row_id](const segment_v2::SegmentSharedPtr& seg) { + return seg->id() == row_id.segment_id(); + }); + if (it == segment_cache.get_segments().end()) { + continue; + } + segment_v2::SegmentSharedPtr segment = *it; + for (int x = 0; x < desc.slots().size() - 1; ++x) { + int index = tablet_schema->field_index(desc.slots()[x]->col_unique_id()); + segment_v2::ColumnIterator* column_iterator = nullptr; + vectorized::MutableColumnPtr column = + sub_block->get_by_position(x).column->assume_mutable(); + if (index < 0) { + column->insert_default(); + continue; + } else { + RETURN_IF_ERROR(segment->new_column_iterator(tablet_schema->column(index), + &column_iterator)); + } + std::unique_ptr<segment_v2::ColumnIterator> ptr_guard(column_iterator); + segment_v2::ColumnIteratorOptions opt; + OlapReaderStatistics stats; + opt.file_reader = segment->file_reader().get(); + opt.stats = &stats; + opt.use_page_cache = !config::disable_storage_page_cache; + column_iterator->init(opt); + std::vector<segment_v2::rowid_t> rowids { + static_cast<segment_v2::rowid_t>(row_id.ordinal_id())}; + RETURN_IF_ERROR(column_iterator->read_by_rowids(rowids.data(), 1, column)); + } + LOG_EVERY_N(INFO, 100) << "multiget_data single_row, cost(us):" + << watch.elapsed_time() / 1000; + GlobalRowLoacation row_location(row_id.tablet_id(), rowset->rowset_id(), + row_id.segment_id(), row_id.ordinal_id()); + sub_block->get_columns().back()->assume_mutable()->insert_data( + reinterpret_cast<const 
char*>(&row_location), sizeof(GlobalRowLoacation)); + } + return Status::OK(); +} + +Status PInternalServiceImpl::_multi_get(const PMultiGetRequest* request, + PMultiGetResponse* response) { + TupleDescriptor desc(request->desc()); + std::vector<SlotDescriptor> slots; + slots.reserve(request->slots().size()); + for (const auto& pslot : request->slots()) { + slots.push_back(SlotDescriptor(pslot)); + desc.add_slot(&slots.back()); + } + assert(desc.slots().back()->col_name() == BeConsts::ROWID_COL); + vectorized::Block block(desc.slots(), request->rowids().size()); + RETURN_IF_ERROR( + read_by_rowids(std::pair {0, request->rowids_size()}, desc, request->rowids(), &block)); + std::vector<size_t> char_type_idx; + for (size_t i = 0; i < desc.slots().size(); i++) { + auto column_desc = desc.slots()[i]; + auto type_desc = column_desc->type(); + do { + if (type_desc.type == TYPE_CHAR) { + char_type_idx.emplace_back(i); + break; + } else if (type_desc.type != TYPE_ARRAY) { + break; + } + // for Array<Char> or Array<Array<Char>> + type_desc = type_desc.children[0]; + } while (true); + } + // shrink char_type suffix zero data + block.shrink_char_type_column_suffix_zero(char_type_idx); + VLOG_DEBUG << "dump block:" << block.dump_data(0, 10) + << ", be_exec_version:" << request->be_exec_version(); + + [[maybe_unused]] size_t compressed_size = 0; + [[maybe_unused]] size_t uncompressed_size = 0; + int be_exec_version = request->has_be_exec_version() ? 
request->be_exec_version() : 0; + RETURN_IF_ERROR(block.serialize(be_exec_version, response->mutable_block(), &uncompressed_size, + &compressed_size, segment_v2::CompressionTypePB::LZ4)); + return Status::OK(); +} + +void PInternalServiceImpl::multiget_data(google::protobuf::RpcController* controller, + const PMultiGetRequest* request, + PMultiGetResponse* response, + google::protobuf::Closure* done) { + // multi get data by rowid + MonotonicStopWatch watch; + watch.start(); + brpc::ClosureGuard closure_guard(done); + response->mutable_status()->set_status_code(0); + Status st = _multi_get(request, response); + st.to_protobuf(response->mutable_status()); + LOG(INFO) << "multiget_data finished, cost(us):" << watch.elapsed_time() / 1000; +} + } // namespace doris diff --git a/be/src/service/internal_service.h b/be/src/service/internal_service.h index 9b2a4db254..a591fbd65c 100644 --- a/be/src/service/internal_service.h +++ b/be/src/service/internal_service.h @@ -151,6 +151,8 @@ public: const PTabletWriteSlaveDoneRequest* request, PTabletWriteSlaveDoneResult* response, google::protobuf::Closure* done) override; + void multiget_data(google::protobuf::RpcController* controller, const PMultiGetRequest* request, + PMultiGetResponse* response, google::protobuf::Closure* done) override; private: Status _exec_plan_fragment(const std::string& s_request, PFragmentRequestVersion version, @@ -176,6 +178,7 @@ private: void _response_pull_slave_rowset(const std::string& remote_host, int64_t brpc_port, int64_t txn_id, int64_t tablet_id, int64_t node_id, bool is_succeed); + Status _multi_get(const PMultiGetRequest* request, PMultiGetResponse* response); private: ExecEnv* _exec_env; diff --git a/be/src/vec/common/sort/heap_sorter.cpp b/be/src/vec/common/sort/heap_sorter.cpp index 0ab1b81795..18bddaaf7a 100644 --- a/be/src/vec/common/sort/heap_sorter.cpp +++ b/be/src/vec/common/sort/heap_sorter.cpp @@ -45,6 +45,9 @@ Status HeapSorter::append_block(Block* block) { int i = 0; const 
auto& convert_nullable_flags = _vsort_exec_exprs.get_convert_nullable_flags(); for (auto column_id : valid_column_ids) { + if (column_id < 0) { + continue; + } if (convert_nullable_flags[i]) { auto column_ptr = make_nullable(block->get_by_position(column_id).column); new_block.insert({column_ptr, diff --git a/be/src/vec/common/sort/sorter.cpp b/be/src/vec/common/sort/sorter.cpp index 9bc0a1034c..225deffa2b 100644 --- a/be/src/vec/common/sort/sorter.cpp +++ b/be/src/vec/common/sort/sorter.cpp @@ -247,6 +247,9 @@ Status Sorter::partial_sort(Block& src_block, Block& dest_block) { int i = 0; const auto& convert_nullable_flags = _vsort_exec_exprs.get_convert_nullable_flags(); for (auto column_id : valid_column_ids) { + if (column_id < 0) { + continue; + } if (convert_nullable_flags[i]) { auto column_ptr = make_nullable(src_block.get_by_position(column_id).column); new_block.insert( diff --git a/be/src/vec/common/sort/sorter.h b/be/src/vec/common/sort/sorter.h index 697453fe5a..486e336e57 100644 --- a/be/src/vec/common/sort/sorter.h +++ b/be/src/vec/common/sort/sorter.h @@ -18,6 +18,7 @@ #pragma once #include <queue> +#include "common/consts.h" #include "common/status.h" #include "vec/common/sort/vsort_exec_exprs.h" #include "vec/core/block.h" @@ -34,7 +35,11 @@ class MergeSorterState { public: MergeSorterState(const RowDescriptor& row_desc, int64_t offset, int64_t limit, RuntimeState* state, RuntimeProfile* profile) - : unsorted_block_(new Block(VectorizedUtils::create_empty_block(row_desc))), + // create_empty_block should ignore invalid slots, unsorted_block + // should be same structure with arrival block from child node + // since block from child node may ignored these slots + : unsorted_block_(new Block( + VectorizedUtils::create_empty_block(row_desc, true /*ignore invalid slot*/))), offset_(offset), limit_(limit), profile_(profile) { diff --git a/be/src/vec/core/block.cpp b/be/src/vec/core/block.cpp index ebe46fdd0c..a23ecb2a75 100644 --- 
a/be/src/vec/core/block.cpp +++ b/be/src/vec/core/block.cpp @@ -54,8 +54,12 @@ Block::Block(const ColumnsWithTypeAndName& data_) : data {data_} { initialize_index_by_name(); } -Block::Block(const std::vector<SlotDescriptor*>& slots, size_t block_size) { +Block::Block(const std::vector<SlotDescriptor*>& slots, size_t block_size, + bool ignore_trivial_slot) { for (const auto slot_desc : slots) { + if (ignore_trivial_slot && !slot_desc->need_materialize()) { + continue; + } auto column_ptr = slot_desc->get_empty_mutable_column(); column_ptr->reserve(block_size); insert(ColumnWithTypeAndName(std::move(column_ptr), slot_desc->get_data_type_ptr(), @@ -919,9 +923,13 @@ void Block::deep_copy_slot(void* dst, MemPool* pool, const doris::TypeDescriptor } } -MutableBlock::MutableBlock(const std::vector<TupleDescriptor*>& tuple_descs, int reserve_size) { +MutableBlock::MutableBlock(const std::vector<TupleDescriptor*>& tuple_descs, int reserve_size, + bool ignore_trivial_slot) { for (auto tuple_desc : tuple_descs) { for (auto slot_desc : tuple_desc->slots()) { + if (ignore_trivial_slot && !slot_desc->need_materialize()) { + continue; + } _data_types.emplace_back(slot_desc->get_data_type_ptr()); _columns.emplace_back(_data_types.back()->create_column()); if (reserve_size != 0) { diff --git a/be/src/vec/core/block.h b/be/src/vec/core/block.h index 220a0ff361..4ebc018b5b 100644 --- a/be/src/vec/core/block.h +++ b/be/src/vec/core/block.h @@ -74,7 +74,8 @@ public: Block(std::initializer_list<ColumnWithTypeAndName> il); Block(const ColumnsWithTypeAndName& data_); Block(const PBlock& pblock); - Block(const std::vector<SlotDescriptor*>& slots, size_t block_size); + Block(const std::vector<SlotDescriptor*>& slots, size_t block_size, + bool ignore_trivial_slot = false); /// insert the column at the specified position void insert(size_t position, const ColumnWithTypeAndName& elem); @@ -391,7 +392,8 @@ public: MutableBlock() = default; ~MutableBlock() = default; - MutableBlock(const 
std::vector<TupleDescriptor*>& tuple_descs, int reserve_size = 0); + MutableBlock(const std::vector<TupleDescriptor*>& tuple_descs, int reserve_size = 0, + bool igore_trivial_slot = false); MutableBlock(Block* block) : _columns(block->mutate_columns()), _data_types(block->get_data_types()) {} diff --git a/be/src/vec/exec/scan/new_olap_scanner.cpp b/be/src/vec/exec/scan/new_olap_scanner.cpp index 049c5e2882..31991e3999 100644 --- a/be/src/vec/exec/scan/new_olap_scanner.cpp +++ b/be/src/vec/exec/scan/new_olap_scanner.cpp @@ -91,6 +91,22 @@ Status NewOlapScanner::prepare(const TPaloScanRange& scan_range, _tablet_schema->append_column(TabletColumn(column_desc)); } } + + { + if (_output_tuple_desc->slots().back()->col_name() == BeConsts::ROWID_COL) { + // inject ROWID_COL + TabletColumn rowid_column; + rowid_column.set_is_nullable(false); + rowid_column.set_name(BeConsts::ROWID_COL); + // avoid column reader init error + rowid_column.set_has_default_value(true); + // fake unique id + rowid_column.set_unique_id(INT32_MAX); + rowid_column.set_type(FieldType::OLAP_FIELD_TYPE_STRING); + _tablet_schema->append_column(rowid_column); + } + } + { std::shared_lock rdlock(_tablet->get_header_lock()); const RowsetSharedPtr rowset = _tablet->rowset_with_max_version(); @@ -333,7 +349,9 @@ Status NewOlapScanner::_init_return_columns() { if (!slot->is_materialized()) { continue; } - + if (!slot->need_materialize()) { + continue; + } int32_t index = slot->col_unique_id() >= 0 ? 
_tablet_schema->field_index(slot->col_unique_id()) : _tablet_schema->field_index(slot->col_name()); diff --git a/be/src/vec/exec/scan/scanner_context.cpp b/be/src/vec/exec/scan/scanner_context.cpp index 29b379ecfe..6ce31490d0 100644 --- a/be/src/vec/exec/scan/scanner_context.cpp +++ b/be/src/vec/exec/scan/scanner_context.cpp @@ -55,7 +55,8 @@ Status ScannerContext::init() { // So use _output_tuple_desc; int64_t free_blocks_memory_usage = 0; for (int i = 0; i < pre_alloc_block_count; ++i) { - auto block = new vectorized::Block(_output_tuple_desc->slots(), real_block_size); + auto block = new vectorized::Block(_output_tuple_desc->slots(), real_block_size, + true /*ignore invalid slots*/); free_blocks_memory_usage += block->allocated_bytes(); _free_blocks.emplace_back(block); } @@ -93,7 +94,8 @@ vectorized::Block* ScannerContext::get_free_block(bool* get_free_block) { *get_free_block = false; COUNTER_UPDATE(_parent->_newly_create_free_blocks_num, 1); - return new vectorized::Block(_real_tuple_desc->slots(), _state->batch_size()); + return new vectorized::Block(_real_tuple_desc->slots(), _state->batch_size(), + true /*ignore invalid slots*/); } void ScannerContext::return_free_block(vectorized::Block* block) { diff --git a/be/src/vec/exec/scan/vscanner.cpp b/be/src/vec/exec/scan/vscanner.cpp index a718687c6b..7a956ecdb5 100644 --- a/be/src/vec/exec/scan/vscanner.cpp +++ b/be/src/vec/exec/scan/vscanner.cpp @@ -40,6 +40,10 @@ Status VScanner::get_block(RuntimeState* state, Block* block, bool* eof) { int64_t raw_rows_threshold = raw_rows_read() + config::doris_scanner_row_num; if (!block->mem_reuse()) { for (const auto slot_desc : _output_tuple_desc->slots()) { + if (!slot_desc->need_materialize()) { + // should be ignore from reading + continue; + } block->insert(ColumnWithTypeAndName(slot_desc->get_empty_mutable_column(), slot_desc->get_data_type_ptr(), slot_desc->col_name())); @@ -80,8 +84,7 @@ Status VScanner::get_block(RuntimeState* state, Block* block, bool* eof) { 
Status VScanner::_filter_output_block(Block* block) { auto old_rows = block->rows(); - Status st = - VExprContext::filter_block(_vconjunct_ctx, block, _output_tuple_desc->slots().size()); + Status st = VExprContext::filter_block(_vconjunct_ctx, block, block->columns()); _counter.num_rows_unselected += old_rows - block->rows(); return st; } diff --git a/be/src/vec/exec/vexchange_node.cpp b/be/src/vec/exec/vexchange_node.cpp index 069caaf635..d31da2418c 100644 --- a/be/src/vec/exec/vexchange_node.cpp +++ b/be/src/vec/exec/vexchange_node.cpp @@ -17,12 +17,15 @@ #include "vec/exec/vexchange_node.h" +#include "common/consts.h" +#include "exec/rowid_fetcher.h" #include "pipeline/exec/exchange_source_operator.h" #include "pipeline/pipeline.h" #include "pipeline/pipeline_fragment_context.h" #include "runtime/exec_env.h" #include "runtime/runtime_state.h" #include "runtime/thread_context.h" +#include "util/defer_op.h" #include "vec/runtime/vdata_stream_mgr.h" #include "vec/runtime/vdata_stream_recvr.h" @@ -45,10 +48,15 @@ Status VExchangeNode::init(const TPlanNode& tnode, RuntimeState* state) { if (!_is_merging) { return Status::OK(); } - RETURN_IF_ERROR(_vsort_exec_exprs.init(tnode.exchange_node.sort_info, _pool)); _is_asc_order = tnode.exchange_node.sort_info.is_asc_order; _nulls_first = tnode.exchange_node.sort_info.nulls_first; + + if (tnode.exchange_node.__isset.nodes_info) { + _nodes_info = _pool->add(new DorisNodesInfo(tnode.exchange_node.nodes_info)); + } + _use_two_phase_read = tnode.exchange_node.sort_info.__isset.use_two_phase_read && + tnode.exchange_node.sort_info.use_two_phase_read; return Status::OK(); } @@ -87,6 +95,19 @@ Status VExchangeNode::open(RuntimeState* state) { return Status::OK(); } +Status VExchangeNode::_second_phase_fetch_data(RuntimeState* state, Block* final_block) { + auto row_id_col = final_block->get_by_position(final_block->columns() - 1); + auto tuple_desc = _row_descriptor.tuple_descriptors()[0]; + RowIDFetcher id_fetcher(tuple_desc, 
state); + RETURN_IF_ERROR(id_fetcher.init(_nodes_info)); + MutableBlock materialized_block(_row_descriptor.tuple_descriptors(), final_block->rows()); + // fetch will sort block by sequence of ROWID_COL + RETURN_IF_ERROR(id_fetcher.fetch(row_id_col.column, &materialized_block)); + // Notice swap may change the structure of final_block + final_block->swap(materialized_block.to_block()); + return Status::OK(); +} + Status VExchangeNode::get_next(RuntimeState* state, Block* block, bool* eos) { INIT_AND_SCOPE_GET_NEXT_SPAN(state->get_tracer(), _get_next_span, "VExchangeNode::get_next"); SCOPED_TIMER(runtime_profile()->total_time_counter()); @@ -97,6 +118,12 @@ Status VExchangeNode::get_next(RuntimeState* state, Block* block, bool* eos) { _is_ready = true; return Status::OK(); } + if (_use_two_phase_read) { + // Block structure may be changed by calling _second_phase_fetch_data() before. + // So we should clear block before _stream_recvr->get_next, since + // blocks in VSortedRunMerger may not compatible with this block. 
+ block->clear(); + } auto status = _stream_recvr->get_next(block, eos); if (block != nullptr) { if (!_is_merging) { @@ -119,6 +146,9 @@ Status VExchangeNode::get_next(RuntimeState* state, Block* block, bool* eos) { } COUNTER_SET(_rows_returned_counter, _num_rows_returned); } + if (_use_two_phase_read && block->rows() > 0) { + RETURN_IF_ERROR(_second_phase_fetch_data(state, block)); + } return status; } diff --git a/be/src/vec/exec/vexchange_node.h b/be/src/vec/exec/vexchange_node.h index 2c63e03a5c..1d2738fda0 100644 --- a/be/src/vec/exec/vexchange_node.h +++ b/be/src/vec/exec/vexchange_node.h @@ -20,6 +20,8 @@ #include <memory> #include "exec/exec_node.h" +#include "exec/tablet_info.h" // DorisNodesInfo +#include "runtime/descriptors.h" #include "vec/common/sort/vsort_exec_exprs.h" namespace doris { @@ -47,6 +49,9 @@ public: // Status collect_query_statistics(QueryStatistics* statistics) override; void set_num_senders(int num_senders) { _num_senders = num_senders; } + // final materializtion, used only in topn node + Status _second_phase_fetch_data(RuntimeState* state, Block* final_block); + private: int _num_senders; bool _is_merging; @@ -61,6 +66,10 @@ private: VSortExecExprs _vsort_exec_exprs; std::vector<bool> _is_asc_order; std::vector<bool> _nulls_first; + + // for fetch data by rowids + DorisNodesInfo* _nodes_info = nullptr; + bool _use_two_phase_read = false; }; } // namespace vectorized } // namespace doris diff --git a/be/src/vec/exec/vsort_node.cpp b/be/src/vec/exec/vsort_node.cpp index fd7c88af75..3744c349c6 100644 --- a/be/src/vec/exec/vsort_node.cpp +++ b/be/src/vec/exec/vsort_node.cpp @@ -82,7 +82,6 @@ Status VSortNode::init(const TPlanNode& tnode, RuntimeState* state) { } _sorter->init_profile(_runtime_profile.get()); - return Status::OK(); } @@ -127,7 +126,6 @@ Status VSortNode::sink(RuntimeState* state, vectorized::Block* input_block, bool old_top = std::move(new_top); } } - if (!_reuse_mem) { input_block->clear(); } diff --git 
a/be/src/vec/exprs/vslot_ref.cpp b/be/src/vec/exprs/vslot_ref.cpp index 18871ac773..2ba9c0b526 100644 --- a/be/src/vec/exprs/vslot_ref.cpp +++ b/be/src/vec/exprs/vslot_ref.cpp @@ -50,12 +50,17 @@ Status VSlotRef::prepare(doris::RuntimeState* state, const doris::RowDescriptor& if (slot_desc == nullptr) { return Status::InternalError("couldn't resolve slot descriptor {}", _slot_id); } + _column_name = &slot_desc->col_name(); + if (!slot_desc->need_materialize()) { + // slot should be ignored manually + _column_id = -1; + return Status::OK(); + } _column_id = desc.get_column_id(_slot_id); if (_column_id < 0) { LOG(INFO) << "VSlotRef - invalid slot id: " << _slot_id << " desc:" << desc.debug_string(); return Status::InternalError("VSlotRef - invalid slot id {}", _slot_id); } - _column_name = &slot_desc->col_name(); return Status::OK(); } diff --git a/be/src/vec/utils/util.hpp b/be/src/vec/utils/util.hpp index da8e3d19af..80574d0ae5 100644 --- a/be/src/vec/utils/util.hpp +++ b/be/src/vec/utils/util.hpp @@ -34,10 +34,14 @@ public: return create_columns_with_type_and_name(row_desc); } - static ColumnsWithTypeAndName create_columns_with_type_and_name(const RowDescriptor& row_desc) { + static ColumnsWithTypeAndName create_columns_with_type_and_name( + const RowDescriptor& row_desc, bool ignore_trivial_slot = false) { ColumnsWithTypeAndName columns_with_type_and_name; for (const auto& tuple_desc : row_desc.tuple_descriptors()) { for (const auto& slot_desc : tuple_desc->slots()) { + if (ignore_trivial_slot && !slot_desc->need_materialize()) { + continue; + } columns_with_type_and_name.emplace_back(nullptr, slot_desc->get_data_type_ptr(), slot_desc->col_name()); } @@ -45,10 +49,14 @@ public: return columns_with_type_and_name; } - static ColumnsWithTypeAndName create_empty_block(const RowDescriptor& row_desc) { + static ColumnsWithTypeAndName create_empty_block(const RowDescriptor& row_desc, + bool ignore_trivial_slot = false) { ColumnsWithTypeAndName 
columns_with_type_and_name; for (const auto& tuple_desc : row_desc.tuple_descriptors()) { for (const auto& slot_desc : tuple_desc->slots()) { + if (ignore_trivial_slot && !slot_desc->need_materialize()) { + continue; + } columns_with_type_and_name.emplace_back( slot_desc->get_data_type_ptr()->create_column(), slot_desc->get_data_type_ptr(), slot_desc->col_name()); diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index fa6d78c8ed..d9472377a3 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -1955,5 +1955,8 @@ public class Config extends ConfigBase { */ @ConfField(masterOnly = true) public static int hms_events_polling_interval_ms = 10000; + + @ConfField(mutable = false) + public static int topn_two_phase_limit_threshold = 512; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java index 151bb22717..c9884614f5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java @@ -551,6 +551,10 @@ public class Analyzer { isInlineView = inlineView; } + public boolean isInlineViewAnalyzer() { + return isInlineView; + } + public void setExplicitViewAlias(String alias) { explicitViewAlias = alias; } @@ -997,6 +1001,9 @@ public class Analyzer { result.setStats(srcSlotDesc.getStats()); result.setType(srcSlotDesc.getType()); result.setIsNullable(srcSlotDesc.getIsNullable()); + if (srcSlotDesc.getColumn() != null) { + result.setColumn(srcSlotDesc.getColumn()); + } // result.setItemTupleDesc(srcSlotDesc.getItemTupleDesc()); return result; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/SelectStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/SelectStmt.java index f4915a4e58..724d2effbe 
100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/SelectStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/SelectStmt.java @@ -27,6 +27,7 @@ import org.apache.doris.catalog.DatabaseIf; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.FunctionSet; import org.apache.doris.catalog.OlapTable; +import org.apache.doris.catalog.Table; import org.apache.doris.catalog.TableIf; import org.apache.doris.catalog.TableIf.TableType; import org.apache.doris.catalog.Type; @@ -557,7 +558,6 @@ public class SelectStmt extends QueryStmt { "cannot combine SELECT DISTINCT with analytic functions"); } } - whereClauseRewrite(); if (whereClause != null) { if (checkGroupingFn(whereClause)) { @@ -576,7 +576,6 @@ public class SelectStmt extends QueryStmt { } analyzer.registerConjuncts(whereClause, false, getTableRefIds()); } - createSortInfo(analyzer); if (sortInfo != null && CollectionUtils.isNotEmpty(sortInfo.getOrderingExprs())) { if (groupingInfo != null) { @@ -591,6 +590,33 @@ public class SelectStmt extends QueryStmt { analyzeAggregation(analyzer); createAnalyticInfo(analyzer); eliminatingSortNode(); + if (checkEnableTwoPhaseRead(analyzer)) { + // If optimize enabled, we try our best to read less columns from ScanNode, + // here we analyze conjunct exprs and ordering exprs before resultExprs, + // rest of resultExprs will be marked as `INVALID`, such columns will + // be prevent from reading from ScanNode.Those columns will be finally + // read by the second fetch phase + LOG.debug("two phase read optimize enabled"); + // Expr.analyze(resultExprs, analyzer); + Set<SlotRef> resultSlots = Sets.newHashSet(); + Set<SlotRef> orderingSlots = Sets.newHashSet(); + Set<SlotRef> conjuntSlots = Sets.newHashSet(); + TreeNode.collect(resultExprs, Predicates.instanceOf(SlotRef.class), resultSlots); + TreeNode.collect(sortInfo.getOrderingExprs(), Predicates.instanceOf(SlotRef.class), orderingSlots); + if (whereClause != null) { + 
whereClause.collect(SlotRef.class, conjuntSlots); + } + resultSlots.removeAll(orderingSlots); + resultSlots.removeAll(conjuntSlots); + // reset slots need to do fetch column + for (SlotRef slot : resultSlots) { + // invalid slots will be pruned from reading from ScanNode + slot.setInvalid(); + } + LOG.debug("resultsSlots {}", resultSlots); + LOG.debug("orderingSlots {}", orderingSlots); + LOG.debug("conjuntSlots {}", conjuntSlots); + } if (evaluateOrderBy) { createSortTupleInfo(analyzer); } @@ -615,6 +641,72 @@ public class SelectStmt extends QueryStmt { } } + // Check whether to enable the two-phase read optimization; if enabled, the query will be divided into two read phases: + // 1. read conjuncts columns and order by columns along with an extra RowId column from ScanNode + // 2. sort and filter data, and get final RowId column, spawn RPC to other BE to fetch final data + // 3. finally materialize all data + public boolean checkEnableTwoPhaseRead(Analyzer analyzer) { + // only vectorized mode and session opt variable enabled + if (ConnectContext.get() == null + || ConnectContext.get().getSessionVariable() == null + || !ConnectContext.get().getSessionVariable().enableVectorizedEngine + || !ConnectContext.get().getSessionVariable().enableTwoPhaseReadOpt) { + return false; + } + if (!evaluateOrderBy) { + // Need to evaluate order by; if the sort node was eliminated then this optimization + // could be useless + return false; + } + // Only handle the simplest `SELECT ... FROM <tbl> WHERE ... ORDER BY ...
LIMIT ...` query + if (getAggInfo() != null + || getHavingPred() != null + || getWithClause() != null) { + return false; + } + if (!analyzer.isRootAnalyzer()) { + // ensure no sub query + return false; + } + // If select stmt has inline view or this is an inline view query stmt analyze call + if (hasInlineView() || analyzer.isInlineViewAnalyzer()) { + return false; + } + // single olap table + List<TableRef> tblRefs = getTableRefs(); + if (tblRefs.size() != 1 || !(tblRefs.get(0) instanceof BaseTableRef)) { + return false; + } + TableRef tbl = tblRefs.get(0); + if (tbl.getTable().getType() != Table.TableType.OLAP) { + return false; + } + LOG.debug("table ref {}", tbl); + // Need enable light schema change, since opt rely on + // column_unique_id of each slot + OlapTable olapTable = (OlapTable) tbl.getTable(); + if (!olapTable.getEnableLightSchemaChange()) { + return false; + } + // Only TOPN query at present + if (getOrderByElements() == null + || !hasLimit() + || getLimit() == 0 + || getLimit() > ConnectContext.get().getSessionVariable().twoPhaseReadLimitThreshold) { + return false; + } + // Check order by exprs are all slot refs + // Rethink? 
implement more generic to support all exprs + LOG.debug("getOrderingExprs {}", sortInfo.getOrderingExprs()); + LOG.debug("getOrderByElements {}", getOrderByElements()); + for (OrderByElement orderby : getOrderByElements()) { + if (!(orderby.getExpr() instanceof SlotRef)) { + return false; + } + } + return true; + } + public List<TupleId> getTableRefIds() { List<TupleId> result = Lists.newArrayList(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/SlotDescriptor.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/SlotDescriptor.java index e4682a4586..79026840b3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/SlotDescriptor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/SlotDescriptor.java @@ -69,6 +69,9 @@ public class SlotDescriptor { private boolean isMultiRef; // used for load to get more information of varchar and decimal private Type originType; + // If set to false, then such slots will be ignored during + // materialize them.Used to optmize to read less data and less memory usage + private boolean needMaterialize = true; public SlotDescriptor(SlotId id, TupleDescriptor parent) { this.id = id; @@ -108,6 +111,14 @@ public class SlotDescriptor { return isAgg; } + public void setInvalid() { + this.needMaterialize = false; + } + + public boolean isInvalid() { + return !this.needMaterialize; + } + public void setIsAgg(boolean agg) { isAgg = agg; } @@ -255,6 +266,12 @@ public class SlotDescriptor { return sourceExprs; } + public int getUniqueId() { + if (column == null) { + return -1; + } + return column.getUniqueId(); + } /** * Initializes a slot by setting its source expression information @@ -301,10 +318,11 @@ public class SlotDescriptor { TSlotDescriptor tSlotDescriptor = new TSlotDescriptor(id.asInt(), parent.getId().asInt(), (originType != null ? originType.toThrift() : type.toThrift()), -1, byteOffset, nullIndicatorByte, nullIndicatorBit, ((column != null) ? 
column.getName() : ""), slotIdx, isMaterialized); - + tSlotDescriptor.setNeedMaterialize(needMaterialize); if (column != null) { LOG.debug("column name:{}, column unique id:{}", column.getName(), column.getUniqueId()); tSlotDescriptor.setColUniqueId(column.getUniqueId()); + tSlotDescriptor.setIsKey(column.isKey()); } return tSlotDescriptor; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/SlotRef.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/SlotRef.java index 85afb1f98e..3d16e9af1c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/SlotRef.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/SlotRef.java @@ -122,6 +122,14 @@ public class SlotRef extends Expr { return desc.getId(); } + public void setInvalid() { + this.desc.setInvalid(); + } + + public boolean isInvalid() { + return this.desc.isInvalid(); + } + public Column getColumn() { if (desc == null) { return null; @@ -289,6 +297,7 @@ public class SlotRef extends Expr { protected void toThrift(TExprNode msg) { msg.node_type = TExprNodeType.SLOT_REF; msg.slot_ref = new TSlotRef(desc.getId().asInt(), desc.getParent().getId().asInt()); + msg.slot_ref.setColUniqueId(desc.getUniqueId()); msg.setOutputColumn(outputColumn); } @@ -437,6 +446,10 @@ public class SlotRef extends Expr { this.label = label; } + public boolean hasCol() { + return this.col != null; + } + public String getColumnName() { return col; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/SortInfo.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/SortInfo.java index f090c2af80..7cdd2d0d0d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/SortInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/SortInfo.java @@ -63,6 +63,7 @@ public class SortInfo { // Input expressions materialized into sortTupleDesc_. One expr per slot in // sortTupleDesc_. 
private List<Expr> sortTupleSlotExprs; + private boolean useTwoPhaseRead = false; public SortInfo(List<Expr> orderingExprs, List<Boolean> isAscOrder, List<Boolean> nullsFirstParams) { @@ -145,6 +146,14 @@ public class SortInfo { sortTupleDesc = tupleDesc; } + public void setUseTwoPhaseRead() { + useTwoPhaseRead = true; + } + + public boolean useTwoPhaseRead() { + return useTwoPhaseRead; + } + public TupleDescriptor getSortTupleDescriptor() { return sortTupleDesc; } @@ -258,6 +267,7 @@ public class SortInfo { // Update the tuple descriptor used to materialize the input of the sort. setMaterializedTupleInfo(sortTupleDesc, sortTupleExprs); + LOG.debug("sortTupleDesc {}", sortTupleDesc); return substOrderBy; } @@ -285,6 +295,11 @@ public class SortInfo { SlotDescriptor materializedDesc = analyzer.addSlotDescriptor(sortTupleDesc); materializedDesc.initFromExpr(origOrderingExpr); materializedDesc.setIsMaterialized(true); + SlotRef origSlotRef = origOrderingExpr.getSrcSlotRef(); + LOG.debug("origOrderingExpr {}", origOrderingExpr); + if (origSlotRef != null) { + materializedDesc.setColumn(origSlotRef.getColumn()); + } SlotRef materializedRef = new SlotRef(materializedDesc); substOrderBy.put(origOrderingExpr, materializedRef); materializedOrderingExprs.add(origOrderingExpr); @@ -301,6 +316,9 @@ public class SortInfo { Expr.treesToThrift(orderingExprs), isAscOrder, nullsFirstParams); + if (useTwoPhaseRead) { + sortInfo.setUseTwoPhaseRead(true); + } return sortInfo; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java index adae901430..57fff2fb17 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java @@ -58,6 +58,7 @@ public class Column implements Writable, GsonPostProcessable { private static final Logger LOG = LogManager.getLogger(Column.class); public static final String DELETE_SIGN = 
"__DORIS_DELETE_SIGN__"; public static final String SEQUENCE_COL = "__DORIS_SEQUENCE_COL__"; + public static final String ROWID_COL = "__DORIS_ROWID_COL__"; private static final String COLUMN_ARRAY_CHILDREN = "item"; public static final int COLUMN_UNIQUE_ID_INIT_VALUE = -1; diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java index d613fd9102..b1d54ec909 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java @@ -21,19 +21,22 @@ package org.apache.doris.planner; import org.apache.doris.analysis.Analyzer; -import org.apache.doris.analysis.Expr; import org.apache.doris.analysis.SortInfo; import org.apache.doris.analysis.TupleDescriptor; import org.apache.doris.analysis.TupleId; +import org.apache.doris.catalog.Env; import org.apache.doris.common.UserException; import org.apache.doris.common.util.VectorizedUtil; import org.apache.doris.statistics.StatisticalType; import org.apache.doris.statistics.StatsRecursiveDerive; +import org.apache.doris.system.Backend; +import org.apache.doris.system.SystemInfoService; import org.apache.doris.thrift.TExchangeNode; import org.apache.doris.thrift.TExplainLevel; +import org.apache.doris.thrift.TNodeInfo; +import org.apache.doris.thrift.TPaloNodesInfo; import org.apache.doris.thrift.TPlanNode; import org.apache.doris.thrift.TPlanNodeType; -import org.apache.doris.thrift.TSortInfo; import com.google.common.base.MoreObjects; import com.google.common.base.MoreObjects.ToStringHelper; @@ -161,10 +164,10 @@ public class ExchangeNode extends PlanNode { msg.exchange_node.addToInputRowTuples(tid.asInt()); } if (mergeInfo != null) { - TSortInfo sortInfo = new TSortInfo( - Expr.treesToThrift(mergeInfo.getOrderingExprs()), - mergeInfo.getIsAscOrder(), mergeInfo.getNullsFirst()); - msg.exchange_node.setSortInfo(sortInfo); + 
msg.exchange_node.setSortInfo(mergeInfo.toThrift()); + if (mergeInfo.useTwoPhaseRead()) { + msg.exchange_node.setNodesInfo(createNodesInfo()); + } } msg.exchange_node.setOffset(offset); } @@ -187,4 +190,17 @@ public class ExchangeNode extends PlanNode { return prefix + "offset: " + offset + "\n"; } + /** + * Set the parameters used to fetch data by rowid column + * after init(). + */ + private TPaloNodesInfo createNodesInfo() { + TPaloNodesInfo nodesInfo = new TPaloNodesInfo(); + SystemInfoService systemInfoService = Env.getCurrentSystemInfo(); + for (Long id : systemInfoService.getBackendIds(true /*need alive*/)) { + Backend backend = systemInfoService.getBackend(id); + nodesInfo.addToNodes(new TNodeInfo(backend.getId(), 0, backend.getHost(), backend.getBrpcPort())); + } + return nodesInfo; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java index 1ecf8cc295..03088cdd18 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java @@ -966,6 +966,9 @@ public class OlapScanNode extends ScanNode { sortInfo.getMaterializedOrderingExprs().forEach(expr -> { output.append(prefix).append(prefix).append(expr.toSql()).append("\n"); }); + if (sortInfo.useTwoPhaseRead()) { + output.append(prefix).append("OPT TWO PHASE\n"); + } } if (sortLimit != -1) { output.append(prefix).append("SORT LIMIT: ").append(sortLimit).append("\n"); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java index a5472959b3..f54d0a2fdd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java @@ -32,8 +32,10 @@ import org.apache.doris.analysis.SlotRef; import org.apache.doris.analysis.StatementBase; import 
org.apache.doris.analysis.StorageBackend; import org.apache.doris.analysis.TupleDescriptor; +import org.apache.doris.catalog.Column; import org.apache.doris.catalog.PrimitiveType; import org.apache.doris.catalog.ScalarType; +import org.apache.doris.catalog.Type; import org.apache.doris.common.UserException; import org.apache.doris.common.util.VectorizedUtil; import org.apache.doris.qe.ConnectContext; @@ -239,6 +241,8 @@ public class OriginalPlanner extends Planner { } else { List<Expr> resExprs = Expr.substituteList(queryStmt.getResultExprs(), rootFragment.getPlanRoot().getOutputSmap(), analyzer, false); + LOG.debug("result Exprs {}", queryStmt.getResultExprs()); + LOG.debug("substitute result Exprs {}", resExprs); rootFragment.setOutputExprs(resExprs); } LOG.debug("finalize plan fragments"); @@ -259,6 +263,9 @@ public class OriginalPlanner extends Planner { isBlockQuery = false; LOG.debug("this isn't block query"); } + if (selectStmt.checkEnableTwoPhaseRead(analyzer)) { + injectRowIdColumnSlot(); + } } } @@ -334,6 +341,52 @@ public class OriginalPlanner extends Planner { topPlanFragment.getPlanRoot().resetTupleIds(Lists.newArrayList(fileStatusDesc.getId())); } + + private SlotDescriptor injectRowIdColumnSlot(Analyzer analyzer, TupleDescriptor tupleDesc) { + SlotDescriptor slotDesc = analyzer.getDescTbl().addSlotDescriptor(tupleDesc); + LOG.debug("inject slot {}", slotDesc); + String name = Column.ROWID_COL; + Column col = new Column(name, Type.STRING, false, null, false, "", + "rowid column"); + slotDesc.setType(Type.STRING); + slotDesc.setColumn(col); + slotDesc.setIsNullable(false); + slotDesc.setIsMaterialized(true); + // Non-nullable slots will have 0 for the byte offset and -1 for the bit mask + slotDesc.setNullIndicatorBit(-1); + slotDesc.setNullIndicatorByte(0); + return slotDesc; + } + + // We use two phase read to optimize sql like: select * from tbl [where xxx = ???] 
order by column1 limit n + // in the first phase, we add an extra column `RowId` to Block, and sort blocks in TopN nodes + // in the second phase, we have n rows, we do a fetch rpc to get all the data for the rowids of the n rows + // and reconstruct the final block + private void injectRowIdColumnSlot() { + for (PlanFragment fragment : fragments) { + PlanNode node = fragment.getPlanRoot(); + PlanNode parent = null; + // OlapScanNode is the last node. + // So, just get the last two nodes and check if they are SortNode and OlapScan. + while (node.getChildren().size() != 0) { + parent = node; + node = node.getChildren().get(0); + } + + if (!(node instanceof OlapScanNode) || !(parent instanceof SortNode)) { + continue; + } + SortNode sortNode = (SortNode) parent; + OlapScanNode scanNode = (OlapScanNode) node; + SlotDescriptor slot = injectRowIdColumnSlot(analyzer, scanNode.getTupleDesc()); + injectRowIdColumnSlot(analyzer, sortNode.getSortInfo().getSortTupleDescriptor()); + SlotRef extSlot = new SlotRef(slot); + sortNode.getResolvedTupleExprs().add(extSlot); + sortNode.getSortInfo().setUseTwoPhaseRead(); + break; + } + } + /** * Push sort down to olap scan.
*/ @@ -354,6 +407,7 @@ public class OriginalPlanner extends Planner { } SortNode sortNode = (SortNode) parent; OlapScanNode scanNode = (OlapScanNode) node; + if (!scanNode.checkPushSort(sortNode)) { continue; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java index d64e34deb6..8e74a38bc1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java @@ -466,9 +466,6 @@ public abstract class PlanNode extends TreeNode<PlanNode> implements PlanStats { } } - - - public String getExplainString() { return getExplainString("", "", TExplainLevel.VERBOSE); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/SortNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/SortNode.java index 57bd28fd49..a31205a975 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/SortNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SortNode.java @@ -142,6 +142,10 @@ public class SortNode extends PlanNode { this.useTopnOpt = useTopnOpt; } + public List<Expr> getResolvedTupleExprs() { + return resolvedTupleExprs; + } + @Override public void setCompactData(boolean on) { this.compactData = on; diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 996b79d944..d1eca98bf6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -254,6 +254,9 @@ public class SessionVariable implements Serializable, Writable { public static final String EXTERNAL_SORT_BYTES_THRESHOLD = "external_sort_bytes_threshold"; + public static final String ENABLE_TWO_PHASE_READ_OPT = "enable_two_phase_read_opt"; + public static final String TWO_PHASE_READ_OPT_LIMIT_THRESHOLD = "two_phase_read_opt_limit_threshold"; + 
// session origin value public Map<Field, String> sessionOriginValue = new HashMap<Field, String>(); // check stmt is or not [select /*+ SET_VAR(...)*/ ...] @@ -659,6 +662,14 @@ public class SessionVariable implements Serializable, Writable { @VariableMgr.VarAttr(name = EXTERNAL_SORT_BYTES_THRESHOLD, checker = "checkExternalSortBytesThreshold") public long externalSortBytesThreshold = 0; + // Whether to enable the two-phase read optimization + // 1. read related rowids along with necessary column data + // 2. spawn fetch RPC to other nodes to get related data by sorted rowids + @VariableMgr.VarAttr(name = ENABLE_TWO_PHASE_READ_OPT) + public boolean enableTwoPhaseReadOpt = true; + @VariableMgr.VarAttr(name = TWO_PHASE_READ_OPT_LIMIT_THRESHOLD) + public long twoPhaseReadLimitThreshold = 512; + // If this fe is in fuzzy mode, then will use initFuzzyModeVariables to generate some variables, // not the default value set in the code. public void initFuzzyModeVariables() { diff --git a/gensrc/proto/descriptors.proto b/gensrc/proto/descriptors.proto index acde58bbfa..4bda9216d8 100644 --- a/gensrc/proto/descriptors.proto +++ b/gensrc/proto/descriptors.proto @@ -34,6 +34,8 @@ message PSlotDescriptor { required string col_name = 8; required int32 slot_idx = 9; required bool is_materialized = 10; + required int32 col_unique_id = 11; + required bool is_key = 12; }; message PTupleDescriptor { diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto index 50bdb1bb68..413fef6a01 100644 --- a/gensrc/proto/internal_service.proto +++ b/gensrc/proto/internal_service.proto @@ -538,6 +538,25 @@ message PFetchTableSchemaResult { optional int32 column_nums = 2; repeated string column_names = 3; repeated PTypeDesc column_types = 4; +} + +message PMultiGetRequest { + message RowId { + optional int64 tablet_id = 1; + optional string rowset_id = 2; + optional uint64 segment_id = 3; + optional uint64 ordinal_id = 4; + }; + repeated RowId rowids = 1; + optional 
PTupleDescriptor desc = 2; + repeated PSlotDescriptor slots = 3; + // for compatibility + optional int32 be_exec_version = 4; +}; + +message PMultiGetResponse { + optional PBlock block = 1; + optional PStatus status = 2; }; service PBackendService { @@ -572,5 +591,6 @@ service PBackendService { rpc request_slave_tablet_pull_rowset(PTabletWriteSlaveRequest) returns (PTabletWriteSlaveResult); rpc response_slave_tablet_pull_rowset(PTabletWriteSlaveDoneRequest) returns (PTabletWriteSlaveDoneResult); rpc fetch_table_schema(PFetchTableSchemaRequest) returns (PFetchTableSchemaResult); + rpc multiget_data(PMultiGetRequest) returns (PMultiGetResponse); }; diff --git a/gensrc/thrift/Descriptors.thrift b/gensrc/thrift/Descriptors.thrift index 1db9bd0537..2aa3be3f5f 100644 --- a/gensrc/thrift/Descriptors.thrift +++ b/gensrc/thrift/Descriptors.thrift @@ -51,6 +51,10 @@ struct TSlotDescriptor { 9: required i32 slotIdx 10: required bool isMaterialized 11: optional i32 col_unique_id = -1 + 12: optional bool is_key = false + // If set to false, then such slots will be ignored during + // materialization. Used as an optimization to read less data and use less memory + 13: optional bool need_materialize = true } struct TTupleDescriptor { diff --git a/gensrc/thrift/Exprs.thrift b/gensrc/thrift/Exprs.thrift index 230deb51ff..84dacd0663 100644 --- a/gensrc/thrift/Exprs.thrift +++ b/gensrc/thrift/Exprs.thrift @@ -143,6 +143,7 @@ struct TTupleIsNullPredicate { struct TSlotRef { 1: required Types.TSlotId slot_id 2: required Types.TTupleId tuple_id + 3: optional i32 col_unique_id } struct TStringLiteral { diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift index 5b72633c39..6eed8b46cd 100644 --- a/gensrc/thrift/PlanNodes.thrift +++ b/gensrc/thrift/PlanNodes.thrift @@ -543,6 +543,8 @@ struct TSortInfo { // Indicates the nullable info of sort_tuple_slot_exprs is changed after substitute by child's smap 5: optional list<bool> slot_exprs_nullability_changed_flags + // Indicates 
whether topn query using two phase read + 6: optional bool use_two_phase_read } enum TPushAggOp { @@ -891,6 +893,8 @@ struct TExchangeNode { 2: optional TSortInfo sort_info // This is tHe number of rows to skip before returning results 3: optional i64 offset + // Nodes in this cluster, used for second phase fetch + 4: optional Descriptors.TPaloNodesInfo nodes_info } struct TOlapRewriteNode { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org