[ 
https://issues.apache.org/jira/browse/AIRFLOW-3964?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Work on AIRFLOW-3964 started by Yingbo Wang.
--------------------------------------------
> Reduce duplicated tasks and optimize with scheduler embedded sensor 
> --------------------------------------------------------------------
>
>                 Key: AIRFLOW-3964
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-3964
>             Project: Apache Airflow
>          Issue Type: Improvement
>          Components: dependencies, operators, scheduler
>            Reporter: Yingbo Wang
>            Assignee: Yingbo Wang
>            Priority: Critical
>
> h2. Problem
> h3. Airflow Sensor:
> Sensors are a certain type of operator that will keep running until a certain 
> criterion is met. Examples include a specific file landing in HDFS or S3, a 
> partition appearing in Hive, or a specific time of the day. Sensors are 
> derived from BaseSensorOperator and run a poke method at a specified 
> poke_interval until it returns True.
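The poke loop described above can be sketched in plain Python. This is a simplified stand-in for what BaseSensorOperator's execute method does, not the actual Airflow code; the injectable clock/sleep parameters are illustration-only so the loop can be exercised without waiting:

```python
import time

def run_sensor(poke, poke_interval=60, timeout=60 * 60 * 24 * 7,
               clock=time.time, sleep=time.sleep):
    """Simplified sketch of a sensor's execute loop: call poke() every
    poke_interval seconds until it returns True or the timeout elapses."""
    started = clock()
    while not poke():
        if clock() - started > timeout:
            # Airflow raises AirflowSensorTimeout here; a plain error
            # stands in for it in this sketch.
            raise RuntimeError("Sensor timed out")
        sleep(poke_interval)
    return True
```

Each running sensor holds a worker slot for the entire duration of this loop, which is why thousands of concurrent sensors are so expensive.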
> Sensor duplication is a common problem for large-scale Airflow projects: the 
> same partitions often need to be detected from the same or different DAGs. At 
> Airbnb, 88 boxes run four different types of sensors every day, and the 
> number of running sensor tasks ranges from 8k to 16k, which consumes a great 
> amount of resources. Although the Airflow team has redirected all sensors to 
> a dedicated queue with relatively minor resources, there is still large room 
> to reduce the number of workers and relieve DB pressure by optimizing the 
> sensor mechanism.
> The existing sensor implementation creates a separate task for each sensor 
> with a specific dag_id, task_id and execution_date. This task is responsible 
> for querying the DB until the specified partition exists. Even if two tasks 
> are waiting for the same partition, they create two connections to the DB and 
> check the status in two separate processes. On one hand, the DB has to run 
> duplicate queries in multiple processes, which consumes both CPU and memory; 
> on the other, Airflow has to maintain a process for each sensor to query and 
> wait for the partition/table to be created.
> To optimize sensors, add a hashcode for each partition, derived from the 
> tuple (conn_id, schema, table, partition). Add dependencies between qualified 
> sensors and partitions. Use a single entry per unique partition to query the 
> DB and avoid duplication in Airflow.
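The deduplication key could be derived along these lines (a minimal sketch, assuming the hashcode is a stable digest of the four fields named above; the function name is hypothetical):

```python
import hashlib

def partition_hashcode(conn_id, schema, table, partition):
    """Derive a stable hashcode from (conn_id, schema, table, partition).
    Two sensor tasks waiting for the same partition map to the same key,
    so the DB needs to be polled only once for both."""
    raw = "|".join([conn_id, schema, table, partition])
    return hashlib.sha1(raw.encode("utf-8")).hexdigest()
```

Because the key is content-derived rather than based on (dag_id, task_id, execution_date), sensors from unrelated DAGs that wait for the same partition collapse into a single pending entry.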
> Add a sensor scheduling part in the scheduler to:
>  # Check partition status to mark the corresponding sensors as successful and 
> trigger their downstream tasks
>  # Select all pending partitions in the DB, including:
>  ## Newly arrived partition sensor requests
>  ## Existing sensor requests that are still waiting
>  # At a fixed time interval:
>  ## Create the set of tasks for sensing all pending partitions.
>  ## Kill the previous sensor tasks.
>  # For the tasks mentioned in 3: each task should check many partitions. We 
> can introduce a "sensor chunk number" here as the maximum number of 
> partitions one task should handle. The sensors keep updating partition status 
> in the Airflow DB while running.
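The chunking in step 4 could look like this (a sketch only; the function name and the chunk size are illustrative, not from the issue):

```python
def chunk_partitions(pending, chunk_size):
    """Group pending partition keys into chunks so that one sensing task
    handles at most chunk_size partitions (the "sensor chunk number")."""
    return [pending[i:i + chunk_size]
            for i in range(0, len(pending), chunk_size)]
```

With a chunk size of, say, 100, the 8k-16k concurrent sensor tasks mentioned above would be replaced by on the order of 80-160 sensing tasks per scheduling interval.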



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)