Gabriel39 commented on code in PR #45207: URL: https://github.com/apache/doris/pull/45207#discussion_r1886240717
##########
be/src/pipeline/exec/exchange_sink_operator.cpp:
##########
@@ -186,9 +186,6 @@ void ExchangeSinkLocalState::on_channel_finished(InstanceLoId channel_id) {
         _finished_channels.emplace(channel_id);
         if (_working_channels_count.fetch_sub(1) == 1) {
             set_reach_limit();
-            if (_finish_dependency) {

Review Comment:
   如果当前task处于被finish dependency阻塞的状态,on_channel_finished是在rpc的callback中执行。之前可以在这里set ready来唤醒task,这么改完以后是如何唤醒task执行呢

##########
be/src/pipeline/exec/hashjoin_build_sink.cpp:
##########
@@ -135,26 +135,17 @@ Status HashJoinBuildSinkLocalState::close(RuntimeState* state, Status exec_statu
         }
     }};
 
-    if (!_runtime_filter_slots || _runtime_filters.empty() || state->is_cancelled()) {
+    if (!_runtime_filter_slots || _runtime_filters.empty() || state->is_cancelled() ||
+        !p.get_local_state(state)._eos) {

Review Comment:
   Why use `p.get_local_state(state)` instead of `this`?

##########
be/src/pipeline/exec/hashjoin_build_sink.cpp:
##########
@@ -135,26 +135,17 @@ Status HashJoinBuildSinkLocalState::close(RuntimeState* state, Status exec_statu
         }
     }};
 
-    if (!_runtime_filter_slots || _runtime_filters.empty() || state->is_cancelled()) {
+    if (!_runtime_filter_slots || _runtime_filters.empty() || state->is_cancelled() ||
+        !p.get_local_state(state)._eos) {
         return Base::close(state, exec_status);
     }
 
     try {
-        if (state->get_task()->wake_up_by_downstream()) {
-            if (_should_build_hash_table) {
-                // partitial ignore rf to make global rf work
-                RETURN_IF_ERROR(
-                        _runtime_filter_slots->send_filter_size(state, 0, _finish_dependency));
-                RETURN_IF_ERROR(_runtime_filter_slots->ignore_all_filters());
-            } else {
-                // do not publish filter coz local rf not inited and useless
-                return Base::close(state, exec_status);
-            }
+        if (state->get_task()->wake_up_early()) {

Review Comment:
   那如果是build shared hash table的instance提前结束,是不是意味着在sink里面eos的时候sync filter size又在close的时候又sync了一次?这样会有问题吗

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org