This is an automated email from the ASF dual-hosted git repository.

github-merge-queue[bot] pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git


The following commit(s) were added to refs/heads/main by this push:
     new 8001e4c86e feat(bench): add Arrow Flight E2E benchmark + Benchmarks CI workflow (#5557)
8001e4c86e is described below

commit 8001e4c86e8d60971887ad7509b88c42a9fd1ad5
Author: Yicong Huang <[email protected]>
AuthorDate: Thu Jun 11 15:29:10 2026 -0700

    feat(bench): add Arrow Flight E2E benchmark + Benchmarks CI workflow (#5557)
    
    ### What changes were proposed in this PR?
    
    A bench-agnostic CI lifecycle that future suites (e.g. JMH for
    `ArrowUtils` micros) plug into by appending one line to
    `bin/run-benchmarks.sh`, plus the first concrete suite: an end-to-end
    Arrow Flight + `PythonWorkflowWorker` micro-bench.
    
    **Lifecycle**
    
    | Trigger | Mode | PR comment | Publish to gh-pages |
    |---|---|---|---|
    | `pull_request` (label-gated, mirrors `amber-integration`'s set) | `pr` — 3 configs × 20 batches (~5 min) | ✓ | — |
    | `push` to `main` | `pr` (post-merge fast signal) | — | ✓ |
    | `schedule` Sundays 08:00 UTC | `full` — 36 configs × 200 batches (~50-60 min) | — | ✓ |
    | `workflow_dispatch` | `full` | — | — |
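    
    The trigger matrix above can be read as a small lookup. The following is
    an illustrative sketch only, not code from this PR: `planFor` and its
    field names are hypothetical, and only the event names and modes are
    taken from the table.
    
    ```javascript
    // Hypothetical helper mirroring the lifecycle table; not part of the PR.
    function planFor(eventName) {
      // schedule and workflow_dispatch run the full grid; all else uses `pr`.
      const isFull = eventName === "schedule" || eventName === "workflow_dispatch";
      return {
        mode: isFull ? "full" : "pr",
        // Only pull_request runs receive a PR comment.
        prComment: eventName === "pull_request",
        // Only push (to main) and scheduled runs publish to gh-pages.
        publish: eventName === "push" || eventName === "schedule",
      };
    }
    
    console.log(planFor("pull_request"));
    ```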
    
    PR runs upload the bench as an artifact + render a markdown summary
    table on the workflow page; the `workflow_run`-triggered `Benchmarks PR
    Comment` listener (separate file because `pull_request` from forks gets
    a read-only token and zero secret access) downloads the artifact,
    sanitizes the CSV, and upserts a single marker-tagged PR comment.
    Non-blocking — not part of `required-checks.yml`'s aggregator.
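    
    A minimal sketch of the sanitize-then-upsert idea described above,
    written as pure functions so it can run outside GitHub Actions. The
    function names and the returned action objects are hypothetical; the
    marker string and the 32 KiB cap are the values the listener workflow
    uses. The real listener calls the GitHub REST API instead of returning
    a plan.
    
    ```javascript
    // Illustrative sketch; helper names are hypothetical.
    const MARKER = "<!-- texera-benchmarks-comment -->";
    const MAX_CSV_CHARS = 32 * 1024;
    
    // Cap the size, then break up runs of 3+ backticks so the text cannot
    // close the code fence it will be embedded in.
    function sanitizeCsv(raw) {
      const capped =
        raw.length > MAX_CSV_CHARS ? raw.slice(0, MAX_CSV_CHARS) + "\n[truncated]" : raw;
      return capped.replace(/`{3,}/g, "`\u200b``").trim();
    }
    
    // Decide between updating the marker-tagged comment and creating one.
    function upsertAction(comments, body) {
      const existing = comments.find((c) => c.body && c.body.includes(MARKER));
      return existing
        ? { op: "update", comment_id: existing.id, body }
        : { op: "create", body };
    }
    
    console.log(upsertAction([{ id: 7, body: MARKER + " old" }], MARKER + " new").op);
    ```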
    
    **First benchmark: Arrow Flight E2E (`ArrowFlightActorBench`)**
    
    Spawns a real `PythonWorkflowWorker` actor (real Pekko mailbox + real
    `texera_run_python_worker.py` subprocess + real Arrow Flight gRPC) wired
    to an identity Python UDF, then times per-batch send→echo round-trip
    across a sweep of `batch_size × schema_width × string_len`. Per-config
    output: throughput (tuples/s, MB/s), latency p50/p95/p99, total ms. Each
    config writes incrementally so a killed sweep still leaves usable
    artifacts.
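    
    To make the per-config metrics concrete, here is a hedged sketch of how
    they could be derived from per-batch round-trip latencies. It is not
    the bench's Scala code: `percentile` and `summarize` are hypothetical
    names, the nearest-rank percentile is one common choice among several,
    and total time is assumed to be the sum of sequential round trips.
    
    ```javascript
    // Hypothetical sketch; the real bench is written in Scala and may differ.
    
    // Nearest-rank percentile over an ascending array of latencies (microseconds).
    function percentile(sortedUs, p) {
      const rank = Math.ceil((p / 100) * sortedUs.length);
      return sortedUs[Math.max(0, rank - 1)];
    }
    
    // Assumes batches are sent one at a time, so total time is the latency sum.
    function summarize(latenciesUs, tuplesPerBatch, bytesPerBatch) {
      const sorted = [...latenciesUs].sort((a, b) => a - b);
      const totalUs = latenciesUs.reduce((acc, v) => acc + v, 0);
      const totalSec = totalUs / 1e6;
      const n = latenciesUs.length;
      return {
        tuplesPerSec: (n * tuplesPerBatch) / totalSec,
        mbPerSec: (n * bytesPerBatch) / (1024 * 1024) / totalSec,
        p50Us: percentile(sorted, 50),
        p95Us: percentile(sorted, 95),
        p99Us: percentile(sorted, 99),
        totalMs: totalUs / 1000,
      };
    }
    
    console.log(summarize([3000, 1000, 4000, 2000], 10, 1024 * 1024));
    ```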
    
    ASF: `benchmark-action/github-action-benchmark` is SHA-pinned to
    `52576c92bccf6ac60c8223ec7eb2565637cae9ba` (v1.22.1) per the
    apache-infrastructure-actions allow-list.
    
    ### Any related issues, documentation, discussions?
    
    Closes #5556
    
    ### How was this PR tested?
    
    End-to-end validated on a fork-internal PR —
    [Yicong-Huang/texera#17](https://github.com/Yicong-Huang/texera/pull/17)
    ran the full `Benchmarks` workflow, the `workflow_run` listener fired,
    and a marker-tagged comment landed and upserted across two push cycles
    ([rendered example](https://github.com/Yicong-Huang/texera/pull/17#issuecomment-4645589605)).
    `workflow_run` only listens on the default branch, so the loop can't be
    tested from a non-default branch — that's why the dry-run lived on a
    fork; after merge, the same flow takes effect on `apache/texera:main`
    automatically.
    
    ### Was this PR authored or co-authored using generative AI tooling?
    
    Generated-by: Claude Code (Opus 4.7)
---
 .github/workflows/benchmarks-pr-comment.yml        | 261 +++++++++
 .github/workflows/benchmarks.yml                   | 327 ++++++++++++
 amber/build.sbt                                    |   7 +
 .../texera/amber/bench/ArrowFlightActorBench.scala | 592 +++++++++++++++++++++
 bin/run-benchmarks.sh                              |  59 ++
 5 files changed, 1246 insertions(+)

diff --git a/.github/workflows/benchmarks-pr-comment.yml b/.github/workflows/benchmarks-pr-comment.yml
new file mode 100644
index 0000000000..f8bbae1a65
--- /dev/null
+++ b/.github/workflows/benchmarks-pr-comment.yml
@@ -0,0 +1,261 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Posts (or upserts) a PR comment with bench results AFTER the Benchmarks
+# workflow completes.
+#
+# Why a separate workflow_run-triggered file:
+#   - The Benchmarks workflow runs on `pull_request`, which for fork PRs
+#     gets a read-only GITHUB_TOKEN and zero secret access — GitHub's
+#     hard-coded security model. We can't comment from there.
+#   - `workflow_run` runs in the BASE repo's context (apache/texera)
+#     with normal token + secret access, so it CAN comment on fork PRs.
+#   - This is the ASF-approved pattern; `pull_request_target` is policy-
+#     forbidden for any action that handles tokens.
+#
+# Why workflow_run is safe here vs pull_request_target:
+#   - We only READ a small, opaque artifact (pr-number.txt + the bench
+#     JSON / CSV) produced by the upstream run; we don't execute any
+#     PR-author code in this workflow.
+#   - The PR number is validated against ^[0-9]+$ before being used in
+#     any API call, blocking ref injection.
+
+name: Benchmarks PR Comment
+
+on:
+  workflow_run:
+    workflows: ["Benchmarks"]
+    types: [completed]
+
+permissions:
+  # Need pull-requests: write to post / update the comment.
+  # contents: read is the default and enough to checkout for github-script
+  # which we don't actually do here (we only call REST APIs).
+  pull-requests: write
+  actions: read
+
+jobs:
+  comment:
+    # Only act when the upstream Benchmarks run was triggered by a PR.
+    # push-to-main / schedule / dispatch produce no PR to comment on.
+    if: ${{ github.event.workflow_run.event == 'pull_request' }}
+    runs-on: ubuntu-22.04
+    steps:
+      - name: Download bench-results artifact
+        uses: actions/github-script@v8
+        with:
+          script: |
+            const fs = require("fs");
+            const path = require("path");
+            const runId = context.payload.workflow_run.id;
+            const { data } = await github.rest.actions.listWorkflowRunArtifacts({
+              owner: context.repo.owner,
+              repo: context.repo.repo,
+              run_id: runId,
+            });
+            const match = data.artifacts.find((a) => a.name.startsWith("bench-results-"));
+            if (!match) {
+              core.warning(`no bench-results-* artifact on run ${runId}; nothing to comment.`);
+              return;
+            }
+            const zip = await github.rest.actions.downloadArtifact({
+              owner: context.repo.owner,
+              repo: context.repo.repo,
+              artifact_id: match.id,
+              archive_format: "zip",
+            });
+            fs.mkdirSync("bench-results-zip", { recursive: true });
+            fs.writeFileSync(path.join("bench-results-zip", "artifact.zip"), Buffer.from(zip.data));
+            core.info(`downloaded artifact ${match.name} (${match.size_in_bytes} bytes)`);
+
+      - name: Unzip artifact
+        run: |
+          mkdir -p bench-results
+          unzip -o bench-results-zip/artifact.zip -d bench-results
+          ls -la bench-results/
+
+      - name: Read PR number from artifact
+        id: pr
+        # Read + strictly validate (digits only) before using in API calls.
+        # The artifact comes from a fork-triggered workflow whose contents
+        # are not entirely trusted; numeric-only PR numbers block any
+        # injection vector through this value.
+        run: |
+          if [ ! -f bench-results/pr-number.txt ]; then
+            echo "no pr-number.txt in artifact; bailing out"
+            exit 0
+          fi
+          raw=$(tr -d '[:space:]' < bench-results/pr-number.txt)
+          if ! [[ "$raw" =~ ^[0-9]+$ ]]; then
+            echo "invalid pr-number.txt contents: '$raw'"
+            exit 1
+          fi
+          echo "number=$raw" >> "$GITHUB_OUTPUT"
+
+      - name: Upsert PR comment with bench summary
+        if: steps.pr.outputs.number != ''
+        uses: actions/github-script@v8
+        env:
+          PR_NUMBER: ${{ steps.pr.outputs.number }}
+        with:
+          script: |
+            const fs = require("fs");
+            const pr = Number(process.env.PR_NUMBER);
+            const marker = "<!-- texera-benchmarks-comment -->";
+
+            // CSV comes from a fork-PR-controlled artifact — sanitize before
+            // embedding in markdown:
+            //   1. Cap total size so a giant junk file can't bloat a comment.
+            //   2. Strip any triple-backtick sequence so the content cannot
+            //      escape the surrounding code fence and inject arbitrary
+            //      markdown (phishing links, image-rendering tricks, etc).
+            //      Replacement with a zero-width char preserves byte alignment
+            //      visually while neutralizing fence termination.
+            const MAX_CSV_BYTES = 32 * 1024;
+            const csvPath = "bench-results/arrow-flight-e2e.csv";
+            let csv = null;
+            if (fs.existsSync(csvPath)) {
+              let raw = fs.readFileSync(csvPath, "utf8");
+              if (raw.length > MAX_CSV_BYTES) {
+                raw = raw.slice(0, MAX_CSV_BYTES) + "\n[truncated]";
+              }
+              csv = raw.replace(/```+/g, "`​``").trim();
+            }
+
+            // Per-cell sanitizer for the markdown table: strip newlines, escape
+            // pipes (would break columns), and cap length.
+            const escapeCell = (s) =>
+              s == null
+                ? ""
+                : String(s).replace(/[\r\n]+/g, " ").replace(/\|/g, "\\|").slice(0, 64);
+
+            // Render selected columns as a markdown table. Drops noise columns
+            // (config_idx, total_tuples, total_bytes, lat_p95_us) and converts
+            // microseconds to milliseconds for latency readability. Returns
+            // null on any parsing failure → fallback renders raw CSV instead.
+            const csvToTable = (text) => {
+              try {
+                const rows = text
+                  .trim()
+                  .split(/\r?\n/)
+                  .map((line) => line.split(","));
+                if (rows.length < 2) return null;
+                const header = rows[0].map((h) => h.trim());
+                const idx = (col) => header.indexOf(col);
+                const cols = [
+                  { col: "batch_size", label: "batch", fmt: (v) => v },
+                  { col: "schema_width", label: "schema_w", fmt: (v) => v },
+                  { col: "string_len", label: "str_len", fmt: (v) => v },
+                  { col: "num_batches", label: "n_batches", fmt: (v) => v },
+                  { col: "tuples_per_sec", label: "tuples/s", fmt: (v) => v },
+                  { col: "mb_per_sec", label: "MB/s", fmt: (v) => v },
+                  {
+                    col: "lat_p50_us",
+                    label: "p50 ms",
+                    fmt: (v) => (parseFloat(v) / 1000).toFixed(2),
+                  },
+                  {
+                    col: "lat_p99_us",
+                    label: "p99 ms",
+                    fmt: (v) => (parseFloat(v) / 1000).toFixed(2),
+                  },
+                  { col: "total_ms", label: "total ms", fmt: (v) => v },
+                ].filter((c) => idx(c.col) >= 0);
+                if (cols.length === 0) return null;
+                const lines = [];
+                lines.push("| " + cols.map((c) => escapeCell(c.label)).join(" | ") + " |");
+                lines.push("|" + cols.map(() => "---:").join("|") + "|");
+                for (const row of rows.slice(1)) {
+                  const cells = cols.map((c) => {
+                    const raw = row[idx(c.col)];
+                    try {
+                      return escapeCell(c.fmt(raw));
+                    } catch (e) {
+                      return escapeCell(raw);
+                    }
+                  });
+                  lines.push("| " + cells.join(" | ") + " |");
+                }
+                return lines.join("\n");
+              } catch (e) {
+                core.warning(`csvToTable failed: ${e.message}`);
+                return null;
+              }
+            };
+
+            // workflow_run.html_url is GitHub-emitted (URL to apache/texera
+            // run page); not attacker-influenceable.
+            const upstreamUrl = context.payload.workflow_run.html_url;
+
+            // Primary view: rendered markdown table for skim-readability.
+            // Fallback view (collapsed <details>): raw sanitized CSV for full
+            // verifiability — readers click to expand if they need every column.
+            const tableMd = csv ? csvToTable(csv) : null;
+            const bodyParts = [marker, "## Arrow Flight E2E bench", ""];
+            if (tableMd) {
+              bodyParts.push(tableMd, "");
+            } else if (!csv) {
+              bodyParts.push("_(no arrow-flight-e2e.csv in artifact)_", "");
+            } else {
+              bodyParts.push("_(unable to parse CSV; raw below)_", "");
+            }
+            if (csv) {
+              bodyParts.push(
+                "<details><summary>Raw CSV</summary>",
+                "",
+                "```csv",
+                csv,
+                "```",
+                "",
+                "</details>",
+                ""
+              );
+            }
+            bodyParts.push(`[Full workflow run](${upstreamUrl})`);
+            const body = bodyParts.join("\n");
+
+            // Find existing marker comment so subsequent runs upsert in place.
+            // Paginate via `paginate` so a long-running PR with >100 comments
+            // still locates the marker — otherwise we'd silently create a
+            // duplicate every push past the 100-comment ceiling.
+            const allComments = await github.paginate(
+              github.rest.issues.listComments,
+              {
+                owner: context.repo.owner,
+                repo: context.repo.repo,
+                issue_number: pr,
+                per_page: 100,
+              }
+            );
+            const existing = allComments.find((c) => c.body && c.body.includes(marker));
+            if (existing) {
+              await github.rest.issues.updateComment({
+                owner: context.repo.owner,
+                repo: context.repo.repo,
+                comment_id: existing.id,
+                body,
+              });
+              core.info(`updated comment ${existing.id} on PR #${pr}`);
+            } else {
+              await github.rest.issues.createComment({
+                owner: context.repo.owner,
+                repo: context.repo.repo,
+                issue_number: pr,
+                body,
+              });
+              core.info(`created new comment on PR #${pr}`);
+            }
diff --git a/.github/workflows/benchmarks.yml b/.github/workflows/benchmarks.yml
new file mode 100644
index 0000000000..83da742857
--- /dev/null
+++ b/.github/workflows/benchmarks.yml
@@ -0,0 +1,327 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Texera benchmarks — bench-agnostic umbrella workflow.
+#
+# This file is the single CI entry for ALL Texera performance benchmarks
+# (currently Arrow Flight E2E; JMH and others land here as well). The
+# workflow knows nothing about specific benches — bin/run-benchmarks.sh
+# is the opaque entry point that owns which benches run and where their
+# outputs land under bench-results/. Adding a new bench is:
+#   1. Append the run command to bin/run-benchmarks.sh.
+#   2. Add a `Publish <chart-name>` step block below pointing at the
+#      bench's JSON output file with the right `tool:` setting.
+# This workflow file otherwise stays unchanged.
+#
+# Triggering — mirrors amber-integration's label gate (NOT file paths):
+#   - PR: runs only when one of the labels mapped to the amber-integration
+#     stack in required-checks.yml's LABEL_STACKS is present on the PR.
+#     Labels are applied by the .github/labeler.yml workflow on opened /
+#     synchronize, so we wait for that workflow to complete before
+#     deciding (same pattern required-checks.yml uses).
+#   - push to main: always runs (same trimmed grid as PR for quick post-
+#     merge signal) and publishes to gh-pages.
+#   - schedule (weekly): runs the full 36-config sweep and publishes to
+#     gh-pages — this is the authoritative long-term baseline.
+#   - workflow_dispatch: manual full-grid run (no publish; bring-your-own
+#     trigger for ad-hoc exploration).
+#
+# Two modes via BENCH_MODE env (read by the bench Scala main):
+#   pr   — 3 configs × 20 batches, ~5 min   (PR + push-to-main)
+#   full — 36 configs × 200 batches, ~50-60 min   (schedule + dispatch)
+#
+# Non-blocking: this workflow is NOT included in required-checks.yml's
+# `required-checks` aggregator, so its result doesn't gate merges even
+# when it fails. Adding it to branch protection later is a deliberate
+# .asf.yaml change.
+#
+# Permissions:
+#   contents: write — needed by benchmark-action's auto-push to gh-pages.
+#   PR runs (which GitHub auto-downgrades to read-only on forks) gate
+#   auto-push off via the event check, so the missing write is never
+#   exercised.
+
+name: Benchmarks
+
+on:
+  push:
+    branches: [main]
+  pull_request:
+    types: [opened, reopened, synchronize, labeled, unlabeled]
+  schedule:
+    # Weekly full-grid baseline refresh, Sundays 08:00 UTC. PR and post-
+    # merge runs use a trimmed 3-config grid to stay around 5 min; the
+    # scheduled run covers the full 36-config sweep that the gh-pages
+    # dashboard tracks long-term.
+    - cron: "0 8 * * 0"
+  workflow_dispatch:
+
+permissions:
+  contents: write
+
+concurrency:
+  group: benchmarks-${{ github.ref }}
+  # On main: never cancel an in-flight baseline run; on PRs: supersede.
+  cancel-in-progress: ${{ github.ref != 'refs/heads/main' }}
+
+jobs:
+  precheck:
+    # Decide whether to run based on PR labels (push / dispatch always
+    # run). Lifted from required-checks.yml's precheck so the trigger
+    # surface matches amber-integration exactly.
+    name: Precheck
+    runs-on: ubuntu-latest
+    outputs:
+      run_bench: ${{ steps.decide.outputs.run_bench }}
+    steps:
+      - name: Wait for Pull Request Labeler
+        if: github.event_name == 'pull_request'
+        uses: actions/github-script@v8
+        with:
+          script: |
+            const ref = context.payload.pull_request.head.sha;
+            const maxAttempts = 30;
+            for (let i = 0; i < maxAttempts; i++) {
+              const { data } = await github.rest.checks.listForRef({
+                owner: context.repo.owner,
+                repo: context.repo.repo,
+                ref,
+                check_name: "labeler",
+              });
+              const check = data.check_runs[0];
+              if (check && check.status === "completed") {
+                core.info(`labeler ${check.conclusion}`);
+                return;
+              }
+              core.info(`labeler not ready (attempt ${i + 1}/${maxAttempts})`);
+              await new Promise((r) => setTimeout(r, 10000));
+            }
+            core.warning("labeler did not complete within 5 minutes; proceeding with current labels.");
+
+      - name: Decide whether to run bench
+        id: decide
+        uses: actions/github-script@v8
+        with:
+          script: |
+            const eventName = context.eventName;
+            if (eventName !== "pull_request") {
+              // push to main / workflow_dispatch always run.
+              core.info(`event=${eventName} — running unconditionally`);
+              core.setOutput("run_bench", "true");
+              return;
+            }
+            // Re-fetch labels: the labeler may have just added some.
+            const { data: pr } = await github.rest.pulls.get({
+              owner: context.repo.owner,
+              repo: context.repo.repo,
+              pull_number: context.payload.pull_request.number,
+            });
+            const labels = pr.labels.map((l) => l.name);
+            core.info(`PR labels: ${labels.join(", ") || "(none)"}`);
+            // Mirrors LABEL_STACKS in required-checks.yml: every label
+            // whose stack list contains "amber-integration" triggers this
+            // bench. Keep in sync if LABEL_STACKS there changes.
+            const TRIGGER_LABELS = new Set([
+              "pyamber",
+              "engine",
+              "amber-integration",
+              "common",
+              "ddl-change",
+              "ci",
+            ]);
+            const matched = labels.filter((l) => TRIGGER_LABELS.has(l));
+            const shouldRun = matched.length > 0;
+            core.info(
+              shouldRun
+                ? `Triggering on labels: ${matched.join(", ")}`
+                : "No trigger label present; skipping bench."
+            );
+            core.setOutput("run_bench", shouldRun ? "true" : "false");
+
+  bench:
+    name: Bench
+    needs: precheck
+    if: ${{ needs.precheck.outputs.run_bench == 'true' }}
+    runs-on: ubuntu-22.04
+    env:
+      JAVA_OPTS: -Xms2048M -Xmx2048M -Xss6M -XX:ReservedCodeCacheSize=256M -Dfile.encoding=UTF-8
+      JVM_OPTS: -Xms2048M -Xmx2048M -Xss6M -XX:ReservedCodeCacheSize=256M -Dfile.encoding=UTF-8
+      # `pr` mode = 3-config trimmed sweep (~5 min) for PR + post-merge.
+      # `full` mode = 36-config sweep (~50-60 min) for schedule + manual.
+      # Read by the bench Scala main (see GridSpec switch); workflow only
+      # decides which mode to pass.
+      BENCH_MODE: ${{ (github.event_name == 'schedule' || github.event_name == 'workflow_dispatch') && 'full' || 'pr' }}
+    services:
+      # The bench itself doesn't touch the DB, but sbt's transitive compile
+      # chain reaches `common/auth` which imports JOOQ-generated classes
+      # from `org.apache.texera.dao.jooq.generated.*`. JOOQ codegen at
+      # sbt compile time requires a live Postgres to introspect against;
+      # without it the auth module's `User` / `UserRoleEnum` symbols fail
+      # to resolve and the whole bench compile aborts. Mirrors the same
+      # service block from amber-integration in build.yml.
+      postgres:
+        image: postgres
+        env:
+          POSTGRES_PASSWORD: postgres
+        ports:
+          - 5432:5432
+        options: >-
+          --health-cmd="pg_isready -U postgres"
+          --health-interval=10s
+          --health-timeout=5s
+          --health-retries=5
+    steps:
+      - name: Checkout
+        uses: actions/checkout@v5
+        with:
+          fetch-depth: 0
+      - name: Setup JDK
+        uses: actions/setup-java@v5
+        with:
+          distribution: "temurin"
+          java-version: 17
+      - name: Setup Python
+        uses: actions/setup-python@v6
+        with:
+          python-version: "3.12"
+      - name: Install Python dependencies
+        # Mirrors amber-integration's installer in build.yml so the bench
+        # subprocess imports resolve identically (pytorch CPU index +
+        # betterproto plugin via dev-requirements).
+        run: |
+          python -m pip install uv
+          if [ -f amber/requirements.txt ]; then uv pip install --system --index-strategy unsafe-best-match -r amber/requirements.txt; fi
+          if [ -f amber/operator-requirements.txt ]; then uv pip install --system --index-strategy unsafe-best-match -r amber/operator-requirements.txt; fi
+          if [ -f amber/dev-requirements.txt ]; then uv pip install --system --index-strategy unsafe-best-match -r amber/dev-requirements.txt; fi
+      - name: Install protoc
+        run: |
+          PROTOC_VERSION=$(cat bin/protoc-version.txt)
+          curl -fsSL -o /tmp/protoc.zip "https://github.com/protocolbuffers/protobuf/releases/download/v${PROTOC_VERSION}/protoc-${PROTOC_VERSION}-linux-x86_64.zip";
+          sudo unzip -o /tmp/protoc.zip -d /usr/local
+          sudo chmod +x /usr/local/bin/protoc
+          sudo chmod -R a+rX /usr/local/include/google
+      - name: Create Database for JOOQ codegen
+        # Minimal subset of amber-integration's "Create Databases" step —
+        # JOOQ only introspects against texera_db, not iceberg/lakefs/
+        # lakekeeper schemas which the bench never touches.
+        run: psql -h localhost -U postgres -f sql/texera_ddl.sql
+        env:
+          PGPASSWORD: postgres
+      - name: Generate Python proto bindings
+        run: bash bin/python-proto-gen.sh
+      - name: Setup sbt launcher
+        uses: sbt/setup-sbt@508b753e53cb6095967669e0911487d2b9bc9f41 # v1.1.22
+      - uses: coursier/cache-action@90c37294538be80a558fd665531fcdc2b467b475 # v8.1.0
+        with:
+          extraSbtFiles: '["*.sbt", "project/**.{scala,sbt}", "project/build.properties" ]'
+
+      - name: Run benchmarks
+        # Single opaque entry point — this workflow doesn't know which
+        # benches exist. Adding a JMH suite later = appending one line
+        # inside bin/run-benchmarks.sh and adding a publish step below.
+        run: bash bin/run-benchmarks.sh
+
+      - name: Stash PR number for downstream comment workflow
+        # PR fork workflows can't comment (GitHub forces read-only token);
+        # benchmarks-pr-comment.yml runs separately via workflow_run with
+        # proper write access, and needs the PR number to find the target.
+        # github.event.workflow_run.pull_requests is empty for fork PRs,
+        # so we ferry the number via artifact.
+        if: ${{ github.event_name == 'pull_request' && !cancelled() }}
+        env:
+          PR_NUMBER: ${{ github.event.pull_request.number }}
+        run: echo "$PR_NUMBER" > bench-results/pr-number.txt
+
+      - name: Render bench summary
+        # Render the bench CSV into a markdown table on the workflow run
+        # page. Visible without further clicks — and doesn't need any
+        # extra permissions (writes to $GITHUB_STEP_SUMMARY only).
+        if: ${{ !cancelled() }}
+        run: |
+          {
+            echo "## Bench results (\`$BENCH_MODE\` mode)"
+            echo
+            if [ -f bench-results/arrow-flight-e2e.csv ]; then
+              echo '```csv'
+              cat bench-results/arrow-flight-e2e.csv
+              echo '```'
+            else
+              echo "_(no bench-results/arrow-flight-e2e.csv produced)_"
+            fi
+          } >> "$GITHUB_STEP_SUMMARY"
+
+      - name: Upload bench artifacts
+        if: ${{ !cancelled() }}
+        uses: actions/upload-artifact@v4
+        with:
+          name: bench-results-${{ github.run_id }}
+          path: bench-results/
+          retention-days: 14
+
+      # Publish to the gh-pages dashboard. auto-push + save-data-file are
+      # both gated on push-to-main / schedule: PR runs only emit the job
+      # summary and the uploaded artifact, never touching the tracked
+      # baseline. Adding a new benchmark = adding one publish block below
+      # matching the JSON filename convention in bin/run-benchmarks.sh.
+      #
+      # `skip-fetch-gh-pages: true` because gh-pages does not exist on
+      # apache/texera yet — without this the action fatals with
+      # `couldn't find remote ref gh-pages` even before the auto-push
+      # decision. auto-push: true on push/schedule still creates the
+      # branch on first write. Once the dashboard is seeded, flip this
+      # to false to re-enable baseline comparison + alert-threshold.
+      #
+      # `continue-on-error: true` keeps any other gh-pages-side surprise
+      # (permission glitches, transient git failures) from failing the
+      # bench job overall — the bench data itself is already in the
+      # uploaded artifact above.
+      - name: Publish throughput
+        if: ${{ !cancelled() }}
+        continue-on-error: true
+        uses: benchmark-action/github-action-benchmark@52576c92bccf6ac60c8223ec7eb2565637cae9ba # v1.22.1
+        with:
+          name: Arrow Flight E2E Throughput
+          tool: customBiggerIsBetter
+          output-file-path: bench-results/arrow-flight-e2e-throughput.json
+          github-token: ${{ secrets.GITHUB_TOKEN }}
+          auto-push: ${{ (github.event_name == 'push' && github.ref == 'refs/heads/main') || github.event_name == 'schedule' }}
+          save-data-file: ${{ (github.event_name == 'push' && github.ref == 'refs/heads/main') || github.event_name == 'schedule' }}
+          skip-fetch-gh-pages: true
+          gh-pages-branch: gh-pages
+          benchmark-data-dir-path: dev/bench
+          alert-threshold: "150%"
+          # comment-on-alert needs pull-requests:write; skip and let
+          # results show up via summary-always instead.
+          comment-on-alert: false
+          summary-always: true
+      - name: Publish latency
+        if: ${{ !cancelled() }}
+        continue-on-error: true
+        uses: benchmark-action/github-action-benchmark@52576c92bccf6ac60c8223ec7eb2565637cae9ba # v1.22.1
+        with:
+          name: Arrow Flight E2E Latency
+          tool: customSmallerIsBetter
+          output-file-path: bench-results/arrow-flight-e2e-latency.json
+          github-token: ${{ secrets.GITHUB_TOKEN }}
+          auto-push: ${{ (github.event_name == 'push' && github.ref == 'refs/heads/main') || github.event_name == 'schedule' }}
+          save-data-file: ${{ (github.event_name == 'push' && github.ref == 'refs/heads/main') || github.event_name == 'schedule' }}
+          skip-fetch-gh-pages: true
+          gh-pages-branch: gh-pages
+          benchmark-data-dir-path: dev/bench
+          alert-threshold: "150%"
+          comment-on-alert: false
+          summary-always: true
diff --git a/amber/build.sbt b/amber/build.sbt
index aa775f7301..efcaee5006 100644
--- a/amber/build.sbt
+++ b/amber/build.sbt
@@ -62,6 +62,13 @@ Compile / unmanagedSourceDirectories += baseDirectory.value / "src" / "main" / "
 // AMBER_TEST_FILTER env var below routes which tagged subset runs.
 Test / unmanagedSourceDirectories += baseDirectory.value / "src" / "test" / "integration"
 
+// `amber/src/bench` holds performance benchmarks (no pass/fail assertion;
+// emit metrics for the github-action-benchmark CI dashboard). Kept out of
+// `src/test/` so reviewers don't conflate "runs in test suite" with "is a
+// test". Same Test-scope wiring as `integration/` above so scalafmt /
+// scalafix still cover it and `sbt Test/runMain` can invoke benches.
+Test / unmanagedSourceDirectories += baseDirectory.value / "src" / "bench" / "scala"
+
 // Test-filter switch driven by the AMBER_TEST_FILTER env var so the
 // amber and amber-integration CI jobs select disjoint subsets without
 // each invocation having to embed a `set Tests.Argument(...)` prefix.
diff --git a/amber/src/bench/scala/org/apache/texera/amber/bench/ArrowFlightActorBench.scala b/amber/src/bench/scala/org/apache/texera/amber/bench/ArrowFlightActorBench.scala
new file mode 100644
index 0000000000..79d0c8cd7d
--- /dev/null
+++ b/amber/src/bench/scala/org/apache/texera/amber/bench/ArrowFlightActorBench.scala
@@ -0,0 +1,592 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.bench
+
+import org.apache.pekko.actor.{Actor, ActorRef, ActorSystem, PoisonPill, Props}
+import org.apache.pekko.testkit.TestProbe
+import org.apache.texera.amber.clustering.SingleNodeListener
+import org.apache.texera.amber.core.executor.OpExecWithCode
+import org.apache.texera.amber.core.tuple.{Attribute, AttributeType, Schema, Tuple}
+import org.apache.texera.amber.core.virtualidentity.{
+  ActorVirtualIdentity,
+  ChannelIdentity,
+  EmbeddedControlMessageIdentity,
+  WorkflowIdentity
+}
+import org.apache.texera.amber.core.workflow.{PhysicalLink, PortIdentity}
+import org.apache.texera.amber.engine.architecture.common.WorkflowActor.{NetworkAck, NetworkMessage}
+import org.apache.texera.amber.engine.architecture.pythonworker.PythonWorkflowWorker
+import org.apache.texera.amber.engine.architecture.rpc.controlcommands._
+import org.apache.texera.amber.engine.architecture.rpc.controlreturns.ReturnInvocation
+import org.apache.texera.amber.engine.architecture.scheduling.config.WorkerConfig
+import org.apache.texera.amber.engine.architecture.sendsemantics.partitionings.OneToOnePartitioning
+import org.apache.texera.amber.engine.common.AmberRuntime
+import org.apache.texera.amber.engine.common.ambermessage.{DataFrame, WorkflowFIFOMessage}
+import org.apache.texera.amber.engine.common.ambermessage.WorkflowMessage.getInMemSize
+import org.apache.texera.amber.util.VirtualIdentityUtils
+
+import java.io.PrintWriter
+import java.nio.file.{Files, Paths}
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
+/**
+  * End-to-end micro-benchmark of the real Arrow Flight data path through a
+  * live PythonWorkflowWorker actor.
+  *
+  * Each measured config spawns a fresh PythonWorkflowWorker (real Pekko actor,
+  * real Python subprocess via texera_run_python_worker.py, real Arrow Flight
+  * transport), wires up an identity Python UDF, and times the round-trip of
+  * `numBatches` DataFrames send→echo through the actor mailbox.
+  *
+  * Output (rewritten incrementally after every config so a killed sweep
+  * still preserves usable data):
+  *   - stdout summary per config
+  *   - bench-results/arrow-flight-e2e.csv               (one row per config)
+  *   - bench-results/arrow-flight-e2e-throughput.json   (github-action-benchmark customBiggerIsBetter)
+  *   - bench-results/arrow-flight-e2e-latency.json      (github-action-benchmark customSmallerIsBetter)
+  *
+  * Run with:
+  *   sbt "WorkflowExecutionService/Test/runMain \
+  *     org.apache.texera.amber.bench.ArrowFlightActorBench"
+  *
+  * Caveat: `Utils.amberHomePath` does a `Files.walk(cwd, 2).findAny` for any
+  * dir ending in `amber`. If `.claude/amber/` exists locally, the Python
+  * subprocess may end up trying to launch from that path; move it aside for
+  * the run, or fix `amberHomePath` upstream.
+  */
+object ArrowFlightActorBench {
+
+  // ---------------------------------------------------------------------------
+  // Identity Python UDF — passes input tuples straight through to output.
+  // ---------------------------------------------------------------------------
+  private val IdentityPythonCode: String =
+    """from pytexera import *
+      |
+      |class ProcessTupleOperator(UDFOperatorV2):
+      |    @overrides
+      |    def process_tuple(self, tuple_: Tuple, port: int) -> Iterator[Optional[TupleLike]]:
+      |        yield tuple_
+      |""".stripMargin
+
+  private val WorkflowId = WorkflowIdentity(1L)
+  private val InputPortId = PortIdentity(id = 0, internal = false)
+  private val OutputPortId = PortIdentity(id = 0, internal = false)
+
+  // Sweep grid + iteration counts switch on BENCH_MODE so PR / post-merge
+  // checks stay around 5 min while scheduled / manual runs do the full
+  // 36-config grid that the gh-pages dashboard tracks long-term.
+  //   pr   — 3 configs × 20 batches, warmup 5  (~4-5 min in CI)
+  //   full — 36 configs × 200 batches, warmup 20  (~50-60 min in CI)
+  // BENCH_NUM_BATCHES, if set, overrides numBatches for the current mode
+  // (useful for local smoke).
+  private val BenchMode: String = sys.env.getOrElse("BENCH_MODE", "full").toLowerCase
+
+  private case class GridSpec(
+      batchSizes: Seq[Int],
+      schemaWidths: Seq[Int],
+      stringLens: Seq[Int],
+      numBatches: Int,
+      warmupBatches: Int
+  )
+
+  private val grid: GridSpec = BenchMode match {
+    case "pr" =>
+      GridSpec(
+        batchSizes = Seq(10, 100, 1000),
+        schemaWidths = Seq(10),
+        stringLens = Seq(64),
+        numBatches = sys.env.get("BENCH_NUM_BATCHES").map(_.toInt).getOrElse(20),
+        warmupBatches = 5
+      )
+    case _ =>
+      GridSpec(
+        batchSizes = Seq(10, 100, 1000, 10000),
+        schemaWidths = Seq(1, 10, 50),
+        stringLens = Seq(8, 64, 512),
+        numBatches = sys.env.get("BENCH_NUM_BATCHES").map(_.toInt).getOrElse(200),
+        warmupBatches = 20
+      )
+  }
+
+  private val DefaultBatchSizes: Seq[Int] = grid.batchSizes
+  private val DefaultSchemaWidths: Seq[Int] = grid.schemaWidths
+  private val DefaultStringLens: Seq[Int] = grid.stringLens
+  private val DefaultNumBatches: Int = grid.numBatches
+  private val WarmupBatches: Int = grid.warmupBatches
+
+  // All artifacts land under bench-results/ so CI can artifact-upload the
+  // whole directory uniformly without knowing individual filenames beyond
+  // what its publish matrix declares.
+  // Conventions for new benches:
+  //   bench-results/<bench-name>-throughput.json  → customBiggerIsBetter
+  //   bench-results/<bench-name>-latency.json     → customSmallerIsBetter
+  //   bench-results/<bench-name>-jmh.json         → tool=jmh
+  private val OutDir = Paths.get("bench-results")
+  private val CsvOutPath = OutDir.resolve("arrow-flight-e2e.csv")
+  // Two JSON files — github-action-benchmark needs distinct
+  // customBiggerIsBetter / customSmallerIsBetter inputs since each upload
+  // direction is per-`tool` parameter.
+  private val ThroughputJsonPath = OutDir.resolve("arrow-flight-e2e-throughput.json")
+  private val LatencyJsonPath = OutDir.resolve("arrow-flight-e2e-latency.json")
+
+  // ---------------------------------------------------------------------------
+  // Sink actor: stands in for the downstream worker. Auto-acks every
+  // NetworkMessage from the worker (otherwise PekkoMessageTransferService
+  // throttles after the first unacked reply and the bench stalls), and
+  // forwards every received message to the bench probe for inspection.
+  // ---------------------------------------------------------------------------
+  private class AutoAckSink(forwardTo: ActorRef) extends Actor {
+    override def receive: Receive = {
+      case msg @ NetworkMessage(id, internal) =>
+        sender() ! NetworkAck(id, getInMemSize(internal), 0L)
+        forwardTo ! msg
+      case other =>
+        forwardTo ! other
+    }
+  }
+
+  private case class BenchConfig(
+      configIdx: Int,
+      batchSize: Int,
+      schemaWidth: Int,
+      stringLen: Int,
+      numBatches: Int
+  )
+
+  private case class BenchResult(
+      cfg: BenchConfig,
+      totalWallNs: Long,
+      totalTuples: Long,
+      totalBytesApprox: Long,
+      latencyP50Ns: Long,
+      latencyP95Ns: Long,
+      latencyP99Ns: Long
+  ) {
+    def tuplesPerSec: Double = totalTuples * 1e9 / totalWallNs
+    def mbPerSec: Double = totalBytesApprox * 1e9 / totalWallNs / (1024.0 * 1024.0)
+  }
+
+  def main(args: Array[String]): Unit = {
+    val system = ActorSystem("arrow-flight-bench", AmberRuntime.pekkoConfig)
+    system.actorOf(Props[SingleNodeListener](), "cluster-info")
+
+    val configs: Seq[BenchConfig] = (for {
+      sw <- DefaultSchemaWidths
+      sl <- DefaultStringLens
+      bs <- DefaultBatchSizes
+    } yield (sw, sl, bs)).zipWithIndex.map {
+      case ((sw, sl, bs), idx) =>
+        BenchConfig(
+          idx,
+          batchSize = bs,
+          schemaWidth = sw,
+          stringLen = sl,
+          numBatches = DefaultNumBatches
+        )
+    }
+
+    println(s"[bench] sweeping ${configs.size} configurations × ${DefaultNumBatches} batches each")
+    // Pre-create output dir + rewrite the result files after every completed
+    // config so a killed / timed-out sweep still leaves a usable artifact.
+    Files.createDirectories(OutDir)
+    val resultsBuf = scala.collection.mutable.ArrayBuffer.empty[BenchResult]
+    configs.foreach { cfg =>
+      try {
+        val r = runConfig(system, cfg)
+        resultsBuf += r
+        writeCsv(resultsBuf.toSeq)
+        writeJsonForGitHubActionBenchmark(resultsBuf.toSeq)
+      } catch {
+        case t: Throwable =>
+          println(s"[bench] FAILED config #${cfg.configIdx} ($cfg): $t")
+      }
+    }
+    printSummary(resultsBuf.toSeq)
+    Await.result(system.terminate(), 30.seconds)
+  }
+
+  // ---------------------------------------------------------------------------
+  // One configuration: spawn fresh worker, run warmup + timed loop, tear down.
+  // ---------------------------------------------------------------------------
+  private def runConfig(system: ActorSystem, cfg: BenchConfig): BenchResult = {
+    val workerId =
+      VirtualIdentityUtils.createWorkerIdentity(WorkflowId, "bench", "main", cfg.configIdx)
+    val downstreamId =
+      VirtualIdentityUtils.createWorkerIdentity(WorkflowId, "benchsink", "main", cfg.configIdx)
+
+    val ctlChannelIn = ChannelIdentity(downstreamId, workerId, isControl = true)
+    val dataChannelIn = ChannelIdentity(downstreamId, workerId, isControl = false)
+    val dataChannelOut = ChannelIdentity(workerId, downstreamId, isControl = false)
+
+    val probe = TestProbe()(system)
+    val sink = system.actorOf(
+      Props(new AutoAckSink(probe.ref)),
+      name = s"bench-sink-${cfg.configIdx}"
+    )
+    val worker = system.actorOf(
+      PythonWorkflowWorker.props(WorkerConfig(workerId)),
+      name = s"bench-worker-${cfg.configIdx}"
+    )
+
+    println(s"\n[bench] config #${cfg.configIdx}: $cfg")
+
+    try {
+      val schema = makeSchema(cfg.schemaWidth)
+      val schemaMap = schema.getAttributes.map(a => a.getName -> a.getType.name()).toMap
+
+      var ctlSeq: Long = 0L
+      var dataSeq: Long = 0L
+      var msgId: Long = 0L
+
+      def sendCtl(payload: ControlInvocation): Unit = {
+        val fifo = WorkflowFIFOMessage(ctlChannelIn, ctlSeq, payload)
+        ctlSeq += 1
+        worker.tell(NetworkMessage(msgId, fifo), sink)
+        msgId += 1
+      }
+      def sendOnDataChannel(
+          payload: org.apache.texera.amber.engine.common.ambermessage.WorkflowFIFOMessagePayload
+      ): Unit = {
+        val fifo = WorkflowFIFOMessage(dataChannelIn, dataSeq, payload)
+        dataSeq += 1
+        worker.tell(NetworkMessage(msgId, fifo), sink)
+        msgId += 1
+      }
+
+      // -----------------------------------------------------------------------
+      // Setup control sequence + StartChannel ECM (see Pass 1 for details).
+      // -----------------------------------------------------------------------
+      val ctx = AsyncRPCContext(sender = downstreamId, receiver = workerId)
+      sendCtl(
+        ControlInvocation(
+          "InitializeExecutor",
+          InitializeExecutorRequest(
+            1,
+            OpExecWithCode(IdentityPythonCode, "python"),
+            isSource = false
+          ),
+          ctx,
+          0L
+        )
+      )
+      sendCtl(
+        ControlInvocation(
+          "AssignPort",
+          AssignPortRequest(InputPortId, input = true, schemaMap, Seq.empty, Seq.empty),
+          ctx,
+          1L
+        )
+      )
+      sendCtl(
+        ControlInvocation(
+          "AssignPort",
+          AssignPortRequest(OutputPortId, input = false, schemaMap, Seq.empty, Seq.empty),
+          ctx,
+          2L
+        )
+      )
+      sendCtl(
+        ControlInvocation(
+          "AddInputChannel",
+          AddInputChannelRequest(dataChannelIn, InputPortId),
+          ctx,
+          3L
+        )
+      )
+      val outLink = PhysicalLink(
+        fromOpId = VirtualIdentityUtils.getPhysicalOpId(workerId),
+        fromPortId = OutputPortId,
+        toOpId = VirtualIdentityUtils.getPhysicalOpId(downstreamId),
+        toPortId = InputPortId
+      )
+      sendCtl(
+        ControlInvocation(
+          "AddPartitioning",
+          AddPartitioningRequest(
+            outLink,
+            // batch_size = cfg.batchSize keeps the Python-side partitioning
+            // buffer aligned with our send size — one Java DataFrame in maps
+            // to exactly one Python DataFrame out.
+            OneToOnePartitioning(batchSize = cfg.batchSize, channels = Seq(dataChannelOut))
+          ),
+          ctx,
+          4L
+        )
+      )
+      sendCtl(ControlInvocation("OpenExecutor", EmptyRequest(), ctx, 5L))
+      sendCtl(ControlInvocation("StartWorker", EmptyRequest(), ctx, 6L))
+
+      waitForReturns(probe, 7, 60.seconds)
+
+      // StartChannel ECM enables data flow on the input channel.
+      val startChannelEcm = EmbeddedControlMessage(
+        id = EmbeddedControlMessageIdentity("StartChannel"),
+        ecmType = EmbeddedControlMessageType.NO_ALIGNMENT,
+        scope = Seq.empty,
+        commandMapping = Map(
+          workerId.name -> ControlInvocation(
+            "StartChannel",
+            EmptyRequest(),
+            AsyncRPCContext(ActorVirtualIdentity(""), ActorVirtualIdentity("")),
+            -1L
+          )
+        )
+      )
+      sendOnDataChannel(startChannelEcm)
+      // Drain the StartChannel-echo the worker forwards downstream so it
+      // doesn't show up in the data-loop's measurement window.
+      drainNonDataFor(probe, 2.seconds)
+
+      // -----------------------------------------------------------------------
+      // Build sample tuples once; reuse across all batches in this config.
+      // -----------------------------------------------------------------------
+      val sampleBatch: Array[Tuple] = buildBatch(schema, cfg.batchSize, cfg.stringLen)
+      val approxBytesPerBatch: Long =
+        cfg.batchSize.toLong * cfg.schemaWidth.toLong * cfg.stringLen.toLong
+
+      // Warmup — let JIT settle, Python import any lazy modules.
+      var warmedBatches = 0
+      while (warmedBatches < WarmupBatches) {
+        sendOnDataChannel(DataFrame(sampleBatch))
+        if (awaitOneDataFrameEcho(probe, 30.seconds)) warmedBatches += 1
+      }
+
+      // -----------------------------------------------------------------------
+      // Timed loop — per-batch latency from send to corresponding echo.
+      // Because the Python pipeline is FIFO, sending batch i then awaiting
+      // exactly one DataFrame echo gives latency_i = receive_i - send_i.
+      // -----------------------------------------------------------------------
+      val latencies = new Array[Long](cfg.numBatches)
+      val totalStart = System.nanoTime()
+      var i = 0
+      while (i < cfg.numBatches) {
+        val t0 = System.nanoTime()
+        sendOnDataChannel(DataFrame(sampleBatch))
+        if (!awaitOneDataFrameEcho(probe, 60.seconds)) {
+          throw new RuntimeException(s"timed out waiting for echo of batch $i")
+        }
+        latencies(i) = System.nanoTime() - t0
+        i += 1
+      }
+      val totalNs = System.nanoTime() - totalStart
+
+      val totalTuples = cfg.numBatches.toLong * cfg.batchSize.toLong
+      val totalBytes = cfg.numBatches.toLong * approxBytesPerBatch
+      val result = BenchResult(
+        cfg,
+        totalWallNs = totalNs,
+        totalTuples = totalTuples,
+        totalBytesApprox = totalBytes,
+        latencyP50Ns = percentile(latencies, 0.50),
+        latencyP95Ns = percentile(latencies, 0.95),
+        latencyP99Ns = percentile(latencies, 0.99)
+      )
+
+      printOne(result)
+      result
+    } finally {
+      worker ! PoisonPill
+      sink ! PoisonPill
+      // Give the worker a moment to tear down its Python subprocess + flight
+      // server cleanly before we move to the next config.
+      Thread.sleep(500)
+    }
+  }
+
+  // ---------------------------------------------------------------------------
+  // Helpers
+  // ---------------------------------------------------------------------------
+  private def makeSchema(width: Int): Schema = {
+    val attrs = (0 until width).map(i => new Attribute(s"col$i", AttributeType.STRING))
+    Schema(attrs.toList)
+  }
+
+  private def buildBatch(schema: Schema, batchSize: Int, stringLen: Int): Array[Tuple] = {
+    val arr = new Array[Tuple](batchSize)
+    val sampleVal = "x" * stringLen
+    var i = 0
+    val attrs = schema.getAttributes
+    while (i < batchSize) {
+      val b = Tuple.builder(schema)
+      var j = 0
+      while (j < attrs.size) {
+        b.add(attrs(j), sampleVal)
+        j += 1
+      }
+      arr(i) = b.build()
+      i += 1
+    }
+    arr
+  }
+
+  private def waitForReturns(probe: TestProbe, n: Int, timeout: FiniteDuration): Unit = {
+    val deadline = System.currentTimeMillis() + timeout.toMillis
+    var seen = 0
+    while (seen < n && System.currentTimeMillis() < deadline) {
+      probe.receiveOne(2.seconds) match {
+        case NetworkMessage(_, WorkflowFIFOMessage(_, _, _: ReturnInvocation)) =>
+          seen += 1
+        case _ => // ignore acks + other
+      }
+    }
+    if (seen < n) {
+      throw new RuntimeException(s"only $seen/$n control returns within $timeout")
+    }
+  }
+
+  private def awaitOneDataFrameEcho(probe: TestProbe, timeout: FiniteDuration): Boolean = {
+    // Each iteration uses the *remaining* time, not the full timeout — so a
+    // flood of ACK / ECM messages can't extend the overall wait beyond the
+    // caller's deadline by `timeout` × N.
+    val deadline = System.nanoTime() + timeout.toNanos
+    while (true) {
+      val remainingNs = deadline - System.nanoTime()
+      if (remainingNs <= 0) return false
+      probe.receiveOne(remainingNs.nanos) match {
+        case NetworkMessage(_, WorkflowFIFOMessage(_, _, _: DataFrame)) => return true
+        case null                                                       => return false
+        case _                                                          => // ignore acks, ECM forwards; loop
+      }
+    }
+    false
+  }
+
+  private def drainNonDataFor(probe: TestProbe, dur: FiniteDuration): Unit = {
+    val end = System.currentTimeMillis() + dur.toMillis
+    while (System.currentTimeMillis() < end) {
+      probe.receiveOne(200.millis) match {
+        case null => return
+        case _    => // discard
+      }
+    }
+  }
+
+  private def percentile(values: Array[Long], q: Double): Long = {
+    if (values.isEmpty) 0L
+    else {
+      val sorted = values.sorted
+      val idx = math.min(sorted.length - 1, math.max(0, (sorted.length * q).toInt))
+      sorted(idx)
+    }
+  }
+
+  private def printOne(r: BenchResult): Unit = {
+    val ms = r.totalWallNs / 1e6
+    println(
+      f"  -> total=${ms}%.0fms  tuples/s=${r.tuplesPerSec}%,.0f  MB/s=${r.mbPerSec}%.2f  " +
+        f"p50=${r.latencyP50Ns / 1000.0}%.1fus  p95=${r.latencyP95Ns / 1000.0}%.1fus  " +
+        f"p99=${r.latencyP99Ns / 1000.0}%.1fus"
+    )
+  }
+
+  private def writeCsv(results: Seq[BenchResult]): Unit = {
+    val pw = new PrintWriter(Files.newBufferedWriter(CsvOutPath))
+    try {
+      pw.println(
+        "config_idx,batch_size,schema_width,string_len,num_batches," +
+          "total_ms,total_tuples,total_bytes,tuples_per_sec,mb_per_sec," +
+          "lat_p50_us,lat_p95_us,lat_p99_us"
+      )
+      results.foreach { r =>
+        val c = r.cfg
+        pw.println(
+          List(
+            c.configIdx,
+            c.batchSize,
+            c.schemaWidth,
+            c.stringLen,
+            c.numBatches,
+            f"${r.totalWallNs / 1e6}%.2f",
+            r.totalTuples,
+            r.totalBytesApprox,
+            f"${r.tuplesPerSec}%.0f",
+            f"${r.mbPerSec}%.3f",
+            f"${r.latencyP50Ns / 1000.0}%.2f",
+            f"${r.latencyP95Ns / 1000.0}%.2f",
+            f"${r.latencyP99Ns / 1000.0}%.2f"
+          ).mkString(",")
+        )
+      }
+    } finally pw.close()
+    println(s"\n[bench] wrote ${results.size} rows to ${CsvOutPath.toAbsolutePath}")
+  }
+
+  /**
+    * Emit two JSON arrays per github-action-benchmark's customBiggerIsBetter
+    * (throughput) and customSmallerIsBetter (latency) input formats. Each
+    * config produces one throughput entry and three latency entries (p50/p95/
+    * p99). Spec: https://github.com/benchmark-action/github-action-benchmark
+    */
+  private def writeJsonForGitHubActionBenchmark(results: Seq[BenchResult]): Unit = {
+    def cfgLabel(c: BenchConfig): String =
+      s"bs=${c.batchSize} sw=${c.schemaWidth} sl=${c.stringLen}"
+
+    def jsonEntry(name: String, unit: String, value: Double): String =
+      s"""  { "name": ${quoteJson(name)}, "unit": ${quoteJson(unit)}, "value": $value }"""
+
+    val throughputEntries = results.map { r =>
+      jsonEntry(s"throughput / ${cfgLabel(r.cfg)}", "tuples/sec", r.tuplesPerSec)
+    }
+    val latencyEntries = results.flatMap { r =>
+      Seq(
+        jsonEntry(s"latency p50 / ${cfgLabel(r.cfg)}", "us", r.latencyP50Ns / 1000.0),
+        jsonEntry(s"latency p95 / ${cfgLabel(r.cfg)}", "us", r.latencyP95Ns / 1000.0),
+        jsonEntry(s"latency p99 / ${cfgLabel(r.cfg)}", "us", r.latencyP99Ns / 1000.0)
+      )
+    }
+
+    writeJsonArray(ThroughputJsonPath, throughputEntries)
+    writeJsonArray(LatencyJsonPath, latencyEntries)
+    println(
+      s"[bench] wrote ${results.size} throughput entries to ${ThroughputJsonPath.toAbsolutePath}"
+    )
+    println(
+      s"[bench] wrote ${latencyEntries.size} latency entries to ${LatencyJsonPath.toAbsolutePath}"
+    )
+  }
+
+  private def writeJsonArray(path: java.nio.file.Path, entries: Seq[String]): Unit = {
+    val pw = new PrintWriter(Files.newBufferedWriter(path))
+    try {
+      pw.println("[")
+      pw.println(entries.mkString(",\n"))
+      pw.println("]")
+    } finally pw.close()
+  }
+
+  private def quoteJson(s: String): String =
+    "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\""
+
+  private def printSummary(results: Seq[BenchResult]): Unit = {
+    println("\n[bench] === summary ===")
+    println(
+      f"${"#"}%3s  ${"bs"}%5s  ${"sw"}%3s  ${"sl"}%4s  ${"tuples/s"}%10s  ${"MB/s"}%7s  " +
+        f"${"p50us"}%8s  ${"p99us"}%8s"
+    )
+    results.foreach { r =>
+      println(
+        f"${r.cfg.configIdx}%3d  ${r.cfg.batchSize}%5d  ${r.cfg.schemaWidth}%3d  ${r.cfg.stringLen}%4d  " +
+          f"${r.tuplesPerSec}%,10.0f  ${r.mbPerSec}%7.2f  " +
+          f"${r.latencyP50Ns / 1000.0}%8.1f  ${r.latencyP99Ns / 1000.0}%8.1f"
+      )
+    }
+  }
+}
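
For readers checking the percentile columns: the `percentile` helper in the file above indexes the sorted samples at `floor(n * q)`, clamped to the last element. A minimal standalone sketch of that rule follows; the object name `PercentileSketch` and the synthetic latencies are illustrative assumptions, not part of the commit.

```scala
object PercentileSketch {
  // Same index rule as the bench helper: floor(n * q), clamped to [0, n - 1].
  def percentile(values: Array[Long], q: Double): Long =
    if (values.isEmpty) 0L
    else {
      val sorted = values.sorted
      val idx = math.min(sorted.length - 1, math.max(0, (sorted.length * q).toInt))
      sorted(idx)
    }

  def main(args: Array[String]): Unit = {
    // Hypothetical per-batch latencies in nanoseconds: 1000, 2000, ..., 20000.
    val latencies: Array[Long] = (1L to 20L).map(_ * 1000L).toArray
    val p50 = percentile(latencies, 0.50)
    val p95 = percentile(latencies, 0.95)
    val p99 = percentile(latencies, 0.99)
    println(s"p50=$p50 p95=$p95 p99=$p99")
  }
}
```

With only 20 samples (the `pr` mode default), both p95 and p99 resolve to index 19, the slowest batch, so those two columns are expected to coincide in PR runs. The 200-batch `full` mode separates them (indices 190 and 198).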
diff --git a/bin/run-benchmarks.sh b/bin/run-benchmarks.sh
new file mode 100755
index 0000000000..c3f843c9a3
--- /dev/null
+++ b/bin/run-benchmarks.sh
@@ -0,0 +1,59 @@
+#!/usr/bin/env bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+# Single entry-point for all Texera benchmarks. CI calls this script
+# verbatim — it does NOT reference individual benchmark main classes.
+# Adding a new benchmark (e.g., a JMH suite) means appending one block
+# to this script; no CI workflow change.
+#
+# Output convention: every benchmark writes to bench-results/ with a
+# self-describing filename suffix that matches the github-action-benchmark
+# `tool` parameter expected by the publish step in build.yml:
+#   bench-results/<bench>-throughput.json  → tool: customBiggerIsBetter
+#   bench-results/<bench>-latency.json     → tool: customSmallerIsBetter
+#   bench-results/<bench>-jmh.json         → tool: jmh
+# CSV / log / debug files may live alongside; the publish matrix only
+# cares about the *.json files declared in build.yml.
+#
+# Env vars honored:
+#   BENCH_NUM_BATCHES — overrides the e2e bench batch count (default: 20 with
+#                       BENCH_MODE=pr, 200 otherwise). Lower it for smoke runs.
+#   UDF_PYTHON_PATH   — Python executable for the spawned worker subprocess.
+
+set -euo pipefail
+
+SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
+REPO_ROOT="$(cd "$SCRIPT_DIR/.." && pwd)"
+cd "$REPO_ROOT"
+
+mkdir -p bench-results
+
+echo "=== run-benchmarks: arrow-flight-e2e ==="
+sbt --error \
+  "WorkflowExecutionService/Test/runMain org.apache.texera.amber.bench.ArrowFlightActorBench"
+
+# Future benchmarks: add new blocks below. Each block should self-contain
+# the run command and ensure its outputs land in bench-results/. Example
+# for a future JMH suite:
+#   echo "=== run-benchmarks: arrow-utils-jmh ==="
+#   sbt "WorkflowExecutionService/Jmh/run -rf json -rff $REPO_ROOT/bench-results/arrow-utils-jmh.json"
+
+echo
+echo "=== bench artifacts ==="
+ls -la bench-results/
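
The output convention described in the script header relies on github-action-benchmark's custom tools, which read a JSON array of objects carrying `name`, `unit`, and `value`. A minimal sketch of producing such a file body, modeled on the bench's `jsonEntry` and `quoteJson` helpers; the object name `BenchJsonSketch`, the `render` helper, and the sample numbers are hypothetical.

```scala
object BenchJsonSketch {
  // Escape backslashes and double quotes, as the bench's quoteJson helper does.
  def quoteJson(s: String): String =
    "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\""

  // One entry in the shape the custom tools expect: name, unit, value.
  def jsonEntry(name: String, unit: String, value: Double): String =
    s"""  { "name": ${quoteJson(name)}, "unit": ${quoteJson(unit)}, "value": $value }"""

  // Wrap the entries in a JSON array, one entry per line.
  def render(entries: Seq[String]): String =
    entries.mkString("[\n", ",\n", "\n]")

  def main(args: Array[String]): Unit = {
    // Hypothetical measurements for a single config; real values come from the bench.
    val label = "bs=10 sw=10 sl=64"
    val entries = Seq(
      jsonEntry(s"latency p50 / $label", "us", 1234.5),
      jsonEntry(s"latency p99 / $label", "us", 5678.0)
    )
    println(render(entries))
  }
}
```

One caveat worth keeping in mind under this approach: a non-finite `Double` (for example a throughput computed from a zero elapsed time) would print as `Infinity` or `NaN`, which is not valid JSON, so a new bench may want to guard against that before writing.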
