github-actions[bot] commented on code in PR #16031:
URL: https://github.com/apache/doris/pull/16031#discussion_r1072323733


##########
be/src/util/async_io.h:
##########
@@ -0,0 +1,89 @@
+#pragma once
+
+#include <bthread/bthread.h>
+
+#include "io/fs/file_system.h"
+#include "olap/olap_define.h"
+#include "priority_thread_pool.hpp"
+#include "runtime/threadlocal.h"
+
+namespace doris {
+
+struct AsyncIOCtx {
+    int nice;
+};
+
+/**
+ * Separate task from bthread to pthread, specific for IO task.
+ */
+class AsyncIO {
+public:
+    AsyncIO() {
+        _io_thread_pool = new 
PriorityThreadPool(config::doris_scanner_thread_pool_thread_num,
+                                                 
config::doris_scanner_thread_pool_queue_size,
+                                                 "async_io_thread_pool");
+        _remote_thread_pool = new PriorityThreadPool(
+                config::doris_remote_scanner_thread_pool_thread_num,
+                config::doris_remote_scanner_thread_pool_queue_size, 
"async_remote_thread_pool");
+    }
+
+    ~AsyncIO() {
+        SAFE_DELETE(_io_thread_pool);
+        SAFE_DELETE(_remote_thread_pool);
+    }
+
+    AsyncIO& operator=(const AsyncIO&) = delete;
+    AsyncIO(const AsyncIO&) = delete;
+
+    static AsyncIO& instance() {
+        static AsyncIO instance;
+        return instance;
+    }
+
+    // This function should run on the bthread, and it will put the task into
+    // thread_pool and release the bthread_worker at cv.wait. When the task is 
completed,
+    // the bthread will continue to execute.
+    static void run_task(std::function<void()> fn, io::FileSystemType 
file_type) {
+        DCHECK(bthread_self() != 0);
+        doris::Mutex mutex;
+        doris::ConditionVariable cv;
+        std::unique_lock l(mutex);
+
+        AsyncIOCtx* ctx = 
static_cast<AsyncIOCtx*>(bthread_getspecific(btls_io_ctx_key));
+        int nice = -1;
+        if (ctx == nullptr) {
+            nice = 18;
+        } else {
+            nice = ctx->nice;
+        }
+
+        PriorityThreadPool::Task task;
+        task.priority = nice;
+        task.work_function = [&] {
+            fn();
+            std::unique_lock l(mutex);
+            cv.notify_one();
+        };
+
+        if (file_type == io::FileSystemType::S3) {
+            AsyncIO::instance().remote_thread_pool()->offer(task);
+        } else {
+            AsyncIO::instance().io_thread_pool()->offer(task);
+        }
+        cv.wait(l);
+    }
+
+    inline static bthread_key_t btls_io_ctx_key;
+
+    static void io_ctx_key_deleter(void* d) { delete 
static_cast<AsyncIOCtx*>(d); }
+
+private:
+    PriorityThreadPool* _io_thread_pool = nullptr;
+    PriorityThreadPool* _remote_thread_pool = nullptr;
+
+private:

Review Comment:
   warning: redundant access specifier has the same accessibility as the 
previous access specifier [readability-redundant-access-specifiers]
   
   ```suggestion
   
   ```
   **be/src/util/async_io.h:79:** previously declared here
   ```cpp
   private:
   ^
   ```
   



##########
be/src/util/priority_work_stealing_thread_pool.hpp:
##########
@@ -97,7 +98,7 @@
     // Any work Offer()'ed during DrainAndshutdown may or may not be processed.
     void drain_and_shutdown() override {
         {
-            std::unique_lock<std::mutex> l(_lock);
+            std::unique_lock l(_lock);

Review Comment:
   warning: use of undeclared identifier '_lock'; did you mean 'clock'? 
[clang-diagnostic-error]
   
   ```suggestion
               std::unique_lock l(clock);
   ```
   **/usr/include/time.h:71:** 'clock' declared here
   ```cpp
   extern clock_t clock (void) __THROW;
                  ^
   ```
   



##########
be/src/util/priority_work_stealing_thread_pool.hpp:
##########
@@ -97,7 +98,7 @@
     // Any work Offer()'ed during DrainAndshutdown may or may not be processed.
     void drain_and_shutdown() override {
         {
-            std::unique_lock<std::mutex> l(_lock);
+            std::unique_lock l(_lock);
             while (get_queue_size() != 0) {
                 _empty_cv.wait(l);

Review Comment:
   warning: use of undeclared identifier '_empty_cv' [clang-diagnostic-error]
   ```cpp
                   _empty_cv.wait(l);
                   ^
   ```
   



##########
be/src/util/priority_work_stealing_thread_pool.hpp:
##########
@@ -97,7 +98,7 @@ class PriorityWorkStealingThreadPool : public 
PriorityThreadPool {
     // Any work Offer()'ed during DrainAndshutdown may or may not be processed.
     void drain_and_shutdown() override {

Review Comment:
   warning: only virtual member functions can be marked 'override' 
[clang-diagnostic-error]
   
   ```suggestion
       void drain_and_shutdown() {
   ```
   



##########
be/src/util/once.h:
##########
@@ -54,10 +55,16 @@ class DorisCallOnce {
     // lambda and stores its return value. Otherwise, returns the stored 
Status.
     template <typename Fn>
     ReturnType call(Fn fn) {
-        std::call_once(_once_flag, [this, fn] {
-            _status = fn();
-            _has_called.store(true, std::memory_order_release);
-        });
+        if (!_has_called.load(std::memory_order_acquire)) {
+            do {
+                std::lock_guard l(_mutex);
+                if (_has_called.load(std::memory_order_acquire)) break;

Review Comment:
   warning: statement should be inside braces 
[readability-braces-around-statements]
   
   ```suggestion
                   if (_has_called.load(std::memory_order_acquire)) { break;
   }
   ```
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to