This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 26e117d8b41 [enhance](cloud) Implement FullRangeGetIterator to simplify iterating over a kv range (#33388)
26e117d8b41 is described below

commit 26e117d8b41858401bcd1b5c8f6e971cb6fc3ba0
Author: plat1ko <platonekos...@gmail.com>
AuthorDate: Tue Jun 11 14:48:02 2024 +0800

    [enhance](cloud) Implement FullRangeGetIterator to simplify iterating over a kv range (#33388)
    
    Implement FullRangeGetIterator to simplify iterating over a kv range
    
    ## Further comments
    
    If this is a relatively large or complex change, kick off the discussion
    at [d...@doris.apache.org](mailto:d...@doris.apache.org) by explaining why
    you chose the solution you did and what alternatives you considered,
    etc...
---
 cloud/src/meta-service/mem_txn_kv.cpp |  49 +++++++
 cloud/src/meta-service/mem_txn_kv.h   |  26 +++-
 cloud/src/meta-service/txn_kv.cpp     | 131 +++++++++++++++++
 cloud/src/meta-service/txn_kv.h       |  87 +++++++++++
 cloud/test/CMakeLists.txt             |   1 +
 cloud/test/txn_kv_test.cpp            | 263 ++++++++++++++++++++++++++++++++++
 6 files changed, 556 insertions(+), 1 deletion(-)

diff --git a/cloud/src/meta-service/mem_txn_kv.cpp 
b/cloud/src/meta-service/mem_txn_kv.cpp
index 76066192252..36453251beb 100644
--- a/cloud/src/meta-service/mem_txn_kv.cpp
+++ b/cloud/src/meta-service/mem_txn_kv.cpp
@@ -218,6 +218,12 @@ int64_t MemTxnKv::get_last_read_version() {
     return read_version_;
 }
 
+std::unique_ptr<FullRangeGetIterator> MemTxnKv::full_range_get(std::string 
begin, std::string end,
+                                                               
FullRangeGetIteratorOptions opts) {
+    return std::make_unique<memkv::FullRangeGetIterator>(std::move(begin), 
std::move(end),
+                                                         std::move(opts));
+}
+
 } // namespace doris::cloud
 
 namespace doris::cloud::memkv {
@@ -477,4 +483,47 @@ TxnErrorCode 
Transaction::batch_get(std::vector<std::optional<std::string>>* res
     return TxnErrorCode::TXN_OK;
 }
 
+FullRangeGetIterator::FullRangeGetIterator(std::string begin, std::string end,
+                                           FullRangeGetIteratorOptions opts)
+        : opts_(std::move(opts)), begin_(std::move(begin)), 
end_(std::move(end)) {}
+
+FullRangeGetIterator::~FullRangeGetIterator() = default;
+
+bool FullRangeGetIterator::has_next() {
+    if (!is_valid_) {
+        return false;
+    }
+
+    if (!inner_iter_) {
+        auto* txn = opts_.txn;
+        if (!txn) {
+            // Create a new txn for each inner range get
+            std::unique_ptr<cloud::Transaction> txn1;
+            TxnErrorCode err = opts_.txn_kv->create_txn(&txn_);
+            if (err != TxnErrorCode::TXN_OK) {
+                is_valid_ = false;
+                return false;
+            }
+
+            txn = txn_.get();
+        }
+
+        TxnErrorCode err = txn->get(begin_, end_, &inner_iter_, 
opts_.snapshot, 0);
+        if (err != TxnErrorCode::TXN_OK) {
+            is_valid_ = false;
+            return false;
+        }
+    }
+
+    return inner_iter_->has_next();
+}
+
+std::optional<std::pair<std::string_view, std::string_view>> 
FullRangeGetIterator::next() {
+    if (!has_next()) {
+        return std::nullopt;
+    }
+
+    return inner_iter_->next();
+}
+
 } // namespace doris::cloud::memkv
diff --git a/cloud/src/meta-service/mem_txn_kv.h 
b/cloud/src/meta-service/mem_txn_kv.h
index 7da70aad8f3..63fb008f586 100644
--- a/cloud/src/meta-service/mem_txn_kv.h
+++ b/cloud/src/meta-service/mem_txn_kv.h
@@ -50,6 +50,9 @@ public:
 
     int init() override;
 
+    std::unique_ptr<FullRangeGetIterator> full_range_get(std::string begin, 
std::string end,
+                                                         
FullRangeGetIteratorOptions opts) override;
+
     TxnErrorCode get_kv(const std::string& key, std::string* val, int64_t 
version);
     TxnErrorCode get_kv(const std::string& begin, const std::string& end, 
int64_t version,
                         int limit, bool* more, std::map<std::string, 
std::string>* kv_list);
@@ -208,7 +211,7 @@ private:
     std::set<std::string> unreadable_keys_;
     std::set<std::string> read_set_;
     std::map<std::string, std::string> writes_;
-    std::list<std::pair<std::string, std::string>> remove_ranges_;
+    std::vector<std::pair<std::string, std::string>> remove_ranges_;
     std::vector<std::tuple<ModifyOpType, std::string, std::string>> op_list_;
 
     int64_t committed_version_ = -1;
@@ -260,5 +263,26 @@ private:
     bool more_;
 };
 
+class FullRangeGetIterator final : public cloud::FullRangeGetIterator {
+public:
+    FullRangeGetIterator(std::string begin, std::string end, 
FullRangeGetIteratorOptions opts);
+
+    ~FullRangeGetIterator() override;
+
+    bool is_valid() override { return is_valid_; }
+
+    bool has_next() override;
+
+    std::optional<std::pair<std::string_view, std::string_view>> next() 
override;
+
+private:
+    FullRangeGetIteratorOptions opts_;
+    bool is_valid_ {true};
+    std::unique_ptr<cloud::RangeGetIterator> inner_iter_;
+    std::string begin_;
+    std::string end_;
+    std::unique_ptr<cloud::Transaction> txn_;
+};
+
 } // namespace memkv
 } // namespace doris::cloud
\ No newline at end of file
diff --git a/cloud/src/meta-service/txn_kv.cpp 
b/cloud/src/meta-service/txn_kv.cpp
index ebb63d095f8..f48ac8f9912 100644
--- a/cloud/src/meta-service/txn_kv.cpp
+++ b/cloud/src/meta-service/txn_kv.cpp
@@ -78,6 +78,12 @@ TxnErrorCode 
FdbTxnKv::create_txn(std::unique_ptr<Transaction>* txn) {
     return ret;
 }
 
+std::unique_ptr<FullRangeGetIterator> FdbTxnKv::full_range_get(std::string 
begin, std::string end,
+                                                               
FullRangeGetIteratorOptions opts) {
+    return std::make_unique<fdb::FullRangeGetIterator>(std::move(begin), 
std::move(end),
+                                                       std::move(opts));
+}
+
 } // namespace doris::cloud
 
 namespace doris::cloud::fdb {
@@ -586,4 +592,129 @@ TxnErrorCode 
Transaction::batch_get(std::vector<std::optional<std::string>>* res
     return TxnErrorCode::TXN_OK;
 }
 
+FullRangeGetIterator::FullRangeGetIterator(std::string begin, std::string end,
+                                           FullRangeGetIteratorOptions opts)
+        : opts_(std::move(opts)), begin_(std::move(begin)), 
end_(std::move(end)) {
+    DCHECK(dynamic_cast<FdbTxnKv*>(opts_.txn_kv.get()));
+    DCHECK(!opts_.txn || dynamic_cast<fdb::Transaction*>(opts_.txn)) << 
opts_.txn;
+}
+
+FullRangeGetIterator::~FullRangeGetIterator() {
+    if (fut_) {
+        static_cast<void>(fdb::await_future(fut_));
+        fdb_future_destroy(fut_);
+    }
+}
+
+bool FullRangeGetIterator::has_next() {
+    if (!is_valid_) {
+        return false;
+    }
+
+    if (!inner_iter_) {
+        // The first call
+        init();
+        if (!is_valid_) {
+            return false;
+        }
+
+        return inner_iter_->has_next();
+    }
+
+    if (inner_iter_->has_next()) {
+        if (prefetch()) {
+            TEST_SYNC_POINT("fdb.FullRangeGetIterator.has_next_prefetch");
+            async_inner_get(inner_iter_->next_begin_key());
+        }
+        return true;
+    }
+
+    if (!inner_iter_->more()) {
+        return false;
+    }
+
+    if (!fut_) {
+        async_inner_get(inner_iter_->next_begin_key());
+        if (!is_valid_) {
+            return false;
+        }
+    }
+
+    await_future();
+    return is_valid_ ? inner_iter_->has_next() : false;
+}
+
+std::optional<std::pair<std::string_view, std::string_view>> 
FullRangeGetIterator::next() {
+    if (!has_next()) {
+        return std::nullopt;
+    }
+
+    return inner_iter_->next();
+}
+
+void FullRangeGetIterator::await_future() {
+    auto ret = fdb::await_future(fut_);
+    if (ret != TxnErrorCode::TXN_OK) {
+        is_valid_ = false;
+        return;
+    }
+
+    auto err = fdb_future_get_error(fut_);
+    if (err) {
+        is_valid_ = false;
+        LOG(WARNING) << fdb_get_error(err);
+        return;
+    }
+
+    if (opts_.obj_pool && inner_iter_) {
+        opts_.obj_pool->push_back(std::move(inner_iter_));
+    }
+    inner_iter_ = std::make_unique<RangeGetIterator>(fut_);
+    fut_ = nullptr;
+
+    if (inner_iter_->init() != TxnErrorCode::TXN_OK) {
+        is_valid_ = false;
+    }
+}
+
+void FullRangeGetIterator::init() {
+    async_inner_get(begin_);
+    if (!is_valid_) {
+        return;
+    }
+
+    await_future();
+}
+
+bool FullRangeGetIterator::prefetch() {
+    return opts_.prefetch && is_valid_ && !fut_ && inner_iter_->more();
+}
+
+void FullRangeGetIterator::async_inner_get(std::string_view begin) {
+    DCHECK(!fut_);
+
+    auto* txn = static_cast<Transaction*>(opts_.txn);
+    if (!txn) {
+        // Create a new txn for each inner range get
+        std::unique_ptr<cloud::Transaction> txn1;
+        // TODO(plat1ko): Async create txn
+        TxnErrorCode err = opts_.txn_kv->create_txn(&txn1);
+        if (err != TxnErrorCode::TXN_OK) {
+            is_valid_ = false;
+            return;
+        }
+
+        txn_.reset(static_cast<Transaction*>(txn1.release()));
+        txn = txn_.get();
+    }
+
+    // TODO(plat1ko): Support `Transaction::async_get` api
+    fut_ = fdb_transaction_get_range(
+            txn->txn_, 
FDB_KEYSEL_FIRST_GREATER_OR_EQUAL((uint8_t*)begin.data(), begin.size()),
+            FDB_KEYSEL_FIRST_GREATER_OR_EQUAL((uint8_t*)end_.data(), 
end_.size()), opts_.limit,
+            0 /*target_bytes, unlimited*/, 
FDBStreamingMode::FDB_STREAMING_MODE_WANT_ALL,
+            //       FDBStreamingMode::FDB_STREAMING_MODE_ITERATOR,
+            0 /*iteration*/, opts_.snapshot, false /*reverse*/);
+}
+
 } // namespace doris::cloud::fdb
diff --git a/cloud/src/meta-service/txn_kv.h b/cloud/src/meta-service/txn_kv.h
index bb12e78404e..99b187e0f63 100644
--- a/cloud/src/meta-service/txn_kv.h
+++ b/cloud/src/meta-service/txn_kv.h
@@ -38,6 +38,49 @@ namespace doris::cloud {
 
 class Transaction;
 class RangeGetIterator;
+class TxnKv;
+
+/**
+ * Unlike `RangeGetIterator`, which can only iterate within a page of range, 
this iterator is
+ * capable of iterating over the entire specified range.
+ *
+ * Usage:
+ * for (auto kvp = it.next(); kvp.has_value(); kvp = it.next()) {
+ *     auto [k, v] = *kvp;
+ * }
+ * if (!it.is_valid()) {
+ *     return err;
+ * }
+ */
+struct FullRangeGetIteratorOptions {
+    std::shared_ptr<TxnKv> txn_kv;
+    // Trigger prefetch getting next batch kvs before access them
+    bool prefetch = false;
+    bool snapshot = false;
+    // If non-zero, indicates the maximum number of key-value pairs to return 
(not effective in memkv)
+    int limit = 0;
+    // Reference. If not null, each inner range get is performed through this 
transaction; otherwise
+    // perform each inner range get through a new transaction.
+    Transaction* txn = nullptr;
+    // If users want to extend the lifespan of the kv pair returned by 
`next()`, they can pass an
+    // object pool to collect the inner iterators that have completed iterated.
+    std::vector<std::unique_ptr<RangeGetIterator>>* obj_pool = nullptr;
+
+    FullRangeGetIteratorOptions(std::shared_ptr<TxnKv> _txn_kv) : 
txn_kv(std::move(_txn_kv)) {}
+};
+
+class FullRangeGetIterator {
+public:
+    FullRangeGetIterator() = default;
+
+    virtual ~FullRangeGetIterator() = default;
+
+    virtual bool is_valid() = 0;
+
+    virtual bool has_next() = 0;
+
+    virtual std::optional<std::pair<std::string_view, std::string_view>> 
next() = 0;
+};
 
 class TxnKv {
 public:
@@ -54,6 +97,9 @@ public:
     virtual TxnErrorCode create_txn(std::unique_ptr<Transaction>* txn) = 0;
 
     virtual int init() = 0;
+
+    virtual std::unique_ptr<FullRangeGetIterator> full_range_get(
+            std::string begin, std::string end, FullRangeGetIteratorOptions 
opts) = 0;
 };
 
 class Transaction {
@@ -280,6 +326,9 @@ public:
 
     int init() override;
 
+    std::unique_ptr<FullRangeGetIterator> full_range_get(std::string begin, 
std::string end,
+                                                         
FullRangeGetIteratorOptions opts) override;
+
 private:
     std::shared_ptr<fdb::Network> network_;
     std::shared_ptr<fdb::Database> database_;
@@ -416,6 +465,8 @@ private:
 class Transaction : public cloud::Transaction {
 public:
     friend class Database;
+    friend class FullRangeGetIterator;
+
     Transaction(std::shared_ptr<Database> db) : db_(std::move(db)) {}
 
     ~Transaction() override {
@@ -520,5 +571,41 @@ private:
     size_t approximate_bytes_ {0};
 };
 
+class FullRangeGetIterator final : public cloud::FullRangeGetIterator {
+public:
+    FullRangeGetIterator(std::string begin, std::string end, 
FullRangeGetIteratorOptions opts);
+
+    ~FullRangeGetIterator() override;
+
+    bool is_valid() override { return is_valid_; }
+
+    bool has_next() override;
+
+    std::optional<std::pair<std::string_view, std::string_view>> next() 
override;
+
+private:
+    // Set `is_valid_` to false if meet any error
+    void init();
+
+    // Await `fut_` and create new inner iter.
+    // Set `is_valid_` to false if meet any error
+    void await_future();
+
+    // Perform a paginate range get asynchronously and set `fut_`.
+    // Set `is_valid_` to false if meet any error
+    void async_inner_get(std::string_view begin);
+
+    bool prefetch();
+
+    FullRangeGetIteratorOptions opts_;
+
+    bool is_valid_ = true;
+    std::string begin_;
+    std::string end_;
+    std::unique_ptr<Transaction> txn_;
+    std::unique_ptr<RangeGetIterator> inner_iter_;
+    FDBFuture* fut_ = nullptr;
+};
+
 } // namespace fdb
 } // namespace doris::cloud
diff --git a/cloud/test/CMakeLists.txt b/cloud/test/CMakeLists.txt
index 94b84aa4ba0..ab8101ee2c9 100644
--- a/cloud/test/CMakeLists.txt
+++ b/cloud/test/CMakeLists.txt
@@ -15,6 +15,7 @@ add_executable(keys_test keys_test.cpp)
 add_executable(doris_txn_test doris_txn_test.cpp)
 
 add_executable(txn_kv_test txn_kv_test.cpp)
+set_target_properties(txn_kv_test PROPERTIES COMPILE_FLAGS 
"-fno-access-control")
 
 add_executable(recycler_test recycler_test.cpp)
 
diff --git a/cloud/test/txn_kv_test.cpp b/cloud/test/txn_kv_test.cpp
index 63a89a07434..4f8eed1090d 100644
--- a/cloud/test/txn_kv_test.cpp
+++ b/cloud/test/txn_kv_test.cpp
@@ -24,12 +24,16 @@
 #include <gen_cpp/olap_file.pb.h>
 #include <gtest/gtest.h>
 
+#include <chrono>
 #include <cstddef>
+#include <string>
+#include <thread>
 
 #include "common/config.h"
 #include "common/stopwatch.h"
 #include "common/sync_point.h"
 #include "common/util.h"
+#include "meta-service/codec.h"
 #include "meta-service/doris_txn.h"
 #include "meta-service/keys.h"
 #include "meta-service/mem_txn_kv.h"
@@ -545,3 +549,262 @@ TEST(TxnKvTest, BatchGet) {
         }
     }
 }
+
+TEST(TxnKvTest, FullRangeGetIterator) {
+    using namespace std::chrono_literals;
+
+    std::unique_ptr<Transaction> txn;
+    TxnErrorCode err = txn_kv->create_txn(&txn);
+    ASSERT_EQ(err, TxnErrorCode::TXN_OK);
+    constexpr std::string_view prefix = "FullRangeGetIterator";
+    for (int i = 0; i < 100; ++i) {
+        std::string key {prefix};
+        encode_int64(i, &key);
+        txn->put(key, std::to_string(i));
+    }
+    err = txn->commit();
+    ASSERT_EQ(err, TxnErrorCode::TXN_OK);
+
+    std::string begin {prefix};
+    std::string end {prefix};
+    encode_int64(INT64_MAX, &end);
+
+    auto* sp = SyncPoint::get_instance();
+    std::unique_ptr<int, std::function<void(int*)>> defer(
+            (int*)0x01, [](int*) { 
SyncPoint::get_instance()->clear_all_call_backs(); });
+    sp->enable_processing();
+
+    {
+        // Without txn
+        FullRangeGetIteratorOptions opts(txn_kv);
+        opts.limit = 11;
+
+        auto it = txn_kv->full_range_get(begin, end, opts);
+        ASSERT_TRUE(it->is_valid());
+
+        int cnt = 0;
+        for (auto kvp = it->next(); kvp.has_value(); kvp = it->next()) {
+            auto [k, v] = *kvp;
+            EXPECT_EQ(v, std::to_string(cnt));
+            ++cnt;
+            // Total cost: 60ms * 100 = 6s > fdb txn timeout 5s, however we 
create a new transaction
+            // in each inner range get
+            std::this_thread::sleep_for(60ms);
+        }
+        ASSERT_TRUE(it->is_valid());
+        EXPECT_EQ(cnt, 100);
+    }
+
+    {
+        // With txn
+        err = txn_kv->create_txn(&txn);
+        ASSERT_EQ(err, TxnErrorCode::TXN_OK);
+        FullRangeGetIteratorOptions opts(txn_kv);
+        opts.limit = 11;
+        opts.txn = txn.get();
+
+        auto it = txn_kv->full_range_get(begin, end, opts);
+        ASSERT_TRUE(it->is_valid());
+
+        int cnt = 0;
+        for (auto kvp = it->next(); kvp.has_value(); kvp = it->next()) {
+            auto [k, v] = *kvp;
+            EXPECT_EQ(v, std::to_string(cnt));
+            ++cnt;
+        }
+        ASSERT_TRUE(it->is_valid());
+        EXPECT_EQ(cnt, 100);
+    }
+
+    {
+        // With prefetch
+        err = txn_kv->create_txn(&txn);
+        ASSERT_EQ(err, TxnErrorCode::TXN_OK);
+        FullRangeGetIteratorOptions opts(txn_kv);
+        opts.limit = 11;
+        opts.txn = txn.get();
+        opts.prefetch = true;
+
+        int prefetch_cnt = 0;
+        sp->set_call_back("fdb.FullRangeGetIterator.has_next_prefetch", 
[&](void* p) {
+            ++prefetch_cnt;
+            std::cout << "With prefetch prefetch_cnt=" << prefetch_cnt << 
std::endl;
+        });
+
+        auto it = txn_kv->full_range_get(begin, end, opts);
+        ASSERT_TRUE(it->is_valid());
+
+        int cnt = 0;
+        for (auto kvp = it->next(); kvp.has_value(); kvp = it->next()) {
+            auto [k, v] = *kvp;
+            EXPECT_EQ(v, std::to_string(cnt));
+            ++cnt;
+            // Sleep to wait for prefetch to be ready
+            std::this_thread::sleep_for(1ms);
+        }
+        ASSERT_TRUE(it->is_valid());
+        EXPECT_EQ(cnt, 100);
+
+        sp->clear_call_back("fdb.FullRangeGetIterator.has_next_prefetch");
+    }
+
+    {
+        // With object pool
+        std::vector<std::unique_ptr<RangeGetIterator>> obj_pool;
+        FullRangeGetIteratorOptions opts(txn_kv);
+        opts.limit = 11;
+        opts.obj_pool = &obj_pool;
+
+        auto it = txn_kv->full_range_get(begin, end, opts);
+        ASSERT_TRUE(it->is_valid());
+
+        int cnt = 0;
+        std::vector<std::string_view> values;
+        for (auto kvp = it->next(); kvp.has_value(); kvp = it->next()) {
+            auto [k, v] = *kvp;
+            EXPECT_EQ(v, std::to_string(cnt));
+            values.push_back(v);
+            if (cnt % 25 == 24) {
+                // values should be alive
+                int base = cnt / 25 * 25;
+                for (int i = 0; i < values.size(); ++i) {
+                    EXPECT_EQ(values[i], std::to_string(base + i));
+                }
+                values.clear();
+                obj_pool.clear();
+            }
+            ++cnt;
+        }
+        ASSERT_TRUE(it->is_valid());
+        EXPECT_EQ(cnt, 100);
+    }
+
+    {
+        // Abnormal
+        FullRangeGetIteratorOptions opts(txn_kv);
+        opts.limit = 11;
+        err = txn_kv->create_txn(&txn);
+        ASSERT_EQ(err, TxnErrorCode::TXN_OK);
+        opts.txn = txn.get();
+        auto it = txn_kv->full_range_get(begin, end, opts);
+        auto* fdb_it = static_cast<fdb::FullRangeGetIterator*>(it.get());
+        fdb_it->is_valid_ = false;
+        ASSERT_FALSE(it->is_valid());
+        ASSERT_FALSE(it->has_next());
+        ASSERT_FALSE(it->next().has_value());
+
+        fdb_it->is_valid_ = true;
+        ASSERT_TRUE(it->is_valid());
+        int cnt = 0;
+        for (auto kvp = it->next(); kvp.has_value(); kvp = it->next()) {
+            auto [k, v] = *kvp;
+            EXPECT_EQ(v, std::to_string(cnt));
+            ++cnt;
+            // Total cost: 60ms * 100 = 6s > fdb txn timeout 5s
+            std::this_thread::sleep_for(60ms);
+        }
+        // Txn timeout
+        ASSERT_FALSE(it->is_valid());
+        ASSERT_FALSE(it->has_next());
+        ASSERT_FALSE(it->next().has_value());
+    }
+
+    {
+        // Abnormal dtor
+        int prefetch_cnt = 0;
+        sp->set_call_back("fdb.FullRangeGetIterator.has_next_prefetch", 
[&](void* p) {
+            ++prefetch_cnt;
+            std::cout << "Abnormal dtor prefetch_cnt=" << prefetch_cnt << 
std::endl;
+        });
+
+        FullRangeGetIteratorOptions opts(txn_kv);
+        opts.limit = 11;
+        opts.prefetch = true;
+        auto it = txn_kv->full_range_get(begin, end, opts);
+        auto kvp = it->next();
+        ASSERT_TRUE(kvp.has_value());
+        kvp = it->next(); // Trigger prefetch
+        ASSERT_TRUE(kvp.has_value());
+        auto* fdb_it = static_cast<fdb::FullRangeGetIterator*>(it.get());
+        ASSERT_TRUE(fdb_it->fut_ != nullptr); // There is an inflight range get
+        // Since there is an inflight range get, should not trigger another 
prefetch
+        ASSERT_FALSE(fdb_it->prefetch());
+
+        sp->clear_call_back("fdb.FullRangeGetIterator.has_next_prefetch");
+
+        // `~FullRangeGetIterator` without consuming inflight range get result
+    }
+
+    {
+        // Benchmark prefetch
+        // No prefetch
+        FullRangeGetIteratorOptions opts(txn_kv);
+        opts.limit = 11;
+        err = txn_kv->create_txn(&txn);
+        ASSERT_EQ(err, TxnErrorCode::TXN_OK);
+        opts.txn = txn.get();
+
+        auto it = txn_kv->full_range_get(begin, end, opts);
+        int cnt = 0;
+        auto start = std::chrono::steady_clock::now();
+        for (auto kvp = it->next(); kvp.has_value(); kvp = it->next()) {
+            ++cnt;
+            std::this_thread::sleep_for(1ms);
+        }
+        auto finish = std::chrono::steady_clock::now();
+        ASSERT_TRUE(it->is_valid());
+        EXPECT_EQ(cnt, 100);
+        std::cout << "no prefetch cost="
+                  << 
std::chrono::duration_cast<std::chrono::milliseconds>(finish - start).count()
+                  << "ms" << std::endl;
+
+        // Prefetch
+        err = txn_kv->create_txn(&txn);
+        ASSERT_EQ(err, TxnErrorCode::TXN_OK);
+        opts.txn = txn.get();
+        opts.prefetch = true;
+        it = txn_kv->full_range_get(begin, end, opts);
+        cnt = 0;
+        start = std::chrono::steady_clock::now();
+        for (auto kvp = it->next(); kvp.has_value(); kvp = it->next()) {
+            ++cnt;
+            std::this_thread::sleep_for(1ms);
+        }
+        finish = std::chrono::steady_clock::now();
+        ASSERT_TRUE(it->is_valid());
+        EXPECT_EQ(cnt, 100);
+        std::cout << "prefetch cost="
+                  << 
std::chrono::duration_cast<std::chrono::milliseconds>(finish - start).count()
+                  << "ms" << std::endl;
+
+        // Use RangeGetIterator
+        err = txn_kv->create_txn(&txn);
+        ASSERT_EQ(err, TxnErrorCode::TXN_OK);
+        std::unique_ptr<RangeGetIterator> inner_it;
+        auto inner_begin = begin;
+        cnt = 0;
+        start = std::chrono::steady_clock::now();
+        do {
+            err = txn->get(inner_begin, end, &inner_it, false, 11);
+            ASSERT_EQ(err, TxnErrorCode::TXN_OK);
+            if (!inner_it->has_next()) {
+                break;
+            }
+            while (inner_it->has_next()) {
+                // recycle corresponding resources
+                auto [k, v] = inner_it->next();
+                std::this_thread::sleep_for(1ms);
+                ++cnt;
+                if (!inner_it->has_next()) {
+                    inner_begin = k;
+                }
+            }
+            inner_begin.push_back('\x00'); // Update to next smallest key for 
iteration
+        } while (inner_it->more());
+        finish = std::chrono::steady_clock::now();
+        EXPECT_EQ(cnt, 100);
+        std::cout << "RangeGetIterator cost="
+                  << 
std::chrono::duration_cast<std::chrono::milliseconds>(finish - start).count()
+                  << "ms" << std::endl;
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to