[
https://issues.apache.org/jira/browse/AIRFLOW-3964?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Work on AIRFLOW-3964 started by Yingbo Wang.
--------------------------------------------
> Reduce duplicated tasks and optimize with scheduler embedded sensor
> --------------------------------------------------------------------
>
> Key: AIRFLOW-3964
> URL: https://issues.apache.org/jira/browse/AIRFLOW-3964
> Project: Apache Airflow
> Issue Type: Improvement
> Components: dependencies, operators, scheduler
> Reporter: Yingbo Wang
> Assignee: Yingbo Wang
> Priority: Critical
>
> h2. Problem
> h3. Airflow Sensor:
> Sensors are a certain type of operator that will keep running until a certain
> criterion is met. Examples include a specific file landing in HDFS or S3, a
> partition appearing in Hive, or a specific time of the day. Sensors are
> derived from BaseSensorOperator and run a poke method at a specified
> poke_interval until it returns True.
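> The poke loop described above can be sketched in plain Python (a minimal,
> illustrative sketch of the sensor mechanism, not the actual
> BaseSensorOperator code; `run_sensor` and its parameters are assumptions):

```python
import time

def run_sensor(poke, poke_interval=60, timeout=300):
    """Illustrative sensor loop: call poke() until it returns True,
    sleeping poke_interval seconds between attempts, and fail once
    timeout seconds have elapsed."""
    started = time.monotonic()
    while not poke():
        if time.monotonic() - started > timeout:
            raise TimeoutError("sensor timed out")
        time.sleep(poke_interval)
    return True
```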
> Sensor duplication is a common problem for large-scale Airflow projects.
> Duplicated partitions need to be detected from the same or different DAGs.
> At Airbnb, 88 boxes run four different types of sensors every day. The
> number of running sensor tasks ranges from 8k to 16k, which takes a great
> amount of resources. Although the Airflow team has redirected all sensors
> to a specific queue to allocate relatively few resources to them, there is
> still large room to reduce the number of workers and relieve DB pressure
> by optimizing the sensor mechanism.
> The existing sensor implementation creates a separate task for each sensor
> with a specific dag_id, task_id and execution_date. This task is
> responsible for repeatedly querying the DB until the specified partition
> exists. Even if two tasks are waiting for the same partition in the DB,
> they create two connections to the DB and check the status in two separate
> processes. On one hand, the DB needs to run duplicate queries in multiple
> processes, which takes both CPU and memory resources. At the same time,
> Airflow needs to maintain a process for each sensor to query and wait for
> the partition/table to be created.
> To optimize the sensors, add a hashcode for each partition, determined by
> the tuple (conn_id, schema, table, partition). Add dependencies between
> qualified sensors and partitions. Use a single entry per partition to query
> the DB and avoid duplicated sensor work in Airflow.
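> A minimal sketch of such a deduplication key. The field set
> (conn_id, schema, table, partition) comes from the proposal above; the
> helper name `partition_hash` and the hashing scheme are assumptions for
> illustration only:

```python
import hashlib

def partition_hash(conn_id, schema, table, partition):
    """Stable dedup key: sensors waiting on the same partition map to the
    same hash, so they can collapse into a single DB-polling entry."""
    key = "|".join((conn_id, schema, table, partition))
    return hashlib.sha1(key.encode("utf-8")).hexdigest()
```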
> Add a sensor scheduling part to the scheduler to:
> # Check partition status to mark dependent sensors as successful and
> trigger the sensors' downstream tasks.
> # Select all pending partitions in the DB, including:
> ## Newly arriving partition sensor requests
> ## Existing sensor requests that are still waiting
> # At a configurable time interval:
> ## Create the set of tasks for sensing all pending partitions.
> ## Kill the previous sensor tasks.
> # For the tasks mentioned in 3: each task should check many partitions. We
> can introduce a sensor chunk number here as the maximum number of partitions
> one task should handle. The sensors keep updating partition status in the
> Airflow DB while running.
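> The chunking step in 4 can be sketched as follows (an illustrative sketch
> under the assumptions above; `chunk_pending_partitions` is a hypothetical
> helper, not part of Airflow):

```python
def chunk_pending_partitions(pending, chunk_size):
    """Split the pending partition keys into batches; each batch would be
    handled by one scheduler-managed sensor task, so chunk_size caps the
    number of partitions a single task checks."""
    return [pending[i:i + chunk_size]
            for i in range(0, len(pending), chunk_size)]
```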
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)