[ 
https://issues.apache.org/jira/browse/AIRFLOW-3964?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yingbo Wang updated AIRFLOW-3964:
---------------------------------
    Description: 
h2. Problem
h3. Airflow Sensor:

Sensors are a certain type of operator that will keep running until a certain 
criterion is met. Examples include a specific file landing in HDFS or S3, a 
partition appearing in Hive, or a specific time of the day. Sensors are derived 
from BaseSensorOperator and run a poke method at a specified poke_interval 
until it returns True.

Airflow Sensor duplication is a normal problem for large scale airflow project. 
There are duplicated partitions needing to be detected from same/different DAG. 
In Airbnb there are 88 boxes running four different types of sensors everyday. 
The number of running sensor tasks ranges from 8k to 16k, which takes great 
amount of resources. Although Airflow team had redirected all sensors to a 
specific queue to allocate relatively minor resource, there is still large room 
to reduce the number of workers and relief DB pressure by optimizing the sensor 
mechanism.

Existing sensor implementation creates an identical task for any sensor task 
with specific dag_id, task_id and execution_date. This task is responsible of 
keeping querying DB until the specified partitions exists. Even if two tasks 
are waiting for same partition in DB, they are creating two connections with 
the DB and checking the status in two separate processes. In one hand, DB need 
to run duplicate jobs in multiple processes which will take both cpu and memory 
resources. At the same time, Airflow need to maintain a process for each sensor 
to query and wait for the partition/table to be created.

To optimize the sensor, add a hashcode for each partition decided by the set of 
(conn_id, schema, table, partition). Add dependencies between qualified sensors 
and partitions. Use a single entry for each sensor to query DB and avoid 
duplication in Airflow.

Add a sensor scheduling part in scheduler to:
 # Check partitions status to enable downstream sensor success and trigger 
sensor downstream tasks
 # Selecting all pending partitions in DB including:
 # New coming partition sensor request
 # Existing sensor request that is still waiting


 # With a time interval:
 # Create the set of tasks for sensing all pending partitions.
 # Kill previous sensor tasks


 # For the task mentioned in 3: Each task should check many partitions. We can 
introduce the sensor chunk number here for a maximum number of partitions one 
task should handle. The sensors keep updating partition status in Airflow DB 
during running

  was:
h2. Problem
h3. Airflow Sensor:

Sensors are a certain type of operator that will keep running until a certain 
criterion is met. Examples include a specific file landing in HDFS or S3, a 
partition appearing in Hive, or a specific time of the day. Sensors are derived 
from BaseSensorOperator and run a poke method at a specified poke_interval 
until it returns True.

Airflow Sensor duplication is a normal problem for large scale airflow project. 
There are duplicated partitions needing to be detected from same/different DAG. 
In Airbnb there are 88 boxes running four different types of sensors everyday. 
The number of running sensor tasks ranges from 8k to 16k, which takes great 
amount of resources. Although Airflow team had redirected all sensors to a 
specific queue to allocate relatively minor resource, there is still large room 
to reduce the number of workers and relief DB pressure by optimizing the sensor 
mechanism.

Existing sensor implementation creates an identical task for any sensor task 
with specific dag_id, task_id and execution_date. This task is responsible of 
keeping querying DB until the specified partitions exists. Even if two tasks 
are waiting for same partition in DB, they are creating two connections with 
the DB and checking the status in two separate processes. In one hand, DB need 
to run duplicate jobs in multiple processes which will take both cpu and memory 
resources. At the same time, Airflow need to maintain a process for each sensor 
to query and wait for the partition/table to be created. 

Airflow Scheduler: 

Airflow scheduler is responsible of parsing DAGs and scheduling airflow tasks. 
The jobs.process_file function process all python file that have “airflow” and 
“Dag”:
 # Execute the file and look for DAG objects in the namespace.
 # Pickle the DAG and save it to the DB (if necessary).
 # For each DAG, see what tasks should run and create appropriate task 
instances in the DB.
 # Record any errors importing the file into ORM
 # Kill (in ORM) any task instances belonging to the DAGs that haven't issued a 
heartbeat in a while.

This function returns a list of SimpleDag objects that represent the DAGs found 
in the file

There are some issues with existing Airflow scheduler:
 # Multiple parsing: Scheduler will parse a DAG file and use the information to 
kicked off dag_run and check a DAG dependency to schedule runnable tasks. 
However the dependency checking does not need to be parsed multiple times since 
the kicking off of dag_run got dependencies of all tasks and should be able to 
use these information later. Scheduler is over parsing information now.
 # Scheduler/worker tasks inconsistency. This is a problem related with the 
first one. Since the scheduler keeps parsing the DAG file and always reflects 
the latest DAG definition. It is possible to generate inconsistency issue if a 
DAG was modified before its existing dag_run completed. When a dag_run instance 
was kicked off by an older version of DAG, the worker should be on the same 
page with older DAG version by syncing with Airflow DB. However the scheduler 
is alway parsing the latest DAG version, which cause an inconsistency between 
scheduler and workers. It is a know issue in Airbnb airflow project that some 
tasks were stuck for this reason. 
 # User rerun problem. Airflow rerun has similar issue as problem 2. There are 
also some DAG generated dynamically and have different list of tasks for 
different execution_date. But some DAG are improved and may want to backfill 
new update for the dag_run history. It is reasonable to allow users decide if 
they want to pick up an updated dag_run or repeat from older version.
 # Runtime error of one dag_run instance can affect multiple DAGs and all their 
dag_runs. The jobs.process_file function works on a file path and processes a 
set of DAGs. The company have experienced multiple times of incident caused by 
an invalid value in one dag_run at runtime. The runtime error broke all DAGs 
and their dag_runs that belong to the same file path.
 # Airflow UI have trouble in showing large DAG since the Airflow webserver has 
to parse DAG and some complicated DAG will take really long time. To pick up 
any update such as DAG modification and new DAGs, the webserver has to be 
restarted periodically. With more and more DAGs added to the cluster, Airflow 
webserver take much longer time for each round of parsing and same as problem 
4, the process can be easily broken and result in a UI error.


> Reduce duplicated tasks and optimize with scheduler embedded sensor 
> --------------------------------------------------------------------
>
>                 Key: AIRFLOW-3964
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-3964
>             Project: Apache Airflow
>          Issue Type: Improvement
>          Components: dependencies, operators, scheduler
>            Reporter: Yingbo Wang
>            Assignee: Yingbo Wang
>            Priority: Critical
>
> h2. Problem
> h3. Airflow Sensor:
> Sensors are a certain type of operator that will keep running until a certain 
> criterion is met. Examples include a specific file landing in HDFS or S3, a 
> partition appearing in Hive, or a specific time of the day. Sensors are 
> derived from BaseSensorOperator and run a poke method at a specified 
> poke_interval until it returns True.
> Airflow Sensor duplication is a normal problem for large scale airflow 
> project. There are duplicated partitions needing to be detected from 
> same/different DAG. In Airbnb there are 88 boxes running four different types 
> of sensors everyday. The number of running sensor tasks ranges from 8k to 
> 16k, which takes great amount of resources. Although Airflow team had 
> redirected all sensors to a specific queue to allocate relatively minor 
> resource, there is still large room to reduce the number of workers and 
> relief DB pressure by optimizing the sensor mechanism.
> Existing sensor implementation creates an identical task for any sensor task 
> with specific dag_id, task_id and execution_date. This task is responsible of 
> keeping querying DB until the specified partitions exists. Even if two tasks 
> are waiting for same partition in DB, they are creating two connections with 
> the DB and checking the status in two separate processes. In one hand, DB 
> need to run duplicate jobs in multiple processes which will take both cpu and 
> memory resources. At the same time, Airflow need to maintain a process for 
> each sensor to query and wait for the partition/table to be created.
> To optimize the sensor, add a hashcode for each partition decided by the set 
> of (conn_id, schema, table, partition). Add dependencies between qualified 
> sensors and partitions. Use a single entry for each sensor to query DB and 
> avoid duplication in Airflow.
> Add a sensor scheduling part in scheduler to:
>  # Check partitions status to enable downstream sensor success and trigger 
> sensor downstream tasks
>  # Selecting all pending partitions in DB including:
>  # New coming partition sensor request
>  # Existing sensor request that is still waiting
>  # With a time interval:
>  # Create the set of tasks for sensing all pending partitions.
>  # Kill previous sensor tasks
>  # For the task mentioned in 3: Each task should check many partitions. We 
> can introduce the sensor chunk number here for a maximum number of partitions 
> one task should handle. The sensors keep updating partition status in Airflow 
> DB during running



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to