This is an automated email from the ASF dual-hosted git repository.

vikramkoka pushed a commit to branch aip-99-blog-posts
in repository https://gitbox.apache.org/repos/asf/airflow-site.git

commit c1ca856c99150b0d5fcc208c6d9b7d49a65e3621
Author: Vikram Koka <[email protected]>
AuthorDate: Mon Apr 13 17:25:42 2026 -0700

    Blog posts to go with the common.ai provider
    
    There are two blog posts to go along with the common.ai provider release.
    
    The first highlights the use case of the common.ai provider with a relatively straightforward scenario based on the Airflow survey data.
    
    The second builds on it with a more complex agentic workflow, covering the typical agentic patterns achievable through Airflow with this provider.
---
 .../en/blog/ai-survey-analysis-pipelines/index.md  | 345 +++++++++++++++++++++
 .../blog/blog-agentic-workloads-airflow-3/index.md | 336 ++++++++++++++++++++
 2 files changed, 681 insertions(+)

diff --git 
a/landing-pages/site/content/en/blog/ai-survey-analysis-pipelines/index.md 
b/landing-pages/site/content/en/blog/ai-survey-analysis-pipelines/index.md
new file mode 100644
index 0000000000..b705baf79a
--- /dev/null
+++ b/landing-pages/site/content/en/blog/ai-survey-analysis-pipelines/index.md
@@ -0,0 +1,345 @@
+---
+title: "Ask Your Survey Anything: Building AI Analysis Pipelines with Airflow 3"
+linkTitle: "AI Survey Analysis Pipelines with Airflow 3"
+authors:
+  - name: "Vikram Koka"
+    github: "vikramkoka"
+    linkedin: "vikramkoka"
+description: "A walkthrough of two natural language analysis pipelines over the 2025 Airflow Community Survey — an interactive human-in-the-loop version and a fully automated scheduled version — using operators from the common.ai and common.sql providers."
+tags: [Community, Tutorial]
+date: "2026-04-XX"
+images: ["/blog/ai-survey-analysis-pipelines/images/survey-pipeline-dag.png"]
+---
+
+The [2025 Airflow Community Survey](https://airflow.apache.org/survey/) 
collected responses
+from nearly 6,000 practitioners across 168 questions. You can open a 
spreadsheet and filter,
+or write SQL by hand. But what if you could just ask a question and have 
Airflow figure out
+the query, run it, and bring the result back for your approval?
+
+This post builds two pipelines that do exactly that, using the
+[`apache-airflow-providers-common-ai`](https://airflow.apache.org/blog/common-ai-provider/)
+provider released with Airflow 3.
+
+The first pipeline is **interactive**: a human reviews the question before it 
reaches the LLM
+and approves the result before the DAG finishes. The second is **scheduled**: 
it downloads
+fresh survey data, validates the schema, runs the query unattended, and emails 
the result.
+
+If you haven't seen the common.ai provider overview yet, start there for a 
tour of all the
+operators. This post goes deep on a concrete end-to-end example.
+
+
+## Two Pipelines, One Example File
+
+Both DAGs live in
+[`example_llm_survey_analysis.py`](https://github.com/apache/airflow/tree/main/providers/common/ai/src/airflow/providers/common/ai/example_dags/example_llm_survey_analysis.py)
+and share the same schema context and datasource configuration.
+
+**`example_llm_survey_interactive`** — trigger manually, review at both ends:
+
+```
+prompt_confirmation  →  generate_sql  →  run_query  →  extract_data  →  result_confirmation
+(HITLEntryOperator)     (LLMSQLQuery)    (Analytics)    (@task)          (ApprovalOperator)
+```
+
+**`example_llm_survey_scheduled`** — runs `@monthly`, no human in the loop:
+
+```
+download_survey  →  prepare_csv  →  check_schema  →  generate_sql  →  run_query  →  extract_data  →  send_result
+(HttpOperator)      (@task)         (LLMSchema       (LLMSQLQuery)    (Analytics)    (@task)          (@task / Email)
+                                     Compare)
+```
+
+
+## The Data
+
+The [Airflow Community Survey 2025](https://airflow.apache.org/survey/) CSV 
has 5,856 rows
+and 168 columns covering everything from Airflow version and executor type to 
cloud provider,
+company size, and AI tool usage. A few highlights from the data:
+
+- **3,320** respondents identify as Data Engineers
+- **2,032** use AWS as their primary cloud provider for Airflow
+- **1,445** are already running Airflow 3
+- **1,351** say they *often* use AI tools to write Airflow code
+
+Those last two numbers together are part of why this example exists: the 
people most likely
+to use this pipeline are already using Airflow 3 and already using AI in their 
workflow.
+
+> **Data prep note:** Apache DataFusion is strict about CSV schemas. The raw 
survey export
+> has 22 duplicate `"Other"` column names and some free-text cells with 
embedded newlines.
+> Both need cleaning before DataFusion will parse the file. The interactive 
DAG assumes a
+> cleaned copy at the path set by the `SURVEY_CSV_PATH` environment variable. 
The scheduled
+> DAG downloads the file at runtime and the `prepare_csv` step handles writing 
it to disk.
+
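+The cleaning itself is ordinary CSV plumbing. The exact script is not part of the
+example, but a minimal stdlib sketch of the two fixes (deduplicating repeated
+header names, and re-serializing so embedded newlines stay quoted) could look like
+the following; the function names here are illustrative, not part of the provider:
+
```python
import csv
import io

def dedupe_columns(names: list[str]) -> list[str]:
    """Suffix repeated header names so each column is unique, e.g. Other, Other_2."""
    seen: dict[str, int] = {}
    out = []
    for name in names:
        seen[name] = seen.get(name, 0) + 1
        out.append(name if seen[name] == 1 else f"{name}_{seen[name]}")
    return out

def clean_survey_csv(text: str) -> str:
    """Parse with the stdlib reader (which honors quoted newlines) and re-emit."""
    rows = list(csv.reader(io.StringIO(text)))
    buf = io.StringIO()
    writer = csv.writer(buf, lineterminator="\n")
    writer.writerow(dedupe_columns(rows[0]))
    writer.writerows(rows[1:])
    return buf.getvalue()
```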
+
+## The Interactive Pipeline
+
+Five tasks. No external services beyond your LLM provider and a local copy of 
the CSV.
+
+| Step | Operator | What happens |
+|---|---|---|
+| 1 | `HITLEntryOperator` | DAG pauses. Human reviews and optionally edits the question. |
+| 2 | `LLMSQLQueryOperator` | LLM translates the confirmed question into SQL, validated by sqlglot. |
+| 3 | `AnalyticsOperator` | Apache DataFusion executes the SQL against the local CSV. |
+| 4 | `@task extract_data` | Strips the query from the JSON result — reviewer sees only data rows. |
+| 5 | `ApprovalOperator` | DAG pauses again. Human approves or rejects the result. |
+
+The LLM and DataFusion steps run unattended. The human shows up at step 1 to 
confirm the
+question and at step 5 to sign off on the answer. Everything in between is 
automated.
+
+```python
+@dag(schedule=None)
+def example_llm_survey_interactive():
+
+    prompt_confirmation = HITLEntryOperator(
+        task_id="prompt_confirmation",
+        subject="Review the survey analysis question",
+        params={
+            "prompt": Param(
+                "How does AI tool usage for writing Airflow code compare between Airflow 3 users and Airflow 2 users?",
+                type="string",
+                description="The natural language question to answer via SQL",
+            )
+        },
+        response_timeout=datetime.timedelta(hours=1),
+    )
+
+    generate_sql = LLMSQLQueryOperator(
+        task_id="generate_sql",
+        prompt="{{ ti.xcom_pull(task_ids='prompt_confirmation')['params_input']['prompt'] }}",
+        llm_conn_id="pydanticai_default",
+        datasource_config=survey_datasource,
+        schema_context=SURVEY_SCHEMA,
+    )
+
+    run_query = AnalyticsOperator(
+        task_id="run_query",
+        datasource_configs=[survey_datasource],
+        queries=["{{ ti.xcom_pull(task_ids='generate_sql') }}"],
+        result_output_format="json",
+    )
+
+    @task
+    def extract_data(raw: str) -> str:
+        results = json.loads(raw)
+        data = [row for item in results for row in item["data"]]
+        return json.dumps(data, indent=2)
+
+    result_data = extract_data(run_query.output)
+
+    result_confirmation = ApprovalOperator(
+        task_id="result_confirmation",
+        subject="Review the survey query result",
+        body="{{ ti.xcom_pull(task_ids='extract_data') }}",
+        response_timeout=datetime.timedelta(hours=1),
+    )
+
+    prompt_confirmation >> generate_sql >> run_query >> result_data >> result_confirmation
+```
+
+
+## Walking Through a Run
+
+**Step 1 — Prompt confirmation.** Trigger the DAG and navigate to the HITL 
review tab.
+The default question appears in an editable field. Change it to anything the 
schema supports,
+or leave it as-is and confirm.
+
+> *"How does AI tool usage for writing Airflow code compare between Airflow 3 
users and Airflow 2 users?"*
+
+**Step 2 — SQL generation.** `LLMSQLQueryOperator` receives the confirmed 
question, constructs
+a system prompt from `SURVEY_SCHEMA`, and calls the LLM. It returns validated 
SQL — sqlglot
+parses the output and rejects anything that isn't a `SELECT`. The generated 
query goes to XCom.
+
+```sql
+SELECT
+    "Which version of Airflow do you currently use?"                                          AS airflow_version,
+    "Are you using AI/LLM (ChatGPT/Cursor/Claude etc) to assist you in writing Airflow code?" AS ai_usage,
+    COUNT(*) AS respondents
+FROM survey
+WHERE "Which version of Airflow do you currently use?" IS NOT NULL
+  AND "Are you using AI/LLM (ChatGPT/Cursor/Claude etc) to assist you in writing Airflow code?" IS NOT NULL
+GROUP BY airflow_version, ai_usage
+ORDER BY airflow_version, respondents DESC
+```
+
+**Step 3 — DataFusion execution.** `AnalyticsOperator` loads the CSV into a 
DataFusion
+`SessionContext`, registers it as the `survey` table, and executes the SQL 
in-process.
+No database server, no network call. The query over the 5,856-row CSV completes in under a second.
+
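+DataFusion itself is not needed to see the in-process idea. The same no-server
+flow can be mimicked with `sqlite3` from the standard library (a stand-in only;
+the provider uses Apache DataFusion, and the data below is made up):
+
```python
import csv
import io
import sqlite3

# Stand-in for the in-process pattern: parse a CSV, load it into an in-memory
# engine, and run SQL with no database server involved.
csv_text = "airflow_version,ai_usage\n3.x,Often\n3.x,Often\n2.x,Never\n"
rows = list(csv.DictReader(io.StringIO(csv_text)))

con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE survey (airflow_version TEXT, ai_usage TEXT)")
con.executemany(
    "INSERT INTO survey VALUES (?, ?)",
    [(r["airflow_version"], r["ai_usage"]) for r in rows],
)
result = con.execute(
    "SELECT airflow_version, COUNT(*) AS n FROM survey "
    "GROUP BY airflow_version ORDER BY airflow_version"
).fetchall()
# result == [("2.x", 1), ("3.x", 2)]
```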
+**Step 4 — Extract data.** The raw JSON from `AnalyticsOperator` includes the 
original
+query string alongside the result rows. This `@task` strips the query so the 
reviewer
+isn't looking at SQL when they should be looking at data.
+
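+Run outside Airflow, the same flattening is easy to check against a stand-in
+payload (the JSON shape below is assumed for illustration: one object per
+executed query, with its rows under `"data"`):
+
```python
import json

def extract_data(raw: str) -> str:
    """Drop the query metadata and keep only the flattened data rows."""
    results = json.loads(raw)
    data = [row for item in results for row in item["data"]]
    return json.dumps(data, indent=2)

# Stand-in payload, one object per executed query.
raw = json.dumps([
    {"query": "SELECT ...", "data": [
        {"airflow_version": "3.x", "ai_usage": "Often", "respondents": 10},
        {"airflow_version": "2.x", "ai_usage": "Never", "respondents": 7},
    ]},
])
rows = json.loads(extract_data(raw))
```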
+**Step 5 — Result confirmation.** The data rows appear in the Airflow UI 
approval dialog.
+The analyst reads the result, clicks Approve (or Reject if something looks 
off), and the
+DAG completes.
+
+
+## The Scheduled Pipeline
+
+The scheduled variant adds three capabilities the interactive version 
intentionally omits:
+data acquisition, schema validation, and result delivery. It runs `@monthly` 
(configurable)
+with no human steps.
+
+| Step | Operator | What happens |
+|---|---|---|
+| 1 | `HttpOperator` | Downloads the survey CSV from `airflow.apache.org`. |
+| 2 | `@task prepare_csv` | Writes the CSV to disk and generates a reference schema file from `SURVEY_SCHEMA`. |
+| 3 | `LLMSchemaCompareOperator` | LLM compares the downloaded CSV schema against the reference. Raises if critical columns are missing or renamed. |
+| 4 | `LLMSQLQueryOperator` | Translates the fixed question into validated SQL. |
+| 5 | `AnalyticsOperator` | Executes the SQL via DataFusion. |
+| 6 | `@task extract_data` | Extracts data rows from the JSON result. |
+| 7 | `@task send_result` | Sends the result via `EmailOperator` if `SMTP_CONN_ID` and `NOTIFY_EMAIL` are set, otherwise logs to the task log. |
+
+The schema check at step 3 is worth calling out. `LLMSchemaCompareOperator` 
compares the
+live download against a reference file derived from `SURVEY_SCHEMA`. If the 
survey format
+changes between runs — a renamed column, a dropped field — the operator 
catches it before
+any SQL runs, rather than failing silently mid-pipeline with a cryptic 
DataFusion error.
+
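+For comparison, the non-LLM baseline for this check is a plain set diff over the
+header lists. A sketch (illustrative only; the operator's value over this is that
+it explains the changes in prose and can recognize a rename as a rename):
+
```python
def diff_schema(reference: list[str], live: list[str]) -> dict[str, list[str]]:
    """Columns present in one header list but not the other."""
    ref, cur = set(reference), set(live)
    return {"missing": sorted(ref - cur), "added": sorted(cur - ref)}

changes = diff_schema(
    reference=["airflow_version", "executor", "cloud"],
    live=["airflow_version", "executor_type", "cloud"],
)
# A renamed column shows up as one "missing" entry plus one "added" entry.
```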
+```python
+@dag(schedule="@monthly", start_date=None)
+def example_llm_survey_scheduled():
+
+    from airflow.providers.http.operators.http import HttpOperator
+
+    download_survey = HttpOperator(
+        task_id="download_survey",
+        http_conn_id="airflow_website",
+        endpoint="/survey/airflow-user-survey-2025.csv",
+        method="GET",
+        response_filter=lambda r: r.text,
+        log_response=False,
+    )
+
+    @task
+    def prepare_csv(csv_text: str) -> None:
+        import csv as csv_mod
+        import os
+
+        os.makedirs(os.path.dirname(SURVEY_CSV_PATH), exist_ok=True)
+        with open(SURVEY_CSV_PATH, "w", encoding="utf-8") as f:
+            f.write(csv_text)
+        columns = [line.split('"')[1] for line in SURVEY_SCHEMA.strip().splitlines() if '"' in line]
+        with open(REFERENCE_CSV_PATH, "w", newline="", encoding="utf-8") as ref:
+            csv_mod.writer(ref).writerow(columns)
+
+    csv_ready = prepare_csv(download_survey.output)
+
+    check_schema = LLMSchemaCompareOperator(
+        task_id="check_schema",
+        prompt="Compare the survey CSV schema against the reference. Flag missing or renamed columns.",
+        llm_conn_id="pydanticai_default",
+        data_sources=[survey_datasource, reference_datasource],
+        context_strategy="basic",
+    )
+    csv_ready >> check_schema
+
+    generate_sql = LLMSQLQueryOperator(
+        task_id="generate_sql",
+        prompt=SCHEDULED_PROMPT,
+        llm_conn_id="pydanticai_default",
+        datasource_config=survey_datasource,
+        schema_context=SURVEY_SCHEMA,
+    )
+    check_schema >> generate_sql
+
+    run_query = AnalyticsOperator(
+        task_id="run_query",
+        datasource_configs=[survey_datasource],
+        queries=["{{ ti.xcom_pull(task_ids='generate_sql') }}"],
+        result_output_format="json",
+    )
+
+    @task
+    def extract_data(raw: str) -> str:
+        results = json.loads(raw)
+        data = [row for item in results for row in item["data"]]
+        return json.dumps(data, indent=2)
+
+    result_data = extract_data(run_query.output)
+
+    @task
+    def send_result(data: str) -> None:
+        if SMTP_CONN_ID and NOTIFY_EMAIL:
+            from airflow.providers.smtp.operators.smtp import EmailOperator
+            EmailOperator(
+                task_id="_send_email",
+                smtp_conn_id=SMTP_CONN_ID,
+                to=NOTIFY_EMAIL,
+                subject=f"Airflow Survey Analysis: {SCHEDULED_PROMPT}",
+                html_content=f"<pre>{data}</pre>",
+            ).execute({})
+        else:
+            print(f"Survey analysis result:\n{data}")
+
+    generate_sql >> run_query >> result_data >> send_result(result_data)
+```
+
+
+## Connecting Your LLM
+
+Both DAGs use `llm_conn_id="pydanticai_default"`. Create a connection in the 
Airflow UI:
+
+| Provider | Connection type | Required fields |
+|---|---|---|
+| OpenAI | `pydanticai` | Password: API key. Extra: `{"model": "openai:gpt-4o"}` |
+| Anthropic | `pydanticai` | Password: API key. Extra: `{"model": "anthropic:claude-haiku-4-5-20251001"}` |
+| Google Vertex | `pydanticai-vertex` | Extra: `{"model": "google-vertex:gemini-2.0-flash", "project": "...", "vertexai": true}` |
+| AWS Bedrock | `pydanticai-bedrock` | Extra: `{"model": "bedrock:us.anthropic.claude-opus-4-5", "region_name": "us-east-1"}` |
+
+Switch providers by changing the connection — neither DAG requires any code 
changes.
+
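+If you prefer configuration over the UI, Airflow also reads connections from
+`AIRFLOW_CONN_<ID>` environment variables, which accept a JSON document. A sketch
+for the OpenAI row above (the API key is a placeholder):
+
```python
import json
import os

# JSON-format connection env var; Airflow parses this at connection lookup time.
conn = {
    "conn_type": "pydanticai",
    "password": "sk-REPLACE-ME",  # placeholder API key
    "extra": {"model": "openai:gpt-4o"},
}
os.environ["AIRFLOW_CONN_PYDANTICAI_DEFAULT"] = json.dumps(conn)
```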
+For the scheduled DAG, also create an HTTP connection named `airflow_website` 
with host
+`https://airflow.apache.org` (no auth required), and optionally set the 
`SMTP_CONN_ID`
+and `NOTIFY_EMAIL` environment variables to enable email delivery.
+
+
+## What This Shows
+
+These two pipelines bring together four capabilities that have not been easy to combine before:
+
+**Natural language as the interface.** Neither pipeline requires the analyst 
to write SQL.
+`LLMSQLQueryOperator` handles schema awareness, column quoting, and query 
structure. The
+`SURVEY_SCHEMA` context is the only thing it needs.
+
+**In-process query execution.** `AnalyticsOperator` runs Apache DataFusion 
inside the Airflow
+worker. There's no database to configure, no connection to manage for the data 
itself. Point
+it at a file URI and it runs.
+
+**Schema-aware data validation.** `LLMSchemaCompareOperator` uses an LLM to 
compare schemas
+and surface structural changes in plain language — not a column count diff, 
but an explanation
+of what changed and why it matters for downstream queries. It turns a silent 
mid-pipeline
+failure into an early, actionable error.
+
+**Human oversight without blocking automation.** The `HITLEntryOperator` and 
`ApprovalOperator`
+are standard Airflow operators from 
`airflow.providers.standard.operators.hitl`. They have no
+AI imports — they just pause the DAG and wait. The interactive pipeline uses 
them at both ends;
+the scheduled pipeline skips them entirely. Adding or removing human review 
requires no changes
+to the LLM or DataFusion steps.
+
+
+## Try It
+
+Both DAGs are in the `common.ai` provider example DAGs:
+[`example_llm_survey_analysis.py`](https://github.com/apache/airflow/tree/main/providers/common/ai/src/airflow/providers/common/ai/example_dags/example_llm_survey_analysis.py).
+
+```bash
+pip install 'apache-airflow-providers-common-ai' \
+            'apache-airflow-providers-common-sql[datafusion]' \
+            'apache-airflow-providers-http' \
+            'apache-airflow-providers-smtp'   # optional, for email delivery
+```
+
+Requires Apache Airflow 3.0+. `apache-airflow-providers-standard` (which 
provides
+`HITLEntryOperator` and `ApprovalOperator`) ships with Airflow 3 and does not 
need
+a separate install.
+
+For the interactive DAG: set `SURVEY_CSV_PATH` to your local copy of the 
survey CSV, create
+a `pydanticai_default` connection, and trigger 
`example_llm_survey_interactive`.
+
+For the scheduled DAG: create the `airflow_website` HTTP connection, set 
`SMTP_CONN_ID` and
+`NOTIFY_EMAIL` if you want email delivery, and trigger 
`example_llm_survey_scheduled`.
+
+To go further, the follow-on post [Agentic Workloads on Airflow 
3](https://airflow.apache.org/blog/agentic-workloads-airflow-3/)
+extends this example into a multi-query synthesis pattern — answering 
questions that require
+querying several dimensions in parallel and synthesizing the results with a 
second LLM call.
+
+Questions, feedback, and survey queries that stumped the LLM are all welcome on
+[Airflow Slack](https://s.apache.org/airflow-slack) in `#airflow-ai`.
diff --git 
a/landing-pages/site/content/en/blog/blog-agentic-workloads-airflow-3/index.md 
b/landing-pages/site/content/en/blog/blog-agentic-workloads-airflow-3/index.md
new file mode 100644
index 0000000000..54110b9d01
--- /dev/null
+++ 
b/landing-pages/site/content/en/blog/blog-agentic-workloads-airflow-3/index.md
@@ -0,0 +1,336 @@
+---
+title: "Agentic Workloads on Airflow: Observable, Retryable, and Auditable by Design"
+linkTitle: "Agentic Workloads on Airflow 3"
+authors:
+  - name: "Vikram Koka"
+    github: "vikramkoka"
+    linkedin: "vikramkoka"
+description: "How Dynamic Task Mapping and the common.ai provider turn a multi-dimensional research question into a fan-out/fan-in pipeline where every LLM call is a named, logged, independently retryable task — not a hidden step inside a reasoning loop."
+tags: [Community, Tutorial]
+date: "2026-04-XX"
+images: ["/blog/agentic-workloads-airflow-3/images/agentic-fanout-dag.png"]
+---
+
+A question like "How does AI tool usage vary across Airflow versions?" has a 
natural SQL shape: one cross-tabulation, one result. A question like "What does 
a typical Airflow deployment look like for practitioners who are actively using 
AI in their workflow?" does not. It requires querying executor type, deployment 
method, cloud provider, and Airflow version independently, each filtered to the 
same respondent group, then synthesizing the results into a coherent picture. 
No single query can return that.
+
+This is where Airflow's agentic pattern begins: not when you add an LLM to a 
workflow, but when the structure of the work itself depends on running multiple 
LLM calls whose outputs feed a synthesis step. This post builds that pattern 
using the [2025 Airflow Community Survey](https://airflow.apache.org/survey/) 
data set and the 
[`apache-airflow-providers-common-ai`](https://airflow.apache.org/blog/common-ai-provider/)
 provider released with Airflow 3.
+
+If you haven't read the [introductory survey analysis 
post](https://airflow.apache.org/blog/ai-survey-analysis-pipelines/) yet, start 
there for a walkthrough of the single-query interactive and scheduled 
pipelines. This post picks up where that one ends.
+
+
+## The Agentic Gap in the Single-Query Pattern
+
+The interactive and scheduled survey DAGs from the introductory post each do 
one thing: translate a natural language question into SQL, execute it against 
the CSV, and return the result. The LLM is involved once. The structure of the 
pipeline does not change based on what that LLM call returns.
+
+That is not a limitation to fix — it is the right design for that class of 
question. For a large fraction of production AI workflows, a single 
well-structured LLM call with good context is sufficient and preferable.
+
+The pattern becomes agentic when two things are true simultaneously:
+
+1. The question requires querying multiple independent dimensions
+2. The synthesis step — the thing that produces the final answer — depends on 
*all* of those results
+
+In an agent harness framework, this would be handled inside a reasoning loop: 
the LLM decides to call a tool, receives a result, calls another tool, 
accumulates context, and eventually produces a synthesis. Each tool call is 
invisible to any outside observer. If one tool call fails, the loop either 
retries internally or fails entirely.
+
+In Airflow, the same logic takes a different shape. Each sub-query becomes a 
named task. The fan-out is Dynamic Task Mapping. The synthesis is a named task 
with its inputs in XCom. Every step is observable, independently retryable, and 
logged.
+
+
+## The DAG: `example_llm_survey_agentic`
+
+The full DAG is in
+[`example_llm_survey_agentic.py`](https://github.com/apache/airflow/tree/main/providers/common/ai/src/airflow/providers/common/ai/example_dags/example_llm_survey_agentic.py).
+
+**Question:** *"What does a typical Airflow deployment look like for 
practitioners who actively use AI tools in their workflow?"*
+
+**Task graph:**
+
+```
+decompose_question  →  generate_sql (×4)  →  wrap_query (×4)  →  run_query (×4)
+   (@task)              (LLMSQLQuery,          (@task,              (Analytics,
+                         mapped)                mapped)              mapped)
+                                                                         ↓
+                                                                  collect_results
+                                                                     (@task)
+                                                                         ↓
+                                                                  synthesize_answer
+                                                                    (LLMOperator)
+                                                                         ↓
+                                                                  result_confirmation
+                                                                   (ApprovalOperator)
+```
+
+Seven named tasks, three of them mapped four ways. Two LLM steps: SQL generation (four parallel calls) and a single synthesis call. One human review at the end.
+
+| Step | Operator | What happens |
+|---|---|---|
+| 1 | `@task decompose_question` | Returns a list of four sub-questions, one per dimension. |
+| 2 | `LLMSQLQueryOperator` (mapped ×4) | Each sub-question becomes one SQL query, translated and validated in parallel. |
+| 3 | `@task wrap_query` (mapped ×4) | Wraps each SQL string into a single-element list for `AnalyticsOperator`. |
+| 4 | `AnalyticsOperator` (mapped ×4) | Apache DataFusion executes all four queries in parallel against the local CSV. |
+| 5 | `@task collect_results` | Gathers the four JSON results and labels each by dimension. |
+| 6 | `LLMOperator` | Reads all four labeled result sets and writes a narrative characterization. |
+| 7 | `ApprovalOperator` | Human reviews the synthesized narrative before the DAG completes. |
+
+
+## Decomposing the Question
+
+`decompose_question` is a plain `@task` that returns the list of 
sub-questions. In this example, the list is static — the four dimensions are 
hardcoded as strings:
+
+```python
+@task
+def decompose_question() -> list[str]:
+    return [
+        (
+            "Among respondents who use AI/LLM tools to write Airflow code, "
+            "what executor types (CeleryExecutor, KubernetesExecutor, LocalExecutor) "
+            "are most commonly enabled? Return the count per executor type."
+        ),
+        (
+            "Among respondents who use AI/LLM tools to write Airflow code, "
+            "how do they deploy Airflow? Return the count per deployment method."
+        ),
+        (
+            "Among respondents who use AI/LLM tools to write Airflow code, "
+            "which cloud providers are most commonly used for Airflow? "
+            "Return the count per cloud provider."
+        ),
+        (
+            "Among respondents who use AI/LLM tools to write Airflow code, "
+            "what version of Airflow are they currently running? "
+            "Return the count per version."
+        ),
+    ]
+
+sub_questions = decompose_question()
+```
+
+The output of this task — a list of four strings — becomes the input to the 
`expand()` call on `LLMSQLQueryOperator` in the next step. Airflow creates one 
mapped task instance per list element.
+
+> **Why keep this static?** A dynamic version — where the LLM itself 
decomposes the high-level question into sub-questions at runtime — is possible 
and more agentic. It adds an LLM call before any SQL runs, which introduces 
latency and a failure point early in the graph. For a first example, static 
decomposition is clearer. The dynamic variant is a follow-on pattern.
+
+
+## SQL Generation: Mapping Over Sub-Questions
+
+`LLMSQLQueryOperator.partial().expand()` creates one mapped task instance per 
sub-question. All four run in parallel, each translating one natural language 
question into validated SQL against the survey schema:
+
+```python
+generate_sql = LLMSQLQueryOperator.partial(
+    task_id="generate_sql",
+    llm_conn_id=LLM_CONN_ID,
+    datasource_config=survey_datasource,
+    schema_context=SURVEY_SCHEMA,
+).expand(prompt=sub_questions)
+```
+
+In the Airflow UI, this renders as four task instances: `generate_sql[0]`, 
`generate_sql[1]`, `generate_sql[2]`, `generate_sql[3]`. Each has its own log, 
retry counter, and XCom entry. This is what an agent harness's parallel tool 
calls look like when they are made explicit.
+
+Each instance returns a single SQL string. `LLMSQLQueryOperator` validates the 
output with sqlglot before returning it — anything that is not a `SELECT` 
statement is rejected.
+
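+A simplified stand-in for that guard (the provider uses sqlglot, which actually
+parses the SQL; this stdlib sketch only inspects the leading keyword):
+
```python
def ensure_select(sql: str) -> str:
    """Reject anything whose first keyword is not SELECT (or a WITH ... SELECT)."""
    head = sql.lstrip().lstrip("(").split(None, 1)[0].upper() if sql.strip() else ""
    if head not in ("SELECT", "WITH"):
        raise ValueError(f"only SELECT statements are allowed, got {head or 'empty input'!r}")
    return sql
```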
+
+## The `wrap_query` Bridge
+
+`AnalyticsOperator` expects `queries: list[str]` — a list because it can run 
multiple queries in one execution. `LLMSQLQueryOperator` returns a single 
`str`. A small `@task` bridges the interface:
+
+```python
+@task
+def wrap_query(sql: str) -> list[str]:
+    return [sql]
+
+wrapped_queries = wrap_query.expand(sql=generate_sql.output)
+```
+
+This step is an implementation detail, not a conceptual one. Four mapped 
instances of `wrap_query` run in parallel, each converting one SQL string into 
a one-element list. The result is four `list[str]` values that 
`AnalyticsOperator` can consume directly.
+
+
+## Parallel Execution via DataFusion
+
+```python
+run_query = AnalyticsOperator.partial(
+    task_id="run_query",
+    datasource_configs=[survey_datasource],
+    result_output_format="json",
+).expand(queries=wrapped_queries)
+```
+
+Four mapped instances of `AnalyticsOperator` run in parallel. Each loads the 
survey CSV into an Apache DataFusion `SessionContext` in-process and executes 
its SQL against it. No database server, no shared state between instances.
+
+This is where independent retry earns its value. If the cloud provider query 
returns a DataFusion error due to a null value in that column, only 
`run_query[2]` fails. `run_query[0]`, `run_query[1]`, and `run_query[3]` have 
already succeeded and their results are in XCom. When `run_query[2]` is cleared 
and retried, the other three results are preserved.
+
+
+## Collecting and Labeling Results
+
+`collect_results` gathers all four outputs from `run_query` — Airflow passes 
the list of mapped outputs directly — and labels each one by dimension key:
+
+```python
+# DIMENSION_KEYS = ["executor", "deployment", "cloud", "airflow_version"]
+# Order must match the sub-questions returned by decompose_question.
+# Airflow preserves mapped task output ordering by index, so this zip is safe.
+
+@task(trigger_rule="all_success")
+def collect_results(results: list[str]) -> dict:
+    labeled: dict[str, list] = {}
+    for key, raw in zip(DIMENSION_KEYS, results):
+        items = json.loads(raw)
+        data = [row for item in items for row in item["data"]]
+        labeled[key] = data
+    return labeled
+
+collected = collect_results(run_query.output)
+```
+
+The output is a dict like:
+
+```json
+{
+  "executor": [{"KubernetesExecutor": "Yes", "count": 847}, ...],
+  "deployment": [{"How do you deploy Airflow?": "Managed Cloud Service", "count": 1203}, ...],
+  "cloud": [{"primary_cloud": "AWS", "count": 891}, ...],
+  "airflow_version": [{"version": "3.x", "count": 412}, ...]
+}
+```
+
+All four result sets in one XCom entry. This is the input to the synthesis 
step.
+
+
+## Synthesis: The Second LLM Call
+
+`LLMOperator` takes the collected results and produces a narrative. This is 
the synthesis step — the part of the pipeline that could not exist without all 
four sub-queries having completed first:
+
+```python
+synthesize_answer = LLMOperator(
+    task_id="synthesize_answer",
+    llm_conn_id=LLM_CONN_ID,
+    system_prompt=(
+        "You are a data analyst summarizing survey results about Apache Airflow practitioners. "
+        "Write in plain, concise language suitable for a technical audience. "
+        "Focus on patterns and proportions rather than raw counts."
+    ),
+    prompt=(
+        "Given these four independent survey query results about practitioners "
+        "who use AI tools to write Airflow code, write a 2-3 sentence "
+        "characterization of what a typical Airflow deployment looks like for "
+        "this group.\n\n"
+        "Results: {{ ti.xcom_pull(task_ids='collect_results') }}"
+    ),
+)
+collected >> synthesize_answer
+```
+
+`prompt` is a template field, so `{{ ti.xcom_pull(task_ids='collect_results') 
}}` renders the full dict at execution time. `system_prompt` maps to the 
PydanticAI agent's `instructions` parameter, so the framing instruction carries 
into every token the model generates.
+
+The output — a 2-3 sentence characterization — goes to XCom and then to the 
final approval step.
+
+> **Inline HITL alternative:** `LLMOperator` supports `require_approval=True` 
and `allow_modifications=True` as constructor parameters, via 
`LLMApprovalMixin`. Setting these eliminates the separate `ApprovalOperator` 
task and lets the reviewer edit the synthesized narrative directly before 
approving. Whether to use inline approval or a separate `ApprovalOperator` is a 
design choice; both produce the same result.
+
+
+## Walking Through a Run
+
+**Step 1 — Decompose.** Trigger the DAG. `decompose_question` completes in 
milliseconds and returns the four sub-question strings.
+
+**Steps 2–4 — Fan-out.** Twelve mapped task instances run: four 
`generate_sql`, four `wrap_query`, four `run_query`. In the Airflow UI, these 
appear as three rows of four parallel task instances. Each SQL generation call 
goes to the LLM; each DataFusion execution runs in-process against the CSV.
+
+<!-- SCREENSHOT PLACEHOLDER: Airflow UI Grid view showing the fan-out — three 
rows of four mapped instances (generate_sql[0..3], wrap_query[0..3], 
run_query[0..3]) all in success state, converging to collect_results → 
synthesize_answer → result_confirmation. Capture after a full successful run. 
-->
+
+A representative generated query for the executor dimension:
+
+```sql
+SELECT
+    CASE WHEN "CeleryExecutor" = 'Yes' THEN 'CeleryExecutor' END        AS executor_type,
+    COUNT(*) AS count
+FROM survey
+WHERE "Are you using AI/LLM (ChatGPT/Cursor/Claude etc) to assist you in writing Airflow code?" != 'No, I don''t use AI to write Airflow code'
+  AND "CeleryExecutor" IS NOT NULL
+GROUP BY executor_type
+
+UNION ALL
+
+SELECT
+    CASE WHEN "KubernetesExecutor" = 'Yes' THEN 'KubernetesExecutor' END,
+    COUNT(*)
+FROM survey
+WHERE "Are you using AI/LLM (ChatGPT/Cursor/Claude etc) to assist you in 
writing Airflow code?" != 'No, I don''t use AI to write Airflow code'
+  AND "KubernetesExecutor" IS NOT NULL
+GROUP BY 1
+-- ... and so on
+```
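
The repetitive `UNION ALL` shape is easy to reproduce outside the LLM. A minimal Python sketch that builds the same query from a list of executor columns (column names and the AI-usage filter are taken from the example above; the helper itself is illustrative, not part of the provider):

```python
# The AI-usage filter clause from the generated query above, verbatim.
AI_FILTER = (
    "\"Are you using AI/LLM (ChatGPT/Cursor/Claude etc) to assist you in "
    "writing Airflow code?\" != 'No, I don''t use AI to write Airflow code'"
)


def executor_union_query(columns: list[str]) -> str:
    """Build one SELECT branch per executor column, joined with UNION ALL."""
    branches = []
    for col in columns:
        branches.append(
            f"SELECT\n"
            f"    CASE WHEN \"{col}\" = 'Yes' THEN '{col}' END AS executor_type,\n"
            f"    COUNT(*) AS count\n"
            f"FROM survey\n"
            f"WHERE {AI_FILTER}\n"
            f"  AND \"{col}\" IS NOT NULL\n"
            f"GROUP BY 1"
        )
    return "\n\nUNION ALL\n\n".join(branches)


sql = executor_union_query(["CeleryExecutor", "KubernetesExecutor"])
print(sql.count("UNION ALL"))  # 1
```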
+
+**Step 5 — Collect.** `collect_results` assembles the four result sets into a 
labeled dict.
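
What `collect_results` produces can be pictured as plain Python. The labels and row values below are invented placeholders; the real task reads the four `run_query` XComs:

```python
# Hypothetical result rows standing in for the four run_query XCom outputs.
run_query_outputs = [
    [("KubernetesExecutor", 127), ("CeleryExecutor", 84)],  # executor distribution
    [("Managed cloud service", 96), ("Self-managed", 61)],  # deployment model
    [("AWS", 58), ("GCP", 33)],                             # cloud provider
    [("3.x", 44), ("2.x", 108)],                            # version adoption
]

labels = ["executor", "deployment", "cloud_provider", "airflow_version"]

# The labeled dict handed to the synthesis step: one entry per sub-question.
collected = dict(zip(labels, run_query_outputs))

print(sorted(collected))
# ['airflow_version', 'cloud_provider', 'deployment', 'executor']
```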
+
+**Step 6 — Synthesize.** `LLMOperator` calls the LLM once with all four result 
sets as context. A representative output:
+
+> "Among practitioners who actively use AI tools to write Airflow code, the 
majority (61%) deploy on a managed cloud service or cloud-native setup, with 
AWS as the primary cloud provider (38%). KubernetesExecutor is the dominant 
choice (54%), and this group is adopting Airflow 3.x at a notably higher rate 
than the survey population as a whole (29% vs. 21% overall)."
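
The synthesis call itself is one prompt. A sketch of how the context could be serialized for the model — the "patterns and proportions" framing is quoted from the DAG's system prompt described later in this post; the result rows are invented placeholders:

```python
import json

# Framing instruction quoted from the DAG's synthesis system prompt.
system_prompt = "Focus on patterns and proportions rather than raw counts."

# Placeholder results standing in for the collect_results XCom.
collected = {
    "executor": [["KubernetesExecutor", 127], ["CeleryExecutor", 84]],
    "cloud_provider": [["AWS", 58], ["GCP", 33]],
}

# One user message carrying the labeled result sets as JSON context.
user_prompt = (
    "Characterize the AI-assisted-development cohort using these result sets:\n"
    + json.dumps(collected, indent=2)
)

print("cloud_provider" in user_prompt)  # True
```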
+
+**Step 7 — Review.** The `ApprovalOperator` presents the narrative in the Airflow UI. Approve to complete the DAG; reject to fail it. Clearing `synthesize_answer` then reruns the synthesis if desired, without repeating the fan-out.
+
+
+## What the DAG Topology Makes Explicit
+
+The core difference between this pattern and the equivalent agent harness 
implementation is not the output — it is what is auditable after the run.
+
+| What's happening | In an agent harness | In this DAG |
+|---|---|---|
+| Sub-query: executor distribution | LLM internal tool call, no external artifact | Task `generate_sql[0]` — SQL in XCom, full log |
+| Sub-query: cloud provider | LLM internal tool call | Task `generate_sql[2]` — SQL in XCom, full log |
+| Parallel execution | Concurrent or sequential, implementation-dependent | Explicit mapped instances, each on its own worker |
+| cloud_provider query fails | Entire run restarts from the top, or fails | Only `run_query[2]` retries; other three results preserved |
+| Synthesis inputs | Accumulated context in the LLM's reasoning loop | `collect_results` XCom entry — the exact dict the LLM received |
+| Why did it characterize it that way? | No artifact | `synthesize_answer` XCom: input dict and output string both stored |
+
+Each `generate_sql[i]` task log contains the prompt the LLM received, the SQL 
it returned, and the validation result from sqlglot. Each `run_query[i]` log 
contains the DataFusion execution details and the row count returned. The 
synthesis step's XCom entry contains the exact dict that was passed as context.
+
+This is the same information an agent harness has internally — the difference 
is that Airflow surfaces it as first-class task artifacts, accessible from the 
Airflow UI without instrumenting or patching the reasoning loop.
+
+
+## Connecting Your LLM
+
+Both `LLMSQLQueryOperator` and `LLMOperator` use 
`llm_conn_id="pydanticai_default"`. The same connection table from the 
introductory post applies:
+
+| Provider | Connection type | Required fields |
+|---|---|---|
+| OpenAI | `pydanticai` | Password: API key. Extra: `{"model": "openai:gpt-4o"}` |
+| Anthropic | `pydanticai` | Password: API key. Extra: `{"model": "anthropic:claude-haiku-4-5-20251001"}` |
+| Google Vertex | `pydanticai-vertex` | Extra: `{"model": "google-vertex:gemini-2.0-flash", "project": "...", "vertexai": true}` |
+| AWS Bedrock | `pydanticai-bedrock` | Extra: `{"model": "bedrock:us.anthropic.claude-opus-4-5", "region_name": "us-east-1"}` |
+
+One connection serves both operators. The synthesis step and the SQL 
generation steps can use different connections if you want a stronger model for 
synthesis and a faster one for the SQL generation pass — set `model_id` on the 
`LLMOperator` to override the connection's default.
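
Airflow also accepts connections as environment variables in JSON form (`AIRFLOW_CONN_<CONN_ID>`, supported since Airflow 2.3). A sketch that builds the value for the OpenAI row of the table above — the conn type and extra come from that table, and the key is a placeholder:

```python
import json
import os

conn = {
    "conn_type": "pydanticai",            # from the provider table above
    "password": "sk-...your-api-key...",  # placeholder; never commit real keys
    "extra": {"model": "openai:gpt-4o"},
}

# Airflow resolves AIRFLOW_CONN_<ID> (upper-cased conn id) at runtime.
os.environ["AIRFLOW_CONN_PYDANTICAI_DEFAULT"] = json.dumps(conn)

print(json.loads(os.environ["AIRFLOW_CONN_PYDANTICAI_DEFAULT"])["extra"]["model"])
# openai:gpt-4o
```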
+
+
+## The Multi-Agent Pattern Hidden in Plain Sight
+
+This DAG was not designed around multi-agent frameworks, but it accidentally 
implements one of the most common separation-of-concerns patterns in that 
space: the **SQL Architect / Critic / Narrator** triad.
+
+In agent harness frameworks, these three roles are typically implemented as 
distinct agent instances that coordinate through an internal routing layer. The 
underlying rationale is that mixing generation, evaluation, and communication 
into a single agent produces outputs that are mediocre at all three jobs. 
Separating them forces each role to reason only about what it is responsible 
for.
+
+The survey DAG lands in the same place through a different path: the task 
boundary enforces the separation.
+
+**SQL Architect → `generate_sql[0..3]` (`LLMSQLQueryOperator`).**
+Each mapped instance receives one natural language sub-question and produces 
one SQL query. Schema context is passed as a system-level framing, not as part 
of the user prompt, so the model reasons about structure before generating 
syntax. The Architect role is strict: produce a valid `SELECT` statement or 
fail.
+
+**Critic → two layers.**
+The first layer is embedded in `LLMSQLQueryOperator`: sqlglot parses and 
validates the generated SQL before the task returns. This is a syntax-level 
Critic — it rejects anything that is not a `SELECT`. The second and fuller 
layer is the `LLMBranchOperator` pattern from Pattern 2 in this series: an 
explicit task that evaluates result quality and decides whether the finding is 
reportable, needs a drill-down, or warrants a pivot to a different hypothesis. 
That task does what the Critic does [...]
+
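The syntax-level Critic's contract is easy to picture. A simplified, stdlib-only stand-in for the provider's sqlglot check — the real check parses a full AST, while this sketch only enforces the SELECT-only rule and statement completeness via `sqlite3.complete_statement`:

```python
import sqlite3


class SQLValidationError(ValueError):
    """Raised when generated SQL fails the Critic's contract."""


def critic_check(sql: str) -> str:
    """Reject anything that is not a single, complete SELECT statement."""
    stripped = sql.strip().rstrip(";")
    # Contract 1: SELECT-only (WITH ... SELECT also allowed, for CTEs).
    if not stripped.upper().startswith(("SELECT", "WITH")):
        raise SQLValidationError("only SELECT statements are allowed")
    # Contract 2: the statement is syntactically complete (balanced quotes etc).
    if not sqlite3.complete_statement(stripped + ";"):
        raise SQLValidationError("incomplete SQL statement")
    return stripped


critic_check("SELECT COUNT(*) FROM survey")  # passes
# critic_check("DROP TABLE survey")          # raises SQLValidationError
```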
+**Narrator → `synthesize_answer` (`LLMOperator`).**
+Receives the labeled result sets from all four dimensions and produces a 
plain-language characterization. The Narrator's role is bounded by design: it 
receives structured data rows, not the intermediate SQL or any reasoning 
artifacts, and its system prompt constrains it to communication — "focus on 
patterns and proportions rather than raw counts." The role separation is 
enforced by what is in XCom, not by agent routing logic.
+
+One genuine structural difference remains. In a multi-agent system, the Critic 
can loop back to the Architect with feedback — "this query has a NULL handling 
problem, try again" — and the cycle runs until the output meets a quality bar. 
Airflow DAGs are acyclic. The Critic either raises an exception and triggers a 
task-level retry of the Architect instance (automatic but blunt), or routes to 
an alternative path via `LLMBranchOperator` (explicit and auditable, but the 
alternative path mus [...]
+
+That acyclicity is a deliberate tradeoff: it is also what makes the DAG's 
execution fully auditable and its failure modes predictable. The feedback loop 
pattern — and the open question of how far it can be supported within a 
structured workflow model — is part of what Airflow's roadmap is actively 
working through.
+
+---
+
+## Try It
+
+The DAG is in the `common.ai` provider example DAGs:
+[`example_llm_survey_agentic.py`](https://github.com/apache/airflow/tree/main/providers/common/ai/src/airflow/providers/common/ai/example_dags/example_llm_survey_agentic.py).
+
+```bash
+pip install 'apache-airflow-providers-common-ai' \
+            'apache-airflow-providers-common-sql[datafusion]'
+```
+
+Requires Apache Airflow 3.0+.
+
+Set `SURVEY_CSV_PATH` to your local cleaned copy of the survey CSV, create a 
`pydanticai_default` connection, and trigger `example_llm_survey_agentic`.
+
+The Airflow UI will show the four parallel `generate_sql` and `run_query` 
instances fanning out and converging to `collect_results`. That visual is the 
clearest way to see what distinguishes the agentic pattern from a single-query 
run.
+
+Questions, results, and sub-questions that surprised the LLM are welcome on 
[Airflow Slack](https://s.apache.org/airflow-slack) in `#airflow-ai`.

