github-actions[bot] commented on code in PR #61782:
URL: https://github.com/apache/doris/pull/61782#discussion_r3006396371
##########
be/src/exec/runtime_filter/runtime_filter_producer.h:
##########
@@ -22,10 +22,63 @@
#include "exec/pipeline/dependency.h"
#include "exec/runtime_filter/runtime_filter.h"
#include "runtime/query_context.h"
-#include "runtime/runtime_profile.h"
namespace doris {
#include "common/compile_check_begin.h"
+
+// Callback for sync-size RPCs. Handles errors (disable wrapper + sub
dependency) in call().
+class SyncSizeCallback : public DummyBrpcCallback<PSendFilterSizeResponse> {
+ ENABLE_FACTORY_CREATOR(SyncSizeCallback);
+
+public:
+ SyncSizeCallback(std::shared_ptr<Dependency> dependency,
+ std::shared_ptr<RuntimeFilterWrapper> wrapper,
+ std::weak_ptr<QueryContext> context)
+ : _dependency(std::move(dependency)), _wrapper(wrapper),
_context(std::move(context)) {}
+
+ void call() override {
+ // On error: disable the wrapper and sub the dependency here, because
set_synced_size()
+ // will never be called (the merge node won't respond with a sync).
+ // On success: do NOT sub here. The merge node will respond with
sync_filter_size,
+ // which calls set_synced_size() -> _dependency->sub().
+ if (cntl_->Failed()) {
+ if (auto w = _wrapper.lock()) {
+ w->set_state(RuntimeFilterWrapper::State::DISABLED,
cntl_->ErrorText());
Review Comment:
nit (observability): The old `SyncSizeClosure` logged warnings via
`_process_if_rpc_failed()` → base class → `LOG(WARNING)`. This new
implementation cancels the query context correctly but drops the standalone
`LOG(WARNING)`. Consider adding `LOG(WARNING) << ...` on the `cntl_->Failed()`
and `!status.ok()` paths for consistency with `DummyBrpcCallback::call()` and
to preserve error traceability in logs.
##########
be/src/exec/runtime_filter/runtime_filter_producer.h:
##########
@@ -22,10 +22,63 @@
#include "exec/pipeline/dependency.h"
#include "exec/runtime_filter/runtime_filter.h"
#include "runtime/query_context.h"
-#include "runtime/runtime_profile.h"
namespace doris {
#include "common/compile_check_begin.h"
+
+// Callback for sync-size RPCs. Handles errors (disable wrapper + sub
dependency) in call().
+class SyncSizeCallback : public DummyBrpcCallback<PSendFilterSizeResponse> {
+ ENABLE_FACTORY_CREATOR(SyncSizeCallback);
+
+public:
+ SyncSizeCallback(std::shared_ptr<Dependency> dependency,
+ std::shared_ptr<RuntimeFilterWrapper> wrapper,
+ std::weak_ptr<QueryContext> context)
+ : _dependency(std::move(dependency)), _wrapper(wrapper),
_context(std::move(context)) {}
+
+ void call() override {
+ // On error: disable the wrapper and sub the dependency here, because
set_synced_size()
+ // will never be called (the merge node won't respond with a sync).
+ // On success: do NOT sub here. The merge node will respond with
sync_filter_size,
+ // which calls set_synced_size() -> _dependency->sub().
+ if (cntl_->Failed()) {
+ if (auto w = _wrapper.lock()) {
+ w->set_state(RuntimeFilterWrapper::State::DISABLED,
cntl_->ErrorText());
+ }
+ if (auto ctx = _context.lock()) {
+ if (!ctx->ignore_runtime_filter_error()) {
+ ctx->cancel(Status::NetworkError("RPC meet failed: {}",
cntl_->ErrorText()));
+ }
+ }
+ ((CountedFinishDependency*)_dependency.get())->sub();
+ return;
+ }
Review Comment:
nit: C-style cast `((CountedFinishDependency*)_dependency.get())->sub()` —
prefer `static_cast<CountedFinishDependency*>(_dependency.get())->sub()` for
type safety and clarity. Same applies to the identical cast on the other error
path below.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]