jroachgolf84 opened a new pull request, #65103:
URL: https://github.com/apache/airflow/pull/65103

   ## Description
   
   With AIP-103 adding an interface to persist state for Assets, AIP-93 shifts 
more towards implementing a way to use this state when building 
`BaseEventTrigger` for "Asset-watching".
   
   Currently, any Trigger defined for "Asset watching" is Asset-unaware. This 
PR adds the ability to make these Trigger Asset-aware. The new pattern 
implemented here allows for an `Asset` object to be passed to the 
`AssetWatcher` (via the `trigger`). This makes much more sense; first, I define 
an Asset. Then, I define a way to "watch" that Asset. 
   
   ```python
   from airflow.sdk import Asset, AssetWatcher
   from plugins.triggers import GenericEventTrigger
   
   ...  # Other imports
   
   # Defining the Asset first
   random_integer_asset = Asset(name="random_integer_asset")
   
   random_integer_watcher = AssetWatcher(
       name="random_integer_watcher",
       asset=random_integer_asset,  # Passing the Asset to the AssetWatcher
       trigger=GenericEventTrigger(
           target_number=7,
           waiter_delay=30
       )
   )
   
   with DAG(
       dag_id="random_integer_downstream_dag",
       start_date=datetime(2026, 1, 1),
       schedule=[random_integer_asset]
   ) as dag:
       ...
   
   ```
   
   ## Future State
   
   Eventually, I'd love to get to the following syntax, which is SIGNIFICANTLY 
more intuitive for DAG authors.
   
   ```python
   from airflow.sdk import asset, DAG
   
   ...  # Other imports
   
   @asset.watcher(
       name="random_integer_asset_watcher",
       waiter_delay=30
   )
   def random_integer_asset(target_number):
       generated_number: int = random.randint(0, 10)
       
       # Anything "truthy" yields a TriggerEvent and breaks execution 
       if generated_number == target_number:
           return {"generated_number": generated_number}
   
   
   with DAG(
       dag_id="random_integer_downstream_dag",
       start_date=datetime(2026, 1, 1),
       schedule=[random_integer_asset(7)]
   ) as dag:
       ...
   
   ```
   
   ## Testing
   
   Note, no unit tests have been implemented yet; this PR is just providing a 
forum for discussion. Tests will be added after discussion.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to