This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new db523dafcb2 [improve](move-memtable) limit task num in load stream flush token (#28748)
db523dafcb2 is described below

commit db523dafcb2ee2973753aeebe27b025a7b523719
Author: Kaijie Chen <c...@apache.org>
AuthorDate: Thu Dec 21 12:19:58 2023 +0800

    [improve](move-memtable) limit task num in load stream flush token (#28748)
---
 be/src/common/config.cpp       |  2 ++
 be/src/common/config.h         |  2 ++
 be/src/runtime/load_stream.cpp | 13 +++++++++++--
 3 files changed, 15 insertions(+), 2 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index e75ac2c1c75..0c4caecc368 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -776,6 +776,8 @@ DEFINE_Int64(load_stream_max_buf_size, "20971520"); // 20MB
 DEFINE_Int32(load_stream_messages_in_batch, "128");
 // brpc streaming StreamWait seconds on EAGAIN
 DEFINE_Int32(load_stream_eagain_wait_seconds, "60");
+// max tasks per flush token in load stream
+DEFINE_Int32(load_stream_flush_token_max_tasks, "2");
 
 // max send batch parallelism for OlapTableSink
 // The value set by the user for send_batch_parallelism is not allowed to 
exceed max_send_batch_parallelism_per_job,
diff --git a/be/src/common/config.h b/be/src/common/config.h
index c911963a87f..6565d15d9f2 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -837,6 +837,8 @@ DECLARE_Int64(load_stream_max_buf_size);
 DECLARE_Int32(load_stream_messages_in_batch);
 // brpc streaming StreamWait seconds on EAGAIN
 DECLARE_Int32(load_stream_eagain_wait_seconds);
+// max tasks per flush token in load stream
+DECLARE_Int32(load_stream_flush_token_max_tasks);
 
 // max send batch parallelism for OlapTableSink
 // The value set by the user for send_batch_parallelism is not allowed to 
exceed max_send_batch_parallelism_per_job,
diff --git a/be/src/runtime/load_stream.cpp b/be/src/runtime/load_stream.cpp
index 9d05d48f54c..313728091c1 100644
--- a/be/src/runtime/load_stream.cpp
+++ b/be/src/runtime/load_stream.cpp
@@ -18,6 +18,7 @@
 #include "runtime/load_stream.h"
 
 #include <brpc/stream.h>
+#include <bthread/bthread.h>
 #include <bthread/condition_variable.h>
 #include <bthread/mutex.h>
 #include <olap/rowset/rowset_factory.h>
@@ -136,7 +137,11 @@ Status TabletStream::append_data(const PStreamHeader& 
header, butil::IOBuf* data
             LOG(INFO) << "write data failed " << *this;
         }
     };
-    return _flush_tokens[new_segid % 
_flush_tokens.size()]->submit_func(flush_func);
+    auto& flush_token = _flush_tokens[new_segid % _flush_tokens.size()];
+    while (flush_token->num_tasks() >= 
config::load_stream_flush_token_max_tasks) {
+        bthread_usleep(10 * 1000); // 10ms
+    }
+    return flush_token->submit_func(flush_func);
 }
 
 Status TabletStream::add_segment(const PStreamHeader& header, butil::IOBuf* 
data) {
@@ -170,7 +175,11 @@ Status TabletStream::add_segment(const PStreamHeader& 
header, butil::IOBuf* data
             LOG(INFO) << "add segment failed " << *this;
         }
     };
-    return _flush_tokens[new_segid % 
_flush_tokens.size()]->submit_func(add_segment_func);
+    auto& flush_token = _flush_tokens[new_segid % _flush_tokens.size()];
+    while (flush_token->num_tasks() >= 
config::load_stream_flush_token_max_tasks) {
+        bthread_usleep(10 * 1000); // 10ms
+    }
+    return flush_token->submit_func(add_segment_func);
 }
 
 Status TabletStream::close() {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to