Copilot commented on code in PR #50791:
URL: https://github.com/apache/airflow/pull/50791#discussion_r3066485636


##########
providers/amazon/src/airflow/providers/amazon/aws/triggers/s3.py:
##########
@@ -123,6 +122,178 @@ async def run(self) -> AsyncIterator[TriggerEvent]:
             yield TriggerEvent({"status": "error", "message": str(e)})
 
 
+class S3KeyUpsertedTrigger(BaseEventTrigger):
+    """
+    S3KeyUpsertedTrigger is fired as a deferred class with params to run the task in the trigger worker when
+    a certain key in an S3 object is changed.
+
+    :param fail_if_missing: if True and key does not exist, an exception will be raised
+    """
+
+    def __init__(
+        self,
+        bucket_name: str,
+        bucket_key: str,
+        wildcard_match: bool = False,
+        aws_conn_id: str | None = "aws_default",
+        from_datetime: str | datetime | None = None,
+        poke_interval: float = 5.0,
+        use_regex: bool = False,
+        region_name: str | None = None,
+        verify: bool | str | None = None,
+        botocore_config: dict | None = None,
+        fail_on_missing: bool = False,  # TODO: Probably should get removed?
+        is_incremental: bool = False,

Review Comment:
   `use_regex` and `fail_on_missing` are accepted by the constructor (the 
docstring documents the latter under the mismatched name `fail_if_missing`) 
but are not actually honored: `use_regex` is not passed to `check_key_async`, 
and `fail_on_missing` is never checked (a missing key just sleeps). This makes 
the API misleading; please either implement the behavior or remove these 
parameters.



##########
providers/amazon/src/airflow/providers/amazon/aws/triggers/s3.py:
##########
@@ -123,6 +122,178 @@ async def run(self) -> AsyncIterator[TriggerEvent]:
             yield TriggerEvent({"status": "error", "message": str(e)})
 
 
+class S3KeyUpsertedTrigger(BaseEventTrigger):
+    """
+    S3KeyUpsertedTrigger is fired as a deferred class with params to run the task in the trigger worker when
+    a certain key in an S3 object is changed.
+
+    :param fail_if_missing: if True and key does not exist, an exception will be raised
+    """
+
+    def __init__(
+        self,
+        bucket_name: str,
+        bucket_key: str,
+        wildcard_match: bool = False,
+        aws_conn_id: str | None = "aws_default",
+        from_datetime: str | datetime | None = None,
+        poke_interval: float = 5.0,
+        use_regex: bool = False,
+        region_name: str | None = None,
+        verify: bool | str | None = None,
+        botocore_config: dict | None = None,
+        fail_on_missing: bool = False,  # TODO: Probably should get removed?
+        is_incremental: bool = False,
+        process_namespace: str | None = None,
+        process_name: str | None = None,
+        **kwargs
+    ):
+        # TODO: Add validation for the bucket_name and bucket_key
+        # TODO: Add use_regex and should_check_fn
+        self.bucket_name = bucket_name
+        self.bucket_key = bucket_key
+        self.wildcard_match = wildcard_match
+        self.aws_conn_id = aws_conn_id
+        self.poke_interval = poke_interval
+        self.use_regex = use_regex  # TODO: Check this out
+        self.region_name = region_name
+        self.verify = verify
+        self.botocore_config = botocore_config
+        self.fail_on_missing = fail_on_missing
+        self.is_incremental = is_incremental
+
+        if self.is_incremental:
+            if not (process_namespace and process_name):
+                raise AirflowException(
+                    "If is_incremental is True, process_namespace and process_name cannot be empty"
+            )

Review Comment:
   The `raise AirflowException(...)` block has inconsistent indentation for the 
closing parenthesis, which is likely to fail formatting/lint checks and is hard 
to read. Align the closing `)` with the `raise` statement (as in other code in 
this repo).



##########
providers/amazon/src/airflow/providers/amazon/aws/triggers/s3.py:
##########
@@ -123,6 +122,178 @@ async def run(self) -> AsyncIterator[TriggerEvent]:
             yield TriggerEvent({"status": "error", "message": str(e)})
 
 
+class S3KeyUpsertedTrigger(BaseEventTrigger):
+    """
+    S3KeyUpsertedTrigger is fired as a deferred class with params to run the task in the trigger worker when
+    a certain key in an S3 object is changed.
+
+    :param fail_if_missing: if True and key does not exist, an exception will be raised
+    """
+
+    def __init__(
+        self,
+        bucket_name: str,
+        bucket_key: str,
+        wildcard_match: bool = False,
+        aws_conn_id: str | None = "aws_default",
+        from_datetime: str | datetime | None = None,
+        poke_interval: float = 5.0,
+        use_regex: bool = False,
+        region_name: str | None = None,
+        verify: bool | str | None = None,
+        botocore_config: dict | None = None,
+        fail_on_missing: bool = False,  # TODO: Probably should get removed?
+        is_incremental: bool = False,
+        process_namespace: str | None = None,
+        process_name: str | None = None,
+        **kwargs
+    ):
+        # TODO: Add validation for the bucket_name and bucket_key
+        # TODO: Add use_regex and should_check_fn
+        self.bucket_name = bucket_name
+        self.bucket_key = bucket_key
+        self.wildcard_match = wildcard_match
+        self.aws_conn_id = aws_conn_id
+        self.poke_interval = poke_interval
+        self.use_regex = use_regex  # TODO: Check this out
+        self.region_name = region_name
+        self.verify = verify
+        self.botocore_config = botocore_config
+        self.fail_on_missing = fail_on_missing
+        self.is_incremental = is_incremental
+
+        if self.is_incremental:
+            if not (process_namespace and process_name):
+                raise AirflowException(
+                    "If is_incremental is True, process_namespace and process_name cannot be empty"
+            )
+
+        self.process_namespace = process_namespace
+        self.process_name = process_name
+
+        self.from_datetime = datetime.fromisoformat(from_datetime) \
+            if isinstance(from_datetime, str) \
+            else from_datetime
+
+        super().__init__(**kwargs)
+
+    # Will eventually be a serialized instance of this object
+    def serialize(self):
+        return (
+            "airflow.providers.amazon.aws.triggers.s3.S3KeyUpsertedTrigger",
+            {
+                "bucket_name": self.bucket_name,
+                "bucket_key": self.bucket_key,
+                "wildcard_match": self.wildcard_match,
+                "aws_conn_id": self.aws_conn_id,
+                "from_datetime": self.from_datetime.isoformat() \
+                    if isinstance(self.from_datetime, datetime) \
+                    else self.from_datetime,
+                "poke_interval": self.poke_interval,
+                "use_regex": self.use_regex,
+                "region_name": self.region_name,
+                "verify": self.verify,
+                "botocore_config": self.botocore_config,

Review Comment:
   `serialize()` should be annotated as returning `tuple[str, dict[str, Any]]` 
like the other triggers in this file, and it must include all constructor args 
needed to correctly re-instantiate the trigger. Currently `fail_on_missing`, 
`is_incremental`, `process_namespace`, and `process_name` are not serialized, 
so behavior will change after persistence/deserialization.



##########
providers/amazon/src/airflow/providers/amazon/aws/triggers/s3.py:
##########
@@ -123,6 +122,178 @@ async def run(self) -> AsyncIterator[TriggerEvent]:
             yield TriggerEvent({"status": "error", "message": str(e)})
 
 
+class S3KeyUpsertedTrigger(BaseEventTrigger):
+    """
+    S3KeyUpsertedTrigger is fired as a deferred class with params to run the task in the trigger worker when
+    a certain key in an S3 object is changed.
+
+    :param fail_if_missing: if True and key does not exist, an exception will be raised
+    """
+
+    def __init__(
+        self,
+        bucket_name: str,
+        bucket_key: str,
+        wildcard_match: bool = False,
+        aws_conn_id: str | None = "aws_default",
+        from_datetime: str | datetime | None = None,
+        poke_interval: float = 5.0,
+        use_regex: bool = False,
+        region_name: str | None = None,
+        verify: bool | str | None = None,
+        botocore_config: dict | None = None,
+        fail_on_missing: bool = False,  # TODO: Probably should get removed?
+        is_incremental: bool = False,
+        process_namespace: str | None = None,
+        process_name: str | None = None,
+        **kwargs
+    ):
+        # TODO: Add validation for the bucket_name and bucket_key
+        # TODO: Add use_regex and should_check_fn
+        self.bucket_name = bucket_name
+        self.bucket_key = bucket_key
+        self.wildcard_match = wildcard_match
+        self.aws_conn_id = aws_conn_id
+        self.poke_interval = poke_interval
+        self.use_regex = use_regex  # TODO: Check this out
+        self.region_name = region_name
+        self.verify = verify
+        self.botocore_config = botocore_config
+        self.fail_on_missing = fail_on_missing
+        self.is_incremental = is_incremental
+
+        if self.is_incremental:
+            if not (process_namespace and process_name):
+                raise AirflowException(
+                    "If is_incremental is True, process_namespace and process_name cannot be empty"
+            )
+
+        self.process_namespace = process_namespace
+        self.process_name = process_name
+
+        self.from_datetime = datetime.fromisoformat(from_datetime) \
+            if isinstance(from_datetime, str) \
+            else from_datetime
+
+        super().__init__(**kwargs)
+
+    # Will eventually be a serialized instance of this object
+    def serialize(self):
+        return (
+            "airflow.providers.amazon.aws.triggers.s3.S3KeyUpsertedTrigger",
+            {
+                "bucket_name": self.bucket_name,
+                "bucket_key": self.bucket_key,
+                "wildcard_match": self.wildcard_match,
+                "aws_conn_id": self.aws_conn_id,
+                "from_datetime": self.from_datetime.isoformat() \
+                    if isinstance(self.from_datetime, datetime) \
+                    else self.from_datetime,
+                "poke_interval": self.poke_interval,
+                "use_regex": self.use_regex,
+                "region_name": self.region_name,
+                "verify": self.verify,
+                "botocore_config": self.botocore_config,
+            },
+        )
+
+    def store_process_state(self, key, value):
+        # TODO: This would need to actually "POST" something
+        return {
+            "process_namespace": self.process_namespace,
+            "process_name": self.process_name,
+            "process_key": key,
+            "process_value": value,
+        }
+
+    def get_process_state(self, key, default_value) -> datetime:
+        # TODO: This would need to actually "GET" something
+        _ = {
+            "process_namespace": self.process_namespace,
+            "process_name": self.process_name,
+            "process_key": key
+        }
+        return datetime(2025, 1, 1)

Review Comment:
   `store_process_state()` / `get_process_state()` are currently placeholders 
and do not actually persist or retrieve anything; `get_process_state()` also 
ignores `default_value` and always returns a hard-coded datetime. As written, 
`is_incremental=True` will not behave correctly across trigger restarts and can 
silently miss/duplicate events. Either implement real persistence (and return 
`default_value` when unset) or remove the incremental/stateful options until 
they are supported.



##########
providers/amazon/src/airflow/providers/amazon/aws/triggers/s3.py:
##########
@@ -123,6 +122,178 @@ async def run(self) -> AsyncIterator[TriggerEvent]:
             yield TriggerEvent({"status": "error", "message": str(e)})
 
 
+class S3KeyUpsertedTrigger(BaseEventTrigger):
+    """
+    S3KeyUpsertedTrigger is fired as a deferred class with params to run the task in the trigger worker when
+    a certain key in an S3 object is changed.
+
+    :param fail_if_missing: if True and key does not exist, an exception will be raised
+    """
+
+    def __init__(
+        self,
+        bucket_name: str,
+        bucket_key: str,
+        wildcard_match: bool = False,
+        aws_conn_id: str | None = "aws_default",
+        from_datetime: str | datetime | None = None,
+        poke_interval: float = 5.0,
+        use_regex: bool = False,
+        region_name: str | None = None,
+        verify: bool | str | None = None,
+        botocore_config: dict | None = None,
+        fail_on_missing: bool = False,  # TODO: Probably should get removed?
+        is_incremental: bool = False,
+        process_namespace: str | None = None,
+        process_name: str | None = None,
+        **kwargs
+    ):
+        # TODO: Add validation for the bucket_name and bucket_key
+        # TODO: Add use_regex and should_check_fn
+        self.bucket_name = bucket_name
+        self.bucket_key = bucket_key
+        self.wildcard_match = wildcard_match
+        self.aws_conn_id = aws_conn_id
+        self.poke_interval = poke_interval
+        self.use_regex = use_regex  # TODO: Check this out
+        self.region_name = region_name
+        self.verify = verify
+        self.botocore_config = botocore_config
+        self.fail_on_missing = fail_on_missing
+        self.is_incremental = is_incremental
+
+        if self.is_incremental:
+            if not (process_namespace and process_name):
+                raise AirflowException(
+                    "If is_incremental is True, process_namespace and process_name cannot be empty"
+            )
+
+        self.process_namespace = process_namespace
+        self.process_name = process_name
+
+        self.from_datetime = datetime.fromisoformat(from_datetime) \
+            if isinstance(from_datetime, str) \
+            else from_datetime
+
+        super().__init__(**kwargs)
+
+    # Will eventually be a serialized instance of this object
+    def serialize(self):
+        return (
+            "airflow.providers.amazon.aws.triggers.s3.S3KeyUpsertedTrigger",
+            {
+                "bucket_name": self.bucket_name,
+                "bucket_key": self.bucket_key,
+                "wildcard_match": self.wildcard_match,
+                "aws_conn_id": self.aws_conn_id,
+                "from_datetime": self.from_datetime.isoformat() \
+                    if isinstance(self.from_datetime, datetime) \
+                    else self.from_datetime,
+                "poke_interval": self.poke_interval,
+                "use_regex": self.use_regex,
+                "region_name": self.region_name,
+                "verify": self.verify,
+                "botocore_config": self.botocore_config,
+            },
+        )
+
+    def store_process_state(self, key, value):
+        # TODO: This would need to actually "POST" something
+        return {
+            "process_namespace": self.process_namespace,
+            "process_name": self.process_name,
+            "process_key": key,
+            "process_value": value,
+        }
+
+    def get_process_state(self, key, default_value) -> datetime:
+        # TODO: This would need to actually "GET" something
+        _ = {
+            "process_namespace": self.process_namespace,
+            "process_name": self.process_name,
+            "process_key": key
+        }
+        return datetime(2025, 1, 1)
+
+    @cached_property
+    def hook(self) -> S3Hook:
+        return S3Hook(
+            aws_conn_id=self.aws_conn_id,
+            region_name=self.region_name,
+            verify=self.verify,
+            config=self.botocore_config,
+        )
+
+    async def run(self):
+        try:
+            async with await self.hook.get_async_conn() as client:
+                while True:
+                    # Check to see if the key exists
+                    if await self.hook.check_key_async(
+                        client=client,
+                        bucket=self.bucket_name,
+                        bucket_keys=self.bucket_key,
+                        wildcard_match=self.wildcard_match,
+                        # not including regex, as it's not available in list_keys
+                    ):
+                        if self.is_incremental:
+                            from_datetime: datetime = self.get_process_state(
+                                key="last_check_datetime",
+                                default_value=self.from_datetime
+                            )
+                        else:
+                            # If it's not incremental, then it's always going to load using the
+                            # self.from_datetime as the lower bound
+                            from_datetime = self.from_datetime
+
+                        # The goal here is to be safe and eliminate the risk of potentially missing a file
+                        # that's landed. With this approach, a timestamp is actually captured before the
+                        # operation takes place. This way, there isn't the possibility of a gap between the
+                        # time that the bucket was last checked for changes and the time a new timestamp was
+                        # captured
+                        _safe_to_datetime = datetime.now(timezone.utc)
+
+                        # If they do, then get those keys (and information)
+                        keys_changed: list = self.hook.list_keys(
+                            bucket_name=self.bucket_name,
+                            prefix=self.bucket_key,
+                            from_datetime=from_datetime,

Review Comment:
   `_safe_to_datetime` is timezone-aware (UTC) but `from_datetime` can be 
timezone-naive (e.g. `datetime.fromisoformat()` without offset, and 
`get_process_state()` currently returns a naive datetime). `S3Hook.list_keys()` 
compares these to S3 `LastModified` values and will raise `TypeError` when 
mixing naive/aware datetimes. Ensure `from_datetime` is always tz-aware 
(ideally UTC) and that persisted state preserves tzinfo.



##########
providers/amazon/src/airflow/providers/amazon/aws/triggers/s3.py:
##########
@@ -123,6 +122,178 @@ async def run(self) -> AsyncIterator[TriggerEvent]:
             yield TriggerEvent({"status": "error", "message": str(e)})
 
 
+class S3KeyUpsertedTrigger(BaseEventTrigger):
+    """
+    S3KeyUpsertedTrigger is fired as a deferred class with params to run the task in the trigger worker when
+    a certain key in an S3 object is changed.
+
+    :param fail_if_missing: if True and key does not exist, an exception will be raised
+    """
+
+    def __init__(
+        self,
+        bucket_name: str,
+        bucket_key: str,
+        wildcard_match: bool = False,
+        aws_conn_id: str | None = "aws_default",
+        from_datetime: str | datetime | None = None,
+        poke_interval: float = 5.0,
+        use_regex: bool = False,
+        region_name: str | None = None,
+        verify: bool | str | None = None,
+        botocore_config: dict | None = None,
+        fail_on_missing: bool = False,  # TODO: Probably should get removed?
+        is_incremental: bool = False,
+        process_namespace: str | None = None,
+        process_name: str | None = None,
+        **kwargs
+    ):
+        # TODO: Add validation for the bucket_name and bucket_key
+        # TODO: Add use_regex and should_check_fn
+        self.bucket_name = bucket_name
+        self.bucket_key = bucket_key
+        self.wildcard_match = wildcard_match
+        self.aws_conn_id = aws_conn_id
+        self.poke_interval = poke_interval
+        self.use_regex = use_regex  # TODO: Check this out
+        self.region_name = region_name
+        self.verify = verify
+        self.botocore_config = botocore_config
+        self.fail_on_missing = fail_on_missing
+        self.is_incremental = is_incremental
+
+        if self.is_incremental:
+            if not (process_namespace and process_name):
+                raise AirflowException(
+                    "If is_incremental is True, process_namespace and process_name cannot be empty"
+            )
+
+        self.process_namespace = process_namespace
+        self.process_name = process_name
+
+        self.from_datetime = datetime.fromisoformat(from_datetime) \
+            if isinstance(from_datetime, str) \
+            else from_datetime
+
+        super().__init__(**kwargs)
+
+    # Will eventually be a serialized instance of this object
+    def serialize(self):
+        return (
+            "airflow.providers.amazon.aws.triggers.s3.S3KeyUpsertedTrigger",
+            {
+                "bucket_name": self.bucket_name,
+                "bucket_key": self.bucket_key,
+                "wildcard_match": self.wildcard_match,
+                "aws_conn_id": self.aws_conn_id,
+                "from_datetime": self.from_datetime.isoformat() \
+                    if isinstance(self.from_datetime, datetime) \
+                    else self.from_datetime,
+                "poke_interval": self.poke_interval,
+                "use_regex": self.use_regex,
+                "region_name": self.region_name,
+                "verify": self.verify,
+                "botocore_config": self.botocore_config,
+            },
+        )
+
+    def store_process_state(self, key, value):
+        # TODO: This would need to actually "POST" something
+        return {
+            "process_namespace": self.process_namespace,
+            "process_name": self.process_name,
+            "process_key": key,
+            "process_value": value,
+        }
+
+    def get_process_state(self, key, default_value) -> datetime:
+        # TODO: This would need to actually "GET" something
+        _ = {
+            "process_namespace": self.process_namespace,
+            "process_name": self.process_name,
+            "process_key": key
+        }
+        return datetime(2025, 1, 1)
+
+    @cached_property
+    def hook(self) -> S3Hook:
+        return S3Hook(
+            aws_conn_id=self.aws_conn_id,
+            region_name=self.region_name,
+            verify=self.verify,
+            config=self.botocore_config,
+        )
+
+    async def run(self):
+        try:
+            async with await self.hook.get_async_conn() as client:
+                while True:
+                    # Check to see if the key exists
+                    if await self.hook.check_key_async(
+                        client=client,
+                        bucket=self.bucket_name,
+                        bucket_keys=self.bucket_key,
+                        wildcard_match=self.wildcard_match,
+                        # not including regex, as it's not available in list_keys
+                    ):
+                        if self.is_incremental:
+                            from_datetime: datetime = self.get_process_state(
+                                key="last_check_datetime",
+                                default_value=self.from_datetime
+                            )
+                        else:
+                            # If it's not incremental, then it's always going to load using the
+                            # self.from_datetime as the lower bound
+                            from_datetime = self.from_datetime
+
+                        # The goal here is to be safe and eliminate the risk of potentially missing a file
+                        # that's landed. With this approach, a timestamp is actually captured before the
+                        # operation takes place. This way, there isn't the possibility of a gap between the
+                        # time that the bucket was last checked for changes and the time a new timestamp was
+                        # captured
+                        _safe_to_datetime = datetime.now(timezone.utc)
+
+                        # If they do, then get those keys (and information)
+                        keys_changed: list = self.hook.list_keys(
+                            bucket_name=self.bucket_name,
+                            prefix=self.bucket_key,
+                            from_datetime=from_datetime,
+                            to_datetime=_safe_to_datetime,
+                            apply_wildcard=self.wildcard_match,  # TODO: Change this
+                        )

Review Comment:
   This trigger holds an async S3 client context but then calls the synchronous 
`S3Hook.list_keys()` (which uses `self.get_conn()`/boto3 under the hood). That 
blocks the triggerer event loop and also ignores the `client` you just created. 
Please switch to an async listing approach (e.g. iterate 
`get_file_metadata_async`/`get_files_async` with filtering) or wrap the sync 
call in an executor so it doesn’t block asyncio.



##########
providers/amazon/src/airflow/providers/amazon/aws/triggers/s3.py:
##########
@@ -123,6 +122,178 @@ async def run(self) -> AsyncIterator[TriggerEvent]:
             yield TriggerEvent({"status": "error", "message": str(e)})
 
 
+class S3KeyUpsertedTrigger(BaseEventTrigger):
+    """
+    S3KeyUpsertedTrigger is fired as a deferred class with params to run the task in the trigger worker when
+    a certain key in an S3 object is changed.
+
+    :param fail_if_missing: if True and key does not exist, an exception will be raised
+    """
+
+    def __init__(
+        self,
+        bucket_name: str,
+        bucket_key: str,
+        wildcard_match: bool = False,
+        aws_conn_id: str | None = "aws_default",
+        from_datetime: str | datetime | None = None,
+        poke_interval: float = 5.0,
+        use_regex: bool = False,
+        region_name: str | None = None,
+        verify: bool | str | None = None,
+        botocore_config: dict | None = None,
+        fail_on_missing: bool = False,  # TODO: Probably should get removed?
+        is_incremental: bool = False,
+        process_namespace: str | None = None,
+        process_name: str | None = None,
+        **kwargs
+    ):
+        # TODO: Add validation for the bucket_name and bucket_key
+        # TODO: Add use_regex and should_check_fn
+        self.bucket_name = bucket_name
+        self.bucket_key = bucket_key
+        self.wildcard_match = wildcard_match
+        self.aws_conn_id = aws_conn_id
+        self.poke_interval = poke_interval
+        self.use_regex = use_regex  # TODO: Check this out
+        self.region_name = region_name
+        self.verify = verify
+        self.botocore_config = botocore_config
+        self.fail_on_missing = fail_on_missing
+        self.is_incremental = is_incremental
+
+        if self.is_incremental:
+            if not (process_namespace and process_name):
+                raise AirflowException(
+                    "If is_incremental is True, process_namespace and process_name cannot be empty"
+            )
+
+        self.process_namespace = process_namespace
+        self.process_name = process_name
+
+        self.from_datetime = datetime.fromisoformat(from_datetime) \
+            if isinstance(from_datetime, str) \
+            else from_datetime
+
+        super().__init__(**kwargs)
+
+    # Will eventually be a serialized instance of this object
+    def serialize(self):
+        return (
+            "airflow.providers.amazon.aws.triggers.s3.S3KeyUpsertedTrigger",
+            {
+                "bucket_name": self.bucket_name,
+                "bucket_key": self.bucket_key,
+                "wildcard_match": self.wildcard_match,
+                "aws_conn_id": self.aws_conn_id,
+                "from_datetime": self.from_datetime.isoformat() \
+                    if isinstance(self.from_datetime, datetime) \
+                    else self.from_datetime,
+                "poke_interval": self.poke_interval,
+                "use_regex": self.use_regex,
+                "region_name": self.region_name,
+                "verify": self.verify,
+                "botocore_config": self.botocore_config,
+            },
+        )
+
+    def store_process_state(self, key, value):
+        # TODO: This would need to actually "POST" something
+        return {
+            "process_namespace": self.process_namespace,
+            "process_name": self.process_name,
+            "process_key": key,
+            "process_value": value,
+        }
+
+    def get_process_state(self, key, default_value) -> datetime:
+        # TODO: This would need to actually "GET" something
+        _ = {
+            "process_namespace": self.process_namespace,
+            "process_name": self.process_name,
+            "process_key": key
+        }
+        return datetime(2025, 1, 1)
+
+    @cached_property
+    def hook(self) -> S3Hook:
+        return S3Hook(
+            aws_conn_id=self.aws_conn_id,
+            region_name=self.region_name,
+            verify=self.verify,
+            config=self.botocore_config,
+        )
+
+    async def run(self):

Review Comment:
   `run` is an async generator (it yields `TriggerEvent`s) but the signature is 
missing the return type annotation used elsewhere in this module (`-> 
AsyncIterator[TriggerEvent]`). Adding it improves type checking and keeps 
consistency with the other triggers in this file.



##########
providers/amazon/src/airflow/providers/amazon/aws/triggers/s3.py:
##########
@@ -18,14 +18,13 @@
 
 import asyncio
 from collections.abc import AsyncIterator
+from datetime import datetime, timezone
 from functools import cached_property
-from typing import TYPE_CHECKING, Any
+from typing import Any
 
+from airflow.exceptions import AirflowException
 from airflow.providers.amazon.aws.hooks.s3 import S3Hook
-from airflow.triggers.base import BaseTrigger, TriggerEvent
-
-if TYPE_CHECKING:
-    from datetime import datetime
+from airflow.triggers.base import BaseEventTrigger, BaseTrigger, TriggerEvent
 
 

Review Comment:
   `BaseEventTrigger` is imported unconditionally. The amazon provider supports 
Airflow < 3.0 (see `providers/amazon/.../triggers/sqs.py` using 
`AIRFLOW_V_3_0_PLUS`), where `BaseEventTrigger` may not exist and this import 
would fail at module import time. Please add the same version-compat 
conditional import here (falling back to `BaseTrigger as BaseEventTrigger`).



##########
providers/amazon/src/airflow/providers/amazon/aws/triggers/s3.py:
##########
@@ -123,6 +122,178 @@ async def run(self) -> AsyncIterator[TriggerEvent]:
             yield TriggerEvent({"status": "error", "message": str(e)})
 
 
+class S3KeyUpsertedTrigger(BaseEventTrigger):
+    """
+    S3KeyUpsertedTrigger is fired as a deferred class with params to run the 
task in the trigger worker when
+    a certain key in an S3 object is changed.
+
+    :param fail_if_missing: if True and key does not exist, an exception will 
be raised
+    """
+
+    def __init__(
+        self,
+        bucket_name: str,
+        bucket_key: str,
+        wildcard_match: bool = False,
+        aws_conn_id: str | None = "aws_default",
+        from_datetime: str | datetime | None = None,
+        poke_interval: float = 5.0,
+        use_regex: bool = False,
+        region_name: str | None = None,
+        verify: bool | str | None = None,
+        botocore_config: dict | None = None,
+        fail_on_missing: bool = False,  # TODO: Probably should get removed?
+        is_incremental: bool = False,
+        process_namespace: str | None = None,
+        process_name: str | None = None,
+        **kwargs
+    ):
+        # TODO: Add validation for the bucket_name and bucket_key
+        # TODO: Add use_regex and should_check_fn
+        self.bucket_name = bucket_name
+        self.bucket_key = bucket_key
+        self.wildcard_match = wildcard_match
+        self.aws_conn_id = aws_conn_id
+        self.poke_interval = poke_interval
+        self.use_regex = use_regex  # TODO: Check this out
+        self.region_name = region_name
+        self.verify = verify
+        self.botocore_config = botocore_config
+        self.fail_on_missing = fail_on_missing
+        self.is_incremental = is_incremental
+
+        if self.is_incremental:
+            if not (process_namespace and process_name):
+                raise AirflowException(
+                    "If is_incremental is True, process_namespace and 
process_name cannot be empty"
+            )
+
+        self.process_namespace = process_namespace
+        self.process_name = process_name
+
+        self.from_datetime = datetime.fromisoformat(from_datetime) \
+            if isinstance(from_datetime, str) \
+            else from_datetime
+
+        super().__init__(**kwargs)
+
+    # Will eventually be a serialized instance of this object
+    def serialize(self):
+        return (
+            "airflow.providers.amazon.aws.triggers.s3.S3KeyUpsertedTrigger",
+            {
+                "bucket_name": self.bucket_name,
+                "bucket_key": self.bucket_key,
+                "wildcard_match": self.wildcard_match,
+                "aws_conn_id": self.aws_conn_id,
+                "from_datetime": self.from_datetime.isoformat() \
+                    if isinstance(self.from_datetime, datetime) \
+                    else self.from_datetime,
+                "poke_interval": self.poke_interval,
+                "use_regex": self.use_regex,
+                "region_name": self.region_name,
+                "verify": self.verify,
+                "botocore_config": self.botocore_config,
+            },
+        )
+
+    def store_process_state(self, key, value):
+        # TODO: This would need to actually "POST" something
+        return {
+            "process_namespace": self.process_namespace,
+            "process_name": self.process_name,
+            "process_key": key,
+            "process_value": value,
+        }
+
+    def get_process_state(self, key, default_value) -> datetime:
+        # TODO: This would need to actually "GET" something
+        _ = {
+            "process_namespace": self.process_namespace,
+            "process_name": self.process_name,
+            "process_key": key
+        }
+        return datetime(2025, 1, 1)
+
+    @cached_property
+    def hook(self) -> S3Hook:
+        return S3Hook(
+            aws_conn_id=self.aws_conn_id,
+            region_name=self.region_name,
+            verify=self.verify,
+            config=self.botocore_config,
+        )
+
+    async def run(self):
+        try:
+            async with await self.hook.get_async_conn() as client:
+                while True:
+                    # Check to see if the key exists
+                    if await self.hook.check_key_async(
+                        client=client,
+                        bucket=self.bucket_name,
+                        bucket_keys=self.bucket_key,
+                        wildcard_match=self.wildcard_match,
+                        # not including regex, as it's not available in 
list_keys
+                    ):
+                        if self.is_incremental:
+                            from_datetime: datetime = self.get_process_state(
+                                key="last_check_datetime",
+                                default_value=self.from_datetime
+                            )
+                        else:
+                            # If it's not incremental, then it's always going 
to load using the
+                            # self.from_datetime as the lower bound
+                            from_datetime = self.from_datetime
+
+                        # The goal here is to be safe and eliminate the risk 
of potentially missing a file
+                        # that's landed. With this approach, a timestamp is 
actually captured before the
+                        # operation takes place. This way, there isn't the 
possibility of a gap between the
+                        # time that the bucket was last checked for changes 
and the time a new timestamp was
+                        # captured
+                        _safe_to_datetime = datetime.now(timezone.utc)
+
+                        # If they do, then get those keys (and information)
+                        keys_changed: list = self.hook.list_keys(
+                            bucket_name=self.bucket_name,
+                            prefix=self.bucket_key,
+                            from_datetime=from_datetime,
+                            to_datetime=_safe_to_datetime,
+                            apply_wildcard=self.wildcard_match,  # TODO: 
Change this
+                        )
+
+                        # Regardless if there are or are not keys that have 
changed, last_activity_time
+                        # should still be updated
+                        if self.is_incremental:
+                            
self.store_process_state(key="last_check_datetime", value=_safe_to_datetime)
+
+                        # TODO: Remove this
+                        # This will eventually need to go, but maintains 
parity with pre-persistence logic
+                        self.from_datetime = _safe_to_datetime
+
+                        if len(keys_changed) == 0:
+                            self.log.info("Sleeping for %s seconds", 
self.poke_interval)
+                            await asyncio.sleep(self.poke_interval)
+                            continue
+
+                        # Eventually, we'll want to return more of a rich 
payload, such that it can be used
+                        # by downstream Tasks
+                        for key in keys_changed:
+                            yield TriggerEvent({
+                                "status": "success",
+                                "key": key,
+                                # "from_datetime": 
self.from_datetime.isoformat(),
+                            })
+
+                        return  # Since we are persisting state, we can now 
return
+

Review Comment:
   The PR description mentions removing the `return` to avoid wiping state, but
the trigger still `return`s immediately after yielding events. If this trigger
is intended for continuous asset watching, returning here will stop the trigger
run and may re-introduce the looping/state-reset behavior, depending on how it
is scheduled. Please reconcile the intended lifecycle (single-shot vs.
continuous) with the implementation.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to