github-actions[bot] commented on code in PR #33291:
URL: https://github.com/apache/doris/pull/33291#discussion_r1554640592


##########
be/src/vec/spill/spill_stream.cpp:
##########
@@ -103,9 +113,9 @@ Status SpillStream::wait_spill() {
     return Status::OK();
 }
 
-Status SpillStream::spill_block(const Block& block, bool eof) {
+Status SpillStream::spill_block(RuntimeState* state, const Block& block, bool eof) {

Review Comment:
   warning: method 'spill_block' can be made static 
[readability-convert-member-functions-to-static]
   
   be/src/vec/spill/spill_stream.h:55:
   ```diff
   -     Status spill_block(RuntimeState* state, const Block& block, bool eof);
   +     static Status spill_block(RuntimeState* state, const Block& block, bool eof);
   ```
   



##########
be/src/vec/spill/spill_stream_manager.cpp:
##########
@@ -273,18 +275,67 @@ Status SpillDataDir::init() {
         RETURN_NOT_OK_STATUS_WITH_WARN(Status::IOError("opendir failed, 
path={}", _path),
                                        "check file exist failed");
     }
-
+    return update_capacity();
+}
+Status SpillDataDir::update_capacity() {
+    std::lock_guard<std::mutex> l(_mutex);
+    RETURN_IF_ERROR(io::global_local_filesystem()->get_space_info(_path, 
&_disk_capacity_bytes,
+                                                                  
&_available_bytes));
+    if (_shared_with_storage_path) {
+        _limit_bytes = (size_t)(_disk_capacity_bytes *
+                                (config::storage_flood_stage_usage_percent / 
100.0) *
+                                (config::spill_storage_usage_percent / 100.0));
+    } else {
+        _limit_bytes =
+                (size_t)(_disk_capacity_bytes * 
(config::spill_storage_usage_percent / 100.0));
+    }
     return Status::OK();
 }
 bool SpillDataDir::reach_capacity_limit(int64_t incoming_data_size) {
-    double used_pct = get_usage(incoming_data_size);
-    int64_t left_bytes = _available_bytes - incoming_data_size;
-    if (used_pct >= config::storage_flood_stage_usage_percent / 100.0 &&
-        left_bytes <= config::storage_flood_stage_left_capacity_bytes) {
-        LOG(WARNING) << "reach capacity limit. used pct: " << used_pct
-                     << ", left bytes: " << left_bytes << ", path: " << _path;
-        return true;
+    std::lock_guard<std::mutex> l(_mutex);
+    if (_shared_with_storage_path) {
+        VLOG_DEBUG << fmt::format(
+                "spill data path: {}, limit: {}, used: {}, available: {}, "
+                "incoming "
+                "bytes: {}",
+                _path, PrettyPrinter::print_bytes(_limit_bytes),
+                PrettyPrinter::print_bytes(_used_bytes),
+                PrettyPrinter::print_bytes(_available_bytes),
+                PrettyPrinter::print_bytes(incoming_data_size));
+        int64_t left_bytes = _available_bytes - incoming_data_size;
+        if (_used_bytes + incoming_data_size > _limit_bytes ||
+            left_bytes <= config::storage_flood_stage_left_capacity_bytes) {
+            LOG(WARNING) << fmt::format(
+                    "spill data reach limit, path: {}, limit: {}, used: {}, 
available: {}, "
+                    "incoming "
+                    "bytes: {}",
+                    _path, PrettyPrinter::print_bytes(_limit_bytes),
+                    PrettyPrinter::print_bytes(_used_bytes),
+                    PrettyPrinter::print_bytes(_available_bytes),
+                    PrettyPrinter::print_bytes(incoming_data_size));
+            return true;

Review Comment:
   warning: redundant boolean literal in conditional return statement 
[readability-simplify-boolean-expr]
   
   be/src/vec/spill/spill_stream_manager.cpp:305:
   ```diff
   -         if (_used_bytes + incoming_data_size > _limit_bytes ||
   -             left_bytes <= config::storage_flood_stage_left_capacity_bytes) {
   -             LOG(WARNING) << fmt::format(
   -                     "spill data reach limit, path: {}, limit: {}, used: {}, available: {}, "
   -                     "incoming "
   -                     "bytes: {}",
   -                     _path, PrettyPrinter::print_bytes(_limit_bytes),
   -                     PrettyPrinter::print_bytes(_used_bytes),
   -                     PrettyPrinter::print_bytes(_available_bytes),
   -                     PrettyPrinter::print_bytes(incoming_data_size));
   -             return true;
   -         }
   -         return false;
   +         return _used_bytes + incoming_data_size > _limit_bytes ||
   +             left_bytes <= config::storage_flood_stage_left_capacity_bytes;
   ```
   



##########
be/src/vec/spill/spill_stream_manager.h:
##########
@@ -54,25 +55,61 @@
 
     Status update_capacity();
 
-    double get_usage(int64_t incoming_data_size) const {
-        return _disk_capacity_bytes == 0
-                       ? 0
-                       : (_disk_capacity_bytes - _available_bytes + 
incoming_data_size) /
-                                 (double)_disk_capacity_bytes;
+    void update_usage(int64_t incoming_data_size) {
+        if (_shared_with_storage_path) {
+            std::lock_guard<std::mutex> l(_mutex);
+            _used_bytes += incoming_data_size;
+        }
+    }
+
+    int64_t get_used_bytes() {
+        std::lock_guard<std::mutex> l(_mutex);
+        if (_shared_with_storage_path) {
+            return _used_bytes;
+        } else {
+            return _disk_capacity_bytes - _available_bytes;
+        }
+    }
+
+    double get_usage(int64_t incoming_data_size) {
+        std::lock_guard<std::mutex> l(_mutex);
+        if (_shared_with_storage_path) {
+            return _limit_bytes == 0 ? 0 : _used_bytes + incoming_data_size / 
(double)_limit_bytes;
+
+        } else {
+            return _disk_capacity_bytes == 0
+                           ? 0
+                           : (_disk_capacity_bytes - _available_bytes + 
incoming_data_size) /
+                                     (double)_disk_capacity_bytes;
+        }
+    }
+
+    int64_t storage_limit() {

Review Comment:
   warning: method 'storage_limit' can be made const 
[readability-make-member-function-const]
   
   ```suggestion
       int64_t storage_limit() const {
   ```
   



##########
be/src/vec/spill/spill_stream_manager.h:
##########
@@ -54,25 +55,61 @@
 
     Status update_capacity();
 
-    double get_usage(int64_t incoming_data_size) const {
-        return _disk_capacity_bytes == 0
-                       ? 0
-                       : (_disk_capacity_bytes - _available_bytes + 
incoming_data_size) /
-                                 (double)_disk_capacity_bytes;
+    void update_usage(int64_t incoming_data_size) {
+        if (_shared_with_storage_path) {
+            std::lock_guard<std::mutex> l(_mutex);
+            _used_bytes += incoming_data_size;
+        }
+    }
+
+    int64_t get_used_bytes() {
+        std::lock_guard<std::mutex> l(_mutex);
+        if (_shared_with_storage_path) {
+            return _used_bytes;
+        } else {
+            return _disk_capacity_bytes - _available_bytes;
+        }
+    }
+
+    double get_usage(int64_t incoming_data_size) {

Review Comment:
   warning: method 'get_usage' can be made const 
[readability-make-member-function-const]
   
   ```suggestion
       double get_usage(int64_t incoming_data_size) const {
   ```
   



##########
be/src/pipeline/pipeline_x/dependency.cpp:
##########
@@ -267,11 +267,21 @@ void AggSpillPartition::close() {
 }
 
 void PartitionedAggSharedState::close() {
+    if (is_closed) {
+        return;
+    }
+    is_closed = true;
     for (auto partition : spill_partitions) {
         partition->close();
     }
+    spill_partitions.clear();
 }
-void SpillSortSharedState::clear() {
+
+void SpillSortSharedState::close() {

Review Comment:
   warning: method 'close' can be made static 
[readability-convert-member-functions-to-static]
   
   be/src/pipeline/pipeline_x/dependency.h:514:
   ```diff
   -     void close();
   +     static void close();
   ```
   



##########
be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp:
##########
@@ -494,27 +494,36 @@ Status PartitionedHashJoinProbeOperatorX::init(const 
TPlanNode& tnode, RuntimeSt
     return _probe_operator->init(tnode_, state);
 }
 Status PartitionedHashJoinProbeOperatorX::prepare(RuntimeState* state) {
-    // here do NOT call `OperatorXBase::prepare(state)`
-    // RETURN_IF_ERROR(OperatorXBase::prepare(state));
-    for (auto& conjunct : _conjuncts) {
-        RETURN_IF_ERROR(conjunct->prepare(state, intermediate_row_desc()));
-    }
-
-    RETURN_IF_ERROR(vectorized::VExpr::prepare(_projections, state, 
intermediate_row_desc()));
-    RETURN_IF_ERROR(vectorized::VExpr::prepare(_output_expr_ctxs, state, 
*_intermediate_row_desc));
-    RETURN_IF_ERROR(_probe_operator->set_child(_child_x));
+    // to avoid prepare _child_x twice
+    auto child_x = std::move(_child_x);
+    RETURN_IF_ERROR(JoinProbeOperatorX::prepare(state));
+    RETURN_IF_ERROR(_probe_operator->set_child(child_x));
     DCHECK(_build_side_child != nullptr);
     _probe_operator->set_build_side_child(_build_side_child);
     RETURN_IF_ERROR(_sink_operator->set_child(_build_side_child));
     RETURN_IF_ERROR(_probe_operator->prepare(state));
     RETURN_IF_ERROR(_sink_operator->prepare(state));
+    _child_x = std::move(child_x);
     return Status::OK();
 }
 
 Status PartitionedHashJoinProbeOperatorX::open(RuntimeState* state) {
+    // to avoid open _child_x twice
+    auto child_x = std::move(_child_x);
     RETURN_IF_ERROR(JoinProbeOperatorX::open(state));
     RETURN_IF_ERROR(_probe_operator->open(state));
     RETURN_IF_ERROR(_sink_operator->open(state));
+    _child_x = std::move(child_x);
+    return Status::OK();
+}
+
+Status PartitionedHashJoinProbeOperatorX::close(RuntimeState* state) {

Review Comment:
   warning: method 'close' can be made static 
[readability-convert-member-functions-to-static]
   
   be/src/pipeline/exec/partitioned_hash_join_probe_operator.h:155:
   ```diff
   -     Status close(RuntimeState* state) override;
   +     static Status close(RuntimeState* state) override;
   ```
   



##########
be/src/vec/spill/spill_stream_manager.h:
##########
@@ -54,25 +55,61 @@ class SpillDataDir {
 
     Status update_capacity();
 
-    double get_usage(int64_t incoming_data_size) const {
-        return _disk_capacity_bytes == 0
-                       ? 0
-                       : (_disk_capacity_bytes - _available_bytes + 
incoming_data_size) /
-                                 (double)_disk_capacity_bytes;
+    void update_usage(int64_t incoming_data_size) {
+        if (_shared_with_storage_path) {
+            std::lock_guard<std::mutex> l(_mutex);
+            _used_bytes += incoming_data_size;
+        }
+    }
+
+    int64_t get_used_bytes() {

Review Comment:
   warning: method 'get_used_bytes' can be made const 
[readability-make-member-function-const]
   
   ```suggestion
       int64_t get_used_bytes() const {
   ```
   



##########
be/src/vec/spill/spill_stream_manager.cpp:
##########
@@ -273,18 +275,67 @@
         RETURN_NOT_OK_STATUS_WITH_WARN(Status::IOError("opendir failed, 
path={}", _path),
                                        "check file exist failed");
     }
-
+    return update_capacity();
+}
+Status SpillDataDir::update_capacity() {
+    std::lock_guard<std::mutex> l(_mutex);
+    RETURN_IF_ERROR(io::global_local_filesystem()->get_space_info(_path, 
&_disk_capacity_bytes,
+                                                                  
&_available_bytes));
+    if (_shared_with_storage_path) {
+        _limit_bytes = (size_t)(_disk_capacity_bytes *
+                                (config::storage_flood_stage_usage_percent / 
100.0) *
+                                (config::spill_storage_usage_percent / 100.0));
+    } else {
+        _limit_bytes =
+                (size_t)(_disk_capacity_bytes * 
(config::spill_storage_usage_percent / 100.0));
+    }
     return Status::OK();
 }
 bool SpillDataDir::reach_capacity_limit(int64_t incoming_data_size) {
-    double used_pct = get_usage(incoming_data_size);
-    int64_t left_bytes = _available_bytes - incoming_data_size;
-    if (used_pct >= config::storage_flood_stage_usage_percent / 100.0 &&
-        left_bytes <= config::storage_flood_stage_left_capacity_bytes) {
-        LOG(WARNING) << "reach capacity limit. used pct: " << used_pct
-                     << ", left bytes: " << left_bytes << ", path: " << _path;
-        return true;
+    std::lock_guard<std::mutex> l(_mutex);
+    if (_shared_with_storage_path) {
+        VLOG_DEBUG << fmt::format(
+                "spill data path: {}, limit: {}, used: {}, available: {}, "
+                "incoming "
+                "bytes: {}",
+                _path, PrettyPrinter::print_bytes(_limit_bytes),
+                PrettyPrinter::print_bytes(_used_bytes),
+                PrettyPrinter::print_bytes(_available_bytes),
+                PrettyPrinter::print_bytes(incoming_data_size));
+        int64_t left_bytes = _available_bytes - incoming_data_size;
+        if (_used_bytes + incoming_data_size > _limit_bytes ||
+            left_bytes <= config::storage_flood_stage_left_capacity_bytes) {
+            LOG(WARNING) << fmt::format(
+                    "spill data reach limit, path: {}, limit: {}, used: {}, 
available: {}, "
+                    "incoming "
+                    "bytes: {}",
+                    _path, PrettyPrinter::print_bytes(_limit_bytes),
+                    PrettyPrinter::print_bytes(_used_bytes),
+                    PrettyPrinter::print_bytes(_available_bytes),
+                    PrettyPrinter::print_bytes(incoming_data_size));
+            return true;
+        }
+        return false;
+    } else {
+        double used_pct = _disk_capacity_bytes == 0
+                                  ? 0
+                                  : (_disk_capacity_bytes - _available_bytes + 
incoming_data_size) /
+                                            (double)_disk_capacity_bytes;
+        VLOG_DEBUG << fmt::format(
+                "spill path: {}, capacity: {}, available: {}, used pct: {}, 
incoming bytes: {}",
+                _path, PrettyPrinter::print_bytes(_disk_capacity_bytes),
+                PrettyPrinter::print_bytes(_available_bytes), used_pct,
+                PrettyPrinter::print_bytes(incoming_data_size));
+        if (used_pct >= config::spill_storage_usage_percent / 100.0) {
+            LOG(WARNING) << fmt::format(
+                    "spill data reach limit, path: {}, capacity: {}, 
available: {}, incoming "
+                    "bytes: {}",
+                    _path, PrettyPrinter::print_bytes(_disk_capacity_bytes),
+                    PrettyPrinter::print_bytes(_available_bytes),
+                    PrettyPrinter::print_bytes(incoming_data_size));
+            return true;

Review Comment:
   warning: redundant boolean literal in conditional return statement 
[readability-simplify-boolean-expr]
   
   be/src/vec/spill/spill_stream_manager.cpp:328:
   ```diff
   -         if (used_pct >= config::spill_storage_usage_percent / 100.0) {
   -             LOG(WARNING) << fmt::format(
   -                     "spill data reach limit, path: {}, capacity: {}, available: {}, incoming "
   -                     "bytes: {}",
   -                     _path, PrettyPrinter::print_bytes(_disk_capacity_bytes),
   -                     PrettyPrinter::print_bytes(_available_bytes),
   -                     PrettyPrinter::print_bytes(incoming_data_size));
   -             return true;
   -         }
   -         return false;
   +         return used_pct >= config::spill_storage_usage_percent / 100.0;
   ```
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to