ashb opened a new pull request, #22862:
URL: https://github.com/apache/airflow/pull/22862
In writing the docs for Dynamic Task Mapping (AIP-42) I noticed that
there are some cases where users need to use XComArg directly, and it
didn't feel right to make them import things from `airflow.models`.
And I've now refactored the lazy import to be "data-driven" as three
blocks of almost identical code was my limit.
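For readers unfamiliar with the pattern, here is a minimal sketch of what a "data-driven" lazy import can look like, using a module-level `__getattr__` (PEP 562). It is not the code from this PR: the `_LAZY_IMPORTS` table, the `make_lazy_getattr` helper and the `demo_pkg` module are hypothetical, and the standard-library targets only stand in for the real Airflow classes.

```python
import importlib
import types

# Hypothetical lookup table: exported name -> dotted path of the defining module.
# Adding another lazily imported name means adding one entry, not another code block.
_LAZY_IMPORTS = {
    "OrderedDict": "collections",
    "JSONDecoder": "json.decoder",
    "Fraction": "fractions",
}


def make_lazy_getattr(module_name, table, namespace):
    """Build a PEP 562 style __getattr__ that resolves names from `table`."""

    def __getattr__(name):
        path = table.get(name)
        if path is None:
            raise AttributeError(f"module {module_name!r} has no attribute {name!r}")
        # Import only on first access, then cache the result in the module
        # namespace so later lookups never reach __getattr__ again.
        value = getattr(importlib.import_module(path), name)
        namespace[name] = value
        return value

    return __getattr__


# Stand-in for a package's __init__ module, built by hand so the sketch is runnable.
demo = types.ModuleType("demo_pkg")
demo.__getattr__ = make_lazy_getattr("demo_pkg", _LAZY_IMPORTS, demo.__dict__)

half = demo.Fraction(1, 2)  # triggers the lazy import of `fractions`
```

In a real package the `__getattr__` function would simply be defined at the top level of `__init__.py`; the helper and the hand-built module are only there to keep the example self-contained.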
Example dag from the docs (future PR) that inspired this change:
```python
from datetime import datetime

from airflow import DAG
from airflow.decorators import task
from airflow.models.xcom_arg import XComArg  # <--- this I wasn't happy about
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.amazon.aws.operators.s3 import S3ListOperator

with DAG(dag_id="mapped_s3", start_date=datetime(2020, 4, 7)) as dag:
    files = S3ListOperator(
        task_id="get_input",
        bucket="example-bucket",
        prefix='incoming/provider_a/{{ data_interval_start.strftime("%Y-%m-%d") }}',
    )

    @task
    def count_lines(aws_conn_id, bucket, file):
        hook = S3Hook(aws_conn_id=aws_conn_id)
        return len(hook.read_key(file, bucket).splitlines())

    @task
    def total(lines):
        return sum(lines)

    counts = count_lines.partial(
        aws_conn_id="aws_default", bucket=files.bucket
    ).expand(file=XComArg(files))
    total(lines=counts)
```