[
https://issues.apache.org/jira/browse/AIRFLOW-3964?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Yingbo Wang updated AIRFLOW-3964:
---------------------------------
Description:
h2. Problem
h3. Airflow Sensor:
Sensors are a certain type of operator that will keep running until a certain
criterion is met. Examples include a specific file landing in HDFS or S3, a
partition appearing in Hive, or a specific time of the day. Sensors are derived
from BaseSensorOperator and run a poke method at a specified poke_interval
until it returns True.
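The poke loop described above can be sketched as follows. This is a minimal illustration, not Airflow's actual BaseSensorOperator code: the `run_sensor` function, the `timeout` argument and the injectable `sleep` callable are assumptions introduced here so the example is self-contained.

```python
import time
from typing import Callable


def run_sensor(
    poke: Callable[[], bool],
    poke_interval: float,
    timeout: float,
    sleep: Callable[[float], None] = time.sleep,
) -> int:
    """Call poke() every poke_interval seconds until it returns True.

    Returns the number of pokes that were needed. Raises TimeoutError if
    the criterion is still unmet after `timeout` seconds (hypothetical
    parameter, added so the loop cannot run forever).
    """
    started = time.monotonic()
    pokes = 0
    while True:
        pokes += 1
        if poke():
            return pokes
        if time.monotonic() - started > timeout:
            raise TimeoutError("sensor criterion was not met in time")
        # The process stays alive between pokes, which is why every
        # waiting sensor occupies a worker slot.
        sleep(poke_interval)
```

Note that the process is held for the whole waiting period, so the cost grows with the number of sensors rather than with the number of distinct things being waited on.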
Sensor duplication is a common problem in large-scale Airflow projects: the
same partition is often waited on by several sensors, in the same DAG or in
different DAGs. At Airbnb, 88 boxes run four different types of sensors every
day. The number of running sensor tasks ranges from 8k to 16k, which consumes a
large amount of resources. Although the Airflow team has redirected all sensors
to a dedicated queue with a relatively small resource allocation, there is
still considerable room to reduce the number of workers and relieve DB pressure
by optimizing the sensor mechanism.
The existing sensor implementation creates a separate task for every sensor
with a specific dag_id, task_id and execution_date. This task is responsible
for repeatedly querying the DB until the specified partitions exist. Even if
two tasks are waiting for the same partition, they open two connections to the
DB and check the status in two separate processes. On the one hand, the DB has
to run duplicate queries in multiple processes, which costs both CPU and
memory. On the other hand, Airflow has to maintain a process for each sensor to
query and wait for the partition/table to be created.
To optimize the sensor, add a hash code for each partition, determined by the
tuple (conn_id, schema, table, partition). Add dependencies between qualifying
sensors and partitions. Use a single entry for each sensor to query the DB and
avoid duplication in Airflow.
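One possible way to derive such a hash code and group sensors by partition is sketched below. The names `PartitionKey`, `partition_hash` and `group_sensors`, the choice of SHA-256, and the example connection and table names are assumptions made for illustration; the proposal itself does not specify them.

```python
import hashlib
from collections import defaultdict
from typing import Dict, List, NamedTuple, Tuple


class PartitionKey(NamedTuple):
    conn_id: str
    schema: str
    table: str
    partition: str


# A sensor task is identified by (dag_id, task_id, execution_date).
SensorId = Tuple[str, str, str]


def partition_hash(key: PartitionKey) -> str:
    """Return a stable hex digest for one partition."""
    # Join the fields with the ASCII unit separator so that, for example,
    # ("a", "bc") and ("ab", "c") do not produce the same input string.
    raw = "\x1f".join(key)
    return hashlib.sha256(raw.encode("utf-8")).hexdigest()


def group_sensors(
    requests: List[Tuple[SensorId, PartitionKey]],
) -> Dict[str, List[SensorId]]:
    """Map each partition hash to every sensor waiting on that partition."""
    waiting: Dict[str, List[SensorId]] = defaultdict(list)
    for sensor_id, key in requests:
        waiting[partition_hash(key)].append(sensor_id)
    return dict(waiting)
```

With this grouping, two sensors from different DAGs that wait on the same partition share one hash, so the partition only has to be checked once.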
Add a sensor scheduling component to the scheduler that will:
# Check partition status so that sensors waiting on a found partition can
succeed and their downstream tasks can be triggered.
# Select all pending partitions in the DB, including:
## Newly arrived partition sensor requests
## Existing sensor requests that are still waiting
# At a fixed time interval:
## Create the set of tasks for sensing all pending partitions.
## Kill the previous sensor tasks.
# For the tasks created in step 3: each task should check many partitions. A
sensor chunk number can be introduced here as the maximum number of partitions
one task should handle. The sensors keep updating partition status in the
Airflow DB while running.
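The chunking idea in the last step can be sketched as below. This is only an illustration under stated assumptions: the in-memory `status` dict stands in for the partition status stored in the Airflow DB, `exists` stands in for the real partition check, and the function names and `chunk_size` parameter are hypothetical.

```python
from typing import Callable, Dict, List


def chunk(items: List[str], chunk_size: int) -> List[List[str]]:
    """Split items into lists of at most chunk_size elements."""
    if chunk_size < 1:
        raise ValueError("chunk_size must be at least 1")
    return [items[i:i + chunk_size] for i in range(0, len(items), chunk_size)]


def poke_chunk(
    partitions: List[str], exists: Callable[[str], bool]
) -> Dict[str, bool]:
    """Work done by one sensor task: check every partition in its chunk."""
    return {p: exists(p) for p in partitions}


def scheduling_pass(
    status: Dict[str, bool],
    exists: Callable[[str], bool],
    chunk_size: int,
) -> List[List[str]]:
    """Run one pass over all pending partitions.

    `status` maps a partition hash to whether it has been found and is
    updated in place. Returns the chunks that were handed to sensor tasks.
    """
    # Pending covers both new requests and requests that are still waiting.
    pending = sorted(p for p, found in status.items() if not found)
    chunks = chunk(pending, chunk_size)
    for partitions in chunks:
        # In the proposal each chunk would be a separate sensor task;
        # here the chunks are simply processed one after another.
        status.update(poke_chunk(partitions, exists))
    return chunks
```

Under this scheme the number of sensor tasks depends on the number of distinct pending partitions divided by the chunk size, not on the number of sensors that requested them.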
> Reduce duplicated tasks and optimize with scheduler embedded sensor
> --------------------------------------------------------------------
>
> Key: AIRFLOW-3964
> URL: https://issues.apache.org/jira/browse/AIRFLOW-3964
> Project: Apache Airflow
> Issue Type: Improvement
> Components: dependencies, operators, scheduler
> Reporter: Yingbo Wang
> Assignee: Yingbo Wang
> Priority: Critical