This is an automated email from the ASF dual-hosted git repository. lihaopeng pushed a commit to branch topn-lazy-materialize-poc in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/topn-lazy-materialize-poc by this push: new 3cefeab849b BE code topn lazy materialze (#48735) 3cefeab849b is described below commit 3cefeab849b769b7e26277c156e970b06b99973a Author: HappenLee <happen...@selectdb.com> AuthorDate: Thu Mar 6 12:58:40 2025 +0800 BE code topn lazy materialze (#48735) ### What problem does this PR solve? Issue Number: close #xxx Related PR: #xxx Problem Summary: ### Release note None ### Check List (For Author) - Test <!-- At least one of them must be included. --> - [ ] Regression test - [ ] Unit Test - [ ] Manual test (add detailed scripts or steps below) - [ ] No need to test or manual test. Explain why: - [ ] This is a refactor/code format and no logic has been changed. - [ ] Previous test can cover this change. - [ ] No code files have been changed. - [ ] Other reason <!-- Add your reason? --> - Behavior changed: - [ ] No. - [ ] Yes. <!-- Explain the behavior change --> - Does this need documentation? - [ ] No. - [ ] Yes. <!-- Add document PR link here. 
eg: https://github.com/apache/doris-website/pull/1214 --> ### Check List (For Reviewer who merge this PR) - [ ] Confirm the release note - [ ] Confirm test cases - [ ] Confirm document - [ ] Add branch pick label <!-- Add branch pick label that this PR should merge into --> --- be/src/clucene | 2 +- be/src/common/consts.h | 1 + be/src/exec/rowid_fetcher.cpp | 203 +++++++++++++ be/src/exec/rowid_fetcher.h | 26 ++ be/src/olap/CMakeLists.txt | 3 +- be/src/olap/id_manager.h | 195 ++++++++++++ be/src/olap/rowset/segment_v2/column_reader.cpp | 27 ++ be/src/olap/rowset/segment_v2/column_reader.h | 35 +++ be/src/olap/rowset/segment_v2/segment_iterator.cpp | 11 + be/src/olap/schema.h | 6 +- be/src/olap/storage_engine.cpp | 4 + be/src/olap/tablet_schema.h | 9 + be/src/olap/utils.h | 11 + be/src/pipeline/dependency.cpp | 185 ++++++++++++ be/src/pipeline/dependency.h | 47 ++- be/src/pipeline/exec/cache_sink_operator.cpp | 1 - be/src/pipeline/exec/cache_sink_operator.h | 6 +- be/src/pipeline/exec/cache_source_operator.cpp | 2 +- be/src/pipeline/exec/cache_source_operator.h | 4 +- .../exec/materialization_sink_operator.cpp | 155 ++++++++++ .../pipeline/exec/materialization_sink_operator.h | 71 +++++ .../exec/materialization_source_operator.cpp | 59 ++++ .../exec/materialization_source_operator.h | 72 +++++ be/src/pipeline/exec/operator.cpp | 8 +- be/src/pipeline/exec/scan_operator.cpp | 10 + be/src/pipeline/pipeline_fragment_context.cpp | 21 ++ be/src/runtime/exec_env.h | 3 + be/src/runtime/exec_env_init.cpp | 3 + be/src/runtime/query_context.h | 1 - be/src/runtime/runtime_state.cpp | 5 + be/src/runtime/runtime_state.h | 8 + be/src/runtime/workload_group/workload_group.h | 5 + be/src/service/backend_options.h | 1 + be/src/service/internal_service.cpp | 38 +++ be/src/service/internal_service.h | 4 + be/src/util/ref_count_closure.h | 7 +- be/src/vec/columns/column.h | 1 - be/src/vec/common/string_ref.cpp | 5 +- be/src/vec/exec/scan/new_olap_scanner.cpp | 8 + 
be/src/vec/exec/scan/vfile_scanner.cpp | 3 +- be/test/exec/hash_map/hash_table_method_test.cpp | 5 +- be/test/olap/id_manager_test.cpp | 107 +++++++ .../operator/materialization_shared_state_test.cpp | 336 +++++++++++++++++++++ gensrc/proto/internal_service.proto | 35 +++ tools/clickbench-tools/conf/doris-cluster.conf | 6 +- 45 files changed, 1724 insertions(+), 31 deletions(-) diff --git a/be/src/clucene b/be/src/clucene index 3236e18d93b..2204eaec46a 160000 --- a/be/src/clucene +++ b/be/src/clucene @@ -1 +1 @@ -Subproject commit 3236e18d93bf96481493d88c34b6c2515f3b0b75 +Subproject commit 2204eaec46a68e5e9a1876b7021f24839ecb2cf0 diff --git a/be/src/common/consts.h b/be/src/common/consts.h index 2ec9ae12679..548d5a771a2 100644 --- a/be/src/common/consts.h +++ b/be/src/common/consts.h @@ -27,6 +27,7 @@ const std::string CSV_WITH_NAMES_AND_TYPES = "csv_with_names_and_types"; const std::string BLOCK_TEMP_COLUMN_PREFIX = "__TEMP__"; const std::string BLOCK_TEMP_COLUMN_SCANNER_FILTERED = "__TEMP__scanner_filtered"; const std::string ROWID_COL = "__DORIS_ROWID_COL__"; +const std::string GLOBAL_ROWID_COL = "__DORIS_GLOBAL_ROWID_COL__"; const std::string ROW_STORE_COL = "__DORIS_ROW_STORE_COL__"; const std::string DYNAMIC_COLUMN_NAME = "__DORIS_DYNAMIC_COL__"; const std::string PARTIAL_UPDATE_AUTO_INC_COL = "__PARTIAL_UPDATE_AUTO_INC_COLUMN__"; diff --git a/be/src/exec/rowid_fetcher.cpp b/be/src/exec/rowid_fetcher.cpp index fa9f9571409..ac4023601e7 100644 --- a/be/src/exec/rowid_fetcher.cpp +++ b/be/src/exec/rowid_fetcher.cpp @@ -469,4 +469,207 @@ Status RowIdStorageReader::read_by_rowids(const PMultiGetRequest& request, return Status::OK(); } +Status RowIdStorageReader::read_by_rowids(const PMultiGetRequestV2& request, + PMultiGetResponseV2* response) { + if (request.request_block_descs_size()) { + OlapReaderStatistics stats; + std::vector<vectorized::Block> result_blocks(request.request_block_descs_size()); + int64_t acquire_tablet_ms = 0; + int64_t acquire_rowsets_ms =
0; + int64_t acquire_segments_ms = 0; + int64_t lookup_row_data_ms = 0; + std::string row_store_buffer; + + // Per-FileMappingType row counts, reported in the stats log below + std::unordered_map<FileMappingType, int64_t> file_type_counts; + + auto id_file_map = + ExecEnv::GetInstance()->get_id_manager()->get_id_file_map(request.query_id()); + if (!id_file_map) { + return Status::InternalError("Backend:{} id_file_map is null, query_id: {}", + BackendOptions::get_localhost(), + print_id(request.query_id())); + } + + for (int i = 0; i < request.request_block_descs_size(); ++i) { + const auto& request_block_desc = request.request_block_descs(i); + + auto& result_block = result_blocks[i]; + std::vector<SlotDescriptor> slots; + slots.reserve(request_block_desc.slots_size()); + for (const auto& pslot : request_block_desc.slots()) { + slots.push_back(SlotDescriptor(pslot)); + } + if (result_block.is_empty_column()) { + result_block = vectorized::Block(slots, request_block_desc.row_id_size()); + } + + TabletSchema full_read_schema; + for (const ColumnPB& column_pb : request_block_desc.column_descs()) { + full_read_schema.append_column(TabletColumn(column_pb)); + } + std::unordered_map<IteratorKey, IteratorItem, HashOfIteratorKey> iterator_map; + + RowStoreReadStruct row_store_read_struct(row_store_buffer); + if (request_block_desc.fetch_row_store()) { + for (int j = 0; j < request_block_desc.slots_size(); ++j) { + row_store_read_struct.serdes.emplace_back( + slots[j].get_data_type_ptr()->get_serde()); + row_store_read_struct.col_uid_to_idx[slots[j].col_unique_id()] = j; + row_store_read_struct.default_values.emplace_back(slots[j].col_default_value()); + } + } + + for (size_t j = 0; j < request_block_desc.row_id_size(); ++j) { + auto file_id = request_block_desc.file_id(j); + auto file_mapping = id_file_map->get_file_mapping(file_id); + if (!file_mapping) { + return Status::InternalError( + "Backend:{} file_mapping not found, query_id: {}, file_id: {}", + BackendOptions::get_localhost(),
print_id(request.query_id()), file_id); + } + + // Count file mapping types; only DORIS_FORMAT rows are read below, other types are currently skipped + file_type_counts[file_mapping->type]++; + + if (file_mapping->type == FileMappingType::DORIS_FORMAT) { + RETURN_IF_ERROR(read_doris_format_row( + id_file_map, file_mapping, request_block_desc.row_id(j), slots, + full_read_schema, row_store_read_struct, stats, &acquire_tablet_ms, + &acquire_rowsets_ms, &acquire_segments_ms, &lookup_row_data_ms, + iterator_map, result_block)); + } + } + + [[maybe_unused]] size_t compressed_size = 0; + [[maybe_unused]] size_t uncompressed_size = 0; + int be_exec_version = request.has_be_exec_version() ? request.be_exec_version() : 0; + RETURN_IF_ERROR(result_block.serialize( + be_exec_version, response->add_blocks()->mutable_block(), &uncompressed_size, + &compressed_size, segment_v2::CompressionTypePB::LZ4)); + } + + // Build file type statistics string + std::string file_type_stats; + for (const auto& [type, count] : file_type_counts) { + if (!file_type_stats.empty()) { + file_type_stats += ", "; + } + file_type_stats += fmt::format("{}:{}", static_cast<int>(type), count); + } + + LOG(INFO) << "Query stats: " + << fmt::format( + "hit_cached_pages:{}, total_pages_read:{}, compressed_bytes_read:{}, " + "io_latency:{}ns, uncompressed_bytes_read:{}, bytes_read:{}, " + "acquire_tablet_ms:{}, acquire_rowsets_ms:{}, acquire_segments_ms:{}, " + "lookup_row_data_ms:{}, file_types:[{}]", + stats.cached_pages_num, stats.total_pages_num, + stats.compressed_bytes_read, stats.io_ns, + stats.uncompressed_bytes_read, stats.bytes_read, acquire_tablet_ms, + acquire_rowsets_ms, acquire_segments_ms, lookup_row_data_ms, + file_type_stats); + } + + if (request.has_gc_id_map() && request.gc_id_map()) { + ExecEnv::GetInstance()->get_id_manager()->remove_id_file_map(request.query_id()); + } + + return Status::OK(); +} + +Status RowIdStorageReader::read_doris_format_row( + const std::shared_ptr<IdFileMap>& id_file_map, + const std::shared_ptr<FileMapping>& file_mapping, int64_t row_id, +
std::vector<SlotDescriptor>& slots, const TabletSchema& full_read_schema, + RowStoreReadStruct& row_store_read_struct, OlapReaderStatistics& stats, + int64_t* acquire_tablet_ms, int64_t* acquire_rowsets_ms, int64_t* acquire_segments_ms, + int64_t* lookup_row_data_ms, + std::unordered_map<IteratorKey, IteratorItem, HashOfIteratorKey>& iterator_map, + vectorized::Block& result_block) { + auto [tablet_id, rowset_id, segment_id] = file_mapping->get_doris_format_info(); + BaseTabletSPtr tablet = scope_timer_run( + [&]() { + auto res = ExecEnv::get_tablet(tablet_id); + return !res.has_value() ? nullptr + : std::dynamic_pointer_cast<BaseTablet>(res.value()); + }, + acquire_tablet_ms); + if (!tablet) { + return Status::InternalError( + "Backend:{} tablet not found, tablet_id: {}, rowset_id: {}, segment_id: {}, " + "row_id: {}", + BackendOptions::get_localhost(), tablet_id, rowset_id.to_string(), segment_id, + row_id); + } + + BetaRowsetSharedPtr rowset = std::static_pointer_cast<BetaRowset>( + scope_timer_run([&]() { return id_file_map->get_temp_rowset(tablet_id, rowset_id); }, + acquire_rowsets_ms)); + if (!rowset) { + return Status::InternalError( + "Backend:{} rowset_id not found, tablet_id: {}, rowset_id: {}, segment_id: {}, " + "row_id: {}", + BackendOptions::get_localhost(), tablet_id, rowset_id.to_string(), segment_id, + row_id); + } + + SegmentCacheHandle segment_cache; + RETURN_IF_ERROR(scope_timer_run( + [&]() { + return SegmentLoader::instance()->load_segments(rowset, &segment_cache, true); + }, + acquire_segments_ms)); + + auto it = + std::find_if(segment_cache.get_segments().cbegin(), segment_cache.get_segments().cend(), + [segment_id](const segment_v2::SegmentSharedPtr& seg) { + return seg->id() == segment_id; + }); + if (it == segment_cache.get_segments().cend()) { + return Status::InternalError( + "Backend:{} segment not found, tablet_id: {}, rowset_id: {}, segment_id: {}, " + "row_id: {}", + BackendOptions::get_localhost(), tablet_id, rowset_id.to_string(),
segment_id, + row_id); + } + segment_v2::SegmentSharedPtr segment = *it; + + // A non-empty row_store_read_struct (filled only when fetch_row_store is set) means the row is decoded from the row store + if (!row_store_read_struct.default_values.empty()) { + CHECK(tablet->tablet_schema()->has_row_store_for_all_columns()); + RowLocation loc(rowset_id, segment->id(), row_id); + row_store_read_struct.row_store_buffer.clear(); + RETURN_IF_ERROR(scope_timer_run( + [&]() { + return tablet->lookup_row_data({}, loc, rowset, stats, + row_store_read_struct.row_store_buffer); + }, + lookup_row_data_ms)); + + vectorized::JsonbSerializeUtil::jsonb_to_block( + row_store_read_struct.serdes, row_store_read_struct.row_store_buffer.data(), + row_store_read_struct.row_store_buffer.size(), row_store_read_struct.col_uid_to_idx, + result_block, row_store_read_struct.default_values, {}); + } else { + for (int x = 0; x < slots.size(); ++x) { + vectorized::MutableColumnPtr column = + result_block.get_by_position(x).column->assume_mutable(); + IteratorKey iterator_key {.tablet_id = tablet_id, + .rowset_id = rowset_id, + .segment_id = segment_id, + .slot_id = slots[x].id()}; + IteratorItem& iterator_item = iterator_map[iterator_key]; + if (iterator_item.segment == nullptr) { + iterator_item.segment = segment; + } + segment = iterator_item.segment; + RETURN_IF_ERROR(segment->seek_and_read_by_rowid(full_read_schema, &slots[x], row_id, + column, stats, iterator_item.iterator)); + } + } + + return Status::OK(); +} + } // namespace doris diff --git a/be/src/exec/rowid_fetcher.h b/be/src/exec/rowid_fetcher.h index 1fc8b02a679..a43d76988d6 100644 --- a/be/src/exec/rowid_fetcher.h +++ b/be/src/exec/rowid_fetcher.h @@ -27,6 +27,7 @@ #include "common/status.h" #include "exec/tablet_info.h" // DorisNodesInfo +#include "olap/id_manager.h" #include "vec/core/block.h" #include "vec/data_types/data_type.h" @@ -36,6 +37,11 @@ class DorisNodesInfo; class RuntimeState; class TupleDescriptor; +struct FileMapping; +struct IteratorKey;
+struct IteratorItem; +struct HashOfIteratorKey; + namespace vectorized { template <typename T> class ColumnStr; @@ -70,9 +76,29 @@ private: FetchOption _fetch_option; }; +struct RowStoreReadStruct { + explicit RowStoreReadStruct(std::string& buffer) : row_store_buffer(buffer) {} + std::string& row_store_buffer; + vectorized::DataTypeSerDeSPtrs serdes; + std::unordered_map<uint32_t, uint32_t> col_uid_to_idx; + std::vector<std::string> default_values; +}; + class RowIdStorageReader { public: static Status read_by_rowids(const PMultiGetRequest& request, PMultiGetResponse* response); + static Status read_by_rowids(const PMultiGetRequestV2& request, PMultiGetResponseV2* response); + +private: + static Status read_doris_format_row( + const std::shared_ptr<IdFileMap>& id_file_map, + const std::shared_ptr<FileMapping>& file_mapping, int64_t row_id, + std::vector<SlotDescriptor>& slots, const TabletSchema& full_read_schema, + RowStoreReadStruct& row_store_read_struct, OlapReaderStatistics& stats, + int64_t* acquire_tablet_ms, int64_t* acquire_rowsets_ms, int64_t* acquire_segments_ms, + int64_t* lookup_row_data_ms, + std::unordered_map<IteratorKey, IteratorItem, HashOfIteratorKey>& iterator_map, + vectorized::Block& result_block); }; } // namespace doris diff --git a/be/src/olap/CMakeLists.txt b/be/src/olap/CMakeLists.txt index bf19ef26764..6aa10435524 100644 --- a/be/src/olap/CMakeLists.txt +++ b/be/src/olap/CMakeLists.txt @@ -22,7 +22,8 @@ set(LIBRARY_OUTPUT_PATH "${BUILD_DIR}/src/olap") set(EXECUTABLE_OUTPUT_PATH "${BUILD_DIR}/src/olap") file(GLOB_RECURSE SRC_FILES CONFIGURE_DEPENDS *.cpp) -add_library(Olap STATIC ${SRC_FILES}) +add_library(Olap STATIC ${SRC_FILES} + id_manager.h) if (NOT USE_MEM_TRACKER) target_compile_options(Olap PRIVATE -Wno-unused-lambda-capture) diff --git a/be/src/olap/id_manager.h b/be/src/olap/id_manager.h new file mode 100644 index 00000000000..cb056b792cb --- /dev/null +++ b/be/src/olap/id_manager.h @@ -0,0 +1,195 @@ +// Licensed to the Apache
Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <butil/macros.h> +#include <gen_cpp/BackendService_types.h> +#include <gen_cpp/Types_types.h> +#include <stddef.h> +#include <stdint.h> + +#include <functional> +#include <map> +#include <memory> +#include <mutex> +#include <set> +#include <shared_mutex> +#include <string> +#include <string_view> +#include <unordered_map> +#include <unordered_set> +#include <utility> +#include <vector> + +#include "common/status.h" +#include "olap/olap_common.h" +#include "olap/tablet.h" +#include "olap/tablet_meta.h" + +namespace doris { + +enum class FileMappingType { + DORIS_FORMAT, // Doris segment; value holds the raw bytes of {tablet_id}{rowset_id}{segment_id}. NOTE(review): value is the dedup key in IdFileMap, so any padding bytes memcpy'd from RowsetId must be deterministic - verify + ORC, + PARQUET +}; + +struct FileMapping { + FileMappingType type; + std::string value; + + FileMapping(FileMappingType t, std::string v) : type(t), value(std::move(v)) {} + + FileMapping(int64_t tablet_id, RowsetId rowset_id, uint32_t segment_id) + : type(FileMappingType::DORIS_FORMAT) { + value.resize(sizeof(tablet_id) + sizeof(rowset_id) + sizeof(segment_id)); + auto* ptr = value.data(); + + memcpy(ptr, &tablet_id, sizeof(tablet_id)); + ptr += sizeof(tablet_id); + memcpy(ptr, &rowset_id, sizeof(rowset_id)); + ptr += sizeof(rowset_id); +
memcpy(ptr, &segment_id, sizeof(segment_id)); + } + + std::tuple<int64_t, RowsetId, uint32_t> get_doris_format_info() const { + DCHECK(type == FileMappingType::DORIS_FORMAT); + DCHECK(value.size() == sizeof(int64_t) + sizeof(RowsetId) + sizeof(uint32_t)); + + auto* ptr = value.data(); + int64_t tablet_id; + memcpy(&tablet_id, ptr, sizeof(tablet_id)); + ptr += sizeof(tablet_id); + RowsetId rowset_id; + memcpy(&rowset_id, ptr, sizeof(rowset_id)); + ptr += sizeof(rowset_id); + uint32_t segment_id; + memcpy(&segment_id, ptr, sizeof(segment_id)); + + return std::make_tuple(tablet_id, rowset_id, segment_id); + } +}; + +class IdFileMap { +public: + explicit IdFileMap(int64_t expired_timestamp) : delayed_expired_timestamp(expired_timestamp) {} + + std::shared_ptr<FileMapping> get_file_mapping(uint32_t id) { + std::shared_lock lock(_mtx); + auto it = _id_map.find(id); + if (it == _id_map.end()) { + return nullptr; + } + return it->second; + } + + uint32_t get_file_mapping_id(const std::shared_ptr<FileMapping>& mapping) { + DCHECK(mapping.get() != nullptr); + std::unique_lock lock(_mtx); + auto it = _mapping_to_id.find(mapping->value); + if (it != _mapping_to_id.end()) { + return it->second; + } + _id_map[_init_id++] = mapping; + _mapping_to_id[mapping->value] = _init_id - 1; + + return _init_id - 1; + } + + void add_temp_rowset(const RowsetSharedPtr& rowset) { + std::unique_lock lock(_mtx); + _temp_rowset_maps[{rowset->rowset_meta()->tablet_id(), rowset->rowset_id()}] = rowset; + } + + RowsetSharedPtr get_temp_rowset(const int64_t tablet_id, const RowsetId& rowset_id) { + std::shared_lock lock(_mtx); + auto it = _temp_rowset_maps.find({tablet_id, rowset_id}); + if (it == _temp_rowset_maps.end()) { + return nullptr; + } + return it->second; + } + + int64_t get_delayed_expired_timestamp() const { return delayed_expired_timestamp; } + +private: + std::shared_mutex _mtx; + uint32_t _init_id = 0; + std::unordered_map<std::string_view, uint32_t> _mapping_to_id; + std::unordered_map<uint32_t,
std::shared_ptr<FileMapping>> _id_map; + + // DORIS_FORMAT only: holds rowset references keyed by {tablet_id, rowset_id} so compaction cannot delete them while rows may still be fetched + std::unordered_map<std::pair<int64_t, RowsetId>, RowsetSharedPtr> _temp_rowset_maps; + int64_t delayed_expired_timestamp = 0; +}; + +class IdManager { +public: + static constexpr uint8_t ID_VERSION = 0; + + IdManager() = default; + + ~IdManager() { + std::unique_lock lock(_query_to_id_file_map_mtx); + _query_to_id_file_map.clear(); + } + + std::shared_ptr<IdFileMap> add_id_file_map(const UniqueId& query_id, int timeout) { + std::unique_lock lock(_query_to_id_file_map_mtx); + auto it = _query_to_id_file_map.find(query_id); + if (it == _query_to_id_file_map.end()) { + auto id_file_map = std::make_shared<IdFileMap>(UnixSeconds() + timeout); + _query_to_id_file_map[query_id] = id_file_map; + return id_file_map; + } + return it->second; + } + + void gc_expired_id_file_map(int64_t now) { + std::unique_lock lock(_query_to_id_file_map_mtx); + for (auto it = _query_to_id_file_map.begin(); it != _query_to_id_file_map.end();) { + if (it->second->get_delayed_expired_timestamp() <= now) { + it = _query_to_id_file_map.erase(it); + } else { + ++it; + } + } + } + + void remove_id_file_map(const UniqueId& query_id) { + std::unique_lock lock(_query_to_id_file_map_mtx); + _query_to_id_file_map.erase(query_id); + } + + std::shared_ptr<IdFileMap> get_id_file_map(const UniqueId& query_id) { + std::shared_lock lock(_query_to_id_file_map_mtx); + auto it = _query_to_id_file_map.find(query_id); + if (it == _query_to_id_file_map.end()) { + return nullptr; + } + return it->second; + } + +private: + DISALLOW_COPY_AND_ASSIGN(IdManager); + + phmap::flat_hash_map<UniqueId, std::shared_ptr<IdFileMap>> _query_to_id_file_map; + std::shared_mutex _query_to_id_file_map_mtx; +}; + +} // namespace doris \ No newline at end of file diff --git a/be/src/olap/rowset/segment_v2/column_reader.cpp b/be/src/olap/rowset/segment_v2/column_reader.cpp index
9948e7fd8cf..7f88970eb4a 100644 --- a/be/src/olap/rowset/segment_v2/column_reader.cpp +++ b/be/src/olap/rowset/segment_v2/column_reader.cpp @@ -1753,4 +1753,31 @@ Status DefaultNestedColumnIterator::read_by_rowids(const rowid_t* rowids, const return Status::OK(); } +Status RowIdColumnIteratorV2::next_batch(size_t* n, vectorized::MutableColumnPtr& dst, + bool* has_null) { + auto* string_column = assert_cast<vectorized::ColumnString*>(dst.get()); + + for (size_t i = 0; i < *n; ++i) { + uint32_t row_id = static_cast<uint32_t>(_current_rowid + i); + GlobalRowLoacationV2 location(_version, _backend_id, _file_id, row_id); + string_column->insert_data(reinterpret_cast<const char*>(&location), + sizeof(GlobalRowLoacationV2)); + } + _current_rowid += *n; + return Status::OK(); +} + +Status RowIdColumnIteratorV2::read_by_rowids(const rowid_t* rowids, const size_t count, + vectorized::MutableColumnPtr& dst) { + auto* string_column = assert_cast<vectorized::ColumnString*>(dst.get()); + + for (size_t i = 0; i < count; ++i) { + uint32_t row_id = rowids[i]; + GlobalRowLoacationV2 location(_version, _backend_id, _file_id, row_id); + string_column->insert_data(reinterpret_cast<const char*>(&location), + sizeof(GlobalRowLoacationV2)); + } + return Status::OK(); +} + } // namespace doris::segment_v2 diff --git a/be/src/olap/rowset/segment_v2/column_reader.h b/be/src/olap/rowset/segment_v2/column_reader.h index 2afc269a86c..91866ce97ad 100644 --- a/be/src/olap/rowset/segment_v2/column_reader.h +++ b/be/src/olap/rowset/segment_v2/column_reader.h @@ -634,6 +634,41 @@ private: int32_t _segment_id = 0; }; +// Column iterator that emits, for each row, a GlobalRowLoacationV2 {version, backend_id, file_id, row_id} stored as raw bytes in a ColumnString +class RowIdColumnIteratorV2 : public ColumnIterator { +public: + RowIdColumnIteratorV2(uint8_t version, int64_t backend_id, uint32_t file_id) + : _version(version), _backend_id(backend_id), _file_id(file_id) {} + + Status seek_to_first() override { + _current_rowid = 0; + return Status::OK(); + } + + Status seek_to_ordinal(ordinal_t ord_idx) override { + _current_rowid =
static_cast<uint32_t>(ord_idx); + return Status::OK(); + } + + Status next_batch(size_t* n, vectorized::MutableColumnPtr& dst) { + bool has_null = false; + return next_batch(n, dst, &has_null); + } + + Status next_batch(size_t* n, vectorized::MutableColumnPtr& dst, bool* has_null) override; + + Status read_by_rowids(const rowid_t* rowids, const size_t count, + vectorized::MutableColumnPtr& dst) override; + + ordinal_t get_current_ordinal() const override { return _current_rowid; } + +private: + uint32_t _current_rowid = 0; + uint8_t _version; + int64_t _backend_id; + uint32_t _file_id; +}; + class VariantRootColumnIterator : public ColumnIterator { public: VariantRootColumnIterator() = delete; diff --git a/be/src/olap/rowset/segment_v2/segment_iterator.cpp b/be/src/olap/rowset/segment_v2/segment_iterator.cpp index 5d2a3d8b60e..391e06d2e15 100644 --- a/be/src/olap/rowset/segment_v2/segment_iterator.cpp +++ b/be/src/olap/rowset/segment_v2/segment_iterator.cpp @@ -43,6 +43,7 @@ #include "olap/bloom_filter_predicate.h" #include "olap/column_predicate.h" #include "olap/field.h" +#include "olap/id_manager.h" #include "olap/iterators.h" #include "olap/like_column_predicate.h" #include "olap/match_predicate.h" @@ -1024,6 +1025,16 @@ Status SegmentIterator::_init_return_column_iterators() { new RowIdColumnIterator(_opts.tablet_id, _opts.rowset_id, _segment->id())); continue; } + + if (_schema->column(cid)->name().starts_with(BeConsts::GLOBAL_ROWID_COL)) { + auto& id_file_map = _opts.runtime_state->get_id_file_map(); + uint32_t file_id = id_file_map->get_file_mapping_id(std::make_shared<FileMapping>( + _opts.tablet_id, _opts.rowset_id, _segment->id())); + _column_iterators[cid].reset(new RowIdColumnIteratorV2( + IdManager::ID_VERSION, BackendOptions::get_backend_id(), file_id)); + continue; + } + std::set<ColumnId> del_cond_id_set; _opts.delete_condition_predicates->get_all_column_ids(del_cond_id_set); std::vector<bool> tmp_is_pred_column; diff --git a/be/src/olap/schema.h b/be/src/olap/schema.h index
6414db4153a..b7198511120 100644 --- a/be/src/olap/schema.h +++ b/be/src/olap/schema.h @@ -68,7 +68,8 @@ public: if (column.is_key()) { ++num_key_columns; } - if (column.name() == BeConsts::ROWID_COL) { + if (column.name() == BeConsts::ROWID_COL || + column.name().starts_with(BeConsts::GLOBAL_ROWID_COL)) { _rowid_col_idx = cid; } if (column.name() == VERSION_COL) { @@ -94,7 +95,8 @@ public: if (columns[i]->name() == DELETE_SIGN) { _delete_sign_idx = i; } - if (columns[i]->name() == BeConsts::ROWID_COL) { + if (columns[i]->name() == BeConsts::ROWID_COL || + columns[i]->name().starts_with(BeConsts::GLOBAL_ROWID_COL)) { _rowid_col_idx = i; } if (columns[i]->name() == VERSION_COL) { diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp index 3ba65813492..21ad2fa051d 100644 --- a/be/src/olap/storage_engine.cpp +++ b/be/src/olap/storage_engine.cpp @@ -53,6 +53,7 @@ #include "io/fs/local_file_system.h" #include "olap/binlog.h" #include "olap/data_dir.h" +#include "olap/id_manager.h" #include "olap/memtable_flush_executor.h" #include "olap/olap_common.h" #include "olap/olap_define.h" @@ -1491,6 +1492,9 @@ void BaseStorageEngine::_evict_querying_rowset() { } } } + + uint64_t now = UnixSeconds(); + ExecEnv::GetInstance()->get_id_manager()->gc_expired_id_file_map(now); } bool StorageEngine::add_broken_path(std::string path) { diff --git a/be/src/olap/tablet_schema.h b/be/src/olap/tablet_schema.h index 957b9adb2b9..14202758f5b 100644 --- a/be/src/olap/tablet_schema.h +++ b/be/src/olap/tablet_schema.h @@ -398,6 +398,15 @@ public: long row_store_page_size() const { return _row_store_page_size; } void set_storage_page_size(long storage_page_size) { _storage_page_size = storage_page_size; } long storage_page_size() const { return _storage_page_size; } + bool has_global_row_id() const { + for (auto [col_name, _] : _field_name_to_index) { + if (col_name.start_with(StringRef(BeConsts::GLOBAL_ROWID_COL.data(), + BeConsts::GLOBAL_ROWID_COL.size()))) { + return 
true; + } + } + return false; + } const std::vector<const TabletIndex*> inverted_indexes() const { std::vector<const TabletIndex*> inverted_indexes; diff --git a/be/src/olap/utils.h b/be/src/olap/utils.h index c163aad1148..242e39c2c54 100644 --- a/be/src/olap/utils.h +++ b/be/src/olap/utils.h @@ -263,4 +263,15 @@ struct GlobalRowLoacation { } }; +struct GlobalRowLoacationV2 { + GlobalRowLoacationV2(uint8_t ver, uint64_t bid, uint32_t fid, uint32_t rid) + : version(ver), backend_id(bid), file_id(fid), row_id(rid) {} + uint8_t version; + int64_t backend_id; + uint32_t file_id; + uint32_t row_id; + + auto operator<=>(const GlobalRowLoacationV2&) const = default; +}; + } // namespace doris diff --git a/be/src/pipeline/dependency.cpp b/be/src/pipeline/dependency.cpp index 46e1a366b7b..603d77e93c9 100644 --- a/be/src/pipeline/dependency.cpp +++ b/be/src/pipeline/dependency.cpp @@ -27,12 +27,14 @@ #include "pipeline/pipeline_task.h" #include "runtime/exec_env.h" #include "runtime/memory/mem_tracker.h" +#include "util/brpc_client_cache.h" #include "vec/exprs/vectorized_agg_fn.h" #include "vec/exprs/vslot_ref.h" #include "vec/spill/spill_stream_manager.h" namespace doris::pipeline { #include "common/compile_check_begin.h" +#include "vec/utils/util.hpp" Dependency* BasicSharedState::create_source_dependency(int operator_id, int node_id, const std::string& name) { source_deps.push_back(std::make_shared<Dependency>(operator_id, node_id, name + "_DEPENDENCY")); @@ -456,4 +458,187 @@ void AggSharedState::refresh_top_limit(size_t row_id, limit_columns_min = limit_heap.top()._row_id; } +Status MaterializationSharedState::merge_multi_response(vectorized::Block* block) { + // init the response_blocks + if (response_blocks.empty()) { + response_blocks = std::vector<vectorized::MutableBlock>(block_order_results.size()); + } + + std::map<int64_t, std::pair<vectorized::Block, int>> _block_maps; + for (int i = 0; i < block_order_results.size(); ++i) { + for (auto& [backend_id, 
rpc_struct] : rpc_struct_map) { + vectorized::Block partial_block; + RETURN_IF_ERROR( + partial_block.deserialize(rpc_struct.callback->response_->blocks(i).block())); + + if (!partial_block.is_empty_column()) { + if (!response_blocks[i].columns()) { + response_blocks[i] = vectorized::MutableBlock(partial_block.clone_empty()); + } + _block_maps[backend_id] = std::make_pair(std::move(partial_block), 0); + } + } + + for (int j = 0; j < block_order_results[i].size(); ++j) { + auto backend_id = block_order_results[i][j]; + if (backend_id) { + auto& source_block_rows = _block_maps[backend_id]; + DCHECK(source_block_rows.second < source_block_rows.first.rows()); + for (int k = 0; k < response_blocks[i].columns(); ++k) { + response_blocks[i].get_column_by_position(k)->insert_from( + *source_block_rows.first.get_by_position(k).column, + source_block_rows.second); + } + source_block_rows.second++; + } else { + for (int k = 0; k < response_blocks[i].columns(); ++k) { + response_blocks[i].get_column_by_position(k)->insert_default(); + } + } + } + } + + // clear request/response + for (auto& [_, rpc_struct] : rpc_struct_map) { + for (int i = 0; i < rpc_struct.request.request_block_descs_size(); ++i) { + rpc_struct.request.mutable_request_block_descs(i)->clear_row_id(); + rpc_struct.request.mutable_request_block_descs(i)->clear_file_id(); + } + } + + for (int i = 0, j = 0, rowid_to_block_loc = rowid_locs[j]; i < origin_block.columns(); i++) { + if (i != rowid_to_block_loc) { + block->insert(origin_block.get_by_position(i)); + } else { + auto response_block = response_blocks[j].to_block(); + for (auto& data : response_block) { + block->insert(data); + } + if (++j < rowid_locs.size()) { + rowid_to_block_loc = rowid_locs[j]; + } + } + } + origin_block.clear(); + response_blocks.clear(); + + return Status::OK(); +} + +Dependency* MaterializationSharedState::create_source_dependency(int operator_id, int node_id, + const std::string& name) { + auto dep = + 
std::make_shared<CountedFinishDependency>(operator_id, node_id, name + "_DEPENDENCY"); + dep->set_shared_state(this); + // just block source wait for add the counter in sink + dep->add(0); + + source_deps.push_back(dep); + return source_deps.back().get(); +} + +Status MaterializationSharedState::create_muiltget_result(const vectorized::Columns& columns, + bool eos, bool gc_id_map) { + const auto rows = columns.empty() ? 0 : columns[0]->size(); + block_order_results.resize(columns.size()); + + for (int i = 0; i < columns.size(); ++i) { + const uint8_t* null_map = nullptr; + const vectorized::ColumnString* column_rowid = nullptr; + auto& column = columns[i]; + + if (auto column_ptr = check_and_get_column<vectorized::ColumnNullable>(*column)) { + null_map = column_ptr->get_null_map_data().data(); + column_rowid = assert_cast<const vectorized::ColumnString*>( + column_ptr->get_nested_column_ptr().get()); + } else { + column_rowid = assert_cast<const vectorized::ColumnString*>(column.get()); + } + + auto& block_order = block_order_results[i]; + block_order.resize(rows); + + for (int j = 0; j < rows; ++j) { + if (!null_map || !null_map[j]) { + DCHECK(column_rowid->get_data_at(j).size == sizeof(GlobalRowLoacationV2)); + GlobalRowLoacationV2 row_location = + *((GlobalRowLoacationV2*)column_rowid->get_data_at(j).data); + auto rpc_struct = rpc_struct_map.find(row_location.backend_id); + if (UNLIKELY(rpc_struct == rpc_struct_map.end())) { + return Status::InternalError( + "MaterializationSinkOperatorX failed to find rpc_struct, backend_id={}", + row_location.backend_id); + } + rpc_struct->second.request.mutable_request_block_descs(i)->add_row_id( + row_location.row_id); + rpc_struct->second.request.mutable_request_block_descs(i)->add_file_id( + row_location.file_id); + block_order[j] = row_location.backend_id; + } else { + block_order[j] = 0; + } + } + } + + if (eos && gc_id_map) { + for (auto& [_, rpc_struct] : rpc_struct_map) { + rpc_struct.request.set_gc_id_map(true); + } 
+ } + last_block = eos; + need_merge_block = rows > 0; + + return Status::OK(); +} + +Status MaterializationSharedState::init_multi_requests( + const TMaterializationNode& materialization_node, RuntimeState* state) { + PMultiGetRequestV2 multi_get_request; + // Initialize the base struct of PMultiGetRequestV2 + multi_get_request.set_be_exec_version(state->be_exec_version()); + multi_get_request.set_wg_id(state->get_query_ctx()->workload_group()->id()); + auto query_id = multi_get_request.mutable_query_id(); + query_id->set_hi(state->query_id().hi); + query_id->set_lo(state->query_id().lo); + DCHECK_EQ(materialization_node.column_descs_lists.size(), + materialization_node.slot_locs_lists.size()); + + const auto& slots = state->desc_tbl() + .get_tuple_descriptor(materialization_node.intermediate_tuple_id) + ->slots(); + for (int i = 0; i < materialization_node.column_descs_lists.size(); ++i) { + auto request_block_desc = multi_get_request.add_request_block_descs(); + request_block_desc->set_fetch_row_store(materialization_node.fetch_row_stores[i]); + // Initialize the column_descs and slot_locs + auto& column_descs = materialization_node.column_descs_lists[i]; + for (auto& column_desc_item : column_descs) { + TabletColumn(column_desc_item).to_schema_pb(request_block_desc->add_column_descs()); + } + + auto& slot_locs = materialization_node.slot_locs_lists[i]; + for (auto& slot_loc_item : slot_locs) { + slots[slot_loc_item]->to_protobuf(request_block_desc->add_slots()); + } + } + + // Initialize the stubs and requests for each BE + for (const auto& node_info : materialization_node.nodes_info.nodes) { + auto client = ExecEnv::GetInstance()->brpc_internal_client_cache()->get_client( + node_info.host, node_info.async_internal_port); + if (!client) { + LOG(WARNING) << "Get rpc stub failed, host=" << node_info.host + << ", port=" << node_info.async_internal_port; + return Status::InternalError("RowIDFetcher failed to init rpc client, host={}, port={}", + node_info.host, 
node_info.async_internal_port); + } + rpc_struct_map.emplace(node_info.id, FetchRpcStruct {.stub = std::move(client), + .request = multi_get_request, + .callback = nullptr}); + } + // add be_num as count finish counter for source dependency + ((CountedFinishDependency*)source_deps.back().get())->add((int)rpc_struct_map.size()); + rpc_struct_inited = true; // never re-init: it would add rpc_struct_map.size() again + return Status::OK(); +} + } // namespace doris::pipeline diff --git a/be/src/pipeline/dependency.h b/be/src/pipeline/dependency.h index ea1928215b0..22f52f1ea77 100644 --- a/be/src/pipeline/dependency.h +++ b/be/src/pipeline/dependency.h @@ -29,12 +29,14 @@ #include "common/config.h" #include "common/logging.h" +#include "gen_cpp/internal_service.pb.h" #include "gutil/integral_types.h" #include "pipeline/common/agg_utils.h" #include "pipeline/common/join_utils.h" #include "pipeline/common/set_utils.h" #include "pipeline/exec/data_queue.h" #include "pipeline/exec/join/process_hash_table_probe.h" +#include "util/ref_count_closure.h" #include "util/stack_util.h" #include "vec/common/sort/partition_sorter.h" #include "vec/common/sort/sorter.h" @@ -83,7 +85,8 @@ struct BasicSharedState { virtual ~BasicSharedState() = default; - Dependency* create_source_dependency(int operator_id, int node_id, const std::string& name); + virtual Dependency* create_source_dependency(int operator_id, int node_id, + const std::string& name); Dependency* create_sink_dependency(int dest_id, int node_id, const std::string& name); }; @@ -173,12 +176,12 @@ public: CountedFinishDependency(int id, int node_id, std::string name) : Dependency(id, node_id, std::move(name), true) {} - void add() { + void add(uint32_t count = 1) { std::unique_lock<std::mutex> l(_mtx); if (!_counter) { block(); } - _counter++; + _counter += count; } void sub() { @@ -551,8 +554,8 @@ public: const int _child_count; }; -struct CacheSharedState : public BasicSharedState { - 
ENABLE_FACTORY_CREATOR(CacheSharedState) +struct DataQueueSharedState : public BasicSharedState { +
ENABLE_FACTORY_CREATOR(DataQueueSharedState) public: DataQueue data_queue; }; @@ -872,5 +875,39 @@ class QueryGlobalDependency final : public Dependency { ~QueryGlobalDependency() override = default; Dependency* is_blocked_by(PipelineTask* task = nullptr) override; }; + +struct FetchRpcStruct { + std::shared_ptr<PBackendService_Stub> stub; + PMultiGetRequestV2 request; + std::shared_ptr<doris::DummyBrpcCallback<PMultiGetResponseV2>> callback; + MonotonicStopWatch rpc_timer; +}; + +struct MaterializationSharedState : public BasicSharedState { + ENABLE_FACTORY_CREATOR(MaterializationSharedState) +public: + MaterializationSharedState() = default; + + Status init_multi_requests(const TMaterializationNode& tnode, RuntimeState* state); + Status create_muiltget_result(const vectorized::Columns& columns, bool eos, bool gc_id_map); + Status merge_multi_response(vectorized::Block* block); + + Dependency* create_source_dependency(int operator_id, int node_id, + const std::string& name) override; + + bool rpc_struct_inited = false; + Status rpc_status = Status::OK(); + bool last_block = false; + // empty materialization sink block not need to merge block + bool need_merge_block = true; + vectorized::Block origin_block; + // The rowid column of the origin block. should be replaced by the column of the result block. + std::vector<int> rowid_locs; + std::vector<vectorized::MutableBlock> response_blocks; + std::map<int64_t, FetchRpcStruct> rpc_struct_map; + // Register each line in which block to ensure the order of the result. + // Zero means NULL value. 
+ std::vector<std::vector<int64_t>> block_order_results; +}; #include "common/compile_check_end.h" } // namespace doris::pipeline diff --git a/be/src/pipeline/exec/cache_sink_operator.cpp b/be/src/pipeline/exec/cache_sink_operator.cpp index f1a8a2c888e..93d50212875 100644 --- a/be/src/pipeline/exec/cache_sink_operator.cpp +++ b/be/src/pipeline/exec/cache_sink_operator.cpp @@ -40,7 +40,6 @@ Status CacheSinkLocalState::open(RuntimeState* state) { SCOPED_TIMER(exec_time_counter()); SCOPED_TIMER(_open_timer); RETURN_IF_ERROR(Base::open(state)); - // auto& p = _parent->cast<Parent>(); _shared_state->data_queue.set_max_blocks_in_sub_queue(state->data_queue_max_blocks()); return Status::OK(); diff --git a/be/src/pipeline/exec/cache_sink_operator.h b/be/src/pipeline/exec/cache_sink_operator.h index d4f2113ed06..c67ada5735c 100644 --- a/be/src/pipeline/exec/cache_sink_operator.h +++ b/be/src/pipeline/exec/cache_sink_operator.h @@ -33,14 +33,14 @@ namespace pipeline { class DataQueue; class CacheSinkOperatorX; -class CacheSinkLocalState final : public PipelineXSinkLocalState<CacheSharedState> { +class CacheSinkLocalState final : public PipelineXSinkLocalState<DataQueueSharedState> { public: ENABLE_FACTORY_CREATOR(CacheSinkLocalState); CacheSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state) : Base(parent, state) {} Status init(RuntimeState* state, LocalSinkStateInfo& info) override; Status open(RuntimeState* state) override; friend class CacheSinkOperatorX; - using Base = PipelineXSinkLocalState<CacheSharedState>; + using Base = PipelineXSinkLocalState<DataQueueSharedState>; using Parent = CacheSinkOperatorX; }; @@ -59,7 +59,7 @@ public: Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) override; std::shared_ptr<BasicSharedState> create_shared_state() const override { - std::shared_ptr<BasicSharedState> ss = std::make_shared<CacheSharedState>(); + std::shared_ptr<BasicSharedState> ss = std::make_shared<DataQueueSharedState>(); ss->id = 
operator_id(); for (auto& dest : dests_id()) { ss->related_op_ids.insert(dest); diff --git a/be/src/pipeline/exec/cache_source_operator.cpp b/be/src/pipeline/exec/cache_source_operator.cpp index ec9f9ecc572..9db41c4cd85 100644 --- a/be/src/pipeline/exec/cache_source_operator.cpp +++ b/be/src/pipeline/exec/cache_source_operator.cpp @@ -34,7 +34,7 @@ Status CacheSourceLocalState::init(RuntimeState* state, LocalStateInfo& info) { RETURN_IF_ERROR(Base::init(state, info)); SCOPED_TIMER(exec_time_counter()); SCOPED_TIMER(_init_timer); - ((CacheSharedState*)_dependency->shared_state()) + ((DataQueueSharedState*)_dependency->shared_state()) ->data_queue.set_source_dependency(_shared_state->source_deps.front()); const auto& scan_ranges = info.scan_ranges; bool hit_cache = false; diff --git a/be/src/pipeline/exec/cache_source_operator.h b/be/src/pipeline/exec/cache_source_operator.h index 651f9ff5596..ec95301cbea 100644 --- a/be/src/pipeline/exec/cache_source_operator.h +++ b/be/src/pipeline/exec/cache_source_operator.h @@ -36,10 +36,10 @@ namespace pipeline { class DataQueue; class CacheSourceOperatorX; -class CacheSourceLocalState final : public PipelineXLocalState<CacheSharedState> { +class CacheSourceLocalState final : public PipelineXLocalState<DataQueueSharedState> { public: ENABLE_FACTORY_CREATOR(CacheSourceLocalState); - using Base = PipelineXLocalState<CacheSharedState>; + using Base = PipelineXLocalState<DataQueueSharedState>; using Parent = CacheSourceOperatorX; CacheSourceLocalState(RuntimeState* state, OperatorXBase* parent) : Base(state, parent) {}; diff --git a/be/src/pipeline/exec/materialization_sink_operator.cpp b/be/src/pipeline/exec/materialization_sink_operator.cpp new file mode 100644 index 00000000000..acdae4c508b --- /dev/null +++ b/be/src/pipeline/exec/materialization_sink_operator.cpp @@ -0,0 +1,164 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements.
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "pipeline/exec/materialization_sink_operator.h" + +#include <bthread/countdown_event.h> +#include <fmt/format.h> +#include <gen_cpp/data.pb.h> +#include <gen_cpp/internal_service.pb.h> +#include <gen_cpp/olap_file.pb.h> +#include <gen_cpp/types.pb.h> + +#include <mutex> +#include <utility> + +#include "common/status.h" +#include "pipeline/exec/operator.h" +#include "util/brpc_client_cache.h" +#include "util/ref_count_closure.h" +#include "vec/columns/column.h" +#include "vec/core/block.h" + +namespace doris { +namespace pipeline { + +Status MaterializationSinkOperatorX::init(const doris::TPlanNode& tnode, + doris::RuntimeState* state) { + RETURN_IF_ERROR(DataSinkOperatorX::init(tnode, state)); + DCHECK(tnode.__isset.materialization_node); + _materialization_node = tnode.materialization_node; + _gc_id_map = tnode.materialization_node.gc_id_map; + // Create result_expr_ctx_lists_ from thrift exprs.
+ auto& fetch_expr_lists = tnode.materialization_node.fetch_expr_lists; + RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(fetch_expr_lists, _rowid_exprs)); + return Status::OK(); +} + +Status MaterializationSinkOperatorX::prepare(RuntimeState* state) { + RETURN_IF_ERROR(vectorized::VExpr::prepare(_rowid_exprs, state, _child->row_desc())); + RETURN_IF_ERROR(vectorized::VExpr::open(_rowid_exprs, state)); + return Status::OK(); +} + +template <typename Response> +class MaterializationCallback : public ::doris::DummyBrpcCallback<Response> { + ENABLE_FACTORY_CREATOR(MaterializationCallback); + +public: + MaterializationCallback(std::weak_ptr<TaskExecutionContext> tast_exec_ctx, + MaterializationSharedState* shared_state, MonotonicStopWatch& rpc_timer) + : _tast_exec_ctx(std::move(tast_exec_ctx)), + _shared_state(shared_state), + _rpc_timer(rpc_timer) {} + + ~MaterializationCallback() override = default; + MaterializationCallback(const MaterializationCallback& other) = delete; + MaterializationCallback& operator=(const MaterializationCallback& other) = delete; + + void call() noexcept override { + auto tast_exec_ctx = _tast_exec_ctx.lock(); + if (!tast_exec_ctx) { + return; + } + + _rpc_timer.stop(); + Status st = Status::OK(); + if (::doris::DummyBrpcCallback<Response>::cntl_->Failed()) { + st = Status::InternalError( + "failed to send multiget brpc for materialization, error={}, error_text={}, " + "client: {}, latency = {}", + berror(::doris::DummyBrpcCallback<Response>::cntl_->ErrorCode()), + ::doris::DummyBrpcCallback<Response>::cntl_->ErrorText(), + BackendOptions::get_localhost(), + ::doris::DummyBrpcCallback<Response>::cntl_->latency_us()); + } else { + st = Status::create(doris::DummyBrpcCallback<Response>::response_->status()); + } + // Responses of different backends may arrive concurrently: only record failures and keep + // the first one, so a later OK can not overwrite it (lock is taken on the error path only).
+ if (!st.ok()) { + static std::mutex rpc_status_lock; + std::lock_guard<std::mutex> guard(rpc_status_lock); + if (_shared_state->rpc_status.ok()) { + _shared_state->rpc_status = std::move(st); + } + } + ((CountedFinishDependency*)_shared_state->source_deps.back().get())->sub(); + } + +private: + std::weak_ptr<TaskExecutionContext> _tast_exec_ctx; + MaterializationSharedState*
_shared_state; + MonotonicStopWatch& _rpc_timer; +}; + +Status MaterializationSinkOperatorX::sink(RuntimeState* state, vectorized::Block* in_block, + bool eos) { + auto& local_state = get_local_state(state); + SCOPED_TIMER(local_state.exec_time_counter()); + COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows()); + if (!local_state._shared_state->rpc_struct_inited) { + RETURN_IF_ERROR( + local_state._shared_state->init_multi_requests(_materialization_node, state)); + } + + if (in_block->rows() > 0 || eos) { + // block the pipeline wait the rpc response + if (!eos) { + local_state._shared_state->sink_deps.back()->block(); + } + // execute the rowid exprs + vectorized::Columns columns; + if (in_block->rows() != 0) { + local_state._shared_state->rowid_locs.resize(_rowid_exprs.size()); + for (int i = 0; i < _rowid_exprs.size(); ++i) { + auto& rowid_expr = _rowid_exprs[i]; + RETURN_IF_ERROR( + rowid_expr->execute(in_block, &local_state._shared_state->rowid_locs[i])); + columns.emplace_back( + in_block->get_by_position(local_state._shared_state->rowid_locs[i]).column); + } + local_state._shared_state->origin_block.swap(*in_block); + } + RETURN_IF_ERROR( + local_state._shared_state->create_muiltget_result(columns, eos, _gc_id_map)); + + for (auto& [backend_id, rpc_struct] : local_state._shared_state->rpc_struct_map) { + auto callback = MaterializationCallback<PMultiGetResponseV2>::create_shared( + state->get_task_execution_context(), local_state._shared_state, + rpc_struct.rpc_timer); + callback->cntl_->set_timeout_ms(config::fetch_rpc_timeout_seconds * 1000); + auto closure = + AutoReleaseClosure<int, ::doris::DummyBrpcCallback<PMultiGetResponseV2>>:: + create_unique( + std::make_shared<int>(), callback, state->get_query_ctx_weak(), + "Materialization Sink node id:" + std::to_string(node_id()) + + " target_backend_id:" + std::to_string(backend_id)); + // send brpc request + rpc_struct.callback = callback; + rpc_struct.rpc_timer.start(); + 
rpc_struct.stub->multiget_data_v2(callback->cntl_.get(), &rpc_struct.request, + callback->response_.get(), closure.release()); + } + } + + return Status::OK(); +} + +} // namespace pipeline +} // namespace doris \ No newline at end of file diff --git a/be/src/pipeline/exec/materialization_sink_operator.h b/be/src/pipeline/exec/materialization_sink_operator.h new file mode 100644 index 00000000000..813d12e017d --- /dev/null +++ b/be/src/pipeline/exec/materialization_sink_operator.h @@ -0,0 +1,71 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#pragma once + +#include <memory> + +#include "common/status.h" +#include "operator.h" +#include "vec/core/block.h" + +namespace doris { +#include "common/compile_check_begin.h" +class RuntimeState; + +namespace pipeline { + +class MaterializationSinkOperatorX; +class MaterializationSinkLocalState final + : public PipelineXSinkLocalState<MaterializationSharedState> { +public: + ENABLE_FACTORY_CREATOR(MaterializationSinkLocalState); + MaterializationSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state) + : Base(parent, state) {} + +private: + friend class MaterializationSinkOperatorX; + using Base = PipelineXSinkLocalState<MaterializationSharedState>; + using Parent = MaterializationSinkOperatorX; +}; + +class MaterializationSinkOperatorX final : public DataSinkOperatorX<MaterializationSinkLocalState> { +public: + using Base = DataSinkOperatorX<MaterializationSinkLocalState>; + + friend class MaterializationSinkLocalState; + MaterializationSinkOperatorX(int child_id, int sink_id, ObjectPool* pool, + const TPlanNode& tnode) + : Base(sink_id, tnode.node_id, child_id) { + _name = "MATERIALIZATION_SINK_OPERATOR"; + } + ~MaterializationSinkOperatorX() override = default; + + Status init(const TPlanNode& tnode, RuntimeState* state) override; + Status prepare(RuntimeState* state) override; + Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) override; + +private: + // Materialized slot by this node. 
The i-th result expr list refers to a slot of RowId + TMaterializationNode _materialization_node; + vectorized::VExprContextSPtrs _rowid_exprs; + bool _gc_id_map = false; +}; + +} // namespace pipeline +#include "common/compile_check_end.h" +} // namespace doris \ No newline at end of file diff --git a/be/src/pipeline/exec/materialization_source_operator.cpp b/be/src/pipeline/exec/materialization_source_operator.cpp new file mode 100644 index 00000000000..20e7c6351d9 --- /dev/null +++ b/be/src/pipeline/exec/materialization_source_operator.cpp @@ -0,0 +1,59 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "pipeline/exec/materialization_source_operator.h" + +#include <utility> + +#include "common/status.h" +#include "vec/core/block.h" + +namespace doris::pipeline { + +Status MaterializationSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* block, + bool* eos) { + auto& local_state = get_local_state(state); + SCOPED_TIMER(local_state.exec_time_counter()); + + if (!local_state._shared_state->rpc_status.ok()) { + return local_state._shared_state->rpc_status; + } + + // clear origin block, do merge response to build a ret block + block->clear(); + if (local_state._shared_state->need_merge_block) { + SCOPED_TIMER(local_state._merge_response_timer); + RETURN_IF_ERROR(local_state._shared_state->merge_multi_response(block)); + } + + *eos = local_state._shared_state->last_block; + if (!*eos) { + local_state._shared_state->sink_deps.back()->ready(); + ((CountedFinishDependency*)(local_state._shared_state->source_deps.back().get())) + ->add(local_state._shared_state->rpc_struct_map.size()); + } else { + uint64_t max_rpc_time = 0; + for (auto& [_, rpc_struct] : local_state._shared_state->rpc_struct_map) { + max_rpc_time = std::max(max_rpc_time, rpc_struct.rpc_timer.elapsed_time()); + } + COUNTER_SET(local_state._max_rpc_timer, (int64_t)max_rpc_time); + } + + return Status::OK(); +} + +} // namespace doris::pipeline \ No newline at end of file diff --git a/be/src/pipeline/exec/materialization_source_operator.h b/be/src/pipeline/exec/materialization_source_operator.h new file mode 100644 index 00000000000..0c6a8b91047 --- /dev/null +++ b/be/src/pipeline/exec/materialization_source_operator.h @@ -0,0 +1,72 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <memory> + +#include "common/status.h" +#include "operator.h" +#include "vec/core/block.h" + +namespace doris { +#include "common/compile_check_begin.h" +class RuntimeState; + +namespace pipeline { + +class MaterializationSourceOperatorX; +class MaterializationSourceLocalState final + : public PipelineXLocalState<MaterializationSharedState> { +public: + ENABLE_FACTORY_CREATOR(MaterializationSourceLocalState); + using Base = PipelineXLocalState<MaterializationSharedState>; + using Parent = MaterializationSourceOperatorX; + MaterializationSourceLocalState(RuntimeState* state, OperatorXBase* parent) + : Base(state, parent) {}; + + Status init(doris::RuntimeState* state, doris::pipeline::LocalStateInfo& info) override { + RETURN_IF_ERROR(Base::init(state, info)); + _max_rpc_timer = ADD_TIMER_WITH_LEVEL(_runtime_profile, "MaxRpcTime", 2); + _merge_response_timer = ADD_TIMER_WITH_LEVEL(_runtime_profile, "MergeResponseTime", 2); + return Status::OK(); + } + +private: + RuntimeProfile::Counter* _max_rpc_timer = nullptr; + RuntimeProfile::Counter* _merge_response_timer = nullptr; + + friend class MaterializationSourceOperatorX; + friend class OperatorX<MaterializationSourceLocalState>; +}; + +class MaterializationSourceOperatorX final : public OperatorX<MaterializationSourceLocalState> { +public: + using Base = OperatorX<MaterializationSourceLocalState>; + 
MaterializationSourceOperatorX(ObjectPool* pool, const TPlanNode& tnode, const int operator_id, + const DescriptorTbl& descs) + : Base(pool, tnode, operator_id, descs) {}; + ~MaterializationSourceOperatorX() override = default; + + Status get_block(doris::RuntimeState* state, vectorized::Block* block, bool* eos) override; + + bool is_source() const override { return true; } +}; + +} // namespace pipeline +#include "common/compile_check_end.h" +} // namespace doris \ No newline at end of file diff --git a/be/src/pipeline/exec/operator.cpp b/be/src/pipeline/exec/operator.cpp index d2d0d6a6827..a2f708392af 100644 --- a/be/src/pipeline/exec/operator.cpp +++ b/be/src/pipeline/exec/operator.cpp @@ -41,6 +41,8 @@ #include "pipeline/exec/iceberg_table_sink_operator.h" #include "pipeline/exec/jdbc_scan_operator.h" #include "pipeline/exec/jdbc_table_sink_operator.h" +#include "pipeline/exec/materialization_sink_operator.h" +#include "pipeline/exec/materialization_source_operator.h" #include "pipeline/exec/memory_scratch_sink_operator.h" #include "pipeline/exec/meta_scan_operator.h" #include "pipeline/exec/mock_operator.h" @@ -706,6 +708,7 @@ DECLARE_OPERATOR(SetSinkLocalState<false>) DECLARE_OPERATOR(PartitionedHashJoinSinkLocalState) DECLARE_OPERATOR(GroupCommitBlockSinkLocalState) DECLARE_OPERATOR(CacheSinkLocalState) +DECLARE_OPERATOR(MaterializationSinkLocalState) #undef DECLARE_OPERATOR @@ -738,6 +741,7 @@ DECLARE_OPERATOR(MetaScanLocalState) DECLARE_OPERATOR(LocalExchangeSourceLocalState) DECLARE_OPERATOR(PartitionedHashJoinProbeLocalState) DECLARE_OPERATOR(CacheSourceLocalState) +DECLARE_OPERATOR(MaterializationSourceLocalState) #ifdef BE_TEST DECLARE_OPERATOR(MockLocalState) @@ -770,7 +774,7 @@ template class PipelineXSinkLocalState<MultiCastSharedState>; template class PipelineXSinkLocalState<SetSharedState>; template class PipelineXSinkLocalState<LocalExchangeSharedState>; template class PipelineXSinkLocalState<BasicSharedState>; -template class 
PipelineXSinkLocalState<CacheSharedState>; +template class PipelineXSinkLocalState<DataQueueSharedState>; template class PipelineXLocalState<HashJoinSharedState>; template class PipelineXLocalState<PartitionedHashJoinSharedState>; @@ -782,7 +786,7 @@ template class PipelineXLocalState<AggSharedState>; template class PipelineXLocalState<PartitionedAggSharedState>; template class PipelineXLocalState<FakeSharedState>; template class PipelineXLocalState<UnionSharedState>; -template class PipelineXLocalState<CacheSharedState>; +template class PipelineXLocalState<DataQueueSharedState>; template class PipelineXLocalState<MultiCastSharedState>; template class PipelineXLocalState<PartitionSortNodeSharedState>; template class PipelineXLocalState<SetSharedState>; diff --git a/be/src/pipeline/exec/scan_operator.cpp b/be/src/pipeline/exec/scan_operator.cpp index ec283f46683..c8b19e3e1fc 100644 --- a/be/src/pipeline/exec/scan_operator.cpp +++ b/be/src/pipeline/exec/scan_operator.cpp @@ -102,6 +102,15 @@ Status ScanLocalState<Derived>::open(RuntimeState* state) { } RETURN_IF_ERROR(PipelineXLocalState<>::open(state)); auto& p = _parent->cast<typename Derived::Parent>(); + + // init id_file_map() for runtime state + std::vector<SlotDescriptor*> slots = p._output_tuple_desc->slots(); + for (auto slot : slots) { + if (slot->col_name().starts_with(BeConsts::GLOBAL_ROWID_COL)) { + state->set_id_file_map(); + } + } + _common_expr_ctxs_push_down.resize(p._common_expr_ctxs_push_down.size()); for (size_t i = 0; i < _common_expr_ctxs_push_down.size(); i++) { RETURN_IF_ERROR( @@ -1002,6 +1011,7 @@ Status ScanLocalState<Derived>::_start_scanners( // https://github.com/apache/doris/pull/44635 const int parallism_of_scan_operator = p.is_serial_operator() ? 
1 : p.query_parallel_instance_num(); + _scanner_ctx = vectorized::ScannerContext::create_shared( state(), this, p._output_tuple_desc, p.output_row_descriptor(), scanners, p.limit(), _scan_dependency, parallism_of_scan_operator); diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index 388b3102a21..2d8160d3e1e 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -63,6 +63,8 @@ #include "pipeline/exec/iceberg_table_sink_operator.h" #include "pipeline/exec/jdbc_scan_operator.h" #include "pipeline/exec/jdbc_table_sink_operator.h" +#include "pipeline/exec/materialization_sink_operator.h" +#include "pipeline/exec/materialization_source_operator.h" #include "pipeline/exec/memory_scratch_sink_operator.h" #include "pipeline/exec/meta_scan_operator.h" #include "pipeline/exec/multi_cast_data_stream_sink.h" @@ -1584,6 +1586,25 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo RETURN_IF_ERROR(cur_pipe->sink()->init(tnode, _runtime_state.get())); break; } + case TPlanNodeType::MATERIALIZATION_NODE: { + op.reset(new MaterializationSourceOperatorX(pool, tnode, next_operator_id(), descs)); + RETURN_IF_ERROR(cur_pipe->add_operator( + op, request.__isset.parallel_instances ? 
request.parallel_instances : 0)); + + const auto downstream_pipeline_id = cur_pipe->id(); + if (_dag.find(downstream_pipeline_id) == _dag.end()) { + _dag.insert({downstream_pipeline_id, {}}); + } + auto new_pipe = add_pipeline(cur_pipe); + _dag[downstream_pipeline_id].push_back(new_pipe->id()); + + DataSinkOperatorPtr sink(new MaterializationSinkOperatorX( + op->operator_id(), next_sink_operator_id(), pool, tnode)); + RETURN_IF_ERROR(new_pipe->set_sink(sink)); + RETURN_IF_ERROR(new_pipe->sink()->init(tnode, _runtime_state.get())); + cur_pipe = new_pipe; + break; + } case TPlanNodeType::INTERSECT_NODE: { RETURN_IF_ERROR(_build_operators_for_set_operation_node<true>( pool, tnode, descs, op, cur_pipe, parent_idx, child_idx, request)); diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h index 1fefd9de642..b760e13f7c3 100644 --- a/be/src/runtime/exec_env.h +++ b/be/src/runtime/exec_env.h @@ -115,6 +115,7 @@ class LookupConnectionCache; class RowCache; class DummyLRUCache; class CacheManager; +class IdManager; class ProcessProfile; class HeapProfiler; class WalManager; @@ -330,6 +331,7 @@ public: LookupConnectionCache* get_lookup_connection_cache() { return _lookup_connection_cache; } RowCache* get_row_cache() { return _row_cache; } CacheManager* get_cache_manager() { return _cache_manager; } + IdManager* get_id_manager() { return _id_manager; } ProcessProfile* get_process_profile() { return _process_profile; } HeapProfiler* get_heap_profiler() { return _heap_profiler; } segment_v2::InvertedIndexSearcherCache* get_inverted_index_searcher_cache() { @@ -476,6 +478,7 @@ private: LookupConnectionCache* _lookup_connection_cache = nullptr; RowCache* _row_cache = nullptr; CacheManager* _cache_manager = nullptr; + IdManager* _id_manager = nullptr; ProcessProfile* _process_profile = nullptr; HeapProfiler* _heap_profiler = nullptr; segment_v2::InvertedIndexSearcherCache* _inverted_index_searcher_cache = nullptr; diff --git a/be/src/runtime/exec_env_init.cpp 
b/be/src/runtime/exec_env_init.cpp index 44aee2fde91..1f1ca3284ff 100644 --- a/be/src/runtime/exec_env_init.cpp +++ b/be/src/runtime/exec_env_init.cpp @@ -48,6 +48,7 @@ #include "io/cache/fs_file_cache_storage.h" #include "io/fs/file_meta_cache.h" #include "io/fs/local_file_reader.h" +#include "olap/id_manager.h" #include "olap/memtable_memory_limiter.h" #include "olap/olap_define.h" #include "olap/options.h" @@ -457,6 +458,7 @@ Status ExecEnv::_init_mem_env() { return Status::InternalError(ss.str()); } + _id_manager = new IdManager(); _cache_manager = CacheManager::create_global_instance(); int64_t storage_cache_limit = @@ -801,6 +803,7 @@ void ExecEnv::destroy() { // cache_manager must be destoried after all cache. // https://github.com/apache/doris/issues/24082#issuecomment-1712544039 SAFE_DELETE(_cache_manager); + SAFE_DELETE(_id_manager); // _heartbeat_flags must be destoried after staroge engine SAFE_DELETE(_heartbeat_flags); diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h index fc3d80cdbe0..14d2c5c26f2 100644 --- a/be/src/runtime/query_context.h +++ b/be/src/runtime/query_context.h @@ -32,7 +32,6 @@ #include "common/config.h" #include "common/factory_creator.h" #include "common/object_pool.h" -#include "pipeline/dependency.h" #include "runtime/exec_env.h" #include "runtime/memory/mem_tracker_limiter.h" #include "runtime/runtime_filter_mgr.h" diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp index 712e18159fd..d3b42f77d7d 100644 --- a/be/src/runtime/runtime_state.cpp +++ b/be/src/runtime/runtime_state.cpp @@ -36,6 +36,7 @@ #include "common/object_pool.h" #include "common/status.h" #include "io/fs/s3_file_system.h" +#include "olap/id_manager.h" #include "olap/storage_engine.h" #include "pipeline/exec/operator.h" #include "pipeline/pipeline_task.h" @@ -553,4 +554,8 @@ bool RuntimeState::low_memory_mode() const { return _query_ctx->low_memory_mode(); } +void RuntimeState::set_id_file_map() { + 
_id_file_map = _exec_env->get_id_manager()->add_id_file_map(_query_id, execution_timeout()); +} + } // end namespace doris diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h index aa486d3b8b6..17d4126702c 100644 --- a/be/src/runtime/runtime_state.h +++ b/be/src/runtime/runtime_state.h @@ -70,6 +70,7 @@ class Dependency; class DescriptorTbl; class ObjectPool; class ExecEnv; +class IdFileMap; class RuntimeFilterMgr; class MemTrackerLimiter; class QueryContext; @@ -661,6 +662,10 @@ public: int profile_level() const { return _profile_level; } + std::shared_ptr<IdFileMap>& get_id_file_map() { return _id_file_map; } + + void set_id_file_map(); + private: Status create_error_log_file(); @@ -784,6 +789,9 @@ private: // error file path on s3, ${bucket}/${prefix}/error_log/${label}_${fragment_instance_id} std::string _s3_error_log_file_path; std::mutex _s3_error_log_file_lock; + + // used for encoding the global lazy materialize + std::shared_ptr<IdFileMap> _id_file_map = nullptr; }; #define RETURN_IF_CANCELLED(state) \ diff --git a/be/src/runtime/workload_group/workload_group.h b/be/src/runtime/workload_group/workload_group.h index 5f5b944f8e6..9d9e509c5cc 100644 --- a/be/src/runtime/workload_group/workload_group.h +++ b/be/src/runtime/workload_group/workload_group.h @@ -227,6 +227,11 @@ public: int64_t free_overcommited_memory(int64_t need_free_mem, RuntimeProfile* profile); + vectorized::SimplifiedScanScheduler* get_remote_scan_task_scheduler() { + std::shared_lock<std::shared_mutex> r_lock(_mutex); + return _remote_scan_task_sched.get(); + } + private: void create_cgroup_cpu_ctl_no_lock(); void upsert_cgroup_cpu_ctl_no_lock(WorkloadGroupInfo* wg_info); diff --git a/be/src/service/backend_options.h b/be/src/service/backend_options.h index 0052eb41530..543863a6309 100644 --- a/be/src/service/backend_options.h +++ b/be/src/service/backend_options.h @@ -37,6 +37,7 @@ public: static std::string get_be_endpoint(); static TBackend get_local_backend(); 
static void set_backend_id(int64_t backend_id); + static int64_t get_backend_id() { return _s_backend_id; } static void set_localhost(const std::string& host); static bool is_bind_ipv6(); static const char* get_service_bind_address(); diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index 19a0e88fe8f..b2029f0870d 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -105,6 +105,8 @@ #include "runtime/stream_load/stream_load_context.h" #include "runtime/thread_context.h" #include "runtime/types.h" +#include "runtime/workload_group/workload_group.h" +#include "runtime/workload_group/workload_group_manager.h" #include "service/backend_options.h" #include "service/point_query_executor.h" #include "util/arrow/row_batch.h" @@ -2056,6 +2058,42 @@ void PInternalService::multiget_data(google::protobuf::RpcController* controller } } +void PInternalService::multiget_data_v2(google::protobuf::RpcController* controller, + const PMultiGetRequestV2* request, + PMultiGetResponseV2* response, + google::protobuf::Closure* done) { + auto wg = ExecEnv::GetInstance()->workload_group_mgr()->get_group(request->wg_id()); + Status st = Status::OK(); + + if (!wg) [[unlikely]] { + brpc::ClosureGuard closure_guard(done); + st = Status::Error<TStatusCode::CANCELLED>("fail to find wg: wg id:" + + std::to_string(request->wg_id())); + st.to_protobuf(response->mutable_status()); + return; + } + + st = wg->get_remote_scan_task_scheduler()->submit_scan_task(vectorized::SimplifiedScanTask( + [request, response, done]() { + signal::set_signal_task_id(request->query_id()); + // multi get data by rowid + MonotonicStopWatch watch; + watch.start(); + brpc::ClosureGuard closure_guard(done); + response->mutable_status()->set_status_code(0); + SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->rowid_storage_reader_tracker()); + Status st = RowIdStorageReader::read_by_rowids(*request, response); + st.to_protobuf(response->mutable_status()); 
+ LOG(INFO) << "multiget_data finished, cost(us):" << watch.elapsed_time() / 1000; + }, + nullptr)); + + if (!st.ok()) { + brpc::ClosureGuard closure_guard(done); + st.to_protobuf(response->mutable_status()); + } +} + void PInternalServiceImpl::get_tablet_rowset_versions(google::protobuf::RpcController* cntl_base, const PGetTabletVersionsRequest* request, PGetTabletVersionsResponse* response, diff --git a/be/src/service/internal_service.h b/be/src/service/internal_service.h index e3d03a6a449..6ce0ae868c1 100644 --- a/be/src/service/internal_service.h +++ b/be/src/service/internal_service.h @@ -211,6 +211,10 @@ public: void multiget_data(google::protobuf::RpcController* controller, const PMultiGetRequest* request, PMultiGetResponse* response, google::protobuf::Closure* done) override; + void multiget_data_v2(google::protobuf::RpcController* controller, + const PMultiGetRequestV2* request, PMultiGetResponseV2* response, + google::protobuf::Closure* done) override; + void tablet_fetch_data(google::protobuf::RpcController* controller, const PTabletKeyLookupRequest* request, PTabletKeyLookupResponse* response, diff --git a/be/src/util/ref_count_closure.h b/be/src/util/ref_count_closure.h index 560aebb98ee..c1be06c691d 100644 --- a/be/src/util/ref_count_closure.h +++ b/be/src/util/ref_count_closure.h @@ -25,7 +25,6 @@ #include "runtime/query_context.h" #include "runtime/thread_context.h" #include "service/brpc.h" -#include "util/ref_count_closure.h" namespace doris { @@ -82,7 +81,7 @@ class AutoReleaseClosure : public google::protobuf::Closure { public: AutoReleaseClosure(std::shared_ptr<Request> req, std::shared_ptr<Callback> callback, - std::weak_ptr<QueryContext> context = {}) + std::weak_ptr<QueryContext> context = {}, std::string_view error_msg = {}) : request_(req), callback_(callback), context_(std::move(context)) { this->cntl_ = callback->cntl_; this->response_ = callback->response_; @@ -113,10 +112,12 @@ public: // at any stage. 
std::shared_ptr<Request> request_; std::shared_ptr<ResponseType> response_; + std::string error_msg_; protected: virtual void _process_if_rpc_failed() { - std::string error_msg = "RPC meet failed: " + cntl_->ErrorText(); + std::string error_msg = + fmt::format("RPC meet failed: {} {}", cntl_->ErrorText(), error_msg_); if (auto ctx = context_.lock(); ctx) { ctx->cancel(Status::NetworkError(error_msg)); } else { diff --git a/be/src/vec/columns/column.h b/be/src/vec/columns/column.h index 50a780e4355..9eb38675bb5 100644 --- a/be/src/vec/columns/column.h +++ b/be/src/vec/columns/column.h @@ -718,7 +718,6 @@ using ColumnPtr = IColumn::Ptr; using MutableColumnPtr = IColumn::MutablePtr; using Columns = std::vector<ColumnPtr>; using MutableColumns = std::vector<MutableColumnPtr>; -using ColumnPtrs = std::vector<ColumnPtr>; using ColumnRawPtrs = std::vector<const IColumn*>; template <typename... Args> diff --git a/be/src/vec/common/string_ref.cpp b/be/src/vec/common/string_ref.cpp index 413c0338c10..e113694f34c 100644 --- a/be/src/vec/common/string_ref.cpp +++ b/be/src/vec/common/string_ref.cpp @@ -69,11 +69,14 @@ bool StringRef::end_with(char ch) const { } bool StringRef::start_with(const StringRef& search_string) const { - DCHECK(size >= search_string.size); if (search_string.size == 0) { return true; } + if (UNLIKELY(size < search_string.size)) { + return false; + } + #if defined(__SSE2__) || defined(__aarch64__) return memequalSSE2Wide(data, search_string.data, search_string.size); #else diff --git a/be/src/vec/exec/scan/new_olap_scanner.cpp b/be/src/vec/exec/scan/new_olap_scanner.cpp index 6bae4b3319e..344136bbc28 100644 --- a/be/src/vec/exec/scan/new_olap_scanner.cpp +++ b/be/src/vec/exec/scan/new_olap_scanner.cpp @@ -41,6 +41,7 @@ #include "exprs/function_filter.h" #include "io/cache/block_file_cache_profile.h" #include "io/io_common.h" +#include "olap/id_manager.h" #include "olap/inverted_index_profile.h" #include "olap/olap_common.h" #include "olap/olap_tuple.h" @@ 
-410,6 +411,13 @@ Status NewOlapScanner::_init_tablet_reader_params( } } + if (tablet_schema->has_global_row_id()) { + auto& id_file_map = _state->get_id_file_map(); + for (auto rs_reader : _tablet_reader_params.rs_splits) { + id_file_map->add_temp_rowset(rs_reader.rs_reader->rowset()); + } + } + return Status::OK(); } diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp index fe0c7315c5d..11819d1d75d 100644 --- a/be/src/vec/exec/scan/vfile_scanner.cpp +++ b/be/src/vec/exec/scan/vfile_scanner.cpp @@ -345,8 +345,9 @@ void VFileScanner::_get_slot_ids(VExpr* expr, std::vector<int>* slot_ids) { if (child_expr->is_slot_ref()) { VSlotRef* slot_ref = reinterpret_cast<VSlotRef*>(child_expr.get()); slot_ids->emplace_back(slot_ref->slot_id()); + } else { + _get_slot_ids(child_expr.get(), slot_ids); } - _get_slot_ids(child_expr.get(), slot_ids); } } diff --git a/be/test/exec/hash_map/hash_table_method_test.cpp b/be/test/exec/hash_map/hash_table_method_test.cpp index 31a18421123..5ca6a6cca36 100644 --- a/be/test/exec/hash_map/hash_table_method_test.cpp +++ b/be/test/exec/hash_map/hash_table_method_test.cpp @@ -27,7 +27,7 @@ namespace doris::vectorized { template <typename HashMethodType> -void test_insert(HashMethodType& method, ColumnPtrs column) { +void test_insert(HashMethodType& method, Columns column) { using State = typename HashMethodType::State; ColumnRawPtrs key_raw_columns; for (auto column : column) { @@ -49,8 +49,7 @@ void test_insert(HashMethodType& method, ColumnPtrs column) { } template <typename HashMethodType> -void test_find(HashMethodType& method, ColumnPtrs column, - const std::vector<int64_t>& except_result) { +void test_find(HashMethodType& method, Columns column, const std::vector<int64_t>& except_result) { using State = typename HashMethodType::State; ColumnRawPtrs key_raw_columns; for (auto column : column) { diff --git a/be/test/olap/id_manager_test.cpp b/be/test/olap/id_manager_test.cpp new file mode 100644 
index 00000000000..12488b163f7 --- /dev/null +++ b/be/test/olap/id_manager_test.cpp @@ -0,0 +1,107 @@ +#include "olap/id_manager.h" + +#include <gtest/gtest.h> + +#include <atomic> +#include <memory> +#include <thread> +#include <vector> + +#include "olap/olap_common.h" + +using namespace doris; + +TEST(IdFileMapTest, BasicOperations) { + IdFileMap id_file_map(1024); + + // Test adding a file mapping + auto mapping1 = + std::make_shared<FileMapping>(FileMapping {FileMappingType::DORIS_FORMAT, "file1"}); + uint32_t id1 = id_file_map.get_file_mapping_id(mapping1); + EXPECT_EQ(id1, 0); + + auto mapping2 = std::make_shared<FileMapping>(FileMapping {FileMappingType::ORC, "file2"}); + uint32_t id2 = id_file_map.get_file_mapping_id(mapping2); + EXPECT_EQ(id2, 1); + + // Test getting a file mapping + auto retrieved_mapping1 = id_file_map.get_file_mapping(id1); + EXPECT_EQ(retrieved_mapping1->type, FileMappingType::DORIS_FORMAT); + EXPECT_EQ(retrieved_mapping1->value, "file1"); + + auto retrieved_mapping2 = id_file_map.get_file_mapping(id2); + EXPECT_EQ(retrieved_mapping2->type, FileMappingType::ORC); + EXPECT_EQ(retrieved_mapping2->value, "file2"); + + // Test getting a non-existent file mapping + auto retrieved_mapping3 = id_file_map.get_file_mapping(999); + EXPECT_EQ(retrieved_mapping3, nullptr); +} + +TEST(IdFileMapTest, ConcurrentAddAndGet) { + IdFileMap id_file_map(1024); + std::vector<std::thread> threads; + std::atomic<int> counter(0); + + for (int i = 0; i < 10; ++i) { + threads.emplace_back([&]() { + for (int j = 0; j < 100; ++j) { + auto mapping = std::make_shared<FileMapping>(FileMapping { + FileMappingType::DORIS_FORMAT, "file" + std::to_string(counter++)}); + uint32_t id = id_file_map.get_file_mapping_id(mapping); + auto retrieved_mapping = id_file_map.get_file_mapping(id); + EXPECT_EQ(retrieved_mapping->type, mapping->type); + EXPECT_EQ(retrieved_mapping->value, mapping->value); + } + }); + } + + for (auto& thread : threads) { + thread.join(); + } +} + 
+TEST(IdManagerTest, BasicOperations) { + IdManager id_manager; + + // Test adding an IdFileMap + UniqueId query_id1 = UniqueId::gen_uid(); + auto id_file_map1 = id_manager.add_id_file_map(query_id1, 1024); + EXPECT_NE(id_file_map1, nullptr); + + UniqueId query_id2 = UniqueId::gen_uid(); + auto id_file_map2 = id_manager.add_id_file_map(query_id2, 1024); + EXPECT_NE(id_file_map2, nullptr); + + // Test getting an existing IdFileMap + auto retrieved_id_file_map1 = id_manager.add_id_file_map(query_id1, 1024); + EXPECT_EQ(retrieved_id_file_map1, id_file_map1); + + // Test removing an IdFileMap + id_manager.remove_id_file_map(query_id1); + auto retrieved_id_file_map2 = id_manager.add_id_file_map(query_id1, 1024); + EXPECT_NE(retrieved_id_file_map2, id_file_map1); +} + +TEST(IdManagerTest, ConcurrentAddAndRemove) { + IdManager id_manager; + std::vector<std::thread> threads; + + for (int i = 0; i < 10; ++i) { + threads.emplace_back([&]() { + for (int j = 0; j < 10; ++j) { + UniqueId query_id = UniqueId::gen_uid(); + auto id_file_map = id_manager.add_id_file_map(query_id, 1024); + EXPECT_NE(id_file_map, nullptr); + + id_manager.remove_id_file_map(query_id); + auto retrieved_id_file_map = id_manager.add_id_file_map(query_id, 1024); + EXPECT_NE(retrieved_id_file_map, id_file_map); + } + }); + } + + for (auto& thread : threads) { + thread.join(); + } +} diff --git a/be/test/pipeline/operator/materialization_shared_state_test.cpp b/be/test/pipeline/operator/materialization_shared_state_test.cpp new file mode 100644 index 00000000000..6d3a6f28ffa --- /dev/null +++ b/be/test/pipeline/operator/materialization_shared_state_test.cpp @@ -0,0 +1,336 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include <gtest/gtest.h> + +#include "pipeline/dependency.h" +#include "vec/columns/column_vector.h" +#include "vec/data_types/data_type_number.h" +#include "vec/data_types/data_type_string.h" + +namespace doris::pipeline { + +class MaterializationSharedStateTest : public testing::Test { +protected: + void SetUp() override { + _shared_state = std::make_shared<MaterializationSharedState>(); + + // Setup test data types + _string_type = std::make_shared<vectorized::DataTypeString>(); + _int_type = std::make_shared<vectorized::DataTypeInt32>(); + + // Create origin block with rowid column (ColumnString type) + _shared_state->origin_block = vectorized::Block(); + _shared_state->origin_block.insert({_string_type->create_column(), _string_type, "rowid"}); + _shared_state->origin_block.insert({_int_type->create_column(), _int_type, "value"}); + + // Add rowid location + _shared_state->rowid_locs = {0}; // First column is rowid + + // Setup RPC structs for two backends + _backend_id1 = 1001; + _backend_id2 = 1002; + _shared_state->rpc_struct_map[_backend_id1] = FetchRpcStruct(); + _shared_state->rpc_struct_map[_backend_id2] = FetchRpcStruct(); + _shared_state->rpc_struct_map[_backend_id1].request.add_request_block_descs(); + _shared_state->rpc_struct_map[_backend_id2].request.add_request_block_descs(); + } + + std::shared_ptr<MaterializationSharedState> _shared_state; + 
std::shared_ptr<vectorized::DataTypeString> _string_type; + std::shared_ptr<vectorized::DataTypeInt32> _int_type; + int64_t _backend_id1; + int64_t _backend_id2; +}; + +TEST_F(MaterializationSharedStateTest, TestCreateSourceDependency) { + // Test creating source dependencies + int test_op_id = 100; + int test_node_id = 200; + std::string test_name = "TEST"; + + auto* dep = _shared_state->create_source_dependency(test_op_id, test_node_id, test_name); + + // Verify the dependency was created correctly + ASSERT_NE(dep, nullptr); + EXPECT_EQ(dep->id(), test_op_id); + EXPECT_EQ(dep->name(), test_name + "_DEPENDENCY"); + + // Verify it was added to source_deps + EXPECT_EQ(_shared_state->source_deps.size(), 1); + EXPECT_EQ(_shared_state->source_deps[0].get(), dep); +} + +TEST_F(MaterializationSharedStateTest, TestCreateMultiGetResult) { + // Create test columns for rowids + vectorized::Columns columns; + auto rowid_col = _string_type->create_column(); + auto* col_data = reinterpret_cast<vectorized::ColumnString*>(rowid_col.get()); + + // Create test GlobalRowLoacationV2 data + GlobalRowLoacationV2 loc1(0, _backend_id1, 1, 1); + GlobalRowLoacationV2 loc2(0, _backend_id2, 2, 2); + + col_data->insert_data(reinterpret_cast<const char*>(&loc1), sizeof(GlobalRowLoacationV2)); + col_data->insert_data(reinterpret_cast<const char*>(&loc2), sizeof(GlobalRowLoacationV2)); + columns.push_back(std::move(rowid_col)); + + // Test creating multiget result + Status st = _shared_state->create_muiltget_result(columns, true, true); + EXPECT_TRUE(st.ok()); + + // Verify block_order_results + EXPECT_EQ(_shared_state->block_order_results.size(), columns.size()); + EXPECT_EQ(_shared_state->last_block, true); +} + +TEST_F(MaterializationSharedStateTest, TestMergeMultiResponse) { + // 1. 
Setup origin block with nullable rowid column + auto nullable_rowid_col = vectorized::ColumnNullable::create(_string_type->create_column(), + vectorized::ColumnUInt8::create()); + nullable_rowid_col->insert_data((char*)&nullable_rowid_col, 4); + nullable_rowid_col->insert_data(nullptr, 4); + nullable_rowid_col->insert_data((char*)&nullable_rowid_col, 4); + + auto value_col = _int_type->create_column(); + value_col->insert(100); + value_col->insert(101); + value_col->insert(200); + + // Add test data to origin block + _shared_state->origin_block = vectorized::Block( + {{std::move(nullable_rowid_col), vectorized::make_nullable(_string_type), "rowid"}, + {std::move(value_col), _int_type, "value"}}); + + // Set rowid column location + _shared_state->rowid_locs = {0}; + + // 2. Setup response blocks from multiple backends + // Backend 1's response + { + vectorized::Block resp_block1; + auto resp_value_col1 = _int_type->create_column(); + auto* value_col_data1 = + reinterpret_cast<vectorized::ColumnVector<int32_t>*>(resp_value_col1.get()); + value_col_data1->insert(100); + value_col_data1->insert(101); + resp_block1.insert( + {make_nullable(std::move(resp_value_col1)), make_nullable(_int_type), "value"}); + + auto callback1 = std::make_shared<doris::DummyBrpcCallback<PMultiGetResponseV2>>(); + callback1->response_.reset(new PMultiGetResponseV2()); + auto serialized_block = callback1->response_->add_blocks()->mutable_block(); + size_t uncompressed_size = 0; + size_t compressed_size = 0; + auto s = resp_block1.serialize(0, serialized_block, &uncompressed_size, &compressed_size, + CompressionTypePB::LZ4); + EXPECT_TRUE(s.ok()); + + _shared_state->rpc_struct_map[_backend_id1].callback = callback1; + } + + // Backend 2's response + { + vectorized::Block resp_block2; + auto resp_value_col2 = _int_type->create_column(); + auto* value_col_data2 = + reinterpret_cast<vectorized::ColumnVector<int32_t>*>(resp_value_col2.get()); + value_col_data2->insert(200); + resp_block2.insert( + 
{make_nullable(std::move(resp_value_col2)), make_nullable(_int_type), "value"}); + + auto callback2 = std::make_shared<doris::DummyBrpcCallback<PMultiGetResponseV2>>(); + callback2->response_.reset(new PMultiGetResponseV2()); + auto serialized_block = callback2->response_->add_blocks()->mutable_block(); + + size_t uncompressed_size = 0; + size_t compressed_size = 0; + auto s = resp_block2.serialize(0, serialized_block, &uncompressed_size, &compressed_size, + CompressionTypePB::LZ4); + EXPECT_TRUE(s.ok()); + + _shared_state->rpc_struct_map[_backend_id2].callback = callback2; + } + + // 3. Setup block order results to control merge order + _shared_state->block_order_results = { + {_backend_id1, 0, _backend_id2} // First block order: BE1,null,BE2 + }; + + // 4. Test merging responses + vectorized::Block result_block; + Status st = _shared_state->merge_multi_response(&result_block); + EXPECT_TRUE(st.ok()); + + // 5. Verify merged result + EXPECT_EQ(result_block.columns(), 2); // Should have original rowid column and value column + EXPECT_EQ(result_block.rows(), 3); // Total 3 rows from both backends + + // Verify the value column data is merged in correct order + auto* merged_value_col = result_block.get_by_position(0).column.get(); + EXPECT_EQ(*((int*)merged_value_col->get_data_at(0).data), 100); // First value from BE1 + EXPECT_EQ(merged_value_col->get_data_at(1).data, + nullptr); // Second value from BE1, replace by null + EXPECT_EQ(*((int*)merged_value_col->get_data_at(2).data), 200); // Third value from BE2 +} + +TEST_F(MaterializationSharedStateTest, TestMergeMultiResponseMultiBlocks) { + // 1.
Setup origin block with multiple nullable rowid columns + auto nullable_rowid_col1 = vectorized::ColumnNullable::create( + _string_type->create_column(), vectorized::ColumnUInt8::create()); + nullable_rowid_col1->insert_data((char*)&nullable_rowid_col1, 4); + nullable_rowid_col1->insert_data(nullptr, 4); + nullable_rowid_col1->insert_data((char*)&nullable_rowid_col1, 4); + + auto nullable_rowid_col2 = vectorized::ColumnNullable::create( + _string_type->create_column(), vectorized::ColumnUInt8::create()); + nullable_rowid_col2->insert_data((char*)&nullable_rowid_col2, 4); + nullable_rowid_col2->insert_data((char*)&nullable_rowid_col2, 4); + nullable_rowid_col2->insert_data(nullptr, 4); + + auto value_col1 = _int_type->create_column(); + value_col1->insert(100); + value_col1->insert(101); + value_col1->insert(102); + + auto value_col2 = _int_type->create_column(); + value_col2->insert(200); + value_col2->insert(201); + value_col2->insert(202); + + // Add test data to origin block with multiple columns + _shared_state->origin_block = vectorized::Block( + {{std::move(nullable_rowid_col1), vectorized::make_nullable(_string_type), "rowid1"}, + {std::move(nullable_rowid_col2), vectorized::make_nullable(_string_type), "rowid2"}, + {std::move(value_col1), _int_type, "value1"}, + {std::move(value_col2), _int_type, "value2"}}); + + // Set multiple rowid column locations + _shared_state->rowid_locs = {0, 1}; + + // 2. 
Setup response blocks from multiple backends for first rowid + { + vectorized::Block resp_block1; + auto resp_value_col1 = _int_type->create_column(); + auto* value_col_data1 = + reinterpret_cast<vectorized::ColumnVector<int32_t>*>(resp_value_col1.get()); + value_col_data1->insert(100); + resp_block1.insert( + {make_nullable(std::move(resp_value_col1)), make_nullable(_int_type), "value1"}); + + auto callback1 = std::make_shared<doris::DummyBrpcCallback<PMultiGetResponseV2>>(); + callback1->response_.reset(new PMultiGetResponseV2()); + auto serialized_block = callback1->response_->add_blocks()->mutable_block(); + size_t uncompressed_size = 0; + size_t compressed_size = 0; + auto s = resp_block1.serialize(0, serialized_block, &uncompressed_size, &compressed_size, + CompressionTypePB::LZ4); + EXPECT_TRUE(s.ok()); + + _shared_state->rpc_struct_map[_backend_id1].callback = callback1; + } + + // Backend 2's response for first rowid + { + vectorized::Block resp_block2; + auto resp_value_col2 = _int_type->create_column(); + auto* value_col_data2 = + reinterpret_cast<vectorized::ColumnVector<int32_t>*>(resp_value_col2.get()); + value_col_data2->insert(102); + resp_block2.insert( + {make_nullable(std::move(resp_value_col2)), make_nullable(_int_type), "value1"}); + + auto callback2 = std::make_shared<doris::DummyBrpcCallback<PMultiGetResponseV2>>(); + callback2->response_.reset(new PMultiGetResponseV2()); + auto serialized_block = callback2->response_->add_blocks()->mutable_block(); + size_t uncompressed_size = 0; + size_t compressed_size = 0; + auto s = resp_block2.serialize(0, serialized_block, &uncompressed_size, &compressed_size, + CompressionTypePB::LZ4); + EXPECT_TRUE(s.ok()); + + _shared_state->rpc_struct_map[_backend_id2].callback = callback2; + } + + // Add second block responses for second rowid + { + vectorized::Block resp_block1; + auto resp_value_col1 = _int_type->create_column(); + auto* value_col_data1 = + 
reinterpret_cast<vectorized::ColumnVector<int32_t>*>(resp_value_col1.get()); + value_col_data1->insert(200); + resp_block1.insert( + {make_nullable(std::move(resp_value_col1)), make_nullable(_int_type), "value2"}); + + auto serialized_block = _shared_state->rpc_struct_map[_backend_id1] + .callback->response_->add_blocks() + ->mutable_block(); + size_t uncompressed_size = 0; + size_t compressed_size = 0; + auto s = resp_block1.serialize(0, serialized_block, &uncompressed_size, &compressed_size, + CompressionTypePB::LZ4); + EXPECT_TRUE(s.ok()); + } + + { + vectorized::Block resp_block2; + auto resp_value_col2 = _int_type->create_column(); + auto* value_col_data2 = + reinterpret_cast<vectorized::ColumnVector<int32_t>*>(resp_value_col2.get()); + value_col_data2->insert(201); + resp_block2.insert( + {make_nullable(std::move(resp_value_col2)), make_nullable(_int_type), "value2"}); + + auto serialized_block = _shared_state->rpc_struct_map[_backend_id2] + .callback->response_->add_blocks() + ->mutable_block(); + size_t uncompressed_size = 0; + size_t compressed_size = 0; + auto s = resp_block2.serialize(0, serialized_block, &uncompressed_size, &compressed_size, + CompressionTypePB::LZ4); + EXPECT_TRUE(s.ok()); + } + + // 3. Setup block order results for both rowids + _shared_state->block_order_results = { + {_backend_id1, 0, _backend_id2}, // First block order: BE1,null,BE2 + {_backend_id1, _backend_id2, 0} // Second block order: BE1,BE2,null + }; + + // 4. Test merging responses + vectorized::Block result_block; + Status st = _shared_state->merge_multi_response(&result_block); + EXPECT_TRUE(st.ok()); + + // 5. 
Verify merged result + EXPECT_EQ(result_block.columns(), 4); // Should have two rowid columns and two value columns + EXPECT_EQ(result_block.rows(), 3); // Total 3 rows from both backends + + // Verify the first value column data is merged in correct order + auto* merged_value_col1 = result_block.get_by_position(0).column.get(); + EXPECT_EQ(*((int*)merged_value_col1->get_data_at(0).data), 100); + EXPECT_EQ(merged_value_col1->get_data_at(1).data, nullptr); + EXPECT_EQ(*((int*)merged_value_col1->get_data_at(2).data), 102); + + // Verify the second value column data is merged in correct order + auto* merged_value_col2 = result_block.get_by_position(1).column.get(); + EXPECT_EQ(*((int*)merged_value_col2->get_data_at(0).data), 200); + EXPECT_EQ(*((int*)merged_value_col2->get_data_at(1).data), 201); + EXPECT_EQ(merged_value_col2->get_data_at(2).data, nullptr); +} + +} // namespace doris::pipeline \ No newline at end of file diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto index 837d3f4a941..df406115c14 100644 --- a/gensrc/proto/internal_service.proto +++ b/gensrc/proto/internal_service.proto @@ -781,6 +781,40 @@ message PMultiGetResponse { repeated PRowLocation row_locs = 5; }; +// Each block has its own schema to read +message PRequestBlockDesc { + optional bool fetch_row_store = 1; + repeated PSlotDescriptor slots = 2; + repeated ColumnPB column_descs = 3; + repeated uint32 file_id = 4; + repeated uint32 row_id = 5; +} + +message PMultiGetRequestV2 { + repeated PRequestBlockDesc request_block_descs = 1; + + // for compatibility + optional int32 be_exec_version = 2; + optional PUniqueId query_id = 3; + optional bool gc_id_map = 4; + optional uint64 wg_id = 5; +}; + +message PMultiGetBlockV2 { + optional PBlock block = 1; + // more efficient serialization fields for row store + enum RowFormat { + JSONB = 0; + }; + optional RowFormat format = 2; + repeated bytes binary_row_data = 3; +} + +message PMultiGetResponseV2 { + optional PStatus
status = 1; + repeated PMultiGetBlockV2 blocks = 2; +}; + message PFetchColIdsRequest { message PFetchColIdParam { required int64 indexId = 1; @@ -1026,6 +1060,7 @@ service PBackendService { rpc outfile_write_success(POutfileWriteSuccessRequest) returns (POutfileWriteSuccessResult); rpc fetch_table_schema(PFetchTableSchemaRequest) returns (PFetchTableSchemaResult); rpc multiget_data(PMultiGetRequest) returns (PMultiGetResponse); + rpc multiget_data_v2(PMultiGetRequestV2) returns (PMultiGetResponseV2); rpc get_file_cache_meta_by_tablet_id(PGetFileCacheMetaRequest) returns (PGetFileCacheMetaResponse); rpc tablet_fetch_data(PTabletKeyLookupRequest) returns (PTabletKeyLookupResponse); rpc get_column_ids_by_tablet_ids(PFetchColIdsRequest) returns (PFetchColIdsResponse); diff --git a/tools/clickbench-tools/conf/doris-cluster.conf b/tools/clickbench-tools/conf/doris-cluster.conf index cc7d8a2602e..7d70684941e 100644 --- a/tools/clickbench-tools/conf/doris-cluster.conf +++ b/tools/clickbench-tools/conf/doris-cluster.conf @@ -20,11 +20,11 @@ export FE_HOST='127.0.0.1' # BE host export BE_HOST='127.0.0.1' # http_port in fe.conf -export FE_HTTP_PORT=8030 +export FE_HTTP_PORT=8137 # webserver_port in be.conf -export BE_WEBSERVER_PORT=8040 +export BE_WEBSERVER_PORT=8147 # query_port in fe.conf -export FE_QUERY_PORT=9030 +export FE_QUERY_PORT=9137 # Doris username export USER='root' # Doris password --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org