Copilot commented on code in PR #495:
URL: https://github.com/apache/incubator-livy/pull/495#discussion_r2595449096
##########
repl/src/main/scala/org/apache/livy/repl/Session.scala:
##########
@@ -161,18 +163,25 @@ class Session(
_statements.synchronized { _statements(statementId) = statement }
Future {
- setJobGroup(tpe, statementId)
- statement.compareAndTransit(StatementState.Waiting, StatementState.Running)
+ val currentThread = Thread.currentThread()
+ statementThreads.put(statementId, currentThread)
+ try {
+ setJobGroup(tpe, statementId)
+ statement.compareAndTransit(StatementState.Waiting, StatementState.Running)
- if (statement.state.get() == StatementState.Running) {
- statement.started = System.currentTimeMillis()
- statement.output = executeCode(interpreter(tpe), statementId, code)
- }
+ if (statement.state.get() == StatementState.Running) {
+ statement.started = System.currentTimeMillis()
+ statement.output = executeCode(interpreter(tpe), statementId, code)
+ }
- statement.compareAndTransit(StatementState.Running, StatementState.Available)
- statement.compareAndTransit(StatementState.Cancelling, StatementState.Cancelled)
+ statement.compareAndTransit(StatementState.Running, StatementState.Available)
+ statement.compareAndTransit(StatementState.Cancelling, StatementState.Cancelled)
+ statement.updateProgress(1.0)
+ statement.completed = System.currentTimeMillis()
+ } finally {
+ statementThreads.remove(statementId, currentThread)
+ Thread.interrupted()
+ }
Review Comment:
The call to `Thread.interrupted()` clears the interrupt flag but discards
the result. This could mask important interrupt state information. Consider
either:
1. Checking the result and logging it if the thread was interrupted
2. Using `Thread.currentThread().isInterrupted()` if you only want to check
without clearing the flag
If the intent is to reset the interrupt flag regardless, consider adding a
comment explaining this design choice.
```suggestion
// Clear the interrupt flag, but log if the thread was interrupted.
if (Thread.interrupted()) {
  warn(s"Thread was interrupted during execution of statement $statementId; interrupt flag cleared.")
}
```
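For reference, the difference between the two calls mentioned above can be checked in isolation. This is a minimal standalone sketch, not code from `Session.scala`; the object name `InterruptFlagDemo` and its `observe` method are made up for illustration.

```scala
// Hypothetical demo object; not part of Livy.
object InterruptFlagDemo {
  // Returns (flag seen by isInterrupted, result of Thread.interrupted(), flag afterwards).
  def observe(): (Boolean, Boolean, Boolean) = {
    // Set the interrupt flag on the current thread.
    Thread.currentThread().interrupt()
    // isInterrupted reads the flag without clearing it.
    val peeked = Thread.currentThread().isInterrupted
    // Thread.interrupted() reads the flag and clears it.
    val cleared = Thread.interrupted()
    // The flag is now unset again.
    val after = Thread.currentThread().isInterrupted
    (peeked, cleared, after)
  }
}
```

Calling `InterruptFlagDemo.observe()` should return `(true, true, false)`: the flag is visible to `isInterrupted`, reported and cleared by `Thread.interrupted()`, and unset afterwards.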
##########
repl/src/main/scala/org/apache/livy/repl/Session.scala:
##########
@@ -161,18 +163,25 @@ class Session(
_statements.synchronized { _statements(statementId) = statement }
Future {
- setJobGroup(tpe, statementId)
- statement.compareAndTransit(StatementState.Waiting, StatementState.Running)
+ val currentThread = Thread.currentThread()
+ statementThreads.put(statementId, currentThread)
+ try {
+ setJobGroup(tpe, statementId)
+ statement.compareAndTransit(StatementState.Waiting, StatementState.Running)
- if (statement.state.get() == StatementState.Running) {
- statement.started = System.currentTimeMillis()
- statement.output = executeCode(interpreter(tpe), statementId, code)
- }
+ if (statement.state.get() == StatementState.Running) {
+ statement.started = System.currentTimeMillis()
+ statement.output = executeCode(interpreter(tpe), statementId, code)
+ }
- statement.compareAndTransit(StatementState.Running, StatementState.Available)
- statement.compareAndTransit(StatementState.Cancelling, StatementState.Cancelled)
+ statement.compareAndTransit(StatementState.Running, StatementState.Available)
+ statement.compareAndTransit(StatementState.Cancelling, StatementState.Cancelled)
+ statement.updateProgress(1.0)
+ statement.completed = System.currentTimeMillis()
+ } finally {
+ statementThreads.remove(statementId, currentThread)
Review Comment:
The `statementThreads.remove(statementId, currentThread)` uses the
two-argument remove method which only removes the entry if the current value
matches. While this is correct, if a thread is somehow replaced in the map
between put and remove, the entry won't be cleaned up. Consider adding a
comment to document that this is intentional to handle edge cases, or use the
simpler `statementThreads.remove(statementId)` if the additional safety check
isn't needed.
```suggestion
// Always remove the entry for this statementId to avoid leaks, even if the thread was replaced.
statementThreads.remove(statementId)
```
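To make the trade-off between the two `remove` overloads concrete, here is a small sketch. It assumes `statementThreads` is a `java.util.concurrent.ConcurrentHashMap` (the diff does not show its declaration); `ConditionalRemoveDemo` and the thread names are hypothetical.

```scala
import java.util.concurrent.ConcurrentHashMap

// Hypothetical demo object; the map type is an assumption about statementThreads.
object ConditionalRemoveDemo {
  // Returns (removed with wrong value, entry still present, removed with matching value).
  def run(): (Boolean, Boolean, Boolean) = {
    val threads = new ConcurrentHashMap[Int, Thread]()
    val owner = new Thread("owner") // never started; used only as a map value
    val other = new Thread("other")
    threads.put(1, owner)
    // The two-argument remove is a no-op when the mapped value differs.
    val removedByOther = threads.remove(1, other)
    val stillThere = threads.containsKey(1)
    // It removes the entry only when the mapped value matches.
    val removedByOwner = threads.remove(1, owner)
    (removedByOther, stillThere, removedByOwner)
  }
}
```

`ConditionalRemoveDemo.run()` should return `(false, true, true)`. The single-argument `remove(statementId)` would instead drop whatever thread is registered under that id, including one registered by a different execution, so which overload is preferable depends on whether that case can occur.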
##########
repl/src/main/scala/org/apache/livy/repl/Session.scala:
##########
@@ -161,18 +163,25 @@ class Session(
_statements.synchronized { _statements(statementId) = statement }
Future {
- setJobGroup(tpe, statementId)
- statement.compareAndTransit(StatementState.Waiting, StatementState.Running)
+ val currentThread = Thread.currentThread()
+ statementThreads.put(statementId, currentThread)
+ try {
Review Comment:
There's a potential race condition: the thread could be interrupted between
`statementThreads.put()` (line 167) and the try block execution. If `cancel()`
is called during this window, the interrupt would be lost. Consider moving the
`statementThreads.put()` call inside the try block, immediately before
`setJobGroup()`, to minimize the race window.
```suggestion
try {
statementThreads.put(statementId, currentThread)
```
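Whether an early interrupt is actually lost depends on what runs afterwards. On the JVM, an interrupt delivered to a thread that is not yet blocked is recorded as a flag, and the next interruptible call such as `Thread.sleep` throws immediately. The sketch below illustrates only that JVM behavior; it does not model `Session`, and `PendingInterruptDemo` is a hypothetical name.

```scala
// Hypothetical demo object; not part of Livy.
object PendingInterruptDemo {
  // Returns true if sleep threw because of an interrupt that arrived earlier.
  def sleepAfterInterrupt(): Boolean = {
    // Stand-in for a cancel() call that arrives before any blocking call.
    Thread.currentThread().interrupt()
    try {
      // The pending flag makes sleep throw right away instead of waiting.
      Thread.sleep(1000)
      false
    } catch {
      // Throwing InterruptedException also clears the flag.
      case _: InterruptedException => true
    }
  }
}
```

`PendingInterruptDemo.sleepAfterInterrupt()` should return `true`. Code that never reaches an interruptible call, or that clears the flag first, would not observe the interrupt.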
##########
repl/src/main/scala/org/apache/livy/repl/Session.scala:
##########
@@ -212,6 +221,7 @@ class Session(
info(s"Failed to cancel statement $statementId.")
statement.compareAndTransit(StatementState.Cancelling, StatementState.Cancelled)
} else {
+ Option(statementThreads.get(statementId)).foreach(_.interrupt())
Review Comment:
The thread interrupt is called repeatedly in a loop (line 224) for the same
thread, which is redundant. Once a thread is interrupted, repeatedly calling
`interrupt()` doesn't provide additional benefit. Consider moving the interrupt
call outside the while loop, right before line 219, to interrupt the thread
once at the beginning of the cancellation process rather than repeatedly.
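A rough sketch of the "interrupt once, then poll" shape, independent of the actual `cancel()` implementation. `InterruptOnceDemo`, `cancelOnce`, and the polling interval are all hypothetical.

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical demo object; not the Session.cancel() code.
object InterruptOnceDemo {
  // Returns true if the worker stopped after a single interrupt.
  def cancelOnce(): Boolean = {
    val finished = new AtomicBoolean(false)
    val worker = new Thread(new Runnable {
      override def run(): Unit = {
        try Thread.sleep(60000) // stand-in for long-running driver code
        catch { case _: InterruptedException => () }
        finally finished.set(true)
      }
    })
    worker.start()
    // Interrupt a single time, before entering the polling loop.
    worker.interrupt()
    var attempts = 0
    while (!finished.get() && attempts < 100) {
      Thread.sleep(50) // poll for completion without interrupting again
      attempts += 1
    }
    worker.join(1000)
    finished.get()
  }
}
```

A single interrupt is enough here because the worker stops at the first `InterruptedException`. If user code catches the exception and keeps running, repeating the interrupt inside the loop would not be redundant, so the choice depends on which behavior is intended.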
##########
repl/src/test/scala/org/apache/livy/repl/SparkSessionSpec.scala:
##########
@@ -236,6 +236,33 @@ class SparkSessionSpec extends BaseSessionSpec(Spark) {
}
}
+ it should "cancel driver code without spark jobs" in withSession { session =>
+ val stmtId = session.execute(
+ """
+ |Thread.sleep(60000)
Review Comment:
[nitpick] Using a 60-second sleep in tests can make the test suite slow.
Consider using a shorter sleep duration (e.g., 5-10 seconds) which should still
be sufficient to validate the cancellation behavior while keeping tests fast.
The timeout on line 247 is already set to 30 seconds, so a shorter sleep would
be more appropriate.
```suggestion
|Thread.sleep(5000)
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.