vamossagar12 commented on code in PR #16628:
URL: https://github.com/apache/kafka/pull/16628#discussion_r1683244575
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##########
@@ -1478,7 +1478,9 @@ public void restartTask(final ConnectorTaskId id, final Callback<Void> callback)
         if (assignment.tasks().contains(id)) {
             try (TickThreadStage stage = new TickThreadStage("restarting task " + id)) {
                 worker.stopAndAwaitTask(id);
-                if (startTask(id))
+                // It could happen that by the time the stop finishes, the task might not be assigned to this
+                // worker
+                if (assignment.tasks().contains(id) && startTask(id))
Review Comment:
While testing, I noticed that if the stop call blocks for some reason, the
assignment may no longer include the task we wanted to restart by the time
control returns. In that case the task would be started on this worker even
though the worker no longer owns it. I think it is safer to check the
assignment again here, after the stop completes, before starting the task.
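
To make the race concrete, here is a minimal, self-contained sketch of the
check / stop / re-check pattern. It is not the actual DistributedHerder code:
the class `RestartRecheckSketch`, its string task ids, and the `duringStop`
hook (which stands in for a rebalance that happens while the stop is blocked)
are all hypothetical and exist only for illustration.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for a worker; not the real Kafka Connect herder.
class RestartRecheckSketch {
    // Tasks currently assigned to this worker (assumed to change on rebalance).
    private final Set<String> assigned = ConcurrentHashMap.newKeySet();
    // Tasks currently running on this worker.
    private final Set<String> running = ConcurrentHashMap.newKeySet();

    void assign(String taskId) { assigned.add(taskId); }

    void revoke(String taskId) { assigned.remove(taskId); }

    boolean isRunning(String taskId) { return running.contains(taskId); }

    // Simulates a blocking stop; duringStop models whatever happens
    // concurrently while the stop is in progress (e.g. a revocation).
    private void stopAndAwaitTask(String taskId, Runnable duringStop) {
        running.remove(taskId);
        duringStop.run();
    }

    private boolean startTask(String taskId) {
        running.add(taskId);
        return true;
    }

    // Returns true only if the task was restarted on this worker.
    boolean restartTask(String taskId, Runnable duringStop) {
        if (!assigned.contains(taskId)) {
            return false;
        }
        stopAndAwaitTask(taskId, duringStop);
        // Re-check ownership: the assignment may have changed during the stop.
        return assigned.contains(taskId) && startTask(taskId);
    }

    public static void main(String[] args) {
        RestartRecheckSketch worker = new RestartRecheckSketch();
        worker.assign("connector-0");

        // Nothing changes during the stop: the task is restarted.
        System.out.println(worker.restartTask("connector-0", () -> { }));                   // true

        // The task is revoked while the stop is blocked: it is not started again.
        System.out.println(worker.restartTask("connector-0", () -> worker.revoke("connector-0"))); // false
        System.out.println(worker.isRunning("connector-0"));                                // false
    }
}
```

Without the second `assigned.contains(taskId)` check, the second call in
`main` would start the task even though it had already been revoked, which is
the situation described above.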