This is an automated email from the ASF dual-hosted git repository.

jianliangqi pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new e22f5891d2 [WIP](row store) two phase opt read row store (#18654) e22f5891d2 is described below commit e22f5891d2fba4a54cb421b065f8da5f00cc4286 Author: lihangyu <15605149...@163.com> AuthorDate: Tue May 16 13:21:58 2023 +0800 [WIP](row store) two phase opt read row store (#18654) --- be/src/exec/rowid_fetcher.cpp | 166 +++++++++++++++------ be/src/exec/rowid_fetcher.h | 27 +++- be/src/exec/tablet_info.h | 7 + be/src/olap/rowset/beta_rowset_writer.cpp | 1 - be/src/olap/rowset/segment_v2/segment_writer.cpp | 3 + be/src/olap/tablet.cpp | 5 +- be/src/olap/tablet.h | 2 +- be/src/olap/utils.h | 12 ++ be/src/runtime/descriptors.cpp | 4 +- be/src/runtime/descriptors.h | 2 +- be/src/service/internal_service.cpp | 125 ++++++++-------- be/src/service/internal_service.h | 2 +- be/src/service/point_query_executor.cpp | 8 +- be/src/vec/exec/vexchange_node.cpp | 27 ---- be/src/vec/exec/vexchange_node.h | 7 - be/src/vec/exprs/vexpr_context.h | 8 + be/src/vec/exprs/vslot_ref.cpp | 5 +- be/src/vec/jsonb/serialize.cpp | 4 + be/src/vec/sink/vresult_sink.cpp | 38 ++++- be/src/vec/sink/vresult_sink.h | 5 + .../org/apache/doris/analysis/CreateTableStmt.java | 6 +- .../java/org/apache/doris/analysis/SelectStmt.java | 63 +++++--- .../glue/translator/PhysicalPlanTranslator.java | 45 ++++++ .../org/apache/doris/planner/ExchangeNode.java | 22 --- .../org/apache/doris/planner/OriginalPlanner.java | 50 +++++-- .../java/org/apache/doris/planner/ResultSink.java | 16 ++ .../java/org/apache/doris/planner/SortNode.java | 2 +- .../org/apache/doris/system/SystemInfoService.java | 12 ++ gensrc/proto/internal_service.proto | 27 +++- gensrc/thrift/DataSinks.thrift | 11 +- gensrc/thrift/PlanNodes.thrift | 2 - .../scalar_types/sql/dup_key_2pr_q01.out | 8 + .../scalar_types/sql/dup_key_2pr_q02.out | 4 + .../scalar_types/sql/dup_key_2pr_q03.out | 4 + .../scalar_types/sql/dup_key_2pr_q04.out | 4 + 
.../suites/datatype_p0/scalar_types/load.groovy | 8 +- .../scalar_types/sql/dup_key_2pr_q01.sql | 1 + .../scalar_types/sql/dup_key_2pr_q02.sql | 1 + .../scalar_types/sql/dup_key_2pr_q03.sql | 1 + .../scalar_types/sql/dup_key_2pr_q04.sql | 1 + 40 files changed, 513 insertions(+), 233 deletions(-) diff --git a/be/src/exec/rowid_fetcher.cpp b/be/src/exec/rowid_fetcher.cpp index 46909904f9..a90182bce9 100644 --- a/be/src/exec/rowid_fetcher.cpp +++ b/be/src/exec/rowid_fetcher.cpp @@ -18,22 +18,26 @@ #include "exec/rowid_fetcher.h" #include <brpc/callback.h> -#include <brpc/controller.h> #include <butil/endpoint.h> #include <fmt/format.h> +#include <gen_cpp/data.pb.h> #include <gen_cpp/internal_service.pb.h> +#include <gen_cpp/types.pb.h> #include <glog/logging.h> #include <stddef.h> #include <stdint.h> #include <algorithm> +#include <cstdint> #include <ostream> #include <string> #include <unordered_map> #include <utility> +#include <vector> #include "bthread/countdown_event.h" #include "common/config.h" +#include "common/consts.h" #include "exec/tablet_info.h" // DorisNodesInfo #include "olap/olap_common.h" #include "olap/utils.h" @@ -48,11 +52,15 @@ #include "vec/common/assert_cast.h" #include "vec/common/string_ref.h" #include "vec/core/block.h" // Block +#include "vec/data_types/data_type_factory.hpp" +#include "vec/jsonb/serialize.h" namespace doris { -Status RowIDFetcher::init(DorisNodesInfo* nodes_info) { - for (auto [node_id, node_info] : nodes_info->nodes_info()) { +Status RowIDFetcher::init() { + DorisNodesInfo nodes_info; + nodes_info.setNodes(_fetch_option.t_fetch_opt.nodes_info); + for (auto [node_id, node_info] : nodes_info.nodes_info()) { auto client = ExecEnv::GetInstance()->brpc_internal_client_cache()->get_client( node_info.host, node_info.brpc_port); if (!client) { @@ -65,29 +73,33 @@ Status RowIDFetcher::init(DorisNodesInfo* nodes_info) { return Status::OK(); } -static std::string format_rowid(const GlobalRowLoacation& location) { - return 
fmt::format("{} {} {} {}", location.tablet_id, - location.row_location.rowset_id.to_string(), - location.row_location.segment_id, location.row_location.row_id); -} - -PMultiGetRequest RowIDFetcher::_init_fetch_request(const vectorized::ColumnString& row_ids) { +PMultiGetRequest RowIDFetcher::_init_fetch_request(const vectorized::ColumnString& row_locs) const { PMultiGetRequest mget_req; - _tuple_desc->to_protobuf(mget_req.mutable_desc()); - for (auto slot : _tuple_desc->slots()) { + _fetch_option.desc->to_protobuf(mget_req.mutable_desc()); + for (SlotDescriptor* slot : _fetch_option.desc->slots()) { + // ignore rowid + if (slot->col_name() == BeConsts::ROWID_COL) { + continue; + } slot->to_protobuf(mget_req.add_slots()); } - for (size_t i = 0; i < row_ids.size(); ++i) { - PMultiGetRequest::RowId row_id; - StringRef row_id_rep = row_ids.get_data_at(i); + for (size_t i = 0; i < row_locs.size(); ++i) { + PRowLocation row_loc; + StringRef row_id_rep = row_locs.get_data_at(i); + // TODO: When transferring data between machines with different byte orders (endianness), + // not performing proper handling may lead to issues in parsing and exchanging the data. 
auto location = reinterpret_cast<const GlobalRowLoacation*>(row_id_rep.data); - row_id.set_tablet_id(location->tablet_id); - row_id.set_rowset_id(location->row_location.rowset_id.to_string()); - row_id.set_segment_id(location->row_location.segment_id); - row_id.set_ordinal_id(location->row_location.row_id); - *mget_req.add_rowids() = std::move(row_id); + row_loc.set_tablet_id(location->tablet_id); + row_loc.set_rowset_id(location->row_location.rowset_id.to_string()); + row_loc.set_segment_id(location->row_location.segment_id); + row_loc.set_ordinal_id(location->row_location.row_id); + *mget_req.add_row_locs() = std::move(row_loc); } - mget_req.set_be_exec_version(_st->be_exec_version()); + PUniqueId& query_id = *mget_req.mutable_query_id(); + query_id.set_hi(_fetch_option.runtime_state->query_id().hi); + query_id.set_lo(_fetch_option.runtime_state->query_id().lo); + mget_req.set_be_exec_version(_fetch_option.runtime_state->be_exec_version()); + mget_req.set_fetch_row_store(_fetch_option.t_fetch_opt.fetch_row_store); return mget_req; } @@ -95,9 +107,12 @@ static void fetch_callback(bthread::CountdownEvent* counter) { Defer __defer([&] { counter->signal(); }); } -static Status MergeRPCResults(const std::vector<PMultiGetResponse>& rsps, - const std::vector<brpc::Controller>& cntls, - vectorized::MutableBlock* output_block) { +Status RowIDFetcher::_merge_rpc_results(const PMultiGetRequest& request, + const std::vector<PMultiGetResponse>& rsps, + const std::vector<brpc::Controller>& cntls, + vectorized::Block* output_block, + std::vector<PRowLocation>* rows_id) const { + output_block->clear(); for (const auto& cntl : cntls) { if (cntl.Failed()) { LOG(WARNING) << "Failed to fetch meet rpc error:" << cntl.ErrorText() @@ -105,25 +120,61 @@ static Status MergeRPCResults(const std::vector<PMultiGetResponse>& rsps, return Status::InternalError(cntl.ErrorText()); } } - for (const auto& resp : rsps) { + + auto merge_function = [&](const PMultiGetResponse& resp) { Status 
st(resp.status()); if (!st.ok()) { LOG(WARNING) << "Failed to fetch " << st.to_string(); return st; } + for (const PRowLocation& row_id : resp.row_locs()) { + rows_id->push_back(row_id); + } + // Merge binary rows + if (request.fetch_row_store()) { + CHECK(resp.row_locs().size() == resp.binary_row_data_size()); + if (output_block->is_empty_column()) { + *output_block = vectorized::Block(_fetch_option.desc->slots(), 1); + } + for (int i = 0; i < resp.binary_row_data_size(); ++i) { + vectorized::JsonbSerializeUtil::jsonb_to_block( + *_fetch_option.desc, resp.binary_row_data(i).data(), + resp.binary_row_data(i).size(), *output_block); + } + return Status::OK(); + } + // Merge partial blocks vectorized::Block partial_block(resp.block()); - RETURN_IF_ERROR(output_block->merge(partial_block)); + CHECK(resp.row_locs().size() == partial_block.rows()); + if (output_block->is_empty_column()) { + output_block->swap(partial_block); + } else if (partial_block.columns() != output_block->columns()) { + return Status::Error<ErrorCode::INTERNAL_ERROR>( + "Merge block not match, self:[{}], input:[{}], ", output_block->dump_types(), + partial_block.dump_types()); + } else { + for (int i = 0; i < output_block->columns(); ++i) { + output_block->get_by_position(i).column->assume_mutable()->insert_range_from( + *partial_block.get_by_position(i) + .column->convert_to_full_column_if_const() + .get(), + 0, partial_block.rows()); + } + } + return Status::OK(); + }; + + for (const auto& resp : rsps) { + RETURN_IF_ERROR(merge_function(resp)); } return Status::OK(); } -Status RowIDFetcher::fetch(const vectorized::ColumnPtr& row_ids, - vectorized::MutableBlock* res_block) { +Status RowIDFetcher::fetch(const vectorized::ColumnPtr& column_row_ids, + vectorized::Block* res_block) { CHECK(!_stubs.empty()); - res_block->clear_column_data(); - vectorized::MutableBlock mblock({_tuple_desc}, row_ids->size()); PMultiGetRequest mget_req = _init_fetch_request(assert_cast<const vectorized::ColumnString&>( - 
*vectorized::remove_nullable(row_ids).get())); + *vectorized::remove_nullable(column_row_ids).get())); std::vector<PMultiGetResponse> resps(_stubs.size()); std::vector<brpc::Controller> cntls(_stubs.size()); bthread::CountdownEvent counter(_stubs.size()); @@ -133,20 +184,51 @@ Status RowIDFetcher::fetch(const vectorized::ColumnPtr& row_ids, _stubs[i]->multiget_data(&cntls[i], &mget_req, &resps[i], callback); } counter.wait(); - RETURN_IF_ERROR(MergeRPCResults(resps, cntls, &mblock)); - // final sort by row_ids sequence, since row_ids is already sorted - vectorized::Block tmp = mblock.to_block(); - std::unordered_map<std::string, uint32_t> row_order; - vectorized::ColumnPtr row_id_column = tmp.get_columns().back(); - for (size_t x = 0; x < row_id_column->size(); ++x) { + + // Merge + std::vector<PRowLocation> rows_locs; + rows_locs.reserve(rows_locs.size()); + RETURN_IF_ERROR(_merge_rpc_results(mget_req, resps, cntls, res_block, &rows_locs)); + + // Final sort by row_ids sequence, since row_ids is already sorted if need + std::map<GlobalRowLoacation, size_t> positions; + for (size_t i = 0; i < rows_locs.size(); ++i) { + RowsetId rowset_id; + rowset_id.init(rows_locs[i].rowset_id()); + GlobalRowLoacation grl(rows_locs[i].tablet_id(), rowset_id, rows_locs[i].segment_id(), + rows_locs[i].ordinal_id()); + positions[grl] = i; + }; + vectorized::IColumn::Permutation permutation; + permutation.reserve(column_row_ids->size()); + for (size_t i = 0; i < column_row_ids->size(); ++i) { auto location = - reinterpret_cast<const GlobalRowLoacation*>(row_id_column->get_data_at(x).data); - row_order[format_rowid(*location)] = x; + reinterpret_cast<const GlobalRowLoacation*>(column_row_ids->get_data_at(i).data); + permutation.push_back(positions[*location]); + } + size_t num_rows = res_block->rows(); + for (size_t i = 0; i < res_block->columns(); ++i) { + res_block->get_by_position(i).column = + res_block->get_by_position(i).column->permute(permutation, num_rows); } - for (size_t x = 
0; x < row_ids->size(); ++x) { - auto location = reinterpret_cast<const GlobalRowLoacation*>(row_ids->get_data_at(x).data); - res_block->add_row(&tmp, row_order[format_rowid(*location)]); + // shrink for char type + std::vector<size_t> char_type_idx; + for (size_t i = 0; i < _fetch_option.desc->slots().size(); i++) { + auto column_desc = _fetch_option.desc->slots()[i]; + auto type_desc = column_desc->type(); + do { + if (type_desc.type == TYPE_CHAR) { + char_type_idx.emplace_back(i); + break; + } else if (type_desc.type != TYPE_ARRAY) { + break; + } + // for Array<Char> or Array<Array<Char>> + type_desc = type_desc.children[0]; + } while (true); } + res_block->shrink_char_type_column_suffix_zero(char_type_idx); + VLOG_DEBUG << "dump block:" << res_block->dump_data(0, 10); return Status::OK(); } diff --git a/be/src/exec/rowid_fetcher.h b/be/src/exec/rowid_fetcher.h index 7f789b1c40..ae57c295a5 100644 --- a/be/src/exec/rowid_fetcher.h +++ b/be/src/exec/rowid_fetcher.h @@ -17,12 +17,16 @@ #pragma once +#include <brpc/controller.h> +#include <gen_cpp/DataSinks_types.h> #include <gen_cpp/internal_service.pb.h> #include <memory> #include <vector> #include "common/status.h" +#include "exec/tablet_info.h" // DorisNodesInfo +#include "vec/core/block.h" #include "vec/data_types/data_type.h" namespace doris { @@ -38,18 +42,29 @@ class MutableBlock; // fetch rows by global rowid // tablet_id/rowset_name/segment_id/ordinal_id + +struct FetchOption { + TupleDescriptor* desc = nullptr; + RuntimeState* runtime_state = nullptr; + TFetchOption t_fetch_opt; +}; + class RowIDFetcher { public: - RowIDFetcher(TupleDescriptor* desc, RuntimeState* st) : _tuple_desc(desc), _st(st) {} - Status init(DorisNodesInfo* nodes_info); - Status fetch(const vectorized::ColumnPtr& row_ids, vectorized::MutableBlock* block); + RowIDFetcher(const FetchOption& fetch_opt) : _fetch_option(fetch_opt) {} + Status init(); + Status fetch(const vectorized::ColumnPtr& row_ids, vectorized::Block* block); private: 
- PMultiGetRequest _init_fetch_request(const vectorized::ColumnString& row_ids); + PMultiGetRequest _init_fetch_request(const vectorized::ColumnString& row_ids) const; + Status _merge_rpc_results(const PMultiGetRequest& request, + const std::vector<PMultiGetResponse>& rsps, + const std::vector<brpc::Controller>& cntls, + vectorized::Block* output_block, + std::vector<PRowLocation>* rows_id) const; std::vector<std::shared_ptr<PBackendService_Stub>> _stubs; - TupleDescriptor* _tuple_desc; - RuntimeState* _st; + FetchOption _fetch_option; }; } // namespace doris diff --git a/be/src/exec/tablet_info.h b/be/src/exec/tablet_info.h index b4146ba6e0..bcfc5541eb 100644 --- a/be/src/exec/tablet_info.h +++ b/be/src/exec/tablet_info.h @@ -263,11 +263,18 @@ struct NodeInfo { class DorisNodesInfo { public: + DorisNodesInfo() = default; DorisNodesInfo(const TPaloNodesInfo& t_nodes) { for (auto& node : t_nodes.nodes) { _nodes.emplace(node.id, node); } } + void setNodes(const TPaloNodesInfo& t_nodes) { + _nodes.clear(); + for (auto& node : t_nodes.nodes) { + _nodes.emplace(node.id, node); + } + } const NodeInfo* find_node(int64_t id) const { auto it = _nodes.find(id); if (it != std::end(_nodes)) { diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp b/be/src/olap/rowset/beta_rowset_writer.cpp index 60652c9d8c..5955e97d2c 100644 --- a/be/src/olap/rowset/beta_rowset_writer.cpp +++ b/be/src/olap/rowset/beta_rowset_writer.cpp @@ -421,7 +421,6 @@ Status BetaRowsetWriter::_add_block(const vectorized::Block* block, max_row_add = (*segment_writer)->max_row_to_add(row_avg_size_in_bytes); DCHECK(max_row_add > 0); } - size_t input_row_num = std::min(block_row_num - row_offset, size_t(max_row_add)); auto s = (*segment_writer)->append_block(block, row_offset, input_row_num); if (UNLIKELY(!s.ok())) { diff --git a/be/src/olap/rowset/segment_v2/segment_writer.cpp b/be/src/olap/rowset/segment_v2/segment_writer.cpp index ac8715fd2d..bcf685a046 100644 --- 
a/be/src/olap/rowset/segment_v2/segment_writer.cpp +++ b/be/src/olap/rowset/segment_v2/segment_writer.cpp @@ -330,6 +330,9 @@ void SegmentWriter::_serialize_block_to_row_column(vectorized::Block& block) { break; } } + if (row_column_id == 0) { + return; + } vectorized::ColumnString* row_store_column = static_cast<vectorized::ColumnString*>(block.get_by_position(row_column_id) .column->assume_mutable_ref() diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp index 6a2fc1553d..cfb7a22871 100644 --- a/be/src/olap/tablet.cpp +++ b/be/src/olap/tablet.cpp @@ -2477,7 +2477,7 @@ Status Tablet::fetch_value_by_rowids(RowsetSharedPtr input_rowset, uint32_t segi Status Tablet::lookup_row_data(const Slice& encoded_key, const RowLocation& row_location, RowsetSharedPtr input_rowset, const TupleDescriptor* desc, - OlapReaderStatistics& stats, vectorized::Block* block, + OlapReaderStatistics& stats, std::string& values, bool write_to_cache) { // read row data BetaRowsetSharedPtr rowset = std::static_pointer_cast<BetaRowset>(input_rowset); @@ -2525,11 +2525,12 @@ Status Tablet::lookup_row_data(const Slice& encoded_key, const RowLocation& row_ RETURN_IF_ERROR(column_iterator->read_by_rowids(rowids.data(), 1, column_ptr)); assert(column_ptr->size() == 1); auto string_column = static_cast<vectorized::ColumnString*>(column_ptr.get()); + StringRef value = string_column->get_data_at(0); + values = value.to_string(); if (write_to_cache) { StringRef value = string_column->get_data_at(0); RowCache::instance()->insert({tablet_id(), encoded_key}, Slice {value.data, value.size}); } - vectorized::JsonbSerializeUtil::jsonb_to_block(*desc, *string_column, *block); return Status::OK(); } diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h index e666dcb16d..4f7fc8dd9e 100644 --- a/be/src/olap/tablet.h +++ b/be/src/olap/tablet.h @@ -402,7 +402,7 @@ public: // Lookup a row with TupleDescriptor and fill Block Status lookup_row_data(const Slice& encoded_key, const RowLocation& row_location, 
RowsetSharedPtr rowset, const TupleDescriptor* desc, - OlapReaderStatistics& stats, vectorized::Block* block, + OlapReaderStatistics& stats, std::string& values, bool write_to_cache = false); Status fetch_value_by_rowids(RowsetSharedPtr input_rowset, uint32_t segid, diff --git a/be/src/olap/utils.h b/be/src/olap/utils.h index f91b625db3..e4a167b987 100644 --- a/be/src/olap/utils.h +++ b/be/src/olap/utils.h @@ -301,6 +301,18 @@ struct GlobalRowLoacation { : tablet_id(tid), row_location(rsid, sid, rid) {} uint32_t tablet_id; RowLocation row_location; + + bool operator==(const GlobalRowLoacation& rhs) const { + return tablet_id == rhs.tablet_id && row_location == rhs.row_location; + } + + bool operator<(const GlobalRowLoacation& rhs) const { + if (tablet_id != rhs.tablet_id) { + return tablet_id < rhs.tablet_id; + } else { + return row_location < rhs.row_location; + } + } }; } // namespace doris diff --git a/be/src/runtime/descriptors.cpp b/be/src/runtime/descriptors.cpp index c13e2b53d5..f4cb3f6dff 100644 --- a/be/src/runtime/descriptors.cpp +++ b/be/src/runtime/descriptors.cpp @@ -517,11 +517,11 @@ std::string RowDescriptor::debug_string() const { return ss.str(); } -int RowDescriptor::get_column_id(int slot_id) const { +int RowDescriptor::get_column_id(int slot_id, bool force_materialize_slot) const { int column_id_counter = 0; for (const auto tuple_desc : _tuple_desc_map) { for (const auto slot : tuple_desc->slots()) { - if (!slot->need_materialize()) { + if (!force_materialize_slot && !slot->need_materialize()) { continue; } if (slot->id() == slot_id) { diff --git a/be/src/runtime/descriptors.h b/be/src/runtime/descriptors.h index 8fd6654e13..48ea79d879 100644 --- a/be/src/runtime/descriptors.h +++ b/be/src/runtime/descriptors.h @@ -494,7 +494,7 @@ public: std::string debug_string() const; - int get_column_id(int slot_id) const; + int get_column_id(int slot_id, bool force_materialize_slot = false) const; private: // Initializes tupleIdxMap during c'tor using the 
_tuple_desc_map. diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index 80d02f6d58..c674223b8a 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -101,6 +101,7 @@ #include "util/time.h" #include "util/uid_util.h" #include "vec/columns/column.h" +#include "vec/columns/column_string.h" #include "vec/core/block.h" #include "vec/core/column_with_type_and_name.h" #include "vec/data_types/data_type.h" @@ -109,6 +110,7 @@ #include "vec/exec/format/json/new_json_reader.h" #include "vec/exec/format/orc/vorc_reader.h" #include "vec/exec/format/parquet/vparquet_reader.h" +#include "vec/jsonb/serialize.h" #include "vec/runtime/vdata_stream_mgr.h" namespace google { @@ -1414,19 +1416,29 @@ void PInternalServiceImpl::response_slave_tablet_pull_rowset( } } -static Status read_by_rowids( - std::pair<size_t, size_t> row_range_idx, const TupleDescriptor& desc, - const google::protobuf::RepeatedPtrField<PMultiGetRequest_RowId>& rowids, - vectorized::Block* sub_block) { - //read from row_range.first to row_range.second - for (size_t i = row_range_idx.first; i < row_range_idx.second; ++i) { +Status PInternalServiceImpl::_multi_get(const PMultiGetRequest& request, + PMultiGetResponse* response) { + OlapReaderStatistics stats; + vectorized::Block result_block; + + // init desc + TupleDescriptor desc(request.desc()); + std::vector<SlotDescriptor> slots; + slots.reserve(request.slots().size()); + for (const auto& pslot : request.slots()) { + slots.push_back(SlotDescriptor(pslot)); + desc.add_slot(&slots.back()); + } + + // read row by row + for (size_t i = 0; i < request.row_locs_size(); ++i) { + const auto& row_loc = request.row_locs(i); MonotonicStopWatch watch; watch.start(); - auto row_id = rowids[i]; TabletSharedPtr tablet = StorageEngine::instance()->tablet_manager()->get_tablet( - row_id.tablet_id(), true /*include deleted*/); + row_loc.tablet_id(), true /*include deleted*/); RowsetId rowset_id; - 
rowset_id.init(row_id.rowset_id()); + rowset_id.init(row_loc.rowset_id()); if (!tablet) { continue; } @@ -1436,6 +1448,13 @@ static Status read_by_rowids( LOG(INFO) << "no such rowset " << rowset_id; continue; } + size_t row_size = 0; + Defer _defer([&]() { + LOG_EVERY_N(INFO, 100) + << "multiget_data single_row, cost(us):" << watch.elapsed_time() / 1000 + << ", row_size:" << row_size; + *response->add_row_locs() = row_loc; + }); const TabletSchemaSPtr tablet_schema = rowset->tablet_schema(); VLOG_DEBUG << "get tablet schema column_num:" << tablet_schema->num_columns() << ", version:" << tablet_schema->schema_version() @@ -1445,18 +1464,34 @@ static Status read_by_rowids( // find segment auto it = std::find_if(segment_cache.get_segments().begin(), segment_cache.get_segments().end(), - [&row_id](const segment_v2::SegmentSharedPtr& seg) { - return seg->id() == row_id.segment_id(); + [&row_loc](const segment_v2::SegmentSharedPtr& seg) { + return seg->id() == row_loc.segment_id(); }); if (it == segment_cache.get_segments().end()) { continue; } segment_v2::SegmentSharedPtr segment = *it; - for (int x = 0; x < desc.slots().size() - 1; ++x) { + GlobalRowLoacation row_location(row_loc.tablet_id(), rowset->rowset_id(), + row_loc.segment_id(), row_loc.ordinal_id()); + // fetch by row store, more effcient way + if (request.fetch_row_store()) { + CHECK(tablet->tablet_schema()->store_row_column()); + RowLocation loc(rowset_id, segment->id(), row_loc.ordinal_id()); + string* value = response->add_binary_row_data(); + RETURN_IF_ERROR(tablet->lookup_row_data({}, loc, rowset, &desc, stats, *value)); + row_size = value->size(); + continue; + } + + // fetch by column store + if (result_block.is_empty_column()) { + result_block = vectorized::Block(desc.slots(), request.row_locs().size()); + } + for (int x = 0; x < desc.slots().size(); ++x) { int index = tablet_schema->field_index(desc.slots()[x]->col_unique_id()); segment_v2::ColumnIterator* column_iterator = nullptr; 
vectorized::MutableColumnPtr column = - sub_block->get_by_position(x).column->assume_mutable(); + result_block.get_by_position(x).column->assume_mutable(); if (index < 0) { column->insert_default(); continue; @@ -1471,58 +1506,22 @@ static Status read_by_rowids( opt.stats = &stats; opt.use_page_cache = !config::disable_storage_page_cache; column_iterator->init(opt); - std::vector<segment_v2::rowid_t> rowids { - static_cast<segment_v2::rowid_t>(row_id.ordinal_id())}; - RETURN_IF_ERROR(column_iterator->read_by_rowids(rowids.data(), 1, column)); + std::vector<segment_v2::rowid_t> single_row_loc { + static_cast<segment_v2::rowid_t>(row_loc.ordinal_id())}; + RETURN_IF_ERROR(column_iterator->read_by_rowids(single_row_loc.data(), 1, column)); } - LOG_EVERY_N(INFO, 100) << "multiget_data single_row, cost(us):" - << watch.elapsed_time() / 1000; - GlobalRowLoacation row_location(row_id.tablet_id(), rowset->rowset_id(), - row_id.segment_id(), row_id.ordinal_id()); - sub_block->get_columns().back()->assume_mutable()->insert_data( - reinterpret_cast<const char*>(&row_location), sizeof(GlobalRowLoacation)); - } - return Status::OK(); -} - -Status PInternalServiceImpl::_multi_get(const PMultiGetRequest* request, - PMultiGetResponse* response) { - TupleDescriptor desc(request->desc()); - std::vector<SlotDescriptor> slots; - slots.reserve(request->slots().size()); - for (const auto& pslot : request->slots()) { - slots.push_back(SlotDescriptor(pslot)); - desc.add_slot(&slots.back()); } - assert(desc.slots().back()->col_name() == BeConsts::ROWID_COL); - vectorized::Block block(desc.slots(), request->rowids().size()); - RETURN_IF_ERROR( - read_by_rowids(std::pair {0, request->rowids_size()}, desc, request->rowids(), &block)); - std::vector<size_t> char_type_idx; - for (size_t i = 0; i < desc.slots().size(); i++) { - auto column_desc = desc.slots()[i]; - auto type_desc = column_desc->type(); - do { - if (type_desc.type == TYPE_CHAR) { - char_type_idx.emplace_back(i); - break; - } else 
if (type_desc.type != TYPE_ARRAY) { - break; - } - // for Array<Char> or Array<Array<Char>> - type_desc = type_desc.children[0]; - } while (true); + // serialize block if not empty + if (!result_block.is_empty_column()) { + VLOG_DEBUG << "dump block:" << result_block.dump_data(0, 10) + << ", be_exec_version:" << request.be_exec_version(); + [[maybe_unused]] size_t compressed_size = 0; + [[maybe_unused]] size_t uncompressed_size = 0; + int be_exec_version = request.has_be_exec_version() ? request.be_exec_version() : 0; + RETURN_IF_ERROR(result_block.serialize(be_exec_version, response->mutable_block(), + &uncompressed_size, &compressed_size, + segment_v2::CompressionTypePB::LZ4)); } - // shrink char_type suffix zero data - block.shrink_char_type_column_suffix_zero(char_type_idx); - VLOG_DEBUG << "dump block:" << block.dump_data(0, 10) - << ", be_exec_version:" << request->be_exec_version(); - - [[maybe_unused]] size_t compressed_size = 0; - [[maybe_unused]] size_t uncompressed_size = 0; - int be_exec_version = request->has_be_exec_version() ? 
request->be_exec_version() : 0; - RETURN_IF_ERROR(block.serialize(be_exec_version, response->mutable_block(), &uncompressed_size, - &compressed_size, segment_v2::CompressionTypePB::LZ4)); return Status::OK(); } @@ -1536,7 +1535,7 @@ void PInternalServiceImpl::multiget_data(google::protobuf::RpcController* contro watch.start(); brpc::ClosureGuard closure_guard(done); response->mutable_status()->set_status_code(0); - Status st = _multi_get(request, response); + Status st = _multi_get(*request, response); st.to_protobuf(response->mutable_status()); LOG(INFO) << "multiget_data finished, cost(us):" << watch.elapsed_time() / 1000; }); diff --git a/be/src/service/internal_service.h b/be/src/service/internal_service.h index dae16604ad..8c6057d978 100644 --- a/be/src/service/internal_service.h +++ b/be/src/service/internal_service.h @@ -207,7 +207,7 @@ private: void _response_pull_slave_rowset(const std::string& remote_host, int64_t brpc_port, int64_t txn_id, int64_t tablet_id, int64_t node_id, bool is_succeed); - Status _multi_get(const PMultiGetRequest* request, PMultiGetResponse* response); + Status _multi_get(const PMultiGetRequest& request, PMultiGetResponse* response); void _get_column_ids_by_tablet_ids(google::protobuf::RpcController* controller, const PFetchColIdsRequest* request, diff --git a/be/src/service/point_query_executor.cpp b/be/src/service/point_query_executor.cpp index 21d6ac06ac..ab2ae7519e 100644 --- a/be/src/service/point_query_executor.cpp +++ b/be/src/service/point_query_executor.cpp @@ -23,6 +23,8 @@ #include <gen_cpp/internal_service.pb.h> #include <stdlib.h> +#include <vector> + #include "olap/lru_cache.h" #include "olap/olap_tuple.h" #include "olap/row_cursor.h" @@ -284,11 +286,15 @@ Status PointQueryExecutor::_lookup_row_data() { if (!_row_read_ctxs[i]._row_location.has_value()) { continue; } + std::string value; RETURN_IF_ERROR(_tablet->lookup_row_data( _row_read_ctxs[i]._primary_key, _row_read_ctxs[i]._row_location.value(), 
*(_row_read_ctxs[i]._rowset_ptr), _reusable->tuple_desc(), - _profile_metrics.read_stats, _result_block.get(), + _profile_metrics.read_stats, value, !config::disable_storage_row_cache /*whether write row cache*/)); + // serilize value to block, currently only jsonb row formt + vectorized::JsonbSerializeUtil::jsonb_to_block(*_reusable->tuple_desc(), value.data(), + value.size(), *_result_block); } return Status::OK(); } diff --git a/be/src/vec/exec/vexchange_node.cpp b/be/src/vec/exec/vexchange_node.cpp index a00f9cd839..8535f4a261 100644 --- a/be/src/vec/exec/vexchange_node.cpp +++ b/be/src/vec/exec/vexchange_node.cpp @@ -57,11 +57,6 @@ Status VExchangeNode::init(const TPlanNode& tnode, RuntimeState* state) { _is_asc_order = tnode.exchange_node.sort_info.is_asc_order; _nulls_first = tnode.exchange_node.sort_info.nulls_first; - if (tnode.exchange_node.__isset.nodes_info) { - _nodes_info = _pool->add(new DorisNodesInfo(tnode.exchange_node.nodes_info)); - } - _use_two_phase_read = tnode.exchange_node.sort_info.__isset.use_two_phase_read && - tnode.exchange_node.sort_info.use_two_phase_read; return Status::OK(); } @@ -100,19 +95,6 @@ Status VExchangeNode::open(RuntimeState* state) { return Status::OK(); } -Status VExchangeNode::_second_phase_fetch_data(RuntimeState* state, Block* final_block) { - auto row_id_col = final_block->get_by_position(final_block->columns() - 1); - auto tuple_desc = _row_descriptor.tuple_descriptors()[0]; - RowIDFetcher id_fetcher(tuple_desc, state); - RETURN_IF_ERROR(id_fetcher.init(_nodes_info)); - MutableBlock materialized_block(_row_descriptor.tuple_descriptors(), final_block->rows()); - // fetch will sort block by sequence of ROWID_COL - RETURN_IF_ERROR(id_fetcher.fetch(row_id_col.column, &materialized_block)); - // Notice swap may change the structure of final_block - final_block->swap(materialized_block.to_block()); - return Status::OK(); -} - Status VExchangeNode::get_next(RuntimeState* state, Block* block, bool* eos) { 
INIT_AND_SCOPE_GET_NEXT_SPAN(state->get_tracer(), _get_next_span, "VExchangeNode::get_next"); SCOPED_TIMER(runtime_profile()->total_time_counter()); @@ -123,12 +105,6 @@ Status VExchangeNode::get_next(RuntimeState* state, Block* block, bool* eos) { _is_ready = true; return Status::OK(); } - if (_use_two_phase_read) { - // Block structure may be changed by calling _second_phase_fetch_data() before. - // So we should clear block before _stream_recvr->get_next, since - // blocks in VSortedRunMerger may not compatible with this block. - block->clear(); - } auto status = _stream_recvr->get_next(block, eos); // In vsortrunmerger, it will set eos=true, and block not empty // so that eos==true, could not make sure that block not have valid data @@ -153,9 +129,6 @@ Status VExchangeNode::get_next(RuntimeState* state, Block* block, bool* eos) { } COUNTER_SET(_rows_returned_counter, _num_rows_returned); } - if (_use_two_phase_read && block->rows() > 0) { - RETURN_IF_ERROR(_second_phase_fetch_data(state, block)); - } return status; } diff --git a/be/src/vec/exec/vexchange_node.h b/be/src/vec/exec/vexchange_node.h index 46f5aa619c..58b61dc9af 100644 --- a/be/src/vec/exec/vexchange_node.h +++ b/be/src/vec/exec/vexchange_node.h @@ -61,9 +61,6 @@ public: // Status collect_query_statistics(QueryStatistics* statistics) override; void set_num_senders(int num_senders) { _num_senders = num_senders; } - // final materializtion, used only in topn node - Status _second_phase_fetch_data(RuntimeState* state, Block* final_block); - private: int _num_senders; bool _is_merging; @@ -78,10 +75,6 @@ private: VSortExecExprs _vsort_exec_exprs; std::vector<bool> _is_asc_order; std::vector<bool> _nulls_first; - - // for fetch data by rowids - DorisNodesInfo* _nodes_info = nullptr; - bool _use_two_phase_read = false; }; } // namespace vectorized } // namespace doris diff --git a/be/src/vec/exprs/vexpr_context.h b/be/src/vec/exprs/vexpr_context.h index 5069a5a615..e00d34559b 100644 --- 
a/be/src/vec/exprs/vexpr_context.h +++ b/be/src/vec/exprs/vexpr_context.h @@ -90,6 +90,10 @@ public: void clone_fn_contexts(VExprContext* other); + bool force_materialize_slot() const { return _force_materialize_slot; } + + void set_force_materialize_slot() { _force_materialize_slot = true; } + private: friend class VExpr; @@ -112,5 +116,9 @@ private: /// The depth of expression-tree. int _depth_num = 0; + + // This flag only works on VSlotRef. + // Force to materialize even if the slot need_materialize is false, we just ignore need_materialize flag + bool _force_materialize_slot = false; }; } // namespace doris::vectorized diff --git a/be/src/vec/exprs/vslot_ref.cpp b/be/src/vec/exprs/vslot_ref.cpp index 4b2e182563..5a34999acc 100644 --- a/be/src/vec/exprs/vslot_ref.cpp +++ b/be/src/vec/exprs/vslot_ref.cpp @@ -27,6 +27,7 @@ #include "runtime/descriptors.h" #include "runtime/runtime_state.h" #include "vec/core/block.h" +#include "vec/exprs/vexpr_context.h" namespace doris { namespace vectorized { @@ -59,12 +60,12 @@ Status VSlotRef::prepare(doris::RuntimeState* state, const doris::RowDescriptor& state->desc_tbl().debug_string()); } _column_name = &slot_desc->col_name(); - if (!slot_desc->need_materialize()) { + if (!context->force_materialize_slot() && !slot_desc->need_materialize()) { // slot should be ignored manually _column_id = -1; return Status::OK(); } - _column_id = desc.get_column_id(_slot_id); + _column_id = desc.get_column_id(_slot_id, context->force_materialize_slot()); if (_column_id < 0) { return Status::Error<ErrorCode::INTERNAL_ERROR>( "VSlotRef {} have invalid slot id: {}, desc: {}, slot_desc: {}, desc_tbl: {}", diff --git a/be/src/vec/jsonb/serialize.cpp b/be/src/vec/jsonb/serialize.cpp index 69bc603b2f..c8b2eff0f4 100644 --- a/be/src/vec/jsonb/serialize.cpp +++ b/be/src/vec/jsonb/serialize.cpp @@ -25,6 +25,10 @@ #include "olap/tablet_schema.h" #include "runtime/descriptors.h" +#include "runtime/jsonb_value.h" +#include "runtime/primitive_type.h" 
+#include "runtime/types.h" +#include "util/bitmap_value.h" #include "util/jsonb_document.h" #include "util/jsonb_stream.h" #include "util/jsonb_writer.h" diff --git a/be/src/vec/sink/vresult_sink.cpp b/be/src/vec/sink/vresult_sink.cpp index 15d41f1f02..d61dc13875 100644 --- a/be/src/vec/sink/vresult_sink.cpp +++ b/be/src/vec/sink/vresult_sink.cpp @@ -24,14 +24,19 @@ #include <new> #include "common/config.h" +#include "common/consts.h" #include "common/object_pool.h" +#include "exec/rowid_fetcher.h" +#include "gutil/port.h" #include "runtime/buffer_control_block.h" +#include "runtime/descriptors.h" #include "runtime/exec_env.h" #include "runtime/result_buffer_mgr.h" #include "runtime/runtime_state.h" #include "util/runtime_profile.h" #include "util/telemetry/telemetry.h" #include "vec/exprs/vexpr.h" +#include "vec/exprs/vexpr_context.h" #include "vec/sink/vmysql_result_writer.h" #include "vec/sink/vresult_writer.h" @@ -51,7 +56,7 @@ VResultSink::VResultSink(const RowDescriptor& row_desc, const std::vector<TExpr> } else { _sink_type = sink.type; } - + _fetch_option = sink.fetch_option; _name = "ResultSink"; } @@ -61,6 +66,12 @@ Status VResultSink::prepare_exprs(RuntimeState* state) { // From the thrift expressions create the real exprs. RETURN_IF_ERROR( VExpr::create_expr_trees(state->obj_pool(), _t_output_expr, &_output_vexpr_ctxs)); + if (_fetch_option.use_two_phase_fetch) { + for (VExprContext* expr_ctx : _output_vexpr_ctxs) { + // Must materialize if it a slot, or the slot column id will be -1 + expr_ctx->set_force_materialize_slot(); + } + } // Prepare the exprs to run. 
RETURN_IF_ERROR(VExpr::prepare(_output_vexpr_ctxs, state, _row_desc)); return Status::OK(); @@ -100,9 +111,32 @@ Status VResultSink::open(RuntimeState* state) { return VExpr::open(_output_vexpr_ctxs, state); } +Status VResultSink::second_phase_fetch_data(RuntimeState* state, Block* final_block) { + auto row_id_col = final_block->get_by_position(final_block->columns() - 1); + CHECK(row_id_col.name == BeConsts::ROWID_COL); + auto tuple_desc = _row_desc.tuple_descriptors()[0]; + FetchOption fetch_option; + fetch_option.desc = tuple_desc; + fetch_option.t_fetch_opt = _fetch_option; + fetch_option.runtime_state = state; + RowIDFetcher id_fetcher(fetch_option); + RETURN_IF_ERROR(id_fetcher.init()); + RETURN_IF_ERROR(id_fetcher.fetch(row_id_col.column, final_block)); + return Status::OK(); +} + Status VResultSink::send(RuntimeState* state, Block* block, bool eos) { INIT_AND_SCOPE_SEND_SPAN(state->get_tracer(), _send_span, "VResultSink::send"); - return _writer->append_block(*block); + if (_fetch_option.use_two_phase_fetch && block->rows() > 0) { + RETURN_IF_ERROR(second_phase_fetch_data(state, block)); + } + RETURN_IF_ERROR(_writer->append_block(*block)); + if (_fetch_option.use_two_phase_fetch) { + // Block structure may be changed by calling _second_phase_fetch_data(). 
+ // So we should clear block in case of unmatched columns + block->clear(); + } + return Status::OK(); } Status VResultSink::close(RuntimeState* state, Status exec_status) { diff --git a/be/src/vec/sink/vresult_sink.h b/be/src/vec/sink/vresult_sink.h index adf5dd249d..97916544bf 100644 --- a/be/src/vec/sink/vresult_sink.h +++ b/be/src/vec/sink/vresult_sink.h @@ -28,6 +28,7 @@ #include "common/status.h" #include "exec/data_sink.h" +#include "vec/sink/vresult_writer.h" namespace doris { class RuntimeState; @@ -141,6 +142,7 @@ public: private: Status prepare_exprs(RuntimeState* state); + Status second_phase_fetch_data(RuntimeState* state, Block* final_block); TResultSinkType::type _sink_type; // set file options when sink type is FILE std::unique_ptr<ResultFileOptions> _file_opts; @@ -156,6 +158,9 @@ private: std::shared_ptr<VResultWriter> _writer; RuntimeProfile* _profile; // Allocated from _pool int _buf_size; // Allocated from _pool + + // for fetch data by rowids + TFetchOption _fetch_option; }; } // namespace vectorized diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableStmt.java index d7a47f9279..18db7b5847 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableStmt.java @@ -331,7 +331,6 @@ public class CreateTableStmt extends DdlStmt { } boolean enableUniqueKeyMergeOnWrite = false; - boolean enableStoreRowColumn = false; // analyze key desc if (engineName.equalsIgnoreCase("olap")) { // olap table @@ -401,7 +400,6 @@ public class CreateTableStmt extends DdlStmt { throw new AnalysisException( PropertyAnalyzer.ENABLE_UNIQUE_KEY_MERGE_ON_WRITE + " property only support unique key table"); } - if (keysDesc.getKeysType() == KeysType.UNIQUE_KEYS) { enableUniqueKeyMergeOnWrite = true; if (properties != null) { @@ -409,7 +407,6 @@ public class CreateTableStmt extends DdlStmt { 
// so we just clone a properties map here. enableUniqueKeyMergeOnWrite = PropertyAnalyzer.analyzeUniqueKeyMergeOnWrite( new HashMap<>(properties)); - enableStoreRowColumn = PropertyAnalyzer.analyzeStoreRowColumn(new HashMap<>(properties)); } } @@ -455,7 +452,8 @@ public class CreateTableStmt extends DdlStmt { columnDefs.add(ColumnDef.newDeleteSignColumnDef(AggregateType.REPLACE)); } } - if (enableStoreRowColumn) { + // add a hidden column as row store + if (properties != null && PropertyAnalyzer.analyzeStoreRowColumn(new HashMap<>(properties))) { columnDefs.add(ColumnDef.newRowStoreColumnDef()); } if (Config.enable_hidden_version_column_by_default && keysDesc != null diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/SelectStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/SelectStmt.java index c8bc8ddf43..0a9bd5a50e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/SelectStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/SelectStmt.java @@ -658,7 +658,10 @@ public class SelectStmt extends QueryStmt { Set<SlotRef> orderingSlots = Sets.newHashSet(); Set<SlotRef> conjuntSlots = Sets.newHashSet(); TreeNode.collect(resultExprs, Predicates.instanceOf(SlotRef.class), resultSlots); - TreeNode.collect(sortInfo.getOrderingExprs(), Predicates.instanceOf(SlotRef.class), orderingSlots); + if (sortInfo != null) { + TreeNode.collect(sortInfo.getOrderingExprs(), + Predicates.instanceOf(SlotRef.class), orderingSlots); + } if (whereClause != null) { whereClause.collect(SlotRef.class, conjuntSlots); } @@ -714,20 +717,23 @@ public class SelectStmt extends QueryStmt { || !ConnectContext.get().getSessionVariable().enableTwoPhaseReadOpt) { return false; } - if (!evaluateOrderBy) { - // Need evaluate orderby, if sort node was eliminated then this optmization - // could be useless - return false; - } - // Only handle the simplest `SELECT ... FROM <tbl> WHERE ... ORDER BY ... LIMIT ...` query + // Only handle the simplest `SELECT ... 
FROM <tbl> WHERE ... [ORDER BY ...] [LIMIT ...]` query if (getAggInfo() != null || getHavingPred() != null || getWithClause() != null || getAnalyticInfo() != null) { return false; } + // ignore short circuit query + if (isPointQueryShortCircuit()) { + return false; + } + // ignore insert into select + if (fromInsert) { + return false; + } + // ensure no sub query if (!analyzer.isRootAnalyzer()) { - // ensure no sub query return false; } // If select stmt has inline view or this is an inline view query stmt analyze call @@ -750,24 +756,35 @@ public class SelectStmt extends QueryStmt { if (!olapTable.getEnableLightSchemaChange()) { return false; } - // Only TOPN query at present - if (getOrderByElements() == null - || !hasLimit() - || getLimit() <= 0 - || getLimit() > ConnectContext.get().getSessionVariable().topnOptLimitThreshold) { - return false; - } - // Check order by exprs are all slot refs - // Rethink? implement more generic to support all exprs - LOG.debug("getOrderingExprs {}", sortInfo.getOrderingExprs()); - LOG.debug("getOrderByElements {}", getOrderByElements()); - for (Expr sortExpr : sortInfo.getOrderingExprs()) { - if (!(sortExpr instanceof SlotRef)) { + if (getOrderByElements() != null) { + if (!evaluateOrderBy) { + // Need evaluate orderby, if sort node was eliminated then this optmization + // could be useless return false; } + // case1: general topn query, like: select * from tbl where xxx order by yyy limit n + if (!hasLimit() + || getLimit() <= 0 + || getLimit() > ConnectContext.get().getSessionVariable().topnOptLimitThreshold) { + return false; + } + // Check order by exprs are all slot refs + // Rethink? 
implement more generic to support all exprs + LOG.debug("getOrderingExprs {}", sortInfo.getOrderingExprs()); + LOG.debug("getOrderByElements {}", getOrderByElements()); + for (Expr sortExpr : sortInfo.getOrderingExprs()) { + if (!(sortExpr instanceof SlotRef)) { + return false; + } + } + isTwoPhaseOptEnabled = true; + return true; + } else { + // case2: optimize scan utilize row store column, query like select * from tbl where xxx [limit xxx] + // TODO: we only optimize query with select * at present + return olapTable.storeRowColumn() && selectList.getItems().stream().anyMatch(e -> e.isStar()); } - isTwoPhaseOptEnabled = true; - return true; + // return false; } public List<TupleId> getTableRefIds() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java index b7214f7d9d..e0340ba20b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java @@ -38,6 +38,7 @@ import org.apache.doris.analysis.TableRef; import org.apache.doris.analysis.TupleDescriptor; import org.apache.doris.analysis.TupleId; import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.Env; import org.apache.doris.catalog.Function.NullableMode; import org.apache.doris.catalog.OlapTable; import org.apache.doris.catalog.Table; @@ -134,6 +135,7 @@ import org.apache.doris.planner.OlapScanNode; import org.apache.doris.planner.PlanFragment; import org.apache.doris.planner.PlanNode; import org.apache.doris.planner.RepeatNode; +import org.apache.doris.planner.ResultSink; import org.apache.doris.planner.ScanNode; import org.apache.doris.planner.SchemaScanNode; import org.apache.doris.planner.SelectNode; @@ -146,6 +148,7 @@ import org.apache.doris.planner.external.HudiScanNode; import 
org.apache.doris.planner.external.iceberg.IcebergScanNode; import org.apache.doris.qe.ConnectContext; import org.apache.doris.tablefunction.TableValuedFunctionIf; +import org.apache.doris.thrift.TFetchOption; import org.apache.doris.thrift.TPartitionType; import org.apache.doris.thrift.TPushAggOp; @@ -200,6 +203,47 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla this.statsErrorEstimator = statsErrorEstimator; } + // We use two phase read to optimize sql like: select * from tbl [where xxx = ???] [order by column1] [limit n] + // in the first phase, we add an extra column `RowId` to Block, and sort blocks in TopN nodes + // in the second phase, we have n rows, we do a fetch rpc to get all rowids data for the n rows + // and reconstruct the final block + private void setResultSinkFetchOptionIfNeed() { + boolean needFetch = false; + // Only single olap table should be fetched + OlapTable fetchOlapTable = null; + for (PlanFragment fragment : context.getPlanFragments()) { + PlanNode node = fragment.getPlanRoot(); + PlanNode parent = null; + // OlapScanNode is the last node. + // So, just get the last two nodes and check if they are SortNode and OlapScan.
+ while (node.getChildren().size() != 0) { + parent = node; + node = node.getChildren().get(0); + } + + // case1: general topn optimized query + if ((node instanceof OlapScanNode) && (parent instanceof SortNode)) { + SortNode sortNode = (SortNode) parent; + OlapScanNode scanNode = (OlapScanNode) node; + if (sortNode.getUseTwoPhaseReadOpt()) { + needFetch = true; + fetchOlapTable = scanNode.getOlapTable(); + break; + } + } + } + for (PlanFragment fragment : context.getPlanFragments()) { + if (needFetch && fragment.getSink() instanceof ResultSink) { + TFetchOption fetchOption = new TFetchOption(); + fetchOption.setFetchRowStore(fetchOlapTable.storeRowColumn()); + fetchOption.setUseTwoPhaseFetch(true); + fetchOption.setNodesInfo(Env.getCurrentSystemInfo().createAliveNodesInfo()); + ((ResultSink) fragment.getSink()).setFetchOption(fetchOption); + break; + } + } + } + /** * Translate Nereids Physical Plan tree to Stale Planner PlanFragment tree. * @@ -236,6 +280,7 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla for (PlanFragment fragment : context.getPlanFragments()) { fragment.finalize(null); } + setResultSinkFetchOptionIfNeed(); Collections.reverse(context.getPlanFragments()); context.getDescTable().computeMemLayout(); return rootFragment; diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java index c0cddcc242..ab82dad209 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java @@ -24,17 +24,12 @@ import org.apache.doris.analysis.Analyzer; import org.apache.doris.analysis.SortInfo; import org.apache.doris.analysis.TupleDescriptor; import org.apache.doris.analysis.TupleId; -import org.apache.doris.catalog.Env; import org.apache.doris.common.UserException; import org.apache.doris.common.util.VectorizedUtil; import 
org.apache.doris.statistics.StatisticalType; import org.apache.doris.statistics.StatsRecursiveDerive; -import org.apache.doris.system.Backend; -import org.apache.doris.system.SystemInfoService; import org.apache.doris.thrift.TExchangeNode; import org.apache.doris.thrift.TExplainLevel; -import org.apache.doris.thrift.TNodeInfo; -import org.apache.doris.thrift.TPaloNodesInfo; import org.apache.doris.thrift.TPlanNode; import org.apache.doris.thrift.TPlanNodeType; @@ -150,9 +145,6 @@ public class ExchangeNode extends PlanNode { } if (mergeInfo != null) { msg.exchange_node.setSortInfo(mergeInfo.toThrift()); - if (mergeInfo.useTwoPhaseRead()) { - msg.exchange_node.setNodesInfo(createNodesInfo()); - } } msg.exchange_node.setOffset(offset); } @@ -174,18 +166,4 @@ public class ExchangeNode extends PlanNode { public String getNodeExplainString(String prefix, TExplainLevel detailLevel) { return prefix + "offset: " + offset + "\n"; } - - /** - * Set the parameters used to fetch data by rowid column - * after init(). 
- */ - private TPaloNodesInfo createNodesInfo() { - TPaloNodesInfo nodesInfo = new TPaloNodesInfo(); - SystemInfoService systemInfoService = Env.getCurrentSystemInfo(); - for (Long id : systemInfoService.getBackendIds(true /*need alive*/)) { - Backend backend = systemInfoService.getBackend(id); - nodesInfo.addToNodes(new TNodeInfo(backend.getId(), 0, backend.getHost(), backend.getBrpcPort())); - } - return nodesInfo; - } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java index 49058a021c..42e72d6c84 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java @@ -35,6 +35,7 @@ import org.apache.doris.analysis.TableName; import org.apache.doris.analysis.TupleDescriptor; import org.apache.doris.catalog.Column; import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.OlapTable; import org.apache.doris.catalog.PrimitiveType; import org.apache.doris.catalog.ScalarType; import org.apache.doris.catalog.Table; @@ -51,6 +52,7 @@ import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.qe.ConnectContext; import org.apache.doris.rewrite.mvrewrite.MVSelectFailedException; import org.apache.doris.statistics.query.StatsDelta; +import org.apache.doris.thrift.TFetchOption; import org.apache.doris.thrift.TQueryOptions; import org.apache.doris.thrift.TRuntimeFilterMode; @@ -305,6 +307,10 @@ public class OriginalPlanner extends Planner { // Double check this plan to ensure it's a general topn query injectRowIdColumnSlot(); ((SortNode) singleNodePlan).setUseTwoPhaseReadOpt(true); + } else if (singleNodePlan instanceof OlapScanNode && singleNodePlan.getChildren().size() == 0) { + // Optimize query like `SELECT ... FROM <tbl> WHERE ... LIMIT ...`. 
+ // This typically used when row store enabled, to reduce scan cost + injectRowIdColumnSlot(); } else { // This is not a general topn query, rollback needMaterialize flag for (SlotDescriptor slot : analyzer.getDescTbl().getSlotDescs().values()) { @@ -463,11 +469,13 @@ public class OriginalPlanner extends Planner { return slotDesc; } - // We use two phase read to optimize sql like: select * from tbl [where xxx = ???] order by column1 limit n + // We use two phase read to optimize sql like: select * from tbl [where xxx = ???] [order by column1] [limit n] // in the first phase, we add an extra column `RowId` to Block, and sort blocks in TopN nodes // in the second phase, we have n rows, we do a fetch rpc to get all rowids date for the n rows // and reconconstruct the final block private void injectRowIdColumnSlot() { + boolean injected = false; + OlapTable olapTable = null; for (PlanFragment fragment : fragments) { PlanNode node = fragment.getPlanRoot(); PlanNode parent = null; @@ -478,17 +486,37 @@ public class OriginalPlanner extends Planner { node = node.getChildren().get(0); } - if (!(node instanceof OlapScanNode) || !(parent instanceof SortNode)) { - continue; + // case1 + if ((node instanceof OlapScanNode) && (parent instanceof SortNode)) { + SortNode sortNode = (SortNode) parent; + OlapScanNode scanNode = (OlapScanNode) node; + SlotDescriptor slot = injectRowIdColumnSlot(analyzer, scanNode.getTupleDesc()); + injectRowIdColumnSlot(analyzer, sortNode.getSortInfo().getSortTupleDescriptor()); + SlotRef extSlot = new SlotRef(slot); + sortNode.getResolvedTupleExprs().add(extSlot); + sortNode.getSortInfo().setUseTwoPhaseRead(); + injected = true; + olapTable = scanNode.getOlapTable(); + break; + } + // case2 + if ((node instanceof OlapScanNode) && parent == null) { + OlapScanNode scanNode = (OlapScanNode) node; + injectRowIdColumnSlot(analyzer, scanNode.getTupleDesc()); + injected = true; + olapTable = scanNode.getOlapTable(); + break; + } + } + for (PlanFragment 
fragment : fragments) { + if (injected && fragment.getSink() instanceof ResultSink) { + TFetchOption fetchOption = new TFetchOption(); + fetchOption.setFetchRowStore(olapTable.storeRowColumn()); + fetchOption.setUseTwoPhaseFetch(true); + fetchOption.setNodesInfo(Env.getCurrentSystemInfo().createAliveNodesInfo()); + ((ResultSink) fragment.getSink()).setFetchOption(fetchOption); + break; } - SortNode sortNode = (SortNode) parent; - OlapScanNode scanNode = (OlapScanNode) node; - SlotDescriptor slot = injectRowIdColumnSlot(analyzer, scanNode.getTupleDesc()); - injectRowIdColumnSlot(analyzer, sortNode.getSortInfo().getSortTupleDescriptor()); - SlotRef extSlot = new SlotRef(slot); - sortNode.getResolvedTupleExprs().add(extSlot); - sortNode.getSortInfo().setUseTwoPhaseRead(); - break; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ResultSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/ResultSink.java index c940851de8..0a79881d58 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/ResultSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ResultSink.java @@ -21,6 +21,7 @@ import org.apache.doris.common.util.VectorizedUtil; import org.apache.doris.thrift.TDataSink; import org.apache.doris.thrift.TDataSinkType; import org.apache.doris.thrift.TExplainLevel; +import org.apache.doris.thrift.TFetchOption; import org.apache.doris.thrift.TResultSink; /** @@ -30,6 +31,8 @@ import org.apache.doris.thrift.TResultSink; */ public class ResultSink extends DataSink { private final PlanNodeId exchNodeId; + // Two phase fetch option + private TFetchOption fetchOption; public ResultSink(PlanNodeId exchNodeId) { this.exchNodeId = exchNodeId; @@ -43,13 +46,26 @@ public class ResultSink extends DataSink { strBuilder.append("V"); } strBuilder.append("RESULT SINK\n"); + if (fetchOption != null) { + strBuilder.append(prefix).append(" ").append("OPT TWO PHASE\n"); + if (fetchOption.isFetchRowStore()) { + strBuilder.append(prefix).append(" 
").append("FETCH ROW STORE\n"); + } + } return strBuilder.toString(); } + public void setFetchOption(TFetchOption fetchOption) { + this.fetchOption = fetchOption; + } + @Override protected TDataSink toThrift() { TDataSink result = new TDataSink(TDataSinkType.RESULT_SINK); TResultSink tResultSink = new TResultSink(); + if (fetchOption != null) { + tResultSink.setFetchOption(fetchOption); + } result.setResultSink(tResultSink); return result; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/SortNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/SortNode.java index 743c4fb54f..aab8f44186 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/SortNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SortNode.java @@ -144,7 +144,7 @@ public class SortNode extends PlanNode { } public boolean getUseTwoPhaseReadOpt() { - return useTopnOpt; + return this.useTwoPhaseReadOpt; } public void setUseTwoPhaseReadOpt(boolean useTwoPhaseReadOpt) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java b/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java index de4db69569..2c5fd90f21 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java +++ b/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java @@ -35,6 +35,8 @@ import org.apache.doris.common.util.NetUtils; import org.apache.doris.metric.MetricRepo; import org.apache.doris.resource.Tag; import org.apache.doris.system.Backend.BackendState; +import org.apache.doris.thrift.TNodeInfo; +import org.apache.doris.thrift.TPaloNodesInfo; import org.apache.doris.thrift.TStatusCode; import org.apache.doris.thrift.TStorageMedium; @@ -156,6 +158,16 @@ public class SystemInfoService { } }; + public static TPaloNodesInfo createAliveNodesInfo() { + TPaloNodesInfo nodesInfo = new TPaloNodesInfo(); + SystemInfoService systemInfoService = Env.getCurrentSystemInfo(); + for (Long id : 
systemInfoService.getBackendIds(true /*need alive*/)) { + Backend backend = systemInfoService.getBackend(id); + nodesInfo.addToNodes(new TNodeInfo(backend.getId(), 0, backend.getHost(), backend.getBrpcPort())); + } + return nodesInfo; + } + // for deploy manager public void addBackends(List<HostInfo> hostInfos, boolean isFree) throws UserException { diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto index 3b905897c8..7a12d2be12 100644 --- a/gensrc/proto/internal_service.proto +++ b/gensrc/proto/internal_service.proto @@ -1,4 +1,5 @@ // Licensed to the Apache Software Foundation (ASF) under one + // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file @@ -603,23 +604,35 @@ message PFetchTableSchemaResult { repeated PTypeDesc column_types = 4; } +message PRowLocation { + optional int64 tablet_id = 1; + optional string rowset_id = 2; + optional uint64 segment_id = 3; + optional uint64 ordinal_id = 4; +} + message PMultiGetRequest { - message RowId { - optional int64 tablet_id = 1; - optional string rowset_id = 2; - optional uint64 segment_id = 3; - optional uint64 ordinal_id = 4; - }; - repeated RowId rowids = 1; + repeated PRowLocation row_locs = 1; optional PTupleDescriptor desc = 2; repeated PSlotDescriptor slots = 3; // for compability optional int32 be_exec_version = 4; + optional bool fetch_row_store = 5; + optional PUniqueId query_id = 6; }; message PMultiGetResponse { optional PBlock block = 1; optional PStatus status = 2; + + // more efficient serialization fields for row store + enum RowFormat { + JSONB = 0; + }; + optional RowFormat format = 3; + repeated bytes binary_row_data = 4; + // for sorting rows + repeated PRowLocation row_locs = 5; }; message PFetchColIdsRequest { diff --git a/gensrc/thrift/DataSinks.thrift b/gensrc/thrift/DataSinks.thrift index f2a191a777..7883aab0b6 100644 ---
a/gensrc/thrift/DataSinks.thrift +++ b/gensrc/thrift/DataSinks.thrift @@ -160,9 +160,18 @@ struct TMultiCastDataStreamSink { 2: optional list<list<TPlanFragmentDestination>> destinations; } +struct TFetchOption { + 1: optional bool use_two_phase_fetch; + // Nodes in this cluster, used for second phase fetch + 2: optional Descriptors.TPaloNodesInfo nodes_info; + // Whether fetch row store + 3: optional bool fetch_row_store; +} + struct TResultSink { 1: optional TResultSinkType type; - 2: optional TResultFileSinkOptions file_options // deprecated + 2: optional TResultFileSinkOptions file_options; // deprecated + 3: optional TFetchOption fetch_option; } struct TResultFileSink { diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift index ba97ae77e5..8afc3fde4c 100644 --- a/gensrc/thrift/PlanNodes.thrift +++ b/gensrc/thrift/PlanNodes.thrift @@ -908,8 +908,6 @@ struct TExchangeNode { 2: optional TSortInfo sort_info // This is tHe number of rows to skip before returning results 3: optional i64 offset - // Nodes in this cluster, used for second phase fetch - 4: optional Descriptors.TPaloNodesInfo nodes_info } struct TOlapRewriteNode { diff --git a/regression-test/data/datatype_p0/scalar_types/sql/dup_key_2pr_q01.out b/regression-test/data/datatype_p0/scalar_types/sql/dup_key_2pr_q01.out new file mode 100644 index 0000000000..ed6e469cbf --- /dev/null +++ b/regression-test/data/datatype_p0/scalar_types/sql/dup_key_2pr_q01.out @@ -0,0 +1,8 @@ +-- This file is automatically generated. 
You should know what you did if you want to edit this +-- !dup_key_2pr_q01 -- +\N \N \N \N \N \N \N \N \N \N \N \N \N \N \N \N \N +\N \N \N \N \N \N \N \N \N \N \N \N \N \N \N \N \N +2022-04-15T18:12:20 -1476674200 76077621753438032.418 false -81 -17630 -10269 -268792458 13778.415 -2.147435208942339E9 93226194917360130.580 2022-07-02 2022-08-07T20:32:33 2022-02-06 195.140.76.121 qwh...@ooba.org Maywood Junction 85 +2022-12-09T05:37:25 115193592 2535754112868463.975 true 85 -4281 22951 -1062042991 4304.926 -2.147336076742606E9 1661169792864805.553 2022-10-14 2022-07-19T18:25:57 2022-12-01 118.127.225.101 xgard...@talane.gov Milwaukee Point 50 +2022-08-07T21:42:59 380691749 46148671567644994.456 false 102 -6175 -147 -521124424 17149.252 -2.147205556984866E9 44529271870524715.164 2022-03-18 2022-02-07T01:28:54 2022-05-25 0.24.121.144 jamesd...@meezzy.org Glendale Hill 78 + diff --git a/regression-test/data/datatype_p0/scalar_types/sql/dup_key_2pr_q02.out b/regression-test/data/datatype_p0/scalar_types/sql/dup_key_2pr_q02.out new file mode 100644 index 0000000000..ff911d4bf8 --- /dev/null +++ b/regression-test/data/datatype_p0/scalar_types/sql/dup_key_2pr_q02.out @@ -0,0 +1,4 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !dup_key_2pr_q02 -- +2023-01-10T15:40:04 -49426869 59390574629503991.413 true -19 -6791 -6536 240107090 22801.043 7.01523382557248E8 6338683861031199.120 2022-05-15 2022-01-26T07:36:05 2022-04-26 139.41.223.19 seanflo...@chatterpoint.gov Laurel Way 44 + diff --git a/regression-test/data/datatype_p0/scalar_types/sql/dup_key_2pr_q03.out b/regression-test/data/datatype_p0/scalar_types/sql/dup_key_2pr_q03.out new file mode 100644 index 0000000000..c0b3e57752 --- /dev/null +++ b/regression-test/data/datatype_p0/scalar_types/sql/dup_key_2pr_q03.out @@ -0,0 +1,4 @@ +-- This file is automatically generated. 
You should know what you did if you want to edit this +-- !dup_key_2pr_q03 -- +2022-01-11T06:20:14 1799797425 30891281288216959.961 false -5 7495 -10784 1909249054 -31658.44 -1.992204200787845E9 6080150162640151.680 2022-04-08 2022-09-16T21:29:08 2022-04-18 29.121.52.42 ducimus_inventore_consequa...@dabfeed.net Coleman Lane 51 + diff --git a/regression-test/data/datatype_p0/scalar_types/sql/dup_key_2pr_q04.out b/regression-test/data/datatype_p0/scalar_types/sql/dup_key_2pr_q04.out new file mode 100644 index 0000000000..b05344a0b4 --- /dev/null +++ b/regression-test/data/datatype_p0/scalar_types/sql/dup_key_2pr_q04.out @@ -0,0 +1,4 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !dup_key_2pr_q04 -- +2022-01-10T16:11:53 1906472648 30323570439371776.353 true -64 23204 -27723 1753599187 -28017.578 -8.7412093534361E7 34175877199258167.909 2023-01-07 2022-07-02T15:40:57 2022-10-09 60.230.5.23 asperiores_...@chatterpoint.name Summit Plaza 70 + diff --git a/regression-test/suites/datatype_p0/scalar_types/load.groovy b/regression-test/suites/datatype_p0/scalar_types/load.groovy index 1eb73201d3..954f1df03a 100644 --- a/regression-test/suites/datatype_p0/scalar_types/load.groovy +++ b/regression-test/suites/datatype_p0/scalar_types/load.groovy @@ -48,7 +48,7 @@ suite("test_scalar_types_load", "p0") { DUPLICATE KEY(`k1`) COMMENT 'OLAP' DISTRIBUTED BY HASH(`k1`) BUCKETS 10 - PROPERTIES("replication_num" = "1"); + PROPERTIES("replication_num" = "1", "store_row_column" = "true"); """ // load data @@ -100,7 +100,7 @@ suite("test_scalar_types_load", "p0") { DUPLICATE KEY(`c_datetimev2`, `c_bigint`, `c_decimalv3`) COMMENT 'OLAP' DISTRIBUTED BY HASH(`c_bigint`) BUCKETS 10 - PROPERTIES("replication_num" = "1"); + PROPERTIES("replication_num" = "1", "store_row_column" = "true"); """ // insert data into unique key table1 2 times @@ -136,7 +136,7 @@ suite("test_scalar_types_load", "p0") { UNIQUE KEY(`c_datetimev2`, `c_bigint`, 
`c_decimalv3`) COMMENT 'OLAP' DISTRIBUTED BY HASH(`c_bigint`) BUCKETS 10 - PROPERTIES("replication_num" = "1", "enable_unique_key_merge_on_write" = "true"); + PROPERTIES("replication_num" = "1", "enable_unique_key_merge_on_write" = "true", "store_row_column" = "true"); """ // insert data into unique key table1 2 times @@ -232,7 +232,7 @@ suite("test_scalar_types_load", "p0") { DUPLICATE KEY(`k1`) COMMENT 'OLAP' DISTRIBUTED BY HASH(`k1`) BUCKETS 10 - PROPERTIES("replication_num" = "1"); + PROPERTIES("replication_num" = "1", "store_row_column" = "true"); """ // insert data into dup table with index diff --git a/regression-test/suites/datatype_p0/scalar_types/sql/dup_key_2pr_q01.sql b/regression-test/suites/datatype_p0/scalar_types/sql/dup_key_2pr_q01.sql new file mode 100644 index 0000000000..a193207216 --- /dev/null +++ b/regression-test/suites/datatype_p0/scalar_types/sql/dup_key_2pr_q01.sql @@ -0,0 +1 @@ +SELECT * FROM tbl_scalar_types_dup_3keys ORDER BY c_double, c_datetimev2, c_decimal LIMIT 5; \ No newline at end of file diff --git a/regression-test/suites/datatype_p0/scalar_types/sql/dup_key_2pr_q02.sql b/regression-test/suites/datatype_p0/scalar_types/sql/dup_key_2pr_q02.sql new file mode 100644 index 0000000000..a181e88cb8 --- /dev/null +++ b/regression-test/suites/datatype_p0/scalar_types/sql/dup_key_2pr_q02.sql @@ -0,0 +1 @@ +SELECT * FROM tbl_scalar_types_dup_3keys where c_datetimev2 = '2023-01-10 15:40:04'; \ No newline at end of file diff --git a/regression-test/suites/datatype_p0/scalar_types/sql/dup_key_2pr_q03.sql b/regression-test/suites/datatype_p0/scalar_types/sql/dup_key_2pr_q03.sql new file mode 100644 index 0000000000..820b02760d --- /dev/null +++ b/regression-test/suites/datatype_p0/scalar_types/sql/dup_key_2pr_q03.sql @@ -0,0 +1 @@ +SELECT * FROM tbl_scalar_types_dup_3keys where c_bigint = 1799797425; \ No newline at end of file diff --git a/regression-test/suites/datatype_p0/scalar_types/sql/dup_key_2pr_q04.sql 
b/regression-test/suites/datatype_p0/scalar_types/sql/dup_key_2pr_q04.sql new file mode 100644 index 0000000000..7d91b8ba31 --- /dev/null +++ b/regression-test/suites/datatype_p0/scalar_types/sql/dup_key_2pr_q04.sql @@ -0,0 +1 @@ +SELECT * FROM tbl_scalar_types_dup_3keys where c_largeint = '1753599187'; \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org