hkc-8010 opened a new issue, #65011:
URL: https://github.com/apache/airflow/issues/65011

   ### Apache Airflow version
   
   3.1.8
   
   ### If "Other Airflow 2 version" selected, which one?
   
   _No response_
   
   ### What happened?
   
   We are seeing an intermittent failure pattern with deferrable 
`GlueJobOperator` where the same task instance collides on XCom keys across the 
initial execute/defer phase and the resumed `execute_complete()` phase.
   
   The failures show up as duplicate-key / already-exists errors on XCom keys 
such as:
   
   - `glue_job_run_details`
   - `return_value`
   
   In one representative timeline, the task progressed like this:
   
   ```text
   try 1 starts
   try 1 logs duplicate XCom write on `glue_job_run_details`
   try 1 still transitions to DEFERRED
   try 1 resumes
   try 1 fails on duplicate XCom write on `return_value`
   try 1 transitions to UP_FOR_RETRY
   try 2 starts
   try 2 logs duplicate XCom write on `glue_job_run_details`
   try 2 transitions to DEFERRED
   try 2 resumes
   try 2 fails on duplicate XCom write on `return_value`
   try 2 transitions to FAILED
   ```
   
   Example sanitized errors:
   
   ```text
   409 Conflict
   The XCom with key: `glue_job_run_details` with mentioned task instance 
already exists.
   ```
   
   ```text
   409 Conflict
   The XCom with key: `return_value` with mentioned task instance already 
exists.
   ```
   
   In another occurrence of the same underlying problem, the resumed task failed with a database uniqueness violation on the XCom primary key:
   
   ```text
   duplicate key value violates unique constraint "xcom_pkey"
   Key (dag_run_id, task_id, map_index, key)=(..., run_job_task, -1, 
return_value) already exists
   ```
   
   This does not look like a custom-operator bug. The custom operator involved 
was only a thin subclass of `GlueJobOperator` that enabled `deferrable=True` 
and did not override `execute()`, `execute_complete()`, or add custom XCom 
writes.
   
   ### What you think should happen instead?
   
   A deferrable `GlueJobOperator` should not fail because it tries to create an 
XCom key that was already written by an earlier phase of the same task instance 
lifecycle.
   
   More specifically:
   
   - provider-managed link metadata such as `glue_job_run_details` should be 
idempotent across deferral/resume/retry boundaries, or
   - deferred task execution should not preserve conflicting XCom rows that the 
resumed phase will write again, or
   - the resumed / retried path should update or replace the existing XCom 
instead of failing on create.
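
   To illustrate the third option, here is a minimal sketch (plain Python, not the provider's or scheduler's actual API; all names are illustrative) of update-or-create semantics over the same composite key the database enforces:

```python
class XComStore:
    """Toy in-memory XCom table keyed like the real primary key:
    (dag_run_id, task_id, map_index, key)."""

    def __init__(self):
        self.rows = {}

    def create(self, dag_run_id, task_id, map_index, key, value):
        # Create-only semantics: mirrors the observed 409 / xcom_pkey violation.
        pk = (dag_run_id, task_id, map_index, key)
        if pk in self.rows:
            raise KeyError(f"XCom with key {key!r} already exists")
        self.rows[pk] = value

    def upsert(self, dag_run_id, task_id, map_index, key, value):
        # Update-or-create: idempotent across deferral/resume/retry boundaries.
        self.rows[(dag_run_id, task_id, map_index, key)] = value


store = XComStore()
# Both phases write return_value; with upsert the second write is a harmless update.
store.upsert("run-1", "run_job_task", -1, "return_value", "jr_abc")
store.upsert("run-1", "run_job_task", -1, "return_value", "jr_abc")
```

   With upsert semantics the second write from the resumed phase simply overwrites the first, which is safe here because both phases write the same run id.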
   
   ### How to reproduce
   
   I do not have a distilled standalone reproducer yet, but the behavior 
appears to require this combination:
   
   1. Use `GlueJobOperator(deferrable=True)`.
   2. Allow the task to enter `DEFERRED` and later resume via 
`execute_complete()`.
   3. Have a retry/resume path where XCom rows from the earlier phase still 
exist.
   4. Observe duplicate-write failures on one or both of:
      - `glue_job_run_details`
      - `return_value`
   
   Even without a minimal DAG, the relevant upstream code paths appear to make this collision window plausible.
   
   ### Operating System
   
   Debian-based container image
   
   ### Versions of Apache Airflow Providers
   
   `apache-airflow-providers-amazon` as bundled with Airflow `3.1.8`
   
   ### Deployment
   
   Other 3rd-party Helm chart / managed runtime
   
   ### Deployment details
   
   Observed on a managed Airflow 3.1.8 deployment using CeleryExecutor.
   
   ### Anything else?
   
   What looks relevant in the source:
   
   1. `GlueJobOperator.execute()` persists `glue_job_run_details` before 
deferring, and also returns the Glue run id.
   
   - `execute()` persists link metadata here:
     
[providers/amazon/src/airflow/providers/amazon/aws/operators/glue.py#L326-L339](https://github.com/apache/airflow/blob/3.1.8/providers/amazon/src/airflow/providers/amazon/aws/operators/glue.py#L326-L339)
   - it defers here:
     
[providers/amazon/src/airflow/providers/amazon/aws/operators/glue.py#L342-L354](https://github.com/apache/airflow/blob/3.1.8/providers/amazon/src/airflow/providers/amazon/aws/operators/glue.py#L342-L354)
   - it returns the run id here:
     
[providers/amazon/src/airflow/providers/amazon/aws/operators/glue.py#L367](https://github.com/apache/airflow/blob/3.1.8/providers/amazon/src/airflow/providers/amazon/aws/operators/glue.py#L367)
   - `execute_complete()` returns the run id again here:
     
[providers/amazon/src/airflow/providers/amazon/aws/operators/glue.py#L369-L374](https://github.com/apache/airflow/blob/3.1.8/providers/amazon/src/airflow/providers/amazon/aws/operators/glue.py#L369-L374)
   
   2. The provider link helper writes `glue_job_run_details` through 
`xcom_push()` and suppresses exceptions.
   
   - `GlueJobRunDetailsLink.key = "glue_job_run_details"`:
     
[providers/amazon/src/airflow/providers/amazon/aws/links/glue.py#L22-L28](https://github.com/apache/airflow/blob/3.1.8/providers/amazon/src/airflow/providers/amazon/aws/links/glue.py#L22-L28)
   - `BaseAwsLink.persist()` writes `context["ti"].xcom_push(key=cls.key, ...)`:
     
[providers/amazon/src/airflow/providers/amazon/aws/links/base_aws.py#L81-L95](https://github.com/apache/airflow/blob/3.1.8/providers/amazon/src/airflow/providers/amazon/aws/links/base_aws.py#L81-L95)
   - `persist()` is wrapped in `@return_on_error(None)`, so a duplicate-key 
failure there can be swallowed:
     
[providers/amazon/src/airflow/providers/amazon/aws/utils/suppress.py#L41-L69](https://github.com/apache/airflow/blob/3.1.8/providers/amazon/src/airflow/providers/amazon/aws/utils/suppress.py#L41-L69)
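
   A simplified sketch of what such a suppress-and-return decorator does (illustrative only; the real helper in `suppress.py` may differ in details such as logging):

```python
import functools
import logging

log = logging.getLogger(__name__)


def return_on_error(fallback):
    """Run the wrapped function; on any exception, log it and
    return `fallback` instead of raising."""

    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            try:
                return func(*args, **kwargs)
            except Exception:
                log.warning("Suppressed error in %s", func.__name__, exc_info=True)
                return fallback

        return wrapper

    return decorator


@return_on_error(None)
def persist(xcom, key, value):
    # Stand-in for BaseAwsLink.persist(): a create-only xcom_push.
    if key in xcom:
        raise ValueError(f"XCom {key!r} already exists")  # would be the 409
    xcom[key] = value
    return True


xcom = {"glue_job_run_details": "old"}
result = persist(xcom, "glue_job_run_details", "new")
# The duplicate-key error is swallowed: result is None and execution continues,
# which matches "logs duplicate XCom write but still transitions to DEFERRED".
```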
   
   3. On resume, Airflow calls `resume_execution()` and then 
`execute_complete()`, and successful return values are auto-pushed to XCom 
`return_value`.
   
   - resume path:
     
[task-sdk/src/airflow/sdk/bases/operator.py#L1664-L1683](https://github.com/apache/airflow/blob/3.1.8/task-sdk/src/airflow/sdk/bases/operator.py#L1664-L1683)
   - task runner switches to resumed execution when `next_method` is set:
     
[task-sdk/src/airflow/sdk/execution_time/task_runner.py#L1638-L1653](https://github.com/apache/airflow/blob/3.1.8/task-sdk/src/airflow/sdk/execution_time/task_runner.py#L1638-L1653)
   - return values are auto-pushed to `return_value` here:
     
[task-sdk/src/airflow/sdk/execution_time/task_runner.py#L1709-L1751](https://github.com/apache/airflow/blob/3.1.8/task-sdk/src/airflow/sdk/execution_time/task_runner.py#L1709-L1751)
   
   4. Airflow explicitly does not clear XComs when resuming from deferral.
   
   - `# However, do not clear it for deferral`:
     
[airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py#L255-L283](https://github.com/apache/airflow/blob/3.1.8/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py#L255-L283)
   
   5. There is also a separate operator-extra-link XCom path. 
`BaseOperatorLink.xcom_key` defaults to `_link_<ClassName>`, so the Glue 
operator effectively writes link data under two keys:
   
   - `glue_job_run_details` via provider `persist()`
   - `_link_GlueJobRunDetailsLink` via framework-managed operator extra links
   
   Reference:
   
[task-sdk/src/airflow/sdk/bases/operatorlink.py#L47-L55](https://github.com/apache/airflow/blob/3.1.8/task-sdk/src/airflow/sdk/bases/operatorlink.py#L47-L55)
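
   That default can be illustrated with a small sketch (simplified; not the actual Task SDK class):

```python
class BaseOperatorLink:
    @property
    def xcom_key(self) -> str:
        # Default key: "_link_" + the concrete subclass name.
        return f"_link_{type(self).__name__}"


class GlueJobRunDetailsLink(BaseOperatorLink):
    pass


print(GlueJobRunDetailsLink().xcom_key)  # _link_GlueJobRunDetailsLink
```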
   
   My current read is that the deferrable Glue path is not idempotent with 
respect to its XCom writes across pre-deferral, resume, and retry boundaries. 
At minimum, `glue_job_run_details` and `return_value` appear vulnerable to 
duplicate-create semantics in this lifecycle.
   
   If helpful, I can follow up with a minimal repro DAG once I isolate it.
   

