This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new e89163be5a5 Add debug logging and fix exception handling in DynamoDB hook (#64629)
e89163be5a5 is described below
commit e89163be5a564efe29b4216c2864d4e7020de8d1
Author: Shivam Rastogi <[email protected]>
AuthorDate: Thu Apr 2 12:07:34 2026 -0700
Add debug logging and fix exception handling in DynamoDB hook (#64629)
- Add debug logging before/after API calls in write_batch_data and
get_import_status, following the pattern established in the Airbyte
provider (PR #51503)
- Fix exception chaining: add `from e` to preserve __cause__ so
tracebacks show the full causal chain
- Fix `raise e` → bare `raise` in get_import_status to avoid adding
a misleading extra frame to the traceback
- Include import_arn in ImportNotFoundException error message for
easier debugging
- Rename shadowing variable `describe_import` → `response`
- Rename `general_error` → `e` for consistency
Part of the Airflow 3 Debugging Improvements initiative.
---
.../airflow/providers/amazon/aws/hooks/dynamodb.py | 33 ++++++++++++++++------
1 file changed, 25 insertions(+), 8 deletions(-)
diff --git a/providers/amazon/src/airflow/providers/amazon/aws/hooks/dynamodb.py b/providers/amazon/src/airflow/providers/amazon/aws/hooks/dynamodb.py
index ae9d7005d3d..6df2faab52a 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/hooks/dynamodb.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/hooks/dynamodb.py
@@ -74,14 +74,21 @@ class DynamoDBHook(AwsBaseHook):
:param items: list of DynamoDB items.
"""
try:
+ self.log.debug("Loading table %s from DynamoDB connection", self.table_name)
table = self.get_conn().Table(self.table_name)
+ self.log.debug(
+ "Starting batch write to table %s with overwrite keys %s",
+ self.table_name,
+ self.table_keys,
+ )
with table.batch_writer(overwrite_by_pkeys=self.table_keys) as batch:
for item in items:
batch.put_item(Item=item)
+ self.log.debug("Batch write to table %s completed successfully", self.table_name)
return True
- except Exception as general_error:
- raise AirflowException(f"Failed to insert items in dynamodb, error: {general_error}")
+ except Exception as e:
+ raise AirflowException(f"Failed to insert items in dynamodb, error: {e}") from e
def get_import_status(self, import_arn: str) -> tuple[str, str | None, str | None]:
"""
@@ -93,13 +100,23 @@ class DynamoDBHook(AwsBaseHook):
self.log.info("Poking for Dynamodb import %s", import_arn)
try:
- describe_import = self.client.describe_import(ImportArn=import_arn)
- status = describe_import["ImportTableDescription"]["ImportStatus"]
- error_code = describe_import["ImportTableDescription"].get("FailureCode")
- error_msg = describe_import["ImportTableDescription"].get("FailureMessage")
+ self.log.debug("Calling describe_import for import ARN %s", import_arn)
+ response = self.client.describe_import(ImportArn=import_arn)
+ status = response["ImportTableDescription"]["ImportStatus"]
+ error_code = response["ImportTableDescription"].get("FailureCode")
+ error_msg = response["ImportTableDescription"].get("FailureMessage")
+ self.log.debug(
+ "Import %s status: %s, error_code: %s, error_message: %s",
+ import_arn,
+ status,
+ error_code,
+ error_msg,
+ )
return status, error_code, error_msg
except ClientError as e:
error_code = e.response.get("Error", {}).get("Code")
if error_code == "ImportNotFoundException":
- raise AirflowException("S3 import into Dynamodb job not found.")
- raise e
+ raise AirflowException(
f"S3 import into Dynamodb job not found. Import ARN: {import_arn}"
+ ) from e
+ raise