github-actions[bot] commented on code in PR #33519:
URL: https://github.com/apache/doris/pull/33519#discussion_r1560780207


##########
be/src/vec/spill/spill_stream_manager.cpp:
##########
@@ -271,18 +282,54 @@
         RETURN_NOT_OK_STATUS_WITH_WARN(Status::IOError("opendir failed, 
path={}", _path),
                                        "check file exist failed");
     }
-
+    return update_capacity();
+}
+Status SpillDataDir::update_capacity() {
+    std::lock_guard<std::mutex> l(_mutex);
+    RETURN_IF_ERROR(io::global_local_filesystem()->get_space_info(_path, 
&_disk_capacity_bytes,
+                                                                  
&_available_bytes));
+    if (_shared_with_storage_path) {
+        _limit_bytes = (size_t)(_disk_capacity_bytes *
+                                (config::storage_flood_stage_usage_percent / 
100.0) *
+                                (config::spill_storage_usage_percent / 100.0));
+    } else {
+        _limit_bytes =
+                (size_t)(_disk_capacity_bytes * 
(config::spill_storage_usage_percent / 100.0));
+    }
     return Status::OK();
 }
 bool SpillDataDir::reach_capacity_limit(int64_t incoming_data_size) {
-    double used_pct = get_usage(incoming_data_size);
-    int64_t left_bytes = _available_bytes - incoming_data_size;
-    if (used_pct >= config::storage_flood_stage_usage_percent / 100.0 &&
-        left_bytes <= config::storage_flood_stage_left_capacity_bytes) {
-        LOG(WARNING) << "reach capacity limit. used pct: " << used_pct
-                     << ", left bytes: " << left_bytes << ", path: " << _path;
-        return true;
+    std::lock_guard<std::mutex> l(_mutex);
+    if (_shared_with_storage_path) {
+        int64_t left_bytes = _available_bytes - incoming_data_size;
+        if (_used_bytes + incoming_data_size > _limit_bytes ||
+            left_bytes <= config::storage_flood_stage_left_capacity_bytes) {
+            LOG_EVERY_T(WARNING, 1) << fmt::format(
+                    "spill data reach limit, path: {}, limit: {}, used: {}, 
available: {}, "
+                    "incoming "
+                    "bytes: {}",
+                    _path, PrettyPrinter::print_bytes(_limit_bytes),
+                    PrettyPrinter::print_bytes(_used_bytes),
+                    PrettyPrinter::print_bytes(_available_bytes),
+                    PrettyPrinter::print_bytes(incoming_data_size));
+            return true;
+        }
+        return false;
+    } else {
+        double used_pct = _disk_capacity_bytes == 0
+                                  ? 0
+                                  : (_disk_capacity_bytes - _available_bytes + 
incoming_data_size) /
+                                            (double)_disk_capacity_bytes;
+        if (used_pct >= config::spill_storage_usage_percent / 100.0) {
+            LOG_EVERY_T(WARNING, 1) << fmt::format(
+                    "spill data reach limit, path: {}, capacity: {}, 
available: {}, incoming "
+                    "bytes: {}",
+                    _path, PrettyPrinter::print_bytes(_disk_capacity_bytes),
+                    PrettyPrinter::print_bytes(_available_bytes),
+                    PrettyPrinter::print_bytes(incoming_data_size));
+            return true;

Review Comment:
   warning: redundant boolean literal in conditional return statement [readability-simplify-boolean-expr]
   
   be/src/vec/spill/spill_stream_manager.cpp:322:
   ```diff
   -         if (used_pct >= config::spill_storage_usage_percent / 100.0) {
   -             LOG_EVERY_T(WARNING, 1) << fmt::format(
   -                     "spill data reach limit, path: {}, capacity: {}, available: {}, incoming "
   -                     "bytes: {}",
   -                     _path, PrettyPrinter::print_bytes(_disk_capacity_bytes),
   -                     PrettyPrinter::print_bytes(_available_bytes),
   -                     PrettyPrinter::print_bytes(incoming_data_size));
   -             return true;
   -         }
   -         return false;
   +         return used_pct >= config::spill_storage_usage_percent / 100.0;
   ```
   



##########
be/src/vec/spill/spill_stream_manager.cpp:
##########
@@ -271,18 +282,54 @@ Status SpillDataDir::init() {
         RETURN_NOT_OK_STATUS_WITH_WARN(Status::IOError("opendir failed, 
path={}", _path),
                                        "check file exist failed");
     }
-
+    return update_capacity();
+}
+Status SpillDataDir::update_capacity() {
+    std::lock_guard<std::mutex> l(_mutex);
+    RETURN_IF_ERROR(io::global_local_filesystem()->get_space_info(_path, 
&_disk_capacity_bytes,
+                                                                  
&_available_bytes));
+    if (_shared_with_storage_path) {
+        _limit_bytes = (size_t)(_disk_capacity_bytes *
+                                (config::storage_flood_stage_usage_percent / 
100.0) *
+                                (config::spill_storage_usage_percent / 100.0));
+    } else {
+        _limit_bytes =
+                (size_t)(_disk_capacity_bytes * 
(config::spill_storage_usage_percent / 100.0));
+    }
     return Status::OK();
 }
 bool SpillDataDir::reach_capacity_limit(int64_t incoming_data_size) {
-    double used_pct = get_usage(incoming_data_size);
-    int64_t left_bytes = _available_bytes - incoming_data_size;
-    if (used_pct >= config::storage_flood_stage_usage_percent / 100.0 &&
-        left_bytes <= config::storage_flood_stage_left_capacity_bytes) {
-        LOG(WARNING) << "reach capacity limit. used pct: " << used_pct
-                     << ", left bytes: " << left_bytes << ", path: " << _path;
-        return true;
+    std::lock_guard<std::mutex> l(_mutex);
+    if (_shared_with_storage_path) {
+        int64_t left_bytes = _available_bytes - incoming_data_size;
+        if (_used_bytes + incoming_data_size > _limit_bytes ||
+            left_bytes <= config::storage_flood_stage_left_capacity_bytes) {
+            LOG_EVERY_T(WARNING, 1) << fmt::format(
+                    "spill data reach limit, path: {}, limit: {}, used: {}, 
available: {}, "
+                    "incoming "
+                    "bytes: {}",
+                    _path, PrettyPrinter::print_bytes(_limit_bytes),
+                    PrettyPrinter::print_bytes(_used_bytes),
+                    PrettyPrinter::print_bytes(_available_bytes),
+                    PrettyPrinter::print_bytes(incoming_data_size));
+            return true;

Review Comment:
   warning: redundant boolean literal in conditional return statement [readability-simplify-boolean-expr]
   
   be/src/vec/spill/spill_stream_manager.cpp:304:
   ```diff
   -         if (_used_bytes + incoming_data_size > _limit_bytes ||
   -             left_bytes <= config::storage_flood_stage_left_capacity_bytes) {
   -             LOG_EVERY_T(WARNING, 1) << fmt::format(
   -                     "spill data reach limit, path: {}, limit: {}, used: {}, available: {}, "
   -                     "incoming "
   -                     "bytes: {}",
   -                     _path, PrettyPrinter::print_bytes(_limit_bytes),
   -                     PrettyPrinter::print_bytes(_used_bytes),
   -                     PrettyPrinter::print_bytes(_available_bytes),
   -                     PrettyPrinter::print_bytes(incoming_data_size));
   -             return true;
   -         }
   -         return false;
   +         return _used_bytes + incoming_data_size > _limit_bytes ||
   +             left_bytes <= config::storage_flood_stage_left_capacity_bytes;
   ```
   



##########
be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp:
##########
@@ -493,6 +493,7 @@ Status PartitionedHashJoinProbeOperatorX::prepare(RuntimeState* state) {
     // to avoid prepare _child_x twice
     auto child_x = std::move(_child_x);
     RETURN_IF_ERROR(JoinProbeOperatorX::prepare(state));
+    RETURN_IF_ERROR(vectorized::VExpr::prepare(_output_expr_ctxs, state, *_intermediate_row_desc));

Review Comment:
   warning: method 'prepare' can be made static [readability-convert-member-functions-to-static]
   
   be/src/pipeline/exec/partitioned_hash_join_probe_operator.h:153:
   ```diff
   -     Status prepare(RuntimeState* state) override;
   +     static Status prepare(RuntimeState* state) override;
   ```
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to