[
https://issues.apache.org/jira/browse/FLINK-39085?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Yi Zhang updated FLINK-39085:
-----------------------------
Description:
In HA mode, when a JM failure occurs, the TM disconnects from the JM and fails
all tasks running on it—similar to the task cancellation process. However,
during this phase, certain tasks may get stuck in cancellation. If the JM
recovers quickly and begins redeploying tasks, a problematic scenario can
arise: a single slot on the TM might end up hosting two instances of the same
logical task—one from the pre-failover execution and one newly deployed after
JM recovery.
Notably, after JM failover, the {{executionGraphId}} changes, which results in
different {{ExecutionAttemptID}}s for the old and new task instances.
Because the current implementation of {{TaskSlotTable.addTask()}} only checks
for conflicts based on {{ExecutionAttemptID}}, it fails to recognize that
these two tasks represent the same logical entity. Consequently, the new task
is successfully deployed and starts running alongside the lingering old
instance within the same slot.
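For illustration only, here is a minimal sketch of the bookkeeping pattern described above, where the only duplicate check is the map key over the full attempt id. The class and field names are simplified stand-ins and do not mirror Flink's actual {{TaskSlotTable}} implementation:
```
import java.util.HashMap;
import java.util.Map;

// Illustrative stand-ins only; these are not Flink's real classes.
record AttemptId(String executionGraphId, String vertexId, int subtaskIndex) {}
record TaskStub(AttemptId attemptId, String name) {}

class SlotSketch {
    // Per-slot task registry keyed only by the full attempt id.
    private final Map<AttemptId, TaskStub> tasks = new HashMap<>();

    boolean addTask(TaskStub task) {
        // Duplicate detection sees only the attempt id; a second attempt of the same
        // logical subtask with a different executionGraphId is not treated as a conflict.
        return tasks.putIfAbsent(task.attemptId(), task) == null;
    }

    public static void main(String[] args) {
        SlotSketch slot = new SlotSketch();

        // Attempt deployed before the JM failure (its cancellation later gets stuck).
        AttemptId before =
                new AttemptId("1ca69b7ff26ebad2ffad095b5d244744", "cbc357ccb763df2852fee8c4fc7d55f2", 0);
        // Attempt for the SAME logical subtask, deployed after JM recovery:
        // only the executionGraphId component differs.
        AttemptId after =
                new AttemptId("ba4cbf5d8d59a24ce75da3eb0ef1a5e2", "cbc357ccb763df2852fee8c4fc7d55f2", 0);

        System.out.println(slot.addTask(new TaskStub(before, "Source -> Flat Map"))); // true
        System.out.println(slot.addTask(new TaskStub(after, "Source -> Flat Map")));  // true: both attempts now live in the slot
    }
}
```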
This raises a question: Is this behavior acceptable? Should the TaskExecutor
instead reject the deployment of a new task if an older instance of the same
logical task is still present in the slot—even if their
{{ExecutionAttemptID}}s differ? Or, more aggressively, should the TM
trigger a fatal error to prevent potential inconsistency, resource contention,
or correctness issues arising from such duplication?
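For comparison, one possible shape of the stricter check raised above, keyed on the logical identity (vertex id plus subtask index) instead of the full attempt id. This reuses the stand-in types from the sketch earlier and is purely illustrative, not a proposed patch:
```
// Reuses AttemptId and TaskStub from the previous sketch.
record LogicalTaskId(String vertexId, int subtaskIndex) {}

class StrictSlotSketch {
    private final java.util.Map<LogicalTaskId, TaskStub> byLogicalId = new java.util.HashMap<>();

    // Rejects a new attempt while an older attempt of the same logical subtask is still
    // present in the slot, even though its executionGraphId (and thus attempt id) differs.
    boolean addTask(TaskStub task) {
        LogicalTaskId key =
                new LogicalTaskId(task.attemptId().vertexId(), task.attemptId().subtaskIndex());
        return byLogicalId.putIfAbsent(key, task) == null;
    }
}
```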
A Simple Reproduction Example
1. Setup: Modify the SocketWindowWordCount example by adding an uninterruptible
sleep in the FlatMap to simulate a task that hangs during cancellation.
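The modified FlatMap is not attached; a minimal stand-in for the idea could look like the following (the class name, sleep duration, and String output type are illustrative choices, not the exact code used):
```
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.util.Collector;

// Hypothetical stand-in for the FlatMap in SocketWindowWordCount, modified so that
// cancellation cannot interrupt it: InterruptedException is swallowed and the sleep resumes.
public class StuckFlatMap implements FlatMapFunction<String, String> {

    @Override
    public void flatMap(String line, Collector<String> out) {
        for (String word : line.split("\\s")) {
            out.collect(word);
        }
        uninterruptibleSleep(600_000); // block for 10 minutes, ignoring interrupts
    }

    private static void uninterruptibleSleep(long millis) {
        long deadline = System.currentTimeMillis() + millis;
        long remaining = millis;
        while (remaining > 0) {
            try {
                Thread.sleep(remaining);
            } catch (InterruptedException ignored) {
                // Swallow the interrupt so task cancellation gets stuck, as in the repro.
            }
            remaining = deadline - System.currentTimeMillis();
        }
    }
}
```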
2. Initial run: Start ZooKeeper, a standalone Flink cluster, and one
TaskManager, then submit the modified job. The TM correctly deploys two tasks
(as shown in tasks1.png in the attachment). Metrics (e.g.,
backPressuredTimeMsPerSecond) from a metric reporter also show these tasks.
```
flink_taskmanager_job_task_backPressuredTimeMsPerSecond{job_id="d892a5678efa0662c444915dd6637066",task_id="cbc357ccb763df2852fee8c4fc7d55f2",task_attempt_id="1ca69b7ff26ebad2ffad095b5d244744_cbc357ccb763df2852fee8c4fc7d55f2_0_0",host="localhost",task_name="Source:_Socket_Stream____Flat_Map",task_attempt_num="0",job_name="Socket_Window_WordCount",tm_id="localhost:59807_fbc5f0",subtask_index="0",}
0.0
flink_taskmanager_job_task_backPressuredTimeMsPerSecond{job_id="d892a5678efa0662c444915dd6637066",task_id="90bea66de1c231edf33913ecd54406c1",task_attempt_id="1ca69b7ff26ebad2ffad095b5d244744_90bea66de1c231edf33913ecd54406c1_0_0",host="localhost",task_name="TumblingProcessingTimeWindows____Sink:_Print_to_Std__Out",task_attempt_num="0",job_name="Socket_Window_WordCount",tm_id="localhost:59807_fbc5f0",subtask_index="0",}
0.0
```
!tasks1.png|width=808,height=60!
3. JM failover: Kill and restart the JM to simulate a failover. After recovery, the
TM ends up running three tasks (as shown in tasks2.png in the attachment)—one
lingering instance from before the failover (whose cancellation was stuck) plus
two newly deployed tasks. Metric reporter output shows three distinct tasks,
confirming the duplication.
```
flink_taskmanager_job_task_backPressuredTimeMsPerSecond{job_id="d892a5678efa0662c444915dd6637066",task_id="cbc357ccb763df2852fee8c4fc7d55f2",task_attempt_id="1ca69b7ff26ebad2ffad095b5d244744_cbc357ccb763df2852fee8c4fc7d55f2_0_0",host="localhost",task_name="Source:_Socket_Stream____Flat_Map",task_attempt_num="0",job_name="Socket_Window_WordCount",tm_id="localhost:59807_fbc5f0",subtask_index="0",}
0.0
flink_taskmanager_job_task_backPressuredTimeMsPerSecond{job_id="d892a5678efa0662c444915dd6637066",task_id="cbc357ccb763df2852fee8c4fc7d55f2",task_attempt_id="ba4cbf5d8d59a24ce75da3eb0ef1a5e2_cbc357ccb763df2852fee8c4fc7d55f2_0_0",host="localhost",task_name="Source:_Socket_Stream____Flat_Map",task_attempt_num="0",job_name="Socket_Window_WordCount",tm_id="localhost:59807_fbc5f0",subtask_index="0",}
0.0
flink_taskmanager_job_task_backPressuredTimeMsPerSecond{job_id="d892a5678efa0662c444915dd6637066",task_id="90bea66de1c231edf33913ecd54406c1",task_attempt_id="ba4cbf5d8d59a24ce75da3eb0ef1a5e2_90bea66de1c231edf33913ecd54406c1_0_0",host="localhost",task_name="TumblingProcessingTimeWindows____Sink:_Print_to_Std__Out",task_attempt_num="0",job_name="Socket_Window_WordCount",tm_id="localhost:59807_fbc5f0",subtask_index="0",}
0.0
```
!tasks2.png|width=800,height=62!
> Potential task duplication in the same TM slot after JM failover
> ----------------------------------------------------------------
>
> Key: FLINK-39085
> URL: https://issues.apache.org/jira/browse/FLINK-39085
> Project: Flink
> Issue Type: Bug
> Reporter: Yi Zhang
> Priority: Major
> Attachments: tasks1.png, tasks2.png
>
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)