huaxingao commented on code in PR #3803:
URL: https://github.com/apache/polaris/pull/3803#discussion_r2876195745


##########
runtime/service/src/main/java/org/apache/polaris/service/idempotency/IdempotencyFilter.java:
##########
@@ -0,0 +1,736 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.idempotency;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.smallrye.mutiny.Uni;
+import io.smallrye.mutiny.infrastructure.Infrastructure;
+import io.vertx.core.Vertx;
+import jakarta.inject.Inject;
+import jakarta.ws.rs.Priorities;
+import jakarta.ws.rs.container.ContainerRequestContext;
+import jakarta.ws.rs.container.ContainerResponseContext;
+import jakarta.ws.rs.core.MediaType;
+import jakarta.ws.rs.core.Response;
+import jakarta.ws.rs.core.SecurityContext;
+import java.lang.reflect.Method;
+import java.net.InetAddress;
+import java.time.Clock;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.iceberg.rest.responses.ErrorResponse;
+import org.apache.polaris.core.admin.model.Catalog;
+import org.apache.polaris.core.auth.PolarisPrincipal;
+import org.apache.polaris.core.context.RealmContext;
+import org.apache.polaris.core.entity.CatalogEntity;
+import org.apache.polaris.core.entity.IdempotencyRecord;
+import org.apache.polaris.core.persistence.IdempotencyStore;
+import org.apache.polaris.core.persistence.ResolvedPolarisEntity;
+import org.apache.polaris.core.persistence.resolver.Resolver;
+import org.apache.polaris.core.persistence.resolver.ResolverFactory;
+import org.apache.polaris.core.persistence.resolver.ResolverStatus;
+import org.apache.polaris.service.catalog.CatalogPrefixParser;
+import org.apache.polaris.service.catalog.iceberg.IcebergCatalogAdapter;
+import org.apache.polaris.service.context.RealmContextFilter;
+import org.jboss.logging.Logger;
+import org.jboss.resteasy.reactive.server.ServerRequestFilter;
+import org.jboss.resteasy.reactive.server.ServerResponseFilter;
+import org.jboss.resteasy.reactive.server.core.CurrentRequestManager;
+import org.jboss.resteasy.reactive.server.core.ResteasyReactiveRequestContext;
+import org.jboss.resteasy.reactive.server.spi.ResteasyReactiveResourceInfo;
+
+/**
+ * HTTP idempotency integration at the request/response layer.
+ *
+ * <p>For in-scope requests (see {@link IdempotencyConfiguration#scopes()}), this filter reserves an
+ * idempotency key before executing the request, and replays the previously finalized response when
+ * a duplicate key is received. For owned requests, it finalizes the response summary/headers for
+ * future replay.
+ *
+ * <p>Replayed responses use the full persisted response entity; responses are not truncated.
+ */
+public class IdempotencyFilter {
+  private static final Logger LOG = Logger.getLogger(IdempotencyFilter.class);
+
+  private static final Set<Integer> NEVER_FINALIZE_STATUSES = Set.of(401, 403, 
408, 429); // 4xx statuses that shouldFinalize() refuses to persist for replay
+
+  private static final long WARN_THROTTLE_MS = 60_000L; // minimum spacing between WARN-level logs in warnThrottled()
+  private static final AtomicLong LAST_WARN_AT_MS = new AtomicLong(0L); // wall-clock millis of the last WARN emitted by warnThrottled()
+
+  private static final String PROP_IDEMPOTENCY_KEY = "idempotency.key"; // request property: normalized key, set when the reservation is OWNED
+  private static final String PROP_IDEMPOTENCY_OPERATION = 
"idempotency.operation"; // request property: operation type bound to the key
+  private static final String PROP_IDEMPOTENCY_RESOURCE = 
"idempotency.resource"; // request property: normalized resource id bound to the key
+  private static final String PROP_IDEMPOTENCY_OWNED = "idempotency.owned"; // request property: Boolean.TRUE when this request owns the reservation
+  private static final String PROP_IDEMPOTENCY_HEARTBEAT_TIMER_ID = 
"idempotency.heartbeat.timerId"; // request property: Vert.x timer id stored by maybeStartHeartbeat()
+
+  @Inject IdempotencyConfiguration configuration;
+  @Inject IdempotencyStore store;
+  @Inject Clock clock; // source of "now" for expiry, heartbeat and deadline computations
+  @Inject ObjectMapper objectMapper; // passed to responseEntityAsString() when finalizing
+  @Inject Vertx vertx; // used only for the periodic heartbeat timer
+  @Inject ResolverFactory resolverFactory;
+  @Inject CatalogPrefixParser prefixParser;
+
+  private volatile String resolvedExecutorId; // cached result of executorId(); null until first resolved
+
+  @ServerRequestFilter(priority = Priorities.AUTHORIZATION + 10)
+  public Uni<Response> reserveOrReplay(ContainerRequestContext rc) {
+    if (!configuration.enabled()) {
+      return Uni.createFrom().nullItem();
+    }
+
+    SecurityContext securityContext = rc.getSecurityContext();
+
+    String rawKey = rc.getHeaderString(configuration.keyHeader());
+    if (rawKey == null || rawKey.isBlank()) {
+      return Uni.createFrom().nullItem();
+    }
+    String key = normalizeIdempotencyKey(rawKey);
+    if (key == null) {
+      return Uni.createFrom()
+          .item(
+              error(
+                  400,
+                  "idempotency_key_invalid",
+                  "Idempotency-Key must be a UUIDv7 string (RFC 9562)"));
+    }
+
+    boolean scopedMode = !configuration.scopes().isEmpty();
+    if (!scopedMode) {
+      // Without an explicit scope allowlist, only apply idempotency to 
Iceberg REST catalog
+      // mutating endpoints under /v1/{prefix}/...
+      //
+      // This check is best-effort because request-to-resource matching 
metadata may be unavailable
+      // in some cases; if we can't confidently identify an Iceberg mutation 
endpoint we fail closed
+      // (treat idempotency as a no-op) to avoid false positives.
+      if (!isMutatingMethod(rc.getMethod()) || !isIcebergMutationEndpoint(rc)) 
{
+        return Uni.createFrom().nullItem();
+      }
+    }
+
+    RealmContext realmContext = (RealmContext) 
rc.getProperty(RealmContextFilter.REALM_CONTEXT_KEY);
+    if (realmContext == null) {
+      // RealmContextFilter should run before this; treat missing realm as a 
server error.
+      return Uni.createFrom().item(error(500, "MissingRealmContext", "Missing 
realm context"));
+    }
+
+    // If scopes are configured, apply idempotency only to matching endpoints 
and use the configured
+    // operationType for stable binding.
+    IdempotencyConfiguration.Scope scope = matchScope(rc);
+    if (scope == null && scopedMode) {
+      return Uni.createFrom().nullItem();
+    }
+
+    PolarisPrincipal principal =
+        securityContext != null && securityContext.getUserPrincipal() 
instanceof PolarisPrincipal p
+            ? p
+            : null;
+    final String requestPath = rc.getUriInfo().getPath();
+    Uni<Boolean> internalCatalogCheck =
+        principal == null
+            ? Uni.createFrom().item(true)
+            : Uni.createFrom()
+                .item(() -> 
isPolarisManagedInternalIcebergCatalogRequest(requestPath, principal))
+                .runSubscriptionOn(Infrastructure.getDefaultWorkerPool())
+                .onFailure()
+                .recoverWithItem(false);
+
+    String realmId = realmContext.getRealmIdentifier();
+    String operationType = scope == null ? normalizeOperationType(rc) : 
scope.operationType();
+    String resourceId = normalizeResourceId(rc);
+
+    Instant now = clock.instant();
+    Instant expiresAt = 
now.plus(configuration.ttl()).plus(configuration.ttlGrace());
+
+    return internalCatalogCheck
+        .onItem()
+        .transformToUni(
+            internal -> {
+              if (!internal) {
+                // For federated/external catalogs, Polaris may not enforce 
idempotency end-to-end,
+                // so treat Idempotency-Key as a no-op (do not 
reserve/finalize/replay).
+                return Uni.createFrom().nullItem();
+              }
+              return Uni.createFrom()
+                  .item(
+                      () ->
+                          store.reserve(
+                              realmId,
+                              key,
+                              operationType,
+                              resourceId,
+                              expiresAt,
+                              executorId(),
+                              now))
+                  .runSubscriptionOn(Infrastructure.getDefaultWorkerPool())
+                  .onItem()
+                  .transformToUni(
+                      r -> {
+                        if (r.type() == 
IdempotencyStore.ReserveResultType.OWNED) {
+                          rc.setProperty(PROP_IDEMPOTENCY_KEY, key);
+                          rc.setProperty(PROP_IDEMPOTENCY_OPERATION, 
operationType);
+                          rc.setProperty(PROP_IDEMPOTENCY_RESOURCE, 
resourceId);
+                          rc.setProperty(PROP_IDEMPOTENCY_OWNED, Boolean.TRUE);
+                          maybeStartHeartbeat(rc, realmId, key);
+                          return Uni.createFrom().nullItem();
+                        }
+
+                        Optional<IdempotencyRecord> existingOpt = r.existing();
+                        if (existingOpt.isEmpty()) {
+                          // Should not happen: DUPLICATE should always return 
an existing record.
+                          return Uni.createFrom()
+                              .item(
+                                  error(
+                                      500,
+                                      "IdempotencyInvariantViolation",
+                                      "Missing existing record"));
+                        }
+
+                        IdempotencyRecord existing = existingOpt.get();
+                        if (!operationType.equals(existing.operationType())
+                            || 
!resourceId.equals(existing.normalizedResourceId())) {
+                          return Uni.createFrom()
+                              .item(
+                                  error(
+                                      422,
+                                      "idempotency_key_conflict",
+                                      "Idempotency key was already used for a 
different operation/resource"));
+                        }
+
+                        if (!existing.isFinalized()) {
+                          if (configuration.heartbeatEnabled()) {
+                            // If the owner appears stale (no recent 
heartbeat), don't wait
+                            // indefinitely. Full reconciliation/takeover is 
out of scope for this
+                            // change; return a retryable 503.
+                            Instant checkNow = clock.instant();
+                            Instant lastSignal = existing.heartbeatAt();
+                            if (lastSignal == null) {
+                              // A duplicate can arrive before the first 
heartbeat; fall back to
+                              // updatedAt/createdAt so we don't treat fresh 
owners as stale.
+                              lastSignal = existing.updatedAt();
+                              if (lastSignal == null) {
+                                lastSignal = existing.createdAt();
+                              }
+                            }
+                            if (lastSignal == null
+                                || Duration.between(lastSignal, checkNow)
+                                        .compareTo(configuration.leaseTtl())
+                                    > 0) {
+                              return Uni.createFrom()
+                                  .item(
+                                      error(
+                                          503,
+                                          "idempotency_in_progress",
+                                          "Operation with this idempotency key 
may be stale; retry later"));
+                            }
+                          }
+                          Instant deadline = 
clock.instant().plus(configuration.inProgressWait());
+                          return waitForFinalized(realmId, key, deadline)
+                              .onItem()
+                              .transform(this::replayResponse)
+                              .onFailure(TimeoutException.class)
+                              .recoverWithItem(
+                                  error(
+                                      503,
+                                      "idempotency_in_progress",
+                                      "Operation with this idempotency key is 
still in progress; retry later"))
+                              .onFailure()
+                              .recoverWithItem(
+                                  error(
+                                      503,
+                                      "idempotency_store_unavailable",
+                                      "Idempotency store unavailable; retry 
later"));
+                        }
+
+                        return Uni.createFrom().item(replayResponse(existing));
+                      })
+                  .onFailure()
+                  .recoverWithItem(
+                      error(
+                          503,
+                          "idempotency_store_unavailable",
+                          "Idempotency store unavailable; retry later"));
+            });
+  }
+
+  @ServerResponseFilter
+  public void finalizeOwned(ContainerRequestContext request, 
ContainerResponseContext response) {
+    if (!configuration.enabled()) {
+      return;
+    }
+    if (!Boolean.TRUE.equals(request.getProperty(PROP_IDEMPOTENCY_OWNED))) {
+      return;
+    }
+
+    stopHeartbeat(request);
+
+    RealmContext realmContext =
+        (RealmContext) 
request.getProperty(RealmContextFilter.REALM_CONTEXT_KEY);
+    if (realmContext == null) {
+      return;
+    }
+
+    String realmId = realmContext.getRealmIdentifier();
+    String key = (String) request.getProperty(PROP_IDEMPOTENCY_KEY);
+    if (key == null) {
+      return;
+    }
+
+    int status = response.getStatus();
+    if (!shouldFinalize(status)) {
+      // Do not finalize/replay for 401/403/408/429 or any 5xx. If we reserved 
this key for such a
+      // request, release the in-progress record so the key does not linger 
and block retries.
+      if (shouldCancelInProgressReservation(status)) {
+        Infrastructure.getDefaultWorkerPool()
+            .execute(
+                () -> {
+                  try {
+                    store.cancelInProgressReservation(realmId, key, 
executorId());
+                  } catch (RuntimeException e) {
+                    warnThrottled(
+                        e,
+                        "Failed to cancel in-progress idempotency reservation; 
replay may be unavailable");
+                  }
+                });
+      }
+      return;
+    }
+    final String body = responseEntityAsString(response, objectMapper);
+    final Map<String, String> headers =
+        headerSnapshot(response, configuration.responseHeaderAllowlist());
+    Instant now = clock.instant();
+
+    Infrastructure.getDefaultWorkerPool()
+        .execute(
+            () -> {
+              try {
+                store.finalizeRecord(realmId, key, executorId(), status, null, 
body, headers, now);
+              } catch (RuntimeException e) {
+                warnThrottled(
+                    e,
+                    "Failed to finalize idempotency record; replay may be 
unavailable until a later retry or reconciliation");
+              }
+            });
+  }
+
+  private void maybeStartHeartbeat(ContainerRequestContext rc, String realmId, 
String key) {
+    if (!configuration.heartbeatEnabled()) {
+      return;
+    }
+    long intervalMs = configuration.heartbeatInterval().toMillis();
+    long timerId =
+        vertx.setPeriodic(
+            intervalMs,
+            ignored -> {
+              Infrastructure.getDefaultWorkerPool()
+                  .execute(
+                      () -> {
+                        try {
+                          store.updateHeartbeat(realmId, key, executorId(), 
clock.instant());
+                        } catch (RuntimeException ignored2) {
+                          // Best-effort.
+                        }
+                      });
+            });
+    rc.setProperty(PROP_IDEMPOTENCY_HEARTBEAT_TIMER_ID, timerId);
+  }
+
+  private String executorId() {
+    String cached = resolvedExecutorId;
+    if (cached != null) {
+      return cached;
+    }
+    String fromConfig = configuration.executorId().orElse(null);
+    if (fromConfig != null && !fromConfig.isBlank()) {
+      resolvedExecutorId = fromConfig;
+      return fromConfig;
+    }
+    String computed = defaultExecutorId();
+    resolvedExecutorId = computed;
+    return computed;
+  }
+
+  private static String defaultExecutorId() {
+    String pid = String.valueOf(ProcessHandle.current().pid());
+    String node =
+        firstNonBlank(
+            System.getenv("POD_NAME"), System.getenv("HOSTNAME"), 
System.getenv("NODE_NAME"));
+    if (node != null) {
+      return node + "-" + pid;
+    }
+    try {
+      return InetAddress.getLocalHost().getHostName() + "-" + pid;
+    } catch (Exception e) {
+      return "pid-" + pid;
+    }
+  }
+
+  private static String firstNonBlank(String... values) {
+    if (values == null) {
+      return null;
+    }
+    for (String v : values) {
+      if (v != null && !v.isBlank()) {
+        return v;
+      }
+    }
+    return null;
+  }
+
+  private void stopHeartbeat(ContainerRequestContext request) {
+    Object tid = request.getProperty(PROP_IDEMPOTENCY_HEARTBEAT_TIMER_ID);
+    if (tid instanceof Long l) {
+      vertx.cancelTimer(l);
+    }
+  }
+
+  private IdempotencyConfiguration.Scope matchScope(ContainerRequestContext 
rc) {
+    String method = rc.getMethod();
+    String path = rc.getUriInfo().getPath();
+    if (method == null || path == null) {
+      return null;
+    }
+    String normalizedPath = path.startsWith("/") ? path.substring(1) : path;
+    for (IdempotencyConfiguration.Scope s : configuration.scopes()) {
+      if (s == null) {
+        continue;
+      }
+      if (!method.equalsIgnoreCase(s.method())) {
+        continue;
+      }
+      String prefix = s.pathPrefix();
+      if (prefix == null || prefix.isBlank()) {
+        continue;
+      }
+      String normalizedPrefix = prefix.startsWith("/") ? prefix.substring(1) : 
prefix;
+      if (normalizedPath.startsWith(normalizedPrefix)) {
+        return s;
+      }
+    }
+    return null;
+  }
+
+  private static String normalizeOperationType(ContainerRequestContext rc) {
+    // Fallback for when scopes are not configured: use the matched JAX-RS 
resource method
+    // (stable across resource IDs), falling back to the HTTP method if 
unavailable.
+    try {
+      ResteasyReactiveRequestContext ctx = CurrentRequestManager.get();
+      if (ctx != null) {
+        ResteasyReactiveResourceInfo info = 
ctx.getResteasyReactiveResourceInfo();
+        if (info != null) {
+          Class<?> resourceClass = info.getResourceClass();
+          Method resourceMethod = info.getResourceMethod();
+          if (resourceClass != null && resourceMethod != null) {
+            return resourceClass.getName() + "#" + resourceMethod.getName();
+          }
+        }
+      }
+    } catch (Throwable ignored) {
+      // Best-effort; fall back below.
+    }
+    return rc.getMethod().toLowerCase(Locale.ROOT);
+  }
+
+  private static boolean shouldFinalize(int httpStatus) {
+    // - Finalize & replay: 200, 201, 204, and deterministic terminal 4xx
+    // - Never finalize (not stored/replayed): 401, 403, 408, 429, all 5xx
+    if (httpStatus == 200 || httpStatus == 201 || httpStatus == 204) {
+      return true;
+    }
+    if (httpStatus >= 400 && httpStatus < 500) {
+      return !NEVER_FINALIZE_STATUSES.contains(httpStatus);
+    }
+    return false;
+  }
+
+  private static boolean shouldCancelInProgressReservation(int httpStatus) {
+    return httpStatus >= 500 || NEVER_FINALIZE_STATUSES.contains(httpStatus);
+  }
+
+  private static void warnThrottled(Throwable t, String message) {
+    long now = System.currentTimeMillis();
+    long last = LAST_WARN_AT_MS.get();
+    if (now - last >= WARN_THROTTLE_MS && LAST_WARN_AT_MS.compareAndSet(last, 
now)) {
+      LOG.warn(message, t);
+    } else {
+      LOG.debug(message, t);
+    }
+  }
+
+  /**
+   * Normalize and validate the idempotency key.
+   *
+   * <p>Idempotency keys must be UUIDv7 in string form; normalizing to {@link 
UUID#toString()}
+   * avoids accidental mismatches caused by casing.
+   */
+  private static String normalizeIdempotencyKey(String rawKey) {
+    if (rawKey == null) {
+      return null;
+    }
+    String trimmed = rawKey.trim();
+    // Iceberg REST OpenAPI expects 36-char UUID strings.
+    if (trimmed.length() != 36) {
+      return null;
+    }
+    try {
+      UUID uuid = UUID.fromString(trimmed);
+      return uuid.version() == 7 ? uuid.toString() : null;
+    } catch (IllegalArgumentException e) {
+      return null;
+    }
+  }
+
+  private Response replayResponse(IdempotencyRecord existing) {
+    Response.ResponseBuilder replay =
+        Response.status(existing.httpStatus() == null ? 200 : 
existing.httpStatus());
+    replay.header("X-Idempotency-Replayed", "true");
+    applyReplayedHeaders(replay, existing.responseHeaders());
+    if (existing.responseSummary() != null) {
+      replay.entity(existing.responseSummary());
+    }
+    return replay.build();
+  }
+
+  private Uni<IdempotencyRecord> waitForFinalized(String realmId, String key, 
Instant deadline) {
+    // Start with a small delay for low-latency replays, then exponentially 
back off (capped below)
+    // to avoid tight polling when the original attempt takes longer.
+    return pollUntilFinalized(realmId, key, deadline, 25L);
+  }
+
+  private Uni<IdempotencyRecord> pollUntilFinalized(
+      String realmId, String key, Instant deadline, long delayMs) {
+    return Uni.createFrom()
+        .item(() -> store.load(realmId, key))
+        .runSubscriptionOn(Infrastructure.getDefaultWorkerPool())
+        .onItem()
+        .transformToUni(
+            opt -> {
+              if (opt.isPresent() && opt.get().isFinalized()) {
+                return Uni.createFrom().item(opt.get());
+              }
+              if (!clock.instant().isBefore(deadline)) {
+                return Uni.createFrom()
+                    .failure(new TimeoutException("Timed out waiting for 
finalize"));
+              }
+              long nextDelayMs = Math.min(500L, Math.max(25L, delayMs * 2));
+              return Uni.createFrom()
+                  .nullItem()
+                  .onItem()
+                  .delayIt()
+                  .by(Duration.ofMillis(delayMs))
+                  .onItem()
+                  .transformToUni(
+                      ignored -> pollUntilFinalized(realmId, key, deadline, 
nextDelayMs));
+            });
+  }
+
+  private static boolean isMutatingMethod(String method) {
+    if (method == null) {
+      return false;
+    }
+    switch (method.toUpperCase(Locale.ROOT)) {
+      case "POST":
+      case "PUT":
+      case "PATCH":
+      case "DELETE":
+        return true;
+      default:
+        return false;
+    }
+  }
+
+  private static boolean isIcebergMutationEndpoint(ContainerRequestContext rc) 
{
+    // Best-effort: rely on the matched JAX-RS resource method rather than 
path heuristics so we
+    // only apply idempotency to endpoints implemented by the Iceberg REST 
catalog adapter. If the
+    // matched resource/method isn't available, we return false (no 
idempotency).
+    try {
+      ResteasyReactiveRequestContext ctx = CurrentRequestManager.get();
+      if (ctx == null) {
+        return false;
+      }
+      ResteasyReactiveResourceInfo info = 
ctx.getResteasyReactiveResourceInfo();
+      if (info == null) {
+        return false;
+      }
+      if (info.getResourceClass() != IcebergCatalogAdapter.class) {
+        return false;
+      }
+      Method m = info.getResourceMethod();
+      if (m == null) {
+        return false;
+      }
+      String name = m.getName();
+      // Mutation endpoints that accept Idempotency-Key in the Iceberg REST 
API.
+      return name.equals("createNamespace")
+          || name.equals("dropNamespace")
+          || name.equals("updateProperties")
+          || name.equals("createTable")
+          || name.equals("registerTable")
+          || name.equals("updateTable")
+          || name.equals("dropTable")
+          || name.equals("renameTable")
+          || name.equals("createView")
+          || name.equals("replaceView")
+          || name.equals("dropView")
+          || name.equals("renameView")
+          || name.equals("commitTransaction");
+    } catch (Throwable ignored) {
+      return false;
+    }
+  }
+
+  /**
+   * Returns true only for Polaris-managed (INTERNAL) catalogs for Iceberg 
REST routes.
+   *
+   * <p>This is a best-effort guard: if we can't determine catalog type, we 
default to "not
+   * internal" and treat idempotency as a no-op.
+   */
+  private boolean isPolarisManagedInternalIcebergCatalogRequest(
+      String path, PolarisPrincipal principal) {
+    if (path == null || !path.startsWith("v1/")) {
+      // Non-Iceberg REST paths: leave existing behavior unchanged 
(scopes/mutating method gating).
+      return true;
+    }
+
+    // Expected Iceberg REST shape: v1/{prefix}/...
+    String[] parts = path.split("/", 3);
+    if (parts.length < 3) {
+      return false;
+    }
+    String prefix = parts[1];
+    if (prefix == null || prefix.isBlank()) {
+      return false;
+    }
+
+    final String catalogName;
+    try {
+      catalogName = prefixParser.prefixToCatalogName(prefix);
+    } catch (RuntimeException e) {
+      LOG.debugf(e, "Failed to parse Iceberg REST catalog prefix for 
idempotency gating");
+      return false;
+    }
+
+    final Resolver resolver;
+    try {
+      resolver = resolverFactory.createResolver(principal, catalogName);
+    } catch (RuntimeException e) {
+      LOG.debugf(e, "Failed to create resolver for idempotency 
internal-catalog gating");
+      return false;
+    }
+
+    ResolverStatus status;
+    try {
+      status = resolver.resolveAll();
+    } catch (RuntimeException e) {
+      LOG.debugf(e, "Failed to resolve catalog for idempotency 
internal-catalog gating");
+      return false;
+    }
+    if (!ResolverStatus.StatusEnum.SUCCESS.equals(status.getStatus())) {
+      LOG.debugf(
+          "Resolver status %s for idempotency internal-catalog gating; 
treating as non-internal",
+          status.getStatus());
+      return false;
+    }
+
+    ResolvedPolarisEntity resolved = resolver.getResolvedReferenceCatalog();
+    CatalogEntity catalogEntity = resolved == null ? null : 
CatalogEntity.of(resolved.getEntity());
+    return catalogEntity != null
+        && Catalog.TypeEnum.INTERNAL.equals(catalogEntity.getCatalogType());
+  }
+
+  private static String normalizeResourceId(ContainerRequestContext rc) {
+    // Use path only to avoid accidental mismatches from query 
ordering/irrelevant parameters.
+    String path = rc.getUriInfo().getPath();
+    return path;
+  }
+
+  private static String responseEntityAsString(
+      ContainerResponseContext response, ObjectMapper objectMapper) {
+    Object entity = response.getEntity();
+    if (entity == null) {
+      return null;
+    }
+    if (entity instanceof String string) {
+      return string;
+    }
+    try {
+      return objectMapper.writeValueAsString(entity);

Review Comment:
   Agreed. In a follow-up I’ll switch from storing the full response body to a 
small, bounded response_summary, and handle streaming/unknown bodies explicitly.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to