Copilot commented on code in PR #17700:
URL:
https://github.com/apache/dolphinscheduler/pull/17700#discussion_r2562576611
##########
dolphinscheduler-task-plugin/dolphinscheduler-task-grpc/src/main/java/org/apache/dolphinscheduler/plugin/task/grpc/GrpcTask.java:
##########
@@ -81,23 +83,66 @@ public void handle(TaskCallBack taskCallBack) throws
TaskException {
}
Descriptors.FileDescriptor fileDesc =
JSONDescriptorHelper.fileDescFromJSON(grpcParameters.getGrpcServiceDefinitionJSON());
- GrpcDynamicService stubService = new GrpcDynamicService(channel,
fileDesc);
- DynamicMessage message =
stubService.call(grpcParameters.getMethodName(), grpcParameters.getMessage(),
- grpcParameters.getConnectTimeoutMs());
- Printer printer =
JsonFormat.printer().omittingInsignificantWhitespace();
- addDefaultOutput(printer.print(message));
+
+ // Attach a cancellable gRPC Context to support external
cancellation.
+ // This context propagates cancellation signals to the underlying
RPC call.
+ this.cancellableContext = (Context.CancellableContext)
Context.current().withCancellation().attach();
+ Context previous = this.cancellableContext;
+
+ try {
+ GrpcDynamicService stubService = new
GrpcDynamicService(channel, fileDesc);
+ DynamicMessage message =
stubService.call(grpcParameters.getMethodName(), grpcParameters.getMessage(),
+ grpcParameters.getConnectTimeoutMs());
+ Printer printer =
JsonFormat.printer().omittingInsignificantWhitespace();
+ addDefaultOutput(printer.print(message));
+ validateResponse(Status.OK);
+ } finally {
+ // Detach the cancellable context to restore the previous
context and avoid leaks
+ this.cancellableContext.detach(previous);
+ this.cancellableContext = null;
+ }
} catch (StatusRuntimeException statusre) {
+ if (statusre.getStatus().getCode() == Status.Code.CANCELLED) {
+ setExitStatusCode(TaskConstants.EXIT_CODE_KILL);
+ } else {
+ setExitStatusCode(TaskConstants.EXIT_CODE_FAILURE);
+ }
validateResponse(statusre.getStatus());
Review Comment:
`validateResponse()` is called at the end of the `StatusRuntimeException`
handler (line 110), after the exit status code has already been set on line
106 or 108. Because `validateResponse()` sets `exitStatusCode` internally
(lines 156, 170, 185 in the full file), it can overwrite that value; for
example, a cancelled call could end up with a failure exit code instead of
`EXIT_CODE_KILL`. Consider calling `validateResponse()` only for non-cancelled
cases, or making `validateResponse()` respect an exit code that is already set.
```suggestion
// Do not call validateResponse for cancelled cases to avoid overwriting exit code
} else {
setExitStatusCode(TaskConstants.EXIT_CODE_FAILURE);
validateResponse(statusre.getStatus());
}
```
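To make the overwrite concrete, here is a minimal, self-contained sketch. It
is not the plugin's code: `ExitCodeSketch`, its constants (whose values are
assumed), and the simplified `validateResponse(boolean)` are hypothetical
stand-ins. The only assumption taken from the comment above is that validation
assigns the exit code itself.

```java
public class ExitCodeSketch {
    // Hypothetical stand-ins for the TaskConstants values; the real values may differ.
    public static final int EXIT_CODE_SUCCESS = 0;
    public static final int EXIT_CODE_FAILURE = -1;
    public static final int EXIT_CODE_KILL = 137;

    private int exitStatusCode = EXIT_CODE_SUCCESS;

    // Simplified stand-in: assumes validation always assigns an exit code.
    private void validateResponse(boolean ok) {
        exitStatusCode = ok ? EXIT_CODE_SUCCESS : EXIT_CODE_FAILURE;
    }

    // Shape used in the PR: set the exit code, then validate unconditionally.
    public int handleAsInPr(boolean cancelled) {
        exitStatusCode = cancelled ? EXIT_CODE_KILL : EXIT_CODE_FAILURE;
        validateResponse(false); // overwrites whatever was set above
        return exitStatusCode;
    }

    // Suggested shape: validate only when the call was not cancelled.
    public int handleAsSuggested(boolean cancelled) {
        if (cancelled) {
            exitStatusCode = EXIT_CODE_KILL;
        } else {
            exitStatusCode = EXIT_CODE_FAILURE;
            validateResponse(false);
        }
        return exitStatusCode;
    }

    public static void main(String[] args) {
        System.out.println(new ExitCodeSketch().handleAsInPr(true));
        System.out.println(new ExitCodeSketch().handleAsSuggested(true));
    }
}
```

Under these assumptions, `handleAsInPr(true)` ends with the failure code while
`handleAsSuggested(true)` keeps the kill code.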
##########
dolphinscheduler-task-plugin/dolphinscheduler-task-grpc/src/main/java/org/apache/dolphinscheduler/plugin/task/grpc/GrpcTask.java:
##########
@@ -81,23 +83,66 @@ public void handle(TaskCallBack taskCallBack) throws
TaskException {
}
Descriptors.FileDescriptor fileDesc =
JSONDescriptorHelper.fileDescFromJSON(grpcParameters.getGrpcServiceDefinitionJSON());
- GrpcDynamicService stubService = new GrpcDynamicService(channel,
fileDesc);
- DynamicMessage message =
stubService.call(grpcParameters.getMethodName(), grpcParameters.getMessage(),
- grpcParameters.getConnectTimeoutMs());
- Printer printer =
JsonFormat.printer().omittingInsignificantWhitespace();
- addDefaultOutput(printer.print(message));
+
+ // Attach a cancellable gRPC Context to support external
cancellation.
+ // This context propagates cancellation signals to the underlying
RPC call.
+ this.cancellableContext = (Context.CancellableContext)
Context.current().withCancellation().attach();
+ Context previous = this.cancellableContext;
+
+ try {
+ GrpcDynamicService stubService = new
GrpcDynamicService(channel, fileDesc);
+ DynamicMessage message =
stubService.call(grpcParameters.getMethodName(), grpcParameters.getMessage(),
+ grpcParameters.getConnectTimeoutMs());
+ Printer printer =
JsonFormat.printer().omittingInsignificantWhitespace();
+ addDefaultOutput(printer.print(message));
+ validateResponse(Status.OK);
+ } finally {
+ // Detach the cancellable context to restore the previous
context and avoid leaks
+ this.cancellableContext.detach(previous);
+ this.cancellableContext = null;
+ }
} catch (StatusRuntimeException statusre) {
+ if (statusre.getStatus().getCode() == Status.Code.CANCELLED) {
+ setExitStatusCode(TaskConstants.EXIT_CODE_KILL);
+ } else {
+ setExitStatusCode(TaskConstants.EXIT_CODE_FAILURE);
+ }
validateResponse(statusre.getStatus());
- return;
} catch (Exception e) {
+ setExitStatusCode(TaskConstants.EXIT_CODE_FAILURE);
throw new GrpcTaskException("gRPC handle exception:", e);
+ } finally {
+ // Gracefully shut down the gRPC channel to release network
resources
+ if (channel != null) {
+ channel.shutdown();
+ }
}
- validateResponse(Status.OK);
}
@Override
- public void cancel() throws TaskException {
- // Do nothing when task to be canceled
+ public void cancel() {
Review Comment:
The `cancel()` method signature should declare `throws TaskException` to
match the abstract method in `AbstractTask`. All other task implementations
(ShellTask, SqlTask, etc.) follow this pattern and declare `throws
TaskException`.
```suggestion
public void cancel() throws TaskException {
```
##########
dolphinscheduler-task-plugin/dolphinscheduler-task-grpc/src/main/java/org/apache/dolphinscheduler/plugin/task/grpc/GrpcTask.java:
##########
@@ -81,23 +83,66 @@ public void handle(TaskCallBack taskCallBack) throws
TaskException {
}
Descriptors.FileDescriptor fileDesc =
JSONDescriptorHelper.fileDescFromJSON(grpcParameters.getGrpcServiceDefinitionJSON());
- GrpcDynamicService stubService = new GrpcDynamicService(channel,
fileDesc);
- DynamicMessage message =
stubService.call(grpcParameters.getMethodName(), grpcParameters.getMessage(),
- grpcParameters.getConnectTimeoutMs());
- Printer printer =
JsonFormat.printer().omittingInsignificantWhitespace();
- addDefaultOutput(printer.print(message));
+
+ // Attach a cancellable gRPC Context to support external
cancellation.
+ // This context propagates cancellation signals to the underlying
RPC call.
+ this.cancellableContext = (Context.CancellableContext)
Context.current().withCancellation().attach();
+ Context previous = this.cancellableContext;
+
+ try {
+ GrpcDynamicService stubService = new
GrpcDynamicService(channel, fileDesc);
+ DynamicMessage message =
stubService.call(grpcParameters.getMethodName(), grpcParameters.getMessage(),
+ grpcParameters.getConnectTimeoutMs());
+ Printer printer =
JsonFormat.printer().omittingInsignificantWhitespace();
+ addDefaultOutput(printer.print(message));
+ validateResponse(Status.OK);
+ } finally {
+ // Detach the cancellable context to restore the previous
context and avoid leaks
+ this.cancellableContext.detach(previous);
+ this.cancellableContext = null;
+ }
} catch (StatusRuntimeException statusre) {
+ if (statusre.getStatus().getCode() == Status.Code.CANCELLED) {
+ setExitStatusCode(TaskConstants.EXIT_CODE_KILL);
+ } else {
+ setExitStatusCode(TaskConstants.EXIT_CODE_FAILURE);
+ }
validateResponse(statusre.getStatus());
- return;
} catch (Exception e) {
+ setExitStatusCode(TaskConstants.EXIT_CODE_FAILURE);
throw new GrpcTaskException("gRPC handle exception:", e);
+ } finally {
+ // Gracefully shut down the gRPC channel to release network
resources
+ if (channel != null) {
+ channel.shutdown();
+ }
}
- validateResponse(Status.OK);
}
@Override
- public void cancel() throws TaskException {
- // Do nothing when task to be canceled
+ public void cancel() {
+ // Read volatile reference once for thread safety (avoid repeated
reads under race conditions)
+ Context.CancellableContext ctx = this.cancellableContext;
+
+ if (ctx != null && !ctx.isCancelled()) {
+ try {
+ log.info("Canceling gRPC task: method={}",
grpcParameters.getMethodName());
+
+ // Trigger gRPC cancellation by canceling the context.
+ // This interrupts the ongoing RPC and causes
stubService.call() to throw CANCELLED.
+ ctx.cancel(new TaskException("gRPC task was canceled by
user"));
+
+ // Record user intent: task was explicitly killed, not failed
+ setExitStatusCode(TaskConstants.EXIT_CODE_KILL);
+ log.info("gRPC task was successfully canceled");
+ } catch (Exception ex) {
+ log.error("Failed to cancel gRPC context", ex);
+ throw new TaskException("Cancel gRPC task failed", ex);
Review Comment:
[nitpick] `cancel()` wraps the caught exception in a `TaskException` and
rethrows it. Because `cancel()` is typically called from a different thread
(the cancellation thread), an exception thrown here may not propagate to a
caller that can act on it. Consider logging the error instead of throwing.
Most other task implementations (e.g., ShellTask) do throw from `cancel()`, so
this follows the existing pattern, but the additional wrapping may not be
necessary.
```suggestion
// Do not throw from cancel(); just log the error.
```
##########
dolphinscheduler-task-plugin/dolphinscheduler-task-grpc/src/main/java/org/apache/dolphinscheduler/plugin/task/grpc/GrpcTask.java:
##########
@@ -81,23 +83,66 @@ public void handle(TaskCallBack taskCallBack) throws
TaskException {
}
Descriptors.FileDescriptor fileDesc =
JSONDescriptorHelper.fileDescFromJSON(grpcParameters.getGrpcServiceDefinitionJSON());
- GrpcDynamicService stubService = new GrpcDynamicService(channel,
fileDesc);
- DynamicMessage message =
stubService.call(grpcParameters.getMethodName(), grpcParameters.getMessage(),
- grpcParameters.getConnectTimeoutMs());
- Printer printer =
JsonFormat.printer().omittingInsignificantWhitespace();
- addDefaultOutput(printer.print(message));
+
+ // Attach a cancellable gRPC Context to support external
cancellation.
+ // This context propagates cancellation signals to the underlying
RPC call.
+ this.cancellableContext = (Context.CancellableContext)
Context.current().withCancellation().attach();
+ Context previous = this.cancellableContext;
+
+ try {
+ GrpcDynamicService stubService = new
GrpcDynamicService(channel, fileDesc);
+ DynamicMessage message =
stubService.call(grpcParameters.getMethodName(), grpcParameters.getMessage(),
+ grpcParameters.getConnectTimeoutMs());
+ Printer printer =
JsonFormat.printer().omittingInsignificantWhitespace();
+ addDefaultOutput(printer.print(message));
+ validateResponse(Status.OK);
+ } finally {
+ // Detach the cancellable context to restore the previous
context and avoid leaks
+ this.cancellableContext.detach(previous);
+ this.cancellableContext = null;
+ }
} catch (StatusRuntimeException statusre) {
+ if (statusre.getStatus().getCode() == Status.Code.CANCELLED) {
+ setExitStatusCode(TaskConstants.EXIT_CODE_KILL);
+ } else {
+ setExitStatusCode(TaskConstants.EXIT_CODE_FAILURE);
+ }
validateResponse(statusre.getStatus());
- return;
} catch (Exception e) {
+ setExitStatusCode(TaskConstants.EXIT_CODE_FAILURE);
throw new GrpcTaskException("gRPC handle exception:", e);
+ } finally {
+ // Gracefully shut down the gRPC channel to release network
resources
+ if (channel != null) {
+ channel.shutdown();
+ }
}
- validateResponse(Status.OK);
}
@Override
- public void cancel() throws TaskException {
- // Do nothing when task to be canceled
+ public void cancel() {
+ // Read volatile reference once for thread safety (avoid repeated
reads under race conditions)
+ Context.CancellableContext ctx = this.cancellableContext;
+
+ if (ctx != null && !ctx.isCancelled()) {
+ try {
+ log.info("Canceling gRPC task: method={}",
grpcParameters.getMethodName());
Review Comment:
Missing null check for `grpcParameters` when logging in the cancel method.
If `cancel()` is called before `init()` completes or if `init()` fails,
`grpcParameters` will be null, causing a NullPointerException at line 129.
Consider adding a null check: `log.info("Canceling gRPC task: method={}",
grpcParameters != null ? grpcParameters.getMethodName() : "unknown");`
```suggestion
log.info("Canceling gRPC task: method={}", grpcParameters !=
null ? grpcParameters.getMethodName() : "unknown");
```
##########
dolphinscheduler-task-plugin/dolphinscheduler-task-grpc/src/main/java/org/apache/dolphinscheduler/plugin/task/grpc/GrpcTask.java:
##########
@@ -81,23 +83,66 @@ public void handle(TaskCallBack taskCallBack) throws
TaskException {
}
Descriptors.FileDescriptor fileDesc =
JSONDescriptorHelper.fileDescFromJSON(grpcParameters.getGrpcServiceDefinitionJSON());
- GrpcDynamicService stubService = new GrpcDynamicService(channel,
fileDesc);
- DynamicMessage message =
stubService.call(grpcParameters.getMethodName(), grpcParameters.getMessage(),
- grpcParameters.getConnectTimeoutMs());
- Printer printer =
JsonFormat.printer().omittingInsignificantWhitespace();
- addDefaultOutput(printer.print(message));
+
+ // Attach a cancellable gRPC Context to support external
cancellation.
+ // This context propagates cancellation signals to the underlying
RPC call.
+ this.cancellableContext = (Context.CancellableContext)
Context.current().withCancellation().attach();
+ Context previous = this.cancellableContext;
Review Comment:
Incorrect context management pattern. `attach()` returns the *previous*
context that was attached, not the newly created cancellable context. This
code casts that return value to `CancellableContext`, which can fail with a
`ClassCastException` when the previous context is not cancellable, and then
copies the same reference into `previous`, so the newly created cancellable
context is never stored.
The correct pattern should be:
```java
this.cancellableContext = Context.current().withCancellation();
Context previous = this.cancellableContext.attach();
```
This separates the creation of the cancellable context from the attachment,
and correctly captures the previous context returned by `attach()`.
```suggestion
this.cancellableContext = Context.current().withCancellation();
Context previous = this.cancellableContext.attach();
```
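The attach/detach contract described above can be illustrated without a gRPC
dependency. The sketch below is a stripped-down model only: `Ctx` and
`CancellableCtx` are hypothetical stand-ins that mimic the documented behavior
of `attach()` (make this context current, return the one it replaced) and
`detach()` (restore the given context). They are not `io.grpc.Context`.

```java
public class ContextAttachSketch {
    // Hypothetical stand-in for a context type with a per-thread "current" value.
    static class Ctx {
        static final Ctx ROOT = new Ctx();
        private static final ThreadLocal<Ctx> CURRENT = ThreadLocal.withInitial(() -> ROOT);

        static Ctx current() {
            return CURRENT.get();
        }

        CancellableCtx withCancellation() {
            return new CancellableCtx();
        }

        // Makes this context current and returns the context it replaced.
        Ctx attach() {
            Ctx previous = CURRENT.get();
            CURRENT.set(this);
            return previous;
        }

        // Restores the context that attach() returned.
        void detach(Ctx toAttach) {
            CURRENT.set(toAttach);
        }
    }

    static class CancellableCtx extends Ctx {
        private volatile boolean cancelled;

        void cancel() {
            cancelled = true;
        }

        boolean isCancelled() {
            return cancelled;
        }
    }

    // Shape used in the PR: casts the value returned by attach(), i.e. the previous context.
    public static boolean prPatternThrows() {
        Ctx before = Ctx.current();
        try {
            CancellableCtx ctx = (CancellableCtx) Ctx.current().withCancellation().attach();
            return false;
        } catch (ClassCastException e) {
            return true; // the previous context (ROOT here) is not cancellable
        } finally {
            Ctx.current().detach(before); // undo the attach performed before the failed cast
        }
    }

    // Suggested shape: keep the new context, remember the previous one, restore it in finally.
    public static boolean correctPatternRestores() {
        Ctx before = Ctx.current();
        CancellableCtx ctx = Ctx.current().withCancellation();
        Ctx previous = ctx.attach();
        try {
            // The RPC would run here while ctx is the current context.
            if (Ctx.current() != ctx) {
                return false;
            }
        } finally {
            ctx.detach(previous);
        }
        return Ctx.current() == before;
    }

    public static void main(String[] args) {
        System.out.println(prPatternThrows());
        System.out.println(correctPatternRestores());
    }
}
```

In this model the PR shape fails at the cast, while the suggested shape keeps a
reference to the cancellable context and leaves the thread's current context
unchanged afterwards.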
##########
dolphinscheduler-task-plugin/dolphinscheduler-task-grpc/src/main/java/org/apache/dolphinscheduler/plugin/task/grpc/GrpcTask.java:
##########
@@ -81,23 +83,66 @@ public void handle(TaskCallBack taskCallBack) throws
TaskException {
}
Descriptors.FileDescriptor fileDesc =
JSONDescriptorHelper.fileDescFromJSON(grpcParameters.getGrpcServiceDefinitionJSON());
- GrpcDynamicService stubService = new GrpcDynamicService(channel,
fileDesc);
- DynamicMessage message =
stubService.call(grpcParameters.getMethodName(), grpcParameters.getMessage(),
- grpcParameters.getConnectTimeoutMs());
- Printer printer =
JsonFormat.printer().omittingInsignificantWhitespace();
- addDefaultOutput(printer.print(message));
+
+ // Attach a cancellable gRPC Context to support external
cancellation.
+ // This context propagates cancellation signals to the underlying
RPC call.
+ this.cancellableContext = (Context.CancellableContext)
Context.current().withCancellation().attach();
+ Context previous = this.cancellableContext;
+
+ try {
+ GrpcDynamicService stubService = new
GrpcDynamicService(channel, fileDesc);
+ DynamicMessage message =
stubService.call(grpcParameters.getMethodName(), grpcParameters.getMessage(),
+ grpcParameters.getConnectTimeoutMs());
+ Printer printer =
JsonFormat.printer().omittingInsignificantWhitespace();
+ addDefaultOutput(printer.print(message));
+ validateResponse(Status.OK);
+ } finally {
+ // Detach the cancellable context to restore the previous
context and avoid leaks
+ this.cancellableContext.detach(previous);
+ this.cancellableContext = null;
+ }
} catch (StatusRuntimeException statusre) {
+ if (statusre.getStatus().getCode() == Status.Code.CANCELLED) {
+ setExitStatusCode(TaskConstants.EXIT_CODE_KILL);
+ } else {
+ setExitStatusCode(TaskConstants.EXIT_CODE_FAILURE);
+ }
validateResponse(statusre.getStatus());
- return;
} catch (Exception e) {
+ setExitStatusCode(TaskConstants.EXIT_CODE_FAILURE);
throw new GrpcTaskException("gRPC handle exception:", e);
+ } finally {
+ // Gracefully shut down the gRPC channel to release network
resources
+ if (channel != null) {
+ channel.shutdown();
+ }
}
- validateResponse(Status.OK);
}
@Override
- public void cancel() throws TaskException {
- // Do nothing when task to be canceled
+ public void cancel() {
+ // Read volatile reference once for thread safety (avoid repeated
reads under race conditions)
+ Context.CancellableContext ctx = this.cancellableContext;
+
+ if (ctx != null && !ctx.isCancelled()) {
+ try {
+ log.info("Canceling gRPC task: method={}",
grpcParameters.getMethodName());
+
+ // Trigger gRPC cancellation by canceling the context.
+ // This interrupts the ongoing RPC and causes
stubService.call() to throw CANCELLED.
+ ctx.cancel(new TaskException("gRPC task was canceled by
user"));
+
+ // Record user intent: task was explicitly killed, not failed
+ setExitStatusCode(TaskConstants.EXIT_CODE_KILL);
+ log.info("gRPC task was successfully canceled");
+ } catch (Exception ex) {
+ log.error("Failed to cancel gRPC context", ex);
+ throw new TaskException("Cancel gRPC task failed", ex);
+ }
+ } else {
+ // No active context: task may not have started, already finished,
or already canceled
+ log.warn("gRPC task cancel requested, but no active cancellable
context.");
+ }
}
Review Comment:
The new `cancel()` functionality lacks test coverage. Since
`GrpcTaskTest.java` already contains comprehensive tests for the gRPC task
(testHandleStatusCodeDefaultOK, testHandleStatusCodeCustom,
testAddDefaultOutput), consider adding a test case that verifies the
cancellation behavior, such as starting a long-running gRPC call and ensuring
that calling `cancel()` properly interrupts it and sets the exit code to
`EXIT_CODE_KILL`.
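
As a rough idea of the shape such a test could take, here is a self-contained
sketch that uses only the JDK. `BlockingTask` is a hypothetical stand-in for a
task whose `handle()` blocks on a long-running call, and the exit code values
are assumed; a real test would exercise `GrpcTask` itself with the project's
existing test setup.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class CancelTestSketch {
    // Assumed values; the real TaskConstants may differ.
    public static final int EXIT_CODE_SUCCESS = 0;
    public static final int EXIT_CODE_KILL = 137;

    // Hypothetical stand-in for a task whose handle() blocks until cancelled.
    static class BlockingTask {
        private final CountDownLatch started = new CountDownLatch(1);
        private final CountDownLatch cancelled = new CountDownLatch(1);
        private volatile int exitStatusCode = EXIT_CODE_SUCCESS;

        void handle() throws InterruptedException {
            started.countDown();
            // Simulates a call that returns early only when cancel() is invoked.
            if (cancelled.await(5, TimeUnit.SECONDS)) {
                exitStatusCode = EXIT_CODE_KILL;
            }
        }

        void cancel() {
            cancelled.countDown();
        }

        int getExitStatusCode() {
            return exitStatusCode;
        }
    }

    // Starts the task on a worker thread, cancels it from the caller, returns the exit code.
    public static int runAndCancel() throws InterruptedException {
        BlockingTask task = new BlockingTask();
        Thread worker = new Thread(() -> {
            try {
                task.handle();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.start();
        // Wait until handle() is actually running before cancelling.
        if (!task.started.await(2, TimeUnit.SECONDS)) {
            throw new IllegalStateException("task did not start");
        }
        task.cancel();
        worker.join(2000);
        return task.getExitStatusCode();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runAndCancel());
    }
}
```

The key points are that the test waits for the call to be in flight before
cancelling, cancels from a different thread, and asserts on the exit code only
after the worker thread has finished.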
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.