Gabriel39 commented on code in PR #45207:
URL: https://github.com/apache/doris/pull/45207#discussion_r1886240717


##########
be/src/pipeline/exec/exchange_sink_operator.cpp:
##########
@@ -186,9 +186,6 @@ void ExchangeSinkLocalState::on_channel_finished(InstanceLoId channel_id) {
         _finished_channels.emplace(channel_id);
         if (_working_channels_count.fetch_sub(1) == 1) {
             set_reach_limit();
-            if (_finish_dependency) {

Review Comment:
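   If the current task is blocked on the finish dependency, note that `on_channel_finished` runs in the RPC callback. Previously, calling set ready here would wake the task. After this change, how does the task get woken up to run?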
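
   To make the concern concrete, here is a minimal, self-contained sketch of the wake-up pattern in question. It is not the Doris implementation: `FinishDependency`, `SinkState`, and `run_demo` are hypothetical stand-ins, and a plain condition variable is assumed in place of the pipeline scheduler. It only illustrates that a task blocked on a dependency needs some code path, here the last channel's callback, to mark the dependency ready.

```cpp
#include <atomic>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical stand-in for a finish dependency; not the Doris class.
class FinishDependency {
public:
    // Called by the task thread: blocks until set_ready() has been called.
    void wait_ready() {
        std::unique_lock<std::mutex> lock(_mu);
        _cv.wait(lock, [this] { return _ready; });
    }

    // Called from a callback thread: marks the dependency ready and wakes the waiter.
    void set_ready() {
        {
            std::lock_guard<std::mutex> lock(_mu);
            _ready = true;
        }
        _cv.notify_all();
    }

    bool ready() {
        std::lock_guard<std::mutex> lock(_mu);
        return _ready;
    }

private:
    std::mutex _mu;
    std::condition_variable _cv;
    bool _ready = false;
};

// Hypothetical sink state: the last channel to finish wakes the task.
struct SinkState {
    explicit SinkState(int channels) : working_channels(channels) {}

    std::atomic<int> working_channels;
    FinishDependency finish_dependency;

    void on_channel_finished() {
        // fetch_sub returns the previous value, so 1 means this was the last channel.
        if (working_channels.fetch_sub(1) == 1) {
            finish_dependency.set_ready();
        }
    }
};

// Runs one callback thread per channel and returns true once the blocked task wakes.
bool run_demo(int channels) {
    SinkState sink(channels);
    std::vector<std::thread> callbacks;
    for (int i = 0; i < channels; ++i) {
        callbacks.emplace_back([&sink] { sink.on_channel_finished(); });
    }
    // Without the set_ready() call in the callback, this wait would never return.
    sink.finish_dependency.wait_ready();
    for (auto& t : callbacks) {
        t.join();
    }
    return sink.finish_dependency.ready();
}
```

   If the `set_ready()` call is removed from the callback in this sketch, something else has to signal the dependency, which is the question raised for this change.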



##########
be/src/pipeline/exec/hashjoin_build_sink.cpp:
##########
@@ -135,26 +135,17 @@ Status HashJoinBuildSinkLocalState::close(RuntimeState* state, Status exec_statu
         }
     }};
 
-    if (!_runtime_filter_slots || _runtime_filters.empty() || state->is_cancelled()) {
+    if (!_runtime_filter_slots || _runtime_filters.empty() || state->is_cancelled() ||
+        !p.get_local_state(state)._eos) {

Review Comment:
   Why use `p.get_local_state(state)` instead of `this`?



##########
be/src/pipeline/exec/hashjoin_build_sink.cpp:
##########
@@ -135,26 +135,17 @@ Status HashJoinBuildSinkLocalState::close(RuntimeState* state, Status exec_statu
         }
     }};
 
-    if (!_runtime_filter_slots || _runtime_filters.empty() || state->is_cancelled()) {
+    if (!_runtime_filter_slots || _runtime_filters.empty() || state->is_cancelled() ||
+        !p.get_local_state(state)._eos) {
         return Base::close(state, exec_status);
     }
 
     try {
-        if (state->get_task()->wake_up_by_downstream()) {
-            if (_should_build_hash_table) {
-                // partitial ignore rf to make global rf work
-                RETURN_IF_ERROR(
-                        _runtime_filter_slots->send_filter_size(state, 0, _finish_dependency));
-                RETURN_IF_ERROR(_runtime_filter_slots->ignore_all_filters());
-            } else {
-                // do not publish filter coz local rf not inited and useless
-                return Base::close(state, exec_status);
-            }
+        if (state->get_task()->wake_up_early()) {

Review Comment:
   So if the instance that builds the shared hash table finishes early, does that mean the filter size is synced once at eos in sink and then synced again in close? Would that cause a problem?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

