Copilot commented on code in PR #50791:
URL: https://github.com/apache/airflow/pull/50791#discussion_r3066485636
##########
providers/amazon/src/airflow/providers/amazon/aws/triggers/s3.py:
##########
@@ -123,6 +122,178 @@ async def run(self) -> AsyncIterator[TriggerEvent]:
yield TriggerEvent({"status": "error", "message": str(e)})
+class S3KeyUpsertedTrigger(BaseEventTrigger):
+ """
+ S3KeyUpsertedTrigger is fired as a deferred class with params to run the task in the trigger worker when
+ a certain key in an S3 object is changed.
+
+ :param fail_if_missing: if True and key does not exist, an exception will be raised
+ """
+
+ def __init__(
+ self,
+ bucket_name: str,
+ bucket_key: str,
+ wildcard_match: bool = False,
+ aws_conn_id: str | None = "aws_default",
+ from_datetime: str | datetime | None = None,
+ poke_interval: float = 5.0,
+ use_regex: bool = False,
+ region_name: str | None = None,
+ verify: bool | str | None = None,
+ botocore_config: dict | None = None,
+ fail_on_missing: bool = False, # TODO: Probably should get removed?
+ is_incremental: bool = False,
Review Comment:
`use_regex` and `fail_on_missing` are exposed in the constructor/docstring
but are not actually honored: `use_regex` is not passed to `check_key_async`,
and `fail_on_missing` is never checked (missing key just sleeps). This makes
the API misleading; please either implement the behavior or remove these
parameters.
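A rough sketch of how both flags could be honored inside the polling loop (assuming `check_key_async` accepts a `use_regex` argument, as the existing `S3KeyTrigger` forwards one; keyword names otherwise follow this PR):

```python
# Sketch only -- inside the existing `while True` polling loop.
exists = await self.hook.check_key_async(
    client=client,
    bucket=self.bucket_name,
    bucket_keys=self.bucket_key,
    wildcard_match=self.wildcard_match,
    use_regex=self.use_regex,  # actually forward the flag
)
if not exists:
    if self.fail_on_missing:
        # Surface the failure instead of silently sleeping forever.
        yield TriggerEvent({"status": "error", "message": f"{self.bucket_key} not found in {self.bucket_name}"})
        return
    await asyncio.sleep(self.poke_interval)
    continue
```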
##########
providers/amazon/src/airflow/providers/amazon/aws/triggers/s3.py:
##########
@@ -123,6 +122,178 @@ async def run(self) -> AsyncIterator[TriggerEvent]:
yield TriggerEvent({"status": "error", "message": str(e)})
+class S3KeyUpsertedTrigger(BaseEventTrigger):
+ """
+ S3KeyUpsertedTrigger is fired as a deferred class with params to run the task in the trigger worker when
+ a certain key in an S3 object is changed.
+
+ :param fail_if_missing: if True and key does not exist, an exception will be raised
+ """
+
+ def __init__(
+ self,
+ bucket_name: str,
+ bucket_key: str,
+ wildcard_match: bool = False,
+ aws_conn_id: str | None = "aws_default",
+ from_datetime: str | datetime | None = None,
+ poke_interval: float = 5.0,
+ use_regex: bool = False,
+ region_name: str | None = None,
+ verify: bool | str | None = None,
+ botocore_config: dict | None = None,
+ fail_on_missing: bool = False, # TODO: Probably should get removed?
+ is_incremental: bool = False,
+ process_namespace: str | None = None,
+ process_name: str | None = None,
+ **kwargs
+ ):
+ # TODO: Add validation for the bucket_name and bucket_key
+ # TODO: Add use_regex and should_check_fn
+ self.bucket_name = bucket_name
+ self.bucket_key = bucket_key
+ self.wildcard_match = wildcard_match
+ self.aws_conn_id = aws_conn_id
+ self.poke_interval = poke_interval
+ self.use_regex = use_regex # TODO: Check this out
+ self.region_name = region_name
+ self.verify = verify
+ self.botocore_config = botocore_config
+ self.fail_on_missing = fail_on_missing
+ self.is_incremental = is_incremental
+
+ if self.is_incremental:
+ if not (process_namespace and process_name):
+ raise AirflowException(
+ "If is_incremental is True, process_namespace and
process_name cannot be empty"
+ )
Review Comment:
The `raise AirflowException(...)` block has inconsistent indentation for the
closing parenthesis, which is likely to fail formatting/lint checks and is hard
to read. Align the closing `)` with the `raise` statement (as in other code in
this repo).
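For example:

```python
if not (process_namespace and process_name):
    raise AirflowException(
        "If is_incremental is True, process_namespace and process_name cannot be empty"
    )
```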
##########
providers/amazon/src/airflow/providers/amazon/aws/triggers/s3.py:
##########
@@ -123,6 +122,178 @@ async def run(self) -> AsyncIterator[TriggerEvent]:
yield TriggerEvent({"status": "error", "message": str(e)})
+class S3KeyUpsertedTrigger(BaseEventTrigger):
+ """
+ S3KeyUpsertedTrigger is fired as a deferred class with params to run the task in the trigger worker when
+ a certain key in an S3 object is changed.
+
+ :param fail_if_missing: if True and key does not exist, an exception will be raised
+ """
+
+ def __init__(
+ self,
+ bucket_name: str,
+ bucket_key: str,
+ wildcard_match: bool = False,
+ aws_conn_id: str | None = "aws_default",
+ from_datetime: str | datetime | None = None,
+ poke_interval: float = 5.0,
+ use_regex: bool = False,
+ region_name: str | None = None,
+ verify: bool | str | None = None,
+ botocore_config: dict | None = None,
+ fail_on_missing: bool = False, # TODO: Probably should get removed?
+ is_incremental: bool = False,
+ process_namespace: str | None = None,
+ process_name: str | None = None,
+ **kwargs
+ ):
+ # TODO: Add validation for the bucket_name and bucket_key
+ # TODO: Add use_regex and should_check_fn
+ self.bucket_name = bucket_name
+ self.bucket_key = bucket_key
+ self.wildcard_match = wildcard_match
+ self.aws_conn_id = aws_conn_id
+ self.poke_interval = poke_interval
+ self.use_regex = use_regex # TODO: Check this out
+ self.region_name = region_name
+ self.verify = verify
+ self.botocore_config = botocore_config
+ self.fail_on_missing = fail_on_missing
+ self.is_incremental = is_incremental
+
+ if self.is_incremental:
+ if not (process_namespace and process_name):
+ raise AirflowException(
+ "If is_incremental is True, process_namespace and
process_name cannot be empty"
+ )
+
+ self.process_namespace = process_namespace
+ self.process_name = process_name
+
+ self.from_datetime = datetime.fromisoformat(from_datetime) \
+ if isinstance(from_datetime, str) \
+ else from_datetime
+
+ super().__init__(**kwargs)
+
+ # Will eventually be a serialized instance of this object
+ def serialize(self):
+ return (
+ "airflow.providers.amazon.aws.triggers.s3.S3KeyUpsertedTrigger",
+ {
+ "bucket_name": self.bucket_name,
+ "bucket_key": self.bucket_key,
+ "wildcard_match": self.wildcard_match,
+ "aws_conn_id": self.aws_conn_id,
+ "from_datetime": self.from_datetime.isoformat() \
+ if isinstance(self.from_datetime, datetime) \
+ else self.from_datetime,
+ "poke_interval": self.poke_interval,
+ "use_regex": self.use_regex,
+ "region_name": self.region_name,
+ "verify": self.verify,
+ "botocore_config": self.botocore_config,
Review Comment:
`serialize()` should return `tuple[str, dict[str, Any]]` like the other
triggers in this file, and it must include all constructor args needed to
correctly re-instantiate the trigger. Currently `fail_on_missing`,
`is_incremental`, `process_namespace`, and `process_name` are not serialized,
so behavior will change after persistence/deserialization.
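A sketch of a round-trippable `serialize()` with the annotation used elsewhere in this module (field set taken from this PR's constructor):

```python
def serialize(self) -> tuple[str, dict[str, Any]]:
    return (
        "airflow.providers.amazon.aws.triggers.s3.S3KeyUpsertedTrigger",
        {
            "bucket_name": self.bucket_name,
            "bucket_key": self.bucket_key,
            "wildcard_match": self.wildcard_match,
            "aws_conn_id": self.aws_conn_id,
            # isoformat() round-trips cleanly through datetime.fromisoformat() in __init__
            "from_datetime": self.from_datetime.isoformat()
            if isinstance(self.from_datetime, datetime)
            else self.from_datetime,
            "poke_interval": self.poke_interval,
            "use_regex": self.use_regex,
            "region_name": self.region_name,
            "verify": self.verify,
            "botocore_config": self.botocore_config,
            "fail_on_missing": self.fail_on_missing,
            "is_incremental": self.is_incremental,
            "process_namespace": self.process_namespace,
            "process_name": self.process_name,
        },
    )
```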
##########
providers/amazon/src/airflow/providers/amazon/aws/triggers/s3.py:
##########
@@ -123,6 +122,178 @@ async def run(self) -> AsyncIterator[TriggerEvent]:
yield TriggerEvent({"status": "error", "message": str(e)})
+class S3KeyUpsertedTrigger(BaseEventTrigger):
+ """
+ S3KeyUpsertedTrigger is fired as a deferred class with params to run the task in the trigger worker when
+ a certain key in an S3 object is changed.
+
+ :param fail_if_missing: if True and key does not exist, an exception will be raised
+ """
+
+ def __init__(
+ self,
+ bucket_name: str,
+ bucket_key: str,
+ wildcard_match: bool = False,
+ aws_conn_id: str | None = "aws_default",
+ from_datetime: str | datetime | None = None,
+ poke_interval: float = 5.0,
+ use_regex: bool = False,
+ region_name: str | None = None,
+ verify: bool | str | None = None,
+ botocore_config: dict | None = None,
+ fail_on_missing: bool = False, # TODO: Probably should get removed?
+ is_incremental: bool = False,
+ process_namespace: str | None = None,
+ process_name: str | None = None,
+ **kwargs
+ ):
+ # TODO: Add validation for the bucket_name and bucket_key
+ # TODO: Add use_regex and should_check_fn
+ self.bucket_name = bucket_name
+ self.bucket_key = bucket_key
+ self.wildcard_match = wildcard_match
+ self.aws_conn_id = aws_conn_id
+ self.poke_interval = poke_interval
+ self.use_regex = use_regex # TODO: Check this out
+ self.region_name = region_name
+ self.verify = verify
+ self.botocore_config = botocore_config
+ self.fail_on_missing = fail_on_missing
+ self.is_incremental = is_incremental
+
+ if self.is_incremental:
+ if not (process_namespace and process_name):
+ raise AirflowException(
+ "If is_incremental is True, process_namespace and
process_name cannot be empty"
+ )
+
+ self.process_namespace = process_namespace
+ self.process_name = process_name
+
+ self.from_datetime = datetime.fromisoformat(from_datetime) \
+ if isinstance(from_datetime, str) \
+ else from_datetime
+
+ super().__init__(**kwargs)
+
+ # Will eventually be a serialized instance of this object
+ def serialize(self):
+ return (
+ "airflow.providers.amazon.aws.triggers.s3.S3KeyUpsertedTrigger",
+ {
+ "bucket_name": self.bucket_name,
+ "bucket_key": self.bucket_key,
+ "wildcard_match": self.wildcard_match,
+ "aws_conn_id": self.aws_conn_id,
+ "from_datetime": self.from_datetime.isoformat() \
+ if isinstance(self.from_datetime, datetime) \
+ else self.from_datetime,
+ "poke_interval": self.poke_interval,
+ "use_regex": self.use_regex,
+ "region_name": self.region_name,
+ "verify": self.verify,
+ "botocore_config": self.botocore_config,
+ },
+ )
+
+ def store_process_state(self, key, value):
+ # TODO: This would need to actually "POST" something
+ return {
+ "process_namespace": self.process_namespace,
+ "process_name": self.process_name,
+ "process_key": key,
+ "process_value": value,
+ }
+
+ def get_process_state(self, key, default_value) -> datetime:
+ # TODO: This would need to actually "GET" something
+ _ = {
+ "process_namespace": self.process_namespace,
+ "process_name": self.process_name,
+ "process_key": key
+ }
+ return datetime(2025, 1, 1)
Review Comment:
`store_process_state()` / `get_process_state()` are currently placeholders
and do not actually persist or retrieve anything; `get_process_state()` also
ignores `default_value` and always returns a hard-coded datetime. As written,
`is_incremental=True` will not behave correctly across trigger restarts and can
silently miss/duplicate events. Either implement real persistence (and return
`default_value` when unset) or remove the incremental/stateful options until
they are supported.
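Whatever storage ends up backing this, the read path should at least honor the default and preserve tzinfo; a shape sketch (the `_state_backend_get` helper is purely hypothetical):

```python
def get_process_state(self, key: str, default_value: datetime | None) -> datetime | None:
    # Hypothetical lookup -- the real persistence mechanism is still TBD in this PR.
    stored = self._state_backend_get(
        namespace=self.process_namespace, name=self.process_name, key=key
    )
    if stored is None:
        return default_value  # fall back instead of returning a hard-coded datetime
    return datetime.fromisoformat(stored)  # ISO round-trip keeps tzinfo intact
```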
##########
providers/amazon/src/airflow/providers/amazon/aws/triggers/s3.py:
##########
@@ -123,6 +122,178 @@ async def run(self) -> AsyncIterator[TriggerEvent]:
yield TriggerEvent({"status": "error", "message": str(e)})
+class S3KeyUpsertedTrigger(BaseEventTrigger):
+ """
+ S3KeyUpsertedTrigger is fired as a deferred class with params to run the task in the trigger worker when
+ a certain key in an S3 object is changed.
+
+ :param fail_if_missing: if True and key does not exist, an exception will be raised
+ """
+
+ def __init__(
+ self,
+ bucket_name: str,
+ bucket_key: str,
+ wildcard_match: bool = False,
+ aws_conn_id: str | None = "aws_default",
+ from_datetime: str | datetime | None = None,
+ poke_interval: float = 5.0,
+ use_regex: bool = False,
+ region_name: str | None = None,
+ verify: bool | str | None = None,
+ botocore_config: dict | None = None,
+ fail_on_missing: bool = False, # TODO: Probably should get removed?
+ is_incremental: bool = False,
+ process_namespace: str | None = None,
+ process_name: str | None = None,
+ **kwargs
+ ):
+ # TODO: Add validation for the bucket_name and bucket_key
+ # TODO: Add use_regex and should_check_fn
+ self.bucket_name = bucket_name
+ self.bucket_key = bucket_key
+ self.wildcard_match = wildcard_match
+ self.aws_conn_id = aws_conn_id
+ self.poke_interval = poke_interval
+ self.use_regex = use_regex # TODO: Check this out
+ self.region_name = region_name
+ self.verify = verify
+ self.botocore_config = botocore_config
+ self.fail_on_missing = fail_on_missing
+ self.is_incremental = is_incremental
+
+ if self.is_incremental:
+ if not (process_namespace and process_name):
+ raise AirflowException(
+ "If is_incremental is True, process_namespace and
process_name cannot be empty"
+ )
+
+ self.process_namespace = process_namespace
+ self.process_name = process_name
+
+ self.from_datetime = datetime.fromisoformat(from_datetime) \
+ if isinstance(from_datetime, str) \
+ else from_datetime
+
+ super().__init__(**kwargs)
+
+ # Will eventually be a serialized instance of this object
+ def serialize(self):
+ return (
+ "airflow.providers.amazon.aws.triggers.s3.S3KeyUpsertedTrigger",
+ {
+ "bucket_name": self.bucket_name,
+ "bucket_key": self.bucket_key,
+ "wildcard_match": self.wildcard_match,
+ "aws_conn_id": self.aws_conn_id,
+ "from_datetime": self.from_datetime.isoformat() \
+ if isinstance(self.from_datetime, datetime) \
+ else self.from_datetime,
+ "poke_interval": self.poke_interval,
+ "use_regex": self.use_regex,
+ "region_name": self.region_name,
+ "verify": self.verify,
+ "botocore_config": self.botocore_config,
+ },
+ )
+
+ def store_process_state(self, key, value):
+ # TODO: This would need to actually "POST" something
+ return {
+ "process_namespace": self.process_namespace,
+ "process_name": self.process_name,
+ "process_key": key,
+ "process_value": value,
+ }
+
+ def get_process_state(self, key, default_value) -> datetime:
+ # TODO: This would need to actually "GET" something
+ _ = {
+ "process_namespace": self.process_namespace,
+ "process_name": self.process_name,
+ "process_key": key
+ }
+ return datetime(2025, 1, 1)
+
+ @cached_property
+ def hook(self) -> S3Hook:
+ return S3Hook(
+ aws_conn_id=self.aws_conn_id,
+ region_name=self.region_name,
+ verify=self.verify,
+ config=self.botocore_config,
+ )
+
+ async def run(self):
+ try:
+ async with await self.hook.get_async_conn() as client:
+ while True:
+ # Check to see if the key exists
+ if await self.hook.check_key_async(
+ client=client,
+ bucket=self.bucket_name,
+ bucket_keys=self.bucket_key,
+ wildcard_match=self.wildcard_match,
+ # not including regex, as it's not available in list_keys
+ ):
+ if self.is_incremental:
+ from_datetime: datetime = self.get_process_state(
+ key="last_check_datetime",
+ default_value=self.from_datetime
+ )
+ else:
+ # If it's not incremental, then it's always going to load using the
+ # self.from_datetime as the lower bound
+ from_datetime = self.from_datetime
+
+ # The goal here is to be safe and eliminate the risk of potentially missing a file
+ # that's landed. With this approach, a timestamp is actually captured before the
+ # operation takes place. This way, there isn't the possibility of a gap between the
+ # time that the bucket was last checked for changes and the time a new timestamp was
+ # captured
+ _safe_to_datetime = datetime.now(timezone.utc)
+
+ # If they do, then get those keys (and information)
+ keys_changed: list = self.hook.list_keys(
+ bucket_name=self.bucket_name,
+ prefix=self.bucket_key,
+ from_datetime=from_datetime,
Review Comment:
`_safe_to_datetime` is timezone-aware (UTC) but `from_datetime` can be
timezone-naive (e.g. `datetime.fromisoformat()` without offset, and
`get_process_state()` currently returns a naive datetime). `S3Hook.list_keys()`
compares these to S3 `LastModified` values and will raise `TypeError` when
mixing naive/aware datetimes. Ensure `from_datetime` is always tz-aware
(ideally UTC) and that persisted state preserves tzinfo.
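One possible normalization helper (a sketch; treating naive inputs as UTC is an assumption about the intended semantics):

```python
from datetime import datetime, timezone

def _ensure_utc(dt: datetime | None) -> datetime | None:
    """Return a tz-aware UTC datetime so comparisons with S3 LastModified never mix naive/aware."""
    if dt is None:
        return None
    return dt.replace(tzinfo=timezone.utc) if dt.tzinfo is None else dt.astimezone(timezone.utc)
```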
##########
providers/amazon/src/airflow/providers/amazon/aws/triggers/s3.py:
##########
@@ -123,6 +122,178 @@ async def run(self) -> AsyncIterator[TriggerEvent]:
yield TriggerEvent({"status": "error", "message": str(e)})
+class S3KeyUpsertedTrigger(BaseEventTrigger):
+ """
+ S3KeyUpsertedTrigger is fired as a deferred class with params to run the task in the trigger worker when
+ a certain key in an S3 object is changed.
+
+ :param fail_if_missing: if True and key does not exist, an exception will be raised
+ """
+
+ def __init__(
+ self,
+ bucket_name: str,
+ bucket_key: str,
+ wildcard_match: bool = False,
+ aws_conn_id: str | None = "aws_default",
+ from_datetime: str | datetime | None = None,
+ poke_interval: float = 5.0,
+ use_regex: bool = False,
+ region_name: str | None = None,
+ verify: bool | str | None = None,
+ botocore_config: dict | None = None,
+ fail_on_missing: bool = False, # TODO: Probably should get removed?
+ is_incremental: bool = False,
+ process_namespace: str | None = None,
+ process_name: str | None = None,
+ **kwargs
+ ):
+ # TODO: Add validation for the bucket_name and bucket_key
+ # TODO: Add use_regex and should_check_fn
+ self.bucket_name = bucket_name
+ self.bucket_key = bucket_key
+ self.wildcard_match = wildcard_match
+ self.aws_conn_id = aws_conn_id
+ self.poke_interval = poke_interval
+ self.use_regex = use_regex # TODO: Check this out
+ self.region_name = region_name
+ self.verify = verify
+ self.botocore_config = botocore_config
+ self.fail_on_missing = fail_on_missing
+ self.is_incremental = is_incremental
+
+ if self.is_incremental:
+ if not (process_namespace and process_name):
+ raise AirflowException(
+ "If is_incremental is True, process_namespace and
process_name cannot be empty"
+ )
+
+ self.process_namespace = process_namespace
+ self.process_name = process_name
+
+ self.from_datetime = datetime.fromisoformat(from_datetime) \
+ if isinstance(from_datetime, str) \
+ else from_datetime
+
+ super().__init__(**kwargs)
+
+ # Will eventually be a serialized instance of this object
+ def serialize(self):
+ return (
+ "airflow.providers.amazon.aws.triggers.s3.S3KeyUpsertedTrigger",
+ {
+ "bucket_name": self.bucket_name,
+ "bucket_key": self.bucket_key,
+ "wildcard_match": self.wildcard_match,
+ "aws_conn_id": self.aws_conn_id,
+ "from_datetime": self.from_datetime.isoformat() \
+ if isinstance(self.from_datetime, datetime) \
+ else self.from_datetime,
+ "poke_interval": self.poke_interval,
+ "use_regex": self.use_regex,
+ "region_name": self.region_name,
+ "verify": self.verify,
+ "botocore_config": self.botocore_config,
+ },
+ )
+
+ def store_process_state(self, key, value):
+ # TODO: This would need to actually "POST" something
+ return {
+ "process_namespace": self.process_namespace,
+ "process_name": self.process_name,
+ "process_key": key,
+ "process_value": value,
+ }
+
+ def get_process_state(self, key, default_value) -> datetime:
+ # TODO: This would need to actually "GET" something
+ _ = {
+ "process_namespace": self.process_namespace,
+ "process_name": self.process_name,
+ "process_key": key
+ }
+ return datetime(2025, 1, 1)
+
+ @cached_property
+ def hook(self) -> S3Hook:
+ return S3Hook(
+ aws_conn_id=self.aws_conn_id,
+ region_name=self.region_name,
+ verify=self.verify,
+ config=self.botocore_config,
+ )
+
+ async def run(self):
+ try:
+ async with await self.hook.get_async_conn() as client:
+ while True:
+ # Check to see if the key exists
+ if await self.hook.check_key_async(
+ client=client,
+ bucket=self.bucket_name,
+ bucket_keys=self.bucket_key,
+ wildcard_match=self.wildcard_match,
+ # not including regex, as it's not available in list_keys
+ ):
+ if self.is_incremental:
+ from_datetime: datetime = self.get_process_state(
+ key="last_check_datetime",
+ default_value=self.from_datetime
+ )
+ else:
+ # If it's not incremental, then it's always going to load using the
+ # self.from_datetime as the lower bound
+ from_datetime = self.from_datetime
+
+ # The goal here is to be safe and eliminate the risk of potentially missing a file
+ # that's landed. With this approach, a timestamp is actually captured before the
+ # operation takes place. This way, there isn't the possibility of a gap between the
+ # time that the bucket was last checked for changes and the time a new timestamp was
+ # captured
+ _safe_to_datetime = datetime.now(timezone.utc)
+
+ # If they do, then get those keys (and information)
+ keys_changed: list = self.hook.list_keys(
+ bucket_name=self.bucket_name,
+ prefix=self.bucket_key,
+ from_datetime=from_datetime,
+ to_datetime=_safe_to_datetime,
+ apply_wildcard=self.wildcard_match, # TODO: Change this
+ )
Review Comment:
This trigger holds an async S3 client context but then calls the synchronous
`S3Hook.list_keys()` (which uses `self.get_conn()`/boto3 under the hood). That
blocks the triggerer event loop and also ignores the `client` you just created.
Please switch to an async listing approach (e.g. iterate
`get_file_metadata_async`/`get_files_async` with filtering) or wrap the sync
call in an executor so it doesn’t block asyncio.
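If a fully async listing isn't practical yet, the sync call could at least be moved off the event loop, e.g. (sketch):

```python
# Sketch: run the blocking list_keys() in the default executor so the triggerer loop stays responsive.
keys_changed = await asyncio.get_running_loop().run_in_executor(
    None,
    lambda: self.hook.list_keys(
        bucket_name=self.bucket_name,
        prefix=self.bucket_key,
        from_datetime=from_datetime,
        to_datetime=_safe_to_datetime,
        apply_wildcard=self.wildcard_match,
    ),
)
```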
##########
providers/amazon/src/airflow/providers/amazon/aws/triggers/s3.py:
##########
@@ -123,6 +122,178 @@ async def run(self) -> AsyncIterator[TriggerEvent]:
yield TriggerEvent({"status": "error", "message": str(e)})
+class S3KeyUpsertedTrigger(BaseEventTrigger):
+ """
+ S3KeyUpsertedTrigger is fired as a deferred class with params to run the task in the trigger worker when
+ a certain key in an S3 object is changed.
+
+ :param fail_if_missing: if True and key does not exist, an exception will be raised
+ """
+
+ def __init__(
+ self,
+ bucket_name: str,
+ bucket_key: str,
+ wildcard_match: bool = False,
+ aws_conn_id: str | None = "aws_default",
+ from_datetime: str | datetime | None = None,
+ poke_interval: float = 5.0,
+ use_regex: bool = False,
+ region_name: str | None = None,
+ verify: bool | str | None = None,
+ botocore_config: dict | None = None,
+ fail_on_missing: bool = False, # TODO: Probably should get removed?
+ is_incremental: bool = False,
+ process_namespace: str | None = None,
+ process_name: str | None = None,
+ **kwargs
+ ):
+ # TODO: Add validation for the bucket_name and bucket_key
+ # TODO: Add use_regex and should_check_fn
+ self.bucket_name = bucket_name
+ self.bucket_key = bucket_key
+ self.wildcard_match = wildcard_match
+ self.aws_conn_id = aws_conn_id
+ self.poke_interval = poke_interval
+ self.use_regex = use_regex # TODO: Check this out
+ self.region_name = region_name
+ self.verify = verify
+ self.botocore_config = botocore_config
+ self.fail_on_missing = fail_on_missing
+ self.is_incremental = is_incremental
+
+ if self.is_incremental:
+ if not (process_namespace and process_name):
+ raise AirflowException(
+ "If is_incremental is True, process_namespace and
process_name cannot be empty"
+ )
+
+ self.process_namespace = process_namespace
+ self.process_name = process_name
+
+ self.from_datetime = datetime.fromisoformat(from_datetime) \
+ if isinstance(from_datetime, str) \
+ else from_datetime
+
+ super().__init__(**kwargs)
+
+ # Will eventually be a serialized instance of this object
+ def serialize(self):
+ return (
+ "airflow.providers.amazon.aws.triggers.s3.S3KeyUpsertedTrigger",
+ {
+ "bucket_name": self.bucket_name,
+ "bucket_key": self.bucket_key,
+ "wildcard_match": self.wildcard_match,
+ "aws_conn_id": self.aws_conn_id,
+ "from_datetime": self.from_datetime.isoformat() \
+ if isinstance(self.from_datetime, datetime) \
+ else self.from_datetime,
+ "poke_interval": self.poke_interval,
+ "use_regex": self.use_regex,
+ "region_name": self.region_name,
+ "verify": self.verify,
+ "botocore_config": self.botocore_config,
+ },
+ )
+
+ def store_process_state(self, key, value):
+ # TODO: This would need to actually "POST" something
+ return {
+ "process_namespace": self.process_namespace,
+ "process_name": self.process_name,
+ "process_key": key,
+ "process_value": value,
+ }
+
+ def get_process_state(self, key, default_value) -> datetime:
+ # TODO: This would need to actually "GET" something
+ _ = {
+ "process_namespace": self.process_namespace,
+ "process_name": self.process_name,
+ "process_key": key
+ }
+ return datetime(2025, 1, 1)
+
+ @cached_property
+ def hook(self) -> S3Hook:
+ return S3Hook(
+ aws_conn_id=self.aws_conn_id,
+ region_name=self.region_name,
+ verify=self.verify,
+ config=self.botocore_config,
+ )
+
+ async def run(self):
Review Comment:
`run` is an async generator (it yields `TriggerEvent`s) but the signature is
missing the return type annotation used elsewhere in this module (`->
AsyncIterator[TriggerEvent]`). Adding it improves type checking and keeps
consistency with the other triggers in this file.
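That is:

```python
async def run(self) -> AsyncIterator[TriggerEvent]:
```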
##########
providers/amazon/src/airflow/providers/amazon/aws/triggers/s3.py:
##########
@@ -18,14 +18,13 @@
import asyncio
from collections.abc import AsyncIterator
+from datetime import datetime, timezone
from functools import cached_property
-from typing import TYPE_CHECKING, Any
+from typing import Any
+from airflow.exceptions import AirflowException
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
-from airflow.triggers.base import BaseTrigger, TriggerEvent
-
-if TYPE_CHECKING:
- from datetime import datetime
+from airflow.triggers.base import BaseEventTrigger, BaseTrigger, TriggerEvent
Review Comment:
`BaseEventTrigger` is imported unconditionally. The amazon provider supports
Airflow < 3.0 (see `providers/amazon/.../triggers/sqs.py` using
`AIRFLOW_V_3_0_PLUS`), where `BaseEventTrigger` may not exist and this import
would fail at module import time. Please add the same version-compat
conditional import here (falling back to `BaseTrigger as BaseEventTrigger`).
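Roughly (mirroring the guard in `triggers/sqs.py`; the exact import path of the compat flag should match that file):

```python
from airflow.providers.amazon.aws.utils.version_compat import AIRFLOW_V_3_0_PLUS

if AIRFLOW_V_3_0_PLUS:
    from airflow.triggers.base import BaseEventTrigger, BaseTrigger, TriggerEvent
else:
    from airflow.triggers.base import BaseTrigger, TriggerEvent

    BaseEventTrigger = BaseTrigger  # fallback for Airflow < 3.0
```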
##########
providers/amazon/src/airflow/providers/amazon/aws/triggers/s3.py:
##########
@@ -123,6 +122,178 @@ async def run(self) -> AsyncIterator[TriggerEvent]:
yield TriggerEvent({"status": "error", "message": str(e)})
+class S3KeyUpsertedTrigger(BaseEventTrigger):
+ """
+ S3KeyUpsertedTrigger is fired as a deferred class with params to run the task in the trigger worker when
+ a certain key in an S3 object is changed.
+
+ :param fail_if_missing: if True and key does not exist, an exception will be raised
+ """
+
+ def __init__(
+ self,
+ bucket_name: str,
+ bucket_key: str,
+ wildcard_match: bool = False,
+ aws_conn_id: str | None = "aws_default",
+ from_datetime: str | datetime | None = None,
+ poke_interval: float = 5.0,
+ use_regex: bool = False,
+ region_name: str | None = None,
+ verify: bool | str | None = None,
+ botocore_config: dict | None = None,
+ fail_on_missing: bool = False, # TODO: Probably should get removed?
+ is_incremental: bool = False,
+ process_namespace: str | None = None,
+ process_name: str | None = None,
+ **kwargs
+ ):
+ # TODO: Add validation for the bucket_name and bucket_key
+ # TODO: Add use_regex and should_check_fn
+ self.bucket_name = bucket_name
+ self.bucket_key = bucket_key
+ self.wildcard_match = wildcard_match
+ self.aws_conn_id = aws_conn_id
+ self.poke_interval = poke_interval
+ self.use_regex = use_regex # TODO: Check this out
+ self.region_name = region_name
+ self.verify = verify
+ self.botocore_config = botocore_config
+ self.fail_on_missing = fail_on_missing
+ self.is_incremental = is_incremental
+
+ if self.is_incremental:
+ if not (process_namespace and process_name):
+ raise AirflowException(
+ "If is_incremental is True, process_namespace and
process_name cannot be empty"
+ )
+
+ self.process_namespace = process_namespace
+ self.process_name = process_name
+
+ self.from_datetime = datetime.fromisoformat(from_datetime) \
+ if isinstance(from_datetime, str) \
+ else from_datetime
+
+ super().__init__(**kwargs)
+
+ # Will eventually be a serialized instance of this object
+ def serialize(self):
+ return (
+ "airflow.providers.amazon.aws.triggers.s3.S3KeyUpsertedTrigger",
+ {
+ "bucket_name": self.bucket_name,
+ "bucket_key": self.bucket_key,
+ "wildcard_match": self.wildcard_match,
+ "aws_conn_id": self.aws_conn_id,
+ "from_datetime": self.from_datetime.isoformat() \
+ if isinstance(self.from_datetime, datetime) \
+ else self.from_datetime,
+ "poke_interval": self.poke_interval,
+ "use_regex": self.use_regex,
+ "region_name": self.region_name,
+ "verify": self.verify,
+ "botocore_config": self.botocore_config,
+ },
+ )
+
+ def store_process_state(self, key, value):
+ # TODO: This would need to actually "POST" something
+ return {
+ "process_namespace": self.process_namespace,
+ "process_name": self.process_name,
+ "process_key": key,
+ "process_value": value,
+ }
+
+ def get_process_state(self, key, default_value) -> datetime:
+ # TODO: This would need to actually "GET" something
+ _ = {
+ "process_namespace": self.process_namespace,
+ "process_name": self.process_name,
+ "process_key": key
+ }
+ return datetime(2025, 1, 1)
+
+ @cached_property
+ def hook(self) -> S3Hook:
+ return S3Hook(
+ aws_conn_id=self.aws_conn_id,
+ region_name=self.region_name,
+ verify=self.verify,
+ config=self.botocore_config,
+ )
+
+ async def run(self):
+ try:
+ async with await self.hook.get_async_conn() as client:
+ while True:
+ # Check to see if the key exists
+ if await self.hook.check_key_async(
+ client=client,
+ bucket=self.bucket_name,
+ bucket_keys=self.bucket_key,
+ wildcard_match=self.wildcard_match,
+ # not including regex, as it's not available in list_keys
+ ):
+ if self.is_incremental:
+ from_datetime: datetime = self.get_process_state(
+ key="last_check_datetime",
+ default_value=self.from_datetime
+ )
+ else:
+ # If it's not incremental, then it's always going to load using the
+ # self.from_datetime as the lower bound
+ from_datetime = self.from_datetime
+
+ # The goal here is to be safe and eliminate the risk of potentially missing a file
+ # that's landed. With this approach, a timestamp is actually captured before the
+ # operation takes place. This way, there isn't the possibility of a gap between the
+ # time that the bucket was last checked for changes and the time a new timestamp was
+ # captured
+ _safe_to_datetime = datetime.now(timezone.utc)
+
+ # If they do, then get those keys (and information)
+ keys_changed: list = self.hook.list_keys(
+ bucket_name=self.bucket_name,
+ prefix=self.bucket_key,
+ from_datetime=from_datetime,
+ to_datetime=_safe_to_datetime,
+ apply_wildcard=self.wildcard_match, # TODO: Change this
+ )
+
+ # Regardless if there are or are not keys that have changed, last_activity_time
+ # should still be updated
+ if self.is_incremental:
+ self.store_process_state(key="last_check_datetime", value=_safe_to_datetime)
+
+ # TODO: Remove this
+ # This will eventually need to go, but maintains parity with pre-persistence logic
+ self.from_datetime = _safe_to_datetime
+
+ if len(keys_changed) == 0:
+ self.log.info("Sleeping for %s seconds", self.poke_interval)
+ await asyncio.sleep(self.poke_interval)
+ continue
+
+ # Eventually, we'll want to return more of a rich payload, such that it can be used
+ # by downstream Tasks
+ for key in keys_changed:
+ yield TriggerEvent({
+ "status": "success",
+ "key": key,
+ # "from_datetime": self.from_datetime.isoformat(),
+ })
+
+ return # Since we are persisting state, we can now return
+
Review Comment:
PR description mentions removing the `return` to avoid wiping state, but the
trigger still `return`s immediately after yielding events. If this trigger is
intended for continuous asset watching, returning here will stop the trigger
run and may re-introduce the looping/state-reset behavior depending on how it’s
scheduled. Please reconcile the intended lifecycle with the implementation
(single-shot vs continuous).
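If continuous watching is the intent, the tail of the loop would look roughly like this instead (sketch, relying on the persisted `last_check_datetime` watermark):

```python
# Sketch: emit events for this batch, then keep polling instead of returning.
for key in keys_changed:
    yield TriggerEvent({"status": "success", "key": key})
await asyncio.sleep(self.poke_interval)
# no `return` here -- the next iteration reads the stored last_check_datetime
```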
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]