This is an automated email from the ASF dual-hosted git repository.

panxiaolei pushed a commit to branch selectdb-doris-2.1-glzq
in repository https://gitbox.apache.org/repos/asf/doris.git

commit ae212fa0203356e0726ae08c4b1a8a29d3c31ae5
Author: Pxl <pxl...@qq.com>
AuthorDate: Fri Jul 19 15:12:39 2024 +0800

    [Chore](brpc) add gc for abafreelist to avoid eagain and set brpc 
timeout_ms to 2s (#37888)
    
    ## Proposed changes
    1. add gc for abafreelist to avoid eagain
    ```
    [CANCELLED]failed to send brpc when exchange, error=Resource temporarily 
unavailable, error_text=[E11]Resource temporarily unavailable
    ```
    2. set brpc timeout_ms to 2s
---
 be/src/util/brpc_client_cache.h                    |   1 +
 ....9.0-add-gc-on-abalist-and-adjust-gc-size.patch | 280 +++++++++++++++++++++
 2 files changed, 281 insertions(+)

diff --git a/be/src/util/brpc_client_cache.h b/be/src/util/brpc_client_cache.h
index 290f2cc3e04..ebef80f4a6b 100644
--- a/be/src/util/brpc_client_cache.h
+++ b/be/src/util/brpc_client_cache.h
@@ -126,6 +126,7 @@ public:
             options.connection_group = connection_group;
         }
         options.connect_timeout_ms = 2000;
+        options.timeout_ms = 2000;
         options.max_retry = 10;
 
         std::unique_ptr<brpc::Channel> channel(new brpc::Channel());
diff --git 
a/thirdparty/patches/brpc-1.9.0-add-gc-on-abalist-and-adjust-gc-size.patch 
b/thirdparty/patches/brpc-1.9.0-add-gc-on-abalist-and-adjust-gc-size.patch
new file mode 100644
index 00000000000..7287b4a536d
--- /dev/null
+++ b/thirdparty/patches/brpc-1.9.0-add-gc-on-abalist-and-adjust-gc-size.patch
@@ -0,0 +1,280 @@
+From fb53ab9245e8570d44a2eeea2ab536841a7876ec Mon Sep 17 00:00:00 2001
+From: BiteTheDDDDt <pxl...@qq.com>
+Date: Tue, 16 Jul 2024 14:21:21 +0800
+Subject: [PATCH] add gc on abalist and adjust gc size
+
+---
+ src/bthread/bthread.cpp          |   3 +-
+ src/bthread/id.cpp               |   1 +
+ src/bthread/list_of_abafree_id.h | 161 ++++++++++++++++++++++++++++---
+ 3 files changed, 151 insertions(+), 14 deletions(-)
+
+diff --git a/src/bthread/bthread.cpp b/src/bthread/bthread.cpp
+index 5ac0c3b1..86fd2c90 100644
+--- a/src/bthread/bthread.cpp
++++ b/src/bthread/bthread.cpp
+@@ -147,7 +147,8 @@ start_from_non_worker(bthread_t* __restrict tid,
+ 
+ struct TidTraits {
+     static const size_t BLOCK_SIZE = 63;
+-    static const size_t MAX_ENTRIES = 65536;
++    static const size_t MAX_ENTRIES = 655360;
++    static const size_t INIT_GC_SIZE = 65536;
+     static const bthread_t ID_INIT;
+     static bool exists(bthread_t id) { return bthread::TaskGroup::exists(id); 
}
+ };
+diff --git a/src/bthread/id.cpp b/src/bthread/id.cpp
+index 41c49a3f..ba77580a 100644
+--- a/src/bthread/id.cpp
++++ b/src/bthread/id.cpp
+@@ -291,6 +291,7 @@ void id_pool_status(std::ostream &os) {
+ struct IdTraits {
+     static const size_t BLOCK_SIZE = 63;
+     static const size_t MAX_ENTRIES = 100000;
++    static const size_t INIT_GC_SIZE = 4096;
+     static const bthread_id_t ID_INIT;
+     static bool exists(bthread_id_t id)
+     { return bthread::id_exists_with_true_negatives(id); }
+diff --git a/src/bthread/list_of_abafree_id.h 
b/src/bthread/list_of_abafree_id.h
+index ac2b2234..45043acc 100644
+--- a/src/bthread/list_of_abafree_id.h
++++ b/src/bthread/list_of_abafree_id.h
+@@ -22,8 +22,10 @@
+ #ifndef BTHREAD_LIST_OF_ABAFREE_ID_H
+ #define BTHREAD_LIST_OF_ABAFREE_ID_H
+ 
+-#include <vector>
+ #include <deque>
++#include <vector>
++
++#include "butil/macros.h"
+ 
+ namespace bthread {
+ 
+@@ -48,6 +50,9 @@ namespace bthread {
+ //   // Max #entries. Often has close relationship with concurrency, 65536
+ //   // is "huge" for most apps.
+ //   static const size_t MAX_ENTRIES = 65536;
++//   // Initial GC length, when the number of blocks reaches this length, 
++//   // start to initiate list GC operation. It will release useless blocks
++//   static const size_t INIT_GC_SIZE = 4096;
+ //
+ //   // Initial value of id. Id with the value is treated as invalid.
+ //   static const Id ID_INIT = ...;
+@@ -64,13 +69,14 @@ class ListOfABAFreeId {
+ public:
+     ListOfABAFreeId();
+     ~ListOfABAFreeId();
+-    
++
+     // Add an identifier into the list.
+     int add(Id id);
+-    
++
+     // Apply fn(id) to all identifiers.
+-    template <typename Fn> void apply(const Fn& fn);
+-    
++    template <typename Fn>
++    void apply(const Fn& fn);
++
+     // Put #entries of each level into `counts'
+     // Returns #levels.
+     size_t get_sizes(size_t* counts, size_t n);
+@@ -82,19 +88,31 @@ private:
+         IdBlock* next;
+     };
+     void forward_index();
++
++    struct TempIdBlock {
++        IdBlock* block;
++        uint32_t index;
++        uint32_t nblock;
++    };
++
++    int gc();
++    int add_to_temp_list(TempIdBlock* temp_list, Id id);
++    template <typename Fn>
++    int for_each(const Fn& fn);
++    void free_list(IdBlock* block);
++
+     IdBlock* _cur_block;
+     uint32_t _cur_index;
+     uint32_t _nblock;
+     IdBlock _head_block;
++    uint32_t _next_gc_size;
+ };
+ 
+ // [impl.]
+ 
+ template <typename Id, typename IdTraits> 
+ ListOfABAFreeId<Id, IdTraits>::ListOfABAFreeId()
+-    : _cur_block(&_head_block)
+-    , _cur_index(0)
+-    , _nblock(1) {
++        : _cur_block(&_head_block), _cur_index(0), _nblock(1), 
_next_gc_size(IdTraits::INIT_GC_SIZE) {
+     for (size_t i = 0; i < IdTraits::BLOCK_SIZE; ++i) {
+         _head_block.ids[i] = IdTraits::ID_INIT;
+     }
+@@ -140,6 +158,30 @@ int ListOfABAFreeId<Id, IdTraits>::add(Id id) {
+         }
+         saved_pos[i] = pos;
+     }
++    // If we don't expect a GC to occur in abalist, then an error is reported 
and EAGAIN is returned.
++    if (_nblock * IdTraits::BLOCK_SIZE > IdTraits::MAX_ENTRIES) {
++        return EAGAIN;
++    }
++    // If the number of blocks exceeds the minimum GC length, start the GC 
operation
++    if (_nblock * IdTraits::BLOCK_SIZE > _next_gc_size) {
++        uint32_t before_gc_blocks = _nblock;
++        int rc = gc();
++        // To avoid frequent GC operations, we only let the GC be effective 
enough to continue the GC. 
++        // otherwise we let the next GC occur length * 2.
++        // 
++        // Condition for a GC to be sufficiently efficient: the number of 
blocks 
++        // retained after the GC is 1/4 of the previous one.
++        if ((before_gc_blocks - _nblock) * IdTraits::BLOCK_SIZE < 
(_next_gc_size - (_next_gc_size >> 2))) {
++            _next_gc_size <<= 1;
++            // We want to make sure that GC must occur before MAX_ENTRIES.
++            static_assert(IdTraits::MAX_ENTRIES > IdTraits::BLOCK_SIZE * 2, 
"MAX_ENTRIES should be greater than 2 * IdTraits::BLOCK_SIZE");
++            if (_next_gc_size >= IdTraits::MAX_ENTRIES) {
++                _next_gc_size = IdTraits::MAX_ENTRIES - IdTraits::BLOCK_SIZE 
* 2;
++            }
++        }
++
++        return rc;
++    }
+     // The list is considered to be "crowded", add a new block and scatter
+     // the conflict identifiers by inserting an empty entry after each of
+     // them, so that even if the identifiers are still valid when we walk
+@@ -152,9 +194,6 @@ int ListOfABAFreeId<Id, IdTraits>::add(Id id) {
+     //
+     //  [..xxxx....] -> [......yyyy] -> [..........]
+     //    block A        new block      block B
+-    if (_nblock * IdTraits::BLOCK_SIZE > IdTraits::MAX_ENTRIES) {
+-        return EAGAIN;
+-    }
+     IdBlock* new_block = new (std::nothrow) IdBlock;
+     if (NULL == new_block) {
+         return ENOMEM;
+@@ -188,6 +227,93 @@ int ListOfABAFreeId<Id, IdTraits>::add(Id id) {
+     return 0;
+ }
+ 
++template <typename Id, typename IdTraits>
++int ListOfABAFreeId<Id, IdTraits>::gc() {
++    IdBlock* new_block = new (std::nothrow) IdBlock;
++    if (NULL == new_block) {
++        return ENOMEM;
++    }
++    // reset head block
++    for (size_t i = 0; i < IdTraits::BLOCK_SIZE; ++i) {
++        new_block->ids[i] = IdTraits::ID_INIT;
++    }
++    new_block->next = NULL;
++
++    TempIdBlock tmp_id_block;
++    tmp_id_block.block = new_block;
++    tmp_id_block.nblock = 1;
++    tmp_id_block.index = 0;
++
++    // Add each element of the old list to the new list
++    int rc = for_each([&](Id id) {
++        int rc;
++        rc = add_to_temp_list(&tmp_id_block, id);
++        if (rc != 0) {
++            return rc;
++        }
++        rc = add_to_temp_list(&tmp_id_block, IdTraits::ID_INIT);
++        if (rc != 0) {
++            return rc;
++        }
++        return 0;
++    });
++
++    if (rc != 0) {
++        free_list(new_block);
++        return rc;
++    }
++
++    // reset head block
++    for (size_t i = 0; i < IdTraits::BLOCK_SIZE; ++i) {
++        _head_block.ids[i] = IdTraits::ID_INIT;
++    }
++
++    free_list(_head_block.next);
++    _cur_block = tmp_id_block.block;
++    _cur_index = tmp_id_block.index;
++    // nblock and head_block
++    _nblock = tmp_id_block.nblock + 1;
++    _head_block.next = new_block;
++
++    return 0;
++}
++
++template <typename Id, typename IdTraits>
++int ListOfABAFreeId<Id, IdTraits>::add_to_temp_list(TempIdBlock* block_list, 
Id id) {
++    block_list->block->ids[block_list->index++] = id;
++    // add new list
++    if (block_list->index == IdTraits::BLOCK_SIZE) {
++        block_list->index = 0;
++        block_list->nblock++;
++        block_list->block->next = new (std::nothrow) IdBlock;
++        if (NULL == block_list->block->next) {
++            return ENOMEM;
++        }
++        block_list->block = block_list->block->next;
++        for (size_t i = 0; i < IdTraits::BLOCK_SIZE; ++i) {
++            block_list->block->ids[i] = IdTraits::ID_INIT;
++        }
++        block_list->block->next = NULL;
++    }
++    return 0;
++}
++
++template <typename Id, typename IdTraits>
++template <typename Fn>
++int ListOfABAFreeId<Id, IdTraits>::for_each(const Fn& fn) {
++    for (IdBlock* p = &_head_block; p != NULL; p = p->next) {
++        for (size_t i = 0; i < IdTraits::BLOCK_SIZE; ++i) {
++            if (p->ids[i] != IdTraits::ID_INIT && 
IdTraits::exists(p->ids[i])) {
++                int rc = fn(p->ids[i]);
++                if (rc != 0) {
++                    return rc;
++                }
++            }
++        }
++    }
++    return 0;
++}
++
+ template <typename Id, typename IdTraits>
+ template <typename Fn>
+ void ListOfABAFreeId<Id, IdTraits>::apply(const Fn& fn) {
+@@ -200,6 +326,15 @@ void ListOfABAFreeId<Id, IdTraits>::apply(const Fn& fn) {
+     }
+ }
+ 
++template <typename Id, typename IdTraits>
++void ListOfABAFreeId<Id, IdTraits>::free_list(IdBlock* p) {
++    for (; p != NULL;) {
++        IdBlock* saved_next = p->next;
++        delete p;
++        p = saved_next;
++    }
++}
++
+ template <typename Id, typename IdTraits>
+ size_t ListOfABAFreeId<Id, IdTraits>::get_sizes(size_t* cnts, size_t n) {
+     if (n == 0) {
+@@ -210,6 +345,6 @@ size_t ListOfABAFreeId<Id, IdTraits>::get_sizes(size_t* 
cnts, size_t n) {
+     return 1;
+ }
+ 
+-}  // namespace bthread
++} // namespace bthread
+ 
+-#endif  // BTHREAD_LIST_OF_ABAFREE_ID_H
++#endif // BTHREAD_LIST_OF_ABAFREE_ID_H
+-- 
+2.31.1
+


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to