This is an automated email from the ASF dual-hosted git repository.

github-merge-queue[bot] pushed a commit to branch 
gh-readonly-queue/main/pr-5682-dfa0434e70c256feb37a8d38c1d0bc134e789fdd
in repository https://gitbox.apache.org/repos/asf/texera.git

commit 3242ac9e3f29d5ceac95443dde39a3aa81bfe3dd
Author: Yicong Huang <[email protected]>
AuthorDate: Sun Jun 14 20:18:07 2026 -0700

    test(amber): move state-mat e2e into amber-integration (#5682)
    
    ### What changes were proposed in this PR?
    
    Two things:
    
    **1. Move the state-materialization e2e tests into
    `amber-integration`.** The two e2e specs that previously ran in the
    deleted `pyamber-state-materialization-mac` diagnostic job now run under
    the existing `amber-integration` job, which already provisions postgres
    + iceberg catalog DB + MinIO + Lakekeeper and runs `pytest -m
    integration` as its last step. The test's catalog backend switches from
    sqlite `SqlCatalog` to the real postgres-backed `JdbcCatalog`, matching
    `test_iceberg_document.py:45`, so we exercise the prod catalog code path
    instead of an sqlite shim.
    
    **2. Add macOS to the `amber-integration` matrix.** We want CI coverage
    that the integration stack actually runs on macOS (where most dev
    machines live), not just Linux. GitHub-hosted macOS runners have no
    Docker, so each docker-dependent step now branches on `$RUNNER_OS`:
    macOS provisions postgres / minio / lakekeeper natively (`brew install`
    + the upstream lakekeeper `aarch64-apple-darwin` tarball — published
    from v0.11.0 onward, same version as the Linux docker tag) while Linux
    keeps the existing docker path. Protoc on macOS uses brew's arm64-native
    protobuf because protoc 3.19.4 has no arm64-mac build and running its
    x86_64 binary under Rosetta breaks the `python_betterproto` plugin (arch
    / site-packages split between Rosetta'd protoc and arm64 setup-python).
    For proto3 sources, the plugin output depends on `betterproto`, not
    protoc, so the protoc version drift on macOS is benign for codegen.
    
    | Before | After |
    | --- | --- |
    | `pyamber-state-materialization-mac` macOS diagnostic job (build.yml:742) | deleted |
    | `SqlCatalog` (sqlite) injected in module fixture | real postgres-backed `JdbcCatalog`, matching `test_iceberg_document.py:45` |
    | Test discovered by an explicit `pytest -sv <path>` from that job | `@pytest.mark.integration` + picked up by `amber-integration`'s `pytest -m integration` |
    | `amber-integration` runs only on `ubuntu-22.04` | `amber-integration` runs on `[ubuntu-22.04, macos-latest]` with `$RUNNER_OS`-branched service provisioning |
    
    `StorageConfig.initialize` is wrapped in a class-scoped autouse fixture
    (rather than called at module import time) so it co-exists with
    `test_iceberg_document.py`'s import-time `initialize` regardless of
    pytest collection order. All catalog + S3 credentials read the same
    `STORAGE_*` env vars the production code consumes (via storage.conf),
    with defaults that match storage.conf — so the test stays aligned with
    whichever identity the surrounding CI infra uses.
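
    As a rough illustration of that defaulting scheme (a sketch, not the
    fixture itself): the hypothetical helper `resolve_storage_settings`
    below looks up each `STORAGE_*` variable and falls back to the
    defaults this PR's fixture passes to `StorageConfig.initialize`. The
    helper name and the dict layout are assumptions made for this example.

    ```python
    import os

    # Defaults copied from the fixture in this PR; the dict itself is an
    # illustrative assumption, not part of the Texera code base.
    _DEFAULTS = {
        "STORAGE_ICEBERG_CATALOG_POSTGRES_URI_WITHOUT_SCHEME": "localhost:5432/texera_iceberg_catalog",
        "STORAGE_ICEBERG_CATALOG_POSTGRES_USERNAME": "postgres",
        "STORAGE_ICEBERG_CATALOG_POSTGRES_PASSWORD": "postgres",
        "STORAGE_S3_ENDPOINT": "http://localhost:9000",
        "STORAGE_S3_REGION": "us-west-2",
        "STORAGE_S3_AUTH_USERNAME": "texera_minio",
        "STORAGE_S3_AUTH_PASSWORD": "password",
    }


    def resolve_storage_settings(environ=None):
        """Return each STORAGE_* setting, preferring the environment over the default."""
        env = os.environ if environ is None else environ
        return {name: env.get(name, default) for name, default in _DEFAULTS.items()}
    ```

    Exporting, say, `STORAGE_S3_ENDPOINT` before running `pytest -m
    integration` would override only that key; every other setting keeps
    its default.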
    
    The unit-style
    `test_process_start_channel_persists_produce_state_on_start_output` that
    the deleted mac job also ran is untouched: it monkeypatches the output
    manager and is already picked up by the regular `pyamber` job's `pytest
    -m "not integration"` step.
    
    ### Any related issues, documentation, discussions?
    
    Closes #5681.
    
    ### How was this PR tested?
    
    Locally against the existing texera-dev infra (postgres on 5432 with
    `texera_iceberg_catalog` schema initialized via
    `sql/iceberg_postgres_catalog.sql`):
    
    ```
    cd amber && pytest -m integration --junit-xml=/tmp/junit-integration.xml -sv
    # 3 passed, 502 deselected --
    #   test_state_written_by_output_manager_is_replayed_by_reader,
    #   test_state_table_persists_across_writer_close,
    #   test_rest_catalog_round_trip
    ```
    
    And the regular pyamber suite still green:
    
    ```
    cd amber && pytest -m "not integration" -q
    # 502 passed, 3 deselected
    ```
    
    In CI, the `amber-integration` job picks these tests up automatically
    because they're marked `@pytest.mark.integration`, on both
    `ubuntu-22.04` and `macos-latest`.
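
    The way the two invocations split the suite can be pictured with a
    small model. This is a simplified sketch, not pytest's implementation
    (real `-m` expressions also support `and` / `or`); `CollectedTest`
    and `select` are hypothetical names used only here.

    ```python
    from dataclasses import dataclass


    @dataclass(frozen=True)
    class CollectedTest:
        name: str
        markers: frozenset = frozenset()


    def select(tests, marker, negate=False):
        """Model `pytest -m marker`, or `pytest -m "not marker"` when negate=True."""
        return [t for t in tests if (marker in t.markers) != negate]


    collected = [
        CollectedTest(
            "test_state_written_by_output_manager_is_replayed_by_reader",
            frozenset({"integration"}),
        ),
        CollectedTest(
            "test_process_start_channel_persists_produce_state_on_start_output"
        ),
    ]

    integration_run = select(collected, "integration")           # amber-integration
    regular_run = select(collected, "integration", negate=True)  # pyamber
    ```

    Every collected test lands in exactly one of the two runs, which is
    consistent with the 3 + 502 counts above adding up in both directions.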
    
    ### Was this PR authored or co-authored using generative AI tooling?
    
    Generated-by: Claude Code (Claude Opus 4.7)
    
    ---------
    
    Signed-off-by: Yicong Huang <[email protected]>
    Co-authored-by: Claude Opus 4.7 (1M context) <[email protected]>
---
 .github/workflows/build.yml                        | 218 ++++++++-----
 .../packaging/test_state_materialization_e2e.py    | 355 +++++++++++----------
 2 files changed, 316 insertions(+), 257 deletions(-)

diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml
index aa2822e187..14f2429b63 100644
--- a/.github/workflows/build.yml
+++ b/.github/workflows/build.yml
@@ -316,25 +316,19 @@ jobs:
     # license check stay in `amber`; this job is tests-only.
     if: ${{ inputs.run_amber_integration }}
     strategy:
+      # macOS provisions postgres / minio / lakekeeper natively because
+      # GitHub-hosted macOS runners have no Docker (and `services:`
+      # containers are Linux-only). Each docker-dependent step below
+      # branches on $RUNNER_OS inside its `run:` script: Linux keeps
+      # the docker image, macOS uses brew + the upstream
+      # aarch64-apple-darwin lakekeeper tarball.
       matrix:
-        os: [ubuntu-22.04]
+        os: [ubuntu-22.04, macos-latest]
         java-version: [17]
     runs-on: ${{ matrix.os }}
     env:
       JAVA_OPTS: -Xms2048M -Xmx2048M -Xss6M -XX:ReservedCodeCacheSize=256M -Dfile.encoding=UTF-8
       JVM_OPTS: -Xms2048M -Xmx2048M -Xss6M -XX:ReservedCodeCacheSize=256M -Dfile.encoding=UTF-8
-    services:
-      postgres:
-        image: postgres
-        env:
-          POSTGRES_PASSWORD: postgres
-        ports:
-          - 5432:5432
-        options: >-
-          --health-cmd="pg_isready -U postgres"
-          --health-interval=10s
-          --health-timeout=5s
-          --health-retries=5
     steps:
       - name: Checkout
         uses: actions/checkout@v5
@@ -375,13 +369,59 @@ jobs:
           if [ -f amber/operator-requirements.txt ]; then uv pip install --system --index-strategy unsafe-best-match -r amber/operator-requirements.txt; fi
           if [ -f amber/dev-requirements.txt ]; then uv pip install --system --index-strategy unsafe-best-match -r amber/dev-requirements.txt; fi
       - name: Install protoc
-        # Version pinned in bin/protoc-version.txt.
+        # Linux pins protoc to the version in bin/protoc-version.txt via
+        # the upstream release zip. macOS uses brew's arm64-native
+        # protobuf instead because protoc 3.19.4 has no arm64-mac build
+        # and running the x86_64 binary under Rosetta breaks
+        # protoc-gen-python_betterproto (the plugin's shebang resolves
+        # to arm64-only setup-python, and the resulting arch/site-pkg
+        # split surfaces as a silent "plugin failed status 1"). For
+        # proto3 sources the python_betterproto plugin's output depends
+        # on betterproto, not protoc, so the version drift is benign
+        # for python-proto-gen.sh — bin/python-proto-gen.sh derives the
+        # include dir from `command -v protoc`, so brew's /opt/homebrew
+        # layout is picked up automatically.
         run: |
-          PROTOC_VERSION=$(cat bin/protoc-version.txt)
-          curl -fsSL -o /tmp/protoc.zip "https://github.com/protocolbuffers/protobuf/releases/download/v${PROTOC_VERSION}/protoc-${PROTOC_VERSION}-linux-x86_64.zip"
-          sudo unzip -o /tmp/protoc.zip -d /usr/local
-          sudo chmod +x /usr/local/bin/protoc
-          sudo chmod -R a+rX /usr/local/include/google
+          if [ "$RUNNER_OS" = "Linux" ]; then
+            PROTOC_VERSION=$(cat bin/protoc-version.txt)
+            curl -fsSL -o /tmp/protoc.zip "https://github.com/protocolbuffers/protobuf/releases/download/v${PROTOC_VERSION}/protoc-${PROTOC_VERSION}-linux-x86_64.zip"
+            sudo unzip -o /tmp/protoc.zip -d /usr/local
+            sudo chmod +x /usr/local/bin/protoc
+            sudo chmod -R a+rX /usr/local/include
+          else
+            brew install protobuf
+          fi
+      - name: Start Postgres
+        # Replaces the job-level `services.postgres` container, which
+        # GitHub only supports on Linux runners. Both branches end with
+        # postgres listening on localhost:5432 with a `postgres` superuser
+        # (password 'postgres') so the psql steps below stay OS-agnostic.
+        # macOS uses brew's pg_hba `trust` for 127.0.0.1, which means the
+        # password is effectively ignored — same effective auth as the
+        # Linux docker image here.
+        run: |
+          if [ "$RUNNER_OS" = "Linux" ]; then
+            docker run -d --name postgres \
+              -p 5432:5432 \
+              -e POSTGRES_PASSWORD=postgres \
+              postgres
+            for i in $(seq 1 30); do
+              docker exec postgres pg_isready -U postgres -h localhost -q && break
+              echo "Waiting for Postgres... (attempt $i)"
+              sleep 1
+            done
+            docker exec postgres pg_isready -U postgres -h localhost -q
+          else
+            brew install postgresql@16
+            brew services start postgresql@16
+            for i in $(seq 1 30); do
+              pg_isready -h localhost -q && break
+              echo "Waiting for Postgres... (attempt $i)"
+              sleep 1
+            done
+            createuser -h localhost -s postgres
+            psql -h localhost -d postgres -c "ALTER USER postgres WITH PASSWORD 'postgres';"
+          fi
       - name: Create Databases
         run: |
           psql -h localhost -U postgres -f sql/texera_ddl.sql
@@ -400,43 +440,79 @@ jobs:
         env:
           PGPASSWORD: postgres
       - name: Start MinIO
+        # Linux uses the pinned docker image; macOS uses brew's native
+        # arm64 binary, backgrounded via nohup and logged to /tmp/minio.log
+        # for post-mortem if the curl health check below fails. The brew
+        # version drifts from the Linux pin, but the tests only touch
+        # S3-protocol surface that has been stable across releases.
         run: |
-          docker run -d --name minio --network host \
-            -e MINIO_ROOT_USER=texera_minio \
-            -e MINIO_ROOT_PASSWORD=password \
-            minio/minio:RELEASE.2025-02-28T09-55-16Z server /data
-          for i in $(seq 1 3); do
+          if [ "$RUNNER_OS" = "Linux" ]; then
+            docker run -d --name minio --network host \
+              -e MINIO_ROOT_USER=texera_minio \
+              -e MINIO_ROOT_PASSWORD=password \
+              minio/minio:RELEASE.2025-02-28T09-55-16Z server /data
+          else
+            brew install minio/stable/minio
+            mkdir -p /tmp/minio-data
+            MINIO_ROOT_USER=texera_minio MINIO_ROOT_PASSWORD=password \
+              nohup minio server /tmp/minio-data > /tmp/minio.log 2>&1 &
+          fi
+          for i in $(seq 1 15); do
             curl -sf http://localhost:9000/minio/health/live && break
             echo "Waiting for MinIO... (attempt $i)"
             sleep 1
           done
+          curl -sf http://localhost:9000/minio/health/live
       - name: Start Lakekeeper
+        # Linux uses the v0.11.0 docker image; macOS downloads the
+        # same-version aarch64-apple-darwin tarball that upstream
+        # publishes alongside the linux image (v0.11.0 onward). Both
+        # branches run `migrate` then `serve`, then poll the binary's
+        # built-in `healthcheck` subcommand until ready. Failure dumps
+        # the container logs (Linux) or /tmp/lakekeeper.log (macOS).
         env:
+          LAKEKEEPER_VERSION: v0.11.0
           LAKEKEEPER__PG_DATABASE_URL_READ: postgres://postgres:postgres@localhost:5432/texera_lakekeeper
           LAKEKEEPER__PG_DATABASE_URL_WRITE: postgres://postgres:postgres@localhost:5432/texera_lakekeeper
           LAKEKEEPER__PG_ENCRYPTION_KEY: texera_key
+          LAKEKEEPER__METRICS_PORT: "9091"
         run: |
-          docker run --rm --network host \
-            -e LAKEKEEPER__PG_DATABASE_URL_READ \
-            -e LAKEKEEPER__PG_DATABASE_URL_WRITE \
-            -e LAKEKEEPER__PG_ENCRYPTION_KEY \
-            vakamo/lakekeeper:v0.11.0 migrate
-          docker run -d --name lakekeeper --network host \
-            -e LAKEKEEPER__PG_DATABASE_URL_READ \
-            -e LAKEKEEPER__PG_DATABASE_URL_WRITE \
-            -e LAKEKEEPER__PG_ENCRYPTION_KEY \
-            -e LAKEKEEPER__METRICS_PORT=9091 \
-            vakamo/lakekeeper:v0.11.0 serve
-          for i in $(seq 1 3); do
-            docker exec lakekeeper /home/nonroot/lakekeeper healthcheck && break
+          if [ "$RUNNER_OS" = "Linux" ]; then
+            docker run --rm --network host \
+              -e LAKEKEEPER__PG_DATABASE_URL_READ \
+              -e LAKEKEEPER__PG_DATABASE_URL_WRITE \
+              -e LAKEKEEPER__PG_ENCRYPTION_KEY \
+              vakamo/lakekeeper:${LAKEKEEPER_VERSION} migrate
+            docker run -d --name lakekeeper --network host \
+              -e LAKEKEEPER__PG_DATABASE_URL_READ \
+              -e LAKEKEEPER__PG_DATABASE_URL_WRITE \
+              -e LAKEKEEPER__PG_ENCRYPTION_KEY \
+              -e LAKEKEEPER__METRICS_PORT \
+              vakamo/lakekeeper:${LAKEKEEPER_VERSION} serve
+            healthcheck() { docker exec lakekeeper /home/nonroot/lakekeeper healthcheck; }
+            on_fail() { echo "Lakekeeper failed to start. Container logs:"; docker logs lakekeeper; }
+          else
+            curl -fsSL -o /tmp/lakekeeper.tar.gz \
+              "https://github.com/lakekeeper/lakekeeper/releases/download/${LAKEKEEPER_VERSION}/lakekeeper-aarch64-apple-darwin.tar.gz"
+            mkdir -p /tmp/lakekeeper-bin
+            tar -xzf /tmp/lakekeeper.tar.gz -C /tmp/lakekeeper-bin
+            LAKEKEEPER_BIN=$(find /tmp/lakekeeper-bin -type f -perm -u+x -name lakekeeper | head -1)
+            if [ -z "$LAKEKEEPER_BIN" ]; then
+              echo "Could not find lakekeeper binary in tarball:"
+              find /tmp/lakekeeper-bin -type f
+              exit 1
+            fi
+            "$LAKEKEEPER_BIN" migrate
+            nohup "$LAKEKEEPER_BIN" serve > /tmp/lakekeeper.log 2>&1 &
+            healthcheck() { "$LAKEKEEPER_BIN" healthcheck; }
+            on_fail() { echo "Lakekeeper failed to start. Log:"; cat /tmp/lakekeeper.log; }
+          fi
+          for i in $(seq 1 15); do
+            healthcheck && break
             echo "Waiting for Lakekeeper... (attempt $i)"
             sleep 1
           done
-          docker exec lakekeeper /home/nonroot/lakekeeper healthcheck || {
-            echo "Lakekeeper failed to start. Container logs:"
-            docker logs lakekeeper
-            exit 1
-          }
+          healthcheck || { on_fail; exit 1; }
       - name: Initialize Lakekeeper warehouse
         # Pull defaults out of storage.conf so this step doesn't duplicate
         # values that already live in the runtime config. Each scalar in
@@ -461,9 +537,18 @@ jobs:
           LAKEKEEPER_BASE=${REST_URI%/catalog}
           LAKEKEEPER_BASE=${LAKEKEEPER_BASE%/}
 
-          docker run --rm --network host --entrypoint sh minio/mc -c \
-            "mc alias set minio $S3_ENDPOINT $S3_USERNAME $S3_PASSWORD && \
-             mc mb --ignore-existing minio/$S3_BUCKET"
+          # bucket creation runs through `mc`; on Linux we keep the
+          # minio/mc image, on macOS we use the brew-installed native CLI
+          # since docker is unavailable.
+          if [ "$RUNNER_OS" = "Linux" ]; then
+            docker run --rm --network host --entrypoint sh minio/mc -c \
+              "mc alias set minio $S3_ENDPOINT $S3_USERNAME $S3_PASSWORD && \
+               mc mb --ignore-existing minio/$S3_BUCKET"
+          else
+            brew install minio-mc
+            mc alias set minio "$S3_ENDPOINT" "$S3_USERNAME" "$S3_PASSWORD"
+            mc mb --ignore-existing "minio/$S3_BUCKET"
+          fi
           curl -sf -X POST -H 'Content-Type: application/json' \
             -d '{"project-id":"00000000-0000-0000-0000-000000000000","project-name":"default"}' \
             "$LAKEKEEPER_BASE/management/v1/project" || true
@@ -763,49 +848,6 @@ jobs:
           disable_search: true
           fail_ci_if_error: false
 
-  pyamber-state-materialization-mac:
-    # Diagnostic leg: cross-region state materialization is reported to
-    # fail on macOS while working on Windows / Linux. The main `pyamber`
-    # job above runs only on ubuntu-latest because it depends on a
-    # postgres service container (service containers don't work on
-    # macOS runners). The state-materialization integration tests use
-    # an in-process sqlite-backed SqlCatalog instead, so we can run
-    # them on macOS without postgres infra. If they fail here but pass
-    # in the main `pyamber` job, we've reproduced the macOS-specific
-    # regression in CI.
-    if: ${{ inputs.run_pyamber }}
-    runs-on: macos-latest
-    steps:
-      - name: Checkout Texera
-        uses: actions/checkout@v5
-        with:
-          ref: ${{ inputs.checkout_ref || github.sha }}
-          fetch-depth: 0
-      - name: Prepare backport workspace
-        if: ${{ inputs.backport_target_branch != '' }}
-        run: bash ./.github/scripts/prepare-backport-checkout.sh "${{ inputs.backport_target_branch }}" "${{ inputs.backport_commit_range }}"
-      - name: Set up Python 3.12
-        uses: actions/setup-python@v6
-        with:
-          python-version: "3.12"
-      - name: Install dependencies
-        run: |
-          python -m pip install uv
-          if [ -f amber/requirements.txt ]; then uv pip install --system --index-strategy unsafe-best-match -r amber/requirements.txt; fi
-          if [ -f amber/operator-requirements.txt ]; then uv pip install --system --index-strategy unsafe-best-match -r amber/operator-requirements.txt; fi
-          if [ -f amber/dev-requirements.txt ]; then uv pip install --system -r amber/dev-requirements.txt; fi
-      - name: Install protoc
-        # Homebrew protoc; this job doesn't exercise scalapb so the
-        # bin/protoc-version.txt pin doesn't apply here.
-        run: brew install protobuf
-      - name: Generate Python proto bindings
-        run: bash bin/python-proto-gen.sh
-      - name: Run state-materialization integration tests
-        run: |
-          cd amber && pytest -sv \
-            src/test/python/core/architecture/packaging/test_state_materialization_e2e.py \
-            src/test/python/core/runnables/test_main_loop.py::TestMainLoop::test_process_start_channel_persists_produce_state_on_start_output
-
   agent-service:
     if: ${{ inputs.run_agent_service }}
     name: ${{ format('agent-service{0} ({1})', inputs.job_name_suffix, matrix.os) }}
diff --git a/amber/src/test/python/core/architecture/packaging/test_state_materialization_e2e.py b/amber/src/test/python/core/architecture/packaging/test_state_materialization_e2e.py
index cfc4f7f676..8613be95b1 100644
--- a/amber/src/test/python/core/architecture/packaging/test_state_materialization_e2e.py
+++ b/amber/src/test/python/core/architecture/packaging/test_state_materialization_e2e.py
@@ -25,7 +25,7 @@ This test wires:
 
     OutputManager.set_up_port_storage_writer(port, base_uri)
        → real PortStorageWriter thread
-       → real IcebergTableWriter (sqlite-backed SqlCatalog)
+       → real IcebergTableWriter (postgres-backed JdbcCatalog)
        → state document at VFSURIFactory.state_uri(base_uri)
        → InputPortMaterializationReaderRunnable.run()
        → DataElement(StateFrame) on the consumer's input queue
@@ -33,14 +33,20 @@ This test wires:
 and asserts that a state put through `save_state_to_storage_if_needed`
 on the producer side actually arrives at the consumer's queue, with the
 same payload.
+
+Marked @integration so the CI runner that has postgres + iceberg
+catalog DB provisioned (amber-integration) picks it up via
+`pytest -m integration`. Earlier versions of this test substituted a
+sqlite-backed SqlCatalog to dodge that infra dependency; that diverged
+from the prod catalog code path, so we now exercise the real one.
 """
 
+import os
 import tempfile
 import threading
 import uuid
 
 import pytest
-from pyiceberg.catalog.sql import SqlCatalog
 
 from core.architecture.packaging.output_manager import OutputManager
 from core.models import State, StateFrame
@@ -68,192 +74,203 @@ from proto.org.apache.texera.amber.engine.architecture.sendsemantics import (
 )
 
 
-# Module-level scratch dir for the sqlite catalog + iceberg warehouse.
-# We don't initialize `StorageConfig` here: other test modules (e.g.
-# test_iceberg_document.py) also call `StorageConfig.initialize` at
-# import time, and the class rejects re-initialization with
-# RuntimeError. Whichever module gets collected first wins; we adopt
-# its namespaces below.
-_WAREHOUSE_DIR = tempfile.mkdtemp(prefix="texera-state-e2e-warehouse-")
+@pytest.mark.integration
+class TestStateMaterializationE2E:
+    @pytest.fixture(autouse=True, scope="class")
+    def _init_storage_config(self):
+        """Initialize StorageConfig + IcebergCatalogInstance for the real
+        postgres-backed catalog in the `amber-integration` CI job.
+
+        Critical detail: the Scala integration tests that run earlier in
+        the same job connect to the iceberg catalog DB as user
+        `postgres/postgres` (the storage.conf default for
+        `STORAGE_ICEBERG_CATALOG_POSTGRES_USERNAME/PASSWORD`). pyiceberg
+        creates the catalog's `iceberg_tables` metadata table on first
+        use, owned by whoever wrote first — so it ends up owned by
+        `postgres`. We MUST connect as the same user, otherwise we hit
+        `permission denied for table iceberg_tables`.
 
+        Why the reset: `test_iceberg_document.py` also calls
+        `StorageConfig.initialize` at module import time (with a
+        different `texera/password` user that works for it because no
+        Scala writes first in the `pyamber` job where it runs). pytest
+        imports every test module during collection, even ones whose
+        tests will be deselected by `-m integration`, so that
+        initialization happens here too. We force-reset the singletons
+        and re-init with the prod-correct credentials; safe because
+        test_iceberg_document's tests are deselected from this run.
 
-@pytest.fixture(scope="module", autouse=True)
-def sqlite_iceberg_catalog():
-    """Inject a sqlite-backed SqlCatalog so the test runs without external
-    iceberg infra (postgres/minio).
+        All catalog + S3 settings read the same `STORAGE_*` env vars
+        the production code consumes (via storage.conf), so the test
+        matches whichever identity the Scala side uses in the same job
+        and stays aligned with the bucket / endpoint the workflow
+        provisions. Defaults mirror storage.conf so a local sbt run
+        without those vars exported still works.
 
-    Module-scoped so all tests in this file share one warehouse, and so
-    namespace creation only happens once. We save/restore the original
-    `IcebergCatalogInstance` singleton so other test modules that expect
-    a real postgres-backed catalog (e.g. test_iceberg_document.py) are
-    not affected by our replacement.
-    """
-    # Some other test module may have initialized StorageConfig already
-    # (it has a single-init lock). If nothing has initialized it yet,
-    # do it here with arbitrary values -- we replace the catalog
-    # instance below so the postgres/rest fields are never exercised.
-    if not StorageConfig._initialized:
+        Class-scoped so the reset + tempdir allocation happens once
+        per class; the two tests in this class share state through the
+        same StorageConfig singleton anyway.
+        """
+        StorageConfig._initialized = False
+        IcebergCatalogInstance._instance = None
+        large_binaries_bucket = os.environ.get(
+            "STORAGE_S3_LARGE_BINARIES_BUCKET", "texera-large-binaries"
+        )
         StorageConfig.initialize(
             catalog_type="postgres",
-            postgres_uri_without_scheme="unused",
-            postgres_username="unused",
-            postgres_password="unused",
-            rest_catalog_uri="unused",
-            rest_catalog_warehouse_name="unused",
+            postgres_uri_without_scheme=os.environ.get(
+                "STORAGE_ICEBERG_CATALOG_POSTGRES_URI_WITHOUT_SCHEME",
+                "localhost:5432/texera_iceberg_catalog",
+            ),
+            postgres_username=os.environ.get(
+                "STORAGE_ICEBERG_CATALOG_POSTGRES_USERNAME", "postgres"
+            ),
+            postgres_password=os.environ.get(
+                "STORAGE_ICEBERG_CATALOG_POSTGRES_PASSWORD", "postgres"
+            ),
+            rest_catalog_uri="http://localhost:8181/catalog/",
+            rest_catalog_warehouse_name="texera",
             table_result_namespace="operator-port-result",
             table_state_namespace="operator-port-state",
-            directory_path=_WAREHOUSE_DIR,
+            directory_path=tempfile.mkdtemp(prefix="texera-state-e2e-warehouse-"),
             commit_batch_size=4096,
-            s3_endpoint="unused",
-            s3_region="unused",
-            s3_auth_username="unused",
-            s3_auth_password="unused",
-            s3_large_binaries_base_uri="s3://texera-large-binaries/objects/0/",
+            s3_endpoint=os.environ.get("STORAGE_S3_ENDPOINT", "http://localhost:9000"),
+            s3_region=os.environ.get("STORAGE_S3_REGION", "us-west-2"),
+            s3_auth_username=os.environ.get("STORAGE_S3_AUTH_USERNAME", "texera_minio"),
+            s3_auth_password=os.environ.get("STORAGE_S3_AUTH_PASSWORD", "password"),
+            s3_large_binaries_base_uri=f"s3://{large_binaries_bucket}/objects/0/",
         )
 
-    original_instance = IcebergCatalogInstance._instance
-    db_path = f"{_WAREHOUSE_DIR}/catalog.sqlite"
-    catalog = SqlCatalog(
-        "texera_iceberg_e2e",
-        **{
-            "uri": f"sqlite:///{db_path}",
-            "warehouse": f"file://{_WAREHOUSE_DIR}",
-        },
-    )
-    # Adopt whatever namespaces StorageConfig already has -- those are
-    # the ones DocumentFactory will route into.
-    catalog.create_namespace_if_not_exists(StorageConfig.ICEBERG_TABLE_RESULT_NAMESPACE)
-    catalog.create_namespace_if_not_exists(StorageConfig.ICEBERG_TABLE_STATE_NAMESPACE)
-    IcebergCatalogInstance.replace_instance(catalog)
-    try:
-        yield catalog
-    finally:
-        IcebergCatalogInstance.replace_instance(original_instance)
-
-
-def _fresh_base_uri() -> str:
-    """A unique port-base URI per test so tables don't collide."""
-    return VFSURIFactory.create_port_base_uri(
-        WorkflowIdentity(id=0),
-        ExecutionIdentity(id=0),
-        GlobalPortIdentity(
-            op_id=PhysicalOpIdentity(
-                logical_op_id=OperatorIdentity(id=f"e2e-{uuid.uuid4().hex}"),
-                layer_name="main",
+    @pytest.fixture
+    def base_uri(self) -> str:
+        """A unique port-base URI per test so tables don't collide."""
+        return VFSURIFactory.create_port_base_uri(
+            WorkflowIdentity(id=0),
+            ExecutionIdentity(id=0),
+            GlobalPortIdentity(
+                op_id=PhysicalOpIdentity(
+                    logical_op_id=OperatorIdentity(id=f"e2e-{uuid.uuid4().hex}"),
+                    layer_name="main",
+                ),
+                port_id=PortIdentity(id=0, internal=False),
+                input=False,
             ),
-            port_id=PortIdentity(id=0, internal=False),
-            input=False,
-        ),
-    )
-
-
-def test_state_written_by_output_manager_is_replayed_by_reader():
-    """Producer side writes a state via OutputManager; consumer side reads
-    it via InputPortMaterializationReaderRunnable. The state must arrive
-    on the consumer's input queue intact.
-    """
-    base_uri = _fresh_base_uri()
-    port_id = PortIdentity(id=0, internal=False)
-    worker_schema_for_result = State.SCHEMA  # producer-side: only state matters
-
-    # 1. RegionExecutionCoordinator's responsibility: provision result +
-    # state documents at the port base URI before any worker starts.
-    # We emulate that here.
-    DocumentFactory.create_document(
-        VFSURIFactory.result_uri(base_uri), worker_schema_for_result
-    )
-    DocumentFactory.create_document(VFSURIFactory.state_uri(base_uri), State.SCHEMA)
-
-    # 2. Producer side: spin up an OutputManager, set up real state +
-    # result writer threads against the iceberg storage.
-    producer = OutputManager(worker_id="Worker:WF0-test-producer-main-0")
-    producer.add_output_port(
-        port_id, schema=worker_schema_for_result, storage_uri_base=base_uri
-    )
-
-    # 3. Drive a state through the producer-side path.
-    state = State({"flag": True, "loop_counter": 7, "name": "outer"})
-    producer.save_state_to_storage_if_needed(state)
-
-    # 4. Force the writer threads to flush + commit by closing them.
-    # Without this, the iceberg buffer holds the state in memory and
-    # nothing is durable yet.
-    producer.close_port_storage_writers()
+        )
 
-    # 5. Consumer side: spin up the materialization reader against the
-    # same base URI. Each reader needs a partitioning even when no real
-    # downstream worker exists -- supply a OneToOnePartitioning whose
-    # only receiver is the consumer worker itself.
-    consumer_worker = ActorVirtualIdentity(name="consumer-worker-0")
-    consumer_queue = InternalQueue()
-    partitioning = Partitioning(
-        one_to_one_partitioning=OneToOnePartitioning(
-            batch_size=400,
-            channels=[
-                ChannelIdentity(
-                    from_worker_id=ActorVirtualIdentity(name="producer-worker-0"),
-                    to_worker_id=consumer_worker,
-                    is_control=False,
-                )
-            ],
+    @pytest.fixture
+    def producer(self, base_uri):
+        """An OutputManager wired to the iceberg result + state documents
+        at `base_uri`. Closes its writer threads on teardown so cached
+        buffers are flushed even if a test errors out before
+        `close_port_storage_writers()`.
+        """
+        # RegionExecutionCoordinator's responsibility in prod: provision
+        # result + state documents at the port base URI before any
+        # worker starts. We emulate that here.
+        DocumentFactory.create_document(
+            VFSURIFactory.result_uri(base_uri), State.SCHEMA
         )
-    )
-    reader = InputPortMaterializationReaderRunnable(
-        uri=base_uri,
-        queue=consumer_queue,
-        worker_actor_id=consumer_worker,
-        partitioning=partitioning,
-    )
+        DocumentFactory.create_document(VFSURIFactory.state_uri(base_uri), State.SCHEMA)
 
-    # Run the reader on a worker thread so we can time out cleanly if
-    # something goes wrong.
-    reader_thread = threading.Thread(target=reader.run, daemon=True)
-    reader_thread.start()
-    reader_thread.join(timeout=30)
-    assert not reader_thread.is_alive(), "reader did not finish within timeout"
-    assert reader.finished(), "reader exited but did not mark itself finished"
+        mgr = OutputManager(worker_id="Worker:WF0-test-producer-main-0")
+        mgr.add_output_port(
+            PortIdentity(id=0, internal=False),
+            schema=State.SCHEMA,
+            storage_uri_base=base_uri,
+        )
+        try:
+            yield mgr
+        finally:
+            # close_port_storage_writers is idempotent — fine to call
+            # again here if the test already closed.
+            try:
+                mgr.close_port_storage_writers()
+            except Exception:
+                pass
 
-    # 6. Drain the consumer's queue and find the StateFrame(s).
-    state_frames: list[State] = []
-    while not consumer_queue.is_empty():
-        elem = consumer_queue.get()
-        if isinstance(elem, DataElement) and isinstance(elem.payload, StateFrame):
-            state_frames.append(elem.payload.frame)
+    def test_state_written_by_output_manager_is_replayed_by_reader(
+        self, base_uri, producer
+    ):
+        """Producer side writes a state via OutputManager; consumer side
+        reads it via InputPortMaterializationReaderRunnable. The state
+        must arrive on the consumer's input queue intact.
+        """
+        # Drive a state through the producer-side path.
+        state = State({"flag": True, "loop_counter": 7, "name": "outer"})
+        producer.save_state_to_storage_if_needed(state)
 
-    assert len(state_frames) == 1, (
-        f"expected exactly one State to flow through writer→iceberg→reader; "
-        f"got {len(state_frames)}: {state_frames}"
-    )
-    assert state_frames[0] == state, (
-        f"replayed state did not match what was written; "
-        f"wrote={state}, read={state_frames[0]}"
-    )
+        # Force the writer threads to flush + commit by closing them.
+        # Without this, the iceberg buffer holds the state in memory
+        # and nothing is durable yet.
+        producer.close_port_storage_writers()
 
+        # Consumer side: spin up the materialization reader against the
+        # same base URI. Each reader needs a partitioning even when no
+        # real downstream worker exists — supply a OneToOnePartitioning
+        # whose only receiver is the consumer worker itself.
+        consumer_worker = ActorVirtualIdentity(name="consumer-worker-0")
+        consumer_queue = InternalQueue()
+        partitioning = Partitioning(
+            one_to_one_partitioning=OneToOnePartitioning(
+                batch_size=400,
+                channels=[
+                    ChannelIdentity(
+                        from_worker_id=ActorVirtualIdentity(name="producer-worker-0"),
+                        to_worker_id=consumer_worker,
+                        is_control=False,
+                    )
+                ],
+            )
+        )
+        reader = InputPortMaterializationReaderRunnable(
+            uri=base_uri,
+            queue=consumer_queue,
+            worker_actor_id=consumer_worker,
+            partitioning=partitioning,
+        )
 
-def test_state_table_persists_across_writer_close():
-    """Independently verify the iceberg state table contains the row.
-    If this passes but the reader test above fails, the bug is in the
-    reader / consumer wiring; if this fails, the bug is in the writer /
-    storage layer.
-    """
-    base_uri = _fresh_base_uri()
-    port_id = PortIdentity(id=0, internal=False)
+        # Run the reader on a worker thread so we can time out cleanly
+        # if something goes wrong.
+        reader_thread = threading.Thread(target=reader.run, daemon=True)
+        reader_thread.start()
+        reader_thread.join(timeout=30)
+        assert not reader_thread.is_alive(), "reader did not finish within timeout"
+        assert reader.finished(), "reader exited but did not mark itself finished"
 
-    DocumentFactory.create_document(VFSURIFactory.result_uri(base_uri), State.SCHEMA)
-    DocumentFactory.create_document(VFSURIFactory.state_uri(base_uri), State.SCHEMA)
+        # Drain the consumer's queue and find the StateFrame(s).
+        state_frames: list[State] = []
+        while not consumer_queue.is_empty():
+            elem = consumer_queue.get()
+            if isinstance(elem, DataElement) and isinstance(elem.payload, StateFrame):
+                state_frames.append(elem.payload.frame)
 
-    producer = OutputManager(worker_id="Worker:WF0-test-producer2-main-0")
-    producer.add_output_port(port_id, schema=State.SCHEMA, storage_uri_base=base_uri)
+        assert len(state_frames) == 1, (
+            f"expected exactly one State to flow through writer→iceberg→reader; "
+            f"got {len(state_frames)}: {state_frames}"
+        )
+        assert state_frames[0] == state, (
+            f"replayed state did not match what was written; "
+            f"wrote={state}, read={state_frames[0]}"
+        )
 
-    state = State({"flag": False, "checkpoint": 42})
-    producer.save_state_to_storage_if_needed(state)
-    producer.close_port_storage_writers()
+    def test_state_table_persists_across_writer_close(self, base_uri, producer):
+        """Independently verify the iceberg state table contains the row.
+        If this passes but the reader test above fails, the bug is in
+        the reader / consumer wiring; if this fails, the bug is in the
+        writer / storage layer.
+        """
+        state = State({"flag": False, "checkpoint": 42})
+        producer.save_state_to_storage_if_needed(state)
+        producer.close_port_storage_writers()
 
-    # Read directly from the iceberg state document, bypassing the reader.
-    state_document, _ = DocumentFactory.open_document(VFSURIFactory.state_uri(base_uri))
-    rows = list(state_document.get())
-    assert len(rows) == 1, (
-        f"expected exactly one row in the iceberg state table after the "
-        f"writer was closed; got {len(rows)} rows"
-    )
-    assert State.from_tuple(rows[0]) == state
+        # Read directly from the iceberg state document, bypassing the
+        # reader.
+        state_document, _ = DocumentFactory.open_document(
+            VFSURIFactory.state_uri(base_uri)
+        )
+        rows = list(state_document.get())
+        assert len(rows) == 1, (
+            f"expected exactly one row in the iceberg state table after "
+            f"the writer was closed; got {len(rows)} rows"
+        )
+        assert State.from_tuple(rows[0]) == state

