This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 3b9cb524bc [feature-wip](unique-key-merge-on-write) Add rowset tree, based on interval-tree, DSIP-018[3/3] (#10714) 3b9cb524bc is described below commit 3b9cb524bc17bf904489a81f6d96e613cea0d54e Author: zhannngchen <48427519+zhannngc...@users.noreply.github.com> AuthorDate: Mon Jul 11 23:10:38 2022 +0800 [feature-wip](unique-key-merge-on-write) Add rowset tree, based on interval-tree, DSIP-018[3/3] (#10714) * port from rowset-tree from kudu * use shared_ptr * some update * add mock rowset * some compatibility update * fix ut fail * reformat code --- be/src/olap/olap_common.h | 11 + be/src/olap/rowset/CMakeLists.txt | 3 +- be/src/olap/rowset/rowset_tree.cpp | 269 +++++++++++++++++++ be/src/olap/rowset/rowset_tree.h | 138 ++++++++++ be/test/CMakeLists.txt | 1 + be/test/olap/rowset/rowset_tree_test.cpp | 434 +++++++++++++++++++++++++++++++ be/test/testutil/mock_rowset.h | 96 +++++++ 7 files changed, 951 insertions(+), 1 deletion(-) diff --git a/be/src/olap/olap_common.h b/be/src/olap/olap_common.h index 1334957dc5..92a1d6f241 100644 --- a/be/src/olap/olap_common.h +++ b/be/src/olap/olap_common.h @@ -414,4 +414,15 @@ struct RowsetId { } }; +// used for hash-struct of hash_map<RowsetId, Rowset*>. 
+struct HashOfRowsetId { + size_t operator()(const RowsetId& rowset_id) const { + size_t seed = 0; + seed = HashUtil::hash64(&rowset_id.hi, sizeof(rowset_id.hi), seed); + seed = HashUtil::hash64(&rowset_id.mi, sizeof(rowset_id.mi), seed); + seed = HashUtil::hash64(&rowset_id.lo, sizeof(rowset_id.lo), seed); + return seed; + } +}; + } // namespace doris diff --git a/be/src/olap/rowset/CMakeLists.txt b/be/src/olap/rowset/CMakeLists.txt index 131978a9c4..cd5a61636d 100644 --- a/be/src/olap/rowset/CMakeLists.txt +++ b/be/src/olap/rowset/CMakeLists.txt @@ -29,6 +29,7 @@ add_library(Rowset STATIC alpha_rowset_meta.cpp beta_rowset.cpp beta_rowset_reader.cpp - beta_rowset_writer.cpp) + beta_rowset_writer.cpp + rowset_tree.cpp) target_compile_options(Rowset PUBLIC) diff --git a/be/src/olap/rowset/rowset_tree.cpp b/be/src/olap/rowset/rowset_tree.cpp new file mode 100644 index 0000000000..d7cd879e28 --- /dev/null +++ b/be/src/olap/rowset/rowset_tree.cpp @@ -0,0 +1,269 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+// +// This file is copied from +// https://github.com/apache/kudu/blob/master/src/kudu/tablet/rowset_tree.cc +// and modified by Doris + +#include "olap/rowset/rowset_tree.h" + +#include <glog/logging.h> + +#include <cstddef> +#include <functional> +#include <memory> +#include <ostream> +#include <string> +#include <vector> + +#include "gutil/stl_util.h" +#include "gutil/strings/substitute.h" +#include "olap/rowset/rowset.h" +#include "olap/rowset/rowset_meta.h" +#include "util/interval_tree-inl.h" +#include "util/slice.h" + +using std::shared_ptr; +using std::string; +using std::unique_ptr; +using std::vector; + +namespace doris { + +namespace { + +// Lexicographic, first by slice, then by rowset pointer, then by segment id, then by start/stop +bool RSEndpointBySliceCompare(const RowsetTree::RSEndpoint& a, const RowsetTree::RSEndpoint& b) { + int slice_cmp = a.slice_.compare(b.slice_); + if (slice_cmp) return slice_cmp < 0; + ptrdiff_t rs_cmp = a.rowset_.get() - b.rowset_.get(); + if (rs_cmp) return rs_cmp < 0; + int seg_cmp = a.segment_id_ < b.segment_id_; + if (seg_cmp) return seg_cmp < 0; + if (a.endpoint_ != b.endpoint_) return a.endpoint_ == RowsetTree::START; + return false; +} + +// Wrapper used when making batch queries into the interval tree. +struct QueryStruct { + // The slice of the operation performing the query. + Slice slice; + // The original index of this slice in the incoming batch. + int idx; +}; + +} // anonymous namespace + +// Entry for use in the interval tree. +struct RowsetWithBounds { + string min_key; + string max_key; + + // NOTE: the ordering of struct fields here is purposeful: we access + // min_key and max_key frequently, so putting them first in the struct + // ensures they fill a single 64-byte cache line (each is 32 bytes). + // The 'rowset' pointer is accessed comparitively rarely. + RowsetSharedPtr rowset; + int32_t segment_id; +}; + +// Traits struct for IntervalTree. 
+struct RowsetIntervalTraits { + typedef Slice point_type; + typedef RowsetWithBounds* interval_type; + + static Slice get_left(const RowsetWithBounds* rs) { return Slice(rs->min_key); } + + static Slice get_right(const RowsetWithBounds* rs) { return Slice(rs->max_key); } + + static int compare(const Slice& a, const Slice& b) { return a.compare(b); } + + static int compare(const Slice& a, const QueryStruct& b) { return a.compare(b.slice); } + + static int compare(const QueryStruct& a, const Slice& b) { return -compare(b, a); } + + // When 'a' is std::nullopt: + // (1)'a' is +OO when 'positive_direction' is true; + // (2)'a' is -OO when 'positive_direction' is false. + static int compare(const std::optional<Slice>& a, const Slice& b, const EndpointIfNone& type) { + if (a == std::nullopt) { + return ((POSITIVE_INFINITY == type) ? 1 : -1); + } + + return compare(*a, b); + } + + static int compare(const Slice& a, const std::optional<Slice>& b, const EndpointIfNone& type) { + return -compare(b, a, type); + } +}; + +RowsetTree::RowsetTree() : initted_(false) {} + +Status RowsetTree::Init(const RowsetVector& rowsets) { + if (initted_) { + return Status::InternalError("Call Init method on a RowsetTree that's already inited!"); + } + std::vector<RowsetWithBounds*> entries; + ElementDeleter deleter(&entries); + entries.reserve(rowsets.size()); + std::vector<RSEndpoint> endpoints; + endpoints.reserve(rowsets.size() * 2); + + // Iterate over each of the provided Rowsets, fetching their + // bounds and adding them to the local vectors. 
+ for (const RowsetSharedPtr& rs : rowsets) { + std::vector<KeyBoundsPB> segments_key_bounds; + Status s = rs->get_segments_key_bounds(&segments_key_bounds); + if (!s.ok()) { + LOG(WARNING) << "Unable to construct RowsetTree: " << rs->rowset_id() + << " unable to determine its bounds: " << s.to_string(); + return s; + } + DCHECK_EQ(segments_key_bounds.size(), rs->num_segments()); + + for (auto i = 0; i < rs->num_segments(); i++) { + unique_ptr<RowsetWithBounds> rsit(new RowsetWithBounds()); + rsit->rowset = rs; + rsit->segment_id = i; + string min_key = segments_key_bounds[i].min_key(); + string max_key = segments_key_bounds[i].max_key(); + DCHECK_LE(min_key.compare(max_key), 0) + << "Rowset min: " << min_key << " must be <= max: " << max_key; + + // Load bounds and save entry + rsit->min_key = std::move(min_key); + rsit->max_key = std::move(max_key); + + // Load into key endpoints. + endpoints.emplace_back(rsit->rowset, i, START, rsit->min_key); + endpoints.emplace_back(rsit->rowset, i, STOP, rsit->max_key); + + entries.push_back(rsit.release()); + } + } + + // Sort endpoints + std::sort(endpoints.begin(), endpoints.end(), RSEndpointBySliceCompare); + + // Install the vectors into the object. + entries_.swap(entries); + tree_.reset(new IntervalTree<RowsetIntervalTraits>(entries_)); + key_endpoints_.swap(endpoints); + all_rowsets_.assign(rowsets.begin(), rowsets.end()); + + // Build the mapping from RS_ID to RS. 
+ rs_by_id_.clear(); + for (auto& rs : all_rowsets_) { + if (!rs_by_id_.insert({rs->rowset_id(), rs}).second) { + return Status::InternalError(strings::Substitute( + "Add rowset with $0 to rowset tree of tablet $1 failed", + rs->rowset_id().to_string(), rs->rowset_meta()->tablet_uid().to_string())); + } + } + + initted_ = true; + + return Status::OK(); +} + +void RowsetTree::FindRowsetsIntersectingInterval( + const std::optional<Slice>& lower_bound, const std::optional<Slice>& upper_bound, + vector<std::pair<RowsetSharedPtr, int32_t>>* rowsets) const { + DCHECK(initted_); + + vector<RowsetWithBounds*> from_tree; + from_tree.reserve(all_rowsets_.size()); + tree_->FindIntersectingInterval(lower_bound, upper_bound, &from_tree); + rowsets->reserve(rowsets->size() + from_tree.size()); + for (RowsetWithBounds* rs : from_tree) { + rowsets->emplace_back(rs->rowset, rs->segment_id); + } +} + +void RowsetTree::FindRowsetsWithKeyInRange( + const Slice& encoded_key, vector<std::pair<RowsetSharedPtr, int32_t>>* rowsets) const { + DCHECK(initted_); + + // Query the interval tree to efficiently find rowsets with known bounds + // whose ranges overlap the probe key. + vector<RowsetWithBounds*> from_tree; + from_tree.reserve(all_rowsets_.size()); + tree_->FindContainingPoint(encoded_key, &from_tree); + rowsets->reserve(rowsets->size() + from_tree.size()); + for (RowsetWithBounds* rs : from_tree) { + rowsets->emplace_back(rs->rowset, rs->segment_id); + } +} + +void RowsetTree::ForEachRowsetContainingKeys( + const std::vector<Slice>& encoded_keys, + const std::function<void(RowsetSharedPtr, int)>& cb) const { + DCHECK(std::is_sorted(encoded_keys.cbegin(), encoded_keys.cend(), Slice::Comparator())); + // The interval tree batch query callback would naturally just give us back + // the matching Slices, but that won't allow us to easily tell the caller + // which specific operation _index_ matched the Rowset. 
So, we make a vector + // of QueryStructs to pair the Slice with its original index. + vector<QueryStruct> queries; + queries.resize(encoded_keys.size()); + for (int i = 0; i < encoded_keys.size(); i++) { + queries[i] = {encoded_keys[i], i}; + } + + tree_->ForEachIntervalContainingPoints( + queries, [&](const QueryStruct& qs, RowsetWithBounds* rs) { cb(rs->rowset, qs.idx); }); +} + +RowsetTree::~RowsetTree() { + for (RowsetWithBounds* e : entries_) { + delete e; + } + entries_.clear(); +} + +void ModifyRowSetTree(const RowsetTree& old_tree, const RowsetVector& rowsets_to_remove, + const RowsetVector& rowsets_to_add, RowsetTree* new_tree) { + RowsetVector post_swap; + + // O(n^2) diff algorithm to collect the set of rowsets excluding + // the rowsets that were included in the compaction + int num_removed = 0; + + for (const RowsetSharedPtr& rs : old_tree.all_rowsets()) { + // Determine if it should be removed + bool should_remove = false; + for (const RowsetSharedPtr& to_remove : rowsets_to_remove) { + if (to_remove->rowset_id() == rs->rowset_id()) { + should_remove = true; + num_removed++; + break; + } + } + if (!should_remove) { + post_swap.push_back(rs); + } + } + + CHECK_EQ(num_removed, rowsets_to_remove.size()); + + // Then push the new rowsets on the end of the new list + std::copy(rowsets_to_add.begin(), rowsets_to_add.end(), std::back_inserter(post_swap)); + + CHECK(new_tree->Init(post_swap).ok()); +} + +} // namespace doris diff --git a/be/src/olap/rowset/rowset_tree.h b/be/src/olap/rowset/rowset_tree.h new file mode 100644 index 0000000000..92503dbf50 --- /dev/null +++ b/be/src/olap/rowset/rowset_tree.h @@ -0,0 +1,138 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// +// This file is copied from +// https://github.com/apache/kudu/blob/master/src/kudu/tablet/rowset_tree.h +// and modified by Doris + +#pragma once + +#include <boost/optional/optional.hpp> +#include <cstdint> +#include <functional> +#include <memory> +#include <unordered_map> +#include <vector> + +#include "common/status.h" +#include "gutil/map-util.h" +#include "olap/rowset/rowset.h" +#include "util/slice.h" + +namespace doris { + +template <class Traits> +class IntervalTree; + +struct RowsetIntervalTraits; +struct RowsetWithBounds; + +// Used often enough, may as well typedef it. +typedef std::vector<RowsetSharedPtr> RowsetVector; + +// Class which encapsulates the set of rowsets which are active for a given +// Tablet. This provides efficient lookup by key for Rowsets which may overlap +// that key range. +// +// Additionally, the rowset tree maintains information about the implicit +// intervals generated by the row sets (for instance, if a tablet has +// rowsets [0, 2] and [1, 3] it has three implicit contiguous intervals: +// [0, 1], [1, 2], and [2, 3]. +class RowsetTree { +public: + // An RSEndpoint is a POD which associates a rowset, an EndpointType + // (either the START or STOP of an interval), and the key at which the + // endpoint is located. 
+ enum EndpointType { START, STOP }; + struct RSEndpoint { + RSEndpoint(RowsetSharedPtr rowset, uint32_t segment_id, EndpointType endpoint, Slice slice) + : rowset_(rowset), segment_id_(segment_id), endpoint_(endpoint), slice_(slice) {} + + RowsetSharedPtr rowset_; + uint32_t segment_id_; + enum EndpointType endpoint_; + Slice slice_; + }; + + RowsetTree(); + Status Init(const RowsetVector& rowsets); + ~RowsetTree(); + + // Return all Rowsets whose range may contain the given encoded key. + // + // The returned pointers are guaranteed to be valid at least until this + // RowsetTree object is Reset(). + void FindRowsetsWithKeyInRange(const Slice& encoded_key, + vector<std::pair<RowsetSharedPtr, int32_t>>* rowsets) const; + + // Call 'cb(rowset, index)' for each (rowset, index) pair such that + // 'encoded_keys[index]' may be within the bounds of 'rowset'. + // + // See IntervalTree::ForEachIntervalContainingPoints for additional + // information on the particular order in which the callback will be called. + // + // REQUIRES: 'encoded_keys' must be in sorted order. + void ForEachRowsetContainingKeys(const std::vector<Slice>& encoded_keys, + const std::function<void(RowsetSharedPtr, int)>& cb) const; + + // When 'lower_bound' is boost::none, it means negative infinity. + // When 'upper_bound' is boost::none, it means positive infinity. 
+ // So the query interval can be one of below: + // [-OO, +OO) + // [-OO, upper_bound) + // [lower_bound, +OO) + // [lower_bound, upper_bound) + void FindRowsetsIntersectingInterval( + const std::optional<Slice>& lower_bound, const std::optional<Slice>& upper_bound, + vector<std::pair<RowsetSharedPtr, int32_t>>* rowsets) const; + + const RowsetVector& all_rowsets() const { return all_rowsets_; } + + RowsetSharedPtr rs_by_id(RowsetId rs_id) const { return FindPtrOrNull(rs_by_id_, rs_id); } + + // Iterates over RowsetTree::RSEndpoint, guaranteed to be ordered and for + // any rowset to appear exactly twice, once at its start slice and once at + // its stop slice, equivalent to its GetBounds() values. + const std::vector<RSEndpoint>& key_endpoints() const { return key_endpoints_; } + +private: + // Interval tree of the rowsets. Used to efficiently find rowsets which might contain + // a probe row. + std::unique_ptr<IntervalTree<RowsetIntervalTraits>> tree_; + + // Ordered map of all the interval endpoints, holding the implicit contiguous + // intervals + std::vector<RSEndpoint> key_endpoints_; + + // Container for all of the entries in tree_. IntervalTree does + // not itself manage memory, so this provides a simple way to enumerate + // all the entry structs and free them in the destructor. + std::vector<RowsetWithBounds*> entries_; + + // All of the rowsets which were put in this RowsetTree. + RowsetVector all_rowsets_; + + // The Rowsets in this RowsetTree, keyed by their id. 
+ std::unordered_map<RowsetId, RowsetSharedPtr, HashOfRowsetId> rs_by_id_; + + bool initted_; +}; + +void ModifyRowSetTree(const RowsetTree& old_tree, const RowsetVector& rowsets_to_remove, + const RowsetVector& rowsets_to_add, RowsetTree* new_tree); + +} // namespace doris diff --git a/be/test/CMakeLists.txt b/be/test/CMakeLists.txt index 1e2ffc8ade..9c38bb875e 100644 --- a/be/test/CMakeLists.txt +++ b/be/test/CMakeLists.txt @@ -180,6 +180,7 @@ set(OLAP_TEST_FILES olap/rowset/rowset_meta_test.cpp olap/rowset/beta_rowset_test.cpp olap/rowset/unique_rowset_id_generator_test.cpp + olap/rowset/rowset_tree_test.cpp # TODO yiguolei: it is using alpha rowset to test, should use beta rowset #olap/txn_manager_test.cpp olap/generic_iterators_test.cpp diff --git a/be/test/olap/rowset/rowset_tree_test.cpp b/be/test/olap/rowset/rowset_tree_test.cpp new file mode 100644 index 0000000000..0dd20a64fd --- /dev/null +++ b/be/test/olap/rowset/rowset_tree_test.cpp @@ -0,0 +1,434 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+//
+// This file is copied from
+// https://github.com/apache/kudu/blob/master/src/kudu/tablet/rowset_tree-test.cc
+// and modified by Doris
+
+#include "olap/rowset/rowset_tree.h"
+
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include <boost/optional/optional.hpp>
+#include <cstdlib>
+#include <cstring>
+#include <functional>
+#include <memory>
+#include <string>
+#include <tuple>
+#include <unordered_set>
+#include <utility>
+#include <vector>
+
+#include "gutil/map-util.h"
+#include "gutil/stringprintf.h"
+#include "gutil/strings/substitute.h"
+#include "olap/rowset/rowset.h"
+#include "olap/rowset/unique_rowset_id_generator.h"
+#include "testutil/mock_rowset.h"
+#include "testutil/test_util.h"
+#include "util/slice.h"
+#include "util/stopwatch.hpp"
+
+using std::make_shared;
+using std::shared_ptr;
+using std::string;
+using std::unordered_set;
+using std::vector;
+using strings::Substitute;
+
+namespace doris {
+
+// Fixture that builds single-segment MockRowsets with caller-chosen key bounds.
+class TestRowsetTree : public testing::Test {
+public:
+    TestRowsetTree() : rowset_id_generator_({0, 0}) {}
+
+    // Initializes the shared tablet schema with the UNIQUE_KEYS keys type.
+    void SetUp() {
+        TabletSchemaPB schema_pb;
+        schema_pb.set_keys_type(UNIQUE_KEYS);
+        schema_.init_from_pb(schema_pb);
+    }
+
+    // Generates random rowsets with keys between 0 and 10000.
+    // Each rowset covers [min, min + 1000] with min drawn from [0, 9000); keys are
+    // formatted as zero-padded 4-digit strings.
+    RowsetVector GenerateRandomRowsets(int num_sets) {
+        RowsetVector vec;
+        for (int i = 0; i < num_sets; i++) {
+            int min = rand() % 9000;
+            int max = min + 1000;
+            vec.push_back(create_rowset(StringPrintf("%04d", min), StringPrintf("%04d", max)));
+        }
+        return vec;
+    }
+
+    // Creates a MockRowset with a freshly generated rowset id and exactly one segment
+    // whose key bounds are [min_key, max_key]. 'is_mem_rowset' is forwarded to
+    // MockRowset::create_rowset.
+    RowsetSharedPtr create_rowset(const string& min_key, const string& max_key,
+                                  bool is_mem_rowset = false) {
+        RowsetMetaPB rs_meta_pb;
+        rs_meta_pb.set_rowset_id_v2(rowset_id_generator_.next_id().to_string());
+        rs_meta_pb.set_num_segments(1);
+        KeyBoundsPB key_bounds;
+        key_bounds.set_min_key(min_key);
+        key_bounds.set_max_key(max_key);
+        KeyBoundsPB* new_key_bounds = rs_meta_pb.add_segments_key_bounds();
+        *new_key_bounds = key_bounds;
+        RowsetMetaSharedPtr meta_ptr = make_shared<RowsetMeta>();
+        meta_ptr->init_from_pb(rs_meta_pb);
+        RowsetSharedPtr res_ptr;
+        MockRowset::create_rowset(&schema_, rowset_path_, meta_ptr, &res_ptr, is_mem_rowset);
+        return res_ptr;
+    }
+
+private:
+    TabletSchema schema_;
+    std::string rowset_path_;
+    UniqueRowsetIdGenerator rowset_id_generator_;
+};
+
+// Point and interval queries against a small, hand-built tree.
+TEST_F(TestRowsetTree, TestTree) {
+    RowsetVector vec;
+    vec.push_back(create_rowset("0", "5"));
+    vec.push_back(create_rowset("3", "5"));
+    vec.push_back(create_rowset("5", "9"));
+    vec.push_back(create_rowset("0", "0", true));
+
+    // The fourth rowset is a mem rowset, for which MockRowset::get_segments_key_bounds
+    // returns NotSupported, so Init is expected to fail while it is in the list.
+    RowsetTree tree;
+    ASSERT_FALSE(tree.Init(vec).ok());
+
+    vec.erase(vec.begin() + 3);
+    ASSERT_TRUE(tree.Init(vec).ok());
+
+    // "2" overlaps 0-5
+    vector<std::pair<RowsetSharedPtr, int32_t>> out;
+    tree.FindRowsetsWithKeyInRange("2", &out);
+    ASSERT_EQ(1, out.size());
+    ASSERT_EQ(vec[0].get(), out[0].first.get());
+
+    // "4" overlaps 0-5, 3-5
+    out.clear();
+    tree.FindRowsetsWithKeyInRange("4", &out);
+    ASSERT_EQ(2, out.size());
+    ASSERT_EQ(vec[0].get(), out[0].first.get());
+    ASSERT_EQ(vec[1].get(), out[1].first.get());
+
+    // interval [3,4) overlaps 0-5, 3-5
+    out.clear();
+    tree.FindRowsetsIntersectingInterval(Slice("3"), Slice("4"), &out);
+    ASSERT_EQ(2, out.size());
+    ASSERT_EQ(vec[0].get(), out[0].first.get());
+    ASSERT_EQ(vec[1].get(), out[1].first.get());
+
+    // interval [0,2) overlaps 0-5
+    out.clear();
+    tree.FindRowsetsIntersectingInterval(Slice("0"), Slice("2"), &out);
+    ASSERT_EQ(1, out.size());
+    ASSERT_EQ(vec[0].get(), out[0].first.get());
+
+    // interval [5,7) overlaps 0-5, 3-5, 5-9
+    out.clear();
+    tree.FindRowsetsIntersectingInterval(Slice("5"), Slice("7"), &out);
+    ASSERT_EQ(3, out.size());
+    ASSERT_EQ(vec[0].get(), out[0].first.get());
+    ASSERT_EQ(vec[1].get(), out[1].first.get());
+    ASSERT_EQ(vec[2].get(), out[2].first.get());
+
+    // "3" overlaps 0-5, 3-5
+    out.clear();
+    tree.FindRowsetsWithKeyInRange("3", &out);
+    ASSERT_EQ(2, out.size());
+    ASSERT_EQ(vec[0].get(), out[0].first.get());
+    ASSERT_EQ(vec[1].get(), out[1].first.get());
+
+    // "5" overlaps 0-5, 3-5, 5-9
+    out.clear();
+    tree.FindRowsetsWithKeyInRange("5", &out);
+    ASSERT_EQ(3, out.size());
+    ASSERT_EQ(vec[0].get(), out[0].first.get());
+    ASSERT_EQ(vec[1].get(), out[1].first.get());
+    ASSERT_EQ(vec[2].get(), out[2].first.get());
+
+    // interval [0,5) overlaps 0-5, 3-5
+    out.clear();
+    tree.FindRowsetsIntersectingInterval(Slice("0"), Slice("5"), &out);
+    ASSERT_EQ(2, out.size());
+    ASSERT_EQ(vec[0].get(), out[0].first.get());
+    ASSERT_EQ(vec[1].get(), out[1].first.get());
+
+    // interval [3,5) overlaps 0-5, 3-5
+    out.clear();
+    tree.FindRowsetsIntersectingInterval(Slice("3"), Slice("5"), &out);
+    ASSERT_EQ(2, out.size());
+    ASSERT_EQ(vec[0].get(), out[0].first.get());
+    ASSERT_EQ(vec[1].get(), out[1].first.get());
+
+    // interval [-OO,3) overlaps 0-5
+    out.clear();
+    tree.FindRowsetsIntersectingInterval(std::nullopt, Slice("3"), &out);
+    ASSERT_EQ(1, out.size());
+    ASSERT_EQ(vec[0].get(), out[0].first.get());
+
+    // interval [-OO,5) overlaps 0-5, 3-5
+    out.clear();
+    tree.FindRowsetsIntersectingInterval(std::nullopt, Slice("5"), &out);
+    ASSERT_EQ(2, out.size());
+    ASSERT_EQ(vec[0].get(), out[0].first.get());
+    ASSERT_EQ(vec[1].get(), out[1].first.get());
+
+    // interval [-OO,99) overlaps 0-5, 3-5, 5-9
+    out.clear();
+    tree.FindRowsetsIntersectingInterval(std::nullopt, Slice("99"), &out);
+    ASSERT_EQ(3, out.size());
+    ASSERT_EQ(vec[0].get(), out[0].first.get());
+    ASSERT_EQ(vec[1].get(), out[1].first.get());
+    ASSERT_EQ(vec[2].get(), out[2].first.get());
+
+    // interval [6,+OO) overlaps 5-9
+    out.clear();
+    tree.FindRowsetsIntersectingInterval(Slice("6"), std::nullopt, &out);
+    ASSERT_EQ(1, out.size());
+    ASSERT_EQ(vec[2].get(), out[0].first.get());
+
+    // interval [5,+OO) overlaps 0-5, 3-5, 5-9
+    out.clear();
+    tree.FindRowsetsIntersectingInterval(Slice("5"), std::nullopt, &out);
+    ASSERT_EQ(3, out.size());
+    ASSERT_EQ(vec[0].get(), out[0].first.get());
+    ASSERT_EQ(vec[1].get(), out[1].first.get());
+    ASSERT_EQ(vec[2].get(), out[2].first.get());
+
+    // interval [4,+OO) overlaps 0-5, 3-5, 5-9
+    out.clear();
+    tree.FindRowsetsIntersectingInterval(Slice("4"), std::nullopt, &out);
+    ASSERT_EQ(3, out.size());
+    ASSERT_EQ(vec[0].get(), out[0].first.get());
+    ASSERT_EQ(vec[1].get(), out[1].first.get());
+    ASSERT_EQ(vec[2].get(), out[2].first.get());
+
+    // interval [-OO,+OO) overlaps 0-5, 3-5, 5-9
+    out.clear();
+    tree.FindRowsetsIntersectingInterval(std::nullopt, std::nullopt, &out);
+    ASSERT_EQ(3, out.size());
+    ASSERT_EQ(vec[0].get(), out[0].first.get());
+    ASSERT_EQ(vec[1].get(), out[1].first.get());
+    ASSERT_EQ(vec[2].get(), out[2].first.get());
+}
+
+// Interval queries against randomly generated rowsets, followed by a ModifyRowSetTree
+// round trip.
+TEST_F(TestRowsetTree, TestTreeRandomized) {
+    enum BoundOperator {
+        BOUND_LESS_THAN,
+        BOUND_LESS_EQUAL,
+        BOUND_GREATER_THAN,
+        BOUND_GREATER_EQUAL,
+        BOUND_EQUAL
+    };
+    // Draws two random integers from rand_rng_int(start, start + range_length), formats
+    // them as decimal strings and returns them as a pair whose string ordering satisfies
+    // 'op'. For the strict operators, equal draws are rejected and redrawn.
+    const auto& GetStringPair = [](const BoundOperator op, int start, int range_length) {
+        while (true) {
+            string s1 = Substitute("$0", rand_rng_int(start, start + range_length));
+            string s2 = Substitute("$0", rand_rng_int(start, start + range_length));
+            int r = strcmp(s1.c_str(), s2.c_str());
+            switch (op) {
+            case BOUND_LESS_THAN:
+                if (r == 0) continue; // pass through.
+            case BOUND_LESS_EQUAL:
+                return std::pair<string, string>(std::min(s1, s2), std::max(s1, s2));
+            case BOUND_GREATER_THAN:
+                if (r == 0) continue; // pass through.
+            case BOUND_GREATER_EQUAL:
+                return std::pair<string, string>(std::max(s1, s2), std::min(s1, s2));
+            case BOUND_EQUAL:
+                return std::pair<string, string>(s1, s1);
+            }
+        }
+    };
+
+    RowsetVector vec;
+    for (int i = 0; i < 100; i++) {
+        std::pair<string, string> bound = GetStringPair(BOUND_LESS_EQUAL, 1000, 900);
+        ASSERT_LE(bound.first, bound.second);
+        vec.push_back(shared_ptr<Rowset>(create_rowset(bound.first, bound.second)));
+    }
+    RowsetTree tree;
+    ASSERT_TRUE(tree.Init(vec).ok());
+
+    // When lower < upper.
+    vector<std::pair<RowsetSharedPtr, int32_t>> out;
+    for (int i = 0; i < 100; i++) {
+        out.clear();
+        std::pair<string, string> bound = GetStringPair(BOUND_LESS_THAN, 1000, 900);
+        ASSERT_LT(bound.first, bound.second);
+        tree.FindRowsetsIntersectingInterval(Slice(bound.first), Slice(bound.second), &out);
+        // Every returned rowset must really intersect [bound.first, bound.second).
+        for (const auto& e : out) {
+            std::vector<KeyBoundsPB> segments_key_bounds;
+            e.first->get_segments_key_bounds(&segments_key_bounds);
+            ASSERT_EQ(1, segments_key_bounds.size());
+            string min = segments_key_bounds[0].min_key();
+            string max = segments_key_bounds[0].max_key();
+            if (min < bound.first) {
+                ASSERT_GE(max, bound.first);
+            } else {
+                ASSERT_LT(min, bound.second);
+            }
+            if (max >= bound.second) {
+                ASSERT_LT(min, bound.second);
+            } else {
+                ASSERT_GE(max, bound.first);
+            }
+        }
+    }
+
+    // Remove 50 rowsets, add 10 new rowsets, with non overlapping key range.
+    RowsetVector vec_to_del(vec.begin(), vec.begin() + 50);
+    RowsetVector vec_to_add;
+    for (int i = 0; i < 10; i++) {
+        std::pair<string, string> bound = GetStringPair(BOUND_LESS_EQUAL, 2000, 900);
+        ASSERT_LE(bound.first, bound.second);
+        vec_to_add.push_back(shared_ptr<Rowset>(create_rowset(bound.first, bound.second)));
+    }
+
+    RowsetTree new_tree;
+    ModifyRowSetTree(tree, vec_to_del, vec_to_add, &new_tree);
+
+    // only 50 rowsets left in old key range "1000"-"1900"
+    out.clear();
+    new_tree.FindRowsetsIntersectingInterval(Slice("1000"), Slice("1999"), &out);
+    ASSERT_EQ(50, out.size());
+    // should get 10 new added rowsets with key range "2000"-"2900"
+    out.clear();
+    new_tree.FindRowsetsIntersectingInterval(Slice("2000"), Slice("2999"), &out);
+    ASSERT_EQ(10, out.size());
+    out.clear();
+    new_tree.FindRowsetsIntersectingInterval(Slice("1000"), Slice("2999"), &out);
+    ASSERT_EQ(60, out.size());
+}
+
+// Parameterized by (number of rowsets, number of query points per batch).
+class TestRowsetTreePerformance : public TestRowsetTree,
+                                  public testing::WithParamInterface<std::tuple<int, int>> {};
+INSTANTIATE_TEST_SUITE_P(Parameters, TestRowsetTreePerformance,
+                         testing::Combine(
+                                 // Number of rowsets.
+                                 // Up to 500 rowsets (500*32MB = 16GB tablet)
+                                 testing::Values(10, 100, 250, 500),
+                                 // Number of query points in a batch.
+                                 testing::Values(10, 100, 500, 1000, 5000)));
+
+// Times one-at-a-time point lookups against the batched lookup and checks that both find
+// the same number of matches.
+TEST_P(TestRowsetTreePerformance, TestPerformance) {
+    const int kNumRowsets = std::get<0>(GetParam());
+    const int kNumQueries = std::get<1>(GetParam());
+    const int kNumIterations = AllowSlowTests() ? 1000 : 10;
+
+    MonotonicStopWatch one_at_time_timer;
+    MonotonicStopWatch batch_timer;
+    for (int i = 0; i < kNumIterations; i++) {
+        // Create a bunch of rowsets, each of which spans about 10% of the "row space".
+        // The row space here is 4-digit 0-padded numbers.
+        RowsetVector vec = GenerateRandomRowsets(kNumRowsets);
+
+        RowsetTree tree;
+        ASSERT_TRUE(tree.Init(vec).ok());
+
+        vector<string> queries;
+        for (int j = 0; j < kNumQueries; j++) {
+            int query = rand_rng_int(0, 10000);
+            queries.emplace_back(StringPrintf("%04d", query));
+        }
+
+        int individual_matches = 0;
+        one_at_time_timer.start();
+        {
+            vector<std::pair<RowsetSharedPtr, int32_t>> out;
+            for (const auto& q : queries) {
+                out.clear();
+                tree.FindRowsetsWithKeyInRange(Slice(q), &out);
+                individual_matches += out.size();
+            }
+        }
+        one_at_time_timer.stop();
+
+        vector<Slice> query_slices;
+        for (const auto& q : queries) {
+            query_slices.emplace_back(q);
+        }
+
+        // The sort is timed as part of the batch path because ForEachRowsetContainingKeys
+        // requires sorted input.
+        batch_timer.start();
+        std::sort(query_slices.begin(), query_slices.end(), Slice::Comparator());
+        int bulk_matches = 0;
+        {
+            tree.ForEachRowsetContainingKeys(
+                    query_slices, [&](RowsetSharedPtr rs, int slice_idx) { bulk_matches++; });
+        }
+        batch_timer.stop();
+
+        ASSERT_EQ(bulk_matches, individual_matches);
+    }
+
+    double batch_total = batch_timer.elapsed_time();
+    double oat_total = one_at_time_timer.elapsed_time();
+    const string& case_desc = StringPrintf("Q=% 5d R=% 5d", kNumQueries, kNumRowsets);
+    LOG(INFO) << StringPrintf("%s %10s %d ms", case_desc.c_str(), "1-by-1",
+                              static_cast<int>(oat_total / 1e6));
+    LOG(INFO) << StringPrintf("%s %10s %d ms (%.2fx)", case_desc.c_str(), "batched",
+                              static_cast<int>(batch_total / 1e6),
+                              batch_total ? (oat_total / batch_total) : 0);
+}
+
+// Walks the sorted endpoint list and checks that keys never decrease, that each START
+// matches the rowset's min key and each STOP its max key, and that a STOP is only seen for
+// a rowset that is currently open.
+TEST_F(TestRowsetTree, TestEndpointsConsistency) {
+    const int kNumRowsets = 1000;
+    RowsetVector vec = GenerateRandomRowsets(kNumRowsets);
+    // Add pathological one-key rows
+    for (int i = 0; i < 10; ++i) {
+        vec.push_back(create_rowset(StringPrintf("%04d", 11000), StringPrintf("%04d", 11000)));
+    }
+    vec.push_back(create_rowset(StringPrintf("%04d", 12000), StringPrintf("%04d", 12000)));
+    // Make tree
+    RowsetTree tree;
+    ASSERT_TRUE(tree.Init(vec).ok());
+    // Keep track of "currently open" intervals defined by the endpoints
+    unordered_set<RowsetSharedPtr> open;
+    // Keep track of all rowsets that have been visited
+    unordered_set<RowsetSharedPtr> visited;
+
+    // NOTE(review): 'prev' is never assigned inside the loop, so the ordering assertion
+    // below is skipped on every iteration -- confirm whether 'prev = slice;' was intended.
+    Slice prev;
+    for (const RowsetTree::RSEndpoint& rse : tree.key_endpoints()) {
+        RowsetSharedPtr rs = rse.rowset_;
+        enum RowsetTree::EndpointType ept = rse.endpoint_;
+        const Slice& slice = rse.slice_;
+
+        ASSERT_TRUE(rs != nullptr) << "RowsetTree has an endpoint with no rowset";
+        ASSERT_TRUE(!slice.empty()) << "RowsetTree has an endpoint with no key";
+
+        if (!prev.empty()) {
+            ASSERT_LE(prev.compare(slice), 0);
+        }
+
+        std::vector<KeyBoundsPB> segments_key_bounds;
+        ASSERT_TRUE(rs->get_segments_key_bounds(&segments_key_bounds).ok());
+        ASSERT_EQ(1, segments_key_bounds.size());
+        string min = segments_key_bounds[0].min_key();
+        string max = segments_key_bounds[0].max_key();
+        if (ept == RowsetTree::START) {
+            ASSERT_EQ(min, slice.to_string());
+            ASSERT_TRUE(InsertIfNotPresent(&open, rs));
+            ASSERT_TRUE(InsertIfNotPresent(&visited, rs));
+        } else if (ept == RowsetTree::STOP) {
+            ASSERT_EQ(max, slice.to_string());
+            ASSERT_TRUE(open.erase(rs) == 1);
+        } else {
+            FAIL() << "No such endpoint type exists";
+        }
+    }
+}
+
+} // namespace doris
diff --git a/be/test/testutil/mock_rowset.h b/be/test/testutil/mock_rowset.h
new file mode 100644
index
0000000000..7a1bc4a746 --- /dev/null +++ b/be/test/testutil/mock_rowset.h @@ -0,0 +1,96 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "olap/rowset/rowset.h" + +namespace doris { + +class MockRowset : public Rowset { + virtual Status create_reader(std::shared_ptr<RowsetReader>* result) override { + return Status::NotSupported("MockRowset not support this method."); + } + + virtual Status split_range(const RowCursor& start_key, const RowCursor& end_key, + uint64_t request_block_row_count, size_t key_num, + std::vector<OlapTuple>* ranges) override { + return Status::NotSupported("MockRowset not support this method."); + } + + virtual Status remove() override { + return Status::NotSupported("MockRowset not support this method."); + } + + virtual Status link_files_to(const std::string& dir, RowsetId new_rowset_id) override { + return Status::NotSupported("MockRowset not support this method."); + } + + virtual Status copy_files_to(const std::string& dir, const RowsetId& new_rowset_id) override { + return Status::NotSupported("MockRowset not support this method."); + } + + virtual Status remove_old_files(std::vector<std::string>* files_to_remove) override { + return 
Status::NotSupported("MockRowset not support this method."); + } + + virtual bool check_path(const std::string& path) override { + return Status::NotSupported("MockRowset not support this method."); + } + + virtual bool check_file_exist() override { + return Status::NotSupported("MockRowset not support this method."); + } + + virtual Status get_segments_key_bounds(std::vector<KeyBoundsPB>* segments_key_bounds) override { + // TODO(zhangchen): remove this after we implemented memrowset. + if (is_mem_rowset_) { + return Status::NotSupported("Memtable not support key bounds"); + } + return Rowset::get_segments_key_bounds(segments_key_bounds); + } + + static Status create_rowset(const TabletSchema* schema, const std::string& rowset_path, + RowsetMetaSharedPtr rowset_meta, RowsetSharedPtr* rowset, + bool is_mem_rowset = false) { + rowset->reset(new MockRowset(schema, rowset_path, rowset_meta)); + ((MockRowset*)rowset->get())->is_mem_rowset_ = is_mem_rowset; + return Status::OK(); + } + +protected: + MockRowset(const TabletSchema* schema, const std::string& rowset_path, + RowsetMetaSharedPtr rowset_meta) + : Rowset(schema, rowset_path, rowset_meta) {} + + virtual Status init() override { + return Status::NotSupported("MockRowset not support this method."); + } + + virtual Status do_load(bool use_cache) override { + return Status::NotSupported("MockRowset not support this method."); + } + + virtual void do_close() override { + // Do nothing. + } + +private: + bool is_mem_rowset_; +}; + +} // namespace doris --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org