holmuk opened a new pull request, #65058:
URL: https://github.com/apache/airflow/pull/65058

    <!-- SPDX-License-Identifier: Apache-2.0
         https://www.apache.org/licenses/LICENSE-2.0 -->
   
   
   Closes #64867
   
   ---
   
   ##### Was generative AI tooling used to co-author this PR?
   
   
   - [x] Yes (please specify the tool below)
           Cursor
   
   Generated-by: Cursor following [the guidelines](https://github.com/apache/airflow/blob/main/contributing-docs/05_pull_requests.rst#gen-ai-assisted-contributions)
   
   
   This PR fixes tasks hanging in the `Running` state with `KubernetesJobOperator` / `KubernetesJobTrigger` when `deferrable=True` and `do_xcom_push=True`.
   
   ### Problem description
   
   The trigger waits for container completion of every pod name in a precomputed snapshot (`pod_names`) before checking the final Job status. That snapshot is built from pod discovery driven by `parallelism`, not by actual successful completions.
   
   **Example** (`parallelism=2`, `completions=1`):
   - Airflow creates a Job
   - Kubernetes starts 2 pods
   - One pod succeeds
   - Job becomes `Complete` (`completions=1` reached)
   - The second pod may never reach the expected terminal state
   - `KubernetesJobTrigger` keeps waiting on the second pod and does not reach 
Job-status evaluation, so the task can remain Running/Deferred forever.
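
   To illustrate the hang, the old completion predicate can be simulated in isolation (names and phases below are illustrative, not the actual trigger code):

```python
# Illustrative simulation of the hang (not the actual trigger code).
# parallelism=2, completions=1: one pod succeeds and the Job becomes
# Complete, but the second pod never reaches a terminal phase.
pod_phases = {"job-pod-1": "Succeeded", "job-pod-2": "Running"}

def all_snapshot_pods_terminal(snapshot):
    """Old gating condition: every snapshot pod must be terminal."""
    return all(pod_phases[name] in ("Succeeded", "Failed") for name in snapshot)

# The old flow only evaluates Job status after this predicate becomes
# true -- with a stuck pod it never does, so the task stays Deferred.
assert not all_snapshot_pods_terminal(["job-pod-1", "job-pod-2"])
```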
   
   **Proposed fix:** Task completion should be driven by Job terminal status 
(`Complete` / `Failed`), which already reflects `completions`:
   - Make Job status the primary completion condition.
   - Collect XCom/logs only as best-effort from pods that actually finished and 
are still readable.
   - Do not block task finalization on missing/non-terminal pods from the 
initial snapshot.
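
   The primary completion condition then reduces to checking the Job's own conditions (the helper name is illustrative; the condition dicts mirror the shape of Kubernetes `JobCondition` objects):

```python
# Sketch of the job-first completion condition (helper name is
# illustrative, not the provider's actual API).
def is_job_terminal(job_conditions):
    """A Job is terminal once it carries a true Complete or Failed
    condition -- which already reflects `completions`."""
    return any(
        c["type"] in ("Complete", "Failed") and c["status"] == "True"
        for c in job_conditions
    )

# completions=1 reached: the Job is Complete even though a second pod
# from the snapshot is still non-terminal.
assert is_job_terminal([{"type": "Complete", "status": "True"}])
assert not is_job_terminal([])
```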
   
   ### What does this PR do?
   
   Updates logic for `KubernetesJobTrigger`:
   - The waiting flow is now job-first: completion is driven by final Job 
status, not by requiring all pods from the initial snapshot to finish.
   - XCom collection is now best-effort: results are collected only from pods 
that are available and successfully processed.
   - 404 for missing/deleted pods is handled as skip instead of failing the 
trigger.
   - The previous unbounded pod-first waits were removed: container waits are 
now bounded and periodically re-check whether the Job has already completed.
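
   The best-effort XCom collection and 404-as-skip behavior boil down to this pattern (`read_pod_xcom` and `NotFoundError` are simplified stand-ins for the real client call, which raises a 404 `ApiException` for deleted pods):

```python
class NotFoundError(Exception):
    """Stand-in for kubernetes.client.ApiException with status == 404."""

def read_pod_xcom(name, pods):
    """Stand-in for reading the XCom sidecar of a pod."""
    if name not in pods:
        raise NotFoundError(name)
    return pods[name]

def collect_xcom_best_effort(snapshot, pods):
    """Collect XCom only from pods that still exist; a missing/deleted
    pod is skipped instead of failing the trigger."""
    results = {}
    for name in snapshot:
        try:
            results[name] = read_pod_xcom(name, pods)
        except NotFoundError:
            continue  # 404: pod already gone, skip it
    return results

live_pods = {"job-pod-1": {"return_value": 42}}  # job-pod-2 was deleted
assert collect_xcom_best_effort(["job-pod-1", "job-pod-2"], live_pods) == live_pods
```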
   
   **Regression tests for #64867**
   
   - Trigger regression tests (triggers/test_job.py)
   
      - `test_run_completes_when_job_is_done_even_if_some_snapshot_pods_never_complete`: Verify the trigger does not hang when a pod from the initial snapshot never reaches a terminal state after the Job is already complete.
     
     - `test_run_skips_deleted_snapshot_pod_and_completes_when_job_is_done`: 
Verify deleted snapshot pods (`404`) are skipped and do not fail trigger 
completion. Assert behavior is best-effort for XCom collection (collect from 
available/finished pods only).
   
   - Operator regression test (operators/test_job.py)
   
     - `test_execute_complete_supports_partial_xcom_results`: Verify 
`execute_complete` correctly handles partial `xcom_result` payloads (fewer XCom 
entries than initial pod snapshot), which is expected in `parallelism > 
completions` scenarios.
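
   For illustration, partial-payload handling in a simplified `execute_complete` could look like this (field names are only a sketch of the event shape, not the provider's exact contract):

```python
def execute_complete_sketch(event):
    """Illustrative handling of a partial xcom_result payload."""
    if event["status"] != "success":
        raise RuntimeError(event.get("message", "Job failed"))
    # Fewer entries than snapshot pods is expected when
    # parallelism > completions; return whatever was collected.
    return event.get("xcom_result") or []

# Only 1 of 2 snapshot pods produced an XCom -- still a success.
event = {"status": "success", "xcom_result": [{"return_value": 42}]}
assert execute_complete_sketch(event) == [{"return_value": 42}]
```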
   
   ### Risks
   
   With very small `poll_interval` values, the new bounded wait loop may 
generate extra timeout/cancel/retry iterations while waiting for pod container 
states. This does not fail the task by itself (it is expected retry behavior), 
but it can increase polling overhead and log noise until the Job reaches a 
terminal state.
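
   The bounded-wait behavior described above amounts to this pattern (interval and bound values are illustrative):

```python
import time

def bounded_container_wait(container_done, job_done,
                           poll_interval=0.01, bound=0.05):
    """Wait for a container at most `bound` seconds per round, then
    re-check whether the Job has already reached a terminal state."""
    while True:
        deadline = time.monotonic() + bound
        while time.monotonic() < deadline:
            if container_done():
                return "container_done"
            time.sleep(poll_interval)
        if job_done():  # periodic re-check breaks the old unbounded wait
            return "job_done"

# A stuck container no longer blocks completion once the Job is done.
assert bounded_container_wait(lambda: False, lambda: True) == "job_done"
```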
   
   ---
   
   * Read the **[Pull Request 
Guidelines](https://github.com/apache/airflow/blob/main/contributing-docs/05_pull_requests.rst#pull-request-guidelines)**
 for more information. Note: commit author/co-author name and email in commits 
become permanently public when merged.
   * For fundamental code changes, an Airflow Improvement Proposal 
([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvement+Proposals))
 is needed.
   * When adding dependency, check compliance with the [ASF 3rd Party License 
Policy](https://www.apache.org/legal/resolved.html#category-x).
   * For significant user-facing changes create newsfragment: 
`{pr_number}.significant.rst`, in 
[airflow-core/newsfragments](https://github.com/apache/airflow/tree/main/airflow-core/newsfragments).
 You can add this file in a follow-up commit after the PR is created so you 
know the PR number.
   

