amoghrajesh opened a new pull request, #53870:
URL: https://github.com/apache/airflow/pull/53870

   <!--
    Licensed to the Apache Software Foundation (ASF) under one
    or more contributor license agreements.  See the NOTICE file
    distributed with this work for additional information
    regarding copyright ownership.  The ASF licenses this file
    to you under the Apache License, Version 2.0 (the
    "License"); you may not use this file except in compliance
    with the License.  You may obtain a copy of the License at
   
      http://www.apache.org/licenses/LICENSE-2.0
   
    Unless required by applicable law or agreed to in writing,
    software distributed under the License is distributed on an
    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    KIND, either express or implied.  See the License for the
    specific language governing permissions and limitations
    under the License.
    -->
   
   <!--
   Thank you for contributing! Please make sure that your code changes
   are covered with tests. And in case of new features or big changes
   remember to adjust the documentation.
   
   Feel free to ping committers for the review!
   
   In case of an existing issue, reference it using one of the following:
   
   closes: #ISSUE
   related: #ISSUE
   
   How to write a good git commit message:
   http://chris.beams.io/posts/git-commit/
   -->
   
   
   closes: https://github.com/apache/airflow/issues/49404
   
## Motivation
As part of the effort to make the Task SDK the authoring interface for DAG authors in Airflow 3.1, we need to eliminate parallel implementations and consolidate functionality in the Task SDK. The `models.Connection` class currently has methods (`to_dict`, `from_json`, `as_json`, `get_extra_dejson`) that the SDK `Connection` lacks, creating feature gaps and forcing users to choose between implementations.
   
## How?

These methods have been ported from `airflow.models.connection` to the SDK `Connection`. Long term, the SDK `Connection` will be the way to go.

No breaking changes: same method signatures, same behaviour, same implementation.
   
   ## Testing
   
Validated the behaviour of `models.Connection`, the SDK `Connection`, and their compat layer.
   
### `models.Connection` behaviour
   
   DAG used:
```python
   from datetime import datetime
   from airflow import DAG
   from airflow.decorators import task
   from airflow.models.connection import Connection
   
   with DAG(
       dag_id="test_connection_methods",
       start_date=datetime(2024, 1, 1),
       schedule=None,
       catchup=False,
       tags=["example"],
   ):
   
       @task()
       def test_connection_serialization():
           """Test the new Connection serialization methods."""
           conn = Connection(
               conn_id='test_conn',
               conn_type='postgres',
               host='localhost',
               port=5432,
               login='user',
               password='pass',
               extra='{"ssl": true}'
           )
   
           print(f"Created connection: {conn.conn_id}")
   
           # Test to_dict
           conn_dict = conn.to_dict()
           print(f"to_dict() keys: {list(conn_dict.keys())}")
   
           # Test as_json
           json_str = conn.as_json()
           print(f"as_json() length: {len(json_str)}")
   
           # Test get_extra_dejson
           extra = conn.get_extra_dejson()
           print(f"get_extra_dejson(): {extra}")
   
           # Test from_json round trip
           new_conn = Connection.from_json(json_str)
           print(f"from_json() worked: {new_conn.conn_type}")
   
           print("✅ All methods working!")
           return "success"
   
       test_connection_serialization()
   
   ```
   
<img width="1648" height="730" alt="image" src="https://github.com/user-attachments/assets/7b50b86f-ec93-4aa0-bc9f-073b2ee3d6d7" />
   
   
   Logs:
   ```
   
   [2025-07-29, 15:30:05] INFO - DAG bundles loaded: dags-folder: 
source="airflow.dag_processing.bundles.manager.DagBundlesManager"
   [2025-07-29, 15:30:05] INFO - Filling up the DagBag from 
/files/dags/test_all_sdk_connection_methods.py: 
source="airflow.models.dagbag.DagBag"
   [2025-07-29, 15:30:05] WARNING - Skipping masking for a secret as it's too 
short (<5 chars): source="airflow.sdk.execution_time.secrets_masker"
   [2025-07-29, 15:30:05] INFO - Created connection: test_conn: chan="stdout": 
source="task"
   [2025-07-29, 15:30:05] INFO - to_dict() keys: ['conn_id', 'conn_type', 
'description', 'host', 'login', 'password', 'schema', 'port', 'extra']: 
chan="stdout": source="task"
   [2025-07-29, 15:30:05] INFO - as_json() length: 121: chan="stdout": 
source="task"
   [2025-07-29, 15:30:05] INFO - get_extra_dejson(): {'ssl': True}: 
chan="stdout": source="task"
   [2025-07-29, 15:30:05] INFO - from_json() worked: postgres: chan="stdout": 
source="task"
   [2025-07-29, 15:30:05] INFO - ✅ All methods working!: chan="stdout": 
source="task"
   [2025-07-29, 15:30:05] INFO - Done. Returned value was: success: 
source="airflow.task.operators.airflow.providers.standard.decorators.python._PythonDecoratedOperator"
   [2025-07-29, 15:30:05] INFO - Pushing xcom: 
ti="RuntimeTaskInstance(id=UUID('0198559f-f743-744a-8fa3-845da7f8a790'), 
task_id='test_connection_serialization', dag_id='test_connection_methods', 
run_id='manual__2025-07-29T10:00:03.640806+00:00', try_number=1, 
dag_version_id=UUID('0198559b-106d-79af-af3f-a81bc08070f8'), map_index=-1, 
hostname='66ab7d87f917', context_carrier={}, 
task=<Task(_PythonDecoratedOperator): test_connection_serialization>, 
bundle_instance=LocalDagBundle(name=dags-folder), max_tries=0, 
start_date=datetime.datetime(2025, 7, 29, 10, 0, 4, 348019, 
tzinfo=datetime.timezone.utc), end_date=None, state=<TaskInstanceState.RUNNING: 
'running'>, is_mapped=False, rendered_map_index=None, 
log_url='http://localhost:8080/dags/test_connection_methods/runs/manual__2025-07-29T10%3A00%3A03.640806%2B00%3A00/tasks/test_connection_serialization?try_number=1')":
 source="task"
   ```
   
### SDK `Connection` behaviour
   
   DAG used:
```python
   from datetime import datetime
   from airflow import DAG
   from airflow.decorators import task
   from airflow.sdk import Connection
   
   with DAG(
       dag_id="test_connection_methods",
       start_date=datetime(2024, 1, 1),
       schedule=None,
       catchup=False,
       tags=["example"],
   ):
   
       @task()
       def test_connection_serialization():
           """Test the new Connection serialization methods."""
           conn = Connection(
               conn_id='test_conn',
               conn_type='postgres',
               host='localhost',
               port=5432,
               login='user',
               password='pass',
               extra='{"ssl": true}'
           )
   
           print(f"Created connection: {conn.conn_id}")
   
           # Test to_dict
           conn_dict = conn.to_dict()
           print(f"to_dict() keys: {list(conn_dict.keys())}")
   
           # Test as_json
           json_str = conn.as_json()
           print(f"as_json() length: {len(json_str)}")
   
           # Test get_extra_dejson
           extra = conn.get_extra_dejson()
           print(f"get_extra_dejson(): {extra}")
   
           # Test from_json round trip
           new_conn = Connection.from_json(json_str)
           print(f"from_json() worked: {new_conn.conn_type}")
   
           print("✅ All methods working!")
           return "success"
   
       test_connection_serialization()
   
   ```
   
<img width="1648" height="730" alt="image" src="https://github.com/user-attachments/assets/d9317e7f-6f57-4dce-b3a6-7c392bbc4aef" />
   
   Logs:
   ```
   [2025-07-29, 15:33:33] INFO - DAG bundles loaded: dags-folder: 
source="airflow.dag_processing.bundles.manager.DagBundlesManager"
   [2025-07-29, 15:33:33] INFO - Filling up the DagBag from 
/files/dags/test_all_sdk_connection_methods.py: 
source="airflow.models.dagbag.DagBag"
   [2025-07-29, 15:33:33] INFO - Created connection: test_conn: chan="stdout": 
source="task"
   [2025-07-29, 15:33:33] INFO - to_dict() keys: ['conn_id', 'conn_type', 
'description', 'host', 'login', 'password', 'schema', 'port', 'extra']: 
chan="stdout": source="task"
   [2025-07-29, 15:33:33] INFO - as_json() length: 121: chan="stdout": 
source="task"
   [2025-07-29, 15:33:33] INFO - get_extra_dejson(): {'ssl': True}: 
chan="stdout": source="task"
   [2025-07-29, 15:33:33] INFO - Done. Returned value was: success: 
source="airflow.task.operators.airflow.providers.standard.decorators.python._PythonDecoratedOperator"
   [2025-07-29, 15:33:33] INFO - Pushing xcom: 
ti="RuntimeTaskInstance(id=UUID('019855a3-1ec9-7e54-9c9f-71e2d4ac2d50'), 
task_id='test_connection_serialization', dag_id='test_connection_methods', 
run_id='manual__2025-07-29T10:03:30.349994+00:00', try_number=1, 
dag_version_id=UUID('0198559b-106d-79af-af3f-a81bc08070f8'), map_index=-1, 
hostname='66ab7d87f917', context_carrier={}, 
task=<Task(_PythonDecoratedOperator): test_connection_serialization>, 
bundle_instance=LocalDagBundle(name=dags-folder), max_tries=0, 
start_date=datetime.datetime(2025, 7, 29, 10, 3, 32, 286149, 
tzinfo=datetime.timezone.utc), end_date=None, state=<TaskInstanceState.RUNNING: 
'running'>, is_mapped=False, rendered_map_index=None, 
log_url='http://localhost:8080/dags/test_connection_methods/runs/manual__2025-07-29T10%3A03%3A30.349994%2B00%3A00/tasks/test_connection_serialization?try_number=1')":
 source="task"
   [2025-07-29, 15:33:33] INFO - from_json() worked: postgres: chan="stdout": 
source="task"
   [2025-07-29, 15:33:33] INFO - ✅ All methods working!: chan="stdout": 
source="task"
   ```
   
   
### Compat layer (changes added in the last commit and tested after that)

DAG used: same as the first one above.
<img width="1648" height="900" alt="image" src="https://github.com/user-attachments/assets/8805ed30-f4b0-4edb-9ad3-3bff2c88320a" />
   
   
   Logs:
   
   ```
   [2025-07-29, 15:49:29] INFO - DAG bundles loaded: dags-folder: 
source="airflow.dag_processing.bundles.manager.DagBundlesManager"
   [2025-07-29, 15:49:29] INFO - Filling up the DagBag from 
/files/dags/test_all_sdk_connection_methods.py: 
source="airflow.models.dagbag.DagBag"
   [2025-07-29, 15:49:29] WARNING - Skipping masking for a secret as it's too 
short (<5 chars): source="airflow.sdk.execution_time.secrets_masker"
   [2025-07-29, 15:49:29] INFO - Created connection: test_conn: chan="stdout": 
source="task"
   [2025-07-29, 15:49:29] WARNING - 
/opt/airflow/airflow-core/src/airflow/models/connection.py:534: 
DeprecationWarning: Using Connection.to_dict from `airflow.models` is 
deprecated.Please use `from airflow.sdk import Connection` instead
     warnings.warn(
   : source="py.warnings"
   [2025-07-29, 15:49:29] INFO - to_dict() keys: ['conn_id', 'conn_type', 
'description', 'host', 'login', 'password', 'schema', 'port', 'extra']: 
chan="stdout": source="task"
   [2025-07-29, 15:49:29] WARNING - 
/opt/airflow/airflow-core/src/airflow/models/connection.py:594: 
DeprecationWarning: Using Connection.as_json from `airflow.models` is 
deprecated.Please use `from airflow.sdk import Connection` instead
     warnings.warn(
   : source="py.warnings"
   [2025-07-29, 15:49:29] INFO - as_json() length: 121: chan="stdout": 
source="task"
   [2025-07-29, 15:49:29] INFO - get_extra_dejson(): {'ssl': True}: 
chan="stdout": source="task"
   [2025-07-29, 15:49:29] WARNING - 
/opt/airflow/airflow-core/src/airflow/models/connection.py:566: 
DeprecationWarning: Using Connection.from_json from `airflow.models` is 
deprecated.Please use `from airflow.sdk import Connection` instead
     warnings.warn(
   : source="py.warnings"
   [2025-07-29, 15:49:29] INFO - Done. Returned value was: success: 
source="airflow.task.operators.airflow.providers.standard.decorators.python._PythonDecoratedOperator"
   [2025-07-29, 15:49:29] INFO - Pushing xcom: 
ti="RuntimeTaskInstance(id=UUID('019855b1-b024-7018-966f-0a7b09740b3b'), 
task_id='test_connection_serialization', dag_id='test_connection_methods', 
run_id='manual__2025-07-29T10:19:25.057678+00:00', try_number=1, 
dag_version_id=UUID('0198559b-106d-79af-af3f-a81bc08070f8'), map_index=-1, 
hostname='66ab7d87f917', context_carrier={}, 
task=<Task(_PythonDecoratedOperator): test_connection_serialization>, 
bundle_instance=LocalDagBundle(name=dags-folder), max_tries=0, 
start_date=datetime.datetime(2025, 7, 29, 10, 19, 28, 497687, 
tzinfo=datetime.timezone.utc), end_date=None, state=<TaskInstanceState.RUNNING: 
'running'>, is_mapped=False, rendered_map_index=None, 
log_url='http://localhost:8080/dags/test_connection_methods/runs/manual__2025-07-29T10%3A19%3A25.057678%2B00%3A00/tasks/test_connection_serialization?try_number=1')":
 source="task"
   [2025-07-29, 15:49:29] INFO - from_json() worked: postgres: chan="stdout": 
source="task"
   [2025-07-29, 15:49:29] INFO - ✅ All methods working!: chan="stdout": 
source="task"
   ```
   
   Pay attention to the warnings ^
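The compat-layer pattern behind these warnings can be sketched roughly as follows: the `models`-level method emits a `DeprecationWarning` and then delegates to the SDK implementation. The class names below are illustrative, not the actual Airflow code:

```python
# Hypothetical sketch of a deprecation shim: the legacy class forwards to the
# consolidated implementation while warning callers to migrate.
import warnings


class SdkConnection:
    """Stands in for the Task SDK Connection carrying the real implementation."""

    def to_dict(self) -> dict:
        return {"conn_id": "test_conn"}


class ModelsConnection(SdkConnection):
    """Stands in for the legacy airflow.models Connection compat layer."""

    def to_dict(self) -> dict:
        warnings.warn(
            "Using Connection.to_dict from `airflow.models` is deprecated. "
            "Please use `from airflow.sdk import Connection` instead",
            DeprecationWarning,
            stacklevel=2,  # point the warning at the caller's line
        )
        return super().to_dict()


with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    result = ModelsConnection().to_dict()
```

Callers keep getting the same return value; only the warning is new, which matches the "same behaviour" claim above.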
   
   
   ## What's next?
   
- [ ] Remove the parallel connection paths in models and the SDK
- [ ] `test_connection`?
- [ ] Identify any other deltas and handle them
   
   

