Copilot commented on code in PR #63499:
URL: https://github.com/apache/airflow/pull/63499#discussion_r3066479847
##########
providers/amazon/src/airflow/providers/amazon/aws/hooks/s3.py:
##########
@@ -1237,9 +1238,10 @@ def load_file(
get_hook_lineage_collector().add_input_asset(
context=self, scheme="file", asset_kwargs={"path": filename}
)
- get_hook_lineage_collector().add_output_asset(
- context=self, scheme="s3", asset_kwargs={"bucket": bucket_name,
"key": key}
- )
+ if self.enable_hook_level_lineage:
+ get_hook_lineage_collector().add_output_asset(
+ context=self, scheme="s3", asset_kwargs={"bucket":
bucket_name, "key": key}
+ )
Review Comment:
The new flag is described as disabling all hook-level lineage, but in this
method `add_input_asset(...)` still runs unconditionally when
`enable_hook_level_lineage=False`. If the intent is to fully disable hook-level
lineage, this call (and other `get_hook_lineage_collector()` usages elsewhere
in the hook) should also be gated behind the flag, otherwise users will still
emit lineage assets.
##########
providers/amazon/src/airflow/providers/amazon/aws/hooks/s3.py:
##########
@@ -195,6 +195,7 @@ def __init__(
kwargs["client_type"] = "s3"
kwargs["aws_conn_id"] = aws_conn_id
self._requester_pays = kwargs.pop("requester_pays", False)
+ self.enable_hook_level_lineage =
kwargs.pop("enable_hook_level_lineage", True)
Review Comment:
`enable_hook_level_lineage` is implemented via `kwargs.pop(...)`, but it is
not part of the `S3Hook.__init__` signature (and therefore not visible to type
checkers / generated docs). To match the stated API
(`S3Hook(enable_hook_level_lineage=False)`), add it as an explicit keyword-only
parameter (e.g., after `*args`) and assign `self.enable_hook_level_lineage`
from that argument instead of pulling it from `kwargs`.
##########
providers/amazon/src/airflow/providers/amazon/aws/hooks/s3.py:
##########
@@ -1383,9 +1385,10 @@ def _upload_file_obj(
Config=self.transfer_config,
)
# No input because file_obj can be anything - handle in calling
function if possible
- get_hook_lineage_collector().add_output_asset(
- context=self, scheme="s3", asset_kwargs={"bucket": bucket_name,
"key": key}
- )
+ if getattr(self, "enable_hook_level_lineage", True):
Review Comment:
`_upload_file_obj` uses `getattr(self, "enable_hook_level_lineage", True)`
while `load_file` uses `self.enable_hook_level_lineage`. Since the attribute is
now set in `__init__`, prefer a single consistent access pattern (and avoid
silently defaulting to True if the attribute is unexpectedly missing).
##########
providers/amazon/tests/unit/amazon/aws/hooks/test_s3.py:
##########
@@ -2070,3 +2070,37 @@ def test_unify_and_provide_ordered_properly():
matches = re.findall(r"@provide_bucket_name\s+@unify_bucket_name_and_key",
code, re.MULTILINE)
if matches:
pytest.fail("@unify_bucket_name_and_key should be applied before
@provide_bucket_name in S3Hook")
+
+
+class TestS3HookLineageConfig:
+ def test_hook_lineage_enabled_by_default(self):
+ from airflow.providers.amazon.aws.hooks.s3 import S3Hook
+
+ hook = S3Hook()
+ assert hook.enable_hook_level_lineage is True
+
+ def test_hook_lineage_disabled_when_flag_false(self):
+ from airflow.providers.amazon.aws.hooks.s3 import S3Hook
+
+ hook = S3Hook(enable_hook_level_lineage=False)
+ assert hook.enable_hook_level_lineage is False
+
+ from unittest import mock
+
+
@mock.patch("airflow.providers.amazon.aws.hooks.s3.get_hook_lineage_collector")
+ @mock.patch("airflow.providers.amazon.aws.hooks.s3.S3Hook.get_conn")
+ def test_load_string_skips_lineage_when_disabled(self, mock_conn,
mock_collector):
+ from airflow.providers.amazon.aws.hooks.s3 import S3Hook
Review Comment:
This test class re-imports `S3Hook` inside each test and re-imports `mock`
inside the class body, even though both are already imported at module level.
Please remove these inner imports and use the module-level imports to keep
import style consistent and avoid unnecessary indirection.
##########
providers/amazon/tests/unit/amazon/aws/hooks/test_s3.py:
##########
@@ -2070,3 +2070,37 @@ def test_unify_and_provide_ordered_properly():
matches = re.findall(r"@provide_bucket_name\s+@unify_bucket_name_and_key",
code, re.MULTILINE)
if matches:
pytest.fail("@unify_bucket_name_and_key should be applied before
@provide_bucket_name in S3Hook")
+
+
+class TestS3HookLineageConfig:
+ def test_hook_lineage_enabled_by_default(self):
+ from airflow.providers.amazon.aws.hooks.s3 import S3Hook
+
+ hook = S3Hook()
+ assert hook.enable_hook_level_lineage is True
+
+ def test_hook_lineage_disabled_when_flag_false(self):
+ from airflow.providers.amazon.aws.hooks.s3 import S3Hook
+
+ hook = S3Hook(enable_hook_level_lineage=False)
+ assert hook.enable_hook_level_lineage is False
+
+ from unittest import mock
+
+
@mock.patch("airflow.providers.amazon.aws.hooks.s3.get_hook_lineage_collector")
+ @mock.patch("airflow.providers.amazon.aws.hooks.s3.S3Hook.get_conn")
+ def test_load_string_skips_lineage_when_disabled(self, mock_conn,
mock_collector):
+ from airflow.providers.amazon.aws.hooks.s3 import S3Hook
+
+ hook = S3Hook(enable_hook_level_lineage=False)
+ hook.load_string("data", "key", bucket_name="bucket", replace=True)
+ mock_collector.return_value.add_output_asset.assert_not_called()
+
+
@mock.patch("airflow.providers.amazon.aws.hooks.s3.get_hook_lineage_collector")
+ @mock.patch("airflow.providers.amazon.aws.hooks.s3.S3Hook.get_conn")
+ def test_load_string_exposes_lineage_when_enabled(self, mock_conn,
mock_collector):
+ from airflow.providers.amazon.aws.hooks.s3 import S3Hook
+
+ hook = S3Hook(enable_hook_level_lineage=True)
+ hook.load_string("data", "key", bucket_name="bucket", replace=True)
+ mock_collector.return_value.add_output_asset.assert_called_once()
Review Comment:
The new tests only assert `add_output_asset` behavior for `load_string`, but
the stated behavior is that *all* `get_hook_lineage_collector()` calls in the
hook are skipped when disabled. Please extend coverage to ensure the collector
itself is not called when the flag is False, and add at least one regression
test for another method that registers lineage (e.g. `load_file`,
`copy_object`, or `download_file`) to prevent partial gating regressions.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]