hkc-8010 opened a new issue, #65011:
URL: https://github.com/apache/airflow/issues/65011
### Apache Airflow version
3.1.8
### If "Other Airflow 2 version" selected, which one?
_No response_
### What happened?
We are seeing an intermittent failure pattern with the deferrable
`GlueJobOperator`, where the same task instance's XCom writes collide across
the initial execute/defer phase and the resumed `execute_complete()` phase.
The failures show up as duplicate-key / already-exists errors on XCom keys
such as:
- `glue_job_run_details`
- `return_value`
In one representative timeline, the task progressed like this:
```text
try 1 starts
try 1 logs duplicate XCom write on `glue_job_run_details`
try 1 still transitions to DEFERRED
try 1 resumes
try 1 fails on duplicate XCom write on `return_value`
try 1 transitions to UP_FOR_RETRY
try 2 starts
try 2 logs duplicate XCom write on `glue_job_run_details`
try 2 transitions to DEFERRED
try 2 resumes
try 2 fails on duplicate XCom write on `return_value`
try 2 transitions to FAILED
```
Example sanitized errors:
```text
409 Conflict
The XCom with key: `glue_job_run_details` with mentioned task instance
already exists.
```
```text
409 Conflict
The XCom with key: `return_value` with mentioned task instance already
exists.
```
In another occurrence of what appears to be the same underlying problem, the
resumed task failed with a database uniqueness violation on the XCom primary key:
```text
duplicate key value violates unique constraint "xcom_pkey"
Key (dag_run_id, task_id, map_index, key)=(..., run_job_task, -1,
return_value) already exists
```
This does not look like a custom-operator bug. The custom operator involved
was only a thin subclass of `GlueJobOperator` that enabled `deferrable=True`;
it did not override `execute()` or `execute_complete()`, and it added no
custom XCom writes.
### What you think should happen instead?
A deferrable `GlueJobOperator` should not fail because it tries to create an
XCom key that was already written by an earlier phase of the same task instance
lifecycle.
More specifically:
- provider-managed link metadata such as `glue_job_run_details` should be
idempotent across deferral/resume/retry boundaries, or
- deferred task execution should not preserve conflicting XCom rows that the
resumed phase will write again, or
- the resumed / retried path should update or replace the existing XCom
instead of failing on create.
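The third option can be sketched with a minimal, self-contained model of the XCom store. The class and method names below are hypothetical, not Airflow APIs; the primary key mirrors the `xcom_pkey` columns from the error above:

```python
# Minimal model of an XCom store keyed like the real `xcom_pkey`:
# (dag_run_id, task_id, map_index, key). Illustrative names only.

class DuplicateKeyError(Exception):
    pass

class XComStore:
    def __init__(self):
        self._rows = {}

    def create(self, dag_run_id, task_id, map_index, key, value):
        """Create-only semantics: a second write to the same PK fails."""
        pk = (dag_run_id, task_id, map_index, key)
        if pk in self._rows:
            raise DuplicateKeyError(f"xcom_pkey violation for {pk}")
        self._rows[pk] = value

    def upsert(self, dag_run_id, task_id, map_index, key, value):
        """Update-or-create semantics: a repeated write replaces the row."""
        self._rows[(dag_run_id, task_id, map_index, key)] = value
```

With `create`, the resumed phase's second write of `return_value` raises exactly the kind of conflict reported above; with `upsert`, it silently replaces the earlier row and the lifecycle completes.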
### How to reproduce
I do not have a distilled standalone reproducer yet, but the behavior
appears to require this combination:
1. Use `GlueJobOperator(deferrable=True)`.
2. Allow the task to enter `DEFERRED` and later resume via
`execute_complete()`.
3. Have a retry/resume path where XCom rows from the earlier phase still
exist.
4. Observe duplicate-write failures on one or both of:
- `glue_job_run_details`
- `return_value`
Even without a minimal standalone DAG, the upstream code paths listed under
"Anything else?" below make the collision window plausible.
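To illustrate why the combination above is sufficient, here is a self-contained simulation of one try of the lifecycle. This is not Airflow code; `run_lifecycle` and the create-only store are stand-ins, and the suppression in phase 1 mirrors the provider link helper's behavior described below:

```python
# Simulation of the deferrable lifecycle against a create-only XCom store.
# All names are illustrative; this is not Airflow code.

def run_lifecycle(xcoms: dict) -> str:
    """One try of the task: pre-deferral phase, then resumed phase.

    `xcoms` maps XCom key -> value for one task instance, and is NOT
    cleared between phases or tries (mirroring the resume-from-deferral
    path that preserves existing rows).
    """
    def create_only(key, value):
        if key in xcoms:
            raise KeyError(f"XCom with key {key!r} already exists")
        xcoms[key] = value

    # Phase 1: execute() persists the link metadata, then defers.
    try:
        create_only("glue_job_run_details", "https://console.aws.../jr_1")
    except KeyError:
        pass  # the provider link helper suppresses this failure

    # Phase 2: execute_complete() returns the run id, which the runner
    # auto-pushes to `return_value` -- with create-only semantics.
    create_only("return_value", "jr_1")
    return "success"

xcoms = {}
run_lifecycle(xcoms)    # a try against a clean store succeeds;
                        # a second try against the surviving rows raises
```

Running it a second time against the same `xcoms` dict logs-and-ignores the `glue_job_run_details` conflict but fails outright on `return_value`, matching the reported timeline.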
### Operating System
Debian-based container image
### Versions of Apache Airflow Providers
`apache-airflow-providers-amazon` as bundled with Airflow `3.1.8`
### Deployment
Other 3rd-party Helm chart / managed runtime
### Deployment details
Observed on a managed Airflow 3.1.8 deployment using CeleryExecutor.
### Anything else?
What looks relevant in the source:
1. `GlueJobOperator.execute()` persists `glue_job_run_details` before
deferring, and also returns the Glue run id.
- `execute()` persists link metadata here:
[providers/amazon/src/airflow/providers/amazon/aws/operators/glue.py#L326-L339](https://github.com/apache/airflow/blob/3.1.8/providers/amazon/src/airflow/providers/amazon/aws/operators/glue.py#L326-L339)
- it defers here:
[providers/amazon/src/airflow/providers/amazon/aws/operators/glue.py#L342-L354](https://github.com/apache/airflow/blob/3.1.8/providers/amazon/src/airflow/providers/amazon/aws/operators/glue.py#L342-L354)
- it returns the run id here:
[providers/amazon/src/airflow/providers/amazon/aws/operators/glue.py#L367](https://github.com/apache/airflow/blob/3.1.8/providers/amazon/src/airflow/providers/amazon/aws/operators/glue.py#L367)
- `execute_complete()` returns the run id again here:
[providers/amazon/src/airflow/providers/amazon/aws/operators/glue.py#L369-L374](https://github.com/apache/airflow/blob/3.1.8/providers/amazon/src/airflow/providers/amazon/aws/operators/glue.py#L369-L374)
2. The provider link helper writes `glue_job_run_details` through
`xcom_push()` and suppresses exceptions.
- `GlueJobRunDetailsLink.key = "glue_job_run_details"`:
[providers/amazon/src/airflow/providers/amazon/aws/links/glue.py#L22-L28](https://github.com/apache/airflow/blob/3.1.8/providers/amazon/src/airflow/providers/amazon/aws/links/glue.py#L22-L28)
- `BaseAwsLink.persist()` writes `context["ti"].xcom_push(key=cls.key, ...)`:
[providers/amazon/src/airflow/providers/amazon/aws/links/base_aws.py#L81-L95](https://github.com/apache/airflow/blob/3.1.8/providers/amazon/src/airflow/providers/amazon/aws/links/base_aws.py#L81-L95)
- `persist()` is wrapped in `@return_on_error(None)`, so a duplicate-key
failure there can be swallowed:
[providers/amazon/src/airflow/providers/amazon/aws/utils/suppress.py#L41-L69](https://github.com/apache/airflow/blob/3.1.8/providers/amazon/src/airflow/providers/amazon/aws/utils/suppress.py#L41-L69)
3. On resume, Airflow calls `resume_execution()` and then
`execute_complete()`, and successful return values are auto-pushed to XCom
`return_value`.
- resume path:
[task-sdk/src/airflow/sdk/bases/operator.py#L1664-L1683](https://github.com/apache/airflow/blob/3.1.8/task-sdk/src/airflow/sdk/bases/operator.py#L1664-L1683)
- task runner switches to resumed execution when `next_method` is set:
[task-sdk/src/airflow/sdk/execution_time/task_runner.py#L1638-L1653](https://github.com/apache/airflow/blob/3.1.8/task-sdk/src/airflow/sdk/execution_time/task_runner.py#L1638-L1653)
- return values are auto-pushed to `return_value` here:
[task-sdk/src/airflow/sdk/execution_time/task_runner.py#L1709-L1751](https://github.com/apache/airflow/blob/3.1.8/task-sdk/src/airflow/sdk/execution_time/task_runner.py#L1709-L1751)
4. Airflow explicitly does not clear XComs when resuming from deferral.
- `# However, do not clear it for deferral`:
[airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py#L255-L283](https://github.com/apache/airflow/blob/3.1.8/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py#L255-L283)
5. There is also a separate operator-extra-link XCom path.
`BaseOperatorLink.xcom_key` defaults to `_link_<ClassName>`, so the Glue
operator is effectively involved with both:
- `glue_job_run_details` via provider `persist()`
- `_link_GlueJobRunDetailsLink` via framework-managed operator extra links
Reference:
[task-sdk/src/airflow/sdk/bases/operatorlink.py#L47-L55](https://github.com/apache/airflow/blob/3.1.8/task-sdk/src/airflow/sdk/bases/operatorlink.py#L47-L55)
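The dual-key situation in point 5 can be made concrete with a tiny model of the default `xcom_key` naming. This is illustrative only, not the real `BaseOperatorLink`/`BaseAwsLink` classes:

```python
# Illustrative model only -- not the real BaseOperatorLink/BaseAwsLink.

class OperatorLinkModel:
    """Mimics the framework default: xcom_key derives from the class name."""
    @property
    def xcom_key(self) -> str:
        return f"_link_{type(self).__name__}"

class GlueJobRunDetailsLink(OperatorLinkModel):
    # The provider's persist() path additionally writes under this key.
    key = "glue_job_run_details"

link = GlueJobRunDetailsLink()
# The same operator is therefore associated with two distinct XCom keys:
#   link.key      -> "glue_job_run_details"        (provider persist())
#   link.xcom_key -> "_link_GlueJobRunDetailsLink" (framework extra links)
```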
My current read is that the deferrable Glue path is not idempotent with
respect to its XCom writes across pre-deferral, resume, and retry boundaries.
At minimum, `glue_job_run_details` and `return_value` appear vulnerable to
duplicate-create semantics in this lifecycle.
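The asymmetry between the two keys (a `glue_job_run_details` conflict only shows up in logs, while a `return_value` conflict fails the task) is consistent with the suppression in point 2. A simplified model of that decorator, assuming it swallows any exception and returns a default (a sketch, not the real `suppress.py`):

```python
import functools
import logging

log = logging.getLogger(__name__)

def return_on_error(default):
    """Simplified model of the provider's suppression decorator: any
    exception in the wrapped function is logged and swallowed, and
    `default` is returned instead."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            try:
                return func(*args, **kwargs)
            except Exception:
                log.exception("Suppressed error in %s", func.__name__)
                return default
        return wrapper
    return decorator

@return_on_error(None)
def persist(store, key, value):
    # Hypothetical create-only write, standing in for xcom_push().
    if key in store:
        raise KeyError(f"XCom with key {key!r} already exists")
    store[key] = value
```

Under this model, a duplicate `persist()` of the link metadata logs and returns `None`, so the task proceeds to defer; the framework's auto-push of `return_value` has no such wrapper, so the same conflict there fails the try.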
If helpful, I can follow up with a minimal repro DAG once I isolate it.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]