This is an automated email from the ASF dual-hosted git repository. panxiaolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new fa5e7008d5d [Chore](runtime-filter) enlarge sync filter size rpc timeout limit (#37103) fa5e7008d5d is described below commit fa5e7008d5d918f30e0580ce7bda03512b3d6aec Author: Pxl <pxl...@qq.com> AuthorDate: Tue Jul 2 14:16:39 2024 +0800 [Chore](runtime-filter) enlarge sync filter size rpc timeout limit (#37103) ## Proposed changes enlarge sync filter size rpc timeout limit rf will fail when the rpc times out, so we need to enlarge the limit ``` sync filter size meet error, filter: RuntimeFilter: (id = 3, type = in_or_bloomfilter, need_local_merge: false, is_broadcast: false, build_bf_cardinality: true ``` --- be/src/common/config.cpp | 1 + be/src/exprs/runtime_filter.cpp | 4 ++-- be/src/exprs/runtime_filter.h | 2 +- be/src/exprs/runtime_filter_slots.h | 2 +- regression-test/suites/query_p0/join/test_join5.groovy | 1 + 5 files changed, 6 insertions(+), 4 deletions(-) diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 78afc756af8..92303473ad6 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -255,6 +255,7 @@ DEFINE_Int32(doris_scanner_thread_pool_queue_size, "102400"); // default thrift client connect timeout(in seconds) DEFINE_mInt32(thrift_connect_timeout_seconds, "3"); DEFINE_mInt32(fetch_rpc_timeout_seconds, "30"); + // default thrift client retry interval (in milliseconds) DEFINE_mInt64(thrift_client_retry_interval_ms, "1000"); // max message size of thrift request diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp index e38a80a143e..2dcfc97b096 100644 --- a/be/src/exprs/runtime_filter.cpp +++ b/be/src/exprs/runtime_filter.cpp @@ -1054,7 +1054,7 @@ public: : Base(req, callback), _dependency(std::move(dependency)) {} }; -Status IRuntimeFilter::send_filter_size(uint64_t local_filter_size) { +Status IRuntimeFilter::send_filter_size(RuntimeState* state, uint64_t local_filter_size) { DCHECK(is_producer()); if (_need_local_merge) { @@ 
-1105,7 +1105,7 @@ Status IRuntimeFilter::send_filter_size(uint64_t local_filter_size) { request->set_filter_size(local_filter_size); request->set_filter_id(_filter_id); - callback->cntl_->set_timeout_ms(wait_time_ms()); + callback->cntl_->set_timeout_ms(std::min(3600, state->execution_timeout()) * 1000); stub->send_filter_size(closure->cntl_.get(), closure->request_.get(), closure->response_.get(), closure.get()); diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h index a78d732b687..390a61bfe1a 100644 --- a/be/src/exprs/runtime_filter.h +++ b/be/src/exprs/runtime_filter.h @@ -229,7 +229,7 @@ public: // push filter to remote node or push down it to scan_node Status publish(bool publish_local = false); - Status send_filter_size(uint64_t local_filter_size); + Status send_filter_size(RuntimeState* state, uint64_t local_filter_size); RuntimeFilterType type() const { return _runtime_filter_type; } diff --git a/be/src/exprs/runtime_filter_slots.h b/be/src/exprs/runtime_filter_slots.h index b5b04a1ebac..0bf8a33f9f2 100644 --- a/be/src/exprs/runtime_filter_slots.h +++ b/be/src/exprs/runtime_filter_slots.h @@ -55,7 +55,7 @@ public: // send_filter_size may call dependency->sub(), so we call set_dependency firstly for all rf to avoid dependency set_ready repeatedly for (auto* runtime_filter : _runtime_filters) { if (runtime_filter->need_sync_filter_size()) { - RETURN_IF_ERROR(runtime_filter->send_filter_size(hash_table_size)); + RETURN_IF_ERROR(runtime_filter->send_filter_size(state, hash_table_size)); } } return Status::OK(); diff --git a/regression-test/suites/query_p0/join/test_join5.groovy b/regression-test/suites/query_p0/join/test_join5.groovy index 62be496372d..4323575870f 100644 --- a/regression-test/suites/query_p0/join/test_join5.groovy +++ b/regression-test/suites/query_p0/join/test_join5.groovy @@ -16,6 +16,7 @@ // under the License. 
suite("test_join5", "query,p0") { + sql "set runtime_filter_wait_time_ms = 5" def DBname = "regression_test_join5" sql "DROP DATABASE IF EXISTS ${DBname}" sql "CREATE DATABASE IF NOT EXISTS ${DBname}" --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org