agomez-etsy opened a new issue, #33485:
URL: https://github.com/apache/airflow/issues/33485

   ### Apache Airflow version
   
   Other Airflow 2 version (please specify below)
   
   ### What happened
   
   Airflow version: **2.5.3**
   
   Related to this comment from @vchiapaikeo: 
https://github.com/apache/airflow/pull/33172#issuecomment-1677501450
   
   A couple of minutes after midnight UTC, when hundreds of DAGs are kicked off, we noticed our triggerer replicas consistently failing liveness probe checks and restarting.
   
   Further profiling revealed that the triggerer's sync loop hangs for several minutes when thousands of triggers are running simultaneously, specifically while [bulk fetching triggers](https://github.com/apache/airflow/blob/v2-5-test/airflow/jobs/triggerer_job.py#L398). This causes the triggerer to miss heartbeats and eventually get restarted by k8s.
   
   With profiling still enabled, we profiled the triggerer while the sync loop was hanging. The output, sorted by cumulative time, was:
   
   ```
      ncalls  tottime  percall  cumtime  percall filename:lineno(function)
           1    0.000    0.000   29.928   29.928 /home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/query.py:2757(all)
           1    0.000    0.000   29.923   29.923 /home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/engine/result.py:1468(all)
           1    0.000    0.000   29.923   29.923 /home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/engine/result.py:395(_allrows)
           1    0.000    0.000   29.923   29.923 /home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/engine/result.py:1388(_fetchall_impl)
           1    0.000    0.000   29.923   29.923 /home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/engine/result.py:1808(_fetchall_impl)
           2    0.000    0.000   29.922   14.961 /home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/loading.py:135(chunks)
           1    0.000    0.000   29.921   29.921 /home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/engine/result.py:390(_raw_all_rows)
           1    0.001    0.001   29.921   29.921 /home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/engine/result.py:393(<listcomp>)
         125    0.000    0.000   29.919    0.239 /home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/sql/type_api.py:1711(process)
         125    0.002    0.000   29.915    0.239 /home/airflow/.local/lib/python3.10/site-packages/airflow/utils/sqlalchemy.py:146(process_result_value)
         125    0.001    0.000   29.909    0.239 /home/airflow/.local/lib/python3.10/site-packages/airflow/utils/sqlalchemy.py:122(db_supports_json)
         125    0.001    0.000   29.908    0.239 /home/airflow/.local/lib/python3.10/site-packages/airflow/configuration.py:562(get)
         125    0.000    0.000   29.907    0.239 /home/airflow/.local/lib/python3.10/site-packages/airflow/configuration.py:732(_get_environment_variables)
         125    0.002    0.000   29.907    0.239 /home/airflow/.local/lib/python3.10/site-packages/airflow/configuration.py:478(_get_env_var_option)
         125    0.002    0.000   29.902    0.239 /home/airflow/.local/lib/python3.10/site-packages/airflow/configuration.py:103(run_command)
         125    0.001    0.000   29.786    0.238 /usr/local/lib/python3.10/subprocess.py:1110(communicate)
         125    0.006    0.000   29.785    0.238 /usr/local/lib/python3.10/subprocess.py:1952(_communicate)
         250    0.003    0.000   29.762    0.119 /usr/local/lib/python3.10/selectors.py:403(select)
         250   29.758    0.119   29.758    0.119 {method 'poll' of 'select.poll' objects}
         125    0.002    0.000    0.100    0.001 /usr/local/lib/python3.10/subprocess.py:758(__init__)
         125    0.004    0.000    0.094    0.001 /usr/local/lib/python3.10/subprocess.py:1687(_execute_child)
   ```
   
   This shows that Airflow spawns a subprocess for every fetched row (125 calls to `run_command` in this sample). Waiting on those subprocesses accounts for almost all of the ~30 seconds spent in `Query.all()`.
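The table above appears to be standard `cProfile`/`pstats` output sorted by cumulative time. The issue does not say how the reporter collected it. Below is a minimal, stdlib-only sketch of capturing a profile of the same shape. `fetch_rows` is a hypothetical stand-in for the bulk trigger fetch that, as described here, starts one subprocess per row.

```python
import cProfile
import io
import pstats
import subprocess


def fetch_rows(n: int) -> list[str]:
    """Hypothetical stand-in for the bulk fetch: one subprocess per row."""
    rows = []
    for _ in range(n):
        # Each row starts a short-lived child process, like a *_CMD config lookup.
        out = subprocess.run(["echo", "row"], capture_output=True, text=True, check=True)
        rows.append(out.stdout.strip())
    return rows


profiler = cProfile.Profile()
profiler.enable()
rows = fetch_rows(25)
profiler.disable()

# Render the 15 most expensive entries, ordered by cumulative time, into a string.
buf = io.StringIO()
pstats.Stats(profiler, stream=buf).sort_stats("cumulative").print_stats(15)
report = buf.getvalue()
print(report)
```

As the row count grows, the `subprocess` entries (`communicate`, `poll`) should increasingly dominate the cumulative column.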
   
   We found that when the resulting rows are unmarshaled into the Trigger model, the [kwargs column](https://github.com/apache/airflow/blob/v2-5-test/airflow/models/trigger.py#L57) (`ExtendedJSON`) runs [process_result_value](https://github.com/apache/airflow/blob/v2-5-test/airflow/utils/sqlalchemy.py#L146) on each row. That method reads the `SQL_ALCHEMY_CONN` configuration to determine whether the database supports JSON, and parses `kwargs` accordingly. However, we define `SQL_ALCHEMY_CONN_CMD` rather than `SQL_ALCHEMY_CONN`, so every configuration read runs that command, and the sync loop spawns a new subprocess for every row ([here](https://github.com/apache/airflow/blob/v2-5-test/airflow/configuration.py#L485-L488)).
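The effect can be reproduced without Airflow. The sketch below is a simplified model, not Airflow's code. `ConfigStub` is a hypothetical stand-in for the config lookup. The JSON check is assumed to be a prefix test on the connection string that treats MSSQL as the only backend without native JSON. Because the lookup runs once per deserialized value, a `_CMD`-style option starts one subprocess per row.

```python
import json
import subprocess


class ConfigStub:
    """Hypothetical stand-in for the config lookup (not Airflow's API).

    A plain SQL_ALCHEMY_CONN is a dict read; SQL_ALCHEMY_CONN_CMD re-runs the
    command on every call, which is the behaviour described in this issue.
    """

    def __init__(self, env: dict[str, str]) -> None:
        self.env = env
        self.spawns = 0  # subprocesses started so far

    def sql_alchemy_conn(self) -> str:
        if "SQL_ALCHEMY_CONN" in self.env:
            return self.env["SQL_ALCHEMY_CONN"]
        self.spawns += 1
        out = subprocess.run(
            self.env["SQL_ALCHEMY_CONN_CMD"],
            shell=True,
            capture_output=True,
            text=True,
            check=True,
        )
        return out.stdout.strip()


def process_result_value(value, conf: ConfigStub):
    # Assumed check: every backend except MSSQL hands back JSON natively.
    supports_json = not conf.sql_alchemy_conn().startswith("mssql")
    return value if supports_json else json.loads(value)


rows = [{"poke_interval": i} for i in range(125)]

with_cmd = ConfigStub({"SQL_ALCHEMY_CONN_CMD": "echo postgresql://user:pass@db/airflow"})
decoded = [process_result_value(r, with_cmd) for r in rows]
print(with_cmd.spawns)  # 125: one subprocess per row

plain = ConfigStub({"SQL_ALCHEMY_CONN": "postgresql://user:pass@db/airflow"})
decoded_plain = [process_result_value(r, plain) for r in rows]
print(plain.spawns)  # 0: no subprocess at all
```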
   
   We worked around it by setting `SQL_ALCHEMY_CONN` instead of `SQL_ALCHEMY_CONN_CMD`, since that only requires reading an environment variable rather than spawning a new subprocess.
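If a command-based option is still preferred, one middle ground is to run the command once at container start and export the result as the plain variable before launching the triggerer. The helper below is hypothetical and not part of Airflow. It assumes the Airflow 2.3+ `[database]` section names, i.e. `AIRFLOW__DATABASE__SQL_ALCHEMY_CONN` and `AIRFLOW__DATABASE__SQL_ALCHEMY_CONN_CMD`.

```python
import subprocess

CONN = "AIRFLOW__DATABASE__SQL_ALCHEMY_CONN"
CONN_CMD = CONN + "_CMD"


def resolve_conn_cmd_once(env: dict[str, str]) -> dict[str, str]:
    """Return a copy of env with the connection command evaluated once.

    Hypothetical helper: if only the *_CMD variable is set, run it a single
    time, store its output under the plain name, and drop the *_CMD entry so
    the triggerer never needs to spawn it again.
    """
    resolved = dict(env)
    cmd = resolved.pop(CONN_CMD, None)
    if cmd is not None and CONN not in resolved:
        out = subprocess.run(cmd, shell=True, capture_output=True, text=True, check=True)
        resolved[CONN] = out.stdout.strip()
    return resolved


env = resolve_conn_cmd_once({CONN_CMD: "echo postgresql://user:pass@db/airflow"})
print(env)
```

In an entrypoint, this could be followed by something like `os.execvpe("airflow", ["airflow", "triggerer"], resolve_conn_cmd_once(dict(os.environ)))`. The trade-off is that the connection secret then sits in the process environment for the lifetime of the pod, which is often what `_CMD` options are used to avoid.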
   
   ### What you think should happen instead
   
   The Trigger model should cache either the `SQL_ALCHEMY_CONN` value or the result of the [db_supports_json](https://github.com/apache/airflow/blob/v2-5-stable/airflow/utils/sqlalchemy.py#L122) property, so that the connection command is not re-run for every row.
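Below is a minimal sketch of the caching idea. It assumes the database backend cannot change while the process is running. The JSON-support check is wrapped in `functools.lru_cache`, so the expensive lookup runs once per process instead of once per row. `read_sql_alchemy_conn` and the `lookups` counter are hypothetical. The MSSQL prefix test is an assumption about how the real property decides.

```python
import functools
import json
import subprocess

lookups = 0  # how many times the connection command actually ran


def read_sql_alchemy_conn() -> str:
    """Hypothetical expensive lookup that runs a *_CMD-style shell command."""
    global lookups
    lookups += 1
    out = subprocess.run(
        "echo postgresql://user:pass@db/airflow",
        shell=True,
        capture_output=True,
        text=True,
        check=True,
    )
    return out.stdout.strip()


@functools.lru_cache(maxsize=None)
def db_supports_json() -> bool:
    # First call runs the command; later calls return the cached bool.
    return not read_sql_alchemy_conn().startswith("mssql")


def process_result_value(value):
    # Still called once per row, but now costs a cache hit instead of a subprocess.
    return value if db_supports_json() else json.loads(value)


rows = [{"poke_interval": i} for i in range(125)]
decoded = [process_result_value(r) for r in rows]
print(lookups)  # 1
```

Caching at this level only fixes this one call site. Caching resolved `_CMD` values in the configuration layer instead would help every caller, but it would also stop picking up rotated credentials until the process restarts.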
   
   ### How to reproduce
   
   Simultaneously kick off hundreds of DAGs, each with at least a few deferrable operators, while using `SQL_ALCHEMY_CONN_CMD` instead of `SQL_ALCHEMY_CONN`.
   
   ### Operating System
   
   Debian GNU/Linux 11 (bullseye)
   
   ### Versions of Apache Airflow Providers
   
   apache-airflow-providers-airbyte==3.2.0
   apache-airflow-providers-alibaba==2.2.0
   apache-airflow-providers-amazon==7.3.0
   apache-airflow-providers-apache-beam==4.3.0
   apache-airflow-providers-apache-cassandra==3.1.1
   apache-airflow-providers-apache-drill==2.3.1
   apache-airflow-providers-apache-druid==3.3.1
   apache-airflow-providers-apache-hdfs==3.2.0
   apache-airflow-providers-apache-hive==5.1.3
   apache-airflow-providers-apache-kylin==3.1.0
   apache-airflow-providers-apache-livy==3.3.0
   apache-airflow-providers-apache-pig==4.0.0
   apache-airflow-providers-apache-pinot==4.0.1
   apache-airflow-providers-apache-spark==4.0.0
   apache-airflow-providers-apache-sqoop==3.1.1
   apache-airflow-providers-arangodb==2.1.1
   apache-airflow-providers-asana==2.1.0
   apache-airflow-providers-atlassian-jira==2.0.1
   apache-airflow-providers-celery==3.1.0
   apache-airflow-providers-cloudant==3.1.0
   apache-airflow-providers-cncf-kubernetes==5.2.2
   apache-airflow-providers-common-sql==1.3.4
   apache-airflow-providers-databricks==4.0.0
   apache-airflow-providers-datadog==3.1.0
   apache-airflow-providers-dbt-cloud==3.1.0
   apache-airflow-providers-dingding==3.1.0
   apache-airflow-providers-discord==3.1.0
   apache-airflow-providers-docker==3.5.1
   apache-airflow-providers-elasticsearch==4.4.0
   apache-airflow-providers-exasol==4.1.3
   apache-airflow-providers-facebook==3.1.0
   apache-airflow-providers-ftp==3.3.1
   apache-airflow-providers-github==2.2.1
   apache-airflow-providers-google==8.11.0
   apache-airflow-providers-grpc==3.1.0
   apache-airflow-providers-hashicorp==3.3.0
   apache-airflow-providers-http==4.2.0
   apache-airflow-providers-imap==3.1.1
   apache-airflow-providers-influxdb==2.1.0
   apache-airflow-providers-jdbc==3.3.0
   apache-airflow-providers-jenkins==3.2.0
   apache-airflow-providers-microsoft-azure==5.2.1
   apache-airflow-providers-microsoft-mssql==3.3.2
   apache-airflow-providers-microsoft-psrp==2.2.0
   apache-airflow-providers-microsoft-winrm==3.1.1
   apache-airflow-providers-mongo==3.1.1
   apache-airflow-providers-mysql==4.0.2
   apache-airflow-providers-neo4j==3.2.1
   apache-airflow-providers-odbc==3.2.1
   apache-airflow-providers-openfaas==3.1.0
   apache-airflow-providers-opsgenie==5.0.0
   apache-airflow-providers-oracle==3.6.0
   apache-airflow-providers-pagerduty==3.1.0
   apache-airflow-providers-papermill==3.1.1
   apache-airflow-providers-plexus==3.1.0
   apache-airflow-providers-postgres==5.4.0
   apache-airflow-providers-presto==4.2.2
   apache-airflow-providers-qubole==3.3.1
   apache-airflow-providers-redis==3.1.0
   apache-airflow-providers-salesforce==5.3.0
   apache-airflow-providers-samba==4.1.0
   apache-airflow-providers-segment==3.1.0
   apache-airflow-providers-sendgrid==3.1.0
   apache-airflow-providers-sftp==4.2.4
   apache-airflow-providers-singularity==3.1.0
   apache-airflow-providers-slack==7.2.0
   apache-airflow-providers-snowflake==4.0.4
   apache-airflow-providers-sqlite==3.3.1
   apache-airflow-providers-ssh==3.5.0
   apache-airflow-providers-tableau==4.1.0
   apache-airflow-providers-tabular==1.1.0
   apache-airflow-providers-telegram==4.0.0
   apache-airflow-providers-trino==4.3.2
   apache-airflow-providers-vertica==3.3.1
   apache-airflow-providers-yandex==3.3.0
   apache-airflow-providers-zendesk==4.2.0
   
   
   ### Deployment
   
   Other 3rd-party Helm chart
   
   ### Deployment details
   
   Chart based on the official helm chart. Airflow running on Google Kubernetes 
Engine (GKE) using `KubernetesExecutor`.
   
   ### Anything else
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [X] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
